LSST Applications g0f08755f38+9c285cab97,g1635faa6d4+13f3999e92,g1653933729+a8ce1bb630,g1a0ca8cf93+bf6eb00ceb,g28da252d5a+0829b12dee,g29321ee8c0+5700dc9eac,g2bbee38e9b+9634bc57db,g2bc492864f+9634bc57db,g2cdde0e794+c2c89b37c4,g3156d2b45e+41e33cbcdc,g347aa1857d+9634bc57db,g35bb328faa+a8ce1bb630,g3a166c0a6a+9634bc57db,g3e281a1b8c+9f2c4e2fc3,g414038480c+077ccc18e7,g41af890bb2+fde0dd39b6,g5fbc88fb19+17cd334064,g781aacb6e4+a8ce1bb630,g80478fca09+55a9465950,g82479be7b0+d730eedb7d,g858d7b2824+9c285cab97,g9125e01d80+a8ce1bb630,g9726552aa6+10f999ec6a,ga5288a1d22+2a84bb7594,gacf8899fa4+c69c5206e8,gae0086650b+a8ce1bb630,gb58c049af0+d64f4d3760,gc28159a63d+9634bc57db,gcf0d15dbbd+4b7d09cae4,gda3e153d99+9c285cab97,gda6a2b7d83+4b7d09cae4,gdaeeff99f8+1711a396fd,ge2409df99d+5e831397f4,ge79ae78c31+9634bc57db,gf0baf85859+147a0692ba,gf3967379c6+41c94011de,gf3fb38a9a8+8f07a9901b,gfb92a5be7c+9c285cab97,w.2024.46
LSST Data Management Base Package
Loading...
Searching...
No Matches
Classes | Functions | Variables
lsst.pipe.tasks.postprocess Namespace Reference

Classes

class  for
 
class  TransformObjectCatalogConnections
 
class  WriteObjectTableConnections
 

Functions

 flattenFilters (df, noDupCols=["coord_ra", "coord_dec"], camelCase=False, inputBands=None)
 

Variables

 log = logging.getLogger(__name__)
 
 catalogs : `dict`
 
int tract
 
str patch
 
 catalog : `pandas.DataFrame`
 
 result : `~lsst.pipe.base.Struct`
 
 exposure : `lsst.afw.image.exposure.Exposure`
 
 detectorId : `int`
 
 visitSummary : `lsst.afw.table.ExposureCatalog`, optional
 
 newCat : `lsst.afw.table.SourceCatalog`
 
 handles : `~lsst.daf.butler.DeferredDatasetHandle` or
 
 functors : `list`, `dict`, or `~lsst.pipe.tasks.functors.CompositeFunctor`
 
 filt : `str`, optional
 
 flags : `list`, optional
 
 refFlags : `list`, optional
 
 forcedFlags : `list`, optional
 
 funcs : `~lsst.pipe.tasks.functors.Functor`
 
 inplace
 
 True
 
 drop
 
 primaryKey
 
 df
 
 analysis
 
 visit : `int`
 
 dataRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
 
 visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
 
 visitSummaries : `list` of `lsst.afw.table.ExposureCatalog`
 

Function Documentation

◆ flattenFilters()

lsst.pipe.tasks.postprocess.flattenFilters ( df,
noDupCols = ["coord_ra", "coord_dec"],
camelCase = False,
inputBands = None )
Flattens a dataframe with multilevel column index.

Definition at line 59 of file postprocess.py.

def flattenFilters(df, noDupCols=["coord_ra", "coord_dec"], camelCase=False, inputBands=None):
    """Flatten a dataframe with a multilevel column index.

    Parameters
    ----------
    df : `pandas.DataFrame`
        Table whose column index has the band as level 0.
    noDupCols : `list` [`str`], optional
        Columns emitted once, without a band prefix, instead of once per
        band. The list is only read, never modified.
    camelCase : `bool`, optional
        If `True`, per-band columns are named ``band + column``; otherwise
        ``band + "_" + column``.
    inputBands : iterable of `str`, optional
        If given, the band supplying ``noDupCols`` is chosen from the bands
        that appear both here and in ``df``; otherwise from all bands in
        ``df``.

    Returns
    -------
    newDf : `pandas.DataFrame`
        Single-level table: ``noDupCols`` first, then the renamed columns of
        each band in the order the bands appear in ``df``.

    Raises
    ------
    IndexError
        Raised if there is no band from which to take ``noDupCols``.
    """
    # The name template does not depend on the band, so pick it once.
    columnFormat = "{0}{1}" if camelCase else "{0}_{1}"
    # band is the level 0 index
    dfBands = df.columns.unique(level=0).values

    # Band must be present in the input and output or else column is all NaN:
    presentBands = dfBands if inputBands is None else list(set(inputBands).intersection(dfBands))
    if len(presentBands) == 0:
        raise IndexError("No band is present in the dataframe to supply the noDupCols columns.")

    # Get the unexploded columns from any present band's partition
    pieces = [df[presentBands[0]][noDupCols]]
    for band in dfBands:
        subdf = df[band]
        newColumns = {c: columnFormat.format(band, c)
                      for c in subdf.columns if c not in noDupCols}
        pieces.append(subdf[list(newColumns)].rename(columns=newColumns))

    # Concatenate once instead of re-copying the growing frame per band.
    return pd.concat(pieces, axis=1)
79
80

Variable Documentation

◆ analysis

lsst.pipe.tasks.postprocess.analysis

Definition at line 751 of file postprocess.py.

◆ catalog

lsst.pipe.tasks.postprocess.catalog : `pandas.DataFrame`
dfs = []
for filt, tableDict in catalogs.items():
    for dataset, table in tableDict.items():
        # Convert afwTable to pandas DataFrame
        df = table.asAstropy().to_pandas().set_index("id", drop=True)

        # Sort columns by name, to ensure matching schema among patches
        df = df.reindex(sorted(df.columns), axis=1)
        df = df.assign(tractId=tract, patchId=patch)

        # Make columns a 3-level MultiIndex
        df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
                                               names=("dataset", "band", "column"))
        dfs.append(df)

# We do this dance and not `pd.concat(dfs)` because the pandas
# concatenation uses infinite memory.
catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
return catalog


class WriteSourceTableConnections(
    pipeBase.PipelineTaskConnections,
    dimensions=("instrument", "visit", "detector"),
    defaultTemplates={"catalogType": ""},
):
    """Declare the ``{catalogType}src`` input and ``{catalogType}source``
    output, both per instrument/visit/detector.
    """

    catalog = connectionTypes.Input(
        name="{catalogType}src",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector"),
        doc="Input full-depth catalog of sources produced by CalibrateTask",
    )
    outputCatalog = connectionTypes.Output(
        name="{catalogType}source",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
        doc="Catalog of sources, `src` in DataFrame/Parquet format. The 'id' column is "
            "replaced with an index; all other columns are unchanged.",
    )


class WriteSourceTableConfig(
    pipeBase.PipelineTaskConfig,
    pipelineConnections=WriteSourceTableConnections,
):
    """Configuration with no fields of its own; it only binds
    `WriteSourceTableConnections`.
    """


class WriteSourceTableTask(pipeBase.PipelineTask):
_DefaultName = "writeSourceTable"
ConfigClass = WriteSourceTableConfig

def runQuantum(self, butlerQC, inputRefs, outputRefs):
    """Fetch the inputs, add the quantum's visit and detector, call
    ``run`` and store its ``table`` as ``outputCatalog``.
    """
    inputs = butlerQC.get(inputRefs)
    # ``run`` takes the visit and detector ids as plain keyword arguments.
    for dimension in ("visit", "detector"):
        inputs[dimension] = butlerQC.quantum.dataId[dimension]
    table = self.run(**inputs).table
    butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)

def run(self, catalog, visit, detector, **kwargs):
if visitSummary is not None:
    row = visitSummary.find(detectorId)
    if row is None:
        raise RuntimeError(f"Visit summary for detector {detectorId} is unexpectedly missing.")
    if (photoCalib := row.getPhotoCalib()) is None:
        self.log.warning("Detector id %s has None for photoCalib in visit summary; "
                         "skipping reevaluation of photoCalib.", detectorId)
        exposure.setPhotoCalib(None)
    else:
        exposure.setPhotoCalib(photoCalib)
    if (skyWcs := row.getWcs()) is None:
        self.log.warning("Detector id %s has None for skyWcs in visit summary; "
                         "skipping reevaluation of skyWcs.", detectorId)
        exposure.setWcs(None)
    else:
        exposure.setWcs(skyWcs)

return exposure

def addCalibColumns(self, catalog, exposure, **kwargs):

Definition at line 167 of file postprocess.py.

◆ catalogs

lsst.pipe.tasks.postprocess.catalogs : `dict`
_DefaultName = "writeObjectTable"
ConfigClass = WriteObjectTableConfig

# Names of table datasets to be merged
inputDatasets = ("forced_src", "meas", "ref")

# Tag of output dataset written by `MergeSourcesTask.write`
outputDataset = "obj"

def runQuantum(self, butlerQC, inputRefs, outputRefs):
    """Group the per-band meas and forced_src inputs with the reference
    catalog, call ``run`` and store the result as ``outputCatalog``.
    """
    inputs = butlerQC.get(inputRefs)

    # Key each per-band catalog by the band of its dataset reference.
    measByBand = {
        ref.dataId["band"]: cat
        for ref, cat in zip(inputRefs.inputCatalogMeas, inputs["inputCatalogMeas"])
    }
    forcedByBand = {
        ref.dataId["band"]: cat
        for ref, cat in zip(inputRefs.inputCatalogForcedSrc, inputs["inputCatalogForcedSrc"])
    }

    # One entry per band with a meas catalog; the same ref catalog is
    # attached to every band.
    catalogs = {
        band: {"meas": measCat,
               "forced_src": forcedByBand[band],
               "ref": inputs["inputCatalogRef"]}
        for band, measCat in measByBand.items()
    }
    quantumDataId = butlerQC.quantum.dataId
    merged = self.run(catalogs=catalogs,
                      tract=quantumDataId["tract"],
                      patch=quantumDataId["patch"])
    butlerQC.put(pipeBase.Struct(outputCatalog=merged), outputRefs)

def run(self, catalogs, tract, patch):

Definition at line 158 of file postprocess.py.

◆ dataRefs

lsst.pipe.tasks.postprocess.dataRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`

Definition at line 1076 of file postprocess.py.

◆ detectorId

lsst.pipe.tasks.postprocess.detectorId : `int`

Definition at line 317 of file postprocess.py.

◆ df

lsst.pipe.tasks.postprocess.df

Definition at line 750 of file postprocess.py.

◆ drop

lsst.pipe.tasks.postprocess.drop

Definition at line 746 of file postprocess.py.

◆ exposure

lsst.pipe.tasks.postprocess.exposure : `lsst.afw.image.exposure.Exposure`
self.log.info("Generating DataFrame from src catalog visit,detector=%i,%i", visit, detector)
df = catalog.asAstropy().to_pandas().set_index("id", drop=True)
df["visit"] = visit
# int16 instead of uint8 because databases don't like unsigned bytes.
df["detector"] = np.int16(detector)

return pipeBase.Struct(table=df)


class WriteRecalibratedSourceTableConnections(
    WriteSourceTableConnections,
    dimensions=("instrument", "visit", "detector", "skymap"),
    defaultTemplates={"catalogType": ""},
):
    """Extend `WriteSourceTableConnections` with a per-visit
    ``finalVisitSummary`` input.
    """

    visitSummary = connectionTypes.Input(
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
        doc="Input visit-summary catalog with updated calibration objects.",
    )


class WriteRecalibratedSourceTableConfig(
    WriteSourceTableConfig,
    pipelineConnections=WriteRecalibratedSourceTableConnections,
):
    """Switches selecting which local calibration columns are
    re-evaluated; both default to `True`.
    """

    doReevaluatePhotoCalib = pexConfig.Field(
        doc="Add or replace local photoCalib columns",
        dtype=bool,
        default=True,
    )
    doReevaluateSkyWcs = pexConfig.Field(
        doc="Add or replace local WCS columns and update the coord columns, coord_ra and coord_dec",
        dtype=bool,
        default=True,
    )


class WriteRecalibratedSourceTableTask(WriteSourceTableTask):
_DefaultName = "writeRecalibratedSourceTable"
ConfigClass = WriteRecalibratedSourceTableConfig

def runQuantum(self, butlerQC, inputRefs, outputRefs):
    """Fetch the inputs, optionally recalibrate the catalog from the visit
    summary, call ``run`` and store its ``table`` as ``outputCatalog``.
    """
    inputs = butlerQC.get(inputRefs)

    quantumDataId = butlerQC.quantum.dataId
    inputs["visit"] = quantumDataId["visit"]
    inputs["detector"] = quantumDataId["detector"]

    recalibrate = self.config.doReevaluatePhotoCalib or self.config.doReevaluateSkyWcs
    if recalibrate:
        # A fresh, empty exposure carries the calibrations from the visit
        # summary into addCalibColumns.
        inputs["exposure"] = self.prepareCalibratedExposure(
            exposure=ExposureF(),
            visitSummary=inputs["visitSummary"],
            detectorId=quantumDataId["detector"],
        )
        inputs["catalog"] = self.addCalibColumns(**inputs)

    table = self.run(**inputs).table
    butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)

def prepareCalibratedExposure(self, exposure, detectorId, visitSummary=None):

Definition at line 315 of file postprocess.py.

◆ filt

lsst.pipe.tasks.postprocess.filt : `str`, optional

Definition at line 467 of file postprocess.py.

◆ flags

lsst.pipe.tasks.postprocess.flags : `list`, optional

Definition at line 472 of file postprocess.py.

◆ forcedFlags

lsst.pipe.tasks.postprocess.forcedFlags : `list`, optional

Definition at line 479 of file postprocess.py.

◆ funcs

lsst.pipe.tasks.postprocess.funcs : `~lsst.pipe.tasks.functors.Functor`

Definition at line 704 of file postprocess.py.

◆ functors

lsst.pipe.tasks.postprocess.functors : `list`, `dict`, or `~lsst.pipe.tasks.functors.CompositeFunctor`

Definition at line 460 of file postprocess.py.

◆ handles

lsst.pipe.tasks.postprocess.handles : `~lsst.daf.butler.DeferredDatasetHandle` or
measureConfig = SingleFrameMeasurementTask.ConfigClass()
measureConfig.doReplaceWithNoise = False

# Clear all slots, because we aren't running the relevant plugins.
for slot in measureConfig.slots:
    setattr(measureConfig.slots, slot, None)

measureConfig.plugins.names = []
if self.config.doReevaluateSkyWcs:
    measureConfig.plugins.names.add("base_LocalWcs")
    self.log.info("Re-evaluating base_LocalWcs plugin")
if self.config.doReevaluatePhotoCalib:
    measureConfig.plugins.names.add("base_LocalPhotoCalib")
    self.log.info("Re-evaluating base_LocalPhotoCalib plugin")
pluginsNotToCopy = tuple(measureConfig.plugins.names)

# Create a new schema and catalog
# Copy all columns from original except for the ones to reevaluate
aliasMap = catalog.schema.getAliasMap()
mapper = afwTable.SchemaMapper(catalog.schema)
for item in catalog.schema:
    if not item.field.getName().startswith(pluginsNotToCopy):
        mapper.addMapping(item.key)

schema = mapper.getOutputSchema()
measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
schema.setAliasMap(aliasMap)
newCat = afwTable.SourceCatalog(schema)
newCat.extend(catalog, mapper=mapper)

# Fluxes in sourceCatalogs are in counts, so there are no fluxes to
# update here. LocalPhotoCalibs are applied during transform tasks.
# Update coord_ra/coord_dec, which are expected to be positions on the
# sky and are used as such in sdm tables without transform
if self.config.doReevaluateSkyWcs and exposure.wcs is not None:
    afwTable.updateSourceCoords(exposure.wcs, newCat)
    wcsPlugin = measurement.plugins["base_LocalWcs"]
else:
    wcsPlugin = None

if self.config.doReevaluatePhotoCalib and exposure.getPhotoCalib() is not None:
    pcPlugin = measurement.plugins["base_LocalPhotoCalib"]
else:
    pcPlugin = None

for row in newCat:
    if wcsPlugin is not None:
        wcsPlugin.measure(row, exposure)
    if pcPlugin is not None:
        pcPlugin.measure(row, exposure)

return newCat


class PostprocessAnalysis(object):

Definition at line 456 of file postprocess.py.

◆ inplace

lsst.pipe.tasks.postprocess.inplace

Definition at line 746 of file postprocess.py.

◆ log

lsst.pipe.tasks.postprocess.log = logging.getLogger(__name__)

Definition at line 56 of file postprocess.py.

◆ newCat

lsst.pipe.tasks.postprocess.newCat : `lsst.afw.table.SourceCatalog`

Definition at line 367 of file postprocess.py.

◆ patch

str lsst.pipe.tasks.postprocess.patch

Definition at line 162 of file postprocess.py.

◆ primaryKey

lsst.pipe.tasks.postprocess.primaryKey

Definition at line 747 of file postprocess.py.

◆ refFlags

lsst.pipe.tasks.postprocess.refFlags : `list`, optional

Definition at line 476 of file postprocess.py.

◆ result

lsst.pipe.tasks.postprocess.result : `~lsst.pipe.base.Struct`

Definition at line 245 of file postprocess.py.

◆ tract

int lsst.pipe.tasks.postprocess.tract

Definition at line 160 of file postprocess.py.

◆ True

lsst.pipe.tasks.postprocess.True

Definition at line 746 of file postprocess.py.

◆ visit

lsst.pipe.tasks.postprocess.visit : `int`
_DefaultName = "transformObjectCatalog"
ConfigClass = TransformObjectCatalogConfig

def run(self, handle, funcs=None, dataId=None, band=None):
    """Transform each wanted band of the input and combine the results
    into one table.

    Parameters
    ----------
    handle
        Object whose ``get(component="columns")`` returns a column index
        with the band at level 1; also passed on to ``self.transform``.
    funcs : optional
        Passed through to ``self.transform``.
    dataId : optional
        Passed through to ``self.transform``; when truthy and
        ``config.columnsFromDataId`` is set, those columns are added to the
        non-duplicated columns of the flattened output.
    band : optional
        Ignored.

    Returns
    -------
    df : `pandas.DataFrame`
        Band-keyed multilevel table if ``config.multilevelOutput`` is set,
        otherwise the table flattened by `flattenFilters`.

    Raises
    ------
    ValueError
        Raised if a filler column has an unsigned integer value type.
    """
    # NOTE: band kwarg is ignored here.
    dfDict = {}
    analysisDict = {}
    templateDf = pd.DataFrame()

    columns = handle.get(component="columns")
    inputBands = columns.unique(level=1).values

    # An empty/unset config.outputBands means "all bands found in the input".
    outputBands = self.config.outputBands if self.config.outputBands else inputBands

    # Perform transform for data of filters that exist in the handle dataframe.
    for inputBand in inputBands:
        if inputBand not in outputBands:
            self.log.info("Ignoring %s band data in the input", inputBand)
            continue
        self.log.info("Transforming the catalog of band %s", inputBand)
        result = self.transform(inputBand, handle, funcs, dataId)
        dfDict[inputBand] = result.df
        analysisDict[inputBand] = result.analysis
        # The first non-empty transformed table becomes the template for
        # filler bands below.
        if templateDf.empty:
            templateDf = result.df

    # Put filler values in columns of other wanted bands
    for filt in outputBands:
        if filt not in dfDict:
            self.log.info("Adding empty columns for band %s", filt)
            dfTemp = templateDf.copy()
            for col in dfTemp.columns:
                # The fill value is chosen from the type of the first row's
                # value.
                # NOTE(review): if no input band was transformed (or the
                # template has zero rows) ``values[0]`` would raise
                # IndexError -- confirm whether that case is reachable.
                testValue = dfTemp[col].values[0]
                # NOTE(review): pd.BooleanDtype is a dtype class; element
                # values are presumably never instances of it -- confirm.
                if isinstance(testValue, (np.bool_, pd.BooleanDtype)):
                    # Boolean flag type, check if it is a "good" flag
                    if col in self.config.goodFlags:
                        fillValue = False
                    else:
                        fillValue = True
                elif isinstance(testValue, numbers.Integral):
                    # Checking numbers.Integral catches all flavors
                    # of python, numpy, pandas, etc. integers.
                    # We must ensure this is not an unsigned integer.
                    if isinstance(testValue, np.unsignedinteger):
                        raise ValueError("Parquet tables may not have unsigned integer columns.")
                    else:
                        fillValue = self.config.integerFillValue
                else:
                    fillValue = self.config.floatFillValue
                # Overwrites the column through the array returned by
                # ``.values``.
                # NOTE(review): this relies on that array sharing memory
                # with dfTemp -- confirm under pandas copy-on-write.
                dfTemp[col].values[:] = fillValue
            dfDict[filt] = dfTemp

    # This makes a multilevel column index, with band as first level
    df = pd.concat(dfDict, axis=1, names=["band", "column"])

    if not self.config.multilevelOutput:
        # Union of the non-duplicated columns reported by every band's
        # analysis, minus the primary key.
        # NOTE(review): ``set.union(*[])`` raises TypeError when
        # analysisDict is empty -- confirm that cannot happen here.
        noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
        if self.config.primaryKey in noDupCols:
            noDupCols.remove(self.config.primaryKey)
        if dataId and self.config.columnsFromDataId:
            noDupCols += self.config.columnsFromDataId
        df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase,
                            inputBands=inputBands)

    self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))

    return df


class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("tract", "skymap")):
    """Declare the per-patch ``objectTable`` inputs and the per-tract
    ``objectTable_tract`` output.
    """

    inputCatalogs = connectionTypes.Input(
        doc="Per-Patch objectTables conforming to the standard data model.",
        name="objectTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        multiple=True,
    )
    outputCatalog = connectionTypes.Output(
        # The consuming task calls pd.concat with its default axis, so the
        # per-patch tables are stacked row-wise into one per-tract table.
        doc="Per-tract concatenation of the input objectTables",
        name="objectTable_tract",
        storageClass="DataFrame",
        dimensions=("tract", "skymap"),
    )


class ConsolidateObjectTableConfig(
    pipeBase.PipelineTaskConfig,
    pipelineConnections=ConsolidateObjectTableConnections,
):
    """Configuration holding the coadd name (default ``"deep"``)."""

    coaddName = pexConfig.Field(
        doc="Name of coadd",
        dtype=str,
        default="deep",
    )


class ConsolidateObjectTableTask(pipeBase.PipelineTask):
    """Concatenate the per-patch ``inputCatalogs`` into one table and
    store it as ``outputCatalog``.
    """

    ConfigClass = ConsolidateObjectTableConfig
    _DefaultName = "consolidateObjectTable"

    outputDataset = "objectTable_tract"
    inputDataset = "objectTable"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        nTables = len(inputs["inputCatalogs"])
        self.log.info("Concatenating %s per-patch Object Tables", nTables)
        combined = pd.concat(inputs["inputCatalogs"])
        butlerQC.put(pipeBase.Struct(outputCatalog=combined), outputRefs)


class TransformSourceTableConnections(
    pipeBase.PipelineTaskConnections,
    dimensions=("instrument", "visit", "detector"),
    defaultTemplates={"catalogType": ""},
):
    """Declare the deferred-load ``{catalogType}source`` input and the
    ``{catalogType}sourceTable`` output, per instrument/visit/detector.
    """

    inputCatalog = connectionTypes.Input(
        name="{catalogType}source",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
        deferLoad=True,
        doc="Wide input catalog of sources produced by WriteSourceTableTask",
    )
    outputCatalog = connectionTypes.Output(
        name="{catalogType}sourceTable",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
        doc="Narrower, per-detector Source Table transformed and converted per a "
            "specified set of functors",
    )


class TransformSourceTableConfig(
    TransformCatalogBaseConfig,
    pipelineConnections=TransformSourceTableConnections,
):
    """Configuration whose defaults select the ``Source.yaml`` functor
    file, the ``sourceId`` primary key and four data-ID columns.
    """

    def setDefaults(self):
        super().setDefaults()
        self.primaryKey = "sourceId"
        self.columnsFromDataId = ["visit", "detector", "band", "physical_filter"]
        # The environment variable is left unexpanded in the stored path.
        self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Source.yaml")


class TransformSourceTableTask(TransformCatalogBaseTask):
    """`TransformCatalogBaseTask` bound to `TransformSourceTableConfig`;
    it adds no behaviour of its own.
    """

    ConfigClass = TransformSourceTableConfig
    _DefaultName = "transformSourceTable"


class ConsolidateVisitSummaryConnections(
    pipeBase.PipelineTaskConnections,
    defaultTemplates={"calexpType": ""},
    dimensions=("instrument", "visit"),
):
    """Declare the per-detector ``calexp`` inputs, the per-visit
    ``visitSummary`` output and its schema init-output.
    """

    calexp = connectionTypes.Input(
        name="calexp",
        storageClass="ExposureF",
        dimensions=("instrument", "visit", "detector"),
        multiple=True,
        deferLoad=True,
        doc="Processed exposures used for metadata",
    )
    visitSummary = connectionTypes.Output(
        name="visitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
        doc=("Per-visit consolidated exposure metadata.  These catalogs use "
             "detector id for the id and are sorted for fast lookups of a "
             "detector."),
    )
    visitSummarySchema = connectionTypes.InitOutput(
        name="visitSummary_schema",
        storageClass="ExposureCatalog",
        doc="Schema of the visitSummary catalog",
    )


class ConsolidateVisitSummaryConfig(
    pipeBase.PipelineTaskConfig,
    pipelineConnections=ConsolidateVisitSummaryConnections,
):
    """Configuration with no fields of its own; it only binds
    `ConsolidateVisitSummaryConnections`.
    """


class ConsolidateVisitSummaryTask(pipeBase.PipelineTask):
_DefaultName = "consolidateVisitSummary"
ConfigClass = ConsolidateVisitSummaryConfig

def __init__(self, **kwargs):
    """Build the visit-summary schema and an empty catalog carrying it."""
    super().__init__(**kwargs)
    # Start from the minimal exposure schema, then add the identifying
    # columns and the summary-statistics fields.
    schema = afwTable.ExposureTable.makeMinimalSchema()
    schema.addField("visit", type="L", doc="Visit number")
    schema.addField("physical_filter", type="String", size=32, doc="Physical filter")
    schema.addField("band", type="String", size=32, doc="Name of band")
    ExposureSummaryStats.update_schema(schema)
    self.schema = schema
    self.visitSummarySchema = afwTable.ExposureCatalog(schema)

def runQuantum(self, butlerQC, inputRefs, outputRefs):
    """Combine the metadata of the visit's calexp inputs into one catalog
    and store it as ``visitSummary``.
    """
    calexpHandles = butlerQC.get(inputRefs.calexp)
    # The visit id is read from the first input's data ID.
    visitId = calexpHandles[0].dataId["visit"]

    self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
                   len(calexpHandles), visitId)

    summary = self._combineExposureMetadata(visitId, calexpHandles)
    butlerQC.put(summary, outputRefs.visitSummary)

def _combineExposureMetadata(self, visit, dataRefs):

Definition at line 1074 of file postprocess.py.

◆ visitSummaries

lsst.pipe.tasks.postprocess.visitSummaries : `list` of `lsst.afw.table.ExposureCatalog`
ccdEntries = []
for visitSummaryRef in visitSummaryRefs:
    visitSummary = visitSummaryRef.get()
    visitInfo = visitSummary[0].getVisitInfo()

    ccdEntry = {}
    summaryTable = visitSummary.asAstropy()
    selectColumns = ["id", "visit", "physical_filter", "band", "ra", "dec",
                     "pixelScale", "zenithDistance",
                     "expTime", "zeroPoint", "psfSigma", "skyBg", "skyNoise",
                     "astromOffsetMean", "astromOffsetStd", "nPsfStar",
                     "psfStarDeltaE1Median", "psfStarDeltaE2Median",
                     "psfStarDeltaE1Scatter", "psfStarDeltaE2Scatter",
                     "psfStarDeltaSizeMedian", "psfStarDeltaSizeScatter",
                     "psfStarScaledDeltaSizeScatter", "psfTraceRadiusDelta",
                     "psfApFluxDelta", "psfApCorrSigmaScaledDelta",
                     "maxDistToNearestPsf",
                     "effTime", "effTimePsfSigmaScale",
                     "effTimeSkyBgScale", "effTimeZeroPointScale",
                     "magLim"]
    ccdEntry = summaryTable[selectColumns].to_pandas().set_index("id")
    # 'visit' is the human readable visit number.
    # 'visitId' is the key to the visitId table. They are the same.
    # Technically you should join to get the visit from the visit
    # table.
    ccdEntry = ccdEntry.rename(columns={"visit": "visitId"})

    # RFC-924: Temporarily keep a duplicate "decl" entry for backwards
    # compatibility. To be removed after September 2023.
    ccdEntry["decl"] = ccdEntry.loc[:, "dec"]

    ccdEntry["ccdVisitId"] = [
        self.config.idGenerator.apply(
            visitSummaryRef.dataId,
            detector=detector_id,
            is_exposure=False,
        ).catalog_id  # The "catalog ID" here is the ccdVisit ID
                      # because it's usually the ID for a whole catalog
                      # with a {visit, detector}, and that's the main
                      # use case for IdGenerator.  This usage for a
                      # summary table is rare.
        for detector_id in summaryTable["id"]
    ]
    ccdEntry["detector"] = summaryTable["id"]
    ccdEntry["seeing"] = (
        visitSummary["psfSigma"] * visitSummary["pixelScale"] * np.sqrt(8 * np.log(2))
    )
    ccdEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
    ccdEntry["expMidpt"] = visitInfo.getDate().toPython()
    ccdEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
    ccdEntry["obsStart"] = (
        ccdEntry["expMidpt"] - 0.5 * pd.Timedelta(seconds=ccdEntry["expTime"].values[0])
    )
    expTime_days = ccdEntry["expTime"] / (60*60*24)
    ccdEntry["obsStartMJD"] = ccdEntry["expMidptMJD"] - 0.5 * expTime_days
    ccdEntry["darkTime"] = visitInfo.getDarkTime()
    ccdEntry["xSize"] = summaryTable["bbox_max_x"] - summaryTable["bbox_min_x"]
    ccdEntry["ySize"] = summaryTable["bbox_max_y"] - summaryTable["bbox_min_y"]
    ccdEntry["llcra"] = summaryTable["raCorners"][:, 0]
    ccdEntry["llcdec"] = summaryTable["decCorners"][:, 0]
    ccdEntry["ulcra"] = summaryTable["raCorners"][:, 1]
    ccdEntry["ulcdec"] = summaryTable["decCorners"][:, 1]
    ccdEntry["urcra"] = summaryTable["raCorners"][:, 2]
    ccdEntry["urcdec"] = summaryTable["decCorners"][:, 2]
    ccdEntry["lrcra"] = summaryTable["raCorners"][:, 3]
    ccdEntry["lrcdec"] = summaryTable["decCorners"][:, 3]
    # TODO: DM-30618, Add raftName, nExposures, ccdTemp, binX, binY,
    # and flags, and decide if WCS, and llcx, llcy, ulcx, ulcy, etc.
    # values are actually wanted.
    ccdEntries.append(ccdEntry)

outputCatalog = pd.concat(ccdEntries)
outputCatalog.set_index("ccdVisitId", inplace=True, verify_integrity=True)
return pipeBase.Struct(outputCatalog=outputCatalog)


class MakeVisitTableConnections(
    pipeBase.PipelineTaskConnections,
    defaultTemplates={"calexpType": ""},
    dimensions=("instrument",),
):
    """Declare the deferred-load per-visit ``finalVisitSummary`` inputs
    and the per-instrument ``visitTable`` output.
    """

    visitSummaries = connectionTypes.Input(
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
        deferLoad=True,
        multiple=True,
        doc="Per-visit consolidated exposure metadata",
    )
    outputCatalog = connectionTypes.Output(
        name="visitTable",
        storageClass="DataFrame",
        dimensions=("instrument",),
        doc="Visit metadata table",
    )


class MakeVisitTableConfig(
    pipeBase.PipelineTaskConfig,
    pipelineConnections=MakeVisitTableConnections,
):
    """Configuration with no fields of its own; it only binds
    `MakeVisitTableConnections`.
    """


class MakeVisitTableTask(pipeBase.PipelineTask):
_DefaultName = "makeVisitTable"
ConfigClass = MakeVisitTableConfig

def run(self, visitSummaries):

Definition at line 1325 of file postprocess.py.

◆ visitSummary

lsst.pipe.tasks.postprocess.visitSummary : `lsst.afw.table.ExposureCatalog`, optional

Definition at line 319 of file postprocess.py.

◆ visitSummaryRefs

lsst.pipe.tasks.postprocess.visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
cat = afwTable.ExposureCatalog(self.schema)
cat.resize(len(dataRefs))

cat["visit"] = visit

for i, dataRef in enumerate(dataRefs):
    visitInfo = dataRef.get(component="visitInfo")
    filterLabel = dataRef.get(component="filter")
    summaryStats = dataRef.get(component="summaryStats")
    detector = dataRef.get(component="detector")
    wcs = dataRef.get(component="wcs")
    photoCalib = dataRef.get(component="photoCalib")
    detector = dataRef.get(component="detector")
    bbox = dataRef.get(component="bbox")
    validPolygon = dataRef.get(component="validPolygon")

    rec = cat[i]
    rec.setBBox(bbox)
    rec.setVisitInfo(visitInfo)
    rec.setWcs(wcs)
    rec.setPhotoCalib(photoCalib)
    rec.setValidPolygon(validPolygon)

    rec["physical_filter"] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else ""
    rec["band"] = filterLabel.bandLabel if filterLabel.hasBandLabel() else ""
    rec.setId(detector.getId())
    summaryStats.update_record(rec)

metadata = dafBase.PropertyList()
metadata.add("COMMENT", "Catalog id is detector id, sorted.")
# We are looping over existing datarefs, so the following is true
metadata.add("COMMENT", "Only detectors with data have entries.")
cat.setMetadata(metadata)

cat.sort()
return cat


class ConsolidateSourceTableConnections(
    pipeBase.PipelineTaskConnections,
    dimensions=("instrument", "visit"),
    defaultTemplates={"catalogType": ""},
):
    """Declare the per-detector ``{catalogType}sourceTable`` inputs and
    the per-visit ``{catalogType}sourceTable_visit`` output.
    """

    inputCatalogs = connectionTypes.Input(
        name="{catalogType}sourceTable",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
        multiple=True,
        doc="Input per-detector Source Tables",
    )
    outputCatalog = connectionTypes.Output(
        name="{catalogType}sourceTable_visit",
        storageClass="DataFrame",
        dimensions=("instrument", "visit"),
        doc="Per-visit concatenation of Source Table",
    )


class ConsolidateSourceTableConfig(
    pipeBase.PipelineTaskConfig,
    pipelineConnections=ConsolidateSourceTableConnections,
):
    """Configuration with no fields of its own; it only binds
    `ConsolidateSourceTableConnections`.
    """


class ConsolidateSourceTableTask(pipeBase.PipelineTask):
    """Concatenate the per-detector ``inputCatalogs`` in ascending detector
    order and store the result as ``outputCatalog``.
    """

    ConfigClass = ConsolidateSourceTableConfig
    _DefaultName = "consolidateSourceTable"

    outputDataset = "sourceTable_visit"
    inputDataset = "sourceTable"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        from .makeWarp import reorderRefs

        # Reorder the references by detector id before loading.
        detectorOrder = sorted(ref.dataId["detector"] for ref in inputRefs.inputCatalogs)
        inputRefs = reorderRefs(inputRefs, detectorOrder, dataIdKey="detector")
        inputs = butlerQC.get(inputRefs)
        nTables = len(inputs["inputCatalogs"])
        self.log.info("Concatenating %s per-detector Source Tables", nTables)
        combined = pd.concat(inputs["inputCatalogs"])
        butlerQC.put(pipeBase.Struct(outputCatalog=combined), outputRefs)


class MakeCcdVisitTableConnections(
    pipeBase.PipelineTaskConnections,
    defaultTemplates={"calexpType": ""},
    dimensions=("instrument",),
):
    """Declare the deferred-load per-visit ``finalVisitSummary`` inputs
    and the per-instrument ``ccdVisitTable`` output.
    """

    visitSummaryRefs = connectionTypes.Input(
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
        deferLoad=True,
        multiple=True,
        doc="Data references for per-visit consolidated exposure metadata",
    )
    outputCatalog = connectionTypes.Output(
        name="ccdVisitTable",
        storageClass="DataFrame",
        dimensions=("instrument",),
        doc="CCD and Visit metadata table",
    )


class MakeCcdVisitTableConfig(
    pipeBase.PipelineTaskConfig,
    pipelineConnections=MakeCcdVisitTableConnections,
):
    """Configuration exposing the ``idGenerator`` field built by
    `DetectorVisitIdGeneratorConfig.make_field`.
    """

    idGenerator = DetectorVisitIdGeneratorConfig.make_field()


class MakeCcdVisitTableTask(pipeBase.PipelineTask):
_DefaultName = "makeCcdVisitTable"
ConfigClass = MakeCcdVisitTableConfig

def run(self, visitSummaryRefs):

Definition at line 1202 of file postprocess.py.