healSparseMapping.py
# LSST Data Management System
# Copyright 2008-2021 AURA/LSST.
#
# This product includes software developed by the
# LSST Project (http://www.lsst.org/).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the LSST License Statement and
# the GNU General Public License along with this program. If not,
# see <http://www.lsstcorp.org/LegalNotices/>.
#
from collections import defaultdict
import warnings
import numbers
import numpy as np
import healpy as hp
import healsparse as hsp

import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
import lsst.geom
import lsst.afw.geom as afwGeom
import lsst.sphgeom
from lsst.daf.butler import Formatter
from lsst.skymap import BaseSkyMap
from lsst.utils.timer import timeMethod
from .healSparseMappingProperties import (BasePropertyMap, BasePropertyMapConfig,
                                          PropertyMapMap, compute_approx_psf_size_and_shape)


__all__ = ["HealSparseInputMapTask", "HealSparseInputMapConfig",
           "HealSparseMapFormatter", "HealSparsePropertyMapConnections",
           "HealSparsePropertyMapConfig", "HealSparsePropertyMapTask",
           "ConsolidateHealSparsePropertyMapConnections",
           "ConsolidateHealSparsePropertyMapConfig",
           "ConsolidateHealSparsePropertyMapTask"]

class HealSparseMapFormatter(Formatter):
    """Interface for reading and writing healsparse.HealSparseMap files."""
    unsupportedParameters = frozenset()
    supportedExtensions = frozenset({".hsp", ".fit", ".fits"})
    extension = '.hsp'

    def read(self, component=None):
        # Docstring inherited from Formatter.read.
        path = self.fileDescriptor.location.path

        if component == 'coverage':
            try:
                data = hsp.HealSparseCoverage.read(path)
            except (OSError, RuntimeError):
                raise ValueError(f"Unable to read healsparse map with URI {self.fileDescriptor.location.uri}")

            return data

        if self.fileDescriptor.parameters is None:
            pixels = None
            degrade_nside = None
        else:
            pixels = self.fileDescriptor.parameters.get('pixels', None)
            degrade_nside = self.fileDescriptor.parameters.get('degrade_nside', None)
        try:
            data = hsp.HealSparseMap.read(path, pixels=pixels, degrade_nside=degrade_nside)
        except (OSError, RuntimeError):
            raise ValueError(f"Unable to read healsparse map with URI {self.fileDescriptor.location.uri}")

        return data

    def write(self, inMemoryDataset):
        # Docstring inherited from Formatter.write.
        # Update the location with the formatter-preferred file extension
        self.fileDescriptor.location.updateExtension(self.extension)
        inMemoryDataset.write(self.fileDescriptor.location.path, clobber=True)

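# Illustrative sketch (not part of the pipeline): the formatter above accepts the butler
# ``parameters`` keys "pixels" and "degrade_nside" and passes them straight through to
# healsparse.HealSparseMap.read. The function below shows the equivalent direct call;
# the file path and coverage pixel indices are hypothetical placeholders.
def _example_partial_read(path="/path/to/map.hsp"):
    """Read only two coverage pixels of a map, degraded to nside=1024."""
    partial_map = hsp.HealSparseMap.read(path, pixels=[1000, 1001], degrade_nside=1024)
    return partial_map
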
def _is_power_of_two(value):
    """Check that value is a power of two.

    Parameters
    ----------
    value : `int`
        Value to check.

    Returns
    -------
    is_power_of_two : `bool`
        True if value is a power of two; False otherwise, or
        if value is not an integer.
    """
    if not isinstance(value, numbers.Integral):
        return False

    # See https://stackoverflow.com/questions/57025836
    # Every power of 2 has exactly 1 bit set to 1; subtracting
    # 1 therefore flips every preceding bit. If you and that
    # together with the original value it must be 0.
    return (value & (value - 1) == 0) and value != 0

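# Quick illustration of the bit trick used in _is_power_of_two (a sketch, not used by
# the pipeline): for a power of two, value - 1 flips all of the lower bits, so the AND
# of the two is zero.
def _example_power_of_two_check():
    assert _is_power_of_two(256)       # 0b100000000 & 0b011111111 == 0
    assert not _is_power_of_two(96)    # 0b1100000 & 0b1011111 == 0b1000000 != 0
    assert not _is_power_of_two(0)     # zero is explicitly excluded
    assert not _is_power_of_two(2.0)   # non-integers are rejected
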
class HealSparseInputMapConfig(pexConfig.Config):
    """Configuration parameters for HealSparseInputMapTask"""
    nside = pexConfig.Field(
        doc="Mapping healpix nside. Must be power of 2.",
        dtype=int,
        default=32768,
        check=_is_power_of_two,
    )
    nside_coverage = pexConfig.Field(
        doc="HealSparse coverage map nside. Must be power of 2.",
        dtype=int,
        default=256,
        check=_is_power_of_two,
    )
    bad_mask_min_coverage = pexConfig.Field(
        doc=("Minimum area fraction of a map healpixel that must be "
             "covered by bad pixels to be removed from the input map. "
             "This is approximate."),
        dtype=float,
        default=0.5,
    )

class HealSparseInputMapTask(pipeBase.Task):
    """Task for making a HealSparse input map."""

    ConfigClass = HealSparseInputMapConfig
    _DefaultName = "healSparseInputMap"

    def __init__(self, **kwargs):
        pipeBase.Task.__init__(self, **kwargs)

        self.ccd_input_map = None

    def build_ccd_input_map(self, bbox, wcs, ccds):
        """Build a map from ccd valid polygons or bounding boxes.

        Parameters
        ----------
        bbox : `lsst.geom.Box2I`
            Bounding box for region to build input map.
        wcs : `lsst.afw.geom.SkyWcs`
            WCS object for region to build input map.
        ccds : `lsst.afw.table.ExposureCatalog`
            Exposure catalog with ccd data from coadd inputs.
        """
        with warnings.catch_warnings():
            # Healsparse will emit a warning if nside coverage is greater than
            # 128. In the case of generating patch input maps, and not global
            # maps, high nside coverage works fine, so we can suppress this
            # warning.
            warnings.simplefilter("ignore")
            self.ccd_input_map = hsp.HealSparseMap.make_empty(nside_coverage=self.config.nside_coverage,
                                                              nside_sparse=self.config.nside,
                                                              dtype=hsp.WIDE_MASK,
                                                              wide_mask_maxbits=len(ccds))
        self._wcs = wcs
        self._bbox = bbox
        self._ccds = ccds

        pixel_scale = wcs.getPixelScale().asArcseconds()
        hpix_area_arcsec2 = hp.nside2pixarea(self.config.nside, degrees=True)*(3600.**2.)
        self._min_bad = self.config.bad_mask_min_coverage*hpix_area_arcsec2/(pixel_scale**2.)

        metadata = {}
        self._bits_per_visit_ccd = {}
        self._bits_per_visit = defaultdict(list)
        for bit, ccd_row in enumerate(ccds):
            metadata[f"B{bit:04d}CCD"] = ccd_row["ccd"]
            metadata[f"B{bit:04d}VIS"] = ccd_row["visit"]
            metadata[f"B{bit:04d}WT"] = ccd_row["weight"]

            self._bits_per_visit_ccd[(ccd_row["visit"], ccd_row["ccd"])] = bit
            self._bits_per_visit[ccd_row["visit"]].append(bit)

            ccd_poly = ccd_row.getValidPolygon()
            if ccd_poly is None:
                ccd_poly = afwGeom.Polygon(lsst.geom.Box2D(ccd_row.getBBox()))
            # Detectors need to be rendered with their own wcs.
            ccd_poly_radec = self._pixels_to_radec(ccd_row.getWcs(), ccd_poly.convexHull().getVertices())

            # Create a ccd healsparse polygon
            poly = hsp.Polygon(ra=ccd_poly_radec[: -1, 0],
                               dec=ccd_poly_radec[: -1, 1],
                               value=[bit])
            self.ccd_input_map.set_bits_pix(poly.get_pixels(nside=self.ccd_input_map.nside_sparse),
                                            [bit])

        # Cut down to the overall bounding box with associated wcs.
        bbox_afw_poly = afwGeom.Polygon(lsst.geom.Box2D(bbox))
        bbox_poly_radec = self._pixels_to_radec(self._wcs,
                                                bbox_afw_poly.convexHull().getVertices())
        bbox_poly = hsp.Polygon(ra=bbox_poly_radec[: -1, 0], dec=bbox_poly_radec[: -1, 1],
                                value=np.arange(self.ccd_input_map.wide_mask_maxbits))
        bbox_poly_map = bbox_poly.get_map_like(self.ccd_input_map)
        self.ccd_input_map = hsp.and_intersection([self.ccd_input_map, bbox_poly_map])
        self.ccd_input_map.metadata = metadata

        # Create a temporary map to hold the count of bad pixels in each healpix pixel
        self._ccd_input_pixels = self.ccd_input_map.valid_pixels

        dtype = [(f"v{visit}", np.int64) for visit in self._bits_per_visit.keys()]

        with warnings.catch_warnings():
            # Healsparse will emit a warning if nside coverage is greater than
            # 128. In the case of generating patch input maps, and not global
            # maps, high nside coverage works fine, so we can suppress this
            # warning.
            warnings.simplefilter("ignore")
            self._ccd_input_bad_count_map = hsp.HealSparseMap.make_empty(
                nside_coverage=self.config.nside_coverage,
                nside_sparse=self.config.nside,
                dtype=dtype,
                primary=dtype[0][0])

        # Don't set input bad map if there are no ccds which overlap the bbox.
        if len(self._ccd_input_pixels) > 0:
            self._ccd_input_bad_count_map[self._ccd_input_pixels] = np.zeros(1, dtype=dtype)

    def mask_warp_bbox(self, bbox, visit, mask, bit_mask_value):
        """Mask a subregion from a visit.
        This must be run after build_ccd_input_map initializes
        the overall map.

        Parameters
        ----------
        bbox : `lsst.geom.Box2I`
            Bounding box from region to mask.
        visit : `int`
            Visit number corresponding to warp with mask.
        mask : `lsst.afw.image.MaskX`
            Mask plane from warp exposure.
        bit_mask_value : `int`
            Bit mask to check for bad pixels.

        Raises
        ------
        RuntimeError : Raised if build_ccd_input_map was not run first.
        """
        if self.ccd_input_map is None:
            raise RuntimeError("Must run build_ccd_input_map before mask_warp_bbox")

        # Find the bad pixels and convert to healpix
        bad_pixels = np.where(mask.array & bit_mask_value)
        if len(bad_pixels[0]) == 0:
            # No bad pixels
            return

        # Bad pixels come from warps which use the overall wcs.
        bad_ra, bad_dec = self._wcs.pixelToSkyArray(bad_pixels[1].astype(np.float64),
                                                    bad_pixels[0].astype(np.float64),
                                                    degrees=True)
        bad_hpix = hp.ang2pix(self.config.nside, bad_ra, bad_dec,
                              lonlat=True, nest=True)

        # Count the number of bad image pixels in each healpix pixel
        min_bad_hpix = bad_hpix.min()
        bad_hpix_count = np.zeros(bad_hpix.max() - min_bad_hpix + 1, dtype=np.int32)
        np.add.at(bad_hpix_count, bad_hpix - min_bad_hpix, 1)

        # Add these to the accumulator map.
        # We need to make sure that the "primary" array has valid values for
        # this pixel to be registered in the accumulator map.
        pix_to_add, = np.where(bad_hpix_count > 0)
        count_map_arr = self._ccd_input_bad_count_map[min_bad_hpix + pix_to_add]
        primary = self._ccd_input_bad_count_map.primary
        count_map_arr[primary] = np.clip(count_map_arr[primary], 0, None)

        count_map_arr[f"v{visit}"] = np.clip(count_map_arr[f"v{visit}"], 0, None)
        count_map_arr[f"v{visit}"] += bad_hpix_count[pix_to_add]

        self._ccd_input_bad_count_map[min_bad_hpix + pix_to_add] = count_map_arr

284 """Use accumulated mask information to finalize the masking of
285 ccd_input_map.
286
287 Raises
288 ------
289 RuntimeError : Raised if build_ccd_input_map was not run first.
290 """
291 if self.ccd_input_mapccd_input_map is None:
292 raise RuntimeError("Must run build_ccd_input_map before finalize_ccd_input_map_mask.")
293
294 count_map_arr = self._ccd_input_bad_count_map_ccd_input_bad_count_map[self._ccd_input_pixels_ccd_input_pixels]
295 for visit in self._bits_per_visit_bits_per_visit:
296 to_mask, = np.where(count_map_arr[f"v{visit}"] > self._min_bad_min_bad)
297 if to_mask.size == 0:
298 continue
299 self.ccd_input_mapccd_input_map.clear_bits_pix(self._ccd_input_pixels_ccd_input_pixels[to_mask],
300 self._bits_per_visit_bits_per_visit[visit])
301
302 # Clear memory
303 self._ccd_input_bad_count_map_ccd_input_bad_count_map = None
304
    def _pixels_to_radec(self, wcs, pixels):
        """Convert pixels to ra/dec positions using a wcs.

        Parameters
        ----------
        wcs : `lsst.afw.geom.SkyWcs`
            WCS object.
        pixels : `list` [`lsst.geom.Point2D`]
            List of pixels to convert.

        Returns
        -------
        radec : `numpy.ndarray`
            Nx2 array of ra/dec positions associated with pixels.
        """
        sph_pts = wcs.pixelToSky(pixels)
        return np.array([(sph.getRa().asDegrees(), sph.getDec().asDegrees())
                         for sph in sph_pts])

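# Illustrative usage sketch (not part of the pipeline): the intended calling sequence for
# HealSparseInputMapTask is build_ccd_input_map, then mask_warp_bbox once per warp, then
# finalize_ccd_input_map_mask. The bbox/wcs/ccds/warps arguments are assumed to come from
# a coadd assembly step and are hypothetical placeholders.
def _example_input_map_flow(bbox, wcs, ccds, warps, bad_mask_bits):
    input_map_task = HealSparseInputMapTask()
    input_map_task.build_ccd_input_map(bbox, wcs, ccds)
    for visit, warp in warps.items():
        # Clear input-map bits for healpixels that are mostly bad in this visit's warp.
        input_map_task.mask_warp_bbox(warp.getBBox(), visit, warp.mask, bad_mask_bits)
    input_map_task.finalize_ccd_input_map_mask()
    return input_map_task.ccd_input_map
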
class HealSparsePropertyMapConnections(pipeBase.PipelineTaskConnections,
                                       dimensions=("tract", "band", "skymap",),
                                       defaultTemplates={"coaddName": "deep",
                                                         "calexpType": ""}):
    input_maps = pipeBase.connectionTypes.Input(
        doc="Healsparse bit-wise coadd input maps",
        name="{coaddName}Coadd_inputMap",
        storageClass="HealSparseMap",
        dimensions=("tract", "patch", "skymap", "band"),
        multiple=True,
        deferLoad=True,
    )
    coadd_exposures = pipeBase.connectionTypes.Input(
        doc="Coadded exposures associated with input_maps",
        name="{coaddName}Coadd",
        storageClass="ExposureF",
        dimensions=("tract", "patch", "skymap", "band"),
        multiple=True,
        deferLoad=True,
    )
    visit_summaries = pipeBase.connectionTypes.Input(
        doc="Visit summary tables with aggregated statistics",
        name="{calexpType}visitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
        multiple=True,
        deferLoad=True,
    )
    sky_map = pipeBase.connectionTypes.Input(
        doc="Input definition of geometry/bbox and projection/wcs for coadded exposures",
        name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
        storageClass="SkyMap",
        dimensions=("skymap",),
    )

    # Create output connections for all possible maps defined in the
    # registry. The vars() trick used here allows us to set class attributes
    # programmatically. Taken from
    # https://stackoverflow.com/questions/2519807/
    # setting-a-class-attribute-with-a-given-name-in-python-while-defining-the-class
    for name in BasePropertyMap.registry:
        vars()[f"{name}_map_min"] = pipeBase.connectionTypes.Output(
            doc=f"Minimum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_min",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
        )
        vars()[f"{name}_map_max"] = pipeBase.connectionTypes.Output(
            doc=f"Maximum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_max",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
        )
        vars()[f"{name}_map_mean"] = pipeBase.connectionTypes.Output(
            doc=f"Mean-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_mean",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
        )
        vars()[f"{name}_map_weighted_mean"] = pipeBase.connectionTypes.Output(
            doc=f"Weighted mean-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_weighted_mean",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
        )
        vars()[f"{name}_map_sum"] = pipeBase.connectionTypes.Output(
            doc=f"Sum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_sum",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
        )

    def __init__(self, *, config=None):
        super().__init__(config=config)

        # Not all possible maps in the registry will be configured to run.
        # Here we remove the unused connections.
        for name in BasePropertyMap.registry:
            if name not in config.property_maps:
                prop_config = BasePropertyMapConfig()
                prop_config.do_min = False
                prop_config.do_max = False
                prop_config.do_mean = False
                prop_config.do_weighted_mean = False
                prop_config.do_sum = False
            else:
                prop_config = config.property_maps[name]

            if not prop_config.do_min:
                self.outputs.remove(f"{name}_map_min")
            if not prop_config.do_max:
                self.outputs.remove(f"{name}_map_max")
            if not prop_config.do_mean:
                self.outputs.remove(f"{name}_map_mean")
            if not prop_config.do_weighted_mean:
                self.outputs.remove(f"{name}_map_weighted_mean")
            if not prop_config.do_sum:
                self.outputs.remove(f"{name}_map_sum")

class HealSparsePropertyMapConfig(pipeBase.PipelineTaskConfig,
                                  pipelineConnections=HealSparsePropertyMapConnections):
    """Configuration parameters for HealSparsePropertyMapTask"""
    property_maps = BasePropertyMap.registry.makeField(
        multi=True,
        default=["exposure_time",
                 "psf_size",
                 "psf_e1",
                 "psf_e2",
                 "psf_maglim",
                 "sky_noise",
                 "sky_background",
                 "dcr_dra",
                 "dcr_ddec",
                 "dcr_e1",
                 "dcr_e2"],
        doc="Property map computation objects",
    )

    def setDefaults(self):
        self.property_maps["exposure_time"].do_sum = True
        self.property_maps["psf_size"].do_weighted_mean = True
        self.property_maps["psf_e1"].do_weighted_mean = True
        self.property_maps["psf_e2"].do_weighted_mean = True
        self.property_maps["psf_maglim"].do_weighted_mean = True
        self.property_maps["sky_noise"].do_weighted_mean = True
        self.property_maps["sky_background"].do_weighted_mean = True
        self.property_maps["dcr_dra"].do_weighted_mean = True
        self.property_maps["dcr_ddec"].do_weighted_mean = True
        self.property_maps["dcr_e1"].do_weighted_mean = True
        self.property_maps["dcr_e2"].do_weighted_mean = True

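# Config override sketch: setDefaults above enables the weighted-mean maps (plus the
# exposure_time sum map); additional statistics can be switched on per property in a
# pipeline config override. The ``config`` argument here is assumed to be the
# HealSparsePropertyMapConfig instance a config file would receive.
def _example_config_override(config):
    config.property_maps["psf_size"].do_min = True
    config.property_maps["psf_size"].do_max = True
    config.property_maps["sky_noise"].do_mean = True
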
class HealSparsePropertyMapTask(pipeBase.PipelineTask):
    """Task to compute Healsparse property maps.

    This task will compute individual property maps (per tract, per
    map type, per band). These maps cover the full coadd tract, and
    are not truncated to the inner tract region.
    """
    ConfigClass = HealSparsePropertyMapConfig
    _DefaultName = "healSparsePropertyMapTask"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.property_maps = PropertyMapMap()
        for name, config, PropertyMapClass in self.config.property_maps.apply():
            self.property_maps[name] = PropertyMapClass(config, name)

    @timeMethod
    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        sky_map = inputs.pop("sky_map")

        tract = butlerQC.quantum.dataId["tract"]
        band = butlerQC.quantum.dataId["band"]

        input_map_dict = {ref.dataId["patch"]: ref for ref in inputs["input_maps"]}
        coadd_dict = {ref.dataId["patch"]: ref for ref in inputs["coadd_exposures"]}

        visit_summary_dict = {ref.dataId["visit"]: ref.get()
                              for ref in inputs["visit_summaries"]}

        self.run(sky_map, tract, band, coadd_dict, input_map_dict, visit_summary_dict)

        # Write the outputs
        for name, property_map in self.property_maps.items():
            if property_map.config.do_min:
                butlerQC.put(property_map.min_map,
                             getattr(outputRefs, f"{name}_map_min"))
            if property_map.config.do_max:
                butlerQC.put(property_map.max_map,
                             getattr(outputRefs, f"{name}_map_max"))
            if property_map.config.do_mean:
                butlerQC.put(property_map.mean_map,
                             getattr(outputRefs, f"{name}_map_mean"))
            if property_map.config.do_weighted_mean:
                butlerQC.put(property_map.weighted_mean_map,
                             getattr(outputRefs, f"{name}_map_weighted_mean"))
            if property_map.config.do_sum:
                butlerQC.put(property_map.sum_map,
                             getattr(outputRefs, f"{name}_map_sum"))

    def run(self, sky_map, tract, band, coadd_dict, input_map_dict, visit_summary_dict):
        """Run the healsparse property task.

        Parameters
        ----------
        sky_map : Sky map object
        tract : `int`
            Tract number.
        band : `str`
            Band name for logging.
        coadd_dict : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]
            Dictionary of coadd exposure references. Keys are patch numbers.
        input_map_dict : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]
            Dictionary of input map references. Keys are patch numbers.
        visit_summary_dict : `dict` [`int`: `lsst.afw.table.ExposureCatalog`]
            Dictionary of visit summary tables. Keys are visit numbers.

        Raises
        ------
        RepeatableQuantumError
            If visit_summary_dict is missing any visits or detectors found in an
            input map. This leads to an inconsistency between what is in the coadd
            (via the input map) and the visit summary tables which contain data
            to compute the maps.
        """
        tract_info = sky_map[tract]

        tract_maps_initialized = False

        for patch in input_map_dict.keys():
            self.log.info("Making maps for band %s, tract %d, patch %d.",
                          band, tract, patch)

            patch_info = tract_info[patch]

            input_map = input_map_dict[patch].get()
            coadd_photo_calib = coadd_dict[patch].get(component="photoCalib")
            coadd_inputs = coadd_dict[patch].get(component="coaddInputs")

            coadd_zeropoint = 2.5*np.log10(coadd_photo_calib.getInstFluxAtZeroMagnitude())

            # Crop input_map to the inner polygon of the patch
            poly_vertices = patch_info.getInnerSkyPolygon(tract_info.getWcs()).getVertices()
            patch_radec = self._vertices_to_radec(poly_vertices)
            patch_poly = hsp.Polygon(ra=patch_radec[:, 0], dec=patch_radec[:, 1],
                                     value=np.arange(input_map.wide_mask_maxbits))
            patch_poly_map = patch_poly.get_map_like(input_map)
            input_map = hsp.and_intersection([input_map, patch_poly_map])

            if not tract_maps_initialized:
                # We use the first input map nside information to initialize
                # the tract maps
                nside_coverage = self._compute_nside_coverage_tract(tract_info)
                nside = input_map.nside_sparse

                do_compute_approx_psf = False
                # Initialize the tract maps
                for property_map in self.property_maps:
                    property_map.initialize_tract_maps(nside_coverage, nside)
                    if property_map.requires_psf:
                        do_compute_approx_psf = True

                tract_maps_initialized = True

            valid_pixels, vpix_ra, vpix_dec = input_map.valid_pixels_pos(return_pixels=True)

            # Check if there are no valid pixels for the inner (unique) patch region
            if valid_pixels.size == 0:
                continue

            # Initialize the value accumulators
            for property_map in self.property_maps:
                property_map.initialize_values(valid_pixels.size)
                property_map.zeropoint = coadd_zeropoint

            # Initialize the weight and counter accumulators
            total_weights = np.zeros(valid_pixels.size)
            total_inputs = np.zeros(valid_pixels.size, dtype=np.int32)

            for bit, ccd_row in enumerate(coadd_inputs.ccds):
                # Which pixels in the map are used by this visit/detector
                inmap, = np.where(input_map.check_bits_pix(valid_pixels, [bit]))

                # Check if there are any valid pixels in the map from this detector.
                if inmap.size == 0:
                    continue

                # visit, detector_id, weight = input_dict[bit]
                visit = ccd_row["visit"]
                detector_id = ccd_row["ccd"]
                weight = ccd_row["weight"]

                x, y = ccd_row.getWcs().skyToPixelArray(vpix_ra[inmap], vpix_dec[inmap], degrees=True)
                scalings = self._compute_calib_scale(ccd_row, x, y)

                if do_compute_approx_psf:
                    psf_array = compute_approx_psf_size_and_shape(ccd_row, vpix_ra[inmap], vpix_dec[inmap])
                else:
                    psf_array = None

                total_weights[inmap] += weight
                total_inputs[inmap] += 1

                # Retrieve the correct visitSummary row
                if visit not in visit_summary_dict:
                    msg = f"Visit {visit} not found in visit_summaries."
                    raise pipeBase.RepeatableQuantumError(msg)
                row = visit_summary_dict[visit].find(detector_id)
                if row is None:
                    msg = f"Visit {visit} / detector_id {detector_id} not found in visit_summaries."
                    raise pipeBase.RepeatableQuantumError(msg)

                # Accumulate the values
                for property_map in self.property_maps:
                    property_map.accumulate_values(inmap,
                                                   vpix_ra[inmap],
                                                   vpix_dec[inmap],
                                                   weight,
                                                   scalings,
                                                   row,
                                                   psf_array=psf_array)

            # Finalize the mean values and set the tract maps
            for property_map in self.property_maps:
                property_map.finalize_mean_values(total_weights, total_inputs)
                property_map.set_map_values(valid_pixels)

    def _compute_calib_scale(self, ccd_row, x, y):
        """Compute calibration scaling values.

        Parameters
        ----------
        ccd_row : `lsst.afw.table.ExposureRecord`
            Exposure metadata for a given detector exposure.
        x : `np.ndarray`
            Array of x positions.
        y : `np.ndarray`
            Array of y positions.

        Returns
        -------
        calib_scale : `np.ndarray`
            Array of calibration scale values.
        """
        photo_calib = ccd_row.getPhotoCalib()
        bf = photo_calib.computeScaledCalibration()
        if bf.getBBox() == ccd_row.getBBox():
            # Track variable calibration over the detector
            calib_scale = photo_calib.getCalibrationMean()*bf.evaluate(x, y)
        else:
            # Spatially constant calibration
            calib_scale = photo_calib.getCalibrationMean()

        return calib_scale

    def _vertices_to_radec(self, vertices):
        """Convert polygon vertices to ra/dec.

        Parameters
        ----------
        vertices : `list` [ `lsst.sphgeom.UnitVector3d` ]
            Vertices for bounding polygon.

        Returns
        -------
        radec : `numpy.ndarray`
            Nx2 array of ra/dec positions (in degrees) associated with vertices.
        """
        lonlats = [lsst.sphgeom.LonLat(x) for x in vertices]
        radec = np.array([(x.getLon().asDegrees(), x.getLat().asDegrees()) for
                          x in lonlats])
        return radec

    def _compute_nside_coverage_tract(self, tract_info):
        """Compute the optimal coverage nside for a tract.

        Parameters
        ----------
        tract_info : `lsst.skymap.TractInfo`
            Tract information object.

        Returns
        -------
        nside_coverage : `int`
            Optimal coverage nside for a tract map.
        """
        num_patches = tract_info.getNumPatches()

        # Compute approximate patch area
        patch_info = tract_info.getPatchInfo(0)
        vertices = patch_info.getInnerSkyPolygon(tract_info.getWcs()).getVertices()
        radec = self._vertices_to_radec(vertices)
        delta_ra = np.max(radec[:, 0]) - np.min(radec[:, 0])
        delta_dec = np.max(radec[:, 1]) - np.min(radec[:, 1])
        patch_area = delta_ra*delta_dec*np.cos(np.deg2rad(np.mean(radec[:, 1])))

        tract_area = num_patches[0]*num_patches[1]*patch_area
        # Start with a fairly low nside and increase until we find the approximate area.
        nside_coverage_tract = 32
        while hp.nside2pixarea(nside_coverage_tract, degrees=True) > tract_area:
            nside_coverage_tract = 2*nside_coverage_tract
        # Step back one, but don't go to coarser pixels than nside=32 or finer
        # than nside=128 (the range recommended by healsparse).
        nside_coverage_tract = int(np.clip(nside_coverage_tract/2, 32, 128))

        return nside_coverage_tract

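# Standalone sketch of the coverage-nside heuristic above (for illustration only): double
# nside until a single coverage pixel is no larger than the tract, then step back one and
# clamp to the 32-128 range. The tract area of ~9 deg^2 is a hypothetical example value.
def _example_coverage_nside(tract_area_deg2=9.0):
    nside = 32
    while hp.nside2pixarea(nside, degrees=True) > tract_area_deg2:
        nside *= 2
    return int(np.clip(nside/2, 32, 128))
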
class ConsolidateHealSparsePropertyMapConnections(pipeBase.PipelineTaskConnections,
                                                  dimensions=("band", "skymap",),
                                                  defaultTemplates={"coaddName": "deep"}):
    sky_map = pipeBase.connectionTypes.Input(
        doc="Input definition of geometry/bbox and projection/wcs for coadded exposures",
        name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
        storageClass="SkyMap",
        dimensions=("skymap",),
    )

    # Create output connections for all possible maps defined in the
    # registry. The vars() trick used here allows us to set class attributes
    # programmatically. Taken from
    # https://stackoverflow.com/questions/2519807/
    # setting-a-class-attribute-with-a-given-name-in-python-while-defining-the-class
    for name in BasePropertyMap.registry:
        vars()[f"{name}_map_min"] = pipeBase.connectionTypes.Input(
            doc=f"Minimum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_min",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
            multiple=True,
            deferLoad=True,
        )
        vars()[f"{name}_consolidated_map_min"] = pipeBase.connectionTypes.Output(
            doc=f"Consolidated minimum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_consolidated_map_min",
            storageClass="HealSparseMap",
            dimensions=("skymap", "band"),
        )
        vars()[f"{name}_map_max"] = pipeBase.connectionTypes.Input(
            doc=f"Maximum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_max",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
            multiple=True,
            deferLoad=True,
        )
        vars()[f"{name}_consolidated_map_max"] = pipeBase.connectionTypes.Output(
            doc=f"Consolidated maximum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_consolidated_map_max",
            storageClass="HealSparseMap",
            dimensions=("skymap", "band"),
        )
        vars()[f"{name}_map_mean"] = pipeBase.connectionTypes.Input(
            doc=f"Mean-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_mean",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
            multiple=True,
            deferLoad=True,
        )
        vars()[f"{name}_consolidated_map_mean"] = pipeBase.connectionTypes.Output(
            doc=f"Consolidated mean-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_consolidated_map_mean",
            storageClass="HealSparseMap",
            dimensions=("skymap", "band"),
        )
        vars()[f"{name}_map_weighted_mean"] = pipeBase.connectionTypes.Input(
            doc=f"Weighted mean-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_weighted_mean",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
            multiple=True,
            deferLoad=True,
        )
        vars()[f"{name}_consolidated_map_weighted_mean"] = pipeBase.connectionTypes.Output(
            doc=f"Consolidated weighted mean-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_consolidated_map_weighted_mean",
            storageClass="HealSparseMap",
            dimensions=("skymap", "band"),
        )
        vars()[f"{name}_map_sum"] = pipeBase.connectionTypes.Input(
            doc=f"Sum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_sum",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
            multiple=True,
            deferLoad=True,
        )
        vars()[f"{name}_consolidated_map_sum"] = pipeBase.connectionTypes.Output(
            doc=f"Consolidated sum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_consolidated_map_sum",
            storageClass="HealSparseMap",
            dimensions=("skymap", "band"),
        )

    def __init__(self, *, config=None):
        super().__init__(config=config)

        # Not all possible maps in the registry will be configured to run.
        # Here we remove the unused connections.
        for name in BasePropertyMap.registry:
            if name not in config.property_maps:
                prop_config = BasePropertyMapConfig()
                prop_config.do_min = False
                prop_config.do_max = False
                prop_config.do_mean = False
                prop_config.do_weighted_mean = False
                prop_config.do_sum = False
            else:
                prop_config = config.property_maps[name]

            if not prop_config.do_min:
                self.inputs.remove(f"{name}_map_min")
                self.outputs.remove(f"{name}_consolidated_map_min")
            if not prop_config.do_max:
                self.inputs.remove(f"{name}_map_max")
                self.outputs.remove(f"{name}_consolidated_map_max")
            if not prop_config.do_mean:
                self.inputs.remove(f"{name}_map_mean")
                self.outputs.remove(f"{name}_consolidated_map_mean")
            if not prop_config.do_weighted_mean:
                self.inputs.remove(f"{name}_map_weighted_mean")
                self.outputs.remove(f"{name}_consolidated_map_weighted_mean")
            if not prop_config.do_sum:
                self.inputs.remove(f"{name}_map_sum")
                self.outputs.remove(f"{name}_consolidated_map_sum")

836
837class ConsolidateHealSparsePropertyMapConfig(pipeBase.PipelineTaskConfig,
838 pipelineConnections=ConsolidateHealSparsePropertyMapConnections):
839 """Configuration parameters for ConsolidateHealSparsePropertyMapTask"""
840 property_maps = BasePropertyMap.registry.makeField(
841 multi=True,
842 default=["exposure_time",
843 "psf_size",
844 "psf_e1",
845 "psf_e2",
846 "psf_maglim",
847 "sky_noise",
848 "sky_background",
849 "dcr_dra",
850 "dcr_ddec",
851 "dcr_e1",
852 "dcr_e2"],
853 doc="Property map computation objects",
854 )
855 nside_coverage = pexConfig.Field(
856 doc="Consolidated HealSparse coverage map nside. Must be power of 2.",
857 dtype=int,
858 default=32,
859 check=_is_power_of_two,
860 )
861
862 def setDefaults(self):
863 self.property_maps["exposure_time"].do_sum = True
864 self.property_maps["psf_size"].do_weighted_mean = True
865 self.property_maps["psf_e1"].do_weighted_mean = True
866 self.property_maps["psf_e2"].do_weighted_mean = True
867 self.property_maps["psf_maglim"].do_weighted_mean = True
868 self.property_maps["sky_noise"].do_weighted_mean = True
869 self.property_maps["sky_background"].do_weighted_mean = True
870 self.property_maps["dcr_dra"].do_weighted_mean = True
871 self.property_maps["dcr_ddec"].do_weighted_mean = True
872 self.property_maps["dcr_e1"].do_weighted_mean = True
873 self.property_maps["dcr_e2"].do_weighted_mean = True
874
875
class ConsolidateHealSparsePropertyMapTask(pipeBase.PipelineTask):
    """Task to consolidate HealSparse property maps.

    This task will take all the individual tract-based maps (per map type,
    per band) and consolidate them into one survey-wide map (per map type,
    per band). Each tract map is truncated to its inner region before
    consolidation.
    """
    ConfigClass = ConsolidateHealSparsePropertyMapConfig
    _DefaultName = "consolidateHealSparsePropertyMapTask"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.property_maps = PropertyMapMap()
        for name, config, PropertyMapClass in self.config.property_maps.apply():
            self.property_maps[name] = PropertyMapClass(config, name)

    @timeMethod
    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        sky_map = inputs.pop("sky_map")

        # These need to be consolidated one at a time to conserve memory.
        for name in self.config.property_maps.names:
            for type_ in ['min', 'max', 'mean', 'weighted_mean', 'sum']:
                map_type = f"{name}_map_{type_}"
                if map_type in inputs:
                    input_refs = {ref.dataId['tract']: ref
                                  for ref in inputs[map_type]}
                    consolidated_map = self.consolidate_map(sky_map, input_refs)
                    butlerQC.put(consolidated_map,
                                 getattr(outputRefs, f"{name}_consolidated_map_{type_}"))

    def consolidate_map(self, sky_map, input_refs):
        """Consolidate the healsparse property maps.

        Parameters
        ----------
        sky_map : Sky map object
        input_refs : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]
            Dictionary of input map references. Keys are tract ids.

        Returns
        -------
        consolidated_map : `healsparse.HealSparseMap`
            Consolidated HealSparse map.
        """
        # First, we read in the coverage maps to know how much memory
        # to allocate
        cov_mask = None
        nside_coverage_inputs = None
        for tract_id in input_refs:
            cov = input_refs[tract_id].get(component='coverage')
            if cov_mask is None:
                cov_mask = cov.coverage_mask
                nside_coverage_inputs = cov.nside_coverage
            else:
                cov_mask |= cov.coverage_mask

        cov_pix_inputs, = np.where(cov_mask)

        # Compute the coverage pixels for the desired nside_coverage
        if nside_coverage_inputs == self.config.nside_coverage:
            cov_pix = cov_pix_inputs
        elif nside_coverage_inputs > self.config.nside_coverage:
            # Converting from higher resolution coverage to lower
            # resolution coverage.
            bit_shift = hsp.utils._compute_bitshift(self.config.nside_coverage,
                                                    nside_coverage_inputs)
            cov_pix = np.right_shift(cov_pix_inputs, bit_shift)
        else:
            # Converting from lower resolution coverage to higher
            # resolution coverage.
            bit_shift = hsp.utils._compute_bitshift(nside_coverage_inputs,
                                                    self.config.nside_coverage)
            cov_pix = np.left_shift(cov_pix_inputs, bit_shift)

        # Now read in each tract map and build the consolidated map.
        consolidated_map = None
        for tract_id in input_refs:
            input_map = input_refs[tract_id].get()
            if consolidated_map is None:
                consolidated_map = hsp.HealSparseMap.make_empty(
                    self.config.nside_coverage,
                    input_map.nside_sparse,
                    input_map.dtype,
                    sentinel=input_map._sentinel,
                    cov_pixels=cov_pix)

            # Only use pixels that are properly inside the tract.
            vpix, ra, dec = input_map.valid_pixels_pos(return_pixels=True)
            vpix_tract_ids = sky_map.findTractIdArray(ra, dec, degrees=True)

            in_tract = (vpix_tract_ids == tract_id)

            consolidated_map[vpix[in_tract]] = input_map[vpix[in_tract]]

        return consolidated_map
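

# Worked sketch of the coverage-pixel resolution conversion used in consolidate_map: in
# NESTED ordering, each coverage pixel at nside N contains four pixels at nside 2N, so a
# pixel index changes by a factor of (nside_hi/nside_lo)**2, i.e. a shift of
# 2*log2(nside_hi/nside_lo) bits. The nside values below are hypothetical examples.
def _example_coverage_pixel_conversion(cov_pix_inputs, nside_inputs=128, nside_target=32):
    bit_shift = 2*int(np.round(np.log2(nside_inputs/nside_target)))
    # Going to a coarser coverage grid: drop the low-order bits.
    return np.right_shift(np.atleast_1d(cov_pix_inputs), bit_shift)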