LSST Applications g180d380827+0f66a164bb,g2079a07aa2+86d27d4dc4,g2305ad1205+7d304bc7a0,g29320951ab+500695df56,g2bbee38e9b+0e5473021a,g337abbeb29+0e5473021a,g33d1c0ed96+0e5473021a,g3a166c0a6a+0e5473021a,g3ddfee87b4+e42ea45bea,g48712c4677+36a86eeaa5,g487adcacf7+2dd8f347ac,g50ff169b8f+96c6868917,g52b1c1532d+585e252eca,g591dd9f2cf+c70619cc9d,g5a732f18d5+53520f316c,g5ea96fc03c+341ea1ce94,g64a986408d+f7cd9c7162,g858d7b2824+f7cd9c7162,g8a8a8dda67+585e252eca,g99cad8db69+469ab8c039,g9ddcbc5298+9a081db1e4,ga1e77700b3+15fc3df1f7,gb0e22166c9+60f28cb32d,gba4ed39666+c2a2e4ac27,gbb8dafda3b+c92fc63c7e,gbd866b1f37+f7cd9c7162,gc120e1dc64+02c66aa596,gc28159a63d+0e5473021a,gc3e9b769f7+b0068a2d9f,gcf0d15dbbd+e42ea45bea,gdaeeff99f8+f9a426f77a,ge6526c86ff+84383d05b3,ge79ae78c31+0e5473021a,gee10cc3b42+585e252eca,gff1a9f87cc+f7cd9c7162,w.2024.17
LSST Data Management Base Package
Loading...
Searching...
No Matches
postprocess.py
Go to the documentation of this file.
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
# Public names exported by this module: each task together with its config
# class, plus the PostprocessAnalysis helper.
__all__ = [
    "WriteObjectTableConfig",
    "WriteObjectTableTask",
    "WriteSourceTableConfig",
    "WriteSourceTableTask",
    "WriteRecalibratedSourceTableConfig",
    "WriteRecalibratedSourceTableTask",
    "PostprocessAnalysis",
    "TransformCatalogBaseConfig",
    "TransformCatalogBaseTask",
    "TransformObjectCatalogConfig",
    "TransformObjectCatalogTask",
    "ConsolidateObjectTableConfig",
    "ConsolidateObjectTableTask",
    "TransformSourceTableConfig",
    "TransformSourceTableTask",
    "ConsolidateVisitSummaryConfig",
    "ConsolidateVisitSummaryTask",
    "ConsolidateSourceTableConfig",
    "ConsolidateSourceTableTask",
    "MakeCcdVisitTableConfig",
    "MakeCcdVisitTableTask",
    "MakeVisitTableConfig",
    "MakeVisitTableTask",
    "WriteForcedSourceTableConfig",
    "WriteForcedSourceTableTask",
    "TransformForcedSourceTableConfig",
    "TransformForcedSourceTableTask",
    "ConsolidateTractConfig",
    "ConsolidateTractTask",
]
37
38import functools
39import pandas as pd
40import logging
41import numpy as np
42import numbers
43import os
44
45import lsst.geom
46import lsst.pex.config as pexConfig
47import lsst.pipe.base as pipeBase
48import lsst.daf.base as dafBase
49from lsst.pipe.base import connectionTypes
50import lsst.afw.table as afwTable
51from lsst.afw.image import ExposureSummaryStats
52from lsst.meas.base import SingleFrameMeasurementTask, DetectorVisitIdGeneratorConfig
53
54from .functors import CompositeFunctor, Column
55
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
57
58
def flattenFilters(df, noDupCols=['coord_ra', 'coord_dec'], camelCase=False, inputBands=None):
    """Flatten a dataframe with a two-level (band, column) column index.

    Every per-band column is renamed to a single-level name that embeds the
    band, while the columns listed in ``noDupCols`` are emitted only once,
    copied from the partition of a single band.

    Parameters
    ----------
    df : `pandas.DataFrame`
        Dataframe whose columns are a multilevel index with the band as
        level 0.
    noDupCols : sequence of `str`, optional
        Columns that must not be duplicated per band. They are placed first
        in the output, without any band prefix.
    camelCase : `bool`, optional
        If `True`, per-band names are ``{band}{column}``; otherwise they
        are ``{band}_{column}``.
    inputBands : iterable of `str`, optional
        Bands that were actually present in the input data. If given, the
        ``noDupCols`` values are taken from the first band of ``df`` (in
        column order) that is also in ``inputBands``; otherwise from the
        first band of ``df``.

    Returns
    -------
    newDf : `pandas.DataFrame`
        Flattened dataframe: the ``noDupCols`` columns followed by the
        renamed per-band columns, in band order.

    Raises
    ------
    IndexError
        Raised if no band of ``df`` is available to supply ``noDupCols``.
    """
    # Work on a private list so that any sequence type is accepted and the
    # (shared) default argument can never be modified.
    noDupCols = list(noDupCols)
    noDupSet = set(noDupCols)
    columnFormat = '{0}{1}' if camelCase else '{0}_{1}'

    # band is the level 0 index
    dfBands = df.columns.unique(level=0).values

    # Collect the renamed per-band partitions and concatenate them once at
    # the end, rather than re-copying a growing dataframe for every band.
    perBandDfs = []
    for band in dfBands:
        subdf = df[band]
        newColumns = {c: columnFormat.format(band, c)
                      for c in subdf.columns if c not in noDupSet}
        perBandDfs.append(subdf[list(newColumns)].rename(columns=newColumns))

    # Band must be present in the input and output or else column is all NaN.
    # Keep the dataframe's own band order so that the selected band does not
    # depend on set iteration order.
    if inputBands is None:
        presentBands = list(dfBands)
    else:
        wantedBands = set(inputBands)
        presentBands = [band for band in dfBands if band in wantedBands]
    if not presentBands:
        raise IndexError("No band present in both the dataframe and inputBands; "
                         "cannot extract the non-duplicated columns.")

    # Get the unexploded columns from a present band's partition
    noDupDf = df[presentBands[0]][noDupCols]
    return pd.concat([noDupDf, *perBandDfs], axis=1)
79
80
class WriteObjectTableConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"coaddName": "deep"},
                                  dimensions=("tract", "patch", "skymap")):
    """Connections for `WriteObjectTableTask`.

    The per-band ``meas`` and ``forced_src`` catalogs and the per-patch
    ``ref`` catalog are inputs; a single per-patch DataFrame is the output.
    """
    inputCatalogMeas = connectionTypes.Input(
        doc="Catalog of source measurements on the deepCoadd.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_meas",
        multiple=True
    )
    inputCatalogForcedSrc = connectionTypes.Input(
        doc="Catalog of forced measurements (shape and position parameters held fixed) on the deepCoadd.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_forced_src",
        multiple=True
    )
    inputCatalogRef = connectionTypes.Input(
        # The trailing space matters: the two literals are concatenated.
        doc="Catalog marking the primary detection (which band provides a good shape and position) "
            "for each detection in deepCoadd_mergeDet.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_ref"
    )
    outputCatalog = connectionTypes.Output(
        doc="A vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
            "stored as a DataFrame with a multi-level column index per-patch.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="{coaddName}Coadd_obj"
    )
112
113
class WriteObjectTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteObjectTableConnections):
    """Configuration for `WriteObjectTableTask`."""

    # Retained only for backwards compatibility; flagged as deprecated.
    engine = pexConfig.Field(
        doc="Parquet engine for writing (pyarrow or fastparquet)",
        dtype=str,
        default="pyarrow",
        deprecated="This config is no longer used, and will be removed after v26.",
    )
    coaddName = pexConfig.Field(
        doc="Name of coadd",
        dtype=str,
        default="deep",
    )
127
128
class WriteObjectTableTask(pipeBase.PipelineTask):
    """Write filter-merged source tables as a DataFrame in parquet format.

    The ``meas``, ``forced_src`` and ``ref`` catalogs of every band are
    converted to DataFrames and joined on their ``id`` index into a single
    table whose columns form a (dataset, band, column) multi-index.
    """
    _DefaultName = "writeObjectTable"
    ConfigClass = WriteObjectTableConfig

    # Names of table datasets to be merged
    inputDatasets = ('forced_src', 'meas', 'ref')

    # Tag of output dataset written by `MergeSourcesTask.write`
    outputDataset = 'obj'

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        # Index the multi-valued inputs by the band of their dataset ref.
        measByBand = {
            ref.dataId['band']: cat
            for ref, cat in zip(inputRefs.inputCatalogMeas, inputs['inputCatalogMeas'])
        }
        forcedByBand = {
            ref.dataId['band']: cat
            for ref, cat in zip(inputRefs.inputCatalogForcedSrc, inputs['inputCatalogForcedSrc'])
        }

        # The bands of the meas catalogs drive the merge; the same ref
        # catalog is attached to every band.
        catalogs = {
            band: {'meas': measCat,
                   'forced_src': forcedByBand[band],
                   'ref': inputs['inputCatalogRef']}
            for band, measCat in measByBand.items()
        }

        quantumDataId = butlerQC.quantum.dataId
        merged = self.run(catalogs=catalogs,
                          tract=quantumDataId['tract'],
                          patch=quantumDataId['patch'])
        butlerQC.put(pipeBase.Struct(outputCatalog=merged), outputRefs)

    def run(self, catalogs, tract, patch):
        """Merge multiple catalogs.

        Parameters
        ----------
        catalogs : `dict`
            Mapping from filter names to dict of catalogs; the inner dict
            maps a dataset name to the catalog of that dataset.
        tract : int
            tractId to use for the tractId column.
        patch : str
            patchId to use for the patchId column.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Merged dataframe.
        """
        frames = []
        for band, datasetTables in catalogs.items():
            for datasetName, table in datasetTables.items():
                # Convert afwTable to pandas DataFrame, indexed by 'id'.
                frame = table.asAstropy().to_pandas().set_index('id', drop=True)

                # Sort columns by name, to ensure matching schema among
                # patches, then append the tract/patch identifier columns.
                frame = frame.reindex(sorted(frame.columns), axis=1)
                frame = frame.assign(tractId=tract, patchId=patch)

                # Make columns a 3-level MultiIndex
                frame.columns = pd.MultiIndex.from_tuples(
                    [(datasetName, band, column) for column in frame.columns],
                    names=('dataset', 'band', 'column'),
                )
                frames.append(frame)

        # Join pairwise rather than calling `pd.concat(frames)`, because the
        # pandas concatenation uses infinite memory.
        return functools.reduce(pd.DataFrame.join, frames)
195
196
class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"catalogType": ""},
                                  dimensions=("instrument", "visit", "detector")):
    """Connections for `WriteSourceTableTask`: one source catalog in, one
    DataFrame out, both per detector and visit.
    """

    catalog = connectionTypes.Input(
        name="{catalogType}src",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector"),
        doc="Input full-depth catalog of sources produced by CalibrateTask",
    )
    outputCatalog = connectionTypes.Output(
        name="{catalogType}source",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
        doc="Catalog of sources, `src` in DataFrame/Parquet format. The 'id' column is "
            "replaced with an index; all other columns are unchanged.",
    )
214
215
class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteSourceTableConnections):
    """Configuration for `WriteSourceTableTask`."""

    # Applied to the quantum data ID to obtain the catalog ID that is
    # written to the ``ccdVisitId`` column.
    idGenerator = DetectorVisitIdGeneratorConfig.make_field()
219
220
class WriteSourceTableTask(pipeBase.PipelineTask):
    """Write source table to DataFrame Parquet format.
    """
    _DefaultName = "writeSourceTable"
    ConfigClass = WriteSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        # The catalog ID derived from the quantum data ID becomes the
        # ccdVisitId column of the output table.
        idGenerator = self.config.idGenerator.apply(butlerQC.quantum.dataId)
        inputs['ccdVisitId'] = idGenerator.catalog_id
        result = self.run(**inputs)
        butlerQC.put(pipeBase.Struct(outputCatalog=result.table), outputRefs)

    def run(self, catalog, ccdVisitId=None, **kwargs):
        """Convert `src` catalog to DataFrame

        Parameters
        ----------
        catalog: `afwTable.SourceCatalog`
            catalog to be converted
        ccdVisitId: `int`
            ccdVisitId to be added as a column
        **kwargs
            Additional keyword arguments are ignored as a convenience for
            subclasses that pass the same arguments to several different
            methods.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            ``table``
                `DataFrame` version of the input catalog
        """
        self.log.info("Generating DataFrame from src catalog ccdVisitId=%s", ccdVisitId)

        # The 'id' column becomes the index and is dropped as a column.
        table = catalog.asAstropy().to_pandas()
        table = table.set_index('id', drop=True)
        table['ccdVisitId'] = ccdVisitId

        return pipeBase.Struct(table=table)
259
260
class WriteRecalibratedSourceTableConnections(WriteSourceTableConnections,
                                              defaultTemplates={"catalogType": ""},
                                              dimensions=("instrument", "visit", "detector", "skymap")):
    """Connections for `WriteRecalibratedSourceTableTask`.

    Extends the source-table connections with the visit summary that holds
    the updated calibrations, and with a (deprecated) exposure input.
    """
    exposure = connectionTypes.Input(
        name="calexp",
        storageClass="ExposureF",
        dimensions=["instrument", "visit", "detector"],
        doc="Input exposure to perform photometry on.",
        # TODO: remove on DM-39584
        deprecated=(
            "Deprecated, as the `calexp` is not needed and just creates unnecessary i/o. "
            "Will be removed after v26."
        ),
    )
    visitSummary = connectionTypes.Input(
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit",),
        doc="Input visit-summary catalog with updated calibration objects.",
    )
281
282
class WriteRecalibratedSourceTableConfig(WriteSourceTableConfig,
                                         pipelineConnections=WriteRecalibratedSourceTableConnections):
    """Configuration for `WriteRecalibratedSourceTableTask`."""

    doReevaluatePhotoCalib = pexConfig.Field(
        doc=("Add or replace local photoCalib columns"),
        dtype=bool,
        default=True,
    )
    doReevaluateSkyWcs = pexConfig.Field(
        doc=("Add or replace local WCS columns and update the coord columns, coord_ra and coord_dec"),
        dtype=bool,
        default=True,
    )
    idGenerator = DetectorVisitIdGeneratorConfig.make_field()
297
298
class WriteRecalibratedSourceTableTask(WriteSourceTableTask):
    """Write source table to DataFrame Parquet format.

    Before the conversion, the local WCS and/or local photoCalib columns
    can be re-evaluated from the calibrations stored in a visit summary,
    as selected by the ``doReevaluateSkyWcs`` and
    ``doReevaluatePhotoCalib`` config fields.
    """
    _DefaultName = "writeRecalibratedSourceTable"
    ConfigClass = WriteRecalibratedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        quantumDataId = butlerQC.quantum.dataId

        idGenerator = self.config.idGenerator.apply(quantumDataId)
        inputs['idGenerator'] = idGenerator
        inputs['ccdVisitId'] = idGenerator.catalog_id

        # Only touch the exposure and catalog when at least one of the
        # calibrations has to be re-evaluated.
        if self.config.doReevaluatePhotoCalib or self.config.doReevaluateSkyWcs:
            inputs['exposure'] = self.prepareCalibratedExposure(
                exposure=inputs["exposure"],
                visitSummary=inputs["visitSummary"],
                detectorId=quantumDataId["detector"]
            )
            inputs['catalog'] = self.addCalibColumns(**inputs)

        result = self.run(**inputs)
        butlerQC.put(pipeBase.Struct(outputCatalog=result.table), outputRefs)

    def prepareCalibratedExposure(self, exposure, detectorId, visitSummary=None):
        """Prepare a calibrated exposure and apply external calibrations
        if so configured.

        Parameters
        ----------
        exposure : `lsst.afw.image.exposure.Exposure`
            Input exposure to adjust calibrations. May be empty.
        detectorId : `int`
            Detector ID associated with the exposure.
        visitSummary : `lsst.afw.table.ExposureCatalog`, optional
            Exposure catalog with all calibration objects. If provided, the
            PhotoCalib and WCS of the row for ``detectorId`` are set on the
            exposure, even when they are `None`.

        Returns
        -------
        exposure : `lsst.afw.image.exposure.Exposure`
            Exposure with adjusted calibrations.

        Raises
        ------
        RuntimeError
            Raised if ``visitSummary`` has no row for ``detectorId``.
        """
        # Without a visit summary there is nothing to apply.
        if visitSummary is None:
            return exposure

        row = visitSummary.find(detectorId)
        if row is None:
            raise RuntimeError(f"Visit summary for detector {detectorId} is unexpectedly missing.")

        # A missing calibration is still written to the exposure (as None),
        # after warning about it.
        photoCalib = row.getPhotoCalib()
        if photoCalib is None:
            self.log.warning("Detector id %s has None for photoCalib in visit summary; "
                             "skipping reevaluation of photoCalib.", detectorId)
        exposure.setPhotoCalib(photoCalib)

        skyWcs = row.getWcs()
        if skyWcs is None:
            self.log.warning("Detector id %s has None for skyWcs in visit summary; "
                             "skipping reevaluation of skyWcs.", detectorId)
        exposure.setWcs(skyWcs)

        return exposure

    def addCalibColumns(self, catalog, exposure, idGenerator, **kwargs):
        """Add or replace columns with calibs evaluated at each centroid.

        Add or replace the ``base_LocalWcs`` and ``base_LocalPhotoCalib``
        columns in a source catalog, by rerunning the plugins.

        Parameters
        ----------
        catalog : `lsst.afw.table.SourceCatalog`
            catalog to which calib columns will be added
        exposure : `lsst.afw.image.exposure.Exposure`
            Exposure with attached PhotoCalibs and SkyWcs attributes to be
            reevaluated at local centroids. Pixels are not required.
        idGenerator : `lsst.meas.base.IdGenerator`
            Object that generates Source IDs and random seeds.
        **kwargs
            Additional keyword arguments are ignored to facilitate passing the
            same arguments to several methods.

        Returns
        -------
        newCat: `lsst.afw.table.SourceCatalog`
            Source Catalog with requested local calib columns
        """
        measureConfig = SingleFrameMeasurementTask.ConfigClass()
        measureConfig.doReplaceWithNoise = False

        # Clear all slots, because we aren't running the relevant plugins.
        for slotName in measureConfig.slots:
            setattr(measureConfig.slots, slotName, None)

        # Enable only the plugins that were requested by the config.
        measureConfig.plugins.names = []
        if self.config.doReevaluateSkyWcs:
            measureConfig.plugins.names.add("base_LocalWcs")
            self.log.info("Re-evaluating base_LocalWcs plugin")
        if self.config.doReevaluatePhotoCalib:
            measureConfig.plugins.names.add("base_LocalPhotoCalib")
            self.log.info("Re-evaluating base_LocalPhotoCalib plugin")
        pluginsNotToCopy = tuple(measureConfig.plugins.names)

        # Create a new schema and catalog
        # Copy all columns from original except for the ones to reevaluate
        aliasMap = catalog.schema.getAliasMap()
        mapper = afwTable.SchemaMapper(catalog.schema)
        for schemaItem in catalog.schema:
            fieldName = schemaItem.field.getName()
            if fieldName.startswith(pluginsNotToCopy):
                continue
            mapper.addMapping(schemaItem.key)

        schema = mapper.getOutputSchema()
        measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
        schema.setAliasMap(aliasMap)
        newCat = afwTable.SourceCatalog(schema)
        newCat.extend(catalog, mapper=mapper)

        # Fluxes in sourceCatalogs are in counts, so there are no fluxes to
        # update here. LocalPhotoCalibs are applied during transform tasks.
        # Update coord_ra/coord_dec, which are expected to be positions on the
        # sky and are used as such in sdm tables without transform
        activePlugins = []
        if self.config.doReevaluateSkyWcs and exposure.wcs is not None:
            afwTable.updateSourceCoords(exposure.wcs, newCat)
            activePlugins.append(measurement.plugins["base_LocalWcs"])
        if self.config.doReevaluatePhotoCalib and exposure.getPhotoCalib() is not None:
            activePlugins.append(measurement.plugins["base_LocalPhotoCalib"])

        # Per row, the WCS plugin (if active) runs before the photoCalib one.
        for row in newCat:
            for plugin in activePlugins:
                plugin.measure(row, exposure)

        return newCat
439
class PostprocessAnalysis:
    """Calculate columns from DataFrames or handles storing DataFrames.

    This object manages and organizes an arbitrary set of computations
    on a catalog. The catalog is defined by a
    `DeferredDatasetHandle` or `InMemoryDatasetHandle` object
    (or list thereof), such as a ``deepCoadd_obj`` dataset, and the
    computations are defined by a collection of
    `~lsst.pipe.tasks.functors.Functor` objects (or, equivalently, a
    ``CompositeFunctor``).

    After the object is initialized, accessing the ``.df`` attribute (which
    holds the `pandas.DataFrame` containing the results of the calculations)
    triggers computation of said dataframe.

    One of the conveniences of using this object is the ability to define a
    desired common filter for all functors. This enables the same functor
    collection to be passed to several different `PostprocessAnalysis` objects
    without having to change the original functor collection, since the ``filt``
    keyword argument of this object triggers an overwrite of the ``filt``
    property for all functors in the collection.

    This object also allows a list of refFlags to be passed, and defines a set
    of default refFlags that are always included even if not requested.

    If a list of DataFrames or Handles is passed, rather than a single one,
    then the calculations will be mapped over all the input catalogs. In
    principle, it should be straightforward to parallelize this activity, but
    initial tests have failed (see TODO in code comments).

    Parameters
    ----------
    handles : `~lsst.daf.butler.DeferredDatasetHandle` or
              `~lsst.pipe.base.InMemoryDatasetHandle` or
              list of these.
        Source catalog(s) for computation.
    functors : `list`, `dict`, or `~lsst.pipe.tasks.functors.CompositeFunctor`
        Computations to do (functors that act on ``handles``).
        If a dict, the output
        DataFrame will have columns keyed accordingly.
        If a list, the column keys will come from the
        ``.shortname`` attribute of each functor.

    filt : `str`, optional
        Filter in which to calculate. If provided,
        this will overwrite any existing ``.filt`` attribute
        of the provided functors.

    flags : `list`, optional
        List of flags (per-band) to include in output table.
        Taken from the ``meas`` dataset if applied to a multilevel Object Table.

    refFlags : `list`, optional
        List of refFlags (only reference band) to include in output table.

    forcedFlags : `list`, optional
        List of flags (per-band) to include in output table.
        Taken from the ``forced_src`` dataset if applied to a
        multilevel Object Table. Intended for flags from measurement plugins
        only run during multi-band forced-photometry.
    """
    # Reference flags and functors that are always included, in addition to
    # the ones requested at construction.
    _defaultRefFlags = []
    _defaultFuncs = ()

    def __init__(self, handles, functors, filt=None, flags=None, refFlags=None, forcedFlags=None):
        self.handles = handles
        self.functors = functors

        self.filt = filt
        # Copy the flag sequences so that later changes to the caller's
        # lists do not leak into this object.
        self.flags = list(flags) if flags is not None else []
        self.forcedFlags = list(forcedFlags) if forcedFlags is not None else []
        self.refFlags = list(self._defaultRefFlags)
        if refFlags is not None:
            self.refFlags += list(refFlags)

        # Lazily computed result; see the ``df`` property.
        self._df = None

    @property
    def defaultFuncs(self):
        """A fresh `dict` built from the class-level default functors."""
        return dict(self._defaultFuncs)

    @property
    def func(self):
        """The composite functor to evaluate.

        It holds the user-supplied functors plus one `Column` functor per
        requested flag. If ``functors`` already is a `CompositeFunctor`, it
        is updated in place (its ``funcDict`` and ``filt``) and returned.
        """
        additionalFuncs = self.defaultFuncs
        additionalFuncs.update({flag: Column(flag, dataset='forced_src') for flag in self.forcedFlags})
        additionalFuncs.update({flag: Column(flag, dataset='ref') for flag in self.refFlags})
        additionalFuncs.update({flag: Column(flag, dataset='meas') for flag in self.flags})

        if isinstance(self.functors, CompositeFunctor):
            func = self.functors
        else:
            func = CompositeFunctor(self.functors)

        func.funcDict.update(additionalFuncs)
        func.filt = self.filt

        return func

    @property
    def noDupCols(self):
        """Names of the functors that are flagged ``noDup`` or that read
        from the ``ref`` dataset.
        """
        return [name for name, func in self.func.funcDict.items() if func.noDup or func.dataset == 'ref']

    @property
    def df(self):
        """The result dataframe, computed on first access and then cached."""
        if self._df is None:
            self.compute()
        return self._df

    def compute(self, dropna=False, pool=None):
        """Evaluate the functors and cache the resulting dataframe.

        Parameters
        ----------
        dropna : `bool`, optional
            Forwarded to the composite functor call.
        pool : optional
            Object with a ``map`` method used to process multiple handles;
            if `None`, the handles are processed sequentially.

        Returns
        -------
        df : `pandas.DataFrame`
            The computed dataframe; also stored for the ``df`` property.
        """
        # Build the composite functor once; the property reassembles it on
        # every access.
        func = self.func

        # map over multiple handles
        if isinstance(self.handles, (list, tuple)):
            if pool is None:
                dflist = [func(handle, dropna=dropna) for handle in self.handles]
            else:
                # TODO: Figure out why this doesn't work (pyarrow pickling
                # issues?)
                dflist = pool.map(functools.partial(func, dropna=dropna), self.handles)
            self._df = pd.concat(dflist)
        else:
            self._df = func(self.handles, dropna=dropna)

        return self._df
564
565
class TransformCatalogBaseConnections(pipeBase.PipelineTaskConnections,
                                      dimensions=()):
    """Expected Connections for subclasses of TransformCatalogBaseTask.

    Must be subclassed: the dataset names are left empty here.
    """
    inputCatalog = connectionTypes.Input(
        storageClass="DataFrame",
        name="",
    )
    outputCatalog = connectionTypes.Output(
        storageClass="DataFrame",
        name="",
    )
580
581
class TransformCatalogBaseConfig(pipeBase.PipelineTaskConfig,
                                 pipelineConnections=TransformCatalogBaseConnections):
    """Configuration shared by the catalog transformation tasks."""

    functorFile = pexConfig.Field(
        dtype=str,
        doc="Path to YAML file specifying Science Data Model functors to use "
            "when copying columns and computing calibrated values.",
        default=None,
        optional=True
    )
    primaryKey = pexConfig.Field(
        dtype=str,
        # The trailing space matters: the two literals are concatenated.
        doc="Name of column to be set as the DataFrame index. If None, the index "
            "will be named `id`",
        default=None,
        optional=True
    )
    columnsFromDataId = pexConfig.ListField(
        dtype=str,
        default=None,
        optional=True,
        doc="Columns to extract from the dataId",
    )
604
605
class TransformCatalogBaseTask(pipeBase.PipelineTask):
    """Base class for transforming/standardizing a catalog by applying functors
    that convert units and apply calibrations.

    The purpose of this task is to perform a set of computations on an input
    ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` that holds a
    ``DataFrame`` dataset (such as ``deepCoadd_obj``), and write the results to
    a new dataset (which needs to be declared in an ``outputDataset``
    attribute).

    The calculations to be performed are defined in a YAML file that specifies
    a set of functors to be computed, provided as a ``--functorFile`` config
    parameter. An example of such a YAML file is the following:

        funcs:
            sourceId:
                functor: Index
            x:
                functor: Column
                args: slot_Centroid_x
            y:
                functor: Column
                args: slot_Centroid_y
            psfFlux:
                functor: LocalNanojansky
                args:
                    - slot_PsfFlux_instFlux
                    - slot_PsfFlux_instFluxErr
                    - base_LocalPhotoCalib
                    - base_LocalPhotoCalibErr
            psfFluxErr:
                functor: LocalNanojanskyErr
                args:
                    - slot_PsfFlux_instFlux
                    - slot_PsfFlux_instFluxErr
                    - base_LocalPhotoCalib
                    - base_LocalPhotoCalibErr
        flags:
            - detect_isPrimary

    The names for each entry under "funcs" will become the names of columns in
    the output dataset. All the functors referenced are defined in
    `~lsst.pipe.tasks.functors`. Positional arguments to be passed to each
    functor are in the `args` list, and any additional entries for each column
    other than "functor" or "args" (e.g., ``'filt'``, ``'dataset'``) are
    treated as keyword arguments to be passed to the functor initialization.

    The "flags" entry is the default shortcut for `Column` functors.
    All columns listed under "flags" will be copied to the output table
    untransformed. They can be of any datatype.
    In the special case of transforming a multi-level object table with
    band and dataset indices (deepCoadd_obj), these will be taken from the
    `meas` dataset and exploded out per band.

    There are two special shortcuts that only apply when transforming
    multi-level Object (deepCoadd_obj) tables:
     - The "refFlags" entry is a shortcut for `Column` functors
       taken from the `'ref'` dataset if transforming an ObjectTable.
     - The "forcedFlags" entry is a shortcut for `Column` functors
       taken from the ``forced_src`` dataset if transforming an ObjectTable.
       These are expanded out per band.


    This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
    to organize and execute the calculations.
    """
    # The four properties below are placeholders: each raises until a
    # subclass replaces it with a concrete attribute.
    @property
    def _DefaultName(self):
        raise NotImplementedError('Subclass must define "_DefaultName" attribute')

    @property
    def outputDataset(self):
        raise NotImplementedError('Subclass must define "outputDataset" attribute')

    @property
    def inputDataset(self):
        raise NotImplementedError('Subclass must define "inputDataset" attribute')

    @property
    def ConfigClass(self):
        raise NotImplementedError('Subclass must define "ConfigClass" attribute')

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Without a functor file there is nothing to load; runQuantum
        # refuses to run in that state.
        if not self.config.functorFile:
            self.funcs = None
            return

        self.log.info('Loading tranform functor definitions from %s',
                      self.config.functorFile)
        self.funcs = CompositeFunctor.from_file(self.config.functorFile)
        self.funcs.update(dict(PostprocessAnalysis._defaultFuncs))

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        outputDataId = dict(outputRefs.outputCatalog.dataId.mapping)
        transformed = self.run(handle=inputs['inputCatalog'], funcs=self.funcs,
                               dataId=outputDataId)
        butlerQC.put(pipeBase.Struct(outputCatalog=transformed), outputRefs)

    def run(self, handle, funcs=None, dataId=None, band=None):
        """Do postprocessing calculations

        Takes a ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` or
        ``DataFrame`` object and dataId,
        returns a dataframe with results of postprocessing calculations.

        Parameters
        ----------
        handle : `~lsst.daf.butler.DeferredDatasetHandle` or
                 `~lsst.pipe.base.InMemoryDatasetHandle` or
                 `~pandas.DataFrame`, or list of these.
            DataFrames from which calculations are done.
        funcs : `~lsst.pipe.tasks.functors.Functor`
            Functors to apply to the table's columns
        dataId : dict, optional
            Data ID whose entries named in ``config.columnsFromDataId`` are
            added as columns of the output dataframe.
        band : `str`, optional
            Filter band that is being processed.

        Returns
        -------
        df : `pandas.DataFrame`
        """
        self.log.info("Transforming/standardizing the source table dataId: %s", dataId)

        result = self.transform(band, handle, funcs, dataId)
        df = result.df
        self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
        return df

    def getFunctors(self):
        """Return the functors loaded from ``config.functorFile``, or `None`
        if no file was configured.
        """
        return self.funcs

    def getAnalysis(self, handles, funcs=None, band=None):
        """Build the `PostprocessAnalysis` for ``handles``.

        The task's own functors are used when ``funcs`` is `None`; ``band``
        is passed on as the analysis filter.
        """
        chosenFuncs = self.funcs if funcs is None else funcs
        return PostprocessAnalysis(handles, chosenFuncs, filt=band)

    def transform(self, band, handles, funcs, dataId):
        """Run the analysis and post-process the resulting dataframe.

        Columns named in ``config.columnsFromDataId`` are filled from
        ``dataId``, and ``config.primaryKey`` (if set and available as a
        column) becomes the index.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            ``df``
                The transformed `pandas.DataFrame`.
            ``analysis``
                The `PostprocessAnalysis` that produced it.

        Raises
        ------
        ValueError
            Raised if a key in ``config.columnsFromDataId`` is not present
            in ``dataId``.
        """
        analysis = self.getAnalysis(handles, funcs=funcs, band=band)
        df = analysis.df

        if dataId and self.config.columnsFromDataId:
            for key in self.config.columnsFromDataId:
                if key not in dataId:
                    raise ValueError(f"'{key}' in config.columnsFromDataId not found in dataId: {dataId}")
                df[key] = dataId[key]

        # Re-index on the primary key only when it is not already the index
        # and exists as a column; the previous index is discarded.
        primaryKey = self.config.primaryKey
        if primaryKey and df.index.name != primaryKey and primaryKey in df:
            df.reset_index(inplace=True, drop=True)
            df.set_index(primaryKey, inplace=True)

        return pipeBase.Struct(
            df=df,
            analysis=analysis
        )
766
767
class TransformObjectCatalogConnections(pipeBase.PipelineTaskConnections,
                                        defaultTemplates={"coaddName": "deep"},
                                        dimensions=("tract", "patch", "skymap")):
    """Connections for `TransformObjectCatalogTask`: the per-patch
    multi-level object DataFrame in, the per-patch Object Table out.
    """
    inputCatalog = connectionTypes.Input(
        name="{coaddName}Coadd_obj",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        # The handle is passed on unloaded.
        deferLoad=True,
        doc="The vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
            "stored as a DataFrame with a multi-level column index per-patch.",
    )
    outputCatalog = connectionTypes.Output(
        name="objectTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        doc="Per-Patch Object Table of columns transformed from the deepCoadd_obj table per the standard "
            "data model.",
    )
786
787
class TransformObjectCatalogConfig(TransformCatalogBaseConfig,
                                   pipelineConnections=TransformObjectCatalogConnections):
    """Configuration for `TransformObjectCatalogTask`."""

    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )
    # TODO: remove in DM-27177
    filterMap = pexConfig.DictField(
        keytype=str,
        itemtype=str,
        default={},
        # Adjacent literals are concatenated, so each sentence needs its own
        # separating space.
        doc=("Dictionary mapping full filter name to short one for column name munging. "
             "These filters determine the output columns no matter what filters the "
             "input data actually contain."),
        deprecated=("Coadds are now identified by the band, so this transform is unused. "
                    "Will be removed after v22.")
    )
    outputBands = pexConfig.ListField(
        dtype=str,
        default=None,
        optional=True,
        doc=("These bands and only these bands will appear in the output,"
             " NaN-filled if the input does not include them."
             " If None, then use all bands found in the input.")
    )
    camelCase = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Write per-band columns names with camelCase, else underscore. "
             "For example: gPsFlux instead of g_PsFlux.")
    )
    multilevelOutput = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Whether results dataframe should have a multilevel column index (True) or be flat "
             "and name-munged (False).")
    )
    goodFlags = pexConfig.ListField(
        dtype=str,
        default=[],
        doc=("List of 'good' flags that should be set False when populating empty tables. "
             "All other flags are considered to be 'bad' flags and will be set to True.")
    )
    floatFillValue = pexConfig.Field(
        dtype=float,
        default=np.nan,
        doc="Fill value for float fields when populating empty tables."
    )
    integerFillValue = pexConfig.Field(
        dtype=int,
        default=-1,
        doc="Fill value for integer fields when populating empty tables."
    )

    def setDefaults(self):
        """Set the Object Table defaults: functor file, primary key, data ID
        columns and the list of 'good' flags.
        """
        super().setDefaults()
        self.functorFile = os.path.join('$PIPE_TASKS_DIR', 'schemas', 'Object.yaml')
        self.primaryKey = 'objectId'
        self.columnsFromDataId = ['tract', 'patch']
        self.goodFlags = ['calib_astrometry_used',
                          'calib_photometry_reserved',
                          'calib_photometry_used',
                          'calib_psf_candidate',
                          'calib_psf_reserved',
                          'calib_psf_used']
854
855
class TransformObjectCatalogTask(TransformCatalogBaseTask):
    """Produce a flattened Object Table to match the format specified in
    sdm_schemas.

    Do the same set of postprocessing calculations on all bands.

    This is identical to `TransformCatalogBaseTask`, except that it does
    the specified functor calculations for all filters present in the
    input `deepCoadd_obj` table. Any specific ``"filt"`` keywords specified
    by the YAML file will be superseded.
    """
    _DefaultName = "transformObjectCatalog"
    ConfigClass = TransformObjectCatalogConfig

    def run(self, handle, funcs=None, dataId=None, band=None):
        """Transform every requested band of the input and combine the
        per-band results into one table.

        Parameters
        ----------
        handle
            Handle to the input table. ``handle.get(component='columns')``
            must return a column index whose level 1 holds the band names.
        funcs
            Functors, forwarded unchanged to ``self.transform``.
        dataId
            Data ID, forwarded to ``self.transform``. When it is truthy the
            ``config.columnsFromDataId`` names are added to the columns
            that are not duplicated per band in the flattened output.
        band
            Ignored.

        Returns
        -------
        df : `pandas.DataFrame`
            The combined table: a (band, column) multilevel column index if
            ``config.multilevelOutput`` is set, otherwise the result of
            ``flattenFilters``.
        """
        # NOTE: band kwarg is ignored here.
        dfDict = {}
        analysisDict = {}
        templateDf = pd.DataFrame()

        columns = handle.get(component='columns')
        inputBands = columns.unique(level=1).values

        outputBands = self.config.outputBands if self.config.outputBands else inputBands

        # Perform transform for data of filters that exist in the handle dataframe.
        for inputBand in inputBands:
            if inputBand not in outputBands:
                self.log.info("Ignoring %s band data in the input", inputBand)
                continue
            self.log.info("Transforming the catalog of band %s", inputBand)
            result = self.transform(inputBand, handle, funcs, dataId)
            dfDict[inputBand] = result.df
            analysisDict[inputBand] = result.analysis
            if templateDf.empty:
                templateDf = result.df

        # Put filler values in columns of other wanted bands
        for filt in outputBands:
            if filt not in dfDict:
                self.log.info("Adding empty columns for band %s", filt)
                dfDict[filt] = self._makeFillerDf(templateDf)

        # This makes a multilevel column index, with band as first level
        df = pd.concat(dfDict, axis=1, names=['band', 'column'])

        if not self.config.multilevelOutput:
            # ``set().union(...)`` (rather than ``set.union(...)``) stays
            # valid when no band was transformed and analysisDict is empty.
            noDupCols = list(set().union(*(set(v.noDupCols) for v in analysisDict.values())))
            if self.config.primaryKey in noDupCols:
                noDupCols.remove(self.config.primaryKey)
            if dataId and self.config.columnsFromDataId:
                noDupCols += self.config.columnsFromDataId
            df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase,
                                inputBands=inputBands)

        self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))

        return df

    def _makeFillerDf(self, templateDf):
        """Return a copy of ``templateDf`` with every column overwritten by
        the configured fill value for its type.

        Parameters
        ----------
        templateDf : `pandas.DataFrame`
            Transformed table of a band that is present in the input.

        Returns
        -------
        dfTemp : `pandas.DataFrame`
            Copy of ``templateDf`` holding only filler values.

        Raises
        ------
        ValueError
            Raised if a column holds unsigned integers.
        """
        dfTemp = templateDf.copy()
        if len(dfTemp) == 0:
            # No rows means nothing to fill, and no first value to inspect
            # for the column type: indexing ``values[0]`` would raise.
            return dfTemp
        for col in dfTemp.columns:
            testValue = dfTemp[col].values[0]
            if isinstance(testValue, (np.bool_, pd.BooleanDtype)):
                # Boolean flag type, check if it is a "good" flag
                if col in self.config.goodFlags:
                    fillValue = False
                else:
                    fillValue = True
            elif isinstance(testValue, numbers.Integral):
                # Checking numbers.Integral catches all flavors
                # of python, numpy, pandas, etc. integers.
                # We must ensure this is not an unsigned integer.
                if isinstance(testValue, np.unsignedinteger):
                    raise ValueError("Parquet tables may not have unsigned integer columns.")
                else:
                    fillValue = self.config.integerFillValue
            else:
                fillValue = self.config.floatFillValue
            # NOTE(review): relies on ``.values`` being a writable view into
            # the frame; confirm for the pandas version in use.
            dfTemp[col].values[:] = fillValue
        return dfTemp
934
935
class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("tract", "skymap")):
    """Connections for `ConsolidateObjectTableTask`: many per-patch
    ``objectTable`` inputs and one per-tract ``objectTable_tract`` output.
    """
    inputCatalogs = connectionTypes.Input(
        doc="Per-Patch objectTables conforming to the standard data model.",
        name="objectTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        multiple=True,
    )
    # The task stacks the inputs with a plain ``pd.concat`` (row-wise), so
    # the output is a per-tract vertical concatenation.
    outputCatalog = connectionTypes.Output(
        doc="Per-tract vertical concatenation of the input objectTables",
        name="objectTable_tract",
        storageClass="DataFrame",
        dimensions=("tract", "skymap"),
    )
951
952
class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=ConsolidateObjectTableConnections):
    """Configuration for `ConsolidateObjectTableTask`."""

    coaddName = pexConfig.Field(
        doc="Name of coadd",
        default="deep",
        dtype=str,
    )
960
961
class ConsolidateObjectTableTask(pipeBase.PipelineTask):
    """Combine per-patch object tables into one tract-level DataFrame.

    Concatenates the list of `objectTable` inputs into a single
    `objectTable_tract`.
    """
    ConfigClass = ConsolidateObjectTableConfig
    _DefaultName = "consolidateObjectTable"

    outputDataset = "objectTable_tract"
    inputDataset = "objectTable"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        """Load all per-patch tables, concatenate them and store the result
        as ``outputCatalog``.
        """
        patchTables = butlerQC.get(inputRefs)["inputCatalogs"]
        self.log.info("Concatenating %s per-patch Object Tables",
                      len(patchTables))
        tractTable = pd.concat(patchTables)
        result = pipeBase.Struct(outputCatalog=tractTable)
        butlerQC.put(result, outputRefs)
979
980
class TransformSourceTableConnections(pipeBase.PipelineTaskConnections,
                                      defaultTemplates={"catalogType": ""},
                                      dimensions=("instrument", "visit", "detector")):
    """Connections for `TransformSourceTableTask`.

    Both dataset names are prefixed by the ``catalogType`` template, which
    defaults to the empty string.
    """

    # Loaded lazily (deferLoad).
    inputCatalog = connectionTypes.Input(
        name="{catalogType}source",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
        deferLoad=True,
        doc="Wide input catalog of sources produced by WriteSourceTableTask",
    )
    outputCatalog = connectionTypes.Output(
        name="{catalogType}sourceTable",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
        doc="Narrower, per-detector Source Table transformed and converted per a "
            "specified set of functors",
    )
999
1000
class TransformSourceTableConfig(TransformCatalogBaseConfig,
                                 pipelineConnections=TransformSourceTableConnections):
    """Configuration for `TransformSourceTableTask`; only overrides
    defaults inherited from `TransformCatalogBaseConfig`.
    """

    def setDefaults(self):
        """Point the task at the Source schema and its key/data ID columns."""
        super().setDefaults()
        self.primaryKey = "sourceId"
        self.columnsFromDataId = ["visit", "detector", "band", "physical_filter"]
        self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Source.yaml")
1009
1010
class TransformSourceTableTask(TransformCatalogBaseTask):
    """Transform/standardize a source catalog.

    This subclass defines no methods of its own; it only binds the task
    name and `TransformSourceTableConfig` to `TransformCatalogBaseTask`.
    """
    ConfigClass = TransformSourceTableConfig
    _DefaultName = "transformSourceTable"
1016
1017
class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections,
                                         dimensions=("instrument", "visit",),
                                         defaultTemplates={"calexpType": ""}):
    """Connections for `ConsolidateVisitSummaryTask`: per-detector calexps
    in, one per-visit summary catalog (plus its schema) out.
    """
    # One deferred handle per detector of the visit.
    calexp = connectionTypes.Input(
        name="calexp",
        storageClass="ExposureF",
        dimensions=("instrument", "visit", "detector"),
        multiple=True,
        deferLoad=True,
        doc="Processed exposures used for metadata",
    )
    visitSummary = connectionTypes.Output(
        name="visitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
        doc=("Per-visit consolidated exposure metadata. These catalogs use "
             "detector id for the id and are sorted for fast lookups of a "
             "detector."),
    )
    visitSummarySchema = connectionTypes.InitOutput(
        name="visitSummary_schema",
        storageClass="ExposureCatalog",
        doc="Schema of the visitSummary catalog",
    )
1042
1043
class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig,
                                    pipelineConnections=ConsolidateVisitSummaryConnections):
    """Config for ConsolidateVisitSummaryTask.

    Declares no fields of its own.
    """
1048
1049
class ConsolidateVisitSummaryTask(pipeBase.PipelineTask):
    """Task to consolidate per-detector visit metadata.

    This task aggregates the following metadata from all the detectors in a
    single visit into an exposure catalog:
    - The visitInfo.
    - The wcs.
    - The photoCalib.
    - The physical_filter and band (if available).
    - The psf size, shape, and effective area at the center of the detector.
    - The corners of the bounding box in right ascension/declination.

    Other quantities such as Detector, Psf, ApCorrMap, and TransmissionCurve
    are not persisted here because of storage concerns, and because of their
    limited utility as summary statistics.

    Tests for this task are performed in ci_hsc_gen3.
    """
    _DefaultName = "consolidateVisitSummary"
    ConfigClass = ConsolidateVisitSummaryConfig

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Minimal exposure schema extended with the visit, filter and
        # summary-statistics fields written by this task.
        self.schema = afwTable.ExposureTable.makeMinimalSchema()
        self.schema.addField('visit', type='L', doc='Visit number')
        self.schema.addField('physical_filter', type='String', size=32, doc='Physical filter')
        self.schema.addField('band', type='String', size=32, doc='Name of band')
        ExposureSummaryStats.update_schema(self.schema)
        # Empty catalog carrying the schema; the attribute name matches the
        # ``visitSummarySchema`` init-output connection.
        self.visitSummarySchema = afwTable.ExposureCatalog(self.schema)

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        """Build the visit summary from the calexp handles and store it."""
        dataRefs = butlerQC.get(inputRefs.calexp)
        # All handles belong to the same visit; read it from the first one.
        visit = dataRefs[0].dataId['visit']

        self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
                       len(dataRefs), visit)

        expCatalog = self._combineExposureMetadata(visit, dataRefs)

        butlerQC.put(expCatalog, outputRefs.visitSummary)

    def _combineExposureMetadata(self, visit, dataRefs):
        """Make a combined exposure catalog from a list of dataRefs.
        These dataRefs must point to exposures with wcs, summaryStats,
        and other visit metadata.

        Parameters
        ----------
        visit : `int`
            Visit identification number.
        dataRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            List of dataRefs in visit.

        Returns
        -------
        visitSummary : `lsst.afw.table.ExposureCatalog`
            Exposure catalog with per-detector summary information.
        """
        cat = afwTable.ExposureCatalog(self.schema)
        cat.resize(len(dataRefs))

        cat['visit'] = visit

        for i, dataRef in enumerate(dataRefs):
            # Fetch each component exactly once; every ``get`` is a
            # separate read through the handle.
            visitInfo = dataRef.get(component='visitInfo')
            filterLabel = dataRef.get(component='filter')
            summaryStats = dataRef.get(component='summaryStats')
            detector = dataRef.get(component='detector')
            wcs = dataRef.get(component='wcs')
            photoCalib = dataRef.get(component='photoCalib')
            bbox = dataRef.get(component='bbox')
            validPolygon = dataRef.get(component='validPolygon')

            rec = cat[i]
            rec.setBBox(bbox)
            rec.setVisitInfo(visitInfo)
            rec.setWcs(wcs)
            rec.setPhotoCalib(photoCalib)
            rec.setValidPolygon(validPolygon)

            # Missing labels are stored as empty strings.
            rec['physical_filter'] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else ""
            rec['band'] = filterLabel.bandLabel if filterLabel.hasBandLabel() else ""
            rec.setId(detector.getId())
            summaryStats.update_record(rec)

        metadata = dafBase.PropertyList()
        metadata.add("COMMENT", "Catalog id is detector id, sorted.")
        # We are looping over existing datarefs, so the following is true
        metadata.add("COMMENT", "Only detectors with data have entries.")
        cat.setMetadata(metadata)

        cat.sort()
        return cat
1144
1145
class ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        defaultTemplates={"catalogType": ""},
                                        dimensions=("instrument", "visit")):
    """Connections for `ConsolidateSourceTableTask`: many per-detector
    source tables in, one per-visit table out.
    """
    inputCatalogs = connectionTypes.Input(
        name="{catalogType}sourceTable",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
        multiple=True,
        doc="Input per-detector Source Tables",
    )
    outputCatalog = connectionTypes.Output(
        name="{catalogType}sourceTable_visit",
        storageClass="DataFrame",
        dimensions=("instrument", "visit"),
        doc="Per-visit concatenation of Source Table",
    )
1162
1163
class ConsolidateSourceTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=ConsolidateSourceTableConnections):
    """Configuration for `ConsolidateSourceTableTask`; declares no fields
    of its own.
    """
1167
1168
class ConsolidateSourceTableTask(pipeBase.PipelineTask):
    """Concatenate `sourceTable` list into a per-visit `sourceTable_visit`.
    """
    ConfigClass = ConsolidateSourceTableConfig
    _DefaultName = "consolidateSourceTable"

    outputDataset = "sourceTable_visit"
    inputDataset = "sourceTable"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        """Load the per-detector tables in ascending detector order,
        concatenate them and store the result as ``outputCatalog``.
        """
        from .makeWarp import reorderRefs

        # Put the input references in ascending detector order before loading.
        detectorOrder = sorted(ref.dataId["detector"] for ref in inputRefs.inputCatalogs)
        inputRefs = reorderRefs(inputRefs, detectorOrder, dataIdKey="detector")

        detectorTables = butlerQC.get(inputRefs)["inputCatalogs"]
        self.log.info("Concatenating %s per-detector Source Tables",
                      len(detectorTables))
        visitTable = pd.concat(detectorTables)
        butlerQC.put(pipeBase.Struct(outputCatalog=visitTable), outputRefs)
1189
1190
class MakeCcdVisitTableConnections(pipeBase.PipelineTaskConnections,
                                   dimensions=("instrument",),
                                   defaultTemplates={"calexpType": ""}):
    """Connections for `MakeCcdVisitTableTask`: all visit summaries of an
    instrument in, one ``ccdVisitTable`` out.
    """
    # Deferred handles, one per visit.
    visitSummaryRefs = connectionTypes.Input(
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
        deferLoad=True,
        multiple=True,
        doc="Data references for per-visit consolidated exposure metadata",
    )
    outputCatalog = connectionTypes.Output(
        name="ccdVisitTable",
        storageClass="DataFrame",
        dimensions=("instrument",),
        doc="CCD and Visit metadata table",
    )
1208
1209
class MakeCcdVisitTableConfig(pipeBase.PipelineTaskConfig,
                              pipelineConnections=MakeCcdVisitTableConnections):
    """Configuration for `MakeCcdVisitTableTask`."""

    # The task applies this to each (visit data ID, detector) pair to
    # obtain the row's ``ccdVisitId``.
    idGenerator = DetectorVisitIdGeneratorConfig.make_field()
1213
1214
class MakeCcdVisitTableTask(pipeBase.PipelineTask):
    """Produce a `ccdVisitTable` from the visit summary exposure catalogs.
    """
    _DefaultName = 'makeCcdVisitTable'
    ConfigClass = MakeCcdVisitTableConfig

    def run(self, visitSummaryRefs):
        """Make a table of ccd information from the visit summary catalogs.

        Parameters
        ----------
        visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            List of DeferredDatasetHandles pointing to exposure catalogs with
            per-detector summary information.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
           Results struct with attribute:

           ``outputCatalog``
               Catalog of ccd and visit information, indexed by
               ``ccdVisitId``.
        """
        # Summary columns copied verbatim into the output; the list does not
        # depend on the visit, so build it once.
        selectColumns = ['id', 'visit', 'physical_filter', 'band', 'ra', 'dec', 'zenithDistance',
                         'zeroPoint', 'psfSigma', 'skyBg', 'skyNoise',
                         'astromOffsetMean', 'astromOffsetStd', 'nPsfStar',
                         'psfStarDeltaE1Median', 'psfStarDeltaE2Median',
                         'psfStarDeltaE1Scatter', 'psfStarDeltaE2Scatter',
                         'psfStarDeltaSizeMedian', 'psfStarDeltaSizeScatter',
                         'psfStarScaledDeltaSizeScatter',
                         'psfTraceRadiusDelta', 'maxDistToNearestPsf',
                         'effTime', 'effTimePsfSigmaScale',
                         'effTimeSkyBgScale', 'effTimeZeroPointScale']

        ccdEntries = []
        for visitSummaryRef in visitSummaryRefs:
            visitSummary = visitSummaryRef.get()
            # Visit-level quantities are read from the first detector row.
            visitInfo = visitSummary[0].getVisitInfo()

            summaryTable = visitSummary.asAstropy()
            ccdEntry = summaryTable[selectColumns].to_pandas().set_index('id')
            # 'visit' is the human readable visit number.
            # 'visitId' is the key to the visitId table. They are the same.
            # Technically you should join to get the visit from the visit
            # table.
            ccdEntry = ccdEntry.rename(columns={"visit": "visitId"})

            # RFC-924: Temporarily keep a duplicate "decl" entry for backwards
            # compatibility. To be removed after September 2023.
            ccdEntry["decl"] = ccdEntry.loc[:, "dec"]

            ccdEntry['ccdVisitId'] = [
                self.config.idGenerator.apply(
                    visitSummaryRef.dataId,
                    detector=detector_id,
                    is_exposure=False,
                ).catalog_id  # The "catalog ID" here is the ccdVisit ID
                              # because it's usually the ID for a whole catalog
                              # with a {visit, detector}, and that's the main
                              # use case for IdGenerator.  This usage for a
                              # summary table is rare.
                for detector_id in summaryTable['id']
            ]
            ccdEntry['detector'] = summaryTable['id']

            # Pixel scale per detector; NaN where the record has no WCS.
            pixelScales = []
            for record in visitSummary:
                wcs = record.getWcs()
                pixelScales.append(wcs.getPixelScale().asArcseconds() if wcs else np.nan)
            pixToArcseconds = np.array(pixelScales)
            # sqrt(8 ln 2) converts a Gaussian sigma to a FWHM.
            ccdEntry["seeing"] = visitSummary['psfSigma'] * np.sqrt(8 * np.log(2)) * pixToArcseconds

            ccdEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
            ccdEntry["expMidpt"] = visitInfo.getDate().toPython()
            ccdEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
            expTime = visitInfo.getExposureTime()
            ccdEntry['expTime'] = expTime
            # Start of observation = midpoint minus half the exposure time.
            ccdEntry["obsStart"] = ccdEntry["expMidpt"] - 0.5 * pd.Timedelta(seconds=expTime)
            expTime_days = expTime / (60*60*24)
            ccdEntry["obsStartMJD"] = ccdEntry["expMidptMJD"] - 0.5 * expTime_days
            ccdEntry['darkTime'] = visitInfo.getDarkTime()
            ccdEntry['xSize'] = summaryTable['bbox_max_x'] - summaryTable['bbox_min_x']
            ccdEntry['ySize'] = summaryTable['bbox_max_y'] - summaryTable['bbox_min_y']
            ccdEntry['llcra'] = summaryTable['raCorners'][:, 0]
            ccdEntry['llcdec'] = summaryTable['decCorners'][:, 0]
            ccdEntry['ulcra'] = summaryTable['raCorners'][:, 1]
            ccdEntry['ulcdec'] = summaryTable['decCorners'][:, 1]
            ccdEntry['urcra'] = summaryTable['raCorners'][:, 2]
            ccdEntry['urcdec'] = summaryTable['decCorners'][:, 2]
            ccdEntry['lrcra'] = summaryTable['raCorners'][:, 3]
            ccdEntry['lrcdec'] = summaryTable['decCorners'][:, 3]
            # TODO: DM-30618, Add raftName, nExposures, ccdTemp, binX, binY,
            # and flags, and decide if WCS, and llcx, llcy, ulcx, ulcy, etc.
            # values are actually wanted.
            ccdEntries.append(ccdEntry)

        outputCatalog = pd.concat(ccdEntries)
        outputCatalog.set_index('ccdVisitId', inplace=True, verify_integrity=True)
        return pipeBase.Struct(outputCatalog=outputCatalog)
1310
1311
class MakeVisitTableConnections(pipeBase.PipelineTaskConnections,
                                dimensions=("instrument",),
                                defaultTemplates={"calexpType": ""}):
    """Connections for `MakeVisitTableTask`: all visit summaries of an
    instrument in, one ``visitTable`` out.
    """
    # Deferred handles, one per visit.
    visitSummaries = connectionTypes.Input(
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit",),
        deferLoad=True,
        multiple=True,
        doc="Per-visit consolidated exposure metadata",
    )
    outputCatalog = connectionTypes.Output(
        name="visitTable",
        storageClass="DataFrame",
        dimensions=("instrument",),
        doc="Visit metadata table",
    )
1329
1330
class MakeVisitTableConfig(pipeBase.PipelineTaskConfig,
                           pipelineConnections=MakeVisitTableConnections):
    """Configuration for `MakeVisitTableTask`; declares no fields of its
    own.
    """
1334
1335
class MakeVisitTableTask(pipeBase.PipelineTask):
    """Produce a `visitTable` from the visit summary exposure catalogs.
    """
    ConfigClass = MakeVisitTableConfig
    _DefaultName = "makeVisitTable"

    def run(self, visitSummaries):
        """Make a table of visit information from the visit summary catalogs.

        Parameters
        ----------
        visitSummaries : `list`
            Handles whose ``get()`` returns an exposure catalog with
            per-detector summary information; one handle per visit.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            Results struct with attribute:

            ``outputCatalog``
                 Catalog of visit information, indexed by ``visitId``.
        """
        rows = []
        for summaryHandle in visitSummaries:
            catalog = summaryHandle.get()
            # Visit-level values are taken from the first detector record.
            firstRecord = catalog[0]
            info = firstRecord.getVisitInfo()

            boresight = info.getBoresightRaDec()
            decDegrees = boresight.getDec().asDegrees()
            azAlt = info.getBoresightAzAlt()
            altitudeDegrees = azAlt.getLatitude().asDegrees()
            expTime = info.getExposureTime()
            midpoint = info.getDate()
            midpointPython = midpoint.toPython()
            midpointMJD = midpoint.get(dafBase.DateTime.MJD)
            expTime_days = expTime / (60*60*24)

            # Key order fixes the column order of the output table.
            rows.append({
                "visitId": firstRecord["visit"],
                "visit": firstRecord["visit"],
                "physical_filter": firstRecord["physical_filter"],
                "band": firstRecord["band"],
                "ra": boresight.getRa().asDegrees(),
                "dec": decDegrees,
                # RFC-924: Temporarily keep a duplicate "decl" entry for
                # backwards compatibility. To be removed after September 2023.
                "decl": decDegrees,
                "skyRotation": info.getBoresightRotAngle().asDegrees(),
                "azimuth": azAlt.getLongitude().asDegrees(),
                "altitude": altitudeDegrees,
                "zenithDistance": 90 - altitudeDegrees,
                "airmass": info.getBoresightAirmass(),
                "expTime": expTime,
                "expMidpt": midpointPython,
                "expMidptMJD": midpointMJD,
                # Start of observation = midpoint minus half the exposure.
                "obsStart": midpointPython - 0.5 * pd.Timedelta(seconds=expTime),
                "obsStartMJD": midpointMJD - 0.5 * expTime_days,
            })

            # TODO: DM-30623, Add programId, exposureType, cameraTemp,
            # mirror1Temp, mirror2Temp, mirror3Temp, domeTemp, externalTemp,
            # dimmSeeing, pwvGPS, pwvMW, flags, nExposures.

        outputCatalog = pd.DataFrame(data=rows)
        outputCatalog.set_index("visitId", inplace=True, verify_integrity=True)
        return pipeBase.Struct(outputCatalog=outputCatalog)
1398
1399
class WriteForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("instrument", "visit", "detector", "skymap", "tract")):
    """Connections for `WriteForcedSourceTableTask`: two forced-photometry
    source catalogs in, one merged DataFrame out, all with the same
    dimensions.
    """

    inputCatalog = connectionTypes.Input(
        name="forced_src",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
        doc="Primary per-detector, single-epoch forced-photometry catalog. "
            "By default, it is the output of ForcedPhotCcdTask on calexps",
    )
    inputCatalogDiff = connectionTypes.Input(
        name="forced_diff",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
        doc="Secondary multi-epoch, per-detector, forced photometry catalog. "
            "By default, it is the output of ForcedPhotCcdTask run on image differences.",
    )
    outputCatalog = connectionTypes.Output(
        name="mergedForcedSource",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
        doc="InputCatalogs horizonatally joined on `objectId` in DataFrame parquet format",
    )
1423
1424
class WriteForcedSourceTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=WriteForcedSourceTableConnections):
    """Configuration for `WriteForcedSourceTableTask`."""

    # Read by the task as ``self.config.key`` to index each input table
    # before the two are joined.
    key = lsst.pex.config.Field(
        doc="Column on which to join the two input tables on and make the primary key of the output",
        dtype=str,
        default="objectId",
    )
    # Applied to the quantum data ID to obtain the ``ccdVisitId`` column.
    idGenerator = DetectorVisitIdGeneratorConfig.make_field()
1433
1434
class WriteForcedSourceTableTask(pipeBase.PipelineTask):
    """Merge and convert per-detector forced source catalogs to DataFrame Parquet format.

    Because the predecessor ForcedPhotCcdTask operates per-detector,
    per-tract, (i.e., it has tract in its dimensions), detectors
    on the tract boundary may have multiple forced source catalogs.

    The successor task TransformForcedSourceTable runs per-patch
    and temporally-aggregates overlapping mergedForcedSource catalogs from all
    available multiple epochs.
    """
    _DefaultName = "writeForcedSourceTable"
    ConfigClass = WriteForcedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        """Load the inputs, add the ``ccdVisitId`` and ``band`` taken from
        the quantum data ID, run the task and store the outputs.
        """
        inputs = butlerQC.get(inputRefs)
        # Add ccdVisitId to allow joining with CcdVisitTable
        idGenerator = self.config.idGenerator.apply(butlerQC.quantum.dataId)
        inputs['ccdVisitId'] = idGenerator.catalog_id
        inputs['band'] = butlerQC.quantum.dataId['band']
        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)

    def run(self, inputCatalog, inputCatalogDiff, ccdVisitId=None, band=None):
        """Convert both catalogs to DataFrames and join them on
        ``config.key``.

        Parameters
        ----------
        inputCatalog
            Catalog whose columns are stored under the ``calexp`` dataset
            level of the output.
        inputCatalogDiff
            Catalog whose columns are stored under the ``diff`` dataset
            level of the output.
        ccdVisitId : `int`, optional
            Value of the ``ccdVisitId`` column added to both tables;
            ``pd.NA`` is stored when it is `None`.
        band : `str`, optional
            Value of the ``band`` column added to both tables; ``pd.NA`` is
            stored when it is `None` or empty.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            Results struct with attribute:

            ``outputCatalog``
                DataFrame with a (dataset, column) multilevel column index.
        """
        dfs = []
        for table, dataset in zip((inputCatalog, inputCatalogDiff), ('calexp', 'diff')):
            # Keep the key as a column as well as the index (drop=False).
            df = table.asAstropy().to_pandas().set_index(self.config.key, drop=False)
            df = df.reindex(sorted(df.columns), axis=1)
            # Compare against None explicitly: 0 is a legitimate ID value
            # and must not be replaced by NA.
            df['ccdVisitId'] = ccdVisitId if ccdVisitId is not None else pd.NA
            df['band'] = band if band else pd.NA
            df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
                                                   names=('dataset', 'column'))

            dfs.append(df)

        outputCatalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return pipeBase.Struct(outputCatalog=outputCatalog)
1472
1473
class TransformForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                            dimensions=("instrument", "skymap", "patch", "tract")):
    """Connections for `TransformForcedSourceTableTask`: many merged
    forced-source tables plus one reference object table in, one per-patch
    table out.
    """

    # Both inputs are loaded lazily (deferLoad).
    inputCatalogs = connectionTypes.Input(
        name="mergedForcedSource",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
        deferLoad=True,
        multiple=True,
        doc="DataFrames of merged ForcedSources produced by WriteForcedSourceTableTask",
    )
    referenceCatalog = connectionTypes.Input(
        name="objectTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        deferLoad=True,
        doc="Reference catalog which was used to seed the forcedPhot. Columns "
            "objectId, detect_isPrimary, detect_isTractInner, detect_isPatchInner "
            "are expected.",
    )
    outputCatalog = connectionTypes.Output(
        name="forcedSourceTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        doc="Narrower, temporally-aggregated, per-patch ForcedSource Table transformed and converted per a "
            "specified set of functors",
    )
1501
1502
class TransformForcedSourceTableConfig(TransformCatalogBaseConfig,
                                       pipelineConnections=TransformForcedSourceTableConnections):
    """Configuration for `TransformForcedSourceTableTask`."""

    referenceColumns = pexConfig.ListField(
        dtype=str,
        default=["detect_isPrimary", "detect_isTractInner", "detect_isPatchInner"],
        optional=True,
        doc="Columns to pull from reference catalog",
    )
    keyRef = lsst.pex.config.Field(
        doc="Column on which to join the two input tables on and make the primary key of the output",
        dtype=str,
        default="objectId",
    )
    # Read by the task as ``self.config.key`` when it names the final index.
    key = lsst.pex.config.Field(
        doc="Rename the output DataFrame index to this name",
        dtype=str,
        default="forcedSourceId",
    )

    def setDefaults(self):
        """Point the task at the ForcedSource schema and its data ID
        columns.
        """
        super().setDefaults()
        self.functorFile = os.path.join('$PIPE_TASKS_DIR', 'schemas', 'ForcedSource.yaml')
        self.columnsFromDataId = ['tract', 'patch']
1526
1527
class TransformForcedSourceTableTask(TransformCatalogBaseTask):
    """Transform/standardize a ForcedSource catalog

    Transforms each wide, per-detector forcedSource DataFrame per the
    specification file (per-camera defaults found in ForcedSource.yaml).
    All epochs that overlap the patch are aggregated into one per-patch
    narrow-DataFrame file.

    No de-duplication of rows is performed. Duplicate resolutions flags are
    pulled in from the referenceCatalog: `detect_isPrimary`,
    `detect_isTractInner`,`detect_isPatchInner`, so that user may de-duplicate
    for analysis or compare duplicates for QA.

    The resulting table includes multiple bands. Epochs (MJDs) and other useful
    per-visit rows can be retrieved by joining with the CcdVisitTable on
    ccdVisitId.
    """
    _DefaultName = "transformForcedSourceTable"
    ConfigClass = TransformForcedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        """Load the inputs, run the task with the configured functors and
        store the outputs.

        Raises
        ------
        ValueError
            Raised if ``self.funcs`` is `None`.
        """
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        outputs = self.run(inputs['inputCatalogs'], inputs['referenceCatalog'], funcs=self.funcs,
                           dataId=dict(outputRefs.outputCatalog.dataId.mapping))

        butlerQC.put(outputs, outputRefs)

    def run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=None):
        """Transform every input catalog, keep the rows that match the
        reference catalog and concatenate the results.

        Parameters
        ----------
        inputCatalogs : `list`
            Handles to the merged forced-source tables; each is passed to
            ``self.transform``.
        referenceCatalog
            Handle to the reference table; only ``config.referenceColumns``
            are requested from it.
        funcs
            Functors forwarded to ``self.transform``.
        dataId
            Data ID forwarded to ``self.transform``.
        band
            Not used by this method.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            Results struct with attribute:

            ``outputCatalog``
                Concatenated table, indexed by the ``forcedSourceId`` column
                with the index named ``config.key``.
        """
        dfs = []
        ref = referenceCatalog.get(parameters={"columns": self.config.referenceColumns})
        # Pass the argument to the logger so formatting is deferred, as in
        # the other log calls of this file.
        self.log.info("Aggregating %s input catalogs", len(inputCatalogs))
        for handle in inputCatalogs:
            result = self.transform(None, handle, funcs, dataId)
            # Filter for only rows that were detected on (overlap) the patch
            dfs.append(result.df.join(ref, how='inner'))

        outputCatalog = pd.concat(dfs)

        # Now that the join is done, name the current index config.keyRef
        # and move it into the regular columns.
        outputCatalog.index.rename(self.config.keyRef, inplace=True)
        outputCatalog.reset_index(inplace=True)
        # Set the forcedSourceId to the index. This is specified in the
        # ForcedSource.yaml
        outputCatalog.set_index("forcedSourceId", inplace=True, verify_integrity=True)
        # Rename it to the config.key
        outputCatalog.index.rename(self.config.key, inplace=True)

        self.log.info("Made a table of %d columns and %d rows",
                      len(outputCatalog.columns), len(outputCatalog))
        return pipeBase.Struct(outputCatalog=outputCatalog)
1583
1584
class ConsolidateTractConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"catalogType": ""},
                                  dimensions=("instrument", "tract")):
    """Connections for `ConsolidateTractTask`: many per-patch DataFrames
    in, one per-tract DataFrame out. Both dataset names are prefixed by the
    ``catalogType`` template.
    """
    inputCatalogs = connectionTypes.Input(
        name="{catalogType}ForcedSourceTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        multiple=True,
        doc="Input per-patch DataFrame Tables to be concatenated",
    )

    outputCatalog = connectionTypes.Output(
        name="{catalogType}ForcedSourceTable_tract",
        storageClass="DataFrame",
        dimensions=("tract", "skymap"),
        doc="Output per-tract concatenation of DataFrame Tables",
    )
1602
1603
class ConsolidateTractConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=ConsolidateTractConnections):
    """Configuration for `ConsolidateTractTask`; declares no fields of its
    own.
    """
1607
1608
class ConsolidateTractTask(pipeBase.PipelineTask):
    """Concatenate any per-patch, dataframe list into a single
    per-tract DataFrame.
    """
    ConfigClass = ConsolidateTractConfig
    _DefaultName = "ConsolidateTract"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        """Load the per-patch tables, concatenate them and store the result
        as ``outputCatalog``.
        """
        patchTables = butlerQC.get(inputRefs)["inputCatalogs"]
        # Not checking at least one inputCatalog exists because that'd be an
        # empty QG.
        datasetTypeName = inputRefs.inputCatalogs[0].datasetType.name
        self.log.info("Concatenating %s per-patch %s Tables",
                      len(patchTables),
                      datasetTypeName)
        tractTable = pd.concat(patchTables)
        butlerQC.put(pipeBase.Struct(outputCatalog=tractTable), outputRefs)
table::Key< int > transform
Custom catalog class for ExposureRecord/Table.
Definition Exposure.h:311
A mapping between the keys of two Schemas, used to copy data between them.
Class for storing ordered metadata with comments.
compute(self, dropna=False, pool=None)
__init__(self, handles, functors, filt=None, flags=None, refFlags=None, forcedFlags=None)
run(self, handle, funcs=None, dataId=None, band=None)
runQuantum(self, butlerQC, inputRefs, outputRefs)
transform(self, band, handles, funcs, dataId)
getAnalysis(self, handles, funcs=None, band=None)
daf::base::PropertySet * set
Definition fits.cc:931
void updateSourceCoords(geom::SkyWcs const &wcs, SourceCollection &sourceList, bool include_covariance=true)
Update sky coordinates in a collection of source objects.
Definition wcsUtils.cc:125
flattenFilters(df, noDupCols=['coord_ra', 'coord_dec'], camelCase=False, inputBands=None)