postprocess.py
# This file is part of pipe_tasks
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import functools
import pandas as pd
from collections import defaultdict

import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
from lsst.pipe.base import CmdLineTask, ArgumentParser
from lsst.coadd.utils.coaddDataIdContainer import CoaddDataIdContainer

from .parquetTable import ParquetTable
from .multiBandUtils import makeMergeArgumentParser, MergeSourcesRunner
from .functors import CompositeFunctor, RAColumn, DecColumn, Column


def flattenFilters(df, filterDict, noDupCols=['coord_ra', 'coord_dec'], camelCase=False):
    """Flattens a dataframe with multilevel column index
    """
    newDf = pd.DataFrame()
    for filt, filtShort in filterDict.items():
        try:
            subdf = df[filt]
        except KeyError:
            continue
        columnFormat = '{0}{1}' if camelCase else '{0}_{1}'
        newColumns = {c: columnFormat.format(filtShort, c)
                      for c in subdf.columns if c not in noDupCols}
        cols = list(newColumns.keys())
        newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)

    newDf = pd.concat([subdf[noDupCols], newDf], axis=1)
    return newDf
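# Illustrative sketch (not part of the pipeline): flattenFilters turns a
# two-level (filter, column) frame into a flat, name-munged one.  The column
# names below are placeholders chosen for the example.
#
#   >>> cols = pd.MultiIndex.from_tuples(
#   ...     [('HSC-G', 'coord_ra'), ('HSC-G', 'coord_dec'), ('HSC-G', 'PsfFlux'),
#   ...      ('HSC-R', 'coord_ra'), ('HSC-R', 'coord_dec'), ('HSC-R', 'PsfFlux')])
#   >>> df = pd.DataFrame([[0.1, 0.2, 1.0, 0.1, 0.2, 2.0]], columns=cols)
#   >>> flattenFilters(df, {'HSC-G': 'g', 'HSC-R': 'r'}, camelCase=True).columns.tolist()
#   ['coord_ra', 'coord_dec', 'gPsfFlux', 'rPsfFlux']
#
# The noDupCols (coord_ra and coord_dec here) are kept once rather than
# repeated per filter; filter names are shortened via filterDict before being
# prepended to each remaining column name.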


class WriteObjectTableConfig(pexConfig.Config):
    priorityList = pexConfig.ListField(
        dtype=str,
        default=[],
        doc="Priority-ordered list of bands for the merge."
    )
    engine = pexConfig.Field(
        dtype=str,
        default="pyarrow",
        doc="Parquet engine for writing (pyarrow or fastparquet)"
    )
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )

    def validate(self):
        pexConfig.Config.validate(self)
        if len(self.priorityList) == 0:
            raise RuntimeError("No priority list provided")


class WriteObjectTableTask(CmdLineTask):
    """Write filter-merged source tables to parquet
    """
    _DefaultName = "writeObjectTable"
    ConfigClass = WriteObjectTableConfig
    RunnerClass = MergeSourcesRunner

    # Names of table datasets to be merged
    inputDatasets = ('forced_src', 'meas', 'ref')

    # Tag of output dataset written by `MergeSourcesTask.write`
    outputDataset = 'obj'

    def __init__(self, butler=None, schema=None, **kwargs):
        # It is a shame that this class can't use the default init for CmdLineTask,
        # but to do so would require its own special task runner, which is many
        # more lines of specialization, so this is how it is for now.
        CmdLineTask.__init__(self, **kwargs)
    def runDataRef(self, patchRefList):
        """!
        @brief Merge coadd sources from multiple bands. Calls @ref `run` which must be defined in
        subclasses that inherit from MergeSourcesTask.
        @param[in] patchRefList list of data references for each filter
        """
        catalogs = dict(self.readCatalog(patchRef) for patchRef in patchRefList)
        dataId = patchRefList[0].dataId
        mergedCatalog = self.run(catalogs, tract=dataId['tract'], patch=dataId['patch'])
        self.write(patchRefList[0], mergedCatalog)

    @classmethod
    def _makeArgumentParser(cls):
        """Create a suitable ArgumentParser.

        We will use the ArgumentParser to get a list of data
        references for patches; the RunnerClass will sort them into lists
        of data references for the same patch.

        References the first of self.inputDatasets, rather than
        self.inputDataset.
        """
        return makeMergeArgumentParser(cls._DefaultName, cls.inputDatasets[0])
    def readCatalog(self, patchRef):
        """Read input catalogs

        Read all the input datasets given by the 'inputDatasets'
        attribute.

        Parameters
        ----------
        patchRef : `lsst.daf.persistence.ButlerDataRef`
            Data reference for patch

        Returns
        -------
        Tuple consisting of filter name and a dict of catalogs, keyed by
        dataset name
        """
        filterName = patchRef.dataId["filter"]
        catalogDict = {}
        for dataset in self.inputDatasets:
            catalog = patchRef.get(self.config.coaddName + "Coadd_" + dataset, immediate=True)
            self.log.info("Read %d sources from %s for filter %s: %s" %
                          (len(catalog), dataset, filterName, patchRef.dataId))
            catalogDict[dataset] = catalog
        return filterName, catalogDict
    def run(self, catalogs, tract, patch):
        """Merge multiple catalogs.

        Parameters
        ----------
        catalogs : `dict`
            Mapping from filter names to dict of catalogs.
        tract : `int`
            tractId to use for the tractId column
        patch : `str`
            patchId to use for the patchId column

        Returns
        -------
        catalog : `lsst.pipe.tasks.parquetTable.ParquetTable`
            Merged dataframe, with each column prefixed by
            `filter_tag(filt)`, wrapped in the parquet writer shim class.
        """
        dfs = []
        for filt, tableDict in catalogs.items():
            for dataset, table in tableDict.items():
                # Convert afwTable to pandas DataFrame
                df = table.asAstropy().to_pandas().set_index('id', drop=True)

                # Sort columns by name, to ensure matching schema among patches
                df = df.reindex(sorted(df.columns), axis=1)
                df['tractId'] = tract
                df['patchId'] = patch

                # Make columns a 3-level MultiIndex
                df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
                                                       names=('dataset', 'filter', 'column'))
                dfs.append(df)

        catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return ParquetTable(dataFrame=catalog)
    def write(self, patchRef, catalog):
        """Write the output.

        Parameters
        ----------
        catalog : `ParquetTable`
            Catalog to write
        patchRef : `lsst.daf.persistence.ButlerDataRef`
            Data reference for patch
        """
        patchRef.put(catalog, self.config.coaddName + "Coadd_" + self.outputDataset)
        # since the filter isn't actually part of the data ID for the dataset we're saving,
        # it's confusing to see it in the log message, even if the butler simply ignores it.
        mergeDataId = patchRef.dataId.copy()
        del mergeDataId["filter"]
        self.log.info("Wrote merged catalog: %s" % (mergeDataId,))

    def writeMetadata(self, dataRefList):
        """No metadata to write, and not sure how to write it for a list of dataRefs.
        """
        pass
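# The deepCoadd_obj dataset written above stores one row per object and a
# three-level column MultiIndex.  A hypothetical lookup on the merged
# dataframe (dataId values and the column name are illustrative only):
#
#   >>> parq = butler.get('deepCoadd_obj', tract=12345, patch='1,2')
#   >>> df = parq.toDataFrame()
#   >>> df[('meas', 'HSC-G', 'base_PsfFlux_instFlux')]   # (dataset, filter, column)
#
# where 'dataset' is one of ('forced_src', 'meas', 'ref'), 'filter' is the
# full filter name, and 'column' is the original afw table column name.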


class PostprocessAnalysis(object):
    """Calculate columns from ParquetTable

    This object manages and organizes an arbitrary set of computations
    on a catalog.  The catalog is defined by a
    `lsst.pipe.tasks.parquetTable.ParquetTable` object (or list thereof), such as a
    `deepCoadd_obj` dataset, and the computations are defined by a collection
    of `lsst.pipe.tasks.functors.Functor` objects (or, equivalently,
    a `CompositeFunctor`).

    After the object is initialized, accessing the `.df` attribute (which
    holds the `pandas.DataFrame` containing the results of the calculations) triggers
    computation of said dataframe.

    One of the conveniences of using this object is the ability to define a desired common
    filter for all functors.  This enables the same functor collection to be passed to
    several different `PostprocessAnalysis` objects without having to change the original
    functor collection, since the `filt` keyword argument of this object triggers an
    overwrite of the `filt` property for all functors in the collection.

    This object also allows a list of flags to be passed, and defines a set of default
    flags that are always included even if not requested.

    If a list of `ParquetTable` objects is passed, rather than a single one, then the
    calculations will be mapped over all the input catalogs.  In principle, it should
    be straightforward to parallelize this activity, but initial tests have failed
    (see TODO in code comments).

    Parameters
    ----------
    parq : `lsst.pipe.tasks.parquetTable.ParquetTable` (or list of such)
        Source catalog(s) for computation

    functors : `list`, `dict`, or `lsst.pipe.tasks.functors.CompositeFunctor`
        Computations to do (functors that act on `parq`).
        If a dict, the output
        DataFrame will have columns keyed accordingly.
        If a list, the column keys will come from the
        `.shortname` attribute of each functor.

    filt : `str` (optional)
        Filter in which to calculate.  If provided,
        this will overwrite any existing `.filt` attribute
        of the provided functors.

    flags : `list` (optional)
        List of flags to include in output table.
    """
    _defaultFlags = ('calib_psf_used', 'detect_isPrimary')
    _defaultFuncs = (('coord_ra', RAColumn()),
                     ('coord_dec', DecColumn()))

    def __init__(self, parq, functors, filt=None, flags=None):
        self.parq = parq
        self.functors = functors

        self.filt = filt
        self.flags = list(self._defaultFlags)
        if flags is not None:
            self.flags += list(flags)

        self._df = None

    @property
    def defaultFuncs(self):
        funcs = dict(self._defaultFuncs)
        return funcs

    @property
    def func(self):
        additionalFuncs = self.defaultFuncs
        additionalFuncs.update({flag: Column(flag) for flag in self.flags})

        if isinstance(self.functors, CompositeFunctor):
            func = self.functors
        else:
            func = CompositeFunctor(self.functors)

        func.funcDict.update(additionalFuncs)
        func.filt = self.filt

        return func

    @property
    def noDupCols(self):
        return [name for name, func in self.func.funcDict.items() if func.noDup or func.dataset == 'ref']

    @property
    def df(self):
        if self._df is None:
            self.compute()
        return self._df

    def compute(self, dropna=False, pool=None):
        # map over multiple parquet tables
        if type(self.parq) in (list, tuple):
            if pool is None:
                dflist = [self.func(parq, dropna=dropna) for parq in self.parq]
            else:
                # TODO: Figure out why this doesn't work (pyarrow pickling issues?)
                dflist = pool.map(functools.partial(self.func, dropna=dropna), self.parq)
            self._df = pd.concat(dflist)
        else:
            self._df = self.func(self.parq, dropna=dropna)

        return self._df
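# Minimal usage sketch (assumes a deepCoadd_obj ParquetTable `parq` and that
# the `Mag` functor from .functors accepts the same arguments it takes in the
# YAML example further below; the flag name is illustrative):
#
#   >>> from lsst.pipe.tasks.functors import Mag
#   >>> funcs = {'psfMag': Mag('base_PsfFlux', dataset='meas')}
#   >>> analysis = PostprocessAnalysis(parq, funcs, filt='HSC-G',
#   ...                                flags=['base_PixelFlags_flag_inexact_psfCenter'])
#   >>> df = analysis.df   # triggers compute(); the result contains coord_ra,
#   ...                    # coord_dec, the default and requested flags, and 'psfMag'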


class TransformCatalogBaseConfig(pexConfig.Config):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )
    functorFile = pexConfig.Field(
        dtype=str,
        doc='Path to YAML file specifying functors to be computed',
        default=None
    )


class TransformCatalogBaseTask(CmdLineTask):
    """Base class for transforming/standardizing a catalog

    by applying functors that convert units and apply calibrations.
    The purpose of this task is to perform a set of computations on
    an input `ParquetTable` dataset (such as `deepCoadd_obj`) and write the
    results to a new dataset (which needs to be declared in an `outputDataset`
    attribute).

    The calculations to be performed are defined in a YAML file that specifies
    a set of functors to be computed, provided as
    a `functorFile` config parameter.  An example of such a YAML file
    is the following:

        funcs:
            psfMag:
                functor: Mag
                args:
                    - base_PsfFlux
                filt: HSC-G
                dataset: meas
            cmodel_magDiff:
                functor: MagDiff
                args:
                    - modelfit_CModel
                    - base_PsfFlux
                filt: HSC-G
            gauss_magDiff:
                functor: MagDiff
                args:
                    - base_GaussianFlux
                    - base_PsfFlux
                filt: HSC-G
            count:
                functor: Column
                args:
                    - base_InputCount_value
                filt: HSC-G
            deconvolved_moments:
                functor: DeconvolvedMoments
                filt: HSC-G
                dataset: forced_src
        flags:
            - calib_psfUsed
            - merge_measurement_i
            - merge_measurement_r
            - merge_measurement_z
            - merge_measurement_y
            - merge_measurement_g
            - base_PixelFlags_flag_inexact_psfCenter
            - detect_isPrimary

    The names of the entries under "funcs" will become the names of columns in the
    output dataset.  All the functors referenced are defined in `lsst.pipe.tasks.functors`.
    Positional arguments to be passed to each functor are in the `args` list,
    and any additional entries for each column other than "functor" or "args" (e.g., `'filt'`,
    `'dataset'`) are treated as keyword arguments to be passed to the functor initialization.

    The "flags" entry is shorthand for a set of `Column` functors that keep the original
    column name and are taken from the `'ref'` dataset.

    Note, if `'filter'` is provided as part of the `dataId` when running this task (even though
    `deepCoadd_obj` does not use `'filter'`), then this will override the `filt` kwargs
    provided in the YAML file, and the calculations will be done in that filter.

    This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
    to organize and execute the calculations.
    """
    @property
    def _DefaultName(self):
        raise NotImplementedError('Subclass must define "_DefaultName" attribute')

    @property
    def outputDataset(self):
        raise NotImplementedError('Subclass must define "outputDataset" attribute')

    @property
    def inputDataset(self):
        raise NotImplementedError('Subclass must define "inputDataset" attribute')

    @property
    def ConfigClass(self):
        raise NotImplementedError('Subclass must define "ConfigClass" attribute')

    def runDataRef(self, patchRef):
        parq = patchRef.get()
        dataId = patchRef.dataId
        funcs = self.getFunctors()
        df = self.run(parq, funcs=funcs, dataId=dataId)
        self.write(df, patchRef)
        return df

    def run(self, parq, funcs=None, dataId=None):
        """Do postprocessing calculations

        Takes a `ParquetTable` object and dataId,
        returns a dataframe with results of postprocessing calculations.

        Parameters
        ----------
        parq : `lsst.pipe.tasks.parquetTable.ParquetTable`
            ParquetTable from which calculations are done.
        funcs : `lsst.pipe.tasks.functors.Functors`
            Functors to apply to the table's columns
        dataId : dict, optional
            Used to add a `patchId` column to the output dataframe.

        Returns
        -------
        `pandas.DataFrame`

        """
        filt = dataId.get('filter', None)
        return self.transform(filt, parq, funcs, dataId).df

    def getFunctors(self):
        funcs = CompositeFunctor.from_file(self.config.functorFile)
        funcs.update(dict(PostprocessAnalysis._defaultFuncs))
        return funcs

    def getAnalysis(self, parq, funcs=None, filt=None):
        # Avoids disk access if funcs is passed
        if funcs is None:
            funcs = self.getFunctors()
        analysis = PostprocessAnalysis(parq, funcs, filt=filt)
        return analysis

    def transform(self, filt, parq, funcs, dataId):
        analysis = self.getAnalysis(parq, funcs=funcs, filt=filt)
        df = analysis.df
        if dataId is not None:
            for key, value in dataId.items():
                df[key] = value

        return pipeBase.Struct(
            df=df,
            analysis=analysis
        )

    def write(self, df, parqRef):
        parqRef.put(ParquetTable(dataFrame=df), self.outputDataset)

    def writeMetadata(self, dataRef):
        """No metadata to write.
        """
        pass
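# A concrete subclass (e.g. TransformObjectCatalogTask below) supplies the four
# attributes asserted above (_DefaultName, ConfigClass, inputDataset,
# outputDataset) and may override `run`; reading the input ParquetTable,
# applying the functors, and writing the output dataset are inherited from
# this base class.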


class TransformObjectCatalogConfig(TransformCatalogBaseConfig):
    filterMap = pexConfig.DictField(
        keytype=str,
        itemtype=str,
        default={},
        doc="Dictionary mapping full filter name to short one for column name munging."
    )
    camelCase = pexConfig.Field(
        dtype=bool,
        default=True,
        doc=("Write per-filter column names in camelCase, else with an underscore. "
             "For example: gPsfFlux instead of g_PsfFlux.")
    )
    multilevelOutput = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Whether results dataframe should have a multilevel column index (True) or be flat "
             "and name-munged (False).")
    )
497 
499  """Compute Flatted Object Table as defined in the DPDD
500 
501  Do the same set of postprocessing calculations on all bands
502 
503  This is identical to `PostprocessTask`, except for that it does the
504  specified functor calculations for all filters present in the
505  input `deepCoadd_obj` table. Any specific `"filt"` keywords specified
506  by the YAML file will be superceded.
507  """
508  _DefaultName = "transformObjectCatalog"
509  ConfigClass = TransformObjectCatalogConfig
510 
511  inputDataset = 'deepCoadd_obj'
512  outputDataset = 'objectTable'
513 
514  @classmethod
515  def _makeArgumentParser(cls):
516  parser = ArgumentParser(name=cls._DefaultName)
517  parser.add_id_argument("--id", cls.inputDataset,
518  ContainerClass=CoaddDataIdContainer,
519  help="data ID, e.g. --id tract=12345 patch=1,2")
520  return parser
521 
522  def run(self, parq, funcs=None, dataId=None):
523  dfDict = {}
524  analysisDict = {}
525  for filt in parq.columnLevelNames['filter']:
526  result = self.transform(filt, parq, funcs, dataId)
527  dfDict[filt] = result.df
528  analysisDict[filt] = result.analysis
529 
530  # This makes a multilevel column index, with filter as first level
531  df = pd.concat(dfDict, axis=1, names=['filter', 'column'])
532 
533  if not self.config.multilevelOutput:
534  noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
535  if dataId is not None:
536  noDupCols += list(dataId.keys())
537  df = flattenFilters(df, self.config.filterMap, noDupCols=noDupCols,
538  camelCase=self.config.camelCase)
539 
540  return df
541 
542 
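# Minimal usage sketch for the task above (illustrative only; the repository
# path, functor file, and dataId values are placeholders, not shipped defaults):
#
#   >>> import lsst.daf.persistence as dafPersist
#   >>> butler = dafPersist.Butler('/path/to/repo')
#   >>> parq = butler.get('deepCoadd_obj', tract=12345, patch='1,2')
#   >>> config = TransformObjectCatalogConfig()
#   >>> config.functorFile = '/path/to/myFunctors.yaml'   # YAML as in the base-class docstring
#   >>> task = TransformObjectCatalogTask(config=config)
#   >>> df = task.run(parq, funcs=task.getFunctors(),
#   ...               dataId={'tract': 12345, 'patch': '1,2'})
#
# With multilevelOutput=False (the default) the per-filter results are
# flattened by flattenFilters using config.filterMap (e.g. {'HSC-G': 'g'});
# with multilevelOutput=True the returned dataframe keeps a two-level
# (filter, column) index, one block per filter present in deepCoadd_obj.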


class TractObjectDataIdContainer(CoaddDataIdContainer):

    def makeDataRefList(self, namespace):
        """Make self.refList from self.idList

        Generate a list of data references given tract and/or patch.
        This was adapted from `TractQADataIdContainer`, which was
        `TractDataIdContainer` modified to not require "filter".
        Only existing dataRefs are returned.
        """
        def getPatchRefList(tract):
            return [namespace.butler.dataRef(datasetType=self.datasetType,
                                             tract=tract.getId(),
                                             patch="%d,%d" % patch.getIndex()) for patch in tract]

        tractRefs = defaultdict(list)  # Data references for each tract
        for dataId in self.idList:
            skymap = self.getSkymap(namespace)

            if "tract" in dataId:
                tractId = dataId["tract"]
                if "patch" in dataId:
                    tractRefs[tractId].append(namespace.butler.dataRef(datasetType=self.datasetType,
                                                                       tract=tractId,
                                                                       patch=dataId['patch']))
                else:
                    tractRefs[tractId] += getPatchRefList(skymap[tractId])
            else:
                tractRefs = dict((tract.getId(), tractRefs.get(tract.getId(), []) + getPatchRefList(tract))
                                 for tract in skymap)
        outputRefList = []
        for tractRefList in tractRefs.values():
            existingRefs = [ref for ref in tractRefList if ref.datasetExists()]
            outputRefList.append(existingRefs)

        self.refList = outputRefList
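# With this container, "--id tract=12345" expands to data references for every
# existing patch in that tract, "--id tract=12345 patch=1,2" selects a single
# patch, and omitting tract altogether iterates over every tract in the skymap;
# in all cases only dataRefs whose dataset actually exists are kept.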


class ConsolidateObjectTableConfig(pexConfig.Config):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )


class ConsolidateObjectTableTask(CmdLineTask):
    """Write patch-merged source tables to a tract-level parquet file
    """
    _DefaultName = "consolidateObjectTable"
    ConfigClass = ConsolidateObjectTableConfig

    inputDataset = 'objectTable'
    outputDataset = 'objectTable_tract'

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)

        parser.add_id_argument("--id", cls.inputDataset,
                               help="data ID, e.g. --id tract=12345",
                               ContainerClass=TractObjectDataIdContainer)
        return parser

    def runDataRef(self, patchRefList):
        df = pd.concat([patchRef.get().toDataFrame() for patchRef in patchRefList])
        patchRefList[0].put(ParquetTable(dataFrame=df), self.outputDataset)

    def writeMetadata(self, dataRef):
        """No metadata to write.
        """
        pass
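# Taken together, the command-line tasks in this module form a chain of
# dataset transformations:
#
#   deepCoadd_{forced_src,meas,ref}  --writeObjectTable-->        deepCoadd_obj
#   deepCoadd_obj                    --transformObjectCatalog-->  objectTable
#   objectTable (per patch)          --consolidateObjectTable-->  objectTable_tract
#
# Each stage reads and writes ParquetTable datasets through the butler.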