22__all__ = [
"MakeWarpTask",
"MakeWarpConfig"]
31import lsst.pipe.base.connectionTypes
as connectionTypes
34from lsst.daf.butler
import DeferredDatasetHandle
38from lsst.utils.timer
import timeMethod
39from .coaddBase
import CoaddBaseTask, makeSkyInfo, reorderAndPadList
40from .warpAndPsfMatch
import WarpAndPsfMatchTask
43log = logging.getLogger(__name__)
47 dimensions=(
"tract",
"patch",
"skymap",
"instrument",
"visit"),
48 defaultTemplates={
"coaddName":
"deep",
50 calExpList = connectionTypes.Input(
51 doc=
"Input exposures to be resampled and optionally PSF-matched onto a SkyMap projection/patch",
52 name=
"{calexpType}calexp",
53 storageClass=
"ExposureF",
54 dimensions=(
"instrument",
"visit",
"detector"),
58 backgroundList = connectionTypes.Input(
59 doc=
"Input backgrounds to be added back into the calexp if bgSubtracted=False",
60 name=
"calexpBackground",
61 storageClass=
"Background",
62 dimensions=(
"instrument",
"visit",
"detector"),
65 skyCorrList = connectionTypes.Input(
66 doc=
"Input Sky Correction to be subtracted from the calexp if doApplySkyCorr=True",
68 storageClass=
"Background",
69 dimensions=(
"instrument",
"visit",
"detector"),
72 skyMap = connectionTypes.Input(
73 doc=
"Input definition of geometry/bbox and projection/wcs for warped exposures",
74 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
75 storageClass=
"SkyMap",
76 dimensions=(
"skymap",),
78 direct = connectionTypes.Output(
79 doc=(
"Output direct warped exposure (previously called CoaddTempExp), produced by resampling ",
80 "calexps onto the skyMap patch geometry."),
81 name=
"{coaddName}Coadd_directWarp",
82 storageClass=
"ExposureF",
83 dimensions=(
"tract",
"patch",
"skymap",
"visit",
"instrument"),
85 psfMatched = connectionTypes.Output(
86 doc=(
"Output PSF-Matched warped exposure (previously called CoaddTempExp), produced by resampling ",
87 "calexps onto the skyMap patch geometry and PSF-matching to a model PSF."),
88 name=
"{coaddName}Coadd_psfMatchedWarp",
89 storageClass=
"ExposureF",
90 dimensions=(
"tract",
"patch",
"skymap",
"visit",
"instrument"),
92 visitSummary = connectionTypes.Input(
93 doc=
"Input visit-summary catalog with updated calibration objects.",
94 name=
"finalVisitSummary",
95 storageClass=
"ExposureCatalog",
96 dimensions=(
"instrument",
"visit",),
99 def __init__(self, *, config=None):
100 if config.bgSubtracted:
101 del self.backgroundList
102 if not config.doApplySkyCorr:
104 if not config.makeDirect:
106 if not config.makePsfMatched:
110class MakeWarpConfig(pipeBase.PipelineTaskConfig, CoaddBaseTask.ConfigClass,
111 pipelineConnections=MakeWarpConnections):
112 """Config for MakeWarpTask."""
114 warpAndPsfMatch = pexConfig.ConfigurableField(
115 target=WarpAndPsfMatchTask,
116 doc=
"Task to warp and PSF-match calexp",
118 doWrite = pexConfig.Field(
119 doc=
"persist <coaddName>Coadd_<warpType>Warp",
123 bgSubtracted = pexConfig.Field(
124 doc=
"Work with a background subtracted calexp?",
128 coaddPsf = pexConfig.ConfigField(
129 doc=
"Configuration for CoaddPsf",
130 dtype=CoaddPsfConfig,
132 makeDirect = pexConfig.Field(
133 doc=
"Make direct Warp/Coadds",
137 makePsfMatched = pexConfig.Field(
138 doc=
"Make Psf-Matched Warp/Coadd?",
142 modelPsf = GaussianPsfFactory.makeField(doc=
"Model Psf factory")
143 useVisitSummaryPsf = pexConfig.Field(
145 "If True, use the PSF model and aperture corrections from the 'visitSummary' connection. "
146 "If False, use the PSF model and aperture corrections from the 'exposure' connection. "
151 doWriteEmptyWarps = pexConfig.Field(
154 doc=
"Write out warps even if they are empty"
156 hasFakes = pexConfig.Field(
157 doc=
"Should be set to True if fake sources have been inserted into the input data.",
161 doApplySkyCorr = pexConfig.Field(
164 doc=
"Apply sky correction?",
166 idGenerator = DetectorVisitIdGeneratorConfig.make_field()
169 CoaddBaseTask.ConfigClass.validate(self)
171 if not self.makePsfMatched
and not self.makeDirect:
172 raise ValueError(
"At least one of config.makePsfMatched and config.makeDirect must be True")
173 if self.warpAndPsfMatch.warp.cacheSize != self.coaddPsf.cacheSize:
178 raise ValueError(
"Image warping cache size and CoaddPSf warping cache size do not agree.")
180 def setDefaults(self):
181 CoaddBaseTask.ConfigClass.setDefaults(self)
182 self.warpAndPsfMatch.warp.cacheSize = 0
183 self.coaddPsf.cacheSize = 0
184 self.warpAndPsfMatch.psfMatch.kernel.active.kernelSize = self.matchingKernelSize
188 """Warp and optionally PSF-Match calexps onto an a common projection.
190 Warp and optionally PSF-Match calexps onto a common projection, by
191 performing the following operations:
192 - Group calexps by visit/run
193 - For each visit, generate a Warp by calling method @ref run.
194 `run` loops over the visit's calexps calling
195 `~lsst.pipe.tasks.warpAndPsfMatch.WarpAndPsfMatchTask` on each visit
198 ConfigClass = MakeWarpConfig
199 _DefaultName =
"makeWarp"
201 def __init__(self, **kwargs):
202 CoaddBaseTask.__init__(self, **kwargs)
203 self.makeSubtask(
"warpAndPsfMatch")
204 if self.config.hasFakes:
205 self.calexpType =
"fakes_calexp"
207 self.calexpType =
"calexp"
209 @utils.inheritDoc(pipeBase.PipelineTask)
210 def runQuantum(self, butlerQC, inputRefs, outputRefs):
214 Obtain the list of input detectors from calExpList. Sort them by
215 detector order (to ensure reproducibility). Then ensure all input
216 lists are in the same sorted detector order.
218 detectorOrder = [ref.datasetRef.dataId[
'detector']
for ref
in inputRefs.calExpList]
220 inputRefs = reorderRefs(inputRefs, detectorOrder, dataIdKey=
'detector')
223 inputs = butlerQC.get(inputRefs)
227 skyMap = inputs.pop(
"skyMap")
228 quantumDataId = butlerQC.quantum.dataId
229 skyInfo = makeSkyInfo(skyMap, tractId=quantumDataId[
'tract'], patchId=quantumDataId[
'patch'])
232 dataIdList = [ref.datasetRef.dataId
for ref
in inputRefs.calExpList]
235 self.config.idGenerator.apply(dataId).catalog_id
236 for dataId
in dataIdList
240 visitSummary = inputs[
"visitSummary"]
243 for dataId
in dataIdList:
244 row = visitSummary.find(dataId[
"detector"])
247 f
"Unexpectedly incomplete visitSummary provided to makeWarp: {dataId} is missing."
249 bboxList.append(row.getBBox())
250 wcsList.append(row.getWcs())
251 inputs[
"bboxList"] = bboxList
252 inputs[
"wcsList"] = wcsList
256 completeIndices = self._prepareCalibratedExposures(**inputs)
257 inputs = self.filterInputs(indices=completeIndices, inputs=inputs)
263 coordList = [skyInfo.wcs.pixelToSky(pos)
for pos
in cornerPosList]
264 goodIndices = self.select.run(**inputs, coordList=coordList, dataIds=dataIdList)
265 inputs = self.filterInputs(indices=goodIndices, inputs=inputs)
268 visitId = dataIdList[0][
"visit"]
270 results = self.run(**inputs,
272 ccdIdList=[ccdIdList[i]
for i
in goodIndices],
273 dataIdList=[dataIdList[i]
for i
in goodIndices],
275 if self.config.makeDirect
and results.exposures[
"direct"]
is not None:
276 butlerQC.put(results.exposures[
"direct"], outputRefs.direct)
277 if self.config.makePsfMatched
and results.exposures[
"psfMatched"]
is not None:
278 butlerQC.put(results.exposures[
"psfMatched"], outputRefs.psfMatched)
281 def run(self, calExpList, ccdIdList, skyInfo, visitId=0, dataIdList=None, **kwargs):
282 """Create a Warp from inputs.
284 We iterate over the multiple calexps in a single exposure to construct
285 the warp (previously called a coaddTempExp) of that exposure to the
286 supplied tract/patch.
288 Pixels that receive no pixels are set to NAN; this is not correct
289 (violates LSST algorithms group policy), but will be fixed up by
290 interpolating after the coaddition.
292 calexpRefList : `list`
293 List of data references for calexps that (may)
294 overlap the patch of interest.
295 skyInfo : `lsst.pipe.base.Struct`
296 Struct from `~lsst.pipe.base.coaddBase.makeSkyInfo()` with
297 geometric information about the patch.
299 Integer identifier for visit, for the table that will
300 produce the CoaddPsf.
304 result : `lsst.pipe.base.Struct`
305 Results as a struct with attributes:
308 A dictionary containing the warps requested:
309 "direct": direct warp if ``config.makeDirect``
310 "psfMatched": PSF-matched warp if ``config.makePsfMatched``
313 warpTypeList = self.getWarpTypeList()
315 totGoodPix = {warpType: 0
for warpType
in warpTypeList}
316 didSetMetadata = {warpType:
False for warpType
in warpTypeList}
317 warps = {warpType: self._prepareEmptyExposure(skyInfo)
for warpType
in warpTypeList}
318 inputRecorder = {warpType: self.inputRecorder.makeCoaddTempExpRecorder(visitId, len(calExpList))
319 for warpType
in warpTypeList}
321 modelPsf = self.config.modelPsf.apply()
if self.config.makePsfMatched
else None
322 if dataIdList
is None:
323 dataIdList = ccdIdList
325 for calExpInd, (calExp, ccdId, dataId)
in enumerate(zip(calExpList, ccdIdList, dataIdList)):
326 self.log.info(
"Processing calexp %d of %d for this Warp: id=%s",
327 calExpInd+1, len(calExpList), dataId)
329 warpedAndMatched = self.warpAndPsfMatch.run(calExp, modelPsf=modelPsf,
330 wcs=skyInfo.wcs, maxBBox=skyInfo.bbox,
331 makeDirect=self.config.makeDirect,
332 makePsfMatched=self.config.makePsfMatched)
333 except Exception
as e:
334 self.log.warning(
"WarpAndPsfMatch failed for calexp %s; skipping it: %s", dataId, e)
337 numGoodPix = {warpType: 0
for warpType
in warpTypeList}
338 for warpType
in warpTypeList:
339 exposure = warpedAndMatched.getDict()[warpType]
342 warp = warps[warpType]
343 if didSetMetadata[warpType]:
344 mimg = exposure.getMaskedImage()
345 mimg *= (warp.getPhotoCalib().getInstFluxAtZeroMagnitude()
346 / exposure.getPhotoCalib().getInstFluxAtZeroMagnitude())
349 warp.getMaskedImage(), exposure.getMaskedImage(), self.getBadPixelMask())
350 totGoodPix[warpType] += numGoodPix[warpType]
351 self.log.debug(
"Calexp %s has %d good pixels in this patch (%.1f%%) for %s",
352 dataId, numGoodPix[warpType],
353 100.0*numGoodPix[warpType]/skyInfo.bbox.getArea(), warpType)
354 if numGoodPix[warpType] > 0
and not didSetMetadata[warpType]:
355 warp.info.id = exposure.info.id
356 warp.setPhotoCalib(exposure.getPhotoCalib())
357 warp.setFilter(exposure.getFilter())
358 warp.getInfo().setVisitInfo(exposure.getInfo().getVisitInfo())
361 warp.setPsf(exposure.getPsf())
362 didSetMetadata[warpType] =
True
366 inputRecorder[warpType].addCalExp(calExp, ccdId, numGoodPix[warpType])
368 except Exception
as e:
369 self.log.warning(
"Error processing calexp %s; skipping it: %s", dataId, e)
372 for warpType
in warpTypeList:
373 self.log.info(
"%sWarp has %d good pixels (%.1f%%)",
374 warpType, totGoodPix[warpType], 100.0*totGoodPix[warpType]/skyInfo.bbox.getArea())
376 if totGoodPix[warpType] > 0
and didSetMetadata[warpType]:
377 inputRecorder[warpType].finish(warps[warpType], totGoodPix[warpType])
378 if warpType ==
"direct":
379 warps[warpType].setPsf(
380 CoaddPsf(inputRecorder[warpType].coaddInputs.ccds, skyInfo.wcs,
381 self.config.coaddPsf.makeControl()))
383 if not self.config.doWriteEmptyWarps:
385 warps[warpType] =
None
389 result = pipeBase.Struct(exposures=warps)
392 def filterInputs(self, indices, inputs):
393 """Filter task inputs by their indices.
397 indices : `list` [`int`]
398 inputs : `dict` [`list`]
399 A dictionary of input connections to be passed to run.
403 inputs : `dict` [`list`]
404 Task inputs with their lists filtered by indices.
406 for key
in inputs.keys():
408 if isinstance(inputs[key], list):
409 inputs[key] = [inputs[key][ind]
for ind
in indices]
412 def _prepareCalibratedExposures(self, *, visitSummary, calExpList=[], wcsList=None,
413 backgroundList=None, skyCorrList=None, **kwargs):
414 """Calibrate and add backgrounds to input calExpList in place.
418 visitSummary : `lsst.afw.table.ExposureCatalog`
419 Exposure catalog with potentially all calibrations. Attributes set
420 to `None` are ignored.
421 calExpList : `list` [`lsst.afw.image.Exposure` or
422 `lsst.daf.butler.DeferredDatasetHandle`]
423 Sequence of calexps to be modified in place.
424 wcsList : `list` [`lsst.afw.geom.SkyWcs`]
425 The WCSs of the calexps in ``calExpList``. These will be used to
426 determine if the calexp should be used in the warp. The list is
427 dynamically updated with the WCSs from the visitSummary.
428 backgroundList : `list` [`lsst.afw.math.backgroundList`], optional
429 Sequence of backgrounds to be added back in if bgSubtracted=False.
430 skyCorrList : `list` [`lsst.afw.math.backgroundList`], optional
431 Sequence of background corrections to be subtracted if
434 Additional keyword arguments.
438 indices : `list` [`int`]
439 Indices of ``calExpList`` and friends that have valid
442 wcsList = len(calExpList)*[
None]
if wcsList
is None else wcsList
443 backgroundList = len(calExpList)*[
None]
if backgroundList
is None else backgroundList
444 skyCorrList = len(calExpList)*[
None]
if skyCorrList
is None else skyCorrList
446 includeCalibVar = self.config.includeCalibVar
449 for index, (calexp, background, skyCorr)
in enumerate(zip(calExpList,
452 if isinstance(calexp, DeferredDatasetHandle):
453 calexp = calexp.get()
455 if not self.config.bgSubtracted:
456 calexp.maskedImage += background.getImage()
458 detectorId = calexp.info.getDetector().getId()
461 row = visitSummary.find(detectorId)
464 f
"Unexpectedly incomplete visitSummary: detector={detectorId} is missing."
466 if (photoCalib := row.getPhotoCalib())
is not None:
467 calexp.setPhotoCalib(photoCalib)
470 "Detector id %d for visit %d has None for photoCalib in the visitSummary and will "
471 "not be used in the warp", detectorId, row[
"visit"],
474 if (skyWcs := row.getWcs())
is not None:
475 calexp.setWcs(skyWcs)
476 wcsList[index] = skyWcs
479 "Detector id %d for visit %d has None for wcs in the visitSummary and will "
480 "not be used in the warp", detectorId, row[
"visit"],
483 if self.config.useVisitSummaryPsf:
484 if (psf := row.getPsf())
is not None:
488 "Detector id %d for visit %d has None for psf in the visitSummary and will "
489 "not be used in the warp", detectorId, row[
"visit"],
492 if (apCorrMap := row.getApCorrMap())
is not None:
493 calexp.info.setApCorrMap(apCorrMap)
496 "Detector id %d for visit %d has None for apCorrMap in the visitSummary and will "
497 "not be used in the warp", detectorId, row[
"visit"],
501 if calexp.getPsf()
is None:
503 "Detector id %d for visit %d has None for psf for the calexp and will "
504 "not be used in the warp", detectorId, row[
"visit"],
507 if calexp.info.getApCorrMap()
is None:
509 "Detector id %d for visit %d has None for apCorrMap in the calexp and will "
510 "not be used in the warp", detectorId, row[
"visit"],
515 calexp.maskedImage = photoCalib.calibrateImage(calexp.maskedImage,
516 includeScaleUncertainty=includeCalibVar)
517 calexp.maskedImage /= photoCalib.getCalibrationMean()
523 if self.config.doApplySkyCorr:
524 calexp.maskedImage -= skyCorr.getImage()
526 indices.append(index)
527 calExpList[index] = calexp
532 def _prepareEmptyExposure(skyInfo):
533 """Produce an empty exposure for a given patch.
537 skyInfo : `lsst.pipe.base.Struct`
538 Struct from `~lsst.pipe.base.coaddBase.makeSkyInfo()` with
539 geometric information about the patch.
543 exp : `lsst.afw.image.exposure.ExposureF`
544 An empty exposure for a given patch.
546 exp = afwImage.ExposureF(skyInfo.bbox, skyInfo.wcs)
548 .getPlaneBitMask(
"NO_DATA"), numpy.inf)
551 def getWarpTypeList(self):
552 """Return list of requested warp types per the config.
555 if self.config.makeDirect:
556 warpTypeList.append(
"direct")
557 if self.config.makePsfMatched:
558 warpTypeList.append(
"psfMatched")
562def reorderRefs(inputRefs, outputSortKeyOrder, dataIdKey):
563 """Reorder inputRefs per outputSortKeyOrder.
565 Any inputRefs which are lists will be resorted per specified key e.g.,
566 'detector.' Only iterables will be reordered, and values can be of type
567 `lsst.pipe.base.connections.DeferredDatasetRef` or
568 `lsst.daf.butler.core.datasets.ref.DatasetRef`.
570 Returned lists of refs have the same length as the outputSortKeyOrder.
571 If an outputSortKey not in the inputRef, then it will be padded with None.
572 If an inputRef contains an inputSortKey that is not in the
573 outputSortKeyOrder it will be removed.
577 inputRefs : `lsst.pipe.base.connections.QuantizedConnection`
578 Input references to be reordered and padded.
579 outputSortKeyOrder : `iterable`
580 Iterable of values to be compared with inputRef's dataId[dataIdKey].
582 The data ID key in the dataRefs to compare with the outputSortKeyOrder.
586 inputRefs : `lsst.pipe.base.connections.QuantizedConnection`
587 Quantized Connection with sorted DatasetRef values sorted if iterable.
589 for connectionName, refs
in inputRefs:
590 if isinstance(refs, Iterable):
591 if hasattr(refs[0],
"dataId"):
592 inputSortKeyOrder = [ref.dataId[dataIdKey]
for ref
in refs]
594 inputSortKeyOrder = [ref.datasetRef.dataId[dataIdKey]
for ref
in refs]
595 if inputSortKeyOrder != outputSortKeyOrder:
596 setattr(inputRefs, connectionName,
597 reorderAndPadList(refs, inputSortKeyOrder, outputSortKeyOrder))
Represent a 2-dimensional array of bitmask pixels.
A floating-point coordinate rectangle geometry.
CoaddPsf is the Psf derived to be used for non-PSF-matched Coadd images.
daf::base::PropertySet * set
int copyGoodPixels(lsst::afw::image::Image< ImagePixelT > &destImage, lsst::afw::image::Image< ImagePixelT > const &srcImage)
copy good pixels from one image to another