postprocess.py
# This file is part of pipe_tasks
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import functools
import pandas as pd
from collections import defaultdict

import lsst.geom
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
import lsst.afw.table as afwTable
from lsst.meas.base import SingleFrameMeasurementTask
from lsst.pipe.base import CmdLineTask, ArgumentParser, DataIdContainer
from lsst.coadd.utils.coaddDataIdContainer import CoaddDataIdContainer

from .parquetTable import ParquetTable
from .multiBandUtils import makeMergeArgumentParser, MergeSourcesRunner
from .functors import CompositeFunctor, RAColumn, DecColumn, Column

def flattenFilters(df, filterDict, noDupCols=['coord_ra', 'coord_dec'], camelCase=False):
    """Flatten a dataframe with a multilevel column index.
    """
    newDf = pd.DataFrame()
    for filt, filtShort in filterDict.items():
        subdf = df[filt]
        columnFormat = '{0}{1}' if camelCase else '{0}_{1}'
        newColumns = {c: columnFormat.format(filtShort, c)
                      for c in subdf.columns if c not in noDupCols}
        cols = list(newColumns.keys())
        newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)

    # The filter-independent columns are taken from the last filter processed
    newDf = pd.concat([subdf[noDupCols], newDf], axis=1)
    return newDf

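# A minimal sketch of what `flattenFilters` does, on a toy dataframe with
# hypothetical column names:
#
#     cols = pd.MultiIndex.from_tuples(
#         [('HSC-G', 'coord_ra'), ('HSC-G', 'coord_dec'), ('HSC-G', 'PsfFlux'),
#          ('HSC-R', 'coord_ra'), ('HSC-R', 'coord_dec'), ('HSC-R', 'PsfFlux')])
#     toy = pd.DataFrame([[1., 2., 3., 1., 2., 4.]], columns=cols)
#     flat = flattenFilters(toy, {'HSC-G': 'g', 'HSC-R': 'r'})
#     # flat.columns -> ['coord_ra', 'coord_dec', 'g_PsfFlux', 'r_PsfFlux']
#     # (with camelCase=True the munged names would be 'gPsfFlux', 'rPsfFlux')
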
class WriteObjectTableConfig(pexConfig.Config):
    priorityList = pexConfig.ListField(
        dtype=str,
        default=[],
        doc="Priority-ordered list of bands for the merge."
    )
    engine = pexConfig.Field(
        dtype=str,
        default="pyarrow",
        doc="Parquet engine for writing (pyarrow or fastparquet)"
    )
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )

    def validate(self):
        pexConfig.Config.validate(self)
        if len(self.priorityList) == 0:
            raise RuntimeError("No priority list provided")

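# An empty priorityList fails validation, so a config override must supply
# one; for example (band names here are hypothetical):
#
#     config.priorityList = ['HSC-I', 'HSC-R', 'HSC-Z', 'HSC-Y', 'HSC-G']
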
class WriteObjectTableTask(CmdLineTask):
    """Write filter-merged source tables to parquet.
    """
    _DefaultName = "writeObjectTable"
    ConfigClass = WriteObjectTableConfig
    RunnerClass = MergeSourcesRunner

    # Names of table datasets to be merged
    inputDatasets = ('forced_src', 'meas', 'ref')

    # Tag of output dataset written by `MergeSourcesTask.write`
    outputDataset = 'obj'

    def __init__(self, butler=None, schema=None, **kwargs):
        # It is a shame that this class can't use the default init for
        # CmdLineTask, but to do so would require its own special task runner,
        # which is many more lines of specialization, so this is how it is for
        # now.
        CmdLineTask.__init__(self, **kwargs)

    def runDataRef(self, patchRefList):
        """!
        @brief Merge coadd sources from multiple bands. Calls @ref `run` which
        must be defined in subclasses that inherit from MergeSourcesTask.
        @param[in] patchRefList list of data references for each filter
        """
        catalogs = dict(self.readCatalog(patchRef) for patchRef in patchRefList)
        dataId = patchRefList[0].dataId
        mergedCatalog = self.run(catalogs, tract=dataId['tract'], patch=dataId['patch'])
        self.write(patchRefList[0], mergedCatalog)

    @classmethod
    def _makeArgumentParser(cls):
        """Create a suitable ArgumentParser.

        We will use the ArgumentParser to get a list of data
        references for patches; the RunnerClass will sort them into lists
        of data references for the same patch.

        References the first of self.inputDatasets, rather than
        self.inputDataset.
        """
        return makeMergeArgumentParser(cls._DefaultName, cls.inputDatasets[0])

    def readCatalog(self, patchRef):
        """Read input catalogs

        Read all the input datasets given by the 'inputDatasets'
        attribute.

        Parameters
        ----------
        patchRef : `lsst.daf.persistence.ButlerDataRef`
            Data reference for patch

        Returns
        -------
        Tuple consisting of filter name and a dict of catalogs, keyed by
        dataset name
        """
        filterName = patchRef.dataId["filter"]
        catalogDict = {}
        for dataset in self.inputDatasets:
            catalog = patchRef.get(self.config.coaddName + "Coadd_" + dataset, immediate=True)
            self.log.info("Read %d sources from %s for filter %s: %s" %
                          (len(catalog), dataset, filterName, patchRef.dataId))
            catalogDict[dataset] = catalog
        return filterName, catalogDict

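    # Each call to readCatalog yields one (filterName, catalogDict) pair, so
    # runDataRef assembles a mapping shaped like (filter names hypothetical):
    #
    #     {'HSC-G': {'forced_src': <catalog>, 'meas': <catalog>, 'ref': <catalog>},
    #      'HSC-R': {...}}
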
    def run(self, catalogs, tract, patch):
        """Merge multiple catalogs.

        Parameters
        ----------
        catalogs : `dict`
            Mapping from filter names to dict of catalogs.
        tract : `int`
            tractId to use for the tractId column
        patch : `str`
            patchId to use for the patchId column

        Returns
        -------
        catalog : `lsst.pipe.tasks.parquetTable.ParquetTable`
            Merged dataframe, with each column prefixed by
            `filter_tag(filt)`, wrapped in the parquet writer shim class.
        """
        dfs = []
        for filt, tableDict in catalogs.items():
            for dataset, table in tableDict.items():
                # Convert afwTable to pandas DataFrame
                df = table.asAstropy().to_pandas().set_index('id', drop=True)

                # Sort columns by name, to ensure matching schema among patches
                df = df.reindex(sorted(df.columns), axis=1)
                df['tractId'] = tract
                df['patchId'] = patch

                # Make columns a 3-level MultiIndex
                df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
                                                       names=('dataset', 'filter', 'column'))
                dfs.append(df)

        catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return ParquetTable(dataFrame=catalog)

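    # The merged table's columns form a 3-level (dataset, filter, column)
    # MultiIndex, so a single per-filter measurement table can be pulled back
    # out with ordinary pandas selection, e.g. (values hypothetical):
    #
    #     merged = task.run(catalogs, tract=9813, patch='1,1').toDataFrame()
    #     gMeas = merged['meas']['HSC-G']
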
    def write(self, patchRef, catalog):
        """Write the output.

        Parameters
        ----------
        catalog : `ParquetTable`
            Catalog to write
        patchRef : `lsst.daf.persistence.ButlerDataRef`
            Data reference for patch
        """
        patchRef.put(catalog, self.config.coaddName + "Coadd_" + self.outputDataset)
        # since the filter isn't actually part of the data ID for the dataset we're saving,
        # it's confusing to see it in the log message, even if the butler simply ignores it.
        mergeDataId = patchRef.dataId.copy()
        del mergeDataId["filter"]
        self.log.info("Wrote merged catalog: %s" % (mergeDataId,))

    def writeMetadata(self, dataRefList):
        """No metadata to write, and not sure how to write it for a list of dataRefs.
        """
        pass


class WriteSourceTableConfig(pexConfig.Config):
    doApplyExternalPhotoCalib = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Add local photoCalib columns from the calexp.photoCalib? Should only be set True if "
             "generating Source Tables from older src tables which do not already have local calib columns")
    )
    doApplyExternalSkyWcs = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Add local WCS columns from the calexp.wcs? Should only be set True if "
             "generating Source Tables from older src tables which do not already have local calib columns")
    )


class WriteSourceTableTask(CmdLineTask):
    """Write source table to parquet.
    """
    _DefaultName = "writeSourceTable"
    ConfigClass = WriteSourceTableConfig

    def runDataRef(self, dataRef):
        src = dataRef.get('src')
        if self.config.doApplyExternalPhotoCalib or self.config.doApplyExternalSkyWcs:
            src = self.addCalibColumns(src, dataRef)

        ccdVisitId = dataRef.get('ccdExposureId')
        result = self.run(src, ccdVisitId=ccdVisitId)
        dataRef.put(result.table, 'source')

    def run(self, catalog, ccdVisitId=None):
        """Convert `src` catalog to parquet

        Parameters
        ----------
        catalog : `afwTable.SourceCatalog`
            catalog to be converted
        ccdVisitId : `int`
            ccdVisitId to be added as a column

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            ``table``
                `ParquetTable` version of the input catalog
        """
        self.log.info("Generating parquet table from src catalog")
        df = catalog.asAstropy().to_pandas().set_index('id', drop=True)
        df['ccdVisitId'] = ccdVisitId
        return pipeBase.Struct(table=ParquetTable(dataFrame=df))

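    # A minimal usage sketch, assuming a SourceCatalog `src` is already in
    # hand (the ccdVisitId value is hypothetical):
    #
    #     result = task.run(src, ccdVisitId=205034)
    #     df = result.table.toDataFrame()  # indexed by 'id', with a constant
    #                                      # 'ccdVisitId' column appended
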
    def addCalibColumns(self, catalog, dataRef):
        """Add columns with local calibration evaluated at each centroid

        This exists for backwards compatibility with old repos: it converts
        old src catalogs (which don't have the expected local calib columns)
        to Source Tables.

        Parameters
        ----------
        catalog : `afwTable.SourceCatalog`
            catalog to which calib columns will be added
        dataRef : `lsst.daf.persistence.ButlerDataRef`
            for fetching the calibs from disk.

        Returns
        -------
        newCat : `afwTable.SourceCatalog`
            Source Catalog with requested local calib columns
        """
        measureConfig = SingleFrameMeasurementTask.ConfigClass()
        measureConfig.doReplaceWithNoise = False

        # Just need the WCS or the PhotoCalib attached to an exposure
        exposure = dataRef.get('calexp_sub',
                               bbox=lsst.geom.Box2I(lsst.geom.Point2I(0, 0),
                                                    lsst.geom.Point2I(0, 0)))

        mapper = afwTable.SchemaMapper(catalog.schema)
        mapper.addMinimalSchema(catalog.schema, True)
        schema = mapper.getOutputSchema()

        exposureIdInfo = dataRef.get("expIdInfo")
        measureConfig.plugins.names = []
        if self.config.doApplyExternalSkyWcs:
            plugin = 'base_LocalWcs'
            if plugin in schema:
                raise RuntimeError(f"{plugin} already in src catalog. Set doApplyExternalSkyWcs=False")
            else:
                measureConfig.plugins.names.add(plugin)

        if self.config.doApplyExternalPhotoCalib:
            plugin = 'base_LocalPhotoCalib'
            if plugin in schema:
                raise RuntimeError(f"{plugin} already in src catalog. Set doApplyExternalPhotoCalib=False")
            else:
                measureConfig.plugins.names.add(plugin)

        measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
        newCat = afwTable.SourceCatalog(schema)
        newCat.extend(catalog, mapper=mapper)
        measurement.run(measCat=newCat, exposure=exposure, exposureId=exposureIdInfo.expId)
        return newCat

    def writeMetadata(self, dataRef):
        """No metadata to write.
        """
        pass

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)
        parser.add_id_argument("--id", 'src',
                               help="data ID, e.g. --id visit=12345 ccd=0")
        return parser


class PostprocessAnalysis(object):
    """Calculate columns from ParquetTable

    This object manages and organizes an arbitrary set of computations
    on a catalog. The catalog is defined by a
    `lsst.pipe.tasks.parquetTable.ParquetTable` object (or list thereof), such as a
    `deepCoadd_obj` dataset, and the computations are defined by a collection
    of `lsst.pipe.tasks.functors.Functor` objects (or, equivalently,
    a `CompositeFunctor`).

    After the object is initialized, accessing the `.df` attribute (which
    holds the `pandas.DataFrame` containing the results of the calculations) triggers
    computation of said dataframe.

    One of the conveniences of using this object is the ability to define a desired common
    filter for all functors. This enables the same functor collection to be passed to
    several different `PostprocessAnalysis` objects without having to change the original
    functor collection, since the `filt` keyword argument of this object triggers an
    overwrite of the `filt` property for all functors in the collection.

    This object also allows a list of refFlags to be passed, and defines a set of default
    refFlags that are always included even if not requested.

    If a list of `ParquetTable` objects is passed, rather than a single one, then the
    calculations will be mapped over all the input catalogs. In principle, it should
    be straightforward to parallelize this activity, but initial tests have failed
    (see TODO in code comments).

    Parameters
    ----------
    parq : `lsst.pipe.tasks.ParquetTable` (or list of such)
        Source catalog(s) for computation

    functors : `list`, `dict`, or `lsst.pipe.tasks.functors.CompositeFunctor`
        Computations to do (functors that act on `parq`).
        If a dict, the output
        DataFrame will have columns keyed accordingly.
        If a list, the column keys will come from the
        `.shortname` attribute of each functor.

    filt : `str` (optional)
        Filter in which to calculate. If provided,
        this will overwrite any existing `.filt` attribute
        of the provided functors.

    flags : `list` (optional)
        List of flags (per-band) to include in output table.

    refFlags : `list` (optional)
        List of refFlags (only reference band) to include in output table.
    """
    _defaultRefFlags = []
    _defaultFuncs = (('coord_ra', RAColumn()),
                     ('coord_dec', DecColumn()))

    def __init__(self, parq, functors, filt=None, flags=None, refFlags=None):
        self.parq = parq
        self.functors = functors

        self.filt = filt
        self.flags = list(flags) if flags is not None else []
        self.refFlags = list(self._defaultRefFlags)
        if refFlags is not None:
            self.refFlags += list(refFlags)

        self._df = None

    @property
    def defaultFuncs(self):
        funcs = dict(self._defaultFuncs)
        return funcs

    @property
    def func(self):
        additionalFuncs = self.defaultFuncs
        additionalFuncs.update({flag: Column(flag, dataset='ref') for flag in self.refFlags})
        additionalFuncs.update({flag: Column(flag, dataset='meas') for flag in self.flags})

        if isinstance(self.functors, CompositeFunctor):
            func = self.functors
        else:
            func = CompositeFunctor(self.functors)

        func.funcDict.update(additionalFuncs)
        func.filt = self.filt

        return func

    @property
    def noDupCols(self):
        return [name for name, func in self.func.funcDict.items() if func.noDup or func.dataset == 'ref']

    @property
    def df(self):
        if self._df is None:
            self.compute()
        return self._df

    def compute(self, dropna=False, pool=None):
        # map over multiple parquet tables
        if type(self.parq) in (list, tuple):
            if pool is None:
                dflist = [self.func(parq, dropna=dropna) for parq in self.parq]
            else:
                # TODO: Figure out why this doesn't work (pyarrow pickling issues?)
                dflist = pool.map(functools.partial(self.func, dropna=dropna), self.parq)
            self._df = pd.concat(dflist)
        else:
            self._df = self.func(self.parq, dropna=dropna)

        return self._df

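# A minimal usage sketch of PostprocessAnalysis, assuming `parq` is a
# deepCoadd_obj ParquetTable (the functor choice mirrors the YAML example in
# TransformCatalogBaseTask below and is otherwise hypothetical):
#
#     from lsst.pipe.tasks.functors import Mag
#     analysis = PostprocessAnalysis(parq, {'psfMag': Mag('base_PsfFlux')},
#                                    filt='HSC-G',
#                                    refFlags=['detect_isPrimary'])
#     df = analysis.df  # first access triggers compute()
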
class TransformCatalogBaseConfig(pexConfig.Config):
    functorFile = pexConfig.Field(
        dtype=str,
        doc='Path to YAML file specifying functors to be computed',
        default=None,
        optional=True
    )

class TransformCatalogBaseTask(CmdLineTask):
    """Base class for transforming/standardizing a catalog

    by applying functors that convert units and apply calibrations.
    The purpose of this task is to perform a set of computations on
    an input `ParquetTable` dataset (such as `deepCoadd_obj`) and write the
    results to a new dataset (which needs to be declared in an `outputDataset`
    attribute).

    The calculations to be performed are defined in a YAML file that specifies
    a set of functors to be computed, provided as
    a `--functorFile` config parameter. An example of such a YAML file
    is the following:

        funcs:
            psfMag:
                functor: Mag
                args:
                    - base_PsfFlux
                filt: HSC-G
                dataset: meas
            cmodel_magDiff:
                functor: MagDiff
                args:
                    - modelfit_CModel
                    - base_PsfFlux
                filt: HSC-G
            gauss_magDiff:
                functor: MagDiff
                args:
                    - base_GaussianFlux
                    - base_PsfFlux
                filt: HSC-G
            count:
                functor: Column
                args:
                    - base_InputCount_value
                filt: HSC-G
            deconvolved_moments:
                functor: DeconvolvedMoments
                filt: HSC-G
                dataset: forced_src
        refFlags:
            - calib_psfUsed
            - merge_measurement_i
            - merge_measurement_r
            - merge_measurement_z
            - merge_measurement_y
            - merge_measurement_g
            - base_PixelFlags_flag_inexact_psfCenter
            - detect_isPrimary

    The names for each entry under "funcs" will become the names of columns in the
    output dataset. All the functors referenced are defined in `lsst.pipe.tasks.functors`.
    Positional arguments to be passed to each functor are in the `args` list,
    and any additional entries for each column other than "functor" or "args" (e.g., `'filt'`,
    `'dataset'`) are treated as keyword arguments to be passed to the functor initialization.

    The "refFlags" entry is a shortcut: each named flag becomes a `Column` functor
    that takes the column of the same name from the `'ref'` dataset.

    The "flags" entry will be expanded out per band.

    Note, if `'filter'` is provided as part of the `dataId` when running this task (even though
    `deepCoadd_obj` does not use `'filter'`), then this will override the `filt` kwargs
    provided in the YAML file, and the calculations will be done in that filter.

    This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
    to organize and execute the calculations.
    """
    @property
    def _DefaultName(self):
        raise NotImplementedError('Subclass must define "_DefaultName" attribute')

    @property
    def outputDataset(self):
        raise NotImplementedError('Subclass must define "outputDataset" attribute')

    @property
    def inputDataset(self):
        raise NotImplementedError('Subclass must define "inputDataset" attribute')

    @property
    def ConfigClass(self):
        raise NotImplementedError('Subclass must define "ConfigClass" attribute')

    def runDataRef(self, dataRef):
        parq = dataRef.get()
        funcs = self.getFunctors()
        df = self.run(parq, funcs=funcs, dataId=dataRef.dataId)
        self.write(df, dataRef)
        return df

    def run(self, parq, funcs=None, dataId=None):
        """Do postprocessing calculations

        Takes a `ParquetTable` object and dataId,
        returns a dataframe with results of postprocessing calculations.

        Parameters
        ----------
        parq : `lsst.pipe.tasks.parquetTable.ParquetTable`
            ParquetTable from which calculations are done.
        funcs : `lsst.pipe.tasks.functors.Functors`
            Functors to apply to the table's columns
        dataId : `dict`, optional
            Used to add a `patchId` column to the output dataframe.

        Returns
        -------
        `pandas.DataFrame`
        """
        self.log.info("Transforming/standardizing the source table dataId: %s", dataId)

        filt = dataId.get('filter', None)
        df = self.transform(filt, parq, funcs, dataId).df
        self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
        return df

    def getFunctors(self):
        funcs = CompositeFunctor.from_file(self.config.functorFile)
        funcs.update(dict(PostprocessAnalysis._defaultFuncs))
        return funcs

    def getAnalysis(self, parq, funcs=None, filt=None):
        # Avoids disk access if funcs is passed
        if funcs is None:
            funcs = self.getFunctors()
        analysis = PostprocessAnalysis(parq, funcs, filt=filt)
        return analysis

    def transform(self, filt, parq, funcs, dataId):
        analysis = self.getAnalysis(parq, funcs=funcs, filt=filt)
        df = analysis.df
        if dataId is not None:
            for key, value in dataId.items():
                df[key] = value

        return pipeBase.Struct(
            df=df,
            analysis=analysis
        )

    def write(self, df, parqRef):
        parqRef.put(ParquetTable(dataFrame=df), self.outputDataset)

    def writeMetadata(self, dataRef):
        """No metadata to write.
        """
        pass


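# Concretely, a subclass of TransformCatalogBaseTask is pointed at a functor
# definition file through its config; a hedged sketch (the path is
# hypothetical):
#
#     config.functorFile = '/path/to/Object.yaml'
#
# `runDataRef` then reads the input ParquetTable, evaluates every functor in
# that file via PostprocessAnalysis, and writes the resulting flat
# `pandas.DataFrame` to `outputDataset`.
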
class TransformObjectCatalogConfig(TransformCatalogBaseConfig):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )
    filterMap = pexConfig.DictField(
        keytype=str,
        itemtype=str,
        default={},
        doc=("Dictionary mapping full filter name to short one for column name munging. "
             "These filters determine the output columns no matter what filters the "
             "input data actually contain.")
    )
    camelCase = pexConfig.Field(
        dtype=bool,
        default=True,
        doc=("Write per-filter column names in camelCase, else with underscores. "
             "For example: gPsfFlux instead of g_PsfFlux.")
    )
    multilevelOutput = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Whether results dataframe should have a multilevel column index (True) or be flat "
             "and name-munged (False).")
    )

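# For example, a config override might map full filter names to the short
# names used for column munging (filter names hypothetical):
#
#     config.filterMap = {'HSC-G': 'g', 'HSC-R': 'r', 'HSC-I': 'i'}
#     config.camelCase = True  # gPsfFlux rather than g_PsfFlux
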
class TransformObjectCatalogTask(TransformCatalogBaseTask):
    """Compute Flattened Object Table as defined in the DPDD

    Do the same set of postprocessing calculations on all bands.

    This is identical to `TransformCatalogBaseTask`, except that it does the
    specified functor calculations for all filters present in the
    input `deepCoadd_obj` table. Any specific `"filt"` keywords specified
    by the YAML file will be superseded.
    """
    _DefaultName = "transformObjectCatalog"
    ConfigClass = TransformObjectCatalogConfig

    inputDataset = 'deepCoadd_obj'
    outputDataset = 'objectTable'

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)
        parser.add_id_argument("--id", cls.inputDataset,
                               ContainerClass=CoaddDataIdContainer,
                               help="data ID, e.g. --id tract=12345 patch=1,2")
        return parser

    def run(self, parq, funcs=None, dataId=None):
        dfDict = {}
        analysisDict = {}
        templateDf = pd.DataFrame()
        # Perform transform for data of filters that exist in parq and are
        # specified in config.filterMap
        for filt in parq.columnLevelNames['filter']:
            if filt not in self.config.filterMap:
                self.log.info("Ignoring %s data in the input", filt)
                continue
            self.log.info("Transforming the catalog of filter %s", filt)
            result = self.transform(filt, parq, funcs, dataId)
            dfDict[filt] = result.df
            analysisDict[filt] = result.analysis
            if templateDf.empty:
                templateDf = result.df

        # Fill NaNs in columns of other wanted filters
        for filt in self.config.filterMap:
            if filt not in dfDict:
                self.log.info("Adding empty columns for filter %s", filt)
                dfDict[filt] = pd.DataFrame().reindex_like(templateDf)

        # This makes a multilevel column index, with filter as first level
        df = pd.concat(dfDict, axis=1, names=['filter', 'column'])

        if not self.config.multilevelOutput:
            noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
            if dataId is not None:
                noDupCols += list(dataId.keys())
            df = flattenFilters(df, self.config.filterMap, noDupCols=noDupCols,
                                camelCase=self.config.camelCase)

        self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
        return df


class TractObjectDataIdContainer(CoaddDataIdContainer):

    def makeDataRefList(self, namespace):
        """Make self.refList from self.idList

        Generate a list of data references given tract and/or patch.
        This was adapted from `TractQADataIdContainer`, which was
        `TractDataIdContainer` modified to not require "filter".
        Only existing dataRefs are returned.
        """
        def getPatchRefList(tract):
            return [namespace.butler.dataRef(datasetType=self.datasetType,
                                             tract=tract.getId(),
                                             patch="%d,%d" % patch.getIndex()) for patch in tract]

        tractRefs = defaultdict(list)  # Data references for each tract
        for dataId in self.idList:
            skymap = self.getSkymap(namespace)

            if "tract" in dataId:
                tractId = dataId["tract"]
                if "patch" in dataId:
                    tractRefs[tractId].append(namespace.butler.dataRef(datasetType=self.datasetType,
                                                                       tract=tractId,
                                                                       patch=dataId['patch']))
                else:
                    tractRefs[tractId] += getPatchRefList(skymap[tractId])
            else:
                tractRefs = dict((tract.getId(), tractRefs.get(tract.getId(), []) + getPatchRefList(tract))
                                 for tract in skymap)
        outputRefList = []
        for tractRefList in tractRefs.values():
            existingRefs = [ref for ref in tractRefList if ref.datasetExists()]
            outputRefList.append(existingRefs)

        self.refList = outputRefList


class ConsolidateObjectTableConfig(pexConfig.Config):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )

class ConsolidateObjectTableTask(CmdLineTask):
    """Write patch-merged source tables to a tract-level parquet file.
    """
    _DefaultName = "consolidateObjectTable"
    ConfigClass = ConsolidateObjectTableConfig

    inputDataset = 'objectTable'
    outputDataset = 'objectTable_tract'

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)

        parser.add_id_argument("--id", cls.inputDataset,
                               help="data ID, e.g. --id tract=12345",
                               ContainerClass=TractObjectDataIdContainer)
        return parser

    def runDataRef(self, patchRefList):
        df = pd.concat([patchRef.get().toDataFrame() for patchRef in patchRefList])
        patchRefList[0].put(ParquetTable(dataFrame=df), self.outputDataset)

    def writeMetadata(self, dataRef):
        """No metadata to write.
        """
        pass


class TransformSourceTableConfig(TransformCatalogBaseConfig):
    pass


class TransformSourceTableTask(TransformCatalogBaseTask):
    """Transform/standardize a source catalog.
    """
    _DefaultName = "transformSourceTable"
    ConfigClass = TransformSourceTableConfig

    inputDataset = 'source'
    outputDataset = 'sourceTable'

    def writeMetadata(self, dataRef):
        """No metadata to write.
        """
        pass

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)
        parser.add_id_argument("--id", datasetType=cls.inputDataset,
                               level="sensor",
                               help="data ID, e.g. --id visit=12345 ccd=0")
        return parser


class VisitDataIdContainer(DataIdContainer):
    """DataIdContainer that groups sensor-level ids by visit.
    """

    def makeDataRefList(self, namespace):
        """Make self.refList from self.idList

        Generate a list of data references grouped by visit.

        Parameters
        ----------
        namespace : `argparse.Namespace`
            Namespace used by `lsst.pipe.base.CmdLineTask` to parse command line arguments
        """
        def ccdDataRefList(visitId):
            """Get all possible ccds for a given visit"""
            ccds = namespace.butler.queryMetadata('src', ['ccd'], dataId={'visit': visitId})
            return [namespace.butler.dataRef(datasetType=self.datasetType,
                                             visit=visitId,
                                             ccd=ccd) for ccd in ccds]
        # Group by visits
        visitRefs = defaultdict(list)
        for dataId in self.idList:
            if "visit" in dataId:
                visitId = dataId["visit"]
                if "ccd" in dataId:
                    visitRefs[visitId].append(namespace.butler.dataRef(datasetType=self.datasetType,
                                                                       visit=visitId, ccd=dataId['ccd']))
                else:
                    visitRefs[visitId] += ccdDataRefList(visitId)
        outputRefList = []
        for refList in visitRefs.values():
            existingRefs = [ref for ref in refList if ref.datasetExists()]
            outputRefList.append(existingRefs)

        self.refList = outputRefList


class ConsolidateSourceTableConfig(pexConfig.Config):
    pass


class ConsolidateSourceTableTask(CmdLineTask):
    """Concatenate `sourceTable` list into a per-visit `sourceTable_visit`.
    """
    _DefaultName = 'consolidateSourceTable'
    ConfigClass = ConsolidateSourceTableConfig

    inputDataset = 'sourceTable'
    outputDataset = 'sourceTable_visit'

    def runDataRef(self, dataRefList):
        self.log.info("Concatenating %s per-detector Source Tables", len(dataRefList))
        df = pd.concat([dataRef.get().toDataFrame() for dataRef in dataRefList])
        dataRefList[0].put(ParquetTable(dataFrame=df), self.outputDataset)

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)

        parser.add_id_argument("--id", cls.inputDataset,
                               help="data ID, e.g. --id visit=12345",
                               ContainerClass=VisitDataIdContainer)
        return parser

    def writeMetadata(self, dataRef):
        """No metadata to write.
        """
        pass

    def writeConfig(self, butler, clobber=False, doBackup=True):
        """No config to write.
        """
        pass
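

# These are command-line tasks; a hedged sketch of driving one stage from
# Python with CmdLineTask.parseAndRun (repo path and data ID hypothetical):
#
#     from lsst.pipe.tasks.postprocess import ConsolidateSourceTableTask
#     ConsolidateSourceTableTask.parseAndRun(
#         args=['/path/to/repo', '--id', 'visit=12345'])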