property_maps = BasePropertyMap.registry.makeField(
multi=True,
default=["exposure_time",
"psf_size",
"psf_e1",
"psf_e2",
"psf_maglim",
"sky_noise",
"sky_background",
"dcr_dra",
"dcr_ddec",
"dcr_e1",
"dcr_e2",
"epoch"],
doc="Property map computation objects",
)
def setDefaults(self):
self.property_maps["exposure_time"].do_sum = True
self.property_maps["psf_size"].do_weighted_mean = True
self.property_maps["psf_e1"].do_weighted_mean = True
self.property_maps["psf_e2"].do_weighted_mean = True
self.property_maps["psf_maglim"].do_weighted_mean = True
self.property_maps["sky_noise"].do_weighted_mean = True
self.property_maps["sky_background"].do_weighted_mean = True
self.property_maps["dcr_dra"].do_weighted_mean = True
self.property_maps["dcr_ddec"].do_weighted_mean = True
self.property_maps["dcr_e1"].do_weighted_mean = True
self.property_maps["dcr_e2"].do_weighted_mean = True
self.property_maps["epoch"].do_mean = True
self.property_maps["epoch"].do_min = True
self.property_maps["epoch"].do_max = True
class HealSparsePropertyMapTask(pipeBase.PipelineTask):
ConfigClass = HealSparsePropertyMapConfig
_DefaultName = "healSparsePropertyMapTask"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.property_maps = PropertyMapMap()
for name, config, PropertyMapClass in self.config.property_maps.apply():
self.property_maps[name] = PropertyMapClass(config, name)
@timeMethod
def runQuantum(self, butlerQC, inputRefs, outputRefs):
inputs = butlerQC.get(inputRefs)
sky_map = inputs.pop("sky_map")
tract = butlerQC.quantum.dataId["tract"]
band = butlerQC.quantum.dataId["band"]
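        # Build lookup tables: deferred per-patch references for the input maps
        # and coadds, and eagerly loaded per-visit exposure summaries.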
input_map_dict = {ref.dataId["patch"]: ref for ref in inputs["input_maps"]}
coadd_dict = {ref.dataId["patch"]: ref for ref in inputs["coadd_exposures"]}
visit_summary_dict = {ref.dataId["visit"]: ref.get()
for ref in inputs["visit_summaries"]}
self.run(sky_map, tract, band, coadd_dict, input_map_dict, visit_summary_dict)
# Write the outputs
for name, property_map in self.property_maps.items():
if property_map.config.do_min:
butlerQC.put(property_map.min_map,
getattr(outputRefs, f"{name}_map_min"))
if property_map.config.do_max:
butlerQC.put(property_map.max_map,
getattr(outputRefs, f"{name}_map_max"))
if property_map.config.do_mean:
butlerQC.put(property_map.mean_map,
getattr(outputRefs, f"{name}_map_mean"))
if property_map.config.do_weighted_mean:
butlerQC.put(property_map.weighted_mean_map,
getattr(outputRefs, f"{name}_map_weighted_mean"))
if property_map.config.do_sum:
butlerQC.put(property_map.sum_map,
getattr(outputRefs, f"{name}_map_sum"))
    def run(self, sky_map, tract, band, coadd_dict, input_map_dict, visit_summary_dict):
        """Accumulate the configured property maps for a single tract."""
        ...

    def _compute_nside_coverage_tract(self, tract_info):
        """Compute an appropriate coverage-map nside for a single tract."""
        num_patches = tract_info.getNumPatches()
# Compute approximate patch area
patch_info = tract_info.getPatchInfo(0)
vertices = patch_info.getInnerSkyPolygon(tract_info.getWcs()).getVertices()
radec = self._vertices_to_radec(vertices)
delta_ra = np.max(radec[:, 0]) - np.min(radec[:, 0])
delta_dec = np.max(radec[:, 1]) - np.min(radec[:, 1])
patch_area = delta_ra*delta_dec*np.cos(np.deg2rad(np.mean(radec[:, 1])))
tract_area = num_patches[0]*num_patches[1]*patch_area
# Start with a fairly low nside and increase until we find the approximate area.
nside_coverage_tract = 32
while hpg.nside_to_pixel_area(nside_coverage_tract, degrees=True) > tract_area:
nside_coverage_tract = 2*nside_coverage_tract
        # Step back one level, but keep the coverage nside in [32, 128]: pixels
        # no coarser than nside=32 and no finer than nside=128 (the range
        # recommended by healsparse).
nside_coverage_tract = int(np.clip(nside_coverage_tract/2, 32, 128))
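        # Illustrative example (hypothetical numbers): for a ~3 deg^2 tract,
        # an nside=32 coverage pixel (~3.36 deg^2) is still larger than the
        # tract, so the loop doubles once to nside=64 (~0.84 deg^2); stepping
        # back and clipping to [32, 128] then gives nside_coverage_tract = 32.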
return nside_coverage_tract
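    # A minimal sketch of the vertex-conversion helper referenced above,
    # assuming `lsst.sphgeom` and `numpy` (as `np`) are imported at module
    # scope; the original implementation may differ in detail.
    def _vertices_to_radec(self, vertices):
        # Convert unit-vector sky-polygon vertices to an (N, 2) array of
        # RA/Dec in degrees.
        lonlats = [lsst.sphgeom.LonLat(x) for x in vertices]
        return np.array([(ll.getLon().asDegrees(), ll.getLat().asDegrees())
                         for ll in lonlats])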
class ConsolidateHealSparsePropertyMapConnections(pipeBase.PipelineTaskConnections,
dimensions=("band", "skymap",),
defaultTemplates={"coaddName": "deep"}):
sky_map = pipeBase.connectionTypes.Input(
doc="Input definition of geometry/bbox and projection/wcs for coadded exposures",
name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
storageClass="SkyMap",
dimensions=("skymap",),
)
# Create output connections for all possible maps defined in the
# registry. The vars() trick used here allows us to set class attributes
    # programmatically. Taken from
# https://stackoverflow.com/questions/2519807/
# setting-a-class-attribute-with-a-given-name-in-python-while-defining-the-class
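    # A minimal illustration of the trick (hypothetical, unrelated to this
    # task): a class body's namespace is a real dict, so it can be mutated
    # via vars() inside a loop at class-definition time.
    #
    #     class Example:
    #         for _stat in ("min", "max"):
    #             vars()[f"value_{_stat}"] = 0.0
    #
    #     Example.value_min   # -> 0.0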
for name in BasePropertyMap.registry:
vars()[f"{name}_map_min"] = pipeBase.connectionTypes.Input(
doc=f"Minimum-value map of {name}",
name=f"{{coaddName}}Coadd_{name}_map_min",
storageClass="HealSparseMap",
dimensions=("tract", "skymap", "band"),
multiple=True,
deferLoad=True,
)
vars()[f"{name}_consolidated_map_min"] = pipeBase.connectionTypes.Output(
doc=f"Minumum-value map of {name}",
name=f"{{coaddName}}Coadd_{name}_consolidated_map_min",
storageClass="HealSparseMap",
dimensions=("skymap", "band"),
)
vars()[f"{name}_map_max"] = pipeBase.connectionTypes.Input(
doc=f"Maximum-value map of {name}",
name=f"{{coaddName}}Coadd_{name}_map_max",
storageClass="HealSparseMap",
dimensions=("tract", "skymap", "band"),
multiple=True,
deferLoad=True,
)
vars()[f"{name}_consolidated_map_max"] = pipeBase.connectionTypes.Output(
doc=f"Minumum-value map of {name}",
name=f"{{coaddName}}Coadd_{name}_consolidated_map_max",
storageClass="HealSparseMap",
dimensions=("skymap", "band"),
)
vars()[f"{name}_map_mean"] = pipeBase.connectionTypes.Input(
doc=f"Mean-value map of {name}",
name=f"{{coaddName}}Coadd_{name}_map_mean",
storageClass="HealSparseMap",
dimensions=("tract", "skymap", "band"),
multiple=True,
deferLoad=True,
)
vars()[f"{name}_consolidated_map_mean"] = pipeBase.connectionTypes.Output(
doc=f"Minumum-value map of {name}",
name=f"{{coaddName}}Coadd_{name}_consolidated_map_mean",
storageClass="HealSparseMap",
dimensions=("skymap", "band"),
)
vars()[f"{name}_map_weighted_mean"] = pipeBase.connectionTypes.Input(
doc=f"Weighted mean-value map of {name}",
name=f"{{coaddName}}Coadd_{name}_map_weighted_mean",
storageClass="HealSparseMap",
dimensions=("tract", "skymap", "band"),
multiple=True,
deferLoad=True,
)
vars()[f"{name}_consolidated_map_weighted_mean"] = pipeBase.connectionTypes.Output(
doc=f"Minumum-value map of {name}",
name=f"{{coaddName}}Coadd_{name}_consolidated_map_weighted_mean",
storageClass="HealSparseMap",
dimensions=("skymap", "band"),
)
vars()[f"{name}_map_sum"] = pipeBase.connectionTypes.Input(
doc=f"Sum-value map of {name}",
name=f"{{coaddName}}Coadd_{name}_map_sum",
storageClass="HealSparseMap",
dimensions=("tract", "skymap", "band"),
multiple=True,
deferLoad=True,
)
vars()[f"{name}_consolidated_map_sum"] = pipeBase.connectionTypes.Output(
doc=f"Minumum-value map of {name}",
name=f"{{coaddName}}Coadd_{name}_consolidated_map_sum",
storageClass="HealSparseMap",
dimensions=("skymap", "band"),
)
def __init__(self, *, config=None):
super().__init__(config=config)
# Not all possible maps in the registry will be configured to run.
# Here we remove the unused connections.
for name in BasePropertyMap.registry:
if name not in config.property_maps:
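                # Use a dummy config with every statistic disabled so that all
                # of this map's connections are removed below.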
prop_config = BasePropertyMapConfig()
prop_config.do_min = False
prop_config.do_max = False
prop_config.do_mean = False
prop_config.do_weighted_mean = False
prop_config.do_sum = False
else:
prop_config = config.property_maps[name]
if not prop_config.do_min:
self.inputs.remove(f"{name}_map_min")
self.outputs.remove(f"{name}_consolidated_map_min")
if not prop_config.do_max:
self.inputs.remove(f"{name}_map_max")
self.outputs.remove(f"{name}_consolidated_map_max")
if not prop_config.do_mean:
self.inputs.remove(f"{name}_map_mean")
self.outputs.remove(f"{name}_consolidated_map_mean")
if not prop_config.do_weighted_mean:
self.inputs.remove(f"{name}_map_weighted_mean")
self.outputs.remove(f"{name}_consolidated_map_weighted_mean")
if not prop_config.do_sum:
self.inputs.remove(f"{name}_map_sum")
self.outputs.remove(f"{name}_consolidated_map_sum")
class ConsolidateHealSparsePropertyMapConfig(pipeBase.PipelineTaskConfig,
pipelineConnections=ConsolidateHealSparsePropertyMapConnections):
property_maps = BasePropertyMap.registry.makeField(
multi=True,
default=["exposure_time",
"psf_size",
"psf_e1",
"psf_e2",
"psf_maglim",
"sky_noise",
"sky_background",
"dcr_dra",
"dcr_ddec",
"dcr_e1",
"dcr_e2",
"epoch"],
doc="Property map computation objects",
)
nside_coverage = pexConfig.Field(
doc="Consolidated HealSparse coverage map nside. Must be power of 2.",
dtype=int,
default=32,
check=_is_power_of_two,
)
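    # `_is_power_of_two` is a module-level check callable not shown in this
    # excerpt; a minimal sketch consistent with its use here:
    #
    #     def _is_power_of_two(value):
    #         return isinstance(value, int) and value > 0 and (value & (value - 1)) == 0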
def setDefaults(self):
self.property_maps["exposure_time"].do_sum = True
self.property_maps["psf_size"].do_weighted_mean = True
self.property_maps["psf_e1"].do_weighted_mean = True
self.property_maps["psf_e2"].do_weighted_mean = True
self.property_maps["psf_maglim"].do_weighted_mean = True
self.property_maps["sky_noise"].do_weighted_mean = True
self.property_maps["sky_background"].do_weighted_mean = True
self.property_maps["dcr_dra"].do_weighted_mean = True
self.property_maps["dcr_ddec"].do_weighted_mean = True
self.property_maps["dcr_e1"].do_weighted_mean = True
self.property_maps["dcr_e2"].do_weighted_mean = True
self.property_maps["epoch"].do_mean = True
self.property_maps["epoch"].do_min = True
self.property_maps["epoch"].do_max = True
class ConsolidateHealSparsePropertyMapTask(pipeBase.PipelineTask):
ConfigClass = ConsolidateHealSparsePropertyMapConfig
_DefaultName = "consolidateHealSparsePropertyMapTask"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.property_maps = PropertyMapMap()
for name, config, PropertyMapClass in self.config.property_maps.apply():
self.property_maps[name] = PropertyMapClass(config, name)
@timeMethod
def runQuantum(self, butlerQC, inputRefs, outputRefs):
inputs = butlerQC.get(inputRefs)
sky_map = inputs.pop("sky_map")
# These need to be consolidated one at a time to conserve memory.
for name in self.config.property_maps.names:
for type_ in ['min', 'max', 'mean', 'weighted_mean', 'sum']:
map_type = f"{name}_map_{type_}"
if map_type in inputs:
input_refs = {ref.dataId['tract']: ref
for ref in inputs[map_type]}
consolidated_map = self.consolidate_map(sky_map, input_refs)
butlerQC.put(consolidated_map,
getattr(outputRefs, f"{name}_consolidated_map_{type_}"))
    def consolidate_map(self, sky_map, input_refs):
        """Consolidate the per-tract maps of one property statistic into a
        single map covering the full sky map."""
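        # A minimal sketch of the consolidation step, assuming the module
        # imports `healsparse`; the original implementation may differ (for
        # example, by reading coverage information first to limit memory use).
        consolidated_map = None
        for tract_id, ref in input_refs.items():
            # Deferred load of this tract's map.
            tract_map = ref.get()
            if consolidated_map is None:
                # Allocate an empty output map with the configured coverage
                # nside and the sparse resolution/dtype of the tract maps.
                consolidated_map = healsparse.HealSparseMap.make_empty(
                    self.config.nside_coverage,
                    tract_map.nside_sparse,
                    tract_map.dtype,
                )
            valid_pixels = tract_map.valid_pixels
            consolidated_map[valid_pixels] = tract_map[valid_pixels]
        return consolidated_map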