LSST Applications g0f08755f38+9c285cab97,g1635faa6d4+13f3999e92,g1653933729+a8ce1bb630,g1a0ca8cf93+bf6eb00ceb,g28da252d5a+0829b12dee,g29321ee8c0+5700dc9eac,g2bbee38e9b+9634bc57db,g2bc492864f+9634bc57db,g2cdde0e794+c2c89b37c4,g3156d2b45e+41e33cbcdc,g347aa1857d+9634bc57db,g35bb328faa+a8ce1bb630,g3a166c0a6a+9634bc57db,g3e281a1b8c+9f2c4e2fc3,g414038480c+077ccc18e7,g41af890bb2+fde0dd39b6,g5fbc88fb19+17cd334064,g781aacb6e4+a8ce1bb630,g80478fca09+55a9465950,g82479be7b0+d730eedb7d,g858d7b2824+9c285cab97,g9125e01d80+a8ce1bb630,g9726552aa6+10f999ec6a,ga5288a1d22+2a84bb7594,gacf8899fa4+c69c5206e8,gae0086650b+a8ce1bb630,gb58c049af0+d64f4d3760,gc28159a63d+9634bc57db,gcf0d15dbbd+4b7d09cae4,gda3e153d99+9c285cab97,gda6a2b7d83+4b7d09cae4,gdaeeff99f8+1711a396fd,ge2409df99d+5e831397f4,ge79ae78c31+9634bc57db,gf0baf85859+147a0692ba,gf3967379c6+41c94011de,gf3fb38a9a8+8f07a9901b,gfb92a5be7c+9c285cab97,w.2024.46
LSST Data Management Base Package
|
Classes | |
class | for |
class | TransformObjectCatalogConnections |
class | WriteObjectTableConnections |
Functions | |
flattenFilters (df, noDupCols=["coord_ra", "coord_dec"], camelCase=False, inputBands=None) | |
Variables | |
log = logging.getLogger(__name__) | |
catalogs : `dict` | |
int | tract |
str | patch |
catalog : `pandas.DataFrame` | |
result : `~lsst.pipe.base.Struct` | |
exposure : `lsst.afw.image.exposure.Exposure` | |
detectorId : `int` | |
visitSummary : `lsst.afw.table.ExposureCatalog`, optional | |
newCat : `lsst.afw.table.SourceCatalog` | |
handles : `~lsst.daf.butler.DeferredDatasetHandle` or | |
functors : `list`, `dict`, or `~lsst.pipe.tasks.functors.CompositeFunctor` | |
filt : `str`, optional | |
flags : `list`, optional | |
refFlags : `list`, optional | |
forcedFlags : `list`, optional | |
funcs : `~lsst.pipe.tasks.functors.Functor` | |
inplace | |
True | |
drop | |
primaryKey | |
df | |
analysis | |
visit : `int` | |
dataRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle` | |
visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle` | |
visitSummaries : `list` of `lsst.afw.table.ExposureCatalog` | |
lsst.pipe.tasks.postprocess.flattenFilters | ( | df, | |
noDupCols = ["coord_ra", "coord_dec"], | |||
camelCase = False, | |||
inputBands = None ) |
Flattens a dataframe with multilevel column index.
Definition at line 59 of file postprocess.py.
lsst.pipe.tasks.postprocess.analysis |
Definition at line 751 of file postprocess.py.
lsst.pipe.tasks.postprocess.catalog : `pandas.DataFrame` |
dfs = [] for filt, tableDict in catalogs.items(): for dataset, table in tableDict.items(): # Convert afwTable to pandas DataFrame df = table.asAstropy().to_pandas().set_index("id", drop=True) # Sort columns by name, to ensure matching schema among patches df = df.reindex(sorted(df.columns), axis=1) df = df.assign(tractId=tract, patchId=patch) # Make columns a 3-level MultiIndex df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns], names=("dataset", "band", "column")) dfs.append(df) # We do this dance and not `pd.concat(dfs)` because the pandas # concatenation uses infinite memory. catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs) return catalog class WriteSourceTableConnections(pipeBase.PipelineTaskConnections, defaultTemplates={"catalogType": ""}, dimensions=("instrument", "visit", "detector")): catalog = connectionTypes.Input( doc="Input full-depth catalog of sources produced by CalibrateTask", name="{catalogType}src", storageClass="SourceCatalog", dimensions=("instrument", "visit", "detector") ) outputCatalog = connectionTypes.Output( doc="Catalog of sources, `src` in DataFrame/Parquet format. The 'id' column is " "replaced with an index; all other columns are unchanged.", name="{catalogType}source", storageClass="DataFrame", dimensions=("instrument", "visit", "detector") ) class WriteSourceTableConfig(pipeBase.PipelineTaskConfig, pipelineConnections=WriteSourceTableConnections): pass class WriteSourceTableTask(pipeBase.PipelineTask):
_DefaultName = "writeSourceTable" ConfigClass = WriteSourceTableConfig def runQuantum(self, butlerQC, inputRefs, outputRefs): inputs = butlerQC.get(inputRefs) inputs["visit"] = butlerQC.quantum.dataId["visit"] inputs["detector"] = butlerQC.quantum.dataId["detector"] result = self.run(**inputs) outputs = pipeBase.Struct(outputCatalog=result.table) butlerQC.put(outputs, outputRefs) def run(self, catalog, visit, detector, **kwargs):
if visitSummary is not None: row = visitSummary.find(detectorId) if row is None: raise RuntimeError(f"Visit summary for detector {detectorId} is unexpectedly missing.") if (photoCalib := row.getPhotoCalib()) is None: self.log.warning("Detector id %s has None for photoCalib in visit summary; " "skipping reevaluation of photoCalib.", detectorId) exposure.setPhotoCalib(None) else: exposure.setPhotoCalib(photoCalib) if (skyWcs := row.getWcs()) is None: self.log.warning("Detector id %s has None for skyWcs in visit summary; " "skipping reevaluation of skyWcs.", detectorId) exposure.setWcs(None) else: exposure.setWcs(skyWcs) return exposure def addCalibColumns(self, catalog, exposure, **kwargs):
Definition at line 167 of file postprocess.py.
lsst.pipe.tasks.postprocess.catalogs : `dict` |
_DefaultName = "writeObjectTable" ConfigClass = WriteObjectTableConfig # Names of table datasets to be merged inputDatasets = ("forced_src", "meas", "ref") # Tag of output dataset written by `MergeSourcesTask.write` outputDataset = "obj" def runQuantum(self, butlerQC, inputRefs, outputRefs): inputs = butlerQC.get(inputRefs) measDict = {ref.dataId["band"]: {"meas": cat} for ref, cat in zip(inputRefs.inputCatalogMeas, inputs["inputCatalogMeas"])} forcedSourceDict = {ref.dataId["band"]: {"forced_src": cat} for ref, cat in zip(inputRefs.inputCatalogForcedSrc, inputs["inputCatalogForcedSrc"])} catalogs = {} for band in measDict.keys(): catalogs[band] = {"meas": measDict[band]["meas"], "forced_src": forcedSourceDict[band]["forced_src"], "ref": inputs["inputCatalogRef"]} dataId = butlerQC.quantum.dataId df = self.run(catalogs=catalogs, tract=dataId["tract"], patch=dataId["patch"]) outputs = pipeBase.Struct(outputCatalog=df) butlerQC.put(outputs, outputRefs) def run(self, catalogs, tract, patch):
Definition at line 158 of file postprocess.py.
lsst.pipe.tasks.postprocess.dataRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle` |
Definition at line 1076 of file postprocess.py.
lsst.pipe.tasks.postprocess.detectorId : `int` |
Definition at line 317 of file postprocess.py.
lsst.pipe.tasks.postprocess.df |
Definition at line 750 of file postprocess.py.
lsst.pipe.tasks.postprocess.drop |
Definition at line 746 of file postprocess.py.
lsst.pipe.tasks.postprocess.exposure : `lsst.afw.image.exposure.Exposure` |
self.log.info("Generating DataFrame from src catalog visit,detector=%i,%i", visit, detector) df = catalog.asAstropy().to_pandas().set_index("id", drop=True) df["visit"] = visit # int16 instead of uint8 because databases don't like unsigned bytes. df["detector"] = np.int16(detector) return pipeBase.Struct(table=df) class WriteRecalibratedSourceTableConnections(WriteSourceTableConnections, defaultTemplates={"catalogType": ""}, dimensions=("instrument", "visit", "detector", "skymap")): visitSummary = connectionTypes.Input( doc="Input visit-summary catalog with updated calibration objects.", name="finalVisitSummary", storageClass="ExposureCatalog", dimensions=("instrument", "visit",), ) class WriteRecalibratedSourceTableConfig(WriteSourceTableConfig, pipelineConnections=WriteRecalibratedSourceTableConnections): doReevaluatePhotoCalib = pexConfig.Field( dtype=bool, default=True, doc=("Add or replace local photoCalib columns"), ) doReevaluateSkyWcs = pexConfig.Field( dtype=bool, default=True, doc=("Add or replace local WCS columns and update the coord columns, coord_ra and coord_dec"), ) class WriteRecalibratedSourceTableTask(WriteSourceTableTask):
_DefaultName = "writeRecalibratedSourceTable" ConfigClass = WriteRecalibratedSourceTableConfig def runQuantum(self, butlerQC, inputRefs, outputRefs): inputs = butlerQC.get(inputRefs) inputs["visit"] = butlerQC.quantum.dataId["visit"] inputs["detector"] = butlerQC.quantum.dataId["detector"] if self.config.doReevaluatePhotoCalib or self.config.doReevaluateSkyWcs: exposure = ExposureF() inputs["exposure"] = self.prepareCalibratedExposure( exposure=exposure, visitSummary=inputs["visitSummary"], detectorId=butlerQC.quantum.dataId["detector"] ) inputs["catalog"] = self.addCalibColumns(**inputs) result = self.run(**inputs) outputs = pipeBase.Struct(outputCatalog=result.table) butlerQC.put(outputs, outputRefs) def prepareCalibratedExposure(self, exposure, detectorId, visitSummary=None):
Definition at line 315 of file postprocess.py.
lsst.pipe.tasks.postprocess.filt : `str`, optional |
Definition at line 467 of file postprocess.py.
lsst.pipe.tasks.postprocess.flags : `list`, optional |
Definition at line 472 of file postprocess.py.
lsst.pipe.tasks.postprocess.forcedFlags : `list`, optional |
Definition at line 479 of file postprocess.py.
lsst.pipe.tasks.postprocess.funcs : `~lsst.pipe.tasks.functors.Functor` |
Definition at line 704 of file postprocess.py.
lsst.pipe.tasks.postprocess.functors : `list`, `dict`, or `~lsst.pipe.tasks.functors.CompositeFunctor` |
Definition at line 460 of file postprocess.py.
lsst.pipe.tasks.postprocess.handles : `~lsst.daf.butler.DeferredDatasetHandle` or |
measureConfig = SingleFrameMeasurementTask.ConfigClass() measureConfig.doReplaceWithNoise = False # Clear all slots, because we aren't running the relevant plugins. for slot in measureConfig.slots: setattr(measureConfig.slots, slot, None) measureConfig.plugins.names = [] if self.config.doReevaluateSkyWcs: measureConfig.plugins.names.add("base_LocalWcs") self.log.info("Re-evaluating base_LocalWcs plugin") if self.config.doReevaluatePhotoCalib: measureConfig.plugins.names.add("base_LocalPhotoCalib") self.log.info("Re-evaluating base_LocalPhotoCalib plugin") pluginsNotToCopy = tuple(measureConfig.plugins.names) # Create a new schema and catalog # Copy all columns from original except for the ones to reevaluate aliasMap = catalog.schema.getAliasMap() mapper = afwTable.SchemaMapper(catalog.schema) for item in catalog.schema: if not item.field.getName().startswith(pluginsNotToCopy): mapper.addMapping(item.key) schema = mapper.getOutputSchema() measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema) schema.setAliasMap(aliasMap) newCat = afwTable.SourceCatalog(schema) newCat.extend(catalog, mapper=mapper) # Fluxes in sourceCatalogs are in counts, so there are no fluxes to # update here. LocalPhotoCalibs are applied during transform tasks. # Update coord_ra/coord_dec, which are expected to be positions on the # sky and are used as such in sdm tables without transform if self.config.doReevaluateSkyWcs and exposure.wcs is not None: afwTable.updateSourceCoords(exposure.wcs, newCat) wcsPlugin = measurement.plugins["base_LocalWcs"] else: wcsPlugin = None if self.config.doReevaluatePhotoCalib and exposure.getPhotoCalib() is not None: pcPlugin = measurement.plugins["base_LocalPhotoCalib"] else: pcPlugin = None for row in newCat: if wcsPlugin is not None: wcsPlugin.measure(row, exposure) if pcPlugin is not None: pcPlugin.measure(row, exposure) return newCat class PostprocessAnalysis(object):
Definition at line 456 of file postprocess.py.
lsst.pipe.tasks.postprocess.inplace |
Definition at line 746 of file postprocess.py.
lsst.pipe.tasks.postprocess.log = logging.getLogger(__name__) |
Definition at line 56 of file postprocess.py.
lsst.pipe.tasks.postprocess.newCat : `lsst.afw.table.SourceCatalog` |
Definition at line 367 of file postprocess.py.
str lsst.pipe.tasks.postprocess.patch |
Definition at line 162 of file postprocess.py.
lsst.pipe.tasks.postprocess.primaryKey |
Definition at line 747 of file postprocess.py.
lsst.pipe.tasks.postprocess.refFlags : `list`, optional |
Definition at line 476 of file postprocess.py.
lsst.pipe.tasks.postprocess.result : `~lsst.pipe.base.Struct` |
Definition at line 245 of file postprocess.py.
int lsst.pipe.tasks.postprocess.tract |
Definition at line 160 of file postprocess.py.
lsst.pipe.tasks.postprocess.True |
Definition at line 746 of file postprocess.py.
lsst.pipe.tasks.postprocess.visit : `int` |
_DefaultName = "transformObjectCatalog" ConfigClass = TransformObjectCatalogConfig def run(self, handle, funcs=None, dataId=None, band=None): # NOTE: band kwarg is ignored here. dfDict = {} analysisDict = {} templateDf = pd.DataFrame() columns = handle.get(component="columns") inputBands = columns.unique(level=1).values outputBands = self.config.outputBands if self.config.outputBands else inputBands # Perform transform for data of filters that exist in the handle dataframe. for inputBand in inputBands: if inputBand not in outputBands: self.log.info("Ignoring %s band data in the input", inputBand) continue self.log.info("Transforming the catalog of band %s", inputBand) result = self.transform(inputBand, handle, funcs, dataId) dfDict[inputBand] = result.df analysisDict[inputBand] = result.analysis if templateDf.empty: templateDf = result.df # Put filler values in columns of other wanted bands for filt in outputBands: if filt not in dfDict: self.log.info("Adding empty columns for band %s", filt) dfTemp = templateDf.copy() for col in dfTemp.columns: testValue = dfTemp[col].values[0] if isinstance(testValue, (np.bool_, pd.BooleanDtype)): # Boolean flag type, check if it is a "good" flag if col in self.config.goodFlags: fillValue = False else: fillValue = True elif isinstance(testValue, numbers.Integral): # Checking numbers.Integral catches all flavors # of python, numpy, pandas, etc. integers. # We must ensure this is not an unsigned integer. if isinstance(testValue, np.unsignedinteger): raise ValueError("Parquet tables may not have unsigned integer columns.") else: fillValue = self.config.integerFillValue else: fillValue = self.config.floatFillValue dfTemp[col].values[:] = fillValue dfDict[filt] = dfTemp # This makes a multilevel column index, with band as first level df = pd.concat(dfDict, axis=1, names=["band", "column"]) if not self.config.multilevelOutput: noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()])) if self.config.primaryKey in noDupCols: noDupCols.remove(self.config.primaryKey) if dataId and self.config.columnsFromDataId: noDupCols += self.config.columnsFromDataId df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase, inputBands=inputBands) self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df)) return df class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections, dimensions=("tract", "skymap")): inputCatalogs = connectionTypes.Input( doc="Per-Patch objectTables conforming to the standard data model.", name="objectTable", storageClass="DataFrame", dimensions=("tract", "patch", "skymap"), multiple=True, ) outputCatalog = connectionTypes.Output( doc="Pre-tract horizontal concatenation of the input objectTables", name="objectTable_tract", storageClass="DataFrame", dimensions=("tract", "skymap"), ) class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig, pipelineConnections=ConsolidateObjectTableConnections): coaddName = pexConfig.Field( dtype=str, default="deep", doc="Name of coadd" ) class ConsolidateObjectTableTask(pipeBase.PipelineTask):
_DefaultName = "consolidateObjectTable" ConfigClass = ConsolidateObjectTableConfig inputDataset = "objectTable" outputDataset = "objectTable_tract" def runQuantum(self, butlerQC, inputRefs, outputRefs): inputs = butlerQC.get(inputRefs) self.log.info("Concatenating %s per-patch Object Tables", len(inputs["inputCatalogs"])) df = pd.concat(inputs["inputCatalogs"]) butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs) class TransformSourceTableConnections(pipeBase.PipelineTaskConnections, defaultTemplates={"catalogType": ""}, dimensions=("instrument", "visit", "detector")): inputCatalog = connectionTypes.Input( doc="Wide input catalog of sources produced by WriteSourceTableTask", name="{catalogType}source", storageClass="DataFrame", dimensions=("instrument", "visit", "detector"), deferLoad=True ) outputCatalog = connectionTypes.Output( doc="Narrower, per-detector Source Table transformed and converted per a " "specified set of functors", name="{catalogType}sourceTable", storageClass="DataFrame", dimensions=("instrument", "visit", "detector") ) class TransformSourceTableConfig(TransformCatalogBaseConfig, pipelineConnections=TransformSourceTableConnections): def setDefaults(self): super().setDefaults() self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Source.yaml") self.primaryKey = "sourceId" self.columnsFromDataId = ["visit", "detector", "band", "physical_filter"] class TransformSourceTableTask(TransformCatalogBaseTask):
_DefaultName = "transformSourceTable" ConfigClass = TransformSourceTableConfig class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections, dimensions=("instrument", "visit",), defaultTemplates={"calexpType": ""}): calexp = connectionTypes.Input( doc="Processed exposures used for metadata", name="calexp", storageClass="ExposureF", dimensions=("instrument", "visit", "detector"), deferLoad=True, multiple=True, ) visitSummary = connectionTypes.Output( doc=("Per-visit consolidated exposure metadata. These catalogs use " "detector id for the id and are sorted for fast lookups of a " "detector."), name="visitSummary", storageClass="ExposureCatalog", dimensions=("instrument", "visit"), ) visitSummarySchema = connectionTypes.InitOutput( doc="Schema of the visitSummary catalog", name="visitSummary_schema", storageClass="ExposureCatalog", ) class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig, pipelineConnections=ConsolidateVisitSummaryConnections):
pass class ConsolidateVisitSummaryTask(pipeBase.PipelineTask):
_DefaultName = "consolidateVisitSummary" ConfigClass = ConsolidateVisitSummaryConfig def __init__(self, **kwargs): super().__init__(**kwargs) self.schema = afwTable.ExposureTable.makeMinimalSchema() self.schema.addField("visit", type="L", doc="Visit number") self.schema.addField("physical_filter", type="String", size=32, doc="Physical filter") self.schema.addField("band", type="String", size=32, doc="Name of band") ExposureSummaryStats.update_schema(self.schema) self.visitSummarySchema = afwTable.ExposureCatalog(self.schema) def runQuantum(self, butlerQC, inputRefs, outputRefs): dataRefs = butlerQC.get(inputRefs.calexp) visit = dataRefs[0].dataId["visit"] self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)", len(dataRefs), visit) expCatalog = self._combineExposureMetadata(visit, dataRefs) butlerQC.put(expCatalog, outputRefs.visitSummary) def _combineExposureMetadata(self, visit, dataRefs):
Definition at line 1074 of file postprocess.py.
lsst.pipe.tasks.postprocess.visitSummaries : `list` of `lsst.afw.table.ExposureCatalog` |
ccdEntries = [] for visitSummaryRef in visitSummaryRefs: visitSummary = visitSummaryRef.get() visitInfo = visitSummary[0].getVisitInfo() ccdEntry = {} summaryTable = visitSummary.asAstropy() selectColumns = ["id", "visit", "physical_filter", "band", "ra", "dec", "pixelScale", "zenithDistance", "expTime", "zeroPoint", "psfSigma", "skyBg", "skyNoise", "astromOffsetMean", "astromOffsetStd", "nPsfStar", "psfStarDeltaE1Median", "psfStarDeltaE2Median", "psfStarDeltaE1Scatter", "psfStarDeltaE2Scatter", "psfStarDeltaSizeMedian", "psfStarDeltaSizeScatter", "psfStarScaledDeltaSizeScatter", "psfTraceRadiusDelta", "psfApFluxDelta", "psfApCorrSigmaScaledDelta", "maxDistToNearestPsf", "effTime", "effTimePsfSigmaScale", "effTimeSkyBgScale", "effTimeZeroPointScale", "magLim"] ccdEntry = summaryTable[selectColumns].to_pandas().set_index("id") # 'visit' is the human readable visit number. # 'visitId' is the key to the visitId table. They are the same. # Technically you should join to get the visit from the visit # table. ccdEntry = ccdEntry.rename(columns={"visit": "visitId"}) # RFC-924: Temporarily keep a duplicate "decl" entry for backwards # compatibility. To be removed after September 2023. ccdEntry["decl"] = ccdEntry.loc[:, "dec"] ccdEntry["ccdVisitId"] = [ self.config.idGenerator.apply( visitSummaryRef.dataId, detector=detector_id, is_exposure=False, ).catalog_id # The "catalog ID" here is the ccdVisit ID # because it's usually the ID for a whole catalog # with a {visit, detector}, and that's the main # use case for IdGenerator. This usage for a # summary table is rare. for detector_id in summaryTable["id"] ] ccdEntry["detector"] = summaryTable["id"] ccdEntry["seeing"] = ( visitSummary["psfSigma"] * visitSummary["pixelScale"] * np.sqrt(8 * np.log(2)) ) ccdEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees() ccdEntry["expMidpt"] = visitInfo.getDate().toPython() ccdEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD) ccdEntry["obsStart"] = ( ccdEntry["expMidpt"] - 0.5 * pd.Timedelta(seconds=ccdEntry["expTime"].values[0]) ) expTime_days = ccdEntry["expTime"] / (60*60*24) ccdEntry["obsStartMJD"] = ccdEntry["expMidptMJD"] - 0.5 * expTime_days ccdEntry["darkTime"] = visitInfo.getDarkTime() ccdEntry["xSize"] = summaryTable["bbox_max_x"] - summaryTable["bbox_min_x"] ccdEntry["ySize"] = summaryTable["bbox_max_y"] - summaryTable["bbox_min_y"] ccdEntry["llcra"] = summaryTable["raCorners"][:, 0] ccdEntry["llcdec"] = summaryTable["decCorners"][:, 0] ccdEntry["ulcra"] = summaryTable["raCorners"][:, 1] ccdEntry["ulcdec"] = summaryTable["decCorners"][:, 1] ccdEntry["urcra"] = summaryTable["raCorners"][:, 2] ccdEntry["urcdec"] = summaryTable["decCorners"][:, 2] ccdEntry["lrcra"] = summaryTable["raCorners"][:, 3] ccdEntry["lrcdec"] = summaryTable["decCorners"][:, 3] # TODO: DM-30618, Add raftName, nExposures, ccdTemp, binX, binY, # and flags, and decide if WCS, and llcx, llcy, ulcx, ulcy, etc. # values are actually wanted. ccdEntries.append(ccdEntry) outputCatalog = pd.concat(ccdEntries) outputCatalog.set_index("ccdVisitId", inplace=True, verify_integrity=True) return pipeBase.Struct(outputCatalog=outputCatalog) class MakeVisitTableConnections(pipeBase.PipelineTaskConnections, dimensions=("instrument",), defaultTemplates={"calexpType": ""}): visitSummaries = connectionTypes.Input( doc="Per-visit consolidated exposure metadata", name="finalVisitSummary", storageClass="ExposureCatalog", dimensions=("instrument", "visit",), multiple=True, deferLoad=True, ) outputCatalog = connectionTypes.Output( doc="Visit metadata table", name="visitTable", storageClass="DataFrame", dimensions=("instrument",) ) class MakeVisitTableConfig(pipeBase.PipelineTaskConfig, pipelineConnections=MakeVisitTableConnections): pass class MakeVisitTableTask(pipeBase.PipelineTask):
_DefaultName = "makeVisitTable" ConfigClass = MakeVisitTableConfig def run(self, visitSummaries):
Definition at line 1325 of file postprocess.py.
lsst.pipe.tasks.postprocess.visitSummary : `lsst.afw.table.ExposureCatalog`, optional |
Definition at line 319 of file postprocess.py.
lsst.pipe.tasks.postprocess.visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle` |
cat = afwTable.ExposureCatalog(self.schema) cat.resize(len(dataRefs)) cat["visit"] = visit for i, dataRef in enumerate(dataRefs): visitInfo = dataRef.get(component="visitInfo") filterLabel = dataRef.get(component="filter") summaryStats = dataRef.get(component="summaryStats") detector = dataRef.get(component="detector") wcs = dataRef.get(component="wcs") photoCalib = dataRef.get(component="photoCalib") detector = dataRef.get(component="detector") bbox = dataRef.get(component="bbox") validPolygon = dataRef.get(component="validPolygon") rec = cat[i] rec.setBBox(bbox) rec.setVisitInfo(visitInfo) rec.setWcs(wcs) rec.setPhotoCalib(photoCalib) rec.setValidPolygon(validPolygon) rec["physical_filter"] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else "" rec["band"] = filterLabel.bandLabel if filterLabel.hasBandLabel() else "" rec.setId(detector.getId()) summaryStats.update_record(rec) metadata = dafBase.PropertyList() metadata.add("COMMENT", "Catalog id is detector id, sorted.") # We are looping over existing datarefs, so the following is true metadata.add("COMMENT", "Only detectors with data have entries.") cat.setMetadata(metadata) cat.sort() return cat class ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections, defaultTemplates={"catalogType": ""}, dimensions=("instrument", "visit")): inputCatalogs = connectionTypes.Input( doc="Input per-detector Source Tables", name="{catalogType}sourceTable", storageClass="DataFrame", dimensions=("instrument", "visit", "detector"), multiple=True ) outputCatalog = connectionTypes.Output( doc="Per-visit concatenation of Source Table", name="{catalogType}sourceTable_visit", storageClass="DataFrame", dimensions=("instrument", "visit") ) class ConsolidateSourceTableConfig(pipeBase.PipelineTaskConfig, pipelineConnections=ConsolidateSourceTableConnections): pass class ConsolidateSourceTableTask(pipeBase.PipelineTask):
_DefaultName = "consolidateSourceTable" ConfigClass = ConsolidateSourceTableConfig inputDataset = "sourceTable" outputDataset = "sourceTable_visit" def runQuantum(self, butlerQC, inputRefs, outputRefs): from .makeWarp import reorderRefs detectorOrder = [ref.dataId["detector"] for ref in inputRefs.inputCatalogs] detectorOrder.sort() inputRefs = reorderRefs(inputRefs, detectorOrder, dataIdKey="detector") inputs = butlerQC.get(inputRefs) self.log.info("Concatenating %s per-detector Source Tables", len(inputs["inputCatalogs"])) df = pd.concat(inputs["inputCatalogs"]) butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs) class MakeCcdVisitTableConnections(pipeBase.PipelineTaskConnections, dimensions=("instrument",), defaultTemplates={"calexpType": ""}): visitSummaryRefs = connectionTypes.Input( doc="Data references for per-visit consolidated exposure metadata", name="finalVisitSummary", storageClass="ExposureCatalog", dimensions=("instrument", "visit"), multiple=True, deferLoad=True, ) outputCatalog = connectionTypes.Output( doc="CCD and Visit metadata table", name="ccdVisitTable", storageClass="DataFrame", dimensions=("instrument",) ) class MakeCcdVisitTableConfig(pipeBase.PipelineTaskConfig, pipelineConnections=MakeCcdVisitTableConnections): idGenerator = DetectorVisitIdGeneratorConfig.make_field() class MakeCcdVisitTableTask(pipeBase.PipelineTask):
_DefaultName = "makeCcdVisitTable" ConfigClass = MakeCcdVisitTableConfig def run(self, visitSummaryRefs):
Definition at line 1202 of file postprocess.py.