import functools
import pandas as pd

from collections import defaultdict

import lsst.geom as geom
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
import lsst.afw.table as afwTable
from lsst.meas.base import SingleFrameMeasurementTask
from lsst.pipe.base import CmdLineTask, ArgumentParser, DataIdContainer
from lsst.coadd.utils.coaddDataIdContainer import CoaddDataIdContainer

from .parquetTable import ParquetTable
from .multiBandUtils import makeMergeArgumentParser, MergeSourcesRunner
from .functors import CompositeFunctor, RAColumn, DecColumn, Column
 
def flattenFilters(df, filterDict, noDupCols=['coord_ra', 'coord_dec'], camelCase=False):
    """Flattens a dataframe with multilevel column index
    """
    newDf = pd.DataFrame()
    for filt, filtShort in filterDict.items():
        subdf = df[filt]
        columnFormat = '{0}{1}' if camelCase else '{0}_{1}'
        newColumns = {c: columnFormat.format(filtShort, c)
                      for c in subdf.columns if c not in noDupCols}
        cols = list(newColumns.keys())
        newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)

    newDf = pd.concat([subdf[noDupCols], newDf], axis=1)
    return newDf
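
# A minimal sketch of what `flattenFilters` does (the filter names, short names,
# and values below are illustrative, not taken from real data):
#
#     cols = pd.MultiIndex.from_tuples(
#         [('HSC-G', 'coord_ra'), ('HSC-G', 'coord_dec'), ('HSC-G', 'PsfFlux'),
#          ('HSC-R', 'coord_ra'), ('HSC-R', 'coord_dec'), ('HSC-R', 'PsfFlux')],
#         names=('filter', 'column'))
#     df = pd.DataFrame([[150.1, 2.2, 1.0, 150.1, 2.2, 2.0]], columns=cols)
#     flattenFilters(df, {'HSC-G': 'g', 'HSC-R': 'r'}, camelCase=True).columns
#     # -> ['coord_ra', 'coord_dec', 'gPsfFlux', 'rPsfFlux']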
 
class WriteObjectTableConfig(pexConfig.Config):
    priorityList = pexConfig.ListField(
        dtype=str,
        default=[],
        doc="Priority-ordered list of bands for the merge."
    )
    engine = pexConfig.Field(
        dtype=str,
        default="pyarrow",
        doc="Parquet engine for writing (pyarrow or fastparquet)"
    )
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )

    def validate(self):
        pexConfig.Config.validate(self)
        if len(self.priorityList) == 0:
            raise RuntimeError("No priority list provided")
 
class WriteObjectTableTask(CmdLineTask):
    """Write filter-merged source tables to parquet
    """
    _DefaultName = "writeObjectTable"
    ConfigClass = WriteObjectTableConfig
    RunnerClass = MergeSourcesRunner

    # Names of the input datasets to be merged for each band
    inputDatasets = ('forced_src', 'meas', 'ref')
 
    def __init__(self, butler=None, schema=None, **kwargs):
        CmdLineTask.__init__(self, **kwargs)
 
    def runDataRef(self, patchRefList):
        """
        @brief Merge coadd sources from multiple bands. Calls @ref `run` which must be defined in
        subclasses that inherit from MergeSourcesTask.
        @param[in] patchRefList list of data references for each filter
        """
        catalogs = dict(self.readCatalog(patchRef) for patchRef in patchRefList)
        dataId = patchRefList[0].dataId
        mergedCatalog = self.run(catalogs, tract=dataId['tract'], patch=dataId['patch'])
        self.write(patchRefList[0], mergedCatalog)
 
    @classmethod
    def _makeArgumentParser(cls):
        """Create a suitable ArgumentParser.

        We will use the ArgumentParser to get a list of data
        references for patches; the RunnerClass will sort them into lists
        of data references for the same patch.

        References first of self.inputDatasets, rather than
        self.inputDataset
        """
        return makeMergeArgumentParser(cls._DefaultName, cls.inputDatasets[0])
    def readCatalog(self, patchRef):
        """Read input catalogs

        Read all the input datasets given by the 'inputDatasets'
        attribute.

        Parameters
        ----------
        patchRef : `lsst.daf.persistence.ButlerDataRef`
            Data reference for patch

        Returns
        -------
        Tuple consisting of filter name and a dict of catalogs, keyed by
        dataset name
        """
        filterName = patchRef.dataId["filter"]
        catalogDict = {}
        for dataset in self.inputDatasets:
 
            catalog = patchRef.get(self.config.coaddName + "Coadd_" + dataset, immediate=True)
            self.log.info("Read %d sources from %s for filter %s: %s" %
                          (len(catalog), dataset, filterName, patchRef.dataId))
            catalogDict[dataset] = catalog
        return filterName, catalogDict
 
    def run(self, catalogs, tract, patch):
        """Merge multiple catalogs.

        Parameters
        ----------
        catalogs : `dict`
            Mapping from filter names to dict of catalogs.
        tract : `int`
            tractId to use for the tractId column
        patch : `str`
            patchId to use for the patchId column

        Returns
        -------
        catalog : `lsst.pipe.tasks.parquetTable.ParquetTable`
            Merged dataframe, with each column prefixed by
            `filter_tag(filt)`, wrapped in the parquet writer shim class.
        """
        dfs = []
        for filt, tableDict in catalogs.items():
            for dataset, table in tableDict.items():
                # Convert afwTable to pandas DataFrame
                df = table.asAstropy().to_pandas().set_index('id', drop=True)

                # Sort columns by name, to ensure matching schema among patches
                df = df.reindex(sorted(df.columns), axis=1)
                df['tractId'] = tract
                df['patchId'] = patch

                # Make columns a 3-level MultiIndex
                df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
                                                       names=('dataset', 'filter', 'column'))
                dfs.append(df)

        catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return catalog
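
    # A hedged pandas-only sketch of the column structure `run` builds; the dataset,
    # filter, and column names below are illustrative:
    #
    #     cols = pd.MultiIndex.from_tuples(
    #         [('meas', 'HSC-G', 'base_PsfFlux_instFlux'),
    #          ('forced_src', 'HSC-G', 'base_PsfFlux_instFlux')],
    #         names=('dataset', 'filter', 'column'))
    #     merged = pd.DataFrame(columns=cols)
    #     merged['meas']['HSC-G']   # all 'meas' columns of one band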
 
    def write(self, patchRef, catalog):
        """Write the output.

        Parameters
        ----------
        catalog : `ParquetTable`
            Catalog to write
        patchRef : `lsst.daf.persistence.ButlerDataRef`
            Data reference for patch
        """
        patchRef.put(catalog, self.config.coaddName + "Coadd_obj")
        # The filter is not part of the data ID for the output dataset, so drop it
        # from the logged data ID.
        mergeDataId = patchRef.dataId.copy()
        del mergeDataId["filter"]
        self.log.info("Wrote merged catalog: %s" % (mergeDataId,))

    def writeMetadata(self, dataRefList):
        """No metadata to write, and not sure how to write it for a list of dataRefs.
        """
        pass
class WriteSourceTableConfig(pexConfig.Config):
    doApplyExternalPhotoCalib = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Add local photoCalib columns from the calexp.photoCalib? Should only set True if "
             "generating Source Tables from older src tables which do not already have local calib columns")
    )
    doApplyExternalSkyWcs = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Add local WCS columns from the calexp.wcs? Should only set True if "
             "generating Source Tables from older src tables which do not already have local calib columns")
    )
 
class WriteSourceTableTask(CmdLineTask):
    """Write source table to parquet
    """
    _DefaultName = "writeSourceTable"
    ConfigClass = WriteSourceTableConfig

    def runDataRef(self, dataRef):
        src = dataRef.get('src')
        if self.config.doApplyExternalPhotoCalib or self.config.doApplyExternalSkyWcs:
            src = self.addCalibColumns(src, dataRef)

        ccdVisitId = dataRef.get('ccdExposureId')
        result = self.run(src, ccdVisitId=ccdVisitId)
        dataRef.put(result.table, 'source')
 
    def run(self, catalog, ccdVisitId=None):
        """Convert `src` catalog to parquet

        Parameters
        ----------
        catalog : `afwTable.SourceCatalog`
            catalog to be converted
        ccdVisitId : `int`
            ccdVisitId to be added as a column

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            ``table``
                `ParquetTable` version of the input catalog
        """
        self.log.info("Generating parquet table from src catalog")
        df = catalog.asAstropy().to_pandas().set_index('id', drop=True)
        df['ccdVisitId'] = ccdVisitId
        return pipeBase.Struct(table=ParquetTable(dataFrame=df))
 
    def addCalibColumns(self, catalog, dataRef):
        """Add columns with local calibration evaluated at each centroid

        for backwards compatibility with old repos.
        This exists for the purpose of converting old src catalogs
        (which don't have the expected local calib columns) to Source Tables.

        Parameters
        ----------
        catalog : `afwTable.SourceCatalog`
            catalog to which calib columns will be added
        dataRef : `lsst.daf.persistence.ButlerDataRef`
            for fetching the calibs from disk.

        Returns
        -------
        newCat : `afwTable.SourceCatalog`
            Source Catalog with requested local calib columns
        """
        measureConfig = SingleFrameMeasurementTask.ConfigClass()
        measureConfig.doReplaceWithNoise = False
        # Only the calibs attached to the calexp are needed, so read a minimal
        # 1x1-pixel cutout (the exact bounding box used here is an assumption).
        exposure = dataRef.get('calexp_sub',
                               bbox=geom.Box2I(geom.Point2I(0, 0), geom.Extent2I(1, 1)))
 
        # Map the input schema to an output schema to which the new columns can be added
        mapper = afwTable.SchemaMapper(catalog.schema)
        mapper.addMinimalSchema(catalog.schema, True)
        schema = mapper.getOutputSchema()

        exposureIdInfo = dataRef.get("expIdInfo")
 
        measureConfig.plugins.names = []
        if self.config.doApplyExternalSkyWcs:
            plugin = 'base_LocalWcs'
            if plugin in catalog.schema:
                raise RuntimeError(f"{plugin} already in src catalog. Set doApplyExternalSkyWcs=False")
            else:
                measureConfig.plugins.names.add(plugin)

        if self.config.doApplyExternalPhotoCalib:
            plugin = 'base_LocalPhotoCalib'
            if plugin in catalog.schema:
                raise RuntimeError(f"{plugin} already in src catalog. Set doApplyExternalPhotoCalib=False")
            else:
                measureConfig.plugins.names.add(plugin)

        # Run only the requested local-calibration plugins on a copy of the catalog
        measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
        newCat = afwTable.SourceCatalog(schema)
        newCat.extend(catalog, mapper=mapper)
        measurement.run(measCat=newCat, exposure=exposure, exposureId=exposureIdInfo.expId)
        return newCat
 
    def writeMetadata(self, dataRef):
        """No metadata to write.
        """
        pass
    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)
        parser.add_id_argument("--id", 'src',
                               help="data ID, e.g. --id visit=12345 ccd=0")
        return parser
 
class PostprocessAnalysis(object):
    """Calculate columns from ParquetTable

    This object manages and organizes an arbitrary set of computations
    on a catalog.  The catalog is defined by a
    `lsst.pipe.tasks.parquetTable.ParquetTable` object (or list thereof), such as a
    `deepCoadd_obj` dataset, and the computations are defined by a collection
    of `lsst.pipe.tasks.functor.Functor` objects (or, equivalently,
    a `CompositeFunctor`).

    After the object is initialized, accessing the `.df` attribute (which
    holds the `pandas.DataFrame` containing the results of the calculations) triggers
    computation of said dataframe.

    One of the conveniences of using this object is the ability to define a desired common
    filter for all functors.  This enables the same functor collection to be passed to
    several different `PostprocessAnalysis` objects without having to change the original
    functor collection, since the `filt` keyword argument of this object triggers an
    overwrite of the `filt` property for all functors in the collection.

    This object also allows a list of refFlags to be passed, and defines a set of default
    refFlags that are always included even if not requested.

    If a list of `ParquetTable` objects is passed, rather than a single one, then the
    calculations will be mapped over all the input catalogs.  In principle, it should
    be straightforward to parallelize this activity, but initial tests have failed
    (see TODO in code comments).

    Parameters
    ----------
    parq : `lsst.pipe.tasks.ParquetTable` (or list of such)
        Source catalog(s) for computation

    functors : `list`, `dict`, or `lsst.pipe.tasks.functors.CompositeFunctor`
        Computations to do (functors that act on `parq`).
        If a dict, the output
        DataFrame will have columns keyed accordingly.
        If a list, the column keys will come from the
        `.shortname` attribute of each functor.

    filt : `str` (optional)
        Filter in which to calculate.  If provided,
        this will overwrite any existing `.filt` attribute
        of the provided functors.

    flags : `list` (optional)
        List of flags (per-band) to include in output table.

    refFlags : `list` (optional)
        List of refFlags (only reference band) to include in output table.
    """
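
    # A hedged usage sketch (the dataset name, tract/patch, functor-file name, filter,
    # and flag below are illustrative; `butler` is a Gen2 Butler):
    #
    #     parq = butler.get('deepCoadd_obj', tract=9813, patch='4,4')
    #     funcs = CompositeFunctor.from_file('myFunctors.yaml')
    #     analysis = PostprocessAnalysis(parq, funcs, filt='HSC-G',
    #                                    refFlags=['detect_isPrimary'])
    #     df = analysis.df   # accessing .df triggers the computation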
    _defaultRefFlags = []
    _defaultFuncs = (('coord_ra', RAColumn()),
                     ('coord_dec', DecColumn()))
 
    def __init__(self, parq, functors, filt=None, flags=None, refFlags=None):
        self.parq = parq
        self.functors = functors

        self.filt = filt
        self.flags = list(flags) if flags is not None else []
        self.refFlags = list(self._defaultRefFlags)
        if refFlags is not None:
            self.refFlags += list(refFlags)

        self._df = None
 
    @property
    def func(self):
        additionalFuncs = dict(self._defaultFuncs)
        additionalFuncs.update({flag: Column(flag, dataset='ref') for flag in self.refFlags})
        additionalFuncs.update({flag: Column(flag, dataset='meas') for flag in self.flags})

        if isinstance(self.functors, CompositeFunctor):
            func = self.functors
        else:
            func = CompositeFunctor(self.functors)

        func.funcDict.update(additionalFuncs)
        func.filt = self.filt

        return func
    @property
    def noDupCols(self):
        return [name for name, func in self.func.funcDict.items() if func.noDup or func.dataset == 'ref']
 
    @property
    def df(self):
        if self._df is None:
            self.compute()
        return self._df

    def compute(self, dropna=False, pool=None):
        # Map over multiple parquet tables
        if type(self.parq) in (list, tuple):
            if pool is None:
                dflist = [self.func(parq, dropna=dropna) for parq in self.parq]
            else:
                # TODO: parallel computation currently fails (see class docstring)
                dflist = pool.map(functools.partial(self.func, dropna=dropna), self.parq)
            self._df = pd.concat(dflist)
        else:
            self._df = self.func(self.parq, dropna=dropna)

        return self._df
 
class TransformCatalogBaseConfig(pexConfig.Config):
    functorFile = pexConfig.Field(
        dtype=str,
        doc='Path to YAML file specifying functors to be computed',
        default=None
    )
 
class TransformCatalogBaseTask(CmdLineTask):
    """Base class for transforming/standardizing a catalog

    by applying functors that convert units and apply calibrations.
    The purpose of this task is to perform a set of computations on
    an input `ParquetTable` dataset (such as `deepCoadd_obj`) and write the
    results to a new dataset (which needs to be declared in an `outputDataset`
    attribute).

    The calculations to be performed are defined in a YAML file that specifies
    a set of functors to be computed, provided as
    a `functorFile` config parameter.  An example of such a YAML file
    is the following:

        funcs:
            count:
                functor: Column
                args:
                    - base_InputCount_value
            deconvolved_moments:
                functor: DeconvolvedMoments
                filt: HSC-G
                dataset: forced_src
        refFlags:
            - merge_measurement_i
            - merge_measurement_r
            - merge_measurement_z
            - merge_measurement_y
            - merge_measurement_g
            - base_PixelFlags_flag_inexact_psfCenter

    The names for each entry under "func" will become the names of columns in the
    output dataset.  All the functors referenced are defined in `lsst.pipe.tasks.functors`.
    Positional arguments to be passed to each functor are in the `args` list,
    and any additional entries for each column other than "functor" or "args" (e.g., `'filt'`,
    `'dataset'`) are treated as keyword arguments to be passed to the functor initialization.

    The "refFlags" entry is a shortcut for a set of `Column` functors that carry the original
    column through unchanged, taken from the `'ref'` dataset.

    The "flags" entry will be expanded out per band.

    Note, if `'filter'` is provided as part of the `dataId` when running this task (even though
    `deepCoadd_obj` does not use `'filter'`), then this will override the `filt` kwargs
    provided in the YAML file, and the calculations will be done in that filter.

    This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
    to organize and execute the calculations.
    """
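
    # For reference, each "refFlags" entry behaves like a plain `Column` functor read
    # from the 'ref' dataset (see `PostprocessAnalysis.func`), so the two forms below
    # request the same output column (the flag name is illustrative):
    #
    #     refFlags:
    #         - merge_measurement_i
    #
    #     Column('merge_measurement_i', dataset='ref')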
    @property
    def _DefaultName(self):
        raise NotImplementedError('Subclass must define "_DefaultName" attribute')

    @property
    def outputDataset(self):
        raise NotImplementedError('Subclass must define "outputDataset" attribute')

    @property
    def inputDataset(self):
        raise NotImplementedError('Subclass must define "inputDataset" attribute')

    @property
    def ConfigClass(self):
        raise NotImplementedError('Subclass must define "ConfigClass" attribute')
 
    def runDataRef(self, dataRef):
        parq = dataRef.get(self.inputDataset)
        funcs = self.getFunctors()
        df = self.run(parq, funcs=funcs, dataId=dataRef.dataId)
        self.write(df, dataRef)
        return df
 
    def run(self, parq, funcs=None, dataId=None):
        """Do postprocessing calculations

        Takes a `ParquetTable` object and dataId,
        returns a dataframe with results of postprocessing calculations.

        Parameters
        ----------
        parq : `lsst.pipe.tasks.parquetTable.ParquetTable`
            ParquetTable from which calculations are done.
        funcs : `lsst.pipe.tasks.functors.Functors`
            Functors to apply to the table's columns
        dataId : dict, optional
            Used to add a `patchId` column to the output dataframe.

        Returns
        -------
        df : `pandas.DataFrame`
        """
        self.log.info("Transforming/standardizing the source table dataId: %s", dataId)
 
        filt = dataId.get('filter', None)
        df = self.transform(filt, parq, funcs, dataId).df
        self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
        return df

    def getFunctors(self):
        funcs = CompositeFunctor.from_file(self.config.functorFile)
        funcs.update(dict(PostprocessAnalysis._defaultFuncs))
        return funcs
 
    def transform(self, filt, parq, funcs, dataId):
        analysis = self.getAnalysis(parq, funcs=funcs, filt=filt)
        df = analysis.df
        if dataId is not None:
            for key, value in dataId.items():
                df[key] = value

        return pipeBase.Struct(
            df=df,
            analysis=analysis
        )
 
    def writeMetadata(self, dataRef):
        """No metadata to write.
        """
        pass
class TransformObjectCatalogConfig(TransformCatalogBaseConfig):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )
    filterMap = pexConfig.DictField(
        keytype=str,
        itemtype=str,
        default={},
        doc=("Dictionary mapping full filter name to short one for column name munging. "
             "These filters determine the output columns no matter what filters the "
             "input data actually contain.")
    )
    camelCase = pexConfig.Field(
        dtype=bool,
        doc=("Write per-filter column names with camelCase, else underscore. "
             "For example: gPsfFlux instead of g_PsfFlux.")
    )
    multilevelOutput = pexConfig.Field(
        dtype=bool,
        doc=("Whether results dataframe should have a multilevel column index (True) or be flat "
             "and name-munged (False).")
    )
 
class TransformObjectCatalogTask(TransformCatalogBaseTask):
    """Compute Flattened Object Table as defined in the DPDD

    Do the same set of postprocessing calculations on all bands.

    This is identical to `TransformCatalogBaseTask`, except that it does the
    specified functor calculations for all filters present in the
    input `deepCoadd_obj` table.  Any specific `"filt"` keywords specified
    by the YAML file will be superseded.
    """
    _DefaultName = "transformObjectCatalog"
    ConfigClass = TransformObjectCatalogConfig

    inputDataset = 'deepCoadd_obj'
    outputDataset = 'objectTable'
    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)
        parser.add_id_argument("--id", cls.inputDataset,
                               ContainerClass=CoaddDataIdContainer,
                               help="data ID, e.g. --id tract=12345 patch=1,2")
        return parser
 
    def run(self, parq, funcs=None, dataId=None):
        dfDict = {}
        analysisDict = {}
        templateDf = pd.DataFrame()

        # Perform the transform for each filter that exists in parq and in the filterMap
        for filt in parq.columnLevelNames['filter']:
            if filt not in self.config.filterMap:
                self.log.info("Ignoring %s data in the input", filt)
                continue
            self.log.info("Transforming the catalog of filter %s", filt)
            result = self.transform(filt, parq, funcs, dataId)
            dfDict[filt] = result.df
            analysisDict[filt] = result.analysis
            if templateDf.empty:
                templateDf = result.df

        # Fill NaNs in columns of filters that do not exist in parq
        for filt in self.config.filterMap:
            if filt not in dfDict:
                self.log.info("Adding empty columns for filter %s", filt)
                dfDict[filt] = pd.DataFrame().reindex_like(templateDf)

        # This makes a multilevel column index, with filter as the first level
        df = pd.concat(dfDict, axis=1, names=['filter', 'column'])

        if not self.config.multilevelOutput:
            noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
            if dataId is not None:
                noDupCols += list(dataId.keys())
            df = flattenFilters(df, self.config.filterMap, noDupCols=noDupCols,
                                camelCase=self.config.camelCase)

        self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
        return df
 
class TractObjectDataIdContainer(CoaddDataIdContainer):

    def makeDataRefList(self, namespace):
        """Make self.refList from self.idList

        Generate a list of data references given tract and/or patch.
        This was adapted from `TractQADataIdContainer`, which was
        `TractDataIdContainer` modified to not require "filter".
        Only existing dataRefs are returned.
        """
        def getPatchRefList(tract):
            return [namespace.butler.dataRef(datasetType=self.datasetType,
                                             tract=tract.getId(),
                                             patch="%d,%d" % patch.getIndex()) for patch in tract]
 
        tractRefs = defaultdict(list)  # Data references for each tract
        for dataId in self.idList:
            # The skymap is assumed to be available via CoaddDataIdContainer.getSkymap
            skymap = self.getSkymap(namespace)

            if "tract" in dataId:
                tractId = dataId["tract"]
                if "patch" in dataId:
                    tractRefs[tractId].append(namespace.butler.dataRef(datasetType=self.datasetType,
                                                                       tract=tractId,
                                                                       patch=dataId['patch']))
                else:
                    tractRefs[tractId] += getPatchRefList(skymap[tractId])
            else:
                tractRefs = dict((tract.getId(), tractRefs.get(tract.getId(), []) + getPatchRefList(tract))
                                 for tract in skymap)

        outputRefList = []
        for tractRefList in tractRefs.values():
            existingRefs = [ref for ref in tractRefList if ref.datasetExists()]
            outputRefList.append(existingRefs)

        self.refList = outputRefList
 
class ConsolidateObjectTableConfig(pexConfig.Config):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )


class ConsolidateObjectTableTask(CmdLineTask):
    """Write patch-merged source tables to a tract-level parquet file
    """
    _DefaultName = "consolidateObjectTable"
    ConfigClass = ConsolidateObjectTableConfig

    inputDataset = 'objectTable'
    outputDataset = 'objectTable_tract'
    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)
        parser.add_id_argument("--id", cls.inputDataset,
                               help="data ID, e.g. --id tract=12345",
                               ContainerClass=TractObjectDataIdContainer)
        return parser

    def runDataRef(self, patchRefList):
        df = pd.concat([patchRef.get().toDataFrame() for patchRef in patchRefList])
        patchRefList[0].put(ParquetTable(dataFrame=df), self.outputDataset)

    def writeMetadata(self, dataRef):
        """No metadata to write.
        """
        pass
class TransformSourceTableConfig(TransformCatalogBaseConfig):
    pass
 
class TransformSourceTableTask(TransformCatalogBaseTask):
    """Transform/standardize a source catalog
    """
    _DefaultName = "transformSourceTable"
    ConfigClass = TransformSourceTableConfig

    inputDataset = 'source'
    outputDataset = 'sourceTable'

    def writeMetadata(self, dataRef):
        """No metadata to write.
        """
        pass
    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)
        parser.add_id_argument("--id", datasetType=cls.inputDataset,
                               help="data ID, e.g. --id visit=12345 ccd=0")
        return parser
 
class VisitDataIdContainer(DataIdContainer):
    """DataIdContainer that groups sensor-level id's by visit
    """

    def makeDataRefList(self, namespace):
        """Make self.refList from self.idList

        Generate a list of data references grouped by visit.

        Parameters
        ----------
        namespace : `argparse.Namespace`
            Namespace used by `lsst.pipe.base.CmdLineTask` to parse command line arguments
        """
        def ccdDataRefList(visitId):
            """Get all possible ccds for a given visit"""
            ccds = namespace.butler.queryMetadata('src', ['ccd'], dataId={'visit': visitId})
            return [namespace.butler.dataRef(datasetType=self.datasetType,
                                             visit=visitId,
                                             ccd=ccd) for ccd in ccds]
 
        visitRefs = defaultdict(list)
        for dataId in self.idList:
            if "visit" in dataId:
                visitId = dataId["visit"]
                if "ccd" in dataId:
                    visitRefs[visitId].append(namespace.butler.dataRef(datasetType=self.datasetType,
                                                                       visit=visitId, ccd=dataId['ccd']))
                else:
                    visitRefs[visitId] += ccdDataRefList(visitId)

        outputRefList = []
        for refList in visitRefs.values():
            existingRefs = [ref for ref in refList if ref.datasetExists()]
            outputRefList.append(existingRefs)

        self.refList = outputRefList
 
class ConsolidateSourceTableConfig(pexConfig.Config):
    pass


class ConsolidateSourceTableTask(CmdLineTask):
    """Concatenate `sourceTable` list into a per-visit `sourceTable_visit`
    """
    _DefaultName = 'consolidateSourceTable'
    ConfigClass = ConsolidateSourceTableConfig

    inputDataset = 'sourceTable'
    outputDataset = 'sourceTable_visit'

    def runDataRef(self, dataRefList):
        self.log.info("Concatenating %s per-detector Source Tables", len(dataRefList))
        df = pd.concat([dataRef.get().toDataFrame() for dataRef in dataRefList])
        dataRefList[0].put(ParquetTable(dataFrame=df), self.outputDataset)

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)
        parser.add_id_argument("--id", cls.inputDataset,
                               help="data ID, e.g. --id visit=12345",
                               ContainerClass=VisitDataIdContainer)
        return parser

    def writeMetadata(self, dataRef):
        """No metadata to write.
        """
        pass

    def writeConfig(self, butler, clobber=False, doBackup=True):
        """No config to write.
        """
        pass