22__all__ = [
"MakeWarpTask",
"MakeWarpConfig"]
31import lsst.pipe.base.connectionTypes
as connectionTypes
34from lsst.daf.butler
import DeferredDatasetHandle
38from lsst.utils.timer
import timeMethod
39from .coaddBase
import CoaddBaseTask, makeSkyInfo, reorderAndPadList
40from .make_psf_matched_warp
import growValidPolygons
41from .warpAndPsfMatch
import WarpAndPsfMatchTask
44log = logging.getLogger(__name__)
48 dimensions=(
"tract",
"patch",
"skymap",
"instrument",
"visit"),
49 defaultTemplates={
"coaddName":
"deep",
51 calExpList = connectionTypes.Input(
52 doc=
"Input exposures to be resampled and optionally PSF-matched onto a SkyMap projection/patch",
53 name=
"{calexpType}calexp",
54 storageClass=
"ExposureF",
55 dimensions=(
"instrument",
"visit",
"detector"),
59 backgroundList = connectionTypes.Input(
60 doc=
"Input backgrounds to be added back into the calexp if bgSubtracted=False",
61 name=
"calexpBackground",
62 storageClass=
"Background",
63 dimensions=(
"instrument",
"visit",
"detector"),
66 skyCorrList = connectionTypes.Input(
67 doc=
"Input Sky Correction to be subtracted from the calexp if doApplySkyCorr=True",
69 storageClass=
"Background",
70 dimensions=(
"instrument",
"visit",
"detector"),
73 skyMap = connectionTypes.Input(
74 doc=
"Input definition of geometry/bbox and projection/wcs for warped exposures",
75 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
76 storageClass=
"SkyMap",
77 dimensions=(
"skymap",),
79 direct = connectionTypes.Output(
80 doc=(
"Output direct warped exposure (previously called CoaddTempExp), produced by resampling "
81 "calexps onto the skyMap patch geometry."),
82 name=
"{coaddName}Coadd_directWarp",
83 storageClass=
"ExposureF",
84 dimensions=(
"tract",
"patch",
"skymap",
"visit",
"instrument"),
86 psfMatched = connectionTypes.Output(
87 doc=(
"Output PSF-Matched warped exposure (previously called CoaddTempExp), produced by resampling "
88 "calexps onto the skyMap patch geometry and PSF-matching to a model PSF."),
89 name=
"{coaddName}Coadd_psfMatchedWarp",
90 storageClass=
"ExposureF",
91 dimensions=(
"tract",
"patch",
"skymap",
"visit",
"instrument"),
93 visitSummary = connectionTypes.Input(
94 doc=
"Input visit-summary catalog with updated calibration objects.",
95 name=
"finalVisitSummary",
96 storageClass=
"ExposureCatalog",
97 dimensions=(
"instrument",
"visit",),
100 def __init__(self, *, config=None):
101 if config.bgSubtracted:
102 del self.backgroundList
103 if not config.doApplySkyCorr:
105 if not config.makeDirect:
107 if not config.makePsfMatched:
111class MakeWarpConfig(pipeBase.PipelineTaskConfig, CoaddBaseTask.ConfigClass,
112 pipelineConnections=MakeWarpConnections):
113 """Config for MakeWarpTask."""
115 warpAndPsfMatch = pexConfig.ConfigurableField(
116 target=WarpAndPsfMatchTask,
117 doc=
"Task to warp and PSF-match calexp",
119 doWrite = pexConfig.Field(
120 doc=
"persist <coaddName>Coadd_<warpType>Warp",
124 bgSubtracted = pexConfig.Field(
125 doc=
"Work with a background subtracted calexp?",
129 coaddPsf = pexConfig.ConfigField(
130 doc=
"Configuration for CoaddPsf",
131 dtype=CoaddPsfConfig,
133 makeDirect = pexConfig.Field(
134 doc=
"Make direct Warp/Coadds",
138 makePsfMatched = pexConfig.Field(
139 doc=
"Make Psf-Matched Warp/Coadd?",
143 modelPsf = GaussianPsfFactory.makeField(doc=
"Model Psf factory")
144 useVisitSummaryPsf = pexConfig.Field(
146 "If True, use the PSF model and aperture corrections from the 'visitSummary' connection. "
147 "If False, use the PSF model and aperture corrections from the 'exposure' connection. "
152 doWriteEmptyWarps = pexConfig.Field(
155 doc=
"Write out warps even if they are empty"
157 hasFakes = pexConfig.Field(
158 doc=
"Should be set to True if fake sources have been inserted into the input data.",
162 doApplySkyCorr = pexConfig.Field(
165 doc=
"Apply sky correction?",
167 idGenerator = DetectorVisitIdGeneratorConfig.make_field()
170 CoaddBaseTask.ConfigClass.validate(self)
172 if not self.makePsfMatched
and not self.makeDirect:
173 raise ValueError(
"At least one of config.makePsfMatched and config.makeDirect must be True")
174 if self.warpAndPsfMatch.warp.cacheSize != self.coaddPsf.cacheSize:
179 raise ValueError(
"Image warping cache size and CoaddPSf warping cache size do not agree.")
181 if self.matchingKernelSize:
182 if self.matchingKernelSize != self.warpAndPsfMatch.psfMatch.kernel.active.kernelSize:
183 raise pexConfig.FieldValidationError(
184 self.__class__.matchingKernelSize, self,
185 f
"matchingKernelSize ({self.matchingKernelSize}) must match "
186 "warpAndPsfMatch.psfMatch.kernel.active.kernelSize "
187 f
"({self.warpAndPsfMatch.psfMatch.kernel.active.kernelSize})"
190 def setDefaults(self):
191 CoaddBaseTask.ConfigClass.setDefaults(self)
192 self.warpAndPsfMatch.warp.cacheSize = 0
193 self.coaddPsf.cacheSize = 0
194 self.warpAndPsfMatch.psfMatch.kernel.active.kernelSize = self.matchingKernelSize
198 """Warp and optionally PSF-Match calexps onto an a common projection.
200 Warp and optionally PSF-Match calexps onto a common projection, by
201 performing the following operations:
202 - Group calexps by visit/run
203 - For each visit, generate a Warp by calling method @ref run.
204 `run` loops over the visit's calexps calling
205 `~lsst.pipe.tasks.warpAndPsfMatch.WarpAndPsfMatchTask` on each visit
208 ConfigClass = MakeWarpConfig
209 _DefaultName =
"makeWarp"
211 def __init__(self, **kwargs):
212 CoaddBaseTask.__init__(self, **kwargs)
213 self.makeSubtask(
"warpAndPsfMatch")
214 if self.config.hasFakes:
215 self.calexpType =
"fakes_calexp"
217 self.calexpType =
"calexp"
219 @utils.inheritDoc(pipeBase.PipelineTask)
220 def runQuantum(self, butlerQC, inputRefs, outputRefs):
224 Obtain the list of input detectors from calExpList. Sort them by
225 detector order (to ensure reproducibility). Then ensure all input
226 lists are in the same sorted detector order.
228 detectorOrder = [handle.datasetRef.dataId[
'detector']
for handle
in inputRefs.calExpList]
230 inputRefs = reorderRefs(inputRefs, detectorOrder, dataIdKey=
'detector')
233 inputs = butlerQC.get(inputRefs)
237 skyMap = inputs.pop(
"skyMap")
238 quantumDataId = butlerQC.quantum.dataId
239 skyInfo = makeSkyInfo(skyMap, tractId=quantumDataId[
'tract'], patchId=quantumDataId[
'patch'])
242 dataIdList = [ref.datasetRef.dataId
for ref
in inputRefs.calExpList]
245 self.config.idGenerator.apply(dataId).catalog_id
246 for dataId
in dataIdList
250 visitSummary = inputs[
"visitSummary"]
253 for dataId
in dataIdList:
254 row = visitSummary.find(dataId[
"detector"])
257 f
"Unexpectedly incomplete visitSummary provided to makeWarp: {dataId} is missing."
259 bboxList.append(row.getBBox())
260 wcsList.append(row.getWcs())
261 inputs[
"bboxList"] = bboxList
262 inputs[
"wcsList"] = wcsList
266 completeIndices = self._prepareCalibratedExposures(**inputs)
267 inputs = self.filterInputs(indices=completeIndices, inputs=inputs)
273 coordList = [skyInfo.wcs.pixelToSky(pos)
for pos
in cornerPosList]
274 goodIndices = self.select.run(**inputs, coordList=coordList, dataIds=dataIdList)
275 inputs = self.filterInputs(indices=goodIndices, inputs=inputs)
278 visitId = dataIdList[0][
"visit"]
280 results = self.run(**inputs,
282 ccdIdList=[ccdIdList[i]
for i
in goodIndices],
283 dataIdList=[dataIdList[i]
for i
in goodIndices],
285 if self.config.makeDirect
and results.exposures[
"direct"]
is not None:
286 butlerQC.put(results.exposures[
"direct"], outputRefs.direct)
287 if self.config.makePsfMatched
and results.exposures[
"psfMatched"]
is not None:
288 butlerQC.put(results.exposures[
"psfMatched"], outputRefs.psfMatched)
291 def run(self, calExpList, ccdIdList, skyInfo, visitId=0, dataIdList=None, **kwargs):
292 """Create a Warp from inputs.
294 We iterate over the multiple calexps in a single exposure to construct
295 the warp (previously called a coaddTempExp) of that exposure to the
296 supplied tract/patch.
298 Pixels that receive no pixels are set to NAN; this is not correct
299 (violates LSST algorithms group policy), but will be fixed up by
300 interpolating after the coaddition.
302 calExpList : `list` [ `lsst.afw.image.Exposure` ]
303 List of single-detector input images that (may) overlap the patch
305 skyInfo : `lsst.pipe.base.Struct`
306 Struct from `~lsst.pipe.base.coaddBase.makeSkyInfo()` with
307 geometric information about the patch.
309 Integer identifier for visit, for the table that will
310 produce the CoaddPsf.
314 result : `lsst.pipe.base.Struct`
315 Results as a struct with attributes:
318 A dictionary containing the warps requested:
319 "direct": direct warp if ``config.makeDirect``
320 "psfMatched": PSF-matched warp if ``config.makePsfMatched``
323 warpTypeList = self.getWarpTypeList()
325 totGoodPix = {warpType: 0
for warpType
in warpTypeList}
326 didSetMetadata = {warpType:
False for warpType
in warpTypeList}
327 warps = {warpType: self._prepareEmptyExposure(skyInfo)
for warpType
in warpTypeList}
328 inputRecorder = {warpType: self.inputRecorder.makeCoaddTempExpRecorder(visitId, len(calExpList))
329 for warpType
in warpTypeList}
331 modelPsf = self.config.modelPsf.apply()
if self.config.makePsfMatched
else None
332 if dataIdList
is None:
333 dataIdList = ccdIdList
335 for calExpInd, (calExp, ccdId, dataId)
in enumerate(zip(calExpList, ccdIdList, dataIdList)):
336 self.log.info(
"Processing calexp %d of %d for this Warp: id=%s",
337 calExpInd+1, len(calExpList), dataId)
339 warpedAndMatched = self.warpAndPsfMatch.run(calExp, modelPsf=modelPsf,
340 wcs=skyInfo.wcs, maxBBox=skyInfo.bbox,
341 makeDirect=self.config.makeDirect,
342 makePsfMatched=self.config.makePsfMatched)
343 except Exception
as e:
344 self.log.warning(
"WarpAndPsfMatch failed for calexp %s; skipping it: %s", dataId, e)
347 numGoodPix = {warpType: 0
for warpType
in warpTypeList}
348 for warpType
in warpTypeList:
349 exposure = warpedAndMatched.getDict()[warpType]
352 warp = warps[warpType]
353 if didSetMetadata[warpType]:
354 mimg = exposure.getMaskedImage()
355 mimg *= (warp.getPhotoCalib().getInstFluxAtZeroMagnitude()
356 / exposure.getPhotoCalib().getInstFluxAtZeroMagnitude())
359 warp.getMaskedImage(), exposure.getMaskedImage(), self.getBadPixelMask())
360 totGoodPix[warpType] += numGoodPix[warpType]
361 self.log.debug(
"Calexp %s has %d good pixels in this patch (%.1f%%) for %s",
362 dataId, numGoodPix[warpType],
363 100.0*numGoodPix[warpType]/skyInfo.bbox.getArea(), warpType)
364 if numGoodPix[warpType] > 0
and not didSetMetadata[warpType]:
365 warp.info.id = exposure.info.id
366 warp.setPhotoCalib(exposure.getPhotoCalib())
367 warp.setFilter(exposure.getFilter())
368 warp.getInfo().setVisitInfo(exposure.getInfo().getVisitInfo())
371 warp.setPsf(exposure.getPsf())
372 didSetMetadata[warpType] =
True
376 inputRecorder[warpType].addCalExp(calExp, ccdId, numGoodPix[warpType])
378 except Exception
as e:
379 self.log.warning(
"Error processing calexp %s; skipping it: %s", dataId, e)
382 for warpType
in warpTypeList:
383 self.log.info(
"%sWarp has %d good pixels (%.1f%%)",
384 warpType, totGoodPix[warpType], 100.0*totGoodPix[warpType]/skyInfo.bbox.getArea())
386 if totGoodPix[warpType] > 0
and didSetMetadata[warpType]:
387 inputRecorder[warpType].finish(warps[warpType], totGoodPix[warpType])
388 if warpType ==
"direct":
389 warps[warpType].setPsf(
390 CoaddPsf(inputRecorder[warpType].coaddInputs.ccds, skyInfo.wcs,
391 self.config.coaddPsf.makeControl()))
394 inputRecorder[warpType].coaddInputs,
395 -self.config.warpAndPsfMatch.psfMatch.kernel.active.kernelSize // 2,
398 if not self.config.doWriteEmptyWarps:
400 warps[warpType] =
None
404 result = pipeBase.Struct(exposures=warps)
407 def filterInputs(self, indices, inputs):
408 """Filter task inputs by their indices.
412 indices : `list` [`int`]
413 inputs : `dict` [`list`]
414 A dictionary of input connections to be passed to run.
418 inputs : `dict` [`list`]
419 Task inputs with their lists filtered by indices.
421 for key
in inputs.keys():
423 if isinstance(inputs[key], list):
424 inputs[key] = [inputs[key][ind]
for ind
in indices]
427 def _prepareCalibratedExposures(self, *, visitSummary, calExpList=[], wcsList=None,
428 backgroundList=None, skyCorrList=None, **kwargs):
429 """Calibrate and add backgrounds to input calExpList in place.
433 visitSummary : `lsst.afw.table.ExposureCatalog`
434 Exposure catalog with potentially all calibrations. Attributes set
435 to `None` are ignored.
436 calExpList : `list` [`lsst.afw.image.Exposure` or
437 `lsst.daf.butler.DeferredDatasetHandle`]
438 Sequence of single-epoch images (or deferred load handles for
439 images) to be modified in place. On return this always has images,
441 wcsList : `list` [`lsst.afw.geom.SkyWcs`]
442 The WCSs of the calexps in ``calExpList``. These will be used to
443 determine if the calexp should be used in the warp. The list is
444 dynamically updated with the WCSs from the visitSummary.
445 backgroundList : `list` [`lsst.afw.math.BackgroundList`], optional
446 Sequence of backgrounds to be added back in if bgSubtracted=False.
447 skyCorrList : `list` [`lsst.afw.math.BackgroundList`], optional
448 Sequence of background corrections to be subtracted if
451 Additional keyword arguments.
455 indices : `list` [`int`]
456 Indices of ``calExpList`` and friends that have valid
459 wcsList = len(calExpList)*[
None]
if wcsList
is None else wcsList
460 backgroundList = len(calExpList)*[
None]
if backgroundList
is None else backgroundList
461 skyCorrList = len(calExpList)*[
None]
if skyCorrList
is None else skyCorrList
463 includeCalibVar = self.config.includeCalibVar
466 for index, (calexp, background, skyCorr)
in enumerate(zip(calExpList,
469 if isinstance(calexp, DeferredDatasetHandle):
470 calexp = calexp.get()
472 if not self.config.bgSubtracted:
473 calexp.maskedImage += background.getImage()
475 detectorId = calexp.info.getDetector().getId()
478 row = visitSummary.find(detectorId)
481 f
"Unexpectedly incomplete visitSummary: detector={detectorId} is missing."
483 if (photoCalib := row.getPhotoCalib())
is not None:
484 calexp.setPhotoCalib(photoCalib)
487 "Detector id %d for visit %d has None for photoCalib in the visitSummary and will "
488 "not be used in the warp", detectorId, row[
"visit"],
491 if (skyWcs := row.getWcs())
is not None:
492 calexp.setWcs(skyWcs)
493 wcsList[index] = skyWcs
496 "Detector id %d for visit %d has None for wcs in the visitSummary and will "
497 "not be used in the warp", detectorId, row[
"visit"],
500 if self.config.useVisitSummaryPsf:
501 if (psf := row.getPsf())
is not None:
505 "Detector id %d for visit %d has None for psf in the visitSummary and will "
506 "not be used in the warp", detectorId, row[
"visit"],
509 if (apCorrMap := row.getApCorrMap())
is not None:
510 calexp.info.setApCorrMap(apCorrMap)
513 "Detector id %d for visit %d has None for apCorrMap in the visitSummary and will "
514 "not be used in the warp", detectorId, row[
"visit"],
518 if calexp.getPsf()
is None:
520 "Detector id %d for visit %d has None for psf for the calexp and will "
521 "not be used in the warp", detectorId, row[
"visit"],
524 if calexp.info.getApCorrMap()
is None:
526 "Detector id %d for visit %d has None for apCorrMap in the calexp and will "
527 "not be used in the warp", detectorId, row[
"visit"],
532 if self.config.doApplySkyCorr:
533 calexp.maskedImage -= skyCorr.getImage()
536 calexp.maskedImage = photoCalib.calibrateImage(calexp.maskedImage,
537 includeScaleUncertainty=includeCalibVar)
538 calexp.maskedImage /= photoCalib.getCalibrationMean()
543 indices.append(index)
544 calExpList[index] = calexp
549 def _prepareEmptyExposure(skyInfo):
550 """Produce an empty exposure for a given patch.
554 skyInfo : `lsst.pipe.base.Struct`
555 Struct from `~lsst.pipe.base.coaddBase.makeSkyInfo()` with
556 geometric information about the patch.
560 exp : `lsst.afw.image.exposure.ExposureF`
561 An empty exposure for a given patch.
563 exp = afwImage.ExposureF(skyInfo.bbox, skyInfo.wcs)
565 .getPlaneBitMask(
"NO_DATA"), numpy.inf)
568 def getWarpTypeList(self):
569 """Return list of requested warp types per the config.
572 if self.config.makeDirect:
573 warpTypeList.append(
"direct")
574 if self.config.makePsfMatched:
575 warpTypeList.append(
"psfMatched")
579def reorderRefs(inputRefs, outputSortKeyOrder, dataIdKey):
580 """Reorder inputRefs per outputSortKeyOrder.
582 Any inputRefs which are lists will be resorted per specified key e.g.,
583 'detector.' Only iterables will be reordered, and values can be of type
584 `lsst.pipe.base.connections.DeferredDatasetRef` or
585 `lsst.daf.butler.core.datasets.ref.DatasetRef`.
587 Returned lists of refs have the same length as the outputSortKeyOrder.
588 If an outputSortKey not in the inputRef, then it will be padded with None.
589 If an inputRef contains an inputSortKey that is not in the
590 outputSortKeyOrder it will be removed.
594 inputRefs : `lsst.pipe.base.connections.QuantizedConnection`
595 Input references to be reordered and padded.
596 outputSortKeyOrder : `iterable`
597 Iterable of values to be compared with inputRef's dataId[dataIdKey].
599 The data ID key in the dataRefs to compare with the outputSortKeyOrder.
603 inputRefs : `lsst.pipe.base.connections.QuantizedConnection`
604 Quantized Connection with sorted DatasetRef values sorted if iterable.
606 for connectionName, refs
in inputRefs:
607 if isinstance(refs, Iterable):
608 if hasattr(refs[0],
"dataId"):
609 inputSortKeyOrder = [ref.dataId[dataIdKey]
for ref
in refs]
611 inputSortKeyOrder = [handle.datasetRef.dataId[dataIdKey]
for handle
in refs]
612 if inputSortKeyOrder != outputSortKeyOrder:
613 setattr(inputRefs, connectionName,
614 reorderAndPadList(refs, inputSortKeyOrder, outputSortKeyOrder))
Represent a 2-dimensional array of bitmask pixels.
A floating-point coordinate rectangle geometry.
CoaddPsf is the Psf derived to be used for non-PSF-matched Coadd images.
int copyGoodPixels(lsst::afw::image::Image< ImagePixelT > &destImage, lsst::afw::image::Image< ImagePixelT > const &srcImage)
copy good pixels from one image to another