22from __future__
import annotations
26 "MakePsfMatchedWarpConfig",
27 "MakePsfMatchedWarpConnections",
28 "MakePsfMatchedWarpTask",
31from typing
import TYPE_CHECKING
37from lsst.afw.geom import Polygon, makeWcsPairTransform, SinglePolygonException
45 PipelineTaskConnections,
48from lsst.pipe.base.connectionTypes
import Input, Output
51from lsst.utils.timer
import timeMethod
58 PipelineTaskConnections,
59 dimensions=(
"tract",
"patch",
"skymap",
"instrument",
"visit"),
64 """Connections for MakePsfMatchedWarpTask"""
66 doc=
"Input definition of geometry/bbox and projection/wcs for warps.",
67 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
68 storageClass=
"SkyMap",
69 dimensions=(
"skymap",),
73 doc=
"Direct warped exposure produced by resampling calexps onto the skyMap patch geometry",
74 name=
"{coaddName}Coadd_directWarp",
75 storageClass=
"ExposureF",
76 dimensions=(
"tract",
"patch",
"skymap",
"instrument",
"visit"),
79 psf_matched_warp = Output(
81 "Output PSF-Matched warped exposure, produced by resampling ",
82 "calexps onto the skyMap patch geometry and PSF-matching to a model PSF.",
84 name=
"{coaddName}Coadd_psfMatchedWarp",
85 storageClass=
"ExposureF",
86 dimensions=(
"tract",
"patch",
"skymap",
"visit",
"instrument"),
90class MakePsfMatchedWarpConfig(
92 pipelineConnections=MakePsfMatchedWarpConnections,
94 """Config for MakePsfMatchedWarpTask."""
96 modelPsf = GaussianPsfFactory.makeField(doc=
"Model Psf factory")
98 target=ModelPsfMatchTask,
99 doc=
"Task to warp and PSF-match calexp",
102 def setDefaults(self):
103 super().setDefaults()
104 self.psfMatch.kernel[
"AL"].alardSigGauss = [1.0, 2.0, 4.5]
105 self.modelPsf.defaultFwhm = 7.7
108class MakePsfMatchedWarpTask(PipelineTask):
109 ConfigClass = MakePsfMatchedWarpConfig
110 _DefaultName =
"makePsfMatchedWarp"
112 def __init__(self, **kwargs):
113 super().__init__(**kwargs)
114 self.makeSubtask(
"psfMatch")
116 def runQuantum(self, butlerQC, inputRefs, outputRefs):
120 inputs = butlerQC.get(inputRefs)
122 sky_map = inputs.pop(
"sky_map")
124 quantumDataId = butlerQC.quantum.dataId
125 sky_info = makeSkyInfo(
127 tractId=quantumDataId[
"tract"],
128 patchId=quantumDataId[
"patch"],
131 results = self.run(inputs[
"direct_warp"], sky_info.bbox)
132 butlerQC.put(results, outputRefs)
135 def run(self, direct_warp: Exposure, bbox:
geom.Box2I):
136 """Make a PSF-matched warp from a direct warp.
138 Each individual detector from the direct warp is isolated, one at a
139 time, and PSF-matched to the same model PSF. The PSF-matched images are
140 then added back together to form the final PSF-matched warp. The bulk
141 of the work is done by the `psfMatchTask`.
145 Pixels that receive no inputs are set to NaN, for e.g, chip gaps. This
146 violates LSST algorithms group policy.
150 direct_warp : `lsst.afw.image.Exposure`
151 Direct warp to be PSF-matched.
155 struct : `lsst.pipe.base.Struct`
156 Struct containing the PSF-matched warp under the attribute
159 modelPsf = self.config.modelPsf.apply()
163 exposure_psf_matched = direct_warp[bbox].clone()
164 exposure_psf_matched.image.array[:, :] = np.nan
165 exposure_psf_matched.variance.array[:, :] = np.inf
166 exposure_psf_matched.setPsf(modelPsf)
168 bit_mask = direct_warp.mask.getPlaneBitMask(
"NO_DATA")
169 total_good_pixels = 0
171 for row
in direct_warp.info.getCoaddInputs().ccds:
172 transform = makeWcsPairTransform(row.wcs, direct_warp.wcs)
173 warp_psf =
WarpedPsf(row.getPsf(), transform)
175 if (src_polygon := row.validPolygon)
is None:
178 [
geom.Point2D(corner)
for corner
in row.getBBox().getCorners()]
180 self.log.debug(
"Polygon for detector=%d is calculated as %s",
185 self.log.debug(
"Polygon for detector=%d is read from the input calexp as %s",
191 destination_polygon = src_polygon.transform(transform).intersectionSingle(
194 except SinglePolygonException:
196 "Skipping CCD %d as its polygon does not intersect the direct warp",
205 for corner
in destination_polygon.getVertices():
207 bbox.clip(direct_warp.getBBox())
211 if not bbox.overlaps(exposure_psf_matched.getBBox()):
213 "Skipping CCD %d as its bbox %s does not overlap the PSF-matched warp",
219 self.log.debug(
"PSF-matching CCD %d with bbox %s", row[
"ccd"], bbox)
221 ccd_mask_array = ~(destination_polygon.createImage(bbox).array <= 0)
225 temp_warp = direct_warp[bbox].clone()
226 temp_warp.setPsf(warp_psf)
227 temp_warp.image.array *= ccd_mask_array
228 temp_warp.mask.array |= (~ccd_mask_array) * bit_mask
231 with warnings.catch_warnings():
232 warnings.filterwarnings(
"ignore", message=
"divide by zero", category=RuntimeWarning)
233 temp_warp.variance.array /= ccd_mask_array
235 temp_psf_matched = self.psfMatch.run(temp_warp, modelPsf).psfMatchedExposure
239 temp_psf_matched.maskedImage[bbox].mask.array |= (~ccd_mask_array) * bit_mask
242 bbox.clip(exposure_psf_matched.getBBox())
244 num_good_pixels = copyGoodPixels(
245 exposure_psf_matched.maskedImage[bbox],
246 temp_psf_matched.maskedImage[bbox],
253 "Copied %d pixels from CCD %d to exposure_psf_matched", num_good_pixels, row[
"ccd"],
255 total_good_pixels += num_good_pixels
257 self.log.info(
"Total number of good pixels = %d", total_good_pixels)
259 if total_good_pixels > 0:
261 exposure_psf_matched.info.getCoaddInputs(),
262 -self.config.psfMatch.kernel.active.kernelSize // 2
265 return Struct(psf_matched_warp=exposure_psf_matched)
267 return Struct(psf_matched_warp=
None)
273def growValidPolygons(coaddInputs, growBy: int) ->
None:
274 """Grow coaddInputs' ccds' ValidPolygons in place.
276 Either modify each ccd's validPolygon in place, or if CoaddInputs
277 does not have a validPolygon, create one from its bbox.
281 coaddInputs : `lsst.afw.image.coaddInputs`
282 CoaddInputs object containing the ccds to grow the valid polygons of.
284 The value to grow the valid polygons by.
288 Negative values for ``growBy`` can shrink the polygons.
290 for ccd
in coaddInputs.ccds:
291 polyOrig = ccd.getValidPolygon()
292 validPolyBBox = polyOrig.getBBox()
if polyOrig
else ccd.getBBox()
293 validPolyBBox.grow(growBy)
295 validPolygon = polyOrig.intersectionSingle(validPolyBBox)
299 ccd.validPolygon = validPolygon
A floating-point coordinate rectangle geometry.
An integer coordinate rectangle.
A Psf class that maps an arbitrary Psf through a coordinate transformation.