healSparseMapping.py
#
# LSST Data Management System
# Copyright 2008-2021 AURA/LSST.
#
# This product includes software developed by the
# LSST Project (http://www.lsst.org/).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the LSST License Statement and
# the GNU General Public License along with this program. If not,
# see <http://www.lsstcorp.org/LegalNotices/>.
#
from collections import defaultdict
import numbers
import numpy as np
import healpy as hp
import healsparse as hsp

import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
import lsst.geom
import lsst.sphgeom
import lsst.afw.geom as afwGeom
from lsst.daf.butler import Formatter
from lsst.skymap import BaseSkyMap
from .healSparseMappingProperties import (BasePropertyMap, BasePropertyMapConfig,
                                          PropertyMapMap, compute_approx_psf_size_and_shape)


__all__ = ["HealSparseInputMapTask", "HealSparseInputMapConfig",
           "HealSparseMapFormatter", "HealSparsePropertyMapConnections",
           "HealSparsePropertyMapConfig", "HealSparsePropertyMapTask",
           "ConsolidateHealSparsePropertyMapConnections",
           "ConsolidateHealSparsePropertyMapConfig",
           "ConsolidateHealSparsePropertyMapTask"]


class HealSparseMapFormatter(Formatter):
    """Interface for reading and writing healsparse.HealSparseMap files."""
    unsupportedParameters = frozenset()
    supportedExtensions = frozenset({".hsp", ".fit", ".fits"})
    extension = '.hsp'

    def read(self, component=None):
        # Docstring inherited from Formatter.read.
        path = self.fileDescriptor.location.path

        if component == 'coverage':
            try:
                data = hsp.HealSparseCoverage.read(path)
            except (OSError, RuntimeError):
                raise ValueError(f"Unable to read healsparse map with URI {self.fileDescriptor.location.uri}")

            return data

        if self.fileDescriptor.parameters is None:
            pixels = None
            degrade_nside = None
        else:
            pixels = self.fileDescriptor.parameters.get('pixels', None)
            degrade_nside = self.fileDescriptor.parameters.get('degrade_nside', None)
        try:
            data = hsp.HealSparseMap.read(path, pixels=pixels, degrade_nside=degrade_nside)
        except (OSError, RuntimeError):
            raise ValueError(f"Unable to read healsparse map with URI {self.fileDescriptor.location.uri}")

        return data

    def write(self, inMemoryDataset):
        # Docstring inherited from Formatter.write.
        # Update the location with the formatter-preferred file extension
        self.fileDescriptor.location.updateExtension(self.extension)
        inMemoryDataset.write(self.fileDescriptor.location.path, clobber=True)

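# Illustrative sketch (not part of the pipeline code): the formatter above
# forwards the optional butler parameters 'pixels' and 'degrade_nside' to
# healsparse. Reading a map file directly with the same options would look
# like this; the file name below is a hypothetical placeholder.
#
#     import healsparse as hsp
#
#     full_map = hsp.HealSparseMap.read("example_map.hsp")
#     # Read only a subset of coverage pixels, degraded to a coarser nside.
#     partial_map = hsp.HealSparseMap.read("example_map.hsp",
#                                          pixels=[1000, 1001],
#                                          degrade_nside=1024)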

def _is_power_of_two(value):
    """Check that value is a power of two.

    Parameters
    ----------
    value : `int`
        Value to check.

    Returns
    -------
    is_power_of_two : `bool`
        True if value is a power of two; False otherwise, or
        if value is not an integer.
    """
    if not isinstance(value, numbers.Integral):
        return False

    # See https://stackoverflow.com/questions/57025836
    # Every power of 2 has exactly 1 bit set to 1; subtracting
    # 1 therefore flips every preceding bit. If you and that
    # together with the original value it must be 0.
    return (value & (value - 1) == 0) and value != 0

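# Illustrative sketch (not part of the pipeline code): the bit trick above
# relies on a power of two having exactly one set bit, so value & (value - 1)
# clears that bit and leaves zero.
#
#     assert _is_power_of_two(32768)      # 2**15, a valid map nside
#     assert not _is_power_of_two(1000)   # not a power of two
#     assert not _is_power_of_two(2.0)    # non-integers are rejected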

class HealSparseInputMapConfig(pexConfig.Config):
    """Configuration parameters for HealSparseInputMapTask"""
    nside = pexConfig.Field(
        doc="Mapping healpix nside. Must be power of 2.",
        dtype=int,
        default=32768,
        check=_is_power_of_two,
    )
    nside_coverage = pexConfig.Field(
        doc="HealSparse coverage map nside. Must be power of 2.",
        dtype=int,
        default=256,
        check=_is_power_of_two,
    )
    bad_mask_min_coverage = pexConfig.Field(
        doc=("Minimum area fraction of a map healpix pixel that must be "
             "covered by bad pixels to be removed from the input map. "
             "This is approximate."),
        dtype=float,
        default=0.5,
    )


class HealSparseInputMapTask(pipeBase.Task):
    """Task for making a HealSparse input map."""

    ConfigClass = HealSparseInputMapConfig
    _DefaultName = "healSparseInputMap"

    def __init__(self, **kwargs):
        pipeBase.Task.__init__(self, **kwargs)

        self.ccd_input_map = None

    def build_ccd_input_map(self, bbox, wcs, ccds):
        """Build a map from ccd valid polygons or bounding boxes.

        Parameters
        ----------
        bbox : `lsst.geom.Box2I`
            Bounding box for region to build input map.
        wcs : `lsst.afw.geom.SkyWcs`
            WCS object for region to build input map.
        ccds : `lsst.afw.table.ExposureCatalog`
            Exposure catalog with ccd data from coadd inputs.
        """
        self.ccd_input_map = hsp.HealSparseMap.make_empty(nside_coverage=self.config.nside_coverage,
                                                          nside_sparse=self.config.nside,
                                                          dtype=hsp.WIDE_MASK,
                                                          wide_mask_maxbits=len(ccds))
        self._wcs = wcs
        self._bbox = bbox
        self._ccds = ccds

        pixel_scale = wcs.getPixelScale().asArcseconds()
        hpix_area_arcsec2 = hp.nside2pixarea(self.config.nside, degrees=True)*(3600.**2.)
        self._min_bad = self.config.bad_mask_min_coverage*hpix_area_arcsec2/(pixel_scale**2.)

        metadata = {}
        self._bits_per_visit_ccd = {}
        self._bits_per_visit = defaultdict(list)
        for bit, ccd_row in enumerate(ccds):
            metadata[f"B{bit:04d}CCD"] = ccd_row["ccd"]
            metadata[f"B{bit:04d}VIS"] = ccd_row["visit"]
            metadata[f"B{bit:04d}WT"] = ccd_row["weight"]

            self._bits_per_visit_ccd[(ccd_row["visit"], ccd_row["ccd"])] = bit
            self._bits_per_visit[ccd_row["visit"]].append(bit)

            ccd_poly = ccd_row.getValidPolygon()
            if ccd_poly is None:
                ccd_poly = afwGeom.Polygon(lsst.geom.Box2D(ccd_row.getBBox()))
            # Detectors need to be rendered with their own wcs.
            ccd_poly_radec = self._pixels_to_radec(ccd_row.getWcs(), ccd_poly.convexHull().getVertices())

            # Create a ccd healsparse polygon
            poly = hsp.Polygon(ra=ccd_poly_radec[: -1, 0],
                               dec=ccd_poly_radec[: -1, 1],
                               value=[bit])
            self.ccd_input_map.set_bits_pix(poly.get_pixels(nside=self.ccd_input_map.nside_sparse),
                                            [bit])

        # Cut down to the overall bounding box with associated wcs.
        bbox_afw_poly = afwGeom.Polygon(lsst.geom.Box2D(bbox))
        bbox_poly_radec = self._pixels_to_radec(self._wcs,
                                                bbox_afw_poly.convexHull().getVertices())
        bbox_poly = hsp.Polygon(ra=bbox_poly_radec[: -1, 0], dec=bbox_poly_radec[: -1, 1],
                                value=np.arange(self.ccd_input_map.wide_mask_maxbits))
        bbox_poly_map = bbox_poly.get_map_like(self.ccd_input_map)
        self.ccd_input_map = hsp.and_intersection([self.ccd_input_map, bbox_poly_map])
        self.ccd_input_map.metadata = metadata

        # Create a temporary map to hold the count of bad pixels in each healpix pixel
        self._ccd_input_pixels = self.ccd_input_map.valid_pixels

        dtype = [(f"v{visit}", "i4") for visit in self._bits_per_visit.keys()]

        cov = self.config.nside_coverage
        ns = self.config.nside
        self._ccd_input_bad_count_map = hsp.HealSparseMap.make_empty(nside_coverage=cov,
                                                                     nside_sparse=ns,
                                                                     dtype=dtype,
                                                                     primary=dtype[0][0])
        # Don't set input bad map if there are no ccds which overlap the bbox.
        if len(self._ccd_input_pixels) > 0:
            self._ccd_input_bad_count_map[self._ccd_input_pixels] = np.zeros(1, dtype=dtype)

    def mask_warp_bbox(self, bbox, visit, mask, bit_mask_value):
        """Mask a subregion from a visit.

        This must be run after build_ccd_input_map initializes
        the overall map.

        Parameters
        ----------
        bbox : `lsst.geom.Box2I`
            Bounding box from region to mask.
        visit : `int`
            Visit number corresponding to warp with mask.
        mask : `lsst.afw.image.MaskX`
            Mask plane from warp exposure.
        bit_mask_value : `int`
            Bit mask to check for bad pixels.

        Raises
        ------
        RuntimeError : Raised if build_ccd_input_map was not run first.
        """
        if self.ccd_input_map is None:
            raise RuntimeError("Must run build_ccd_input_map before mask_warp_bbox")

        # Find the bad pixels and convert to healpix
        bad_pixels = np.where(mask.array & bit_mask_value)
        if len(bad_pixels[0]) == 0:
            # No bad pixels
            return

        # Bad pixels come from warps which use the overall wcs.
        bad_ra, bad_dec = self._wcs.pixelToSkyArray(bad_pixels[1].astype(np.float64),
                                                    bad_pixels[0].astype(np.float64),
                                                    degrees=True)
        bad_hpix = hp.ang2pix(self.config.nside, bad_ra, bad_dec,
                              lonlat=True, nest=True)

        # Count the number of bad image pixels in each healpix pixel
        min_bad_hpix = bad_hpix.min()
        bad_hpix_count = np.zeros(bad_hpix.max() - min_bad_hpix + 1, dtype=np.int32)
        np.add.at(bad_hpix_count, bad_hpix - min_bad_hpix, 1)

        # Add these to the accumulator map.
        # We need to make sure that the "primary" array has valid values for
        # this pixel to be registered in the accumulator map.
        pix_to_add, = np.where(bad_hpix_count > 0)
        count_map_arr = self._ccd_input_bad_count_map[min_bad_hpix + pix_to_add]
        primary = self._ccd_input_bad_count_map.primary
        count_map_arr[primary] = np.clip(count_map_arr[primary], 0, None)

        count_map_arr[f"v{visit}"] = np.clip(count_map_arr[f"v{visit}"], 0, None)
        count_map_arr[f"v{visit}"] += bad_hpix_count[pix_to_add]

        self._ccd_input_bad_count_map[min_bad_hpix + pix_to_add] = count_map_arr

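    # Illustrative sketch (not part of the task code): np.add.at used above
    # performs an unbuffered, histogram-style accumulation, so repeated
    # healpix indices each contribute one count, unlike plain fancy-indexed
    # "+=" which would count each index only once.
    #
    #     import numpy as np
    #
    #     bad_hpix = np.array([5, 5, 7, 5])
    #     counts = np.zeros(bad_hpix.max() - bad_hpix.min() + 1, dtype=np.int32)
    #     np.add.at(counts, bad_hpix - bad_hpix.min(), 1)
    #     # counts -> array([3, 0, 1], dtype=int32)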
270  """Use accumulated mask information to finalize the masking of
271  ccd_input_map.
272 
273  Raises
274  ------
275  RuntimeError : Raised if build_ccd_input_map was not run first.
276  """
277  if self.ccd_input_mapccd_input_map is None:
278  raise RuntimeError("Must run build_ccd_input_map before finalize_ccd_input_map_mask.")
279 
280  count_map_arr = self._ccd_input_bad_count_map_ccd_input_bad_count_map[self._ccd_input_pixels_ccd_input_pixels]
281  for visit in self._bits_per_visit_bits_per_visit:
282  to_mask, = np.where(count_map_arr[f"v{visit}"] > self._min_bad_min_bad)
283  if to_mask.size == 0:
284  continue
285  self.ccd_input_mapccd_input_map.clear_bits_pix(self._ccd_input_pixels_ccd_input_pixels[to_mask],
286  self._bits_per_visit_bits_per_visit[visit])
287 
288  # Clear memory
289  self._ccd_input_bad_count_map_ccd_input_bad_count_map = None
290 
    def _pixels_to_radec(self, wcs, pixels):
        """Convert pixels to ra/dec positions using a wcs.

        Parameters
        ----------
        wcs : `lsst.afw.geom.SkyWcs`
            WCS object.
        pixels : `list` [`lsst.geom.Point2D`]
            List of pixels to convert.

        Returns
        -------
        radec : `numpy.ndarray`
            Nx2 array of ra/dec positions associated with pixels.
        """
        sph_pts = wcs.pixelToSky(pixels)
        return np.array([(sph.getRa().asDegrees(), sph.getDec().asDegrees())
                         for sph in sph_pts])


class HealSparsePropertyMapConnections(pipeBase.PipelineTaskConnections,
                                       dimensions=("tract", "band", "skymap",),
                                       defaultTemplates={"coaddName": "deep"}):
    input_maps = pipeBase.connectionTypes.Input(
        doc="Healsparse bit-wise coadd input maps",
        name="{coaddName}Coadd_inputMap",
        storageClass="HealSparseMap",
        dimensions=("tract", "patch", "skymap", "band"),
        multiple=True,
        deferLoad=True,
    )
    coadd_exposures = pipeBase.connectionTypes.Input(
        doc="Coadded exposures associated with input_maps",
        name="{coaddName}Coadd",
        storageClass="ExposureF",
        dimensions=("tract", "patch", "skymap", "band"),
        multiple=True,
        deferLoad=True,
    )
    visit_summaries = pipeBase.connectionTypes.Input(
        doc="Visit summary tables with aggregated statistics",
        name="visitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
        multiple=True,
        deferLoad=True,
    )
    sky_map = pipeBase.connectionTypes.Input(
        doc="Input definition of geometry/bbox and projection/wcs for coadded exposures",
        name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
        storageClass="SkyMap",
        dimensions=("skymap",),
    )

    # Create output connections for all possible maps defined in the
    # registry. The vars() trick used here allows us to set class attributes
    # programmatically. Taken from
    # https://stackoverflow.com/questions/2519807/
    # setting-a-class-attribute-with-a-given-name-in-python-while-defining-the-class
    for name in BasePropertyMap.registry:
        vars()[f"{name}_map_min"] = pipeBase.connectionTypes.Output(
            doc=f"Minimum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_min",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
        )
        vars()[f"{name}_map_max"] = pipeBase.connectionTypes.Output(
            doc=f"Maximum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_max",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
        )
        vars()[f"{name}_map_mean"] = pipeBase.connectionTypes.Output(
            doc=f"Mean-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_mean",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
        )
        vars()[f"{name}_map_weighted_mean"] = pipeBase.connectionTypes.Output(
            doc=f"Weighted mean-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_weighted_mean",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
        )
        vars()[f"{name}_map_sum"] = pipeBase.connectionTypes.Output(
            doc=f"Sum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_sum",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
        )

    def __init__(self, *, config=None):
        super().__init__(config=config)

        # Not all possible maps in the registry will be configured to run.
        # Here we remove the unused connections.
        for name in BasePropertyMap.registry:
            if name not in config.property_maps:
                prop_config = BasePropertyMapConfig()
                prop_config.do_min = False
                prop_config.do_max = False
                prop_config.do_mean = False
                prop_config.do_weighted_mean = False
                prop_config.do_sum = False
            else:
                prop_config = config.property_maps[name]

            if not prop_config.do_min:
                self.outputs.remove(f"{name}_map_min")
            if not prop_config.do_max:
                self.outputs.remove(f"{name}_map_max")
            if not prop_config.do_mean:
                self.outputs.remove(f"{name}_map_mean")
            if not prop_config.do_weighted_mean:
                self.outputs.remove(f"{name}_map_weighted_mean")
            if not prop_config.do_sum:
                self.outputs.remove(f"{name}_map_sum")


class HealSparsePropertyMapConfig(pipeBase.PipelineTaskConfig,
                                  pipelineConnections=HealSparsePropertyMapConnections):
    """Configuration parameters for HealSparsePropertyMapTask"""
    property_maps = BasePropertyMap.registry.makeField(
        multi=True,
        default=["exposure_time",
                 "psf_size",
                 "psf_e1",
                 "psf_e2",
                 "psf_maglim",
                 "sky_noise",
                 "sky_background",
                 "dcr_dra",
                 "dcr_ddec",
                 "dcr_e1",
                 "dcr_e2"],
        doc="Property map computation objects",
    )

    def setDefaults(self):
        self.property_maps["exposure_time"].do_sum = True
        self.property_maps["psf_size"].do_weighted_mean = True
        self.property_maps["psf_e1"].do_weighted_mean = True
        self.property_maps["psf_e2"].do_weighted_mean = True
        self.property_maps["psf_maglim"].do_weighted_mean = True
        self.property_maps["sky_noise"].do_weighted_mean = True
        self.property_maps["sky_background"].do_weighted_mean = True
        self.property_maps["dcr_dra"].do_weighted_mean = True
        self.property_maps["dcr_ddec"].do_weighted_mean = True
        self.property_maps["dcr_e1"].do_weighted_mean = True
        self.property_maps["dcr_e2"].do_weighted_mean = True

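# Illustrative sketch (not part of the pipeline code): additional statistics
# can be enabled per property map through the usual config override
# mechanism, mirroring the assignments in setDefaults above. A hypothetical
# override file might contain:
#
#     config.property_maps["psf_size"].do_min = True
#     config.property_maps["psf_size"].do_max = True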

class HealSparsePropertyMapTask(pipeBase.PipelineTask):
    """Task to compute Healsparse property maps.

    This task will compute individual property maps (per tract, per
    map type, per band). These maps cover the full coadd tract, and
    are not truncated to the inner tract region.
    """
    ConfigClass = HealSparsePropertyMapConfig
    _DefaultName = "healSparsePropertyMapTask"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.property_maps = PropertyMapMap()
        for name, config, PropertyMapClass in self.config.property_maps.apply():
            self.property_maps[name] = PropertyMapClass(config, name)

    @pipeBase.timeMethod
    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        sky_map = inputs.pop("sky_map")

        tract = butlerQC.quantum.dataId["tract"]
        band = butlerQC.quantum.dataId["band"]

        input_map_dict = {ref.dataId["patch"]: ref for ref in inputs["input_maps"]}
        coadd_dict = {ref.dataId["patch"]: ref for ref in inputs["coadd_exposures"]}

        visit_summary_dict = {ref.dataId["visit"]: ref.get()
                              for ref in inputs["visit_summaries"]}

        self.run(sky_map, tract, band, coadd_dict, input_map_dict, visit_summary_dict)

        # Write the outputs
        for name, property_map in self.property_maps.items():
            if property_map.config.do_min:
                butlerQC.put(property_map.min_map,
                             getattr(outputRefs, f"{name}_map_min"))
            if property_map.config.do_max:
                butlerQC.put(property_map.max_map,
                             getattr(outputRefs, f"{name}_map_max"))
            if property_map.config.do_mean:
                butlerQC.put(property_map.mean_map,
                             getattr(outputRefs, f"{name}_map_mean"))
            if property_map.config.do_weighted_mean:
                butlerQC.put(property_map.weighted_mean_map,
                             getattr(outputRefs, f"{name}_map_weighted_mean"))
            if property_map.config.do_sum:
                butlerQC.put(property_map.sum_map,
                             getattr(outputRefs, f"{name}_map_sum"))

    def run(self, sky_map, tract, band, coadd_dict, input_map_dict, visit_summary_dict):
        """Run the healsparse property task.

        Parameters
        ----------
        sky_map : Sky map object
        tract : `int`
            Tract number.
        band : `str`
            Band name for logging.
        coadd_dict : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]
            Dictionary of coadd exposure references. Keys are patch numbers.
        input_map_dict : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]
            Dictionary of input map references. Keys are patch numbers.
        visit_summary_dict : `dict` [`int`: `lsst.afw.table.ExposureCatalog`]
            Dictionary of visit summary tables. Keys are visit numbers.

        Raises
        ------
        RepeatableQuantumError
            If visit_summary_dict is missing any visits or detectors found in an
            input map. This leads to an inconsistency between what is in the coadd
            (via the input map) and the visit summary tables which contain data
            to compute the maps.
        """
        tract_info = sky_map[tract]

        tract_maps_initialized = False

        for patch in input_map_dict.keys():
            self.log.info("Making maps for band %s, tract %d, patch %d.",
                          band, tract, patch)

            patch_info = tract_info[patch]

            input_map = input_map_dict[patch].get()
            coadd_photo_calib = coadd_dict[patch].get(component="photoCalib")
            coadd_inputs = coadd_dict[patch].get(component="coaddInputs")

            coadd_zeropoint = 2.5*np.log10(coadd_photo_calib.getInstFluxAtZeroMagnitude())

            # Crop input_map to the inner polygon of the patch
            poly_vertices = patch_info.getInnerSkyPolygon(tract_info.getWcs()).getVertices()
            patch_radec = self._vertices_to_radec(poly_vertices)
            patch_poly = hsp.Polygon(ra=patch_radec[:, 0], dec=patch_radec[:, 1],
                                     value=np.arange(input_map.wide_mask_maxbits))
            patch_poly_map = patch_poly.get_map_like(input_map)
            input_map = hsp.and_intersection([input_map, patch_poly_map])

            if not tract_maps_initialized:
                # We use the first input map nside information to initialize
                # the tract maps
                nside_coverage = self._compute_nside_coverage_tract(tract_info)
                nside = input_map.nside_sparse

                do_compute_approx_psf = False
                # Initialize the tract maps
                for property_map in self.property_maps:
                    property_map.initialize_tract_maps(nside_coverage, nside)
                    if property_map.requires_psf:
                        do_compute_approx_psf = True

                tract_maps_initialized = True

            valid_pixels, vpix_ra, vpix_dec = input_map.valid_pixels_pos(return_pixels=True)

            # Check if there are no valid pixels for the inner (unique) patch region
            if valid_pixels.size == 0:
                continue

            # Initialize the value accumulators
            for property_map in self.property_maps:
                property_map.initialize_values(valid_pixels.size)
                property_map.zeropoint = coadd_zeropoint

            # Initialize the weight and counter accumulators
            total_weights = np.zeros(valid_pixels.size)
            total_inputs = np.zeros(valid_pixels.size, dtype=np.int32)

            for bit, ccd_row in enumerate(coadd_inputs.ccds):
                # Which pixels in the map are used by this visit/detector
                inmap, = np.where(input_map.check_bits_pix(valid_pixels, [bit]))

                # Check if there are any valid pixels in the map from this detector.
                if inmap.size == 0:
                    continue

                # visit, detector_id, weight = input_dict[bit]
                visit = ccd_row["visit"]
                detector_id = ccd_row["ccd"]
                weight = ccd_row["weight"]

                x, y = ccd_row.getWcs().skyToPixelArray(vpix_ra[inmap], vpix_dec[inmap], degrees=True)
                scalings = self._compute_calib_scale(ccd_row, x, y)

                if do_compute_approx_psf:
                    psf_array = compute_approx_psf_size_and_shape(ccd_row, vpix_ra[inmap], vpix_dec[inmap])
                else:
                    psf_array = None

                total_weights[inmap] += weight
                total_inputs[inmap] += 1

                # Retrieve the correct visitSummary row
                if visit not in visit_summary_dict:
                    msg = f"Visit {visit} not found in visit_summaries."
                    raise pipeBase.RepeatableQuantumError(msg)
                row = visit_summary_dict[visit].find(detector_id)
                if row is None:
                    msg = f"Visit {visit} / detector_id {detector_id} not found in visit_summaries."
                    raise pipeBase.RepeatableQuantumError(msg)

                # Accumulate the values
                for property_map in self.property_maps:
                    property_map.accumulate_values(inmap,
                                                   vpix_ra[inmap],
                                                   vpix_dec[inmap],
                                                   weight,
                                                   scalings,
                                                   row,
                                                   psf_array=psf_array)

            # Finalize the mean values and set the tract maps
            for property_map in self.property_maps:
                property_map.finalize_mean_values(total_weights, total_inputs)
                property_map.set_map_values(valid_pixels)

    def _compute_calib_scale(self, ccd_row, x, y):
        """Compute calibration scaling values.

        Parameters
        ----------
        ccd_row : `lsst.afw.table.ExposureRecord`
            Exposure metadata for a given detector exposure.
        x : `np.ndarray`
            Array of x positions.
        y : `np.ndarray`
            Array of y positions.

        Returns
        -------
        calib_scale : `np.ndarray`
            Array of calibration scale values.
        """
        photo_calib = ccd_row.getPhotoCalib()
        bf = photo_calib.computeScaledCalibration()
        if bf.getBBox() == ccd_row.getBBox():
            # Track variable calibration over the detector
            calib_scale = photo_calib.getCalibrationMean()*bf.evaluate(x, y)
        else:
            # Spatially constant calibration
            calib_scale = photo_calib.getCalibrationMean()

        return calib_scale

    def _vertices_to_radec(self, vertices):
        """Convert polygon vertices to ra/dec.

        Parameters
        ----------
        vertices : `list` [ `lsst.sphgeom.UnitVector3d` ]
            Vertices for bounding polygon.

        Returns
        -------
        radec : `numpy.ndarray`
            Nx2 array of ra/dec positions (in degrees) associated with vertices.
        """
        lonlats = [lsst.sphgeom.LonLat(x) for x in vertices]
        radec = np.array([(x.getLon().asDegrees(), x.getLat().asDegrees()) for
                          x in lonlats])
        return radec

    def _compute_nside_coverage_tract(self, tract_info):
        """Compute the optimal coverage nside for a tract.

        Parameters
        ----------
        tract_info : `lsst.skymap.tractInfo.ExplicitTractInfo`
            Tract information object.

        Returns
        -------
        nside_coverage : `int`
            Optimal coverage nside for a tract map.
        """
        num_patches = tract_info.getNumPatches()

        # Compute approximate patch area
        patch_info = tract_info.getPatchInfo(0)
        vertices = patch_info.getInnerSkyPolygon(tract_info.getWcs()).getVertices()
        radec = self._vertices_to_radec(vertices)
        delta_ra = np.max(radec[:, 0]) - np.min(radec[:, 0])
        delta_dec = np.max(radec[:, 1]) - np.min(radec[:, 1])
        patch_area = delta_ra*delta_dec*np.cos(np.deg2rad(np.mean(radec[:, 1])))

        tract_area = num_patches[0]*num_patches[1]*patch_area
        # Start with a fairly low nside and increase until we find the approximate area.
        nside_coverage_tract = 32
        while hp.nside2pixarea(nside_coverage_tract, degrees=True) > tract_area:
            nside_coverage_tract = 2*nside_coverage_tract
        # Step back one, but don't go bigger pixels than nside=32
        nside_coverage_tract = int(np.clip(nside_coverage_tract/2, 32, None))

        return nside_coverage_tract

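    # Illustrative sketch (not part of the task code): the loop above doubles
    # nside until a single coverage pixel is no larger than the tract, then
    # steps back one level (never below nside=32). For a hypothetical tract
    # area of ~3 deg^2:
    #
    #     import healpy as hp
    #     import numpy as np
    #
    #     tract_area = 3.0  # deg^2, assumed for illustration
    #     nside = 32
    #     while hp.nside2pixarea(nside, degrees=True) > tract_area:
    #         nside *= 2
    #     nside = int(np.clip(nside/2, 32, None))
    #     # hp.nside2pixarea(32, degrees=True) ~ 3.36 deg^2 > 3.0, so nside
    #     # doubles to 64 (~0.84 deg^2), and the step back returns 32.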

class ConsolidateHealSparsePropertyMapConnections(pipeBase.PipelineTaskConnections,
                                                  dimensions=("band", "skymap",),
                                                  defaultTemplates={"coaddName": "deep"}):
    sky_map = pipeBase.connectionTypes.Input(
        doc="Input definition of geometry/bbox and projection/wcs for coadded exposures",
        name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
        storageClass="SkyMap",
        dimensions=("skymap",),
    )

    # Create output connections for all possible maps defined in the
    # registry. The vars() trick used here allows us to set class attributes
    # programmatically. Taken from
    # https://stackoverflow.com/questions/2519807/
    # setting-a-class-attribute-with-a-given-name-in-python-while-defining-the-class
    for name in BasePropertyMap.registry:
        vars()[f"{name}_map_min"] = pipeBase.connectionTypes.Input(
            doc=f"Minimum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_min",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
            multiple=True,
            deferLoad=True,
        )
        vars()[f"{name}_consolidated_map_min"] = pipeBase.connectionTypes.Output(
            doc=f"Minimum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_consolidated_map_min",
            storageClass="HealSparseMap",
            dimensions=("skymap", "band"),
        )
        vars()[f"{name}_map_max"] = pipeBase.connectionTypes.Input(
            doc=f"Maximum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_max",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
            multiple=True,
            deferLoad=True,
        )
        vars()[f"{name}_consolidated_map_max"] = pipeBase.connectionTypes.Output(
            doc=f"Maximum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_consolidated_map_max",
            storageClass="HealSparseMap",
            dimensions=("skymap", "band"),
        )
        vars()[f"{name}_map_mean"] = pipeBase.connectionTypes.Input(
            doc=f"Mean-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_mean",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
            multiple=True,
            deferLoad=True,
        )
        vars()[f"{name}_consolidated_map_mean"] = pipeBase.connectionTypes.Output(
            doc=f"Mean-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_consolidated_map_mean",
            storageClass="HealSparseMap",
            dimensions=("skymap", "band"),
        )
        vars()[f"{name}_map_weighted_mean"] = pipeBase.connectionTypes.Input(
            doc=f"Weighted mean-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_weighted_mean",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
            multiple=True,
            deferLoad=True,
        )
        vars()[f"{name}_consolidated_map_weighted_mean"] = pipeBase.connectionTypes.Output(
            doc=f"Weighted mean-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_consolidated_map_weighted_mean",
            storageClass="HealSparseMap",
            dimensions=("skymap", "band"),
        )
        vars()[f"{name}_map_sum"] = pipeBase.connectionTypes.Input(
            doc=f"Sum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_sum",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
            multiple=True,
            deferLoad=True,
        )
        vars()[f"{name}_consolidated_map_sum"] = pipeBase.connectionTypes.Output(
            doc=f"Sum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_consolidated_map_sum",
            storageClass="HealSparseMap",
            dimensions=("skymap", "band"),
        )

    def __init__(self, *, config=None):
        super().__init__(config=config)

        # Not all possible maps in the registry will be configured to run.
        # Here we remove the unused connections.
        for name in BasePropertyMap.registry:
            if name not in config.property_maps:
                prop_config = BasePropertyMapConfig()
                prop_config.do_min = False
                prop_config.do_max = False
                prop_config.do_mean = False
                prop_config.do_weighted_mean = False
                prop_config.do_sum = False
            else:
                prop_config = config.property_maps[name]

            if not prop_config.do_min:
                self.inputs.remove(f"{name}_map_min")
                self.outputs.remove(f"{name}_consolidated_map_min")
            if not prop_config.do_max:
                self.inputs.remove(f"{name}_map_max")
                self.outputs.remove(f"{name}_consolidated_map_max")
            if not prop_config.do_mean:
                self.inputs.remove(f"{name}_map_mean")
                self.outputs.remove(f"{name}_consolidated_map_mean")
            if not prop_config.do_weighted_mean:
                self.inputs.remove(f"{name}_map_weighted_mean")
                self.outputs.remove(f"{name}_consolidated_map_weighted_mean")
            if not prop_config.do_sum:
                self.inputs.remove(f"{name}_map_sum")
                self.outputs.remove(f"{name}_consolidated_map_sum")


class ConsolidateHealSparsePropertyMapConfig(pipeBase.PipelineTaskConfig,
                                             pipelineConnections=ConsolidateHealSparsePropertyMapConnections):
    """Configuration parameters for ConsolidateHealSparsePropertyMapTask"""
    property_maps = BasePropertyMap.registry.makeField(
        multi=True,
        default=["exposure_time",
                 "psf_size",
                 "psf_e1",
                 "psf_e2",
                 "psf_maglim",
                 "sky_noise",
                 "sky_background",
                 "dcr_dra",
                 "dcr_ddec",
                 "dcr_e1",
                 "dcr_e2"],
        doc="Property map computation objects",
    )

    def setDefaults(self):
        self.property_maps["exposure_time"].do_sum = True
        self.property_maps["psf_size"].do_weighted_mean = True
        self.property_maps["psf_e1"].do_weighted_mean = True
        self.property_maps["psf_e2"].do_weighted_mean = True
        self.property_maps["psf_maglim"].do_weighted_mean = True
        self.property_maps["sky_noise"].do_weighted_mean = True
        self.property_maps["sky_background"].do_weighted_mean = True
        self.property_maps["dcr_dra"].do_weighted_mean = True
        self.property_maps["dcr_ddec"].do_weighted_mean = True
        self.property_maps["dcr_e1"].do_weighted_mean = True
        self.property_maps["dcr_e2"].do_weighted_mean = True


class ConsolidateHealSparsePropertyMapTask(pipeBase.PipelineTask):
    """Task to consolidate HealSparse property maps.

    This task will take all the individual tract-based maps (per map type,
    per band) and consolidate them into one survey-wide map (per map type,
    per band). Each tract map is truncated to its inner region before
    consolidation.
    """
    ConfigClass = ConsolidateHealSparsePropertyMapConfig
    _DefaultName = "consolidateHealSparsePropertyMapTask"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.property_maps = PropertyMapMap()
        for name, config, PropertyMapClass in self.config.property_maps.apply():
            self.property_maps[name] = PropertyMapClass(config, name)

    @pipeBase.timeMethod
    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        sky_map = inputs.pop("sky_map")

        # These need to be consolidated one at a time to conserve memory.
        for name in self.config.property_maps.names:
            for type_ in ['min', 'max', 'mean', 'weighted_mean', 'sum']:
                map_type = f"{name}_map_{type_}"
                if map_type in inputs:
                    input_refs = {ref.dataId['tract']: ref
                                  for ref in inputs[map_type]}
                    consolidated_map = self.consolidate_map(sky_map, input_refs)
                    butlerQC.put(consolidated_map,
                                 getattr(outputRefs, f"{name}_consolidated_map_{type_}"))

    def consolidate_map(self, sky_map, input_refs):
        """Consolidate the healsparse property maps.

        Parameters
        ----------
        sky_map : Sky map object
        input_refs : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]
            Dictionary of tract_id mapping to dataref.

        Returns
        -------
        consolidated_map : `healsparse.HealSparseMap`
            Consolidated HealSparse map.
        """
        # First, we read in the coverage maps to know how much memory
        # to allocate
        cov_mask = None
        for tract_id in input_refs:
            cov = input_refs[tract_id].get(component='coverage')
            if cov_mask is None:
                cov_mask = cov.coverage_mask
            else:
                cov_mask |= cov.coverage_mask

        cov_pix, = np.where(cov_mask)

        # Now read in each tract map and build the consolidated map.
        consolidated_map = None
        for tract_id in input_refs:
            input_map = input_refs[tract_id].get()
            if consolidated_map is None:
                dtype = input_map.dtype
                sentinel = input_map._sentinel
                nside_coverage = input_map.nside_coverage
                nside_sparse = input_map.nside_sparse
                consolidated_map = hsp.HealSparseMap.make_empty(nside_coverage,
                                                                nside_sparse,
                                                                dtype,
                                                                sentinel=sentinel)
                consolidated_map._reserve_cov_pix(cov_pix)

            # Only use pixels that are properly inside the tract.
            vpix, ra, dec = input_map.valid_pixels_pos(return_pixels=True)
            vpix_tract_ids = sky_map.findTractIdArray(ra, dec, degrees=True)

            in_tract = (vpix_tract_ids == tract_id)

            consolidated_map[vpix[in_tract]] = input_map[vpix[in_tract]]

        return consolidated_map
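
    # Illustrative sketch (not part of the task code): the pre-pass above ORs
    # the boolean coverage masks of all tract maps so the consolidated map can
    # reserve every needed coverage pixel up front. With plain numpy booleans
    # standing in for the per-tract coverage masks:
    #
    #     import numpy as np
    #
    #     cov_mask_a = np.array([True, False, False, True])
    #     cov_mask_b = np.array([False, False, True, True])
    #     combined = cov_mask_a | cov_mask_b
    #     cov_pix, = np.where(combined)
    #     # cov_pix -> array([0, 2, 3]); these coverage pixels get reserved.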