22__all__ = [
"MakeWarpTask",
"MakeWarpConfig"]
31import lsst.pipe.base.connectionTypes
as connectionTypes
34from lsst.daf.butler
import DeferredDatasetHandle
38from lsst.utils.timer
import timeMethod
39from .coaddBase
import CoaddBaseTask, growValidPolygons, makeSkyInfo, reorderAndPadList
40from .warpAndPsfMatch
import WarpAndPsfMatchTask
43log = logging.getLogger(__name__)
47 dimensions=(
"tract",
"patch",
"skymap",
"instrument",
"visit"),
48 defaultTemplates={
"coaddName":
"deep",
50 calExpList = connectionTypes.Input(
51 doc=
"Input exposures to be resampled and optionally PSF-matched onto a SkyMap projection/patch",
52 name=
"{calexpType}calexp",
53 storageClass=
"ExposureF",
54 dimensions=(
"instrument",
"visit",
"detector"),
58 backgroundList = connectionTypes.Input(
59 doc=
"Input backgrounds to be added back into the calexp if bgSubtracted=False",
60 name=
"calexpBackground",
61 storageClass=
"Background",
62 dimensions=(
"instrument",
"visit",
"detector"),
65 skyCorrList = connectionTypes.Input(
66 doc=
"Input Sky Correction to be subtracted from the calexp if doApplySkyCorr=True",
68 storageClass=
"Background",
69 dimensions=(
"instrument",
"visit",
"detector"),
72 skyMap = connectionTypes.Input(
73 doc=
"Input definition of geometry/bbox and projection/wcs for warped exposures",
74 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
75 storageClass=
"SkyMap",
76 dimensions=(
"skymap",),
78 direct = connectionTypes.Output(
79 doc=(
"Output direct warped exposure (previously called CoaddTempExp), produced by resampling "
80 "calexps onto the skyMap patch geometry."),
81 name=
"{coaddName}Coadd_directWarp",
82 storageClass=
"ExposureF",
83 dimensions=(
"tract",
"patch",
"skymap",
"visit",
"instrument"),
85 psfMatched = connectionTypes.Output(
86 doc=(
"Output PSF-Matched warped exposure (previously called CoaddTempExp), produced by resampling "
87 "calexps onto the skyMap patch geometry and PSF-matching to a model PSF."),
88 name=
"{coaddName}Coadd_psfMatchedWarp",
89 storageClass=
"ExposureF",
90 dimensions=(
"tract",
"patch",
"skymap",
"visit",
"instrument"),
92 visitSummary = connectionTypes.Input(
93 doc=
"Input visit-summary catalog with updated calibration objects.",
94 name=
"finalVisitSummary",
95 storageClass=
"ExposureCatalog",
96 dimensions=(
"instrument",
"visit",),
99 def __init__(self, *, config=None):
100 if config.bgSubtracted:
101 del self.backgroundList
102 if not config.doApplySkyCorr:
104 if not config.makeDirect:
106 if not config.makePsfMatched:
110class MakeWarpConfig(pipeBase.PipelineTaskConfig, CoaddBaseTask.ConfigClass,
111 pipelineConnections=MakeWarpConnections):
112 """Config for MakeWarpTask."""
114 warpAndPsfMatch = pexConfig.ConfigurableField(
115 target=WarpAndPsfMatchTask,
116 doc=
"Task to warp and PSF-match calexp",
118 doWrite = pexConfig.Field(
119 doc=
"persist <coaddName>Coadd_<warpType>Warp",
123 bgSubtracted = pexConfig.Field(
124 doc=
"Work with a background subtracted calexp?",
128 coaddPsf = pexConfig.ConfigField(
129 doc=
"Configuration for CoaddPsf",
130 dtype=CoaddPsfConfig,
132 makeDirect = pexConfig.Field(
133 doc=
"Make direct Warp/Coadds",
137 makePsfMatched = pexConfig.Field(
138 doc=
"Make Psf-Matched Warp/Coadd?",
142 modelPsf = GaussianPsfFactory.makeField(doc=
"Model Psf factory")
143 useVisitSummaryPsf = pexConfig.Field(
145 "If True, use the PSF model and aperture corrections from the 'visitSummary' connection. "
146 "If False, use the PSF model and aperture corrections from the 'exposure' connection. "
151 doWriteEmptyWarps = pexConfig.Field(
154 doc=
"Write out warps even if they are empty"
156 hasFakes = pexConfig.Field(
157 doc=
"Should be set to True if fake sources have been inserted into the input data.",
161 doApplySkyCorr = pexConfig.Field(
164 doc=
"Apply sky correction?",
166 idGenerator = DetectorVisitIdGeneratorConfig.make_field()
169 CoaddBaseTask.ConfigClass.validate(self)
171 if not self.makePsfMatched
and not self.makeDirect:
172 raise ValueError(
"At least one of config.makePsfMatched and config.makeDirect must be True")
173 if self.warpAndPsfMatch.warp.cacheSize != self.coaddPsf.cacheSize:
178 raise ValueError(
"Image warping cache size and CoaddPSf warping cache size do not agree.")
180 def setDefaults(self):
181 CoaddBaseTask.ConfigClass.setDefaults(self)
182 self.warpAndPsfMatch.warp.cacheSize = 0
183 self.coaddPsf.cacheSize = 0
187 """Warp and optionally PSF-Match calexps onto an a common projection.
189 Warp and optionally PSF-Match calexps onto a common projection, by
190 performing the following operations:
191 - Group calexps by visit/run
192 - For each visit, generate a Warp by calling method @ref run.
193 `run` loops over the visit's calexps calling
194 `~lsst.pipe.tasks.warpAndPsfMatch.WarpAndPsfMatchTask` on each visit
197 ConfigClass = MakeWarpConfig
198 _DefaultName =
"makeWarp"
200 def __init__(self, **kwargs):
201 CoaddBaseTask.__init__(self, **kwargs)
202 self.makeSubtask(
"warpAndPsfMatch")
203 if self.config.hasFakes:
204 self.calexpType =
"fakes_calexp"
206 self.calexpType =
"calexp"
208 @utils.inheritDoc(pipeBase.PipelineTask)
209 def runQuantum(self, butlerQC, inputRefs, outputRefs):
213 Obtain the list of input detectors from calExpList. Sort them by
214 detector order (to ensure reproducibility). Then ensure all input
215 lists are in the same sorted detector order.
217 detectorOrder = [handle.datasetRef.dataId[
'detector']
for handle
in inputRefs.calExpList]
219 inputRefs = reorderRefs(inputRefs, detectorOrder, dataIdKey=
'detector')
222 inputs = butlerQC.get(inputRefs)
226 skyMap = inputs.pop(
"skyMap")
227 quantumDataId = butlerQC.quantum.dataId
228 skyInfo = makeSkyInfo(skyMap, tractId=quantumDataId[
'tract'], patchId=quantumDataId[
'patch'])
231 dataIdList = [ref.datasetRef.dataId
for ref
in inputRefs.calExpList]
234 self.config.idGenerator.apply(dataId).catalog_id
235 for dataId
in dataIdList
239 visitSummary = inputs[
"visitSummary"]
242 for dataId
in dataIdList:
243 row = visitSummary.find(dataId[
"detector"])
246 f
"Unexpectedly incomplete visitSummary provided to makeWarp: {dataId} is missing."
248 bboxList.append(row.getBBox())
249 wcsList.append(row.getWcs())
250 inputs[
"bboxList"] = bboxList
251 inputs[
"wcsList"] = wcsList
255 completeIndices = self._prepareCalibratedExposures(**inputs)
256 inputs = self.filterInputs(indices=completeIndices, inputs=inputs)
262 coordList = [skyInfo.wcs.pixelToSky(pos)
for pos
in cornerPosList]
263 goodIndices = self.select.run(**inputs, coordList=coordList, dataIds=dataIdList)
264 inputs = self.filterInputs(indices=goodIndices, inputs=inputs)
267 visitId = dataIdList[0][
"visit"]
269 results = self.run(**inputs,
271 ccdIdList=[ccdIdList[i]
for i
in goodIndices],
272 dataIdList=[dataIdList[i]
for i
in goodIndices],
274 if self.config.makeDirect
and results.exposures[
"direct"]
is not None:
275 butlerQC.put(results.exposures[
"direct"], outputRefs.direct)
276 if self.config.makePsfMatched
and results.exposures[
"psfMatched"]
is not None:
277 butlerQC.put(results.exposures[
"psfMatched"], outputRefs.psfMatched)
280 def run(self, calExpList, ccdIdList, skyInfo, visitId=0, dataIdList=None, **kwargs):
281 """Create a Warp from inputs.
283 We iterate over the multiple calexps in a single exposure to construct
284 the warp (previously called a coaddTempExp) of that exposure to the
285 supplied tract/patch.
287 Pixels that receive no pixels are set to NAN; this is not correct
288 (violates LSST algorithms group policy), but will be fixed up by
289 interpolating after the coaddition.
291 calExpList : `list` [ `lsst.afw.image.Exposure` ]
292 List of single-detector input images that (may) overlap the patch
294 skyInfo : `lsst.pipe.base.Struct`
295 Struct from `~lsst.pipe.base.coaddBase.makeSkyInfo()` with
296 geometric information about the patch.
298 Integer identifier for visit, for the table that will
299 produce the CoaddPsf.
303 result : `lsst.pipe.base.Struct`
304 Results as a struct with attributes:
307 A dictionary containing the warps requested:
308 "direct": direct warp if ``config.makeDirect``
309 "psfMatched": PSF-matched warp if ``config.makePsfMatched``
312 warpTypeList = self.getWarpTypeList()
314 totGoodPix = {warpType: 0
for warpType
in warpTypeList}
315 didSetMetadata = {warpType:
False for warpType
in warpTypeList}
316 warps = {warpType: self._prepareEmptyExposure(skyInfo)
for warpType
in warpTypeList}
317 inputRecorder = {warpType: self.inputRecorder.makeCoaddTempExpRecorder(visitId, len(calExpList))
318 for warpType
in warpTypeList}
320 modelPsf = self.config.modelPsf.apply()
if self.config.makePsfMatched
else None
321 if dataIdList
is None:
322 dataIdList = ccdIdList
324 for calExpInd, (calExp, ccdId, dataId)
in enumerate(zip(calExpList, ccdIdList, dataIdList)):
325 self.log.info(
"Processing calexp %d of %d for this Warp: id=%s",
326 calExpInd+1, len(calExpList), dataId)
328 warpedAndMatched = self.warpAndPsfMatch.run(calExp, modelPsf=modelPsf,
329 wcs=skyInfo.wcs, maxBBox=skyInfo.bbox,
330 makeDirect=self.config.makeDirect,
331 makePsfMatched=self.config.makePsfMatched)
332 except Exception
as e:
333 self.log.warning(
"WarpAndPsfMatch failed for calexp %s; skipping it: %s", dataId, e)
336 numGoodPix = {warpType: 0
for warpType
in warpTypeList}
337 for warpType
in warpTypeList:
338 exposure = warpedAndMatched.getDict()[warpType]
341 warp = warps[warpType]
342 if didSetMetadata[warpType]:
343 mimg = exposure.getMaskedImage()
344 mimg *= (warp.getPhotoCalib().getInstFluxAtZeroMagnitude()
345 / exposure.getPhotoCalib().getInstFluxAtZeroMagnitude())
348 warp.getMaskedImage(), exposure.getMaskedImage(), self.getBadPixelMask())
349 totGoodPix[warpType] += numGoodPix[warpType]
350 self.log.debug(
"Calexp %s has %d good pixels in this patch (%.1f%%) for %s",
351 dataId, numGoodPix[warpType],
352 100.0*numGoodPix[warpType]/skyInfo.bbox.getArea(), warpType)
353 if numGoodPix[warpType] > 0
and not didSetMetadata[warpType]:
354 warp.info.id = exposure.info.id
355 warp.setPhotoCalib(exposure.getPhotoCalib())
356 warp.setFilter(exposure.getFilter())
357 warp.getInfo().setVisitInfo(exposure.getInfo().getVisitInfo())
360 warp.setPsf(exposure.getPsf())
361 didSetMetadata[warpType] =
True
365 inputRecorder[warpType].addCalExp(calExp, ccdId, numGoodPix[warpType])
367 except Exception
as e:
368 self.log.warning(
"Error processing calexp %s; skipping it: %s", dataId, e)
371 for warpType
in warpTypeList:
372 self.log.info(
"%sWarp has %d good pixels (%.1f%%)",
373 warpType, totGoodPix[warpType], 100.0*totGoodPix[warpType]/skyInfo.bbox.getArea())
375 if totGoodPix[warpType] > 0
and didSetMetadata[warpType]:
376 inputRecorder[warpType].finish(warps[warpType], totGoodPix[warpType])
377 if warpType ==
"direct":
378 warps[warpType].setPsf(
379 CoaddPsf(inputRecorder[warpType].coaddInputs.ccds, skyInfo.wcs,
380 self.config.coaddPsf.makeControl()))
383 inputRecorder[warpType].coaddInputs,
384 -self.config.warpAndPsfMatch.psfMatch.kernel.active.kernelSize // 2,
387 if not self.config.doWriteEmptyWarps:
389 warps[warpType] =
None
393 result = pipeBase.Struct(exposures=warps)
396 def filterInputs(self, indices, inputs):
397 """Filter task inputs by their indices.
401 indices : `list` [`int`]
402 inputs : `dict` [`list`]
403 A dictionary of input connections to be passed to run.
407 inputs : `dict` [`list`]
408 Task inputs with their lists filtered by indices.
410 for key
in inputs.keys():
412 if isinstance(inputs[key], list):
413 inputs[key] = [inputs[key][ind]
for ind
in indices]
416 def _prepareCalibratedExposures(self, *, visitSummary, calExpList=[], wcsList=None,
417 backgroundList=None, skyCorrList=None, **kwargs):
418 """Calibrate and add backgrounds to input calExpList in place.
422 visitSummary : `lsst.afw.table.ExposureCatalog`
423 Exposure catalog with potentially all calibrations. Attributes set
424 to `None` are ignored.
425 calExpList : `list` [`lsst.afw.image.Exposure` or
426 `lsst.daf.butler.DeferredDatasetHandle`]
427 Sequence of single-epoch images (or deferred load handles for
428 images) to be modified in place. On return this always has images,
430 wcsList : `list` [`lsst.afw.geom.SkyWcs`]
431 The WCSs of the calexps in ``calExpList``. These will be used to
432 determine if the calexp should be used in the warp. The list is
433 dynamically updated with the WCSs from the visitSummary.
434 backgroundList : `list` [`lsst.afw.math.BackgroundList`], optional
435 Sequence of backgrounds to be added back in if bgSubtracted=False.
436 skyCorrList : `list` [`lsst.afw.math.BackgroundList`], optional
437 Sequence of background corrections to be subtracted if
440 Additional keyword arguments.
444 indices : `list` [`int`]
445 Indices of ``calExpList`` and friends that have valid
448 wcsList = len(calExpList)*[
None]
if wcsList
is None else wcsList
449 backgroundList = len(calExpList)*[
None]
if backgroundList
is None else backgroundList
450 skyCorrList = len(calExpList)*[
None]
if skyCorrList
is None else skyCorrList
452 includeCalibVar = self.config.includeCalibVar
455 for index, (calexp, background, skyCorr)
in enumerate(zip(calExpList,
458 if isinstance(calexp, DeferredDatasetHandle):
459 calexp = calexp.get()
461 if not self.config.bgSubtracted:
462 calexp.maskedImage += background.getImage()
464 detectorId = calexp.info.getDetector().getId()
467 row = visitSummary.find(detectorId)
470 f
"Unexpectedly incomplete visitSummary: detector={detectorId} is missing."
472 if (photoCalib := row.getPhotoCalib())
is not None:
473 calexp.setPhotoCalib(photoCalib)
476 "Detector id %d for visit %d has None for photoCalib in the visitSummary and will "
477 "not be used in the warp", detectorId, row[
"visit"],
480 if (skyWcs := row.getWcs())
is not None:
481 calexp.setWcs(skyWcs)
482 wcsList[index] = skyWcs
485 "Detector id %d for visit %d has None for wcs in the visitSummary and will "
486 "not be used in the warp", detectorId, row[
"visit"],
489 if self.config.useVisitSummaryPsf:
490 if (psf := row.getPsf())
is not None:
494 "Detector id %d for visit %d has None for psf in the visitSummary and will "
495 "not be used in the warp", detectorId, row[
"visit"],
498 if (apCorrMap := row.getApCorrMap())
is not None:
499 calexp.info.setApCorrMap(apCorrMap)
502 "Detector id %d for visit %d has None for apCorrMap in the visitSummary and will "
503 "not be used in the warp", detectorId, row[
"visit"],
507 if calexp.getPsf()
is None:
509 "Detector id %d for visit %d has None for psf for the calexp and will "
510 "not be used in the warp", detectorId, row[
"visit"],
513 if calexp.info.getApCorrMap()
is None:
515 "Detector id %d for visit %d has None for apCorrMap in the calexp and will "
516 "not be used in the warp", detectorId, row[
"visit"],
521 if self.config.doApplySkyCorr:
522 calexp.maskedImage -= skyCorr.getImage()
525 calexp.maskedImage = photoCalib.calibrateImage(calexp.maskedImage,
526 includeScaleUncertainty=includeCalibVar)
527 calexp.maskedImage /= photoCalib.getCalibrationMean()
532 indices.append(index)
533 calExpList[index] = calexp
538 def _prepareEmptyExposure(skyInfo):
539 """Produce an empty exposure for a given patch.
543 skyInfo : `lsst.pipe.base.Struct`
544 Struct from `~lsst.pipe.base.coaddBase.makeSkyInfo()` with
545 geometric information about the patch.
549 exp : `lsst.afw.image.exposure.ExposureF`
550 An empty exposure for a given patch.
552 exp = afwImage.ExposureF(skyInfo.bbox, skyInfo.wcs)
554 .getPlaneBitMask(
"NO_DATA"), numpy.inf)
557 def getWarpTypeList(self):
558 """Return list of requested warp types per the config.
561 if self.config.makeDirect:
562 warpTypeList.append(
"direct")
563 if self.config.makePsfMatched:
564 warpTypeList.append(
"psfMatched")
568def reorderRefs(inputRefs, outputSortKeyOrder, dataIdKey):
569 """Reorder inputRefs per outputSortKeyOrder.
571 Any inputRefs which are lists will be resorted per specified key e.g.,
572 'detector.' Only iterables will be reordered, and values can be of type
573 `lsst.pipe.base.connections.DeferredDatasetRef` or
574 `lsst.daf.butler.core.datasets.ref.DatasetRef`.
576 Returned lists of refs have the same length as the outputSortKeyOrder.
577 If an outputSortKey not in the inputRef, then it will be padded with None.
578 If an inputRef contains an inputSortKey that is not in the
579 outputSortKeyOrder it will be removed.
583 inputRefs : `lsst.pipe.base.connections.QuantizedConnection`
584 Input references to be reordered and padded.
585 outputSortKeyOrder : `iterable`
586 Iterable of values to be compared with inputRef's dataId[dataIdKey].
588 The data ID key in the dataRefs to compare with the outputSortKeyOrder.
592 inputRefs : `lsst.pipe.base.connections.QuantizedConnection`
593 Quantized Connection with sorted DatasetRef values sorted if iterable.
595 for connectionName, refs
in inputRefs:
596 if isinstance(refs, Iterable):
597 if hasattr(refs[0],
"dataId"):
598 inputSortKeyOrder = [ref.dataId[dataIdKey]
for ref
in refs]
600 inputSortKeyOrder = [handle.datasetRef.dataId[dataIdKey]
for handle
in refs]
601 if inputSortKeyOrder != outputSortKeyOrder:
602 setattr(inputRefs, connectionName,
603 reorderAndPadList(refs, inputSortKeyOrder, outputSortKeyOrder))
Represent a 2-dimensional array of bitmask pixels.
A floating-point coordinate rectangle geometry.
CoaddPsf is the Psf derived to be used for non-PSF-matched Coadd images.
int copyGoodPixels(lsst::afw::image::Image< ImagePixelT > &destImage, lsst::afw::image::Image< ImagePixelT > const &srcImage)
copy good pixels from one image to another