_DefaultName = "transformObjectCatalog"
ConfigClass = TransformObjectCatalogConfig
# Used by Gen 2 runDataRef only:
inputDataset = 'deepCoadd_obj'
outputDataset = 'objectTable'
@classmethod
def _makeArgumentParser(cls):
parser = ArgumentParser(name=cls._DefaultName)
parser.add_id_argument("--id", cls.inputDataset,
ContainerClass=CoaddDataIdContainer,
help="data ID, e.g. --id tract=12345 patch=1,2")
return parser
def run(self, parq, funcs=None, dataId=None, band=None):
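        """Transform a multi-band catalog and flatten it to the output
        data model.

        Parameters
        ----------
        parq : `lsst.daf.butler.DeferredDatasetHandle` or `ParquetTable`
            Multi-band parquet catalog with a ``band`` column level.
        funcs : `CompositeFunctor`, optional
            Functors to apply to the columns of each band.
        dataId : `dict`, optional
            Data ID; its keys are kept out of the per-band flattening.
        band : ignored here
            Accepted for interface compatibility only; all bands present
            in ``parq`` are transformed.

        Returns
        -------
        df : `pandas.DataFrame`
            Per-band transformed catalog, flattened to a single column
            level unless ``config.multilevelOutput`` is set.
        """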
# NOTE: band kwarg is ignored here.
dfDict = {}
analysisDict = {}
templateDf = pd.DataFrame()
if isinstance(parq, DeferredDatasetHandle):
columns = parq.get(component='columns')
inputBands = columns.unique(level=1).values
else:
inputBands = parq.columnLevelNames['band']
outputBands = self.config.outputBands if self.config.outputBands else inputBands
# Perform transform for data of filters that exist in parq.
for inputBand in inputBands:
if inputBand not in outputBands:
self.log.info("Ignoring %s band data in the input", inputBand)
continue
self.log.info("Transforming the catalog of band %s", inputBand)
result = self.transform(inputBand, parq, funcs, dataId)
dfDict[inputBand] = result.df
analysisDict[inputBand] = result.analysis
if templateDf.empty:
templateDf = result.df
# Put filler values in columns of other wanted bands
for filt in outputBands:
if filt not in dfDict:
self.log.info("Adding empty columns for band %s", filt)
dfTemp = templateDf.copy()
for col in dfTemp.columns:
testValue = dfTemp[col].values[0]
if isinstance(testValue, (np.bool_, pd.BooleanDtype)):
                        # Boolean flag type: fill with the "failed" state,
                        # i.e. False for good flags and True otherwise.
if col in self.config.goodFlags:
fillValue = False
else:
fillValue = True
elif isinstance(testValue, numbers.Integral):
# Checking numbers.Integral catches all flavors
# of python, numpy, pandas, etc. integers.
# We must ensure this is not an unsigned integer.
if isinstance(testValue, np.unsignedinteger):
raise ValueError("Parquet tables may not have unsigned integer columns.")
else:
fillValue = self.config.integerFillValue
else:
fillValue = self.config.floatFillValue
dfTemp[col].values[:] = fillValue
dfDict[filt] = dfTemp
# This makes a multilevel column index, with band as first level
df = pd.concat(dfDict, axis=1, names=['band', 'column'])
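        # e.g. df['g']['psfFlux'] after the concat; flattenFilters below
        # renames these to single-level, per-band names such as
        # 'g_psfFlux' (or 'gPsfFlux' if config.camelCase is set).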
if not self.config.multilevelOutput:
noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
if self.config.primaryKey in noDupCols:
noDupCols.remove(self.config.primaryKey)
if dataId is not None:
noDupCols += list(dataId.keys())
df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase,
inputBands=inputBands)
self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
return df
class TractObjectDataIdContainer(CoaddDataIdContainer):
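    """DataIdContainer that groups patch-level data references by tract
    (Gen 2 only).
    """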
def makeDataRefList(self, namespace):
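        """Make self.refList from self.idList.

        Generates per-patch data references for each requested tract
        (all patches unless a specific patch is given); only dataRefs
        whose datasets exist are kept.
        """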
def getPatchRefList(tract):
return [namespace.butler.dataRef(datasetType=self.datasetType,
tract=tract.getId(),
patch="%d,%d" % patch.getIndex()) for patch in tract]
tractRefs = defaultdict(list) # Data references for each tract
for dataId in self.idList:
skymap = self.getSkymap(namespace)
if "tract" in dataId:
tractId = dataId["tract"]
if "patch" in dataId:
tractRefs[tractId].append(namespace.butler.dataRef(datasetType=self.datasetType,
tract=tractId,
patch=dataId['patch']))
else:
tractRefs[tractId] += getPatchRefList(skymap[tractId])
else:
tractRefs = dict((tract.getId(), tractRefs.get(tract.getId(), []) + getPatchRefList(tract))
for tract in skymap)
outputRefList = []
for tractRefList in tractRefs.values():
existingRefs = [ref for ref in tractRefList if ref.datasetExists()]
outputRefList.append(existingRefs)
self.refList = outputRefList
class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections,
dimensions=("tract", "skymap")):
inputCatalogs = connectionTypes.Input(
doc="Per-Patch objectTables conforming to the standard data model.",
name="objectTable",
storageClass="DataFrame",
dimensions=("tract", "patch", "skymap"),
multiple=True,
)
outputCatalog = connectionTypes.Output(
doc="Pre-tract horizontal concatenation of the input objectTables",
name="objectTable_tract",
storageClass="DataFrame",
dimensions=("tract", "skymap"),
)
class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig,
pipelineConnections=ConsolidateObjectTableConnections):
coaddName = pexConfig.Field(
dtype=str,
default="deep",
doc="Name of coadd"
)
class ConsolidateObjectTableTask(CmdLineTask, pipeBase.PipelineTask):
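    """Concatenate the per-patch objectTables of a tract into a single
    per-tract objectTable_tract.
    """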
_DefaultName = "consolidateObjectTable"
ConfigClass = ConsolidateObjectTableConfig
inputDataset = 'objectTable'
outputDataset = 'objectTable_tract'
def runQuantum(self, butlerQC, inputRefs, outputRefs):
inputs = butlerQC.get(inputRefs)
self.log.info("Concatenating %s per-patch Object Tables",
len(inputs['inputCatalogs']))
df = pd.concat(inputs['inputCatalogs'])
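        # Default pd.concat is row-wise (axis=0): the patches share the
        # same columns, so this stacks their rows into one per-tract table.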
butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)
@classmethod
def _makeArgumentParser(cls):
parser = ArgumentParser(name=cls._DefaultName)
parser.add_id_argument("--id", cls.inputDataset,
help="data ID, e.g. --id tract=12345",
ContainerClass=TractObjectDataIdContainer)
return parser
def runDataRef(self, patchRefList):
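        """Concatenate per-patch tables and persist the per-tract result
        (Gen 2 entry point).
        """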
df = pd.concat([patchRef.get().toDataFrame() for patchRef in patchRefList])
patchRefList[0].put(ParquetTable(dataFrame=df), self.outputDataset)
def writeMetadata(self, dataRef):
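        """No metadata to write.
        """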
pass
class TransformSourceTableConnections(pipeBase.PipelineTaskConnections,
defaultTemplates={"catalogType": ""},
dimensions=("instrument", "visit", "detector")):
inputCatalog = connectionTypes.Input(
doc="Wide input catalog of sources produced by WriteSourceTableTask",
name="{catalogType}source",
storageClass="DataFrame",
dimensions=("instrument", "visit", "detector"),
deferLoad=True
)
outputCatalog = connectionTypes.Output(
doc="Narrower, per-detector Source Table transformed and converted per a "
"specified set of functors",
name="{catalogType}sourceTable",
storageClass="DataFrame",
dimensions=("instrument", "visit", "detector")
)
class TransformSourceTableConfig(TransformCatalogBaseConfig,
pipelineConnections=TransformSourceTableConnections):
def setDefaults(self):
super().setDefaults()
self.primaryKey = 'sourceId'
class TransformSourceTableTask(TransformCatalogBaseTask):
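    """Transform and standardize a per-detector source catalog.
    """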
_DefaultName = "transformSourceTable"
ConfigClass = TransformSourceTableConfig
inputDataset = 'source'
outputDataset = 'sourceTable'
@classmethod
def _makeArgumentParser(cls):
parser = ArgumentParser(name=cls._DefaultName)
parser.add_id_argument("--id", datasetType=cls.inputDataset,
level="sensor",
help="data ID, e.g. --id visit=12345 ccd=0")
return parser
def runDataRef(self, dataRef):
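        """Override to supply the band label to run() (Gen 2 entry point).
        """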
parq = dataRef.get()
funcs = self.getFunctors()
band = dataRef.get("calexp_filterLabel", immediate=True).bandLabel
df = self.run(parq, funcs=funcs, dataId=dataRef.dataId, band=band)
self.write(df, dataRef)
return df
class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections,
dimensions=("instrument", "visit",),
defaultTemplates={"calexpType": ""}):
calexp = connectionTypes.Input(
doc="Processed exposures used for metadata",
name="{calexpType}calexp",
storageClass="ExposureF",
dimensions=("instrument", "visit", "detector"),
deferLoad=True,
multiple=True,
)
visitSummary = connectionTypes.Output(
doc=("Per-visit consolidated exposure metadata. These catalogs use "
"detector id for the id and are sorted for fast lookups of a "
"detector."),
name="{calexpType}visitSummary",
storageClass="ExposureCatalog",
dimensions=("instrument", "visit"),
)
class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig,
pipelineConnections=ConsolidateVisitSummaryConnections):
pass
class ConsolidateVisitSummaryTask(pipeBase.PipelineTask, pipeBase.CmdLineTask):
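    """Consolidate per-detector calexp metadata into a single per-visit
    `~lsst.afw.table.ExposureCatalog`.
    """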
_DefaultName = "consolidateVisitSummary"
ConfigClass = ConsolidateVisitSummaryConfig
@classmethod
def _makeArgumentParser(cls):
parser = ArgumentParser(name=cls._DefaultName)
parser.add_id_argument("--id", "calexp",
help="data ID, e.g. --id visit=12345",
ContainerClass=VisitDataIdContainer)
return parser
def writeMetadata(self, dataRef):
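        """No metadata to write, so skip metadata persistence.
        """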
pass
def writeConfig(self, butler, clobber=False, doBackup=True):
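        """No config to write, so skip config persistence.
        """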
pass
def runDataRef(self, dataRefList):
visit = dataRefList[0].dataId['visit']
self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
len(dataRefList), visit)
expCatalog = self._combineExposureMetadata(visit, dataRefList, isGen3=False)
dataRefList[0].put(expCatalog, 'visitSummary', visit=visit)
def runQuantum(self, butlerQC, inputRefs, outputRefs):
dataRefs = butlerQC.get(inputRefs.calexp)
visit = dataRefs[0].dataId.byName()['visit']
self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
len(dataRefs), visit)
expCatalog = self._combineExposureMetadata(visit, dataRefs)
butlerQC.put(expCatalog, outputRefs.visitSummary)
def _combineExposureMetadata(self, visit, dataRefs, isGen3=True):
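        """Make a combined exposure catalog from a list of dataRefs.

        Parameters
        ----------
        visit : `int`
            Visit identification number.
        dataRefs : `list`
            Per-detector dataRefs (Gen 3 deferred handles, or Gen 2
            butler dataRefs if ``isGen3`` is False).
        isGen3 : `bool`, optional
            Whether the dataRefs are Gen 3 handles.

        Returns
        -------
        visitSummary : `lsst.afw.table.ExposureCatalog`
            Catalog with one summary row per detector, sorted by
            detector id.
        """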
schema = self._makeVisitSummarySchema()
cat = afwTable.ExposureCatalog(schema)
cat.resize(len(dataRefs))
cat['visit'] = visit
for i, dataRef in enumerate(dataRefs):
if isGen3:
visitInfo = dataRef.get(component='visitInfo')
filterLabel = dataRef.get(component='filterLabel')
summaryStats = dataRef.get(component='summaryStats')
detector = dataRef.get(component='detector')
wcs = dataRef.get(component='wcs')
photoCalib = dataRef.get(component='photoCalib')
bbox = dataRef.get(component='bbox')
validPolygon = dataRef.get(component='validPolygon')
else:
# Note that we need to read the calexp because there is
# no magic access to the psf except through the exposure.
gen2_read_bbox = lsst.geom.BoxI(lsst.geom.PointI(0, 0), lsst.geom.PointI(1, 1))
exp = dataRef.get(datasetType='calexp_sub', bbox=gen2_read_bbox)
visitInfo = exp.getInfo().getVisitInfo()
filterLabel = dataRef.get("calexp_filterLabel")
summaryStats = exp.getInfo().getSummaryStats()
wcs = exp.getWcs()
photoCalib = exp.getPhotoCalib()
detector = exp.getDetector()
bbox = dataRef.get(datasetType='calexp_bbox')
validPolygon = exp.getInfo().getValidPolygon()
rec = cat[i]
rec.setBBox(bbox)
rec.setVisitInfo(visitInfo)
rec.setWcs(wcs)
rec.setPhotoCalib(photoCalib)
rec.setValidPolygon(validPolygon)
rec['physical_filter'] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else ""
rec['band'] = filterLabel.bandLabel if filterLabel.hasBandLabel() else ""
rec.setId(detector.getId())
rec['psfSigma'] = summaryStats.psfSigma
rec['psfIxx'] = summaryStats.psfIxx
rec['psfIyy'] = summaryStats.psfIyy
rec['psfIxy'] = summaryStats.psfIxy
rec['psfArea'] = summaryStats.psfArea
rec['raCorners'][:] = summaryStats.raCorners
rec['decCorners'][:] = summaryStats.decCorners
rec['ra'] = summaryStats.ra
rec['decl'] = summaryStats.decl
rec['zenithDistance'] = summaryStats.zenithDistance
rec['zeroPoint'] = summaryStats.zeroPoint
rec['skyBg'] = summaryStats.skyBg
rec['skyNoise'] = summaryStats.skyNoise
rec['meanVar'] = summaryStats.meanVar
rec['astromOffsetMean'] = summaryStats.astromOffsetMean
rec['astromOffsetStd'] = summaryStats.astromOffsetStd
rec['nPsfStar'] = summaryStats.nPsfStar
rec['psfStarDeltaE1Median'] = summaryStats.psfStarDeltaE1Median
rec['psfStarDeltaE2Median'] = summaryStats.psfStarDeltaE2Median
rec['psfStarDeltaE1Scatter'] = summaryStats.psfStarDeltaE1Scatter
rec['psfStarDeltaE2Scatter'] = summaryStats.psfStarDeltaE2Scatter
rec['psfStarDeltaSizeMedian'] = summaryStats.psfStarDeltaSizeMedian
rec['psfStarDeltaSizeScatter'] = summaryStats.psfStarDeltaSizeScatter
rec['psfStarScaledDeltaSizeScatter'] = summaryStats.psfStarScaledDeltaSizeScatter
metadata = dafBase.PropertyList()
metadata.add("COMMENT", "Catalog id is detector id, sorted.")
# We are looping over existing datarefs, so the following is true
metadata.add("COMMENT", "Only detectors with data have entries.")
cat.setMetadata(metadata)
cat.sort()
return cat
def _makeVisitSummarySchema(self):
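        """Make the schema for the visitSummary catalog.
        """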
schema = afwTable.ExposureTable.makeMinimalSchema()
schema.addField('visit', type='I', doc='Visit number')
schema.addField('physical_filter', type='String', size=32, doc='Physical filter')
schema.addField('band', type='String', size=32, doc='Name of band')
schema.addField('psfSigma', type='F',
doc='PSF model second-moments determinant radius (center of chip) (pixel)')
schema.addField('psfArea', type='F',
doc='PSF model effective area (center of chip) (pixel**2)')
schema.addField('psfIxx', type='F',
doc='PSF model Ixx (center of chip) (pixel**2)')
schema.addField('psfIyy', type='F',
doc='PSF model Iyy (center of chip) (pixel**2)')
schema.addField('psfIxy', type='F',
doc='PSF model Ixy (center of chip) (pixel**2)')
schema.addField('raCorners', type='ArrayD', size=4,
doc='Right Ascension of bounding box corners (degrees)')
schema.addField('decCorners', type='ArrayD', size=4,
doc='Declination of bounding box corners (degrees)')
schema.addField('ra', type='D',
doc='Right Ascension of bounding box center (degrees)')
schema.addField('decl', type='D',
doc='Declination of bounding box center (degrees)')
schema.addField('zenithDistance', type='F',
doc='Zenith distance of bounding box center (degrees)')
schema.addField('zeroPoint', type='F',
doc='Mean zeropoint in detector (mag)')
schema.addField('skyBg', type='F',
doc='Average sky background (ADU)')
schema.addField('skyNoise', type='F',
doc='Average sky noise (ADU)')
schema.addField('meanVar', type='F',
doc='Mean variance of the weight plane (ADU**2)')
schema.addField('astromOffsetMean', type='F',
doc='Mean offset of astrometric calibration matches (arcsec)')
schema.addField('astromOffsetStd', type='F',
doc='Standard deviation of offsets of astrometric calibration matches (arcsec)')
schema.addField('nPsfStar', type='I', doc='Number of stars used for PSF model')
schema.addField('psfStarDeltaE1Median', type='F',
doc='Median E1 residual (starE1 - psfE1) for psf stars')
schema.addField('psfStarDeltaE2Median', type='F',
doc='Median E2 residual (starE2 - psfE2) for psf stars')
schema.addField('psfStarDeltaE1Scatter', type='F',
doc='Scatter (via MAD) of E1 residual (starE1 - psfE1) for psf stars')
schema.addField('psfStarDeltaE2Scatter', type='F',
doc='Scatter (via MAD) of E2 residual (starE2 - psfE2) for psf stars')
schema.addField('psfStarDeltaSizeMedian', type='F',
doc='Median size residual (starSize - psfSize) for psf stars (pixel)')
schema.addField('psfStarDeltaSizeScatter', type='F',
doc='Scatter (via MAD) of size residual (starSize - psfSize) for psf stars (pixel)')
schema.addField('psfStarScaledDeltaSizeScatter', type='F',
doc='Scatter (via MAD) of size residual scaled by median size squared')
return schema
class VisitDataIdContainer(DataIdContainer):
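    """DataIdContainer that groups sensor-level ids by visit (Gen 2 only).
    """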