healSparseMapping.py
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22__all__ = ["HealSparseInputMapTask", "HealSparseInputMapConfig",
23 "HealSparseMapFormatter", "HealSparsePropertyMapConnections",
24 "HealSparsePropertyMapConfig", "HealSparsePropertyMapTask",
25 "ConsolidateHealSparsePropertyMapConnections",
26 "ConsolidateHealSparsePropertyMapConfig",
27 "ConsolidateHealSparsePropertyMapTask"]
28
29from collections import defaultdict
30import esutil
31import warnings
32import numbers
33import numpy as np
34import hpgeom as hpg
35import healsparse as hsp
36
37import lsst.pex.config as pexConfig
38import lsst.pipe.base as pipeBase
39import lsst.geom
40import lsst.afw.geom as afwGeom
41from lsst.daf.butler import Formatter
42from lsst.skymap import BaseSkyMap
43from lsst.utils.timer import timeMethod
44from .healSparseMappingProperties import (BasePropertyMap, BasePropertyMapConfig,
45 PropertyMapMap, compute_approx_psf_size_and_shape)
46
47
48class HealSparseMapFormatter(Formatter):
49 """Interface for reading and writing healsparse.HealSparseMap files."""
50 unsupportedParameters = frozenset()
51 supportedExtensions = frozenset({".hsp", ".fit", ".fits"})
52 extension = '.hsp'
53
54 def read(self, component=None):
55 # Docstring inherited from Formatter.read.
56 path = self.fileDescriptor.location.path
57
58 if component == 'coverage':
59 try:
60 data = hsp.HealSparseCoverage.read(path)
61 except (OSError, RuntimeError):
62 raise ValueError(f"Unable to read healsparse map with URI {self.fileDescriptor.location.uri}")
63
64 return data
65
66 if self.fileDescriptor.parameters is None:
67 pixels = None
68 degrade_nside = None
69 else:
70 pixels = self.fileDescriptor.parameters.get('pixels', None)
71 degrade_nside = self.fileDescriptor.parameters.get('degrade_nside', None)
72 try:
73 data = hsp.HealSparseMap.read(path, pixels=pixels, degrade_nside=degrade_nside)
74 except (OSError, RuntimeError):
75 raise ValueError(f"Unable to read healsparse map with URI {self.fileDescriptor.location.uri}")
76
77 return data
78
79 def write(self, inMemoryDataset):
80 # Docstring inherited from Formatter.write.
81 # Update the location with the formatter-preferred file extension
82 self.fileDescriptor.location.updateExtension(self.extension)
83 inMemoryDataset.write(self.fileDescriptor.location.path, clobber=True)
84
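# --- Editor's illustrative sketch (not part of pipe_tasks) ----------------
# The ``pixels`` and ``degrade_nside`` parameters handled by
# HealSparseMapFormatter.read above map directly onto the healsparse
# partial-read API; the file name below is a hypothetical placeholder for a
# map written by HealSparseMapFormatter.write.
def _example_partial_read(path="property_map.hsp"):
    # Read only the coverage map first; this is cheap and tells us which
    # coarse coverage pixels actually contain data.
    cov = hsp.HealSparseCoverage.read(path)
    cov_pixels, = np.where(cov.coverage_mask)
    # Partial read of a few coverage pixels, degraded to a coarser nside.
    return hsp.HealSparseMap.read(path, pixels=cov_pixels[:4], degrade_nside=4096)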
85
86def _is_power_of_two(value):
87 """Check that value is a power of two.
88
89 Parameters
90 ----------
91 value : `int`
92 Value to check.
93
94 Returns
95 -------
96 is_power_of_two : `bool`
97 True if value is a power of two; False otherwise, or
98 if value is not an integer.
99 """
100 if not isinstance(value, numbers.Integral):
101 return False
102
103 # See https://stackoverflow.com/questions/57025836
104 # Every power of 2 has exactly one bit set to 1; subtracting 1
105 # clears that bit and sets all of the lower bits, so ANDing the
106 # result with the original value must give 0.
107 return (value & (value - 1) == 0) and value != 0
108
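# Editor's note: a quick worked example of the bit trick above --
# 8 == 0b1000 and 7 == 0b0111 share no set bits, so 8 & 7 == 0, while
# 6 == 0b0110 and 5 == 0b0101 give 6 & 5 == 0b0100 != 0.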
109
110class HealSparseInputMapConfig(pexConfig.Config):
111 """Configuration parameters for HealSparseInputMapTask"""
112 nside = pexConfig.Field(
113 doc="Mapping healpix nside. Must be power of 2.",
114 dtype=int,
115 default=32768,
116 check=_is_power_of_two,
117 )
118 nside_coverage = pexConfig.Field(
119 doc="HealSparse coverage map nside. Must be power of 2.",
120 dtype=int,
121 default=256,
122 check=_is_power_of_two,
123 )
124 bad_mask_min_coverage = pexConfig.Field(
125 doc=("Minimum area fraction of a map healpixel pixel that must be "
126 "covered by bad pixels to be removed from the input map. "
127 "This is approximate."),
128 dtype=float,
129 default=0.5,
130 )
131
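# Editor's sketch (hypothetical override, not part of pipe_tasks): both nside
# fields above are validated with _is_power_of_two, so any override must keep
# them powers of two, e.g. in a pipeline config file for a coaddition task
# that carries this as a subtask (the subtask name here is illustrative):
#
#     config.input_map.nside = 16384
#     config.input_map.nside_coverage = 128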
132
133class HealSparseInputMapTask(pipeBase.Task):
134 """Task for making a HealSparse input map."""
135
136 ConfigClass = HealSparseInputMapConfig
137 _DefaultName = "healSparseInputMap"
138
139 def __init__(self, **kwargs):
140 pipeBase.Task.__init__(self, **kwargs)
141
142 self.ccd_input_map = None
143
144 def build_ccd_input_map(self, bbox, wcs, ccds):
145 """Build a map from ccd valid polygons or bounding boxes.
146
147 Parameters
148 ----------
149 bbox : `lsst.geom.Box2I`
150 Bounding box for region to build input map.
151 wcs : `lsst.afw.geom.SkyWcs`
152 WCS object for region to build input map.
153 ccds : `lsst.afw.table.ExposureCatalog`
154 Exposure catalog with ccd data from coadd inputs.
155 """
156 with warnings.catch_warnings():
157 # Healsparse will emit a warning if nside coverage is greater than
158 # 128. In the case of generating patch input maps, and not global
159 # maps, high nside coverage works fine, so we can suppress this
160 # warning.
161 warnings.simplefilter("ignore")
162 self.ccd_input_map = hsp.HealSparseMap.make_empty(nside_coverage=self.config.nside_coverage,
163 nside_sparse=self.config.nside,
164 dtype=hsp.WIDE_MASK,
165 wide_mask_maxbits=len(ccds))
166 self._wcs = wcs
167 self._bbox = bbox
168 self._ccds = ccds
169
170 pixel_scale = wcs.getPixelScale(bbox.getCenter()).asArcseconds()
171 hpix_area_arcsec2 = hpg.nside_to_pixel_area(self.config.nside, degrees=True)*(3600.**2.)
172 self._min_bad = self.config.bad_mask_min_coverage*hpix_area_arcsec2/(pixel_scale**2.)
173
174 metadata = {}
175 self._bits_per_visit_ccd = {}
176 self._bits_per_visit = defaultdict(list)
177 for bit, ccd_row in enumerate(ccds):
178 metadata[f"B{bit:04d}CCD"] = ccd_row["ccd"]
179 metadata[f"B{bit:04d}VIS"] = ccd_row["visit"]
180 metadata[f"B{bit:04d}WT"] = ccd_row["weight"]
181
182 self._bits_per_visit_ccd[(ccd_row["visit"], ccd_row["ccd"])] = bit
183 self._bits_per_visit[ccd_row["visit"]].append(bit)
184
185 ccd_poly = ccd_row.getValidPolygon()
186 if ccd_poly is None:
187 ccd_poly = afwGeom.Polygon(lsst.geom.Box2D(ccd_row.getBBox()))
188 # Detectors need to be rendered with their own wcs.
189 ccd_poly_radec = self._pixels_to_radec(ccd_row.getWcs(), ccd_poly.convexHull().getVertices())
190
191 # Create a ccd healsparse polygon
192 poly = hsp.Polygon(ra=ccd_poly_radec[: -1, 0],
193 dec=ccd_poly_radec[: -1, 1],
194 value=[bit])
195 self.ccd_input_map.set_bits_pix(poly.get_pixels(nside=self.ccd_input_map.nside_sparse),
196 [bit])
197
198 # Cut down to the overall bounding box with associated wcs.
199 bbox_afw_poly = afwGeom.Polygon(lsst.geom.Box2D(bbox))
200 bbox_poly_radec = self._pixels_to_radec(self._wcs,
201 bbox_afw_poly.convexHull().getVertices())
202 bbox_poly = hsp.Polygon(ra=bbox_poly_radec[: -1, 0], dec=bbox_poly_radec[: -1, 1],
203 value=np.arange(self.ccd_input_map.wide_mask_maxbits))
204 with warnings.catch_warnings():
205 warnings.simplefilter("ignore")
206 bbox_poly_map = bbox_poly.get_map_like(self.ccd_input_map)
207 self.ccd_input_map = hsp.and_intersection([self.ccd_input_map, bbox_poly_map])
208 self.ccd_input_map.metadata = metadata
209
210 # Create a temporary map to hold the count of bad pixels in each healpix pixel
211 dtype = [(f"v{visit}", np.int64) for visit in self._bits_per_visit.keys()]
212
213 with warnings.catch_warnings():
214 # Healsparse will emit a warning if nside coverage is greater than
215 # 128. In the case of generating patch input maps, and not global
216 # maps, high nside coverage works fine, so we can suppress this
217 # warning.
218 warnings.simplefilter("ignore")
219 self._ccd_input_bad_count_map = hsp.HealSparseMap.make_empty(
220 nside_coverage=self.config.nside_coverage,
221 nside_sparse=self.config.nside,
222 dtype=dtype,
223 primary=dtype[0][0])
224
225 self._ccd_input_pixels = self.ccd_input_map.valid_pixels
226
227 # Don't set input bad map if there are no ccds which overlap the bbox.
228 if len(self._ccd_input_pixels) > 0:
229 # Ensure these are sorted.
230 self._ccd_input_pixels = np.sort(self._ccd_input_pixels)
231
232 self._ccd_input_bad_count_map[self._ccd_input_pixels] = np.zeros(1, dtype=dtype)
233
234 def mask_warp_bbox(self, bbox, visit, mask, bit_mask_value):
235 """Mask a subregion from a visit.
236 This must be run after build_ccd_input_map initializes
237 the overall map.
238
239 Parameters
240 ----------
241 bbox : `lsst.geom.Box2I`
242 Bounding box from region to mask.
243 visit : `int`
244 Visit number corresponding to warp with mask.
245 mask : `lsst.afw.image.MaskX`
246 Mask plane from warp exposure.
247 bit_mask_value : `int`
248 Bit mask to check for bad pixels.
249
250 Raises
251 ------
252 RuntimeError : Raised if build_ccd_input_map was not run first.
253 """
254 if self.ccd_input_map is None:
255 raise RuntimeError("Must run build_ccd_input_map before mask_warp_bbox")
256
257 if len(self._ccd_input_pixels) == 0:
258 # This tract has no coverage, so there is nothing to do.
259 return
260
261 # Find the bad pixels and convert to healpix
262 bad_pixels = np.where(mask.array & bit_mask_value)
263 if len(bad_pixels[0]) == 0:
264 # No bad pixels
265 return
266
267 # Bad pixels come from warps which use the overall wcs.
268 bad_ra, bad_dec = self._wcs.pixelToSkyArray(
269 bad_pixels[1].astype(np.float64) + bbox.getMinX(),
270 bad_pixels[0].astype(np.float64) + bbox.getMinY(),
271 degrees=True,
272 )
273 bad_hpix = hpg.angle_to_pixel(self.config.nside, bad_ra, bad_dec)
274
275 # Check if any of these "bad" pixels are in the valid footprint.
276 match_input, match_bad = esutil.numpy_util.match(self._ccd_input_pixels, bad_hpix, presorted=True)
277 if len(match_bad) == 0:
278 return
279
280 bad_hpix = bad_hpix[match_bad]
281
282 # Create a view of the column we need to add to.
283 count_map_visit = self._ccd_input_bad_count_map[f"v{visit}"]
284 # Add the bad pixels to the accumulator. Note that the view
285 # cannot append pixels, but the match above ensures we are
286 # only adding to pixels that are already in the coverage
287 # map and initialized.
288 count_map_visit.update_values_pix(bad_hpix, 1, operation="add")
289
291 """Use accumulated mask information to finalize the masking of
292 ccd_input_map.
293
294 Raises
295 ------
296 RuntimeError : Raised if build_ccd_input_map was not run first.
297 """
298 if self.ccd_input_map is None:
299 raise RuntimeError("Must run build_ccd_input_map before finalize_ccd_input_map_mask.")
300
301 count_map_arr = self._ccd_input_bad_count_map[self._ccd_input_pixels]
302 for visit in self._bits_per_visit:
303 to_mask, = np.where(count_map_arr[f"v{visit}"] > self._min_bad)
304 if to_mask.size == 0:
305 continue
306 self.ccd_input_map.clear_bits_pix(self._ccd_input_pixels[to_mask],
307 self._bits_per_visit[visit])
308
309 # Clear memory
310 self._ccd_input_bad_count_map = None
311
312 def _pixels_to_radec(self, wcs, pixels):
313 """Convert pixels to ra/dec positions using a wcs.
314
315 Parameters
316 ----------
317 wcs : `lsst.afw.geom.SkyWcs`
318 WCS object.
319 pixels : `list` [`lsst.geom.Point2D`]
320 List of pixels to convert.
321
322 Returns
323 -------
324 radec : `numpy.ndarray`
325 Nx2 array of ra/dec positions associated with pixels.
326 """
327 sph_pts = wcs.pixelToSky(pixels)
328 return np.array([(sph.getRa().asDegrees(), sph.getDec().asDegrees())
329 for sph in sph_pts])
330
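# --- Editor's illustrative sketch (not part of pipe_tasks) ----------------
# After build_ccd_input_map, mask_warp_bbox, and finalize_ccd_input_map_mask
# have run, the wide-mask map records which input (visit, detector) pairs
# contribute to each healpix pixel.  This hypothetical helper decodes that
# for a single sky position (degrees) using the B####VIS / B####CCD metadata
# written in build_ccd_input_map.
def _example_inputs_at_position(ccd_input_map, ra, dec):
    pixel = hpg.angle_to_pixel(ccd_input_map.nside_sparse, ra, dec)
    metadata = ccd_input_map.metadata
    inputs = []
    for bit in range(ccd_input_map.wide_mask_maxbits):
        if ccd_input_map.check_bits_pix([pixel], [bit])[0]:
            inputs.append((metadata[f"B{bit:04d}VIS"], metadata[f"B{bit:04d}CCD"]))
    return inputs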
331
332class HealSparsePropertyMapConnections(pipeBase.PipelineTaskConnections,
333 dimensions=("tract", "band", "skymap",),
334 defaultTemplates={"coaddName": "deep",
335 "calexpType": ""}):
336 input_maps = pipeBase.connectionTypes.Input(
337 doc="Healsparse bit-wise coadd input maps",
338 name="{coaddName}Coadd_inputMap",
339 storageClass="HealSparseMap",
340 dimensions=("tract", "patch", "skymap", "band"),
341 multiple=True,
342 deferLoad=True,
343 )
344 coadd_exposures = pipeBase.connectionTypes.Input(
345 doc="Coadded exposures associated with input_maps",
346 name="{coaddName}Coadd",
347 storageClass="ExposureF",
348 dimensions=("tract", "patch", "skymap", "band"),
349 multiple=True,
350 deferLoad=True,
351 )
352 visit_summaries = pipeBase.connectionTypes.Input(
353 doc="Visit summary tables with aggregated statistics",
354 name="finalVisitSummary",
355 storageClass="ExposureCatalog",
356 dimensions=("instrument", "visit"),
357 multiple=True,
358 deferLoad=True,
359 )
360 sky_map = pipeBase.connectionTypes.Input(
361 doc="Input definition of geometry/bbox and projection/wcs for coadded exposures",
362 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
363 storageClass="SkyMap",
364 dimensions=("skymap",),
365 )
366
367 # Create output connections for all possible maps defined in the
368 # registry. The vars() trick used here allows us to set class attributes
369 # programmatically. Taken from
370 # https://stackoverflow.com/questions/2519807/
371 # setting-a-class-attribute-with-a-given-name-in-python-while-defining-the-class
372 for name in BasePropertyMap.registry:
373 vars()[f"{name}_map_min"] = pipeBase.connectionTypes.Output(
374 doc=f"Minimum-value map of {name}",
375 name=f"{{coaddName}}Coadd_{name}_map_min",
376 storageClass="HealSparseMap",
377 dimensions=("tract", "skymap", "band"),
378 )
379 vars()[f"{name}_map_max"] = pipeBase.connectionTypes.Output(
380 doc=f"Maximum-value map of {name}",
381 name=f"{{coaddName}}Coadd_{name}_map_max",
382 storageClass="HealSparseMap",
383 dimensions=("tract", "skymap", "band"),
384 )
385 vars()[f"{name}_map_mean"] = pipeBase.connectionTypes.Output(
386 doc=f"Mean-value map of {name}",
387 name=f"{{coaddName}}Coadd_{name}_map_mean",
388 storageClass="HealSparseMap",
389 dimensions=("tract", "skymap", "band"),
390 )
391 vars()[f"{name}_map_weighted_mean"] = pipeBase.connectionTypes.Output(
392 doc=f"Weighted mean-value map of {name}",
393 name=f"{{coaddName}}Coadd_{name}_map_weighted_mean",
394 storageClass="HealSparseMap",
395 dimensions=("tract", "skymap", "band"),
396 )
397 vars()[f"{name}_map_sum"] = pipeBase.connectionTypes.Output(
398 doc=f"Sum-value map of {name}",
399 name=f"{{coaddName}}Coadd_{name}_map_sum",
400 storageClass="HealSparseMap",
401 dimensions=("tract", "skymap", "band"),
402 )
403
404 def __init__(self, *, config=None):
405 super().__init__(config=config)
406
407 # Not all possible maps in the registry will be configured to run.
408 # Here we remove the unused connections.
409 for name in BasePropertyMap.registry:
410 if name not in config.property_maps:
411 prop_config = BasePropertyMapConfig()
412 prop_config.do_min = False
413 prop_config.do_max = False
414 prop_config.do_mean = False
415 prop_config.do_weighted_mean = False
416 prop_config.do_sum = False
417 else:
418 prop_config = config.property_maps[name]
419
420 if not prop_config.do_min:
421 self.outputs.remove(f"{name}_map_min")
422 if not prop_config.do_max:
423 self.outputs.remove(f"{name}_map_max")
424 if not prop_config.do_mean:
425 self.outputs.remove(f"{name}_map_mean")
426 if not prop_config.do_weighted_mean:
427 self.outputs.remove(f"{name}_map_weighted_mean")
428 if not prop_config.do_sum:
429 self.outputs.remove(f"{name}_map_sum")
430
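# Editor's note: a minimal illustration of the vars() trick used in the
# connections classes -- assigning through vars() inside a class body adds a
# class attribute exactly as if it had been written out by hand.
def _example_vars_trick():
    class Demo:
        for _name in ("a", "b"):
            vars()[_name] = _name.upper()
    return Demo.a, Demo.b  # returns ("A", "B")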
431
432class HealSparsePropertyMapConfig(pipeBase.PipelineTaskConfig,
433 pipelineConnections=HealSparsePropertyMapConnections):
434 """Configuration parameters for HealSparsePropertyMapTask"""
435 property_maps = BasePropertyMap.registry.makeField(
436 multi=True,
437 default=["exposure_time",
438 "psf_size",
439 "psf_e1",
440 "psf_e2",
441 "psf_maglim",
442 "sky_noise",
443 "sky_background",
444 "dcr_dra",
445 "dcr_ddec",
446 "dcr_e1",
447 "dcr_e2",
448 "epoch"],
449 doc="Property map computation objects",
450 )
451
452 def setDefaults(self):
453 self.property_maps["exposure_time"].do_sum = True
454 self.property_maps["psf_size"].do_weighted_mean = True
455 self.property_maps["psf_e1"].do_weighted_mean = True
456 self.property_maps["psf_e2"].do_weighted_mean = True
457 self.property_maps["psf_maglim"].do_weighted_mean = True
458 self.property_maps["sky_noise"].do_weighted_mean = True
459 self.property_maps["sky_background"].do_weighted_mean = True
460 self.property_maps["dcr_dra"].do_weighted_mean = True
461 self.property_maps["dcr_ddec"].do_weighted_mean = True
462 self.property_maps["dcr_e1"].do_weighted_mean = True
463 self.property_maps["dcr_e2"].do_weighted_mean = True
464 self.property_maps["epoch"].do_mean = True
465 self.property_maps["epoch"].do_min = True
466 self.property_maps["epoch"].do_max = True
467
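# Editor's sketch (hypothetical override, not part of pipe_tasks): individual
# statistics are switched on or off per property map through the registry
# entries configured in setDefaults above, e.g. in a pipeline config file:
#
#     config.property_maps["psf_size"].do_max = True
#     config.property_maps["sky_background"].do_weighted_mean = False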
468
469class HealSparsePropertyMapTask(pipeBase.PipelineTask):
470 """Task to compute Healsparse property maps.
471
472 This task will compute individual property maps (per tract, per
473 map type, per band). These maps cover the full coadd tract, and
474 are not truncated to the inner tract region.
475 """
476 ConfigClass = HealSparsePropertyMapConfig
477 _DefaultName = "healSparsePropertyMapTask"
478
479 def __init__(self, **kwargs):
480 super().__init__(**kwargs)
481 self.property_maps = PropertyMapMap()
482 for name, config, PropertyMapClass in self.config.property_maps.apply():
483 self.property_maps[name] = PropertyMapClass(config, name)
484
485 @timeMethod
486 def runQuantum(self, butlerQC, inputRefs, outputRefs):
487 inputs = butlerQC.get(inputRefs)
488
489 sky_map = inputs.pop("sky_map")
490
491 tract = butlerQC.quantum.dataId["tract"]
492 band = butlerQC.quantum.dataId["band"]
493
494 input_map_dict = {ref.dataId["patch"]: ref for ref in inputs["input_maps"]}
495 coadd_dict = {ref.dataId["patch"]: ref for ref in inputs["coadd_exposures"]}
496
497 visit_summary_dict = {ref.dataId["visit"]: ref.get()
498 for ref in inputs["visit_summaries"]}
499
500 self.run(sky_map, tract, band, coadd_dict, input_map_dict, visit_summary_dict)
501
502 # Write the outputs
503 for name, property_map in self.property_maps.items():
504 if property_map.config.do_min:
505 butlerQC.put(property_map.min_map,
506 getattr(outputRefs, f"{name}_map_min"))
507 if property_map.config.do_max:
508 butlerQC.put(property_map.max_map,
509 getattr(outputRefs, f"{name}_map_max"))
510 if property_map.config.do_mean:
511 butlerQC.put(property_map.mean_map,
512 getattr(outputRefs, f"{name}_map_mean"))
513 if property_map.config.do_weighted_mean:
514 butlerQC.put(property_map.weighted_mean_map,
515 getattr(outputRefs, f"{name}_map_weighted_mean"))
516 if property_map.config.do_sum:
517 butlerQC.put(property_map.sum_map,
518 getattr(outputRefs, f"{name}_map_sum"))
519
520 def run(self, sky_map, tract, band, coadd_dict, input_map_dict, visit_summary_dict):
521 """Run the healsparse property task.
522
523 Parameters
524 ----------
525 sky_map : Sky map object
526 tract : `int`
527 Tract number.
528 band : `str`
529 Band name for logging.
530 coadd_dict : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]
531 Dictionary of coadd exposure references. Keys are patch numbers.
532 input_map_dict : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]
533 Dictionary of input map references. Keys are patch numbers.
534 visit_summary_dict : `dict` [`int`: `lsst.afw.table.ExposureCatalog`]
535 Dictionary of visit summary tables. Keys are visit numbers.
536
537 Raises
538 ------
539 RepeatableQuantumError
540 If visit_summary_dict is missing any visits or detectors found in an
541 input map. This leads to an inconsistency between what is in the coadd
542 (via the input map) and the visit summary tables which contain data
543 to compute the maps.
544 """
545 tract_info = sky_map[tract]
546
547 tract_maps_initialized = False
548
549 for patch in input_map_dict.keys():
550 self.log.info("Making maps for band %s, tract %d, patch %d.",
551 band, tract, patch)
552
553 patch_info = tract_info[patch]
554
555 input_map = input_map_dict[patch].get()
556
557 # Initialize the tract maps as soon as we have the first input
558 # map for getting nside information.
559 if not tract_maps_initialized:
560 # We use the first input map nside information to initialize
561 # the tract maps
562 nside_coverage = self._compute_nside_coverage_tract(tract_info)
563 nside = input_map.nside_sparse
564
565 do_compute_approx_psf = False
566 # Initialize the tract maps
567 for property_map in self.property_maps:
568 property_map.initialize_tract_maps(nside_coverage, nside)
569 if property_map.requires_psf:
570 do_compute_approx_psf = True
571
572 tract_maps_initialized = True
573
574 if input_map.valid_pixels.size == 0:
575 self.log.warning("No valid pixels for band %s, tract %d, patch %d; skipping.",
576 band, tract, patch)
577 continue
578
579 coadd_photo_calib = coadd_dict[patch].get(component="photoCalib")
580 coadd_inputs = coadd_dict[patch].get(component="coaddInputs")
581
582 coadd_zeropoint = 2.5*np.log10(coadd_photo_calib.getInstFluxAtZeroMagnitude())
583
584 # Crop input_map to the inner polygon of the patch
585 poly_vertices = patch_info.getInnerSkyPolygon(tract_info.getWcs()).getVertices()
586 patch_radec = self._vertices_to_radec(poly_vertices)
587 patch_poly = hsp.Polygon(ra=patch_radec[:, 0], dec=patch_radec[:, 1],
588 value=np.arange(input_map.wide_mask_maxbits))
589 with warnings.catch_warnings():
590 # Healsparse will emit a warning if nside coverage is greater than
591 # 128. In the case of generating patch input maps, and not global
592 # maps, high nside coverage works fine, so we can suppress this
593 # warning.
594 warnings.simplefilter("ignore")
595 patch_poly_map = patch_poly.get_map_like(input_map)
596 input_map = hsp.and_intersection([input_map, patch_poly_map])
597
598 valid_pixels, vpix_ra, vpix_dec = input_map.valid_pixels_pos(return_pixels=True)
599
600 # Check if there are no valid pixels for the inner (unique) patch region
601 if valid_pixels.size == 0:
602 continue
603
604 # Initialize the value accumulators
605 for property_map in self.property_maps:
606 property_map.initialize_values(valid_pixels.size)
607 property_map.zeropoint = coadd_zeropoint
608
609 # Initialize the weight and counter accumulators
610 total_weights = np.zeros(valid_pixels.size)
611 total_inputs = np.zeros(valid_pixels.size, dtype=np.int32)
612
613 for bit, ccd_row in enumerate(coadd_inputs.ccds):
614 # Which pixels in the map are used by this visit/detector
615 inmap, = np.where(input_map.check_bits_pix(valid_pixels, [bit]))
616
617 # Check if there are any valid pixels in the map from this detector.
618 if inmap.size == 0:
619 continue
620
621 # visit, detector_id, weight = input_dict[bit]
622 visit = ccd_row["visit"]
623 detector_id = ccd_row["ccd"]
624 weight = ccd_row["weight"]
625
626 x, y = ccd_row.getWcs().skyToPixelArray(vpix_ra[inmap], vpix_dec[inmap], degrees=True)
627 scalings = self._compute_calib_scale(ccd_row, x, y)
628
629 if do_compute_approx_psf:
630 psf_array = compute_approx_psf_size_and_shape(ccd_row, vpix_ra[inmap], vpix_dec[inmap])
631 else:
632 psf_array = None
633
634 total_weights[inmap] += weight
635 total_inputs[inmap] += 1
636
637 # Retrieve the correct visitSummary row
638 if visit not in visit_summary_dict:
639 msg = f"Visit {visit} not found in visit_summaries."
640 raise pipeBase.RepeatableQuantumError(msg)
641 row = visit_summary_dict[visit].find(detector_id)
642 if row is None:
643 msg = f"Visit {visit} / detector_id {detector_id} not found in visit_summaries."
644 raise pipeBase.RepeatableQuantumError(msg)
645
646 # Accumulate the values
647 for property_map in self.property_maps:
648 property_map.accumulate_values(inmap,
649 vpix_ra[inmap],
650 vpix_dec[inmap],
651 weight,
652 scalings,
653 row,
654 psf_array=psf_array)
655
656 # Finalize the mean values and set the tract maps
657 for property_map in self.property_maps:
658 property_map.finalize_mean_values(total_weights, total_inputs)
659 property_map.set_map_values(valid_pixels)
660
661 def _compute_calib_scale(self, ccd_row, x, y):
662 """Compute calibration scaling values.
663
664 Parameters
665 ----------
666 ccd_row : `lsst.afw.table.ExposureRecord`
667 Exposure metadata for a given detector exposure.
668 x : `np.ndarray`
669 Array of x positions.
670 y : `np.ndarray`
671 Array of y positions.
672
673 Returns
674 -------
675 calib_scale : `np.ndarray`
676 Array of calibration scale values.
677 """
678 photo_calib = ccd_row.getPhotoCalib()
679 bf = photo_calib.computeScaledCalibration()
680 if bf.getBBox() == ccd_row.getBBox():
681 # Track variable calibration over the detector
682 calib_scale = photo_calib.getCalibrationMean()*bf.evaluate(x, y)
683 else:
684 # Spatially constant calibration
685 calib_scale = photo_calib.getCalibrationMean()
686
687 return calib_scale
688
689 def _vertices_to_radec(self, vertices):
690 """Convert polygon vertices to ra/dec.
691
692 Parameters
693 ----------
694 vertices : `list` [ `lsst.sphgeom.UnitVector3d` ]
695 Vertices for bounding polygon.
696
697 Returns
698 -------
699 radec : `numpy.ndarray`
700 Nx2 array of ra/dec positions (in degrees) associated with vertices.
701 """
702 lonlats = [lsst.sphgeom.LonLat(x) for x in vertices]
703 radec = np.array([(x.getLon().asDegrees(), x.getLat().asDegrees()) for
704 x in lonlats])
705 return radec
706
707 def _compute_nside_coverage_tract(self, tract_info):
708 """Compute the optimal coverage nside for a tract.
709
710 Parameters
711 ----------
712 tract_info : `lsst.skymap.tractInfo.ExplicitTractInfo`
713 Tract information object.
714
715 Returns
716 -------
717 nside_coverage : `int`
718 Optimal coverage nside for a tract map.
719 """
720 num_patches = tract_info.getNumPatches()
721
722 # Compute approximate patch area
723 patch_info = tract_info.getPatchInfo(0)
724 vertices = patch_info.getInnerSkyPolygon(tract_info.getWcs()).getVertices()
725 radec = self._vertices_to_radec(vertices)
726 delta_ra = np.max(radec[:, 0]) - np.min(radec[:, 0])
727 delta_dec = np.max(radec[:, 1]) - np.min(radec[:, 1])
728 patch_area = delta_ra*delta_dec*np.cos(np.deg2rad(np.mean(radec[:, 1])))
729
730 tract_area = num_patches[0]*num_patches[1]*patch_area
731 # Start with a fairly low nside and increase until we find the approximate area.
732 nside_coverage_tract = 32
733 while hpg.nside_to_pixel_area(nside_coverage_tract, degrees=True) > tract_area:
734 nside_coverage_tract = 2*nside_coverage_tract
735 # Step back one level, but clamp to no coarser than nside=32 and no
736 # finer than nside=128 (the range recommended by healsparse).
737 nside_coverage_tract = int(np.clip(nside_coverage_tract/2, 32, 128))
738
739 return nside_coverage_tract
740
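# Editor's note: a worked example of _compute_nside_coverage_tract above,
# which picks the coarsest coverage nside whose pixels are still at least as
# large as the tract, clamped to healsparse's recommended 32-128 range.  For
# a ~0.5 deg^2 tract: nside=32 pixels (~3.36 deg^2) and nside=64 pixels
# (~0.84 deg^2) both exceed the tract area, nside=128 pixels (~0.21 deg^2) do
# not, so the loop stops at 128 and the step back returns 64.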
741
742class ConsolidateHealSparsePropertyMapConnections(pipeBase.PipelineTaskConnections,
743 dimensions=("band", "skymap",),
744 defaultTemplates={"coaddName": "deep"}):
745 sky_map = pipeBase.connectionTypes.Input(
746 doc="Input definition of geometry/bbox and projection/wcs for coadded exposures",
747 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
748 storageClass="SkyMap",
749 dimensions=("skymap",),
750 )
751
752 # Create output connections for all possible maps defined in the
753 # registry. The vars() trick used here allows us to set class attributes
754 # programmatically. Taken from
755 # https://stackoverflow.com/questions/2519807/
756 # setting-a-class-attribute-with-a-given-name-in-python-while-defining-the-class
757 for name in BasePropertyMap.registry:
758 vars()[f"{name}_map_min"] = pipeBase.connectionTypes.Input(
759 doc=f"Minimum-value map of {name}",
760 name=f"{{coaddName}}Coadd_{name}_map_min",
761 storageClass="HealSparseMap",
762 dimensions=("tract", "skymap", "band"),
763 multiple=True,
764 deferLoad=True,
765 )
766 vars()[f"{name}_consolidated_map_min"] = pipeBase.connectionTypes.Output(
767 doc=f"Minumum-value map of {name}",
768 name=f"{{coaddName}}Coadd_{name}_consolidated_map_min",
769 storageClass="HealSparseMap",
770 dimensions=("skymap", "band"),
771 )
772 vars()[f"{name}_map_max"] = pipeBase.connectionTypes.Input(
773 doc=f"Maximum-value map of {name}",
774 name=f"{{coaddName}}Coadd_{name}_map_max",
775 storageClass="HealSparseMap",
776 dimensions=("tract", "skymap", "band"),
777 multiple=True,
778 deferLoad=True,
779 )
780 vars()[f"{name}_consolidated_map_max"] = pipeBase.connectionTypes.Output(
781 doc=f"Minumum-value map of {name}",
782 name=f"{{coaddName}}Coadd_{name}_consolidated_map_max",
783 storageClass="HealSparseMap",
784 dimensions=("skymap", "band"),
785 )
786 vars()[f"{name}_map_mean"] = pipeBase.connectionTypes.Input(
787 doc=f"Mean-value map of {name}",
788 name=f"{{coaddName}}Coadd_{name}_map_mean",
789 storageClass="HealSparseMap",
790 dimensions=("tract", "skymap", "band"),
791 multiple=True,
792 deferLoad=True,
793 )
794 vars()[f"{name}_consolidated_map_mean"] = pipeBase.connectionTypes.Output(
795 doc=f"Minumum-value map of {name}",
796 name=f"{{coaddName}}Coadd_{name}_consolidated_map_mean",
797 storageClass="HealSparseMap",
798 dimensions=("skymap", "band"),
799 )
800 vars()[f"{name}_map_weighted_mean"] = pipeBase.connectionTypes.Input(
801 doc=f"Weighted mean-value map of {name}",
802 name=f"{{coaddName}}Coadd_{name}_map_weighted_mean",
803 storageClass="HealSparseMap",
804 dimensions=("tract", "skymap", "band"),
805 multiple=True,
806 deferLoad=True,
807 )
808 vars()[f"{name}_consolidated_map_weighted_mean"] = pipeBase.connectionTypes.Output(
809 doc=f"Minumum-value map of {name}",
810 name=f"{{coaddName}}Coadd_{name}_consolidated_map_weighted_mean",
811 storageClass="HealSparseMap",
812 dimensions=("skymap", "band"),
813 )
814 vars()[f"{name}_map_sum"] = pipeBase.connectionTypes.Input(
815 doc=f"Sum-value map of {name}",
816 name=f"{{coaddName}}Coadd_{name}_map_sum",
817 storageClass="HealSparseMap",
818 dimensions=("tract", "skymap", "band"),
819 multiple=True,
820 deferLoad=True,
821 )
822 vars()[f"{name}_consolidated_map_sum"] = pipeBase.connectionTypes.Output(
823 doc=f"Minumum-value map of {name}",
824 name=f"{{coaddName}}Coadd_{name}_consolidated_map_sum",
825 storageClass="HealSparseMap",
826 dimensions=("skymap", "band"),
827 )
828
829 def __init__(self, *, config=None):
830 super().__init__(config=config)
831
832 # Not all possible maps in the registry will be configured to run.
833 # Here we remove the unused connections.
834 for name in BasePropertyMap.registry:
835 if name not in config.property_maps:
836 prop_config = BasePropertyMapConfig()
837 prop_config.do_min = False
838 prop_config.do_max = False
839 prop_config.do_mean = False
840 prop_config.do_weighted_mean = False
841 prop_config.do_sum = False
842 else:
843 prop_config = config.property_maps[name]
844
845 if not prop_config.do_min:
846 self.inputs.remove(f"{name}_map_min")
847 self.outputs.remove(f"{name}_consolidated_map_min")
848 if not prop_config.do_max:
849 self.inputs.remove(f"{name}_map_max")
850 self.outputs.remove(f"{name}_consolidated_map_max")
851 if not prop_config.do_mean:
852 self.inputs.remove(f"{name}_map_mean")
853 self.outputs.remove(f"{name}_consolidated_map_mean")
854 if not prop_config.do_weighted_mean:
855 self.inputs.remove(f"{name}_map_weighted_mean")
856 self.outputs.remove(f"{name}_consolidated_map_weighted_mean")
857 if not prop_config.do_sum:
858 self.inputs.remove(f"{name}_map_sum")
859 self.outputs.remove(f"{name}_consolidated_map_sum")
860
861
862class ConsolidateHealSparsePropertyMapConfig(pipeBase.PipelineTaskConfig,
863 pipelineConnections=ConsolidateHealSparsePropertyMapConnections):
864 """Configuration parameters for ConsolidateHealSparsePropertyMapTask"""
865 property_maps = BasePropertyMap.registry.makeField(
866 multi=True,
867 default=["exposure_time",
868 "psf_size",
869 "psf_e1",
870 "psf_e2",
871 "psf_maglim",
872 "sky_noise",
873 "sky_background",
874 "dcr_dra",
875 "dcr_ddec",
876 "dcr_e1",
877 "dcr_e2",
878 "epoch"],
879 doc="Property map computation objects",
880 )
881 nside_coverage = pexConfig.Field(
882 doc="Consolidated HealSparse coverage map nside. Must be power of 2.",
883 dtype=int,
884 default=32,
885 check=_is_power_of_two,
886 )
887
888 def setDefaults(self):
889 self.property_maps["exposure_time"].do_sum = True
890 self.property_maps["psf_size"].do_weighted_mean = True
891 self.property_maps["psf_e1"].do_weighted_mean = True
892 self.property_maps["psf_e2"].do_weighted_mean = True
893 self.property_maps["psf_maglim"].do_weighted_mean = True
894 self.property_maps["sky_noise"].do_weighted_mean = True
895 self.property_maps["sky_background"].do_weighted_mean = True
896 self.property_maps["dcr_dra"].do_weighted_mean = True
897 self.property_maps["dcr_ddec"].do_weighted_mean = True
898 self.property_maps["dcr_e1"].do_weighted_mean = True
899 self.property_maps["dcr_e2"].do_weighted_mean = True
900 self.property_maps["epoch"].do_mean = True
901 self.property_maps["epoch"].do_min = True
902 self.property_maps["epoch"].do_max = True
903
904
905class ConsolidateHealSparsePropertyMapTask(pipeBase.PipelineTask):
906 """Task to consolidate HealSparse property maps.
907
908 This task will take all the individual tract-based maps (per map type,
909 per band) and consolidate them into one survey-wide map (per map type,
910 per band). Each tract map is truncated to its inner region before
911 consolidation.
912 """
913 ConfigClass = ConsolidateHealSparsePropertyMapConfig
914 _DefaultName = "consolidateHealSparsePropertyMapTask"
915
916 def __init__(self, **kwargs):
917 super().__init__(**kwargs)
918 self.property_maps = PropertyMapMap()
919 for name, config, PropertyMapClass in self.config.property_maps.apply():
920 self.property_maps[name] = PropertyMapClass(config, name)
921
922 @timeMethod
923 def runQuantum(self, butlerQC, inputRefs, outputRefs):
924 inputs = butlerQC.get(inputRefs)
925
926 sky_map = inputs.pop("sky_map")
927
928 # These need to be consolidated one at a time to conserve memory.
929 for name in self.config.property_maps.names:
930 for type_ in ['min', 'max', 'mean', 'weighted_mean', 'sum']:
931 map_type = f"{name}_map_{type_}"
932 if map_type in inputs:
933 input_refs = {ref.dataId['tract']: ref
934 for ref in inputs[map_type]}
935 consolidated_map = self.consolidate_map(sky_map, input_refs)
936 butlerQC.put(consolidated_map,
937 getattr(outputRefs, f"{name}_consolidated_map_{type_}"))
938
939 def consolidate_map(self, sky_map, input_refs):
940 """Consolidate the healsparse property maps.
941
942 Parameters
943 ----------
944 sky_map : Sky map object
945 input_refs : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]
946 Dictionary of tract_id mapping to dataref.
947
948 Returns
949 -------
950 consolidated_map : `healsparse.HealSparseMap`
951 Consolidated HealSparse map.
952 """
953 # First, we read in the coverage maps to know how much memory
954 # to allocate
955 cov_mask = None
956 nside_coverage_inputs = None
957 for tract_id in input_refs:
958 cov = input_refs[tract_id].get(component='coverage')
959 if cov_mask is None:
960 cov_mask = cov.coverage_mask
961 nside_coverage_inputs = cov.nside_coverage
962 else:
963 cov_mask |= cov.coverage_mask
964
965 cov_pix_inputs, = np.where(cov_mask)
966
967 # Compute the coverage pixels for the desired nside_coverage
968 if nside_coverage_inputs == self.config.nside_coverage:
969 cov_pix = cov_pix_inputs
970 elif nside_coverage_inputs > self.config.nside_coverage:
971 # Converting from higher resolution coverage to lower
972 # resolution coverage.
973 bit_shift = hsp.utils._compute_bitshift(self.config.nside_coverage,
974 nside_coverage_inputs)
975 cov_pix = np.right_shift(cov_pix_inputs, bit_shift)
976 else:
977 # Converting from lower resolution coverage to higher
978 # resolution coverage.
979 bit_shift = hsp.utils._compute_bitshift(nside_coverage_inputs,
980 self.config.nside_coverage)
981 cov_pix = np.left_shift(cov_pix_inputs, bit_shift)
982
983 # Now read in each tract map and build the consolidated map.
984 consolidated_map = None
985 for tract_id in input_refs:
986 input_map = input_refs[tract_id].get()
987 if consolidated_map is None:
988 consolidated_map = hsp.HealSparseMap.make_empty(
989 self.config.nside_coverage,
990 input_map.nside_sparse,
991 input_map.dtype,
992 sentinel=input_map._sentinel,
993 cov_pixels=cov_pix,
994 metadata=input_map.metadata,
995 )
996
997 # Only use pixels that are properly inside the tract.
998 vpix, ra, dec = input_map.valid_pixels_pos(return_pixels=True)
999 vpix_tract_ids = sky_map.findTractIdArray(ra, dec, degrees=True)
1000
1001 in_tract = (vpix_tract_ids == tract_id)
1002
1003 consolidated_map[vpix[in_tract]] = input_map[vpix[in_tract]]
1004
1005 return consolidated_map
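# --- Editor's illustrative sketch (not part of pipe_tasks) ----------------
# Reading one consolidated property map back with the Butler; the repo path,
# collection, skymap name, and band below are hypothetical placeholders.  The
# dataset type name follows the output connection template above with
# coaddName="deep" for the "psf_size" weighted-mean map.
def _example_read_consolidated_map():
    from lsst.daf.butler import Butler

    butler = Butler("/repo/main", collections=["hypothetical/run"])
    hsp_map = butler.get("deepCoadd_psf_size_consolidated_map_weighted_mean",
                         skymap="hypothetical_skymap", band="i")
    # Total area (deg^2) covered by valid pixels in the consolidated map.
    return hsp_map.get_valid_area(degrees=True)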