LSST Applications g180d380827+6621f76652,g2079a07aa2+86d27d4dc4,g2305ad1205+f5a9e323a1,g2bbee38e9b+c6a8a0fb72,g337abbeb29+c6a8a0fb72,g33d1c0ed96+c6a8a0fb72,g3a166c0a6a+c6a8a0fb72,g3ddfee87b4+9a10e1fe7b,g48712c4677+c9a099281a,g487adcacf7+f2e03ea30b,g50ff169b8f+96c6868917,g52b1c1532d+585e252eca,g591dd9f2cf+aead732c78,g64a986408d+eddffb812c,g858d7b2824+eddffb812c,g864b0138d7+aa38e45daa,g974c55ee3d+f37bf00e57,g99cad8db69+119519a52d,g9c22b2923f+e2510deafe,g9ddcbc5298+9a081db1e4,ga1e77700b3+03d07e1c1f,gb0e22166c9+60f28cb32d,gb23b769143+eddffb812c,gba4ed39666+c2a2e4ac27,gbb8dafda3b+27317ec8e9,gbd998247f1+585e252eca,gc120e1dc64+5817c176a8,gc28159a63d+c6a8a0fb72,gc3e9b769f7+6707aea8b4,gcf0d15dbbd+9a10e1fe7b,gdaeeff99f8+f9a426f77a,ge6526c86ff+6a2e01d432,ge79ae78c31+c6a8a0fb72,gee10cc3b42+585e252eca,gff1a9f87cc+eddffb812c,v27.0.0.rc1
LSST Data Management Base Package
Loading...
Searching...
No Matches
healSparseMapping.py
Go to the documentation of this file.
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22__all__ = ["HealSparseInputMapTask", "HealSparseInputMapConfig",
23 "HealSparseMapFormatter", "HealSparsePropertyMapConnections",
24 "HealSparsePropertyMapConfig", "HealSparsePropertyMapTask",
25 "ConsolidateHealSparsePropertyMapConnections",
26 "ConsolidateHealSparsePropertyMapConfig",
27 "ConsolidateHealSparsePropertyMapTask"]
28
29from collections import defaultdict
30import warnings
31import numbers
32import numpy as np
33import hpgeom as hpg
34import healsparse as hsp
35
36import lsst.pex.config as pexConfig
37import lsst.pipe.base as pipeBase
38import lsst.geom
39import lsst.afw.geom as afwGeom
40from lsst.daf.butler import Formatter
41from lsst.skymap import BaseSkyMap
42from lsst.utils.timer import timeMethod
43from .healSparseMappingProperties import (BasePropertyMap, BasePropertyMapConfig,
44 PropertyMapMap, compute_approx_psf_size_and_shape)
45
46
47class HealSparseMapFormatter(Formatter):
48 """Interface for reading and writing healsparse.HealSparseMap files."""
49 unsupportedParameters = frozenset()
50 supportedExtensions = frozenset({".hsp", ".fit", ".fits"})
51 extension = '.hsp'
52
53 def read(self, component=None):
54 # Docstring inherited from Formatter.read.
55 path = self.fileDescriptor.location.path
56
57 if component == 'coverage':
58 try:
59 data = hsp.HealSparseCoverage.read(path)
60 except (OSError, RuntimeError):
61 raise ValueError(f"Unable to read healsparse map with URI {self.fileDescriptor.location.uri}")
62
63 return data
64
65 if self.fileDescriptor.parameters is None:
66 pixels = None
67 degrade_nside = None
68 else:
69 pixels = self.fileDescriptor.parameters.get('pixels', None)
70 degrade_nside = self.fileDescriptor.parameters.get('degrade_nside', None)
71 try:
72 data = hsp.HealSparseMap.read(path, pixels=pixels, degrade_nside=degrade_nside)
73 except (OSError, RuntimeError):
74 raise ValueError(f"Unable to read healsparse map with URI {self.fileDescriptor.location.uri}")
75
76 return data
77
78 def write(self, inMemoryDataset):
79 # Docstring inherited from Formatter.write.
80 # Update the location with the formatter-preferred file extension
81 self.fileDescriptor.location.updateExtension(self.extensionextension)
82 inMemoryDataset.write(self.fileDescriptor.location.path, clobber=True)
83
84
86 """Check that value is a power of two.
87
88 Parameters
89 ----------
90 value : `int`
91 Value to check.
92
93 Returns
94 -------
95 is_power_of_two : `bool`
96 True if value is a power of two; False otherwise, or
97 if value is not an integer.
98 """
99 if not isinstance(value, numbers.Integral):
100 return False
101
102 # See https://stackoverflow.com/questions/57025836
103 # Every power of 2 has exactly 1 bit set to 1; subtracting
104 # 1 therefore flips every preceding bit. If you and that
105 # together with the original value it must be 0.
106 return (value & (value - 1) == 0) and value != 0
107
108
109class HealSparseInputMapConfig(pexConfig.Config):
110 """Configuration parameters for HealSparseInputMapTask"""
111 nside = pexConfig.Field(
112 doc="Mapping healpix nside. Must be power of 2.",
113 dtype=int,
114 default=32768,
115 check=_is_power_of_two,
116 )
117 nside_coverage = pexConfig.Field(
118 doc="HealSparse coverage map nside. Must be power of 2.",
119 dtype=int,
120 default=256,
121 check=_is_power_of_two,
122 )
123 bad_mask_min_coverage = pexConfig.Field(
124 doc=("Minimum area fraction of a map healpixel pixel that must be "
125 "covered by bad pixels to be removed from the input map. "
126 "This is approximate."),
127 dtype=float,
128 default=0.5,
129 )
130
131
132class HealSparseInputMapTask(pipeBase.Task):
133 """Task for making a HealSparse input map."""
134
135 ConfigClass = HealSparseInputMapConfig
136 _DefaultName = "healSparseInputMap"
137
138 def __init__(self, **kwargs):
139 pipeBase.Task.__init__(self, **kwargs)
140
141 self.ccd_input_map = None
142
143 def build_ccd_input_map(self, bbox, wcs, ccds):
144 """Build a map from ccd valid polygons or bounding boxes.
145
146 Parameters
147 ----------
148 bbox : `lsst.geom.Box2I`
149 Bounding box for region to build input map.
150 wcs : `lsst.afw.geom.SkyWcs`
151 WCS object for region to build input map.
152 ccds : `lsst.afw.table.ExposureCatalog`
153 Exposure catalog with ccd data from coadd inputs.
154 """
155 with warnings.catch_warnings():
156 # Healsparse will emit a warning if nside coverage is greater than
157 # 128. In the case of generating patch input maps, and not global
158 # maps, high nside coverage works fine, so we can suppress this
159 # warning.
160 warnings.simplefilter("ignore")
161 self.ccd_input_map = hsp.HealSparseMap.make_empty(nside_coverage=self.config.nside_coverage,
162 nside_sparse=self.config.nside,
163 dtype=hsp.WIDE_MASK,
164 wide_mask_maxbits=len(ccds))
165 self._wcs = wcs
166 self._bbox = bbox
167 self._ccds = ccds
168
169 pixel_scale = wcs.getPixelScale().asArcseconds()
170 hpix_area_arcsec2 = hpg.nside_to_pixel_area(self.config.nside, degrees=True)*(3600.**2.)
171 self._min_bad = self.config.bad_mask_min_coverage*hpix_area_arcsec2/(pixel_scale**2.)
172
173 metadata = {}
175 self._bits_per_visit = defaultdict(list)
176 for bit, ccd_row in enumerate(ccds):
177 metadata[f"B{bit:04d}CCD"] = ccd_row["ccd"]
178 metadata[f"B{bit:04d}VIS"] = ccd_row["visit"]
179 metadata[f"B{bit:04d}WT"] = ccd_row["weight"]
180
181 self._bits_per_visit_ccd[(ccd_row["visit"], ccd_row["ccd"])] = bit
182 self._bits_per_visit[ccd_row["visit"]].append(bit)
183
184 ccd_poly = ccd_row.getValidPolygon()
185 if ccd_poly is None:
186 ccd_poly = afwGeom.Polygon(lsst.geom.Box2D(ccd_row.getBBox()))
187 # Detectors need to be rendered with their own wcs.
188 ccd_poly_radec = self._pixels_to_radec(ccd_row.getWcs(), ccd_poly.convexHull().getVertices())
189
190 # Create a ccd healsparse polygon
191 poly = hsp.Polygon(ra=ccd_poly_radec[: -1, 0],
192 dec=ccd_poly_radec[: -1, 1],
193 value=[bit])
194 self.ccd_input_map.set_bits_pix(poly.get_pixels(nside=self.ccd_input_map.nside_sparse),
195 [bit])
196
197 # Cut down to the overall bounding box with associated wcs.
198 bbox_afw_poly = afwGeom.Polygon(lsst.geom.Box2D(bbox))
199 bbox_poly_radec = self._pixels_to_radec(self._wcs,
200 bbox_afw_poly.convexHull().getVertices())
201 bbox_poly = hsp.Polygon(ra=bbox_poly_radec[: -1, 0], dec=bbox_poly_radec[: -1, 1],
202 value=np.arange(self.ccd_input_map.wide_mask_maxbits))
203 with warnings.catch_warnings():
204 warnings.simplefilter("ignore")
205 bbox_poly_map = bbox_poly.get_map_like(self.ccd_input_map)
206 self.ccd_input_map = hsp.and_intersection([self.ccd_input_map, bbox_poly_map])
207 self.ccd_input_map.metadata = metadata
208
209 # Create a temporary map to hold the count of bad pixels in each healpix pixel
210 self._ccd_input_pixels = self.ccd_input_map.valid_pixels
211
212 dtype = [(f"v{visit}", np.int64) for visit in self._bits_per_visit.keys()]
213
214 with warnings.catch_warnings():
215 # Healsparse will emit a warning if nside coverage is greater than
216 # 128. In the case of generating patch input maps, and not global
217 # maps, high nside coverage works fine, so we can suppress this
218 # warning.
219 warnings.simplefilter("ignore")
220 self._ccd_input_bad_count_map = hsp.HealSparseMap.make_empty(
221 nside_coverage=self.config.nside_coverage,
222 nside_sparse=self.config.nside,
223 dtype=dtype,
224 primary=dtype[0][0])
225
226 # Don't set input bad map if there are no ccds which overlap the bbox.
227 if len(self._ccd_input_pixels) > 0:
228 self._ccd_input_bad_count_map[self._ccd_input_pixels] = np.zeros(1, dtype=dtype)
229
230 def mask_warp_bbox(self, bbox, visit, mask, bit_mask_value):
231 """Mask a subregion from a visit.
232 This must be run after build_ccd_input_map initializes
233 the overall map.
234
235 Parameters
236 ----------
237 bbox : `lsst.geom.Box2I`
238 Bounding box from region to mask.
239 visit : `int`
240 Visit number corresponding to warp with mask.
241 mask : `lsst.afw.image.MaskX`
242 Mask plane from warp exposure.
243 bit_mask_value : `int`
244 Bit mask to check for bad pixels.
245
246 Raises
247 ------
248 RuntimeError : Raised if build_ccd_input_map was not run first.
249 """
250 if self.ccd_input_map is None:
251 raise RuntimeError("Must run build_ccd_input_map before mask_warp_bbox")
252
253 # Find the bad pixels and convert to healpix
254 bad_pixels = np.where(mask.array & bit_mask_value)
255 if len(bad_pixels[0]) == 0:
256 # No bad pixels
257 return
258
259 # Bad pixels come from warps which use the overall wcs.
260 bad_ra, bad_dec = self._wcs.pixelToSkyArray(bad_pixels[1].astype(np.float64),
261 bad_pixels[0].astype(np.float64),
262 degrees=True)
263 bad_hpix = hpg.angle_to_pixel(self.config.nside, bad_ra, bad_dec)
264
265 # Count the number of bad image pixels in each healpix pixel
266 min_bad_hpix = bad_hpix.min()
267 bad_hpix_count = np.zeros(bad_hpix.max() - min_bad_hpix + 1, dtype=np.int32)
268 np.add.at(bad_hpix_count, bad_hpix - min_bad_hpix, 1)
269
270 # Add these to the accumulator map.
271 # We need to make sure that the "primary" array has valid values for
272 # this pixel to be registered in the accumulator map.
273 pix_to_add, = np.where(bad_hpix_count > 0)
274 count_map_arr = self._ccd_input_bad_count_map[min_bad_hpix + pix_to_add]
275 primary = self._ccd_input_bad_count_map.primary
276 count_map_arr[primary] = np.clip(count_map_arr[primary], 0, None)
277
278 count_map_arr[f"v{visit}"] = np.clip(count_map_arr[f"v{visit}"], 0, None)
279 count_map_arr[f"v{visit}"] += bad_hpix_count[pix_to_add]
280
281 self._ccd_input_bad_count_map[min_bad_hpix + pix_to_add] = count_map_arr
282
284 """Use accumulated mask information to finalize the masking of
285 ccd_input_map.
286
287 Raises
288 ------
289 RuntimeError : Raised if build_ccd_input_map was not run first.
290 """
291 if self.ccd_input_map is None:
292 raise RuntimeError("Must run build_ccd_input_map before finalize_ccd_input_map_mask.")
293
294 count_map_arr = self._ccd_input_bad_count_map[self._ccd_input_pixels]
295 for visit in self._bits_per_visit:
296 to_mask, = np.where(count_map_arr[f"v{visit}"] > self._min_bad)
297 if to_mask.size == 0:
298 continue
299 self.ccd_input_map.clear_bits_pix(self._ccd_input_pixels[to_mask],
300 self._bits_per_visit[visit])
301
302 # Clear memory
303 self._ccd_input_bad_count_map = None
304
305 def _pixels_to_radec(self, wcs, pixels):
306 """Convert pixels to ra/dec positions using a wcs.
307
308 Parameters
309 ----------
310 wcs : `lsst.afw.geom.SkyWcs`
311 WCS object.
312 pixels : `list` [`lsst.geom.Point2D`]
313 List of pixels to convert.
314
315 Returns
316 -------
317 radec : `numpy.ndarray`
318 Nx2 array of ra/dec positions associated with pixels.
319 """
320 sph_pts = wcs.pixelToSky(pixels)
321 return np.array([(sph.getRa().asDegrees(), sph.getDec().asDegrees())
322 for sph in sph_pts])
323
324
325class HealSparsePropertyMapConnections(pipeBase.PipelineTaskConnections,
326 dimensions=("tract", "band", "skymap",),
327 defaultTemplates={"coaddName": "deep",
328 "calexpType": ""}):
329 input_maps = pipeBase.connectionTypes.Input(
330 doc="Healsparse bit-wise coadd input maps",
331 name="{coaddName}Coadd_inputMap",
332 storageClass="HealSparseMap",
333 dimensions=("tract", "patch", "skymap", "band"),
334 multiple=True,
335 deferLoad=True,
336 )
337 coadd_exposures = pipeBase.connectionTypes.Input(
338 doc="Coadded exposures associated with input_maps",
339 name="{coaddName}Coadd",
340 storageClass="ExposureF",
341 dimensions=("tract", "patch", "skymap", "band"),
342 multiple=True,
343 deferLoad=True,
344 )
345 visit_summaries = pipeBase.connectionTypes.Input(
346 doc="Visit summary tables with aggregated statistics",
347 name="finalVisitSummary",
348 storageClass="ExposureCatalog",
349 dimensions=("instrument", "visit"),
350 multiple=True,
351 deferLoad=True,
352 )
353 sky_map = pipeBase.connectionTypes.Input(
354 doc="Input definition of geometry/bbox and projection/wcs for coadded exposures",
355 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
356 storageClass="SkyMap",
357 dimensions=("skymap",),
358 )
359
360 # Create output connections for all possible maps defined in the
361 # registry. The vars() trick used here allows us to set class attributes
362 # programatically. Taken from
363 # https://stackoverflow.com/questions/2519807/
364 # setting-a-class-attribute-with-a-given-name-in-python-while-defining-the-class
365 for name in BasePropertyMap.registry:
366 vars()[f"{name}_map_min"] = pipeBase.connectionTypes.Output(
367 doc=f"Minimum-value map of {name}",
368 name=f"{{coaddName}}Coadd_{name}_map_min",
369 storageClass="HealSparseMap",
370 dimensions=("tract", "skymap", "band"),
371 )
372 vars()[f"{name}_map_max"] = pipeBase.connectionTypes.Output(
373 doc=f"Maximum-value map of {name}",
374 name=f"{{coaddName}}Coadd_{name}_map_max",
375 storageClass="HealSparseMap",
376 dimensions=("tract", "skymap", "band"),
377 )
378 vars()[f"{name}_map_mean"] = pipeBase.connectionTypes.Output(
379 doc=f"Mean-value map of {name}",
380 name=f"{{coaddName}}Coadd_{name}_map_mean",
381 storageClass="HealSparseMap",
382 dimensions=("tract", "skymap", "band"),
383 )
384 vars()[f"{name}_map_weighted_mean"] = pipeBase.connectionTypes.Output(
385 doc=f"Weighted mean-value map of {name}",
386 name=f"{{coaddName}}Coadd_{name}_map_weighted_mean",
387 storageClass="HealSparseMap",
388 dimensions=("tract", "skymap", "band"),
389 )
390 vars()[f"{name}_map_sum"] = pipeBase.connectionTypes.Output(
391 doc=f"Sum-value map of {name}",
392 name=f"{{coaddName}}Coadd_{name}_map_sum",
393 storageClass="HealSparseMap",
394 dimensions=("tract", "skymap", "band"),
395 )
396
397 def __init__(self, *, config=None):
398 super().__init__(config=config)
399
400 # Not all possible maps in the registry will be configured to run.
401 # Here we remove the unused connections.
402 for name in BasePropertyMap.registry:
403 if name not in config.property_maps:
404 prop_config = BasePropertyMapConfig()
405 prop_config.do_min = False
406 prop_config.do_max = False
407 prop_config.do_mean = False
408 prop_config.do_weighted_mean = False
409 prop_config.do_sum = False
410 else:
411 prop_config = config.property_maps[name]
412
413 if not prop_config.do_min:
414 self.outputs.remove(f"{name}_map_min")
415 if not prop_config.do_max:
416 self.outputs.remove(f"{name}_map_max")
417 if not prop_config.do_mean:
418 self.outputs.remove(f"{name}_map_mean")
419 if not prop_config.do_weighted_mean:
420 self.outputs.remove(f"{name}_map_weighted_mean")
421 if not prop_config.do_sum:
422 self.outputs.remove(f"{name}_map_sum")
423
424
425class HealSparsePropertyMapConfig(pipeBase.PipelineTaskConfig,
426 pipelineConnections=HealSparsePropertyMapConnections):
427 """Configuration parameters for HealSparsePropertyMapTask"""
428 property_maps = BasePropertyMap.registry.makeField(
429 multi=True,
430 default=["exposure_time",
431 "psf_size",
432 "psf_e1",
433 "psf_e2",
434 "psf_maglim",
435 "sky_noise",
436 "sky_background",
437 "dcr_dra",
438 "dcr_ddec",
439 "dcr_e1",
440 "dcr_e2",
441 "epoch"],
442 doc="Property map computation objects",
443 )
444
445 def setDefaults(self):
446 self.property_maps["exposure_time"].do_sum = True
447 self.property_maps["psf_size"].do_weighted_mean = True
448 self.property_maps["psf_e1"].do_weighted_mean = True
449 self.property_maps["psf_e2"].do_weighted_mean = True
450 self.property_maps["psf_maglim"].do_weighted_mean = True
451 self.property_maps["sky_noise"].do_weighted_mean = True
452 self.property_maps["sky_background"].do_weighted_mean = True
453 self.property_maps["dcr_dra"].do_weighted_mean = True
454 self.property_maps["dcr_ddec"].do_weighted_mean = True
455 self.property_maps["dcr_e1"].do_weighted_mean = True
456 self.property_maps["dcr_e2"].do_weighted_mean = True
457 self.property_maps["epoch"].do_mean = True
458 self.property_maps["epoch"].do_min = True
459 self.property_maps["epoch"].do_max = True
460
461
462class HealSparsePropertyMapTask(pipeBase.PipelineTask):
463 """Task to compute Healsparse property maps.
464
465 This task will compute individual property maps (per tract, per
466 map type, per band). These maps cover the full coadd tract, and
467 are not truncated to the inner tract region.
468 """
469 ConfigClass = HealSparsePropertyMapConfig
470 _DefaultName = "healSparsePropertyMapTask"
471
472 def __init__(self, **kwargs):
473 super().__init__(**kwargs)
474 self.property_maps = PropertyMapMap()
475 for name, config, PropertyMapClass in self.config.property_maps.apply():
476 self.property_maps[name] = PropertyMapClass(config, name)
477
478 @timeMethod
479 def runQuantum(self, butlerQC, inputRefs, outputRefs):
480 inputs = butlerQC.get(inputRefs)
481
482 sky_map = inputs.pop("sky_map")
483
484 tract = butlerQC.quantum.dataId["tract"]
485 band = butlerQC.quantum.dataId["band"]
486
487 input_map_dict = {ref.dataId["patch"]: ref for ref in inputs["input_maps"]}
488 coadd_dict = {ref.dataId["patch"]: ref for ref in inputs["coadd_exposures"]}
489
490 visit_summary_dict = {ref.dataId["visit"]: ref.get()
491 for ref in inputs["visit_summaries"]}
492
493 self.run(sky_map, tract, band, coadd_dict, input_map_dict, visit_summary_dict)
494
495 # Write the outputs
496 for name, property_map in self.property_maps.items():
497 if property_map.config.do_min:
498 butlerQC.put(property_map.min_map,
499 getattr(outputRefs, f"{name}_map_min"))
500 if property_map.config.do_max:
501 butlerQC.put(property_map.max_map,
502 getattr(outputRefs, f"{name}_map_max"))
503 if property_map.config.do_mean:
504 butlerQC.put(property_map.mean_map,
505 getattr(outputRefs, f"{name}_map_mean"))
506 if property_map.config.do_weighted_mean:
507 butlerQC.put(property_map.weighted_mean_map,
508 getattr(outputRefs, f"{name}_map_weighted_mean"))
509 if property_map.config.do_sum:
510 butlerQC.put(property_map.sum_map,
511 getattr(outputRefs, f"{name}_map_sum"))
512
513 def run(self, sky_map, tract, band, coadd_dict, input_map_dict, visit_summary_dict):
514 """Run the healsparse property task.
515
516 Parameters
517 ----------
518 sky_map : Sky map object
519 tract : `int`
520 Tract number.
521 band : `str`
522 Band name for logging.
523 coadd_dict : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]
524 Dictionary of coadd exposure references. Keys are patch numbers.
525 input_map_dict : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]
526 Dictionary of input map references. Keys are patch numbers.
527 visit_summary_dict : `dict` [`int`: `lsst.afw.table.ExposureCatalog`]
528 Dictionary of visit summary tables. Keys are visit numbers.
529
530 Raises
531 ------
532 RepeatableQuantumError
533 If visit_summary_dict is missing any visits or detectors found in an
534 input map. This leads to an inconsistency between what is in the coadd
535 (via the input map) and the visit summary tables which contain data
536 to compute the maps.
537 """
538 tract_info = sky_map[tract]
539
540 tract_maps_initialized = False
541
542 for patch in input_map_dict.keys():
543 self.log.info("Making maps for band %s, tract %d, patch %d.",
544 band, tract, patch)
545
546 patch_info = tract_info[patch]
547
548 input_map = input_map_dict[patch].get()
549
550 # Initialize the tract maps as soon as we have the first input
551 # map for getting nside information.
552 if not tract_maps_initialized:
553 # We use the first input map nside information to initialize
554 # the tract maps
555 nside_coverage = self._compute_nside_coverage_tract(tract_info)
556 nside = input_map.nside_sparse
557
558 do_compute_approx_psf = False
559 # Initialize the tract maps
560 for property_map in self.property_maps:
561 property_map.initialize_tract_maps(nside_coverage, nside)
562 if property_map.requires_psf:
563 do_compute_approx_psf = True
564
565 tract_maps_initialized = True
566
567 if input_map.valid_pixels.size == 0:
568 self.log.warning("No valid pixels for band %s, tract %d, patch %d; skipping.",
569 band, tract, patch)
570 continue
571
572 coadd_photo_calib = coadd_dict[patch].get(component="photoCalib")
573 coadd_inputs = coadd_dict[patch].get(component="coaddInputs")
574
575 coadd_zeropoint = 2.5*np.log10(coadd_photo_calib.getInstFluxAtZeroMagnitude())
576
577 # Crop input_map to the inner polygon of the patch
578 poly_vertices = patch_info.getInnerSkyPolygon(tract_info.getWcs()).getVertices()
579 patch_radec = self._vertices_to_radec(poly_vertices)
580 patch_poly = hsp.Polygon(ra=patch_radec[:, 0], dec=patch_radec[:, 1],
581 value=np.arange(input_map.wide_mask_maxbits))
582 with warnings.catch_warnings():
583 # Healsparse will emit a warning if nside coverage is greater than
584 # 128. In the case of generating patch input maps, and not global
585 # maps, high nside coverage works fine, so we can suppress this
586 # warning.
587 warnings.simplefilter("ignore")
588 patch_poly_map = patch_poly.get_map_like(input_map)
589 input_map = hsp.and_intersection([input_map, patch_poly_map])
590
591 valid_pixels, vpix_ra, vpix_dec = input_map.valid_pixels_pos(return_pixels=True)
592
593 # Check if there are no valid pixels for the inner (unique) patch region
594 if valid_pixels.size == 0:
595 continue
596
597 # Initialize the value accumulators
598 for property_map in self.property_maps:
599 property_map.initialize_values(valid_pixels.size)
600 property_map.zeropoint = coadd_zeropoint
601
602 # Initialize the weight and counter accumulators
603 total_weights = np.zeros(valid_pixels.size)
604 total_inputs = np.zeros(valid_pixels.size, dtype=np.int32)
605
606 for bit, ccd_row in enumerate(coadd_inputs.ccds):
607 # Which pixels in the map are used by this visit/detector
608 inmap, = np.where(input_map.check_bits_pix(valid_pixels, [bit]))
609
610 # Check if there are any valid pixels in the map from this deteector.
611 if inmap.size == 0:
612 continue
613
614 # visit, detector_id, weight = input_dict[bit]
615 visit = ccd_row["visit"]
616 detector_id = ccd_row["ccd"]
617 weight = ccd_row["weight"]
618
619 x, y = ccd_row.getWcs().skyToPixelArray(vpix_ra[inmap], vpix_dec[inmap], degrees=True)
620 scalings = self._compute_calib_scale(ccd_row, x, y)
621
622 if do_compute_approx_psf:
623 psf_array = compute_approx_psf_size_and_shape(ccd_row, vpix_ra[inmap], vpix_dec[inmap])
624 else:
625 psf_array = None
626
627 total_weights[inmap] += weight
628 total_inputs[inmap] += 1
629
630 # Retrieve the correct visitSummary row
631 if visit not in visit_summary_dict:
632 msg = f"Visit {visit} not found in visit_summaries."
633 raise pipeBase.RepeatableQuantumError(msg)
634 row = visit_summary_dict[visit].find(detector_id)
635 if row is None:
636 msg = f"Visit {visit} / detector_id {detector_id} not found in visit_summaries."
637 raise pipeBase.RepeatableQuantumError(msg)
638
639 # Accumulate the values
640 for property_map in self.property_maps:
641 property_map.accumulate_values(inmap,
642 vpix_ra[inmap],
643 vpix_dec[inmap],
644 weight,
645 scalings,
646 row,
647 psf_array=psf_array)
648
649 # Finalize the mean values and set the tract maps
650 for property_map in self.property_maps:
651 property_map.finalize_mean_values(total_weights, total_inputs)
652 property_map.set_map_values(valid_pixels)
653
654 def _compute_calib_scale(self, ccd_row, x, y):
655 """Compute calibration scaling values.
656
657 Parameters
658 ----------
659 ccd_row : `lsst.afw.table.ExposureRecord`
660 Exposure metadata for a given detector exposure.
661 x : `np.ndarray`
662 Array of x positions.
663 y : `np.ndarray`
664 Array of y positions.
665
666 Returns
667 -------
668 calib_scale : `np.ndarray`
669 Array of calibration scale values.
670 """
671 photo_calib = ccd_row.getPhotoCalib()
672 bf = photo_calib.computeScaledCalibration()
673 if bf.getBBox() == ccd_row.getBBox():
674 # Track variable calibration over the detector
675 calib_scale = photo_calib.getCalibrationMean()*bf.evaluate(x, y)
676 else:
677 # Spatially constant calibration
678 calib_scale = photo_calib.getCalibrationMean()
679
680 return calib_scale
681
682 def _vertices_to_radec(self, vertices):
683 """Convert polygon vertices to ra/dec.
684
685 Parameters
686 ----------
687 vertices : `list` [ `lsst.sphgeom.UnitVector3d` ]
688 Vertices for bounding polygon.
689
690 Returns
691 -------
692 radec : `numpy.ndarray`
693 Nx2 array of ra/dec positions (in degrees) associated with vertices.
694 """
695 lonlats = [lsst.sphgeom.LonLat(x) for x in vertices]
696 radec = np.array([(x.getLon().asDegrees(), x.getLat().asDegrees()) for
697 x in lonlats])
698 return radec
699
700 def _compute_nside_coverage_tract(self, tract_info):
701 """Compute the optimal coverage nside for a tract.
702
703 Parameters
704 ----------
705 tract_info : `lsst.skymap.tractInfo.ExplicitTractInfo`
706 Tract information object.
707
708 Returns
709 -------
710 nside_coverage : `int`
711 Optimal coverage nside for a tract map.
712 """
713 num_patches = tract_info.getNumPatches()
714
715 # Compute approximate patch area
716 patch_info = tract_info.getPatchInfo(0)
717 vertices = patch_info.getInnerSkyPolygon(tract_info.getWcs()).getVertices()
718 radec = self._vertices_to_radec(vertices)
719 delta_ra = np.max(radec[:, 0]) - np.min(radec[:, 0])
720 delta_dec = np.max(radec[:, 1]) - np.min(radec[:, 1])
721 patch_area = delta_ra*delta_dec*np.cos(np.deg2rad(np.mean(radec[:, 1])))
722
723 tract_area = num_patches[0]*num_patches[1]*patch_area
724 # Start with a fairly low nside and increase until we find the approximate area.
725 nside_coverage_tract = 32
726 while hpg.nside_to_pixel_area(nside_coverage_tract, degrees=True) > tract_area:
727 nside_coverage_tract = 2*nside_coverage_tract
728 # Step back one, but don't go bigger pixels than nside=32 or smaller
729 # than 128 (recommended by healsparse).
730 nside_coverage_tract = int(np.clip(nside_coverage_tract/2, 32, 128))
731
732 return nside_coverage_tract
733
734
735class ConsolidateHealSparsePropertyMapConnections(pipeBase.PipelineTaskConnections,
736 dimensions=("band", "skymap",),
737 defaultTemplates={"coaddName": "deep"}):
738 sky_map = pipeBase.connectionTypes.Input(
739 doc="Input definition of geometry/bbox and projection/wcs for coadded exposures",
740 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
741 storageClass="SkyMap",
742 dimensions=("skymap",),
743 )
744
745 # Create output connections for all possible maps defined in the
746 # registry. The vars() trick used here allows us to set class attributes
747 # programatically. Taken from
748 # https://stackoverflow.com/questions/2519807/
749 # setting-a-class-attribute-with-a-given-name-in-python-while-defining-the-class
750 for name in BasePropertyMap.registry:
751 vars()[f"{name}_map_min"] = pipeBase.connectionTypes.Input(
752 doc=f"Minimum-value map of {name}",
753 name=f"{{coaddName}}Coadd_{name}_map_min",
754 storageClass="HealSparseMap",
755 dimensions=("tract", "skymap", "band"),
756 multiple=True,
757 deferLoad=True,
758 )
759 vars()[f"{name}_consolidated_map_min"] = pipeBase.connectionTypes.Output(
760 doc=f"Minumum-value map of {name}",
761 name=f"{{coaddName}}Coadd_{name}_consolidated_map_min",
762 storageClass="HealSparseMap",
763 dimensions=("skymap", "band"),
764 )
765 vars()[f"{name}_map_max"] = pipeBase.connectionTypes.Input(
766 doc=f"Maximum-value map of {name}",
767 name=f"{{coaddName}}Coadd_{name}_map_max",
768 storageClass="HealSparseMap",
769 dimensions=("tract", "skymap", "band"),
770 multiple=True,
771 deferLoad=True,
772 )
773 vars()[f"{name}_consolidated_map_max"] = pipeBase.connectionTypes.Output(
774 doc=f"Minumum-value map of {name}",
775 name=f"{{coaddName}}Coadd_{name}_consolidated_map_max",
776 storageClass="HealSparseMap",
777 dimensions=("skymap", "band"),
778 )
779 vars()[f"{name}_map_mean"] = pipeBase.connectionTypes.Input(
780 doc=f"Mean-value map of {name}",
781 name=f"{{coaddName}}Coadd_{name}_map_mean",
782 storageClass="HealSparseMap",
783 dimensions=("tract", "skymap", "band"),
784 multiple=True,
785 deferLoad=True,
786 )
787 vars()[f"{name}_consolidated_map_mean"] = pipeBase.connectionTypes.Output(
788 doc=f"Minumum-value map of {name}",
789 name=f"{{coaddName}}Coadd_{name}_consolidated_map_mean",
790 storageClass="HealSparseMap",
791 dimensions=("skymap", "band"),
792 )
793 vars()[f"{name}_map_weighted_mean"] = pipeBase.connectionTypes.Input(
794 doc=f"Weighted mean-value map of {name}",
795 name=f"{{coaddName}}Coadd_{name}_map_weighted_mean",
796 storageClass="HealSparseMap",
797 dimensions=("tract", "skymap", "band"),
798 multiple=True,
799 deferLoad=True,
800 )
801 vars()[f"{name}_consolidated_map_weighted_mean"] = pipeBase.connectionTypes.Output(
802 doc=f"Minumum-value map of {name}",
803 name=f"{{coaddName}}Coadd_{name}_consolidated_map_weighted_mean",
804 storageClass="HealSparseMap",
805 dimensions=("skymap", "band"),
806 )
807 vars()[f"{name}_map_sum"] = pipeBase.connectionTypes.Input(
808 doc=f"Sum-value map of {name}",
809 name=f"{{coaddName}}Coadd_{name}_map_sum",
810 storageClass="HealSparseMap",
811 dimensions=("tract", "skymap", "band"),
812 multiple=True,
813 deferLoad=True,
814 )
815 vars()[f"{name}_consolidated_map_sum"] = pipeBase.connectionTypes.Output(
816 doc=f"Minumum-value map of {name}",
817 name=f"{{coaddName}}Coadd_{name}_consolidated_map_sum",
818 storageClass="HealSparseMap",
819 dimensions=("skymap", "band"),
820 )
821
822 def __init__(self, *, config=None):
823 super().__init__(config=config)
824
825 # Not all possible maps in the registry will be configured to run.
826 # Here we remove the unused connections.
827 for name in BasePropertyMap.registry:
828 if name not in config.property_maps:
829 prop_config = BasePropertyMapConfig()
830 prop_config.do_min = False
831 prop_config.do_max = False
832 prop_config.do_mean = False
833 prop_config.do_weighted_mean = False
834 prop_config.do_sum = False
835 else:
836 prop_config = config.property_maps[name]
837
838 if not prop_config.do_min:
839 self.inputs.remove(f"{name}_map_min")
840 self.outputs.remove(f"{name}_consolidated_map_min")
841 if not prop_config.do_max:
842 self.inputs.remove(f"{name}_map_max")
843 self.outputs.remove(f"{name}_consolidated_map_max")
844 if not prop_config.do_mean:
845 self.inputs.remove(f"{name}_map_mean")
846 self.outputs.remove(f"{name}_consolidated_map_mean")
847 if not prop_config.do_weighted_mean:
848 self.inputs.remove(f"{name}_map_weighted_mean")
849 self.outputs.remove(f"{name}_consolidated_map_weighted_mean")
850 if not prop_config.do_sum:
851 self.inputs.remove(f"{name}_map_sum")
852 self.outputs.remove(f"{name}_consolidated_map_sum")
853
854
855class ConsolidateHealSparsePropertyMapConfig(pipeBase.PipelineTaskConfig,
856 pipelineConnections=ConsolidateHealSparsePropertyMapConnections):
857 """Configuration parameters for ConsolidateHealSparsePropertyMapTask"""
858 property_maps = BasePropertyMap.registry.makeField(
859 multi=True,
860 default=["exposure_time",
861 "psf_size",
862 "psf_e1",
863 "psf_e2",
864 "psf_maglim",
865 "sky_noise",
866 "sky_background",
867 "dcr_dra",
868 "dcr_ddec",
869 "dcr_e1",
870 "dcr_e2",
871 "epoch"],
872 doc="Property map computation objects",
873 )
874 nside_coverage = pexConfig.Field(
875 doc="Consolidated HealSparse coverage map nside. Must be power of 2.",
876 dtype=int,
877 default=32,
878 check=_is_power_of_two,
879 )
880
881 def setDefaults(self):
882 self.property_maps["exposure_time"].do_sum = True
883 self.property_maps["psf_size"].do_weighted_mean = True
884 self.property_maps["psf_e1"].do_weighted_mean = True
885 self.property_maps["psf_e2"].do_weighted_mean = True
886 self.property_maps["psf_maglim"].do_weighted_mean = True
887 self.property_maps["sky_noise"].do_weighted_mean = True
888 self.property_maps["sky_background"].do_weighted_mean = True
889 self.property_maps["dcr_dra"].do_weighted_mean = True
890 self.property_maps["dcr_ddec"].do_weighted_mean = True
891 self.property_maps["dcr_e1"].do_weighted_mean = True
892 self.property_maps["dcr_e2"].do_weighted_mean = True
893 self.property_maps["epoch"].do_mean = True
894 self.property_maps["epoch"].do_min = True
895 self.property_maps["epoch"].do_max = True
896
897
898class ConsolidateHealSparsePropertyMapTask(pipeBase.PipelineTask):
899 """Task to consolidate HealSparse property maps.
900
901 This task will take all the individual tract-based maps (per map type,
902 per band) and consolidate them into one survey-wide map (per map type,
903 per band). Each tract map is truncated to its inner region before
904 consolidation.
905 """
906 ConfigClass = ConsolidateHealSparsePropertyMapConfig
907 _DefaultName = "consolidateHealSparsePropertyMapTask"
908
909 def __init__(self, **kwargs):
910 super().__init__(**kwargs)
911 self.property_maps = PropertyMapMap()
912 for name, config, PropertyMapClass in self.config.property_maps.apply():
913 self.property_maps[name] = PropertyMapClass(config, name)
914
915 @timeMethod
916 def runQuantum(self, butlerQC, inputRefs, outputRefs):
917 inputs = butlerQC.get(inputRefs)
918
919 sky_map = inputs.pop("sky_map")
920
921 # These need to be consolidated one at a time to conserve memory.
922 for name in self.config.property_maps.names:
923 for type_ in ['min', 'max', 'mean', 'weighted_mean', 'sum']:
924 map_type = f"{name}_map_{type_}"
925 if map_type in inputs:
926 input_refs = {ref.dataId['tract']: ref
927 for ref in inputs[map_type]}
928 consolidated_map = self.consolidate_map(sky_map, input_refs)
929 butlerQC.put(consolidated_map,
930 getattr(outputRefs, f"{name}_consolidated_map_{type_}"))
931
932 def consolidate_map(self, sky_map, input_refs):
933 """Consolidate the healsparse property maps.
934
935 Parameters
936 ----------
937 sky_map : Sky map object
938 input_refs : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]
939 Dictionary of tract_id mapping to dataref.
940
941 Returns
942 -------
943 consolidated_map : `healsparse.HealSparseMap`
944 Consolidated HealSparse map.
945 """
946 # First, we read in the coverage maps to know how much memory
947 # to allocate
948 cov_mask = None
949 nside_coverage_inputs = None
950 for tract_id in input_refs:
951 cov = input_refs[tract_id].get(component='coverage')
952 if cov_mask is None:
953 cov_mask = cov.coverage_mask
954 nside_coverage_inputs = cov.nside_coverage
955 else:
956 cov_mask |= cov.coverage_mask
957
958 cov_pix_inputs, = np.where(cov_mask)
959
960 # Compute the coverage pixels for the desired nside_coverage
961 if nside_coverage_inputs == self.config.nside_coverage:
962 cov_pix = cov_pix_inputs
963 elif nside_coverage_inputs > self.config.nside_coverage:
964 # Converting from higher resolution coverage to lower
965 # resolution coverage.
966 bit_shift = hsp.utils._compute_bitshift(self.config.nside_coverage,
967 nside_coverage_inputs)
968 cov_pix = np.right_shift(cov_pix_inputs, bit_shift)
969 else:
970 # Converting from lower resolution coverage to higher
971 # resolution coverage.
972 bit_shift = hsp.utils._compute_bitshift(nside_coverage_inputs,
973 self.config.nside_coverage)
974 cov_pix = np.left_shift(cov_pix_inputs, bit_shift)
975
976 # Now read in each tract map and build the consolidated map.
977 consolidated_map = None
978 for tract_id in input_refs:
979 input_map = input_refs[tract_id].get()
980 if consolidated_map is None:
981 consolidated_map = hsp.HealSparseMap.make_empty(
982 self.config.nside_coverage,
983 input_map.nside_sparse,
984 input_map.dtype,
985 sentinel=input_map._sentinel,
986 cov_pixels=cov_pix,
987 metadata=input_map.metadata,
988 )
989
990 # Only use pixels that are properly inside the tract.
991 vpix, ra, dec = input_map.valid_pixels_pos(return_pixels=True)
992 vpix_tract_ids = sky_map.findTractIdArray(ra, dec, degrees=True)
993
994 in_tract = (vpix_tract_ids == tract_id)
995
996 consolidated_map[vpix[in_tract]] = input_map[vpix[in_tract]]
997
998 return consolidated_map
std::vector< SchemaItem< Flag > > * items
Cartesian polygons.
Definition Polygon.h:59
A floating-point coordinate rectangle geometry.
Definition Box.h:413
mask_warp_bbox(self, bbox, visit, mask, bit_mask_value)
LonLat represents a spherical coordinate (longitude/latitude angle) pair.
Definition LonLat.h:55