LSST Applications g0f08755f38+9c285cab97,g1635faa6d4+13f3999e92,g1653933729+a8ce1bb630,g1a0ca8cf93+bf6eb00ceb,g28da252d5a+0829b12dee,g29321ee8c0+5700dc9eac,g2bbee38e9b+9634bc57db,g2bc492864f+9634bc57db,g2cdde0e794+c2c89b37c4,g3156d2b45e+41e33cbcdc,g347aa1857d+9634bc57db,g35bb328faa+a8ce1bb630,g3a166c0a6a+9634bc57db,g3e281a1b8c+9f2c4e2fc3,g414038480c+077ccc18e7,g41af890bb2+fde0dd39b6,g5fbc88fb19+17cd334064,g781aacb6e4+a8ce1bb630,g80478fca09+55a9465950,g82479be7b0+d730eedb7d,g858d7b2824+9c285cab97,g9125e01d80+a8ce1bb630,g9726552aa6+10f999ec6a,ga5288a1d22+2a84bb7594,gacf8899fa4+c69c5206e8,gae0086650b+a8ce1bb630,gb58c049af0+d64f4d3760,gc28159a63d+9634bc57db,gcf0d15dbbd+4b7d09cae4,gda3e153d99+9c285cab97,gda6a2b7d83+4b7d09cae4,gdaeeff99f8+1711a396fd,ge2409df99d+5e831397f4,ge79ae78c31+9634bc57db,gf0baf85859+147a0692ba,gf3967379c6+41c94011de,gf3fb38a9a8+8f07a9901b,gfb92a5be7c+9c285cab97,w.2024.46
LSST Data Management Base Package
|
Classes | |
class | MakeDirectWarpConnections |
class | WarpDetectorInputs |
Variables | |
ways : | |
inputs : ``~collections.abc.Mapping` [ `int`, `WarpDetectorInputs` ] | |
sky_info : `lsst.pipe.base.Struct` | |
visit_summary : `~lsst.afw.table.ExposureCatalog` | |
filtered_inputs : `dict` [ `int`, `WarpDetectorInputs` ] | |
results : `~lsst.pipe.base.Struct` | |
detector_inputs : `WarpDetectorInputs` | |
target_wcs : `~lsst.afw.geom.SkyWcs` | |
warper : `~lsst.afw.math.Warper` | |
maxBBox : `~lsst.geom.Box2I` | None | |
destBBox : `~lsst.geom.Box2I` | None | |
warped_exposure : `~lsst.afw.image.Exposure` | None | |
bool | includeScaleUncertainty |
success : `bool` | |
exp : `lsst.afw.image.exposure.ExposureF` | |
mi : `~lsst.afw.image.MaskedImage` | |
median_variance : `float` | |
lsst.pipe.tasks.make_direct_warp.destBBox : `~lsst.geom.Box2I` | None |
Definition at line 683 of file make_direct_warp.py.
lsst.pipe.tasks.make_direct_warp.detector_inputs : `WarpDetectorInputs` |
if self.config.doSelectPreWarp: inputs = self._preselect_inputs(inputs, sky_info, visit_summary=visit_summary) if not inputs: raise NoWorkFound("No input warps remain after selection for co-addition") sky_info.bbox.grow(self.config.border) target_bbox, target_wcs = sky_info.bbox, sky_info.wcs # Initialize the objects that will hold the warp. final_warp = ExposureF(target_bbox, target_wcs) for _, warp_detector_input in inputs.items(): visit_id = warp_detector_input.data_id["visit"] break # Just need the visit id from any one of the inputs. # The warpExposure routine is expensive, and we do not want to call # it twice (i.e., a second time for PSF-matched warps). We do not # want to hold all the warped exposures in memory at once either. # So we create empty exposure(s) to accumulate the warps of each type, # and then process each detector serially. final_warp = self._prepareEmptyExposure(sky_info) final_masked_fraction_warp = self._prepareEmptyExposure(sky_info) final_noise_warps = { n_noise: self._prepareEmptyExposure(sky_info) for n_noise in range(self.config.numberOfNoiseRealizations) } # We need a few bookkeeping variables only for the science coadd. totalGoodPixels = 0 inputRecorder = self.inputRecorder.makeCoaddTempExpRecorder( visit_id, len(inputs), ) for index, detector_inputs in enumerate(inputs.values()): self.log.debug( "Warping exposure %d/%d for id=%s", index + 1, len(inputs), detector_inputs.data_id, ) input_exposure = detector_inputs.exposure # Generate noise image(s) in-situ. seed = self.get_seed_from_data_id(detector_inputs.data_id) rng = np.random.RandomState(seed + self.config.seedOffset) # Generate noise images in-situ. noise_calexps = self.make_noise_exposures(input_exposure, rng) warpedExposure = self.process( detector_inputs, target_wcs, self.warper, visit_summary, destBBox=target_bbox, ) if warpedExposure is None: self.log.debug( "Skipping exposure %s because it could not be warped.", detector_inputs.data_id ) continue if final_warp.photoCalib is not None: ratio = ( final_warp.photoCalib.getInstFluxAtZeroMagnitude() / warpedExposure.photoCalib.getInstFluxAtZeroMagnitude() ) else: ratio = 1 self.log.debug("Scaling exposure %s by %f", detector_inputs.data_id, ratio) warpedExposure.maskedImage *= ratio # Accumulate the partial warps in an online fashion. nGood = copyGoodPixels( final_warp.maskedImage, warpedExposure.maskedImage, final_warp.mask.getPlaneBitMask(["NO_DATA"]), ) if final_warp.photoCalib is None and nGood > 0: final_warp.setPhotoCalib(warpedExposure.photoCalib) ccdId = self.config.idGenerator.apply(detector_inputs.data_id).catalog_id inputRecorder.addCalExp(input_exposure, ccdId, nGood) totalGoodPixels += nGood if self.config.doWarpMaskedFraction: # Obtain the masked fraction exposure and warp it. if self.config.doPreWarpInterpolation: badMaskPlanes = self.preWarpInterpolation.config.badMaskPlanes else: badMaskPlanes = [] masked_fraction_exp = self._get_bad_mask(input_exposure, badMaskPlanes) masked_fraction_warp = self.maskedFractionWarper.warpExposure( target_wcs, masked_fraction_exp, destBBox=target_bbox ) copyGoodPixels( final_masked_fraction_warp.maskedImage, masked_fraction_warp.maskedImage, final_masked_fraction_warp.mask.getPlaneBitMask(["NO_DATA"]), ) # Process and accumulate noise images. for n_noise in range(self.config.numberOfNoiseRealizations): noise_calexp = noise_calexps[n_noise] noise_pseudo_inputs = dataclasses.replace( detector_inputs, exposure_or_handle=noise_calexp, background_revert=BackgroundList(), background_apply=BackgroundList(), ) warpedNoise = self.process( noise_pseudo_inputs, target_wcs, self.warper, visit_summary, destBBox=target_bbox, ) warpedNoise.maskedImage *= ratio copyGoodPixels( final_noise_warps[n_noise].maskedImage, warpedNoise.maskedImage, final_noise_warps[n_noise].mask.getPlaneBitMask(["NO_DATA"]), ) # If there are no good pixels, return a Struct filled with None. if totalGoodPixels == 0: results = Struct( warp=None, masked_fraction_warp=None, ) for noise_index in range(self.config.numberOfNoiseRealizations): setattr(results, f"noise_warp{noise_index}", None) return results # Finish the inputRecorder and add the coaddPsf to the final warp. inputRecorder.finish(final_warp, totalGoodPixels) coaddPsf = CoaddPsf( inputRecorder.coaddInputs.ccds, sky_info.wcs, self.config.coaddPsf.makeControl(), ) final_warp.setPsf(coaddPsf) for _, warp_detector_input in inputs.items(): final_warp.setFilter(warp_detector_input.exposure.getFilter()) final_warp.getInfo().setVisitInfo(warp_detector_input.exposure.getInfo().getVisitInfo()) break # Just need the filter and visit info from any one of the inputs. results = Struct( warp=final_warp, ) if self.config.doWarpMaskedFraction: results.masked_fraction_warp = final_masked_fraction_warp.image for noise_index, noise_exposure in final_noise_warps.items(): setattr(results, f"noise_warp{noise_index}", noise_exposure.maskedImage) return results def process( self, detector_inputs: WarpDetectorInputs, target_wcs, warper, visit_summary=None, maxBBox=None, destBBox=None, ) -> ExposureF | None:
input_exposure = detector_inputs.exposure if self.config.doPreWarpInterpolation: self.preWarpInterpolation.run(input_exposure.maskedImage) success = self._apply_all_calibrations( detector_inputs, visit_summary=visit_summary, includeScaleUncertainty=self.config.includeCalibVar, ) if not success: return None with self.timer("warp"): warped_exposure = warper.warpExposure( target_wcs, input_exposure, maxBBox=maxBBox, destBBox=destBBox, ) # Potentially a post-warp interpolation here? Relies on DM-38630. return warped_exposure def _apply_all_calibrations( self, detector_inputs: WarpDetectorInputs, *, visit_summary: ExposureCatalog | None = None, includeScaleUncertainty: bool = False, ) -> bool:
Definition at line 669 of file make_direct_warp.py.
lsst.pipe.tasks.make_direct_warp.exp : `lsst.afw.image.exposure.ExposureF` |
Definition at line 856 of file make_direct_warp.py.
lsst.pipe.tasks.make_direct_warp.filtered_inputs : `dict` [ `int`, `WarpDetectorInputs` ] |
Definition at line 432 of file make_direct_warp.py.
bool lsst.pipe.tasks.make_direct_warp.includeScaleUncertainty |
Definition at line 747 of file make_direct_warp.py.
lsst.pipe.tasks.make_direct_warp.inputs : ``~collections.abc.Mapping` [ `int`, `WarpDetectorInputs` ] |
ConfigClass = MakeDirectWarpConfig _DefaultName = "makeDirectWarp" def __init__(self, **kwargs): super().__init__(**kwargs) self.makeSubtask("inputRecorder") self.makeSubtask("preWarpInterpolation") if self.config.doSelectPreWarp: self.makeSubtask("select") self.warper = Warper.fromConfig(self.config.warper) if self.config.doWarpMaskedFraction: self.maskedFractionWarper = Warper.fromConfig(self.config.maskedFractionWarper) def runQuantum(self, butlerQC, inputRefs, outputRefs): # Docstring inherited. inputs: Mapping[int, WarpDetectorInputs] = {} # Detector ID -> WarpDetectorInputs for handle in butlerQC.get(inputRefs.calexp_list): inputs[handle.dataId["detector"]] = WarpDetectorInputs( exposure_or_handle=handle, data_id=handle.dataId, ) if not inputs: raise NoWorkFound("No input warps provided for co-addition") # Add backgrounds to the inputs struct, being careful not to assume # they're present for the same detectors we got image handles for, in # case of upstream errors. for ref in getattr(inputRefs, "background_revert_list", []): inputs[ref.dataId["detector"]].background_revert = butlerQC.get(ref) for ref in getattr(inputRefs, "background_apply_list", []): inputs[ref.dataId["detector"]].background_apply = butlerQC.get(ref) visit_summary = butlerQC.get(inputRefs.visit_summary) sky_map = butlerQC.get(inputRefs.sky_map) quantumDataId = butlerQC.quantum.dataId sky_info = makeSkyInfo( sky_map, tractId=quantumDataId["tract"], patchId=quantumDataId["patch"], ) results = self.run(inputs, sky_info, visit_summary=visit_summary) butlerQC.put(results, outputRefs) def _preselect_inputs( self, inputs: Mapping[int, WarpDetectorInputs], sky_info: Struct, visit_summary: ExposureCatalog, ) -> dict[int, WarpDetectorInputs]:
data_id_list, bbox_list, wcs_list = [], [], [] for detector_id, detector_inputs in inputs.items(): row = visit_summary.find(detector_id) if row is None: raise RuntimeError(f"Unexpectedly incomplete visit_summary: {detector_id=} is missing.") data_id_list.append(detector_inputs.data_id) bbox_list.append(row.getBBox()) wcs_list.append(row.getWcs()) cornerPosList = Box2D(sky_info.bbox).getCorners() coordList = [sky_info.wcs.pixelToSky(pos) for pos in cornerPosList] good_indices = self.select.run( bboxList=bbox_list, wcsList=wcs_list, visitSummary=visit_summary, coordList=coordList, dataIds=data_id_list, ) detector_ids = list(inputs.keys()) good_detector_ids = [detector_ids[i] for i in good_indices] return {detector_id: inputs[detector_id] for detector_id in good_detector_ids} def run(self, inputs: Mapping[int, WarpDetectorInputs], sky_info, visit_summary) -> Struct:
Definition at line 423 of file make_direct_warp.py.
lsst.pipe.tasks.make_direct_warp.maxBBox : `~lsst.geom.Box2I` | None |
Definition at line 680 of file make_direct_warp.py.
lsst.pipe.tasks.make_direct_warp.median_variance : `float` |
Definition at line 874 of file make_direct_warp.py.
lsst.pipe.tasks.make_direct_warp.mi : `~lsst.afw.image.MaskedImage` |
exp = ExposureF(sky_info.bbox, sky_info.wcs) exp.getMaskedImage().set(np.nan, Mask.getPlaneBitMask("NO_DATA"), np.inf) return exp @staticmethod def compute_median_variance(mi: MaskedImage) -> float:
Definition at line 869 of file make_direct_warp.py.
lsst.pipe.tasks.make_direct_warp.results : `~lsst.pipe.base.Struct` |
Definition at line 475 of file make_direct_warp.py.
lsst.pipe.tasks.make_direct_warp.sky_info : `lsst.pipe.base.Struct` |
if self.config.doRevertOldBackground: detector_inputs.revert_background() elif detector_inputs.background_revert: # This could get trigged only if runQuantum is skipped and run is # called directly. raise RuntimeError( f"doRevertOldBackground is False, but {detector_inputs.data_id} has a background_revert." ) input_exposure = detector_inputs.exposure if visit_summary is not None: detector = input_exposure.info.getDetector().getId() row = visit_summary.find(detector) if row is None: self.log.info( "Unexpectedly incomplete visit_summary: detector = %s is missing. Skipping it.", detector, ) return False if photo_calib := row.getPhotoCalib(): input_exposure.setPhotoCalib(photo_calib) else: self.log.info( "No photometric calibration found in visit summary for detector = %s. Skipping it.", detector, ) return False if wcs := row.getWcs(): input_exposure.setWcs(wcs) else: self.log.info("No WCS found in visit summary for detector = %s. Skipping it.", detector) return False if self.config.useVisitSummaryPsf: if psf := row.getPsf(): input_exposure.setPsf(psf) else: self.log.info("No PSF found in visit summary for detector = %s. Skipping it.", detector) return False if apcorr_map := row.getApCorrMap(): input_exposure.setApCorrMap(apcorr_map) else: self.log.info( "No aperture correction map found in visit summary for detector = %s. Skipping it", detector, ) return False elif visit_summary is not None: # We can only get here by calling `run`, not `runQuantum`. raise RuntimeError( "useVisitSummaryPsf=True, but visit_summary is provided. " ) if self.config.doApplyNewBackground: detector_inputs.apply_background() elif detector_inputs.background_apply: # This could get trigged only if runQuantum is skipped and run is # called directly. raise RuntimeError( f"doApplyNewBackground is False, but {detector_inputs.data_id} has a background_apply." ) # Calibrate the (masked) image. # This should likely happen even if visit_summary is None. photo_calib = input_exposure.photoCalib input_exposure.maskedImage = photo_calib.calibrateImage( input_exposure.maskedImage, includeScaleUncertainty=includeScaleUncertainty, ) input_exposure.maskedImage /= photo_calib.getCalibrationMean() return True # This method is copied from makeWarp.py @classmethod def _prepareEmptyExposure(cls, sky_info):
Definition at line 425 of file make_direct_warp.py.
lsst.pipe.tasks.make_direct_warp.success : `bool` |
Definition at line 754 of file make_direct_warp.py.
lsst.pipe.tasks.make_direct_warp.target_wcs : `~lsst.afw.geom.SkyWcs` |
Definition at line 672 of file make_direct_warp.py.
lsst.pipe.tasks.make_direct_warp.visit_summary : `~lsst.afw.table.ExposureCatalog` |
Definition at line 427 of file make_direct_warp.py.
lsst.pipe.tasks.make_direct_warp.warped_exposure : `~lsst.afw.image.Exposure` | None |
Definition at line 689 of file make_direct_warp.py.
lsst.pipe.tasks.make_direct_warp.warper : `~lsst.afw.math.Warper` |
Definition at line 674 of file make_direct_warp.py.
lsst.pipe.tasks.make_direct_warp.ways : |
calexp_list = Input( doc="Input exposures to be interpolated and resampled onto a SkyMap " "projection/patch.", name="calexp", storageClass="ExposureF", dimensions=("instrument", "visit", "detector"), multiple=True, deferLoad=True, ) background_revert_list = Input( doc="Background to be reverted (i.e., added back to the calexp). " "This connection is used only if doRevertOldBackground=False.", name="calexpBackground", storageClass="Background", dimensions=("instrument", "visit", "detector"), multiple=True, ) background_apply_list = Input( doc="Background to be applied (subtracted from the calexp). " "This is used only if doApplyNewBackground=True.", name="skyCorr", storageClass="Background", dimensions=("instrument", "visit", "detector"), multiple=True, ) visit_summary = Input( doc="Input visit-summary catalog with updated calibration objects.", name="finalVisitSummary", storageClass="ExposureCatalog", dimensions=("instrument", "visit"), ) sky_map = Input( doc="Input definition of geometry/bbox and projection/wcs for warps.", name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME, storageClass="SkyMap", dimensions=("skymap",), ) # Declare all possible outputs (except noise, which is configurable) warp = Output( doc="Output direct warped exposure produced by resampling calexps " "onto the skyMap patch geometry.", name="{coaddName}Coadd_directWarp", storageClass="ExposureF", dimensions=("tract", "patch", "skymap", "instrument", "visit"), ) masked_fraction_warp = Output( doc="Output masked fraction warped exposure.", name="{coaddName}Coadd_directWarp_maskedFraction", storageClass="ImageF", dimensions=("tract", "patch", "skymap", "instrument", "visit"), ) def __init__(self, *, config=None): super().__init__(config=config) if not config: return if not config.doRevertOldBackground: del self.background_revert_list if not config.doApplyNewBackground: del self.background_apply_list if not config.doWarpMaskedFraction: del self.masked_fraction_warp # Dynamically set output connections for noise images, depending on the # number of noise realization specified in the config. for n in range(config.numberOfNoiseRealizations): noise_warp = Output( doc=f"Output direct warped noise exposure ({n})", name=f"{config.connections.coaddName}Coadd_directWarp_noise{n}", # Store it as a MaskedImage to preserve the variance plane. storageClass="MaskedImageF", dimensions=("tract", "patch", "skymap", "instrument", "visit"), ) setattr(self, f"noise_warp{n}", noise_warp) class MakeDirectWarpConfig( PipelineTaskConfig, pipelineConnections=MakeDirectWarpConnections, ):
MAX_NUMBER_OF_NOISE_REALIZATIONS = 3
numberOfNoiseRealizations = RangeField[int]( doc="Number of noise realizations to simulate and persist.", default=0, min=0, max=MAX_NUMBER_OF_NOISE_REALIZATIONS, inclusiveMax=True, ) seedOffset = Field[int]( doc="Offset to the seed used for the noise realization. This can be " "used to create a different noise realization if the default ones " "are catastrophic, or for testing sensitivity to the noise.", default=0, ) useMedianVariance = Field[bool]( doc="Use the median of variance plane in the input calexp to generate " "noise realizations? If False, per-pixel variance will be used.", default=True, ) doRevertOldBackground = Field[bool]( doc="Revert the old backgrounds from the `background_revert_list` " "connection?", default=False, ) doApplyNewBackground = Field[bool]( doc="Apply the new backgrounds from the `background_apply_list` " "connection?", default=False, ) useVisitSummaryPsf = Field[bool]( doc="If True, use the PSF model and aperture corrections from the " "'visit_summary' connection to make the warp. If False, use the " "PSF model and aperture corrections from the 'calexp' connection.", default=True, ) doSelectPreWarp = Field[bool]( doc="Select ccds before warping?", default=True, ) select = ConfigurableField( doc="Image selection subtask.", target=PsfWcsSelectImagesTask, ) doPreWarpInterpolation = Field[bool]( doc="Interpolate over bad pixels before warping?", default=False, ) preWarpInterpolation = ConfigurableField( doc="Interpolation task to use for pre-warping interpolation", target=CloughTocher2DInterpolateTask, ) inputRecorder = ConfigurableField( doc="Subtask that helps fill CoaddInputs catalogs added to the final " "coadd", target=CoaddInputRecorderTask, ) includeCalibVar = Field[bool]( doc="Add photometric calibration variance to warp variance plane?", default=False, ) border = Field[int]( doc="Pad the patch boundary of the warp by these many pixels, so as to allow for PSF-matching later", default=256, ) warper = ConfigField( doc="Configuration for the warper that warps the image and noise", dtype=Warper.ConfigClass, ) doWarpMaskedFraction = Field[bool]( doc="Warp the masked fraction image?", default=False, ) maskedFractionWarper = ConfigField( doc="Configuration for the warp that warps the mask fraction image", dtype=Warper.ConfigClass, ) coaddPsf = ConfigField( doc="Configuration for CoaddPsf", dtype=CoaddPsfConfig, ) idGenerator = DetectorVisitIdGeneratorConfig.make_field() # Use bgSubtracted and doApplySkyCorr to match the old MakeWarpConfig, # but as properties instead of config fields. @property def bgSubtracted(self) -> bool: return not self.doRevertOldBackground @bgSubtracted.setter def bgSubtracted(self, value: bool) -> None: self.doRevertOldBackground = not value @property def doApplySkyCorr(self) -> bool: return self.doApplyNewBackground @doApplySkyCorr.setter def doApplySkyCorr(self, value: bool) -> None: self.doApplyNewBackground = value def setDefaults(self) -> None: super().setDefaults() self.warper.warpingKernelName = "lanczos3" self.warper.cacheSize = 0 self.maskedFractionWarper.warpingKernelName = "bilinear" class MakeDirectWarpTask(PipelineTask):
Definition at line 354 of file make_direct_warp.py.