LSST Data Management Base Package
lsst.pipe.tasks.postprocess Namespace Reference

Classes

class  WriteObjectTableConnections
 
class  TransformCatalogBaseConnections
 
class  TransformCatalogBaseConfig
 
class  TransformCatalogBaseTask
 
class  TransformObjectCatalogConnections
 

Functions

def flattenFilters (df, noDupCols=['coord_ra', 'coord_dec'], camelCase=False, inputBands=None)
 

Variables

 dflist
 

Function Documentation

◆ flattenFilters()

def lsst.pipe.tasks.postprocess.flattenFilters(df, noDupCols=['coord_ra', 'coord_dec'], camelCase=False, inputBands=None)
Flattens a dataframe with a multilevel column index.

Definition at line 43 of file postprocess.py.

def flattenFilters(df, noDupCols=['coord_ra', 'coord_dec'], camelCase=False, inputBands=None):
    """Flattens a dataframe with multilevel column index
    """
    newDf = pd.DataFrame()
    # band is the level 0 index
    dfBands = df.columns.unique(level=0).values
    for band in dfBands:
        subdf = df[band]
        columnFormat = '{0}{1}' if camelCase else '{0}_{1}'
        newColumns = {c: columnFormat.format(band, c)
                      for c in subdf.columns if c not in noDupCols}
        cols = list(newColumns.keys())
        newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)

    # Band must be present in the input and output or else column is all NaN:
    presentBands = dfBands if inputBands is None else list(set(inputBands).intersection(dfBands))
    # Get the unexploded columns from any present band's partition
    noDupDf = df[presentBands[0]][noDupCols]
    newDf = pd.concat([noDupDf, newDf], axis=1)
    return newDf
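
A minimal usage sketch of flattenFilters (the band labels, column names, and values below are hypothetical; only the two-level (band, column) layout of the input columns matters):

import pandas as pd
from lsst.pipe.tasks.postprocess import flattenFilters

# Build a small DataFrame whose columns are a (band, column) MultiIndex
columns = pd.MultiIndex.from_tuples(
    [('g', 'coord_ra'), ('g', 'coord_dec'), ('g', 'psfFlux'),
     ('r', 'coord_ra'), ('r', 'coord_dec'), ('r', 'psfFlux')],
    names=('band', 'column'))
df = pd.DataFrame([[10.0, -5.0, 1.2, 10.0, -5.0, 3.4]], columns=columns)

flat = flattenFilters(df)
# flat.columns is now: coord_ra, coord_dec, g_psfFlux, r_psfFlux
# (the noDupCols are kept once; with camelCase=True the band label is
# prepended with no separator instead of the 'band_column' form)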

Variable Documentation

◆ dflist

lsst.pipe.tasks.postprocess.dflist
_DefaultName = "writeObjectTable"
ConfigClass = WriteObjectTableConfig
RunnerClass = MergeSourcesRunner

# Names of table datasets to be merged
inputDatasets = ('forced_src', 'meas', 'ref')

# Tag of output dataset written by `MergeSourcesTask.write`
outputDataset = 'obj'

def __init__(self, butler=None, schema=None, **kwargs):
    # It is a shame that this class can't use the default init for CmdLineTask
    # But to do so would require its own special task runner, which is many
    # more lines of specialization, so this is how it is for now
    super().__init__(**kwargs)

def runDataRef(self, patchRefList):
    catalogs = dict(self.readCatalog(patchRef) for patchRef in patchRefList)
    dataId = patchRefList[0].dataId
    mergedCatalog = self.run(catalogs, tract=dataId['tract'], patch=dataId['patch'])
    self.write(patchRefList[0], ParquetTable(dataFrame=mergedCatalog))

def runQuantum(self, butlerQC, inputRefs, outputRefs):
    inputs = butlerQC.get(inputRefs)

    measDict = {ref.dataId['band']: {'meas': cat} for ref, cat in
                zip(inputRefs.inputCatalogMeas, inputs['inputCatalogMeas'])}
    forcedSourceDict = {ref.dataId['band']: {'forced_src': cat} for ref, cat in
                        zip(inputRefs.inputCatalogForcedSrc, inputs['inputCatalogForcedSrc'])}

    catalogs = {}
    for band in measDict.keys():
        catalogs[band] = {'meas': measDict[band]['meas'],
                          'forced_src': forcedSourceDict[band]['forced_src'],
                          'ref': inputs['inputCatalogRef']}
    dataId = butlerQC.quantum.dataId
    df = self.run(catalogs=catalogs, tract=dataId['tract'], patch=dataId['patch'])
    outputs = pipeBase.Struct(outputCatalog=df)
    butlerQC.put(outputs, outputRefs)

@classmethod
def _makeArgumentParser(cls):
    return makeMergeArgumentParser(cls._DefaultName, cls.inputDatasets[0])

def readCatalog(self, patchRef):
    band = patchRef.get(self.config.coaddName + "Coadd_filterLabel", immediate=True).bandLabel
    catalogDict = {}
    for dataset in self.inputDatasets:
        catalog = patchRef.get(self.config.coaddName + "Coadd_" + dataset, immediate=True)
        self.log.info("Read %d sources from %s for band %s: %s" %
                      (len(catalog), dataset, band, patchRef.dataId))
        catalogDict[dataset] = catalog
    return band, catalogDict

def run(self, catalogs, tract, patch):
    dfs = []
    for filt, tableDict in catalogs.items():
        for dataset, table in tableDict.items():
            # Convert afwTable to pandas DataFrame
            df = table.asAstropy().to_pandas().set_index('id', drop=True)

            # Sort columns by name, to ensure matching schema among patches
            df = df.reindex(sorted(df.columns), axis=1)
            df['tractId'] = tract
            df['patchId'] = patch

            # Make columns a 3-level MultiIndex
            df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
                                                   names=('dataset', 'band', 'column'))
            dfs.append(df)

    catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
    return catalog

def write(self, patchRef, catalog):
    patchRef.put(catalog, self.config.coaddName + "Coadd_" + self.outputDataset)
    # since the filter isn't actually part of the data ID for the dataset we're saving,
    # it's confusing to see it in the log message, even if the butler simply ignores it.
    mergeDataId = patchRef.dataId.copy()
    del mergeDataId["filter"]
    self.log.info("Wrote merged catalog: %s" % (mergeDataId,))

def writeMetadata(self, dataRefList):
    pass
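
# --- Editorial illustration (not part of the package source): run() above
# joins every per-band, per-dataset DataFrame on the shared object-id index,
# so the merged object catalog carries a 3-level column MultiIndex with
# levels named ('dataset', 'band', 'column').  Hypothetical example labels:
#
#     ('meas',       'g', 'PsfFlux')
#     ('meas',       'r', 'PsfFlux')
#     ('forced_src', 'g', 'PsfFlux')
#     ('ref',        'g', 'coord_ra')
#
# Selecting a single dataset (level 0) from such a table leaves a
# (band, column) frame of the shape that flattenFilters() expects.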


class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
                                  dimensions=("instrument", "visit", "detector")):

    catalog = connectionTypes.Input(
        doc="Input full-depth catalog of sources produced by CalibrateTask",
        name="src",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector")
    )
    outputCatalog = connectionTypes.Output(
        doc="Catalog of sources, `src` in Parquet format",
        name="source",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector")
    )


class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteSourceTableConnections):
    doApplyExternalPhotoCalib = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Add local photoCalib columns from the calexp.photoCalib? Should only set True if "
             "generating Source Tables from older src tables which do not already have local calib columns")
    )
    doApplyExternalSkyWcs = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Add local WCS columns from the calexp.wcs? Should only set True if "
             "generating Source Tables from older src tables which do not already have local calib columns")
    )


class WriteSourceTableTask(CmdLineTask, pipeBase.PipelineTask):
    _DefaultName = "writeSourceTable"
    ConfigClass = WriteSourceTableConfig

    def runDataRef(self, dataRef):
        src = dataRef.get('src')
        if self.config.doApplyExternalPhotoCalib or self.config.doApplyExternalSkyWcs:
            src = self.addCalibColumns(src, dataRef)

        ccdVisitId = dataRef.get('ccdExposureId')
        result = self.run(src, ccdVisitId=ccdVisitId)
        dataRef.put(result.table, 'source')

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        inputs['ccdVisitId'] = butlerQC.quantum.dataId.pack("visit_detector")
        result = self.run(**inputs).table
        outputs = pipeBase.Struct(outputCatalog=result.toDataFrame())
        butlerQC.put(outputs, outputRefs)

    def run(self, catalog, ccdVisitId=None):
        self.log.info("Generating parquet table from src catalog %s", ccdVisitId)
        df = catalog.asAstropy().to_pandas().set_index('id', drop=True)
        df['ccdVisitId'] = ccdVisitId
        return pipeBase.Struct(table=ParquetTable(dataFrame=df))

    def addCalibColumns(self, catalog, dataRef):

Definition at line 507 of file postprocess.py.
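
As a rough illustration, the two WriteSourceTableConfig flags documented above could be enabled through an ordinary task configuration override (a sketch only; the override file and the way it is supplied to the task are assumptions, not part of this page):

# Hypothetical config override for WriteSourceTableTask: add local photoCalib
# and WCS columns when converting older src catalogs that lack them.
config.doApplyExternalPhotoCalib = True
config.doApplyExternalSkyWcs = True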