LSST Applications g180d380827+0f66a164bb,g2079a07aa2+86d27d4dc4,g2305ad1205+7d304bc7a0,g29320951ab+500695df56,g2bbee38e9b+0e5473021a,g337abbeb29+0e5473021a,g33d1c0ed96+0e5473021a,g3a166c0a6a+0e5473021a,g3ddfee87b4+e42ea45bea,g48712c4677+36a86eeaa5,g487adcacf7+2dd8f347ac,g50ff169b8f+96c6868917,g52b1c1532d+585e252eca,g591dd9f2cf+c70619cc9d,g5a732f18d5+53520f316c,g5ea96fc03c+341ea1ce94,g64a986408d+f7cd9c7162,g858d7b2824+f7cd9c7162,g8a8a8dda67+585e252eca,g99cad8db69+469ab8c039,g9ddcbc5298+9a081db1e4,ga1e77700b3+15fc3df1f7,gb0e22166c9+60f28cb32d,gba4ed39666+c2a2e4ac27,gbb8dafda3b+c92fc63c7e,gbd866b1f37+f7cd9c7162,gc120e1dc64+02c66aa596,gc28159a63d+0e5473021a,gc3e9b769f7+b0068a2d9f,gcf0d15dbbd+e42ea45bea,gdaeeff99f8+f9a426f77a,ge6526c86ff+84383d05b3,ge79ae78c31+0e5473021a,gee10cc3b42+585e252eca,gff1a9f87cc+f7cd9c7162,w.2024.17
LSST Data Management Base Package
Loading...
Searching...
No Matches
Classes | Variables
lsst.pipe.tasks.make_direct_warp Namespace Reference

Classes

class  MakeDirectWarpConnections
 

Variables

 ways :
 
 inputs : `Mapping`
 
 sky_info : `~lsst.pipe.base.Struct`
 
 visit_summary : `~lsst.afw.table.ExposureCatalog` | None
 
 results : `~lsst.pipe.base.Struct`
 
 exposure : `~lsst.afw.image.Exposure`
 
 target_wcs : `~lsst.afw.geom.SkyWcs`
 
 warper : `~lsst.afw.math.Warper`
 
 old_background : `~lsst.afw.image.Background` | None
 
 new_background : `~lsst.afw.image.Background` | None
 
 maxBBox : `~lsst.geom.Box2I` | None
 
 destBBox : `~lsst.geom.Box2I` | None
 
 warped_exposure : `~lsst.afw.image.Exposure`
 
 exp : `~lsst.afw.image.Exposure`
 
 logger : `logging.Logger`
 
bool includeScaleUncertainty
 
 mi : `~lsst.afw.image.MaskedImage`
 
 median_variance : `float`
 

Variable Documentation

◆ destBBox

lsst.pipe.tasks.make_direct_warp.destBBox : `~lsst.geom.Box2I` | None

Definition at line 516 of file make_direct_warp.py.

◆ exp

lsst.pipe.tasks.make_direct_warp.exp : `~lsst.afw.image.Exposure`
if self.config.doPreWarpInterpolation:
    self.preWarpInterpolation.run(exposure.maskedImage)

self._apply_all_calibrations(
    exposure,
    old_background,
    new_background,
    logger=self.log,
    visit_summary=visit_summary,
    includeScaleUncertainty=self.config.includeCalibVar,
)
with self.timer("warp"):
    warped_exposure = warper.warpExposure(
        target_wcs,
        exposure,
        maxBBox=maxBBox,
        destBBox=destBBox,
    )

# Potentially a post-warp interpolation here? Relies on DM-38630.

return warped_exposure

@staticmethod
def _apply_all_calibrations(
exp: Exposure,
old_background,
new_background,
logger,
visit_summary: ExposureCatalog | None = None,
includeScaleUncertainty: bool = False,
) -> None:

Definition at line 570 of file make_direct_warp.py.

◆ exposure

lsst.pipe.tasks.make_direct_warp.exposure : `~lsst.afw.image.Exposure`
target_bbox, target_wcs = sky_info.bbox, sky_info.wcs

# Initialize the objects that will hold the warp.
final_warp = ExposureF(target_bbox, target_wcs)

exposures = inputs["calexp_list"]
background_revert_list = inputs.get("background_revert_list", [None] * len(exposures))
background_apply_list = inputs.get("background_apply_list", [None] * len(exposures))

visit_id = exposures[0].dataId["visit"]

# The warpExposure routine is expensive, and we do not want to call
# it twice (i.e., a second time for PSF-matched warps). We do not
# want to hold all the warped exposures in memory at once either.
# So we create empty exposure(s) to accumulate the warps of each type,
# and then process each detector serially.
final_warp = self._prepareEmptyExposure(sky_info)
final_masked_fraction_warp = self._prepareEmptyExposure(sky_info)
final_noise_warps = {
    n_noise: self._prepareEmptyExposure(sky_info)
    for n_noise in range(self.config.numberOfNoiseRealizations)
}

# We need a few bookkeeping variables only for the science coadd.
totalGoodPixels = 0
inputRecorder = self.inputRecorder.makeCoaddTempExpRecorder(
    visit_id,
    len(exposures),
)

for index, (calexp_ref, old_background, new_background) in enumerate(
    zip(exposures, background_revert_list, background_apply_list, strict=True)
):
    dataId = calexp_ref.dataId
    self.log.debug(
        "Warping exposure %d/%d for id=%s",
        index + 1,
        len(exposures),
        dataId,
    )
    calexp = calexp_ref.get()

    # Generate noise image(s) in-situ.
    seed = self.get_seed_from_data_id(dataId)
    rng = np.random.RandomState(seed + self.config.seedOffset)

    # Generate noise images in-situ.
    noise_calexps = self.make_noise_exposures(calexp, rng)

    # Warp the PSF before processing nad overwriting exposure.
    xyTransform = makeWcsPairTransform(calexp.getWcs(), target_wcs)
    psfWarped = WarpedPsf(calexp.getPsf(), xyTransform)

    warpedExposure = self.process(
        calexp,
        target_wcs,
        self.warper,
        old_background,
        new_background,
        visit_summary,
        destBBox=target_bbox,
    )
    warpedExposure.setPsf(psfWarped)

    # Accumulate the partial warps in an online fashion.
    nGood = copyGoodPixels(
        final_warp.maskedImage,
        warpedExposure.maskedImage,
        final_warp.mask.getPlaneBitMask(["NO_DATA"]),
    )
    ccdId = self.config.idGenerator.apply(dataId).catalog_id
    inputRecorder.addCalExp(calexp, ccdId, nGood)
    totalGoodPixels += nGood

    # Obtain the masked fraction exposure and warp it.
    if self.config.doPreWarpInterpolation:
        badMaskPlanes = self.preWarpInterpolation.config.badMaskPlanes
    else:
        badMaskPlanes = []
    masked_fraction_exp = self._get_bad_mask(calexp, badMaskPlanes)
    masked_fraction_warp = self.maskedFractionWarper.warpExposure(
        target_wcs, masked_fraction_exp, destBBox=target_bbox
    )
    copyGoodPixels(
        final_masked_fraction_warp.maskedImage,
        masked_fraction_warp.maskedImage,
        final_masked_fraction_warp.mask.getPlaneBitMask(["NO_DATA"]),
    )

    # Process and accumulate noise images.
    for n_noise in range(self.config.numberOfNoiseRealizations):
        noise_calexp = noise_calexps[n_noise]
        warpedNoise = self.process(
            noise_calexp,
            target_wcs,
            self.warper,
            old_background,
            new_background,
            visit_summary,
            destBBox=target_bbox,
        )
        copyGoodPixels(
            final_noise_warps[n_noise].maskedImage,
            warpedNoise.maskedImage,
            final_noise_warps[n_noise].mask.getPlaneBitMask(["NO_DATA"]),
        )

# Finish the inputRecorder and add the coaddPsf to the final warp.
if totalGoodPixels > 0:
    inputRecorder.finish(final_warp, totalGoodPixels)

coaddPsf = CoaddPsf(
    inputRecorder.coaddInputs.ccds,
    sky_info.wcs,
    self.config.coaddPsf.makeControl(),
)

final_warp.setPsf(coaddPsf)
final_warp.setFilter(calexp.getFilter())
final_warp.getInfo().setVisitInfo(calexp.getInfo().getVisitInfo())

results = Struct(
    warp=final_warp,
    masked_fraction_warp=final_masked_fraction_warp.image,
)

for noise_index, noise_exposure in final_noise_warps.items():
    setattr(results, f"noise_warp{noise_index}", noise_exposure.maskedImage)

return results

def process(
self,
exposure,
target_wcs,
warper,
old_background=None,
new_background=None,
visit_summary=None,
maxBBox=None,
destBBox=None,
):

Definition at line 499 of file make_direct_warp.py.

◆ includeScaleUncertainty

bool lsst.pipe.tasks.make_direct_warp.includeScaleUncertainty

Definition at line 582 of file make_direct_warp.py.

◆ inputs

lsst.pipe.tasks.make_direct_warp.inputs : `Mapping`
ConfigClass = MakeDirectWarpConfig
_DefaultName = "makeDirectWarp"

def __init__(self, **kwargs):
    super().__init__(**kwargs)
    self.makeSubtask("inputRecorder")
    self.makeSubtask("preWarpInterpolation")

    self.warper = Warper.fromConfig(self.config.warper)
    self.maskedFractionWarper = Warper.fromConfig(self.config.maskedFractionWarper)

def runQuantum(self, butlerQC, inputRefs, outputRefs):
    # Docstring inherited.

    # Read in all inputs.
    inputs = butlerQC.get(inputRefs)

    if not inputs["calexp_list"]:
        raise NoWorkFound("No input warps provided for co-addition")

    sky_map = inputs.pop("sky_map")

    quantumDataId = butlerQC.quantum.dataId
    sky_info = makeSkyInfo(
        sky_map,
        tractId=quantumDataId["tract"],
        patchId=quantumDataId["patch"],
    )

    visit_summary = inputs["visit_summary"] if self.config.useVisitSummaryPsf else None

    results = self.run(inputs, sky_info, visit_summary)
    butlerQC.put(results, outputRefs)

def run(self, inputs, sky_info, visit_summary):

Definition at line 327 of file make_direct_warp.py.

◆ logger

lsst.pipe.tasks.make_direct_warp.logger : `logging.Logger`

Definition at line 576 of file make_direct_warp.py.

◆ maxBBox

lsst.pipe.tasks.make_direct_warp.maxBBox : `~lsst.geom.Box2I` | None

Definition at line 513 of file make_direct_warp.py.

◆ median_variance

lsst.pipe.tasks.make_direct_warp.median_variance : `float`

Definition at line 676 of file make_direct_warp.py.

◆ mi

lsst.pipe.tasks.make_direct_warp.mi : `~lsst.afw.image.MaskedImage`
exp = ExposureF(sky_info.bbox, sky_info.wcs)
exp.getMaskedImage().set(np.nan, Mask.getPlaneBitMask("NO_DATA"), np.inf)
return exp

@staticmethod
def compute_median_variance(mi: MaskedImage) -> float:

Definition at line 671 of file make_direct_warp.py.

◆ new_background

lsst.pipe.tasks.make_direct_warp.new_background : `~lsst.afw.image.Background` | None

Definition at line 507 of file make_direct_warp.py.

◆ old_background

lsst.pipe.tasks.make_direct_warp.old_background : `~lsst.afw.image.Background` | None

Definition at line 505 of file make_direct_warp.py.

◆ results

lsst.pipe.tasks.make_direct_warp.results : `~lsst.pipe.base.Struct`

Definition at line 343 of file make_direct_warp.py.

◆ sky_info

lsst.pipe.tasks.make_direct_warp.sky_info : `~lsst.pipe.base.Struct`
if not visit_summary:
    logger.debug("No visit summary provided.")
else:
    logger.debug("Updating calibration from visit summary.")

if old_background:
    exp.maskedImage += old_background.getImage()

if visit_summary:
    detector = exp.info.getDetector().getId()
    row = visit_summary.find(detector)

    if row is None:
        raise RuntimeError(f"Unexpectedly incomplete visit_summary: {detector=} is missing.")

    if photo_calib := row.getPhotoCalib():
        exp.setPhotoCalib(photo_calib)
    else:
        logger.warning(
            "No photometric calibration found in visit summary for detector = %s.",
            detector,
        )

    if wcs := row.getWcs():
        exp.setWcs(wcs)
    else:
        logger.warning("No WCS found in visit summary for detector = %s.", detector)

    if psf := row.getPsf():
        exp.setPsf(psf)
    else:
        logger.warning("No PSF found in visit summary for detector = %s.", detector)

    if apcorr_map := row.getApCorrMap():
        exp.setApCorrMap(apcorr_map)
    else:
        logger.warning(
            "No aperture correction map found in visit summary for detector = %s.",
            detector,
        )

if new_background:
    exp.maskedImage -= new_background.getImage()

# Calibrate the (masked) image.
# This should likely happen even if visit_summary is None.
photo_calib = exp.getPhotoCalib()
exp.maskedImage = photo_calib.calibrateImage(
    exp.maskedImage, includeScaleUncertainty=includeScaleUncertainty
)
exp.maskedImage /= photo_calib.getCalibrationMean()

# This method is copied from makeWarp.py
@classmethod
def _prepareEmptyExposure(cls, sky_info):

Definition at line 333 of file make_direct_warp.py.

◆ target_wcs

lsst.pipe.tasks.make_direct_warp.target_wcs : `~lsst.afw.geom.SkyWcs`

Definition at line 501 of file make_direct_warp.py.

◆ visit_summary

lsst.pipe.tasks.make_direct_warp.visit_summary : `~lsst.afw.table.ExposureCatalog` | None

Definition at line 336 of file make_direct_warp.py.

◆ warped_exposure

lsst.pipe.tasks.make_direct_warp.warped_exposure : `~lsst.afw.image.Exposure`

Definition at line 522 of file make_direct_warp.py.

◆ warper

lsst.pipe.tasks.make_direct_warp.warper : `~lsst.afw.math.Warper`

Definition at line 503 of file make_direct_warp.py.

◆ ways

lsst.pipe.tasks.make_direct_warp.ways :
    calexp_list = Input(
        doc="Input exposures to be interpolated and resampled onto a SkyMap "
            "projection/patch.",
        name="{calexpType}calexp",
        storageClass="ExposureF",
        dimensions=("instrument", "visit", "detector"),
        multiple=True,
        deferLoad=True,
    )
    background_revert_list = Input(
        doc="Background to be reverted (i.e., added back to the calexp). "
        "This connection is used only if doRevertOldBackground=False.",
        name="calexpBackground",
        storageClass="Background",
        dimensions=("instrument", "visit", "detector"),
        multiple=True,
    )
    background_apply_list = Input(
        doc="Background to be applied (subtracted from the calexp). "
        "This is used only if doApplyNewBackground=True.",
        name="skyCorr",
        storageClass="Background",
        dimensions=("instrument", "visit", "detector"),
        multiple=True,
    )
    visit_summary = Input(
        doc="Input visit-summary catalog with updated calibration objects.",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
    )
    sky_map = Input(
        doc="Input definition of geometry/bbox and projection/wcs for warps.",
        name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
        storageClass="SkyMap",
        dimensions=("skymap",),
    )
    # Declare all possible outputs (except noise, which is configurable)
    warp = Output(
        doc="Output direct warped exposure produced by resampling calexps "
            "onto the skyMap patch geometry.",
        name="{coaddName}Coadd_directWarp",
        storageClass="ExposureF",
        dimensions=("tract", "patch", "skymap", "instrument", "visit"),
    )
    masked_fraction_warp = Output(
        doc="Output masked fraction warped exposure.",
        name="{coaddName}Coadd_directWarp_maskedFraction",
        storageClass="ImageF",
        dimensions=("tract", "patch", "skymap", "instrument", "visit"),
    )

    def __init__(self, *, config=None):
        super().__init__(config=config)
        if not config:
            return

        if not config.doRevertOldBackground:
            del self.background_revert_list
        if not config.doApplyNewBackground:
            del self.background_apply_list

        # Dynamically set output connections for noise images, depending on the
        # number of noise realization specified in the config.
        for n in range(config.numberOfNoiseRealizations):
            noise_warp = Output(
                doc=f"Output direct warped noise exposure ({n})",
                name=f"{config.connections.coaddName}Coadd_directWarp_noise{n}",
                # Store it as a MaskedImage to preserve the variance plane.
                storageClass="MaskedImageF",
                dimensions=("tract", "patch", "skymap", "instrument", "visit"),
            )
            setattr(self, f"noise_warp{n}", noise_warp)


class MakeDirectWarpConfig(
    PipelineTaskConfig,
    pipelineConnections=MakeDirectWarpConnections,
):
MAX_NUMBER_OF_NOISE_REALIZATIONS = 3
numberOfNoiseRealizations = RangeField[int](
    doc="Number of noise realizations to simulate and persist.",
    default=1,
    min=0,
    max=MAX_NUMBER_OF_NOISE_REALIZATIONS,
    inclusiveMax=True,
)
seedOffset = Field[int](
    doc="Offset to the seed used for the noise realization. This can be "
        "used to create a different noise realization if the default ones "
        "are catastrophic, or for testing sensitivity to the noise.",
    default=0,
)
useMedianVariance = Field[bool](
    doc="Use the median of variance plane in the input calexp to generate "
        "noise realizations? If False, per-pixel variance will be used.",
    default=True,
)
doRevertOldBackground = Field[bool](
    doc="Revert the old backgrounds from the `background_revert_list` "
        "connection?",
    default=True,
)
doApplyNewBackground = Field[bool](
    doc="Apply the new backgrounds from the `background_apply_list` "
        "connection?",
    default=False,
)
useVisitSummaryPsf = Field[bool](
    doc="If True, use the PSF model and aperture corrections from the "
        "'visit_summary' connection. If False, use the PSF model and "
        "aperture corrections from the 'calexp' connection.",
    default=True,
)
doPreWarpInterpolation = Field[bool](
    doc="Interpolate over bad pixels before warping?",
    default=True,
)
preWarpInterpolation = ConfigurableField(
    doc="Interpolation task to use for pre-warping interpolation",
    target=CloughTocher2DInterpolateTask,
)
inputRecorder = ConfigurableField(
    doc="Subtask that helps fill CoaddInputs catalogs added to the final "
        "coadd",
    target=CoaddInputRecorderTask,
)
includeCalibVar = Field[bool](
    doc="Add photometric calibration variance to warp variance plane?",
    default=False,
)
matchingKernelSize = Field[int](
    doc="Size in pixels of matching kernel. Must be odd.",
    default=21,
    check=lambda x: x % 2 == 1,
)
warper = ConfigField(
    doc="Configuration for the warper that warps the image and noise",
    dtype=Warper.ConfigClass,
)
maskedFractionWarper = ConfigField(
    doc="Configuration for the warp that warps the mask fraction image",
    dtype=Warper.ConfigClass,
)
coaddPsf = ConfigField(
    doc="Configuration for CoaddPsf",
    dtype=CoaddPsfConfig,
)
idGenerator = DetectorVisitIdGeneratorConfig.make_field()

# Use bgSubtracted and doApplySkyCorr to match the old MakeWarpConfig,
# but as properties instead of config fields.
@property
def bgSubtracted(self) -> bool:
    return not self.doRevertOldBackground

@bgSubtracted.setter
def bgSubtracted(self, value: bool) -> None:
    self.doRevertOldBackground = ~value

@property
def doApplySkyCorr(self) -> bool:
    return self.doApplyNewBackground

@doApplySkyCorr.setter
def doApplySkyCorr(self, value: bool) -> None:
    self.doApplyNewBackground = value

def setDefaults(self) -> None:
    super().setDefaults()
    self.warper.warpingKernelName = "lanczos3"
    self.maskedFractionWarper.warpingKernelName = "bilinear"


class MakeDirectWarpTask(PipelineTask):

Definition at line 277 of file make_direct_warp.py.