22__all__ = [
"MakeWarpTask",
"MakeWarpConfig"]
31import lsst.pipe.base.connectionTypes
as connectionTypes
34from lsst.daf.butler
import DeferredDatasetHandle
38from lsst.utils.timer
import timeMethod
39from .coaddBase
import CoaddBaseTask, makeSkyInfo, reorderAndPadList
40from .make_psf_matched_warp
import growValidPolygons
41from .warpAndPsfMatch
import WarpAndPsfMatchTask
44log = logging.getLogger(__name__)
48 dimensions=(
"tract",
"patch",
"skymap",
"instrument",
"visit"),
49 defaultTemplates={
"coaddName":
"deep",
51 calExpList = connectionTypes.Input(
52 doc=
"Input exposures to be resampled and optionally PSF-matched onto a SkyMap projection/patch",
53 name=
"{calexpType}calexp",
54 storageClass=
"ExposureF",
55 dimensions=(
"instrument",
"visit",
"detector"),
59 backgroundList = connectionTypes.Input(
60 doc=
"Input backgrounds to be added back into the calexp if bgSubtracted=False",
61 name=
"calexpBackground",
62 storageClass=
"Background",
63 dimensions=(
"instrument",
"visit",
"detector"),
66 skyCorrList = connectionTypes.Input(
67 doc=
"Input Sky Correction to be subtracted from the calexp if doApplySkyCorr=True",
69 storageClass=
"Background",
70 dimensions=(
"instrument",
"visit",
"detector"),
73 skyMap = connectionTypes.Input(
74 doc=
"Input definition of geometry/bbox and projection/wcs for warped exposures",
75 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
76 storageClass=
"SkyMap",
77 dimensions=(
"skymap",),
79 direct = connectionTypes.Output(
80 doc=(
"Output direct warped exposure (previously called CoaddTempExp), produced by resampling ",
81 "calexps onto the skyMap patch geometry."),
82 name=
"{coaddName}Coadd_directWarp",
83 storageClass=
"ExposureF",
84 dimensions=(
"tract",
"patch",
"skymap",
"visit",
"instrument"),
86 psfMatched = connectionTypes.Output(
87 doc=(
"Output PSF-Matched warped exposure (previously called CoaddTempExp), produced by resampling ",
88 "calexps onto the skyMap patch geometry and PSF-matching to a model PSF."),
89 name=
"{coaddName}Coadd_psfMatchedWarp",
90 storageClass=
"ExposureF",
91 dimensions=(
"tract",
"patch",
"skymap",
"visit",
"instrument"),
93 visitSummary = connectionTypes.Input(
94 doc=
"Input visit-summary catalog with updated calibration objects.",
95 name=
"finalVisitSummary",
96 storageClass=
"ExposureCatalog",
97 dimensions=(
"instrument",
"visit",),
100 def __init__(self, *, config=None):
101 if config.bgSubtracted:
102 del self.backgroundList
103 if not config.doApplySkyCorr:
105 if not config.makeDirect:
107 if not config.makePsfMatched:
111class MakeWarpConfig(pipeBase.PipelineTaskConfig, CoaddBaseTask.ConfigClass,
112 pipelineConnections=MakeWarpConnections):
113 """Config for MakeWarpTask."""
115 warpAndPsfMatch = pexConfig.ConfigurableField(
116 target=WarpAndPsfMatchTask,
117 doc=
"Task to warp and PSF-match calexp",
119 doWrite = pexConfig.Field(
120 doc=
"persist <coaddName>Coadd_<warpType>Warp",
124 bgSubtracted = pexConfig.Field(
125 doc=
"Work with a background subtracted calexp?",
129 coaddPsf = pexConfig.ConfigField(
130 doc=
"Configuration for CoaddPsf",
131 dtype=CoaddPsfConfig,
133 makeDirect = pexConfig.Field(
134 doc=
"Make direct Warp/Coadds",
138 makePsfMatched = pexConfig.Field(
139 doc=
"Make Psf-Matched Warp/Coadd?",
143 modelPsf = GaussianPsfFactory.makeField(doc=
"Model Psf factory")
144 useVisitSummaryPsf = pexConfig.Field(
146 "If True, use the PSF model and aperture corrections from the 'visitSummary' connection. "
147 "If False, use the PSF model and aperture corrections from the 'exposure' connection. "
152 doWriteEmptyWarps = pexConfig.Field(
155 doc=
"Write out warps even if they are empty"
157 hasFakes = pexConfig.Field(
158 doc=
"Should be set to True if fake sources have been inserted into the input data.",
162 doApplySkyCorr = pexConfig.Field(
165 doc=
"Apply sky correction?",
167 idGenerator = DetectorVisitIdGeneratorConfig.make_field()
170 CoaddBaseTask.ConfigClass.validate(self)
172 if not self.makePsfMatched
and not self.makeDirect:
173 raise ValueError(
"At least one of config.makePsfMatched and config.makeDirect must be True")
174 if self.warpAndPsfMatch.warp.cacheSize != self.coaddPsf.cacheSize:
179 raise ValueError(
"Image warping cache size and CoaddPSf warping cache size do not agree.")
181 if self.matchingKernelSize:
182 if self.matchingKernelSize != self.warpAndPsfMatch.psfMatch.kernel.active.kernelSize:
183 raise pexConfig.FieldValidationError(
184 self.__class__.matchingKernelSize, self,
185 f
"matchingKernelSize ({self.matchingKernelSize}) must match "
186 "warpAndPsfMatch.psfMatch.kernel.active.kernelSize "
187 f
"({self.warpAndPsfMatch.psfMatch.kernel.active.kernelSize})"
190 def setDefaults(self):
191 CoaddBaseTask.ConfigClass.setDefaults(self)
192 self.warpAndPsfMatch.warp.cacheSize = 0
193 self.coaddPsf.cacheSize = 0
194 self.warpAndPsfMatch.psfMatch.kernel.active.kernelSize = self.matchingKernelSize
198 """Warp and optionally PSF-Match calexps onto an a common projection.
200 Warp and optionally PSF-Match calexps onto a common projection, by
201 performing the following operations:
202 - Group calexps by visit/run
203 - For each visit, generate a Warp by calling method @ref run.
204 `run` loops over the visit's calexps calling
205 `~lsst.pipe.tasks.warpAndPsfMatch.WarpAndPsfMatchTask` on each visit
208 ConfigClass = MakeWarpConfig
209 _DefaultName =
"makeWarp"
211 def __init__(self, **kwargs):
212 CoaddBaseTask.__init__(self, **kwargs)
213 self.makeSubtask(
"warpAndPsfMatch")
214 if self.config.hasFakes:
215 self.calexpType =
"fakes_calexp"
217 self.calexpType =
"calexp"
219 @utils.inheritDoc(pipeBase.PipelineTask)
220 def runQuantum(self, butlerQC, inputRefs, outputRefs):
224 Obtain the list of input detectors from calExpList. Sort them by
225 detector order (to ensure reproducibility). Then ensure all input
226 lists are in the same sorted detector order.
228 detectorOrder = [ref.datasetRef.dataId[
'detector']
for ref
in inputRefs.calExpList]
230 inputRefs = reorderRefs(inputRefs, detectorOrder, dataIdKey=
'detector')
233 inputs = butlerQC.get(inputRefs)
237 skyMap = inputs.pop(
"skyMap")
238 quantumDataId = butlerQC.quantum.dataId
239 skyInfo = makeSkyInfo(skyMap, tractId=quantumDataId[
'tract'], patchId=quantumDataId[
'patch'])
242 dataIdList = [ref.datasetRef.dataId
for ref
in inputRefs.calExpList]
245 self.config.idGenerator.apply(dataId).catalog_id
246 for dataId
in dataIdList
250 visitSummary = inputs[
"visitSummary"]
253 for dataId
in dataIdList:
254 row = visitSummary.find(dataId[
"detector"])
257 f
"Unexpectedly incomplete visitSummary provided to makeWarp: {dataId} is missing."
259 bboxList.append(row.getBBox())
260 wcsList.append(row.getWcs())
261 inputs[
"bboxList"] = bboxList
262 inputs[
"wcsList"] = wcsList
266 completeIndices = self._prepareCalibratedExposures(**inputs)
267 inputs = self.filterInputs(indices=completeIndices, inputs=inputs)
273 coordList = [skyInfo.wcs.pixelToSky(pos)
for pos
in cornerPosList]
274 goodIndices = self.select.run(**inputs, coordList=coordList, dataIds=dataIdList)
275 inputs = self.filterInputs(indices=goodIndices, inputs=inputs)
278 visitId = dataIdList[0][
"visit"]
280 results = self.run(**inputs,
282 ccdIdList=[ccdIdList[i]
for i
in goodIndices],
283 dataIdList=[dataIdList[i]
for i
in goodIndices],
285 if self.config.makeDirect
and results.exposures[
"direct"]
is not None:
286 butlerQC.put(results.exposures[
"direct"], outputRefs.direct)
287 if self.config.makePsfMatched
and results.exposures[
"psfMatched"]
is not None:
288 butlerQC.put(results.exposures[
"psfMatched"], outputRefs.psfMatched)
291 def run(self, calExpList, ccdIdList, skyInfo, visitId=0, dataIdList=None, **kwargs):
292 """Create a Warp from inputs.
294 We iterate over the multiple calexps in a single exposure to construct
295 the warp (previously called a coaddTempExp) of that exposure to the
296 supplied tract/patch.
298 Pixels that receive no pixels are set to NAN; this is not correct
299 (violates LSST algorithms group policy), but will be fixed up by
300 interpolating after the coaddition.
302 calexpRefList : `list`
303 List of data references for calexps that (may)
304 overlap the patch of interest.
305 skyInfo : `lsst.pipe.base.Struct`
306 Struct from `~lsst.pipe.base.coaddBase.makeSkyInfo()` with
307 geometric information about the patch.
309 Integer identifier for visit, for the table that will
310 produce the CoaddPsf.
314 result : `lsst.pipe.base.Struct`
315 Results as a struct with attributes:
318 A dictionary containing the warps requested:
319 "direct": direct warp if ``config.makeDirect``
320 "psfMatched": PSF-matched warp if ``config.makePsfMatched``
323 warpTypeList = self.getWarpTypeList()
325 totGoodPix = {warpType: 0
for warpType
in warpTypeList}
326 didSetMetadata = {warpType:
False for warpType
in warpTypeList}
327 warps = {warpType: self._prepareEmptyExposure(skyInfo)
for warpType
in warpTypeList}
328 inputRecorder = {warpType: self.inputRecorder.makeCoaddTempExpRecorder(visitId, len(calExpList))
329 for warpType
in warpTypeList}
331 modelPsf = self.config.modelPsf.apply()
if self.config.makePsfMatched
else None
332 if dataIdList
is None:
333 dataIdList = ccdIdList
335 for calExpInd, (calExp, ccdId, dataId)
in enumerate(zip(calExpList, ccdIdList, dataIdList)):
336 self.log.info(
"Processing calexp %d of %d for this Warp: id=%s",
337 calExpInd+1, len(calExpList), dataId)
339 warpedAndMatched = self.warpAndPsfMatch.run(calExp, modelPsf=modelPsf,
340 wcs=skyInfo.wcs, maxBBox=skyInfo.bbox,
341 makeDirect=self.config.makeDirect,
342 makePsfMatched=self.config.makePsfMatched)
343 except Exception
as e:
344 self.log.warning(
"WarpAndPsfMatch failed for calexp %s; skipping it: %s", dataId, e)
347 numGoodPix = {warpType: 0
for warpType
in warpTypeList}
348 for warpType
in warpTypeList:
349 exposure = warpedAndMatched.getDict()[warpType]
352 warp = warps[warpType]
353 if didSetMetadata[warpType]:
354 mimg = exposure.getMaskedImage()
355 mimg *= (warp.getPhotoCalib().getInstFluxAtZeroMagnitude()
356 / exposure.getPhotoCalib().getInstFluxAtZeroMagnitude())
359 warp.getMaskedImage(), exposure.getMaskedImage(), self.getBadPixelMask())
360 totGoodPix[warpType] += numGoodPix[warpType]
361 self.log.debug(
"Calexp %s has %d good pixels in this patch (%.1f%%) for %s",
362 dataId, numGoodPix[warpType],
363 100.0*numGoodPix[warpType]/skyInfo.bbox.getArea(), warpType)
364 if numGoodPix[warpType] > 0
and not didSetMetadata[warpType]:
365 warp.info.id = exposure.info.id
366 warp.setPhotoCalib(exposure.getPhotoCalib())
367 warp.setFilter(exposure.getFilter())
368 warp.getInfo().setVisitInfo(exposure.getInfo().getVisitInfo())
371 warp.setPsf(exposure.getPsf())
372 didSetMetadata[warpType] =
True
376 inputRecorder[warpType].addCalExp(calExp, ccdId, numGoodPix[warpType])
378 except Exception
as e:
379 self.log.warning(
"Error processing calexp %s; skipping it: %s", dataId, e)
382 for warpType
in warpTypeList:
383 self.log.info(
"%sWarp has %d good pixels (%.1f%%)",
384 warpType, totGoodPix[warpType], 100.0*totGoodPix[warpType]/skyInfo.bbox.getArea())
386 if totGoodPix[warpType] > 0
and didSetMetadata[warpType]:
387 inputRecorder[warpType].finish(warps[warpType], totGoodPix[warpType])
388 if warpType ==
"direct":
389 warps[warpType].setPsf(
390 CoaddPsf(inputRecorder[warpType].coaddInputs.ccds, skyInfo.wcs,
391 self.config.coaddPsf.makeControl()))
394 inputRecorder[warpType].coaddInputs,
395 -self.config.warpAndPsfMatch.psfMatch.kernel.active.kernelSize // 2,
398 if not self.config.doWriteEmptyWarps:
400 warps[warpType] =
None
404 result = pipeBase.Struct(exposures=warps)
407 def filterInputs(self, indices, inputs):
408 """Filter task inputs by their indices.
412 indices : `list` [`int`]
413 inputs : `dict` [`list`]
414 A dictionary of input connections to be passed to run.
418 inputs : `dict` [`list`]
419 Task inputs with their lists filtered by indices.
421 for key
in inputs.keys():
423 if isinstance(inputs[key], list):
424 inputs[key] = [inputs[key][ind]
for ind
in indices]
427 def _prepareCalibratedExposures(self, *, visitSummary, calExpList=[], wcsList=None,
428 backgroundList=None, skyCorrList=None, **kwargs):
429 """Calibrate and add backgrounds to input calExpList in place.
433 visitSummary : `lsst.afw.table.ExposureCatalog`
434 Exposure catalog with potentially all calibrations. Attributes set
435 to `None` are ignored.
436 calExpList : `list` [`lsst.afw.image.Exposure` or
437 `lsst.daf.butler.DeferredDatasetHandle`]
438 Sequence of calexps to be modified in place.
439 wcsList : `list` [`lsst.afw.geom.SkyWcs`]
440 The WCSs of the calexps in ``calExpList``. These will be used to
441 determine if the calexp should be used in the warp. The list is
442 dynamically updated with the WCSs from the visitSummary.
443 backgroundList : `list` [`lsst.afw.math.backgroundList`], optional
444 Sequence of backgrounds to be added back in if bgSubtracted=False.
445 skyCorrList : `list` [`lsst.afw.math.backgroundList`], optional
446 Sequence of background corrections to be subtracted if
449 Additional keyword arguments.
453 indices : `list` [`int`]
454 Indices of ``calExpList`` and friends that have valid
457 wcsList = len(calExpList)*[
None]
if wcsList
is None else wcsList
458 backgroundList = len(calExpList)*[
None]
if backgroundList
is None else backgroundList
459 skyCorrList = len(calExpList)*[
None]
if skyCorrList
is None else skyCorrList
461 includeCalibVar = self.config.includeCalibVar
464 for index, (calexp, background, skyCorr)
in enumerate(zip(calExpList,
467 if isinstance(calexp, DeferredDatasetHandle):
468 calexp = calexp.get()
470 if not self.config.bgSubtracted:
471 calexp.maskedImage += background.getImage()
473 detectorId = calexp.info.getDetector().getId()
476 row = visitSummary.find(detectorId)
479 f
"Unexpectedly incomplete visitSummary: detector={detectorId} is missing."
481 if (photoCalib := row.getPhotoCalib())
is not None:
482 calexp.setPhotoCalib(photoCalib)
485 "Detector id %d for visit %d has None for photoCalib in the visitSummary and will "
486 "not be used in the warp", detectorId, row[
"visit"],
489 if (skyWcs := row.getWcs())
is not None:
490 calexp.setWcs(skyWcs)
491 wcsList[index] = skyWcs
494 "Detector id %d for visit %d has None for wcs in the visitSummary and will "
495 "not be used in the warp", detectorId, row[
"visit"],
498 if self.config.useVisitSummaryPsf:
499 if (psf := row.getPsf())
is not None:
503 "Detector id %d for visit %d has None for psf in the visitSummary and will "
504 "not be used in the warp", detectorId, row[
"visit"],
507 if (apCorrMap := row.getApCorrMap())
is not None:
508 calexp.info.setApCorrMap(apCorrMap)
511 "Detector id %d for visit %d has None for apCorrMap in the visitSummary and will "
512 "not be used in the warp", detectorId, row[
"visit"],
516 if calexp.getPsf()
is None:
518 "Detector id %d for visit %d has None for psf for the calexp and will "
519 "not be used in the warp", detectorId, row[
"visit"],
522 if calexp.info.getApCorrMap()
is None:
524 "Detector id %d for visit %d has None for apCorrMap in the calexp and will "
525 "not be used in the warp", detectorId, row[
"visit"],
530 calexp.maskedImage = photoCalib.calibrateImage(calexp.maskedImage,
531 includeScaleUncertainty=includeCalibVar)
532 calexp.maskedImage /= photoCalib.getCalibrationMean()
538 if self.config.doApplySkyCorr:
539 calexp.maskedImage -= skyCorr.getImage()
541 indices.append(index)
542 calExpList[index] = calexp
547 def _prepareEmptyExposure(skyInfo):
548 """Produce an empty exposure for a given patch.
552 skyInfo : `lsst.pipe.base.Struct`
553 Struct from `~lsst.pipe.base.coaddBase.makeSkyInfo()` with
554 geometric information about the patch.
558 exp : `lsst.afw.image.exposure.ExposureF`
559 An empty exposure for a given patch.
561 exp = afwImage.ExposureF(skyInfo.bbox, skyInfo.wcs)
563 .getPlaneBitMask(
"NO_DATA"), numpy.inf)
566 def getWarpTypeList(self):
567 """Return list of requested warp types per the config.
570 if self.config.makeDirect:
571 warpTypeList.append(
"direct")
572 if self.config.makePsfMatched:
573 warpTypeList.append(
"psfMatched")
577def reorderRefs(inputRefs, outputSortKeyOrder, dataIdKey):
578 """Reorder inputRefs per outputSortKeyOrder.
580 Any inputRefs which are lists will be resorted per specified key e.g.,
581 'detector.' Only iterables will be reordered, and values can be of type
582 `lsst.pipe.base.connections.DeferredDatasetRef` or
583 `lsst.daf.butler.core.datasets.ref.DatasetRef`.
585 Returned lists of refs have the same length as the outputSortKeyOrder.
586 If an outputSortKey not in the inputRef, then it will be padded with None.
587 If an inputRef contains an inputSortKey that is not in the
588 outputSortKeyOrder it will be removed.
592 inputRefs : `lsst.pipe.base.connections.QuantizedConnection`
593 Input references to be reordered and padded.
594 outputSortKeyOrder : `iterable`
595 Iterable of values to be compared with inputRef's dataId[dataIdKey].
597 The data ID key in the dataRefs to compare with the outputSortKeyOrder.
601 inputRefs : `lsst.pipe.base.connections.QuantizedConnection`
602 Quantized Connection with sorted DatasetRef values sorted if iterable.
604 for connectionName, refs
in inputRefs:
605 if isinstance(refs, Iterable):
606 if hasattr(refs[0],
"dataId"):
607 inputSortKeyOrder = [ref.dataId[dataIdKey]
for ref
in refs]
609 inputSortKeyOrder = [ref.datasetRef.dataId[dataIdKey]
for ref
in refs]
610 if inputSortKeyOrder != outputSortKeyOrder:
611 setattr(inputRefs, connectionName,
612 reorderAndPadList(refs, inputSortKeyOrder, outputSortKeyOrder))
Represent a 2-dimensional array of bitmask pixels.
A floating-point coordinate rectangle geometry.
CoaddPsf is the Psf derived to be used for non-PSF-matched Coadd images.
int copyGoodPixels(lsst::afw::image::Image< ImagePixelT > &destImage, lsst::afw::image::Image< ImagePixelT > const &srcImage)
copy good pixels from one image to another