LSST Applications 27.0.0,g0265f82a02+469cd937ee,g02d81e74bb+21ad69e7e1,g1470d8bcf6+cbe83ee85a,g2079a07aa2+e67c6346a6,g212a7c68fe+04a9158687,g2305ad1205+94392ce272,g295015adf3+81dd352a9d,g2bbee38e9b+469cd937ee,g337abbeb29+469cd937ee,g3939d97d7f+72a9f7b576,g487adcacf7+71499e7cba,g50ff169b8f+5929b3527e,g52b1c1532d+a6fc98d2e7,g591dd9f2cf+df404f777f,g5a732f18d5+be83d3ecdb,g64a986408d+21ad69e7e1,g858d7b2824+21ad69e7e1,g8a8a8dda67+a6fc98d2e7,g99cad8db69+f62e5b0af5,g9ddcbc5298+d4bad12328,ga1e77700b3+9c366c4306,ga8c6da7877+71e4819109,gb0e22166c9+25ba2f69a1,gb6a65358fc+469cd937ee,gbb8dafda3b+69d3c0e320,gc07e1c2157+a98bf949bb,gc120e1dc64+615ec43309,gc28159a63d+469cd937ee,gcf0d15dbbd+72a9f7b576,gdaeeff99f8+a38ce5ea23,ge6526c86ff+3a7c1ac5f1,ge79ae78c31+469cd937ee,gee10cc3b42+a6fc98d2e7,gf1cff7945b+21ad69e7e1,gfbcc870c63+9a11dc8c8f
LSST Data Management Base Package
|
Classes | |
class | PostprocessAnalysis |
class | TransformCatalogBaseConfig |
class | TransformCatalogBaseConnections |
class | TransformCatalogBaseTask |
class | TransformObjectCatalogConnections |
class | WriteObjectTableConnections |
Functions | |
flattenFilters (df, noDupCols=['coord_ra', 'coord_dec'], camelCase=False, inputBands=None) | |
Variables | |
log = logging.getLogger(__name__) | |
catalogs : `dict` | |
int | tract |
str | patch |
catalog : `pandas.DataFrame` | |
ccdVisitId : `int` | |
result : `~lsst.pipe.base.Struct` | |
exposure : `lsst.afw.image.exposure.Exposure` | |
detectorId : `int` | |
visitSummary : `lsst.afw.table.ExposureCatalog`, optional | |
names | |
pluginsNotToCopy = tuple(measureConfig.plugins.names) | |
aliasMap = catalog.schema.getAliasMap() | |
mapper = afwTable.SchemaMapper(catalog.schema) | |
schema = mapper.getOutputSchema() | |
measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema) | |
newCat = afwTable.SourceCatalog(schema) | |
wcsPlugin = measurement.plugins["base_LocalWcs"] | |
pcPlugin = measurement.plugins["base_LocalPhotoCalib"] | |
visit : `int` | |
dataRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle` | |
visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle` | |
visitSummaries : `list` of `lsst.afw.table.ExposureCatalog` | |
lsst.pipe.tasks.postprocess.flattenFilters | ( | df, | |
noDupCols = ['coord_ra', 'coord_dec'], | |||
camelCase = False, | |||
inputBands = None ) |
Flattens a dataframe with multilevel column index.
Definition at line 59 of file postprocess.py.
lsst.pipe.tasks.postprocess.aliasMap = catalog.schema.getAliasMap() |
Definition at line 405 of file postprocess.py.
lsst.pipe.tasks.postprocess.catalog : `pandas.DataFrame` |
dfs = [] for filt, tableDict in catalogs.items(): for dataset, table in tableDict.items(): # Convert afwTable to pandas DataFrame df = table.asAstropy().to_pandas().set_index('id', drop=True) # Sort columns by name, to ensure matching schema among patches df = df.reindex(sorted(df.columns), axis=1) df = df.assign(tractId=tract, patchId=patch) # Make columns a 3-level MultiIndex df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns], names=('dataset', 'band', 'column')) dfs.append(df) # We do this dance and not `pd.concat(dfs)` because the pandas # concatenation uses infinite memory. catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs) return catalog class WriteSourceTableConnections(pipeBase.PipelineTaskConnections, defaultTemplates={"catalogType": ""}, dimensions=("instrument", "visit", "detector")): catalog = connectionTypes.Input( doc="Input full-depth catalog of sources produced by CalibrateTask", name="{catalogType}src", storageClass="SourceCatalog", dimensions=("instrument", "visit", "detector") ) outputCatalog = connectionTypes.Output( doc="Catalog of sources, `src` in DataFrame/Parquet format. The 'id' column is " "replaced with an index; all other columns are unchanged.", name="{catalogType}source", storageClass="DataFrame", dimensions=("instrument", "visit", "detector") ) class WriteSourceTableConfig(pipeBase.PipelineTaskConfig, pipelineConnections=WriteSourceTableConnections): idGenerator = DetectorVisitIdGeneratorConfig.make_field() class WriteSourceTableTask(pipeBase.PipelineTask):
_DefaultName = "writeSourceTable" ConfigClass = WriteSourceTableConfig def runQuantum(self, butlerQC, inputRefs, outputRefs): inputs = butlerQC.get(inputRefs) inputs['ccdVisitId'] = self.config.idGenerator.apply(butlerQC.quantum.dataId).catalog_id result = self.run(**inputs) outputs = pipeBase.Struct(outputCatalog=result.table) butlerQC.put(outputs, outputRefs) def run(self, catalog, ccdVisitId=None, **kwargs):
Definition at line 173 of file postprocess.py.
lsst.pipe.tasks.postprocess.catalogs : `dict` |
_DefaultName = "writeObjectTable" ConfigClass = WriteObjectTableConfig # Names of table datasets to be merged inputDatasets = ('forced_src', 'meas', 'ref') # Tag of output dataset written by `MergeSourcesTask.write` outputDataset = 'obj' def runQuantum(self, butlerQC, inputRefs, outputRefs): inputs = butlerQC.get(inputRefs) measDict = {ref.dataId['band']: {'meas': cat} for ref, cat in zip(inputRefs.inputCatalogMeas, inputs['inputCatalogMeas'])} forcedSourceDict = {ref.dataId['band']: {'forced_src': cat} for ref, cat in zip(inputRefs.inputCatalogForcedSrc, inputs['inputCatalogForcedSrc'])} catalogs = {} for band in measDict.keys(): catalogs[band] = {'meas': measDict[band]['meas'], 'forced_src': forcedSourceDict[band]['forced_src'], 'ref': inputs['inputCatalogRef']} dataId = butlerQC.quantum.dataId df = self.run(catalogs=catalogs, tract=dataId['tract'], patch=dataId['patch']) outputs = pipeBase.Struct(outputCatalog=df) butlerQC.put(outputs, outputRefs) def run(self, catalogs, tract, patch):
Definition at line 164 of file postprocess.py.
lsst.pipe.tasks.postprocess.ccdVisitId : `int` |
Definition at line 241 of file postprocess.py.
lsst.pipe.tasks.postprocess.dataRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle` |
Definition at line 1100 of file postprocess.py.
lsst.pipe.tasks.postprocess.detectorId : `int` |
Definition at line 332 of file postprocess.py.
lsst.pipe.tasks.postprocess.exposure : `lsst.afw.image.exposure.Exposure` |
self.log.info("Generating DataFrame from src catalog ccdVisitId=%s", ccdVisitId) df = catalog.asAstropy().to_pandas().set_index('id', drop=True) df['ccdVisitId'] = ccdVisitId return pipeBase.Struct(table=df) class WriteRecalibratedSourceTableConnections(WriteSourceTableConnections, defaultTemplates={"catalogType": ""}, dimensions=("instrument", "visit", "detector", "skymap")): exposure = connectionTypes.Input( doc="Input exposure to perform photometry on.", name="calexp", storageClass="ExposureF", dimensions=["instrument", "visit", "detector"], # TODO: remove on DM-39584 deprecated=( "Deprecated, as the `calexp` is not needed and just creates unnecessary i/o. " "Will be removed after v26." ), ) visitSummary = connectionTypes.Input( doc="Input visit-summary catalog with updated calibration objects.", name="finalVisitSummary", storageClass="ExposureCatalog", dimensions=("instrument", "visit",), ) class WriteRecalibratedSourceTableConfig(WriteSourceTableConfig, pipelineConnections=WriteRecalibratedSourceTableConnections): doReevaluatePhotoCalib = pexConfig.Field( dtype=bool, default=True, doc=("Add or replace local photoCalib columns"), ) doReevaluateSkyWcs = pexConfig.Field( dtype=bool, default=True, doc=("Add or replace local WCS columns and update the coord columns, coord_ra and coord_dec"), ) idGenerator = DetectorVisitIdGeneratorConfig.make_field() class WriteRecalibratedSourceTableTask(WriteSourceTableTask):
_DefaultName = "writeRecalibratedSourceTable" ConfigClass = WriteRecalibratedSourceTableConfig def runQuantum(self, butlerQC, inputRefs, outputRefs): inputs = butlerQC.get(inputRefs) idGenerator = self.config.idGenerator.apply(butlerQC.quantum.dataId) inputs['idGenerator'] = idGenerator inputs['ccdVisitId'] = idGenerator.catalog_id if self.config.doReevaluatePhotoCalib or self.config.doReevaluateSkyWcs: inputs['exposure'] = self.prepareCalibratedExposure( exposure=inputs["exposure"], visitSummary=inputs["visitSummary"], detectorId=butlerQC.quantum.dataId["detector"] ) inputs['catalog'] = self.addCalibColumns(**inputs) result = self.run(**inputs) outputs = pipeBase.Struct(outputCatalog=result.table) butlerQC.put(outputs, outputRefs) def prepareCalibratedExposure(self, exposure, detectorId, visitSummary=None):
Definition at line 330 of file postprocess.py.
lsst.pipe.tasks.postprocess.log = logging.getLogger(__name__) |
Definition at line 56 of file postprocess.py.
lsst.pipe.tasks.postprocess.mapper = afwTable.SchemaMapper(catalog.schema) |
Definition at line 406 of file postprocess.py.
lsst.pipe.tasks.postprocess.measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema) |
Definition at line 412 of file postprocess.py.
lsst.pipe.tasks.postprocess.names |
if visitSummary is not None: row = visitSummary.find(detectorId) if row is None: raise RuntimeError(f"Visit summary for detector {detectorId} is unexpectedly missing.") if (photoCalib := row.getPhotoCalib()) is None: self.log.warning("Detector id %s has None for photoCalib in visit summary; " "skipping reevaluation of photoCalib.", detectorId) exposure.setPhotoCalib(None) else: exposure.setPhotoCalib(photoCalib) if (skyWcs := row.getWcs()) is None: self.log.warning("Detector id %s has None for skyWcs in visit summary; " "skipping reevaluation of skyWcs.", detectorId) exposure.setWcs(None) else: exposure.setWcs(skyWcs) return exposure def addCalibColumns(self, catalog, exposure, idGenerator, **kwargs):
Definition at line 394 of file postprocess.py.
lsst.pipe.tasks.postprocess.newCat = afwTable.SourceCatalog(schema) |
Definition at line 414 of file postprocess.py.
str lsst.pipe.tasks.postprocess.patch |
Definition at line 168 of file postprocess.py.
lsst.pipe.tasks.postprocess.pcPlugin = measurement.plugins["base_LocalPhotoCalib"] |
Definition at line 428 of file postprocess.py.
lsst.pipe.tasks.postprocess.pluginsNotToCopy = tuple(measureConfig.plugins.names) |
Definition at line 401 of file postprocess.py.
lsst.pipe.tasks.postprocess.result : `~lsst.pipe.base.Struct` |
Definition at line 250 of file postprocess.py.
lsst.pipe.tasks.postprocess.schema = mapper.getOutputSchema() |
Definition at line 411 of file postprocess.py.
int lsst.pipe.tasks.postprocess.tract |
Definition at line 166 of file postprocess.py.
lsst.pipe.tasks.postprocess.visit : `int` |
_DefaultName = "transformObjectCatalog" ConfigClass = TransformObjectCatalogConfig def run(self, handle, funcs=None, dataId=None, band=None): # NOTE: band kwarg is ignored here. dfDict = {} analysisDict = {} templateDf = pd.DataFrame() columns = handle.get(component='columns') inputBands = columns.unique(level=1).values outputBands = self.config.outputBands if self.config.outputBands else inputBands # Perform transform for data of filters that exist in the handle dataframe. for inputBand in inputBands: if inputBand not in outputBands: self.log.info("Ignoring %s band data in the input", inputBand) continue self.log.info("Transforming the catalog of band %s", inputBand) result = self.transform(inputBand, handle, funcs, dataId) dfDict[inputBand] = result.df analysisDict[inputBand] = result.analysis if templateDf.empty: templateDf = result.df # Put filler values in columns of other wanted bands for filt in outputBands: if filt not in dfDict: self.log.info("Adding empty columns for band %s", filt) dfTemp = templateDf.copy() for col in dfTemp.columns: testValue = dfTemp[col].values[0] if isinstance(testValue, (np.bool_, pd.BooleanDtype)): # Boolean flag type, check if it is a "good" flag if col in self.config.goodFlags: fillValue = False else: fillValue = True elif isinstance(testValue, numbers.Integral): # Checking numbers.Integral catches all flavors # of python, numpy, pandas, etc. integers. # We must ensure this is not an unsigned integer. if isinstance(testValue, np.unsignedinteger): raise ValueError("Parquet tables may not have unsigned integer columns.") else: fillValue = self.config.integerFillValue else: fillValue = self.config.floatFillValue dfTemp[col].values[:] = fillValue dfDict[filt] = dfTemp # This makes a multilevel column index, with band as first level df = pd.concat(dfDict, axis=1, names=['band', 'column']) if not self.config.multilevelOutput: noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()])) if self.config.primaryKey in noDupCols: noDupCols.remove(self.config.primaryKey) if dataId and self.config.columnsFromDataId: noDupCols += self.config.columnsFromDataId df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase, inputBands=inputBands) self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df)) return df class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections, dimensions=("tract", "skymap")): inputCatalogs = connectionTypes.Input( doc="Per-Patch objectTables conforming to the standard data model.", name="objectTable", storageClass="DataFrame", dimensions=("tract", "patch", "skymap"), multiple=True, ) outputCatalog = connectionTypes.Output( doc="Pre-tract horizontal concatenation of the input objectTables", name="objectTable_tract", storageClass="DataFrame", dimensions=("tract", "skymap"), ) class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig, pipelineConnections=ConsolidateObjectTableConnections): coaddName = pexConfig.Field( dtype=str, default="deep", doc="Name of coadd" ) class ConsolidateObjectTableTask(pipeBase.PipelineTask):
_DefaultName = "consolidateObjectTable" ConfigClass = ConsolidateObjectTableConfig inputDataset = 'objectTable' outputDataset = 'objectTable_tract' def runQuantum(self, butlerQC, inputRefs, outputRefs): inputs = butlerQC.get(inputRefs) self.log.info("Concatenating %s per-patch Object Tables", len(inputs['inputCatalogs'])) df = pd.concat(inputs['inputCatalogs']) butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs) class TransformSourceTableConnections(pipeBase.PipelineTaskConnections, defaultTemplates={"catalogType": ""}, dimensions=("instrument", "visit", "detector")): inputCatalog = connectionTypes.Input( doc="Wide input catalog of sources produced by WriteSourceTableTask", name="{catalogType}source", storageClass="DataFrame", dimensions=("instrument", "visit", "detector"), deferLoad=True ) outputCatalog = connectionTypes.Output( doc="Narrower, per-detector Source Table transformed and converted per a " "specified set of functors", name="{catalogType}sourceTable", storageClass="DataFrame", dimensions=("instrument", "visit", "detector") ) class TransformSourceTableConfig(TransformCatalogBaseConfig, pipelineConnections=TransformSourceTableConnections): def setDefaults(self): super().setDefaults() self.functorFile = os.path.join('$PIPE_TASKS_DIR', 'schemas', 'Source.yaml') self.primaryKey = 'sourceId' self.columnsFromDataId = ['visit', 'detector', 'band', 'physical_filter'] class TransformSourceTableTask(TransformCatalogBaseTask):
_DefaultName = "transformSourceTable" ConfigClass = TransformSourceTableConfig class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections, dimensions=("instrument", "visit",), defaultTemplates={"calexpType": ""}): calexp = connectionTypes.Input( doc="Processed exposures used for metadata", name="calexp", storageClass="ExposureF", dimensions=("instrument", "visit", "detector"), deferLoad=True, multiple=True, ) visitSummary = connectionTypes.Output( doc=("Per-visit consolidated exposure metadata. These catalogs use " "detector id for the id and are sorted for fast lookups of a " "detector."), name="visitSummary", storageClass="ExposureCatalog", dimensions=("instrument", "visit"), ) visitSummarySchema = connectionTypes.InitOutput( doc="Schema of the visitSummary catalog", name="visitSummary_schema", storageClass="ExposureCatalog", ) class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig, pipelineConnections=ConsolidateVisitSummaryConnections):
pass class ConsolidateVisitSummaryTask(pipeBase.PipelineTask):
_DefaultName = "consolidateVisitSummary" ConfigClass = ConsolidateVisitSummaryConfig def __init__(self, **kwargs): super().__init__(**kwargs) self.schema = afwTable.ExposureTable.makeMinimalSchema() self.schema.addField('visit', type='L', doc='Visit number') self.schema.addField('physical_filter', type='String', size=32, doc='Physical filter') self.schema.addField('band', type='String', size=32, doc='Name of band') ExposureSummaryStats.update_schema(self.schema) self.visitSummarySchema = afwTable.ExposureCatalog(self.schema) def runQuantum(self, butlerQC, inputRefs, outputRefs): dataRefs = butlerQC.get(inputRefs.calexp) visit = dataRefs[0].dataId['visit'] self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)", len(dataRefs), visit) expCatalog = self._combineExposureMetadata(visit, dataRefs) butlerQC.put(expCatalog, outputRefs.visitSummary) def _combineExposureMetadata(self, visit, dataRefs):
Definition at line 1098 of file postprocess.py.
lsst.pipe.tasks.postprocess.visitSummaries : `list` of `lsst.afw.table.ExposureCatalog` |
ccdEntries = [] for visitSummaryRef in visitSummaryRefs: visitSummary = visitSummaryRef.get() visitInfo = visitSummary[0].getVisitInfo() ccdEntry = {} summaryTable = visitSummary.asAstropy() selectColumns = ['id', 'visit', 'physical_filter', 'band', 'ra', 'dec', 'zenithDistance', 'zeroPoint', 'psfSigma', 'skyBg', 'skyNoise', 'astromOffsetMean', 'astromOffsetStd', 'nPsfStar', 'psfStarDeltaE1Median', 'psfStarDeltaE2Median', 'psfStarDeltaE1Scatter', 'psfStarDeltaE2Scatter', 'psfStarDeltaSizeMedian', 'psfStarDeltaSizeScatter', 'psfStarScaledDeltaSizeScatter', 'psfTraceRadiusDelta', 'maxDistToNearestPsf', 'effTime', 'effTimePsfSigmaScale', 'effTimeSkyBgScale', 'effTimeZeroPointScale'] ccdEntry = summaryTable[selectColumns].to_pandas().set_index('id') # 'visit' is the human readable visit number. # 'visitId' is the key to the visitId table. They are the same. # Technically you should join to get the visit from the visit # table. ccdEntry = ccdEntry.rename(columns={"visit": "visitId"}) # RFC-924: Temporarily keep a duplicate "decl" entry for backwards # compatibility. To be removed after September 2023. ccdEntry["decl"] = ccdEntry.loc[:, "dec"] ccdEntry['ccdVisitId'] = [ self.config.idGenerator.apply( visitSummaryRef.dataId, detector=detector_id, is_exposure=False, ).catalog_id # The "catalog ID" here is the ccdVisit ID # because it's usually the ID for a whole catalog # with a {visit, detector}, and that's the main # use case for IdGenerator. This usage for a # summary table is rare. for detector_id in summaryTable['id'] ] ccdEntry['detector'] = summaryTable['id'] pixToArcseconds = np.array([vR.getWcs().getPixelScale().asArcseconds() if vR.getWcs() else np.nan for vR in visitSummary]) ccdEntry["seeing"] = visitSummary['psfSigma'] * np.sqrt(8 * np.log(2)) * pixToArcseconds ccdEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees() ccdEntry["expMidpt"] = visitInfo.getDate().toPython() ccdEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD) expTime = visitInfo.getExposureTime() ccdEntry['expTime'] = expTime ccdEntry["obsStart"] = ccdEntry["expMidpt"] - 0.5 * pd.Timedelta(seconds=expTime) expTime_days = expTime / (60*60*24) ccdEntry["obsStartMJD"] = ccdEntry["expMidptMJD"] - 0.5 * expTime_days ccdEntry['darkTime'] = visitInfo.getDarkTime() ccdEntry['xSize'] = summaryTable['bbox_max_x'] - summaryTable['bbox_min_x'] ccdEntry['ySize'] = summaryTable['bbox_max_y'] - summaryTable['bbox_min_y'] ccdEntry['llcra'] = summaryTable['raCorners'][:, 0] ccdEntry['llcdec'] = summaryTable['decCorners'][:, 0] ccdEntry['ulcra'] = summaryTable['raCorners'][:, 1] ccdEntry['ulcdec'] = summaryTable['decCorners'][:, 1] ccdEntry['urcra'] = summaryTable['raCorners'][:, 2] ccdEntry['urcdec'] = summaryTable['decCorners'][:, 2] ccdEntry['lrcra'] = summaryTable['raCorners'][:, 3] ccdEntry['lrcdec'] = summaryTable['decCorners'][:, 3] # TODO: DM-30618, Add raftName, nExposures, ccdTemp, binX, binY, # and flags, and decide if WCS, and llcx, llcy, ulcx, ulcy, etc. # values are actually wanted. ccdEntries.append(ccdEntry) outputCatalog = pd.concat(ccdEntries) outputCatalog.set_index('ccdVisitId', inplace=True, verify_integrity=True) return pipeBase.Struct(outputCatalog=outputCatalog) class MakeVisitTableConnections(pipeBase.PipelineTaskConnections, dimensions=("instrument",), defaultTemplates={"calexpType": ""}): visitSummaries = connectionTypes.Input( doc="Per-visit consolidated exposure metadata", name="finalVisitSummary", storageClass="ExposureCatalog", dimensions=("instrument", "visit",), multiple=True, deferLoad=True, ) outputCatalog = connectionTypes.Output( doc="Visit metadata table", name="visitTable", storageClass="DataFrame", dimensions=("instrument",) ) class MakeVisitTableConfig(pipeBase.PipelineTaskConfig, pipelineConnections=MakeVisitTableConnections): pass class MakeVisitTableTask(pipeBase.PipelineTask):
_DefaultName = 'makeVisitTable' ConfigClass = MakeVisitTableConfig def run(self, visitSummaries):
Definition at line 1347 of file postprocess.py.
lsst.pipe.tasks.postprocess.visitSummary : `lsst.afw.table.ExposureCatalog`, optional |
Definition at line 334 of file postprocess.py.
lsst.pipe.tasks.postprocess.visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle` |
cat = afwTable.ExposureCatalog(self.schema) cat.resize(len(dataRefs)) cat['visit'] = visit for i, dataRef in enumerate(dataRefs): visitInfo = dataRef.get(component='visitInfo') filterLabel = dataRef.get(component='filter') summaryStats = dataRef.get(component='summaryStats') detector = dataRef.get(component='detector') wcs = dataRef.get(component='wcs') photoCalib = dataRef.get(component='photoCalib') detector = dataRef.get(component='detector') bbox = dataRef.get(component='bbox') validPolygon = dataRef.get(component='validPolygon') rec = cat[i] rec.setBBox(bbox) rec.setVisitInfo(visitInfo) rec.setWcs(wcs) rec.setPhotoCalib(photoCalib) rec.setValidPolygon(validPolygon) rec['physical_filter'] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else "" rec['band'] = filterLabel.bandLabel if filterLabel.hasBandLabel() else "" rec.setId(detector.getId()) summaryStats.update_record(rec) metadata = dafBase.PropertyList() metadata.add("COMMENT", "Catalog id is detector id, sorted.") # We are looping over existing datarefs, so the following is true metadata.add("COMMENT", "Only detectors with data have entries.") cat.setMetadata(metadata) cat.sort() return cat class ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections, defaultTemplates={"catalogType": ""}, dimensions=("instrument", "visit")): inputCatalogs = connectionTypes.Input( doc="Input per-detector Source Tables", name="{catalogType}sourceTable", storageClass="DataFrame", dimensions=("instrument", "visit", "detector"), multiple=True ) outputCatalog = connectionTypes.Output( doc="Per-visit concatenation of Source Table", name="{catalogType}sourceTable_visit", storageClass="DataFrame", dimensions=("instrument", "visit") ) class ConsolidateSourceTableConfig(pipeBase.PipelineTaskConfig, pipelineConnections=ConsolidateSourceTableConnections): pass class ConsolidateSourceTableTask(pipeBase.PipelineTask):
_DefaultName = 'consolidateSourceTable' ConfigClass = ConsolidateSourceTableConfig inputDataset = 'sourceTable' outputDataset = 'sourceTable_visit' def runQuantum(self, butlerQC, inputRefs, outputRefs): from .makeWarp import reorderRefs detectorOrder = [ref.dataId['detector'] for ref in inputRefs.inputCatalogs] detectorOrder.sort() inputRefs = reorderRefs(inputRefs, detectorOrder, dataIdKey='detector') inputs = butlerQC.get(inputRefs) self.log.info("Concatenating %s per-detector Source Tables", len(inputs['inputCatalogs'])) df = pd.concat(inputs['inputCatalogs']) butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs) class MakeCcdVisitTableConnections(pipeBase.PipelineTaskConnections, dimensions=("instrument",), defaultTemplates={"calexpType": ""}): visitSummaryRefs = connectionTypes.Input( doc="Data references for per-visit consolidated exposure metadata", name="finalVisitSummary", storageClass="ExposureCatalog", dimensions=("instrument", "visit"), multiple=True, deferLoad=True, ) outputCatalog = connectionTypes.Output( doc="CCD and Visit metadata table", name="ccdVisitTable", storageClass="DataFrame", dimensions=("instrument",) ) class MakeCcdVisitTableConfig(pipeBase.PipelineTaskConfig, pipelineConnections=MakeCcdVisitTableConnections): idGenerator = DetectorVisitIdGeneratorConfig.make_field() class MakeCcdVisitTableTask(pipeBase.PipelineTask):
_DefaultName = 'makeCcdVisitTable' ConfigClass = MakeCcdVisitTableConfig def run(self, visitSummaryRefs):
Definition at line 1226 of file postprocess.py.
lsst.pipe.tasks.postprocess.wcsPlugin = measurement.plugins["base_LocalWcs"] |
Definition at line 423 of file postprocess.py.