# Registered task name, used for config/metadata dataset lookup.
_DefaultName = "writeObjectTable"
# Configuration class for this task (defined elsewhere in this file).
ConfigClass = WriteObjectTableConfig
# Gen2 task runner; presumably groups per-band patch references for the
# merge — see MergeSourcesRunner for the exact grouping (defined elsewhere).
RunnerClass = MergeSourcesRunner
# Names of table datasets to be merged
inputDatasets = ('forced_src', 'meas', 'ref')
# Tag of output dataset written by `MergeSourcesTask.write`
outputDataset = 'obj'
def __init__(self, butler=None, schema=None, **kwargs):
    """Initialize the task, accepting (and ignoring) ``butler``/``schema``.

    Using the default ``CmdLineTask`` init would require a dedicated task
    runner — many more lines of specialization — so the Gen2-style extra
    arguments are simply swallowed and the rest forwarded to the base class.
    """
    super().__init__(**kwargs)
def runDataRef(self, patchRefList):
    """Gen2 entry point: merge per-band catalogs for one patch and persist.

    Reads one catalog dict per patch reference, merges them into a single
    DataFrame via ``run``, and writes the result as a ParquetTable.
    """
    perBandCatalogs = dict(self.readCatalog(ref) for ref in patchRefList)
    firstDataId = patchRefList[0].dataId
    merged = self.run(perBandCatalogs,
                      tract=firstDataId['tract'],
                      patch=firstDataId['patch'])
    self.write(patchRefList[0], ParquetTable(dataFrame=merged))
def runQuantum(self, butlerQC, inputRefs, outputRefs):
    """Gen3 entry point: assemble per-band catalog dicts and run the merge.

    Builds ``catalogs[band] = {'meas': ..., 'forced_src': ..., 'ref': ...}``
    from the quantum inputs, then delegates to ``run`` and stores the result.
    """
    inputs = butlerQC.get(inputRefs)
    # Pair each dataset reference (which carries the band in its dataId)
    # with the corresponding retrieved catalog.
    measByBand = {ref.dataId['band']: cat for ref, cat in
                  zip(inputRefs.inputCatalogMeas, inputs['inputCatalogMeas'])}
    forcedByBand = {ref.dataId['band']: cat for ref, cat in
                    zip(inputRefs.inputCatalogForcedSrc, inputs['inputCatalogForcedSrc'])}
    # The reference catalog is band-independent and shared across bands.
    refCatalog = inputs['inputCatalogRef']
    catalogs = {band: {'meas': measByBand[band],
                       'forced_src': forcedByBand[band],
                       'ref': refCatalog}
                for band in measByBand}
    dataId = butlerQC.quantum.dataId
    df = self.run(catalogs=catalogs, tract=dataId['tract'], patch=dataId['patch'])
    butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)
@classmethod
def _makeArgumentParser(cls):
    """Build the Gen2 argument parser, keyed on the first input dataset."""
    primaryDataset = cls.inputDatasets[0]
    return makeMergeArgumentParser(cls._DefaultName, primaryDataset)
def readCatalog(self, patchRef):
    """Read all input catalogs for one patch reference.

    Parameters
    ----------
    patchRef : data reference
        Gen2 data reference; must support ``get`` for the filterLabel and
        each ``<coaddName>Coadd_<dataset>`` dataset.

    Returns
    -------
    tuple
        ``(band, catalogDict)`` where ``band`` is the band label string and
        ``catalogDict`` maps each name in ``self.inputDatasets`` to its
        retrieved catalog.
    """
    band = patchRef.get(self.config.coaddName + "Coadd_filterLabel", immediate=True).bandLabel
    catalogDict = {}
    for dataset in self.inputDatasets:
        catalog = patchRef.get(self.config.coaddName + "Coadd_" + dataset, immediate=True)
        # Fix: pass format args lazily instead of pre-formatting with `%`,
        # so the message is only built when INFO logging is enabled.
        self.log.info("Read %d sources from %s for band %s: %s",
                      len(catalog), dataset, band, patchRef.dataId)
        catalogDict[dataset] = catalog
    return band, catalogDict
def run(self, catalogs, tract, patch):
    """Merge per-band, per-dataset catalogs into one multi-level DataFrame.

    Parameters
    ----------
    catalogs : dict
        Mapping of band -> {dataset name -> afw table}.
    tract : int
        Tract id, stamped into a ``tractId`` column.
    patch : str
        Patch id, stamped into a ``patchId`` column.

    Returns
    -------
    pandas.DataFrame
        All inputs joined on the ``id`` index, with columns as a 3-level
        MultiIndex ``(dataset, band, column)``.
    """
    frames = []
    for band, perDataset in catalogs.items():
        for datasetName, table in perDataset.items():
            # Convert the afw table to a pandas DataFrame indexed by source id.
            frame = table.asAstropy().to_pandas().set_index('id', drop=True)
            # Sort columns by name so the schema matches across patches.
            frame = frame.reindex(sorted(frame.columns), axis=1)
            frame['tractId'] = tract
            frame['patchId'] = patch
            # Promote columns to a 3-level MultiIndex.
            labels = [(datasetName, band, column) for column in frame.columns]
            frame.columns = pd.MultiIndex.from_tuples(
                labels, names=('dataset', 'band', 'column'))
            frames.append(frame)
    return functools.reduce(lambda left, right: left.join(right), frames)
def write(self, patchRef, catalog):
    """Persist the merged catalog for one patch.

    Parameters
    ----------
    patchRef : data reference
        Gen2 data reference used to ``put`` the output dataset.
    catalog : `ParquetTable`
        Merged catalog to write.
    """
    patchRef.put(catalog, self.config.coaddName + "Coadd_" + self.outputDataset)
    # since the filter isn't actually part of the data ID for the dataset we're saving,
    # it's confusing to see it in the log message, even if the butler simply ignores it.
    mergeDataId = patchRef.dataId.copy()
    del mergeDataId["filter"]
    # Fix: lazy log formatting — let the logger interpolate only when enabled.
    self.log.info("Wrote merged catalog: %s", mergeDataId)
def writeMetadata(self, dataRefList):
    """Deliberately a no-op: metadata is not written for this task."""
    pass
class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
                                  dimensions=("instrument", "visit", "detector")):
    """Butler connections for `WriteSourceTableTask`.

    Declares one per-detector `src` SourceCatalog input and one DataFrame
    output named `source`, both keyed by (instrument, visit, detector).
    """
    # Per-detector source catalog produced upstream by CalibrateTask.
    catalog = connectionTypes.Input(
        doc="Input full-depth catalog of sources produced by CalibrateTask",
        name="src",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector")
    )
    # Same catalog converted to a DataFrame for parquet storage.
    outputCatalog = connectionTypes.Output(
        doc="Catalog of sources, `src` in Parquet format",
        name="source",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector")
    )
class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteSourceTableConnections):
    """Configuration for `WriteSourceTableTask`.

    Both flags default to False; they exist only for reprocessing older
    src tables that lack local calibration columns.
    """
    # When True, local photoCalib columns are added from the calexp
    # (see addCalibColumns, called from runDataRef).
    doApplyExternalPhotoCalib = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Add local photoCalib columns from the calexp.photoCalib? Should only set True if "
             "generating Source Tables from older src tables which do not already have local calib columns")
    )
    # When True, local WCS columns are added from the calexp.
    doApplyExternalSkyWcs = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Add local WCS columns from the calexp.wcs? Should only set True if "
             "generating Source Tables from older src tables which do not already have local calib columns")
    )
class WriteSourceTableTask(CmdLineTask, pipeBase.PipelineTask):
    """Convert a per-detector `src` SourceCatalog to a parquet Source table.

    Supports both the Gen2 (`runDataRef`) and Gen3 (`runQuantum`)
    middleware entry points; the class body continues below.
    """
    # Registered task name, used for config/metadata dataset lookup.
    _DefaultName = "writeSourceTable"
    # Configuration class (defined above in this file).
    ConfigClass = WriteSourceTableConfig
def runDataRef(self, dataRef):
    """Gen2 entry point: convert the `src` catalog to parquet and persist it."""
    catalog = dataRef.get('src')
    needsCalibColumns = (self.config.doApplyExternalPhotoCalib
                         or self.config.doApplyExternalSkyWcs)
    if needsCalibColumns:
        catalog = self.addCalibColumns(catalog, dataRef)
    exposureId = dataRef.get('ccdExposureId')
    result = self.run(catalog, ccdVisitId=exposureId)
    dataRef.put(result.table, 'source')
def runQuantum(self, butlerQC, inputRefs, outputRefs):
    """Gen3 entry point: run the conversion and store the DataFrame output."""
    inputs = butlerQC.get(inputRefs)
    # Derive the packed ccdVisitId from the quantum's own data ID.
    inputs['ccdVisitId'] = butlerQC.quantum.dataId.pack("visit_detector")
    table = self.run(**inputs).table
    butlerQC.put(pipeBase.Struct(outputCatalog=table.toDataFrame()), outputRefs)
def run(self, catalog, ccdVisitId=None):
    """Convert a source catalog to a ParquetTable indexed by source id.

    Parameters
    ----------
    catalog : afw table
        Input source catalog.
    ccdVisitId : optional
        Identifier stamped into a ``ccdVisitId`` column on every row.

    Returns
    -------
    `pipeBase.Struct`
        With a single ``table`` field holding the ParquetTable.
    """
    self.log.info("Generating parquet table from src catalog %s", ccdVisitId)
    frame = catalog.asAstropy().to_pandas().set_index('id', drop=True)
    frame['ccdVisitId'] = ccdVisitId
    return pipeBase.Struct(table=ParquetTable(dataFrame=frame))
def addCalibColumns(self, catalog, dataRef):
Definition at line 507 of file postprocess.py.