22from __future__
import annotations
24from typing
import TYPE_CHECKING, Iterable
34 CloughTocher2DInterpolateTask,
47 PipelineTaskConnections,
50from lsst.pipe.base.connectionTypes
import Input, Output
55from .coaddInputRecorder
import CoaddInputRecorderTask
63 "MakeDirectWarpConfig",
69 PipelineTaskConnections,
70 dimensions=(
"tract",
"patch",
"skymap",
"instrument",
"visit"),
76 """Connections for MakeWarpTask"""
79 doc=
"Input exposures to be interpolated and resampled onto a SkyMap "
81 name=
"{calexpType}calexp",
82 storageClass=
"ExposureF",
83 dimensions=(
"instrument",
"visit",
"detector"),
87 background_revert_list = Input(
88 doc=
"Background to be reverted (i.e., added back to the calexp). "
89 "This connection is used only if doRevertOldBackground=False.",
90 name=
"calexpBackground",
91 storageClass=
"Background",
92 dimensions=(
"instrument",
"visit",
"detector"),
95 background_apply_list = Input(
96 doc=
"Background to be applied (subtracted from the calexp). "
97 "This is used only if doApplyNewBackground=True.",
99 storageClass=
"Background",
100 dimensions=(
"instrument",
"visit",
"detector"),
103 visit_summary = Input(
104 doc=
"Input visit-summary catalog with updated calibration objects.",
105 name=
"finalVisitSummary",
106 storageClass=
"ExposureCatalog",
107 dimensions=(
"instrument",
"visit"),
110 doc=
"Input definition of geometry/bbox and projection/wcs for warps.",
111 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
112 storageClass=
"SkyMap",
113 dimensions=(
"skymap",),
117 doc=
"Output direct warped exposure produced by resampling calexps "
118 "onto the skyMap patch geometry.",
119 name=
"{coaddName}Coadd_directWarp",
120 storageClass=
"ExposureF",
121 dimensions=(
"tract",
"patch",
"skymap",
"instrument",
"visit"),
123 masked_fraction_warp = Output(
124 doc=
"Output masked fraction warped exposure.",
125 name=
"{coaddName}Coadd_directWarp_maskedFraction",
126 storageClass=
"ImageF",
127 dimensions=(
"tract",
"patch",
"skymap",
"instrument",
"visit"),
130 def __init__(self, *, config=None):
131 super().__init__(config=config)
135 if not config.doRevertOldBackground:
136 del self.background_revert_list
137 if not config.doApplyNewBackground:
138 del self.background_apply_list
140 if not config.doWarpMaskedFraction:
141 del self.masked_fraction_warp
145 for n
in range(config.numberOfNoiseRealizations):
147 doc=f
"Output direct warped noise exposure ({n})",
148 name=f
"{config.connections.coaddName}Coadd_directWarp_noise{n}",
150 storageClass=
"MaskedImageF",
151 dimensions=(
"tract",
"patch",
"skymap",
"instrument",
"visit"),
153 setattr(self, f
"noise_warp{n}", noise_warp)
156class MakeDirectWarpConfig(
158 pipelineConnections=MakeDirectWarpConnections,
160 """Configuration for the MakeDirectWarpTask.
162 The config fields are as similar as possible to the corresponding fields in
167 The config fields are in camelCase to match the fields in the earlier
168 version of the makeWarp task as closely as possible.
171 MAX_NUMBER_OF_NOISE_REALIZATIONS = 3
173 numberOfNoiseRealizations is defined as a RangeField to prevent from
174 making multiple output connections and blowing up the memory usage by
175 accident. An upper bound of 3 is based on the best guess of the maximum
176 number of noise realizations that will be used for metadetection.
179 numberOfNoiseRealizations = RangeField[int](
180 doc=
"Number of noise realizations to simulate and persist.",
183 max=MAX_NUMBER_OF_NOISE_REALIZATIONS,
186 seedOffset = Field[int](
187 doc=
"Offset to the seed used for the noise realization. This can be "
188 "used to create a different noise realization if the default ones "
189 "are catastrophic, or for testing sensitivity to the noise.",
192 useMedianVariance = Field[bool](
193 doc=
"Use the median of variance plane in the input calexp to generate "
194 "noise realizations? If False, per-pixel variance will be used.",
197 doRevertOldBackground = Field[bool](
198 doc=
"Revert the old backgrounds from the `background_revert_list` "
202 doApplyNewBackground = Field[bool](
203 doc=
"Apply the new backgrounds from the `background_apply_list` "
207 useVisitSummaryPsf = Field[bool](
208 doc=
"If True, use the PSF model and aperture corrections from the "
209 "'visit_summary' connection to make the warp. If False, use the "
210 "PSF model and aperture corrections from the 'calexp' connection.",
213 doSelectPreWarp = Field[bool](
214 doc=
"Select ccds before warping?",
218 doc=
"Image selection subtask.",
219 target=PsfWcsSelectImagesTask,
221 doPreWarpInterpolation = Field[bool](
222 doc=
"Interpolate over bad pixels before warping?",
226 doc=
"Interpolation task to use for pre-warping interpolation",
227 target=CloughTocher2DInterpolateTask,
230 doc=
"Subtask that helps fill CoaddInputs catalogs added to the final "
232 target=CoaddInputRecorderTask,
234 includeCalibVar = Field[bool](
235 doc=
"Add photometric calibration variance to warp variance plane?",
239 doc=
"Pad the patch boundary of the warp by these many pixels, so as to allow for PSF-matching later",
243 doc=
"Configuration for the warper that warps the image and noise",
244 dtype=Warper.ConfigClass,
246 doWarpMaskedFraction = Field[bool](
247 doc=
"Warp the masked fraction image?",
251 doc=
"Configuration for the warp that warps the mask fraction image",
252 dtype=Warper.ConfigClass,
255 doc=
"Configuration for CoaddPsf",
256 dtype=CoaddPsfConfig,
258 idGenerator = DetectorVisitIdGeneratorConfig.make_field()
263 def bgSubtracted(self) -> bool:
264 return not self.doRevertOldBackground
267 def bgSubtracted(self, value: bool) ->
None:
268 self.doRevertOldBackground = ~value
271 def doApplySkyCorr(self) -> bool:
272 return self.doApplyNewBackground
274 @doApplySkyCorr.setter
275 def doApplySkyCorr(self, value: bool) ->
None:
276 self.doApplyNewBackground = value
278 def setDefaults(self) -> None:
279 super().setDefaults()
280 self.warper.warpingKernelName =
"lanczos3"
281 self.warper.cacheSize = 0
282 self.maskedFractionWarper.warpingKernelName =
"bilinear"
285class MakeDirectWarpTask(PipelineTask):
286 """Warp single-detector images onto a common projection.
288 This task iterates over multiple images (corresponding to different
289 detectors) from a single visit that overlap the target patch. Pixels that
290 receive no input from any detector are set to NaN in the output image, and
291 NO_DATA bit is set in the mask plane.
293 This differs from the standard `MakeWarp` Task in the following
296 1. No selection on ccds at the time of warping. This is done later during
297 the coaddition stage.
298 2. Interpolate over a set of masked pixels before warping.
299 3. Generate an image where each pixel denotes how much of the pixel is
301 4. Generate multiple noise warps with the same interpolation applied.
302 5. No option to produce a PSF-matched warp.
305 ConfigClass = MakeDirectWarpConfig
306 _DefaultName =
"makeDirectWarp"
308 def __init__(self, **kwargs):
309 super().__init__(**kwargs)
310 self.makeSubtask(
"inputRecorder")
311 self.makeSubtask(
"preWarpInterpolation")
312 if self.config.doSelectPreWarp:
313 self.makeSubtask(
"select")
315 self.warper = Warper.fromConfig(self.config.warper)
316 if self.config.doWarpMaskedFraction:
317 self.maskedFractionWarper = Warper.fromConfig(self.config.maskedFractionWarper)
319 def runQuantum(self, butlerQC, inputRefs, outputRefs):
323 inputs = butlerQC.get(inputRefs)
325 if not inputs[
"calexp_list"]:
326 raise NoWorkFound(
"No input warps provided for co-addition")
328 sky_map = inputs.pop(
"sky_map")
330 quantumDataId = butlerQC.quantum.dataId
331 sky_info = makeSkyInfo(
333 tractId=quantumDataId[
"tract"],
334 patchId=quantumDataId[
"patch"],
337 results = self.run(inputs, sky_info)
338 butlerQC.put(results, outputRefs)
340 def _preselect_inputs(self, inputs, sky_info):
341 dataIdList = [ref.dataId
for ref
in inputs[
"calexp_list"]]
342 visit_summary = inputs[
"visit_summary"]
344 bboxList, wcsList = [], []
345 for dataId
in dataIdList:
346 row = visit_summary.find(dataId[
"detector"])
348 raise RuntimeError(f
"Unexpectedly incomplete visit_summary: {dataId=} is missing.")
349 bboxList.append(row.getBBox())
350 wcsList.append(row.getWcs())
352 cornerPosList =
Box2D(sky_info.bbox).getCorners()
353 coordList = [sky_info.wcs.pixelToSky(pos)
for pos
in cornerPosList]
355 goodIndices = self.select.run(
359 visitSummary=visit_summary,
363 inputs = self._filterInputs(indices=goodIndices, inputs=inputs)
367 def run(self, inputs, sky_info, **kwargs):
368 """Create a Warp dataset from inputs.
373 Dictionary of input datasets. It must have a list of input calexps
374 under the key "calexp_list". Other supported keys are
375 "background_revert_list" and "background_apply_list", corresponding
376 to the old and the new backgrounds to be reverted and applied to
377 the calexps. They must be in the same order as the calexps.
378 sky_info : `~lsst.pipe.base.Struct`
379 A Struct object containing wcs, bounding box, and other information
380 about the patches within the tract.
381 visit_summary : `~lsst.afw.table.ExposureCatalog` | None
382 Table of visit summary information. If provided, the visit summary
383 information will be used to update the calibration of the input
384 exposures. If None, the input exposures will be used as-is.
388 results : `~lsst.pipe.base.Struct`
389 A Struct object containing the warped exposure, noise exposure(s),
390 and masked fraction image.
393 if self.config.doSelectPreWarp:
394 inputs = self._preselect_inputs(inputs, sky_info)
395 if not inputs[
"calexp_list"]:
396 raise NoWorkFound(
"No input warps remain after selection for co-addition")
398 sky_info.bbox.grow(self.config.border)
399 target_bbox, target_wcs = sky_info.bbox, sky_info.wcs
401 visit_summary = inputs[
"visit_summary"]
if self.config.useVisitSummaryPsf
else None
404 final_warp = ExposureF(target_bbox, target_wcs)
406 exposures = inputs[
"calexp_list"]
407 background_revert_list = inputs.get(
"background_revert_list", [
None] * len(exposures))
408 background_apply_list = inputs.get(
"background_apply_list", [
None] * len(exposures))
410 visit_id = exposures[0].dataId[
"visit"]
417 final_warp = self._prepareEmptyExposure(sky_info)
418 final_masked_fraction_warp = self._prepareEmptyExposure(sky_info)
419 final_noise_warps = {
420 n_noise: self._prepareEmptyExposure(sky_info)
421 for n_noise
in range(self.config.numberOfNoiseRealizations)
426 inputRecorder = self.inputRecorder.makeCoaddTempExpRecorder(
431 for index, (calexp_ref, old_background, new_background)
in enumerate(
432 zip(exposures, background_revert_list, background_apply_list, strict=
True)
434 dataId = calexp_ref.dataId
436 "Warping exposure %d/%d for id=%s",
441 calexp = calexp_ref.get()
444 seed = self.get_seed_from_data_id(dataId)
445 rng = np.random.RandomState(seed + self.config.seedOffset)
448 noise_calexps = self.make_noise_exposures(calexp, rng)
451 xyTransform = makeWcsPairTransform(calexp.getWcs(), target_wcs)
452 psfWarped =
WarpedPsf(calexp.getPsf(), xyTransform)
454 warpedExposure = self.process(
461 destBBox=target_bbox,
463 warpedExposure.setPsf(psfWarped)
465 if final_warp.photoCalib
is not None:
467 final_warp.photoCalib.getInstFluxAtZeroMagnitude()
468 / warpedExposure.photoCalib.getInstFluxAtZeroMagnitude()
473 self.log.debug(
"Scaling exposure %s by %f", dataId, ratio)
474 warpedExposure.maskedImage *= ratio
477 nGood = copyGoodPixels(
478 final_warp.maskedImage,
479 warpedExposure.maskedImage,
480 final_warp.mask.getPlaneBitMask([
"NO_DATA"]),
483 if final_warp.photoCalib
is None and nGood > 0:
484 final_warp.setPhotoCalib(warpedExposure.photoCalib)
486 ccdId = self.config.idGenerator.apply(dataId).catalog_id
487 inputRecorder.addCalExp(calexp, ccdId, nGood)
488 totalGoodPixels += nGood
490 if self.config.doWarpMaskedFraction:
492 if self.config.doPreWarpInterpolation:
493 badMaskPlanes = self.preWarpInterpolation.config.badMaskPlanes
496 masked_fraction_exp = self._get_bad_mask(calexp, badMaskPlanes)
498 masked_fraction_warp = self.maskedFractionWarper.warpExposure(
499 target_wcs, masked_fraction_exp, destBBox=target_bbox
503 final_masked_fraction_warp.maskedImage,
504 masked_fraction_warp.maskedImage,
505 final_masked_fraction_warp.mask.getPlaneBitMask([
"NO_DATA"]),
509 for n_noise
in range(self.config.numberOfNoiseRealizations):
510 noise_calexp = noise_calexps[n_noise]
511 warpedNoise = self.process(
518 destBBox=target_bbox,
521 warpedNoise.maskedImage *= ratio
524 final_noise_warps[n_noise].maskedImage,
525 warpedNoise.maskedImage,
526 final_noise_warps[n_noise].mask.getPlaneBitMask([
"NO_DATA"]),
530 if totalGoodPixels == 0:
533 masked_fraction_warp=
None,
535 for noise_index
in range(self.config.numberOfNoiseRealizations):
536 setattr(results, f
"noise_warp{noise_index}",
None)
541 inputRecorder.finish(final_warp, totalGoodPixels)
544 inputRecorder.coaddInputs.ccds,
546 self.config.coaddPsf.makeControl(),
549 final_warp.setPsf(coaddPsf)
550 final_warp.setFilter(calexp.getFilter())
551 final_warp.getInfo().setVisitInfo(calexp.getInfo().getVisitInfo())
557 if self.config.doWarpMaskedFraction:
558 results.masked_fraction_warp = final_masked_fraction_warp.image
560 for noise_index, noise_exposure
in final_noise_warps.items():
561 setattr(results, f
"noise_warp{noise_index}", noise_exposure.maskedImage)
576 """Process an exposure.
578 There are three processing steps that are applied to the input:
580 1. Interpolate over bad pixels before warping.
581 2. Apply all calibrations from visit_summary to the exposure.
582 3. Warp the exposure to the target coordinate system.
586 exposure : `~lsst.afw.image.Exposure`
587 The input exposure to be processed.
588 target_wcs : `~lsst.afw.geom.SkyWcs`
589 The WCS of the target patch.
590 warper : `~lsst.afw.math.Warper`
591 The warper to use for warping the input exposure.
592 old_background : `~lsst.afw.image.Background` | None
593 The old background to be added back into the calexp.
594 new_background : `~lsst.afw.image.Background` | None
595 The new background to be subtracted from the calexp.
596 visit_summary : `~lsst.afw.table.ExposureCatalog` | None
597 Table of visit summary information. If not None, the visit_summary
598 information will be used to update the calibration of the input
599 exposures. Otherwise, the input exposures will be used as-is.
600 maxBBox : `~lsst.geom.Box2I` | None
601 Maximum bounding box of the warped exposure. If None, this is
602 determined automatically.
603 destBBox : `~lsst.geom.Box2I` | None
604 Exact bounding box of the warped exposure. If None, this is
605 determined automatically.
609 warped_exposure : `~lsst.afw.image.Exposure`
610 The processed and warped exposure.
613 if self.config.doPreWarpInterpolation:
614 self.preWarpInterpolation.run(exposure.maskedImage)
616 self._apply_all_calibrations(
621 visit_summary=visit_summary,
622 includeScaleUncertainty=self.config.includeCalibVar,
624 with self.timer(
"warp"):
625 warped_exposure = warper.warpExposure(
634 return warped_exposure
637 def _filterInputs(indices, inputs):
638 """Filter inputs by their indices.
640 This method down-selects the list entries in ``inputs`` dictionary by
641 keeping only those items in the lists that correspond to ``indices``.
642 This is intended to select input visits that go into the warps.
646 indices : `list` [`int`]
648 A dictionary of input connections to be passed to run.
653 Task inputs with their lists filtered by indices.
655 for key
in inputs.keys():
657 if isinstance(inputs[key], list):
658 inputs[key] = [inputs[key][ind]
for ind
in indices]
662 def _apply_all_calibrations(
668 visit_summary: ExposureCatalog |
None =
None,
669 includeScaleUncertainty: bool =
False,
671 """Apply all of the calibrations from visit_summary to the exposure.
673 Specifically, this method updates the following (if available) to the
674 input exposure ``exp`` in place from ``visit_summary``:
676 - Aperture correction map
677 - Photometric calibration
683 exp : `~lsst.afw.image.Exposure`
684 Exposure to be updated.
685 old_background : `~lsst.afw.image.Exposure`
686 Exposure corresponding to the old background, to be added back.
687 new_background : `~lsst.afw.image.Exposure`
688 Exposure corresponding to the new background, to be subtracted.
689 logger : `logging.Logger`
690 Logger object from the caller Task to write the logs onto.
691 visit_summary : `~lsst.afw.table.ExposureCatalog` | None
692 Table of visit summary information. If not None, the visit summary
693 information will be used to update the calibration of the input
694 exposures. Otherwise, the input exposures will be used as-is.
695 includeScaleUncertainty : bool
696 Whether to include the uncertainty on the calibration in the
697 resulting variance? Passed onto the `calibrateImage` method of the
698 PhotoCalib object attached to ``exp``.
703 Raised if ``visit_summary`` is provided but does not contain a
704 record corresponding to ``exp``.
707 exp.maskedImage += old_background.getImage()
709 if self.config.useVisitSummaryPsf:
710 detector = exp.info.getDetector().getId()
711 row = visit_summary.find(detector)
714 raise RuntimeError(f
"Unexpectedly incomplete visit_summary: {detector=} is missing.")
716 if photo_calib := row.getPhotoCalib():
717 exp.setPhotoCalib(photo_calib)
720 "No photometric calibration found in visit summary for detector = %s.",
724 if wcs := row.getWcs():
727 logger.warning(
"No WCS found in visit summary for detector = %s.", detector)
729 if psf := row.getPsf():
732 logger.warning(
"No PSF found in visit summary for detector = %s.", detector)
734 if apcorr_map := row.getApCorrMap():
735 exp.setApCorrMap(apcorr_map)
738 "No aperture correction map found in visit summary for detector = %s.",
743 exp.maskedImage -= new_background.getImage()
747 photo_calib = exp.photoCalib
748 exp.maskedImage = photo_calib.calibrateImage(
749 exp.maskedImage, includeScaleUncertainty=includeScaleUncertainty
751 exp.maskedImage /= photo_calib.getCalibrationMean()
755 def _prepareEmptyExposure(cls, sky_info):
756 """Produce an empty exposure for a given patch.
760 sky_info : `lsst.pipe.base.Struct`
761 Struct from `~lsst.pipe.base.coaddBase.makeSkyInfo` with
762 geometric information about the patch.
766 exp : `lsst.afw.image.exposure.ExposureF`
767 An empty exposure for a given patch.
769 exp = ExposureF(sky_info.bbox, sky_info.wcs)
770 exp.getMaskedImage().set(np.nan, Mask.getPlaneBitMask(
"NO_DATA"), np.inf)
774 def compute_median_variance(mi: MaskedImage) -> float:
775 """Compute the median variance across the good pixels of a MaskedImage.
779 mi : `~lsst.afw.image.MaskedImage`
780 The input image on which to compute the median variance.
784 median_variance : `float`
785 Median variance of the input calexp.
789 return np.median(mi.variance.array[np.isfinite(mi.variance.array) & np.isfinite(mi.image.array)])
791 def get_seed_from_data_id(self, data_id) -> int:
792 """Get a seed value given a data_id.
794 This method generates a unique, reproducible pseudo-random number for
795 a data id. This is not affected by ordering of the input, or what
796 set of visits, ccds etc. are given.
798 This is implemented as a public method, so that simulations that
799 don't necessary deal with the middleware can mock up a ``data_id``
800 instance, or override this method with a different one to obtain a
801 seed value consistent with the pipeline task.
805 data_id : `~lsst.daf.butler.DataCoordinate`
806 Data identifier dictionary.
811 A unique seed for this data_id to seed a random number generator.
813 return self.config.idGenerator.apply(data_id).catalog_id
815 def make_noise_exposures(self, calexp: ExposureF, rng) -> dict[int, ExposureF]:
816 """Make pure noise realizations based on ``calexp``.
820 calexp : `~lsst.afw.image.ExposureF`
821 The input exposure on which to base the noise realizations.
822 rng : `np.random.RandomState`
823 Random number generator to use for the noise realizations.
827 noise_calexps : `dict` [`int`, `~lsst.afw.image.ExposureF`]
828 A mapping of integers ranging from 0 up to
829 config.numberOfNoiseRealizations to the corresponding
830 noise realization exposures.
836 if self.config.numberOfNoiseRealizations == 0:
839 if self.config.useMedianVariance:
840 variance = self.compute_median_variance(calexp.maskedImage)
842 variance = calexp.variance.array
844 for n_noise
in range(self.config.numberOfNoiseRealizations):
845 noise_calexp = calexp.clone()
846 noise_calexp.image.array[:, :] = rng.normal(
847 scale=np.sqrt(variance),
848 size=noise_calexp.image.array.shape,
850 noise_calexp.variance.array[:, :] = variance
851 noise_calexps[n_noise] = noise_calexp
856 def _get_bad_mask(cls, exp: ExposureF, badMaskPlanes: Iterable[str]) -> ExposureF:
857 """Get an Exposure of bad mask
861 exp: `lsst.afw.image.Exposure`
863 badMaskPlanes: `list` [`str`]
864 List of mask planes to be considered as bad.
868 bad_mask: `~lsst.afw.image.Exposure`
869 An Exposure with boolean array with True if inverse variance <= 0
870 or if any of the badMaskPlanes bits are set, and False otherwise.
873 bad_mask = exp.clone()
875 var = exp.variance.array
876 mask = exp.mask.array
878 bitMask = exp.mask.getPlaneBitMask(badMaskPlanes)
880 bad_mask.image.array[:, :] = (var < 0) | np.isinf(var) | ((mask & bitMask) != 0)
882 bad_mask.variance.array *= 0.0
A floating-point coordinate rectangle geometry.
CoaddPsf is the Psf derived to be used for non-PSF-matched Coadd images.
A Psf class that maps an arbitrary Psf through a coordinate transformation.