LSST Applications g0f08755f38+9c285cab97,g1635faa6d4+13f3999e92,g1653933729+a8ce1bb630,g1a0ca8cf93+bf6eb00ceb,g28da252d5a+0829b12dee,g29321ee8c0+5700dc9eac,g2bbee38e9b+9634bc57db,g2bc492864f+9634bc57db,g2cdde0e794+c2c89b37c4,g3156d2b45e+41e33cbcdc,g347aa1857d+9634bc57db,g35bb328faa+a8ce1bb630,g3a166c0a6a+9634bc57db,g3e281a1b8c+9f2c4e2fc3,g414038480c+077ccc18e7,g41af890bb2+fde0dd39b6,g5fbc88fb19+17cd334064,g781aacb6e4+a8ce1bb630,g80478fca09+55a9465950,g82479be7b0+d730eedb7d,g858d7b2824+9c285cab97,g9125e01d80+a8ce1bb630,g9726552aa6+10f999ec6a,ga5288a1d22+2a84bb7594,gacf8899fa4+c69c5206e8,gae0086650b+a8ce1bb630,gb58c049af0+d64f4d3760,gc28159a63d+9634bc57db,gcf0d15dbbd+4b7d09cae4,gda3e153d99+9c285cab97,gda6a2b7d83+4b7d09cae4,gdaeeff99f8+1711a396fd,ge2409df99d+5e831397f4,ge79ae78c31+9634bc57db,gf0baf85859+147a0692ba,gf3967379c6+41c94011de,gf3fb38a9a8+8f07a9901b,gfb92a5be7c+9c285cab97,w.2024.46
LSST Data Management Base Package
Loading...
Searching...
No Matches
Classes | Variables
lsst.pipe.tasks.make_direct_warp Namespace Reference

Classes

class  MakeDirectWarpConnections
 
class  WarpDetectorInputs
 

Variables

 ways :
 
 inputs : ``~collections.abc.Mapping` [ `int`, `WarpDetectorInputs` ]
 
 sky_info : `lsst.pipe.base.Struct`
 
 visit_summary : `~lsst.afw.table.ExposureCatalog`
 
 filtered_inputs : `dict` [ `int`, `WarpDetectorInputs` ]
 
 results : `~lsst.pipe.base.Struct`
 
 detector_inputs : `WarpDetectorInputs`
 
 target_wcs : `~lsst.afw.geom.SkyWcs`
 
 warper : `~lsst.afw.math.Warper`
 
 maxBBox : `~lsst.geom.Box2I` | None
 
 destBBox : `~lsst.geom.Box2I` | None
 
 warped_exposure : `~lsst.afw.image.Exposure` | None
 
bool includeScaleUncertainty
 
 success : `bool`
 
 exp : `lsst.afw.image.exposure.ExposureF`
 
 mi : `~lsst.afw.image.MaskedImage`
 
 median_variance : `float`
 

Variable Documentation

◆ destBBox

lsst.pipe.tasks.make_direct_warp.destBBox : `~lsst.geom.Box2I` | None

Definition at line 683 of file make_direct_warp.py.

◆ detector_inputs

lsst.pipe.tasks.make_direct_warp.detector_inputs : `WarpDetectorInputs`
if self.config.doSelectPreWarp:
    inputs = self._preselect_inputs(inputs, sky_info, visit_summary=visit_summary)
    if not inputs:
        raise NoWorkFound("No input warps remain after selection for co-addition")

sky_info.bbox.grow(self.config.border)
target_bbox, target_wcs = sky_info.bbox, sky_info.wcs

# Initialize the objects that will hold the warp.
final_warp = ExposureF(target_bbox, target_wcs)

for _, warp_detector_input in inputs.items():
    visit_id = warp_detector_input.data_id["visit"]
    break  # Just need the visit id from any one of the inputs.

# The warpExposure routine is expensive, and we do not want to call
# it twice (i.e., a second time for PSF-matched warps). We do not
# want to hold all the warped exposures in memory at once either.
# So we create empty exposure(s) to accumulate the warps of each type,
# and then process each detector serially.
final_warp = self._prepareEmptyExposure(sky_info)
final_masked_fraction_warp = self._prepareEmptyExposure(sky_info)
final_noise_warps = {
    n_noise: self._prepareEmptyExposure(sky_info)
    for n_noise in range(self.config.numberOfNoiseRealizations)
}

# We need a few bookkeeping variables only for the science coadd.
totalGoodPixels = 0
inputRecorder = self.inputRecorder.makeCoaddTempExpRecorder(
    visit_id,
    len(inputs),
)

for index, detector_inputs in enumerate(inputs.values()):
    self.log.debug(
        "Warping exposure %d/%d for id=%s",
        index + 1,
        len(inputs),
        detector_inputs.data_id,
    )

    input_exposure = detector_inputs.exposure
    # Generate noise image(s) in-situ.
    seed = self.get_seed_from_data_id(detector_inputs.data_id)
    rng = np.random.RandomState(seed + self.config.seedOffset)

    # Generate noise images in-situ.
    noise_calexps = self.make_noise_exposures(input_exposure, rng)

    warpedExposure = self.process(
        detector_inputs,
        target_wcs,
        self.warper,
        visit_summary,
        destBBox=target_bbox,
    )

    if warpedExposure is None:
        self.log.debug(
            "Skipping exposure %s because it could not be warped.", detector_inputs.data_id
        )
        continue

    if final_warp.photoCalib is not None:
        ratio = (
            final_warp.photoCalib.getInstFluxAtZeroMagnitude()
            / warpedExposure.photoCalib.getInstFluxAtZeroMagnitude()
        )
    else:
        ratio = 1

    self.log.debug("Scaling exposure %s by %f", detector_inputs.data_id, ratio)
    warpedExposure.maskedImage *= ratio

    # Accumulate the partial warps in an online fashion.
    nGood = copyGoodPixels(
        final_warp.maskedImage,
        warpedExposure.maskedImage,
        final_warp.mask.getPlaneBitMask(["NO_DATA"]),
    )

    if final_warp.photoCalib is None and nGood > 0:
        final_warp.setPhotoCalib(warpedExposure.photoCalib)

    ccdId = self.config.idGenerator.apply(detector_inputs.data_id).catalog_id
    inputRecorder.addCalExp(input_exposure, ccdId, nGood)
    totalGoodPixels += nGood

    if self.config.doWarpMaskedFraction:
        # Obtain the masked fraction exposure and warp it.
        if self.config.doPreWarpInterpolation:
            badMaskPlanes = self.preWarpInterpolation.config.badMaskPlanes
        else:
            badMaskPlanes = []
        masked_fraction_exp = self._get_bad_mask(input_exposure, badMaskPlanes)

        masked_fraction_warp = self.maskedFractionWarper.warpExposure(
            target_wcs, masked_fraction_exp, destBBox=target_bbox
        )

        copyGoodPixels(
            final_masked_fraction_warp.maskedImage,
            masked_fraction_warp.maskedImage,
            final_masked_fraction_warp.mask.getPlaneBitMask(["NO_DATA"]),
        )

    # Process and accumulate noise images.
    for n_noise in range(self.config.numberOfNoiseRealizations):
        noise_calexp = noise_calexps[n_noise]
        noise_pseudo_inputs = dataclasses.replace(
            detector_inputs,
            exposure_or_handle=noise_calexp,
            background_revert=BackgroundList(),
            background_apply=BackgroundList(),
        )
        warpedNoise = self.process(
            noise_pseudo_inputs,
            target_wcs,
            self.warper,
            visit_summary,
            destBBox=target_bbox,
        )

        warpedNoise.maskedImage *= ratio

        copyGoodPixels(
            final_noise_warps[n_noise].maskedImage,
            warpedNoise.maskedImage,
            final_noise_warps[n_noise].mask.getPlaneBitMask(["NO_DATA"]),
        )

# If there are no good pixels, return a Struct filled with None.
if totalGoodPixels == 0:
    results = Struct(
        warp=None,
        masked_fraction_warp=None,
    )
    for noise_index in range(self.config.numberOfNoiseRealizations):
        setattr(results, f"noise_warp{noise_index}", None)

    return results

# Finish the inputRecorder and add the coaddPsf to the final warp.
inputRecorder.finish(final_warp, totalGoodPixels)

coaddPsf = CoaddPsf(
    inputRecorder.coaddInputs.ccds,
    sky_info.wcs,
    self.config.coaddPsf.makeControl(),
)

final_warp.setPsf(coaddPsf)
for _, warp_detector_input in inputs.items():
    final_warp.setFilter(warp_detector_input.exposure.getFilter())
    final_warp.getInfo().setVisitInfo(warp_detector_input.exposure.getInfo().getVisitInfo())
    break  # Just need the filter and visit info from any one of the inputs.

results = Struct(
    warp=final_warp,
)

if self.config.doWarpMaskedFraction:
    results.masked_fraction_warp = final_masked_fraction_warp.image

for noise_index, noise_exposure in final_noise_warps.items():
    setattr(results, f"noise_warp{noise_index}", noise_exposure.maskedImage)

return results

def process(
self,
detector_inputs: WarpDetectorInputs,
target_wcs,
warper,
visit_summary=None,
maxBBox=None,
destBBox=None,
) -> ExposureF | None:
input_exposure = detector_inputs.exposure
if self.config.doPreWarpInterpolation:
    self.preWarpInterpolation.run(input_exposure.maskedImage)

success = self._apply_all_calibrations(
    detector_inputs,
    visit_summary=visit_summary,
    includeScaleUncertainty=self.config.includeCalibVar,
)

if not success:
    return None

with self.timer("warp"):
    warped_exposure = warper.warpExposure(
        target_wcs,
        input_exposure,
        maxBBox=maxBBox,
        destBBox=destBBox,
    )

# Potentially a post-warp interpolation here? Relies on DM-38630.

return warped_exposure

def _apply_all_calibrations(
self,
detector_inputs: WarpDetectorInputs,
*,
visit_summary: ExposureCatalog | None = None,
includeScaleUncertainty: bool = False,
) -> bool:

Definition at line 669 of file make_direct_warp.py.

◆ exp

lsst.pipe.tasks.make_direct_warp.exp : `lsst.afw.image.exposure.ExposureF`

Definition at line 856 of file make_direct_warp.py.

◆ filtered_inputs

lsst.pipe.tasks.make_direct_warp.filtered_inputs : `dict` [ `int`, `WarpDetectorInputs` ]

Definition at line 432 of file make_direct_warp.py.

◆ includeScaleUncertainty

bool lsst.pipe.tasks.make_direct_warp.includeScaleUncertainty

Definition at line 747 of file make_direct_warp.py.

◆ inputs

lsst.pipe.tasks.make_direct_warp.inputs : ``~collections.abc.Mapping` [ `int`, `WarpDetectorInputs` ]
ConfigClass = MakeDirectWarpConfig
_DefaultName = "makeDirectWarp"

def __init__(self, **kwargs):
    super().__init__(**kwargs)
    self.makeSubtask("inputRecorder")
    self.makeSubtask("preWarpInterpolation")
    if self.config.doSelectPreWarp:
        self.makeSubtask("select")

    self.warper = Warper.fromConfig(self.config.warper)
    if self.config.doWarpMaskedFraction:
        self.maskedFractionWarper = Warper.fromConfig(self.config.maskedFractionWarper)

def runQuantum(self, butlerQC, inputRefs, outputRefs):
    # Docstring inherited.

    inputs: Mapping[int, WarpDetectorInputs] = {}  # Detector ID -> WarpDetectorInputs
    for handle in butlerQC.get(inputRefs.calexp_list):
        inputs[handle.dataId["detector"]] = WarpDetectorInputs(
            exposure_or_handle=handle,
            data_id=handle.dataId,
        )

    if not inputs:
        raise NoWorkFound("No input warps provided for co-addition")

    # Add backgrounds to the inputs struct, being careful not to assume
    # they're present for the same detectors we got image handles for, in
    # case of upstream errors.
    for ref in getattr(inputRefs, "background_revert_list", []):
        inputs[ref.dataId["detector"]].background_revert = butlerQC.get(ref)
    for ref in getattr(inputRefs, "background_apply_list", []):
        inputs[ref.dataId["detector"]].background_apply = butlerQC.get(ref)

    visit_summary = butlerQC.get(inputRefs.visit_summary)
    sky_map = butlerQC.get(inputRefs.sky_map)

    quantumDataId = butlerQC.quantum.dataId
    sky_info = makeSkyInfo(
        sky_map,
        tractId=quantumDataId["tract"],
        patchId=quantumDataId["patch"],
    )

    results = self.run(inputs, sky_info, visit_summary=visit_summary)
    butlerQC.put(results, outputRefs)

def _preselect_inputs(
    self,
    inputs: Mapping[int, WarpDetectorInputs],
    sky_info: Struct,
    visit_summary: ExposureCatalog,
) -> dict[int, WarpDetectorInputs]:
data_id_list, bbox_list, wcs_list = [], [], []
for detector_id, detector_inputs in inputs.items():
    row = visit_summary.find(detector_id)
    if row is None:
        raise RuntimeError(f"Unexpectedly incomplete visit_summary: {detector_id=} is missing.")
    data_id_list.append(detector_inputs.data_id)
    bbox_list.append(row.getBBox())
    wcs_list.append(row.getWcs())

cornerPosList = Box2D(sky_info.bbox).getCorners()
coordList = [sky_info.wcs.pixelToSky(pos) for pos in cornerPosList]

good_indices = self.select.run(
    bboxList=bbox_list,
    wcsList=wcs_list,
    visitSummary=visit_summary,
    coordList=coordList,
    dataIds=data_id_list,
)
detector_ids = list(inputs.keys())
good_detector_ids = [detector_ids[i] for i in good_indices]
return {detector_id: inputs[detector_id] for detector_id in good_detector_ids}

def run(self, inputs: Mapping[int, WarpDetectorInputs], sky_info, visit_summary) -> Struct:

Definition at line 423 of file make_direct_warp.py.

◆ maxBBox

lsst.pipe.tasks.make_direct_warp.maxBBox : `~lsst.geom.Box2I` | None

Definition at line 680 of file make_direct_warp.py.

◆ median_variance

lsst.pipe.tasks.make_direct_warp.median_variance : `float`

Definition at line 874 of file make_direct_warp.py.

◆ mi

lsst.pipe.tasks.make_direct_warp.mi : `~lsst.afw.image.MaskedImage`
exp = ExposureF(sky_info.bbox, sky_info.wcs)
exp.getMaskedImage().set(np.nan, Mask.getPlaneBitMask("NO_DATA"), np.inf)
return exp

@staticmethod
def compute_median_variance(mi: MaskedImage) -> float:

Definition at line 869 of file make_direct_warp.py.

◆ results

lsst.pipe.tasks.make_direct_warp.results : `~lsst.pipe.base.Struct`

Definition at line 475 of file make_direct_warp.py.

◆ sky_info

lsst.pipe.tasks.make_direct_warp.sky_info : `lsst.pipe.base.Struct`
if self.config.doRevertOldBackground:
    detector_inputs.revert_background()
elif detector_inputs.background_revert:
    # This could get trigged only if runQuantum is skipped and run is
    # called directly.
    raise RuntimeError(
        f"doRevertOldBackground is False, but {detector_inputs.data_id} has a background_revert."
    )

input_exposure = detector_inputs.exposure
if visit_summary is not None:
    detector = input_exposure.info.getDetector().getId()
    row = visit_summary.find(detector)

    if row is None:
        self.log.info(
            "Unexpectedly incomplete visit_summary: detector = %s is missing. Skipping it.",
            detector,
        )
        return False

    if photo_calib := row.getPhotoCalib():
        input_exposure.setPhotoCalib(photo_calib)
    else:
        self.log.info(
            "No photometric calibration found in visit summary for detector = %s. Skipping it.",
            detector,
        )
        return False

    if wcs := row.getWcs():
        input_exposure.setWcs(wcs)
    else:
        self.log.info("No WCS found in visit summary for detector = %s. Skipping it.", detector)
        return False

    if self.config.useVisitSummaryPsf:
        if psf := row.getPsf():
            input_exposure.setPsf(psf)
        else:
            self.log.info("No PSF found in visit summary for detector = %s. Skipping it.", detector)
            return False

        if apcorr_map := row.getApCorrMap():
            input_exposure.setApCorrMap(apcorr_map)
        else:
            self.log.info(
                "No aperture correction map found in visit summary for detector = %s. Skipping it",
                detector,
            )
            return False

    elif visit_summary is not None:
        # We can only get here by calling `run`, not `runQuantum`.
        raise RuntimeError(
            "useVisitSummaryPsf=True, but visit_summary is provided. "
        )

if self.config.doApplyNewBackground:
    detector_inputs.apply_background()
elif detector_inputs.background_apply:
    # This could get trigged only if runQuantum is skipped and run is
    # called directly.
    raise RuntimeError(
        f"doApplyNewBackground is False, but {detector_inputs.data_id} has a background_apply."
    )

# Calibrate the (masked) image.
# This should likely happen even if visit_summary is None.
photo_calib = input_exposure.photoCalib
input_exposure.maskedImage = photo_calib.calibrateImage(
    input_exposure.maskedImage,
    includeScaleUncertainty=includeScaleUncertainty,
)
input_exposure.maskedImage /= photo_calib.getCalibrationMean()

return True

# This method is copied from makeWarp.py
@classmethod
def _prepareEmptyExposure(cls, sky_info):

Definition at line 425 of file make_direct_warp.py.

◆ success

lsst.pipe.tasks.make_direct_warp.success : `bool`

Definition at line 754 of file make_direct_warp.py.

◆ target_wcs

lsst.pipe.tasks.make_direct_warp.target_wcs : `~lsst.afw.geom.SkyWcs`

Definition at line 672 of file make_direct_warp.py.

◆ visit_summary

lsst.pipe.tasks.make_direct_warp.visit_summary : `~lsst.afw.table.ExposureCatalog`

Definition at line 427 of file make_direct_warp.py.

◆ warped_exposure

lsst.pipe.tasks.make_direct_warp.warped_exposure : `~lsst.afw.image.Exposure` | None

Definition at line 689 of file make_direct_warp.py.

◆ warper

lsst.pipe.tasks.make_direct_warp.warper : `~lsst.afw.math.Warper`

Definition at line 674 of file make_direct_warp.py.

◆ ways

lsst.pipe.tasks.make_direct_warp.ways :
calexp_list = Input(
doc="Input exposures to be interpolated and resampled onto a SkyMap "
"projection/patch.",
name="calexp",
storageClass="ExposureF",
dimensions=("instrument", "visit", "detector"),
multiple=True,
deferLoad=True,
)
background_revert_list = Input(
doc="Background to be reverted (i.e., added back to the calexp). "
"This connection is used only if doRevertOldBackground=False.",
name="calexpBackground",
storageClass="Background",
dimensions=("instrument", "visit", "detector"),
multiple=True,
)
background_apply_list = Input(
doc="Background to be applied (subtracted from the calexp). "
"This is used only if doApplyNewBackground=True.",
name="skyCorr",
storageClass="Background",
dimensions=("instrument", "visit", "detector"),
multiple=True,
)
visit_summary = Input(
doc="Input visit-summary catalog with updated calibration objects.",
name="finalVisitSummary",
storageClass="ExposureCatalog",
dimensions=("instrument", "visit"),
)
sky_map = Input(
doc="Input definition of geometry/bbox and projection/wcs for warps.",
name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
storageClass="SkyMap",
dimensions=("skymap",),
)
# Declare all possible outputs (except noise, which is configurable)
warp = Output(
doc="Output direct warped exposure produced by resampling calexps "
"onto the skyMap patch geometry.",
name="{coaddName}Coadd_directWarp",
storageClass="ExposureF",
dimensions=("tract", "patch", "skymap", "instrument", "visit"),
)
masked_fraction_warp = Output(
doc="Output masked fraction warped exposure.",
name="{coaddName}Coadd_directWarp_maskedFraction",
storageClass="ImageF",
dimensions=("tract", "patch", "skymap", "instrument", "visit"),
)

def __init__(self, *, config=None):
super().__init__(config=config)
if not config:
return

if not config.doRevertOldBackground:
del self.background_revert_list
if not config.doApplyNewBackground:
del self.background_apply_list

if not config.doWarpMaskedFraction:
del self.masked_fraction_warp

# Dynamically set output connections for noise images, depending on the
# number of noise realization specified in the config.
for n in range(config.numberOfNoiseRealizations):
noise_warp = Output(
    doc=f"Output direct warped noise exposure ({n})",
    name=f"{config.connections.coaddName}Coadd_directWarp_noise{n}",
    # Store it as a MaskedImage to preserve the variance plane.
    storageClass="MaskedImageF",
    dimensions=("tract", "patch", "skymap", "instrument", "visit"),
)
setattr(self, f"noise_warp{n}", noise_warp)


class MakeDirectWarpConfig(
PipelineTaskConfig,
pipelineConnections=MakeDirectWarpConnections,
):
MAX_NUMBER_OF_NOISE_REALIZATIONS = 3
numberOfNoiseRealizations = RangeField[int](
    doc="Number of noise realizations to simulate and persist.",
    default=0,
    min=0,
    max=MAX_NUMBER_OF_NOISE_REALIZATIONS,
    inclusiveMax=True,
)
seedOffset = Field[int](
    doc="Offset to the seed used for the noise realization. This can be "
        "used to create a different noise realization if the default ones "
        "are catastrophic, or for testing sensitivity to the noise.",
    default=0,
)
useMedianVariance = Field[bool](
    doc="Use the median of variance plane in the input calexp to generate "
        "noise realizations? If False, per-pixel variance will be used.",
    default=True,
)
doRevertOldBackground = Field[bool](
    doc="Revert the old backgrounds from the `background_revert_list` "
        "connection?",
    default=False,
)
doApplyNewBackground = Field[bool](
    doc="Apply the new backgrounds from the `background_apply_list` "
        "connection?",
    default=False,
)
useVisitSummaryPsf = Field[bool](
    doc="If True, use the PSF model and aperture corrections from the "
        "'visit_summary' connection to make the warp. If False, use the "
        "PSF model and aperture corrections from the 'calexp' connection.",
    default=True,
)
doSelectPreWarp = Field[bool](
    doc="Select ccds before warping?",
    default=True,
)
select = ConfigurableField(
    doc="Image selection subtask.",
    target=PsfWcsSelectImagesTask,
)
doPreWarpInterpolation = Field[bool](
    doc="Interpolate over bad pixels before warping?",
    default=False,
)
preWarpInterpolation = ConfigurableField(
    doc="Interpolation task to use for pre-warping interpolation",
    target=CloughTocher2DInterpolateTask,
)
inputRecorder = ConfigurableField(
    doc="Subtask that helps fill CoaddInputs catalogs added to the final "
        "coadd",
    target=CoaddInputRecorderTask,
)
includeCalibVar = Field[bool](
    doc="Add photometric calibration variance to warp variance plane?",
    default=False,
)
border = Field[int](
    doc="Pad the patch boundary of the warp by these many pixels, so as to allow for PSF-matching later",
    default=256,
)
warper = ConfigField(
    doc="Configuration for the warper that warps the image and noise",
    dtype=Warper.ConfigClass,
)
doWarpMaskedFraction = Field[bool](
    doc="Warp the masked fraction image?",
    default=False,
)
maskedFractionWarper = ConfigField(
    doc="Configuration for the warp that warps the mask fraction image",
    dtype=Warper.ConfigClass,
)
coaddPsf = ConfigField(
    doc="Configuration for CoaddPsf",
    dtype=CoaddPsfConfig,
)
idGenerator = DetectorVisitIdGeneratorConfig.make_field()

# Use bgSubtracted and doApplySkyCorr to match the old MakeWarpConfig,
# but as properties instead of config fields.
@property
def bgSubtracted(self) -> bool:
    return not self.doRevertOldBackground

@bgSubtracted.setter
def bgSubtracted(self, value: bool) -> None:
    self.doRevertOldBackground = not value

@property
def doApplySkyCorr(self) -> bool:
    return self.doApplyNewBackground

@doApplySkyCorr.setter
def doApplySkyCorr(self, value: bool) -> None:
    self.doApplyNewBackground = value

def setDefaults(self) -> None:
    super().setDefaults()
    self.warper.warpingKernelName = "lanczos3"
    self.warper.cacheSize = 0
    self.maskedFractionWarper.warpingKernelName = "bilinear"


class MakeDirectWarpTask(PipelineTask):

Definition at line 354 of file make_direct_warp.py.