LSST Applications g0f08755f38+9c285cab97,g1635faa6d4+13f3999e92,g1653933729+a8ce1bb630,g1a0ca8cf93+bf6eb00ceb,g28da252d5a+0829b12dee,g29321ee8c0+5700dc9eac,g2bbee38e9b+9634bc57db,g2bc492864f+9634bc57db,g2cdde0e794+c2c89b37c4,g3156d2b45e+41e33cbcdc,g347aa1857d+9634bc57db,g35bb328faa+a8ce1bb630,g3a166c0a6a+9634bc57db,g3e281a1b8c+9f2c4e2fc3,g414038480c+077ccc18e7,g41af890bb2+fde0dd39b6,g5fbc88fb19+17cd334064,g781aacb6e4+a8ce1bb630,g80478fca09+55a9465950,g82479be7b0+d730eedb7d,g858d7b2824+9c285cab97,g9125e01d80+a8ce1bb630,g9726552aa6+10f999ec6a,ga5288a1d22+2a84bb7594,gacf8899fa4+c69c5206e8,gae0086650b+a8ce1bb630,gb58c049af0+d64f4d3760,gc28159a63d+9634bc57db,gcf0d15dbbd+4b7d09cae4,gda3e153d99+9c285cab97,gda6a2b7d83+4b7d09cae4,gdaeeff99f8+1711a396fd,ge2409df99d+5e831397f4,ge79ae78c31+9634bc57db,gf0baf85859+147a0692ba,gf3967379c6+41c94011de,gf3fb38a9a8+8f07a9901b,gfb92a5be7c+9c285cab97,w.2024.46
LSST Data Management Base Package
Loading...
Searching...
No Matches
Classes | Functions | Variables
lsst.pipe.tasks.healSparseMapping Namespace Reference

Classes

class  HealSparseInputMapConfig
 
class  HealSparseInputMapTask
 
class  HealSparseMapFormatter
 
class  HealSparsePropertyMapConnections
 

Functions

 _is_power_of_two (value)
 

Variables

Sky sky_map map object
 
 tract : `int`
 
 band : `str`
 
 coadd_dict : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]
 
 input_map_dict : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]
 
 visit_summary_dict : `dict` [`int`: `lsst.afw.table.ExposureCatalog`]
 
 ccd_row : `lsst.afw.table.ExposureRecord`
 
 x : `np.ndarray`
 
 y : `np.ndarray`
 
 calib_scale : `np.ndarray`
 
 vertices : `list` [ `lsst.sphgeom.UnitVector3d` ]
 
 radec : `numpy.ndarray`
 
 tract_info : `lsst.skymap.tractInfo.ExplicitTractInfo`
 
 nside_coverage : `int`
 
 input_refs : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]
 
 consolidated_map : `healsparse.HealSparseMap`
 

Function Documentation

◆ _is_power_of_two()

lsst.pipe.tasks.healSparseMapping._is_power_of_two ( value)
protected
Check that value is a power of two.

Parameters
----------
value : `int`
    Value to check.

Returns
-------
is_power_of_two : `bool`
    True if value is a power of two; False otherwise, or
    if value is not an integer.

Definition at line 85 of file healSparseMapping.py.

85def _is_power_of_two(value):
86 """Check that value is a power of two.
87
88 Parameters
89 ----------
90 value : `int`
91 Value to check.
92
93 Returns
94 -------
95 is_power_of_two : `bool`
96 True if value is a power of two; False otherwise, or
97 if value is not an integer.
98 """
99 if not isinstance(value, numbers.Integral):
100 return False
101
102 # See https://stackoverflow.com/questions/57025836
103 # Every power of 2 has exactly 1 bit set to 1; subtracting
104 # 1 therefore flips every preceding bit. If you and that
105 # together with the original value it must be 0.
106 return (value & (value - 1) == 0) and value != 0
107
108

Variable Documentation

◆ band

lsst.pipe.tasks.healSparseMapping.band : `str`

Definition at line 521 of file healSparseMapping.py.

◆ calib_scale

lsst.pipe.tasks.healSparseMapping.calib_scale : `np.ndarray`

Definition at line 668 of file healSparseMapping.py.

◆ ccd_row

lsst.pipe.tasks.healSparseMapping.ccd_row : `lsst.afw.table.ExposureRecord`
tract_info = sky_map[tract]

tract_maps_initialized = False

for patch in input_map_dict.keys():
    self.log.info("Making maps for band %s, tract %d, patch %d.",
                  band, tract, patch)

    patch_info = tract_info[patch]

    input_map = input_map_dict[patch].get()

    # Initialize the tract maps as soon as we have the first input
    # map for getting nside information.
    if not tract_maps_initialized:
        # We use the first input map nside information to initialize
        # the tract maps
        nside_coverage = self._compute_nside_coverage_tract(tract_info)
        nside = input_map.nside_sparse

        do_compute_approx_psf = False
        # Initialize the tract maps
        for property_map in self.property_maps:
            property_map.initialize_tract_maps(nside_coverage, nside)
            if property_map.requires_psf:
                do_compute_approx_psf = True

        tract_maps_initialized = True

    if input_map.valid_pixels.size == 0:
        self.log.warning("No valid pixels for band %s, tract %d, patch %d; skipping.",
                         band, tract, patch)
        continue

    coadd_photo_calib = coadd_dict[patch].get(component="photoCalib")
    coadd_inputs = coadd_dict[patch].get(component="coaddInputs")

    coadd_zeropoint = 2.5*np.log10(coadd_photo_calib.getInstFluxAtZeroMagnitude())

    # Crop input_map to the inner polygon of the patch
    poly_vertices = patch_info.getInnerSkyPolygon(tract_info.getWcs()).getVertices()
    patch_radec = self._vertices_to_radec(poly_vertices)
    patch_poly = hsp.Polygon(ra=patch_radec[:, 0], dec=patch_radec[:, 1],
                             value=np.arange(input_map.wide_mask_maxbits))
    with warnings.catch_warnings():
        # Healsparse will emit a warning if nside coverage is greater than
        # 128.  In the case of generating patch input maps, and not global
        # maps, high nside coverage works fine, so we can suppress this
        # warning.
        warnings.simplefilter("ignore")
        patch_poly_map = patch_poly.get_map_like(input_map)
        input_map = hsp.and_intersection([input_map, patch_poly_map])

    valid_pixels, vpix_ra, vpix_dec = input_map.valid_pixels_pos(return_pixels=True)

    # Check if there are no valid pixels for the inner (unique) patch region
    if valid_pixels.size == 0:
        continue

    # Initialize the value accumulators
    for property_map in self.property_maps:
        property_map.initialize_values(valid_pixels.size)
        property_map.zeropoint = coadd_zeropoint

    # Initialize the weight and counter accumulators
    total_weights = np.zeros(valid_pixels.size)
    total_inputs = np.zeros(valid_pixels.size, dtype=np.int32)

    for bit, ccd_row in enumerate(coadd_inputs.ccds):
        # Which pixels in the map are used by this visit/detector
        inmap, = np.where(input_map.check_bits_pix(valid_pixels, [bit]))

        # Check if there are any valid pixels in the map from this deteector.
        if inmap.size == 0:
            continue

        # visit, detector_id, weight = input_dict[bit]
        visit = ccd_row["visit"]
        detector_id = ccd_row["ccd"]
        weight = ccd_row["weight"]

        x, y = ccd_row.getWcs().skyToPixelArray(vpix_ra[inmap], vpix_dec[inmap], degrees=True)
        scalings = self._compute_calib_scale(ccd_row, x, y)

        if do_compute_approx_psf:
            psf_array = compute_approx_psf_size_and_shape(ccd_row, vpix_ra[inmap], vpix_dec[inmap])
        else:
            psf_array = None

        total_weights[inmap] += weight
        total_inputs[inmap] += 1

        # Retrieve the correct visitSummary row
        if visit not in visit_summary_dict:
            msg = f"Visit {visit} not found in visit_summaries."
            raise pipeBase.RepeatableQuantumError(msg)
        row = visit_summary_dict[visit].find(detector_id)
        if row is None:
            msg = f"Visit {visit} / detector_id {detector_id} not found in visit_summaries."
            raise pipeBase.RepeatableQuantumError(msg)

        # Accumulate the values
        for property_map in self.property_maps:
            property_map.accumulate_values(inmap,
                                           vpix_ra[inmap],
                                           vpix_dec[inmap],
                                           weight,
                                           scalings,
                                           row,
                                           psf_array=psf_array)

    # Finalize the mean values and set the tract maps
    for property_map in self.property_maps:
        property_map.finalize_mean_values(total_weights, total_inputs)
        property_map.set_map_values(valid_pixels)

def _compute_calib_scale(self, ccd_row, x, y):

Definition at line 659 of file healSparseMapping.py.

◆ coadd_dict

lsst.pipe.tasks.healSparseMapping.coadd_dict : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]

Definition at line 523 of file healSparseMapping.py.

◆ consolidated_map

lsst.pipe.tasks.healSparseMapping.consolidated_map : `healsparse.HealSparseMap`

Definition at line 943 of file healSparseMapping.py.

◆ input_map_dict

lsst.pipe.tasks.healSparseMapping.input_map_dict : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]

Definition at line 525 of file healSparseMapping.py.

◆ input_refs

lsst.pipe.tasks.healSparseMapping.input_refs : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]

Definition at line 938 of file healSparseMapping.py.

◆ nside_coverage

lsst.pipe.tasks.healSparseMapping.nside_coverage : `int`

Definition at line 710 of file healSparseMapping.py.

◆ radec

lsst.pipe.tasks.healSparseMapping.radec : `numpy.ndarray`

Definition at line 692 of file healSparseMapping.py.

◆ sky_map

Sky lsst.pipe.tasks.healSparseMapping.sky_map map object
property_maps = BasePropertyMap.registry.makeField(
multi=True,
default=["exposure_time",
"psf_size",
"psf_e1",
"psf_e2",
"psf_maglim",
"sky_noise",
"sky_background",
"dcr_dra",
"dcr_ddec",
"dcr_e1",
"dcr_e2",
"epoch"],
doc="Property map computation objects",
)

def setDefaults(self):
self.property_maps["exposure_time"].do_sum = True
self.property_maps["psf_size"].do_weighted_mean = True
self.property_maps["psf_e1"].do_weighted_mean = True
self.property_maps["psf_e2"].do_weighted_mean = True
self.property_maps["psf_maglim"].do_weighted_mean = True
self.property_maps["sky_noise"].do_weighted_mean = True
self.property_maps["sky_background"].do_weighted_mean = True
self.property_maps["dcr_dra"].do_weighted_mean = True
self.property_maps["dcr_ddec"].do_weighted_mean = True
self.property_maps["dcr_e1"].do_weighted_mean = True
self.property_maps["dcr_e2"].do_weighted_mean = True
self.property_maps["epoch"].do_mean = True
self.property_maps["epoch"].do_min = True
self.property_maps["epoch"].do_max = True


class HealSparsePropertyMapTask(pipeBase.PipelineTask):
ConfigClass = HealSparsePropertyMapConfig
_DefaultName = "healSparsePropertyMapTask"

def __init__(self, **kwargs):
    super().__init__(**kwargs)
    self.property_maps = PropertyMapMap()
    for name, config, PropertyMapClass in self.config.property_maps.apply():
        self.property_maps[name] = PropertyMapClass(config, name)

@timeMethod
def runQuantum(self, butlerQC, inputRefs, outputRefs):
    inputs = butlerQC.get(inputRefs)

    sky_map = inputs.pop("sky_map")

    tract = butlerQC.quantum.dataId["tract"]
    band = butlerQC.quantum.dataId["band"]

    input_map_dict = {ref.dataId["patch"]: ref for ref in inputs["input_maps"]}
    coadd_dict = {ref.dataId["patch"]: ref for ref in inputs["coadd_exposures"]}

    visit_summary_dict = {ref.dataId["visit"]: ref.get()
                          for ref in inputs["visit_summaries"]}

    self.run(sky_map, tract, band, coadd_dict, input_map_dict, visit_summary_dict)

    # Write the outputs
    for name, property_map in self.property_maps.items():
        if property_map.config.do_min:
            butlerQC.put(property_map.min_map,
                         getattr(outputRefs, f"{name}_map_min"))
        if property_map.config.do_max:
            butlerQC.put(property_map.max_map,
                         getattr(outputRefs, f"{name}_map_max"))
        if property_map.config.do_mean:
            butlerQC.put(property_map.mean_map,
                         getattr(outputRefs, f"{name}_map_mean"))
        if property_map.config.do_weighted_mean:
            butlerQC.put(property_map.weighted_mean_map,
                         getattr(outputRefs, f"{name}_map_weighted_mean"))
        if property_map.config.do_sum:
            butlerQC.put(property_map.sum_map,
                         getattr(outputRefs, f"{name}_map_sum"))

def run(self, sky_map, tract, band, coadd_dict, input_map_dict, visit_summary_dict):
num_patches = tract_info.getNumPatches()

# Compute approximate patch area
patch_info = tract_info.getPatchInfo(0)
vertices = patch_info.getInnerSkyPolygon(tract_info.getWcs()).getVertices()
radec = self._vertices_to_radec(vertices)
delta_ra = np.max(radec[:, 0]) - np.min(radec[:, 0])
delta_dec = np.max(radec[:, 1]) - np.min(radec[:, 1])
patch_area = delta_ra*delta_dec*np.cos(np.deg2rad(np.mean(radec[:, 1])))

tract_area = num_patches[0]*num_patches[1]*patch_area
# Start with a fairly low nside and increase until we find the approximate area.
nside_coverage_tract = 32
while hpg.nside_to_pixel_area(nside_coverage_tract, degrees=True) > tract_area:
    nside_coverage_tract = 2*nside_coverage_tract
# Step back one, but don't go bigger pixels than nside=32 or smaller
# than 128 (recommended by healsparse).
nside_coverage_tract = int(np.clip(nside_coverage_tract/2, 32, 128))

return nside_coverage_tract


class ConsolidateHealSparsePropertyMapConnections(pipeBase.PipelineTaskConnections,
                                          dimensions=("band", "skymap",),
                                          defaultTemplates={"coaddName": "deep"}):
sky_map = pipeBase.connectionTypes.Input(
doc="Input definition of geometry/bbox and projection/wcs for coadded exposures",
name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
storageClass="SkyMap",
dimensions=("skymap",),
)

# Create output connections for all possible maps defined in the
# registry.  The vars() trick used here allows us to set class attributes
# programatically.  Taken from
# https://stackoverflow.com/questions/2519807/
# setting-a-class-attribute-with-a-given-name-in-python-while-defining-the-class
for name in BasePropertyMap.registry:
vars()[f"{name}_map_min"] = pipeBase.connectionTypes.Input(
    doc=f"Minimum-value map of {name}",
    name=f"{{coaddName}}Coadd_{name}_map_min",
    storageClass="HealSparseMap",
    dimensions=("tract", "skymap", "band"),
    multiple=True,
    deferLoad=True,
)
vars()[f"{name}_consolidated_map_min"] = pipeBase.connectionTypes.Output(
    doc=f"Minumum-value map of {name}",
    name=f"{{coaddName}}Coadd_{name}_consolidated_map_min",
    storageClass="HealSparseMap",
    dimensions=("skymap", "band"),
)
vars()[f"{name}_map_max"] = pipeBase.connectionTypes.Input(
    doc=f"Maximum-value map of {name}",
    name=f"{{coaddName}}Coadd_{name}_map_max",
    storageClass="HealSparseMap",
    dimensions=("tract", "skymap", "band"),
    multiple=True,
    deferLoad=True,
)
vars()[f"{name}_consolidated_map_max"] = pipeBase.connectionTypes.Output(
    doc=f"Minumum-value map of {name}",
    name=f"{{coaddName}}Coadd_{name}_consolidated_map_max",
    storageClass="HealSparseMap",
    dimensions=("skymap", "band"),
)
vars()[f"{name}_map_mean"] = pipeBase.connectionTypes.Input(
    doc=f"Mean-value map of {name}",
    name=f"{{coaddName}}Coadd_{name}_map_mean",
    storageClass="HealSparseMap",
    dimensions=("tract", "skymap", "band"),
    multiple=True,
    deferLoad=True,
)
vars()[f"{name}_consolidated_map_mean"] = pipeBase.connectionTypes.Output(
    doc=f"Minumum-value map of {name}",
    name=f"{{coaddName}}Coadd_{name}_consolidated_map_mean",
    storageClass="HealSparseMap",
    dimensions=("skymap", "band"),
)
vars()[f"{name}_map_weighted_mean"] = pipeBase.connectionTypes.Input(
    doc=f"Weighted mean-value map of {name}",
    name=f"{{coaddName}}Coadd_{name}_map_weighted_mean",
    storageClass="HealSparseMap",
    dimensions=("tract", "skymap", "band"),
    multiple=True,
    deferLoad=True,
)
vars()[f"{name}_consolidated_map_weighted_mean"] = pipeBase.connectionTypes.Output(
    doc=f"Minumum-value map of {name}",
    name=f"{{coaddName}}Coadd_{name}_consolidated_map_weighted_mean",
    storageClass="HealSparseMap",
    dimensions=("skymap", "band"),
)
vars()[f"{name}_map_sum"] = pipeBase.connectionTypes.Input(
    doc=f"Sum-value map of {name}",
    name=f"{{coaddName}}Coadd_{name}_map_sum",
    storageClass="HealSparseMap",
    dimensions=("tract", "skymap", "band"),
    multiple=True,
    deferLoad=True,
)
vars()[f"{name}_consolidated_map_sum"] = pipeBase.connectionTypes.Output(
    doc=f"Minumum-value map of {name}",
    name=f"{{coaddName}}Coadd_{name}_consolidated_map_sum",
    storageClass="HealSparseMap",
    dimensions=("skymap", "band"),
)

def __init__(self, *, config=None):
super().__init__(config=config)

# Not all possible maps in the registry will be configured to run.
# Here we remove the unused connections.
for name in BasePropertyMap.registry:
    if name not in config.property_maps:
        prop_config = BasePropertyMapConfig()
        prop_config.do_min = False
        prop_config.do_max = False
        prop_config.do_mean = False
        prop_config.do_weighted_mean = False
        prop_config.do_sum = False
    else:
        prop_config = config.property_maps[name]

    if not prop_config.do_min:
        self.inputs.remove(f"{name}_map_min")
        self.outputs.remove(f"{name}_consolidated_map_min")
    if not prop_config.do_max:
        self.inputs.remove(f"{name}_map_max")
        self.outputs.remove(f"{name}_consolidated_map_max")
    if not prop_config.do_mean:
        self.inputs.remove(f"{name}_map_mean")
        self.outputs.remove(f"{name}_consolidated_map_mean")
    if not prop_config.do_weighted_mean:
        self.inputs.remove(f"{name}_map_weighted_mean")
        self.outputs.remove(f"{name}_consolidated_map_weighted_mean")
    if not prop_config.do_sum:
        self.inputs.remove(f"{name}_map_sum")
        self.outputs.remove(f"{name}_consolidated_map_sum")


class ConsolidateHealSparsePropertyMapConfig(pipeBase.PipelineTaskConfig,
                                     pipelineConnections=ConsolidateHealSparsePropertyMapConnections):
property_maps = BasePropertyMap.registry.makeField(
multi=True,
default=["exposure_time",
         "psf_size",
         "psf_e1",
         "psf_e2",
         "psf_maglim",
         "sky_noise",
         "sky_background",
         "dcr_dra",
         "dcr_ddec",
         "dcr_e1",
         "dcr_e2",
         "epoch"],
doc="Property map computation objects",
)
nside_coverage = pexConfig.Field(
doc="Consolidated HealSparse coverage map nside.  Must be power of 2.",
dtype=int,
default=32,
check=_is_power_of_two,
)

def setDefaults(self):
self.property_maps["exposure_time"].do_sum = True
self.property_maps["psf_size"].do_weighted_mean = True
self.property_maps["psf_e1"].do_weighted_mean = True
self.property_maps["psf_e2"].do_weighted_mean = True
self.property_maps["psf_maglim"].do_weighted_mean = True
self.property_maps["sky_noise"].do_weighted_mean = True
self.property_maps["sky_background"].do_weighted_mean = True
self.property_maps["dcr_dra"].do_weighted_mean = True
self.property_maps["dcr_ddec"].do_weighted_mean = True
self.property_maps["dcr_e1"].do_weighted_mean = True
self.property_maps["dcr_e2"].do_weighted_mean = True
self.property_maps["epoch"].do_mean = True
self.property_maps["epoch"].do_min = True
self.property_maps["epoch"].do_max = True


class ConsolidateHealSparsePropertyMapTask(pipeBase.PipelineTask):
ConfigClass = ConsolidateHealSparsePropertyMapConfig
_DefaultName = "consolidateHealSparsePropertyMapTask"

def __init__(self, **kwargs):
    super().__init__(**kwargs)
    self.property_maps = PropertyMapMap()
    for name, config, PropertyMapClass in self.config.property_maps.apply():
        self.property_maps[name] = PropertyMapClass(config, name)

@timeMethod
def runQuantum(self, butlerQC, inputRefs, outputRefs):
    inputs = butlerQC.get(inputRefs)

    sky_map = inputs.pop("sky_map")

    # These need to be consolidated one at a time to conserve memory.
    for name in self.config.property_maps.names:
        for type_ in ['min', 'max', 'mean', 'weighted_mean', 'sum']:
            map_type = f"{name}_map_{type_}"
            if map_type in inputs:
                input_refs = {ref.dataId['tract']: ref
                              for ref in inputs[map_type]}
                consolidated_map = self.consolidate_map(sky_map, input_refs)
                butlerQC.put(consolidated_map,
                             getattr(outputRefs, f"{name}_consolidated_map_{type_}"))

def consolidate_map(self, sky_map, input_refs):

Definition at line 518 of file healSparseMapping.py.

◆ tract

lsst.pipe.tasks.healSparseMapping.tract : `int`

Definition at line 519 of file healSparseMapping.py.

◆ tract_info

lsst.pipe.tasks.healSparseMapping.tract_info : `lsst.skymap.tractInfo.ExplicitTractInfo`
lonlats = [lsst.sphgeom.LonLat(x) for x in vertices]
radec = np.array([(x.getLon().asDegrees(), x.getLat().asDegrees()) for
                  x in lonlats])
return radec

def _compute_nside_coverage_tract(self, tract_info):

Definition at line 705 of file healSparseMapping.py.

◆ vertices

lsst.pipe.tasks.healSparseMapping.vertices : `list` [ `lsst.sphgeom.UnitVector3d` ]
photo_calib = ccd_row.getPhotoCalib()
bf = photo_calib.computeScaledCalibration()
if bf.getBBox() == ccd_row.getBBox():
    # Track variable calibration over the detector
    calib_scale = photo_calib.getCalibrationMean()*bf.evaluate(x, y)
else:
    # Spatially constant calibration
    calib_scale = photo_calib.getCalibrationMean()

return calib_scale

def _vertices_to_radec(self, vertices):

Definition at line 687 of file healSparseMapping.py.

◆ visit_summary_dict

lsst.pipe.tasks.healSparseMapping.visit_summary_dict : `dict` [`int`: `lsst.afw.table.ExposureCatalog`]

Definition at line 527 of file healSparseMapping.py.

◆ x

lsst.pipe.tasks.healSparseMapping.x : `np.ndarray`

Definition at line 661 of file healSparseMapping.py.

◆ y

lsst.pipe.tasks.healSparseMapping.y : `np.ndarray`

Definition at line 663 of file healSparseMapping.py.