22"""Tasks for making and manipulating HIPS images."""
24__all__ = [
"HighResolutionHipsTask",
"HighResolutionHipsConfig",
"HighResolutionHipsConnections"]
26from collections
import defaultdict
33from lsst.utils.timer
import timeMethod
34from lsst.daf.butler
import Butler, DatasetRef, Quantum, SkyPixDimension
# NOTE(review): the `class HighResolutionHipsConnections(...)` header line is
# not visible in this chunk; the lines below are the tail of its class
# arguments (quantum dimensions "healpix9"/"band" and the default
# "coaddName" template) followed by the connection declarations.
44 dimensions=(
"healpix9",
"band"),
45 defaultTemplates={
"coaddName":
"deep"}):
# Input connection: coadd exposures, identified per tract/patch/skymap/band,
# to be warped into HiPS-compatible HPX images.
46 coadd_exposure_handles = pipeBase.connectionTypes.Input(
47 doc=
"Coadded exposures to convert to HIPS format.",
48 name=
"{coaddName}Coadd_calexp",
49 storageClass=
"ExposureF",
50 dimensions=(
"tract",
"patch",
"skymap",
"band"),
# NOTE(review): the closing of this Input(...) call (and any deferLoad /
# multiple flags, original lines ~51-53) is missing from this view.
# Output connection: one HPX-projected ExposureF per ("healpix11", "band")
# data ID.
54 hips_exposures = pipeBase.connectionTypes.Output(
55 doc=
"HIPS-compatible HPX image.",
56 name=
"{coaddName}Coadd_hpx",
57 storageClass=
"ExposureF",
58 dimensions=(
"healpix11",
"band"),
# Validate the healpix dimensions declared on this connections class against
# the configured hips_order, raising ValueError on any mismatch.
62 def __init__(self, *, config=None):
63 super().__init__(config=config)
# First pass over the quantum dimensions: locate the single "healpixN"
# dimension and record its order N.
# NOTE(review): the `quantum_order = None` initializer and the guard
# restricting this branch to healpix-named dimensions are on lines missing
# from this chunk (original ~64-67) — confirm against the full file.
66 for dim
in self.dimensions:
68 if quantum_order
is not None:
69 raise ValueError(
"Must not specify more than one quantum healpix dimension.")
# Parse the order out of the dimension name, e.g. "healpix9" -> 9.
70 quantum_order =
int(dim.split(
'healpix')[1])
71 if quantum_order
is None:
72 raise ValueError(
"Must specify a healpix dimension in quantum dimensions.")
# The quantum (work-unit) order must be no finer than the output order.
74 if quantum_order > config.hips_order:
75 raise ValueError(
"Quantum healpix dimension order must not be greater than hips_order")
# Second pass: the hips_exposures output must carry exactly one healpix
# dimension whose order equals config.hips_order.
# NOTE(review): the `order = None` initializer and the `if` conditions
# guarding the two raises below (original ~76-85) are missing here.
78 for dim
in self.hips_exposures.dimensions:
81 raise ValueError(
"Must not specify more than one healpix dimension.")
82 order =
int(dim.split(
'healpix')[1])
84 raise ValueError(
"Must specify a healpix dimension in hips_exposure dimensions.")
86 if order != config.hips_order:
87 raise ValueError(
"healpix dimension order must match config.hips_order.")
90class HighResolutionHipsConfig(pipeBase.PipelineTaskConfig,
91 pipelineConnections=HighResolutionHipsConnections):
92 """Configuration parameters for HighResolutionHipsTask.
96 A HiPS image covers one HEALPix cell, with the HEALPix nside equal to
97 2**hips_order. Each cell
is 'shift_order' orders deeper than the HEALPix
98 cell,
with 2**shift_order x 2**shift_order sub-pixels on a side, which
99 defines the target resolution of the HiPS image. The IVOA recommends
100 shift_order=9,
for 2**9=512 pixels on a side.
103 https://www.ivoa.net/documents/HiPS/20170519/REC-HIPS-1.0-20170519.pdf
104 shows the relationship between hips_order, number of tiles (full
105 sky coverage), cell size,
and sub-pixel size/image resolution (
with
106 the default shift_order=9):
107 +------------+-----------------+--------------+------------------+
108 | hips_order | Number of Tiles | Cell Size | Image Resolution |
109 +============+=================+==============+==================+
110 | 0 | 12 | 58.63 deg | 6.871 arcmin |
111 | 1 | 48 | 29.32 deg | 3.435 arcmin |
112 | 2 | 192 | 14.66 deg | 1.718 arcmin |
113 | 3 | 768 | 7.329 deg | 51.53 arcsec |
114 | 4 | 3072 | 3.665 deg | 25.77 arcsec |
115 | 5 | 12288 | 1.832 deg | 12.88 arcsec |
116 | 6 | 49152 | 54.97 arcmin | 6.442 arcsec |
117 | 7 | 196608 | 27.48 arcmin | 3.221 arcsec |
118 | 8 | 786432 | 13.74 arcmin | 1.61 arcsec |
119 | 9 | 3145728 | 6.871 arcmin | 805.2mas |
120 | 10 | 12582912 | 3.435 arcmin | 402.6mas |
121 | 11 | 50331648 | 1.718 arcmin | 201.3mas |
122 | 12 | 201326592 | 51.53 arcsec | 100.6mas |
123 | 13 | 805306368 | 25.77 arcsec | 50.32mas |
124 +------------+-----------------+--------------+------------------+
# NOTE(review): the closing triple-quote of the class docstring (original
# line ~125) is missing from this chunk.
# Order of the output HiPS tiles; HEALPix nside = 2**hips_order.
126 hips_order = pexConfig.Field(
127 doc="HIPS image order.",
# NOTE(review): dtype/default lines for these Fields (original ~128-130,
# ~133-135) are not visible here.
# Sub-pixel depth of each tile: each tile is 2**shift_order pixels on a side.
131 shift_order = pexConfig.Field(
132 doc=
"HIPS shift order (such that each tile is 2**shift_order pixels on a side)",
# Warper sub-configuration used by the task to resample coadds onto the
# HPX projection.
136 warp = pexConfig.ConfigField(
137 dtype=afwMath.Warper.ConfigClass,
138 doc=
"Warper configuration",
# NOTE(review): the `def setDefaults(self):` line (original ~141) is missing;
# the assignment below sets the default warping kernel.
142 self.warp.warpingKernelName =
"lanczos5"
# Descriptor that computes the task's _DefaultName lazily by appending the
# quantum healpix order (parsed from the connections class dimensions) to a
# fixed prefix, e.g. "highResolutionHips" -> "highResolutionHips9".
145class HipsTaskNameDescriptor:
146 """Descriptor used create a DefaultName that matches the order of
147 the defined dimensions in the connections
class.
152 The prefix of the Default name, to which the order will be
# NOTE(review): the rest of the docstring, its closing quotes, and the
# `self._order = None` initializer (original ~148-158) are missing here.
155 def __init__(self, prefix):
# _defaultName is a template like "prefix{}"; the "{}" is filled with the
# healpix order on first access.
157 self._defaultName = f
"{prefix}{{}}"
160 def __get__(self, obj, klass=None):
# NOTE(review): the `if klass is None:` guard and the raise head for the
# message below (original ~161-162) are missing from this view.
163 "HipsTaskDescriptor was used in an unexpected context"
# Compute and cache the order on first access by scanning the connections
# class dimensions for a "healpixN" name.
165 if self._order
is None:
166 klassDimensions = klass.ConfigClass.ConnectionsClass.dimensions
167 for dim
in klassDimensions:
168 if (match := re.match(
r"^healpix(\d*)$", dim))
is not None:
169 self._order =
int(match.group(1))
# NOTE(review): the `break` and the for/else raise head for the message
# below (original ~170-172) are missing from this view.
173 "Could not find healpix dimension in connections class"
175 return self._defaultName.
format(self._order)
178class HighResolutionHipsTask(pipeBase.PipelineTask):
179 """Task for making high resolution HiPS images."""
180 ConfigClass = HighResolutionHipsConfig
# _DefaultName is a descriptor: the quantum healpix order is appended to
# this prefix when the name is first read (see HipsTaskNameDescriptor).
181 _DefaultName = HipsTaskNameDescriptor(
"highResolutionHips")
183 def __init__(self, **kwargs):
184 super().__init__(**kwargs)
# Construct the warper once from config; reused for every input exposure.
185 self.warper = afwMath.Warper.fromConfig(self.config.warp)
def runQuantum(self, butlerQC, inputRefs, outputRefs):
    """Fetch inputs, run the task over the requested pixels, and store
    each resulting HPX exposure on its matching output reference.

    Parameters
    ----------
    butlerQC : quantum context
        Context used to fetch the input datasets and to put the outputs.
    inputRefs : quantized input connections
        References to the input coadd exposure handles.
    outputRefs : quantized output connections
        References enumerating the HPX exposures to produce; their data IDs
        carry the healpix pixel for this quantum's order.
    """
    inputs = butlerQC.get(inputRefs)

    # The healpix dimension name is determined by the configured order.
    healpix_dim = f"healpix{self.config.hips_order}"

    # The output refs enumerate exactly which HPX pixels to generate.
    pixels = [ref.dataId[healpix_dim] for ref in outputRefs.hips_exposures]

    outputs = self.run(pixels=pixels, coadd_exposure_handles=inputs["coadd_exposure_handles"])

    # Index the output refs by pixel so each produced exposure is written to
    # the matching dataset; run() may omit pixels, so iterate its results.
    ref_for_pixel = {ref.dataId[healpix_dim]: ref for ref in outputRefs.hips_exposures}
    for pixel, hips_exposure in outputs.hips_exposures.items():
        butlerQC.put(hips_exposure, ref_for_pixel[pixel])
# Warp all input coadds onto each requested HPX pixel and stack them into a
# single exposure per pixel.
203 def run(self, pixels, coadd_exposure_handles):
204 """Run the HighResolutionHipsTask.
208 pixels : `Iterable` [ `int` ]
209 Iterable of healpix pixels (nest ordering) to warp to.
210 coadd_exposure_handles : `list` [`lsst.daf.butler.DeferredDatasetHandle`]
211 Handles for the coadd exposures.
215 outputs : `lsst.pipe.base.Struct`
216 ``hips_exposures``
is a dict
with pixel (key)
and hips_exposure (value)
# NOTE(review): the docstring closing quotes (original ~217) are missing.
218 self.log.info("Generating HIPS images for %d pixels at order %d", len(pixels), self.config.hips_order)
# Tile side length in pixels, as set by the shift order.
220 npix = 2**self.config.shift_order
# NOTE(review): the construction of `bbox_hpx`, the dict initializers for
# `exp_hpx_dict`/`warp_dict`, and the `for pixel in pixels:` loop header
# (original ~221-229) are missing from this view; the statements below
# clearly execute per pixel.
# Build the HPX WCS and an empty output exposure for this pixel.
230 wcs_hpx = afwGeom.makeHpxWcs(self.config.hips_order, pixel, shift_order=self.config.shift_order)
231 exp_hpx = afwImage.ExposureF(bbox_hpx, wcs_hpx)
232 exp_hpx_dict[pixel] = exp_hpx
233 warp_dict[pixel] = []
# Warp every input coadd onto each pixel's HPX frame.
238 for handle
in coadd_exposure_handles:
239 coadd_exp = handle.get()
243 warped = self.warper.
warpExposure(exp_hpx_dict[pixel].getWcs(), coadd_exp, maxBBox=bbox_hpx)
# Start from a fully-masked (NO_DATA, NaN image and variance) exposure
# covering the whole tile, then paste the warp into it.
245 exp = afwImage.ExposureF(exp_hpx_dict[pixel].getBBox(), exp_hpx_dict[pixel].getWcs())
246 exp.maskedImage.set(np.nan, afwImage.Mask.getPlaneBitMask(
"NO_DATA"), np.nan)
# Propagate mask planes, filter, and photometric calibration from the
# input coadd onto the output exposure.
251 exp_hpx_dict[pixel].mask.conformMaskPlanes(coadd_exp.mask.getMaskPlaneDict())
252 exp_hpx_dict[pixel].setFilter(coadd_exp.getFilter())
253 exp_hpx_dict[pixel].setPhotoCalib(coadd_exp.getPhotoCalib())
# Skip warps that land nowhere on this tile or contain no finite pixels.
# NOTE(review): the log-call head and the `continue` for this branch
# (original ~256-263) are missing from this view.
255 if warped.getBBox().getArea() == 0
or not np.any(np.isfinite(warped.image.array)):
258 "No overlap between output HPX %d and input exposure %s",
264 exp.maskedImage.assign(warped.maskedImage, warped.getBBox())
265 warp_dict[pixel].
append(exp.maskedImage)
# Statistics control for the coadd stack: NaN-safe, weighted, with error
# propagated from the input variance planes.
# NOTE(review): the StatisticsControl construction (original ~266-270) is
# missing from this view.
271 stats_ctrl.setNanSafe(
True)
272 stats_ctrl.setWeighted(
True)
273 stats_ctrl.setCalcErrorFromInputVariance(
True)
# Re-blank each output before stacking the accumulated warps into it.
279 exp_hpx_dict[pixel].maskedImage.set(np.nan, afwImage.Mask.getPlaneBitMask(
"NO_DATA"), np.nan)
# Drop pixels that received no usable warps at all.
281 if not warp_dict[pixel]:
283 self.log.
debug(
"No data in HPX pixel %d", pixel)
286 exp_hpx_dict.pop(pixel)
# NOTE(review): the statisticsStack call that consumes these unit weights
# (original ~287-297) is missing from this view.
293 [1.0]*len(warp_dict[pixel]),
298 return pipeBase.Struct(hips_exposures=exp_hpx_dict)
# NOTE(review): the @classmethod decorator expected above this definition
# (original ~300) is missing from this view.
301 def build_quantum_graph_cli(cls, argv):
302 """A command-line interface entry point to `build_quantum_graph`.
303 This method provides the implementation for the
304 ``build-high-resolution-hips-qg`` script.
308 argv : `Sequence` [ `str` ]
309 Command-line arguments (e.g. ``sys.argv[1:]``).
# NOTE(review): the docstring closing quotes (original ~310) are missing.
311 parser = cls._make_cli_parser()
313 args = parser.parse_args(argv)
# No sub-command given: presumably print help and exit — the body of this
# branch (original ~316-318) is missing from this view.
315 if args.subparser_name
is None:
# The pipeline file must define exactly one task.
319 pipeline = pipeBase.Pipeline.from_uri(args.pipeline)
320 expanded_pipeline =
list(pipeline.toExpandedPipeline())
322 if len(expanded_pipeline) != 1:
323 raise RuntimeError(f
"Pipeline file {args.pipeline} may only contain one task.")
325 (task_def,) = expanded_pipeline
327 butler = Butler(args.butler_config, collections=args.input)
# "segment" sub-command: report which build-order pixels overlap the input
# coadds, so the user can split the work.
329 if args.subparser_name ==
'segment':
# NOTE(review): the construction of `hpix_pixelization` and the
# `region_pixels = []` initializer (original ~330-334) are missing here.
332 dataset = task_def.connections.coadd_exposure_handles.name
333 data_ids =
set(butler.registry.queryDataIds(
"tract", datasets=dataset).expanded())
# Collect every build-order pixel overlapping any input tract region.
335 for data_id
in data_ids:
336 region = data_id.region
337 pixel_range = hpix_pixelization.envelope(region)
338 for r
in pixel_range.ranges():
339 region_pixels.extend(range(r[0], r[1]))
340 indices = np.unique(region_pixels)
342 print(f
"Pixels to run at HEALPix order --hpix_build_order {args.hpix_build_order}:")
# NOTE(review): the print of each pixel in this loop (original ~344) is
# missing from this view.
343 for pixel
in indices:
# "build" sub-command: build and save the quantum graph for the requested
# pixels.
346 elif args.subparser_name ==
'build':
349 build_ranges =
RangeSet(sorted(args.pixels))
# NOTE(review): several arguments of this call (task_def, registry,
# constraint ranges, where=...) on original ~352-358 are missing here.
351 qg = cls.build_quantum_graph(
354 args.hpix_build_order,
357 collections=args.input
359 qg.saveUri(args.save_qgraph)
# Build the argparse parser for the build-high-resolution-hips-qg script,
# with "segment" and "build" sub-commands sharing common options.
# NOTE(review): the @classmethod decorator expected above this definition is
# missing from this view.
362 def _make_cli_parser(cls):
363 """Make the command-line parser.
367 parser : `argparse.ArgumentParser`
# NOTE(review): the docstring closing quotes (original ~368) are missing.
369 parser = argparse.ArgumentParser(
371 "Build a QuantumGraph that runs HighResolutionHipsTask on existing coadd datasets."
374 subparsers = parser.add_subparsers(help=
'sub-command help', dest=
'subparser_name')
376 parser_segment = subparsers.add_parser(
'segment',
377 help=
'Determine survey segments for workflow.')
378 parser_build = subparsers.add_parser(
'build',
379 help=
'Build quantum graph for HighResolutionHipsTask')
# Options common to both sub-commands. NOTE(review): the add_argument calls
# and their flag names/types/defaults (original ~382-418) are mostly missing
# from this view; only the help strings and one flag survive below.
381 for sub
in [parser_segment, parser_build]:
387 help=
"Path to data repository or butler configuration.",
394 help=
"Pipeline file, limited to one task.",
402 help=
"Input collection(s) to search for coadd exposures.",
407 "--hpix_build_order",
410 help=
"HEALPix order to segment sky for building quantum graph files.",
417 help=
"Data ID expression used when querying for input coadd datasets.",
# Build-only options: output graph path and the pixels to build.
420 parser_build.add_argument(
424 help=
"Output filename for QuantumGraph.",
427 parser_build.add_argument(
432 help=
"Pixels at --hpix_build_order to generate quantum graph.",
# NOTE(review): the `return parser` statement expected at the end of this
# method (original ~435-437) is missing from this view.
# Build a QuantumGraph for this task directly against the butler registry,
# constrained to a set of coarse HEALPix pixels.
# NOTE(review): the decorator and parameter list of this definition
# (original ~438-447: presumably cls, task_def, registry, constraint_order,
# constraint_ranges, where=None, collections=None) are missing from this
# view.
439 def build_quantum_graph(
448 """Generate a `QuantumGraph` for running just this task.
450 This is a temporary workaround
for incomplete butler query support
for
455 task_def : `lsst.pipe.base.TaskDef`
457 registry : `lsst.daf.butler.Registry`
458 Client
for the butler database. May be read-only.
459 constraint_order : `int`
460 HEALPix order used to contrain which quanta are generated, via
461 ``constraint_indices``. This should be a coarser grid (smaller
462 order) than the order used
for the task
's quantum and output data
463 IDs, and ideally something between the spatial scale of a patch
or
464 the data repository
's "common skypix" system (usually ``htm7``).
466 RangeSet which describes constraint pixels (HEALPix NEST, with order
467 constraint_order) to constrain generated quanta.
468 where : `str`, optional
469 A boolean `str` expression of the form accepted by
470 `Registry.queryDatasets` to constrain input datasets. This may
471 contain a constraint on tracts, patches,
or bands, but
not HEALPix
472 indices. Constraints on tracts
and patches should usually be
473 unnecessary, however - existing coadds that overlap the given
474 HEALpix indices will be selected without such a constraint,
and
475 providing one may reject some that should normally be included.
476 collections : `str`
or `Iterable` [ `str` ], optional
477 Collection
or collections to search
for input datasets,
in order.
478 If
not provided, ``registry.defaults.collections`` will be
# NOTE(review): the docstring closing quotes (original ~479-480) are
# missing from this view.
481 config = task_def.config
# Resolve the pipeline's dataset types: the single input, the primary HPX
# output, and any incidental outputs (logs/metadata).
483 dataset_types = pipeBase.PipelineDatasetTypes.fromPipeline(pipeline=[task_def], registry=registry)
486 (input_dataset_type,) = dataset_types.inputs
491 output_dataset_type = dataset_types.outputs[task_def.connections.hips_exposures.name]
492 incidental_output_dataset_types = dataset_types.outputs.copy()
493 incidental_output_dataset_types.remove(output_dataset_type)
494 (hpx_output_dimension,) = (d
for d
in output_dataset_type.dimensions
495 if isinstance(d, SkyPixDimension))
# Pixelizations: the constraint order, the registry's common skypix system,
# and the task's quantum healpix dimension.
497 constraint_hpx_pixelization = registry.dimensions[f
"healpix{constraint_order}"].pixelization
498 common_skypix_name = registry.dimensions.commonSkyPix.name
499 common_skypix_pixelization = registry.dimensions.commonSkyPix.pixelization
502 task_dimensions = registry.dimensions.extract(task_def.connections.dimensions)
503 (hpx_dimension,) = (d
for d
in task_dimensions
if d.name !=
"band")
504 hpx_pixelization = hpx_dimension.pixelization
# The quantum order must be finer than the constraint order; scale the
# constraint ranges up to quantum-order pixel ranges (4x pixels per order).
506 if hpx_pixelization.level < constraint_order:
507 raise ValueError(f
"Quantum order {hpx_pixelization.level} must be < {constraint_order}")
508 hpx_ranges = constraint_ranges.scaled(4**(hpx_pixelization.level - constraint_order))
# Translate the constraint pixels into common-skypix ranges for querying.
# NOTE(review): the `common_skypix_ranges = RangeSet()` initializer
# (original ~509-512) is missing from this view.
513 for begin, end
in constraint_ranges:
514 for hpx_index
in range(begin, end):
515 constraint_hpx_region = constraint_hpx_pixelization.pixel(hpx_index)
516 common_skypix_ranges |= common_skypix_pixelization.envelope(constraint_hpx_region)
# Simplify the ranges until they are few enough to put in a query; the
# `break` for the "< 100" branch (original ~522) is missing from this view.
520 for simp
in range(1, 10):
521 if len(common_skypix_ranges) < 100:
523 common_skypix_ranges.simplify(simp)
# Build a WHERE clause (with bind values) covering every skypix range.
# NOTE(review): `where_terms`/`bind` initializers and the branch condition
# distinguishing single pixels from ranges (original ~524-535) are missing.
530 for n, (begin, end)
in enumerate(common_skypix_ranges):
533 where_terms.append(f
"{common_skypix_name} = cpx{n}")
534 bind[f
"cpx{n}"] = begin
536 where_terms.append(f
"({common_skypix_name} >= cpx{n}a AND {common_skypix_name} <= cpx{n}b)")
537 bind[f
"cpx{n}a"] = begin
# NOTE(review): `stop` is not defined by the visible unpacking (which binds
# `begin, end`); since RangeSet ranges are half-open, this likely should be
# `end - 1` — confirm against the upstream source.
538 bind[f
"cpx{n}b"] = stop
539 where =
" OR ".join(where_terms)
# If a caller-supplied `where` exists, AND it with the skypix constraint.
542 where = f
"({where}) AND ({' OR '.join(where_terms)})"
# Query the input coadds; NOTE(review): the dataset type, where/bind
# arguments of this call (original ~547-552) are missing from this view.
546 input_refs = registry.queryDatasets(
550 collections=collections,
# Group the query results by patch so each patch region is tested once.
553 inputs_by_patch = defaultdict(set)
554 patch_dimensions = registry.dimensions.extract([
"patch"])
555 for input_ref
in input_refs:
556 inputs_by_patch[input_ref.dataId.subset(patch_dimensions)].add(input_ref)
557 if not inputs_by_patch:
558 message_body =
'\n'.join(input_refs.explain_no_results())
559 raise RuntimeError(f
"No inputs found:\n{message_body}")
# Regroup by quantum-order healpix pixel via patch-region envelopes,
# keeping only pixels inside the scaled constraint ranges.
564 inputs_by_hpx = defaultdict(set)
565 for patch_data_id, input_refs_for_patch
in inputs_by_patch.items():
566 patch_hpx_ranges = hpx_pixelization.envelope(patch_data_id.region)
567 for begin, end
in patch_hpx_ranges & hpx_ranges:
568 for hpx_index
in range(begin, end):
569 inputs_by_hpx[hpx_index].update(input_refs_for_patch)
# One quantum per (healpix pixel, band); NOTE(review): the `quanta = []`
# initializer (original ~570-571) is missing from this view.
572 for hpx_index, input_refs_for_hpx_index
in inputs_by_hpx.items():
574 input_refs_by_band = defaultdict(list)
575 for input_ref
in input_refs_for_hpx_index:
576 input_refs_by_band[input_ref.dataId[
"band"]].
append(input_ref)
578 for band, input_refs_for_band
in input_refs_by_band.items():
579 data_id = registry.expandDataId({hpx_dimension: hpx_index,
"band": band})
# Expand the quantum pixel to all output-order pixels it contains.
# NOTE(review): the `output_data_ids = []` initializer (original ~580,
# ~583) is missing from this view.
581 hpx_pixel_ranges =
RangeSet(hpx_index)
582 hpx_output_ranges = hpx_pixel_ranges.scaled(4**(config.hips_order - hpx_pixelization.level))
584 for begin, end
in hpx_output_ranges:
585 for hpx_output_index
in range(begin, end):
586 output_data_ids.append(
587 registry.expandDataId({hpx_output_dimension: hpx_output_index,
"band": band})
# Incidental outputs (metadata/log) keyed on the quantum data ID; the
# primary output carries one DatasetRef per output-order pixel.
589 outputs = {dt: [DatasetRef(dt, data_id)]
for dt
in incidental_output_dataset_types}
590 outputs[output_dataset_type] = [DatasetRef(output_dataset_type, data_id)
for data_id
in output_data_ids]
# NOTE(review): the Quantum(...) construction wrapping these keyword
# arguments and its append to `quanta` (original ~592-603) is missing.
594 taskName=task_def.taskName,
595 taskClass=task_def.taskClass,
598 inputs={input_dataset_type: input_refs_for_band},
# NOTE(review): the `if len(quanta) == 0:` guard for this raise (original
# ~603) is missing from this view.
604 raise RuntimeError(
"Given constraints yielded empty quantum graph.")
606 return pipeBase.QuantumGraph(quanta={task_def: quanta})
Pass parameters to a Statistics object.
An integer coordinate rectangle.
A RangeSet is a set of unsigned 64 bit integers.
daf::base::PropertyList * list
daf::base::PropertySet * set
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
Backwards-compatibility support for depersisting the old Calib (FluxMag0/FluxMag0Err) objects.
std::shared_ptr< lsst::afw::image::Image< PixelT > > statisticsStack(std::vector< std::shared_ptr< lsst::afw::image::Image< PixelT > > > &images, Property flags, StatisticsControl const &sctrl=StatisticsControl(), std::vector< lsst::afw::image::VariancePixel > const &wvector=std::vector< lsst::afw::image::VariancePixel >(0))
A function to compute some statistics of a stack of Images.
Property stringToStatisticsProperty(std::string const property)
Conversion function to switch a string to a Property (see Statistics.h)
int warpExposure(DestExposureT &destExposure, SrcExposureT const &srcExposure, WarpingControl const &control, typename DestExposureT::MaskedImageT::SinglePixel padValue=lsst::afw::math::edgePixel< typename DestExposureT::MaskedImageT >(typename lsst::afw::image::detail::image_traits< typename DestExposureT::MaskedImageT >::image_category()))
Warp (remap) one exposure to another.
def run(self, coaddExposures, bbox, wcs, dataIds, **kwargs)
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)