Loading [MathJax]/extensions/tex2jax.js
LSST Applications g04a91732dc+9666464c73,g0fba68d861+079660c10e,g1fd858c14a+94f68680cf,g208c678f98+627fe8cd4e,g271391ec13+ac98094cfc,g2c84ff76c0+12036dbf49,g2c9e612ef2+a92a2e6025,g35bb328faa+fcb1d3bbc8,g4d2262a081+bcdfaf528c,g4e0f332c67+c58e4b632d,g53246c7159+fcb1d3bbc8,g60b5630c4e+a92a2e6025,g67b6fd64d1+9d1b2ab50a,g78460c75b0+2f9a1b4bcd,g786e29fd12+cf7ec2a62a,g7b71ed6315+fcb1d3bbc8,g8852436030+506db7da85,g89139ef638+9d1b2ab50a,g8d6b6b353c+a92a2e6025,g9125e01d80+fcb1d3bbc8,g989de1cb63+9d1b2ab50a,g9f33ca652e+d1749da127,ga2b97cdc51+a92a2e6025,gabe3b4be73+1e0a283bba,gb1101e3267+6ecbd0580e,gb58c049af0+f03b321e39,gb89ab40317+9d1b2ab50a,gb90eeb9370+384e1fc23b,gcf25f946ba+506db7da85,gd315a588df+382ef11c06,gd6cbbdb0b4+75aa4b1db4,gd9a9a58781+fcb1d3bbc8,gde0f65d7ad+a095917f21,ge278dab8ac+c61fbefdff,ge410e46f29+9d1b2ab50a,ge82c20c137+e12a08b75a,gf67bdafdda+9d1b2ab50a,gfd5510ef7b+df344d16e5,v29.0.0.rc2
LSST Data Management Base Package
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
healSparseMapping.py
Go to the documentation of this file.
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22__all__ = ["HealSparseInputMapTask", "HealSparseInputMapConfig",
23 "HealSparseMapFormatter", "HealSparsePropertyMapConnections",
24 "HealSparsePropertyMapConfig", "HealSparsePropertyMapTask",
25 "ConsolidateHealSparsePropertyMapConnections",
26 "ConsolidateHealSparsePropertyMapConfig",
27 "ConsolidateHealSparsePropertyMapTask"]
28
29from collections import defaultdict
30import esutil
31import warnings
32import numbers
33import numpy as np
34import hpgeom as hpg
35import healsparse as hsp
36
37import lsst.pex.config as pexConfig
38import lsst.pipe.base as pipeBase
39import lsst.geom
40import lsst.afw.geom as afwGeom
41from lsst.daf.butler import Formatter
42from lsst.skymap import BaseSkyMap
43from lsst.utils.timer import timeMethod
44from .healSparseMappingProperties import (BasePropertyMap, BasePropertyMapConfig,
45 PropertyMapMap, compute_approx_psf_size_and_shape)
46
47
48class HealSparseMapFormatter(Formatter):
49 """Interface for reading and writing healsparse.HealSparseMap files."""
50 unsupportedParameters = frozenset()
51 supportedExtensions = frozenset({".hsp", ".fit", ".fits"})
52 extension = '.hsp'
53
54 def read(self, component=None):
55 # Docstring inherited from Formatter.read.
56 path = self.fileDescriptor.location.path
57
58 if component == 'coverage':
59 try:
60 data = hsp.HealSparseCoverage.read(path)
61 except (OSError, RuntimeError):
62 raise ValueError(f"Unable to read healsparse map with URI {self.fileDescriptor.location.uri}")
63
64 return data
65
66 if self.fileDescriptor.parameters is None:
67 pixels = None
68 degrade_nside = None
69 else:
70 pixels = self.fileDescriptor.parameters.get('pixels', None)
71 degrade_nside = self.fileDescriptor.parameters.get('degrade_nside', None)
72 try:
73 data = hsp.HealSparseMap.read(path, pixels=pixels, degrade_nside=degrade_nside)
74 except (OSError, RuntimeError):
75 raise ValueError(f"Unable to read healsparse map with URI {self.fileDescriptor.location.uri}")
76
77 return data
78
79 def write(self, inMemoryDataset):
80 # Docstring inherited from Formatter.write.
81 # Update the location with the formatter-preferred file extension
82 self.fileDescriptor.location.updateExtension(self.extension)
83 inMemoryDataset.write(self.fileDescriptor.location.path, clobber=True)
84
85
87 """Check that value is a power of two.
88
89 Parameters
90 ----------
91 value : `int`
92 Value to check.
93
94 Returns
95 -------
96 is_power_of_two : `bool`
97 True if value is a power of two; False otherwise, or
98 if value is not an integer.
99 """
100 if not isinstance(value, numbers.Integral):
101 return False
102
103 # See https://stackoverflow.com/questions/57025836
104 # Every power of 2 has exactly 1 bit set to 1; subtracting
105 # 1 therefore flips every preceding bit. If you and that
106 # together with the original value it must be 0.
107 return (value & (value - 1) == 0) and value != 0
108
109
110class HealSparseInputMapConfig(pexConfig.Config):
111 """Configuration parameters for HealSparseInputMapTask"""
112 nside = pexConfig.Field(
113 doc="Mapping healpix nside. Must be power of 2.",
114 dtype=int,
115 default=32768,
116 check=_is_power_of_two,
117 )
118 nside_coverage = pexConfig.Field(
119 doc="HealSparse coverage map nside. Must be power of 2.",
120 dtype=int,
121 default=256,
122 check=_is_power_of_two,
123 )
124 bad_mask_min_coverage = pexConfig.Field(
125 doc=("Minimum area fraction of a map healpixel pixel that must be "
126 "covered by bad pixels to be removed from the input map. "
127 "This is approximate."),
128 dtype=float,
129 default=0.5,
130 )
131
132
133class HealSparseInputMapTask(pipeBase.Task):
134 """Task for making a HealSparse input map."""
135
136 ConfigClass = HealSparseInputMapConfig
137 _DefaultName = "healSparseInputMap"
138
139 def __init__(self, **kwargs):
140 pipeBase.Task.__init__(self, **kwargs)
141
142 self.ccd_input_map = None
143
144 def build_ccd_input_map(self, bbox, wcs, ccds):
145 """Build a map from ccd valid polygons or bounding boxes.
146
147 Parameters
148 ----------
149 bbox : `lsst.geom.Box2I`
150 Bounding box for region to build input map.
151 wcs : `lsst.afw.geom.SkyWcs`
152 WCS object for region to build input map.
153 ccds : `lsst.afw.table.ExposureCatalog`
154 Exposure catalog with ccd data from coadd inputs.
155 """
156 with warnings.catch_warnings():
157 # Healsparse will emit a warning if nside coverage is greater than
158 # 128. In the case of generating patch input maps, and not global
159 # maps, high nside coverage works fine, so we can suppress this
160 # warning.
161 warnings.simplefilter("ignore")
162 self.ccd_input_map = hsp.HealSparseMap.make_empty(nside_coverage=self.config.nside_coverage,
163 nside_sparse=self.config.nside,
164 dtype=hsp.WIDE_MASK,
165 wide_mask_maxbits=len(ccds))
166 self._wcs = wcs
167 self._bbox = bbox
168 self._ccds = ccds
169
170 pixel_scale = wcs.getPixelScale(bbox.getCenter()).asArcseconds()
171 hpix_area_arcsec2 = hpg.nside_to_pixel_area(self.config.nside, degrees=True)*(3600.**2.)
172 self._min_bad = self.config.bad_mask_min_coverage*hpix_area_arcsec2/(pixel_scale**2.)
173
174 metadata = {}
176 self._bits_per_visit = defaultdict(list)
177 for bit, ccd_row in enumerate(ccds):
178 metadata[f"B{bit:04d}CCD"] = ccd_row["ccd"]
179 metadata[f"B{bit:04d}VIS"] = ccd_row["visit"]
180 metadata[f"B{bit:04d}WT"] = ccd_row["weight"]
181
182 self._bits_per_visit_ccd[(ccd_row["visit"], ccd_row["ccd"])] = bit
183 self._bits_per_visit[ccd_row["visit"]].append(bit)
184
185 ccd_poly = ccd_row.getValidPolygon()
186 if ccd_poly is None:
187 ccd_poly = afwGeom.Polygon(lsst.geom.Box2D(ccd_row.getBBox()))
188 # Detectors need to be rendered with their own wcs.
189 ccd_poly_radec = self._pixels_to_radec(ccd_row.getWcs(), ccd_poly.convexHull().getVertices())
190
191 # Create a ccd healsparse polygon
192 poly = hsp.Polygon(ra=ccd_poly_radec[: -1, 0],
193 dec=ccd_poly_radec[: -1, 1],
194 value=[bit])
195 self.ccd_input_map.set_bits_pix(poly.get_pixels(nside=self.ccd_input_map.nside_sparse),
196 [bit])
197
198 # Cut down to the overall bounding box with associated wcs.
199 bbox_afw_poly = afwGeom.Polygon(lsst.geom.Box2D(bbox))
200 bbox_poly_radec = self._pixels_to_radec(self._wcs,
201 bbox_afw_poly.convexHull().getVertices())
202 bbox_poly = hsp.Polygon(ra=bbox_poly_radec[: -1, 0], dec=bbox_poly_radec[: -1, 1],
203 value=np.arange(self.ccd_input_map.wide_mask_maxbits))
204 with warnings.catch_warnings():
205 warnings.simplefilter("ignore")
206 bbox_poly_map = bbox_poly.get_map_like(self.ccd_input_map)
207 self.ccd_input_map = hsp.and_intersection([self.ccd_input_map, bbox_poly_map])
208 self.ccd_input_map.metadata = metadata
209
210 # Create a temporary map to hold the count of bad pixels in each healpix pixel
211 dtype = [(f"v{visit}", np.int64) for visit in self._bits_per_visit.keys()]
212
213 with warnings.catch_warnings():
214 # Healsparse will emit a warning if nside coverage is greater than
215 # 128. In the case of generating patch input maps, and not global
216 # maps, high nside coverage works fine, so we can suppress this
217 # warning.
218 warnings.simplefilter("ignore")
219 self._ccd_input_bad_count_map = hsp.HealSparseMap.make_empty(
220 nside_coverage=self.config.nside_coverage,
221 nside_sparse=self.config.nside,
222 dtype=dtype,
223 primary=dtype[0][0])
224
225 self._ccd_input_pixels = self.ccd_input_map.valid_pixels
226
227 # Don't set input bad map if there are no ccds which overlap the bbox.
228 if len(self._ccd_input_pixels) > 0:
229 # Ensure these are sorted.
230 self._ccd_input_pixels = np.sort(self._ccd_input_pixels)
231
232 self._ccd_input_bad_count_map[self._ccd_input_pixels] = np.zeros(1, dtype=dtype)
233
234 def mask_warp_bbox(self, bbox, visit, mask, bit_mask_value):
235 """Mask a subregion from a visit.
236 This must be run after build_ccd_input_map initializes
237 the overall map.
238
239 Parameters
240 ----------
241 bbox : `lsst.geom.Box2I`
242 Bounding box from region to mask.
243 visit : `int`
244 Visit number corresponding to warp with mask.
245 mask : `lsst.afw.image.MaskX`
246 Mask plane from warp exposure.
247 bit_mask_value : `int`
248 Bit mask to check for bad pixels.
249
250 Raises
251 ------
252 RuntimeError : Raised if build_ccd_input_map was not run first.
253 """
254 if self.ccd_input_map is None:
255 raise RuntimeError("Must run build_ccd_input_map before mask_warp_bbox")
256
257 if len(self._ccd_input_pixels) == 0:
258 # This tract has no coverage, so there is nothing to do.
259 return
260
261 # Find the bad pixels and convert to healpix
262 bad_pixels = np.where(mask.array & bit_mask_value)
263 if len(bad_pixels[0]) == 0:
264 # No bad pixels
265 return
266
267 # Bad pixels come from warps which use the overall wcs.
268 bad_ra, bad_dec = self._wcs.pixelToSkyArray(bad_pixels[1].astype(np.float64),
269 bad_pixels[0].astype(np.float64),
270 degrees=True)
271 bad_hpix = hpg.angle_to_pixel(self.config.nside, bad_ra, bad_dec)
272
273 # Check if any of these "bad" pixels are in the valid footprint.
274 match_input, match_bad = esutil.numpy_util.match(self._ccd_input_pixels, bad_hpix, presorted=True)
275 if len(match_bad) == 0:
276 return
277
278 bad_hpix = bad_hpix[match_bad]
279
280 # Create a view of the column we need to add to.
281 count_map_visit = self._ccd_input_bad_count_map[f"v{visit}"]
282 # Add the bad pixels to the accumulator. Note that the view
283 # cannot append pixels, but the match above ensures we are
284 # only adding to pixels that are already in the coverage
285 # map and initialized.
286 count_map_visit.update_values_pix(bad_hpix, 1, operation="add")
287
289 """Use accumulated mask information to finalize the masking of
290 ccd_input_map.
291
292 Raises
293 ------
294 RuntimeError : Raised if build_ccd_input_map was not run first.
295 """
296 if self.ccd_input_map is None:
297 raise RuntimeError("Must run build_ccd_input_map before finalize_ccd_input_map_mask.")
298
299 count_map_arr = self._ccd_input_bad_count_map[self._ccd_input_pixels]
300 for visit in self._bits_per_visit:
301 to_mask, = np.where(count_map_arr[f"v{visit}"] > self._min_bad)
302 if to_mask.size == 0:
303 continue
304 self.ccd_input_map.clear_bits_pix(self._ccd_input_pixels[to_mask],
305 self._bits_per_visit[visit])
306
307 # Clear memory
308 self._ccd_input_bad_count_map = None
309
310 def _pixels_to_radec(self, wcs, pixels):
311 """Convert pixels to ra/dec positions using a wcs.
312
313 Parameters
314 ----------
315 wcs : `lsst.afw.geom.SkyWcs`
316 WCS object.
317 pixels : `list` [`lsst.geom.Point2D`]
318 List of pixels to convert.
319
320 Returns
321 -------
322 radec : `numpy.ndarray`
323 Nx2 array of ra/dec positions associated with pixels.
324 """
325 sph_pts = wcs.pixelToSky(pixels)
326 return np.array([(sph.getRa().asDegrees(), sph.getDec().asDegrees())
327 for sph in sph_pts])
328
329
330class HealSparsePropertyMapConnections(pipeBase.PipelineTaskConnections,
331 dimensions=("tract", "band", "skymap",),
332 defaultTemplates={"coaddName": "deep",
333 "calexpType": ""}):
334 input_maps = pipeBase.connectionTypes.Input(
335 doc="Healsparse bit-wise coadd input maps",
336 name="{coaddName}Coadd_inputMap",
337 storageClass="HealSparseMap",
338 dimensions=("tract", "patch", "skymap", "band"),
339 multiple=True,
340 deferLoad=True,
341 )
342 coadd_exposures = pipeBase.connectionTypes.Input(
343 doc="Coadded exposures associated with input_maps",
344 name="{coaddName}Coadd",
345 storageClass="ExposureF",
346 dimensions=("tract", "patch", "skymap", "band"),
347 multiple=True,
348 deferLoad=True,
349 )
350 visit_summaries = pipeBase.connectionTypes.Input(
351 doc="Visit summary tables with aggregated statistics",
352 name="finalVisitSummary",
353 storageClass="ExposureCatalog",
354 dimensions=("instrument", "visit"),
355 multiple=True,
356 deferLoad=True,
357 )
358 sky_map = pipeBase.connectionTypes.Input(
359 doc="Input definition of geometry/bbox and projection/wcs for coadded exposures",
360 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
361 storageClass="SkyMap",
362 dimensions=("skymap",),
363 )
364
365 # Create output connections for all possible maps defined in the
366 # registry. The vars() trick used here allows us to set class attributes
367 # programatically. Taken from
368 # https://stackoverflow.com/questions/2519807/
369 # setting-a-class-attribute-with-a-given-name-in-python-while-defining-the-class
370 for name in BasePropertyMap.registry:
371 vars()[f"{name}_map_min"] = pipeBase.connectionTypes.Output(
372 doc=f"Minimum-value map of {name}",
373 name=f"{{coaddName}}Coadd_{name}_map_min",
374 storageClass="HealSparseMap",
375 dimensions=("tract", "skymap", "band"),
376 )
377 vars()[f"{name}_map_max"] = pipeBase.connectionTypes.Output(
378 doc=f"Maximum-value map of {name}",
379 name=f"{{coaddName}}Coadd_{name}_map_max",
380 storageClass="HealSparseMap",
381 dimensions=("tract", "skymap", "band"),
382 )
383 vars()[f"{name}_map_mean"] = pipeBase.connectionTypes.Output(
384 doc=f"Mean-value map of {name}",
385 name=f"{{coaddName}}Coadd_{name}_map_mean",
386 storageClass="HealSparseMap",
387 dimensions=("tract", "skymap", "band"),
388 )
389 vars()[f"{name}_map_weighted_mean"] = pipeBase.connectionTypes.Output(
390 doc=f"Weighted mean-value map of {name}",
391 name=f"{{coaddName}}Coadd_{name}_map_weighted_mean",
392 storageClass="HealSparseMap",
393 dimensions=("tract", "skymap", "band"),
394 )
395 vars()[f"{name}_map_sum"] = pipeBase.connectionTypes.Output(
396 doc=f"Sum-value map of {name}",
397 name=f"{{coaddName}}Coadd_{name}_map_sum",
398 storageClass="HealSparseMap",
399 dimensions=("tract", "skymap", "band"),
400 )
401
402 def __init__(self, *, config=None):
403 super().__init__(config=config)
404
405 # Not all possible maps in the registry will be configured to run.
406 # Here we remove the unused connections.
407 for name in BasePropertyMap.registry:
408 if name not in config.property_maps:
409 prop_config = BasePropertyMapConfig()
410 prop_config.do_min = False
411 prop_config.do_max = False
412 prop_config.do_mean = False
413 prop_config.do_weighted_mean = False
414 prop_config.do_sum = False
415 else:
416 prop_config = config.property_maps[name]
417
418 if not prop_config.do_min:
419 self.outputs.remove(f"{name}_map_min")
420 if not prop_config.do_max:
421 self.outputs.remove(f"{name}_map_max")
422 if not prop_config.do_mean:
423 self.outputs.remove(f"{name}_map_mean")
424 if not prop_config.do_weighted_mean:
425 self.outputs.remove(f"{name}_map_weighted_mean")
426 if not prop_config.do_sum:
427 self.outputs.remove(f"{name}_map_sum")
428
429
430class HealSparsePropertyMapConfig(pipeBase.PipelineTaskConfig,
431 pipelineConnections=HealSparsePropertyMapConnections):
432 """Configuration parameters for HealSparsePropertyMapTask"""
433 property_maps = BasePropertyMap.registry.makeField(
434 multi=True,
435 default=["exposure_time",
436 "psf_size",
437 "psf_e1",
438 "psf_e2",
439 "psf_maglim",
440 "sky_noise",
441 "sky_background",
442 "dcr_dra",
443 "dcr_ddec",
444 "dcr_e1",
445 "dcr_e2",
446 "epoch"],
447 doc="Property map computation objects",
448 )
449
450 def setDefaults(self):
451 self.property_maps["exposure_time"].do_sum = True
452 self.property_maps["psf_size"].do_weighted_mean = True
453 self.property_maps["psf_e1"].do_weighted_mean = True
454 self.property_maps["psf_e2"].do_weighted_mean = True
455 self.property_maps["psf_maglim"].do_weighted_mean = True
456 self.property_maps["sky_noise"].do_weighted_mean = True
457 self.property_maps["sky_background"].do_weighted_mean = True
458 self.property_maps["dcr_dra"].do_weighted_mean = True
459 self.property_maps["dcr_ddec"].do_weighted_mean = True
460 self.property_maps["dcr_e1"].do_weighted_mean = True
461 self.property_maps["dcr_e2"].do_weighted_mean = True
462 self.property_maps["epoch"].do_mean = True
463 self.property_maps["epoch"].do_min = True
464 self.property_maps["epoch"].do_max = True
465
466
467class HealSparsePropertyMapTask(pipeBase.PipelineTask):
468 """Task to compute Healsparse property maps.
469
470 This task will compute individual property maps (per tract, per
471 map type, per band). These maps cover the full coadd tract, and
472 are not truncated to the inner tract region.
473 """
474 ConfigClass = HealSparsePropertyMapConfig
475 _DefaultName = "healSparsePropertyMapTask"
476
477 def __init__(self, **kwargs):
478 super().__init__(**kwargs)
479 self.property_maps = PropertyMapMap()
480 for name, config, PropertyMapClass in self.config.property_maps.apply():
481 self.property_maps[name] = PropertyMapClass(config, name)
482
483 @timeMethod
484 def runQuantum(self, butlerQC, inputRefs, outputRefs):
485 inputs = butlerQC.get(inputRefs)
486
487 sky_map = inputs.pop("sky_map")
488
489 tract = butlerQC.quantum.dataId["tract"]
490 band = butlerQC.quantum.dataId["band"]
491
492 input_map_dict = {ref.dataId["patch"]: ref for ref in inputs["input_maps"]}
493 coadd_dict = {ref.dataId["patch"]: ref for ref in inputs["coadd_exposures"]}
494
495 visit_summary_dict = {ref.dataId["visit"]: ref.get()
496 for ref in inputs["visit_summaries"]}
497
498 self.run(sky_map, tract, band, coadd_dict, input_map_dict, visit_summary_dict)
499
500 # Write the outputs
501 for name, property_map in self.property_maps.items():
502 if property_map.config.do_min:
503 butlerQC.put(property_map.min_map,
504 getattr(outputRefs, f"{name}_map_min"))
505 if property_map.config.do_max:
506 butlerQC.put(property_map.max_map,
507 getattr(outputRefs, f"{name}_map_max"))
508 if property_map.config.do_mean:
509 butlerQC.put(property_map.mean_map,
510 getattr(outputRefs, f"{name}_map_mean"))
511 if property_map.config.do_weighted_mean:
512 butlerQC.put(property_map.weighted_mean_map,
513 getattr(outputRefs, f"{name}_map_weighted_mean"))
514 if property_map.config.do_sum:
515 butlerQC.put(property_map.sum_map,
516 getattr(outputRefs, f"{name}_map_sum"))
517
518 def run(self, sky_map, tract, band, coadd_dict, input_map_dict, visit_summary_dict):
519 """Run the healsparse property task.
520
521 Parameters
522 ----------
523 sky_map : Sky map object
524 tract : `int`
525 Tract number.
526 band : `str`
527 Band name for logging.
528 coadd_dict : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]
529 Dictionary of coadd exposure references. Keys are patch numbers.
530 input_map_dict : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]
531 Dictionary of input map references. Keys are patch numbers.
532 visit_summary_dict : `dict` [`int`: `lsst.afw.table.ExposureCatalog`]
533 Dictionary of visit summary tables. Keys are visit numbers.
534
535 Raises
536 ------
537 RepeatableQuantumError
538 If visit_summary_dict is missing any visits or detectors found in an
539 input map. This leads to an inconsistency between what is in the coadd
540 (via the input map) and the visit summary tables which contain data
541 to compute the maps.
542 """
543 tract_info = sky_map[tract]
544
545 tract_maps_initialized = False
546
547 for patch in input_map_dict.keys():
548 self.log.info("Making maps for band %s, tract %d, patch %d.",
549 band, tract, patch)
550
551 patch_info = tract_info[patch]
552
553 input_map = input_map_dict[patch].get()
554
555 # Initialize the tract maps as soon as we have the first input
556 # map for getting nside information.
557 if not tract_maps_initialized:
558 # We use the first input map nside information to initialize
559 # the tract maps
560 nside_coverage = self._compute_nside_coverage_tract(tract_info)
561 nside = input_map.nside_sparse
562
563 do_compute_approx_psf = False
564 # Initialize the tract maps
565 for property_map in self.property_maps:
566 property_map.initialize_tract_maps(nside_coverage, nside)
567 if property_map.requires_psf:
568 do_compute_approx_psf = True
569
570 tract_maps_initialized = True
571
572 if input_map.valid_pixels.size == 0:
573 self.log.warning("No valid pixels for band %s, tract %d, patch %d; skipping.",
574 band, tract, patch)
575 continue
576
577 coadd_photo_calib = coadd_dict[patch].get(component="photoCalib")
578 coadd_inputs = coadd_dict[patch].get(component="coaddInputs")
579
580 coadd_zeropoint = 2.5*np.log10(coadd_photo_calib.getInstFluxAtZeroMagnitude())
581
582 # Crop input_map to the inner polygon of the patch
583 poly_vertices = patch_info.getInnerSkyPolygon(tract_info.getWcs()).getVertices()
584 patch_radec = self._vertices_to_radec(poly_vertices)
585 patch_poly = hsp.Polygon(ra=patch_radec[:, 0], dec=patch_radec[:, 1],
586 value=np.arange(input_map.wide_mask_maxbits))
587 with warnings.catch_warnings():
588 # Healsparse will emit a warning if nside coverage is greater than
589 # 128. In the case of generating patch input maps, and not global
590 # maps, high nside coverage works fine, so we can suppress this
591 # warning.
592 warnings.simplefilter("ignore")
593 patch_poly_map = patch_poly.get_map_like(input_map)
594 input_map = hsp.and_intersection([input_map, patch_poly_map])
595
596 valid_pixels, vpix_ra, vpix_dec = input_map.valid_pixels_pos(return_pixels=True)
597
598 # Check if there are no valid pixels for the inner (unique) patch region
599 if valid_pixels.size == 0:
600 continue
601
602 # Initialize the value accumulators
603 for property_map in self.property_maps:
604 property_map.initialize_values(valid_pixels.size)
605 property_map.zeropoint = coadd_zeropoint
606
607 # Initialize the weight and counter accumulators
608 total_weights = np.zeros(valid_pixels.size)
609 total_inputs = np.zeros(valid_pixels.size, dtype=np.int32)
610
611 for bit, ccd_row in enumerate(coadd_inputs.ccds):
612 # Which pixels in the map are used by this visit/detector
613 inmap, = np.where(input_map.check_bits_pix(valid_pixels, [bit]))
614
615 # Check if there are any valid pixels in the map from this deteector.
616 if inmap.size == 0:
617 continue
618
619 # visit, detector_id, weight = input_dict[bit]
620 visit = ccd_row["visit"]
621 detector_id = ccd_row["ccd"]
622 weight = ccd_row["weight"]
623
624 x, y = ccd_row.getWcs().skyToPixelArray(vpix_ra[inmap], vpix_dec[inmap], degrees=True)
625 scalings = self._compute_calib_scale(ccd_row, x, y)
626
627 if do_compute_approx_psf:
628 psf_array = compute_approx_psf_size_and_shape(ccd_row, vpix_ra[inmap], vpix_dec[inmap])
629 else:
630 psf_array = None
631
632 total_weights[inmap] += weight
633 total_inputs[inmap] += 1
634
635 # Retrieve the correct visitSummary row
636 if visit not in visit_summary_dict:
637 msg = f"Visit {visit} not found in visit_summaries."
638 raise pipeBase.RepeatableQuantumError(msg)
639 row = visit_summary_dict[visit].find(detector_id)
640 if row is None:
641 msg = f"Visit {visit} / detector_id {detector_id} not found in visit_summaries."
642 raise pipeBase.RepeatableQuantumError(msg)
643
644 # Accumulate the values
645 for property_map in self.property_maps:
646 property_map.accumulate_values(inmap,
647 vpix_ra[inmap],
648 vpix_dec[inmap],
649 weight,
650 scalings,
651 row,
652 psf_array=psf_array)
653
654 # Finalize the mean values and set the tract maps
655 for property_map in self.property_maps:
656 property_map.finalize_mean_values(total_weights, total_inputs)
657 property_map.set_map_values(valid_pixels)
658
659 def _compute_calib_scale(self, ccd_row, x, y):
660 """Compute calibration scaling values.
661
662 Parameters
663 ----------
664 ccd_row : `lsst.afw.table.ExposureRecord`
665 Exposure metadata for a given detector exposure.
666 x : `np.ndarray`
667 Array of x positions.
668 y : `np.ndarray`
669 Array of y positions.
670
671 Returns
672 -------
673 calib_scale : `np.ndarray`
674 Array of calibration scale values.
675 """
676 photo_calib = ccd_row.getPhotoCalib()
677 bf = photo_calib.computeScaledCalibration()
678 if bf.getBBox() == ccd_row.getBBox():
679 # Track variable calibration over the detector
680 calib_scale = photo_calib.getCalibrationMean()*bf.evaluate(x, y)
681 else:
682 # Spatially constant calibration
683 calib_scale = photo_calib.getCalibrationMean()
684
685 return calib_scale
686
687 def _vertices_to_radec(self, vertices):
688 """Convert polygon vertices to ra/dec.
689
690 Parameters
691 ----------
692 vertices : `list` [ `lsst.sphgeom.UnitVector3d` ]
693 Vertices for bounding polygon.
694
695 Returns
696 -------
697 radec : `numpy.ndarray`
698 Nx2 array of ra/dec positions (in degrees) associated with vertices.
699 """
700 lonlats = [lsst.sphgeom.LonLat(x) for x in vertices]
701 radec = np.array([(x.getLon().asDegrees(), x.getLat().asDegrees()) for
702 x in lonlats])
703 return radec
704
705 def _compute_nside_coverage_tract(self, tract_info):
706 """Compute the optimal coverage nside for a tract.
707
708 Parameters
709 ----------
710 tract_info : `lsst.skymap.tractInfo.ExplicitTractInfo`
711 Tract information object.
712
713 Returns
714 -------
715 nside_coverage : `int`
716 Optimal coverage nside for a tract map.
717 """
718 num_patches = tract_info.getNumPatches()
719
720 # Compute approximate patch area
721 patch_info = tract_info.getPatchInfo(0)
722 vertices = patch_info.getInnerSkyPolygon(tract_info.getWcs()).getVertices()
723 radec = self._vertices_to_radec(vertices)
724 delta_ra = np.max(radec[:, 0]) - np.min(radec[:, 0])
725 delta_dec = np.max(radec[:, 1]) - np.min(radec[:, 1])
726 patch_area = delta_ra*delta_dec*np.cos(np.deg2rad(np.mean(radec[:, 1])))
727
728 tract_area = num_patches[0]*num_patches[1]*patch_area
729 # Start with a fairly low nside and increase until we find the approximate area.
730 nside_coverage_tract = 32
731 while hpg.nside_to_pixel_area(nside_coverage_tract, degrees=True) > tract_area:
732 nside_coverage_tract = 2*nside_coverage_tract
733 # Step back one, but don't go bigger pixels than nside=32 or smaller
734 # than 128 (recommended by healsparse).
735 nside_coverage_tract = int(np.clip(nside_coverage_tract/2, 32, 128))
736
737 return nside_coverage_tract
738
739
740class ConsolidateHealSparsePropertyMapConnections(pipeBase.PipelineTaskConnections,
741 dimensions=("band", "skymap",),
742 defaultTemplates={"coaddName": "deep"}):
743 sky_map = pipeBase.connectionTypes.Input(
744 doc="Input definition of geometry/bbox and projection/wcs for coadded exposures",
745 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
746 storageClass="SkyMap",
747 dimensions=("skymap",),
748 )
749
750 # Create output connections for all possible maps defined in the
751 # registry. The vars() trick used here allows us to set class attributes
752 # programatically. Taken from
753 # https://stackoverflow.com/questions/2519807/
754 # setting-a-class-attribute-with-a-given-name-in-python-while-defining-the-class
755 for name in BasePropertyMap.registry:
756 vars()[f"{name}_map_min"] = pipeBase.connectionTypes.Input(
757 doc=f"Minimum-value map of {name}",
758 name=f"{{coaddName}}Coadd_{name}_map_min",
759 storageClass="HealSparseMap",
760 dimensions=("tract", "skymap", "band"),
761 multiple=True,
762 deferLoad=True,
763 )
764 vars()[f"{name}_consolidated_map_min"] = pipeBase.connectionTypes.Output(
765 doc=f"Minumum-value map of {name}",
766 name=f"{{coaddName}}Coadd_{name}_consolidated_map_min",
767 storageClass="HealSparseMap",
768 dimensions=("skymap", "band"),
769 )
770 vars()[f"{name}_map_max"] = pipeBase.connectionTypes.Input(
771 doc=f"Maximum-value map of {name}",
772 name=f"{{coaddName}}Coadd_{name}_map_max",
773 storageClass="HealSparseMap",
774 dimensions=("tract", "skymap", "band"),
775 multiple=True,
776 deferLoad=True,
777 )
778 vars()[f"{name}_consolidated_map_max"] = pipeBase.connectionTypes.Output(
779 doc=f"Minumum-value map of {name}",
780 name=f"{{coaddName}}Coadd_{name}_consolidated_map_max",
781 storageClass="HealSparseMap",
782 dimensions=("skymap", "band"),
783 )
784 vars()[f"{name}_map_mean"] = pipeBase.connectionTypes.Input(
785 doc=f"Mean-value map of {name}",
786 name=f"{{coaddName}}Coadd_{name}_map_mean",
787 storageClass="HealSparseMap",
788 dimensions=("tract", "skymap", "band"),
789 multiple=True,
790 deferLoad=True,
791 )
792 vars()[f"{name}_consolidated_map_mean"] = pipeBase.connectionTypes.Output(
793 doc=f"Minumum-value map of {name}",
794 name=f"{{coaddName}}Coadd_{name}_consolidated_map_mean",
795 storageClass="HealSparseMap",
796 dimensions=("skymap", "band"),
797 )
798 vars()[f"{name}_map_weighted_mean"] = pipeBase.connectionTypes.Input(
799 doc=f"Weighted mean-value map of {name}",
800 name=f"{{coaddName}}Coadd_{name}_map_weighted_mean",
801 storageClass="HealSparseMap",
802 dimensions=("tract", "skymap", "band"),
803 multiple=True,
804 deferLoad=True,
805 )
806 vars()[f"{name}_consolidated_map_weighted_mean"] = pipeBase.connectionTypes.Output(
807 doc=f"Minumum-value map of {name}",
808 name=f"{{coaddName}}Coadd_{name}_consolidated_map_weighted_mean",
809 storageClass="HealSparseMap",
810 dimensions=("skymap", "band"),
811 )
812 vars()[f"{name}_map_sum"] = pipeBase.connectionTypes.Input(
813 doc=f"Sum-value map of {name}",
814 name=f"{{coaddName}}Coadd_{name}_map_sum",
815 storageClass="HealSparseMap",
816 dimensions=("tract", "skymap", "band"),
817 multiple=True,
818 deferLoad=True,
819 )
820 vars()[f"{name}_consolidated_map_sum"] = pipeBase.connectionTypes.Output(
821 doc=f"Minumum-value map of {name}",
822 name=f"{{coaddName}}Coadd_{name}_consolidated_map_sum",
823 storageClass="HealSparseMap",
824 dimensions=("skymap", "band"),
825 )
826
827 def __init__(self, *, config=None):
828 super().__init__(config=config)
829
830 # Not all possible maps in the registry will be configured to run.
831 # Here we remove the unused connections.
832 for name in BasePropertyMap.registry:
833 if name not in config.property_maps:
834 prop_config = BasePropertyMapConfig()
835 prop_config.do_min = False
836 prop_config.do_max = False
837 prop_config.do_mean = False
838 prop_config.do_weighted_mean = False
839 prop_config.do_sum = False
840 else:
841 prop_config = config.property_maps[name]
842
843 if not prop_config.do_min:
844 self.inputs.remove(f"{name}_map_min")
845 self.outputs.remove(f"{name}_consolidated_map_min")
846 if not prop_config.do_max:
847 self.inputs.remove(f"{name}_map_max")
848 self.outputs.remove(f"{name}_consolidated_map_max")
849 if not prop_config.do_mean:
850 self.inputs.remove(f"{name}_map_mean")
851 self.outputs.remove(f"{name}_consolidated_map_mean")
852 if not prop_config.do_weighted_mean:
853 self.inputs.remove(f"{name}_map_weighted_mean")
854 self.outputs.remove(f"{name}_consolidated_map_weighted_mean")
855 if not prop_config.do_sum:
856 self.inputs.remove(f"{name}_map_sum")
857 self.outputs.remove(f"{name}_consolidated_map_sum")
858
859
860class ConsolidateHealSparsePropertyMapConfig(pipeBase.PipelineTaskConfig,
861 pipelineConnections=ConsolidateHealSparsePropertyMapConnections):
862 """Configuration parameters for ConsolidateHealSparsePropertyMapTask"""
863 property_maps = BasePropertyMap.registry.makeField(
864 multi=True,
865 default=["exposure_time",
866 "psf_size",
867 "psf_e1",
868 "psf_e2",
869 "psf_maglim",
870 "sky_noise",
871 "sky_background",
872 "dcr_dra",
873 "dcr_ddec",
874 "dcr_e1",
875 "dcr_e2",
876 "epoch"],
877 doc="Property map computation objects",
878 )
879 nside_coverage = pexConfig.Field(
880 doc="Consolidated HealSparse coverage map nside. Must be power of 2.",
881 dtype=int,
882 default=32,
883 check=_is_power_of_two,
884 )
885
886 def setDefaults(self):
887 self.property_maps["exposure_time"].do_sum = True
888 self.property_maps["psf_size"].do_weighted_mean = True
889 self.property_maps["psf_e1"].do_weighted_mean = True
890 self.property_maps["psf_e2"].do_weighted_mean = True
891 self.property_maps["psf_maglim"].do_weighted_mean = True
892 self.property_maps["sky_noise"].do_weighted_mean = True
893 self.property_maps["sky_background"].do_weighted_mean = True
894 self.property_maps["dcr_dra"].do_weighted_mean = True
895 self.property_maps["dcr_ddec"].do_weighted_mean = True
896 self.property_maps["dcr_e1"].do_weighted_mean = True
897 self.property_maps["dcr_e2"].do_weighted_mean = True
898 self.property_maps["epoch"].do_mean = True
899 self.property_maps["epoch"].do_min = True
900 self.property_maps["epoch"].do_max = True
901
902
903class ConsolidateHealSparsePropertyMapTask(pipeBase.PipelineTask):
904 """Task to consolidate HealSparse property maps.
905
906 This task will take all the individual tract-based maps (per map type,
907 per band) and consolidate them into one survey-wide map (per map type,
908 per band). Each tract map is truncated to its inner region before
909 consolidation.
910 """
911 ConfigClass = ConsolidateHealSparsePropertyMapConfig
912 _DefaultName = "consolidateHealSparsePropertyMapTask"
913
914 def __init__(self, **kwargs):
915 super().__init__(**kwargs)
916 self.property_maps = PropertyMapMap()
917 for name, config, PropertyMapClass in self.config.property_maps.apply():
918 self.property_maps[name] = PropertyMapClass(config, name)
919
920 @timeMethod
921 def runQuantum(self, butlerQC, inputRefs, outputRefs):
922 inputs = butlerQC.get(inputRefs)
923
924 sky_map = inputs.pop("sky_map")
925
926 # These need to be consolidated one at a time to conserve memory.
927 for name in self.config.property_maps.names:
928 for type_ in ['min', 'max', 'mean', 'weighted_mean', 'sum']:
929 map_type = f"{name}_map_{type_}"
930 if map_type in inputs:
931 input_refs = {ref.dataId['tract']: ref
932 for ref in inputs[map_type]}
933 consolidated_map = self.consolidate_map(sky_map, input_refs)
934 butlerQC.put(consolidated_map,
935 getattr(outputRefs, f"{name}_consolidated_map_{type_}"))
936
937 def consolidate_map(self, sky_map, input_refs):
938 """Consolidate the healsparse property maps.
939
940 Parameters
941 ----------
942 sky_map : Sky map object
943 input_refs : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]
944 Dictionary of tract_id mapping to dataref.
945
946 Returns
947 -------
948 consolidated_map : `healsparse.HealSparseMap`
949 Consolidated HealSparse map.
950 """
951 # First, we read in the coverage maps to know how much memory
952 # to allocate
953 cov_mask = None
954 nside_coverage_inputs = None
955 for tract_id in input_refs:
956 cov = input_refs[tract_id].get(component='coverage')
957 if cov_mask is None:
958 cov_mask = cov.coverage_mask
959 nside_coverage_inputs = cov.nside_coverage
960 else:
961 cov_mask |= cov.coverage_mask
962
963 cov_pix_inputs, = np.where(cov_mask)
964
965 # Compute the coverage pixels for the desired nside_coverage
966 if nside_coverage_inputs == self.config.nside_coverage:
967 cov_pix = cov_pix_inputs
968 elif nside_coverage_inputs > self.config.nside_coverage:
969 # Converting from higher resolution coverage to lower
970 # resolution coverage.
971 bit_shift = hsp.utils._compute_bitshift(self.config.nside_coverage,
972 nside_coverage_inputs)
973 cov_pix = np.right_shift(cov_pix_inputs, bit_shift)
974 else:
975 # Converting from lower resolution coverage to higher
976 # resolution coverage.
977 bit_shift = hsp.utils._compute_bitshift(nside_coverage_inputs,
978 self.config.nside_coverage)
979 cov_pix = np.left_shift(cov_pix_inputs, bit_shift)
980
981 # Now read in each tract map and build the consolidated map.
982 consolidated_map = None
983 for tract_id in input_refs:
984 input_map = input_refs[tract_id].get()
985 if consolidated_map is None:
986 consolidated_map = hsp.HealSparseMap.make_empty(
987 self.config.nside_coverage,
988 input_map.nside_sparse,
989 input_map.dtype,
990 sentinel=input_map._sentinel,
991 cov_pixels=cov_pix,
992 metadata=input_map.metadata,
993 )
994
995 # Only use pixels that are properly inside the tract.
996 vpix, ra, dec = input_map.valid_pixels_pos(return_pixels=True)
997 vpix_tract_ids = sky_map.findTractIdArray(ra, dec, degrees=True)
998
999 in_tract = (vpix_tract_ids == tract_id)
1000
1001 consolidated_map[vpix[in_tract]] = input_map[vpix[in_tract]]
1002
1003 return consolidated_map
Cartesian polygons.
Definition Polygon.h:59
A floating-point coordinate rectangle geometry.
Definition Box.h:413
mask_warp_bbox(self, bbox, visit, mask, bit_mask_value)
LonLat represents a spherical coordinate (longitude/latitude angle) pair.
Definition LonLat.h:55