Loading [MathJax]/extensions/tex2jax.js
LSST Applications g04a91732dc+a777afbe81,g07dc498a13+7e3c5f68a2,g12483e3c20+0145ec33cd,g1409bbee79+7e3c5f68a2,g1a7e361dbc+7e3c5f68a2,g1fd858c14a+9f35e23ec3,g35bb328faa+fcb1d3bbc8,g3ad4f90e5c+0145ec33cd,g3bd4b5ce2c+cbf1bea503,g4e0f332c67+5d362be553,g53246c7159+fcb1d3bbc8,g5477a8d5ce+db04660fe6,g60b5630c4e+0145ec33cd,g623d845a50+0145ec33cd,g6f0c2978f1+3526b51a37,g75b6c65c88+d54b601591,g78460c75b0+2f9a1b4bcd,g786e29fd12+cf7ec2a62a,g7b71ed6315+fcb1d3bbc8,g8852436030+4639f750a5,g89139ef638+7e3c5f68a2,g9125e01d80+fcb1d3bbc8,g919ac25b3e+6220c5324a,g95236ca021+f7a31438ed,g989de1cb63+7e3c5f68a2,g9f33ca652e+2d6fa11d35,gaaedd4e678+7e3c5f68a2,gabe3b4be73+1e0a283bba,gb1101e3267+4a428ef779,gb4a253aaf5+0122250889,gb58c049af0+f03b321e39,gc99c83e5f0+76d20ab76d,gcf25f946ba+4639f750a5,gd6cbbdb0b4+c8606af20c,gde0f65d7ad+3d8a3b7e46,ge278dab8ac+932305ba37,gf795337580+03b96afe58,gfba249425e+fcb1d3bbc8,w.2025.08
LSST Data Management Base Package
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
lsst.pipe.tasks.postprocess Namespace Reference

Classes

class  for
 
class  TableVStack
 
class  TransformObjectCatalogConnections
 
class  WriteObjectTableConnections
 

Functions

 flattenFilters (df, noDupCols=["coord_ra", "coord_dec"], camelCase=False, inputBands=None)
 

Variables

 log = logging.getLogger(__name__)
 
 catalogs : `dict`
 
str catalog : `pandas.DataFrame`
 
 result : `~lsst.pipe.base.Struct`
 
 exposure : `lsst.afw.image.exposure.Exposure`
 
 detectorId : `int`
 
 visitSummary : `lsst.afw.table.ExposureCatalog`, optional
 
 newCat : `lsst.afw.table.SourceCatalog`
 
 handles : `~lsst.daf.butler.DeferredDatasetHandle` or `~lsst.pipe.base.InMemoryDatasetHandle`
 
 functors : `list`, `dict`, or `~lsst.pipe.tasks.functors.CompositeFunctor`
 
 filt : `str`, optional
 
 flags : `list`, optional
 
 refFlags : `list`, optional
 
 forcedFlags : `list`, optional
 
 funcs : `~lsst.pipe.tasks.functors.Functor`
 
 inplace
 
 True
 
 drop
 
 primaryKey
 
 df
 
 analysis
 

Function Documentation

◆ flattenFilters()

lsst.pipe.tasks.postprocess.flattenFilters ( df,
noDupCols = ["coord_ra", "coord_dec"],
camelCase = False,
inputBands = None )
Flattens a dataframe with multilevel column index.

Definition at line 66 of file postprocess.py.

def flattenFilters(df, noDupCols=("coord_ra", "coord_dec"), camelCase=False, inputBands=None):
    """Flattens a dataframe with multilevel column index.

    Parameters
    ----------
    df : `pandas.DataFrame`
        Input dataframe whose level-0 column index is the band name.
    noDupCols : iterable of `str`, optional
        Band-independent columns to carry through exactly once rather than
        once per band.
    camelCase : `bool`, optional
        If True, join band and column names with no separator (``gflux``);
        otherwise use an underscore (``g_flux``).
    inputBands : `list` of `str`, optional
        Restrict the band-independent columns to one of these bands; by
        default any band present in ``df`` may be used.

    Returns
    -------
    newDf : `pandas.DataFrame`
        Dataframe with a flat (single-level) column index.

    Raises
    ------
    ValueError
        If ``inputBands`` is given but none of its bands are present in
        ``df`` (previously this surfaced as a bare ``IndexError``).
    """
    # Materialize to a list: callers may pass any iterable, and we need it
    # both for membership tests and as a pandas column selector.
    noDupCols = list(noDupCols)
    newDf = pd.DataFrame()
    # band is the level 0 index
    dfBands = df.columns.unique(level=0).values
    # Loop-invariant: the format string does not depend on the band.
    columnFormat = "{0}{1}" if camelCase else "{0}_{1}"
    for band in dfBands:
        subdf = df[band]
        newColumns = {c: columnFormat.format(band, c)
                      for c in subdf.columns if c not in noDupCols}
        cols = list(newColumns.keys())
        newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)

    # Band must be present in the input and output or else column is all NaN:
    presentBands = dfBands if inputBands is None else list(set(inputBands).intersection(dfBands))
    if len(presentBands) == 0:
        raise ValueError(
            f"None of inputBands={inputBands} are present in the dataframe "
            f"(available bands: {list(dfBands)})."
        )
    # Get the unexploded columns from any present band's partition
    noDupDf = df[presentBands[0]][noDupCols]
    newDf = pd.concat([noDupDf, newDf], axis=1)
    return newDf
86
87

Variable Documentation

◆ analysis

lsst.pipe.tasks.postprocess.analysis

Definition at line 881 of file postprocess.py.

◆ catalog

lsst.pipe.tasks.postprocess.catalog : `pandas.DataFrame`
dfs = []
for filt, tableDict in catalogs.items():
    for dataset, table in tableDict.items():
        # Convert afwTable to pandas DataFrame if needed
        if isinstance(table, pd.DataFrame):
            df = table
        elif isinstance(table, afwTable.SourceCatalog):
            df = table.asAstropy().to_pandas()
        elif isinstance(table, astropy.table.Table):
            df = table.to_pandas()
        else:
            raise ValueError(f"{dataset=} has unsupported {type(table)=}")
        df.set_index("id", drop=True, inplace=True)

        # Sort columns by name, to ensure matching schema among patches
        df = df.reindex(sorted(df.columns), axis=1)
        df = df.assign(tractId=tract, patchId=patch)

        # Make columns a 3-level MultiIndex
        df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
                                               names=("dataset", "band", "column"))
        dfs.append(df)

# We do this dance and not `pd.concat(dfs)` because the pandas
# concatenation uses infinite memory.
catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
return catalog


class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
                          defaultTemplates={"catalogType": ""},
                          dimensions=("instrument", "visit", "detector")):

catalog = connectionTypes.Input(
doc="Input full-depth catalog of sources produced by CalibrateTask",
name="{catalogType}src",
storageClass="SourceCatalog",
dimensions=("instrument", "visit", "detector")
)
outputCatalog = connectionTypes.Output(
doc="Catalog of sources, `src` in Astropy/Parquet format.  Columns are unchanged.",
name="{catalogType}source",
storageClass="ArrowAstropy",
dimensions=("instrument", "visit", "detector")
)


class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
                     pipelineConnections=WriteSourceTableConnections):
pass


class WriteSourceTableTask(pipeBase.PipelineTask):
_DefaultName = "writeSourceTable"
ConfigClass = WriteSourceTableConfig

def runQuantum(self, butlerQC, inputRefs, outputRefs):
    inputs = butlerQC.get(inputRefs)
    inputs["visit"] = butlerQC.quantum.dataId["visit"]
    inputs["detector"] = butlerQC.quantum.dataId["detector"]
    result = self.run(**inputs)
    outputs = pipeBase.Struct(outputCatalog=result.table)
    butlerQC.put(outputs, outputRefs)

def run(self, catalog, visit, detector, **kwargs):
if visitSummary is not None:
    row = visitSummary.find(detectorId)
    if row is None:
        raise pipeBase.NoWorkFound(f"Visit summary for detector {detectorId} is missing.")
    if (photoCalib := row.getPhotoCalib()) is None:
        self.log.warning("Detector id %s has None for photoCalib in visit summary; "
                         "skipping reevaluation of photoCalib.", detectorId)
        exposure.setPhotoCalib(None)
    else:
        exposure.setPhotoCalib(photoCalib)
    if (skyWcs := row.getWcs()) is None:
        self.log.warning("Detector id %s has None for skyWcs in visit summary; "
                         "skipping reevaluation of skyWcs.", detectorId)
        exposure.setWcs(None)
    else:
        exposure.setWcs(skyWcs)

return exposure

def addCalibColumns(self, catalog, exposure, **kwargs):

Definition at line 273 of file postprocess.py.

◆ catalogs

lsst.pipe.tasks.postprocess.catalogs : `dict`
_DefaultName = "writeObjectTable"
ConfigClass = WriteObjectTableConfig

# Tag of output dataset written by `MergeSourcesTask.write`
outputDataset = "obj"

def runQuantum(self, butlerQC, inputRefs, outputRefs):
    inputs = butlerQC.get(inputRefs)

    catalogs = defaultdict(dict)
    for dataset, connection in (
        ("meas", "inputCatalogMeas"),
        ("forced_src", "inputCatalogForcedSrc"),
        ("psfs_multiprofit", "inputCatalogPsfsMultiprofit"),
    ):
        for ref, cat in zip(getattr(inputRefs, connection), inputs[connection]):
            catalogs[ref.dataId["band"]][dataset] = cat

    dataId = butlerQC.quantum.dataId
    df = self.run(catalogs=catalogs, tract=dataId["tract"], patch=dataId["patch"])
    outputs = pipeBase.Struct(outputCatalog=df)
    butlerQC.put(outputs, outputRefs)

def run(self, catalogs, tract, patch):

Definition at line 264 of file postprocess.py.

◆ detectorId

lsst.pipe.tasks.postprocess.detectorId : `int`

Definition at line 445 of file postprocess.py.

◆ df

lsst.pipe.tasks.postprocess.df

Definition at line 880 of file postprocess.py.

◆ drop

lsst.pipe.tasks.postprocess.drop

Definition at line 876 of file postprocess.py.

◆ exposure

lsst.pipe.tasks.postprocess.exposure : `lsst.afw.image.exposure.Exposure`
self.log.info("Generating DataFrame from src catalog visit,detector=%i,%i", visit, detector)
tbl = catalog.asAstropy()
tbl["visit"] = visit
# int16 instead of uint8 because databases don't like unsigned bytes.
tbl["detector"] = np.int16(detector)

return pipeBase.Struct(table=tbl)


class WriteRecalibratedSourceTableConnections(WriteSourceTableConnections,
                                      defaultTemplates={"catalogType": ""},
                                      dimensions=("instrument", "visit", "detector", "skymap")):
visitSummary = connectionTypes.Input(
doc="Input visit-summary catalog with updated calibration objects.",
name="finalVisitSummary",
storageClass="ExposureCatalog",
dimensions=("instrument", "visit",),
)

def __init__(self, config):
# We don't want the input catalog here to be an initial existence
# constraint in QG generation, because that can unfortunately limit the
# set of data IDs of inputs to other tasks, even those that run earlier
# (e.g. updateVisitSummary), when the input 'src' catalog is not
# produced.  It's safer to just use 'visitSummary' existence as an
# initial constraint, and then let the graph prune out the detectors
# that don't have a 'src' for this task only.
self.catalog = dataclasses.replace(self.catalog, deferGraphConstraint=True)


class WriteRecalibratedSourceTableConfig(WriteSourceTableConfig,
                                 pipelineConnections=WriteRecalibratedSourceTableConnections):

doReevaluatePhotoCalib = pexConfig.Field(
dtype=bool,
default=True,
doc=("Add or replace local photoCalib columns"),
)
doReevaluateSkyWcs = pexConfig.Field(
dtype=bool,
default=True,
doc=("Add or replace local WCS columns and update the coord columns, coord_ra and coord_dec"),
)


class WriteRecalibratedSourceTableTask(WriteSourceTableTask):
_DefaultName = "writeRecalibratedSourceTable"
ConfigClass = WriteRecalibratedSourceTableConfig

def runQuantum(self, butlerQC, inputRefs, outputRefs):
    inputs = butlerQC.get(inputRefs)

    inputs["visit"] = butlerQC.quantum.dataId["visit"]
    inputs["detector"] = butlerQC.quantum.dataId["detector"]

    if self.config.doReevaluatePhotoCalib or self.config.doReevaluateSkyWcs:
        exposure = ExposureF()
        inputs["exposure"] = self.prepareCalibratedExposure(
            exposure=exposure,
            visitSummary=inputs["visitSummary"],
            detectorId=butlerQC.quantum.dataId["detector"]
        )
        inputs["catalog"] = self.addCalibColumns(**inputs)

    result = self.run(**inputs)
    outputs = pipeBase.Struct(outputCatalog=result.table)
    butlerQC.put(outputs, outputRefs)

def prepareCalibratedExposure(self, exposure, detectorId, visitSummary=None):

Definition at line 443 of file postprocess.py.

◆ filt

lsst.pipe.tasks.postprocess.filt : `str`, optional

Definition at line 595 of file postprocess.py.

◆ flags

lsst.pipe.tasks.postprocess.flags : `list`, optional

Definition at line 600 of file postprocess.py.

◆ forcedFlags

lsst.pipe.tasks.postprocess.forcedFlags : `list`, optional

Definition at line 607 of file postprocess.py.

◆ funcs

lsst.pipe.tasks.postprocess.funcs : `~lsst.pipe.tasks.functors.Functor`

Definition at line 831 of file postprocess.py.

◆ functors

lsst.pipe.tasks.postprocess.functors : `list`, `dict`, or `~lsst.pipe.tasks.functors.CompositeFunctor`

Definition at line 588 of file postprocess.py.

◆ handles

lsst.pipe.tasks.postprocess.handles : `~lsst.daf.butler.DeferredDatasetHandle` or `~lsst.pipe.base.InMemoryDatasetHandle`
measureConfig = SingleFrameMeasurementTask.ConfigClass()
measureConfig.doReplaceWithNoise = False

# Clear all slots, because we aren't running the relevant plugins.
for slot in measureConfig.slots:
    setattr(measureConfig.slots, slot, None)

measureConfig.plugins.names = []
if self.config.doReevaluateSkyWcs:
    measureConfig.plugins.names.add("base_LocalWcs")
    self.log.info("Re-evaluating base_LocalWcs plugin")
if self.config.doReevaluatePhotoCalib:
    measureConfig.plugins.names.add("base_LocalPhotoCalib")
    self.log.info("Re-evaluating base_LocalPhotoCalib plugin")
pluginsNotToCopy = tuple(measureConfig.plugins.names)

# Create a new schema and catalog
# Copy all columns from original except for the ones to reevaluate
aliasMap = catalog.schema.getAliasMap()
mapper = afwTable.SchemaMapper(catalog.schema)
for item in catalog.schema:
    if not item.field.getName().startswith(pluginsNotToCopy):
        mapper.addMapping(item.key)

schema = mapper.getOutputSchema()
measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
schema.setAliasMap(aliasMap)
newCat = afwTable.SourceCatalog(schema)
newCat.extend(catalog, mapper=mapper)

# Fluxes in sourceCatalogs are in counts, so there are no fluxes to
# update here. LocalPhotoCalibs are applied during transform tasks.
# Update coord_ra/coord_dec, which are expected to be positions on the
# sky and are used as such in sdm tables without transform
if self.config.doReevaluateSkyWcs and exposure.wcs is not None:
    afwTable.updateSourceCoords(exposure.wcs, newCat)
    wcsPlugin = measurement.plugins["base_LocalWcs"]
else:
    wcsPlugin = None

if self.config.doReevaluatePhotoCalib and exposure.getPhotoCalib() is not None:
    pcPlugin = measurement.plugins["base_LocalPhotoCalib"]
else:
    pcPlugin = None

for row in newCat:
    if wcsPlugin is not None:
        wcsPlugin.measure(row, exposure)
    if pcPlugin is not None:
        pcPlugin.measure(row, exposure)

return newCat


class PostprocessAnalysis(object):

Definition at line 584 of file postprocess.py.

◆ inplace

lsst.pipe.tasks.postprocess.inplace

Definition at line 876 of file postprocess.py.

◆ log

lsst.pipe.tasks.postprocess.log = logging.getLogger(__name__)

Definition at line 63 of file postprocess.py.

◆ newCat

lsst.pipe.tasks.postprocess.newCat : `lsst.afw.table.SourceCatalog`

Definition at line 495 of file postprocess.py.

◆ primaryKey

lsst.pipe.tasks.postprocess.primaryKey

Definition at line 877 of file postprocess.py.

◆ refFlags

lsst.pipe.tasks.postprocess.refFlags : `list`, optional

Definition at line 604 of file postprocess.py.

◆ result

lsst.pipe.tasks.postprocess.result : `~lsst.pipe.base.Struct`

Definition at line 363 of file postprocess.py.

◆ True

lsst.pipe.tasks.postprocess.True

Definition at line 876 of file postprocess.py.

◆ visitSummary

lsst.pipe.tasks.postprocess.visitSummary : `lsst.afw.table.ExposureCatalog`, optional

Definition at line 447 of file postprocess.py.