postprocess.py
1 # This file is part of pipe_tasks
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (https://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <https://www.gnu.org/licenses/>.
21 
22 import functools
23 import pandas as pd
24 from collections import defaultdict
25 import numpy as np
26 import numbers
27 
28 import lsst.geom
29 import lsst.pex.config as pexConfig
30 import lsst.pipe.base as pipeBase
31 import lsst.daf.base as dafBase
32 from lsst.pipe.base import connectionTypes
33 import lsst.afw.table as afwTable
34 from lsst.meas.base import SingleFrameMeasurementTask
35 from lsst.pipe.base import CmdLineTask, ArgumentParser, DataIdContainer
36 from lsst.coadd.utils.coaddDataIdContainer import CoaddDataIdContainer
37 from lsst.daf.butler import DeferredDatasetHandle, DataCoordinate
38 
39 from .parquetTable import ParquetTable
40 from .multiBandUtils import makeMergeArgumentParser, MergeSourcesRunner
41 from .functors import CompositeFunctor, Column
42 
43 
44 def flattenFilters(df, noDupCols=['coord_ra', 'coord_dec'], camelCase=False, inputBands=None):
45  """Flattens a dataframe with multilevel column index
46  """
47  newDf = pd.DataFrame()
48  # band is the level 0 index
49  dfBands = df.columns.unique(level=0).values
50  for band in dfBands:
51  subdf = df[band]
52  columnFormat = '{0}{1}' if camelCase else '{0}_{1}'
53  newColumns = {c: columnFormat.format(band, c)
54  for c in subdf.columns if c not in noDupCols}
55  cols = list(newColumns.keys())
56  newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)
57 
58  # Band must be present in the input and output or else column is all NaN:
59  presentBands = dfBands if inputBands is None else list(set(inputBands).intersection(dfBands))
60  # Get the unexploded columns from any present band's partition
61  noDupDf = df[presentBands[0]][noDupCols]
62  newDf = pd.concat([noDupDf, newDf], axis=1)
63  return newDf
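# A minimal usage sketch for flattenFilters (illustrative column names, not
# taken from a real deepCoadd_obj table):
#
#     >>> import pandas as pd
#     >>> df = pd.DataFrame({('g', 'psfFlux'): [1.0], ('g', 'coord_ra'): [10.0],
#     ...                    ('r', 'psfFlux'): [2.0], ('r', 'coord_ra'): [10.0]})
#     >>> flattenFilters(df, noDupCols=['coord_ra']).columns.tolist()
#     ['coord_ra', 'g_psfFlux', 'r_psfFlux']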
64 
65 
66 class WriteObjectTableConnections(pipeBase.PipelineTaskConnections,
67  defaultTemplates={"coaddName": "deep"},
68  dimensions=("tract", "patch", "skymap")):
69  inputCatalogMeas = connectionTypes.Input(
70  doc="Catalog of source measurements on the deepCoadd.",
71  dimensions=("tract", "patch", "band", "skymap"),
72  storageClass="SourceCatalog",
73  name="{coaddName}Coadd_meas",
74  multiple=True
75  )
76  inputCatalogForcedSrc = connectionTypes.Input(
77  doc="Catalog of forced measurements (shape and position parameters held fixed) on the deepCoadd.",
78  dimensions=("tract", "patch", "band", "skymap"),
79  storageClass="SourceCatalog",
80  name="{coaddName}Coadd_forced_src",
81  multiple=True
82  )
83  inputCatalogRef = connectionTypes.Input(
84  doc="Catalog marking the primary detection (which band provides a good shape and position)"
85  "for each detection in deepCoadd_mergeDet.",
86  dimensions=("tract", "patch", "skymap"),
87  storageClass="SourceCatalog",
88  name="{coaddName}Coadd_ref"
89  )
90  outputCatalog = connectionTypes.Output(
91  doc="A vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
92  "stored as a DataFrame with a multi-level column index per-patch.",
93  dimensions=("tract", "patch", "skymap"),
94  storageClass="DataFrame",
95  name="{coaddName}Coadd_obj"
96  )
97 
98 
99 class WriteObjectTableConfig(pipeBase.PipelineTaskConfig,
100  pipelineConnections=WriteObjectTableConnections):
101  engine = pexConfig.Field(
102  dtype=str,
103  default="pyarrow",
104  doc="Parquet engine for writing (pyarrow or fastparquet)"
105  )
106  coaddName = pexConfig.Field(
107  dtype=str,
108  default="deep",
109  doc="Name of coadd"
110  )
111 
112 
113 class WriteObjectTableTask(CmdLineTask, pipeBase.PipelineTask):
114  """Write filter-merged source tables to parquet
115  """
116  _DefaultName = "writeObjectTable"
117  ConfigClass = WriteObjectTableConfig
118  RunnerClass = MergeSourcesRunner
119 
120  # Names of table datasets to be merged
121  inputDatasets = ('forced_src', 'meas', 'ref')
122 
123  # Tag of output dataset written by `MergeSourcesTask.write`
124  outputDataset = 'obj'
125 
126  def __init__(self, butler=None, schema=None, **kwargs):
127  # It is a shame that this class can't use the default init for CmdLineTask,
128  # but to do so would require its own special task runner, which is many
129  # more lines of specialization, so this is how it is for now.
130  super().__init__(**kwargs)
131 
132  def runDataRef(self, patchRefList):
133  """!
134  @brief Merge coadd sources from multiple bands. Calls @ref `run` which must be defined in
135  subclasses that inherit from MergeSourcesTask.
136  @param[in] patchRefList list of data references for each filter
137  """
138  catalogs = dict(self.readCatalog(patchRef) for patchRef in patchRefList)
139  dataId = patchRefList[0].dataId
140  mergedCatalog = self.run(catalogs, tract=dataId['tract'], patch=dataId['patch'])
141  self.write(patchRefList[0], ParquetTable(dataFrame=mergedCatalog))
142 
143  def runQuantum(self, butlerQC, inputRefs, outputRefs):
144  inputs = butlerQC.get(inputRefs)
145 
146  measDict = {ref.dataId['band']: {'meas': cat} for ref, cat in
147  zip(inputRefs.inputCatalogMeas, inputs['inputCatalogMeas'])}
148  forcedSourceDict = {ref.dataId['band']: {'forced_src': cat} for ref, cat in
149  zip(inputRefs.inputCatalogForcedSrc, inputs['inputCatalogForcedSrc'])}
150 
151  catalogs = {}
152  for band in measDict.keys():
153  catalogs[band] = {'meas': measDict[band]['meas'],
154  'forced_src': forcedSourceDict[band]['forced_src'],
155  'ref': inputs['inputCatalogRef']}
156  dataId = butlerQC.quantum.dataId
157  df = self.run(catalogs=catalogs, tract=dataId['tract'], patch=dataId['patch'])
158  outputs = pipeBase.Struct(outputCatalog=df)
159  butlerQC.put(outputs, outputRefs)
160 
161  @classmethod
162  def _makeArgumentParser(cls):
163  """Create a suitable ArgumentParser.
164 
165  We will use the ArgumentParser to get a list of data
166  references for patches; the RunnerClass will sort them into lists
167  of data references for the same patch.
168 
169  References the first of self.inputDatasets, rather than
170  self.inputDataset.
171  """
172  return makeMergeArgumentParser(cls._DefaultName, cls.inputDatasets[0])
173 
174  def readCatalog(self, patchRef):
175  """Read input catalogs
176 
177  Read all the input datasets given by the 'inputDatasets'
178  attribute.
179 
180  Parameters
181  ----------
182  patchRef : `lsst.daf.persistence.ButlerDataRef`
183  Data reference for patch
184 
185  Returns
186  -------
187  Tuple consisting of band name and a dict of catalogs, keyed by
188  dataset name
189  """
190  band = patchRef.get(self.config.coaddName + "Coadd_filterLabel", immediate=True).bandLabel
191  catalogDict = {}
192  for dataset in self.inputDatasets:
193  catalog = patchRef.get(self.config.coaddName + "Coadd_" + dataset, immediate=True)
194  self.log.info("Read %d sources from %s for band %s: %s",
195  len(catalog), dataset, band, patchRef.dataId)
196  catalogDict[dataset] = catalog
197  return band, catalogDict
198 
199  def run(self, catalogs, tract, patch):
200  """Merge multiple catalogs.
201 
202  Parameters
203  ----------
204  catalogs : `dict`
205  Mapping from filter names to dict of catalogs.
206  tract : int
207  tractId to use for the tractId column
208  patch : str
209  patchId to use for the patchId column
210 
211  Returns
212  -------
213  catalog : `pandas.DataFrame`
214  Merged dataframe
215  """
216 
217  dfs = []
218  for filt, tableDict in catalogs.items():
219  for dataset, table in tableDict.items():
220  # Convert afwTable to pandas DataFrame
221  df = table.asAstropy().to_pandas().set_index('id', drop=True)
222 
223  # Sort columns by name, to ensure matching schema among patches
224  df = df.reindex(sorted(df.columns), axis=1)
225  df['tractId'] = tract
226  df['patchId'] = patch
227 
228  # Make columns a 3-level MultiIndex
229  df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
230  names=('dataset', 'band', 'column'))
231  dfs.append(df)
232 
233  catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
234  return catalog
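# Illustrative sketch of the structure produced by run() above (the task
# instance, input catalogs, and dataId values here are hypothetical):
#
#     >>> merged = task.run(catalogs, tract=9813, patch='4,4')
#     >>> merged.columns.names
#     FrozenList(['dataset', 'band', 'column'])
#     >>> ('meas', 'i', 'tractId') in merged.columns   # one column per dataset/band pair
#     True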
235 
236  def write(self, patchRef, catalog):
237  """Write the output.
238 
239  Parameters
240  ----------
241  catalog : `ParquetTable`
242  Catalog to write
243  patchRef : `lsst.daf.persistence.ButlerDataRef`
244  Data reference for patch
245  """
246  patchRef.put(catalog, self.config.coaddName + "Coadd_" + self.outputDataset)
247  # since the filter isn't actually part of the data ID for the dataset we're saving,
248  # it's confusing to see it in the log message, even if the butler simply ignores it.
249  mergeDataId = patchRef.dataId.copy()
250  del mergeDataId["filter"]
251  self.log.info("Wrote merged catalog: %s", mergeDataId)
252 
253  def writeMetadata(self, dataRefList):
254  """No metadata to write, and not sure how to write it for a list of dataRefs.
255  """
256  pass
257 
258 
259 class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
260  defaultTemplates={"catalogType": ""},
261  dimensions=("instrument", "visit", "detector")):
262 
263  catalog = connectionTypes.Input(
264  doc="Input full-depth catalog of sources produced by CalibrateTask",
265  name="{catalogType}src",
266  storageClass="SourceCatalog",
267  dimensions=("instrument", "visit", "detector")
268  )
269  outputCatalog = connectionTypes.Output(
270  doc="Catalog of sources, `src` in Parquet format. The 'id' column is "
271  "replaced with an index; all other columns are unchanged.",
272  name="{catalogType}source",
273  storageClass="DataFrame",
274  dimensions=("instrument", "visit", "detector")
275  )
276 
277 
278 class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
279  pipelineConnections=WriteSourceTableConnections):
280  doApplyExternalPhotoCalib = pexConfig.Field(
281  dtype=bool,
282  default=False,
283  doc=("Add local photoCalib columns from the calexp.photoCalib? Should only set True if "
284  "generating Source Tables from older src tables which do not already have local calib columns")
285  )
286  doApplyExternalSkyWcs = pexConfig.Field(
287  dtype=bool,
288  default=False,
289  doc=("Add local WCS columns from the calexp.wcs? Should only set True if "
290  "generating Source Tables from older src tables which do not already have local calib columns")
291  )
292 
293 
294 class WriteSourceTableTask(CmdLineTask, pipeBase.PipelineTask):
295  """Write source table to parquet
296  """
297  _DefaultName = "writeSourceTable"
298  ConfigClass = WriteSourceTableConfig
299 
300  def runDataRef(self, dataRef):
301  src = dataRef.get('src')
302  if self.config.doApplyExternalPhotoCalib or self.config.doApplyExternalSkyWcs:
303  src = self.addCalibColumns(src, dataRef)
304 
305  ccdVisitId = dataRef.get('ccdExposureId')
306  result = self.run(src, ccdVisitId=ccdVisitId)
307  dataRef.put(result.table, 'source')
308 
309  def runQuantum(self, butlerQC, inputRefs, outputRefs):
310  inputs = butlerQC.get(inputRefs)
311  inputs['ccdVisitId'] = butlerQC.quantum.dataId.pack("visit_detector")
312  result = self.run(**inputs).table
313  outputs = pipeBase.Struct(outputCatalog=result.toDataFrame())
314  butlerQC.put(outputs, outputRefs)
315 
316  def run(self, catalog, ccdVisitId=None):
317  """Convert `src` catalog to parquet
318 
319  Parameters
320  ----------
321  catalog: `afwTable.SourceCatalog`
322  catalog to be converted
323  ccdVisitId: `int`
324  ccdVisitId to be added as a column
325 
326  Returns
327  -------
328  result : `lsst.pipe.base.Struct`
329  ``table``
330  `ParquetTable` version of the input catalog
331  """
332  self.log.info("Generating parquet table from src catalog %s", ccdVisitId)
333  df = catalog.asAstropy().to_pandas().set_index('id', drop=True)
334  df['ccdVisitId'] = ccdVisitId
335  return pipeBase.Struct(table=ParquetTable(dataFrame=df))
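# Minimal usage sketch (srcCatalog is assumed to be an existing
# `afwTable.SourceCatalog`; the ccdVisitId value is arbitrary):
#
#     >>> task = WriteSourceTableTask()
#     >>> result = task.run(srcCatalog, ccdVisitId=2345678901)
#     >>> df = result.table.toDataFrame()   # one row per source, plus a ccdVisitId column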
336 
337  def addCalibColumns(self, catalog, dataRef):
338  """Add columns with local calibration evaluated at each centroid
339 
340  This exists for backwards compatibility with old repos: it converts old
341  src catalogs (which don't have the expected local calib columns) to
342  Source Tables.
343 
344  Parameters
345  ----------
346  catalog: `afwTable.SourceCatalog`
347  catalog to which calib columns will be added
348  dataRef: `lsst.daf.persistence.ButlerDataRef`
349  for fetching the calibs from disk.
350 
351  Returns
352  -------
353  newCat: `afwTable.SourceCatalog`
354  Source Catalog with requested local calib columns
355  """
356  mapper = afwTable.SchemaMapper(catalog.schema)
357  measureConfig = SingleFrameMeasurementTask.ConfigClass()
358  measureConfig.doReplaceWithNoise = False
359 
360  # Just need the WCS or the PhotoCalib attached to an exposure,
361  # so read a minimal (single-pixel) cutout of the calexp.
362  exposure = dataRef.get('calexp_sub',
363  bbox=lsst.geom.Box2I(lsst.geom.Point2I(0, 0), lsst.geom.Point2I(0, 0)))
364  mapper = afwTable.SchemaMapper(catalog.schema)
365  mapper.addMinimalSchema(catalog.schema, True)
366  schema = mapper.getOutputSchema()
367 
368  exposureIdInfo = dataRef.get("expIdInfo")
369  measureConfig.plugins.names = []
370  if self.config.doApplyExternalSkyWcs:
371  plugin = 'base_LocalWcs'
372  if plugin in schema:
373  raise RuntimeError(f"{plugin} already in src catalog. Set doApplyExternalSkyWcs=False")
374  else:
375  measureConfig.plugins.names.add(plugin)
376 
377  if self.config.doApplyExternalPhotoCalib:
378  plugin = 'base_LocalPhotoCalib'
379  if plugin in schema:
380  raise RuntimeError(f"{plugin} already in src catalog. Set doApplyExternalPhotoCalib=False")
381  else:
382  measureConfig.plugins.names.add(plugin)
383 
384  measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
385  newCat = afwTable.SourceCatalog(schema)
386  newCat.extend(catalog, mapper=mapper)
387  measurement.run(measCat=newCat, exposure=exposure, exposureId=exposureIdInfo.expId)
388  return newCat
389 
390  def writeMetadata(self, dataRef):
391  """No metadata to write.
392  """
393  pass
394 
395  @classmethod
396  def _makeArgumentParser(cls):
397  parser = ArgumentParser(name=cls._DefaultName)
398  parser.add_id_argument("--id", 'src',
399  help="data ID, e.g. --id visit=12345 ccd=0")
400  return parser
401 
402 
403 class PostprocessAnalysis(object):
404  """Calculate columns from ParquetTable
405 
406  This object manages and organizes an arbitrary set of computations
407  on a catalog. The catalog is defined by a
408  `lsst.pipe.tasks.parquetTable.ParquetTable` object (or list thereof), such as a
409  `deepCoadd_obj` dataset, and the computations are defined by a collection
410  of `lsst.pipe.tasks.functors.Functor` objects (or, equivalently,
411  a `CompositeFunctor`).
412 
413  After the object is initialized, accessing the `.df` attribute (which
414  holds the `pandas.DataFrame` containing the results of the calculations) triggers
415  computation of said dataframe.
416 
417  One of the conveniences of using this object is the ability to define a desired common
418  filter for all functors. This enables the same functor collection to be passed to
419  several different `PostprocessAnalysis` objects without having to change the original
420  functor collection, since the `filt` keyword argument of this object triggers an
421  overwrite of the `filt` property for all functors in the collection.
422 
423  This object also allows a list of refFlags to be passed, and defines a set of default
424  refFlags that are always included even if not requested.
425 
426  If a list of `ParquetTable` objects is passed, rather than a single one, then the
427  calculations will be mapped over all the input catalogs. In principle, it should
428  be straightforward to parallelize this activity, but initial tests have failed
429  (see TODO in code comments).
430 
431  Parameters
432  ----------
433  parq : `lsst.pipe.tasks.ParquetTable` (or list of such)
434  Source catalog(s) for computation
435 
436  functors : `list`, `dict`, or `lsst.pipe.tasks.functors.CompositeFunctor`
437  Computations to do (functors that act on `parq`).
438  If a dict, the output
439  DataFrame will have columns keyed accordingly.
440  If a list, the column keys will come from the
441  `.shortname` attribute of each functor.
442 
443  filt : `str` (optional)
444  Filter in which to calculate. If provided,
445  this will overwrite any existing `.filt` attribute
446  of the provided functors.
447 
448  flags : `list` (optional)
449  List of flags (per-band) to include in output table.
450  Taken from the `meas` dataset if applied to a multilevel Object Table.
451 
452  refFlags : `list` (optional)
453  List of refFlags (only reference band) to include in output table.
454 
455  forcedFlags : `list` (optional)
456  List of flags (per-band) to include in output table.
457  Taken from the ``forced_src`` dataset if applied to a
458  multilevel Object Table. Intended for flags from measurement plugins
459  only run during multi-band forced-photometry.
460  """
461  _defaultRefFlags = []
462  _defaultFuncs = ()
463 
464  def __init__(self, parq, functors, filt=None, flags=None, refFlags=None, forcedFlags=None):
465  self.parq = parq
466  self.functors = functors
467 
468  self.filt = filt
469  self.flags = list(flags) if flags is not None else []
470  self.forcedFlags = list(forcedFlags) if forcedFlags is not None else []
471  self.refFlags = list(self._defaultRefFlags)
472  if refFlags is not None:
473  self.refFlags += list(refFlags)
474 
475  self._df = None
476 
477  @property
478  def defaultFuncs(self):
479  funcs = dict(self._defaultFuncs)
480  return funcs
481 
482  @property
483  def func(self):
484  additionalFuncs = self.defaultFuncs
485  additionalFuncs.update({flag: Column(flag, dataset='forced_src') for flag in self.forcedFlags})
486  additionalFuncs.update({flag: Column(flag, dataset='ref') for flag in self.refFlags})
487  additionalFuncs.update({flag: Column(flag, dataset='meas') for flag in self.flags})
488 
489  if isinstance(self.functors, CompositeFunctor):
490  func = self.functors
491  else:
492  func = CompositeFunctor(self.functors)
493 
494  func.funcDict.update(additionalFuncs)
495  func.filt = self.filt
496 
497  return func
498 
499  @property
500  def noDupCols(self):
501  return [name for name, func in self.func.funcDict.items() if func.noDup or func.dataset == 'ref']
502 
503  @property
504  def df(self):
505  if self._df is None:
506  self.compute()
507  return self._df
508 
509  def compute(self, dropna=False, pool=None):
510  # map over multiple parquet tables
511  if type(self.parq) in (list, tuple):
512  if pool is None:
513  dflist = [self.func(parq, dropna=dropna) for parq in self.parq]
514  else:
515  # TODO: Figure out why this doesn't work (pyarrow pickling issues?)
516  dflist = pool.map(functools.partial(self.func, dropna=dropna), self.parq)
517  self._df = pd.concat(dflist)
518  else:
519  self._df = self.func(self.parq, dropna=dropna)
520 
521  return self._df
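# A minimal usage sketch for PostprocessAnalysis, assuming `parq` is a
# ParquetTable for a deepCoadd_obj patch; the flag name below is illustrative:
#
#     >>> from lsst.pipe.tasks.functors import Column
#     >>> funcs = {'ra': Column('coord_ra'), 'dec': Column('coord_dec')}
#     >>> analysis = PostprocessAnalysis(parq, funcs, filt='i',
#     ...                                flags=['base_PixelFlags_flag'])
#     >>> df = analysis.df   # accessing .df triggers the (lazy) computation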
522 
523 
524 class TransformCatalogBaseConnections(pipeBase.PipelineTaskConnections,
525  dimensions=()):
526  """Expected Connections for subclasses of TransformCatalogBaseTask.
527 
528  Must be subclassed.
529  """
530  inputCatalog = connectionTypes.Input(
531  name="",
532  storageClass="DataFrame",
533  )
534  outputCatalog = connectionTypes.Output(
535  name="",
536  storageClass="DataFrame",
537  )
538 
539 
540 class TransformCatalogBaseConfig(pipeBase.PipelineTaskConfig,
541  pipelineConnections=TransformCatalogBaseConnections):
542  functorFile = pexConfig.Field(
543  dtype=str,
544  doc="Path to YAML file specifying Science Data Model functors to use "
545  "when copying columns and computing calibrated values.",
546  default=None,
547  optional=True
548  )
549  primaryKey = pexConfig.Field(
550  dtype=str,
551  doc="Name of column to be set as the DataFrame index. If None, the index"
552  "will be named `id`",
553  default=None,
554  optional=True
555  )
556 
557 
558 class TransformCatalogBaseTask(CmdLineTask, pipeBase.PipelineTask):
559  """Base class for transforming/standardizing a catalog
560 
561  by applying functors that convert units and apply calibrations.
562  The purpose of this task is to perform a set of computations on
563  an input `ParquetTable` dataset (such as `deepCoadd_obj`) and write the
564  results to a new dataset (which needs to be declared in an `outputDataset`
565  attribute).
566 
567  The calculations to be performed are defined in a YAML file that specifies
568  a set of functors to be computed, provided via
569  the `functorFile` config parameter. An example of such a YAML file
570  is the following:
571 
572  funcs:
573      psfMag:
574          functor: Mag
575          args:
576              - base_PsfFlux
577          filt: HSC-G
578          dataset: meas
579      cmodel_magDiff:
580          functor: MagDiff
581          args:
582              - modelfit_CModel
583              - base_PsfFlux
584          filt: HSC-G
585      gauss_magDiff:
586          functor: MagDiff
587          args:
588              - base_GaussianFlux
589              - base_PsfFlux
590          filt: HSC-G
591      count:
592          functor: Column
593          args:
594              - base_InputCount_value
595          filt: HSC-G
596      deconvolved_moments:
597          functor: DeconvolvedMoments
598          filt: HSC-G
599          dataset: forced_src
600  refFlags:
601      - calib_psfUsed
602      - merge_measurement_i
603      - merge_measurement_r
604      - merge_measurement_z
605      - merge_measurement_y
606      - merge_measurement_g
607      - base_PixelFlags_flag_inexact_psfCenter
608      - detect_isPrimary
609 
610  The names for each entry under "funcs" will become the names of columns in the
611  output dataset. All the functors referenced are defined in `lsst.pipe.tasks.functors`.
612  Positional arguments to be passed to each functor are in the `args` list,
613  and any additional entries for each column other than "functor" or "args" (e.g., `'filt'`,
614  `'dataset'`) are treated as keyword arguments to be passed to the functor initialization.
615 
616  The "flags" entry is the default shortcut for `Column` functors.
617  All columns listed under "flags" will be copied to the output table
618  untransformed. They can be of any datatype.
619  In the special case of transforming a multi-level object table with
620  band and dataset indices (deepCoadd_obj), these will be taken from the
621  `meas` dataset and exploded out per band.
622 
623  There are two special shortcuts that only apply when transforming
624  multi-level Object (deepCoadd_obj) tables:
625  - The "refFlags" entry is shortcut for `Column` functor
626  taken from the `'ref'` dataset if transforming an ObjectTable.
627  - The "forcedFlags" entry is shortcut for `Column` functors.
628  taken from the ``forced_src`` dataset if transforming an ObjectTable.
629  These are expanded out per band.
630 
631 
632  This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
633  to organize and execute the calculations.
634 
635  """
636  @property
637  def _DefaultName(self):
638  raise NotImplementedError('Subclass must define "_DefaultName" attribute')
639 
640  @property
641  def outputDataset(self):
642  raise NotImplementedError('Subclass must define "outputDataset" attribute')
643 
644  @property
645  def inputDataset(self):
646  raise NotImplementedError('Subclass must define "inputDataset" attribute')
647 
648  @property
649  def ConfigClass(self):
650  raise NotImplementedError('Subclass must define "ConfigClass" attribute')
651 
652  def __init__(self, *args, **kwargs):
653  super().__init__(*args, **kwargs)
654  if self.config.functorFile:
655  self.log.info('Loading transform functor definitions from %s',
656  self.config.functorFile)
657  self.funcs = CompositeFunctor.from_file(self.config.functorFile)
658  self.funcs.update(dict(PostprocessAnalysis._defaultFuncs))
659  else:
660  self.funcs = None
661 
662  def runQuantum(self, butlerQC, inputRefs, outputRefs):
663  inputs = butlerQC.get(inputRefs)
664  if self.funcs is None:
665  raise ValueError("config.functorFile is None. "
666  "Must be a valid path to yaml in order to run Task as a PipelineTask.")
667  result = self.run(parq=inputs['inputCatalog'], funcs=self.funcs,
668  dataId=outputRefs.outputCatalog.dataId.full)
669  outputs = pipeBase.Struct(outputCatalog=result)
670  butlerQC.put(outputs, outputRefs)
671 
672  def runDataRef(self, dataRef):
673  parq = dataRef.get()
674  if self.funcs is None:
675  raise ValueError("config.functorFile is None. "
676  "Must be a valid path to yaml in order to run as a CommandlineTask.")
677  df = self.run(parq, funcs=self.funcs, dataId=dataRef.dataId)
678  self.write(df, dataRef)
679  return df
680 
681  def run(self, parq, funcs=None, dataId=None, band=None):
682  """Do postprocessing calculations
683 
684  Takes a `ParquetTable` object and dataId,
685  returns a dataframe with results of postprocessing calculations.
686 
687  Parameters
688  ----------
689  parq : `lsst.pipe.tasks.parquetTable.ParquetTable`
690  ParquetTable from which calculations are done.
691  funcs : `lsst.pipe.tasks.functors.Functors`
692  Functors to apply to the table's columns
693  dataId : dict, optional
694  Used to add a `patchId` column to the output dataframe.
695  band : `str`, optional
696  Filter band that is being processed.
697 
698  Returns
699  -------
700  `pandas.DataFrame`
701 
702  """
703  self.log.info("Transforming/standardizing the source table dataId: %s", dataId)
704 
705  df = self.transform(band, parq, funcs, dataId).df
706  self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
707  return df
708 
709  def getFunctors(self):
710  return self.funcs
711 
712  def getAnalysis(self, parq, funcs=None, band=None):
713  if funcs is None:
714  funcs = self.funcs
715  analysis = PostprocessAnalysis(parq, funcs, filt=band)
716  return analysis
717 
718  def transform(self, band, parq, funcs, dataId):
719  analysis = self.getAnalysis(parq, funcs=funcs, band=band)
720  df = analysis.df
721  if dataId is not None:
722  for key, value in dataId.items():
723  df[str(key)] = value
724 
725  if self.config.primaryKey:
726  if df.index.name != self.config.primaryKey and self.config.primaryKey in df:
727  df.reset_index(inplace=True, drop=True)
728  df.set_index(self.config.primaryKey, inplace=True)
729 
730  return pipeBase.Struct(
731  df=df,
732  analysis=analysis
733  )
734 
735  def write(self, df, parqRef):
736  parqRef.put(ParquetTable(dataFrame=df), self.outputDataset)
737 
738  def writeMetadata(self, dataRef):
739  """No metadata to write.
740  """
741  pass
742 
743 
744 class TransformObjectCatalogConnections(pipeBase.PipelineTaskConnections,
745  defaultTemplates={"coaddName": "deep"},
746  dimensions=("tract", "patch", "skymap")):
747  inputCatalog = connectionTypes.Input(
748  doc="The vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
749  "stored as a DataFrame with a multi-level column index per-patch.",
750  dimensions=("tract", "patch", "skymap"),
751  storageClass="DataFrame",
752  name="{coaddName}Coadd_obj",
753  deferLoad=True,
754  )
755  outputCatalog = connectionTypes.Output(
756  doc="Per-Patch Object Table of columns transformed from the deepCoadd_obj table per the standard "
757  "data model.",
758  dimensions=("tract", "patch", "skymap"),
759  storageClass="DataFrame",
760  name="objectTable"
761  )
762 
763 
764 class TransformObjectCatalogConfig(TransformCatalogBaseConfig,
765  pipelineConnections=TransformObjectCatalogConnections):
766  coaddName = pexConfig.Field(
767  dtype=str,
768  default="deep",
769  doc="Name of coadd"
770  )
771  # TODO: remove in DM-27177
772  filterMap = pexConfig.DictField(
773  keytype=str,
774  itemtype=str,
775  default={},
776  doc=("Dictionary mapping full filter name to short one for column name munging."
777  "These filters determine the output columns no matter what filters the "
778  "input data actually contain."),
779  deprecated=("Coadds are now identified by the band, so this transform is unused."
780  "Will be removed after v22.")
781  )
782  outputBands = pexConfig.ListField(
783  dtype=str,
784  default=None,
785  optional=True,
786  doc=("These bands and only these bands will appear in the output,"
787  " NaN-filled if the input does not include them."
788  " If None, then use all bands found in the input.")
789  )
790  camelCase = pexConfig.Field(
791  dtype=bool,
792  default=False,
793  doc=("Write per-band columns names with camelCase, else underscore "
794  "For example: gPsFlux instead of g_PsFlux.")
795  )
796  multilevelOutput = pexConfig.Field(
797  dtype=bool,
798  default=False,
799  doc=("Whether results dataframe should have a multilevel column index (True) or be flat "
800  "and name-munged (False).")
801  )
802  goodFlags = pexConfig.ListField(
803  dtype=str,
804  default=[],
805  doc=("List of 'good' flags that should be set False when populating empty tables. "
806  "All other flags are considered to be 'bad' flags and will be set to True.")
807  )
808  floatFillValue = pexConfig.Field(
809  dtype=float,
810  default=np.nan,
811  doc="Fill value for float fields when populating empty tables."
812  )
813  integerFillValue = pexConfig.Field(
814  dtype=int,
815  default=-1,
816  doc="Fill value for integer fields when populating empty tables."
817  )
818 
819  def setDefaults(self):
820  super().setDefaults()
821  self.primaryKey = 'objectId'
822  self.goodFlags = ['calib_astrometry_used',
823  'calib_photometry_reserved',
824  'calib_photometry_used',
825  'calib_psf_candidate',
826  'calib_psf_reserved',
827  'calib_psf_used']
828 
829 
830 class TransformObjectCatalogTask(TransformCatalogBaseTask):
831  """Produce a flattened Object Table to match the format specified in
832  sdm_schemas.
833 
834  Do the same set of postprocessing calculations on all bands
835 
836  This is identical to `TransformCatalogBaseTask`, except that it does the
837  specified functor calculations for all filters present in the
838  input `deepCoadd_obj` table. Any specific `"filt"` keywords specified
839  by the YAML file will be superseded.
840  """
841  _DefaultName = "transformObjectCatalog"
842  ConfigClass = TransformObjectCatalogConfig
843 
844  # Used by Gen 2 runDataRef only:
845  inputDataset = 'deepCoadd_obj'
846  outputDataset = 'objectTable'
847 
848  @classmethod
849  def _makeArgumentParser(cls):
850  parser = ArgumentParser(name=cls._DefaultName)
851  parser.add_id_argument("--id", cls.inputDataset,
852  ContainerClass=CoaddDataIdContainer,
853  help="data ID, e.g. --id tract=12345 patch=1,2")
854  return parser
855 
856  def run(self, parq, funcs=None, dataId=None, band=None):
857  # NOTE: band kwarg is ignored here.
858  dfDict = {}
859  analysisDict = {}
860  templateDf = pd.DataFrame()
861 
862  if isinstance(parq, DeferredDatasetHandle):
863  columns = parq.get(component='columns')
864  inputBands = columns.unique(level=1).values
865  else:
866  inputBands = parq.columnLevelNames['band']
867 
868  outputBands = self.config.outputBands if self.config.outputBands else inputBands
869 
870  # Perform transform for data of filters that exist in parq.
871  for inputBand in inputBands:
872  if inputBand not in outputBands:
873  self.log.info("Ignoring %s band data in the input", inputBand)
874  continue
875  self.log.info("Transforming the catalog of band %s", inputBand)
876  result = self.transform(inputBand, parq, funcs, dataId)
877  dfDict[inputBand] = result.df
878  analysisDict[inputBand] = result.analysis
879  if templateDf.empty:
880  templateDf = result.df
881 
882  # Put filler values in columns of other wanted bands
883  for filt in outputBands:
884  if filt not in dfDict:
885  self.log.info("Adding empty columns for band %s", filt)
886  dfTemp = templateDf.copy()
887  for col in dfTemp.columns:
888  testValue = dfTemp[col].values[0]
889  if isinstance(testValue, (np.bool_, pd.BooleanDtype)):
890  # Boolean flag type, check if it is a "good" flag
891  if col in self.config.goodFlags:
892  fillValue = False
893  else:
894  fillValue = True
895  elif isinstance(testValue, numbers.Integral):
896  # Checking numbers.Integral catches all flavors
897  # of python, numpy, pandas, etc. integers.
898  # We must ensure this is not an unsigned integer.
899  if isinstance(testValue, np.unsignedinteger):
900  raise ValueError("Parquet tables may not have unsigned integer columns.")
901  else:
902  fillValue = self.config.integerFillValue
903  else:
904  fillValue = self.config.floatFillValue
905  dfTemp[col].values[:] = fillValue
906  dfDict[filt] = dfTemp
907 
908  # This makes a multilevel column index, with band as first level
909  df = pd.concat(dfDict, axis=1, names=['band', 'column'])
910 
911  if not self.config.multilevelOutput:
912  noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
913  if self.config.primaryKey in noDupCols:
914  noDupCols.remove(self.config.primaryKey)
915  if dataId is not None:
916  noDupCols += list(dataId.keys())
917  df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase,
918  inputBands=inputBands)
919 
920  self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
921 
922  return df
923 
924 
925 class TractObjectDataIdContainer(CoaddDataIdContainer):
926 
927  def makeDataRefList(self, namespace):
928  """Make self.refList from self.idList
929 
930  Generate a list of data references given tract and/or patch.
931  This was adapted from `TractQADataIdContainer`, which was
932  `TractDataIdContainer` modified to not require "filter".
933  Only existing dataRefs are returned.
934  """
935  def getPatchRefList(tract):
936  return [namespace.butler.dataRef(datasetType=self.datasetType,
937  tract=tract.getId(),
938  patch="%d,%d" % patch.getIndex()) for patch in tract]
939 
940  tractRefs = defaultdict(list) # Data references for each tract
941  for dataId in self.idList:
942  skymap = self.getSkymap(namespace)
943 
944  if "tract" in dataId:
945  tractId = dataId["tract"]
946  if "patch" in dataId:
947  tractRefs[tractId].append(namespace.butler.dataRef(datasetType=self.datasetType,
948  tract=tractId,
949  patch=dataId['patch']))
950  else:
951  tractRefs[tractId] += getPatchRefList(skymap[tractId])
952  else:
953  tractRefs = dict((tract.getId(), tractRefs.get(tract.getId(), []) + getPatchRefList(tract))
954  for tract in skymap)
955  outputRefList = []
956  for tractRefList in tractRefs.values():
957  existingRefs = [ref for ref in tractRefList if ref.datasetExists()]
958  outputRefList.append(existingRefs)
959 
960  self.refList = outputRefList
961 
962 
963 class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections,
964  dimensions=("tract", "skymap")):
965  inputCatalogs = connectionTypes.Input(
966  doc="Per-Patch objectTables conforming to the standard data model.",
967  name="objectTable",
968  storageClass="DataFrame",
969  dimensions=("tract", "patch", "skymap"),
970  multiple=True,
971  )
972  outputCatalog = connectionTypes.Output(
973  doc="Pre-tract horizontal concatenation of the input objectTables",
974  name="objectTable_tract",
975  storageClass="DataFrame",
976  dimensions=("tract", "skymap"),
977  )
978 
979 
980 class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig,
981  pipelineConnections=ConsolidateObjectTableConnections):
982  coaddName = pexConfig.Field(
983  dtype=str,
984  default="deep",
985  doc="Name of coadd"
986  )
987 
988 
989 class ConsolidateObjectTableTask(CmdLineTask, pipeBase.PipelineTask):
990  """Write patch-merged source tables to a tract-level parquet file
991 
992  Concatenates `objectTable` list into a per-tract `objectTable_tract`.
993  """
994  _DefaultName = "consolidateObjectTable"
995  ConfigClass = ConsolidateObjectTableConfig
996 
997  inputDataset = 'objectTable'
998  outputDataset = 'objectTable_tract'
999 
1000  def runQuantum(self, butlerQC, inputRefs, outputRefs):
1001  inputs = butlerQC.get(inputRefs)
1002  self.log.info("Concatenating %s per-patch Object Tables",
1003  len(inputs['inputCatalogs']))
1004  df = pd.concat(inputs['inputCatalogs'])
1005  butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)
1006 
1007  @classmethod
1008  def _makeArgumentParser(cls):
1009  parser = ArgumentParser(name=cls._DefaultName)
1010 
1011  parser.add_id_argument("--id", cls.inputDataset,
1012  help="data ID, e.g. --id tract=12345",
1013  ContainerClass=TractObjectDataIdContainer)
1014  return parser
1015 
1016  def runDataRef(self, patchRefList):
1017  df = pd.concat([patchRef.get().toDataFrame() for patchRef in patchRefList])
1018  patchRefList[0].put(ParquetTable(dataFrame=df), self.outputDataset)
1019 
1020  def writeMetadata(self, dataRef):
1021  """No metadata to write.
1022  """
1023  pass
1024 
1025 
1026 class TransformSourceTableConnections(pipeBase.PipelineTaskConnections,
1027  defaultTemplates={"catalogType": ""},
1028  dimensions=("instrument", "visit", "detector")):
1029 
1030  inputCatalog = connectionTypes.Input(
1031  doc="Wide input catalog of sources produced by WriteSourceTableTask",
1032  name="{catalogType}source",
1033  storageClass="DataFrame",
1034  dimensions=("instrument", "visit", "detector"),
1035  deferLoad=True
1036  )
1037  outputCatalog = connectionTypes.Output(
1038  doc="Narrower, per-detector Source Table transformed and converted per a "
1039  "specified set of functors",
1040  name="{catalogType}sourceTable",
1041  storageClass="DataFrame",
1042  dimensions=("instrument", "visit", "detector")
1043  )
1044 
1045 
1046 class TransformSourceTableConfig(TransformCatalogBaseConfig,
1047  pipelineConnections=TransformSourceTableConnections):
1048 
1049  def setDefaults(self):
1050  super().setDefaults()
1051  self.primaryKey = 'sourceId'
1052 
1053 
1054 class TransformSourceTableTask(TransformCatalogBaseTask):
1055  """Transform/standardize a source catalog
1056  """
1057  _DefaultName = "transformSourceTable"
1058  ConfigClass = TransformSourceTableConfig
1059 
1060  inputDataset = 'source'
1061  outputDataset = 'sourceTable'
1062 
1063  @classmethod
1064  def _makeArgumentParser(cls):
1065  parser = ArgumentParser(name=cls._DefaultName)
1066  parser.add_id_argument("--id", datasetType=cls.inputDataset,
1067  level="sensor",
1068  help="data ID, e.g. --id visit=12345 ccd=0")
1069  return parser
1070 
1071  def runDataRef(self, dataRef):
1072  """Override to specify band label to run()."""
1073  parq = dataRef.get()
1074  funcs = self.getFunctors()
1075  band = dataRef.get("calexp_filterLabel", immediate=True).bandLabel
1076  df = self.run(parq, funcs=funcs, dataId=dataRef.dataId, band=band)
1077  self.write(df, dataRef)
1078  return df
1079 
1080 
1081 class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections,
1082  dimensions=("instrument", "visit",),
1083  defaultTemplates={"calexpType": ""}):
1084  calexp = connectionTypes.Input(
1085  doc="Processed exposures used for metadata",
1086  name="{calexpType}calexp",
1087  storageClass="ExposureF",
1088  dimensions=("instrument", "visit", "detector"),
1089  deferLoad=True,
1090  multiple=True,
1091  )
1092  visitSummary = connectionTypes.Output(
1093  doc=("Per-visit consolidated exposure metadata. These catalogs use "
1094  "detector id for the id and are sorted for fast lookups of a "
1095  "detector."),
1096  name="{calexpType}visitSummary",
1097  storageClass="ExposureCatalog",
1098  dimensions=("instrument", "visit"),
1099  )
1100 
1101 
1102 class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig,
1103  pipelineConnections=ConsolidateVisitSummaryConnections):
1104  """Config for ConsolidateVisitSummaryTask"""
1105  pass
1106 
1107 
1108 class ConsolidateVisitSummaryTask(pipeBase.PipelineTask, pipeBase.CmdLineTask):
1109  """Task to consolidate per-detector visit metadata.
1110 
1111  This task aggregates the following metadata from all the detectors in a
1112  single visit into an exposure catalog:
1113  - The visitInfo.
1114  - The wcs.
1115  - The photoCalib.
1116  - The physical_filter and band (if available).
1117  - The psf size, shape, and effective area at the center of the detector.
1118  - The corners of the bounding box in right ascension/declination.
1119 
1120  Other quantities such as Detector, Psf, ApCorrMap, and TransmissionCurve
1121  are not persisted here because of storage concerns, and because of their
1122  limited utility as summary statistics.
1123 
1124  Tests for this task are performed in ci_hsc_gen3.
1125  """
1126  _DefaultName = "consolidateVisitSummary"
1127  ConfigClass = ConsolidateVisitSummaryConfig
1128 
1129  @classmethod
1130  def _makeArgumentParser(cls):
1131  parser = ArgumentParser(name=cls._DefaultName)
1132 
1133  parser.add_id_argument("--id", "calexp",
1134  help="data ID, e.g. --id visit=12345",
1135  ContainerClass=VisitDataIdContainer)
1136  return parser
1137 
1138  def writeMetadata(self, dataRef):
1139  """No metadata to persist, so override to remove metadata persistance.
1140  """
1141  pass
1142 
1143  def writeConfig(self, butler, clobber=False, doBackup=True):
1144  """No config to persist, so override to remove config persistance.
1145  """
1146  pass
1147 
1148  def runDataRef(self, dataRefList):
1149  visit = dataRefList[0].dataId['visit']
1150 
1151  self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
1152  len(dataRefList), visit)
1153 
1154  expCatalog = self._combineExposureMetadata(visit, dataRefList, isGen3=False)
1155 
1156  dataRefList[0].put(expCatalog, 'visitSummary', visit=visit)
1157 
1158  def runQuantum(self, butlerQC, inputRefs, outputRefs):
1159  dataRefs = butlerQC.get(inputRefs.calexp)
1160  visit = dataRefs[0].dataId.byName()['visit']
1161 
1162  self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
1163  len(dataRefs), visit)
1164 
1165  expCatalog = self._combineExposureMetadata(visit, dataRefs)
1166 
1167  butlerQC.put(expCatalog, outputRefs.visitSummary)
1168 
1169  def _combineExposureMetadata(self, visit, dataRefs, isGen3=True):
1170  """Make a combined exposure catalog from a list of dataRefs.
1171  These dataRefs must point to exposures with wcs, summaryStats,
1172  and other visit metadata.
1173 
1174  Parameters
1175  ----------
1176  visit : `int`
1177  Visit identification number.
1178  dataRefs : `list`
1179  List of dataRefs in visit. May be list of
1180  `lsst.daf.persistence.ButlerDataRef` (Gen2) or
1181  `lsst.daf.butler.DeferredDatasetHandle` (Gen3).
1182  isGen3 : `bool`, optional
1183  Specifies if this is a Gen3 list of datarefs.
1184 
1185  Returns
1186  -------
1187  visitSummary : `lsst.afw.table.ExposureCatalog`
1188  Exposure catalog with per-detector summary information.
1189  """
1190  schema = self._makeVisitSummarySchema()
1191  cat = afwTable.ExposureCatalog(schema)
1192  cat.resize(len(dataRefs))
1193 
1194  cat['visit'] = visit
1195 
1196  for i, dataRef in enumerate(dataRefs):
1197  if isGen3:
1198  visitInfo = dataRef.get(component='visitInfo')
1199  filterLabel = dataRef.get(component='filterLabel')
1200  summaryStats = dataRef.get(component='summaryStats')
1201  detector = dataRef.get(component='detector')
1202  wcs = dataRef.get(component='wcs')
1203  photoCalib = dataRef.get(component='photoCalib')
1204  detector = dataRef.get(component='detector')
1205  bbox = dataRef.get(component='bbox')
1206  validPolygon = dataRef.get(component='validPolygon')
1207  else:
1208  # Note that we need to read the calexp because there is
1209  # no magic access to the psf except through the exposure.
1210  gen2_read_bbox = lsst.geom.BoxI(lsst.geom.PointI(0, 0), lsst.geom.PointI(1, 1))
1211  exp = dataRef.get(datasetType='calexp_sub', bbox=gen2_read_bbox)
1212  visitInfo = exp.getInfo().getVisitInfo()
1213  filterLabel = dataRef.get("calexp_filterLabel")
1214  summaryStats = exp.getInfo().getSummaryStats()
1215  wcs = exp.getWcs()
1216  photoCalib = exp.getPhotoCalib()
1217  detector = exp.getDetector()
1218  bbox = dataRef.get(datasetType='calexp_bbox')
1219  validPolygon = exp.getInfo().getValidPolygon()
1220 
1221  rec = cat[i]
1222  rec.setBBox(bbox)
1223  rec.setVisitInfo(visitInfo)
1224  rec.setWcs(wcs)
1225  rec.setPhotoCalib(photoCalib)
1226  rec.setValidPolygon(validPolygon)
1227 
1228  rec['physical_filter'] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else ""
1229  rec['band'] = filterLabel.bandLabel if filterLabel.hasBandLabel() else ""
1230  rec.setId(detector.getId())
1231  rec['psfSigma'] = summaryStats.psfSigma
1232  rec['psfIxx'] = summaryStats.psfIxx
1233  rec['psfIyy'] = summaryStats.psfIyy
1234  rec['psfIxy'] = summaryStats.psfIxy
1235  rec['psfArea'] = summaryStats.psfArea
1236  rec['raCorners'][:] = summaryStats.raCorners
1237  rec['decCorners'][:] = summaryStats.decCorners
1238  rec['ra'] = summaryStats.ra
1239  rec['decl'] = summaryStats.decl
1240  rec['zenithDistance'] = summaryStats.zenithDistance
1241  rec['zeroPoint'] = summaryStats.zeroPoint
1242  rec['skyBg'] = summaryStats.skyBg
1243  rec['skyNoise'] = summaryStats.skyNoise
1244  rec['meanVar'] = summaryStats.meanVar
1245  rec['astromOffsetMean'] = summaryStats.astromOffsetMean
1246  rec['astromOffsetStd'] = summaryStats.astromOffsetStd
1247  rec['nPsfStar'] = summaryStats.nPsfStar
1248  rec['psfStarDeltaE1Median'] = summaryStats.psfStarDeltaE1Median
1249  rec['psfStarDeltaE2Median'] = summaryStats.psfStarDeltaE2Median
1250  rec['psfStarDeltaE1Scatter'] = summaryStats.psfStarDeltaE1Scatter
1251  rec['psfStarDeltaE2Scatter'] = summaryStats.psfStarDeltaE2Scatter
1252  rec['psfStarDeltaSizeMedian'] = summaryStats.psfStarDeltaSizeMedian
1253  rec['psfStarDeltaSizeScatter'] = summaryStats.psfStarDeltaSizeScatter
1254  rec['psfStarScaledDeltaSizeScatter'] = summaryStats.psfStarScaledDeltaSizeScatter
1255 
1256  metadata = dafBase.PropertyList()
1257  metadata.add("COMMENT", "Catalog id is detector id, sorted.")
1258  # We are looping over existing datarefs, so the following is true
1259  metadata.add("COMMENT", "Only detectors with data have entries.")
1260  cat.setMetadata(metadata)
1261 
1262  cat.sort()
1263  return cat
1264 
1265  def _makeVisitSummarySchema(self):
1266  """Make the schema for the visitSummary catalog."""
1267  schema = afwTable.ExposureTable.makeMinimalSchema()
1268  schema.addField('visit', type='I', doc='Visit number')
1269  schema.addField('physical_filter', type='String', size=32, doc='Physical filter')
1270  schema.addField('band', type='String', size=32, doc='Name of band')
1271  schema.addField('psfSigma', type='F',
1272  doc='PSF model second-moments determinant radius (center of chip) (pixel)')
1273  schema.addField('psfArea', type='F',
1274  doc='PSF model effective area (center of chip) (pixel**2)')
1275  schema.addField('psfIxx', type='F',
1276  doc='PSF model Ixx (center of chip) (pixel**2)')
1277  schema.addField('psfIyy', type='F',
1278  doc='PSF model Iyy (center of chip) (pixel**2)')
1279  schema.addField('psfIxy', type='F',
1280  doc='PSF model Ixy (center of chip) (pixel**2)')
1281  schema.addField('raCorners', type='ArrayD', size=4,
1282  doc='Right Ascension of bounding box corners (degrees)')
1283  schema.addField('decCorners', type='ArrayD', size=4,
1284  doc='Declination of bounding box corners (degrees)')
1285  schema.addField('ra', type='D',
1286  doc='Right Ascension of bounding box center (degrees)')
1287  schema.addField('decl', type='D',
1288  doc='Declination of bounding box center (degrees)')
1289  schema.addField('zenithDistance', type='F',
1290  doc='Zenith distance of bounding box center (degrees)')
1291  schema.addField('zeroPoint', type='F',
1292  doc='Mean zeropoint in detector (mag)')
1293  schema.addField('skyBg', type='F',
1294  doc='Average sky background (ADU)')
1295  schema.addField('skyNoise', type='F',
1296  doc='Average sky noise (ADU)')
1297  schema.addField('meanVar', type='F',
1298  doc='Mean variance of the weight plane (ADU**2)')
1299  schema.addField('astromOffsetMean', type='F',
1300  doc='Mean offset of astrometric calibration matches (arcsec)')
1301  schema.addField('astromOffsetStd', type='F',
1302  doc='Standard deviation of offsets of astrometric calibration matches (arcsec)')
1303  schema.addField('nPsfStar', type='I', doc='Number of stars used for PSF model')
1304  schema.addField('psfStarDeltaE1Median', type='F',
1305  doc='Median E1 residual (starE1 - psfE1) for psf stars')
1306  schema.addField('psfStarDeltaE2Median', type='F',
1307  doc='Median E2 residual (starE2 - psfE2) for psf stars')
1308  schema.addField('psfStarDeltaE1Scatter', type='F',
1309  doc='Scatter (via MAD) of E1 residual (starE1 - psfE1) for psf stars')
1310  schema.addField('psfStarDeltaE2Scatter', type='F',
1311  doc='Scatter (via MAD) of E2 residual (starE2 - psfE2) for psf stars')
1312  schema.addField('psfStarDeltaSizeMedian', type='F',
1313  doc='Median size residual (starSize - psfSize) for psf stars (pixel)')
1314  schema.addField('psfStarDeltaSizeScatter', type='F',
1315  doc='Scatter (via MAD) of size residual (starSize - psfSize) for psf stars (pixel)')
1316  schema.addField('psfStarScaledDeltaSizeScatter', type='F',
1317  doc='Scatter (via MAD) of size residual scaled by median size squared')
1318 
1319  return schema
1320 
1321 
1322 class VisitDataIdContainer(DataIdContainer):
1323  """DataIdContainer that groups sensor-level id's by visit
1324  """
1325 
1326  def makeDataRefList(self, namespace):
1327  """Make self.refList from self.idList
1328 
1329  Generate a list of data references grouped by visit.
1330 
1331  Parameters
1332  ----------
1333  namespace : `argparse.Namespace`
1334  Namespace used by `lsst.pipe.base.CmdLineTask` to parse command line arguments
1335  """
1336  # Group by visits
1337  visitRefs = defaultdict(list)
1338  for dataId in self.idList:
1339  if "visit" in dataId:
1340  visitId = dataId["visit"]
1341  # append all subsets to this visit's list of dataRefs
1342  subset = namespace.butler.subset(self.datasetType, dataId=dataId)
1343  visitRefs[visitId].extend([dataRef for dataRef in subset])
1344 
1345  outputRefList = []
1346  for refList in visitRefs.values():
1347  existingRefs = [ref for ref in refList if ref.datasetExists()]
1348  if existingRefs:
1349  outputRefList.append(existingRefs)
1350 
1351  self.refList = outputRefList
1352 
1353 
1354 class ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections,
1355  defaultTemplates={"catalogType": ""},
1356  dimensions=("instrument", "visit")):
1357  inputCatalogs = connectionTypes.Input(
1358  doc="Input per-detector Source Tables",
1359  name="{catalogType}sourceTable",
1360  storageClass="DataFrame",
1361  dimensions=("instrument", "visit", "detector"),
1362  multiple=True
1363  )
1364  outputCatalog = connectionTypes.Output(
1365  doc="Per-visit concatenation of Source Table",
1366  name="{catalogType}sourceTable_visit",
1367  storageClass="DataFrame",
1368  dimensions=("instrument", "visit")
1369  )
1370 
1371 
1372 class ConsolidateSourceTableConfig(pipeBase.PipelineTaskConfig,
1373  pipelineConnections=ConsolidateSourceTableConnections):
1374  pass
1375 
1376 
1377 class ConsolidateSourceTableTask(CmdLineTask, pipeBase.PipelineTask):
1378  """Concatenate `sourceTable` list into a per-visit `sourceTable_visit`
1379  """
1380  _DefaultName = 'consolidateSourceTable'
1381  ConfigClass = ConsolidateSourceTableConfig
1382 
1383  inputDataset = 'sourceTable'
1384  outputDataset = 'sourceTable_visit'
1385 
1386  def runQuantum(self, butlerQC, inputRefs, outputRefs):
1387  inputs = butlerQC.get(inputRefs)
1388  self.log.info("Concatenating %s per-detector Source Tables",
1389  len(inputs['inputCatalogs']))
1390  df = pd.concat(inputs['inputCatalogs'])
1391  butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)
1392 
1393  def runDataRef(self, dataRefList):
1394  self.log.info("Concatenating %s per-detector Source Tables", len(dataRefList))
1395  df = pd.concat([dataRef.get().toDataFrame() for dataRef in dataRefList])
1396  dataRefList[0].put(ParquetTable(dataFrame=df), self.outputDataset)
1397 
1398  @classmethod
1399  def _makeArgumentParser(cls):
1400  parser = ArgumentParser(name=cls._DefaultName)
1401 
1402  parser.add_id_argument("--id", cls.inputDataset,
1403  help="data ID, e.g. --id visit=12345",
1404  ContainerClass=VisitDataIdContainer)
1405  return parser
1406 
1407  def writeMetadata(self, dataRef):
1408  """No metadata to write.
1409  """
1410  pass
1411 
1412  def writeConfig(self, butler, clobber=False, doBackup=True):
1413  """No config to write.
1414  """
1415  pass
1416 
1417 
1418 class MakeCcdVisitTableConnections(pipeBase.PipelineTaskConnections,
1419  dimensions=("instrument",),
1420  defaultTemplates={"calexpType": ""}):
1421  visitSummaryRefs = connectionTypes.Input(
1422  doc="Data references for per-visit consolidated exposure metadata from ConsolidateVisitSummaryTask",
1423  name="{calexpType}visitSummary",
1424  storageClass="ExposureCatalog",
1425  dimensions=("instrument", "visit"),
1426  multiple=True,
1427  deferLoad=True,
1428  )
1429  outputCatalog = connectionTypes.Output(
1430  doc="CCD and Visit metadata table",
1431  name="ccdVisitTable",
1432  storageClass="DataFrame",
1433  dimensions=("instrument",)
1434  )
1435 
1436 
1437 class MakeCcdVisitTableConfig(pipeBase.PipelineTaskConfig,
1438  pipelineConnections=MakeCcdVisitTableConnections):
1439  pass
1440 
1441 
1442 class MakeCcdVisitTableTask(CmdLineTask, pipeBase.PipelineTask):
1443  """Produce a `ccdVisitTable` from the `visitSummary` exposure catalogs.
1444  """
1445  _DefaultName = 'makeCcdVisitTable'
1446  ConfigClass = MakeCcdVisitTableConfig
1447 
1448  def run(self, visitSummaryRefs):
1449  """ Make a table of ccd information from the `visitSummary` catalogs.
1450  Parameters
1451  ----------
1452  visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
1453  List of DeferredDatasetHandles pointing to exposure catalogs with
1454  per-detector summary information.
1455  Returns
1456  -------
1457  result : `lsst.pipe.base.Struct`
1458  Results struct with attribute:
1459  - `outputCatalog`
1460  Catalog of ccd and visit information.
1461  """
1462  ccdEntries = []
1463  for visitSummaryRef in visitSummaryRefs:
1464  visitSummary = visitSummaryRef.get()
1465  visitInfo = visitSummary[0].getVisitInfo()
1466 
1467  ccdEntry = {}
1468  summaryTable = visitSummary.asAstropy()
1469  selectColumns = ['id', 'visit', 'physical_filter', 'band', 'ra', 'decl', 'zenithDistance',
1470  'zeroPoint', 'psfSigma', 'skyBg', 'skyNoise']
1471  ccdEntry = summaryTable[selectColumns].to_pandas().set_index('id')
1472  # 'visit' is the human-readable visit number.
1473  # 'visitId' is the key to the visit table; they are the same.
1474  # Technically you should join to get the visit from the visit table.
1475  ccdEntry = ccdEntry.rename(columns={"visit": "visitId"})
1476  dataIds = [DataCoordinate.standardize(visitSummaryRef.dataId, detector=id) for id in
1477  summaryTable['id']]
1478  packer = visitSummaryRef.dataId.universe.makePacker('visit_detector', visitSummaryRef.dataId)
1479  ccdVisitIds = [packer.pack(dataId) for dataId in dataIds]
1480  ccdEntry['ccdVisitId'] = ccdVisitIds
1481  ccdEntry['detector'] = summaryTable['id']
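  # Convert the per-detector PSF Gaussian sigma (pixels) to a seeing FWHM in
  # arcseconds: FWHM = sigma * sqrt(8 ln 2) ~= 2.355 * sigma.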
1482  pixToArcseconds = np.array([vR.getWcs().getPixelScale().asArcseconds() for vR in visitSummary])
1483  ccdEntry["seeing"] = visitSummary['psfSigma'] * np.sqrt(8 * np.log(2)) * pixToArcseconds
1484 
1485  ccdEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
1486  ccdEntry["expMidpt"] = visitInfo.getDate().toPython()
1487  expTime = visitInfo.getExposureTime()
1488  ccdEntry['expTime'] = expTime
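  # The exposure start time is half the exposure time before the
  # mid-exposure timestamp.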
1489  ccdEntry["obsStart"] = ccdEntry["expMidpt"] - 0.5 * pd.Timedelta(seconds=expTime)
1490  ccdEntry['darkTime'] = visitInfo.getDarkTime()
1491  ccdEntry['xSize'] = summaryTable['bbox_max_x'] - summaryTable['bbox_min_x']
1492  ccdEntry['ySize'] = summaryTable['bbox_max_y'] - summaryTable['bbox_min_y']
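  # Corner coordinates of the detector footprint: index 0 -> lower-left (llc),
  # 1 -> upper-left (ulc), 2 -> upper-right (urc), 3 -> lower-right (lrc).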
1493  ccdEntry['llcra'] = summaryTable['raCorners'][:, 0]
1494  ccdEntry['llcdec'] = summaryTable['decCorners'][:, 0]
1495  ccdEntry['ulcra'] = summaryTable['raCorners'][:, 1]
1496  ccdEntry['ulcdec'] = summaryTable['decCorners'][:, 1]
1497  ccdEntry['urcra'] = summaryTable['raCorners'][:, 2]
1498  ccdEntry['urcdec'] = summaryTable['decCorners'][:, 2]
1499  ccdEntry['lrcra'] = summaryTable['raCorners'][:, 3]
1500  ccdEntry['lrcdec'] = summaryTable['decCorners'][:, 3]
1501  # TODO: DM-30618, Add raftName, nExposures, ccdTemp, binX, binY, and flags,
1502  # and decide if WCS, and llcx, llcy, ulcx, ulcy, etc. values are actually wanted.
1503  ccdEntries.append(ccdEntry)
1504 
1505  outputCatalog = pd.concat(ccdEntries)
1506  outputCatalog.set_index('ccdVisitId', inplace=True, verify_integrity=True)
1507  return pipeBase.Struct(outputCatalog=outputCatalog)
1508 
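# Illustrative sketch only (not part of this module): it shows how a downstream
# analysis might attach rows of the ccdVisitTable produced above to a
# source-level DataFrame by joining on the packed ccdVisitId key. All column
# names other than ccdVisitId, seeing, and expMidpt are invented for the demo.
def _exampleJoinOnCcdVisitId():
    import pandas as pd

    # Hypothetical per-source table carrying the packed ccdVisitId key.
    sources = pd.DataFrame({'sourceId': [1, 2, 3],
                            'ccdVisitId': [101, 101, 102],
                            'flux': [10.2, 11.5, 9.8]})
    # Hypothetical slice of a ccdVisitTable, indexed by ccdVisitId.
    ccdVisits = pd.DataFrame({'ccdVisitId': [101, 102],
                              'seeing': [0.7, 0.9],
                              'expMidpt': pd.to_datetime(['2022-01-01T03:00:00',
                                                          '2022-01-01T03:10:00'])})
    ccdVisits = ccdVisits.set_index('ccdVisitId')
    # Left join: each source row gains the seeing and mid-exposure time of its
    # detector+visit.
    return sources.join(ccdVisits, on='ccdVisitId')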
1509 
1510 class MakeVisitTableConnections(pipeBase.PipelineTaskConnections,
1511  dimensions=("instrument",),
1512  defaultTemplates={"calexpType": ""}):
1513  visitSummaries = connectionTypes.Input(
1514  doc="Per-visit consolidated exposure metadata from ConsolidateVisitSummaryTask",
1515  name="{calexpType}visitSummary",
1516  storageClass="ExposureCatalog",
1517  dimensions=("instrument", "visit",),
1518  multiple=True,
1519  deferLoad=True,
1520  )
1521  outputCatalog = connectionTypes.Output(
1522  doc="Visit metadata table",
1523  name="visitTable",
1524  storageClass="DataFrame",
1525  dimensions=("instrument",)
1526  )
1527 
1528 
1529 class MakeVisitTableConfig(pipeBase.PipelineTaskConfig,
1530  pipelineConnections=MakeVisitTableConnections):
1531  pass
1532 
1533 
1534 class MakeVisitTableTask(CmdLineTask, pipeBase.PipelineTask):
1535  """Produce a `visitTable` from the `visitSummary` exposure catalogs.
1536  """
1537  _DefaultName = 'makeVisitTable'
1538  ConfigClass = MakeVisitTableConfig
1539 
1540  def run(self, visitSummaries):
1541  """Make a table of visit information from the `visitSummary` catalogs.
1542 
1543  Parameters
1544  ----------
1545  visitSummaries : `list` of `lsst.daf.butler.DeferredDatasetHandle`
1546  List of DeferredDatasetHandles pointing to exposure catalogs with per-detector summary information.
1547  Returns
1548  -------
1549  result : `lsst.pipe.base.Struct`
1550  Results struct with attribute:
1551  ``outputCatalog``
1552  Catalog of visit information.
1553  """
1554  visitEntries = []
1555  for visitSummary in visitSummaries:
1556  visitSummary = visitSummary.get()
1557  visitRow = visitSummary[0]
1558  visitInfo = visitRow.getVisitInfo()
1559 
1560  visitEntry = {}
1561  visitEntry["visitId"] = visitRow['visit']
1562  visitEntry["visit"] = visitRow['visit']
1563  visitEntry["physical_filter"] = visitRow['physical_filter']
1564  visitEntry["band"] = visitRow['band']
1565  raDec = visitInfo.getBoresightRaDec()
1566  visitEntry["ra"] = raDec.getRa().asDegrees()
1567  visitEntry["decl"] = raDec.getDec().asDegrees()
1568  visitEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
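  # Boresight pointing: azimuth and altitude in degrees; the zenith distance
  # is the complement of the altitude.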
1569  azAlt = visitInfo.getBoresightAzAlt()
1570  visitEntry["azimuth"] = azAlt.getLongitude().asDegrees()
1571  visitEntry["altitude"] = azAlt.getLatitude().asDegrees()
1572  visitEntry["zenithDistance"] = 90 - azAlt.getLatitude().asDegrees()
1573  visitEntry["airmass"] = visitInfo.getBoresightAirmass()
1574  visitEntry["obsStart"] = visitInfo.getDate().toPython()
1575  visitEntry["expTime"] = visitInfo.getExposureTime()
1576  visitEntries.append(visitEntry)
1577  # TODO: DM-30623, Add programId, exposureType, expMidpt, cameraTemp, mirror1Temp, mirror2Temp,
1578  # mirror3Temp, domeTemp, externalTemp, dimmSeeing, pwvGPS, pwvMW, flags, nExposures
1579 
1580  outputCatalog = pd.DataFrame(data=visitEntries)
1581  outputCatalog.set_index('visitId', inplace=True, verify_integrity=True)
1582  return pipeBase.Struct(outputCatalog=outputCatalog)
1583 
1584 
1585 class WriteForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
1586  dimensions=("instrument", "visit", "detector", "skymap", "tract")):
1587 
1588  inputCatalog = connectionTypes.Input(
1589  doc="Primary per-detector, single-epoch forced-photometry catalog. "
1590  "By default, it is the output of ForcedPhotCcdTask run on calexps.",
1591  name="forced_src",
1592  storageClass="SourceCatalog",
1593  dimensions=("instrument", "visit", "detector", "skymap", "tract")
1594  )
1595  inputCatalogDiff = connectionTypes.Input(
1596  doc="Secondary multi-epoch, per-detector, forced photometry catalog. "
1597  "By default, it is the output of ForcedPhotCcdTask run on image differences.",
1598  name="forced_diff",
1599  storageClass="SourceCatalog",
1600  dimensions=("instrument", "visit", "detector", "skymap", "tract")
1601  )
1602  outputCatalog = connectionTypes.Output(
1603  doc="Input catalogs horizontally joined on `objectId` in Parquet format",
1604  name="mergedForcedSource",
1605  storageClass="DataFrame",
1606  dimensions=("instrument", "visit", "detector", "skymap", "tract")
1607  )
1608 
1609 
1610 class WriteForcedSourceTableConfig(WriteSourceTableConfig,
1611  pipelineConnections=WriteForcedSourceTableConnections):
1612  key = lsst.pex.config.Field(
1613  doc="Column on which to join the two input tables and use as the primary key of the output",
1614  dtype=str,
1615  default="objectId",
1616  )
1617 
1618 
1619 class WriteForcedSourceTableTask(pipeBase.PipelineTask):
1620  """Merge and convert per-detector forced source catalogs to parquet
1621 
1622  Because the predecessor ForcedPhotCcdTask operates per-detector and
1623  per-tract (i.e., it has tract in its dimensions), detectors
1624  on the tract boundary may have multiple forced source catalogs.
1625 
1626  The successor task TransformForcedSourceTable runs per patch
1627  and temporally aggregates the overlapping mergedForcedSource catalogs
1628  from all available epochs.
1629  """
1630  _DefaultName = "writeForcedSourceTable"
1631  ConfigClass = WriteForcedSourceTableConfig
1632 
1633  def runQuantum(self, butlerQC, inputRefs, outputRefs):
1634  inputs = butlerQC.get(inputRefs)
1635  # Add ccdVisitId to allow joining with CcdVisitTable
1636  inputs['ccdVisitId'] = butlerQC.quantum.dataId.pack("visit_detector")
1637  inputs['band'] = butlerQC.quantum.dataId.full['band']
1638  outputs = self.run(**inputs)
1639  butlerQC.put(outputs, outputRefs)
1640 
1641  def run(self, inputCatalog, inputCatalogDiff, ccdVisitId=None, band=None):
1642  dfs = []
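  # Each input catalog becomes a DataFrame indexed by config.key (objectId by
  # default) with a two-level ('dataset', 'column') column index, so the
  # 'calexp' and 'diff' measurements can be joined side by side below.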
1643  for table, dataset in zip((inputCatalog, inputCatalogDiff), ('calexp', 'diff')):
1644  df = table.asAstropy().to_pandas().set_index(self.config.key, drop=False)
1645  df = df.reindex(sorted(df.columns), axis=1)
1646  df['ccdVisitId'] = ccdVisitId if ccdVisitId else pd.NA
1647  df['band'] = band if band else pd.NA
1648  df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
1649  names=('dataset', 'column'))
1650 
1651  dfs.append(df)
1652 
1653  outputCatalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
1654  return pipeBase.Struct(outputCatalog=outputCatalog)
1655 
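# Illustrative sketch only (not part of this module): it reproduces the column
# layout that WriteForcedSourceTableTask.run builds, where each per-dataset
# DataFrame shares an objectId index and gets a ('dataset', 'column')
# MultiIndex before the horizontal join. The 'flux' column is invented for
# the demo.
def _exampleMultiIndexJoin():
    import functools

    import pandas as pd

    dfs = []
    for dataset, fluxes in (('calexp', [1.0, 2.0]), ('diff', [0.1, -0.2])):
        df = pd.DataFrame({'objectId': [42, 43], 'flux': fluxes})
        df = df.set_index('objectId', drop=False)
        # Prefix every column with its dataset of origin.
        df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
                                               names=('dataset', 'column'))
        dfs.append(df)
    merged = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
    # Individual measurements are then addressed as, e.g., merged[('diff', 'flux')].
    return merged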
1656 
1657 class TransformForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
1658  dimensions=("instrument", "skymap", "patch", "tract")):
1659 
1660  inputCatalogs = connectionTypes.Input(
1661  doc="Parquet table of merged ForcedSources produced by WriteForcedSourceTableTask",
1662  name="mergedForcedSource",
1663  storageClass="DataFrame",
1664  dimensions=("instrument", "visit", "detector", "skymap", "tract"),
1665  multiple=True,
1666  deferLoad=True
1667  )
1668  referenceCatalog = connectionTypes.Input(
1669  doc="Reference catalog which was used to seed the forcedPhot. Columns "
1670  "objectId, detect_isPrimary, detect_isTractInner, detect_isPatchInner "
1671  "are expected.",
1672  name="objectTable",
1673  storageClass="DataFrame",
1674  dimensions=("tract", "patch", "skymap"),
1675  deferLoad=True
1676  )
1677  outputCatalog = connectionTypes.Output(
1678  doc="Narrower, temporally-aggregated, per-patch ForcedSource Table transformed and converted per a "
1679  "specified set of functors",
1680  name="forcedSourceTable",
1681  storageClass="DataFrame",
1682  dimensions=("tract", "patch", "skymap")
1683  )
1684 
1685 
1686 class TransformForcedSourceTableConfig(TransformCatalogBaseConfig,
1687  pipelineConnections=TransformForcedSourceTableConnections):
1688  referenceColumns = pexConfig.ListField(
1689  dtype=str,
1690  default=["detect_isPrimary", "detect_isTractInner", "detect_isPatchInner"],
1691  optional=True,
1692  doc="Columns to pull from reference catalog",
1693  )
1694  keyRef = lsst.pex.config.Field(
1695  doc="Column on which to join the two input tables and use as the primary key of the output",
1696  dtype=str,
1697  default="objectId",
1698  )
1699  key = lsst.pex.config.Field(
1700  doc="Rename the output DataFrame index to this name",
1701  dtype=str,
1702  default="forcedSourceId",
1703  )
1704 
1705 
1706 class TransformForcedSourceTableTask(TransformCatalogBaseTask):
1707  """Transform/standardize a ForcedSource catalog
1708 
1709  Transforms each wide, per-detector forcedSource parquet table per the
1710  specification file (per-camera defaults found in ForcedSource.yaml).
1711  All epochs that overlap the patch are aggregated into one per-patch
1712  narrow-parquet file.
1713 
1714  No de-duplication of rows is performed. Duplicate-resolution flags are
1715  pulled in from the referenceCatalog (`detect_isPrimary`,
1716  `detect_isTractInner`, `detect_isPatchInner`) so that the user may
1717  de-duplicate for analysis or compare duplicates for QA.
1718 
1719  The resulting table includes multiple bands. Epochs (MJDs) and other
1720  useful per-visit metadata can be retrieved by joining with the
1721  CcdVisitTable on ccdVisitId.
1722  """
1723  _DefaultName = "transformForcedSourceTable"
1724  ConfigClass = TransformForcedSourceTableConfig
1725 
1726  def runQuantum(self, butlerQC, inputRefs, outputRefs):
1727  inputs = butlerQC.get(inputRefs)
1728  if self.funcs is None:
1729  raise ValueError("config.functorFile is None. "
1730  "Must be a valid path to yaml in order to run Task as a PipelineTask.")
1731  outputs = self.run(inputs['inputCatalogs'], inputs['referenceCatalog'], funcs=self.funcs,
1732  dataId=outputRefs.outputCatalog.dataId.full)
1733 
1734  butlerQC.put(outputs, outputRefs)
1735 
1736  def run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=None):
1737  dfs = []
1738  ref = referenceCatalog.get(parameters={"columns": self.config.referenceColumns})
1739  self.log.info("Aggregating %s input catalogs", len(inputCatalogs))
1740  for handle in inputCatalogs:
1741  result = self.transform(None, handle, funcs, dataId)
1742  # Filter for only rows that were detected on (overlap) the patch
1743  dfs.append(result.df.join(ref, how='inner'))
1744 
1745  outputCatalog = pd.concat(dfs)
1746 
1747  # Now that the join on config.keyRef is done, rebuild the index:
1748  # name the current index config.keyRef ...
1749  outputCatalog.index.rename(self.config.keyRef, inplace=True)
1750  # ... move config.keyRef back into the columns ...
1751  outputCatalog.reset_index(inplace=True)
1752  # ... make forcedSourceId (as specified in ForcedSource.yaml) the new index ...
1753  outputCatalog.set_index("forcedSourceId", inplace=True, verify_integrity=True)
1754  # ... and rename the index to config.key.
1755  outputCatalog.index.rename(self.config.key, inplace=True)
1756 
1757  self.log.info("Made a table of %d columns and %d rows",
1758  len(outputCatalog.columns), len(outputCatalog))
1759  return pipeBase.Struct(outputCatalog=outputCatalog)
1760 
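# Illustrative sketch only (not part of this module): because the
# duplicate-resolution flags from the reference catalog are carried into the
# per-patch forcedSourceTable, a user can de-duplicate it by keeping only rows
# whose parent object is primary, as described in the task docstring above.
# The flux column and all values here are invented for the demo.
def _exampleDeduplicateForcedSources():
    import pandas as pd

    forced = pd.DataFrame({'forcedSourceId': [1, 2, 3, 4],
                           'objectId': [10, 10, 11, 12],
                           'detect_isPrimary': [True, True, False, True],
                           'flux': [0.5, 0.4, 1.2, -0.3]})
    forced = forced.set_index('forcedSourceId')
    # Keep only measurements of primary objects.
    return forced[forced['detect_isPrimary']]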
1761 
1762 class ConsolidateTractConnections(pipeBase.PipelineTaskConnections,
1763  defaultTemplates={"catalogType": ""},
1764  dimensions=("instrument", "tract")):
1765  inputCatalogs = connectionTypes.Input(
1766  doc="Input per-patch DataFrame Tables to be concatenated",
1767  name="{catalogType}ForcedSourceTable",
1768  storageClass="DataFrame",
1769  dimensions=("tract", "patch", "skymap"),
1770  multiple=True,
1771  )
1772 
1773  outputCatalog = connectionTypes.Output(
1774  doc="Output per-tract concatenation of DataFrame Tables",
1775  name="{catalogType}ForcedSourceTable_tract",
1776  storageClass="DataFrame",
1777  dimensions=("tract", "skymap"),
1778  )
1779 
1780 
1781 class ConsolidateTractConfig(pipeBase.PipelineTaskConfig,
1782  pipelineConnections=ConsolidateTractConnections):
1783  pass
1784 
1785 
1786 class ConsolidateTractTask(CmdLineTask, pipeBase.PipelineTask):
1787  """Concatenate a list of per-patch DataFrames into a single
1788  per-tract DataFrame.
1789  """
1790  _DefaultName = 'ConsolidateTract'
1791  ConfigClass = ConsolidateTractConfig
1792 
1793  def runQuantum(self, butlerQC, inputRefs, outputRefs):
1794  inputs = butlerQC.get(inputRefs)
1795  # No need to check that at least one inputCatalog exists; if none did, the quantum graph would be empty.
1796  self.log.info("Concatenating %s per-patch %s Tables",
1797  len(inputs['inputCatalogs']),
1798  inputRefs.inputCatalogs[0].datasetType.name)
1799  df = pd.concat(inputs['inputCatalogs'])
1800  butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)