LSST Applications 27.0.0,g0265f82a02+469cd937ee,g02d81e74bb+21ad69e7e1,g1470d8bcf6+cbe83ee85a,g2079a07aa2+e67c6346a6,g212a7c68fe+04a9158687,g2305ad1205+94392ce272,g295015adf3+81dd352a9d,g2bbee38e9b+469cd937ee,g337abbeb29+469cd937ee,g3939d97d7f+72a9f7b576,g487adcacf7+71499e7cba,g50ff169b8f+5929b3527e,g52b1c1532d+a6fc98d2e7,g591dd9f2cf+df404f777f,g5a732f18d5+be83d3ecdb,g64a986408d+21ad69e7e1,g858d7b2824+21ad69e7e1,g8a8a8dda67+a6fc98d2e7,g99cad8db69+f62e5b0af5,g9ddcbc5298+d4bad12328,ga1e77700b3+9c366c4306,ga8c6da7877+71e4819109,gb0e22166c9+25ba2f69a1,gb6a65358fc+469cd937ee,gbb8dafda3b+69d3c0e320,gc07e1c2157+a98bf949bb,gc120e1dc64+615ec43309,gc28159a63d+469cd937ee,gcf0d15dbbd+72a9f7b576,gdaeeff99f8+a38ce5ea23,ge6526c86ff+3a7c1ac5f1,ge79ae78c31+469cd937ee,gee10cc3b42+a6fc98d2e7,gf1cff7945b+21ad69e7e1,gfbcc870c63+9a11dc8c8f
LSST Data Management Base Package
|
Classes | |
class | MakeDirectWarpConnections |
Variables | |
ways : | |
inputs : `Mapping` | |
sky_info : `~lsst.pipe.base.Struct` | |
visit_summary : `~lsst.afw.table.ExposureCatalog` | None | |
results : `~lsst.pipe.base.Struct` | |
exposure : `~lsst.afw.image.Exposure` | |
target_wcs : `~lsst.afw.geom.SkyWcs` | |
warper : `~lsst.afw.math.Warper` | |
old_background : `~lsst.afw.image.Background` | None | |
new_background : `~lsst.afw.image.Background` | None | |
maxBBox : `~lsst.geom.Box2I` | None | |
destBBox : `~lsst.geom.Box2I` | None | |
warped_exposure : `~lsst.afw.image.Exposure` | |
exp : `~lsst.afw.image.Exposure` | |
logger : `logging.Logger` | |
bool | includeScaleUncertainty |
mi : `~lsst.afw.image.MaskedImage` | |
median_variance : `float` | |
lsst.pipe.tasks.make_direct_warp.destBBox : `~lsst.geom.Box2I` | None |
Definition at line 516 of file make_direct_warp.py.
lsst.pipe.tasks.make_direct_warp.exp : `~lsst.afw.image.Exposure` |
if self.config.doPreWarpInterpolation: self.preWarpInterpolation.run(exposure.maskedImage) self._apply_all_calibrations( exposure, old_background, new_background, logger=self.log, visit_summary=visit_summary, includeScaleUncertainty=self.config.includeCalibVar, ) with self.timer("warp"): warped_exposure = warper.warpExposure( target_wcs, exposure, maxBBox=maxBBox, destBBox=destBBox, ) # Potentially a post-warp interpolation here? Relies on DM-38630. return warped_exposure @staticmethod def _apply_all_calibrations( exp: Exposure, old_background, new_background, logger, visit_summary: ExposureCatalog | None = None, includeScaleUncertainty: bool = False, ) -> None:
Definition at line 570 of file make_direct_warp.py.
lsst.pipe.tasks.make_direct_warp.exposure : `~lsst.afw.image.Exposure` |
target_bbox, target_wcs = sky_info.bbox, sky_info.wcs # Initialize the objects that will hold the warp. final_warp = ExposureF(target_bbox, target_wcs) exposures = inputs["calexp_list"] background_revert_list = inputs.get("background_revert_list", [None] * len(exposures)) background_apply_list = inputs.get("background_apply_list", [None] * len(exposures)) visit_id = exposures[0].dataId["visit"] # The warpExposure routine is expensive, and we do not want to call # it twice (i.e., a second time for PSF-matched warps). We do not # want to hold all the warped exposures in memory at once either. # So we create empty exposure(s) to accumulate the warps of each type, # and then process each detector serially. final_warp = self._prepareEmptyExposure(sky_info) final_masked_fraction_warp = self._prepareEmptyExposure(sky_info) final_noise_warps = { n_noise: self._prepareEmptyExposure(sky_info) for n_noise in range(self.config.numberOfNoiseRealizations) } # We need a few bookkeeping variables only for the science coadd. totalGoodPixels = 0 inputRecorder = self.inputRecorder.makeCoaddTempExpRecorder( visit_id, len(exposures), ) for index, (calexp_ref, old_background, new_background) in enumerate( zip(exposures, background_revert_list, background_apply_list, strict=True) ): dataId = calexp_ref.dataId self.log.debug( "Warping exposure %d/%d for id=%s", index + 1, len(exposures), dataId, ) calexp = calexp_ref.get() # Generate noise image(s) in-situ. seed = self.get_seed_from_data_id(dataId) rng = np.random.RandomState(seed + self.config.seedOffset) # Generate noise images in-situ. noise_calexps = self.make_noise_exposures(calexp, rng) # Warp the PSF before processing nad overwriting exposure. xyTransform = makeWcsPairTransform(calexp.getWcs(), target_wcs) psfWarped = WarpedPsf(calexp.getPsf(), xyTransform) warpedExposure = self.process( calexp, target_wcs, self.warper, old_background, new_background, visit_summary, destBBox=target_bbox, ) warpedExposure.setPsf(psfWarped) # Accumulate the partial warps in an online fashion. nGood = copyGoodPixels( final_warp.maskedImage, warpedExposure.maskedImage, final_warp.mask.getPlaneBitMask(["NO_DATA"]), ) ccdId = self.config.idGenerator.apply(dataId).catalog_id inputRecorder.addCalExp(calexp, ccdId, nGood) totalGoodPixels += nGood # Obtain the masked fraction exposure and warp it. if self.config.doPreWarpInterpolation: badMaskPlanes = self.preWarpInterpolation.config.badMaskPlanes else: badMaskPlanes = [] masked_fraction_exp = self._get_bad_mask(calexp, badMaskPlanes) masked_fraction_warp = self.maskedFractionWarper.warpExposure( target_wcs, masked_fraction_exp, destBBox=target_bbox ) copyGoodPixels( final_masked_fraction_warp.maskedImage, masked_fraction_warp.maskedImage, final_masked_fraction_warp.mask.getPlaneBitMask(["NO_DATA"]), ) # Process and accumulate noise images. for n_noise in range(self.config.numberOfNoiseRealizations): noise_calexp = noise_calexps[n_noise] warpedNoise = self.process( noise_calexp, target_wcs, self.warper, old_background, new_background, visit_summary, destBBox=target_bbox, ) copyGoodPixels( final_noise_warps[n_noise].maskedImage, warpedNoise.maskedImage, final_noise_warps[n_noise].mask.getPlaneBitMask(["NO_DATA"]), ) # Finish the inputRecorder and add the coaddPsf to the final warp. if totalGoodPixels > 0: inputRecorder.finish(final_warp, totalGoodPixels) coaddPsf = CoaddPsf( inputRecorder.coaddInputs.ccds, sky_info.wcs, self.config.coaddPsf.makeControl(), ) final_warp.setPsf(coaddPsf) final_warp.setFilter(calexp.getFilter()) final_warp.getInfo().setVisitInfo(calexp.getInfo().getVisitInfo()) results = Struct( warp=final_warp, masked_fraction_warp=final_masked_fraction_warp.image, ) for noise_index, noise_exposure in final_noise_warps.items(): setattr(results, f"noise_warp{noise_index}", noise_exposure.maskedImage) return results def process( self, exposure, target_wcs, warper, old_background=None, new_background=None, visit_summary=None, maxBBox=None, destBBox=None, ):
Definition at line 499 of file make_direct_warp.py.
bool lsst.pipe.tasks.make_direct_warp.includeScaleUncertainty |
Definition at line 582 of file make_direct_warp.py.
lsst.pipe.tasks.make_direct_warp.inputs : `Mapping` |
ConfigClass = MakeDirectWarpConfig _DefaultName = "makeDirectWarp" def __init__(self, **kwargs): super().__init__(**kwargs) self.makeSubtask("inputRecorder") self.makeSubtask("preWarpInterpolation") self.warper = Warper.fromConfig(self.config.warper) self.maskedFractionWarper = Warper.fromConfig(self.config.maskedFractionWarper) def runQuantum(self, butlerQC, inputRefs, outputRefs): # Docstring inherited. # Read in all inputs. inputs = butlerQC.get(inputRefs) if not inputs["calexp_list"]: raise NoWorkFound("No input warps provided for co-addition") sky_map = inputs.pop("sky_map") quantumDataId = butlerQC.quantum.dataId sky_info = makeSkyInfo( sky_map, tractId=quantumDataId["tract"], patchId=quantumDataId["patch"], ) visit_summary = inputs["visit_summary"] if self.config.useVisitSummaryPsf else None results = self.run(inputs, sky_info, visit_summary) butlerQC.put(results, outputRefs) def run(self, inputs, sky_info, visit_summary):
Definition at line 327 of file make_direct_warp.py.
lsst.pipe.tasks.make_direct_warp.logger : `logging.Logger` |
Definition at line 576 of file make_direct_warp.py.
lsst.pipe.tasks.make_direct_warp.maxBBox : `~lsst.geom.Box2I` | None |
Definition at line 513 of file make_direct_warp.py.
lsst.pipe.tasks.make_direct_warp.median_variance : `float` |
Definition at line 676 of file make_direct_warp.py.
lsst.pipe.tasks.make_direct_warp.mi : `~lsst.afw.image.MaskedImage` |
exp = ExposureF(sky_info.bbox, sky_info.wcs) exp.getMaskedImage().set(np.nan, Mask.getPlaneBitMask("NO_DATA"), np.inf) return exp @staticmethod def compute_median_variance(mi: MaskedImage) -> float:
Definition at line 671 of file make_direct_warp.py.
lsst.pipe.tasks.make_direct_warp.new_background : `~lsst.afw.image.Background` | None |
Definition at line 507 of file make_direct_warp.py.
lsst.pipe.tasks.make_direct_warp.old_background : `~lsst.afw.image.Background` | None |
Definition at line 505 of file make_direct_warp.py.
lsst.pipe.tasks.make_direct_warp.results : `~lsst.pipe.base.Struct` |
Definition at line 343 of file make_direct_warp.py.
lsst.pipe.tasks.make_direct_warp.sky_info : `~lsst.pipe.base.Struct` |
if not visit_summary: logger.debug("No visit summary provided.") else: logger.debug("Updating calibration from visit summary.") if old_background: exp.maskedImage += old_background.getImage() if visit_summary: detector = exp.info.getDetector().getId() row = visit_summary.find(detector) if row is None: raise RuntimeError(f"Unexpectedly incomplete visit_summary: {detector=} is missing.") if photo_calib := row.getPhotoCalib(): exp.setPhotoCalib(photo_calib) else: logger.warning( "No photometric calibration found in visit summary for detector = %s.", detector, ) if wcs := row.getWcs(): exp.setWcs(wcs) else: logger.warning("No WCS found in visit summary for detector = %s.", detector) if psf := row.getPsf(): exp.setPsf(psf) else: logger.warning("No PSF found in visit summary for detector = %s.", detector) if apcorr_map := row.getApCorrMap(): exp.setApCorrMap(apcorr_map) else: logger.warning( "No aperture correction map found in visit summary for detector = %s.", detector, ) if new_background: exp.maskedImage -= new_background.getImage() # Calibrate the (masked) image. # This should likely happen even if visit_summary is None. photo_calib = exp.getPhotoCalib() exp.maskedImage = photo_calib.calibrateImage( exp.maskedImage, includeScaleUncertainty=includeScaleUncertainty ) exp.maskedImage /= photo_calib.getCalibrationMean() # This method is copied from makeWarp.py @classmethod def _prepareEmptyExposure(cls, sky_info):
Definition at line 333 of file make_direct_warp.py.
lsst.pipe.tasks.make_direct_warp.target_wcs : `~lsst.afw.geom.SkyWcs` |
Definition at line 501 of file make_direct_warp.py.
lsst.pipe.tasks.make_direct_warp.visit_summary : `~lsst.afw.table.ExposureCatalog` | None |
Definition at line 336 of file make_direct_warp.py.
lsst.pipe.tasks.make_direct_warp.warped_exposure : `~lsst.afw.image.Exposure` |
Definition at line 522 of file make_direct_warp.py.
lsst.pipe.tasks.make_direct_warp.warper : `~lsst.afw.math.Warper` |
Definition at line 503 of file make_direct_warp.py.
lsst.pipe.tasks.make_direct_warp.ways : |
calexp_list = Input( doc="Input exposures to be interpolated and resampled onto a SkyMap " "projection/patch.", name="{calexpType}calexp", storageClass="ExposureF", dimensions=("instrument", "visit", "detector"), multiple=True, deferLoad=True, ) background_revert_list = Input( doc="Background to be reverted (i.e., added back to the calexp). " "This connection is used only if doRevertOldBackground=False.", name="calexpBackground", storageClass="Background", dimensions=("instrument", "visit", "detector"), multiple=True, ) background_apply_list = Input( doc="Background to be applied (subtracted from the calexp). " "This is used only if doApplyNewBackground=True.", name="skyCorr", storageClass="Background", dimensions=("instrument", "visit", "detector"), multiple=True, ) visit_summary = Input( doc="Input visit-summary catalog with updated calibration objects.", name="finalVisitSummary", storageClass="ExposureCatalog", dimensions=("instrument", "visit"), ) sky_map = Input( doc="Input definition of geometry/bbox and projection/wcs for warps.", name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME, storageClass="SkyMap", dimensions=("skymap",), ) # Declare all possible outputs (except noise, which is configurable) warp = Output( doc="Output direct warped exposure produced by resampling calexps " "onto the skyMap patch geometry.", name="{coaddName}Coadd_directWarp", storageClass="ExposureF", dimensions=("tract", "patch", "skymap", "instrument", "visit"), ) masked_fraction_warp = Output( doc="Output masked fraction warped exposure.", name="{coaddName}Coadd_directWarp_maskedFraction", storageClass="ImageF", dimensions=("tract", "patch", "skymap", "instrument", "visit"), ) def __init__(self, *, config=None): super().__init__(config=config) if not config: return if not config.doRevertOldBackground: del self.background_revert_list if not config.doApplyNewBackground: del self.background_apply_list # Dynamically set output connections for noise images, depending on the # number of noise realization specified in the config. for n in range(config.numberOfNoiseRealizations): noise_warp = Output( doc=f"Output direct warped noise exposure ({n})", name=f"{config.connections.coaddName}Coadd_directWarp_noise{n}", # Store it as a MaskedImage to preserve the variance plane. storageClass="MaskedImageF", dimensions=("tract", "patch", "skymap", "instrument", "visit"), ) setattr(self, f"noise_warp{n}", noise_warp) class MakeDirectWarpConfig( PipelineTaskConfig, pipelineConnections=MakeDirectWarpConnections, ):
MAX_NUMBER_OF_NOISE_REALIZATIONS = 3
numberOfNoiseRealizations = RangeField[int]( doc="Number of noise realizations to simulate and persist.", default=1, min=0, max=MAX_NUMBER_OF_NOISE_REALIZATIONS, inclusiveMax=True, ) seedOffset = Field[int]( doc="Offset to the seed used for the noise realization. This can be " "used to create a different noise realization if the default ones " "are catastrophic, or for testing sensitivity to the noise.", default=0, ) useMedianVariance = Field[bool]( doc="Use the median of variance plane in the input calexp to generate " "noise realizations? If False, per-pixel variance will be used.", default=True, ) doRevertOldBackground = Field[bool]( doc="Revert the old backgrounds from the `background_revert_list` " "connection?", default=True, ) doApplyNewBackground = Field[bool]( doc="Apply the new backgrounds from the `background_apply_list` " "connection?", default=False, ) useVisitSummaryPsf = Field[bool]( doc="If True, use the PSF model and aperture corrections from the " "'visit_summary' connection. If False, use the PSF model and " "aperture corrections from the 'calexp' connection.", default=True, ) doPreWarpInterpolation = Field[bool]( doc="Interpolate over bad pixels before warping?", default=True, ) preWarpInterpolation = ConfigurableField( doc="Interpolation task to use for pre-warping interpolation", target=CloughTocher2DInterpolateTask, ) inputRecorder = ConfigurableField( doc="Subtask that helps fill CoaddInputs catalogs added to the final " "coadd", target=CoaddInputRecorderTask, ) includeCalibVar = Field[bool]( doc="Add photometric calibration variance to warp variance plane?", default=False, ) matchingKernelSize = Field[int]( doc="Size in pixels of matching kernel. Must be odd.", default=21, check=lambda x: x % 2 == 1, ) warper = ConfigField( doc="Configuration for the warper that warps the image and noise", dtype=Warper.ConfigClass, ) maskedFractionWarper = ConfigField( doc="Configuration for the warp that warps the mask fraction image", dtype=Warper.ConfigClass, ) coaddPsf = ConfigField( doc="Configuration for CoaddPsf", dtype=CoaddPsfConfig, ) idGenerator = DetectorVisitIdGeneratorConfig.make_field() # Use bgSubtracted and doApplySkyCorr to match the old MakeWarpConfig, # but as properties instead of config fields. @property def bgSubtracted(self) -> bool: return not self.doRevertOldBackground @bgSubtracted.setter def bgSubtracted(self, value: bool) -> None: self.doRevertOldBackground = ~value @property def doApplySkyCorr(self) -> bool: return self.doApplyNewBackground @doApplySkyCorr.setter def doApplySkyCorr(self, value: bool) -> None: self.doApplyNewBackground = value def setDefaults(self) -> None: super().setDefaults() self.warper.warpingKernelName = "lanczos3" self.maskedFractionWarper.warpingKernelName = "bilinear" class MakeDirectWarpTask(PipelineTask):
Definition at line 277 of file make_direct_warp.py.