postprocess.py
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22__all__ = ["WriteObjectTableConfig", "WriteObjectTableTask",
23 "WriteSourceTableConfig", "WriteSourceTableTask",
24 "WriteRecalibratedSourceTableConfig", "WriteRecalibratedSourceTableTask",
25 "PostprocessAnalysis",
26 "TransformCatalogBaseConfig", "TransformCatalogBaseTask",
27 "TransformObjectCatalogConfig", "TransformObjectCatalogTask",
28 "ConsolidateObjectTableConfig", "ConsolidateObjectTableTask",
29 "TransformSourceTableConfig", "TransformSourceTableTask",
30 "ConsolidateVisitSummaryConfig", "ConsolidateVisitSummaryTask",
31 "ConsolidateSourceTableConfig", "ConsolidateSourceTableTask",
32 "MakeCcdVisitTableConfig", "MakeCcdVisitTableTask",
33 "MakeVisitTableConfig", "MakeVisitTableTask",
34 "WriteForcedSourceTableConfig", "WriteForcedSourceTableTask",
35 "TransformForcedSourceTableConfig", "TransformForcedSourceTableTask",
36 "ConsolidateTractConfig", "ConsolidateTractTask"]
37
38from collections import defaultdict
39import dataclasses
40import functools
41import logging
42import numbers
43import os
44
45import numpy as np
46import pandas as pd
47import astropy.table
48from astro_metadata_translator.headers import merge_headers
49
50import lsst.geom
51import lsst.pex.config as pexConfig
52import lsst.pipe.base as pipeBase
53import lsst.daf.base as dafBase
54from lsst.daf.butler.formatters.parquet import pandas_to_astropy
55from lsst.pipe.base import connectionTypes
56import lsst.afw.table as afwTable
57from lsst.afw.image import ExposureSummaryStats, ExposureF
58from lsst.meas.base import SingleFrameMeasurementTask, DetectorVisitIdGeneratorConfig
59from lsst.obs.base.utils import strip_provenance_from_fits_header
60
61from .functors import CompositeFunctor, Column
62
63log = logging.getLogger(__name__)
64
65
66def flattenFilters(df, noDupCols=["coord_ra", "coord_dec"], camelCase=False, inputBands=None):
67 """Flattens a dataframe with multilevel column index.
68 """
69 newDf = pd.DataFrame()
70 # band is the level 0 index
71 dfBands = df.columns.unique(level=0).values
72 for band in dfBands:
73 subdf = df[band]
74 columnFormat = "{0}{1}" if camelCase else "{0}_{1}"
75 newColumns = {c: columnFormat.format(band, c)
76 for c in subdf.columns if c not in noDupCols}
77 cols = list(newColumns.keys())
78 newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)
79
80 # Band must be present in the input and output or else column is all NaN:
81 presentBands = dfBands if inputBands is None else list(set(inputBands).intersection(dfBands))
82 # Get the unexploded columns from any present band's partition
83 noDupDf = df[presentBands[0]][noDupCols]
84 newDf = pd.concat([noDupDf, newDf], axis=1)
85 return newDf
86
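# Illustrative sketch (not part of the pipeline): how flattenFilters renames a
# two-level (band, column) DataFrame into flat, per-band columns. The bands and
# column names below are made up for demonstration.
#
#     >>> import pandas as pd
#     >>> columns = pd.MultiIndex.from_tuples(
#     ...     [("g", "coord_ra"), ("g", "coord_dec"), ("g", "PsFlux"),
#     ...      ("r", "coord_ra"), ("r", "coord_dec"), ("r", "PsFlux")])
#     >>> df = pd.DataFrame([[10.0, -5.0, 1.0, 10.0, -5.0, 2.0]], columns=columns)
#     >>> flattenFilters(df).columns.tolist()
#     ['coord_ra', 'coord_dec', 'g_PsFlux', 'r_PsFlux']
#     >>> flattenFilters(df, camelCase=True).columns.tolist()
#     ['coord_ra', 'coord_dec', 'gPsFlux', 'rPsFlux']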
87
89 """A helper class for stacking astropy tables without having them all in
90 memory at once.
91
92 Parameters
93 ----------
94 capacity : `int`
95 Full size of the final table.
96
97 Notes
98 -----
99 Unlike `astropy.table.vstack`, this class requires all tables to have the
100 exact same columns (it's slightly more strict than even the
101 ``join_type="exact"`` argument to `astropy.table.vstack`).
102 """
103
104 def __init__(self, capacity):
105 self.index = 0
106 self.capacity = capacity
107 self.result = None
108
109 @classmethod
110 def from_handles(cls, handles):
111 """Construct from an iterable of
112 `lsst.daf.butler.DeferredDatasetHandle`.
113
114 Parameters
115 ----------
116 handles : `~collections.abc.Iterable` [ \
117 `lsst.daf.butler.DeferredDatasetHandle` ]
118 Iterable of handles. Must have a storage class that supports the
119 "rowcount" component, which is all that will be fetched.
120
121 Returns
122 -------
123 vstack : `TableVStack`
124 An instance of this class, initialized with capacity equal to the
125 sum of the rowcounts of all the given table handles.
126 """
127 capacity = sum(handle.get(component="rowcount") for handle in handles)
128 return cls(capacity=capacity)
129
130 def extend(self, table):
131 """Add a single table to the stack.
132
133 Parameters
134 ----------
135 table : `astropy.table.Table`
136 An astropy table instance.
137 """
138 if self.result is None:
139 self.result = astropy.table.Table()
140 for name in table.colnames:
141 column = table[name]
142 column_cls = type(column)
143 self.result[name] = column_cls.info.new_like([column], self.capacity, name=name)
144 self.result[name][:len(table)] = column
145 self.index = len(table)
146 self.result.meta = table.meta.copy()
147 else:
148 next_index = self.index + len(table)
149 if set(self.result.colnames) != set(table.colnames):
150 raise TypeError(
151 "Inconsistent columns in concatentation: "
152 f"{set(self.result.colnames).symmetric_difference(table.colnames)}"
153 )
154 for name in table.colnames:
155 out_col = self.result[name]
156 in_col = table[name]
157 if out_col.dtype != in_col.dtype:
158 raise TypeError(f"Type mismatch on column {name!r}: {out_col.dtype} != {in_col.dtype}.")
159 self.result[name][self.index:next_index] = table[name]
160 self.index = next_index
161 # Butler provenance should be stripped on merge. It will be
162 # added by butler on write. No attempt is made here to combine
163 # provenance from multiple input tables.
164 self.result.meta = merge_headers([self.result.meta, table.meta], mode="drop")
165 strip_provenance_from_fits_header(self.result.meta)
166
167 @classmethod
168 def vstack_handles(cls, handles):
169 """Vertically stack tables represented by deferred dataset handles.
170
171 Parameters
172 ----------
173 handles : `~collections.abc.Iterable` [ \
174 `lsst.daf.butler.DeferredDatasetHandle` ]
175 Iterable of handles. Must have the "ArrowAstropy" storage class
176 and identical columns.
177
178 Returns
179 -------
180 table : `astropy.table.Table`
181 Concatenated table with the same columns as each input table and
182 the rows of all of them.
183 """
184 handles = tuple(handles) # guard against single-pass iterators
185 vstack = cls.from_handles(handles)
186 for handle in handles:
187 vstack.extend(handle.get())
188 return vstack.result
189
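# Illustrative sketch (not part of the pipeline): the preallocate-then-fill
# pattern used by TableVStack, shown with small in-memory astropy tables
# instead of butler handles. The column names are made up for demonstration;
# in production, vstack_handles is the intended entry point.
#
#     >>> import astropy.table
#     >>> t1 = astropy.table.Table({"a": [1, 2], "b": [0.5, 1.5]})
#     >>> t2 = astropy.table.Table({"a": [3], "b": [2.5]})
#     >>> stacker = TableVStack(capacity=len(t1) + len(t2))
#     >>> stacker.extend(t1)
#     >>> stacker.extend(t2)
#     >>> len(stacker.result), stacker.result.colnames
#     (3, ['a', 'b'])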
190
191class WriteObjectTableConnections(pipeBase.PipelineTaskConnections,
192 defaultTemplates={"coaddName": "deep"},
193 dimensions=("tract", "patch", "skymap")):
194 inputCatalogMeas = connectionTypes.Input(
195 doc="Catalog of source measurements on the deepCoadd.",
196 dimensions=("tract", "patch", "band", "skymap"),
197 storageClass="SourceCatalog",
198 name="{coaddName}Coadd_meas",
199 multiple=True
200 )
201 inputCatalogForcedSrc = connectionTypes.Input(
202 doc="Catalog of forced measurements (shape and position parameters held fixed) on the deepCoadd.",
203 dimensions=("tract", "patch", "band", "skymap"),
204 storageClass="SourceCatalog",
205 name="{coaddName}Coadd_forced_src",
206 multiple=True
207 )
208 inputCatalogPsfsMultiprofit = connectionTypes.Input(
209 doc="Catalog of Gaussian mixture model fit parameters for the PSF model at each object centroid.",
210 dimensions=("tract", "patch", "band", "skymap"),
211 storageClass="ArrowAstropy",
212 name="{coaddName}Coadd_psfs_multiprofit",
213 multiple=True,
214 )
215 outputCatalog = connectionTypes.Output(
216 doc="A vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
217 "stored as a DataFrame with a multi-level column index per-patch.",
218 dimensions=("tract", "patch", "skymap"),
219 storageClass="DataFrame",
220 name="{coaddName}Coadd_obj"
221 )
222
223
224class WriteObjectTableConfig(pipeBase.PipelineTaskConfig,
225 pipelineConnections=WriteObjectTableConnections):
226 coaddName = pexConfig.Field(
227 dtype=str,
228 default="deep",
229 doc="Name of coadd"
230 )
231
232
233class WriteObjectTableTask(pipeBase.PipelineTask):
234 """Write filter-merged object tables as a DataFrame in parquet format.
235 """
236 _DefaultName = "writeObjectTable"
237 ConfigClass = WriteObjectTableConfig
238
239 # Tag of output dataset written by `MergeSourcesTask.write`
240 outputDataset = "obj"
241
242 def runQuantum(self, butlerQC, inputRefs, outputRefs):
243 inputs = butlerQC.get(inputRefs)
244
245 catalogs = defaultdict(dict)
246 for dataset, connection in (
247 ("meas", "inputCatalogMeas"),
248 ("forced_src", "inputCatalogForcedSrc"),
249 ("psfs_multiprofit", "inputCatalogPsfsMultiprofit"),
250 ):
251 for ref, cat in zip(getattr(inputRefs, connection), inputs[connection]):
252 catalogs[ref.dataId["band"]][dataset] = cat
253
254 dataId = butlerQC.quantum.dataId
255 df = self.run(catalogs=catalogs, tract=dataId["tract"], patch=dataId["patch"])
256 outputs = pipeBase.Struct(outputCatalog=df)
257 butlerQC.put(outputs, outputRefs)
258
259 def run(self, catalogs, tract, patch):
260 """Merge multiple catalogs.
261
262 Parameters
263 ----------
264 catalogs : `dict`
265 Mapping from filter names to dict of catalogs.
266 tract : `int`
267 Tract id to use for the tractId column.
268 patch : `str`
269 Patch id to use for the patchId column.
270
271 Returns
272 -------
273 catalog : `pandas.DataFrame`
274 Merged dataframe.
275
276 Raises
277 ------
278 ValueError
279 Raised if any of the catalogs is of an unsupported type.
280 """
281 dfs = []
282 for filt, tableDict in catalogs.items():
283 for dataset, table in tableDict.items():
284 # Convert afwTable to pandas DataFrame if needed
285 if isinstance(table, pd.DataFrame):
286 df = table
287 elif isinstance(table, afwTable.SourceCatalog):
288 df = table.asAstropy().to_pandas()
289 elif isinstance(table, astropy.table.Table):
290 df = table.to_pandas()
291 else:
292 raise ValueError(f"{dataset=} has unsupported {type(table)=}")
293 df.set_index("id", drop=True, inplace=True)
294
295 # Sort columns by name, to ensure matching schema among patches
296 df = df.reindex(sorted(df.columns), axis=1)
297 df = df.assign(tractId=tract, patchId=patch)
298
299 # Make columns a 3-level MultiIndex
300 df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
301 names=("dataset", "band", "column"))
302 dfs.append(df)
303
304 # We do this dance and not `pd.concat(dfs)` because the pandas
305 # concatenation uses excessive memory.
306 catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
307 return catalog
308
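# Illustrative sketch (not part of the pipeline): the three-level
# (dataset, band, column) index that WriteObjectTableTask.run assigns to each
# per-band frame before joining them. The column names are made up for
# demonstration.
#
#     >>> import pandas as pd
#     >>> df = pd.DataFrame({"flux": [1.0], "flag": [True]})
#     >>> df.columns = pd.MultiIndex.from_tuples(
#     ...     [("meas", "g", c) for c in df.columns],
#     ...     names=("dataset", "band", "column"))
#     >>> df["meas"]["g"].columns.tolist()
#     ['flux', 'flag']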
309
310class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
311 defaultTemplates={"catalogType": ""},
312 dimensions=("instrument", "visit", "detector")):
313
314 catalog = connectionTypes.Input(
315 doc="Input full-depth catalog of sources produced by CalibrateTask",
316 name="{catalogType}src",
317 storageClass="SourceCatalog",
318 dimensions=("instrument", "visit", "detector")
319 )
320 outputCatalog = connectionTypes.Output(
321 doc="Catalog of sources, `src` in Astropy/Parquet format. Columns are unchanged.",
322 name="{catalogType}source",
323 storageClass="ArrowAstropy",
324 dimensions=("instrument", "visit", "detector")
325 )
326
327
328class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
329 pipelineConnections=WriteSourceTableConnections):
330 pass
331
332
333class WriteSourceTableTask(pipeBase.PipelineTask):
334 """Write source table to DataFrame Parquet format.
335 """
336 _DefaultName = "writeSourceTable"
337 ConfigClass = WriteSourceTableConfig
338
339 def runQuantum(self, butlerQC, inputRefs, outputRefs):
340 inputs = butlerQC.get(inputRefs)
341 inputs["visit"] = butlerQC.quantum.dataId["visit"]
342 inputs["detector"] = butlerQC.quantum.dataId["detector"]
343 result = self.run(**inputs)
344 outputs = pipeBase.Struct(outputCatalog=result.table)
345 butlerQC.put(outputs, outputRefs)
346
347 def run(self, catalog, visit, detector, **kwargs):
348 """Convert `src` catalog to an Astropy table.
349
350 Parameters
351 ----------
352 catalog : `lsst.afw.table.SourceCatalog`
353 Catalog to be converted.
354 visit, detector : `int`
355 Visit and detector ids to be added as columns.
356 **kwargs
357 Additional keyword arguments are ignored as a convenience for
358 subclasses that pass the same arguments to several different
359 methods.
360
361 Returns
362 -------
363 result : `~lsst.pipe.base.Struct`
364 ``table``
365 `astropy.table.Table` version of the input catalog
366 """
367 self.log.info("Generating DataFrame from src catalog visit,detector=%i,%i", visit, detector)
368 tbl = catalog.asAstropy()
369 tbl["visit"] = visit
370 # int16 instead of uint8 because databases don't like unsigned bytes.
371 tbl["detector"] = np.int16(detector)
372
373 return pipeBase.Struct(table=tbl)
374
375
376class WriteRecalibratedSourceTableConnections(WriteSourceTableConnections,
377 defaultTemplates={"catalogType": ""},
378 dimensions=("instrument", "visit", "detector", "skymap")):
379 visitSummary = connectionTypes.Input(
380 doc="Input visit-summary catalog with updated calibration objects.",
381 name="finalVisitSummary",
382 storageClass="ExposureCatalog",
383 dimensions=("instrument", "visit",),
384 )
385
386 def __init__(self, config):
387 # We don't want the input catalog here to be an initial existence
388 # constraint in QG generation, because that can unfortunately limit the
389 # set of data IDs of inputs to other tasks, even those that run earlier
390 # (e.g. updateVisitSummary), when the input 'src' catalog is not
391 # produced. It's safer to just use 'visitSummary' existence as an
392 # initial constraint, and then let the graph prune out the detectors
393 # that don't have a 'src' for this task only.
394 self.catalog = dataclasses.replace(self.catalog, deferGraphConstraint=True)
395
396
397class WriteRecalibratedSourceTableConfig(WriteSourceTableConfig,
398 pipelineConnections=WriteRecalibratedSourceTableConnections):
399
400 doReevaluatePhotoCalib = pexConfig.Field(
401 dtype=bool,
402 default=True,
403 doc=("Add or replace local photoCalib columns"),
404 )
405 doReevaluateSkyWcs = pexConfig.Field(
406 dtype=bool,
407 default=True,
408 doc=("Add or replace local WCS columns and update the coord columns, coord_ra and coord_dec"),
409 )
410
411
412class WriteRecalibratedSourceTableTask(WriteSourceTableTask):
413 """Write source table to DataFrame Parquet format.
414 """
415 _DefaultName = "writeRecalibratedSourceTable"
416 ConfigClass = WriteRecalibratedSourceTableConfig
417
418 def runQuantum(self, butlerQC, inputRefs, outputRefs):
419 inputs = butlerQC.get(inputRefs)
420
421 inputs["visit"] = butlerQC.quantum.dataId["visit"]
422 inputs["detector"] = butlerQC.quantum.dataId["detector"]
423
424 if self.config.doReevaluatePhotoCalib or self.config.doReevaluateSkyWcs:
425 exposure = ExposureF()
426 inputs["exposure"] = self.prepareCalibratedExposure(
427 exposure=exposure,
428 visitSummary=inputs["visitSummary"],
429 detectorId=butlerQC.quantum.dataId["detector"]
430 )
431 inputs["catalog"] = self.addCalibColumns(**inputs)
432
433 result = self.run(**inputs)
434 outputs = pipeBase.Struct(outputCatalog=result.table)
435 butlerQC.put(outputs, outputRefs)
436
437 def prepareCalibratedExposure(self, exposure, detectorId, visitSummary=None):
438 """Prepare a calibrated exposure and apply external calibrations
439 if so configured.
440
441 Parameters
442 ----------
443 exposure : `lsst.afw.image.exposure.Exposure`
444 Input exposure to adjust calibrations. May be an empty Exposure.
445 detectorId : `int`
446 Detector ID associated with the exposure.
447 visitSummary : `lsst.afw.table.ExposureCatalog`, optional
448 Exposure catalog with all calibration objects. WCS and PhotoCalib
449 are always applied if ``visitSummary`` is provided and those
450 components are not `None`.
451
452 Returns
453 -------
454 exposure : `lsst.afw.image.exposure.Exposure`
455 Exposure with adjusted calibrations.
456 """
457 if visitSummary is not None:
458 row = visitSummary.find(detectorId)
459 if row is None:
460 raise pipeBase.NoWorkFound(f"Visit summary for detector {detectorId} is missing.")
461 if (photoCalib := row.getPhotoCalib()) is None:
462 self.log.warning("Detector id %s has None for photoCalib in visit summary; "
463 "skipping reevaluation of photoCalib.", detectorId)
464 exposure.setPhotoCalib(None)
465 else:
466 exposure.setPhotoCalib(photoCalib)
467 if (skyWcs := row.getWcs()) is None:
468 self.log.warning("Detector id %s has None for skyWcs in visit summary; "
469 "skipping reevaluation of skyWcs.", detectorId)
470 exposure.setWcs(None)
471 else:
472 exposure.setWcs(skyWcs)
473
474 return exposure
475
476 def addCalibColumns(self, catalog, exposure, **kwargs):
477 """Add replace columns with calibs evaluated at each centroid
478
479 Add or replace 'base_LocalWcs' and 'base_LocalPhotoCalib' columns in
480 a source catalog, by rerunning the plugins.
481
482 Parameters
483 ----------
484 catalog : `lsst.afw.table.SourceCatalog`
485 Catalog to which calib columns will be added.
486 exposure : `lsst.afw.image.exposure.Exposure`
487 Exposure with attached PhotoCalibs and SkyWcs attributes to be
488 reevaluated at local centroids. Pixels are not required.
489 **kwargs
490 Additional keyword arguments are ignored to facilitate passing the
491 same arguments to several methods.
492
493 Returns
494 -------
495 newCat : `lsst.afw.table.SourceCatalog`
496 Source catalog with the requested local calib columns.
497 """
498 measureConfig = SingleFrameMeasurementTask.ConfigClass()
499 measureConfig.doReplaceWithNoise = False
500
501 # Clear all slots, because we aren't running the relevant plugins.
502 for slot in measureConfig.slots:
503 setattr(measureConfig.slots, slot, None)
504
505 measureConfig.plugins.names = []
506 if self.config.doReevaluateSkyWcs:
507 measureConfig.plugins.names.add("base_LocalWcs")
508 self.log.info("Re-evaluating base_LocalWcs plugin")
509 if self.config.doReevaluatePhotoCalib:
510 measureConfig.plugins.names.add("base_LocalPhotoCalib")
511 self.log.info("Re-evaluating base_LocalPhotoCalib plugin")
512 pluginsNotToCopy = tuple(measureConfig.plugins.names)
513
514 # Create a new schema and catalog
515 # Copy all columns from original except for the ones to reevaluate
516 aliasMap = catalog.schema.getAliasMap()
517 mapper = afwTable.SchemaMapper(catalog.schema)
518 for item in catalog.schema:
519 if not item.field.getName().startswith(pluginsNotToCopy):
520 mapper.addMapping(item.key)
521
522 schema = mapper.getOutputSchema()
523 measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
524 schema.setAliasMap(aliasMap)
525 newCat = afwTable.SourceCatalog(schema)
526 newCat.extend(catalog, mapper=mapper)
527
528 # Fluxes in sourceCatalogs are in counts, so there are no fluxes to
529 # update here. LocalPhotoCalibs are applied during transform tasks.
530 # Update coord_ra/coord_dec, which are expected to be positions on the
531 # sky and are used as such in sdm tables without transform
532 if self.config.doReevaluateSkyWcs and exposure.wcs is not None:
533 afwTable.updateSourceCoords(exposure.wcs, newCat)
534 wcsPlugin = measurement.plugins["base_LocalWcs"]
535 else:
536 wcsPlugin = None
537
538 if self.config.doReevaluatePhotoCalib and exposure.getPhotoCalib() is not None:
539 pcPlugin = measurement.plugins["base_LocalPhotoCalib"]
540 else:
541 pcPlugin = None
542
543 for row in newCat:
544 if wcsPlugin is not None:
545 wcsPlugin.measure(row, exposure)
546 if pcPlugin is not None:
547 pcPlugin.measure(row, exposure)
548
549 return newCat
550
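# Illustrative sketch (not part of the pipeline): the SchemaMapper pattern that
# addCalibColumns uses to copy every field except the ones the plugins will
# recompute. The plugin-name prefix below is one of the two used above.
#
#     >>> mapper = afwTable.SchemaMapper(catalog.schema)
#     >>> for item in catalog.schema:
#     ...     if not item.field.getName().startswith("base_LocalWcs"):
#     ...         mapper.addMapping(item.key)
#     >>> newCat = afwTable.SourceCatalog(mapper.getOutputSchema())
#     >>> newCat.extend(catalog, mapper=mapper)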
551
552class PostprocessAnalysis(object):
553 """Calculate columns from DataFrames or handles storing DataFrames.
554
555 This object manages and organizes an arbitrary set of computations
556 on a catalog. The catalog is defined by a
557 `DeferredDatasetHandle` or `InMemoryDatasetHandle` object
558 (or list thereof), such as a ``deepCoadd_obj`` dataset, and the
559 computations are defined by a collection of
560 `~lsst.pipe.tasks.functors.Functor` objects (or, equivalently, a
561 ``CompositeFunctor``).
562
563 After the object is initialized, accessing the ``.df`` attribute (which
564 holds the `pandas.DataFrame` containing the results of the calculations)
565 triggers computation of said dataframe.
566
567 One of the conveniences of using this object is the ability to define a
568 desired common filter for all functors. This enables the same functor
569 collection to be passed to several different `PostprocessAnalysis` objects
570 without having to change the original functor collection, since the ``filt``
571 keyword argument of this object triggers an overwrite of the ``filt``
572 property for all functors in the collection.
573
574 This object also allows a list of refFlags to be passed, and defines a set
575 of default refFlags that are always included even if not requested.
576
577 If a list of DataFrames or Handles is passed, rather than a single one,
578 then the calculations will be mapped over all the input catalogs. In
579 principle, it should be straightforward to parallelize this activity, but
580 initial tests have failed (see TODO in code comments).
581
582 Parameters
583 ----------
584 handles : `~lsst.daf.butler.DeferredDatasetHandle` or
585 `~lsst.pipe.base.InMemoryDatasetHandle` or
586 list of these.
587 Source catalog(s) for computation.
588 functors : `list`, `dict`, or `~lsst.pipe.tasks.functors.CompositeFunctor`
589 Computations to do (functors that act on ``handles``).
590 If a dict, the output
591 DataFrame will have columns keyed accordingly.
592 If a list, the column keys will come from the
593 ``.shortname`` attribute of each functor.
594
595 filt : `str`, optional
596 Filter in which to calculate. If provided,
597 this will overwrite any existing ``.filt`` attribute
598 of the provided functors.
599
600 flags : `list`, optional
601 List of flags (per-band) to include in output table.
602 Taken from the ``meas`` dataset if applied to a multilevel Object Table.
603
604 refFlags : `list`, optional
605 List of refFlags (only reference band) to include in output table.
606
607 forcedFlags : `list`, optional
608 List of flags (per-band) to include in output table.
609 Taken from the ``forced_src`` dataset if applied to a
610 multilevel Object Table. Intended for flags from measurement plugins
611 only run during multi-band forced-photometry.
612 """
613 _defaultRefFlags = []
614 _defaultFuncs = ()
615
616 def __init__(self, handles, functors, filt=None, flags=None, refFlags=None, forcedFlags=None):
617 self.handles = handles
618 self.functors = functors
619
620 self.filt = filt
621 self.flags = list(flags) if flags is not None else []
622 self.forcedFlags = list(forcedFlags) if forcedFlags is not None else []
623 self.refFlags = list(self._defaultRefFlags)
624 if refFlags is not None:
625 self.refFlags += list(refFlags)
626
627 self._df = None
628
629 @property
630 def defaultFuncs(self):
631 funcs = dict(self._defaultFuncs)
632 return funcs
633
634 @property
635 def func(self):
636 additionalFuncs = self.defaultFuncs
637 additionalFuncs.update({flag: Column(flag, dataset="forced_src") for flag in self.forcedFlags})
638 additionalFuncs.update({flag: Column(flag, dataset="ref") for flag in self.refFlags})
639 additionalFuncs.update({flag: Column(flag, dataset="meas") for flag in self.flags})
640
641 if isinstance(self.functors, CompositeFunctor):
642 func = self.functors
643 else:
644 func = CompositeFunctor(self.functors)
645
646 func.funcDict.update(additionalFuncs)
647 func.filt = self.filt
648
649 return func
650
651 @property
652 def noDupCols(self):
653 return [name for name, func in self.func.funcDict.items() if func.noDup]
654
655 @property
656 def df(self):
657 if self._df is None:
658 self.compute()
659 return self._df
660
661 def compute(self, dropna=False, pool=None):
662 # map over multiple handles
663 if type(self.handles) in (list, tuple):
664 if pool is None:
665 dflist = [self.func(handle, dropna=dropna) for handle in self.handles]
666 else:
667 # TODO: Figure out why this doesn't work (pyarrow pickling
668 # issues?)
669 dflist = pool.map(functools.partial(self.func, dropna=dropna), self.handles)
670 self._df = pd.concat(dflist)
671 else:
672 self._df = self.func(self.handles, dropna=dropna)
673
674 return self._df
675
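# Illustrative sketch (not part of the pipeline): wrapping an in-memory catalog
# in a handle and computing a single functor column. The column and functor
# names are made up for demonstration; real pipelines load a functor collection
# from a YAML file via CompositeFunctor.from_file.
#
#     >>> import pandas as pd
#     >>> import lsst.pipe.base as pipeBase
#     >>> from lsst.pipe.tasks.functors import Column
#     >>> df_in = pd.DataFrame({"slot_Centroid_x": [1.0, 2.0]})
#     >>> handle = pipeBase.InMemoryDatasetHandle(df_in, storageClass="DataFrame")
#     >>> analysis = PostprocessAnalysis(handle, {"x": Column("slot_Centroid_x")})
#     >>> df_out = analysis.df  # accessing .df triggers the computation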
676
677class TransformCatalogBaseConnections(pipeBase.PipelineTaskConnections,
678 dimensions=()):
679 """Expected Connections for subclasses of TransformCatalogBaseTask.
680
681 Must be subclassed.
682 """
683 inputCatalog = connectionTypes.Input(
684 name="",
685 storageClass="DataFrame",
686 )
687 outputCatalog = connectionTypes.Output(
688 name="",
689 storageClass="ArrowAstropy",
690 )
691
692
693class TransformCatalogBaseConfig(pipeBase.PipelineTaskConfig,
694 pipelineConnections=TransformCatalogBaseConnections):
695 functorFile = pexConfig.Field(
696 dtype=str,
697 doc="Path to YAML file specifying Science Data Model functors to use "
698 "when copying columns and computing calibrated values.",
699 default=None,
700 optional=True
701 )
702 primaryKey = pexConfig.Field(
703 dtype=str,
704 doc="Name of column to be set as the DataFrame index. If None, the index"
705 "will be named `id`",
706 default=None,
707 optional=True
708 )
709 columnsFromDataId = pexConfig.ListField(
710 dtype=str,
711 default=None,
712 optional=True,
713 doc="Columns to extract from the dataId",
714 )
715
716
717class TransformCatalogBaseTask(pipeBase.PipelineTask):
718 """Base class for transforming/standardizing a catalog by applying functors
719 that convert units and apply calibrations.
720
721 The purpose of this task is to perform a set of computations on an input
722 ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` that holds a
723 ``DataFrame`` dataset (such as ``deepCoadd_obj``), and write the results to
724 a new dataset (which needs to be declared in an ``outputDataset``
725 attribute).
726
727 The calculations to be performed are defined in a YAML file that specifies
728 a set of functors to be computed, provided as the ``functorFile`` config
729 parameter. An example of such a YAML file is the following:
730
731 funcs:
732     sourceId:
733         functor: Index
734     x:
735         functor: Column
736         args: slot_Centroid_x
737     y:
738         functor: Column
739         args: slot_Centroid_y
740     psfFlux:
741         functor: LocalNanojansky
742         args:
743             - slot_PsfFlux_instFlux
744             - slot_PsfFlux_instFluxErr
745             - base_LocalPhotoCalib
746             - base_LocalPhotoCalibErr
747     psfFluxErr:
748         functor: LocalNanojanskyErr
749         args:
750             - slot_PsfFlux_instFlux
751             - slot_PsfFlux_instFluxErr
752             - base_LocalPhotoCalib
753             - base_LocalPhotoCalibErr
754 flags:
755     - detect_isPrimary
756
757 The names for each entry under "funcs" will become the names of columns in
758 the output dataset. All the functors referenced are defined in
759 `~lsst.pipe.tasks.functors`. Positional arguments to be passed to each
760 functor are in the `args` list, and any additional entries for each column
761 other than "functor" or "args" (e.g., ``'filt'``, ``'dataset'``) are
762 treated as keyword arguments to be passed to the functor initialization.
763
764 The "flags" entry is the default shortcut for `Column` functors.
765 All columns listed under "flags" will be copied to the output table
766 untransformed. They can be of any datatype.
767 In the special case of transforming a multi-level object table with
768 band and dataset indices (deepCoadd_obj), these will be taken from the
769 ``meas`` dataset and exploded out per band.
770
771 There are two special shortcuts that only apply when transforming
772 multi-level Object (deepCoadd_obj) tables:
773 - The "refFlags" entry is shortcut for `Column` functor
774 taken from the ``ref`` dataset if transforming an ObjectTable.
775 - The "forcedFlags" entry is shortcut for `Column` functors.
776 taken from the ``forced_src`` dataset if transforming an ObjectTable.
777 These are expanded out per band.
778
779
780 This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
781 to organize and execute the calculations.
782 """
783 @property
784 def _DefaultName(self):
785 raise NotImplementedError("Subclass must define the \"_DefaultName\" attribute.")
786
787 @property
788 def outputDataset(self):
789 raise NotImplementedError("Subclass must define the \"outputDataset\" attribute.")
790
791 @property
792 def inputDataset(self):
793 raise NotImplementedError("Subclass must define \"inputDataset\" attribute.")
794
795 @property
796 def ConfigClass(self):
797 raise NotImplementedError("Subclass must define \"ConfigClass\" attribute.")
798
799 def __init__(self, *args, **kwargs):
800 super().__init__(*args, **kwargs)
801 if self.config.functorFile:
802 self.log.info("Loading tranform functor definitions from %s",
803 self.config.functorFile)
804 self.funcs = CompositeFunctor.from_file(self.config.functorFile)
805 self.funcs.update(dict(PostprocessAnalysis._defaultFuncs))
806 else:
807 self.funcs = None
808
809 def runQuantum(self, butlerQC, inputRefs, outputRefs):
810 inputs = butlerQC.get(inputRefs)
811 if self.funcs is None:
812 raise ValueError("config.functorFile is None. "
813 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
814 result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
815 dataId=dict(outputRefs.outputCatalog.dataId.mapping))
816 butlerQC.put(result, outputRefs)
817
818 def run(self, handle, funcs=None, dataId=None, band=None):
819 """Do postprocessing calculations
820
821 Takes a ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` or
822 ``DataFrame`` object and dataId,
823 returns a dataframe with results of postprocessing calculations.
824
825 Parameters
826 ----------
827 handle : `~lsst.daf.butler.DeferredDatasetHandle` or
828 `~lsst.pipe.base.InMemoryDatasetHandle` or
829 `~pandas.DataFrame`, or list of these.
830 DataFrames from which calculations are done.
831 funcs : `~lsst.pipe.tasks.functors.Functor`
832 Functors to apply to the table's columns.
833 dataId : `dict`, optional
834 Used to add a `patchId` column to the output dataframe.
835 band : `str`, optional
836 Filter band that is being processed.
837
838 Returns
839 -------
840 result : `lsst.pipe.base.Struct`
841 Result struct, with a single ``outputCatalog`` attribute holding
842 the transformed catalog.
843 """
844 self.log.info("Transforming/standardizing the source table dataId: %s", dataId)
845
846 df = self.transform(band, handle, funcs, dataId).df
847 self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
848 result = pipeBase.Struct(outputCatalog=pandas_to_astropy(df))
849 return result
850
851 def getFunctors(self):
852 return self.funcs
853
854 def getAnalysis(self, handles, funcs=None, band=None):
855 if funcs is None:
856 funcs = self.funcs
857 analysis = PostprocessAnalysis(handles, funcs, filt=band)
858 return analysis
859
860 def transform(self, band, handles, funcs, dataId):
861 analysis = self.getAnalysis(handles, funcs=funcs, band=band)
862 df = analysis.df
863 if dataId and self.config.columnsFromDataId:
864 for key in self.config.columnsFromDataId:
865 if key in dataId:
866 if key == "detector":
867 # int16 instead of uint8 because databases don't like unsigned bytes.
868 df[key] = np.int16(dataId[key])
869 else:
870 df[key] = dataId[key]
871 else:
872 raise ValueError(f"'{key}' in config.columnsFromDataId not found in dataId: {dataId}")
873
874 if self.config.primaryKey:
875 if df.index.name != self.config.primaryKey and self.config.primaryKey in df:
876 df.reset_index(inplace=True, drop=True)
877 df.set_index(self.config.primaryKey, inplace=True)
878
879 return pipeBase.Struct(
880 df=df,
881 analysis=analysis
882 )
883
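# Illustrative sketch (not part of the pipeline): how a functor YAML file is
# turned into the CompositeFunctor used by transform(), mirroring __init__
# above. The file path is hypothetical.
#
#     >>> from lsst.pipe.tasks.functors import CompositeFunctor
#     >>> funcs = CompositeFunctor.from_file("/path/to/Object.yaml")  # hypothetical path
#     >>> funcs.update(dict(PostprocessAnalysis._defaultFuncs))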
884
885class TransformObjectCatalogConnections(pipeBase.PipelineTaskConnections,
886 defaultTemplates={"coaddName": "deep"},
887 dimensions=("tract", "patch", "skymap")):
888 inputCatalog = connectionTypes.Input(
889 doc="The vertical concatenation of the {coaddName}_{meas|forced_src|psfs_multiprofit} catalogs, "
890 "stored as a DataFrame with a multi-level column index per-patch.",
891 dimensions=("tract", "patch", "skymap"),
892 storageClass="DataFrame",
893 name="{coaddName}Coadd_obj",
894 deferLoad=True,
895 )
896 inputCatalogRef = connectionTypes.Input(
897 doc="Catalog marking the primary detection (which band provides a good shape and position)"
898 "for each detection in deepCoadd_mergeDet.",
899 dimensions=("tract", "patch", "skymap"),
900 storageClass="SourceCatalog",
901 name="{coaddName}Coadd_ref",
902 deferLoad=True,
903 )
904 inputCatalogSersicMultiprofit = connectionTypes.Input(
905 doc="Catalog of source measurements on the deepCoadd.",
906 dimensions=("tract", "patch", "skymap"),
907 storageClass="ArrowAstropy",
908 name="{coaddName}Coadd_Sersic_multiprofit",
909 deferLoad=True,
910 )
911 outputCatalog = connectionTypes.Output(
912 doc="Per-Patch Object Table of columns transformed from the deepCoadd_obj table per the standard "
913 "data model.",
914 dimensions=("tract", "patch", "skymap"),
915 storageClass="ArrowAstropy",
916 name="objectTable"
917 )
918
919 def __init__(self, *, config=None):
920 super().__init__(config=config)
921 if config.multilevelOutput:
922 self.outputCatalog = dataclasses.replace(self.outputCatalog, storageClass="DataFrame")
923
924
925class TransformObjectCatalogConfig(TransformCatalogBaseConfig,
926 pipelineConnections=TransformObjectCatalogConnections):
927 coaddName = pexConfig.Field(
928 dtype=str,
929 default="deep",
930 doc="Name of coadd"
931 )
932 outputBands = pexConfig.ListField(
933 dtype=str,
934 default=None,
935 optional=True,
936 doc=("These bands and only these bands will appear in the output,"
937 " NaN-filled if the input does not include them."
938 " If None, then use all bands found in the input.")
939 )
940 camelCase = pexConfig.Field(
941 dtype=bool,
942 default=False,
943 doc=("Write per-band columns names with camelCase, else underscore "
944 "For example: gPsFlux instead of g_PsFlux.")
945 )
946 multilevelOutput = pexConfig.Field(
947 dtype=bool,
948 default=False,
949 doc=("Whether results dataframe should have a multilevel column index (True) or be flat "
950 "and name-munged (False). If True, the output storage class will be "
951 "set to DataFrame, since astropy tables do not support multi-level indexing."),
952 deprecated="Support for multi-level outputs is deprecated and will be removed after v29.",
953 )
954 goodFlags = pexConfig.ListField(
955 dtype=str,
956 default=[],
957 doc=("List of 'good' flags that should be set False when populating empty tables. "
958 "All other flags are considered to be 'bad' flags and will be set to True.")
959 )
960 floatFillValue = pexConfig.Field(
961 dtype=float,
962 default=np.nan,
963 doc="Fill value for float fields when populating empty tables."
964 )
965 integerFillValue = pexConfig.Field(
966 dtype=int,
967 default=-1,
968 doc="Fill value for integer fields when populating empty tables."
969 )
970
971 def setDefaults(self):
972 super().setDefaults()
973 self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Object.yaml")
974 self.primaryKey = "objectId"
975 self.columnsFromDataId = ["tract", "patch"]
976 self.goodFlags = ["calib_astrometry_used",
977 "calib_photometry_reserved",
978 "calib_photometry_used",
979 "calib_psf_candidate",
980 "calib_psf_reserved",
981 "calib_psf_used"]
982
983
984class TransformObjectCatalogTask(TransformCatalogBaseTask):
985 """Produce a flattened Object Table to match the format specified in
986 sdm_schemas.
987
988 Do the same set of postprocessing calculations on all bands.
989
990 This is identical to `TransformCatalogBaseTask`, except that it does
991 the specified functor calculations for all filters present in the
992 input `deepCoadd_obj` table. Any specific ``"filt"`` keywords specified
993 by the YAML file will be superseded.
994 """
995 _DefaultName = "transformObjectCatalog"
996 ConfigClass = TransformObjectCatalogConfig
997
998 datasets_multiband = ("ref", "Sersic_multiprofit")
999
1000 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1001 inputs = butlerQC.get(inputRefs)
1002 if self.funcs is None:
1003 raise ValueError("config.functorFile is None. "
1004 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
1005 result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
1006 dataId=dict(outputRefs.outputCatalog.dataId.mapping),
1007 handle_ref=inputs["inputCatalogRef"],
1008 handle_Sersic_multiprofit=inputs["inputCatalogSersicMultiprofit"],
1009 )
1010 butlerQC.put(result, outputRefs)
1011
1012 def run(self, handle, funcs=None, dataId=None, band=None, **kwargs):
1013 # NOTE: band kwarg is ignored here.
1014 # TODO: Document and improve funcs argument usage in DM-48895
1015 # self.getAnalysis only supports list, dict and CompositeFunctor
1016 if isinstance(funcs, CompositeFunctor):
1017 funcDict_in = funcs.funcDict
1018 elif isinstance(funcs, dict):
1019 funcDict_in = funcs
1020 elif isinstance(funcs, list):
1021 funcDict_in = {idx: v for idx, v in enumerate(funcs)}
1022 else:
1023 raise TypeError(f"Unsupported {type(funcs)=}")
1024
1025 handles_multi = {}
1026 funcDicts_multiband = {}
1027 for dataset in self.datasets_multiband:
1028 if (handle_multi := kwargs.get(f"handle_{dataset}")) is None:
1029 raise RuntimeError(f"Missing required handle_{dataset} kwarg")
1030 handles_multi[dataset] = handle_multi
1031 funcDicts_multiband[dataset] = {}
1032
1033 dfDict = {}
1034 analysisDict = {}
1035 templateDf = pd.DataFrame()
1036
1037 columns = handle.get(component="columns")
1038 inputBands = columns.unique(level=1).values
1039
1040 outputBands = self.config.outputBands if self.config.outputBands else inputBands
1041
1042 # Split up funcs for per-band and multiband tables
1043 funcDict_band = {}
1044
1045 for name, func in funcDict_in.items():
1046 if func.dataset in funcDicts_multiband:
1047 # This is something like a MultibandColumn
1048 if band := getattr(func, "band_to_check", None):
1049 if band not in outputBands:
1050 continue
1051 # This is something like a ReferenceBand that has configurable bands
1052 elif hasattr(func, "bands"):
1053 # TODO: Determine if this can be avoided DM-48895
1054 # This will work fine if the init doesn't manipulate bands
1055 # If it does, then one would need to make a new functor
1056 # Determining the (kw)args is tricky in that case
1057 func.bands = tuple(inputBands)
1058
1059 funcDict = funcDicts_multiband.get(func.dataset, funcDict_band)
1060 funcDict[name] = func
1061
1062 funcs_band = CompositeFunctor(funcDict_band)
1063
1064 # Perform transform for data of filters that exist in the handle dataframe.
1065 for inputBand in inputBands:
1066 if inputBand not in outputBands:
1067 self.log.info("Ignoring %s band data in the input", inputBand)
1068 continue
1069 self.log.info("Transforming the catalog of band %s", inputBand)
1070 result = self.transform(inputBand, handle, funcs_band, dataId)
1071 dfDict[inputBand] = result.df
1072 analysisDict[inputBand] = result.analysis
1073 if templateDf.empty:
1074 templateDf = result.df
1075
1076 # Put filler values in columns of other wanted bands
1077 for filt in outputBands:
1078 if filt not in dfDict:
1079 self.log.info("Adding empty columns for band %s", filt)
1080 dfTemp = templateDf.copy()
1081 for col in dfTemp.columns:
1082 testValue = dfTemp[col].values[0]
1083 if isinstance(testValue, (np.bool_, pd.BooleanDtype)):
1084 # Boolean flag type, check if it is a "good" flag
1085 if col in self.config.goodFlags:
1086 fillValue = False
1087 else:
1088 fillValue = True
1089 elif isinstance(testValue, numbers.Integral):
1090 # Checking numbers.Integral catches all flavors
1091 # of python, numpy, pandas, etc. integers.
1092 # We must ensure this is not an unsigned integer.
1093 if isinstance(testValue, np.unsignedinteger):
1094 raise ValueError("Parquet tables may not have unsigned integer columns.")
1095 else:
1096 fillValue = self.config.integerFillValue
1097 else:
1098 fillValue = self.config.floatFillValue
1099 dfTemp[col].values[:] = fillValue
1100 dfDict[filt] = dfTemp
1101
1102 # This makes a multilevel column index, with band as first level
1103 df = pd.concat(dfDict, axis=1, names=["band", "column"])
1104 name_index = df.index.name
1105
1106 # TODO: Remove in DM-48895
1107 if not self.config.multilevelOutput:
1108 noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
1109 if self.config.primaryKey in noDupCols:
1110 noDupCols.remove(self.config.primaryKey)
1111 if dataId and self.config.columnsFromDataId:
1112 noDupCols += self.config.columnsFromDataId
1113 df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase,
1114 inputBands=inputBands)
1115
1116 # Apply per-dataset functors to each multiband dataset in turn
1117 for dataset, funcDict in funcDicts_multiband.items():
1118 handle_multiband = handles_multi[dataset]
1119 df_dataset = handle_multiband.get()
1120 if isinstance(df_dataset, astropy.table.Table):
1121 df_dataset = df_dataset.to_pandas().set_index(name_index, drop=False)
1122 elif isinstance(df_dataset, afwTable.SourceCatalog):
1123 df_dataset = df_dataset.asAstropy().to_pandas().set_index(name_index, drop=False)
1124 # TODO: should funcDict have noDup funcs removed?
1125 # noDup was intended for per-band tables.
1126 result = self.transform(
1127 None,
1128 pipeBase.InMemoryDatasetHandle(df_dataset, storageClass="DataFrame"),
1129 CompositeFunctor(funcDict),
1130 dataId,
1131 )
1132 result.df.index.name = name_index
1133 # Drop columns from dataId if present (patch, tract)
1134 if self.config.columnsFromDataId:
1135 columns_drop = [column for column in self.config.columnsFromDataId if column in result.df]
1136 if columns_drop:
1137 result.df.drop(columns_drop, axis=1, inplace=True)
1138 # Make the same multi-index for the multiband table if needed
1139 # This might end up making copies, one of several reasons to avoid
1140 # using multilevel indexes, or DataFrames at all
1141 to_concat = pd.concat(
1142 {band: result.df for band in self.config.outputBands}, axis=1, names=["band", "column"]
1143 ) if self.config.multilevelOutput else result.df
1144 df = pd.concat([df, to_concat], axis=1)
1145 analysisDict[dataset] = result.analysis
1146 del result
1147
1148 df.index.name = self.config.primaryKey
1149
1150 if not self.config.multilevelOutput:
1151 tbl = pandas_to_astropy(df)
1152 else:
1153 tbl = df
1154
1155 self.log.info("Made a table of %d columns and %d rows", len(tbl.columns), len(tbl))
1156
1157 return pipeBase.Struct(outputCatalog=tbl)
1158
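# Illustrative sketch (not part of the pipeline): the per-band concatenation in
# run() above, with made-up band keys and columns.
#
#     >>> import pandas as pd
#     >>> dfDict = {"g": pd.DataFrame({"PsFlux": [1.0]}),
#     ...           "r": pd.DataFrame({"PsFlux": [2.0]})}
#     >>> df = pd.concat(dfDict, axis=1, names=["band", "column"])
#     >>> df.columns.tolist()
#     [('g', 'PsFlux'), ('r', 'PsFlux')]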
1159
1160class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections,
1161 dimensions=("tract", "skymap")):
1162 inputCatalogs = connectionTypes.Input(
1163 doc="Per-Patch objectTables conforming to the standard data model.",
1164 name="objectTable",
1165 storageClass="ArrowAstropy",
1166 dimensions=("tract", "patch", "skymap"),
1167 multiple=True,
1168 deferLoad=True,
1169 )
1170 outputCatalog = connectionTypes.Output(
1171 doc="Pre-tract horizontal concatenation of the input objectTables",
1172 name="objectTable_tract",
1173 storageClass="ArrowAstropy",
1174 dimensions=("tract", "skymap"),
1175 )
1176
1177
1178class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig,
1179 pipelineConnections=ConsolidateObjectTableConnections):
1180 coaddName = pexConfig.Field(
1181 dtype=str,
1182 default="deep",
1183 doc="Name of coadd"
1184 )
1185
1186
1187class ConsolidateObjectTableTask(pipeBase.PipelineTask):
1188 """Write patch-merged source tables to a tract-level DataFrame Parquet file.
1189
1190 Concatenates `objectTable` list into a per-visit `objectTable_tract`.
1191 """
1192 _DefaultName = "consolidateObjectTable"
1193 ConfigClass = ConsolidateObjectTableConfig
1194
1195 inputDataset = "objectTable"
1196 outputDataset = "objectTable_tract"
1197
1198 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1199 inputs = butlerQC.get(inputRefs)
1200 self.log.info("Concatenating %s per-patch Object Tables",
1201 len(inputs["inputCatalogs"]))
1202 table = TableVStack.vstack_handles(inputs["inputCatalogs"])
1203 butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)
1204
1205
1206class TransformSourceTableConnections(pipeBase.PipelineTaskConnections,
1207 defaultTemplates={"catalogType": ""},
1208 dimensions=("instrument", "visit", "detector")):
1209
1210 inputCatalog = connectionTypes.Input(
1211 doc="Wide input catalog of sources produced by WriteSourceTableTask",
1212 name="{catalogType}source",
1213 storageClass="DataFrame",
1214 dimensions=("instrument", "visit", "detector"),
1215 deferLoad=True
1216 )
1217 outputCatalog = connectionTypes.Output(
1218 doc="Narrower, per-detector Source Table transformed and converted per a "
1219 "specified set of functors",
1220 name="{catalogType}sourceTable",
1221 storageClass="ArrowAstropy",
1222 dimensions=("instrument", "visit", "detector")
1223 )
1224
1225
1226class TransformSourceTableConfig(TransformCatalogBaseConfig,
1227 pipelineConnections=TransformSourceTableConnections):
1228
1229 def setDefaults(self):
1230 super().setDefaults()
1231 self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Source.yaml")
1232 self.primaryKey = "sourceId"
1233 self.columnsFromDataId = ["visit", "detector", "band", "physical_filter"]
1234
1235
1236class TransformSourceTableTask(TransformCatalogBaseTask):
1237 """Transform/standardize a source catalog
1238 """
1239 _DefaultName = "transformSourceTable"
1240 ConfigClass = TransformSourceTableConfig
1241
1242
1243class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections,
1244 dimensions=("instrument", "visit",),
1245 defaultTemplates={"calexpType": ""}):
1246 calexp = connectionTypes.Input(
1247 doc="Processed exposures used for metadata",
1248 name="calexp",
1249 storageClass="ExposureF",
1250 dimensions=("instrument", "visit", "detector"),
1251 deferLoad=True,
1252 multiple=True,
1253 )
1254 visitSummary = connectionTypes.Output(
1255 doc=("Per-visit consolidated exposure metadata. These catalogs use "
1256 "detector id for the id and are sorted for fast lookups of a "
1257 "detector."),
1258 name="visitSummary",
1259 storageClass="ExposureCatalog",
1260 dimensions=("instrument", "visit"),
1261 )
1262 visitSummarySchema = connectionTypes.InitOutput(
1263 doc="Schema of the visitSummary catalog",
1264 name="visitSummary_schema",
1265 storageClass="ExposureCatalog",
1266 )
1267
1268
1269class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig,
1270 pipelineConnections=ConsolidateVisitSummaryConnections):
1271 """Config for ConsolidateVisitSummaryTask"""
1272 pass
1273
1274
1275class ConsolidateVisitSummaryTask(pipeBase.PipelineTask):
1276 """Task to consolidate per-detector visit metadata.
1277
1278 This task aggregates the following metadata from all the detectors in a
1279 single visit into an exposure catalog:
1280 - The visitInfo.
1281 - The wcs.
1282 - The photoCalib.
1283 - The physical_filter and band (if available).
1284 - The psf size, shape, and effective area at the center of the detector.
1285 - The corners of the bounding box in right ascension/declination.
1286
1287 Other quantities such as Detector, Psf, ApCorrMap, and TransmissionCurve
1288 are not persisted here because of storage concerns, and because of their
1289 limited utility as summary statistics.
1290
1291 Tests for this task are performed in ci_hsc_gen3.
1292 """
1293 _DefaultName = "consolidateVisitSummary"
1294 ConfigClass = ConsolidateVisitSummaryConfig
1295
1296 def __init__(self, **kwargs):
1297 super().__init__(**kwargs)
1298 self.schema = afwTable.ExposureTable.makeMinimalSchema()
1299 self.schema.addField("visit", type="L", doc="Visit number")
1300 self.schema.addField("physical_filter", type="String", size=32, doc="Physical filter")
1301 self.schema.addField("band", type="String", size=32, doc="Name of band")
1302 ExposureSummaryStats.update_schema(self.schema)
1303 self.visitSummarySchema = afwTable.ExposureCatalog(self.schema)
1304
1305 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1306 dataRefs = butlerQC.get(inputRefs.calexp)
1307 visit = dataRefs[0].dataId["visit"]
1308
1309 self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
1310 len(dataRefs), visit)
1311
1312 expCatalog = self._combineExposureMetadata(visit, dataRefs)
1313
1314 butlerQC.put(expCatalog, outputRefs.visitSummary)
1315
1316 def _combineExposureMetadata(self, visit, dataRefs):
1317 """Make a combined exposure catalog from a list of dataRefs.
1318 These dataRefs must point to exposures with wcs, summaryStats,
1319 and other visit metadata.
1320
1321 Parameters
1322 ----------
1323 visit : `int`
1324 Visit identification number.
1325 dataRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
1326 List of dataRefs in visit.
1327
1328 Returns
1329 -------
1330 visitSummary : `lsst.afw.table.ExposureCatalog`
1331 Exposure catalog with per-detector summary information.
1332 """
1333 cat = afwTable.ExposureCatalog(self.schema)
1334 cat.resize(len(dataRefs))
1335
1336 cat["visit"] = visit
1337
1338 for i, dataRef in enumerate(dataRefs):
1339 visitInfo = dataRef.get(component="visitInfo")
1340 filterLabel = dataRef.get(component="filter")
1341 summaryStats = dataRef.get(component="summaryStats")
1342 detector = dataRef.get(component="detector")
1343 wcs = dataRef.get(component="wcs")
1344 photoCalib = dataRef.get(component="photoCalib")
1345 detector = dataRef.get(component="detector")
1346 bbox = dataRef.get(component="bbox")
1347 validPolygon = dataRef.get(component="validPolygon")
1348
1349 rec = cat[i]
1350 rec.setBBox(bbox)
1351 rec.setVisitInfo(visitInfo)
1352 rec.setWcs(wcs)
1353 rec.setPhotoCalib(photoCalib)
1354 rec.setValidPolygon(validPolygon)
1355
1356 rec["physical_filter"] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else ""
1357 rec["band"] = filterLabel.bandLabel if filterLabel.hasBandLabel() else ""
1358 rec.setId(detector.getId())
1359 summaryStats.update_record(rec)
1360
1361 if not cat:
1362 raise pipeBase.NoWorkFound(
1363 "No detectors had sufficient information to make a visit summary row."
1364 )
1365
1366 metadata = dafBase.PropertyList()
1367 metadata.add("COMMENT", "Catalog id is detector id, sorted.")
1368 # We are looping over existing datarefs, so the following is true
1369 metadata.add("COMMENT", "Only detectors with data have entries.")
1370 cat.setMetadata(metadata)
1371
1372 cat.sort()
1373 return cat
1374
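# Illustrative sketch (not part of the pipeline): because the output catalog is
# sorted by detector id, downstream code (e.g. prepareCalibratedExposure above)
# can look up a detector's calibrations directly:
#
#     >>> row = visitSummary.find(detectorId)  # None if the detector is absent
#     >>> wcs, photoCalib = row.getWcs(), row.getPhotoCalib()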
1375
1376class ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections,
1377 defaultTemplates={"catalogType": ""},
1378 dimensions=("instrument", "visit")):
1379 inputCatalogs = connectionTypes.Input(
1380 doc="Input per-detector Source Tables",
1381 name="{catalogType}sourceTable",
1382 storageClass="ArrowAstropy",
1383 dimensions=("instrument", "visit", "detector"),
1384 multiple=True,
1385 deferLoad=True,
1386 )
1387 outputCatalog = connectionTypes.Output(
1388 doc="Per-visit concatenation of Source Table",
1389 name="{catalogType}sourceTable_visit",
1390 storageClass="ArrowAstropy",
1391 dimensions=("instrument", "visit")
1392 )
1393
1394
1395class ConsolidateSourceTableConfig(pipeBase.PipelineTaskConfig,
1396 pipelineConnections=ConsolidateSourceTableConnections):
1397 pass
1398
1399
1400class ConsolidateSourceTableTask(pipeBase.PipelineTask):
1401 """Concatenate `sourceTable` list into a per-visit `sourceTable_visit`
1402 """
1403 _DefaultName = "consolidateSourceTable"
1404 ConfigClass = ConsolidateSourceTableConfig
1405
1406 inputDataset = "sourceTable"
1407 outputDataset = "sourceTable_visit"
1408
1409 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1410 from .makeWarp import reorderRefs
1411
1412 detectorOrder = [ref.dataId["detector"] for ref in inputRefs.inputCatalogs]
1413 detectorOrder.sort()
1414 inputRefs = reorderRefs(inputRefs, detectorOrder, dataIdKey="detector")
1415 inputs = butlerQC.get(inputRefs)
1416 self.log.info("Concatenating %s per-detector Source Tables",
1417 len(inputs["inputCatalogs"]))
1418 table = TableVStack.vstack_handles(inputs["inputCatalogs"])
1419 butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)
1420
1421
1422class MakeCcdVisitTableConnections(pipeBase.PipelineTaskConnections,
1423 dimensions=("instrument",),
1424 defaultTemplates={"calexpType": ""}):
1425 visitSummaryRefs = connectionTypes.Input(
1426 doc="Data references for per-visit consolidated exposure metadata",
1427 name="finalVisitSummary",
1428 storageClass="ExposureCatalog",
1429 dimensions=("instrument", "visit"),
1430 multiple=True,
1431 deferLoad=True,
1432 )
1433 outputCatalog = connectionTypes.Output(
1434 doc="CCD and Visit metadata table",
1435 name="ccdVisitTable",
1436 storageClass="ArrowAstropy",
1437 dimensions=("instrument",)
1438 )
1439
1440
1441class MakeCcdVisitTableConfig(pipeBase.PipelineTaskConfig,
1442 pipelineConnections=MakeCcdVisitTableConnections):
1443 idGenerator = DetectorVisitIdGeneratorConfig.make_field()
1444
1445
1446class MakeCcdVisitTableTask(pipeBase.PipelineTask):
1447 """Produce a `ccdVisitTable` from the visit summary exposure catalogs.
1448 """
1449 _DefaultName = "makeCcdVisitTable"
1450 ConfigClass = MakeCcdVisitTableConfig
1451
1452 def run(self, visitSummaryRefs):
1453 """Make a table of ccd information from the visit summary catalogs.
1454
1455 Parameters
1456 ----------
1457 visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
1458 List of DeferredDatasetHandles pointing to exposure catalogs with
1459 per-detector summary information.
1460
1461 Returns
1462 -------
1463 result : `~lsst.pipe.base.Struct`
1464 Results struct with attribute:
1465
1466 ``outputCatalog``
1467 Catalog of ccd and visit information.
1468 """
1469 ccdEntries = []
1470 for visitSummaryRef in visitSummaryRefs:
1471 visitSummary = visitSummaryRef.get()
1472 if not visitSummary:
1473 continue
1474 visitInfo = visitSummary[0].getVisitInfo()
1475
1476 # Strip provenance to prevent merge confusion.
1477 strip_provenance_from_fits_header(visitSummary.metadata)
1478
1479 ccdEntry = {}
1480 summaryTable = visitSummary.asAstropy()
1481 selectColumns = ["id", "visit", "physical_filter", "band", "ra", "dec",
1482 "pixelScale", "zenithDistance",
1483 "expTime", "zeroPoint", "psfSigma", "skyBg", "skyNoise",
1484 "astromOffsetMean", "astromOffsetStd", "nPsfStar",
1485 "psfStarDeltaE1Median", "psfStarDeltaE2Median",
1486 "psfStarDeltaE1Scatter", "psfStarDeltaE2Scatter",
1487 "psfStarDeltaSizeMedian", "psfStarDeltaSizeScatter",
1488 "psfStarScaledDeltaSizeScatter", "psfTraceRadiusDelta",
1489 "psfApFluxDelta", "psfApCorrSigmaScaledDelta",
1490 "maxDistToNearestPsf",
1491 "effTime", "effTimePsfSigmaScale",
1492 "effTimeSkyBgScale", "effTimeZeroPointScale",
1493 "magLim"]
1494 ccdEntry = summaryTable[selectColumns]
1495 # 'visit' is the human readable visit number.
1496 # 'visitId' is the key to the visitId table. They are the same.
1497 # Technically you should join to get the visit from the visit
1498 # table.
1499 ccdEntry.rename_column("visit", "visitId")
1500 ccdEntry.rename_column("id", "detectorId")
1501
1502 # RFC-924: Temporarily keep a duplicate "decl" entry for backwards
1503 # compatibility. To be removed after September 2023.
1504 ccdEntry["decl"] = ccdEntry["dec"]
1505
1506 ccdEntry["ccdVisitId"] = [
1507 self.config.idGenerator.apply(
1508 visitSummaryRef.dataId,
1509 detector=detector_id,
1510 is_exposure=False,
1511 ).catalog_id # The "catalog ID" here is the ccdVisit ID
1512 # because it's usually the ID for a whole catalog
1513 # with a {visit, detector}, and that's the main
1514 # use case for IdGenerator. This usage for a
1515 # summary table is rare.
1516 for detector_id in summaryTable["id"]
1517 ]
1518 ccdEntry["detector"] = summaryTable["id"]
1519 ccdEntry["seeing"] = (
1520 visitSummary["psfSigma"] * visitSummary["pixelScale"] * np.sqrt(8 * np.log(2))
1521 )
1522 ccdEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
1523 ccdEntry["expMidpt"] = np.datetime64(visitInfo.getDate().toPython(), "ns")
1524 ccdEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
1525 expTime = visitInfo.getExposureTime()
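# obsStart is the exposure midpoint minus half the exposure time;
# np.timedelta64 takes an integer, so expTime (s) is converted to nanoseconds.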
1526 ccdEntry["obsStart"] = (
1527 ccdEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns")
1528 )
1529 expTime_days = expTime / (60*60*24)
1530 ccdEntry["obsStartMJD"] = ccdEntry["expMidptMJD"] - 0.5 * expTime_days
1531 ccdEntry["darkTime"] = visitInfo.getDarkTime()
1532 ccdEntry["xSize"] = summaryTable["bbox_max_x"] - summaryTable["bbox_min_x"]
1533 ccdEntry["ySize"] = summaryTable["bbox_max_y"] - summaryTable["bbox_min_y"]
1534 ccdEntry["llcra"] = summaryTable["raCorners"][:, 0]
1535 ccdEntry["llcdec"] = summaryTable["decCorners"][:, 0]
1536 ccdEntry["ulcra"] = summaryTable["raCorners"][:, 1]
1537 ccdEntry["ulcdec"] = summaryTable["decCorners"][:, 1]
1538 ccdEntry["urcra"] = summaryTable["raCorners"][:, 2]
1539 ccdEntry["urcdec"] = summaryTable["decCorners"][:, 2]
1540 ccdEntry["lrcra"] = summaryTable["raCorners"][:, 3]
1541 ccdEntry["lrcdec"] = summaryTable["decCorners"][:, 3]
1542 # TODO: DM-30618, Add raftName, nExposures, ccdTemp, binX, binY,
1543 # and flags, and decide if WCS, and llcx, llcy, ulcx, ulcy, etc.
1544 # values are actually wanted.
1545 ccdEntries.append(ccdEntry)
1546
1547 outputCatalog = astropy.table.vstack(ccdEntries, join_type="exact")
1548 return pipeBase.Struct(outputCatalog=outputCatalog)
1549
1550
1551class MakeVisitTableConnections(pipeBase.PipelineTaskConnections,
1552 dimensions=("instrument",),
1553 defaultTemplates={"calexpType": ""}):
1554 visitSummaries = connectionTypes.Input(
1555 doc="Per-visit consolidated exposure metadata",
1556 name="finalVisitSummary",
1557 storageClass="ExposureCatalog",
1558 dimensions=("instrument", "visit",),
1559 multiple=True,
1560 deferLoad=True,
1561 )
1562 outputCatalog = connectionTypes.Output(
1563 doc="Visit metadata table",
1564 name="visitTable",
1565 storageClass="ArrowAstropy",
1566 dimensions=("instrument",)
1567 )
1568
1569
1570class MakeVisitTableConfig(pipeBase.PipelineTaskConfig,
1571 pipelineConnections=MakeVisitTableConnections):
1572 pass
1573
1574
1575class MakeVisitTableTask(pipeBase.PipelineTask):
1576 """Produce a `visitTable` from the visit summary exposure catalogs.
1577 """
1578 _DefaultName = "makeVisitTable"
1579 ConfigClass = MakeVisitTableConfig
1580
1581 def run(self, visitSummaries):
1582 """Make a table of visit information from the visit summary catalogs.
1583
1584 Parameters
1585 ----------
1586 visitSummaries : `list` of `lsst.afw.table.ExposureCatalog`
1587 List of exposure catalogs with per-detector summary information.

1588 Returns
1589 -------
1590 result : `~lsst.pipe.base.Struct`
1591 Results struct with attribute:
1592
1593 ``outputCatalog``
1594 Catalog of visit information.
1595 """
1596 visitEntries = []
1597 for visitSummary in visitSummaries:
1598 visitSummary = visitSummary.get()
1599 if not visitSummary:
1600 continue
1601 visitRow = visitSummary[0]
1602 visitInfo = visitRow.getVisitInfo()
1603
1604 visitEntry = {}
1605 visitEntry["visitId"] = visitRow["visit"]
1606 visitEntry["visit"] = visitRow["visit"]
1607 visitEntry["physical_filter"] = visitRow["physical_filter"]
1608 visitEntry["band"] = visitRow["band"]
1609 raDec = visitInfo.getBoresightRaDec()
1610 visitEntry["ra"] = raDec.getRa().asDegrees()
1611 visitEntry["dec"] = raDec.getDec().asDegrees()
1612
1613 # RFC-924: Temporarily keep a duplicate "decl" entry for backwards
1614 # compatibility. To be removed after September 2023.
1615 visitEntry["decl"] = visitEntry["dec"]
1616
1617 visitEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
1618 azAlt = visitInfo.getBoresightAzAlt()
1619 visitEntry["azimuth"] = azAlt.getLongitude().asDegrees()
1620 visitEntry["altitude"] = azAlt.getLatitude().asDegrees()
1621 visitEntry["zenithDistance"] = 90 - azAlt.getLatitude().asDegrees()
1622 visitEntry["airmass"] = visitInfo.getBoresightAirmass()
1623 expTime = visitInfo.getExposureTime()
1624 visitEntry["expTime"] = expTime
1625 visitEntry["expMidpt"] = np.datetime64(visitInfo.getDate().toPython(), "ns")
1626 visitEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
1627 visitEntry["obsStart"] = visitEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns")
1628 expTime_days = expTime / (60*60*24)
1629 visitEntry["obsStartMJD"] = visitEntry["expMidptMJD"] - 0.5 * expTime_days
1630 visitEntries.append(visitEntry)
1631
1632 # TODO: DM-30623, Add programId, exposureType, cameraTemp,
1633 # mirror1Temp, mirror2Temp, mirror3Temp, domeTemp, externalTemp,
1634 # dimmSeeing, pwvGPS, pwvMW, flags, nExposures.
1635
1636 outputCatalog = astropy.table.Table(rows=visitEntries)
1637 return pipeBase.Struct(outputCatalog=outputCatalog)
1638
1639
1640class WriteForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
1641 dimensions=("instrument", "visit", "detector", "skymap", "tract")):
1642
1643 inputCatalog = connectionTypes.Input(
1644 doc="Primary per-detector, single-epoch forced-photometry catalog. "
1645 "By default, it is the output of ForcedPhotCcdTask on calexps",
1646 name="forced_src",
1647 storageClass="SourceCatalog",
1648 dimensions=("instrument", "visit", "detector", "skymap", "tract")
1649 )
1650 inputCatalogDiff = connectionTypes.Input(
1651 doc="Secondary multi-epoch, per-detector, forced photometry catalog. "
1652 "By default, it is the output of ForcedPhotCcdTask run on image differences.",
1653 name="forced_diff",
1654 storageClass="SourceCatalog",
1655 dimensions=("instrument", "visit", "detector", "skymap", "tract")
1656 )
1657 outputCatalog = connectionTypes.Output(
1658 doc="InputCatalogs horizonatally joined on `objectId` in DataFrame parquet format",
1659 name="mergedForcedSource",
1660 storageClass="DataFrame",
1661 dimensions=("instrument", "visit", "detector", "skymap", "tract")
1662 )
1663
1664
1665class WriteForcedSourceTableConfig(pipeBase.PipelineTaskConfig,
1666 pipelineConnections=WriteForcedSourceTableConnections):
1668 doc="Column on which to join the two input tables on and make the primary key of the output",
1669 dtype=str,
1670 default="objectId",
1671 )
1672
1673
1674class WriteForcedSourceTableTask(pipeBase.PipelineTask):
1675 """Merge and convert per-detector forced source catalogs to DataFrame Parquet format.
1676
1677 Because the predecessor ForcedPhotCcdTask operates per-detector and
1678 per-tract (i.e., it has tract in its dimensions), detectors
1679 on the tract boundary may have multiple forced source catalogs.
1680
1681 The successor task TransformForcedSourceTable runs per patch and
1682 temporally aggregates the overlapping mergedForcedSource catalogs from
1683 all available epochs.
1684 """
1685 _DefaultName = "writeForcedSourceTable"
1686 ConfigClass = WriteForcedSourceTableConfig
1687
1688 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1689 inputs = butlerQC.get(inputRefs)
1690 inputs["visit"] = butlerQC.quantum.dataId["visit"]
1691 inputs["detector"] = butlerQC.quantum.dataId["detector"]
1692 inputs["band"] = butlerQC.quantum.dataId["band"]
1693 outputs = self.run(**inputs)
1694 butlerQC.put(outputs, outputRefs)
1695
1696 def run(self, inputCatalog, inputCatalogDiff, visit, detector, band=None):
1697 dfs = []
1698 for table, dataset, in zip((inputCatalog, inputCatalogDiff), ("calexp", "diff")):
1699 df = table.asAstropy().to_pandas().set_index(self.config.key, drop=False)
1700 df = df.reindex(sorted(df.columns), axis=1)
1701 df["visit"] = visit
1702 # int16 instead of uint8 because databases don't like unsigned bytes.
1703 df["detector"] = np.int16(detector)
1704 df["band"] = band if band else pd.NA
1705 df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
1706 names=("dataset", "column"))
1707
1708 dfs.append(df)
1709
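# Horizontally join the "calexp" and "diff" column groups on their shared
# index (config.key, objectId by default).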
1710 outputCatalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
1711 return pipeBase.Struct(outputCatalog=outputCatalog)
1712
1713
1714class TransformForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
1715 dimensions=("instrument", "skymap", "patch", "tract")):
1716
1717 inputCatalogs = connectionTypes.Input(
1718 doc="DataFrames of merged ForcedSources produced by WriteForcedSourceTableTask",
1719 name="mergedForcedSource",
1720 storageClass="DataFrame",
1721 dimensions=("instrument", "visit", "detector", "skymap", "tract"),
1722 multiple=True,
1723 deferLoad=True
1724 )
1725 referenceCatalog = connectionTypes.Input(
1726 doc="Reference catalog which was used to seed the forcedPhot. Columns "
1727 "objectId, detect_isPrimary, detect_isTractInner, detect_isPatchInner "
1728 "are expected.",
1729 name="objectTable",
1730 storageClass="DataFrame",
1731 dimensions=("tract", "patch", "skymap"),
1732 deferLoad=True
1733 )
1734 outputCatalog = connectionTypes.Output(
1735 doc="Narrower, temporally-aggregated, per-patch ForcedSource Table transformed and converted per a "
1736 "specified set of functors",
1737 name="forcedSourceTable",
1738 storageClass="DataFrame",
1739 dimensions=("tract", "patch", "skymap")
1740 )
1741
1742
1743class TransformForcedSourceTableConfig(TransformCatalogBaseConfig,
1744 pipelineConnections=TransformForcedSourceTableConnections):
1745 referenceColumns = pexConfig.ListField(
1746 dtype=str,
1747 default=["detect_isPrimary", "detect_isTractInner", "detect_isPatchInner"],
1748 optional=True,
1749 doc="Columns to pull from reference catalog",
1750 )
1751 keyRef = lsst.pex.config.Field(
1752 doc="Column on which to join the two input tables on and make the primary key of the output",
1753 dtype=str,
1754 default="objectId",
1755 )
1757 doc="Rename the output DataFrame index to this name",
1758 dtype=str,
1759 default="forcedSourceId",
1760 )
1761
1762 def setDefaults(self):
1763 super().setDefaults()
1764 self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "ForcedSource.yaml")
1765 self.columnsFromDataId = ["tract", "patch"]
1766
1767
1768class TransformForcedSourceTableTask(TransformCatalogBaseTask):
1769 """Transform/standardize a ForcedSource catalog
1770
1771 Transforms each wide, per-detector forcedSource DataFrame per the
1772 specification file (per-camera defaults found in ForcedSource.yaml).
1773 All epochs that overlap the patch are aggregated into one per-patch
1774 narrow-DataFrame file.
1775
1776 No de-duplication of rows is performed. Duplicate-resolution flags are
1777 pulled in from the referenceCatalog (`detect_isPrimary`,
1778 `detect_isTractInner`, `detect_isPatchInner`) so that the user may
1779 de-duplicate for analysis or compare duplicates for QA.
1780
1781 The resulting table includes multiple bands. Epochs (MJDs) and other useful
1782 per-visit rows can be retrieved by joining with the CcdVisitTable on
1783 ccdVisitId.
1784 """
1785 _DefaultName = "transformForcedSourceTable"
1786 ConfigClass = TransformForcedSourceTableConfig
1787
1788 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1789 inputs = butlerQC.get(inputRefs)
1790 if self.funcs is None:
1791 raise ValueError("config.functorFile is None. "
1792 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
1793 outputs = self.run(inputs["inputCatalogs"], inputs["referenceCatalog"], funcs=self.funcs,
1794 dataId=dict(outputRefs.outputCatalog.dataId.mapping))
1795
1796 butlerQC.put(outputs, outputRefs)
1797
1798 def run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=None):
1799 dfs = []
1800 refColumns = list(self.config.referenceColumns)
1801 refColumns.append(self.config.keyRef)
1802 ref = referenceCatalog.get(parameters={"columns": refColumns})
1803 if ref.index.name != self.config.keyRef:
1804 # If the DataFrame we loaded was originally written as some other
1805 # Parquet type, it probably doesn't have the index set. If it was
1806 # written as a DataFrame, the index should already be set and
1807 trying to set it again would be an error, since it doesn't exist
1808 # as a regular column anymore.
1809 ref.set_index(self.config.keyRef, inplace=True)
1810 self.log.info("Aggregating %s input catalogs" % (len(inputCatalogs)))
1811 for handle in inputCatalogs:
1812 result = self.transform(None, handle, funcs, dataId)
1813 # Filter for only rows that were detected on (overlap) the patch
1814 dfs.append(result.df.join(ref, how="inner"))
1815
1816 outputCatalog = pd.concat(dfs)
1817
1818 # Now that we are done joining on config.keyRef,
1819 # change the index to config.key in the following steps:
1820 outputCatalog.index.rename(self.config.keyRef, inplace=True)
1821 # Add config.keyRef to the column list
1822 outputCatalog.reset_index(inplace=True)
1823 # Set the forcedSourceId to the index. This is specified in the
1824 # ForcedSource.yaml
1825 outputCatalog.set_index("forcedSourceId", inplace=True, verify_integrity=True)
1826 # Rename it to the config.key
1827 outputCatalog.index.rename(self.config.key, inplace=True)
1828
1829 self.log.info("Made a table of %d columns and %d rows",
1830 len(outputCatalog.columns), len(outputCatalog))
1831 return pipeBase.Struct(outputCatalog=outputCatalog)
1832
1833
1834class ConsolidateTractConnections(pipeBase.PipelineTaskConnections,
1835 defaultTemplates={"catalogType": ""},
1836 dimensions=("instrument", "tract")):
1837 inputCatalogs = connectionTypes.Input(
1838 doc="Input per-patch DataFrame Tables to be concatenated",
1839 name="{catalogType}ForcedSourceTable",
1840 storageClass="DataFrame",
1841 dimensions=("tract", "patch", "skymap"),
1842 multiple=True,
1843 )
1844
1845 outputCatalog = connectionTypes.Output(
1846 doc="Output per-tract concatenation of DataFrame Tables",
1847 name="{catalogType}ForcedSourceTable_tract",
1848 storageClass="DataFrame",
1849 dimensions=("tract", "skymap"),
1850 )
1851
1852
1853class ConsolidateTractConfig(pipeBase.PipelineTaskConfig,
1854 pipelineConnections=ConsolidateTractConnections):
1855 pass
1856
1857
1858class ConsolidateTractTask(pipeBase.PipelineTask):
1859 """Concatenate any per-patch, dataframe list into a single
1860 per-tract DataFrame.
1861 """
1862 _DefaultName = "ConsolidateTract"
1863 ConfigClass = ConsolidateTractConfig
1864
1865 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1866 inputs = butlerQC.get(inputRefs)
1867 # Not checking that at least one inputCatalog exists, because that
1868 # would correspond to an empty quantum graph (QG).
1869 self.log.info("Concatenating %s per-patch %s Tables",
1870 len(inputs["inputCatalogs"]),
1871 inputRefs.inputCatalogs[0].datasetType.name)
1872 df = pd.concat(inputs["inputCatalogs"])
1873 butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)