import functools
from collections import defaultdict

import pandas as pd

import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
from lsst.pipe.base import CmdLineTask, ArgumentParser
from lsst.coadd.utils.coaddDataIdContainer import CoaddDataIdContainer

from .parquetTable import ParquetTable
from .multiBandUtils import makeMergeArgumentParser, MergeSourcesRunner
from .functors import CompositeFunctor, RAColumn, DecColumn, Column


def flattenFilters(df, filterDict, noDupCols=['coord_ra', 'coord_dec'], camelCase=False):
    """Flattens a dataframe with multilevel column index
    """
    newDf = pd.DataFrame()
    for filt, filtShort in filterDict.items():
        subdf = df[filt]
        columnFormat = '{0}{1}' if camelCase else '{0}_{1}'
        newColumns = {c: columnFormat.format(filtShort, c)
                      for c in subdf.columns if c not in noDupCols}
        cols = list(newColumns.keys())
        newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)

    newDf = pd.concat([subdf[noDupCols], newDf], axis=1)
    return newDf
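

# A minimal usage sketch of `flattenFilters` (illustration only): the filter names,
# short names, and flux column below are invented, not taken from any real catalog.
def _demoFlattenFilters():
    # Build a toy per-filter table with a (filter, column) MultiIndex
    columns = pd.MultiIndex.from_tuples(
        [('HSC-G', 'coord_ra'), ('HSC-G', 'coord_dec'), ('HSC-G', 'PsfFlux'),
         ('HSC-R', 'coord_ra'), ('HSC-R', 'coord_dec'), ('HSC-R', 'PsfFlux')],
        names=('filter', 'column'))
    df = pd.DataFrame([[1.0, 2.0, 10.0, 1.0, 2.0, 12.0]], columns=columns)

    # Flatten with camelCase munging; the coordinate columns are kept un-duplicated
    flat = flattenFilters(df, {'HSC-G': 'g', 'HSC-R': 'r'}, camelCase=True)
    # flat.columns is now: coord_ra, coord_dec, gPsfFlux, rPsfFlux
    return flat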


class WriteObjectTableConfig(pexConfig.Config):
    priorityList = pexConfig.ListField(
        dtype=str,
        default=[],
        doc="Priority-ordered list of bands for the merge."
    )
    engine = pexConfig.Field(
        dtype=str,
        default="pyarrow",
        doc="Parquet engine for writing (pyarrow or fastparquet)"
    )
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )

    def validate(self):
        pexConfig.Config.validate(self)
        if len(self.priorityList) == 0:
            raise RuntimeError("No priority list provided")
79 """Write filter-merged source tables to parquet 81 _DefaultName =
"writeObjectTable" 82 ConfigClass = WriteObjectTableConfig
83 RunnerClass = MergeSourcesRunner
86 inputDatasets = (
'forced_src',
'meas',
'ref')
91 def __init__(self, butler=None, schema=None, **kwargs):
95 CmdLineTask.__init__(self, **kwargs)
99 @brief Merge coadd sources from multiple bands. Calls @ref `run` which must be defined in 100 subclasses that inherit from MergeSourcesTask. 101 @param[in] patchRefList list of data references for each filter 103 catalogs = dict(self.
readCatalog(patchRef)
for patchRef
in patchRefList)
104 dataId = patchRefList[0].dataId
105 mergedCatalog = self.
run(catalogs, tract=dataId[
'tract'], patch=dataId[
'patch'])
106 self.
write(patchRefList[0], mergedCatalog)
    @classmethod
    def _makeArgumentParser(cls):
        """Create a suitable ArgumentParser.

        We will use the ArgumentParser to get a list of data
        references for patches; the RunnerClass will sort them into lists
        of data references for the same patch.

        References first of self.inputDatasets, rather than
        self.inputDataset.
        """
        return makeMergeArgumentParser(cls._DefaultName, cls.inputDatasets[0])

    def readCatalog(self, patchRef):
        """Read input catalogs

        Read all the input datasets given by the 'inputDatasets'
        attribute.

        Parameters
        ----------
        patchRef : `lsst.daf.persistence.ButlerDataRef`
            Data reference for patch

        Returns
        -------
        Tuple consisting of filter name and a dict of catalogs, keyed by
        dataset name
        """
        filterName = patchRef.dataId["filter"]
        catalogDict = {}
        for dataset in self.inputDatasets:
            catalog = patchRef.get(self.config.coaddName + "Coadd_" + dataset, immediate=True)
            self.log.info("Read %d sources from %s for filter %s: %s" %
                          (len(catalog), dataset, filterName, patchRef.dataId))
            catalogDict[dataset] = catalog
        return filterName, catalogDict
    def run(self, catalogs, tract, patch):
        """Merge multiple catalogs.

        Parameters
        ----------
        catalogs : `dict`
            Mapping from filter names to dict of catalogs.
        tract : int
            tractId to use for the tractId column
        patch : str
            patchId to use for the patchId column

        Returns
        -------
        catalog : `lsst.pipe.tasks.parquetTable.ParquetTable`
            Merged dataframe, with each column prefixed by
            `filter_tag(filt)`, wrapped in the parquet writer shim class.
        """
        dfs = []
        for filt, tableDict in catalogs.items():
            for dataset, table in tableDict.items():
                # Convert afwTable to pandas DataFrame
                df = table.asAstropy().to_pandas().set_index('id', drop=True)

                # Sort columns by name, to ensure matching schema among patches
                df = df.reindex(sorted(df.columns), axis=1)
                df['tractId'] = tract
                df['patchId'] = patch

                # Make columns a 3-level MultiIndex
                df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
                                                       names=('dataset', 'filter', 'column'))
                dfs.append(df)

        catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return ParquetTable(dataFrame=catalog)
    def write(self, patchRef, catalog):
        """Write the output.

        Parameters
        ----------
        catalog : `ParquetTable`
            Catalog to write
        patchRef : `lsst.daf.persistence.ButlerDataRef`
            Data reference for patch
        """
        patchRef.put(catalog, self.config.coaddName + "Coadd_obj")

        # Since the filter isn't actually part of the data ID for the dataset we're
        # saving, it's confusing to see it in the log message, even if the butler
        # simply ignores it.
        mergeDataId = patchRef.dataId.copy()
        del mergeDataId["filter"]
        self.log.info("Wrote merged catalog: %s" % (mergeDataId,))
202 """No metadata to write, and not sure how to write it for a list of dataRefs. 208 """Calculate columns from ParquetTable 210 This object manages and organizes an arbitrary set of computations 211 on a catalog. The catalog is defined by a 212 `lsst.pipe.tasks.parquetTable.ParquetTable` object (or list thereof), such as a 213 `deepCoadd_obj` dataset, and the computations are defined by a collection 214 of `lsst.pipe.tasks.functor.Functor` objects (or, equivalently, 215 a `CompositeFunctor`). 217 After the object is initialized, accessing the `.df` attribute (which 218 holds the `pandas.DataFrame` containing the results of the calculations) triggers 219 computation of said dataframe. 221 One of the conveniences of using this object is the ability to define a desired common 222 filter for all functors. This enables the same functor collection to be passed to 223 several different `PostprocessAnalysis` objects without having to change the original 224 functor collection, since the `filt` keyword argument of this object triggers an 225 overwrite of the `filt` property for all functors in the collection. 227 This object also allows a list of flags to be passed, and defines a set of default 228 flags that are always included even if not requested. 230 If a list of `ParquetTable` object is passed, rather than a single one, then the 231 calculations will be mapped over all the input catalogs. In principle, it should 232 be straightforward to parallelize this activity, but initial tests have failed 233 (see TODO in code comments). 237 parq : `lsst.pipe.tasks.ParquetTable` (or list of such) 238 Source catalog(s) for computation 240 functors : `list`, `dict`, or `lsst.pipe.tasks.functors.CompositeFunctor` 241 Computations to do (functors that act on `parq`). 242 If a dict, the output 243 DataFrame will have columns keyed accordingly. 244 If a list, the column keys will come from the 245 `.shortname` attribute of each functor. 247 filt : `str` (optional) 248 Filter in which to calculate. If provided, 249 this will overwrite any existing `.filt` attribute 250 of the provided functors. 252 flags : `list` (optional) 253 List of flags to include in output table. 255 _defaultFlags = (
'calib_psf_used',
'detect_isPrimary')
256 _defaultFuncs = ((
'coord_ra', RAColumn()),
257 (
'coord_dec', DecColumn()))

    def __init__(self, parq, functors, filt=None, flags=None):
        self.parq = parq
        self.functors = functors

        self.filt = filt
        self.flags = list(self._defaultFlags)
        if flags is not None:
            self.flags += list(flags)

        self._df = None

    @property
    def func(self):
        additionalFuncs = dict(self._defaultFuncs)
        additionalFuncs.update({flag: Column(flag) for flag in self.flags})

        if isinstance(self.functors, CompositeFunctor):
            func = self.functors
        else:
            func = CompositeFunctor(self.functors)

        func.funcDict.update(additionalFuncs)
        func.filt = self.filt

        return func

    @property
    def noDupCols(self):
        return [name for name, func in self.func.funcDict.items()
                if func.noDup or func.dataset == 'ref']

    @property
    def df(self):
        if self._df is None:
            self.compute()
        return self._df

    def compute(self, dropna=False, pool=None):
        # Map over multiple ParquetTables if a list/tuple is passed
        if type(self.parq) in (list, tuple):
            if pool is None:
                dflist = [self.func(parq, dropna=dropna) for parq in self.parq]
            else:
                # TODO: parallel mapping over catalogs has failed in initial tests
                # (see note in the class docstring).
                dflist = pool.map(functools.partial(self.func, dropna=dropna), self.parq)
            self._df = pd.concat(dflist)
        else:
            self._df = self.func(self.parq, dropna=dropna)

        return self._df
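

# A hedged usage sketch of PostprocessAnalysis (illustration only): the butler
# dataId values and the flag name below are invented, not a guaranteed recipe.
def _demoPostprocessAnalysis(butler):
    # Fetch a merged object table (gen2 butler); tract/patch values are hypothetical
    parq = butler.get('deepCoadd_obj', tract=9813, patch='4,4')
    funcs = {'ra': RAColumn(), 'dec': DecColumn()}
    analysis = PostprocessAnalysis(parq, funcs, filt='HSC-G',
                                   flags=['base_PixelFlags_flag_saturated'])
    return analysis.df  # accessing .df triggers the computation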


class TransformCatalogBaseConfig(pexConfig.Config):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )
    functorFile = pexConfig.Field(
        dtype=str,
        doc='Path to YAML file specifying functors to be computed',
        default=None,
        optional=True
    )
329 """Base class for transforming/standardizing a catalog 331 by applying functors that convert units and apply calibrations. 332 The purpose of this task is to perform a set of computations on 333 an input `ParquetTable` dataset (such as `deepCoadd_obj`) and write the 334 results to a new dataset (which needs to be declared in an `outputDataset` 337 The calculations to be performed are defined in a YAML file that specifies 338 a set of functors to be computed, provided as 339 a `--functorFile` config parameter. An example of such a YAML file 364 - base_InputCount_value 367 functor: DeconvolvedMoments 372 - merge_measurement_i 373 - merge_measurement_r 374 - merge_measurement_z 375 - merge_measurement_y 376 - merge_measurement_g 377 - base_PixelFlags_flag_inexact_psfCenter 380 The names for each entry under "func" will become the names of columns in the 381 output dataset. All the functors referenced are defined in `lsst.pipe.tasks.functors`. 382 Positional arguments to be passed to each functor are in the `args` list, 383 and any additional entries for each column other than "functor" or "args" (e.g., `'filt'`, 384 `'dataset'`) are treated as keyword arguments to be passed to the functor initialization. 386 The "flags" entry is shortcut for a bunch of `Column` functors with the original column and 387 taken from the `'ref'` dataset. 389 Note, if `'filter'` is provided as part of the `dataId` when running this task (even though 390 `deepCoadd_obj` does not use `'filter'`), then this will override the `filt` kwargs 391 provided in the YAML file, and the calculations will be done in that filter. 393 This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object 394 to organize and excecute the calculations. 398 def _DefaultName(self):
399 raise NotImplementedError(
'Subclass must define "_DefaultName" attribute')
403 raise NotImplementedError(
'Subclass must define "outputDataset" attribute')
407 raise NotImplementedError(
'Subclass must define "inputDataset" attribute')
411 raise NotImplementedError(
'Subclass must define "ConfigClass" attribute')
    def runDataRef(self, patchRef):
        parq = patchRef.get()
        dataId = patchRef.dataId
        funcs = self.getFunctors()
        df = self.run(parq, funcs=funcs, dataId=dataId)
        self.write(df, patchRef)

    def run(self, parq, funcs=None, dataId=None):
        """Do postprocessing calculations

        Takes a `ParquetTable` object and dataId,
        returns a dataframe with results of postprocessing calculations.

        Parameters
        ----------
        parq : `lsst.pipe.tasks.parquetTable.ParquetTable`
            ParquetTable from which calculations are done.
        funcs : `lsst.pipe.tasks.functors.Functors`
            Functors to apply to the table's columns
        dataId : dict, optional
            Used to add a `patchId` column to the output dataframe.

        Returns
        -------
        `pandas.DataFrame`
        """
        filt = dataId.get('filter', None)
        return self.transform(filt, parq, funcs, dataId).df
    def getFunctors(self):
        funcs = CompositeFunctor.from_file(self.config.functorFile)
        funcs.update(dict(PostprocessAnalysis._defaultFuncs))
        return funcs

    def getAnalysis(self, parq, funcs=None, filt=None):
        # Avoid disk access if funcs is passed in explicitly
        if funcs is None:
            funcs = self.getFunctors()
        analysis = PostprocessAnalysis(parq, funcs, filt=filt)
        return analysis

    def transform(self, filt, parq, funcs, dataId):
        analysis = self.getAnalysis(parq, funcs=funcs, filt=filt)
        df = analysis.df
        if dataId is not None:
            for key, value in dataId.items():
                df[key] = value

        return pipeBase.Struct(
            df=df,
            analysis=analysis
        )

    def write(self, df, parqRef):
        parqRef.put(ParquetTable(dataFrame=df), self.outputDataset)
472 """No metadata to write. 477 class TransformObjectCatalogConfig(TransformCatalogBaseConfig):
478 filterMap = pexConfig.DictField(
482 doc=
"Dictionary mapping full filter name to short one for column name munging." 484 camelCase = pexConfig.Field(
487 doc=(
"Write per-filter columns names with camelCase, else underscore " 488 "For example: gPsfFlux instead of g_PsfFlux.")
490 multilevelOutput = pexConfig.Field(
493 doc=(
"Whether results dataframe should have a multilevel column index (True) or be flat " 494 "and name-munged (False).")
499 """Compute Flatted Object Table as defined in the DPDD 501 Do the same set of postprocessing calculations on all bands 503 This is identical to `PostprocessTask`, except for that it does the 504 specified functor calculations for all filters present in the 505 input `deepCoadd_obj` table. Any specific `"filt"` keywords specified 506 by the YAML file will be superceded. 508 _DefaultName =
"transformObjectCatalog" 509 ConfigClass = TransformObjectCatalogConfig
511 inputDataset =
'deepCoadd_obj' 512 outputDataset =
'objectTable' 515 def _makeArgumentParser(cls):
518 ContainerClass=CoaddDataIdContainer,
519 help=
"data ID, e.g. --id tract=12345 patch=1,2")
    def run(self, parq, funcs=None, dataId=None):
        dfDict = {}
        analysisDict = {}
        for filt in parq.columnLevelNames['filter']:
            result = self.transform(filt, parq, funcs, dataId)
            dfDict[filt] = result.df
            analysisDict[filt] = result.analysis

        # This makes a multilevel column index, with filter as the first level
        df = pd.concat(dfDict, axis=1, names=['filter', 'column'])

        if not self.config.multilevelOutput:
            noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
            if dataId is not None:
                noDupCols += list(dataId.keys())
            df = flattenFilters(df, self.config.filterMap, noDupCols=noDupCols,
                                camelCase=self.config.camelCase)

        return df
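

# A hedged sketch (illustration only) of driving TransformObjectCatalogTask from
# Python rather than from the command line; the functor-file path, filter map, and
# dataId values below are invented.
def _demoTransformObjectCatalog(butler):
    config = TransformObjectCatalogConfig()
    config.functorFile = '/path/to/functors.yaml'  # hypothetical YAML functor file
    config.filterMap = {'HSC-G': 'g', 'HSC-R': 'r'}  # illustrative filter munging map
    config.multilevelOutput = False  # produce a flat, name-munged table
    task = TransformObjectCatalogTask(config=config)

    parq = butler.get('deepCoadd_obj', tract=9813, patch='4,4')
    return task.run(parq, funcs=task.getFunctors(),
                    dataId={'tract': 9813, 'patch': '4,4'})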
546 """Make self.refList from self.idList 548 Generate a list of data references given tract and/or patch. 549 This was adapted from `TractQADataIdContainer`, which was 550 `TractDataIdContainer` modifie to not require "filter". 551 Only existing dataRefs are returned. 553 def getPatchRefList(tract):
554 return [namespace.butler.dataRef(datasetType=self.datasetType,
556 patch=
"%d,%d" % patch.getIndex())
for patch
in tract]
558 tractRefs = defaultdict(list)
559 for dataId
in self.idList:
562 if "tract" in dataId:
563 tractId = dataId[
"tract"]
564 if "patch" in dataId:
565 tractRefs[tractId].
append(namespace.butler.dataRef(datasetType=self.datasetType,
567 patch=dataId[
'patch']))
569 tractRefs[tractId] += getPatchRefList(skymap[tractId])
571 tractRefs = dict((tract.getId(), tractRefs.get(tract.getId(), []) + getPatchRefList(tract))
574 for tractRefList
in tractRefs.values():
575 existingRefs = [ref
for ref
in tractRefList
if ref.datasetExists()]
576 outputRefList.append(existingRefs)


class ConsolidateObjectTableConfig(pexConfig.Config):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )


class ConsolidateObjectTableTask(CmdLineTask):
    """Write patch-merged source tables to a tract-level parquet file
    """
    _DefaultName = "consolidateObjectTable"
    ConfigClass = ConsolidateObjectTableConfig

    inputDataset = 'objectTable'
    outputDataset = 'objectTable_tract'

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)
        parser.add_id_argument("--id", cls.inputDataset,
                               help="data ID, e.g. --id tract=12345",
                               ContainerClass=TractObjectDataIdContainer)
        return parser

    def runDataRef(self, patchRefList):
        df = pd.concat([patchRef.get().toDataFrame() for patchRef in patchRefList])
        patchRefList[0].put(ParquetTable(dataFrame=df), self.outputDataset)

    def writeMetadata(self, dataRef):
        """No metadata to write.
        """
        pass