|
LSST Applications g00d0e8bbd7+8c5ae1fdc5,g013ef56533+603670b062,g083dd6704c+2e189452a7,g199a45376c+0ba108daf9,g1c5cce2383+bc9f6103a4,g1fd858c14a+cd69ed4fc1,g210f2d0738+c4742f2e9e,g262e1987ae+612fa42d85,g29ae962dfc+83d129e820,g2cef7863aa+aef1011c0b,g35bb328faa+8c5ae1fdc5,g3fd5ace14f+5eaa884f2a,g47891489e3+e32160a944,g53246c7159+8c5ae1fdc5,g5b326b94bb+dcc56af22d,g64539dfbff+c4742f2e9e,g67b6fd64d1+e32160a944,g74acd417e5+c122e1277d,g786e29fd12+668abc6043,g87389fa792+8856018cbb,g88cb488625+47d24e4084,g89139ef638+e32160a944,g8d7436a09f+d14b4ff40a,g8ea07a8fe4+b212507b11,g90f42f885a+e1755607f3,g97be763408+34be90ab8c,g98df359435+ec1fa61bf1,ga2180abaac+8c5ae1fdc5,ga9e74d7ce9+43ac651df0,gbf99507273+8c5ae1fdc5,gc2a301910b+c4742f2e9e,gca7fc764a6+e32160a944,gd7ef33dd92+e32160a944,gdab6d2f7ff+c122e1277d,gdb1e2cdc75+1b18322db8,ge410e46f29+e32160a944,ge41e95a9f2+c4742f2e9e,geaed405ab2+0d91c11c6d,w.2025.44
LSST Data Management Base Package
|
Classes | |
| class | for |
| class | TransformObjectCatalogConnections |
| class | WriteObjectTableConnections |
Functions | |
| flattenFilters (df, noDupCols=["coord_ra", "coord_dec"], camelCase=False, inputBands=None) | |
Variables | |
| log = logging.getLogger(__name__) | |
| catalogs : `dict` | |
| str | catalog : `pandas.DataFrame` |
| result : `~lsst.pipe.base.Struct` | |
| exposure : `lsst.afw.image.exposure.Exposure` | |
| detectorId : `int` | |
| visitSummary : `lsst.afw.table.ExposureCatalog`, optional | |
| newCat : `lsst.afw.table.SourceCatalog` | |
| handles : `~lsst.daf.butler.DeferredDatasetHandle` or | |
| functors : `list`, `dict`, or `~lsst.pipe.tasks.functors.CompositeFunctor` | |
| filt : `str`, optional | |
| flags : `list`, optional | |
| refFlags : `list`, optional | |
| forcedFlags : `list`, optional | |
| funcs : `~lsst.pipe.tasks.functors.Functor` | |
| inplace | |
| True | |
| drop | |
| primaryKey | |
| df | |
| analysis | |
| lsst.pipe.tasks.postprocess.flattenFilters | ( | df, | |
| noDupCols = ["coord_ra", "coord_dec"], | |||
| camelCase = False, | |||
| inputBands = None ) |
Flattens a dataframe with multilevel column index.
Definition at line 67 of file postprocess.py.
| lsst.pipe.tasks.postprocess.analysis |
Definition at line 785 of file postprocess.py.
| lsst.pipe.tasks.postprocess.catalog : `pandas.DataFrame` |
dfs = []
for filt, tableDict in catalogs.items():
for dataset, table in tableDict.items():
# Convert afwTable to pandas DataFrame if needed
if isinstance(table, pd.DataFrame):
df = table
elif isinstance(table, afwTable.SourceCatalog):
df = table.asAstropy().to_pandas()
elif isinstance(table, astropy.table.Table):
df = table.to_pandas()
else:
raise ValueError(f"{dataset=} has unsupported {type(table)=}")
df.set_index("id", drop=True, inplace=True)
# Sort columns by name, to ensure matching schema among patches
df = df.reindex(sorted(df.columns), axis=1)
df = df.assign(tractId=tract, patchId=patch)
# Make columns a 3-level MultiIndex
df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
names=("dataset", "band", "column"))
dfs.append(df)
# We do this dance and not `pd.concat(dfs)` because the pandas
# concatenation uses infinite memory.
catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
return catalog
class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
defaultTemplates={"catalogType": ""},
dimensions=("instrument", "visit", "detector")):
catalog = connectionTypes.Input(
doc="Input full-depth catalog of sources produced by CalibrateTask",
name="{catalogType}src",
storageClass="SourceCatalog",
dimensions=("instrument", "visit", "detector")
)
outputCatalog = connectionTypes.Output(
doc="Catalog of sources, `src` in Astropy/Parquet format. Columns are unchanged.",
name="{catalogType}source",
storageClass="ArrowAstropy",
dimensions=("instrument", "visit", "detector")
)
class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
pipelineConnections=WriteSourceTableConnections):
pass
class WriteSourceTableTask(pipeBase.PipelineTask):
_DefaultName = "writeSourceTable"
ConfigClass = WriteSourceTableConfig
def runQuantum(self, butlerQC, inputRefs, outputRefs):
inputs = butlerQC.get(inputRefs)
inputs["visit"] = butlerQC.quantum.dataId["visit"]
inputs["detector"] = butlerQC.quantum.dataId["detector"]
result = self.run(**inputs)
outputs = pipeBase.Struct(outputCatalog=result.table)
butlerQC.put(outputs, outputRefs)
def run(self, catalog, visit, detector, **kwargs):
if visitSummary is not None:
row = visitSummary.find(detectorId)
if row is None:
raise pipeBase.NoWorkFound(f"Visit summary for detector {detectorId} is missing.")
if (photoCalib := row.getPhotoCalib()) is None:
self.log.warning("Detector id %s has None for photoCalib in visit summary; "
"skipping reevaluation of photoCalib.", detectorId)
exposure.setPhotoCalib(None)
else:
exposure.setPhotoCalib(photoCalib)
if (skyWcs := row.getWcs()) is None:
self.log.warning("Detector id %s has None for skyWcs in visit summary; "
"skipping reevaluation of skyWcs.", detectorId)
exposure.setWcs(None)
else:
exposure.setWcs(skyWcs)
return exposure
def addCalibColumns(self, catalog, exposure, **kwargs):
Definition at line 171 of file postprocess.py.
| lsst.pipe.tasks.postprocess.catalogs : `dict` |
_DefaultName = "writeObjectTable"
ConfigClass = WriteObjectTableConfig
# Tag of output dataset written by `MergeSourcesTask.write`
outputDataset = "obj"
def runQuantum(self, butlerQC, inputRefs, outputRefs):
inputs = butlerQC.get(inputRefs)
catalogs = defaultdict(dict)
for dataset, connection in (
("meas", "inputCatalogMeas"),
("forced_src", "inputCatalogForcedSrc"),
("psfs_multiprofit", "inputCatalogPsfsMultiprofit"),
):
for ref, cat in zip(getattr(inputRefs, connection), inputs[connection]):
catalogs[ref.dataId["band"]][dataset] = cat
dataId = butlerQC.quantum.dataId
df = self.run(catalogs=catalogs, tract=dataId["tract"], patch=dataId["patch"])
outputs = pipeBase.Struct(outputCatalog=df)
butlerQC.put(outputs, outputRefs)
def run(self, catalogs, tract, patch):
Definition at line 162 of file postprocess.py.
| lsst.pipe.tasks.postprocess.detectorId : `int` |
Definition at line 343 of file postprocess.py.
| lsst.pipe.tasks.postprocess.df |
Definition at line 784 of file postprocess.py.
| lsst.pipe.tasks.postprocess.drop |
Definition at line 780 of file postprocess.py.
| lsst.pipe.tasks.postprocess.exposure : `lsst.afw.image.exposure.Exposure` |
self.log.info("Generating DataFrame from src catalog visit,detector=%i,%i", visit, detector)
tbl = catalog.asAstropy()
tbl["visit"] = visit
# int16 instead of uint8 because databases don't like unsigned bytes.
tbl["detector"] = np.int16(detector)
return pipeBase.Struct(table=tbl)
class WriteRecalibratedSourceTableConnections(WriteSourceTableConnections,
defaultTemplates={"catalogType": ""},
dimensions=("instrument", "visit", "detector", "skymap")):
visitSummary = connectionTypes.Input(
doc="Input visit-summary catalog with updated calibration objects.",
name="finalVisitSummary",
storageClass="ExposureCatalog",
dimensions=("instrument", "visit",),
)
def __init__(self, config):
# We don't want the input catalog here to be an initial existence
# constraint in QG generation, because that can unfortunately limit the
# set of data IDs of inputs to other tasks, even those that run earlier
# (e.g. updateVisitSummary), when the input 'src' catalog is not
# produced. It's safer to just use 'visitSummary' existence as an
# initial constraint, and then let the graph prune out the detectors
# that don't have a 'src' for this task only.
self.catalog = dataclasses.replace(self.catalog, deferGraphConstraint=True)
class WriteRecalibratedSourceTableConfig(WriteSourceTableConfig,
pipelineConnections=WriteRecalibratedSourceTableConnections):
doReevaluatePhotoCalib = pexConfig.Field(
dtype=bool,
default=True,
doc=("Add or replace local photoCalib columns"),
)
doReevaluateSkyWcs = pexConfig.Field(
dtype=bool,
default=True,
doc=("Add or replace local WCS columns and update the coord columns, coord_ra and coord_dec"),
)
class WriteRecalibratedSourceTableTask(WriteSourceTableTask):
_DefaultName = "writeRecalibratedSourceTable"
ConfigClass = WriteRecalibratedSourceTableConfig
def runQuantum(self, butlerQC, inputRefs, outputRefs):
inputs = butlerQC.get(inputRefs)
inputs["visit"] = butlerQC.quantum.dataId["visit"]
inputs["detector"] = butlerQC.quantum.dataId["detector"]
if self.config.doReevaluatePhotoCalib or self.config.doReevaluateSkyWcs:
exposure = ExposureF()
inputs["exposure"] = self.prepareCalibratedExposure(
exposure=exposure,
visitSummary=inputs["visitSummary"],
detectorId=butlerQC.quantum.dataId["detector"]
)
inputs["catalog"] = self.addCalibColumns(**inputs)
result = self.run(**inputs)
outputs = pipeBase.Struct(outputCatalog=result.table)
butlerQC.put(outputs, outputRefs)
def prepareCalibratedExposure(self, exposure, detectorId, visitSummary=None):
Definition at line 341 of file postprocess.py.
| lsst.pipe.tasks.postprocess.filt : `str`, optional |
Definition at line 493 of file postprocess.py.
| lsst.pipe.tasks.postprocess.flags : `list`, optional |
Definition at line 498 of file postprocess.py.
| lsst.pipe.tasks.postprocess.forcedFlags : `list`, optional |
Definition at line 505 of file postprocess.py.
| lsst.pipe.tasks.postprocess.funcs : `~lsst.pipe.tasks.functors.Functor` |
Definition at line 729 of file postprocess.py.
| lsst.pipe.tasks.postprocess.functors : `list`, `dict`, or `~lsst.pipe.tasks.functors.CompositeFunctor` |
Definition at line 486 of file postprocess.py.
| lsst.pipe.tasks.postprocess.handles : `~lsst.daf.butler.DeferredDatasetHandle` or |
measureConfig = SingleFrameMeasurementTask.ConfigClass()
measureConfig.doReplaceWithNoise = False
# Clear all slots, because we aren't running the relevant plugins.
for slot in measureConfig.slots:
setattr(measureConfig.slots, slot, None)
measureConfig.plugins.names = []
if self.config.doReevaluateSkyWcs:
measureConfig.plugins.names.add("base_LocalWcs")
self.log.info("Re-evaluating base_LocalWcs plugin")
if self.config.doReevaluatePhotoCalib:
measureConfig.plugins.names.add("base_LocalPhotoCalib")
self.log.info("Re-evaluating base_LocalPhotoCalib plugin")
pluginsNotToCopy = tuple(measureConfig.plugins.names)
# Create a new schema and catalog
# Copy all columns from original except for the ones to reevaluate
aliasMap = catalog.schema.getAliasMap()
mapper = afwTable.SchemaMapper(catalog.schema)
for item in catalog.schema:
if not item.field.getName().startswith(pluginsNotToCopy):
mapper.addMapping(item.key)
schema = mapper.getOutputSchema()
measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
schema.setAliasMap(aliasMap)
newCat = afwTable.SourceCatalog(schema)
newCat.extend(catalog, mapper=mapper)
# Fluxes in sourceCatalogs are in counts, so there are no fluxes to
# update here. LocalPhotoCalibs are applied during transform tasks.
# Update coord_ra/coord_dec, which are expected to be positions on the
# sky and are used as such in sdm tables without transform
if self.config.doReevaluateSkyWcs and exposure.wcs is not None:
afwTable.updateSourceCoords(exposure.wcs, newCat)
wcsPlugin = measurement.plugins["base_LocalWcs"]
else:
wcsPlugin = None
if self.config.doReevaluatePhotoCalib and exposure.getPhotoCalib() is not None:
pcPlugin = measurement.plugins["base_LocalPhotoCalib"]
else:
pcPlugin = None
for row in newCat:
if wcsPlugin is not None:
wcsPlugin.measure(row, exposure)
if pcPlugin is not None:
pcPlugin.measure(row, exposure)
return newCat
class PostprocessAnalysis(object):
Definition at line 482 of file postprocess.py.
| lsst.pipe.tasks.postprocess.inplace |
Definition at line 780 of file postprocess.py.
| lsst.pipe.tasks.postprocess.log = logging.getLogger(__name__) |
Definition at line 64 of file postprocess.py.
| lsst.pipe.tasks.postprocess.newCat : `lsst.afw.table.SourceCatalog` |
Definition at line 393 of file postprocess.py.
| lsst.pipe.tasks.postprocess.primaryKey |
Definition at line 781 of file postprocess.py.
| lsst.pipe.tasks.postprocess.refFlags : `list`, optional |
Definition at line 502 of file postprocess.py.
| lsst.pipe.tasks.postprocess.result : `~lsst.pipe.base.Struct` |
Definition at line 261 of file postprocess.py.
| lsst.pipe.tasks.postprocess.True |
Definition at line 780 of file postprocess.py.
| lsst.pipe.tasks.postprocess.visitSummary : `lsst.afw.table.ExposureCatalog`, optional |
Definition at line 345 of file postprocess.py.