postprocess.py
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22__all__ = ["WriteObjectTableConfig", "WriteObjectTableTask",
23 "WriteSourceTableConfig", "WriteSourceTableTask",
24 "WriteRecalibratedSourceTableConfig", "WriteRecalibratedSourceTableTask",
25 "PostprocessAnalysis",
26 "TransformCatalogBaseConfig", "TransformCatalogBaseTask",
27 "TransformObjectCatalogConfig", "TransformObjectCatalogTask",
28 "ConsolidateObjectTableConfig", "ConsolidateObjectTableTask",
29 "TransformSourceTableConfig", "TransformSourceTableTask",
30 "ConsolidateVisitSummaryConfig", "ConsolidateVisitSummaryTask",
31 "ConsolidateSourceTableConfig", "ConsolidateSourceTableTask",
32 "MakeCcdVisitTableConfig", "MakeCcdVisitTableTask",
33 "MakeVisitTableConfig", "MakeVisitTableTask",
34 "WriteForcedSourceTableConfig", "WriteForcedSourceTableTask",
35 "TransformForcedSourceTableConfig", "TransformForcedSourceTableTask",
36 "ConsolidateTractConfig", "ConsolidateTractTask"]
37
38import functools
39import pandas as pd
40import logging
41import numpy as np
42import numbers
43import os
44
45import lsst.geom
46import lsst.pex.config as pexConfig
47import lsst.pipe.base as pipeBase
48import lsst.daf.base as dafBase
49from lsst.pipe.base import connectionTypes
50import lsst.afw.table as afwTable
51from lsst.afw.image import ExposureSummaryStats, ExposureF
52from lsst.meas.base import SingleFrameMeasurementTask, DetectorVisitIdGeneratorConfig
53
54from .functors import CompositeFunctor, Column
55
56log = logging.getLogger(__name__)
57
58
59def flattenFilters(df, noDupCols=["coord_ra", "coord_dec"], camelCase=False, inputBands=None):
60 """Flattens a dataframe with multilevel column index.
61 """
62 newDf = pd.DataFrame()
63 # band is the level 0 index
64 dfBands = df.columns.unique(level=0).values
65 for band in dfBands:
66 subdf = df[band]
67 columnFormat = "{0}{1}" if camelCase else "{0}_{1}"
68 newColumns = {c: columnFormat.format(band, c)
69 for c in subdf.columns if c not in noDupCols}
70 cols = list(newColumns.keys())
71 newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)
72
73 # Band must be present in the input and output or else column is all NaN:
74 presentBands = dfBands if inputBands is None else list(set(inputBands).intersection(dfBands))
75 # Get the unexploded columns from any present band's partition
76 noDupDf = df[presentBands[0]][noDupCols]
77 newDf = pd.concat([noDupDf, newDf], axis=1)
78 return newDf
79
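# A minimal sketch of what ``flattenFilters`` produces; the band and column
# names below are illustrative, not taken from any pipeline schema:
#
#     >>> import pandas as pd
#     >>> cols = pd.MultiIndex.from_tuples(
#     ...     [("g", "coord_ra"), ("g", "PsFlux"), ("r", "coord_ra"), ("r", "PsFlux")])
#     >>> df = pd.DataFrame([[1.0, 10.0, 1.0, 12.0]], columns=cols)
#     >>> flattenFilters(df, noDupCols=["coord_ra"]).columns.tolist()
#     ['coord_ra', 'g_PsFlux', 'r_PsFlux']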
80
81class WriteObjectTableConnections(pipeBase.PipelineTaskConnections,
82 defaultTemplates={"coaddName": "deep"},
83 dimensions=("tract", "patch", "skymap")):
84 inputCatalogMeas = connectionTypes.Input(
85 doc="Catalog of source measurements on the deepCoadd.",
86 dimensions=("tract", "patch", "band", "skymap"),
87 storageClass="SourceCatalog",
88 name="{coaddName}Coadd_meas",
89 multiple=True
90 )
91 inputCatalogForcedSrc = connectionTypes.Input(
92 doc="Catalog of forced measurements (shape and position parameters held fixed) on the deepCoadd.",
93 dimensions=("tract", "patch", "band", "skymap"),
94 storageClass="SourceCatalog",
95 name="{coaddName}Coadd_forced_src",
96 multiple=True
97 )
98 inputCatalogRef = connectionTypes.Input(
99 doc="Catalog marking the primary detection (which band provides a good shape and position)"
100 "for each detection in deepCoadd_mergeDet.",
101 dimensions=("tract", "patch", "skymap"),
102 storageClass="SourceCatalog",
103 name="{coaddName}Coadd_ref"
104 )
105 outputCatalog = connectionTypes.Output(
106 doc="A vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
107 "stored as a DataFrame with a multi-level column index per-patch.",
108 dimensions=("tract", "patch", "skymap"),
109 storageClass="DataFrame",
110 name="{coaddName}Coadd_obj"
111 )
112
113
114class WriteObjectTableConfig(pipeBase.PipelineTaskConfig,
115 pipelineConnections=WriteObjectTableConnections):
116 coaddName = pexConfig.Field(
117 dtype=str,
118 default="deep",
119 doc="Name of coadd"
120 )
121
122
123class WriteObjectTableTask(pipeBase.PipelineTask):
124 """Write filter-merged source tables as a DataFrame in parquet format.
125 """
126 _DefaultName = "writeObjectTable"
127 ConfigClass = WriteObjectTableConfig
128
129 # Names of table datasets to be merged
130 inputDatasets = ("forced_src", "meas", "ref")
131
132 # Tag of output dataset written by `MergeSourcesTask.write`
133 outputDataset = "obj"
134
135 def runQuantum(self, butlerQC, inputRefs, outputRefs):
136 inputs = butlerQC.get(inputRefs)
137
138 measDict = {ref.dataId["band"]: {"meas": cat} for ref, cat in
139 zip(inputRefs.inputCatalogMeas, inputs["inputCatalogMeas"])}
140 forcedSourceDict = {ref.dataId["band"]: {"forced_src": cat} for ref, cat in
141 zip(inputRefs.inputCatalogForcedSrc, inputs["inputCatalogForcedSrc"])}
142
143 catalogs = {}
144 for band in measDict.keys():
145 catalogs[band] = {"meas": measDict[band]["meas"],
146 "forced_src": forcedSourceDict[band]["forced_src"],
147 "ref": inputs["inputCatalogRef"]}
148 dataId = butlerQC.quantum.dataId
149 df = self.run(catalogs=catalogs, tract=dataId["tract"], patch=dataId["patch"])
150 outputs = pipeBase.Struct(outputCatalog=df)
151 butlerQC.put(outputs, outputRefs)
152
153 def run(self, catalogs, tract, patch):
154 """Merge multiple catalogs.
155
156 Parameters
157 ----------
158 catalogs : `dict`
159 Mapping from filter names to dict of catalogs.
160 tract : `int`
161 Tract id to use for the tractId column.
162 patch : `str`
163 Patch id to use for the patchId column.
164
165 Returns
166 -------
167 catalog : `pandas.DataFrame`
168 Merged dataframe.
169 """
170 dfs = []
171 for filt, tableDict in catalogs.items():
172 for dataset, table in tableDict.items():
173 # Convert afwTable to pandas DataFrame
174 df = table.asAstropy().to_pandas().set_index("id", drop=True)
175
176 # Sort columns by name, to ensure matching schema among patches
177 df = df.reindex(sorted(df.columns), axis=1)
178 df = df.assign(tractId=tract, patchId=patch)
179
180 # Make columns a 3-level MultiIndex
181 df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
182 names=("dataset", "band", "column"))
183 dfs.append(df)
184
185 # We do this dance and not `pd.concat(dfs)` because the pandas
186 # concatenation uses infinite memory.
187 catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
188 return catalog
189
190
191class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
192 defaultTemplates={"catalogType": ""},
193 dimensions=("instrument", "visit", "detector")):
194
195 catalog = connectionTypes.Input(
196 doc="Input full-depth catalog of sources produced by CalibrateTask",
197 name="{catalogType}src",
198 storageClass="SourceCatalog",
199 dimensions=("instrument", "visit", "detector")
200 )
201 outputCatalog = connectionTypes.Output(
202 doc="Catalog of sources, `src` in DataFrame/Parquet format. The 'id' column is "
203 "replaced with an index; all other columns are unchanged.",
204 name="{catalogType}source",
205 storageClass="DataFrame",
206 dimensions=("instrument", "visit", "detector")
207 )
208
209
210class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
211 pipelineConnections=WriteSourceTableConnections):
212 pass
213
214
215class WriteSourceTableTask(pipeBase.PipelineTask):
216 """Write source table to DataFrame Parquet format.
217 """
218 _DefaultName = "writeSourceTable"
219 ConfigClass = WriteSourceTableConfig
220
221 def runQuantum(self, butlerQC, inputRefs, outputRefs):
222 inputs = butlerQC.get(inputRefs)
223 inputs["visit"] = butlerQC.quantum.dataId["visit"]
224 inputs["detector"] = butlerQC.quantum.dataId["detector"]
225 result = self.run(**inputs)
226 outputs = pipeBase.Struct(outputCatalog=result.table)
227 butlerQC.put(outputs, outputRefs)
228
229 def run(self, catalog, visit, detector, **kwargs):
230 """Convert `src` catalog to DataFrame
231
232 Parameters
233 ----------
234 catalog : `lsst.afw.table.SourceCatalog`
235 Catalog to be converted.
236 visit, detector : `int`
237 Visit and detector ids to be added as columns.
238 **kwargs
239 Additional keyword arguments are ignored as a convenience for
240 subclasses that pass the same arguments to several different
241 methods.
242
243 Returns
244 -------
245 result : `~lsst.pipe.base.Struct`
246 ``table``
247 `DataFrame` version of the input catalog
248 """
249 self.log.info("Generating DataFrame from src catalog visit,detector=%i,%i", visit, detector)
250 df = catalog.asAstropy().to_pandas().set_index("id", drop=True)
251 df["visit"] = visit
252 # int16 instead of uint8 because databases don't like unsigned bytes.
253 df["detector"] = np.int16(detector)
254
255 return pipeBase.Struct(table=df)
256
257
258class WriteRecalibratedSourceTableConnections(WriteSourceTableConnections,
259 defaultTemplates={"catalogType": ""},
260 dimensions=("instrument", "visit", "detector", "skymap")):
261 visitSummary = connectionTypes.Input(
262 doc="Input visit-summary catalog with updated calibration objects.",
263 name="finalVisitSummary",
264 storageClass="ExposureCatalog",
265 dimensions=("instrument", "visit",),
266 )
267
268
269class WriteRecalibratedSourceTableConfig(WriteSourceTableConfig,
270 pipelineConnections=WriteRecalibratedSourceTableConnections):
271
272 doReevaluatePhotoCalib = pexConfig.Field(
273 dtype=bool,
274 default=True,
275 doc=("Add or replace local photoCalib columns"),
276 )
277 doReevaluateSkyWcs = pexConfig.Field(
278 dtype=bool,
279 default=True,
280 doc=("Add or replace local WCS columns and update the coord columns, coord_ra and coord_dec"),
281 )
282
283
284class WriteRecalibratedSourceTableTask(WriteSourceTableTask):
285 """Write source table to DataFrame Parquet format.
286 """
287 _DefaultName = "writeRecalibratedSourceTable"
288 ConfigClass = WriteRecalibratedSourceTableConfig
289
290 def runQuantum(self, butlerQC, inputRefs, outputRefs):
291 inputs = butlerQC.get(inputRefs)
292
293 inputs["visit"] = butlerQC.quantum.dataId["visit"]
294 inputs["detector"] = butlerQC.quantum.dataId["detector"]
295
296 if self.config.doReevaluatePhotoCalib or self.config.doReevaluateSkyWcs:
297 exposure = ExposureF()
298 inputs["exposure"] = self.prepareCalibratedExposure(
299 exposure=exposure,
300 visitSummary=inputs["visitSummary"],
301 detectorId=butlerQC.quantum.dataId["detector"]
302 )
303 inputs["catalog"] = self.addCalibColumns(**inputs)
304
305 result = self.run(**inputs)
306 outputs = pipeBase.Struct(outputCatalog=result.table)
307 butlerQC.put(outputs, outputRefs)
308
309 def prepareCalibratedExposure(self, exposure, detectorId, visitSummary=None):
310 """Prepare a calibrated exposure and apply external calibrations
311 if so configured.
312
313 Parameters
314 ----------
315 exposure : `lsst.afw.image.exposure.Exposure`
316 Input exposure to adjust calibrations. May be an empty Exposure.
317 detectorId : `int`
318 Detector ID associated with the exposure.
319 visitSummary : `lsst.afw.table.ExposureCatalog`, optional
320 Exposure catalog with all calibration objects. WCS and PhotoCalib
321 are always applied if ``visitSummary`` is provided and those
322 components are not `None`.
323
324 Returns
325 -------
326 exposure : `lsst.afw.image.exposure.Exposure`
327 Exposure with adjusted calibrations.
328 """
329 if visitSummary is not None:
330 row = visitSummary.find(detectorId)
331 if row is None:
332 raise RuntimeError(f"Visit summary for detector {detectorId} is unexpectedly missing.")
333 if (photoCalib := row.getPhotoCalib()) is None:
334 self.log.warning("Detector id %s has None for photoCalib in visit summary; "
335 "skipping reevaluation of photoCalib.", detectorId)
336 exposure.setPhotoCalib(None)
337 else:
338 exposure.setPhotoCalib(photoCalib)
339 if (skyWcs := row.getWcs()) is None:
340 self.log.warning("Detector id %s has None for skyWcs in visit summary; "
341 "skipping reevaluation of skyWcs.", detectorId)
342 exposure.setWcs(None)
343 else:
344 exposure.setWcs(skyWcs)
345
346 return exposure
347
348 def addCalibColumns(self, catalog, exposure, **kwargs):
349 """Add replace columns with calibs evaluated at each centroid
350
351 Add or replace 'base_LocalWcs' and 'base_LocalPhotoCalib' columns in
352 a source catalog, by rerunning the plugins.
353
354 Parameters
355 ----------
356 catalog : `lsst.afw.table.SourceCatalog`
357 catalog to which calib columns will be added
358 exposure : `lsst.afw.image.exposure.Exposure`
359 Exposure with attached PhotoCalibs and SkyWcs attributes to be
360 reevaluated at local centroids. Pixels are not required.
361 **kwargs
362 Additional keyword arguments are ignored to facilitate passing the
363 same arguments to several methods.
364
365 Returns
366 -------
367 newCat: `lsst.afw.table.SourceCatalog`
368 Source Catalog with requested local calib columns
369 """
370 measureConfig = SingleFrameMeasurementTask.ConfigClass()
371 measureConfig.doReplaceWithNoise = False
372
373 # Clear all slots, because we aren't running the relevant plugins.
374 for slot in measureConfig.slots:
375 setattr(measureConfig.slots, slot, None)
376
377 measureConfig.plugins.names = []
378 if self.config.doReevaluateSkyWcs:
379 measureConfig.plugins.names.add("base_LocalWcs")
380 self.log.info("Re-evaluating base_LocalWcs plugin")
381 if self.config.doReevaluatePhotoCalib:
382 measureConfig.plugins.names.add("base_LocalPhotoCalib")
383 self.log.info("Re-evaluating base_LocalPhotoCalib plugin")
384 pluginsNotToCopy = tuple(measureConfig.plugins.names)
385
386 # Create a new schema and catalog
387 # Copy all columns from original except for the ones to reevaluate
388 aliasMap = catalog.schema.getAliasMap()
389 mapper = afwTable.SchemaMapper(catalog.schema)
390 for item in catalog.schema:
391 if not item.field.getName().startswith(pluginsNotToCopy):
392 mapper.addMapping(item.key)
393
394 schema = mapper.getOutputSchema()
395 measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
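        # Reattach the input catalog's alias map so slot aliases (e.g.
        # slot_Centroid) still resolve against the copied columns.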
396 schema.setAliasMap(aliasMap)
397 newCat = afwTable.SourceCatalog(schema)
398 newCat.extend(catalog, mapper=mapper)
399
400 # Fluxes in sourceCatalogs are in counts, so there are no fluxes to
401 # update here. LocalPhotoCalibs are applied during transform tasks.
402 # Update coord_ra/coord_dec, which are expected to be positions on the
403 # sky and are used as such in sdm tables without transform
404 if self.config.doReevaluateSkyWcs and exposure.wcs is not None:
405 afwTable.updateSourceCoords(exposure.wcs, newCat)
406 wcsPlugin = measurement.plugins["base_LocalWcs"]
407 else:
408 wcsPlugin = None
409
410 if self.config.doReevaluatePhotoCalib and exposure.getPhotoCalib() is not None:
411 pcPlugin = measurement.plugins["base_LocalPhotoCalib"]
412 else:
413 pcPlugin = None
414
415 for row in newCat:
416 if wcsPlugin is not None:
417 wcsPlugin.measure(row, exposure)
418 if pcPlugin is not None:
419 pcPlugin.measure(row, exposure)
420
421 return newCat
422
423
424class PostprocessAnalysis(object):
425 """Calculate columns from DataFrames or handles storing DataFrames.
426
427 This object manages and organizes an arbitrary set of computations
428 on a catalog. The catalog is defined by a
429 `DeferredDatasetHandle` or `InMemoryDatasetHandle` object
430 (or list thereof), such as a ``deepCoadd_obj`` dataset, and the
431 computations are defined by a collection of
432 `~lsst.pipe.tasks.functors.Functor` objects (or, equivalently, a
433 ``CompositeFunctor``).
434
435 After the object is initialized, accessing the ``.df`` attribute (which
436 holds the `pandas.DataFrame` containing the results of the calculations)
437 triggers computation of said dataframe.
438
439 One of the conveniences of using this object is the ability to define a
440 desired common filter for all functors. This enables the same functor
441 collection to be passed to several different `PostprocessAnalysis` objects
442 without having to change the original functor collection, since the ``filt``
443 keyword argument of this object triggers an overwrite of the ``filt``
444 property for all functors in the collection.
445
446 This object also allows a list of refFlags to be passed, and defines a set
447 of default refFlags that are always included even if not requested.
448
449 If a list of DataFrames or Handles is passed, rather than a single one,
450 then the calculations will be mapped over all the input catalogs. In
451 principle, it should be straightforward to parallelize this activity, but
452 initial tests have failed (see TODO in code comments).
453
454 Parameters
455 ----------
456 handles : `~lsst.daf.butler.DeferredDatasetHandle` or
457 `~lsst.pipe.base.InMemoryDatasetHandle` or
458 list of these.
459 Source catalog(s) for computation.
460 functors : `list`, `dict`, or `~lsst.pipe.tasks.functors.CompositeFunctor`
461 Computations to do (functors that act on ``handles``).
462 If a dict, the output
463 DataFrame will have columns keyed accordingly.
464 If a list, the column keys will come from the
465 ``.shortname`` attribute of each functor.
466
467 filt : `str`, optional
468 Filter in which to calculate. If provided,
469 this will overwrite any existing ``.filt`` attribute
470 of the provided functors.
471
472 flags : `list`, optional
473 List of flags (per-band) to include in output table.
474 Taken from the ``meas`` dataset if applied to a multilevel Object Table.
475
476 refFlags : `list`, optional
477 List of refFlags (only reference band) to include in output table.
478
479 forcedFlags : `list`, optional
480 List of flags (per-band) to include in output table.
481 Taken from the ``forced_src`` dataset if applied to a
482 multilevel Object Table. Intended for flags from measurement plugins
483 only run during multi-band forced-photometry.
484 """
485 _defaultRefFlags = []
486 _defaultFuncs = ()
487
488 def __init__(self, handles, functors, filt=None, flags=None, refFlags=None, forcedFlags=None):
489 self.handles = handles
490 self.functors = functors
491
492 self.filt = filt
493 self.flags = list(flags) if flags is not None else []
494 self.forcedFlags = list(forcedFlags) if forcedFlags is not None else []
495 self.refFlags = list(self._defaultRefFlags)
496 if refFlags is not None:
497 self.refFlags += list(refFlags)
498
499 self._df = None
500
501 @property
502 def defaultFuncs(self):
503 funcs = dict(self._defaultFuncs)
504 return funcs
505
506 @property
507 def func(self):
508 additionalFuncs = self.defaultFuncs
509 additionalFuncs.update({flag: Column(flag, dataset="forced_src") for flag in self.forcedFlags})
510 additionalFuncs.update({flag: Column(flag, dataset="ref") for flag in self.refFlags})
511 additionalFuncs.update({flag: Column(flag, dataset="meas") for flag in self.flags})
512
513 if isinstance(self.functors, CompositeFunctor):
514 func = self.functors
515 else:
516 func = CompositeFunctor(self.functors)
517
518 func.funcDict.update(additionalFuncs)
519 func.filt = self.filt
520
521 return func
522
523 @property
524 def noDupCols(self):
525 return [name for name, func in self.func.funcDict.items() if func.noDup or func.dataset == "ref"]
526
527 @property
528 def df(self):
529 if self._df is None:
530 self.compute()
531 return self._df
532
533 def compute(self, dropna=False, pool=None):
534 # map over multiple handles
535 if type(self.handles) in (list, tuple):
536 if pool is None:
537 dflist = [self.func(handle, dropna=dropna) for handle in self.handles]
538 else:
539 # TODO: Figure out why this doesn't work (pyarrow pickling
540 # issues?)
541 dflist = pool.map(functools.partial(self.func, dropna=dropna), self.handles)
542 self._df = pd.concat(dflist)
543 else:
544 self._df = self.func(self.handles, dropna=dropna)
545
546 return self._df
547
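# A minimal usage sketch for PostprocessAnalysis. ``handle`` stands in for a
# butler DeferredDatasetHandle (or InMemoryDatasetHandle) holding a
# deepCoadd_obj-style DataFrame; the functor names are illustrative:
#
#     >>> from lsst.pipe.tasks.functors import Column, CompositeFunctor
#     >>> funcs = CompositeFunctor({"ra": Column("coord_ra"), "dec": Column("coord_dec")})
#     >>> analysis = PostprocessAnalysis(handle, funcs, filt="r")
#     >>> df = analysis.df   # first access triggers the computation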
548
549class TransformCatalogBaseConnections(pipeBase.PipelineTaskConnections,
550 dimensions=()):
551 """Expected Connections for subclasses of TransformCatalogBaseTask.
552
553 Must be subclassed.
554 """
555 inputCatalog = connectionTypes.Input(
556 name="",
557 storageClass="DataFrame",
558 )
559 outputCatalog = connectionTypes.Output(
560 name="",
561 storageClass="DataFrame",
562 )
563
564
565class TransformCatalogBaseConfig(pipeBase.PipelineTaskConfig,
566 pipelineConnections=TransformCatalogBaseConnections):
567 functorFile = pexConfig.Field(
568 dtype=str,
569 doc="Path to YAML file specifying Science Data Model functors to use "
570 "when copying columns and computing calibrated values.",
571 default=None,
572 optional=True
573 )
574 primaryKey = pexConfig.Field(
575 dtype=str,
576 doc="Name of column to be set as the DataFrame index. If None, the index"
577 "will be named `id`",
578 default=None,
579 optional=True
580 )
581 columnsFromDataId = pexConfig.ListField(
582 dtype=str,
583 default=None,
584 optional=True,
585 doc="Columns to extract from the dataId",
586 )
587
588
589class TransformCatalogBaseTask(pipeBase.PipelineTask):
590 """Base class for transforming/standardizing a catalog by applying functors
591 that convert units and apply calibrations.
592
593 The purpose of this task is to perform a set of computations on an input
594 ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` that holds a
595 ``DataFrame`` dataset (such as ``deepCoadd_obj``), and write the results to
596 a new dataset (which needs to be declared in an ``outputDataset``
597 attribute).
598
599 The calculations to be performed are defined in a YAML file that specifies
600 a set of functors to be computed, provided as a ``--functorFile`` config
601 parameter. An example of such a YAML file is the following:
602
603 funcs:
604 sourceId:
605 functor: Index
606 x:
607 functor: Column
608 args: slot_Centroid_x
609 y:
610 functor: Column
611 args: slot_Centroid_y
612 psfFlux:
613 functor: LocalNanojansky
614 args:
615 - slot_PsfFlux_instFlux
616 - slot_PsfFlux_instFluxErr
617 - base_LocalPhotoCalib
618 - base_LocalPhotoCalibErr
619 psfFluxErr:
620 functor: LocalNanojanskyErr
621 args:
622 - slot_PsfFlux_instFlux
623 - slot_PsfFlux_instFluxErr
624 - base_LocalPhotoCalib
625 - base_LocalPhotoCalibErr
626 flags:
627 - detect_isPrimary
628
629 The names for each entry under "funcs" will become the names of columns in
630 the output dataset. All the functors referenced are defined in
631 `~lsst.pipe.tasks.functors`. Positional arguments to be passed to each
632 functor are in the `args` list, and any additional entries for each column
633 other than "functor" or "args" (e.g., ``'filt'``, ``'dataset'``) are
634 treated as keyword arguments to be passed to the functor initialization.
635
636 The "flags" entry is the default shortcut for `Column` functors.
637 All columns listed under "flags" will be copied to the output table
638 untransformed. They can be of any datatype.
639 In the special case of transforming a multi-level object table with
640 band and dataset indices (deepCoadd_obj), these will be taken from the
641 ``meas`` dataset and exploded out per band.
642
643 There are two special shortcuts that only apply when transforming
644 multi-level Object (deepCoadd_obj) tables:
645 - The "refFlags" entry is shortcut for `Column` functor
646 taken from the ``ref`` dataset if transforming an ObjectTable.
647 - The "forcedFlags" entry is shortcut for `Column` functors.
648 taken from the ``forced_src`` dataset if transforming an ObjectTable.
649 These are expanded out per band.
650
651
652 This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
653 to organize and execute the calculations.
654 """
655 @property
656 def _DefaultName(self):
657 raise NotImplementedError("Subclass must define the \"_DefaultName\" attribute.")
658
659 @property
660 def outputDataset(self):
661 raise NotImplementedError("Subclass must define the \"outputDataset\" attribute.")
662
663 @property
664 def inputDataset(self):
665 raise NotImplementedError("Subclass must define \"inputDataset\" attribute.")
666
667 @property
668 def ConfigClass(self):
669 raise NotImplementedError("Subclass must define \"ConfigClass\" attribute.")
670
671 def __init__(self, *args, **kwargs):
672 super().__init__(*args, **kwargs)
673 if self.config.functorFile:
674 self.log.info("Loading tranform functor definitions from %s",
675 self.config.functorFile)
676 self.funcs = CompositeFunctor.from_file(self.config.functorFile)
677 self.funcs.update(dict(PostprocessAnalysis._defaultFuncs))
678 else:
679 self.funcs = None
680
681 def runQuantum(self, butlerQC, inputRefs, outputRefs):
682 inputs = butlerQC.get(inputRefs)
683 if self.funcs is None:
684 raise ValueError("config.functorFile is None. "
685 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
686 result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
687 dataId=dict(outputRefs.outputCatalog.dataId.mapping))
688 outputs = pipeBase.Struct(outputCatalog=result)
689 butlerQC.put(outputs, outputRefs)
690
691 def run(self, handle, funcs=None, dataId=None, band=None):
692 """Do postprocessing calculations
693
694 Takes a ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` or
695 ``DataFrame`` object and dataId,
696 returns a dataframe with results of postprocessing calculations.
697
698 Parameters
699 ----------
700 handle : `~lsst.daf.butler.DeferredDatasetHandle` or
701 `~lsst.pipe.base.InMemoryDatasetHandle` or
702 `~pandas.DataFrame`, or list of these.
703 DataFrame(s) from which calculations are done.
704 funcs : `~lsst.pipe.tasks.functors.Functor`
705 Functors to apply to the table's columns.
706 dataId : `dict`, optional
707 Used to add columns listed in ``columnsFromDataId`` to the output dataframe.
708 band : `str`, optional
709 Filter band that is being processed.
710
711 Returns
712 -------
713 df : `pandas.DataFrame`
714 """
715 self.log.info("Transforming/standardizing the source table dataId: %s", dataId)
716
717 df = self.transform(band, handle, funcs, dataId).df
718 self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
719 return df
720
721 def getFunctors(self):
722 return self.funcs
723
724 def getAnalysis(self, handles, funcs=None, band=None):
725 if funcs is None:
726 funcs = self.funcs
727 analysis = PostprocessAnalysis(handles, funcs, filt=band)
728 return analysis
729
730 def transform(self, band, handles, funcs, dataId):
731 analysis = self.getAnalysis(handles, funcs=funcs, band=band)
732 df = analysis.df
733 if dataId and self.config.columnsFromDataId:
734 for key in self.config.columnsFromDataId:
735 if key in dataId:
736 if key == "detector":
737 # int16 instead of uint8 because databases don't like unsigned bytes.
738 df[key] = np.int16(dataId[key])
739 else:
740 df[key] = dataId[key]
741 else:
742 raise ValueError(f"'{key}' in config.columnsFromDataId not found in dataId: {dataId}")
743
744 if self.config.primaryKey:
745 if df.index.name != self.config.primaryKey and self.config.primaryKey in df:
746 df.reset_index(inplace=True, drop=True)
747 df.set_index(self.config.primaryKey, inplace=True)
748
749 return pipeBase.Struct(
750 df=df,
751 analysis=analysis
752 )
753
754
755class TransformObjectCatalogConnections(pipeBase.PipelineTaskConnections,
756 defaultTemplates={"coaddName": "deep"},
757 dimensions=("tract", "patch", "skymap")):
758 inputCatalog = connectionTypes.Input(
759 doc="The vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
760 "stored as a DataFrame with a multi-level column index per-patch.",
761 dimensions=("tract", "patch", "skymap"),
762 storageClass="DataFrame",
763 name="{coaddName}Coadd_obj",
764 deferLoad=True,
765 )
766 outputCatalog = connectionTypes.Output(
767 doc="Per-Patch Object Table of columns transformed from the deepCoadd_obj table per the standard "
768 "data model.",
769 dimensions=("tract", "patch", "skymap"),
770 storageClass="DataFrame",
771 name="objectTable"
772 )
773
774
775class TransformObjectCatalogConfig(TransformCatalogBaseConfig,
776 pipelineConnections=TransformObjectCatalogConnections):
777 coaddName = pexConfig.Field(
778 dtype=str,
779 default="deep",
780 doc="Name of coadd"
781 )
782 outputBands = pexConfig.ListField(
783 dtype=str,
784 default=None,
785 optional=True,
786 doc=("These bands and only these bands will appear in the output,"
787 " NaN-filled if the input does not include them."
788 " If None, then use all bands found in the input.")
789 )
790 camelCase = pexConfig.Field(
791 dtype=bool,
792 default=False,
793 doc=("Write per-band columns names with camelCase, else underscore "
794 "For example: gPsFlux instead of g_PsFlux.")
795 )
796 multilevelOutput = pexConfig.Field(
797 dtype=bool,
798 default=False,
799 doc=("Whether results dataframe should have a multilevel column index (True) or be flat "
800 "and name-munged (False).")
801 )
802 goodFlags = pexConfig.ListField(
803 dtype=str,
804 default=[],
805 doc=("List of 'good' flags that should be set False when populating empty tables. "
806 "All other flags are considered to be 'bad' flags and will be set to True.")
807 )
808 floatFillValue = pexConfig.Field(
809 dtype=float,
810 default=np.nan,
811 doc="Fill value for float fields when populating empty tables."
812 )
813 integerFillValue = pexConfig.Field(
814 dtype=int,
815 default=-1,
816 doc="Fill value for integer fields when populating empty tables."
817 )
818
819 def setDefaults(self):
820 super().setDefaults()
821 self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Object.yaml")
822 self.primaryKey = "objectId"
823 self.columnsFromDataId = ["tract", "patch"]
824 self.goodFlags = ["calib_astrometry_used",
825 "calib_photometry_reserved",
826 "calib_photometry_used",
827 "calib_psf_candidate",
828 "calib_psf_reserved",
829 "calib_psf_used"]
830
831
832class TransformObjectCatalogTask(TransformCatalogBaseTask):
833 """Produce a flattened Object Table to match the format specified in
834 sdm_schemas.
835
836 Do the same set of postprocessing calculations on all bands.
837
838 This is identical to `TransformCatalogBaseTask`, except that it does
839 the specified functor calculations for all filters present in the
840 input `deepCoadd_obj` table. Any specific ``"filt"`` keywords specified
841 by the YAML file will be superseded.
842 """
843 _DefaultName = "transformObjectCatalog"
844 ConfigClass = TransformObjectCatalogConfig
845
846 def run(self, handle, funcs=None, dataId=None, band=None):
847 # NOTE: band kwarg is ignored here.
848 dfDict = {}
849 analysisDict = {}
850 templateDf = pd.DataFrame()
851
852 columns = handle.get(component="columns")
853 inputBands = columns.unique(level=1).values
854
855 outputBands = self.config.outputBands if self.config.outputBands else inputBands
856
857 # Perform transform for data of filters that exist in the handle dataframe.
858 for inputBand in inputBands:
859 if inputBand not in outputBands:
860 self.log.info("Ignoring %s band data in the input", inputBand)
861 continue
862 self.log.info("Transforming the catalog of band %s", inputBand)
863 result = self.transform(inputBand, handle, funcs, dataId)
864 dfDict[inputBand] = result.df
865 analysisDict[inputBand] = result.analysis
866 if templateDf.empty:
867 templateDf = result.df
868
869 # Put filler values in columns of other wanted bands
870 for filt in outputBands:
871 if filt not in dfDict:
872 self.log.info("Adding empty columns for band %s", filt)
873 dfTemp = templateDf.copy()
874 for col in dfTemp.columns:
875 testValue = dfTemp[col].values[0]
876 if isinstance(testValue, (np.bool_, pd.BooleanDtype)):
877 # Boolean flag type, check if it is a "good" flag
878 if col in self.config.goodFlags:
879 fillValue = False
880 else:
881 fillValue = True
882 elif isinstance(testValue, numbers.Integral):
883 # Checking numbers.Integral catches all flavors
884 # of python, numpy, pandas, etc. integers.
885 # We must ensure this is not an unsigned integer.
886 if isinstance(testValue, np.unsignedinteger):
887 raise ValueError("Parquet tables may not have unsigned integer columns.")
888 else:
889 fillValue = self.config.integerFillValue
890 else:
891 fillValue = self.config.floatFillValue
892 dfTemp[col].values[:] = fillValue
893 dfDict[filt] = dfTemp
894
895 # This makes a multilevel column index, with band as first level
896 df = pd.concat(dfDict, axis=1, names=["band", "column"])
897
898 if not self.config.multilevelOutput:
899 noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
900 if self.config.primaryKey in noDupCols:
901 noDupCols.remove(self.config.primaryKey)
902 if dataId and self.config.columnsFromDataId:
903 noDupCols += self.config.columnsFromDataId
904 df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase,
905 inputBands=inputBands)
906
907 self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
908
909 return df
910
911
912class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections,
913 dimensions=("tract", "skymap")):
914 inputCatalogs = connectionTypes.Input(
915 doc="Per-Patch objectTables conforming to the standard data model.",
916 name="objectTable",
917 storageClass="DataFrame",
918 dimensions=("tract", "patch", "skymap"),
919 multiple=True,
920 )
921 outputCatalog = connectionTypes.Output(
922 doc="Pre-tract horizontal concatenation of the input objectTables",
923 name="objectTable_tract",
924 storageClass="DataFrame",
925 dimensions=("tract", "skymap"),
926 )
927
928
929class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig,
930 pipelineConnections=ConsolidateObjectTableConnections):
931 coaddName = pexConfig.Field(
932 dtype=str,
933 default="deep",
934 doc="Name of coadd"
935 )
936
937
938class ConsolidateObjectTableTask(pipeBase.PipelineTask):
939 """Write patch-merged source tables to a tract-level DataFrame Parquet file.
940
941 Concatenates `objectTable` list into a per-tract `objectTable_tract`.
942 """
943 _DefaultName = "consolidateObjectTable"
944 ConfigClass = ConsolidateObjectTableConfig
945
946 inputDataset = "objectTable"
947 outputDataset = "objectTable_tract"
948
949 def runQuantum(self, butlerQC, inputRefs, outputRefs):
950 inputs = butlerQC.get(inputRefs)
951 self.log.info("Concatenating %s per-patch Object Tables",
952 len(inputs["inputCatalogs"]))
953 df = pd.concat(inputs["inputCatalogs"])
954 butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)
955
956
957class TransformSourceTableConnections(pipeBase.PipelineTaskConnections,
958 defaultTemplates={"catalogType": ""},
959 dimensions=("instrument", "visit", "detector")):
960
961 inputCatalog = connectionTypes.Input(
962 doc="Wide input catalog of sources produced by WriteSourceTableTask",
963 name="{catalogType}source",
964 storageClass="DataFrame",
965 dimensions=("instrument", "visit", "detector"),
966 deferLoad=True
967 )
968 outputCatalog = connectionTypes.Output(
969 doc="Narrower, per-detector Source Table transformed and converted per a "
970 "specified set of functors",
971 name="{catalogType}sourceTable",
972 storageClass="DataFrame",
973 dimensions=("instrument", "visit", "detector")
974 )
975
976
977class TransformSourceTableConfig(TransformCatalogBaseConfig,
978 pipelineConnections=TransformSourceTableConnections):
979
980 def setDefaults(self):
981 super().setDefaults()
982 self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Source.yaml")
983 self.primaryKey = "sourceId"
984 self.columnsFromDataId = ["visit", "detector", "band", "physical_filter"]
985
986
987class TransformSourceTableTask(TransformCatalogBaseTask):
988 """Transform/standardize a source catalog
989 """
990 _DefaultName = "transformSourceTable"
991 ConfigClass = TransformSourceTableConfig
992
993
994class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections,
995 dimensions=("instrument", "visit",),
996 defaultTemplates={"calexpType": ""}):
997 calexp = connectionTypes.Input(
998 doc="Processed exposures used for metadata",
999 name="calexp",
1000 storageClass="ExposureF",
1001 dimensions=("instrument", "visit", "detector"),
1002 deferLoad=True,
1003 multiple=True,
1004 )
1005 visitSummary = connectionTypes.Output(
1006 doc=("Per-visit consolidated exposure metadata. These catalogs use "
1007 "detector id for the id and are sorted for fast lookups of a "
1008 "detector."),
1009 name="visitSummary",
1010 storageClass="ExposureCatalog",
1011 dimensions=("instrument", "visit"),
1012 )
1013 visitSummarySchema = connectionTypes.InitOutput(
1014 doc="Schema of the visitSummary catalog",
1015 name="visitSummary_schema",
1016 storageClass="ExposureCatalog",
1017 )
1018
1019
1020class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig,
1021 pipelineConnections=ConsolidateVisitSummaryConnections):
1022 """Config for ConsolidateVisitSummaryTask"""
1023 pass
1024
1025
1026class ConsolidateVisitSummaryTask(pipeBase.PipelineTask):
1027 """Task to consolidate per-detector visit metadata.
1028
1029 This task aggregates the following metadata from all the detectors in a
1030 single visit into an exposure catalog:
1031 - The visitInfo.
1032 - The wcs.
1033 - The photoCalib.
1034 - The physical_filter and band (if available).
1035 - The psf size, shape, and effective area at the center of the detector.
1036 - The corners of the bounding box in right ascension/declination.
1037
1038 Other quantities such as Detector, Psf, ApCorrMap, and TransmissionCurve
1039 are not persisted here because of storage concerns, and because of their
1040 limited utility as summary statistics.
1041
1042 Tests for this task are performed in ci_hsc_gen3.
1043 """
1044 _DefaultName = "consolidateVisitSummary"
1045 ConfigClass = ConsolidateVisitSummaryConfig
1046
1047 def __init__(self, **kwargs):
1048 super().__init__(**kwargs)
1049 self.schema = afwTable.ExposureTable.makeMinimalSchema()
1050 self.schema.addField("visit", type="L", doc="Visit number")
1051 self.schema.addField("physical_filter", type="String", size=32, doc="Physical filter")
1052 self.schema.addField("band", type="String", size=32, doc="Name of band")
1053 ExposureSummaryStats.update_schema(self.schema)
1054 self.visitSummarySchema = afwTable.ExposureCatalog(self.schema)
1055
1056 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1057 dataRefs = butlerQC.get(inputRefs.calexp)
1058 visit = dataRefs[0].dataId["visit"]
1059
1060 self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
1061 len(dataRefs), visit)
1062
1063 expCatalog = self._combineExposureMetadata(visit, dataRefs)
1064
1065 butlerQC.put(expCatalog, outputRefs.visitSummary)
1066
1067 def _combineExposureMetadata(self, visit, dataRefs):
1068 """Make a combined exposure catalog from a list of dataRefs.
1069 These dataRefs must point to exposures with wcs, summaryStats,
1070 and other visit metadata.
1071
1072 Parameters
1073 ----------
1074 visit : `int`
1075 Visit identification number.
1076 dataRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
1077 List of dataRefs in visit.
1078
1079 Returns
1080 -------
1081 visitSummary : `lsst.afw.table.ExposureCatalog`
1082 Exposure catalog with per-detector summary information.
1083 """
1084 cat = afwTable.ExposureCatalog(self.schema)
1085 cat.resize(len(dataRefs))
1086
1087 cat["visit"] = visit
1088
1089 for i, dataRef in enumerate(dataRefs):
1090 visitInfo = dataRef.get(component="visitInfo")
1091 filterLabel = dataRef.get(component="filter")
1092 summaryStats = dataRef.get(component="summaryStats")
1093 detector = dataRef.get(component="detector")
1094 wcs = dataRef.get(component="wcs")
1095 photoCalib = dataRef.get(component="photoCalib")
1097 bbox = dataRef.get(component="bbox")
1098 validPolygon = dataRef.get(component="validPolygon")
1099
1100 rec = cat[i]
1101 rec.setBBox(bbox)
1102 rec.setVisitInfo(visitInfo)
1103 rec.setWcs(wcs)
1104 rec.setPhotoCalib(photoCalib)
1105 rec.setValidPolygon(validPolygon)
1106
1107 rec["physical_filter"] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else ""
1108 rec["band"] = filterLabel.bandLabel if filterLabel.hasBandLabel() else ""
1109 rec.setId(detector.getId())
1110 summaryStats.update_record(rec)
1111
1112 metadata = dafBase.PropertyList()
1113 metadata.add("COMMENT", "Catalog id is detector id, sorted.")
1114 # We are looping over existing datarefs, so the following is true
1115 metadata.add("COMMENT", "Only detectors with data have entries.")
1116 cat.setMetadata(metadata)
1117
1118 cat.sort()
1119 return cat
1120
1121
1122class ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections,
1123 defaultTemplates={"catalogType": ""},
1124 dimensions=("instrument", "visit")):
1125 inputCatalogs = connectionTypes.Input(
1126 doc="Input per-detector Source Tables",
1127 name="{catalogType}sourceTable",
1128 storageClass="DataFrame",
1129 dimensions=("instrument", "visit", "detector"),
1130 multiple=True
1131 )
1132 outputCatalog = connectionTypes.Output(
1133 doc="Per-visit concatenation of Source Table",
1134 name="{catalogType}sourceTable_visit",
1135 storageClass="DataFrame",
1136 dimensions=("instrument", "visit")
1137 )
1138
1139
1140class ConsolidateSourceTableConfig(pipeBase.PipelineTaskConfig,
1141 pipelineConnections=ConsolidateSourceTableConnections):
1142 pass
1143
1144
1145class ConsolidateSourceTableTask(pipeBase.PipelineTask):
1146 """Concatenate `sourceTable` list into a per-visit `sourceTable_visit`
1147 """
1148 _DefaultName = "consolidateSourceTable"
1149 ConfigClass = ConsolidateSourceTableConfig
1150
1151 inputDataset = "sourceTable"
1152 outputDataset = "sourceTable_visit"
1153
1154 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1155 from .makeWarp import reorderRefs
1156
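        # Sort the input references by detector id so the concatenated
        # per-visit table always comes out in a deterministic detector order.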
1157 detectorOrder = [ref.dataId["detector"] for ref in inputRefs.inputCatalogs]
1158 detectorOrder.sort()
1159 inputRefs = reorderRefs(inputRefs, detectorOrder, dataIdKey="detector")
1160 inputs = butlerQC.get(inputRefs)
1161 self.log.info("Concatenating %s per-detector Source Tables",
1162 len(inputs["inputCatalogs"]))
1163 df = pd.concat(inputs["inputCatalogs"])
1164 butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)
1165
1166
1167class MakeCcdVisitTableConnections(pipeBase.PipelineTaskConnections,
1168 dimensions=("instrument",),
1169 defaultTemplates={"calexpType": ""}):
1170 visitSummaryRefs = connectionTypes.Input(
1171 doc="Data references for per-visit consolidated exposure metadata",
1172 name="finalVisitSummary",
1173 storageClass="ExposureCatalog",
1174 dimensions=("instrument", "visit"),
1175 multiple=True,
1176 deferLoad=True,
1177 )
1178 outputCatalog = connectionTypes.Output(
1179 doc="CCD and Visit metadata table",
1180 name="ccdVisitTable",
1181 storageClass="DataFrame",
1182 dimensions=("instrument",)
1183 )
1184
1185
1186class MakeCcdVisitTableConfig(pipeBase.PipelineTaskConfig,
1187 pipelineConnections=MakeCcdVisitTableConnections):
1188 idGenerator = DetectorVisitIdGeneratorConfig.make_field()
1189
1190
1191class MakeCcdVisitTableTask(pipeBase.PipelineTask):
1192 """Produce a `ccdVisitTable` from the visit summary exposure catalogs.
1193 """
1194 _DefaultName = "makeCcdVisitTable"
1195 ConfigClass = MakeCcdVisitTableConfig
1196
1197 def run(self, visitSummaryRefs):
1198 """Make a table of ccd information from the visit summary catalogs.
1199
1200 Parameters
1201 ----------
1202 visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
1203 List of DeferredDatasetHandles pointing to exposure catalogs with
1204 per-detector summary information.
1205
1206 Returns
1207 -------
1208 result : `~lsst.pipe.base.Struct`
1209 Results struct with attribute:
1210
1211 ``outputCatalog``
1212 Catalog of ccd and visit information.
1213 """
1214 ccdEntries = []
1215 for visitSummaryRef in visitSummaryRefs:
1216 visitSummary = visitSummaryRef.get()
1217 visitInfo = visitSummary[0].getVisitInfo()
1218
1219 ccdEntry = {}
1220 summaryTable = visitSummary.asAstropy()
1221 selectColumns = ["id", "visit", "physical_filter", "band", "ra", "dec",
1222 "pixelScale", "zenithDistance",
1223 "expTime", "zeroPoint", "psfSigma", "skyBg", "skyNoise",
1224 "astromOffsetMean", "astromOffsetStd", "nPsfStar",
1225 "psfStarDeltaE1Median", "psfStarDeltaE2Median",
1226 "psfStarDeltaE1Scatter", "psfStarDeltaE2Scatter",
1227 "psfStarDeltaSizeMedian", "psfStarDeltaSizeScatter",
1228 "psfStarScaledDeltaSizeScatter", "psfTraceRadiusDelta",
1229 "psfApFluxDelta", "psfApCorrSigmaScaledDelta",
1230 "maxDistToNearestPsf",
1231 "effTime", "effTimePsfSigmaScale",
1232 "effTimeSkyBgScale", "effTimeZeroPointScale",
1233 "magLim"]
1234 ccdEntry = summaryTable[selectColumns].to_pandas().set_index("id")
1235 # 'visit' is the human readable visit number.
1236 # 'visitId' is the key to the visitId table. They are the same.
1237 # Technically you should join to get the visit from the visit
1238 # table.
1239 ccdEntry = ccdEntry.rename(columns={"visit": "visitId"})
1240
1241 # RFC-924: Temporarily keep a duplicate "decl" entry for backwards
1242 # compatibility. To be removed after September 2023.
1243 ccdEntry["decl"] = ccdEntry.loc[:, "dec"]
1244
1245 ccdEntry["ccdVisitId"] = [
1246 self.config.idGenerator.apply(
1247 visitSummaryRef.dataId,
1248 detector=detector_id,
1249 is_exposure=False,
1250 ).catalog_id # The "catalog ID" here is the ccdVisit ID
1251 # because it's usually the ID for a whole catalog
1252 # with a {visit, detector}, and that's the main
1253 # use case for IdGenerator. This usage for a
1254 # summary table is rare.
1255 for detector_id in summaryTable["id"]
1256 ]
1257 ccdEntry["detector"] = summaryTable["id"]
1258 ccdEntry["seeing"] = (
1259 visitSummary["psfSigma"] * visitSummary["pixelScale"] * np.sqrt(8 * np.log(2))
1260 )
1261 ccdEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
1262 ccdEntry["expMidpt"] = visitInfo.getDate().toPython()
1263 ccdEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
1264 ccdEntry["obsStart"] = (
1265 ccdEntry["expMidpt"] - 0.5 * pd.Timedelta(seconds=ccdEntry["expTime"].values[0])
1266 )
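            # expTime is in seconds and MJD is in days, so convert before
            # stepping back half an exposure from the midpoint.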
1267 expTime_days = ccdEntry["expTime"] / (60*60*24)
1268 ccdEntry["obsStartMJD"] = ccdEntry["expMidptMJD"] - 0.5 * expTime_days
1269 ccdEntry["darkTime"] = visitInfo.getDarkTime()
1270 ccdEntry["xSize"] = summaryTable["bbox_max_x"] - summaryTable["bbox_min_x"]
1271 ccdEntry["ySize"] = summaryTable["bbox_max_y"] - summaryTable["bbox_min_y"]
1272 ccdEntry["llcra"] = summaryTable["raCorners"][:, 0]
1273 ccdEntry["llcdec"] = summaryTable["decCorners"][:, 0]
1274 ccdEntry["ulcra"] = summaryTable["raCorners"][:, 1]
1275 ccdEntry["ulcdec"] = summaryTable["decCorners"][:, 1]
1276 ccdEntry["urcra"] = summaryTable["raCorners"][:, 2]
1277 ccdEntry["urcdec"] = summaryTable["decCorners"][:, 2]
1278 ccdEntry["lrcra"] = summaryTable["raCorners"][:, 3]
1279 ccdEntry["lrcdec"] = summaryTable["decCorners"][:, 3]
1280 # TODO: DM-30618, Add raftName, nExposures, ccdTemp, binX, binY,
1281 # and flags, and decide if WCS, and llcx, llcy, ulcx, ulcy, etc.
1282 # values are actually wanted.
1283 ccdEntries.append(ccdEntry)
1284
1285 outputCatalog = pd.concat(ccdEntries)
1286 outputCatalog.set_index("ccdVisitId", inplace=True, verify_integrity=True)
1287 return pipeBase.Struct(outputCatalog=outputCatalog)
1288
1289
1290class MakeVisitTableConnections(pipeBase.PipelineTaskConnections,
1291 dimensions=("instrument",),
1292 defaultTemplates={"calexpType": ""}):
1293 visitSummaries = connectionTypes.Input(
1294 doc="Per-visit consolidated exposure metadata",
1295 name="finalVisitSummary",
1296 storageClass="ExposureCatalog",
1297 dimensions=("instrument", "visit",),
1298 multiple=True,
1299 deferLoad=True,
1300 )
1301 outputCatalog = connectionTypes.Output(
1302 doc="Visit metadata table",
1303 name="visitTable",
1304 storageClass="DataFrame",
1305 dimensions=("instrument",)
1306 )
1307
1308
1309class MakeVisitTableConfig(pipeBase.PipelineTaskConfig,
1310 pipelineConnections=MakeVisitTableConnections):
1311 pass
1312
1313
1314class MakeVisitTableTask(pipeBase.PipelineTask):
1315 """Produce a `visitTable` from the visit summary exposure catalogs.
1316 """
1317 _DefaultName = "makeVisitTable"
1318 ConfigClass = MakeVisitTableConfig
1319
1320 def run(self, visitSummaries):
1321 """Make a table of visit information from the visit summary catalogs.
1322
1323 Parameters
1324 ----------
1325 visitSummaries : `list` of `lsst.afw.table.ExposureCatalog`
1326 List of exposure catalogs with per-detector summary information.
1327 Returns
1328 -------
1329 result : `~lsst.pipe.base.Struct`
1330 Results struct with attribute:
1331
1332 ``outputCatalog``
1333 Catalog of visit information.
1334 """
1335 visitEntries = []
1336 for visitSummary in visitSummaries:
1337 visitSummary = visitSummary.get()
1338 visitRow = visitSummary[0]
1339 visitInfo = visitRow.getVisitInfo()
1340
1341 visitEntry = {}
1342 visitEntry["visitId"] = visitRow["visit"]
1343 visitEntry["visit"] = visitRow["visit"]
1344 visitEntry["physical_filter"] = visitRow["physical_filter"]
1345 visitEntry["band"] = visitRow["band"]
1346 raDec = visitInfo.getBoresightRaDec()
1347 visitEntry["ra"] = raDec.getRa().asDegrees()
1348 visitEntry["dec"] = raDec.getDec().asDegrees()
1349
1350 # RFC-924: Temporarily keep a duplicate "decl" entry for backwards
1351 # compatibility. To be removed after September 2023.
1352 visitEntry["decl"] = visitEntry["dec"]
1353
1354 visitEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
1355 azAlt = visitInfo.getBoresightAzAlt()
1356 visitEntry["azimuth"] = azAlt.getLongitude().asDegrees()
1357 visitEntry["altitude"] = azAlt.getLatitude().asDegrees()
1358 visitEntry["zenithDistance"] = 90 - azAlt.getLatitude().asDegrees()
1359 visitEntry["airmass"] = visitInfo.getBoresightAirmass()
1360 expTime = visitInfo.getExposureTime()
1361 visitEntry["expTime"] = expTime
1362 visitEntry["expMidpt"] = visitInfo.getDate().toPython()
1363 visitEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
1364 visitEntry["obsStart"] = visitEntry["expMidpt"] - 0.5 * pd.Timedelta(seconds=expTime)
1365 expTime_days = expTime / (60*60*24)
1366 visitEntry["obsStartMJD"] = visitEntry["expMidptMJD"] - 0.5 * expTime_days
1367 visitEntries.append(visitEntry)
1368
1369 # TODO: DM-30623, Add programId, exposureType, cameraTemp,
1370 # mirror1Temp, mirror2Temp, mirror3Temp, domeTemp, externalTemp,
1371 # dimmSeeing, pwvGPS, pwvMW, flags, nExposures.
1372
1373 outputCatalog = pd.DataFrame(data=visitEntries)
1374 outputCatalog.set_index("visitId", inplace=True, verify_integrity=True)
1375 return pipeBase.Struct(outputCatalog=outputCatalog)
1376
1377
1378class WriteForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
1379 dimensions=("instrument", "visit", "detector", "skymap", "tract")):
1380
1381 inputCatalog = connectionTypes.Input(
1382 doc="Primary per-detector, single-epoch forced-photometry catalog. "
1383 "By default, it is the output of ForcedPhotCcdTask on calexps",
1384 name="forced_src",
1385 storageClass="SourceCatalog",
1386 dimensions=("instrument", "visit", "detector", "skymap", "tract")
1387 )
1388 inputCatalogDiff = connectionTypes.Input(
1389 doc="Secondary multi-epoch, per-detector, forced photometry catalog. "
1390 "By default, it is the output of ForcedPhotCcdTask run on image differences.",
1391 name="forced_diff",
1392 storageClass="SourceCatalog",
1393 dimensions=("instrument", "visit", "detector", "skymap", "tract")
1394 )
1395 outputCatalog = connectionTypes.Output(
1396 doc="InputCatalogs horizonatally joined on `objectId` in DataFrame parquet format",
1397 name="mergedForcedSource",
1398 storageClass="DataFrame",
1399 dimensions=("instrument", "visit", "detector", "skymap", "tract")
1400 )
1401
1402
1403class WriteForcedSourceTableConfig(pipeBase.PipelineTaskConfig,
1404 pipelineConnections=WriteForcedSourceTableConnections):
1406 doc="Column on which to join the two input tables on and make the primary key of the output",
1407 dtype=str,
1408 default="objectId",
1409 )
1410
1411
1412class WriteForcedSourceTableTask(pipeBase.PipelineTask):
1413 """Merge and convert per-detector forced source catalogs to DataFrame Parquet format.
1414
1415 Because the predecessor ForcedPhotCcdTask operates per-detector and
1416 per-tract (i.e., it has tract in its dimensions), detectors
1417 on the tract boundary may have multiple forced source catalogs.
1418
1419 The successor task TransformForcedSourceTable runs per-patch
1420 and temporally aggregates overlapping mergedForcedSource catalogs from all
1421 available epochs.
1422 """
1423 _DefaultName = "writeForcedSourceTable"
1424 ConfigClass = WriteForcedSourceTableConfig
1425
1426 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1427 inputs = butlerQC.get(inputRefs)
1428 inputs["visit"] = butlerQC.quantum.dataId["visit"]
1429 inputs["detector"] = butlerQC.quantum.dataId["detector"]
1430 inputs["band"] = butlerQC.quantum.dataId["band"]
1431 outputs = self.run(**inputs)
1432 butlerQC.put(outputs, outputRefs)
1433
1434 def run(self, inputCatalog, inputCatalogDiff, visit, detector, band=None):
1435 dfs = []
1436 for table, dataset in zip((inputCatalog, inputCatalogDiff), ("calexp", "diff")):
1437 df = table.asAstropy().to_pandas().set_index(self.config.key, drop=False)
1438 df = df.reindex(sorted(df.columns), axis=1)
1439 df["visit"] = visit
1440 # int16 instead of uint8 because databases don't like unsigned bytes.
1441 df["detector"] = np.int16(detector)
1442 df["band"] = band if band else pd.NA
1443 df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
1444 names=("dataset", "column"))
1445
1446 dfs.append(df)
1447
1448 outputCatalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
1449 return pipeBase.Struct(outputCatalog=outputCatalog)
1450
1451
1452class TransformForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
1453 dimensions=("instrument", "skymap", "patch", "tract")):
1454
1455 inputCatalogs = connectionTypes.Input(
1456 doc="DataFrames of merged ForcedSources produced by WriteForcedSourceTableTask",
1457 name="mergedForcedSource",
1458 storageClass="DataFrame",
1459 dimensions=("instrument", "visit", "detector", "skymap", "tract"),
1460 multiple=True,
1461 deferLoad=True
1462 )
1463 referenceCatalog = connectionTypes.Input(
1464 doc="Reference catalog which was used to seed the forcedPhot. Columns "
1465 "objectId, detect_isPrimary, detect_isTractInner, detect_isPatchInner "
1466 "are expected.",
1467 name="objectTable",
1468 storageClass="DataFrame",
1469 dimensions=("tract", "patch", "skymap"),
1470 deferLoad=True
1471 )
1472 outputCatalog = connectionTypes.Output(
1473 doc="Narrower, temporally-aggregated, per-patch ForcedSource Table transformed and converted per a "
1474 "specified set of functors",
1475 name="forcedSourceTable",
1476 storageClass="DataFrame",
1477 dimensions=("tract", "patch", "skymap")
1478 )
1479
1480
1481class TransformForcedSourceTableConfig(TransformCatalogBaseConfig,
1482 pipelineConnections=TransformForcedSourceTableConnections):
1483 referenceColumns = pexConfig.ListField(
1484 dtype=str,
1485 default=["detect_isPrimary", "detect_isTractInner", "detect_isPatchInner"],
1486 optional=True,
1487 doc="Columns to pull from reference catalog",
1488 )
1489 keyRef = lsst.pex.config.Field(
1490 doc="Column on which to join the two input tables on and make the primary key of the output",
1491 dtype=str,
1492 default="objectId",
1493 )
1495 doc="Rename the output DataFrame index to this name",
1496 dtype=str,
1497 default="forcedSourceId",
1498 )
1499
1500 def setDefaults(self):
1501 super().setDefaults()
1502 self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "ForcedSource.yaml")
1503 self.columnsFromDataId = ["tract", "patch"]
1504
1505
1506class TransformForcedSourceTableTask(TransformCatalogBaseTask):
1507 """Transform/standardize a ForcedSource catalog
1508
1509 Transforms each wide, per-detector forcedSource DataFrame per the
1510 specification file (per-camera defaults found in ForcedSource.yaml).
1511 All epochs that overlap the patch are aggregated into one per-patch
1512 narrow-DataFrame file.
1513
1514 No de-duplication of rows is performed. Duplicate-resolution flags are
1515 pulled in from the referenceCatalog: `detect_isPrimary`,
1516 `detect_isTractInner`, `detect_isPatchInner`, so that the user may de-duplicate
1517 for analysis or compare duplicates for QA.
1518
1519 The resulting table includes multiple bands. Epochs (MJDs) and other useful
1520 per-visit quantities can be retrieved by joining with the CcdVisitTable on
1521 ccdVisitId.
1522 """
1523 _DefaultName = "transformForcedSourceTable"
1524 ConfigClass = TransformForcedSourceTableConfig
1525
1526 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1527 inputs = butlerQC.get(inputRefs)
1528 if self.funcs is None:
1529 raise ValueError("config.functorFile is None. "
1530 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
1531 outputs = self.run(inputs["inputCatalogs"], inputs["referenceCatalog"], funcs=self.funcs,
1532 dataId=dict(outputRefs.outputCatalog.dataId.mapping))
1533
1534 butlerQC.put(outputs, outputRefs)
1535
1536 def run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=None):
1537 dfs = []
1538 ref = referenceCatalog.get(parameters={"columns": self.config.referenceColumns})
1539 self.log.info("Aggregating %s input catalogs" % (len(inputCatalogs)))
1540 for handle in inputCatalogs:
1541 result = self.transform(None, handle, funcs, dataId)
1542 # Filter for only rows that were detected on (overlap) the patch
1543 dfs.append(result.df.join(ref, how="inner"))
1544
1545 outputCatalog = pd.concat(dfs)
1546
1547 # The rows are currently indexed by the object id used for the join; name
1548 # the index config.keyRef so it survives as a column after reset_index.
1549 outputCatalog.index.rename(self.config.keyRef, inplace=True)
1550 # Add config.keyRef to the column list
1551 outputCatalog.reset_index(inplace=True)
1552 # Set the forcedSourceId to the index. This is specified in the
1553 # ForcedSource.yaml
1554 outputCatalog.set_index("forcedSourceId", inplace=True, verify_integrity=True)
1555 # Rename it to the config.key
1556 outputCatalog.index.rename(self.config.key, inplace=True)
1557
1558 self.log.info("Made a table of %d columns and %d rows",
1559 len(outputCatalog.columns), len(outputCatalog))
1560 return pipeBase.Struct(outputCatalog=outputCatalog)
1561
1562
1563class ConsolidateTractConnections(pipeBase.PipelineTaskConnections,
1564 defaultTemplates={"catalogType": ""},
1565 dimensions=("instrument", "tract")):
1566 inputCatalogs = connectionTypes.Input(
1567 doc="Input per-patch DataFrame Tables to be concatenated",
1568 name="{catalogType}ForcedSourceTable",
1569 storageClass="DataFrame",
1570 dimensions=("tract", "patch", "skymap"),
1571 multiple=True,
1572 )
1573
1574 outputCatalog = connectionTypes.Output(
1575 doc="Output per-tract concatenation of DataFrame Tables",
1576 name="{catalogType}ForcedSourceTable_tract",
1577 storageClass="DataFrame",
1578 dimensions=("tract", "skymap"),
1579 )
1580
1581
1582class ConsolidateTractConfig(pipeBase.PipelineTaskConfig,
1583 pipelineConnections=ConsolidateTractConnections):
1584 pass
1585
1586
1587class ConsolidateTractTask(pipeBase.PipelineTask):
1588 """Concatenate any per-patch, dataframe list into a single
1589 per-tract DataFrame.
1590 """
1591 _DefaultName = "ConsolidateTract"
1592 ConfigClass = ConsolidateTractConfig
1593
1594 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1595 inputs = butlerQC.get(inputRefs)
1596 # Not checking at least one inputCatalog exists because that'd be an
1597 # empty QG.
1598 self.log.info("Concatenating %s per-patch %s Tables",
1599 len(inputs["inputCatalogs"]),
1600 inputRefs.inputCatalogs[0].datasetType.name)
1601 df = pd.concat(inputs["inputCatalogs"])
1602 butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)