LSST Applications g070148d5b3+33e5256705,g0d53e28543+25c8b88941,g0da5cf3356+2dd1178308,g1081da9e2a+62d12e78cb,g17e5ecfddb+7e422d6136,g1c76d35bf8+ede3a706f7,g295839609d+225697d880,g2e2c1a68ba+cc1f6f037e,g2ffcdf413f+853cd4dcde,g38293774b4+62d12e78cb,g3b44f30a73+d953f1ac34,g48ccf36440+885b902d19,g4b2f1765b6+7dedbde6d2,g5320a0a9f6+0c5d6105b6,g56b687f8c9+ede3a706f7,g5c4744a4d9+ef6ac23297,g5ffd174ac0+0c5d6105b6,g6075d09f38+66af417445,g667d525e37+2ced63db88,g670421136f+2ced63db88,g71f27ac40c+2ced63db88,g774830318a+463cbe8d1f,g7876bc68e5+1d137996f1,g7985c39107+62d12e78cb,g7fdac2220c+0fd8241c05,g96f01af41f+368e6903a7,g9ca82378b8+2ced63db88,g9d27549199+ef6ac23297,gabe93b2c52+e3573e3735,gb065e2a02a+3dfbe639da,gbc3249ced9+0c5d6105b6,gbec6a3398f+0c5d6105b6,gc9534b9d65+35b9f25267,gd01420fc67+0c5d6105b6,geee7ff78d7+a14128c129,gf63283c776+ede3a706f7,gfed783d017+0c5d6105b6,w.2022.47
LSST Data Management Base Package
Loading...
Searching...
No Matches
healSparseMapping.py
Go to the documentation of this file.
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22__all__ = ["HealSparseInputMapTask", "HealSparseInputMapConfig",
23 "HealSparseMapFormatter", "HealSparsePropertyMapConnections",
24 "HealSparsePropertyMapConfig", "HealSparsePropertyMapTask",
25 "ConsolidateHealSparsePropertyMapConnections",
26 "ConsolidateHealSparsePropertyMapConfig",
27 "ConsolidateHealSparsePropertyMapTask"]
28
29from collections import defaultdict
30import warnings
31import numbers
32import numpy as np
33import hpgeom as hpg
34import healsparse as hsp
35
36import lsst.pex.config as pexConfig
37import lsst.pipe.base as pipeBase
38import lsst.geom
39import lsst.afw.geom as afwGeom
40from lsst.daf.butler import Formatter
41from lsst.skymap import BaseSkyMap
42from lsst.utils.timer import timeMethod
43from .healSparseMappingProperties import (BasePropertyMap, BasePropertyMapConfig,
44 PropertyMapMap, compute_approx_psf_size_and_shape)
45
46
47class HealSparseMapFormatter(Formatter):
48 """Interface for reading and writing healsparse.HealSparseMap files."""
49 unsupportedParameters = frozenset()
50 supportedExtensions = frozenset({".hsp", ".fit", ".fits"})
51 extension = '.hsp'
52
53 def read(self, component=None):
54 # Docstring inherited from Formatter.read.
55 path = self.fileDescriptor.location.path
56
57 if component == 'coverage':
58 try:
59 data = hsp.HealSparseCoverage.read(path)
60 except (OSError, RuntimeError):
61 raise ValueError(f"Unable to read healsparse map with URI {self.fileDescriptor.location.uri}")
62
63 return data
64
65 if self.fileDescriptor.parameters is None:
66 pixels = None
67 degrade_nside = None
68 else:
69 pixels = self.fileDescriptor.parameters.get('pixels', None)
70 degrade_nside = self.fileDescriptor.parameters.get('degrade_nside', None)
71 try:
72 data = hsp.HealSparseMap.read(path, pixels=pixels, degrade_nside=degrade_nside)
73 except (OSError, RuntimeError):
74 raise ValueError(f"Unable to read healsparse map with URI {self.fileDescriptor.location.uri}")
75
76 return data
77
78 def write(self, inMemoryDataset):
79 # Docstring inherited from Formatter.write.
80 # Update the location with the formatter-preferred file extension
81 self.fileDescriptor.location.updateExtension(self.extension)
82 inMemoryDataset.write(self.fileDescriptor.location.path, clobber=True)
83
84
85def _is_power_of_two(value):
86 """Check that value is a power of two.
87
88 Parameters
89 ----------
90 value : `int`
91 Value to check.
92
93 Returns
94 -------
95 is_power_of_two : `bool`
96 True if value is a power of two; False otherwise, or
97 if value is not an integer.
98 """
99 if not isinstance(value, numbers.Integral):
100 return False
101
102 # See https://stackoverflow.com/questions/57025836
103 # Every power of 2 has exactly 1 bit set to 1; subtracting
104 # 1 therefore flips every preceding bit. If you and that
105 # together with the original value it must be 0.
106 return (value & (value - 1) == 0) and value != 0
107
108
109class HealSparseInputMapConfig(pexConfig.Config):
110 """Configuration parameters for HealSparseInputMapTask"""
111 nside = pexConfig.Field(
112 doc="Mapping healpix nside. Must be power of 2.",
113 dtype=int,
114 default=32768,
115 check=_is_power_of_two,
116 )
117 nside_coverage = pexConfig.Field(
118 doc="HealSparse coverage map nside. Must be power of 2.",
119 dtype=int,
120 default=256,
121 check=_is_power_of_two,
122 )
123 bad_mask_min_coverage = pexConfig.Field(
124 doc=("Minimum area fraction of a map healpixel pixel that must be "
125 "covered by bad pixels to be removed from the input map. "
126 "This is approximate."),
127 dtype=float,
128 default=0.5,
129 )
130
131
132class HealSparseInputMapTask(pipeBase.Task):
133 """Task for making a HealSparse input map."""
134
135 ConfigClass = HealSparseInputMapConfig
136 _DefaultName = "healSparseInputMap"
137
138 def __init__(self, **kwargs):
139 pipeBase.Task.__init__(self, **kwargs)
140
141 self.ccd_input_map = None
142
143 def build_ccd_input_map(self, bbox, wcs, ccds):
144 """Build a map from ccd valid polygons or bounding boxes.
145
146 Parameters
147 ----------
148 bbox : `lsst.geom.Box2I`
149 Bounding box for region to build input map.
151 WCS object for region to build input map.
153 Exposure catalog with ccd data from coadd inputs.
154 """
155 with warnings.catch_warnings():
156 # Healsparse will emit a warning if nside coverage is greater than
157 # 128. In the case of generating patch input maps, and not global
158 # maps, high nside coverage works fine, so we can suppress this
159 # warning.
160 warnings.simplefilter("ignore")
161 self.ccd_input_map = hsp.HealSparseMap.make_empty(nside_coverage=self.config.nside_coverage,
162 nside_sparse=self.config.nside,
163 dtype=hsp.WIDE_MASK,
164 wide_mask_maxbits=len(ccds))
165 self._wcs = wcs
166 self._bbox = bbox
167 self._ccds = ccds
168
169 pixel_scale = wcs.getPixelScale().asArcseconds()
170 hpix_area_arcsec2 = hpg.nside_to_pixel_area(self.config.nside, degrees=True)*(3600.**2.)
171 self._min_bad = self.config.bad_mask_min_coverage*hpix_area_arcsec2/(pixel_scale**2.)
172
173 metadata = {}
174 self._bits_per_visit_ccd = {}
175 self._bits_per_visit = defaultdict(list)
176 for bit, ccd_row in enumerate(ccds):
177 metadata[f"B{bit:04d}CCD"] = ccd_row["ccd"]
178 metadata[f"B{bit:04d}VIS"] = ccd_row["visit"]
179 metadata[f"B{bit:04d}WT"] = ccd_row["weight"]
180
181 self._bits_per_visit_ccd[(ccd_row["visit"], ccd_row["ccd"])] = bit
182 self._bits_per_visit[ccd_row["visit"]].append(bit)
183
184 ccd_poly = ccd_row.getValidPolygon()
185 if ccd_poly is None:
186 ccd_poly = afwGeom.Polygon(lsst.geom.Box2D(ccd_row.getBBox()))
187 # Detectors need to be rendered with their own wcs.
188 ccd_poly_radec = self._pixels_to_radec(ccd_row.getWcs(), ccd_poly.convexHull().getVertices())
189
190 # Create a ccd healsparse polygon
191 poly = hsp.Polygon(ra=ccd_poly_radec[: -1, 0],
192 dec=ccd_poly_radec[: -1, 1],
193 value=[bit])
194 self.ccd_input_map.set_bits_pix(poly.get_pixels(nside=self.ccd_input_map.nside_sparse),
195 [bit])
196
197 # Cut down to the overall bounding box with associated wcs.
198 bbox_afw_poly = afwGeom.Polygon(lsst.geom.Box2D(bbox))
199 bbox_poly_radec = self._pixels_to_radec(self._wcs,
200 bbox_afw_poly.convexHull().getVertices())
201 bbox_poly = hsp.Polygon(ra=bbox_poly_radec[: -1, 0], dec=bbox_poly_radec[: -1, 1],
202 value=np.arange(self.ccd_input_map.wide_mask_maxbits))
203 bbox_poly_map = bbox_poly.get_map_like(self.ccd_input_map)
204 self.ccd_input_map = hsp.and_intersection([self.ccd_input_map, bbox_poly_map])
205 self.ccd_input_map.metadata = metadata
206
207 # Create a temporary map to hold the count of bad pixels in each healpix pixel
208 self._ccd_input_pixels = self.ccd_input_map.valid_pixels
209
210 dtype = [(f"v{visit}", np.int64) for visit in self._bits_per_visit.keys()]
211
212 with warnings.catch_warnings():
213 # Healsparse will emit a warning if nside coverage is greater than
214 # 128. In the case of generating patch input maps, and not global
215 # maps, high nside coverage works fine, so we can suppress this
216 # warning.
217 warnings.simplefilter("ignore")
218 self._ccd_input_bad_count_map = hsp.HealSparseMap.make_empty(
219 nside_coverage=self.config.nside_coverage,
220 nside_sparse=self.config.nside,
221 dtype=dtype,
222 primary=dtype[0][0])
223
224 # Don't set input bad map if there are no ccds which overlap the bbox.
225 if len(self._ccd_input_pixels) > 0:
226 self._ccd_input_bad_count_map[self._ccd_input_pixels] = np.zeros(1, dtype=dtype)
227
228 def mask_warp_bbox(self, bbox, visit, mask, bit_mask_value):
229 """Mask a subregion from a visit.
230 This must be run after build_ccd_input_map initializes
231 the overall map.
232
233 Parameters
234 ----------
235 bbox : `lsst.geom.Box2I`
236 Bounding box from region to mask.
237 visit : `int`
238 Visit number corresponding to warp with mask.
239 mask : `lsst.afw.image.MaskX`
240 Mask plane from warp exposure.
241 bit_mask_value : `int`
242 Bit mask to check for bad pixels.
243
244 Raises
245 ------
246 RuntimeError : Raised if build_ccd_input_map was not run first.
247 """
248 if self.ccd_input_map is None:
249 raise RuntimeError("Must run build_ccd_input_map before mask_warp_bbox")
250
251 # Find the bad pixels and convert to healpix
252 bad_pixels = np.where(mask.array & bit_mask_value)
253 if len(bad_pixels[0]) == 0:
254 # No bad pixels
255 return
256
257 # Bad pixels come from warps which use the overall wcs.
258 bad_ra, bad_dec = self._wcs.pixelToSkyArray(bad_pixels[1].astype(np.float64),
259 bad_pixels[0].astype(np.float64),
260 degrees=True)
261 bad_hpix = hpg.angle_to_pixel(self.config.nside, bad_ra, bad_dec)
262
263 # Count the number of bad image pixels in each healpix pixel
264 min_bad_hpix = bad_hpix.min()
265 bad_hpix_count = np.zeros(bad_hpix.max() - min_bad_hpix + 1, dtype=np.int32)
266 np.add.at(bad_hpix_count, bad_hpix - min_bad_hpix, 1)
267
268 # Add these to the accumulator map.
269 # We need to make sure that the "primary" array has valid values for
270 # this pixel to be registered in the accumulator map.
271 pix_to_add, = np.where(bad_hpix_count > 0)
272 count_map_arr = self._ccd_input_bad_count_map[min_bad_hpix + pix_to_add]
273 primary = self._ccd_input_bad_count_map.primary
274 count_map_arr[primary] = np.clip(count_map_arr[primary], 0, None)
275
276 count_map_arr[f"v{visit}"] = np.clip(count_map_arr[f"v{visit}"], 0, None)
277 count_map_arr[f"v{visit}"] += bad_hpix_count[pix_to_add]
278
279 self._ccd_input_bad_count_map[min_bad_hpix + pix_to_add] = count_map_arr
280
282 """Use accumulated mask information to finalize the masking of
283 ccd_input_map.
284
285 Raises
286 ------
287 RuntimeError : Raised if build_ccd_input_map was not run first.
288 """
289 if self.ccd_input_map is None:
290 raise RuntimeError("Must run build_ccd_input_map before finalize_ccd_input_map_mask.")
291
292 count_map_arr = self._ccd_input_bad_count_map[self._ccd_input_pixels]
293 for visit in self._bits_per_visit:
294 to_mask, = np.where(count_map_arr[f"v{visit}"] > self._min_bad)
295 if to_mask.size == 0:
296 continue
297 self.ccd_input_map.clear_bits_pix(self._ccd_input_pixels[to_mask],
298 self._bits_per_visit[visit])
299
300 # Clear memory
301 self._ccd_input_bad_count_map = None
302
303 def _pixels_to_radec(self, wcs, pixels):
304 """Convert pixels to ra/dec positions using a wcs.
305
306 Parameters
307 ----------
309 WCS object.
310 pixels : `list` [`lsst.geom.Point2D`]
311 List of pixels to convert.
312
313 Returns
314 -------
315 radec : `numpy.ndarray`
316 Nx2 array of ra/dec positions associated with pixels.
317 """
318 sph_pts = wcs.pixelToSky(pixels)
319 return np.array([(sph.getRa().asDegrees(), sph.getDec().asDegrees())
320 for sph in sph_pts])
321
322
323class HealSparsePropertyMapConnections(pipeBase.PipelineTaskConnections,
324 dimensions=("tract", "band", "skymap",),
325 defaultTemplates={"coaddName": "deep",
326 "calexpType": ""}):
327 input_maps = pipeBase.connectionTypes.Input(
328 doc="Healsparse bit-wise coadd input maps",
329 name="{coaddName}Coadd_inputMap",
330 storageClass="HealSparseMap",
331 dimensions=("tract", "patch", "skymap", "band"),
332 multiple=True,
333 deferLoad=True,
334 )
335 coadd_exposures = pipeBase.connectionTypes.Input(
336 doc="Coadded exposures associated with input_maps",
337 name="{coaddName}Coadd",
338 storageClass="ExposureF",
339 dimensions=("tract", "patch", "skymap", "band"),
340 multiple=True,
341 deferLoad=True,
342 )
343 visit_summaries = pipeBase.connectionTypes.Input(
344 doc="Visit summary tables with aggregated statistics",
345 name="{calexpType}visitSummary",
346 storageClass="ExposureCatalog",
347 dimensions=("instrument", "visit"),
348 multiple=True,
349 deferLoad=True,
350 )
351 sky_map = pipeBase.connectionTypes.Input(
352 doc="Input definition of geometry/bbox and projection/wcs for coadded exposures",
353 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
354 storageClass="SkyMap",
355 dimensions=("skymap",),
356 )
357
358 # Create output connections for all possible maps defined in the
359 # registry. The vars() trick used here allows us to set class attributes
360 # programatically. Taken from
361 # https://stackoverflow.com/questions/2519807/
362 # setting-a-class-attribute-with-a-given-name-in-python-while-defining-the-class
363 for name in BasePropertyMap.registry:
364 vars()[f"{name}_map_min"] = pipeBase.connectionTypes.Output(
365 doc=f"Minimum-value map of {name}",
366 name=f"{{coaddName}}Coadd_{name}_map_min",
367 storageClass="HealSparseMap",
368 dimensions=("tract", "skymap", "band"),
369 )
370 vars()[f"{name}_map_max"] = pipeBase.connectionTypes.Output(
371 doc=f"Maximum-value map of {name}",
372 name=f"{{coaddName}}Coadd_{name}_map_max",
373 storageClass="HealSparseMap",
374 dimensions=("tract", "skymap", "band"),
375 )
376 vars()[f"{name}_map_mean"] = pipeBase.connectionTypes.Output(
377 doc=f"Mean-value map of {name}",
378 name=f"{{coaddName}}Coadd_{name}_map_mean",
379 storageClass="HealSparseMap",
380 dimensions=("tract", "skymap", "band"),
381 )
382 vars()[f"{name}_map_weighted_mean"] = pipeBase.connectionTypes.Output(
383 doc=f"Weighted mean-value map of {name}",
384 name=f"{{coaddName}}Coadd_{name}_map_weighted_mean",
385 storageClass="HealSparseMap",
386 dimensions=("tract", "skymap", "band"),
387 )
388 vars()[f"{name}_map_sum"] = pipeBase.connectionTypes.Output(
389 doc=f"Sum-value map of {name}",
390 name=f"{{coaddName}}Coadd_{name}_map_sum",
391 storageClass="HealSparseMap",
392 dimensions=("tract", "skymap", "band"),
393 )
394
395 def __init__(self, *, config=None):
396 super().__init__(config=config)
397
398 # Not all possible maps in the registry will be configured to run.
399 # Here we remove the unused connections.
400 for name in BasePropertyMap.registry:
401 if name not in config.property_maps:
402 prop_config = BasePropertyMapConfig()
403 prop_config.do_min = False
404 prop_config.do_max = False
405 prop_config.do_mean = False
406 prop_config.do_weighted_mean = False
407 prop_config.do_sum = False
408 else:
409 prop_config = config.property_maps[name]
410
411 if not prop_config.do_min:
412 self.outputs.remove(f"{name}_map_min")
413 if not prop_config.do_max:
414 self.outputs.remove(f"{name}_map_max")
415 if not prop_config.do_mean:
416 self.outputs.remove(f"{name}_map_mean")
417 if not prop_config.do_weighted_mean:
418 self.outputs.remove(f"{name}_map_weighted_mean")
419 if not prop_config.do_sum:
420 self.outputs.remove(f"{name}_map_sum")
421
422
423class HealSparsePropertyMapConfig(pipeBase.PipelineTaskConfig,
424 pipelineConnections=HealSparsePropertyMapConnections):
425 """Configuration parameters for HealSparsePropertyMapTask"""
426 property_maps = BasePropertyMap.registry.makeField(
427 multi=True,
428 default=["exposure_time",
429 "psf_size",
430 "psf_e1",
431 "psf_e2",
432 "psf_maglim",
433 "sky_noise",
434 "sky_background",
435 "dcr_dra",
436 "dcr_ddec",
437 "dcr_e1",
438 "dcr_e2",
439 "epoch"],
440 doc="Property map computation objects",
441 )
442
443 def setDefaults(self):
444 self.property_maps["exposure_time"].do_sum = True
445 self.property_maps["psf_size"].do_weighted_mean = True
446 self.property_maps["psf_e1"].do_weighted_mean = True
447 self.property_maps["psf_e2"].do_weighted_mean = True
448 self.property_maps["psf_maglim"].do_weighted_mean = True
449 self.property_maps["sky_noise"].do_weighted_mean = True
450 self.property_maps["sky_background"].do_weighted_mean = True
451 self.property_maps["dcr_dra"].do_weighted_mean = True
452 self.property_maps["dcr_ddec"].do_weighted_mean = True
453 self.property_maps["dcr_e1"].do_weighted_mean = True
454 self.property_maps["dcr_e2"].do_weighted_mean = True
455 self.property_maps["epoch"].do_mean = True
456 self.property_maps["epoch"].do_min = True
457 self.property_maps["epoch"].do_max = True
458
459
460class HealSparsePropertyMapTask(pipeBase.PipelineTask):
461 """Task to compute Healsparse property maps.
462
463 This task will compute individual property maps (per tract, per
464 map type, per band). These maps cover the full coadd tract, and
465 are not truncated to the inner tract region.
466 """
467 ConfigClass = HealSparsePropertyMapConfig
468 _DefaultName = "healSparsePropertyMapTask"
469
470 def __init__(self, **kwargs):
471 super().__init__(**kwargs)
472 self.property_maps = PropertyMapMap()
473 for name, config, PropertyMapClass in self.config.property_maps.apply():
474 self.property_maps[name] = PropertyMapClass(config, name)
475
476 @timeMethod
477 def runQuantum(self, butlerQC, inputRefs, outputRefs):
478 inputs = butlerQC.get(inputRefs)
479
480 sky_map = inputs.pop("sky_map")
481
482 tract = butlerQC.quantum.dataId["tract"]
483 band = butlerQC.quantum.dataId["band"]
484
485 input_map_dict = {ref.dataId["patch"]: ref for ref in inputs["input_maps"]}
486 coadd_dict = {ref.dataId["patch"]: ref for ref in inputs["coadd_exposures"]}
487
488 visit_summary_dict = {ref.dataId["visit"]: ref.get()
489 for ref in inputs["visit_summaries"]}
490
491 self.run(sky_map, tract, band, coadd_dict, input_map_dict, visit_summary_dict)
492
493 # Write the outputs
494 for name, property_map in self.property_maps.items():
495 if property_map.config.do_min:
496 butlerQC.put(property_map.min_map,
497 getattr(outputRefs, f"{name}_map_min"))
498 if property_map.config.do_max:
499 butlerQC.put(property_map.max_map,
500 getattr(outputRefs, f"{name}_map_max"))
501 if property_map.config.do_mean:
502 butlerQC.put(property_map.mean_map,
503 getattr(outputRefs, f"{name}_map_mean"))
504 if property_map.config.do_weighted_mean:
505 butlerQC.put(property_map.weighted_mean_map,
506 getattr(outputRefs, f"{name}_map_weighted_mean"))
507 if property_map.config.do_sum:
508 butlerQC.put(property_map.sum_map,
509 getattr(outputRefs, f"{name}_map_sum"))
510
511 def run(self, sky_map, tract, band, coadd_dict, input_map_dict, visit_summary_dict):
512 """Run the healsparse property task.
513
514 Parameters
515 ----------
516 sky_map : Sky map object
517 tract : `int`
518 Tract number.
519 band : `str`
520 Band name for logging.
521 coadd_dict : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]
522 Dictionary of coadd exposure references. Keys are patch numbers.
523 input_map_dict : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]
524 Dictionary of input map references. Keys are patch numbers.
525 visit_summary_dict : `dict` [`int`: `lsst.afw.table.ExposureCatalog`]
526 Dictionary of visit summary tables. Keys are visit numbers.
527
528 Raises
529 ------
530 RepeatableQuantumError
531 If visit_summary_dict is missing any visits or detectors found in an
532 input map. This leads to an inconsistency between what is in the coadd
533 (via the input map) and the visit summary tables which contain data
534 to compute the maps.
535 """
536 tract_info = sky_map[tract]
537
538 tract_maps_initialized = False
539
540 for patch in input_map_dict.keys():
541 self.log.info("Making maps for band %s, tract %d, patch %d.",
542 band, tract, patch)
543
544 patch_info = tract_info[patch]
545
546 input_map = input_map_dict[patch].get()
547 coadd_photo_calib = coadd_dict[patch].get(component="photoCalib")
548 coadd_inputs = coadd_dict[patch].get(component="coaddInputs")
549
550 coadd_zeropoint = 2.5*np.log10(coadd_photo_calib.getInstFluxAtZeroMagnitude())
551
552 # Crop input_map to the inner polygon of the patch
553 poly_vertices = patch_info.getInnerSkyPolygon(tract_info.getWcs()).getVertices()
554 patch_radec = self._vertices_to_radec(poly_vertices)
555 patch_poly = hsp.Polygon(ra=patch_radec[:, 0], dec=patch_radec[:, 1],
556 value=np.arange(input_map.wide_mask_maxbits))
557 with warnings.catch_warnings():
558 # Healsparse will emit a warning if nside coverage is greater than
559 # 128. In the case of generating patch input maps, and not global
560 # maps, high nside coverage works fine, so we can suppress this
561 # warning.
562 warnings.simplefilter("ignore")
563 patch_poly_map = patch_poly.get_map_like(input_map)
564 input_map = hsp.and_intersection([input_map, patch_poly_map])
565
566 if not tract_maps_initialized:
567 # We use the first input map nside information to initialize
568 # the tract maps
569 nside_coverage = self._compute_nside_coverage_tract(tract_info)
570 nside = input_map.nside_sparse
571
572 do_compute_approx_psf = False
573 # Initialize the tract maps
574 for property_map in self.property_maps:
575 property_map.initialize_tract_maps(nside_coverage, nside)
576 if property_map.requires_psf:
577 do_compute_approx_psf = True
578
579 tract_maps_initialized = True
580
581 valid_pixels, vpix_ra, vpix_dec = input_map.valid_pixels_pos(return_pixels=True)
582
583 # Check if there are no valid pixels for the inner (unique) patch region
584 if valid_pixels.size == 0:
585 continue
586
587 # Initialize the value accumulators
588 for property_map in self.property_maps:
589 property_map.initialize_values(valid_pixels.size)
590 property_map.zeropoint = coadd_zeropoint
591
592 # Initialize the weight and counter accumulators
593 total_weights = np.zeros(valid_pixels.size)
594 total_inputs = np.zeros(valid_pixels.size, dtype=np.int32)
595
596 for bit, ccd_row in enumerate(coadd_inputs.ccds):
597 # Which pixels in the map are used by this visit/detector
598 inmap, = np.where(input_map.check_bits_pix(valid_pixels, [bit]))
599
600 # Check if there are any valid pixels in the map from this deteector.
601 if inmap.size == 0:
602 continue
603
604 # visit, detector_id, weight = input_dict[bit]
605 visit = ccd_row["visit"]
606 detector_id = ccd_row["ccd"]
607 weight = ccd_row["weight"]
608
609 x, y = ccd_row.getWcs().skyToPixelArray(vpix_ra[inmap], vpix_dec[inmap], degrees=True)
610 scalings = self._compute_calib_scale(ccd_row, x, y)
611
612 if do_compute_approx_psf:
613 psf_array = compute_approx_psf_size_and_shape(ccd_row, vpix_ra[inmap], vpix_dec[inmap])
614 else:
615 psf_array = None
616
617 total_weights[inmap] += weight
618 total_inputs[inmap] += 1
619
620 # Retrieve the correct visitSummary row
621 if visit not in visit_summary_dict:
622 msg = f"Visit {visit} not found in visit_summaries."
623 raise pipeBase.RepeatableQuantumError(msg)
624 row = visit_summary_dict[visit].find(detector_id)
625 if row is None:
626 msg = f"Visit {visit} / detector_id {detector_id} not found in visit_summaries."
627 raise pipeBase.RepeatableQuantumError(msg)
628
629 # Accumulate the values
630 for property_map in self.property_maps:
631 property_map.accumulate_values(inmap,
632 vpix_ra[inmap],
633 vpix_dec[inmap],
634 weight,
635 scalings,
636 row,
637 psf_array=psf_array)
638
639 # Finalize the mean values and set the tract maps
640 for property_map in self.property_maps:
641 property_map.finalize_mean_values(total_weights, total_inputs)
642 property_map.set_map_values(valid_pixels)
643
644 def _compute_calib_scale(self, ccd_row, x, y):
645 """Compute calibration scaling values.
646
647 Parameters
648 ----------
650 Exposure metadata for a given detector exposure.
651 x : `np.ndarray`
652 Array of x positions.
653 y : `np.ndarray`
654 Array of y positions.
655
656 Returns
657 -------
658 calib_scale : `np.ndarray`
659 Array of calibration scale values.
660 """
661 photo_calib = ccd_row.getPhotoCalib()
662 bf = photo_calib.computeScaledCalibration()
663 if bf.getBBox() == ccd_row.getBBox():
664 # Track variable calibration over the detector
665 calib_scale = photo_calib.getCalibrationMean()*bf.evaluate(x, y)
666 else:
667 # Spatially constant calibration
668 calib_scale = photo_calib.getCalibrationMean()
669
670 return calib_scale
671
672 def _vertices_to_radec(self, vertices):
673 """Convert polygon vertices to ra/dec.
674
675 Parameters
676 ----------
677 vertices : `list` [ `lsst.sphgeom.UnitVector3d` ]
678 Vertices for bounding polygon.
679
680 Returns
681 -------
682 radec : `numpy.ndarray`
683 Nx2 array of ra/dec positions (in degrees) associated with vertices.
684 """
685 lonlats = [lsst.sphgeom.LonLat(x) for x in vertices]
686 radec = np.array([(x.getLon().asDegrees(), x.getLat().asDegrees()) for
687 x in lonlats])
688 return radec
689
690 def _compute_nside_coverage_tract(self, tract_info):
691 """Compute the optimal coverage nside for a tract.
692
693 Parameters
694 ----------
696 Tract information object.
697
698 Returns
699 -------
700 nside_coverage : `int`
701 Optimal coverage nside for a tract map.
702 """
703 num_patches = tract_info.getNumPatches()
704
705 # Compute approximate patch area
706 patch_info = tract_info.getPatchInfo(0)
707 vertices = patch_info.getInnerSkyPolygon(tract_info.getWcs()).getVertices()
708 radec = self._vertices_to_radec(vertices)
709 delta_ra = np.max(radec[:, 0]) - np.min(radec[:, 0])
710 delta_dec = np.max(radec[:, 1]) - np.min(radec[:, 1])
711 patch_area = delta_ra*delta_dec*np.cos(np.deg2rad(np.mean(radec[:, 1])))
712
713 tract_area = num_patches[0]*num_patches[1]*patch_area
714 # Start with a fairly low nside and increase until we find the approximate area.
715 nside_coverage_tract = 32
716 while hpg.nside_to_pixel_area(nside_coverage_tract, degrees=True) > tract_area:
717 nside_coverage_tract = 2*nside_coverage_tract
718 # Step back one, but don't go bigger pixels than nside=32 or smaller
719 # than 128 (recommended by healsparse).
720 nside_coverage_tract = int(np.clip(nside_coverage_tract/2, 32, 128))
721
722 return nside_coverage_tract
723
724
725class ConsolidateHealSparsePropertyMapConnections(pipeBase.PipelineTaskConnections,
726 dimensions=("band", "skymap",),
727 defaultTemplates={"coaddName": "deep"}):
728 sky_map = pipeBase.connectionTypes.Input(
729 doc="Input definition of geometry/bbox and projection/wcs for coadded exposures",
730 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
731 storageClass="SkyMap",
732 dimensions=("skymap",),
733 )
734
735 # Create output connections for all possible maps defined in the
736 # registry. The vars() trick used here allows us to set class attributes
737 # programatically. Taken from
738 # https://stackoverflow.com/questions/2519807/
739 # setting-a-class-attribute-with-a-given-name-in-python-while-defining-the-class
740 for name in BasePropertyMap.registry:
741 vars()[f"{name}_map_min"] = pipeBase.connectionTypes.Input(
742 doc=f"Minimum-value map of {name}",
743 name=f"{{coaddName}}Coadd_{name}_map_min",
744 storageClass="HealSparseMap",
745 dimensions=("tract", "skymap", "band"),
746 multiple=True,
747 deferLoad=True,
748 )
749 vars()[f"{name}_consolidated_map_min"] = pipeBase.connectionTypes.Output(
750 doc=f"Minumum-value map of {name}",
751 name=f"{{coaddName}}Coadd_{name}_consolidated_map_min",
752 storageClass="HealSparseMap",
753 dimensions=("skymap", "band"),
754 )
755 vars()[f"{name}_map_max"] = pipeBase.connectionTypes.Input(
756 doc=f"Maximum-value map of {name}",
757 name=f"{{coaddName}}Coadd_{name}_map_max",
758 storageClass="HealSparseMap",
759 dimensions=("tract", "skymap", "band"),
760 multiple=True,
761 deferLoad=True,
762 )
763 vars()[f"{name}_consolidated_map_max"] = pipeBase.connectionTypes.Output(
764 doc=f"Minumum-value map of {name}",
765 name=f"{{coaddName}}Coadd_{name}_consolidated_map_max",
766 storageClass="HealSparseMap",
767 dimensions=("skymap", "band"),
768 )
769 vars()[f"{name}_map_mean"] = pipeBase.connectionTypes.Input(
770 doc=f"Mean-value map of {name}",
771 name=f"{{coaddName}}Coadd_{name}_map_mean",
772 storageClass="HealSparseMap",
773 dimensions=("tract", "skymap", "band"),
774 multiple=True,
775 deferLoad=True,
776 )
777 vars()[f"{name}_consolidated_map_mean"] = pipeBase.connectionTypes.Output(
778 doc=f"Minumum-value map of {name}",
779 name=f"{{coaddName}}Coadd_{name}_consolidated_map_mean",
780 storageClass="HealSparseMap",
781 dimensions=("skymap", "band"),
782 )
783 vars()[f"{name}_map_weighted_mean"] = pipeBase.connectionTypes.Input(
784 doc=f"Weighted mean-value map of {name}",
785 name=f"{{coaddName}}Coadd_{name}_map_weighted_mean",
786 storageClass="HealSparseMap",
787 dimensions=("tract", "skymap", "band"),
788 multiple=True,
789 deferLoad=True,
790 )
791 vars()[f"{name}_consolidated_map_weighted_mean"] = pipeBase.connectionTypes.Output(
792 doc=f"Minumum-value map of {name}",
793 name=f"{{coaddName}}Coadd_{name}_consolidated_map_weighted_mean",
794 storageClass="HealSparseMap",
795 dimensions=("skymap", "band"),
796 )
797 vars()[f"{name}_map_sum"] = pipeBase.connectionTypes.Input(
798 doc=f"Sum-value map of {name}",
799 name=f"{{coaddName}}Coadd_{name}_map_sum",
800 storageClass="HealSparseMap",
801 dimensions=("tract", "skymap", "band"),
802 multiple=True,
803 deferLoad=True,
804 )
805 vars()[f"{name}_consolidated_map_sum"] = pipeBase.connectionTypes.Output(
806 doc=f"Minumum-value map of {name}",
807 name=f"{{coaddName}}Coadd_{name}_consolidated_map_sum",
808 storageClass="HealSparseMap",
809 dimensions=("skymap", "band"),
810 )
811
812 def __init__(self, *, config=None):
813 super().__init__(config=config)
814
815 # Not all possible maps in the registry will be configured to run.
816 # Here we remove the unused connections.
817 for name in BasePropertyMap.registry:
818 if name not in config.property_maps:
819 prop_config = BasePropertyMapConfig()
820 prop_config.do_min = False
821 prop_config.do_max = False
822 prop_config.do_mean = False
823 prop_config.do_weighted_mean = False
824 prop_config.do_sum = False
825 else:
826 prop_config = config.property_maps[name]
827
828 if not prop_config.do_min:
829 self.inputs.remove(f"{name}_map_min")
830 self.outputs.remove(f"{name}_consolidated_map_min")
831 if not prop_config.do_max:
832 self.inputs.remove(f"{name}_map_max")
833 self.outputs.remove(f"{name}_consolidated_map_max")
834 if not prop_config.do_mean:
835 self.inputs.remove(f"{name}_map_mean")
836 self.outputs.remove(f"{name}_consolidated_map_mean")
837 if not prop_config.do_weighted_mean:
838 self.inputs.remove(f"{name}_map_weighted_mean")
839 self.outputs.remove(f"{name}_consolidated_map_weighted_mean")
840 if not prop_config.do_sum:
841 self.inputs.remove(f"{name}_map_sum")
842 self.outputs.remove(f"{name}_consolidated_map_sum")
843
844
845class ConsolidateHealSparsePropertyMapConfig(pipeBase.PipelineTaskConfig,
846 pipelineConnections=ConsolidateHealSparsePropertyMapConnections):
847 """Configuration parameters for ConsolidateHealSparsePropertyMapTask"""
848 property_maps = BasePropertyMap.registry.makeField(
849 multi=True,
850 default=["exposure_time",
851 "psf_size",
852 "psf_e1",
853 "psf_e2",
854 "psf_maglim",
855 "sky_noise",
856 "sky_background",
857 "dcr_dra",
858 "dcr_ddec",
859 "dcr_e1",
860 "dcr_e2",
861 "epoch"],
862 doc="Property map computation objects",
863 )
864 nside_coverage = pexConfig.Field(
865 doc="Consolidated HealSparse coverage map nside. Must be power of 2.",
866 dtype=int,
867 default=32,
868 check=_is_power_of_two,
869 )
870
871 def setDefaults(self):
872 self.property_maps["exposure_time"].do_sum = True
873 self.property_maps["psf_size"].do_weighted_mean = True
874 self.property_maps["psf_e1"].do_weighted_mean = True
875 self.property_maps["psf_e2"].do_weighted_mean = True
876 self.property_maps["psf_maglim"].do_weighted_mean = True
877 self.property_maps["sky_noise"].do_weighted_mean = True
878 self.property_maps["sky_background"].do_weighted_mean = True
879 self.property_maps["dcr_dra"].do_weighted_mean = True
880 self.property_maps["dcr_ddec"].do_weighted_mean = True
881 self.property_maps["dcr_e1"].do_weighted_mean = True
882 self.property_maps["dcr_e2"].do_weighted_mean = True
883 self.property_maps["epoch"].do_mean = True
884 self.property_maps["epoch"].do_min = True
885 self.property_maps["epoch"].do_max = True
886
887
888class ConsolidateHealSparsePropertyMapTask(pipeBase.PipelineTask):
889 """Task to consolidate HealSparse property maps.
890
891 This task will take all the individual tract-based maps (per map type,
892 per band) and consolidate them into one survey-wide map (per map type,
893 per band). Each tract map is truncated to its inner region before
894 consolidation.
895 """
896 ConfigClass = ConsolidateHealSparsePropertyMapConfig
897 _DefaultName = "consolidateHealSparsePropertyMapTask"
898
899 def __init__(self, **kwargs):
900 super().__init__(**kwargs)
901 self.property_maps = PropertyMapMap()
902 for name, config, PropertyMapClass in self.config.property_maps.apply():
903 self.property_maps[name] = PropertyMapClass(config, name)
904
905 @timeMethod
906 def runQuantum(self, butlerQC, inputRefs, outputRefs):
907 inputs = butlerQC.get(inputRefs)
908
909 sky_map = inputs.pop("sky_map")
910
911 # These need to be consolidated one at a time to conserve memory.
912 for name in self.config.property_maps.names:
913 for type_ in ['min', 'max', 'mean', 'weighted_mean', 'sum']:
914 map_type = f"{name}_map_{type_}"
915 if map_type in inputs:
916 input_refs = {ref.dataId['tract']: ref
917 for ref in inputs[map_type]}
918 consolidated_map = self.consolidate_map(sky_map, input_refs)
919 butlerQC.put(consolidated_map,
920 getattr(outputRefs, f"{name}_consolidated_map_{type_}"))
921
922 def consolidate_map(self, sky_map, input_refs):
923 """Consolidate the healsparse property maps.
924
925 Parameters
926 ----------
927 sky_map : Sky map object
928 input_refs : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]
929 Dictionary of tract_id mapping to dataref.
930
931 Returns
932 -------
933 consolidated_map : `healsparse.HealSparseMap`
934 Consolidated HealSparse map.
935 """
936 # First, we read in the coverage maps to know how much memory
937 # to allocate
938 cov_mask = None
939 nside_coverage_inputs = None
940 for tract_id in input_refs:
941 cov = input_refs[tract_id].get(component='coverage')
942 if cov_mask is None:
943 cov_mask = cov.coverage_mask
944 nside_coverage_inputs = cov.nside_coverage
945 else:
946 cov_mask |= cov.coverage_mask
947
948 cov_pix_inputs, = np.where(cov_mask)
949
950 # Compute the coverage pixels for the desired nside_coverage
951 if nside_coverage_inputs == self.config.nside_coverage:
952 cov_pix = cov_pix_inputs
953 elif nside_coverage_inputs > self.config.nside_coverage:
954 # Converting from higher resolution coverage to lower
955 # resolution coverage.
956 bit_shift = hsp.utils._compute_bitshift(self.config.nside_coverage,
957 nside_coverage_inputs)
958 cov_pix = np.right_shift(cov_pix_inputs, bit_shift)
959 else:
960 # Converting from lower resolution coverage to higher
961 # resolution coverage.
962 bit_shift = hsp.utils._compute_bitshift(nside_coverage_inputs,
963 self.config.nside_coverage)
964 cov_pix = np.left_shift(cov_pix_inputs, bit_shift)
965
966 # Now read in each tract map and build the consolidated map.
967 consolidated_map = None
968 for tract_id in input_refs:
969 input_map = input_refs[tract_id].get()
970 if consolidated_map is None:
971 consolidated_map = hsp.HealSparseMap.make_empty(
972 self.config.nside_coverage,
973 input_map.nside_sparse,
974 input_map.dtype,
975 sentinel=input_map._sentinel,
976 cov_pixels=cov_pix)
977
978 # Only use pixels that are properly inside the tract.
979 vpix, ra, dec = input_map.valid_pixels_pos(return_pixels=True)
980 vpix_tract_ids = sky_map.findTractIdArray(ra, dec, degrees=True)
981
982 in_tract = (vpix_tract_ids == tract_id)
983
984 consolidated_map[vpix[in_tract]] = input_map[vpix[in_tract]]
985
986 return consolidated_map
std::vector< SchemaItem< Flag > > * items
A 2-dimensional celestial WCS that transform pixels to ICRS RA/Dec, using the LSST standard for pixel...
Definition: SkyWcs.h:117
Cartesian polygons.
Definition: Polygon.h:59
Custom catalog class for ExposureRecord/Table.
Definition: Exposure.h:311
Record class used to store exposure metadata.
Definition: Exposure.h:79
A floating-point coordinate rectangle geometry.
Definition: Box.h:413
An integer coordinate rectangle.
Definition: Box.h:55
def mask_warp_bbox(self, bbox, visit, mask, bit_mask_value)
LonLat represents a spherical coordinate (longitude/latitude angle) pair.
Definition: LonLat.h:48
UnitVector3d is a unit vector in ℝ³ with components stored in double precision.
Definition: UnitVector3d.h:55