LSST Applications  21.0.0-172-gfb10e10a+18fedfabac,22.0.0+297cba6710,22.0.0+80564b0ff1,22.0.0+8d77f4f51a,22.0.0+a28f4c53b1,22.0.0+dcf3732eb2,22.0.1-1-g7d6de66+2a20fdde0d,22.0.1-1-g8e32f31+297cba6710,22.0.1-1-geca5380+7fa3b7d9b6,22.0.1-12-g44dc1dc+2a20fdde0d,22.0.1-15-g6a90155+515f58c32b,22.0.1-16-g9282f48+790f5f2caa,22.0.1-2-g92698f7+dcf3732eb2,22.0.1-2-ga9b0f51+7fa3b7d9b6,22.0.1-2-gd1925c9+bf4f0e694f,22.0.1-24-g1ad7a390+a9625a72a8,22.0.1-25-g5bf6245+3ad8ecd50b,22.0.1-25-gb120d7b+8b5510f75f,22.0.1-27-g97737f7+2a20fdde0d,22.0.1-32-gf62ce7b1+aa4237961e,22.0.1-4-g0b3f228+2a20fdde0d,22.0.1-4-g243d05b+871c1b8305,22.0.1-4-g3a563be+32dcf1063f,22.0.1-4-g44f2e3d+9e4ab0f4fa,22.0.1-42-gca6935d93+ba5e5ca3eb,22.0.1-5-g15c806e+85460ae5f3,22.0.1-5-g58711c4+611d128589,22.0.1-5-g75bb458+99c117b92f,22.0.1-6-g1c63a23+7fa3b7d9b6,22.0.1-6-g50866e6+84ff5a128b,22.0.1-6-g8d3140d+720564cf76,22.0.1-6-gd805d02+cc5644f571,22.0.1-8-ge5750ce+85460ae5f3,master-g6e05de7fdc+babf819c66,master-g99da0e417a+8d77f4f51a,w.2021.48
LSST Data Management Base Package
healSparseMapping.py
Go to the documentation of this file.
1 #
2 # LSST Data Management System
3 # Copyright 2008-2021 AURA/LSST.
4 #
5 # This product includes software developed by the
6 # LSST Project (http://www.lsst.org/).
7 #
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the LSST License Statement and
19 # the GNU General Public License along with this program. If not,
20 # see <http://www.lsstcorp.org/LegalNotices/>.
21 #
from collections import defaultdict
import numbers
import numpy as np
import healpy as hp
import healsparse as hsp

import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
import lsst.geom
import lsst.afw.geom as afwGeom
import lsst.sphgeom
from lsst.daf.butler import Formatter
from lsst.skymap import BaseSkyMap
from lsst.utils.timer import timeMethod
from .healSparseMappingProperties import (BasePropertyMap, BasePropertyMapConfig,
                                          PropertyMapMap, compute_approx_psf_size_and_shape)
37 
38 
39 __all__ = ["HealSparseInputMapTask", "HealSparseInputMapConfig",
40  "HealSparseMapFormatter", "HealSparsePropertyMapConnections",
41  "HealSparsePropertyMapConfig", "HealSparsePropertyMapTask",
42  "ConsolidateHealSparsePropertyMapConnections",
43  "ConsolidateHealSparsePropertyMapConfig",
44  "ConsolidateHealSparsePropertyMapTask"]
45 
46 
class HealSparseMapFormatter(Formatter):
    """Interface for reading and writing healsparse.HealSparseMap files."""
    # No read parameters are rejected outright; unsupported ones are ignored.
    unsupportedParameters = frozenset()
    supportedExtensions = frozenset({".hsp", ".fit", ".fits"})
    extension = '.hsp'

    def read(self, component=None):
        # Docstring inherited from Formatter.read.
        path = self.fileDescriptor.location.path

        if component == 'coverage':
            # Read only the (cheap) coverage map, not the full sparse map.
            try:
                data = hsp.HealSparseCoverage.read(path)
            except (OSError, RuntimeError) as e:
                raise ValueError(
                    f"Unable to read healsparse map with URI {self.fileDescriptor.location.uri}"
                ) from e

            return data

        # Optional read parameters: a subset of coverage pixels to read,
        # and/or an nside to degrade the map to on read.
        if self.fileDescriptor.parameters is None:
            pixels = None
            degrade_nside = None
        else:
            pixels = self.fileDescriptor.parameters.get('pixels', None)
            degrade_nside = self.fileDescriptor.parameters.get('degrade_nside', None)
        try:
            data = hsp.HealSparseMap.read(path, pixels=pixels, degrade_nside=degrade_nside)
        except (OSError, RuntimeError) as e:
            raise ValueError(
                f"Unable to read healsparse map with URI {self.fileDescriptor.location.uri}"
            ) from e

        return data

    def write(self, inMemoryDataset):
        # Docstring inherited from Formatter.write.
        # Update the location with the formatter-preferred file extension.
        # (Fixed: the mangled ``self.extensionextension`` attribute lookup,
        # which would raise AttributeError, is now ``self.extension``.)
        self.fileDescriptor.location.updateExtension(self.extension)
        inMemoryDataset.write(self.fileDescriptor.location.path, clobber=True)
83 
84 
85 def _is_power_of_two(value):
86  """Check that value is a power of two.
87 
88  Parameters
89  ----------
90  value : `int`
91  Value to check.
92 
93  Returns
94  -------
95  is_power_of_two : `bool`
96  True if value is a power of two; False otherwise, or
97  if value is not an integer.
98  """
99  if not isinstance(value, numbers.Integral):
100  return False
101 
102  # See https://stackoverflow.com/questions/57025836
103  # Every power of 2 has exactly 1 bit set to 1; subtracting
104  # 1 therefore flips every preceding bit. If you and that
105  # together with the original value it must be 0.
106  return (value & (value - 1) == 0) and value != 0
107 
108 
class HealSparseInputMapConfig(pexConfig.Config):
    """Configuration parameters for HealSparseInputMapTask"""
    # Resolution of the sparse input map itself; validated at config time
    # by _is_power_of_two.
    nside = pexConfig.Field(
        doc="Mapping healpix nside. Must be power of 2.",
        dtype=int,
        default=32768,
        check=_is_power_of_two,
    )
    # Coarser resolution used for the healsparse coverage index.
    nside_coverage = pexConfig.Field(
        doc="HealSparse coverage map nside. Must be power of 2.",
        dtype=int,
        default=256,
        check=_is_power_of_two,
    )
    # Threshold (as an area fraction) above which a healpix pixel is
    # considered bad and removed from the input map.
    bad_mask_min_coverage = pexConfig.Field(
        doc=("Minimum area fraction of a map healpixel pixel that must be "
             "covered by bad pixels to be removed from the input map. "
             "This is approximate."),
        dtype=float,
        default=0.5,
    )
130 
131 
class HealSparseInputMapTask(pipeBase.Task):
    """Task for making a HealSparse input map.

    The input map is a wide-mask healsparse map with one bit per
    input visit/detector ("ccd"); a bit is set where that detector
    contributed to the coadd, and cleared again where too large a
    fraction of the healpix pixel is flagged bad in the warp masks.
    """

    ConfigClass = HealSparseInputMapConfig
    _DefaultName = "healSparseInputMap"

    def __init__(self, **kwargs):
        pipeBase.Task.__init__(self, **kwargs)

        # Set by build_ccd_input_map; None signals that the map has not
        # been initialized yet.
        self.ccd_input_map = None

    def build_ccd_input_map(self, bbox, wcs, ccds):
        """Build a map from ccd valid polygons or bounding boxes.

        Parameters
        ----------
        bbox : `lsst.geom.Box2I`
            Bounding box for region to build input map.
        wcs : `lsst.afw.geom.SkyWcs`
            WCS object for region to build input map.
        ccds : `lsst.afw.table.ExposureCatalog`
            Exposure catalog with ccd data from coadd inputs.
        """
        # One wide-mask bit per input ccd row.
        self.ccd_input_map = hsp.HealSparseMap.make_empty(nside_coverage=self.config.nside_coverage,
                                                          nside_sparse=self.config.nside,
                                                          dtype=hsp.WIDE_MASK,
                                                          wide_mask_maxbits=len(ccds))
        self._wcs = wcs
        self._bbox = bbox
        self._ccds = ccds

        # Threshold number of bad image pixels per healpix pixel, derived
        # from the configured minimum bad-area fraction.
        pixel_scale = wcs.getPixelScale().asArcseconds()
        hpix_area_arcsec2 = hp.nside2pixarea(self.config.nside, degrees=True)*(3600.**2.)
        self._min_bad = self.config.bad_mask_min_coverage*hpix_area_arcsec2/(pixel_scale**2.)

        metadata = {}
        self._bits_per_visit_ccd = {}
        self._bits_per_visit = defaultdict(list)
        for bit, ccd_row in enumerate(ccds):
            # Record the bit -> (ccd, visit, weight) mapping in the metadata.
            metadata[f"B{bit:04d}CCD"] = ccd_row["ccd"]
            metadata[f"B{bit:04d}VIS"] = ccd_row["visit"]
            metadata[f"B{bit:04d}WT"] = ccd_row["weight"]

            self._bits_per_visit_ccd[(ccd_row["visit"], ccd_row["ccd"])] = bit
            self._bits_per_visit[ccd_row["visit"]].append(bit)

            ccd_poly = ccd_row.getValidPolygon()
            if ccd_poly is None:
                # Fall back to the detector bounding box when no valid
                # polygon was recorded.
                ccd_poly = afwGeom.Polygon(lsst.geom.Box2D(ccd_row.getBBox()))
            # Detectors need to be rendered with their own wcs.
            ccd_poly_radec = self._pixels_to_radec(ccd_row.getWcs(), ccd_poly.convexHull().getVertices())

            # Create a ccd healsparse polygon (drop the duplicated closing
            # vertex) and set this detector's bit over its footprint.
            poly = hsp.Polygon(ra=ccd_poly_radec[: -1, 0],
                               dec=ccd_poly_radec[: -1, 1],
                               value=[bit])
            self.ccd_input_map.set_bits_pix(poly.get_pixels(nside=self.ccd_input_map.nside_sparse),
                                            [bit])

        # Cut down to the overall bounding box with associated wcs.
        bbox_afw_poly = afwGeom.Polygon(lsst.geom.Box2D(bbox))
        bbox_poly_radec = self._pixels_to_radec(self._wcs,
                                                bbox_afw_poly.convexHull().getVertices())
        bbox_poly = hsp.Polygon(ra=bbox_poly_radec[: -1, 0], dec=bbox_poly_radec[: -1, 1],
                                value=np.arange(self.ccd_input_map.wide_mask_maxbits))
        bbox_poly_map = bbox_poly.get_map_like(self.ccd_input_map)
        self.ccd_input_map = hsp.and_intersection([self.ccd_input_map, bbox_poly_map])
        self.ccd_input_map.metadata = metadata

        # Create a temporary map to hold the count of bad pixels in each
        # healpix pixel, with one "v{visit}" field per input visit.
        self._ccd_input_pixels = self.ccd_input_map.valid_pixels

        dtype = [(f"v{visit}", "i4") for visit in self._bits_per_visit.keys()]

        cov = self.config.nside_coverage
        ns = self.config.nside
        self._ccd_input_bad_count_map = hsp.HealSparseMap.make_empty(nside_coverage=cov,
                                                                     nside_sparse=ns,
                                                                     dtype=dtype,
                                                                     primary=dtype[0][0])
        # Don't set input bad map if there are no ccds which overlap the bbox.
        if len(self._ccd_input_pixels) > 0:
            self._ccd_input_bad_count_map[self._ccd_input_pixels] = np.zeros(1, dtype=dtype)

    def mask_warp_bbox(self, bbox, visit, mask, bit_mask_value):
        """Mask a subregion from a visit.
        This must be run after build_ccd_input_map initializes
        the overall map.

        Parameters
        ----------
        bbox : `lsst.geom.Box2I`
            Bounding box from region to mask.
        visit : `int`
            Visit number corresponding to warp with mask.
        mask : `lsst.afw.image.MaskX`
            Mask plane from warp exposure.
        bit_mask_value : `int`
            Bit mask to check for bad pixels.

        Raises
        ------
        RuntimeError : Raised if build_ccd_input_map was not run first.
        """
        if self.ccd_input_map is None:
            raise RuntimeError("Must run build_ccd_input_map before mask_warp_bbox")

        # Find the bad pixels and convert to healpix
        bad_pixels = np.where(mask.array & bit_mask_value)
        if len(bad_pixels[0]) == 0:
            # No bad pixels
            return

        # Bad pixels come from warps which use the overall wcs.
        # np.where returns (row, col) = (y, x); note the swapped order below.
        bad_ra, bad_dec = self._wcs.pixelToSkyArray(bad_pixels[1].astype(np.float64),
                                                    bad_pixels[0].astype(np.float64),
                                                    degrees=True)
        bad_hpix = hp.ang2pix(self.config.nside, bad_ra, bad_dec,
                              lonlat=True, nest=True)

        # Count the number of bad image pixels in each healpix pixel,
        # working in a compact array offset by the smallest pixel id.
        min_bad_hpix = bad_hpix.min()
        bad_hpix_count = np.zeros(bad_hpix.max() - min_bad_hpix + 1, dtype=np.int32)
        np.add.at(bad_hpix_count, bad_hpix - min_bad_hpix, 1)

        # Add these to the accumulator map.
        # We need to make sure that the "primary" array has valid values for
        # this pixel to be registered in the accumulator map.
        pix_to_add, = np.where(bad_hpix_count > 0)
        count_map_arr = self._ccd_input_bad_count_map[min_bad_hpix + pix_to_add]
        primary = self._ccd_input_bad_count_map.primary
        count_map_arr[primary] = np.clip(count_map_arr[primary], 0, None)

        count_map_arr[f"v{visit}"] = np.clip(count_map_arr[f"v{visit}"], 0, None)
        count_map_arr[f"v{visit}"] += bad_hpix_count[pix_to_add]

        self._ccd_input_bad_count_map[min_bad_hpix + pix_to_add] = count_map_arr

    def finalize_ccd_input_map_mask(self):
        """Use accumulated mask information to finalize the masking of
        ccd_input_map.

        Raises
        ------
        RuntimeError : Raised if build_ccd_input_map was not run first.
        """
        if self.ccd_input_map is None:
            raise RuntimeError("Must run build_ccd_input_map before finalize_ccd_input_map_mask.")

        count_map_arr = self._ccd_input_bad_count_map[self._ccd_input_pixels]
        for visit in self._bits_per_visit:
            # Clear all of this visit's bits wherever the accumulated bad
            # pixel count exceeds the area-fraction threshold.
            to_mask, = np.where(count_map_arr[f"v{visit}"] > self._min_bad)
            if to_mask.size == 0:
                continue
            self.ccd_input_map.clear_bits_pix(self._ccd_input_pixels[to_mask],
                                              self._bits_per_visit[visit])

        # Clear memory
        self._ccd_input_bad_count_map = None

    def _pixels_to_radec(self, wcs, pixels):
        """Convert pixels to ra/dec positions using a wcs.

        Parameters
        ----------
        wcs : `lsst.afw.geom.SkyWcs`
            WCS object.
        pixels : `list` [`lsst.geom.Point2D`]
            List of pixels to convert.

        Returns
        -------
        radec : `numpy.ndarray`
            Nx2 array of ra/dec positions associated with pixels.
        """
        sph_pts = wcs.pixelToSky(pixels)
        return np.array([(sph.getRa().asDegrees(), sph.getDec().asDegrees())
                         for sph in sph_pts])
310 
311 
class HealSparsePropertyMapConnections(pipeBase.PipelineTaskConnections,
                                       dimensions=("tract", "band", "skymap",),
                                       defaultTemplates={"coaddName": "deep",
                                                         "calexpType": ""}):
    # Per-patch bit-wise maps recording which visit/detector contributed
    # to each coadd pixel.
    input_maps = pipeBase.connectionTypes.Input(
        doc="Healsparse bit-wise coadd input maps",
        name="{coaddName}Coadd_inputMap",
        storageClass="HealSparseMap",
        dimensions=("tract", "patch", "skymap", "band"),
        multiple=True,
        deferLoad=True,
    )
    # Coadds matching the input maps; only photoCalib and coaddInputs
    # components are read by the task.
    coadd_exposures = pipeBase.connectionTypes.Input(
        doc="Coadded exposures associated with input_maps",
        name="{coaddName}Coadd",
        storageClass="ExposureF",
        dimensions=("tract", "patch", "skymap", "band"),
        multiple=True,
        deferLoad=True,
    )
    visit_summaries = pipeBase.connectionTypes.Input(
        doc="Visit summary tables with aggregated statistics",
        name="{calexpType}visitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
        multiple=True,
        deferLoad=True,
    )
    sky_map = pipeBase.connectionTypes.Input(
        doc="Input definition of geometry/bbox and projection/wcs for coadded exposures",
        name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
        storageClass="SkyMap",
        dimensions=("skymap",),
    )

    # Create output connections for all possible maps defined in the
    # registry. The vars() trick used here allows us to set class attributes
    # programatically. Taken from
    # https://stackoverflow.com/questions/2519807/
    # setting-a-class-attribute-with-a-given-name-in-python-while-defining-the-class
    for name in BasePropertyMap.registry:
        vars()[f"{name}_map_min"] = pipeBase.connectionTypes.Output(
            doc=f"Minimum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_min",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
        )
        vars()[f"{name}_map_max"] = pipeBase.connectionTypes.Output(
            doc=f"Maximum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_max",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
        )
        vars()[f"{name}_map_mean"] = pipeBase.connectionTypes.Output(
            doc=f"Mean-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_mean",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
        )
        vars()[f"{name}_map_weighted_mean"] = pipeBase.connectionTypes.Output(
            doc=f"Weighted mean-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_weighted_mean",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
        )
        vars()[f"{name}_map_sum"] = pipeBase.connectionTypes.Output(
            doc=f"Sum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_sum",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
        )

    def __init__(self, *, config=None):
        super().__init__(config=config)

        # Not all possible maps in the registry will be configured to run.
        # Here we remove the unused connections.
        for name in BasePropertyMap.registry:
            if name not in config.property_maps:
                # Property is not configured at all: build a throwaway
                # config with every statistic disabled so all of its
                # output connections are removed below.
                prop_config = BasePropertyMapConfig()
                prop_config.do_min = False
                prop_config.do_max = False
                prop_config.do_mean = False
                prop_config.do_weighted_mean = False
                prop_config.do_sum = False
            else:
                prop_config = config.property_maps[name]

            if not prop_config.do_min:
                self.outputs.remove(f"{name}_map_min")
            if not prop_config.do_max:
                self.outputs.remove(f"{name}_map_max")
            if not prop_config.do_mean:
                self.outputs.remove(f"{name}_map_mean")
            if not prop_config.do_weighted_mean:
                self.outputs.remove(f"{name}_map_weighted_mean")
            if not prop_config.do_sum:
                self.outputs.remove(f"{name}_map_sum")
410 
411 
class HealSparsePropertyMapConfig(pipeBase.PipelineTaskConfig,
                                  pipelineConnections=HealSparsePropertyMapConnections):
    """Configuration parameters for HealSparsePropertyMapTask"""
    property_maps = BasePropertyMap.registry.makeField(
        multi=True,
        default=["exposure_time",
                 "psf_size",
                 "psf_e1",
                 "psf_e2",
                 "psf_maglim",
                 "sky_noise",
                 "sky_background",
                 "dcr_dra",
                 "dcr_ddec",
                 "dcr_e1",
                 "dcr_e2"],
        doc="Property map computation objects",
    )

    def setDefaults(self):
        # Exposure time accumulates as a straight sum; every other default
        # property is tracked as a weighted mean.
        self.property_maps["exposure_time"].do_sum = True
        weighted_mean_properties = ("psf_size", "psf_e1", "psf_e2",
                                    "psf_maglim", "sky_noise",
                                    "sky_background", "dcr_dra", "dcr_ddec",
                                    "dcr_e1", "dcr_e2")
        for property_name in weighted_mean_properties:
            self.property_maps[property_name].do_weighted_mean = True
443 
444 
class HealSparsePropertyMapTask(pipeBase.PipelineTask):
    """Task to compute Healsparse property maps.

    This task will compute individual property maps (per tract, per
    map type, per band). These maps cover the full coadd tract, and
    are not truncated to the inner tract region.
    """
    ConfigClass = HealSparsePropertyMapConfig
    _DefaultName = "healSparsePropertyMapTask"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Instantiate the configured property-map computation objects
        # from the registry, keyed by property name.
        self.property_maps = PropertyMapMap()
        for name, config, PropertyMapClass in self.config.property_maps.apply():
            self.property_maps[name] = PropertyMapClass(config, name)

    @timeMethod
    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        sky_map = inputs.pop("sky_map")

        tract = butlerQC.quantum.dataId["tract"]
        band = butlerQC.quantum.dataId["band"]

        # Index the deferred-load handles by patch (and the visit
        # summaries, loaded eagerly, by visit) for lookup in run().
        input_map_dict = {ref.dataId["patch"]: ref for ref in inputs["input_maps"]}
        coadd_dict = {ref.dataId["patch"]: ref for ref in inputs["coadd_exposures"]}

        visit_summary_dict = {ref.dataId["visit"]: ref.get()
                              for ref in inputs["visit_summaries"]}

        self.run(sky_map, tract, band, coadd_dict, input_map_dict, visit_summary_dict)

        # Write the outputs; only the statistics enabled in each property's
        # config have surviving output connections.
        for name, property_map in self.property_maps.items():
            if property_map.config.do_min:
                butlerQC.put(property_map.min_map,
                             getattr(outputRefs, f"{name}_map_min"))
            if property_map.config.do_max:
                butlerQC.put(property_map.max_map,
                             getattr(outputRefs, f"{name}_map_max"))
            if property_map.config.do_mean:
                butlerQC.put(property_map.mean_map,
                             getattr(outputRefs, f"{name}_map_mean"))
            if property_map.config.do_weighted_mean:
                butlerQC.put(property_map.weighted_mean_map,
                             getattr(outputRefs, f"{name}_map_weighted_mean"))
            if property_map.config.do_sum:
                butlerQC.put(property_map.sum_map,
                             getattr(outputRefs, f"{name}_map_sum"))

    def run(self, sky_map, tract, band, coadd_dict, input_map_dict, visit_summary_dict):
        """Run the healsparse property task.

        Parameters
        ----------
        sky_map : Sky map object
        tract : `int`
            Tract number.
        band : `str`
            Band name for logging.
        coadd_dict : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]
            Dictionary of coadd exposure references. Keys are patch numbers.
        input_map_dict : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]
            Dictionary of input map references. Keys are patch numbers.
        visit_summary_dict : `dict` [`int`: `lsst.afw.table.ExposureCatalog`]
            Dictionary of visit summary tables. Keys are visit numbers.

        Raises
        ------
        RepeatableQuantumError
            If visit_summary_dict is missing any visits or detectors found in an
            input map. This leads to an inconsistency between what is in the coadd
            (via the input map) and the visit summary tables which contain data
            to compute the maps.
        """
        tract_info = sky_map[tract]

        tract_maps_initialized = False

        for patch in input_map_dict.keys():
            self.log.info("Making maps for band %s, tract %d, patch %d.",
                          band, tract, patch)

            patch_info = tract_info[patch]

            input_map = input_map_dict[patch].get()
            coadd_photo_calib = coadd_dict[patch].get(component="photoCalib")
            coadd_inputs = coadd_dict[patch].get(component="coaddInputs")

            # Magnitude zero point of the coadd calibration.
            coadd_zeropoint = 2.5*np.log10(coadd_photo_calib.getInstFluxAtZeroMagnitude())

            # Crop input_map to the inner polygon of the patch
            poly_vertices = patch_info.getInnerSkyPolygon(tract_info.getWcs()).getVertices()
            patch_radec = self._vertices_to_radec(poly_vertices)
            patch_poly = hsp.Polygon(ra=patch_radec[:, 0], dec=patch_radec[:, 1],
                                     value=np.arange(input_map.wide_mask_maxbits))
            patch_poly_map = patch_poly.get_map_like(input_map)
            input_map = hsp.and_intersection([input_map, patch_poly_map])

            if not tract_maps_initialized:
                # We use the first input map nside information to initialize
                # the tract maps
                nside_coverage = self._compute_nside_coverage_tract(tract_info)
                nside = input_map.nside_sparse

                do_compute_approx_psf = False
                # Initialize the tract maps; the PSF approximation is only
                # computed if at least one configured map needs it.
                for property_map in self.property_maps:
                    property_map.initialize_tract_maps(nside_coverage, nside)
                    if property_map.requires_psf:
                        do_compute_approx_psf = True

                tract_maps_initialized = True

            valid_pixels, vpix_ra, vpix_dec = input_map.valid_pixels_pos(return_pixels=True)

            # Check if there are no valid pixels for the inner (unique) patch region
            if valid_pixels.size == 0:
                continue

            # Initialize the value accumulators
            for property_map in self.property_maps:
                property_map.initialize_values(valid_pixels.size)
                property_map.zeropoint = coadd_zeropoint

            # Initialize the weight and counter accumulators
            total_weights = np.zeros(valid_pixels.size)
            total_inputs = np.zeros(valid_pixels.size, dtype=np.int32)

            for bit, ccd_row in enumerate(coadd_inputs.ccds):
                # Which pixels in the map are used by this visit/detector
                inmap, = np.where(input_map.check_bits_pix(valid_pixels, [bit]))

                # Check if there are any valid pixels in the map from this detector.
                if inmap.size == 0:
                    continue

                # visit, detector_id, weight = input_dict[bit]
                visit = ccd_row["visit"]
                detector_id = ccd_row["ccd"]
                weight = ccd_row["weight"]

                x, y = ccd_row.getWcs().skyToPixelArray(vpix_ra[inmap], vpix_dec[inmap], degrees=True)
                scalings = self._compute_calib_scale(ccd_row, x, y)

                if do_compute_approx_psf:
                    psf_array = compute_approx_psf_size_and_shape(ccd_row, vpix_ra[inmap], vpix_dec[inmap])
                else:
                    psf_array = None

                total_weights[inmap] += weight
                total_inputs[inmap] += 1

                # Retrieve the correct visitSummary row
                if visit not in visit_summary_dict:
                    msg = f"Visit {visit} not found in visit_summaries."
                    raise pipeBase.RepeatableQuantumError(msg)
                row = visit_summary_dict[visit].find(detector_id)
                if row is None:
                    msg = f"Visit {visit} / detector_id {detector_id} not found in visit_summaries."
                    raise pipeBase.RepeatableQuantumError(msg)

                # Accumulate the values
                for property_map in self.property_maps:
                    property_map.accumulate_values(inmap,
                                                   vpix_ra[inmap],
                                                   vpix_dec[inmap],
                                                   weight,
                                                   scalings,
                                                   row,
                                                   psf_array=psf_array)

            # Finalize the mean values and set the tract maps
            for property_map in self.property_maps:
                property_map.finalize_mean_values(total_weights, total_inputs)
                property_map.set_map_values(valid_pixels)

    def _compute_calib_scale(self, ccd_row, x, y):
        """Compute calibration scaling values.

        Parameters
        ----------
        ccd_row : `lsst.afw.table.ExposureRecord`
            Exposure metadata for a given detector exposure.
        x : `np.ndarray`
            Array of x positions.
        y : `np.ndarray`
            Array of y positions.

        Returns
        -------
        calib_scale : `np.ndarray`
            Array of calibration scale values.
        """
        photo_calib = ccd_row.getPhotoCalib()
        bf = photo_calib.computeScaledCalibration()
        if bf.getBBox() == ccd_row.getBBox():
            # Track variable calibration over the detector
            calib_scale = photo_calib.getCalibrationMean()*bf.evaluate(x, y)
        else:
            # Spatially constant calibration
            calib_scale = photo_calib.getCalibrationMean()

        return calib_scale

    def _vertices_to_radec(self, vertices):
        """Convert polygon vertices to ra/dec.

        Parameters
        ----------
        vertices : `list` [ `lsst.sphgeom.UnitVector3d` ]
            Vertices for bounding polygon.

        Returns
        -------
        radec : `numpy.ndarray`
            Nx2 array of ra/dec positions (in degrees) associated with vertices.
        """
        # NOTE(review): relies on the lsst.sphgeom submodule being
        # importable via the lsst namespace; it is not imported in the
        # visible header of this file — confirm an import exists.
        lonlats = [lsst.sphgeom.LonLat(x) for x in vertices]
        radec = np.array([(x.getLon().asDegrees(), x.getLat().asDegrees()) for
                          x in lonlats])
        return radec

    def _compute_nside_coverage_tract(self, tract_info):
        """Compute the optimal coverage nside for a tract.

        Parameters
        ----------
        tract_info : `lsst.skymap.tractInfo.ExplicitTractInfo`
            Tract information object.

        Returns
        -------
        nside_coverage : `int`
            Optimal coverage nside for a tract map.
        """
        num_patches = tract_info.getNumPatches()

        # Compute approximate patch area from the ra/dec extent of the
        # first patch, corrected by cos(dec) for spherical convergence.
        patch_info = tract_info.getPatchInfo(0)
        vertices = patch_info.getInnerSkyPolygon(tract_info.getWcs()).getVertices()
        radec = self._vertices_to_radec(vertices)
        delta_ra = np.max(radec[:, 0]) - np.min(radec[:, 0])
        delta_dec = np.max(radec[:, 1]) - np.min(radec[:, 1])
        patch_area = delta_ra*delta_dec*np.cos(np.deg2rad(np.mean(radec[:, 1])))

        tract_area = num_patches[0]*num_patches[1]*patch_area
        # Start with a fairly low nside and increase until we find the approximate area.
        nside_coverage_tract = 32
        while hp.nside2pixarea(nside_coverage_tract, degrees=True) > tract_area:
            nside_coverage_tract = 2*nside_coverage_tract
        # Step back one, but don't go bigger pixels than nside=32
        nside_coverage_tract = int(np.clip(nside_coverage_tract/2, 32, None))

        return nside_coverage_tract
701 
702 
class ConsolidateHealSparsePropertyMapConnections(pipeBase.PipelineTaskConnections,
                                                  dimensions=("band", "skymap",),
                                                  defaultTemplates={"coaddName": "deep"}):
    sky_map = pipeBase.connectionTypes.Input(
        doc="Input definition of geometry/bbox and projection/wcs for coadded exposures",
        name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
        storageClass="SkyMap",
        dimensions=("skymap",),
    )

    # Create input/output connections for all possible maps defined in the
    # registry. The vars() trick used here allows us to set class attributes
    # programatically. Taken from
    # https://stackoverflow.com/questions/2519807/
    # setting-a-class-attribute-with-a-given-name-in-python-while-defining-the-class
    # Note: the output docs previously read "Minumum-value map" for every
    # statistic (copy-paste typo); they now describe each statistic.
    for name in BasePropertyMap.registry:
        vars()[f"{name}_map_min"] = pipeBase.connectionTypes.Input(
            doc=f"Minimum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_min",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
            multiple=True,
            deferLoad=True,
        )
        vars()[f"{name}_consolidated_map_min"] = pipeBase.connectionTypes.Output(
            doc=f"Minimum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_consolidated_map_min",
            storageClass="HealSparseMap",
            dimensions=("skymap", "band"),
        )
        vars()[f"{name}_map_max"] = pipeBase.connectionTypes.Input(
            doc=f"Maximum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_max",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
            multiple=True,
            deferLoad=True,
        )
        vars()[f"{name}_consolidated_map_max"] = pipeBase.connectionTypes.Output(
            doc=f"Maximum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_consolidated_map_max",
            storageClass="HealSparseMap",
            dimensions=("skymap", "band"),
        )
        vars()[f"{name}_map_mean"] = pipeBase.connectionTypes.Input(
            doc=f"Mean-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_mean",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
            multiple=True,
            deferLoad=True,
        )
        vars()[f"{name}_consolidated_map_mean"] = pipeBase.connectionTypes.Output(
            doc=f"Mean-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_consolidated_map_mean",
            storageClass="HealSparseMap",
            dimensions=("skymap", "band"),
        )
        vars()[f"{name}_map_weighted_mean"] = pipeBase.connectionTypes.Input(
            doc=f"Weighted mean-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_weighted_mean",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
            multiple=True,
            deferLoad=True,
        )
        vars()[f"{name}_consolidated_map_weighted_mean"] = pipeBase.connectionTypes.Output(
            doc=f"Weighted mean-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_consolidated_map_weighted_mean",
            storageClass="HealSparseMap",
            dimensions=("skymap", "band"),
        )
        vars()[f"{name}_map_sum"] = pipeBase.connectionTypes.Input(
            doc=f"Sum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_sum",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
            multiple=True,
            deferLoad=True,
        )
        vars()[f"{name}_consolidated_map_sum"] = pipeBase.connectionTypes.Output(
            doc=f"Sum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_consolidated_map_sum",
            storageClass="HealSparseMap",
            dimensions=("skymap", "band"),
        )

    def __init__(self, *, config=None):
        super().__init__(config=config)

        # Not all possible maps in the registry will be configured to run.
        # Here we remove the unused connections.
        for name in BasePropertyMap.registry:
            if name not in config.property_maps:
                # Property is not configured at all: build a throwaway
                # config with every statistic disabled so all of its
                # connections are removed below.
                prop_config = BasePropertyMapConfig()
                prop_config.do_min = False
                prop_config.do_max = False
                prop_config.do_mean = False
                prop_config.do_weighted_mean = False
                prop_config.do_sum = False
            else:
                prop_config = config.property_maps[name]

            if not prop_config.do_min:
                self.inputs.remove(f"{name}_map_min")
                self.outputs.remove(f"{name}_consolidated_map_min")
            if not prop_config.do_max:
                self.inputs.remove(f"{name}_map_max")
                self.outputs.remove(f"{name}_consolidated_map_max")
            if not prop_config.do_mean:
                self.inputs.remove(f"{name}_map_mean")
                self.outputs.remove(f"{name}_consolidated_map_mean")
            if not prop_config.do_weighted_mean:
                self.inputs.remove(f"{name}_map_weighted_mean")
                self.outputs.remove(f"{name}_consolidated_map_weighted_mean")
            if not prop_config.do_sum:
                self.inputs.remove(f"{name}_map_sum")
                self.outputs.remove(f"{name}_consolidated_map_sum")
821 
822 
class ConsolidateHealSparsePropertyMapConfig(pipeBase.PipelineTaskConfig,
                                             pipelineConnections=ConsolidateHealSparsePropertyMapConnections):
    """Configuration parameters for ConsolidateHealSparsePropertyMapTask"""
    property_maps = BasePropertyMap.registry.makeField(
        multi=True,
        default=["exposure_time",
                 "psf_size",
                 "psf_e1",
                 "psf_e2",
                 "psf_maglim",
                 "sky_noise",
                 "sky_background",
                 "dcr_dra",
                 "dcr_ddec",
                 "dcr_e1",
                 "dcr_e2"],
        doc="Property map computation objects",
    )

    def setDefaults(self):
        """Set the default consolidation statistic for each default map."""
        # Exposure time is consolidated as a sum; every other default map
        # is consolidated as a weighted mean.
        self.property_maps["exposure_time"].do_sum = True
        for map_name in ("psf_size", "psf_e1", "psf_e2", "psf_maglim",
                         "sky_noise", "sky_background", "dcr_dra",
                         "dcr_ddec", "dcr_e1", "dcr_e2"):
            self.property_maps[map_name].do_weighted_mean = True
854 
855 
class ConsolidateHealSparsePropertyMapTask(pipeBase.PipelineTask):
    """Task to consolidate HealSparse property maps.

    This task will take all the individual tract-based maps (per map type,
    per band) and consolidate them into one survey-wide map (per map type,
    per band). Each tract map is truncated to its inner region before
    consolidation.
    """
    ConfigClass = ConsolidateHealSparsePropertyMapConfig
    _DefaultName = "consolidateHealSparsePropertyMapTask"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Instantiate one property-map object per configured map; each
        # carries its own configuration (which statistics to compute).
        self.property_maps = PropertyMapMap()
        for name, config, PropertyMapClass in self.config.property_maps.apply():
            self.property_maps[name] = PropertyMapClass(config, name)

    @timeMethod
    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        # The per-tract map inputs are declared with deferLoad=True, so
        # this get() retrieves handles, not the map data itself.
        inputs = butlerQC.get(inputRefs)

        sky_map = inputs.pop("sky_map")

        # These need to be consolidated one at a time to conserve memory.
        for name in self.config.property_maps.names:
            for type_ in ['min', 'max', 'mean', 'weighted_mean', 'sum']:
                map_type = f"{name}_map_{type_}"
                # Only connections for enabled statistics survive the
                # Connections __init__, so check membership before use.
                if map_type in inputs:
                    input_refs = {ref.dataId['tract']: ref
                                  for ref in inputs[map_type]}
                    consolidated_map = self.consolidate_map(sky_map, input_refs)
                    butlerQC.put(consolidated_map,
                                 getattr(outputRefs, f"{name}_consolidated_map_{type_}"))

    def consolidate_map(self, sky_map, input_refs):
        """Consolidate the healsparse property maps.

        Parameters
        ----------
        sky_map : Sky map object
            Sky map used to assign each valid pixel to its inner tract
            via ``findTractIdArray``.
        input_refs : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]
            Dictionary of tract_id mapping to dataref.

        Returns
        -------
        consolidated_map : `healsparse.HealSparseMap`
            Consolidated HealSparse map.  ``None`` if ``input_refs`` is
            empty (no tract maps to consolidate).
        """
        # First, we read in the coverage maps to know how much memory
        # to allocate
        cov_mask = None
        for tract_id in input_refs:
            # Read only the 'coverage' component; this avoids loading the
            # full sparse map during this sizing pass.
            cov = input_refs[tract_id].get(component='coverage')
            if cov_mask is None:
                cov_mask = cov.coverage_mask
            else:
                # Union of coverage over all tracts.
                cov_mask |= cov.coverage_mask

        # Indices of every coverage pixel touched by any input tract.
        cov_pix, = np.where(cov_mask)

        # Now read in each tract map and build the consolidated map.
        consolidated_map = None
        for tract_id in input_refs:
            input_map = input_refs[tract_id].get()
            if consolidated_map is None:
                # Take geometry/typing from the first tract map; presumably
                # all inputs share dtype, sentinel, and nsides — TODO confirm.
                dtype = input_map.dtype
                # NOTE(review): _sentinel and _reserve_cov_pix are private
                # healsparse API; re-verify on healsparse upgrades.
                sentinel = input_map._sentinel
                nside_coverage = input_map.nside_coverage
                nside_sparse = input_map.nside_sparse
                consolidated_map = hsp.HealSparseMap.make_empty(nside_coverage,
                                                                nside_sparse,
                                                                dtype,
                                                                sentinel=sentinel)
                # Pre-allocate all coverage pixels up front so the sparse
                # map is not repeatedly grown while being filled below.
                consolidated_map._reserve_cov_pix(cov_pix)

            # Only use pixels that are properly inside the tract.
            vpix, ra, dec = input_map.valid_pixels_pos(return_pixels=True)
            vpix_tract_ids = sky_map.findTractIdArray(ra, dec, degrees=True)

            in_tract = (vpix_tract_ids == tract_id)

            consolidated_map[vpix[in_tract]] = input_map[vpix[in_tract]]

        return consolidated_map
std::vector< SchemaItem< Flag > > * items
Cartesian polygons.
Definition: Polygon.h:59
A floating-point coordinate rectangle geometry.
Definition: Box.h:413
def mask_warp_bbox(self, bbox, visit, mask, bit_mask_value)
LonLat represents a spherical coordinate (longitude/latitude angle) pair.
Definition: LonLat.h:48
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
Definition: functional.cc:33
def run(self, coaddExposures, bbox, wcs)
Definition: getTemplate.py:603
def compute_approx_psf_size_and_shape(ccd_row, ra, dec, nx=20, ny=20, orderx=2, ordery=2)