postprocess.py
1 # This file is part of pipe_tasks
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (https://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <https://www.gnu.org/licenses/>.
21 
22 import functools
23 import pandas as pd
24 from collections import defaultdict
25 import numpy as np
26 
27 import lsst.geom
28 import lsst.pex.config as pexConfig
29 import lsst.pipe.base as pipeBase
30 import lsst.daf.base as dafBase
31 from lsst.pipe.base import connectionTypes
32 import lsst.afw.table as afwTable
33 from lsst.meas.base import SingleFrameMeasurementTask
34 from lsst.pipe.base import CmdLineTask, ArgumentParser, DataIdContainer
35 from lsst.coadd.utils.coaddDataIdContainer import CoaddDataIdContainer
36 from lsst.daf.butler import DeferredDatasetHandle, DataCoordinate
37 
38 from .parquetTable import ParquetTable
39 from .multiBandUtils import makeMergeArgumentParser, MergeSourcesRunner
40 from .functors import CompositeFunctor, Column
41 
42 
43 def flattenFilters(df, noDupCols=['coord_ra', 'coord_dec'], camelCase=False, inputBands=None):
44  """Flatten a DataFrame with a multilevel column index.
45  """
46  newDf = pd.DataFrame()
47  # band is the level 0 index
48  dfBands = df.columns.unique(level=0).values
49  for band in dfBands:
50  subdf = df[band]
51  columnFormat = '{0}{1}' if camelCase else '{0}_{1}'
52  newColumns = {c: columnFormat.format(band, c)
53  for c in subdf.columns if c not in noDupCols}
54  cols = list(newColumns.keys())
55  newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)
56 
57  # Band must be present in the input and output or else column is all NaN:
58  presentBands = dfBands if inputBands is None else list(set(inputBands).intersection(dfBands))
59  # Get the unexploded columns from any present band's partition
60  noDupDf = df[presentBands[0]][noDupCols]
61  newDf = pd.concat([noDupDf, newDf], axis=1)
62  return newDf
63 
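# Illustrative sketch (not part of the pipeline code): applies flattenFilters to a
# toy DataFrame whose columns carry a (band, column) MultiIndex, the shape of a
# single dataset slice of a deepCoadd_obj table.
def _exampleFlattenFilters():
    columns = pd.MultiIndex.from_tuples(
        [("g", "coord_ra"), ("g", "coord_dec"), ("g", "psfFlux"),
         ("r", "coord_ra"), ("r", "coord_dec"), ("r", "psfFlux")],
        names=("band", "column"))
    df = pd.DataFrame([[10.0, -5.0, 1.2, 10.0, -5.0, 0.9]], columns=columns)
    # Expected flat columns: coord_ra, coord_dec, g_psfFlux, r_psfFlux
    return flattenFilters(df)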
64 
65 class WriteObjectTableConnections(pipeBase.PipelineTaskConnections,
66  defaultTemplates={"coaddName": "deep"},
67  dimensions=("tract", "patch", "skymap")):
68  inputCatalogMeas = connectionTypes.Input(
69  doc="Catalog of source measurements on the deepCoadd.",
70  dimensions=("tract", "patch", "band", "skymap"),
71  storageClass="SourceCatalog",
72  name="{coaddName}Coadd_meas",
73  multiple=True
74  )
75  inputCatalogForcedSrc = connectionTypes.Input(
76  doc="Catalog of forced measurements (shape and position parameters held fixed) on the deepCoadd.",
77  dimensions=("tract", "patch", "band", "skymap"),
78  storageClass="SourceCatalog",
79  name="{coaddName}Coadd_forced_src",
80  multiple=True
81  )
82  inputCatalogRef = connectionTypes.Input(
83  doc="Catalog marking the primary detection (which band provides a good shape and position) "
84  "for each detection in deepCoadd_mergeDet.",
85  dimensions=("tract", "patch", "skymap"),
86  storageClass="SourceCatalog",
87  name="{coaddName}Coadd_ref"
88  )
89  outputCatalog = connectionTypes.Output(
90  doc="A vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
91  "stored as a DataFrame with a multi-level column index per-patch.",
92  dimensions=("tract", "patch", "skymap"),
93  storageClass="DataFrame",
94  name="{coaddName}Coadd_obj"
95  )
96 
97 
98 class WriteObjectTableConfig(pipeBase.PipelineTaskConfig,
99  pipelineConnections=WriteObjectTableConnections):
100  engine = pexConfig.Field(
101  dtype=str,
102  default="pyarrow",
103  doc="Parquet engine for writing (pyarrow or fastparquet)"
104  )
105  coaddName = pexConfig.Field(
106  dtype=str,
107  default="deep",
108  doc="Name of coadd"
109  )
110 
111 
112 class WriteObjectTableTask(CmdLineTask, pipeBase.PipelineTask):
113  """Write filter-merged source tables to parquet
114  """
115  _DefaultName = "writeObjectTable"
116  ConfigClass = WriteObjectTableConfig
117  RunnerClass = MergeSourcesRunner
118 
119  # Names of table datasets to be merged
120  inputDatasets = ('forced_src', 'meas', 'ref')
121 
122  # Tag of output dataset written by `MergeSourcesTask.write`
123  outputDataset = 'obj'
124 
125  def __init__(self, butler=None, schema=None, **kwargs):
126  # It is a shame that this class can't use the default init for CmdLineTask
127  # But to do so would require its own special task runner, which is many
128  # more lines of specialization, so this is how it is for now
129  super().__init__(**kwargs)
130 
131  def runDataRef(self, patchRefList):
132  """!
133  @brief Merge coadd sources from multiple bands. Calls @ref `run` which must be defined in
134  subclasses that inherit from MergeSourcesTask.
135  @param[in] patchRefList list of data references for each filter
136  """
137  catalogs = dict(self.readCatalog(patchRef) for patchRef in patchRefList)
138  dataId = patchRefList[0].dataId
139  mergedCatalog = self.run(catalogs, tract=dataId['tract'], patch=dataId['patch'])
140  self.write(patchRefList[0], ParquetTable(dataFrame=mergedCatalog))
141 
142  def runQuantum(self, butlerQC, inputRefs, outputRefs):
143  inputs = butlerQC.get(inputRefs)
144 
145  measDict = {ref.dataId['band']: {'meas': cat} for ref, cat in
146  zip(inputRefs.inputCatalogMeas, inputs['inputCatalogMeas'])}
147  forcedSourceDict = {ref.dataId['band']: {'forced_src': cat} for ref, cat in
148  zip(inputRefs.inputCatalogForcedSrc, inputs['inputCatalogForcedSrc'])}
149 
150  catalogs = {}
151  for band in measDict.keys():
152  catalogs[band] = {'meas': measDict[band]['meas'],
153  'forced_src': forcedSourceDict[band]['forced_src'],
154  'ref': inputs['inputCatalogRef']}
155  dataId = butlerQC.quantum.dataId
156  df = self.run(catalogs=catalogs, tract=dataId['tract'], patch=dataId['patch'])
157  outputs = pipeBase.Struct(outputCatalog=df)
158  butlerQC.put(outputs, outputRefs)
159 
160  @classmethod
161  def _makeArgumentParser(cls):
162  """Create a suitable ArgumentParser.
163 
164  We will use the ArgumentParser to get a list of data
165  references for patches; the RunnerClass will sort them into lists
166  of data references for the same patch.
167 
168  References the first of self.inputDatasets rather than
169  self.inputDataset.
170  """
171  return makeMergeArgumentParser(cls._DefaultName, cls.inputDatasets[0])
172 
173  def readCatalog(self, patchRef):
174  """Read input catalogs
175 
176  Read all the input datasets given by the 'inputDatasets'
177  attribute.
178 
179  Parameters
180  ----------
181  patchRef : `lsst.daf.persistence.ButlerDataRef`
182  Data reference for patch
183 
184  Returns
185  -------
186  Tuple consisting of band name and a dict of catalogs, keyed by
187  dataset name
188  """
189  band = patchRef.get(self.config.coaddName + "Coadd_filterLabel", immediate=True).bandLabel
190  catalogDict = {}
191  for dataset in self.inputDatasets:
192  catalog = patchRef.get(self.config.coaddName + "Coadd_" + dataset, immediate=True)
193  self.log.info("Read %d sources from %s for band %s: %s",
194  len(catalog), dataset, band, patchRef.dataId)
195  catalogDict[dataset] = catalog
196  return band, catalogDict
197 
198  def run(self, catalogs, tract, patch):
199  """Merge multiple catalogs.
200 
201  Parameters
202  ----------
203  catalogs : `dict`
204  Mapping from filter names to dict of catalogs.
205  tract : `int`
206  tractId to use for the tractId column
207  patch : `str`
208  patchId to use for the patchId column
209 
210  Returns
211  -------
212  catalog : `pandas.DataFrame`
213  Merged dataframe
214  """
215 
216  dfs = []
217  for filt, tableDict in catalogs.items():
218  for dataset, table in tableDict.items():
219  # Convert afwTable to pandas DataFrame
220  df = table.asAstropy().to_pandas().set_index('id', drop=True)
221 
222  # Sort columns by name, to ensure matching schema among patches
223  df = df.reindex(sorted(df.columns), axis=1)
224  df['tractId'] = tract
225  df['patchId'] = patch
226 
227  # Make columns a 3-level MultiIndex
228  df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
229  names=('dataset', 'band', 'column'))
230  dfs.append(df)
231 
232  catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
233  return catalog
234 
235  def write(self, patchRef, catalog):
236  """Write the output.
237 
238  Parameters
239  ----------
240  catalog : `ParquetTable`
241  Catalog to write
242  patchRef : `lsst.daf.persistence.ButlerDataRef`
243  Data reference for patch
244  """
245  patchRef.put(catalog, self.config.coaddName + "Coadd_" + self.outputDataset)
246  # since the filter isn't actually part of the data ID for the dataset we're saving,
247  # it's confusing to see it in the log message, even if the butler simply ignores it.
248  mergeDataId = patchRef.dataId.copy()
249  del mergeDataId["filter"]
250  self.log.info("Wrote merged catalog: %s", mergeDataId)
251 
252  def writeMetadata(self, dataRefList):
253  """No metadata to write, and not sure how to write it for a list of dataRefs.
254  """
255  pass
256 
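# Illustrative sketch (pandas only; the real inputs are afw SourceCatalogs): how
# WriteObjectTableTask.run assembles the per-patch deepCoadd_obj DataFrame. Each
# per-(dataset, band) catalog becomes a DataFrame indexed by object id, its columns
# are wrapped in a three-level (dataset, band, column) MultiIndex, and the pieces
# are joined on the shared index.
def _exampleObjectTableJoin():
    dfs = []
    for dataset in ("meas", "forced_src", "ref"):
        for band in ("g", "r"):
            df = pd.DataFrame({"id": [1, 2], "flux": [1.0, 2.0]}).set_index("id")
            df.columns = pd.MultiIndex.from_tuples(
                [(dataset, band, c) for c in df.columns],
                names=("dataset", "band", "column"))
            dfs.append(df)
    return functools.reduce(lambda d1, d2: d1.join(d2), dfs)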
257 
258 class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
259  defaultTemplates={"catalogType": ""},
260  dimensions=("instrument", "visit", "detector")):
261 
262  catalog = connectionTypes.Input(
263  doc="Input full-depth catalog of sources produced by CalibrateTask",
264  name="{catalogType}src",
265  storageClass="SourceCatalog",
266  dimensions=("instrument", "visit", "detector")
267  )
268  outputCatalog = connectionTypes.Output(
269  doc="Catalog of sources, `src` in Parquet format. The 'id' column is "
270  "replaced with an index; all other columns are unchanged.",
271  name="{catalogType}source",
272  storageClass="DataFrame",
273  dimensions=("instrument", "visit", "detector")
274  )
275 
276 
277 class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
278  pipelineConnections=WriteSourceTableConnections):
279  doApplyExternalPhotoCalib = pexConfig.Field(
280  dtype=bool,
281  default=False,
282  doc=("Add local photoCalib columns from the calexp.photoCalib? Should only be set True if "
283  "generating Source Tables from older src tables which do not already have local calib columns")
284  )
285  doApplyExternalSkyWcs = pexConfig.Field(
286  dtype=bool,
287  default=False,
288  doc=("Add local WCS columns from the calexp.wcs? Should only be set True if "
289  "generating Source Tables from older src tables which do not already have local calib columns")
290  )
291 
292 
293 class WriteSourceTableTask(CmdLineTask, pipeBase.PipelineTask):
294  """Write source table to parquet
295  """
296  _DefaultName = "writeSourceTable"
297  ConfigClass = WriteSourceTableConfig
298 
299  def runDataRef(self, dataRef):
300  src = dataRef.get('src')
301  if self.config.doApplyExternalPhotoCalib or self.config.doApplyExternalSkyWcs:
302  src = self.addCalibColumns(src, dataRef)
303 
304  ccdVisitId = dataRef.get('ccdExposureId')
305  result = self.run(src, ccdVisitId=ccdVisitId)
306  dataRef.put(result.table, 'source')
307 
308  def runQuantum(self, butlerQC, inputRefs, outputRefs):
309  inputs = butlerQC.get(inputRefs)
310  inputs['ccdVisitId'] = butlerQC.quantum.dataId.pack("visit_detector")
311  result = self.run(**inputs).table
312  outputs = pipeBase.Struct(outputCatalog=result.toDataFrame())
313  butlerQC.put(outputs, outputRefs)
314 
315  def run(self, catalog, ccdVisitId=None):
316  """Convert `src` catalog to parquet
317 
318  Parameters
319  ----------
320  catalog: `afwTable.SourceCatalog`
321  catalog to be converted
322  ccdVisitId: `int`
323  ccdVisitId to be added as a column
324 
325  Returns
326  -------
327  result : `lsst.pipe.base.Struct`
328  ``table``
329  `ParquetTable` version of the input catalog
330  """
331  self.log.info("Generating parquet table from src catalog %s", ccdVisitId)
332  df = catalog.asAstropy().to_pandas().set_index('id', drop=True)
333  df['ccdVisitId'] = ccdVisitId
334  return pipeBase.Struct(table=ParquetTable(dataFrame=df))
335 
336  def addCalibColumns(self, catalog, dataRef):
337  """Add columns with local calibration evaluated at each centroid.
338 
339  This exists for backwards compatibility with old repos: it converts
340  old src catalogs (which do not have the expected local calib columns)
341  to Source Tables.
342 
343  Parameters
344  ----------
345  catalog: `afwTable.SourceCatalog`
346  catalog to which calib columns will be added
347  dataRef : `lsst.daf.persistence.ButlerDataRef`
348  Data reference for fetching the calibs from disk.
349 
350  Returns
351  -------
352  newCat: `afwTable.SourceCatalog`
353  Source Catalog with requested local calib columns
354  """
355  mapper = afwTable.SchemaMapper(catalog.schema)
356  measureConfig = SingleFrameMeasurementTask.ConfigClass()
357  measureConfig.doReplaceWithNoise = False
358 
359  # Just need the WCS or the PhotoCalib attached to an exposure
360  exposure = dataRef.get('calexp_sub',
361  bbox=lsst.geom.BoxI(lsst.geom.PointI(0, 0), lsst.geom.PointI(0, 0)))
362 
363  mapper = afwTable.SchemaMapper(catalog.schema)
364  mapper.addMinimalSchema(catalog.schema, True)
365  schema = mapper.getOutputSchema()
366 
367  exposureIdInfo = dataRef.get("expIdInfo")
368  measureConfig.plugins.names = []
369  if self.config.doApplyExternalSkyWcs:
370  plugin = 'base_LocalWcs'
371  if plugin in schema:
372  raise RuntimeError(f"{plugin} already in src catalog. Set doApplyExternalSkyWcs=False")
373  else:
374  measureConfig.plugins.names.add(plugin)
375 
376  if self.config.doApplyExternalPhotoCalib:
377  plugin = 'base_LocalPhotoCalib'
378  if plugin in schema:
379  raise RuntimeError(f"{plugin} already in src catalog. Set doApplyExternalPhotoCalib=False")
380  else:
381  measureConfig.plugins.names.add(plugin)
382 
383  measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
384  newCat = afwTable.SourceCatalog(schema)
385  newCat.extend(catalog, mapper=mapper)
386  measurement.run(measCat=newCat, exposure=exposure, exposureId=exposureIdInfo.expId)
387  return newCat
388 
389  def writeMetadata(self, dataRef):
390  """No metadata to write.
391  """
392  pass
393 
394  @classmethod
395  def _makeArgumentParser(cls):
396  parser = ArgumentParser(name=cls._DefaultName)
397  parser.add_id_argument("--id", 'src',
398  help="data ID, e.g. --id visit=12345 ccd=0")
399  return parser
400 
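# Illustrative sketch (pandas only; the real run() starts from an afw SourceCatalog
# converted via asAstropy().to_pandas()): the essential steps are re-indexing on
# 'id' and attaching a constant ccdVisitId column before the result is wrapped in
# a ParquetTable. The column names here are illustrative.
def _exampleSourceTableConversion(ccdVisitId=12345):
    df = pd.DataFrame({"id": [10, 11], "base_PsfFlux_instFlux": [100.0, 250.0]})
    df = df.set_index("id", drop=True)
    df["ccdVisitId"] = ccdVisitId
    return df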
401 
402 class PostprocessAnalysis(object):
403  """Calculate columns from ParquetTable
404 
405  This object manages and organizes an arbitrary set of computations
406  on a catalog. The catalog is defined by a
407  `lsst.pipe.tasks.parquetTable.ParquetTable` object (or list thereof), such as a
408  `deepCoadd_obj` dataset, and the computations are defined by a collection
409  of `lsst.pipe.tasks.functors.Functor` objects (or, equivalently,
410  a `CompositeFunctor`).
411 
412  After the object is initialized, accessing the `.df` attribute (which
413  holds the `pandas.DataFrame` containing the results of the calculations) triggers
414  computation of said dataframe.
415 
416  One of the conveniences of using this object is the ability to define a desired common
417  filter for all functors. This enables the same functor collection to be passed to
418  several different `PostprocessAnalysis` objects without having to change the original
419  functor collection, since the `filt` keyword argument of this object triggers an
420  overwrite of the `filt` property for all functors in the collection.
421 
422  This object also allows a list of refFlags to be passed, and defines a set of default
423  refFlags that are always included even if not requested.
424 
425  If a list of `ParquetTable` objects is passed, rather than a single one, then the
426  calculations will be mapped over all the input catalogs. In principle, it should
427  be straightforward to parallelize this activity, but initial tests have failed
428  (see TODO in code comments).
429 
430  Parameters
431  ----------
432  parq : `lsst.pipe.tasks.parquetTable.ParquetTable` (or list of such)
433  Source catalog(s) for computation
434 
435  functors : `list`, `dict`, or `lsst.pipe.tasks.functors.CompositeFunctor`
436  Computations to do (functors that act on `parq`).
437  If a dict, the output
438  DataFrame will have columns keyed accordingly.
439  If a list, the column keys will come from the
440  `.shortname` attribute of each functor.
441 
442  filt : `str` (optional)
443  Filter in which to calculate. If provided,
444  this will overwrite any existing `.filt` attribute
445  of the provided functors.
446 
447  flags : `list` (optional)
448  List of flags (per-band) to include in output table.
449  Taken from the `meas` dataset if applied to a multilevel Object Table.
450 
451  refFlags : `list` (optional)
452  List of refFlags (only reference band) to include in output table.
453 
454  forcedFlags : `list` (optional)
455  List of flags (per-band) to include in output table.
456  Taken from the ``forced_src`` dataset if applied to a
457  multilevel Object Table. Intended for flags from measurement plugins
458  only run during multi-band forced-photometry.
459  """
460  _defaultRefFlags = []
461  _defaultFuncs = ()
462 
463  def __init__(self, parq, functors, filt=None, flags=None, refFlags=None, forcedFlags=None):
464  self.parq = parq
465  self.functors = functors
466 
467  self.filt = filt
468  self.flags = list(flags) if flags is not None else []
469  self.forcedFlags = list(forcedFlags) if forcedFlags is not None else []
470  self.refFlags = list(self._defaultRefFlags)
471  if refFlags is not None:
472  self.refFlags += list(refFlags)
473 
474  self._df = None
475 
476  @property
477  def defaultFuncs(self):
478  funcs = dict(self._defaultFuncs)
479  return funcs
480 
481  @property
482  def func(self):
483  additionalFuncs = self.defaultFuncs
484  additionalFuncs.update({flag: Column(flag, dataset='forced_src') for flag in self.forcedFlags})
485  additionalFuncs.update({flag: Column(flag, dataset='ref') for flag in self.refFlags})
486  additionalFuncs.update({flag: Column(flag, dataset='meas') for flag in self.flags})
487 
488  if isinstance(self.functors, CompositeFunctor):
489  func = self.functors
490  else:
491  func = CompositeFunctor(self.functors)
492 
493  func.funcDict.update(additionalFuncs)
494  func.filt = self.filt
495 
496  return func
497 
498  @property
499  def noDupCols(self):
500  return [name for name, func in self.func.funcDict.items() if func.noDup or func.dataset == 'ref']
501 
502  @property
503  def df(self):
504  if self._df is None:
505  self.compute()
506  return self._df
507 
508  def compute(self, dropna=False, pool=None):
509  # map over multiple parquet tables
510  if type(self.parq) in (list, tuple):
511  if pool is None:
512  dflist = [self.func(parq, dropna=dropna) for parq in self.parq]
513  else:
514  # TODO: Figure out why this doesn't work (pyarrow pickling issues?)
515  dflist = pool.map(functools.partial(self.func, dropna=dropna), self.parq)
516  self._df = pd.concat(dflist)
517  else:
518  self._df = self.func(self.parq, dropna=dropna)
519 
520  return self._df
521 
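# Illustrative sketch (assumes a working LSST stack so the functors import, and
# that `parq` is a deepCoadd_obj ParquetTable or handle): flags and refFlags become
# plain Column functors bound to the 'meas' and 'ref' datasets and are merged into
# the composite; accessing `.df` then triggers the computation.
def _examplePostprocessAnalysis(parq):
    funcs = {"ra": Column("coord_ra"), "dec": Column("coord_dec")}
    analysis = PostprocessAnalysis(parq, funcs,
                                   flags=["base_PixelFlags_flag"],
                                   refFlags=["detect_isPrimary"],
                                   filt="g")
    return analysis.df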
522 
523 class TransformCatalogBaseConnections(pipeBase.PipelineTaskConnections,
524  dimensions=()):
525  """Expected Connections for subclasses of TransformCatalogBaseTask.
526 
527  Must be subclassed.
528  """
529  inputCatalog = connectionTypes.Input(
530  name="",
531  storageClass="DataFrame",
532  )
533  outputCatalog = connectionTypes.Output(
534  name="",
535  storageClass="DataFrame",
536  )
537 
538 
539 class TransformCatalogBaseConfig(pipeBase.PipelineTaskConfig,
540  pipelineConnections=TransformCatalogBaseConnections):
541  functorFile = pexConfig.Field(
542  dtype=str,
543  doc="Path to YAML file specifying Science Data Model functors to use "
544  "when copying columns and computing calibrated values.",
545  default=None,
546  optional=True
547  )
548  primaryKey = pexConfig.Field(
549  dtype=str,
550  doc="Name of column to be set as the DataFrame index. If None, the index "
551  "will be named `id`",
552  default=None,
553  optional=True
554  )
555 
556 
557 class TransformCatalogBaseTask(CmdLineTask, pipeBase.PipelineTask):
558  """Base class for transforming/standardizing a catalog
559 
560  by applying functors that convert units and apply calibrations.
561  The purpose of this task is to perform a set of computations on
562  an input `ParquetTable` dataset (such as `deepCoadd_obj`) and write the
563  results to a new dataset (which needs to be declared in an `outputDataset`
564  attribute).
565 
566  The calculations to be performed are defined in a YAML file that specifies
567  a set of functors to be computed, provided as
568  a `--functorFile` config parameter. An example of such a YAML file
569  is the following:
570 
571  funcs:
572  psfMag:
573  functor: Mag
574  args:
575  - base_PsfFlux
576  filt: HSC-G
577  dataset: meas
578  cmodel_magDiff:
579  functor: MagDiff
580  args:
581  - modelfit_CModel
582  - base_PsfFlux
583  filt: HSC-G
584  gauss_magDiff:
585  functor: MagDiff
586  args:
587  - base_GaussianFlux
588  - base_PsfFlux
589  filt: HSC-G
590  count:
591  functor: Column
592  args:
593  - base_InputCount_value
594  filt: HSC-G
595  deconvolved_moments:
596  functor: DeconvolvedMoments
597  filt: HSC-G
598  dataset: forced_src
599  refFlags:
600  - calib_psfUsed
601  - merge_measurement_i
602  - merge_measurement_r
603  - merge_measurement_z
604  - merge_measurement_y
605  - merge_measurement_g
606  - base_PixelFlags_flag_inexact_psfCenter
607  - detect_isPrimary
608 
609  The names for each entry under "funcs" will become the names of columns in the
610  output dataset. All the functors referenced are defined in `lsst.pipe.tasks.functors`.
611  Positional arguments to be passed to each functor are in the `args` list,
612  and any additional entries for each column other than "functor" or "args" (e.g., `'filt'`,
613  `'dataset'`) are treated as keyword arguments to be passed to the functor initialization.
614 
615  The "flags" entry is the default shortcut for `Column` functors.
616  All columns listed under "flags" will be copied to the output table
617  untransformed. They can be of any datatype.
618  In the special case of transforming a multi-level object table with
619  band and dataset indices (deepCoadd_obj), these will be taken from the
620  `meas` dataset and exploded out per band.
621 
622  There are two special shortcuts that only apply when transforming
623  multi-level Object (deepCoadd_obj) tables:
624  - The "refFlags" entry is a shortcut for `Column` functors
625  taken from the `'ref'` dataset if transforming an ObjectTable.
626  - The "forcedFlags" entry is a shortcut for `Column` functors
627  taken from the ``forced_src`` dataset if transforming an ObjectTable.
628  These are expanded out per band.
629 
630 
631  This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
632  to organize and execute the calculations.
633 
634  """
635  @property
636  def _DefaultName(self):
637  raise NotImplementedError('Subclass must define "_DefaultName" attribute')
638 
639  @property
640  def outputDataset(self):
641  raise NotImplementedError('Subclass must define "outputDataset" attribute')
642 
643  @property
644  def inputDataset(self):
645  raise NotImplementedError('Subclass must define "inputDataset" attribute')
646 
647  @property
648  def ConfigClass(self):
649  raise NotImplementedError('Subclass must define "ConfigClass" attribute')
650 
651  def __init__(self, *args, **kwargs):
652  super().__init__(*args, **kwargs)
653  if self.config.functorFile:
654  self.log.info('Loading transform functor definitions from %s',
655  self.config.functorFile)
656  self.funcs = CompositeFunctor.from_file(self.config.functorFile)
657  self.funcs.update(dict(PostprocessAnalysis._defaultFuncs))
658  else:
659  self.funcs = None
660 
661  def runQuantum(self, butlerQC, inputRefs, outputRefs):
662  inputs = butlerQC.get(inputRefs)
663  if self.funcs is None:
664  raise ValueError("config.functorFile is None. "
665  "Must be a valid path to yaml in order to run Task as a PipelineTask.")
666  result = self.run(parq=inputs['inputCatalog'], funcs=self.funcs,
667  dataId=outputRefs.outputCatalog.dataId.full)
668  outputs = pipeBase.Struct(outputCatalog=result)
669  butlerQC.put(outputs, outputRefs)
670 
671  def runDataRef(self, dataRef):
672  parq = dataRef.get()
673  if self.funcs is None:
674  raise ValueError("config.functorFile is None. "
675  "Must be a valid path to yaml in order to run as a CommandlineTask.")
676  df = self.run(parq, funcs=self.funcs, dataId=dataRef.dataId)
677  self.write(df, dataRef)
678  return df
679 
680  def run(self, parq, funcs=None, dataId=None, band=None):
681  """Do postprocessing calculations
682 
683  Takes a `ParquetTable` object and dataId,
684  returns a dataframe with results of postprocessing calculations.
685 
686  Parameters
687  ----------
688  parq : `lsst.pipe.tasks.parquetTable.ParquetTable`
689  ParquetTable from which calculations are done.
690  funcs : `lsst.pipe.tasks.functors.Functors`
691  Functors to apply to the table's columns
692  dataId : dict, optional
693  Used to add a `patchId` column to the output dataframe.
694  band : `str`, optional
695  Filter band that is being processed.
696 
697  Returns
698  -------
699  `pandas.DataFrame`
700 
701  """
702  self.log.info("Transforming/standardizing the source table dataId: %s", dataId)
703 
704  df = self.transform(band, parq, funcs, dataId).df
705  self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
706  return df
707 
708  def getFunctors(self):
709  return self.funcs
710 
711  def getAnalysis(self, parq, funcs=None, band=None):
712  if funcs is None:
713  funcs = self.funcs
714  analysis = PostprocessAnalysis(parq, funcs, filt=band)
715  return analysis
716 
717  def transform(self, band, parq, funcs, dataId):
718  analysis = self.getAnalysis(parq, funcs=funcs, band=band)
719  df = analysis.df
720  if dataId is not None:
721  for key, value in dataId.items():
722  df[str(key)] = value
723 
724  if self.config.primaryKey:
725  if df.index.name != self.config.primaryKey and self.config.primaryKey in df:
726  df.reset_index(inplace=True, drop=True)
727  df.set_index(self.config.primaryKey, inplace=True)
728 
729  return pipeBase.Struct(
730  df=df,
731  analysis=analysis
732  )
733 
734  def write(self, df, parqRef):
735  parqRef.put(ParquetTable(dataFrame=df), self.outputDataset)
736 
737  def writeMetadata(self, dataRef):
738  """No metadata to write.
739  """
740  pass
741 
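# Illustrative sketch (assumes a working LSST stack; the functor definitions simply
# mirror the docstring example above): a functor YAML file of this form is what
# config.functorFile points at, and CompositeFunctor.from_file is the same entry
# point used in __init__ to load it.
def _exampleLoadFunctorFile():
    import tempfile
    yamlText = (
        "funcs:\n"
        "  psfMag:\n"
        "    functor: Mag\n"
        "    args:\n"
        "      - base_PsfFlux\n"
        "    dataset: meas\n"
        "refFlags:\n"
        "  - detect_isPrimary\n"
    )
    with tempfile.NamedTemporaryFile("w", suffix=".yaml", delete=False) as fh:
        fh.write(yamlText)
        path = fh.name
    return CompositeFunctor.from_file(path)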
742 
743 class TransformObjectCatalogConnections(pipeBase.PipelineTaskConnections,
744  defaultTemplates={"coaddName": "deep"},
745  dimensions=("tract", "patch", "skymap")):
746  inputCatalog = connectionTypes.Input(
747  doc="The vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
748  "stored as a DataFrame with a multi-level column index per-patch.",
749  dimensions=("tract", "patch", "skymap"),
750  storageClass="DataFrame",
751  name="{coaddName}Coadd_obj",
752  deferLoad=True,
753  )
754  outputCatalog = connectionTypes.Output(
755  doc="Per-Patch Object Table of columns transformed from the deepCoadd_obj table per the standard "
756  "data model.",
757  dimensions=("tract", "patch", "skymap"),
758  storageClass="DataFrame",
759  name="objectTable"
760  )
761 
762 
763 class TransformObjectCatalogConfig(TransformCatalogBaseConfig,
764  pipelineConnections=TransformObjectCatalogConnections):
765  coaddName = pexConfig.Field(
766  dtype=str,
767  default="deep",
768  doc="Name of coadd"
769  )
770  # TODO: remove in DM-27177
771  filterMap = pexConfig.DictField(
772  keytype=str,
773  itemtype=str,
774  default={},
775  doc=("Dictionary mapping full filter name to short one for column name munging. "
776  "These filters determine the output columns no matter what filters the "
777  "input data actually contain."),
778  deprecated=("Coadds are now identified by the band, so this transform is unused. "
779  "Will be removed after v22.")
780  )
781  outputBands = pexConfig.ListField(
782  dtype=str,
783  default=None,
784  optional=True,
785  doc=("These bands and only these bands will appear in the output,"
786  " NaN-filled if the input does not include them."
787  " If None, then use all bands found in the input.")
788  )
789  camelCase = pexConfig.Field(
790  dtype=bool,
791  default=False,
792  doc=("Write per-band column names with camelCase, else underscore. "
793  "For example: gPsFlux instead of g_PsFlux.")
794  )
795  multilevelOutput = pexConfig.Field(
796  dtype=bool,
797  default=False,
798  doc=("Whether results dataframe should have a multilevel column index (True) or be flat "
799  "and name-munged (False).")
800  )
801 
802  def setDefaults(self):
803  super().setDefaults()
804  self.primaryKey = 'objectId'
805 
806 
807 class TransformObjectCatalogTask(TransformCatalogBaseTask):
808  """Produce a flattened Object Table to match the format specified in
809  sdm_schemas.
810 
811  Do the same set of postprocessing calculations on all bands.
812 
813  This is identical to `TransformCatalogBaseTask`, except that it does the
814  specified functor calculations for all filters present in the
815  input `deepCoadd_obj` table. Any specific `"filt"` keywords specified
816  by the YAML file will be superseded.
817  """
818  _DefaultName = "transformObjectCatalog"
819  ConfigClass = TransformObjectCatalogConfig
820 
821  # Used by Gen 2 runDataRef only:
822  inputDataset = 'deepCoadd_obj'
823  outputDataset = 'objectTable'
824 
825  @classmethod
826  def _makeArgumentParser(cls):
827  parser = ArgumentParser(name=cls._DefaultName)
828  parser.add_id_argument("--id", cls.inputDataset,
829  ContainerClass=CoaddDataIdContainer,
830  help="data ID, e.g. --id tract=12345 patch=1,2")
831  return parser
832 
833  def run(self, parq, funcs=None, dataId=None, band=None):
834  # NOTE: band kwarg is ignored here.
835  dfDict = {}
836  analysisDict = {}
837  templateDf = pd.DataFrame()
838 
839  if isinstance(parq, DeferredDatasetHandle):
840  columns = parq.get(component='columns')
841  inputBands = columns.unique(level=1).values
842  else:
843  inputBands = parq.columnLevelNames['band']
844 
845  outputBands = self.config.outputBands if self.config.outputBands else inputBands
846 
847  # Perform transform for data of filters that exist in parq.
848  for inputBand in inputBands:
849  if inputBand not in outputBands:
850  self.log.info("Ignoring %s band data in the input", inputBand)
851  continue
852  self.log.info("Transforming the catalog of band %s", inputBand)
853  result = self.transform(inputBand, parq, funcs, dataId)
854  dfDict[inputBand] = result.df
855  analysisDict[inputBand] = result.analysis
856  if templateDf.empty:
857  templateDf = result.df
858 
859  # Fill NaNs in columns of other wanted bands
860  for filt in outputBands:
861  if filt not in dfDict:
862  self.log.info("Adding empty columns for band %s", filt)
863  dfDict[filt] = pd.DataFrame().reindex_like(templateDf)
864 
865  # This makes a multilevel column index, with band as first level
866  df = pd.concat(dfDict, axis=1, names=['band', 'column'])
867 
868  if not self.config.multilevelOutput:
869  noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
870  if self.config.primaryKey in noDupCols:
871  noDupCols.remove(self.config.primaryKey)
872  if dataId is not None:
873  noDupCols += list(dataId.keys())
874  df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase,
875  inputBands=inputBands)
876 
877  self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
878 
879  return df
880 
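# Illustrative sketch (pandas only): how TransformObjectCatalogTask.run NaN-fills
# bands that are requested in outputBands but absent from the input, and then
# stacks the per-band results into a (band, column) MultiIndex before the optional
# flattening step.
def _exampleBandFill(outputBands=("g", "r", "i")):
    templateDf = pd.DataFrame({"psfMag": [20.5, 21.1]}, index=[1, 2])
    dfDict = {"g": templateDf, "r": templateDf * 1.01}
    for band in outputBands:
        if band not in dfDict:
            # An empty frame reindexed like the template yields all-NaN columns
            # with a matching index, the same trick the task uses.
            dfDict[band] = pd.DataFrame().reindex_like(templateDf)
    return pd.concat(dfDict, axis=1, names=["band", "column"])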
881 
882 class TractObjectDataIdContainer(CoaddDataIdContainer):
883 
884  def makeDataRefList(self, namespace):
885  """Make self.refList from self.idList
886 
887  Generate a list of data references given tract and/or patch.
888  This was adapted from `TractQADataIdContainer`, which was
889  `TractDataIdContainer` modified to not require "filter".
890  Only existing dataRefs are returned.
891  """
892  def getPatchRefList(tract):
893  return [namespace.butler.dataRef(datasetType=self.datasetType,
894  tract=tract.getId(),
895  patch="%d,%d" % patch.getIndex()) for patch in tract]
896 
897  tractRefs = defaultdict(list) # Data references for each tract
898  for dataId in self.idList:
899  skymap = self.getSkymap(namespace)
900 
901  if "tract" in dataId:
902  tractId = dataId["tract"]
903  if "patch" in dataId:
904  tractRefs[tractId].append(namespace.butler.dataRef(datasetType=self.datasetType,
905  tract=tractId,
906  patch=dataId['patch']))
907  else:
908  tractRefs[tractId] += getPatchRefList(skymap[tractId])
909  else:
910  tractRefs = dict((tract.getId(), tractRefs.get(tract.getId(), []) + getPatchRefList(tract))
911  for tract in skymap)
912  outputRefList = []
913  for tractRefList in tractRefs.values():
914  existingRefs = [ref for ref in tractRefList if ref.datasetExists()]
915  outputRefList.append(existingRefs)
916 
917  self.refList = outputRefList
918 
919 
920 class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections,
921  dimensions=("tract", "skymap")):
922  inputCatalogs = connectionTypes.Input(
923  doc="Per-Patch objectTables conforming to the standard data model.",
924  name="objectTable",
925  storageClass="DataFrame",
926  dimensions=("tract", "patch", "skymap"),
927  multiple=True,
928  )
929  outputCatalog = connectionTypes.Output(
930  doc="Per-tract horizontal concatenation of the input objectTables",
931  name="objectTable_tract",
932  storageClass="DataFrame",
933  dimensions=("tract", "skymap"),
934  )
935 
936 
937 class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig,
938  pipelineConnections=ConsolidateObjectTableConnections):
939  coaddName = pexConfig.Field(
940  dtype=str,
941  default="deep",
942  doc="Name of coadd"
943  )
944 
945 
946 class ConsolidateObjectTableTask(CmdLineTask, pipeBase.PipelineTask):
947  """Write patch-merged source tables to a tract-level parquet file
948 
949  Concatenates `objectTable` list into a per-tract `objectTable_tract`
950  """
951  _DefaultName = "consolidateObjectTable"
952  ConfigClass = ConsolidateObjectTableConfig
953 
954  inputDataset = 'objectTable'
955  outputDataset = 'objectTable_tract'
956 
957  def runQuantum(self, butlerQC, inputRefs, outputRefs):
958  inputs = butlerQC.get(inputRefs)
959  self.log.info("Concatenating %s per-patch Object Tables",
960  len(inputs['inputCatalogs']))
961  df = pd.concat(inputs['inputCatalogs'])
962  butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)
963 
964  @classmethod
965  def _makeArgumentParser(cls):
966  parser = ArgumentParser(name=cls._DefaultName)
967 
968  parser.add_id_argument("--id", cls.inputDataset,
969  help="data ID, e.g. --id tract=12345",
970  ContainerClass=TractObjectDataIdContainer)
971  return parser
972 
973  def runDataRef(self, patchRefList):
974  df = pd.concat([patchRef.get().toDataFrame() for patchRef in patchRefList])
975  patchRefList[0].put(ParquetTable(dataFrame=df), self.outputDataset)
976 
977  def writeMetadata(self, dataRef):
978  """No metadata to write.
979  """
980  pass
981 
982 
983 class TransformSourceTableConnections(pipeBase.PipelineTaskConnections,
984  defaultTemplates={"catalogType": ""},
985  dimensions=("instrument", "visit", "detector")):
986 
987  inputCatalog = connectionTypes.Input(
988  doc="Wide input catalog of sources produced by WriteSourceTableTask",
989  name="{catalogType}source",
990  storageClass="DataFrame",
991  dimensions=("instrument", "visit", "detector"),
992  deferLoad=True
993  )
994  outputCatalog = connectionTypes.Output(
995  doc="Narrower, per-detector Source Table transformed and converted per a "
996  "specified set of functors",
997  name="{catalogType}sourceTable",
998  storageClass="DataFrame",
999  dimensions=("instrument", "visit", "detector")
1000  )
1001 
1002 
1003 class TransformSourceTableConfig(TransformCatalogBaseConfig,
1004  pipelineConnections=TransformSourceTableConnections):
1005 
1006  def setDefaults(self):
1007  super().setDefaults()
1008  self.primaryKey = 'sourceId'
1009 
1010 
1011 class TransformSourceTableTask(TransformCatalogBaseTask):
1012  """Transform/standardize a source catalog
1013  """
1014  _DefaultName = "transformSourceTable"
1015  ConfigClass = TransformSourceTableConfig
1016 
1017  inputDataset = 'source'
1018  outputDataset = 'sourceTable'
1019 
1020  @classmethod
1021  def _makeArgumentParser(cls):
1022  parser = ArgumentParser(name=cls._DefaultName)
1023  parser.add_id_argument("--id", datasetType=cls.inputDataset,
1024  level="sensor",
1025  help="data ID, e.g. --id visit=12345 ccd=0")
1026  return parser
1027 
1028  def runDataRef(self, dataRef):
1029  """Override to specify band label to run()."""
1030  parq = dataRef.get()
1031  funcs = self.getFunctors()
1032  band = dataRef.get("calexp_filterLabel", immediate=True).bandLabel
1033  df = self.run(parq, funcs=funcs, dataId=dataRef.dataId, band=band)
1034  self.write(df, dataRef)
1035  return df
1036 
1037 
1038 class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections,
1039  dimensions=("instrument", "visit",),
1040  defaultTemplates={"calexpType": ""}):
1041  calexp = connectionTypes.Input(
1042  doc="Processed exposures used for metadata",
1043  name="{calexpType}calexp",
1044  storageClass="ExposureF",
1045  dimensions=("instrument", "visit", "detector"),
1046  deferLoad=True,
1047  multiple=True,
1048  )
1049  visitSummary = connectionTypes.Output(
1050  doc=("Per-visit consolidated exposure metadata. These catalogs use "
1051  "detector id for the id and are sorted for fast lookups of a "
1052  "detector."),
1053  name="{calexpType}visitSummary",
1054  storageClass="ExposureCatalog",
1055  dimensions=("instrument", "visit"),
1056  )
1057 
1058 
1059 class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig,
1060  pipelineConnections=ConsolidateVisitSummaryConnections):
1061  """Config for ConsolidateVisitSummaryTask"""
1062  pass
1063 
1064 
1065 class ConsolidateVisitSummaryTask(pipeBase.PipelineTask, pipeBase.CmdLineTask):
1066  """Task to consolidate per-detector visit metadata.
1067 
1068  This task aggregates the following metadata from all the detectors in a
1069  single visit into an exposure catalog:
1070  - The visitInfo.
1071  - The wcs.
1072  - The photoCalib.
1073  - The physical_filter and band (if available).
1074  - The psf size, shape, and effective area at the center of the detector.
1075  - The corners of the bounding box in right ascension/declination.
1076 
1077  Other quantities such as Detector, Psf, ApCorrMap, and TransmissionCurve
1078  are not persisted here because of storage concerns, and because of their
1079  limited utility as summary statistics.
1080 
1081  Tests for this task are performed in ci_hsc_gen3.
1082  """
1083  _DefaultName = "consolidateVisitSummary"
1084  ConfigClass = ConsolidateVisitSummaryConfig
1085 
1086  @classmethod
1087  def _makeArgumentParser(cls):
1088  parser = ArgumentParser(name=cls._DefaultName)
1089 
1090  parser.add_id_argument("--id", "calexp",
1091  help="data ID, e.g. --id visit=12345",
1092  ContainerClass=VisitDataIdContainer)
1093  return parser
1094 
1095  def writeMetadata(self, dataRef):
1096  """No metadata to persist, so override to remove metadata persistence.
1097  """
1098  pass
1099 
1100  def writeConfig(self, butler, clobber=False, doBackup=True):
1101  """No config to persist, so override to remove config persistence.
1102  """
1103  pass
1104 
1105  def runDataRef(self, dataRefList):
1106  visit = dataRefList[0].dataId['visit']
1107 
1108  self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
1109  len(dataRefList), visit)
1110 
1111  expCatalog = self._combineExposureMetadata(visit, dataRefList, isGen3=False)
1112 
1113  dataRefList[0].put(expCatalog, 'visitSummary', visit=visit)
1114 
1115  def runQuantum(self, butlerQC, inputRefs, outputRefs):
1116  dataRefs = butlerQC.get(inputRefs.calexp)
1117  visit = dataRefs[0].dataId.byName()['visit']
1118 
1119  self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
1120  len(dataRefs), visit)
1121 
1122  expCatalog = self._combineExposureMetadata(visit, dataRefs)
1123 
1124  butlerQC.put(expCatalog, outputRefs.visitSummary)
1125 
1126  def _combineExposureMetadata(self, visit, dataRefs, isGen3=True):
1127  """Make a combined exposure catalog from a list of dataRefs.
1128  These dataRefs must point to exposures with wcs, summaryStats,
1129  and other visit metadata.
1130 
1131  Parameters
1132  ----------
1133  visit : `int`
1134  Visit identification number.
1135  dataRefs : `list`
1136  List of dataRefs in visit. May be list of
1137  `lsst.daf.persistence.ButlerDataRef` (Gen2) or
1138  `lsst.daf.butler.DeferredDatasetHandle` (Gen3).
1139  isGen3 : `bool`, optional
1140  Specifies if this is a Gen3 list of datarefs.
1141 
1142  Returns
1143  -------
1144  visitSummary : `lsst.afw.table.ExposureCatalog`
1145  Exposure catalog with per-detector summary information.
1146  """
1147  schema = self._makeVisitSummarySchema()
1148  cat = afwTable.ExposureCatalog(schema)
1149  cat.resize(len(dataRefs))
1150 
1151  cat['visit'] = visit
1152 
1153  for i, dataRef in enumerate(dataRefs):
1154  if isGen3:
1155  visitInfo = dataRef.get(component='visitInfo')
1156  filterLabel = dataRef.get(component='filterLabel')
1157  summaryStats = dataRef.get(component='summaryStats')
1158  detector = dataRef.get(component='detector')
1159  wcs = dataRef.get(component='wcs')
1160  photoCalib = dataRef.get(component='photoCalib')
1161  detector = dataRef.get(component='detector')
1162  bbox = dataRef.get(component='bbox')
1163  validPolygon = dataRef.get(component='validPolygon')
1164  else:
1165  # Note that we need to read the calexp because there is
1166  # no magic access to the psf except through the exposure.
1167  gen2_read_bbox = lsst.geom.BoxI(lsst.geom.PointI(0, 0), lsst.geom.PointI(1, 1))
1168  exp = dataRef.get(datasetType='calexp_sub', bbox=gen2_read_bbox)
1169  visitInfo = exp.getInfo().getVisitInfo()
1170  filterLabel = dataRef.get("calexp_filterLabel")
1171  summaryStats = exp.getInfo().getSummaryStats()
1172  wcs = exp.getWcs()
1173  photoCalib = exp.getPhotoCalib()
1174  detector = exp.getDetector()
1175  bbox = dataRef.get(datasetType='calexp_bbox')
1176  validPolygon = exp.getInfo().getValidPolygon()
1177 
1178  rec = cat[i]
1179  rec.setBBox(bbox)
1180  rec.setVisitInfo(visitInfo)
1181  rec.setWcs(wcs)
1182  rec.setPhotoCalib(photoCalib)
1183  rec.setValidPolygon(validPolygon)
1184 
1185  rec['physical_filter'] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else ""
1186  rec['band'] = filterLabel.bandLabel if filterLabel.hasBandLabel() else ""
1187  rec.setId(detector.getId())
1188  rec['psfSigma'] = summaryStats.psfSigma
1189  rec['psfIxx'] = summaryStats.psfIxx
1190  rec['psfIyy'] = summaryStats.psfIyy
1191  rec['psfIxy'] = summaryStats.psfIxy
1192  rec['psfArea'] = summaryStats.psfArea
1193  rec['raCorners'][:] = summaryStats.raCorners
1194  rec['decCorners'][:] = summaryStats.decCorners
1195  rec['ra'] = summaryStats.ra
1196  rec['decl'] = summaryStats.decl
1197  rec['zenithDistance'] = summaryStats.zenithDistance
1198  rec['zeroPoint'] = summaryStats.zeroPoint
1199  rec['skyBg'] = summaryStats.skyBg
1200  rec['skyNoise'] = summaryStats.skyNoise
1201  rec['meanVar'] = summaryStats.meanVar
1202  rec['astromOffsetMean'] = summaryStats.astromOffsetMean
1203  rec['astromOffsetStd'] = summaryStats.astromOffsetStd
1204 
1205  metadata = dafBase.PropertyList()
1206  metadata.add("COMMENT", "Catalog id is detector id, sorted.")
1207  # We are looping over existing datarefs, so the following is true
1208  metadata.add("COMMENT", "Only detectors with data have entries.")
1209  cat.setMetadata(metadata)
1210 
1211  cat.sort()
1212  return cat
1213 
1214  def _makeVisitSummarySchema(self):
1215  """Make the schema for the visitSummary catalog."""
1216  schema = afwTable.ExposureTable.makeMinimalSchema()
1217  schema.addField('visit', type='I', doc='Visit number')
1218  schema.addField('physical_filter', type='String', size=32, doc='Physical filter')
1219  schema.addField('band', type='String', size=32, doc='Name of band')
1220  schema.addField('psfSigma', type='F',
1221  doc='PSF model second-moments determinant radius (center of chip) (pixel)')
1222  schema.addField('psfArea', type='F',
1223  doc='PSF model effective area (center of chip) (pixel**2)')
1224  schema.addField('psfIxx', type='F',
1225  doc='PSF model Ixx (center of chip) (pixel**2)')
1226  schema.addField('psfIyy', type='F',
1227  doc='PSF model Iyy (center of chip) (pixel**2)')
1228  schema.addField('psfIxy', type='F',
1229  doc='PSF model Ixy (center of chip) (pixel**2)')
1230  schema.addField('raCorners', type='ArrayD', size=4,
1231  doc='Right Ascension of bounding box corners (degrees)')
1232  schema.addField('decCorners', type='ArrayD', size=4,
1233  doc='Declination of bounding box corners (degrees)')
1234  schema.addField('ra', type='D',
1235  doc='Right Ascension of bounding box center (degrees)')
1236  schema.addField('decl', type='D',
1237  doc='Declination of bounding box center (degrees)')
1238  schema.addField('zenithDistance', type='F',
1239  doc='Zenith distance of bounding box center (degrees)')
1240  schema.addField('zeroPoint', type='F',
1241  doc='Mean zeropoint in detector (mag)')
1242  schema.addField('skyBg', type='F',
1243  doc='Average sky background (ADU)')
1244  schema.addField('skyNoise', type='F',
1245  doc='Average sky noise (ADU)')
1246  schema.addField('meanVar', type='F',
1247  doc='Mean variance of the weight plane (ADU**2)')
1248  schema.addField('astromOffsetMean', type='F',
1249  doc='Mean offset of astrometric calibration matches (arcsec)')
1250  schema.addField('astromOffsetStd', type='F',
1251  doc='Standard deviation of offsets of astrometric calibration matches (arcsec)')
1252 
1253  return schema
1254 
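# Illustrative sketch (assumes a working LSST stack; the fields shown are a small
# subset of _makeVisitSummarySchema): the visit summary is an ExposureCatalog with
# one record per detector, filled from each calexp's components and sorted by
# detector id so downstream readers can look a detector up quickly.
def _exampleVisitSummarySkeleton(visit=12345, nDetector=3):
    schema = afwTable.ExposureTable.makeMinimalSchema()
    schema.addField('visit', type='I', doc='Visit number')
    schema.addField('psfSigma', type='F', doc='PSF model determinant radius (pixel)')
    cat = afwTable.ExposureCatalog(schema)
    cat.resize(nDetector)
    cat['visit'] = visit
    for i in range(nDetector):
        cat[i].setId(i)
    cat.sort()
    return cat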
1255 
1256 class VisitDataIdContainer(DataIdContainer):
1257  """DataIdContainer that groups sensor-level IDs by visit
1258  """
1259 
1260  def makeDataRefList(self, namespace):
1261  """Make self.refList from self.idList
1262 
1263  Generate a list of data references grouped by visit.
1264 
1265  Parameters
1266  ----------
1267  namespace : `argparse.Namespace`
1268  Namespace used by `lsst.pipe.base.CmdLineTask` to parse command line arguments
1269  """
1270  # Group by visits
1271  visitRefs = defaultdict(list)
1272  for dataId in self.idList:
1273  if "visit" in dataId:
1274  visitId = dataId["visit"]
1275  # Append all data references in this subset to the visit's list
1276  subset = namespace.butler.subset(self.datasetType, dataId=dataId)
1277  visitRefs[visitId].extend([dataRef for dataRef in subset])
1278 
1279  outputRefList = []
1280  for refList in visitRefs.values():
1281  existingRefs = [ref for ref in refList if ref.datasetExists()]
1282  if existingRefs:
1283  outputRefList.append(existingRefs)
1284 
1285  self.refList = outputRefList
1286 
1287 
1288 class ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections,
1289  defaultTemplates={"catalogType": ""},
1290  dimensions=("instrument", "visit")):
1291  inputCatalogs = connectionTypes.Input(
1292  doc="Input per-detector Source Tables",
1293  name="{catalogType}sourceTable",
1294  storageClass="DataFrame",
1295  dimensions=("instrument", "visit", "detector"),
1296  multiple=True
1297  )
1298  outputCatalog = connectionTypes.Output(
1299  doc="Per-visit concatenation of Source Table",
1300  name="{catalogType}sourceTable_visit",
1301  storageClass="DataFrame",
1302  dimensions=("instrument", "visit")
1303  )
1304 
1305 
1306 class ConsolidateSourceTableConfig(pipeBase.PipelineTaskConfig,
1307  pipelineConnections=ConsolidateSourceTableConnections):
1308  pass
1309 
1310 
1311 class ConsolidateSourceTableTask(CmdLineTask, pipeBase.PipelineTask):
1312  """Concatenate `sourceTable` list into a per-visit `sourceTable_visit`
1313  """
1314  _DefaultName = 'consolidateSourceTable'
1315  ConfigClass = ConsolidateSourceTableConfig
1316 
1317  inputDataset = 'sourceTable'
1318  outputDataset = 'sourceTable_visit'
1319 
1320  def runQuantum(self, butlerQC, inputRefs, outputRefs):
1321  inputs = butlerQC.get(inputRefs)
1322  self.log.info("Concatenating %s per-detector Source Tables",
1323  len(inputs['inputCatalogs']))
1324  df = pd.concat(inputs['inputCatalogs'])
1325  butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)
1326 
1327  def runDataRef(self, dataRefList):
1328  self.log.info("Concatenating %s per-detector Source Tables", len(dataRefList))
1329  df = pd.concat([dataRef.get().toDataFrame() for dataRef in dataRefList])
1330  dataRefList[0].put(ParquetTable(dataFrame=df), self.outputDataset)
1331 
1332  @classmethod
1333  def _makeArgumentParser(cls):
1334  parser = ArgumentParser(name=cls._DefaultName)
1335 
1336  parser.add_id_argument("--id", cls.inputDataset,
1337  help="data ID, e.g. --id visit=12345",
1338  ContainerClass=VisitDataIdContainer)
1339  return parser
1340 
1341  def writeMetadata(self, dataRef):
1342  """No metadata to write.
1343  """
1344  pass
1345 
1346  def writeConfig(self, butler, clobber=False, doBackup=True):
1347  """No config to write.
1348  """
1349  pass
1350 
1351 
1352 class MakeCcdVisitTableConnections(pipeBase.PipelineTaskConnections,
1353  dimensions=("instrument",),
1354  defaultTemplates={}):
1355  visitSummaryRefs = connectionTypes.Input(
1356  doc="Data references for per-visit consolidated exposure metadata from ConsolidateVisitSummaryTask",
1357  name="visitSummary",
1358  storageClass="ExposureCatalog",
1359  dimensions=("instrument", "visit"),
1360  multiple=True,
1361  deferLoad=True,
1362  )
1363  outputCatalog = connectionTypes.Output(
1364  doc="CCD and Visit metadata table",
1365  name="ccdVisitTable",
1366  storageClass="DataFrame",
1367  dimensions=("instrument",)
1368  )
1369 
1370 
1371 class MakeCcdVisitTableConfig(pipeBase.PipelineTaskConfig,
1372  pipelineConnections=MakeCcdVisitTableConnections):
1373  pass
1374 
1375 
1376 class MakeCcdVisitTableTask(CmdLineTask, pipeBase.PipelineTask):
1377  """Produce a `ccdVisitTable` from the `visitSummary` exposure catalogs.
1378  """
1379  _DefaultName = 'makeCcdVisitTable'
1380  ConfigClass = MakeCcdVisitTableConfig
1381 
1382  def run(self, visitSummaryRefs):
1383  """Make a table of CCD information from the `visitSummary` catalogs.
1384  Parameters
1385  ----------
1386  visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
1387  List of DeferredDatasetHandles pointing to exposure catalogs with
1388  per-detector summary information.
1389  Returns
1390  -------
1391  result : `lsst.pipe.base.Struct`
1392  Results struct with attribute:
1393  - `outputCatalog`
1394  Catalog of ccd and visit information.
1395  """
1396  ccdEntries = []
1397  for visitSummaryRef in visitSummaryRefs:
1398  visitSummary = visitSummaryRef.get()
1399  visitInfo = visitSummary[0].getVisitInfo()
1400 
1401  ccdEntry = {}
1402  summaryTable = visitSummary.asAstropy()
1403  selectColumns = ['id', 'visit', 'physical_filter', 'band', 'ra', 'decl', 'zenithDistance',
1404  'zeroPoint', 'psfSigma', 'skyBg', 'skyNoise']
1405  ccdEntry = summaryTable[selectColumns].to_pandas().set_index('id')
1406  # 'visit' is the human readable visit number
1407  # 'visitId' is the key to the visitId table. They are the same
1408  # Technically you should join to get the visit from the visit table
1409  ccdEntry = ccdEntry.rename(columns={"visit": "visitId"})
1410  dataIds = [DataCoordinate.standardize(visitSummaryRef.dataId, detector=id) for id in
1411  summaryTable['id']]
1412  packer = visitSummaryRef.dataId.universe.makePacker('visit_detector', visitSummaryRef.dataId)
1413  ccdVisitIds = [packer.pack(dataId) for dataId in dataIds]
1414  ccdEntry['ccdVisitId'] = ccdVisitIds
1415  ccdEntry['detector'] = summaryTable['id']
1416  pixToArcseconds = np.array([vR.getWcs().getPixelScale().asArcseconds() for vR in visitSummary])
1417  ccdEntry["seeing"] = visitSummary['psfSigma'] * np.sqrt(8 * np.log(2)) * pixToArcseconds
1418 
1419  ccdEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
1420  ccdEntry["expMidpt"] = visitInfo.getDate().toPython()
1421  expTime = visitInfo.getExposureTime()
1422  ccdEntry['expTime'] = expTime
1423  ccdEntry["obsStart"] = ccdEntry["expMidpt"] - 0.5 * pd.Timedelta(seconds=expTime)
1424  ccdEntry['darkTime'] = visitInfo.getDarkTime()
1425  ccdEntry['xSize'] = summaryTable['bbox_max_x'] - summaryTable['bbox_min_x']
1426  ccdEntry['ySize'] = summaryTable['bbox_max_y'] - summaryTable['bbox_min_y']
1427  ccdEntry['llcra'] = summaryTable['raCorners'][:, 0]
1428  ccdEntry['llcdec'] = summaryTable['decCorners'][:, 0]
1429  ccdEntry['ulcra'] = summaryTable['raCorners'][:, 1]
1430  ccdEntry['ulcdec'] = summaryTable['decCorners'][:, 1]
1431  ccdEntry['urcra'] = summaryTable['raCorners'][:, 2]
1432  ccdEntry['urcdec'] = summaryTable['decCorners'][:, 2]
1433  ccdEntry['lrcra'] = summaryTable['raCorners'][:, 3]
1434  ccdEntry['lrcdec'] = summaryTable['decCorners'][:, 3]
1435  # TODO: DM-30618, Add raftName, nExposures, ccdTemp, binX, binY, and flags,
1436  # and decide if WCS, and llcx, llcy, ulcx, ulcy, etc. values are actually wanted.
1437  ccdEntries.append(ccdEntry)
1438 
1439  outputCatalog = pd.concat(ccdEntries)
1440  outputCatalog.set_index('ccdVisitId', inplace=True, verify_integrity=True)
1441  return pipeBase.Struct(outputCatalog=outputCatalog)
1442 
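# Illustrative sketch (numpy only): the `seeing` column computed above is the PSF
# determinant radius converted to a FWHM in arcseconds, i.e.
# psfSigma * sqrt(8 ln 2) scaled by the detector's pixel scale.
def _exampleSeeing(psfSigma=2.0, pixelScaleArcsec=0.2):
    fwhmPixels = psfSigma * np.sqrt(8 * np.log(2))
    return fwhmPixels * pixelScaleArcsec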
1443 
1444 class MakeVisitTableConnections(pipeBase.PipelineTaskConnections,
1445  dimensions=("instrument",),
1446  defaultTemplates={}):
1447  visitSummaries = connectionTypes.Input(
1448  doc="Per-visit consolidated exposure metadata from ConsolidateVisitSummaryTask",
1449  name="visitSummary",
1450  storageClass="ExposureCatalog",
1451  dimensions=("instrument", "visit",),
1452  multiple=True,
1453  deferLoad=True,
1454  )
1455  outputCatalog = connectionTypes.Output(
1456  doc="Visit metadata table",
1457  name="visitTable",
1458  storageClass="DataFrame",
1459  dimensions=("instrument",)
1460  )
1461 
1462 
1463 class MakeVisitTableConfig(pipeBase.PipelineTaskConfig,
1464  pipelineConnections=MakeVisitTableConnections):
1465  pass
1466 
1467 
1468 class MakeVisitTableTask(CmdLineTask, pipeBase.PipelineTask):
1469  """Produce a `visitTable` from the `visitSummary` exposure catalogs.
1470  """
1471  _DefaultName = 'makeVisitTable'
1472  ConfigClass = MakeVisitTableConfig
1473 
1474  def run(self, visitSummaries):
1475  """Make a table of visit information from the `visitSummary` catalogs.
1476 
1477  Parameters
1478  ----------
1479  visitSummaries : `list` of `lsst.daf.butler.DeferredDatasetHandle`
1480  List of handles to exposure catalogs with per-detector summary information.
1481  Returns
1482  -------
1483  result : `lsst.pipe.base.Struct`
1484  Results struct with attribute:
1485  ``outputCatalog``
1486  Catalog of visit information.
1487  """
1488  visitEntries = []
1489  for visitSummary in visitSummaries:
1490  visitSummary = visitSummary.get()
1491  visitRow = visitSummary[0]
1492  visitInfo = visitRow.getVisitInfo()
1493 
1494  visitEntry = {}
1495  visitEntry["visitId"] = visitRow['visit']
1496  visitEntry["visit"] = visitRow['visit']
1497  visitEntry["physical_filter"] = visitRow['physical_filter']
1498  visitEntry["band"] = visitRow['band']
1499  raDec = visitInfo.getBoresightRaDec()
1500  visitEntry["ra"] = raDec.getRa().asDegrees()
1501  visitEntry["decl"] = raDec.getDec().asDegrees()
1502  visitEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
1503  azAlt = visitInfo.getBoresightAzAlt()
1504  visitEntry["azimuth"] = azAlt.getLongitude().asDegrees()
1505  visitEntry["altitude"] = azAlt.getLatitude().asDegrees()
1506  visitEntry["zenithDistance"] = 90 - azAlt.getLatitude().asDegrees()
1507  visitEntry["airmass"] = visitInfo.getBoresightAirmass()
1508  visitEntry["obsStart"] = visitInfo.getDate().toPython()
1509  visitEntry["expTime"] = visitInfo.getExposureTime()
1510  visitEntries.append(visitEntry)
1511  # TODO: DM-30623, Add programId, exposureType, expMidpt, cameraTemp, mirror1Temp, mirror2Temp,
1512  # mirror3Temp, domeTemp, externalTemp, dimmSeeing, pwvGPS, pwvMW, flags, nExposures
1513 
1514  outputCatalog = pd.DataFrame(data=visitEntries)
1515  outputCatalog.set_index('visitId', inplace=True, verify_integrity=True)
1516  return pipeBase.Struct(outputCatalog=outputCatalog)
1517 
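As a sanity check on the geometry recorded by MakeVisitTableTask.run above: zenithDistance is simply 90 degrees minus the boresight altitude, and away from the horizon the plane-parallel approximation sec(z) should land close to the airmass reported by visitInfo. A small sketch with an assumed altitude (the 65-degree value is illustrative only):

import numpy as np

altitude = 65.0  # degrees; assumed boresight altitude, not taken from any real visit
zenithDistance = 90.0 - altitude  # same relation used in MakeVisitTableTask.run
planeParallelAirmass = 1.0 / np.cos(np.radians(zenithDistance))
print(zenithDistance, round(planeParallelAirmass, 3))  # 25.0 1.103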
1518 
1519 class WriteForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
1520  dimensions=("instrument", "visit", "detector", "skymap", "tract")):
1521 
1522  inputCatalog = connectionTypes.Input(
1523  doc="Primary per-detector, single-epoch forced-photometry catalog. "
1524  "By default, it is the output of ForcedPhotCcdTask on calexps",
1525  name="forced_src",
1526  storageClass="SourceCatalog",
1527  dimensions=("instrument", "visit", "detector", "skymap", "tract")
1528  )
1529  inputCatalogDiff = connectionTypes.Input(
1530  doc="Secondary multi-epoch, per-detector, forced photometry catalog. "
1531  "By default, it is the output of ForcedPhotCcdTask run on image differences.",
1532  name="forced_diff",
1533  storageClass="SourceCatalog",
1534  dimensions=("instrument", "visit", "detector", "skymap", "tract")
1535  )
1536  outputCatalog = connectionTypes.Output(
1537  doc="Input catalogs horizontally joined on `objectId` in Parquet format",
1538  name="forcedSource",
1539  storageClass="DataFrame",
1540  dimensions=("instrument", "visit", "detector")
1541  )
1542 
1543 
1544 class WriteForcedSourceTableConfig(WriteSourceTableConfig,
1545  pipelineConnections=WriteForcedSourceTableConnections):
1546  key = lsst.pex.config.Field(
1547  doc="Column on which to join the two input tables and make the primary key of the output",
1548  dtype=str,
1549  default="objectId",
1550  )
1551 
1552 
1553 class WriteForcedSourceTableTask(pipeBase.PipelineTask):
1554  """Merge and convert per-detector forced source catalogs to parquet
1555  """
1556  _DefaultName = "writeForcedSourceTable"
1557  ConfigClass = WriteForcedSourceTableConfig
1558 
1559  def runQuantum(self, butlerQC, inputRefs, outputRefs):
1560  inputs = butlerQC.get(inputRefs)
1561  # Add ccdVisitId to allow joining with CcdVisitTable
1562  inputs['ccdVisitId'] = butlerQC.quantum.dataId.pack("visit_detector")
1563  inputs['band'] = butlerQC.quantum.dataId.full['band']
1564  outputs = self.run(**inputs)
1565  butlerQC.put(outputs, outputRefs)
1566 
1567  def run(self, inputCatalog, inputCatalogDiff, ccdVisitId=None, band=None):
1568  dfs = []
1569  for table, dataset in zip((inputCatalog, inputCatalogDiff), ('calexp', 'diff')):
1570  df = table.asAstropy().to_pandas().set_index(self.config.key, drop=False)
1571  df = df.reindex(sorted(df.columns), axis=1)
1572  df['ccdVisitId'] = ccdVisitId if ccdVisitId else pd.NA
1573  df['band'] = band if band else pd.NA
1574  df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
1575  names=('dataset', 'column'))
1576 
1577  dfs.append(df)
1578 
1579  outputCatalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
1580  return pipeBase.Struct(outputCatalog=outputCatalog)
1581 
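WriteForcedSourceTableTask.run above indexes each input catalog on config.key (objectId by default), tags its columns with a (dataset, column) MultiIndex, and joins the 'calexp' and 'diff' frames on that shared index. A pandas-only sketch of the same pattern, using two fabricated two-row catalogs rather than real SourceCatalogs:

import functools
import pandas as pd

# Two toy per-detector catalogs sharing an objectId column (fabricated data).
calexp = pd.DataFrame({"objectId": [1, 2], "flux": [10.0, 20.0]}).set_index("objectId", drop=False)
diff = pd.DataFrame({"objectId": [1, 2], "flux": [0.5, -0.3]}).set_index("objectId", drop=False)

dfs = []
for df, dataset in zip((calexp, diff), ("calexp", "diff")):
    df = df.reindex(sorted(df.columns), axis=1)
    df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
                                           names=("dataset", "column"))
    dfs.append(df)

# Join the per-dataset frames on the shared objectId index.
merged = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
print(merged[("diff", "flux")].tolist())  # [0.5, -0.3]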
1582 
1583 class TransformForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
1584  dimensions=("instrument", "skymap", "patch", "tract")):
1585 
1586  inputCatalogs = connectionTypes.Input(
1587  doc="Parquet table of merged ForcedSources produced by WriteForcedSourceTableTask",
1588  name="forcedSource",
1589  storageClass="DataFrame",
1590  dimensions=("instrument", "visit", "detector"),
1591  multiple=True,
1592  deferLoad=True
1593  )
1594  referenceCatalog = connectionTypes.Input(
1595  doc="Reference catalog which was used to seed the forcedPhot. Columns "
1596  "objectId, detect_isPrimary, detect_isTractInner, detect_isPatchInner "
1597  "are expected.",
1598  name="objectTable",
1599  storageClass="DataFrame",
1600  dimensions=("tract", "patch", "skymap"),
1601  deferLoad=True
1602  )
1603  outputCatalog = connectionTypes.Output(
1604  doc="Narrower, temporally-aggregated, per-patch ForcedSource Table transformed and converted per a "
1605  "specified set of functors",
1606  name="forcedSourceTable",
1607  storageClass="DataFrame",
1608  dimensions=("tract", "patch", "skymap")
1609  )
1610 
1611 
1612 class TransformForcedSourceTableConfig(TransformCatalogBaseConfig,
1613  pipelineConnections=TransformForcedSourceTableConnections):
1614  referenceColumns = pexConfig.ListField(
1615  dtype=str,
1616  default=["detect_isPrimary", "detect_isTractInner", "detect_isPatchInner"],
1617  optional=True,
1618  doc="Columns to pull from reference catalog",
1619  )
1620  keyRef = lsst.pex.config.Field(
1621  doc="Column on which to join the two input tables and make the primary key of the output",
1622  dtype=str,
1623  default="objectId",
1624  )
1625  key = lsst.pex.config.Field(
1626  doc="Rename the output DataFrame index to this name",
1627  dtype=str,
1628  default="forcedSourceId",
1629  )
1630 
1631 
1632 class TransformForcedSourceTableTask(TransformCatalogBaseTask):
1633  """Transform/standardize a ForcedSource catalog
1634 
1635  Transforms each wide, per-detector forcedSource parquet table per the
1636  specification file (per-camera defaults found in ForcedSource.yaml).
1637  All epochs that overlap the patch are aggregated into one per-patch
1638  narrow-parquet file.
1639 
1640  No de-duplication of rows is performed. Duplicate-resolution flags are
1641  pulled in from the referenceCatalog: `detect_isPrimary`,
1642  `detect_isTractInner`, `detect_isPatchInner`, so that the user may
1643  de-duplicate for analysis or compare duplicates for QA.
1644 
1645  The resulting table includes multiple bands. Epochs (MJDs) and other useful
1646  per-visit rows can be retrieved by joining with the CcdVisitTable on
1647  ccdVisitId.
1648  """
1649  _DefaultName = "transformForcedSourceTable"
1650  ConfigClass = TransformForcedSourceTableConfig
1651 
1652  def runQuantum(self, butlerQC, inputRefs, outputRefs):
1653  inputs = butlerQC.get(inputRefs)
1654  if self.funcs is None:
1655  raise ValueError("config.functorFile is None. "
1656  "Must be a valid path to yaml in order to run Task as a PipelineTask.")
1657  outputs = self.run(inputs['inputCatalogs'], inputs['referenceCatalog'], funcs=self.funcs,
1658  dataId=outputRefs.outputCatalog.dataId.full)
1659 
1660  butlerQC.put(outputs, outputRefs)
1661 
1662  def run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=None):
1663  dfs = []
1664  ref = referenceCatalog.get(parameters={"columns": self.config.referenceColumns})
1665  self.log.info("Aggregating %s input catalogs", len(inputCatalogs))
1666  for handle in inputCatalogs:
1667  result = self.transform(None, handle, funcs, dataId)
1668  # Filter for only rows that were detected on (overlap) the patch
1669  dfs.append(result.df.join(ref, how='inner'))
1670 
1671  outputCatalog = pd.concat(dfs)
1672 
1673  # Now that we are done joining on config.keyRef,
1674  # change the index to config.key in the following steps:
1675  outputCatalog.index.rename(self.config.keyRef, inplace=True)
1676  # add config.keyRef back to the column list,
1677  outputCatalog.reset_index(inplace=True)
1678  # set forcedSourceId (specified in ForcedSource.yaml) as the index,
1679  outputCatalog.set_index("forcedSourceId", inplace=True, verify_integrity=True)
1680  # and then rename that index to config.key.
1681  outputCatalog.index.rename(self.config.key, inplace=True)
1682 
1683  self.log.info("Made a table of %d columns and %d rows",
1684  len(outputCatalog.columns), len(outputCatalog))
1685  return pipeBase.Struct(outputCatalog=outputCatalog)
1686 
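Since TransformForcedSourceTableTask keeps duplicate rows on purpose, downstream analysis code is expected to filter on the duplicate-resolution flags carried over from the reference catalog. A hedged pandas sketch using a toy stand-in for the output table (a real forcedSourceTable would be read from the butler and carries many more columns):

import pandas as pd

# Toy stand-in for a loaded forcedSourceTable (fabricated rows).
forcedSourceTable = pd.DataFrame({
    "objectId": [42, 42, 43],
    "psfFlux": [1.0, 1.1, 2.0],
    "detect_isPrimary": [True, False, True],
})

# Keep only the primary detections for analysis; duplicates remain available for QA.
primary = forcedSourceTable[forcedSourceTable["detect_isPrimary"]]
print(len(primary))  # 2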
1687 
1688 class ConsolidateTractConnections(pipeBase.PipelineTaskConnections,
1689  defaultTemplates={"catalogType": ""},
1690  dimensions=("instrument", "tract")):
1691  inputCatalogs = connectionTypes.Input(
1692  doc="Input per-patch DataFrame Tables to be concatenated",
1693  name="{catalogType}ForcedSourceTable",
1694  storageClass="DataFrame",
1695  dimensions=("tract", "patch", "skymap"),
1696  multiple=True,
1697  )
1698 
1699  outputCatalog = connectionTypes.Output(
1700  doc="Output per-tract concatenation of DataFrame Tables",
1701  name="{catalogType}ForcedSourceTable_tract",
1702  storageClass="DataFrame",
1703  dimensions=("tract", "skymap"),
1704  )
1705 
1706 
1707 class ConsolidateTractConfig(pipeBase.PipelineTaskConfig,
1708  pipelineConnections=ConsolidateTractConnections):
1709  pass
1710 
1711 
1712 class ConsolidateTractTask(CmdLineTask, pipeBase.PipelineTask):
1713  """Concatenate any list of per-patch DataFrames into a single
1714  per-tract DataFrame.
1715  """
1716  _DefaultName = 'ConsolidateTract'
1717  ConfigClass = ConsolidateTractConfig
1718 
1719  def runQuantum(self, butlerQC, inputRefs, outputRefs):
1720  inputs = butlerQC.get(inputRefs)
1721  # Not checking that at least one inputCatalog exists; if none did, the QuantumGraph would be empty.
1722  self.log.info("Concatenating %s per-patch %s Tables",
1723  len(inputs['inputCatalogs']),
1724  inputRefs.inputCatalogs[0].datasetType.name)
1725  df = pd.concat(inputs['inputCatalogs'])
1726  butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)
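The per-tract consolidation performed by ConsolidateTractTask is a plain pandas concatenation of the per-patch tables. A minimal sketch with two fabricated per-patch frames (real tables would come from the butler and share the forcedSourceId index):

import pandas as pd

patch1 = pd.DataFrame({"forcedSourceId": [1, 2], "psfFlux": [1.0, 2.0]}).set_index("forcedSourceId")
patch2 = pd.DataFrame({"forcedSourceId": [3], "psfFlux": [3.0]}).set_index("forcedSourceId")

# Same operation ConsolidateTractTask.runQuantum applies to its inputCatalogs list.
tract = pd.concat([patch1, patch2])
print(len(tract))  # 3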