import functools
from collections import defaultdict

import pandas as pd

import lsst.geom
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
import lsst.afw.table as afwTable
from lsst.meas.base import SingleFrameMeasurementTask
from lsst.coadd.utils.coaddDataIdContainer import CoaddDataIdContainer
from lsst.pipe.base import CmdLineTask, ArgumentParser, DataIdContainer

from .parquetTable import ParquetTable
from .multiBandUtils import makeMergeArgumentParser, MergeSourcesRunner
from .functors import CompositeFunctor, RAColumn, DecColumn, Column

def flattenFilters(df, filterDict, noDupCols=['coord_ra', 'coord_dec'], camelCase=False):
    """Flattens a dataframe with multilevel column index.
    """
    newDf = pd.DataFrame()
    for filt, filtShort in filterDict.items():
        subdf = df[filt]
        columnFormat = '{0}{1}' if camelCase else '{0}_{1}'
        newColumns = {c: columnFormat.format(filtShort, c)
                      for c in subdf.columns if c not in noDupCols}
        cols = list(newColumns.keys())
        newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)

    newDf = pd.concat([subdf[noDupCols], newDf], axis=1)
    return newDf
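

# Example sketch of the column-name munging done by flattenFilters (the filter and
# column names below are illustrative assumptions, not taken from real data):
#
#     columns = pd.MultiIndex.from_tuples(
#         [('HSC-G', c) for c in ('coord_ra', 'coord_dec', 'PsfFlux')]
#         + [('HSC-R', c) for c in ('coord_ra', 'coord_dec', 'PsfFlux')],
#         names=('filter', 'column'))
#     df = pd.DataFrame([[0.0] * 6], columns=columns)
#     flattenFilters(df, {'HSC-G': 'g', 'HSC-R': 'r'}, camelCase=True).columns
#     # -> ['coord_ra', 'coord_dec', 'gPsfFlux', 'rPsfFlux']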


class WriteObjectTableConfig(pexConfig.Config):
    priorityList = pexConfig.ListField(
        dtype=str,
        default=[],
        doc="Priority-ordered list of bands for the merge."
    )
    engine = pexConfig.Field(
        dtype=str,
        default="pyarrow",
        doc="Parquet engine for writing (pyarrow or fastparquet)"
    )
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )

    def validate(self):
        pexConfig.Config.validate(self)
        if len(self.priorityList) == 0:
            raise RuntimeError("No priority list provided")
79 """Write filter-merged source tables to parquet
81 _DefaultName =
"writeObjectTable"
82 ConfigClass = WriteObjectTableConfig
83 RunnerClass = MergeSourcesRunner
86 inputDatasets = (
'forced_src',
'meas',
'ref')
91 def __init__(self, butler=None, schema=None, **kwargs):
95 CmdLineTask.__init__(self, **kwargs)

    def runDataRef(self, patchRefList):
        """!
        @brief Merge coadd sources from multiple bands. Calls @ref `run` which must be defined in
        subclasses that inherit from MergeSourcesTask.
        @param[in] patchRefList list of data references for each filter
        """
        catalogs = dict(self.readCatalog(patchRef) for patchRef in patchRefList)
        dataId = patchRefList[0].dataId
        mergedCatalog = self.run(catalogs, tract=dataId['tract'], patch=dataId['patch'])
        self.write(patchRefList[0], mergedCatalog)

    @classmethod
    def _makeArgumentParser(cls):
        """Create a suitable ArgumentParser.

        We will use the ArgumentParser to get a list of data
        references for patches; the RunnerClass will sort them into lists
        of data references for the same patch.

        References first of self.inputDatasets, rather than
        self.inputDataset
        """
        return makeMergeArgumentParser(cls._DefaultName, cls.inputDatasets[0])
122 """Read input catalogs
124 Read all the input datasets given by the 'inputDatasets'
129 patchRef : `lsst.daf.persistence.ButlerDataRef`
130 Data reference for patch
134 Tuple consisting of filter name and a dict of catalogs, keyed by
137 filterName = patchRef.dataId[
"filter"]
140 catalog = patchRef.get(self.
config.coaddName +
"Coadd_" + dataset, immediate=
True)
141 self.
log.
info(
"Read %d sources from %s for filter %s: %s" %
142 (len(catalog), dataset, filterName, patchRef.dataId))
143 catalogDict[dataset] = catalog
144 return filterName, catalogDict

    def run(self, catalogs, tract, patch):
        """Merge multiple catalogs.

        Parameters
        ----------
        catalogs : `dict`
            Mapping from filter names to dict of catalogs.
        tract : int
            tractId to use for the tractId column
        patch : str
            patchId to use for the patchId column

        Returns
        -------
        catalog : `lsst.pipe.tasks.parquetTable.ParquetTable`
            Merged dataframe, with each column prefixed by
            `filter_tag(filt)`, wrapped in the parquet writer shim class.
        """
        dfs = []
        for filt, tableDict in catalogs.items():
            for dataset, table in tableDict.items():
                # Convert afwTable to pandas DataFrame, indexed by source id
                df = table.asAstropy().to_pandas().set_index('id', drop=True)

                # Sort columns by name, to ensure matching schema among patches
                df = df.reindex(sorted(df.columns), axis=1)
                df['tractId'] = tract
                df['patchId'] = patch

                # Make columns a 3-level MultiIndex
                df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
                                                       names=('dataset', 'filter', 'column'))
                dfs.append(df)

        catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return ParquetTable(dataFrame=catalog)
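
    # Illustrative example (the column names here are assumptions): the merged dataframe
    # returned by `run` has three-level columns such as
    #     ('meas', 'HSC-G', 'base_PsfFlux_instFlux')
    #     ('forced_src', 'HSC-R', 'base_PsfFlux_instFlux')
    #     ('ref', 'HSC-G', 'tractId')
    # i.e. one block of columns per (dataset, filter) pair read by `readCatalog`.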

    def write(self, patchRef, catalog):
        """Write the output.

        Parameters
        ----------
        catalog : `ParquetTable`
            Catalog to write
        patchRef : `lsst.daf.persistence.ButlerDataRef`
            Data reference for patch
        """
        patchRef.put(catalog, self.config.coaddName + "Coadd_obj")
        # The filter is not part of the written dataset's dataId, so drop it from
        # the dataId reported in the log message.
        mergeDataId = patchRef.dataId.copy()
        del mergeDataId["filter"]
        self.log.info("Wrote merged catalog: %s" % (mergeDataId,))
202 """No metadata to write, and not sure how to write it for a list of dataRefs.


class WriteSourceTableConfig(pexConfig.Config):
    doApplyExternalPhotoCalib = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Add local photoCalib columns from the calexp.photoCalib? Should only set True if "
             "generating Source Tables from older src tables which do not already have local calib columns")
    )
    doApplyExternalSkyWcs = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Add local WCS columns from the calexp.wcs? Should only set True if "
             "generating Source Tables from older src tables which do not already have local calib columns")
    )
223 """Write source table to parquet
225 _DefaultName =
"writeSourceTable"
226 ConfigClass = WriteSourceTableConfig
229 src = dataRef.get(
'src')
230 if self.
config.doApplyExternalPhotoCalib
or self.
config.doApplyExternalSkyWcs:
233 ccdVisitId = dataRef.get(
'ccdExposureId')
234 result = self.
run(src, ccdVisitId=ccdVisitId)
235 dataRef.put(result.table,
'source')

    def run(self, catalog, ccdVisitId=None):
        """Convert `src` catalog to parquet

        Parameters
        ----------
        catalog : `afwTable.SourceCatalog`
            catalog to be converted
        ccdVisitId : `int`
            ccdVisitId to be added as a column

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            ``table``
                `ParquetTable` version of the input catalog
        """
        self.log.info("Generating parquet table from src catalog")
        df = catalog.asAstropy().to_pandas().set_index('id', drop=True)
        df['ccdVisitId'] = ccdVisitId
        return pipeBase.Struct(table=ParquetTable(dataFrame=df))
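
    # Minimal usage sketch (`srcCat` and the ccdVisitId value are hypothetical):
    #
    #     task = WriteSourceTableTask()
    #     result = task.run(srcCat, ccdVisitId=123456789)
    #     result.table.toDataFrame().head()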
259 """Add columns with local calibration evaluated at each centroid
261 for backwards compatibility with old repos.
262 This exists for the purpose of converting old src catalogs
263 (which don't have the expected local calib columns) to Source Tables.
267 catalog: `afwTable.SourceCatalog`
268 catalog to which calib columns will be added
269 dataRef: `lsst.daf.persistence.ButlerDataRef
270 for fetching the calibs from disk.
274 newCat: `afwTable.SourceCatalog`
275 Source Catalog with requested local calib columns
278 measureConfig = SingleFrameMeasurementTask.ConfigClass()
279 measureConfig.doReplaceWithNoise =
False

        # Only the attached WCS / PhotoCalib are needed, so fetch a minimal sub-image
        # (zero-size bbox; the exact bbox used here is an assumption).
        exposure = dataRef.get('calexp_sub',
                               bbox=lsst.geom.Box2I(lsst.geom.Point2I(0, 0), lsst.geom.Point2I(0, 0)))

        mapper = afwTable.SchemaMapper(catalog.schema)
        mapper.addMinimalSchema(catalog.schema, True)
        schema = mapper.getOutputSchema()

        exposureIdInfo = dataRef.get("expIdInfo")
        measureConfig.plugins.names = []
        if self.config.doApplyExternalSkyWcs:
            plugin = 'base_LocalWcs'
            if plugin in catalog.schema:
                raise RuntimeError(f"{plugin} already in src catalog. Set doApplyExternalSkyWcs=False")
            else:
                measureConfig.plugins.names.add(plugin)

        if self.config.doApplyExternalPhotoCalib:
            plugin = 'base_LocalPhotoCalib'
            if plugin in catalog.schema:
                raise RuntimeError(f"{plugin} already in src catalog. Set doApplyExternalPhotoCalib=False")
            else:
                measureConfig.plugins.names.add(plugin)

        measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
        newCat = afwTable.SourceCatalog(schema)
        newCat.extend(catalog, mapper=mapper)
        measurement.run(measCat=newCat, exposure=exposure, exposureId=exposureIdInfo.expId)
        return newCat
312 """No metadata to write.

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)
        parser.add_id_argument("--id", 'src',
                               help="data ID, e.g. --id visit=12345 ccd=0")
        return parser
325 """Calculate columns from ParquetTable
327 This object manages and organizes an arbitrary set of computations
328 on a catalog. The catalog is defined by a
329 `lsst.pipe.tasks.parquetTable.ParquetTable` object (or list thereof), such as a
330 `deepCoadd_obj` dataset, and the computations are defined by a collection
331 of `lsst.pipe.tasks.functor.Functor` objects (or, equivalently,
332 a `CompositeFunctor`).
334 After the object is initialized, accessing the `.df` attribute (which
335 holds the `pandas.DataFrame` containing the results of the calculations) triggers
336 computation of said dataframe.
338 One of the conveniences of using this object is the ability to define a desired common
339 filter for all functors. This enables the same functor collection to be passed to
340 several different `PostprocessAnalysis` objects without having to change the original
341 functor collection, since the `filt` keyword argument of this object triggers an
342 overwrite of the `filt` property for all functors in the collection.
344 This object also allows a list of refFlags to be passed, and defines a set of default
345 refFlags that are always included even if not requested.
347 If a list of `ParquetTable` object is passed, rather than a single one, then the
348 calculations will be mapped over all the input catalogs. In principle, it should
349 be straightforward to parallelize this activity, but initial tests have failed
350 (see TODO in code comments).
354 parq : `lsst.pipe.tasks.ParquetTable` (or list of such)
355 Source catalog(s) for computation
357 functors : `list`, `dict`, or `lsst.pipe.tasks.functors.CompositeFunctor`
358 Computations to do (functors that act on `parq`).
359 If a dict, the output
360 DataFrame will have columns keyed accordingly.
361 If a list, the column keys will come from the
362 `.shortname` attribute of each functor.
364 filt : `str` (optional)
365 Filter in which to calculate. If provided,
366 this will overwrite any existing `.filt` attribute
367 of the provided functors.
369 flags : `list` (optional)
370 List of flags (per-band) to include in output table.
372 refFlags : `list` (optional)
373 List of refFlags (only reference band) to include in output table.
377 _defaultRefFlags = []
378 _defaultFuncs = ((
'coord_ra',
RAColumn()),

    def __init__(self, parq, functors, filt=None, flags=None, refFlags=None):
        self.parq = parq
        self.functors = functors

        self.filt = filt
        self.flags = list(flags) if flags is not None else []
        self.refFlags = list(self._defaultRefFlags)
        if refFlags is not None:
            self.refFlags += list(refFlags)

        self._df = None

    @property
    def func(self):
        additionalFuncs = dict(self._defaultFuncs)
        additionalFuncs.update({flag: Column(flag, dataset='ref') for flag in self.refFlags})
        additionalFuncs.update({flag: Column(flag, dataset='meas') for flag in self.flags})

        if isinstance(self.functors, CompositeFunctor):
            func = self.functors
        else:
            func = CompositeFunctor(self.functors)

        func.funcDict.update(additionalFuncs)
        func.filt = self.filt

        return func

    @property
    def noDupCols(self):
        return [name for name, func in self.func.funcDict.items()
                if func.noDup or func.dataset == 'ref']

    @property
    def df(self):
        if self._df is None:
            self.compute()
        return self._df

    def compute(self, dropna=False, pool=None):
        # Map over multiple parquet tables
        if type(self.parq) in (list, tuple):
            if pool is None:
                dflist = [self.func(parq, dropna=dropna) for parq in self.parq]
            else:
                # TODO: parallelization with a pool has not worked in initial tests
                dflist = pool.map(functools.partial(self.func, dropna=dropna), self.parq)
            self._df = pd.concat(dflist)
        else:
            self._df = self.func(self.parq, dropna=dropna)

        return self._df
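

# Minimal usage sketch for PostprocessAnalysis (the dataId values and flag column name
# below are assumptions for illustration, not taken from this module):
#
#     parq = butler.get('deepCoadd_obj', dataId={'tract': 9813, 'patch': '4,4'})
#     funcs = {'ra': RAColumn(), 'dec': DecColumn()}
#     analysis = PostprocessAnalysis(parq, funcs, filt='HSC-G',
#                                    refFlags=['detect_isPrimary'])
#     df = analysis.df   # accessing .df triggers the computation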


class TransformCatalogBaseConfig(pexConfig.Config):
    functorFile = pexConfig.Field(
        dtype=str,
        doc='Path to YAML file specifying functors to be computed',
        default=None,
        optional=True
    )
449 """Base class for transforming/standardizing a catalog
451 by applying functors that convert units and apply calibrations.
452 The purpose of this task is to perform a set of computations on
453 an input `ParquetTable` dataset (such as `deepCoadd_obj`) and write the
454 results to a new dataset (which needs to be declared in an `outputDataset`
457 The calculations to be performed are defined in a YAML file that specifies
458 a set of functors to be computed, provided as
459 a `--functorFile` config parameter. An example of such a YAML file
484 - base_InputCount_value
487 functor: DeconvolvedMoments
492 - merge_measurement_i
493 - merge_measurement_r
494 - merge_measurement_z
495 - merge_measurement_y
496 - merge_measurement_g
497 - base_PixelFlags_flag_inexact_psfCenter
500 The names for each entry under "func" will become the names of columns in the
501 output dataset. All the functors referenced are defined in `lsst.pipe.tasks.functors`.
502 Positional arguments to be passed to each functor are in the `args` list,
503 and any additional entries for each column other than "functor" or "args" (e.g., `'filt'`,
504 `'dataset'`) are treated as keyword arguments to be passed to the functor initialization.
506 The "refFlags" entry is shortcut for a bunch of `Column` functors with the original column and
507 taken from the `'ref'` dataset.
509 The "flags" entry will be expanded out per band.
511 Note, if `'filter'` is provided as part of the `dataId` when running this task (even though
512 `deepCoadd_obj` does not use `'filter'`), then this will override the `filt` kwargs
513 provided in the YAML file, and the calculations will be done in that filter.
515 This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
516 to organize and excecute the calculations.

    @property
    def _DefaultName(self):
        raise NotImplementedError('Subclass must define "_DefaultName" attribute')

    @property
    def outputDataset(self):
        raise NotImplementedError('Subclass must define "outputDataset" attribute')

    @property
    def inputDataset(self):
        raise NotImplementedError('Subclass must define "inputDataset" attribute')

    @property
    def ConfigClass(self):
        raise NotImplementedError('Subclass must define "ConfigClass" attribute')

    def runDataRef(self, dataRef):
        parq = dataRef.get()
        funcs = self.getFunctors()
        df = self.run(parq, funcs=funcs, dataId=dataRef.dataId)
        self.write(df, dataRef)
        return df

    def run(self, parq, funcs=None, dataId=None):
        """Do postprocessing calculations

        Takes a `ParquetTable` object and dataId,
        returns a dataframe with results of postprocessing calculations.

        Parameters
        ----------
        parq : `lsst.pipe.tasks.parquetTable.ParquetTable`
            ParquetTable from which calculations are done.
        funcs : `lsst.pipe.tasks.functors.Functors`
            Functors to apply to the table's columns
        dataId : dict, optional
            Used to add a `patchId` column to the output dataframe.

        Returns
        -------
        df : `pandas.DataFrame`
        """
        self.log.info("Transforming/standardizing the source table dataId: %s", dataId)

        filt = dataId.get('filter', None)
        df = self.transform(filt, parq, funcs, dataId).df
        self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
        return df

    def getFunctors(self):
        funcs = CompositeFunctor.from_file(self.config.functorFile)
        funcs.update(dict(PostprocessAnalysis._defaultFuncs))
        return funcs

    def getAnalysis(self, parq, funcs=None, filt=None):
        if funcs is None:
            funcs = self.getFunctors()
        analysis = PostprocessAnalysis(parq, funcs, filt=filt)
        return analysis

    def transform(self, filt, parq, funcs, dataId):
        analysis = self.getAnalysis(parq, funcs=funcs, filt=filt)
        df = analysis.df
        if dataId is not None:
            for key, value in dataId.items():
                df[key] = value

        return pipeBase.Struct(
            df=df,
            analysis=analysis
        )
597 """No metadata to write.


class TransformObjectCatalogConfig(TransformCatalogBaseConfig):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )
    filterMap = pexConfig.DictField(
        keytype=str,
        itemtype=str,
        doc=("Dictionary mapping full filter name to short one for column name munging. "
             "These filters determine the output columns no matter what filters the "
             "input data actually contain.")
    )
    camelCase = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Write per-filter column names with camelCase, else underscore. "
             "For example: gPsfFlux instead of g_PsfFlux.")
    )
    multilevelOutput = pexConfig.Field(
        dtype=bool,
        default=True,
        doc=("Whether results dataframe should have a multilevel column index (True) or be flat "
             "and name-munged (False).")
    )
631 """Compute Flatted Object Table as defined in the DPDD
633 Do the same set of postprocessing calculations on all bands
635 This is identical to `TransformCatalogBaseTask`, except for that it does the
636 specified functor calculations for all filters present in the
637 input `deepCoadd_obj` table. Any specific `"filt"` keywords specified
638 by the YAML file will be superceded.
640 _DefaultName =
"transformObjectCatalog"
641 ConfigClass = TransformObjectCatalogConfig
643 inputDataset =
'deepCoadd_obj'
644 outputDataset =
'objectTable'

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)
        parser.add_id_argument("--id", cls.inputDataset,
                               ContainerClass=CoaddDataIdContainer,
                               help="data ID, e.g. --id tract=12345 patch=1,2")
        return parser

    def run(self, parq, funcs=None, dataId=None):
        dfDict = {}
        analysisDict = {}
        templateDf = pd.DataFrame()

        # Perform transform for data of filters that exist in parq and are in the filterMap
        for filt in parq.columnLevelNames['filter']:
            if filt not in self.config.filterMap:
                self.log.info("Ignoring %s data in the input", filt)
                continue
            self.log.info("Transforming the catalog of filter %s", filt)
            result = self.transform(filt, parq, funcs, dataId)
            dfDict[filt] = result.df
            analysisDict[filt] = result.analysis
            if templateDf.empty:
                templateDf = result.df

        # Fill NaNs in columns of filters that do not exist in parq
        for filt in self.config.filterMap:
            if filt not in dfDict:
                self.log.info("Adding empty columns for filter %s", filt)
                dfDict[filt] = pd.DataFrame().reindex_like(templateDf)

        # This makes a multilevel column index, with filter as the first level
        df = pd.concat(dfDict, axis=1, names=['filter', 'column'])

        if not self.config.multilevelOutput:
            noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
            if dataId is not None:
                noDupCols += list(dataId.keys())
            df = flattenFilters(df, self.config.filterMap, noDupCols=noDupCols,
                                camelCase=self.config.camelCase)

        self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))

        return df
694 """Make self.refList from self.idList
696 Generate a list of data references given tract and/or patch.
697 This was adapted from `TractQADataIdContainer`, which was
698 `TractDataIdContainer` modifie to not require "filter".
699 Only existing dataRefs are returned.
701 def getPatchRefList(tract):
702 return [namespace.butler.dataRef(datasetType=self.datasetType,
704 patch=
"%d,%d" % patch.getIndex())
for patch
in tract]
706 tractRefs = defaultdict(list)
707 for dataId
in self.idList:
710 if "tract" in dataId:
711 tractId = dataId[
"tract"]
712 if "patch" in dataId:
713 tractRefs[tractId].
append(namespace.butler.dataRef(datasetType=self.datasetType,
715 patch=dataId[
'patch']))
717 tractRefs[tractId] += getPatchRefList(skymap[tractId])
719 tractRefs = dict((tract.getId(), tractRefs.get(tract.getId(), []) + getPatchRefList(tract))
722 for tractRefList
in tractRefs.values():
723 existingRefs = [ref
for ref
in tractRefList
if ref.datasetExists()]
724 outputRefList.append(existingRefs)


class ConsolidateObjectTableConfig(pexConfig.Config):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )
738 """Write patch-merged source tables to a tract-level parquet file
740 _DefaultName =
"consolidateObjectTable"
741 ConfigClass = ConsolidateObjectTableConfig
743 inputDataset =
'objectTable'
744 outputDataset =
'objectTable_tract'
747 def _makeArgumentParser(cls):
751 help=
"data ID, e.g. --id tract=12345",
752 ContainerClass=TractObjectDataIdContainer)
756 df = pd.concat([patchRef.get().toDataFrame()
for patchRef
in patchRefList])
760 """No metadata to write.


class TransformSourceTableConfig(TransformCatalogBaseConfig):
    pass
770 """Transform/standardize a source catalog
772 _DefaultName =
"transformSourceTable"
773 ConfigClass = TransformSourceTableConfig
775 inputDataset =
'source'
776 outputDataset =
'sourceTable'
779 """No metadata to write.

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)
        parser.add_id_argument("--id", datasetType=cls.inputDataset,
                               help="data ID, e.g. --id visit=12345 ccd=0")
        return parser
793 """DataIdContainer that groups sensor-level id's by visit
797 """Make self.refList from self.idList
799 Generate a list of data references grouped by visit.
803 namespace : `argparse.Namespace`
804 Namespace used by `lsst.pipe.base.CmdLineTask` to parse command line arguments
806 def ccdDataRefList(visitId):
807 """Get all possible ccds for a given visit"""
808 ccds = namespace.butler.queryMetadata(
'src', [
'ccd'], dataId={
'visit': visitId})
809 return [namespace.butler.dataRef(datasetType=self.
datasetType,
811 ccd=ccd)
for ccd
in ccds]
813 visitRefs = defaultdict(list)
814 for dataId
in self.
idList:
815 if "visit" in dataId:
816 visitId = dataId[
"visit"]
819 visit=visitId, ccd=dataId[
'ccd']))
821 visitRefs[visitId] += ccdDataRefList(visitId)
823 for refList
in visitRefs.values():
824 existingRefs = [ref
for ref
in refList
if ref.datasetExists()]
825 outputRefList.append(existingRefs)


class ConsolidateSourceTableConfig(pexConfig.Config):
    pass


class ConsolidateSourceTableTask(CmdLineTask):
    """Concatenate `sourceTable` list into a per-visit `sourceTable_visit`
    """
    _DefaultName = 'consolidateSourceTable'
    ConfigClass = ConsolidateSourceTableConfig

    inputDataset = 'sourceTable'
    outputDataset = 'sourceTable_visit'

    def runDataRef(self, dataRefList):
        self.log.info("Concatenating %s per-detector Source Tables", len(dataRefList))
        df = pd.concat([dataRef.get().toDataFrame() for dataRef in dataRefList])
        dataRefList[0].put(ParquetTable(dataFrame=df), self.outputDataset)

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)
        parser.add_id_argument("--id", cls.inputDataset,
                               help="data ID, e.g. --id visit=12345",
                               ContainerClass=VisitDataIdContainer)
        return parser

    def writeMetadata(self, dataRef):
        """No metadata to write.
        """
        pass

    def writeConfig(self, butler, clobber=False, doBackup=True):
        """No config to write.
        """
        pass