LSST Data Management Base Package
postprocess.py
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22__all__ = ["WriteObjectTableConfig", "WriteObjectTableTask",
23 "WriteSourceTableConfig", "WriteSourceTableTask",
24 "WriteRecalibratedSourceTableConfig", "WriteRecalibratedSourceTableTask",
25 "PostprocessAnalysis",
26 "TransformCatalogBaseConfig", "TransformCatalogBaseTask",
27 "TransformObjectCatalogConfig", "TransformObjectCatalogTask",
28 "ConsolidateObjectTableConfig", "ConsolidateObjectTableTask",
29 "TransformSourceTableConfig", "TransformSourceTableTask",
30 "ConsolidateVisitSummaryConfig", "ConsolidateVisitSummaryTask",
31 "ConsolidateSourceTableConfig", "ConsolidateSourceTableTask",
32 "MakeCcdVisitTableConfig", "MakeCcdVisitTableTask",
33 "MakeVisitTableConfig", "MakeVisitTableTask",
34 "WriteForcedSourceTableConfig", "WriteForcedSourceTableTask",
35 "TransformForcedSourceTableConfig", "TransformForcedSourceTableTask",
36 "ConsolidateTractConfig", "ConsolidateTractTask"]
37
38from collections import defaultdict
39import dataclasses
40import functools
41import logging
42import numbers
43import os
44
45import numpy as np
46import pandas as pd
47import astropy.table
48from astro_metadata_translator.headers import merge_headers
49
50import lsst.geom
51import lsst.pex.config as pexConfig
52import lsst.pipe.base as pipeBase
53import lsst.daf.base as dafBase
54from lsst.daf.butler.formatters.parquet import pandas_to_astropy
55from lsst.pipe.base import NoWorkFound, connectionTypes
56import lsst.afw.table as afwTable
57from lsst.afw.image import ExposureSummaryStats, ExposureF
58from lsst.meas.base import SingleFrameMeasurementTask, DetectorVisitIdGeneratorConfig
59from lsst.obs.base.utils import strip_provenance_from_fits_header
60
61from .functors import CompositeFunctor, Column
62
63log = logging.getLogger(__name__)
64
65
66def flattenFilters(df, noDupCols=["coord_ra", "coord_dec"], camelCase=False, inputBands=None):
67 """Flattens a dataframe with multilevel column index.
68 """
69 newDf = pd.DataFrame()
70 # band is the level 0 index
71 dfBands = df.columns.unique(level=0).values
72 for band in dfBands:
73 subdf = df[band]
74 columnFormat = "{0}{1}" if camelCase else "{0}_{1}"
75 newColumns = {c: columnFormat.format(band, c)
76 for c in subdf.columns if c not in noDupCols}
77 cols = list(newColumns.keys())
78 newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)
79
80 # Band must be present in the input and output or else column is all NaN:
81 presentBands = dfBands if inputBands is None else list(set(inputBands).intersection(dfBands))
82 # Get the unexploded columns from any present band's partition
83 noDupDf = df[presentBands[0]][noDupCols]
84 newDf = pd.concat([noDupDf, newDf], axis=1)
85 return newDf
86
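A minimal sketch of what flattenFilters produces, using a toy two-band DataFrame; the column names and values below are invented for illustration and are not part of this module.

import pandas as pd

from lsst.pipe.tasks.postprocess import flattenFilters

toy = pd.DataFrame({("g", "coord_ra"): [10.0], ("g", "coord_dec"): [-5.0], ("g", "PsfFlux"): [1.2],
                    ("r", "coord_ra"): [10.0], ("r", "coord_dec"): [-5.0], ("r", "PsfFlux"): [3.4]})
toy.columns = pd.MultiIndex.from_tuples(toy.columns, names=("band", "column"))
flat = flattenFilters(toy)
# flat keeps a single copy of coord_ra/coord_dec and adds g_PsfFlux and r_PsfFlux;
# with camelCase=True the per-band names would instead be gPsfFlux and rPsfFlux.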
87
89 """A helper class for stacking astropy tables without having them all in
90 memory at once.
91
92 Parameters
93 ----------
94 capacity : `int`
95 Full size of the final table.
96
97 Notes
98 -----
99 Unlike `astropy.table.vstack`, this class requires all tables to have the
100 exact same columns (it's slightly more strict than even the
101 ``join_type="exact"`` argument to `astropy.table.vstack`).
102 """
103
104 def __init__(self, capacity):
105 self.index = 0
106 self.capacity = capacity
107 self.result = None
108
109 @classmethod
110 def from_handles(cls, handles):
111 """Construct from an iterable of
112 `lsst.daf.butler.DeferredDatasetHandle`.
113
114 Parameters
115 ----------
116 handles : `~collections.abc.Iterable` [ \
117 `lsst.daf.butler.DeferredDatasetHandle` ]
118 Iterable of handles. Must have a storage class that supports the
119 "rowcount" component, which is all that will be fetched.
120
121 Returns
122 -------
123 vstack : `TableVStack`
124 An instance of this class, initialized with capacity equal to the
125 sum of the rowcounts of all the given table handles.
126 """
127 capacity = sum(handle.get(component="rowcount") for handle in handles)
128 return cls(capacity=capacity)
129
130 def extend(self, table):
131 """Add a single table to the stack.
132
133 Parameters
134 ----------
135 table : `astropy.table.Table`
136 An astropy table instance.
137 """
138 if self.result is None:
139 self.result = astropy.table.Table()
140 for name in table.colnames:
141 column = table[name]
142 column_cls = type(column)
143 self.result[name] = column_cls.info.new_like([column], self.capacity, name=name)
144 self.result[name][:len(table)] = column
145 self.index = len(table)
146 self.result.meta = table.meta.copy()
147 else:
148 next_index = self.index + len(table)
149 if set(self.result.colnames) != set(table.colnames):
150 raise TypeError(
151 "Inconsistent columns in concatentation: "
152 f"{set(self.result.colnames).symmetric_difference(table.colnames)}"
153 )
154 for name in table.colnames:
155 out_col = self.result[name]
156 in_col = table[name]
157 if out_col.dtype != in_col.dtype:
158 raise TypeError(f"Type mismatch on column {name!r}: {out_col.dtype} != {in_col.dtype}.")
159 self.result[name][self.index:next_index] = table[name]
160 self.index = next_index
161 # Butler provenance should be stripped on merge. It will be
162 # added by butler on write. No attempt is made here to combine
163 # provenance from multiple input tables.
164 self.result.meta = merge_headers([self.result.meta, table.meta], mode="drop")
165 strip_provenance_from_fits_header(self.result.meta)
166
167 @classmethod
168 def vstack_handles(cls, handles):
169 """Vertically stack tables represented by deferred dataset handles.
170
171 Parameters
172 ----------
173 handles : `~collections.abc.Iterable` [ \
174 `lsst.daf.butler.DeferredDatasetHandle` ]
175 Iterable of handles. Must have the "ArrowAstropy" storage class
176 and identical columns.
177
178 Returns
179 -------
180 table : `astropy.table.Table`
181 Concatenated table with the same columns as each input table and
182 the rows of all of them.
183 """
184 handles = tuple(handles) # guard against single-pass iterators
185 # Ensure that zero length catalogs are not included
186 rowcount = tuple(handle.get(component="rowcount") for handle in handles)
187 handles = tuple(handle for handle, count in zip(handles, rowcount) if count > 0)
188
189 vstack = cls.from_handles(handles)
190 for handle in handles:
191 vstack.extend(handle.get())
192 return vstack.result
193
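A minimal usage sketch for TableVStack with in-memory astropy tables; real callers (e.g. the Consolidate tasks below) go through vstack_handles with butler DeferredDatasetHandles instead, and the toy columns here are invented.

import numpy as np
import astropy.table

from lsst.pipe.tasks.postprocess import TableVStack

t1 = astropy.table.Table({"objectId": np.arange(3, dtype=np.int64), "flux": np.ones(3)})
t2 = astropy.table.Table({"objectId": np.arange(3, 5, dtype=np.int64), "flux": np.zeros(2)})

stack = TableVStack(capacity=len(t1) + len(t2))  # preallocate the full output once
stack.extend(t1)
stack.extend(t2)
combined = stack.result  # 5 rows, with the columns and dtypes of the inputs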
194
195class WriteObjectTableConnections(pipeBase.PipelineTaskConnections,
196 defaultTemplates={"coaddName": "deep"},
197 dimensions=("tract", "patch", "skymap")):
198 inputCatalogMeas = connectionTypes.Input(
199 doc="Catalog of source measurements on the deepCoadd.",
200 dimensions=("tract", "patch", "band", "skymap"),
201 storageClass="SourceCatalog",
202 name="{coaddName}Coadd_meas",
203 multiple=True
204 )
205 inputCatalogForcedSrc = connectionTypes.Input(
206 doc="Catalog of forced measurements (shape and position parameters held fixed) on the deepCoadd.",
207 dimensions=("tract", "patch", "band", "skymap"),
208 storageClass="SourceCatalog",
209 name="{coaddName}Coadd_forced_src",
210 multiple=True
211 )
212 inputCatalogPsfsMultiprofit = connectionTypes.Input(
213 doc="Catalog of Gaussian mixture model fit parameters for the PSF model at each object centroid.",
214 dimensions=("tract", "patch", "band", "skymap"),
215 storageClass="ArrowAstropy",
216 name="{coaddName}Coadd_psfs_multiprofit",
217 multiple=True,
218 )
219 outputCatalog = connectionTypes.Output(
220 doc="A vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
221 "stored as a DataFrame with a multi-level column index per-patch.",
222 dimensions=("tract", "patch", "skymap"),
223 storageClass="DataFrame",
224 name="{coaddName}Coadd_obj"
225 )
226
227
228class WriteObjectTableConfig(pipeBase.PipelineTaskConfig,
229 pipelineConnections=WriteObjectTableConnections):
230 coaddName = pexConfig.Field(
231 dtype=str,
232 default="deep",
233 doc="Name of coadd"
234 )
235
236
237class WriteObjectTableTask(pipeBase.PipelineTask):
238 """Write filter-merged object tables as a DataFrame in parquet format.
239 """
240 _DefaultName = "writeObjectTable"
241 ConfigClass = WriteObjectTableConfig
242
243 # Tag of output dataset written by `MergeSourcesTask.write`
244 outputDataset = "obj"
245
246 def runQuantum(self, butlerQC, inputRefs, outputRefs):
247 inputs = butlerQC.get(inputRefs)
248
249 catalogs = defaultdict(dict)
250 for dataset, connection in (
251 ("meas", "inputCatalogMeas"),
252 ("forced_src", "inputCatalogForcedSrc"),
253 ("psfs_multiprofit", "inputCatalogPsfsMultiprofit"),
254 ):
255 for ref, cat in zip(getattr(inputRefs, connection), inputs[connection]):
256 catalogs[ref.dataId["band"]][dataset] = cat
257
258 dataId = butlerQC.quantum.dataId
259 df = self.run(catalogs=catalogs, tract=dataId["tract"], patch=dataId["patch"])
260 outputs = pipeBase.Struct(outputCatalog=df)
261 butlerQC.put(outputs, outputRefs)
262
263 def run(self, catalogs, tract, patch):
264 """Merge multiple catalogs.
265
266 Parameters
267 ----------
268 catalogs : `dict`
269 Mapping from filter names to dict of catalogs.
270 tract : `int`
271 tractId to use for the tractId column.
272 patch : `str`
273 patchId to use for the patchId column.
274
275 Returns
276 -------
277 catalog : `pandas.DataFrame`
278 Merged dataframe.
279
280 Raises
281 ------
282 ValueError
283 Raised if any of the catalogs is of an unsupported type.
284 """
285 dfs = []
286 for filt, tableDict in catalogs.items():
287 for dataset, table in tableDict.items():
288 # Convert afwTable to pandas DataFrame if needed
289 if isinstance(table, pd.DataFrame):
290 df = table
291 elif isinstance(table, afwTable.SourceCatalog):
292 df = table.asAstropy().to_pandas()
293 elif isinstance(table, astropy.table.Table):
294 df = table.to_pandas()
295 else:
296 raise ValueError(f"{dataset=} has unsupported {type(table)=}")
297 df.set_index("id", drop=True, inplace=True)
298
299 # Sort columns by name, to ensure matching schema among patches
300 df = df.reindex(sorted(df.columns), axis=1)
301 df = df.assign(tractId=tract, patchId=patch)
302
303 # Make columns a 3-level MultiIndex
304 df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
305 names=("dataset", "band", "column"))
306 dfs.append(df)
307
308 # We do this dance and not `pd.concat(dfs)` because the pandas
309 # concatenation uses infinite memory.
310 catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
311 return catalog
312
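An illustrative sketch of the (dataset, band, column) MultiIndex layout that run() builds for the deepCoadd_obj DataFrame; the column names and values are invented placeholders.

import pandas as pd

obj = pd.DataFrame({("meas", "g", "base_PsfFlux_instFlux"): [1.0],
                    ("meas", "r", "base_PsfFlux_instFlux"): [2.0],
                    ("forced_src", "g", "base_PsfFlux_instFlux"): [0.9]})
obj.columns = pd.MultiIndex.from_tuples(obj.columns, names=("dataset", "band", "column"))
g_meas = obj["meas"]["g"]               # one dataset/band slice
datasets = obj.columns.unique(level=0)  # Index(['meas', 'forced_src'], ...)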
313
314class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
315 defaultTemplates={"catalogType": ""},
316 dimensions=("instrument", "visit", "detector")):
317
318 catalog = connectionTypes.Input(
319 doc="Input full-depth catalog of sources produced by CalibrateTask",
320 name="{catalogType}src",
321 storageClass="SourceCatalog",
322 dimensions=("instrument", "visit", "detector")
323 )
324 outputCatalog = connectionTypes.Output(
325 doc="Catalog of sources, `src` in Astropy/Parquet format. Columns are unchanged.",
326 name="{catalogType}source",
327 storageClass="ArrowAstropy",
328 dimensions=("instrument", "visit", "detector")
329 )
330
331
332class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
333 pipelineConnections=WriteSourceTableConnections):
334 pass
335
336
337class WriteSourceTableTask(pipeBase.PipelineTask):
338 """Write source table to DataFrame Parquet format.
339 """
340 _DefaultName = "writeSourceTable"
341 ConfigClass = WriteSourceTableConfig
342
343 def runQuantum(self, butlerQC, inputRefs, outputRefs):
344 inputs = butlerQC.get(inputRefs)
345 inputs["visit"] = butlerQC.quantum.dataId["visit"]
346 inputs["detector"] = butlerQC.quantum.dataId["detector"]
347 result = self.run(**inputs)
348 outputs = pipeBase.Struct(outputCatalog=result.table)
349 butlerQC.put(outputs, outputRefs)
350
351 def run(self, catalog, visit, detector, **kwargs):
352 """Convert `src` catalog to an Astropy table.
353
354 Parameters
355 ----------
356 catalog : `afwTable.SourceCatalog`
357 Catalog to be converted.
358 visit, detector : `int`
359 Visit and detector ids to be added as columns.
360 **kwargs
361 Additional keyword arguments are ignored as a convenience for
362 subclasses that pass the same arguments to several different
363 methods.
364
365 Returns
366 -------
367 result : `~lsst.pipe.base.Struct`
368 ``table``
369 `astropy.table.Table` version of the input catalog
370 """
371 self.log.info("Generating DataFrame from src catalog visit,detector=%i,%i", visit, detector)
372 tbl = catalog.asAstropy()
373 tbl["visit"] = visit
374 # int16 instead of uint8 because databases don't like unsigned bytes.
375 tbl["detector"] = np.int16(detector)
376
377 return pipeBase.Struct(table=tbl)
378
379
380class WriteRecalibratedSourceTableConnections(WriteSourceTableConnections,
381 defaultTemplates={"catalogType": ""},
382 dimensions=("instrument", "visit", "detector", "skymap")):
383 visitSummary = connectionTypes.Input(
384 doc="Input visit-summary catalog with updated calibration objects.",
385 name="finalVisitSummary",
386 storageClass="ExposureCatalog",
387 dimensions=("instrument", "visit",),
388 )
389
390 def __init__(self, config):
391 # We don't want the input catalog here to be an initial existence
392 # constraint in QG generation, because that can unfortunately limit the
393 # set of data IDs of inputs to other tasks, even those that run earlier
394 # (e.g. updateVisitSummary), when the input 'src' catalog is not
395 # produced. It's safer to just use 'visitSummary' existence as an
396 # initial constraint, and then let the graph prune out the detectors
397 # that don't have a 'src' for this task only.
398 self.catalog = dataclasses.replace(self.catalog, deferGraphConstraint=True)
399
400
401class WriteRecalibratedSourceTableConfig(WriteSourceTableConfig,
402 pipelineConnections=WriteRecalibratedSourceTableConnections):
403
404 doReevaluatePhotoCalib = pexConfig.Field(
405 dtype=bool,
406 default=True,
407 doc=("Add or replace local photoCalib columns"),
408 )
409 doReevaluateSkyWcs = pexConfig.Field(
410 dtype=bool,
411 default=True,
412 doc=("Add or replace local WCS columns and update the coord columns, coord_ra and coord_dec"),
413 )
414
415
416class WriteRecalibratedSourceTableTask(WriteSourceTableTask):
417 """Write source table to DataFrame Parquet format.
418 """
419 _DefaultName = "writeRecalibratedSourceTable"
420 ConfigClass = WriteRecalibratedSourceTableConfig
421
422 def runQuantum(self, butlerQC, inputRefs, outputRefs):
423 inputs = butlerQC.get(inputRefs)
424
425 inputs["visit"] = butlerQC.quantum.dataId["visit"]
426 inputs["detector"] = butlerQC.quantum.dataId["detector"]
427
428 if self.config.doReevaluatePhotoCalib or self.config.doReevaluateSkyWcs:
429 exposure = ExposureF()
430 inputs["exposure"] = self.prepareCalibratedExposure(
431 exposure=exposure,
432 visitSummary=inputs["visitSummary"],
433 detectorId=butlerQC.quantum.dataId["detector"]
434 )
435 inputs["catalog"] = self.addCalibColumns(**inputs)
436
437 result = self.run(**inputs)
438 outputs = pipeBase.Struct(outputCatalog=result.table)
439 butlerQC.put(outputs, outputRefs)
440
441 def prepareCalibratedExposure(self, exposure, detectorId, visitSummary=None):
442 """Prepare a calibrated exposure and apply external calibrations
443 if so configured.
444
445 Parameters
446 ----------
447 exposure : `lsst.afw.image.exposure.Exposure`
448 Input exposure to adjust calibrations. May be an empty Exposure.
449 detectorId : `int`
450 Detector ID associated with the exposure.
451 visitSummary : `lsst.afw.table.ExposureCatalog`, optional
452 Exposure catalog with all calibration objects. WCS and PhotoCalib
453 are always applied if ``visitSummary`` is provided and those
454 components are not `None`.
455
456 Returns
457 -------
458 exposure : `lsst.afw.image.exposure.Exposure`
459 Exposure with adjusted calibrations.
460 """
461 if visitSummary is not None:
462 row = visitSummary.find(detectorId)
463 if row is None:
464 raise pipeBase.NoWorkFound(f"Visit summary for detector {detectorId} is missing.")
465 if (photoCalib := row.getPhotoCalib()) is None:
466 self.log.warning("Detector id %s has None for photoCalib in visit summary; "
467 "skipping reevaluation of photoCalib.", detectorId)
468 exposure.setPhotoCalib(None)
469 else:
470 exposure.setPhotoCalib(photoCalib)
471 if (skyWcs := row.getWcs()) is None:
472 self.log.warning("Detector id %s has None for skyWcs in visit summary; "
473 "skipping reevaluation of skyWcs.", detectorId)
474 exposure.setWcs(None)
475 else:
476 exposure.setWcs(skyWcs)
477
478 return exposure
479
480 def addCalibColumns(self, catalog, exposure, **kwargs):
481 """Add replace columns with calibs evaluated at each centroid
482
483 Add or replace 'base_LocalWcs' and 'base_LocalPhotoCalib' columns in
484 a source catalog, by rerunning the plugins.
485
486 Parameters
487 ----------
488 catalog : `lsst.afw.table.SourceCatalog`
489 catalog to which calib columns will be added
490 exposure : `lsst.afw.image.exposure.Exposure`
491 Exposure with attached PhotoCalibs and SkyWcs attributes to be
492 reevaluated at local centroids. Pixels are not required.
493 **kwargs
494 Additional keyword arguments are ignored to facilitate passing the
495 same arguments to several methods.
496
497 Returns
498 -------
499 newCat: `lsst.afw.table.SourceCatalog`
500 Source Catalog with requested local calib columns
501 """
502 measureConfig = SingleFrameMeasurementTask.ConfigClass()
503 measureConfig.doReplaceWithNoise = False
504
505 # Clear all slots, because we aren't running the relevant plugins.
506 for slot in measureConfig.slots:
507 setattr(measureConfig.slots, slot, None)
508
509 measureConfig.plugins.names = []
510 if self.config.doReevaluateSkyWcs:
511 measureConfig.plugins.names.add("base_LocalWcs")
512 self.log.info("Re-evaluating base_LocalWcs plugin")
513 if self.config.doReevaluatePhotoCalib:
514 measureConfig.plugins.names.add("base_LocalPhotoCalib")
515 self.log.info("Re-evaluating base_LocalPhotoCalib plugin")
516 pluginsNotToCopy = tuple(measureConfig.plugins.names)
517
518 # Create a new schema and catalog
519 # Copy all columns from original except for the ones to reevaluate
520 aliasMap = catalog.schema.getAliasMap()
521 mapper = afwTable.SchemaMapper(catalog.schema)
522 for item in catalog.schema:
523 if not item.field.getName().startswith(pluginsNotToCopy):
524 mapper.addMapping(item.key)
525
526 schema = mapper.getOutputSchema()
527 measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
528 schema.setAliasMap(aliasMap)
529 newCat = afwTable.SourceCatalog(schema)
530 newCat.extend(catalog, mapper=mapper)
531
532 # Fluxes in sourceCatalogs are in counts, so there are no fluxes to
533 # update here. LocalPhotoCalibs are applied during transform tasks.
534 # Update coord_ra/coord_dec, which are expected to be positions on the
535 # sky and are used as such in sdm tables without transform
536 if self.config.doReevaluateSkyWcs and exposure.wcs is not None:
537 afwTable.updateSourceCoords(exposure.wcs, newCat)
538 wcsPlugin = measurement.plugins["base_LocalWcs"]
539 else:
540 wcsPlugin = None
541
542 if self.config.doReevaluatePhotoCalib and exposure.getPhotoCalib() is not None:
543 pcPlugin = measurement.plugins["base_LocalPhotoCalib"]
544 else:
545 pcPlugin = None
546
547 for row in newCat:
548 if wcsPlugin is not None:
549 wcsPlugin.measure(row, exposure)
550 if pcPlugin is not None:
551 pcPlugin.measure(row, exposure)
552
553 return newCat
554
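A hedged configuration sketch showing how the two flags above control which plugins addCalibColumns re-runs; it only uses the config fields defined in this file.

from lsst.pipe.tasks.postprocess import WriteRecalibratedSourceTableConfig

config = WriteRecalibratedSourceTableConfig()
config.doReevaluateSkyWcs = True        # re-run base_LocalWcs and refresh coord_ra/coord_dec
config.doReevaluatePhotoCalib = False   # keep the existing base_LocalPhotoCalib columns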
555
556class PostprocessAnalysis(object):
557 """Calculate columns from DataFrames or handles storing DataFrames.
558
559 This object manages and organizes an arbitrary set of computations
560 on a catalog. The catalog is defined by a
561 `DeferredDatasetHandle` or `InMemoryDatasetHandle` object
562 (or list thereof), such as a ``deepCoadd_obj`` dataset, and the
563 computations are defined by a collection of
564 `~lsst.pipe.tasks.functors.Functor` objects (or, equivalently, a
565 ``CompositeFunctor``).
566
567 After the object is initialized, accessing the ``.df`` attribute (which
568 holds the `pandas.DataFrame` containing the results of the calculations)
569 triggers computation of said dataframe.
570
571 One of the conveniences of using this object is the ability to define a
572 desired common filter for all functors. This enables the same functor
573 collection to be passed to several different `PostprocessAnalysis` objects
574 without having to change the original functor collection, since the ``filt``
575 keyword argument of this object triggers an overwrite of the ``filt``
576 property for all functors in the collection.
577
578 This object also allows a list of refFlags to be passed, and defines a set
579 of default refFlags that are always included even if not requested.
580
581 If a list of DataFrames or Handles is passed, rather than a single one,
582 then the calculations will be mapped over all the input catalogs. In
583 principle, it should be straightforward to parallelize this activity, but
584 initial tests have failed (see TODO in code comments).
585
586 Parameters
587 ----------
588 handles : `~lsst.daf.butler.DeferredDatasetHandle` or
589 `~lsst.pipe.base.InMemoryDatasetHandle` or
590 list of these.
591 Source catalog(s) for computation.
592 functors : `list`, `dict`, or `~lsst.pipe.tasks.functors.CompositeFunctor`
593 Computations to do (functors that act on ``handles``).
594 If a dict, the output
595 DataFrame will have columns keyed accordingly.
596 If a list, the column keys will come from the
597 ``.shortname`` attribute of each functor.
598
599 filt : `str`, optional
600 Filter in which to calculate. If provided,
601 this will overwrite any existing ``.filt`` attribute
602 of the provided functors.
603
604 flags : `list`, optional
605 List of flags (per-band) to include in output table.
606 Taken from the ``meas`` dataset if applied to a multilevel Object Table.
607
608 refFlags : `list`, optional
609 List of refFlags (only reference band) to include in output table.
610
611 forcedFlags : `list`, optional
612 List of flags (per-band) to include in output table.
613 Taken from the ``forced_src`` dataset if applied to a
614 multilevel Object Table. Intended for flags from measurement plugins
615 only run during multi-band forced-photometry.
616 """
617 _defaultRefFlags = []
618 _defaultFuncs = ()
619
620 def __init__(self, handles, functors, filt=None, flags=None, refFlags=None, forcedFlags=None):
621 self.handles = handles
622 self.functors = functors
623
624 self.filt = filt
625 self.flags = list(flags) if flags is not None else []
626 self.forcedFlags = list(forcedFlags) if forcedFlags is not None else []
627 self.refFlags = list(self._defaultRefFlags)
628 if refFlags is not None:
629 self.refFlags += list(refFlags)
630
631 self._df = None
632
633 @property
634 def defaultFuncs(self):
635 funcs = dict(self._defaultFuncs)
636 return funcs
637
638 @property
639 def func(self):
640 additionalFuncs = self.defaultFuncs
641 additionalFuncs.update({flag: Column(flag, dataset="forced_src") for flag in self.forcedFlags})
642 additionalFuncs.update({flag: Column(flag, dataset="ref") for flag in self.refFlags})
643 additionalFuncs.update({flag: Column(flag, dataset="meas") for flag in self.flags})
644
645 if isinstance(self.functors, CompositeFunctor):
646 func = self.functors
647 else:
648 func = CompositeFunctor(self.functors)
649
650 func.funcDict.update(additionalFuncs)
651 func.filt = self.filt
652
653 return func
654
655 @property
656 def noDupCols(self):
657 return [name for name, func in self.func.funcDict.items() if func.noDup]
658
659 @property
660 def df(self):
661 if self._df is None:
662 self.compute()
663 return self._df
664
665 def compute(self, dropna=False, pool=None):
666 # map over multiple handles
667 if type(self.handles) in (list, tuple):
668 if pool is None:
669 dflist = [self.func(handle, dropna=dropna) for handle in self.handles]
670 else:
671 # TODO: Figure out why this doesn't work (pyarrow pickling
672 # issues?)
673 dflist = pool.map(functools.partial(self.func, dropna=dropna), self.handles)
674 self._df = pd.concat(dflist)
675 else:
676 self._df = self.func(self.handles, dropna=dropna)
677
678 return self._df
679
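A minimal usage sketch for PostprocessAnalysis with an in-memory flat DataFrame, which the handles parameter above allows; the column name and values are placeholders.

import pandas as pd
from lsst.pipe.base import InMemoryDatasetHandle
from lsst.pipe.tasks.functors import Column, CompositeFunctor
from lsst.pipe.tasks.postprocess import PostprocessAnalysis

data = pd.DataFrame({"slot_PsfFlux_instFlux": [1.0, 2.0]})
handle = InMemoryDatasetHandle(data, storageClass="DataFrame")
funcs = CompositeFunctor({"psfFlux": Column("slot_PsfFlux_instFlux")})
analysis = PostprocessAnalysis(handle, funcs)
df_out = analysis.df   # computed lazily on first access; one 'psfFlux' column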
680
681class TransformCatalogBaseConnections(pipeBase.PipelineTaskConnections,
682 dimensions=()):
683 """Expected Connections for subclasses of TransformCatalogBaseTask.
684
685 Must be subclassed.
686 """
687 inputCatalog = connectionTypes.Input(
688 name="",
689 storageClass="DataFrame",
690 )
691 outputCatalog = connectionTypes.Output(
692 name="",
693 storageClass="ArrowAstropy",
694 )
695
696
697class TransformCatalogBaseConfig(pipeBase.PipelineTaskConfig,
698 pipelineConnections=TransformCatalogBaseConnections):
699 functorFile = pexConfig.Field(
700 dtype=str,
701 doc="Path to YAML file specifying Science Data Model functors to use "
702 "when copying columns and computing calibrated values.",
703 default=None,
704 optional=True
705 )
706 primaryKey = pexConfig.Field(
707 dtype=str,
708 doc="Name of column to be set as the DataFrame index. If None, the index"
709 "will be named `id`",
710 default=None,
711 optional=True
712 )
713 columnsFromDataId = pexConfig.ListField(
714 dtype=str,
715 default=None,
716 optional=True,
717 doc="Columns to extract from the dataId",
718 )
719
720
721class TransformCatalogBaseTask(pipeBase.PipelineTask):
722 """Base class for transforming/standardizing a catalog by applying functors
723 that convert units and apply calibrations.
724
725 The purpose of this task is to perform a set of computations on an input
726 ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` that holds a
727 ``DataFrame`` dataset (such as ``deepCoadd_obj``), and write the results to
728 a new dataset (which needs to be declared in an ``outputDataset``
729 attribute).
730
731 The calculations to be performed are defined in a YAML file that specifies
732 a set of functors to be computed, provided via the ``functorFile`` config
733 parameter. An example of such a YAML file is the following:
734
735 funcs:
736 sourceId:
737 functor: Index
738 x:
739 functor: Column
740 args: slot_Centroid_x
741 y:
742 functor: Column
743 args: slot_Centroid_y
744 psfFlux:
745 functor: LocalNanojansky
746 args:
747 - slot_PsfFlux_instFlux
748 - slot_PsfFlux_instFluxErr
749 - base_LocalPhotoCalib
750 - base_LocalPhotoCalibErr
751 psfFluxErr:
752 functor: LocalNanojanskyErr
753 args:
754 - slot_PsfFlux_instFlux
755 - slot_PsfFlux_instFluxErr
756 - base_LocalPhotoCalib
757 - base_LocalPhotoCalibErr
758 flags:
759 - detect_isPrimary
760
761 The names for each entry under "funcs" will become the names of columns in
762 the output dataset. All the functors referenced are defined in
763 `~lsst.pipe.tasks.functors`. Positional arguments to be passed to each
764 functor are in the `args` list, and any additional entries for each column
765 other than "functor" or "args" (e.g., ``'filt'``, ``'dataset'``) are
766 treated as keyword arguments to be passed to the functor initialization.
767
768 The "flags" entry is the default shortcut for `Column` functors.
769 All columns listed under "flags" will be copied to the output table
770 untransformed. They can be of any datatype.
771 In the special case of transforming a multi-level object table with
772 band and dataset indices (deepCoadd_obj), these will be taken from the
773 ``meas`` dataset and exploded out per band.
774
775 There are two special shortcuts that only apply when transforming
776 multi-level Object (deepCoadd_obj) tables:
777 - The "refFlags" entry is shortcut for `Column` functor
778 taken from the ``ref`` dataset if transforming an ObjectTable.
779 - The "forcedFlags" entry is shortcut for `Column` functors.
780 taken from the ``forced_src`` dataset if transforming an ObjectTable.
781 These are expanded out per band.
782
783
784 This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
785 to organize and execute the calculations.
786 """
787 @property
788 def _DefaultName(self):
789 raise NotImplementedError("Subclass must define the \"_DefaultName\" attribute.")
790
791 @property
792 def outputDataset(self):
793 raise NotImplementedError("Subclass must define the \"outputDataset\" attribute.")
794
795 @property
796 def inputDataset(self):
797 raise NotImplementedError("Subclass must define \"inputDataset\" attribute.")
798
799 @property
800 def ConfigClass(self):
801 raise NotImplementedError("Subclass must define \"ConfigClass\" attribute.")
802
803 def __init__(self, *args, **kwargs):
804 super().__init__(*args, **kwargs)
805 if self.config.functorFile:
806 self.log.info("Loading tranform functor definitions from %s",
807 self.config.functorFile)
808 self.funcs = CompositeFunctor.from_file(self.config.functorFile)
809 self.funcs.update(dict(PostprocessAnalysis._defaultFuncs))
810 else:
811 self.funcs = None
812
813 def runQuantum(self, butlerQC, inputRefs, outputRefs):
814 inputs = butlerQC.get(inputRefs)
815 if self.funcs is None:
816 raise ValueError("config.functorFile is None. "
817 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
818 result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
819 dataId=dict(outputRefs.outputCatalog.dataId.mapping))
820 butlerQC.put(result, outputRefs)
821
822 def run(self, handle, funcs=None, dataId=None, band=None):
823 """Do postprocessing calculations
824
825 Takes a ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` or
826 ``DataFrame`` object and dataId,
827 returns a dataframe with results of postprocessing calculations.
828
829 Parameters
830 ----------
831 handle : `~lsst.daf.butler.DeferredDatasetHandle` or
832 `~lsst.pipe.base.InMemoryDatasetHandle` or
833 `~pandas.DataFrame`, or list of these.
834 DataFrames from which calculations are done.
835 funcs : `~lsst.pipe.tasks.functors.Functor`
836 Functors to apply to the table's columns
837 dataId : `dict`, optional
838 Used to add the columns configured in ``columnsFromDataId`` to the output dataframe.
839 band : `str`, optional
840 Filter band that is being processed.
841
842 Returns
843 -------
844 result : `lsst.pipe.base.Struct`
845 Result struct, with a single ``outputCatalog`` attribute holding
846 the transformed catalog.
847 """
848 self.log.info("Transforming/standardizing the source table dataId: %s", dataId)
849
850 df = self.transform(band, handle, funcs, dataId).df
851 self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
852 result = pipeBase.Struct(outputCatalog=pandas_to_astropy(df))
853 return result
854
855 def getFunctors(self):
856 return self.funcs
857
858 def getAnalysis(self, handles, funcs=None, band=None):
859 if funcs is None:
860 funcs = self.funcs
861 analysis = PostprocessAnalysis(handles, funcs, filt=band)
862 return analysis
863
864 def transform(self, band, handles, funcs, dataId):
865 analysis = self.getAnalysis(handles, funcs=funcs, band=band)
866 df = analysis.df
867 if dataId and self.config.columnsFromDataId:
868 for key in self.config.columnsFromDataId:
869 if key in dataId:
870 if key == "detector":
871 # int16 instead of uint8 because databases don't like unsigned bytes.
872 df[key] = np.int16(dataId[key])
873 else:
874 df[key] = dataId[key]
875 else:
876 raise ValueError(f"'{key}' in config.columnsFromDataId not found in dataId: {dataId}")
877
878 if self.config.primaryKey:
879 if df.index.name != self.config.primaryKey and self.config.primaryKey in df:
880 df.reset_index(inplace=True, drop=True)
881 df.set_index(self.config.primaryKey, inplace=True)
882
883 return pipeBase.Struct(
884 df=df,
885 analysis=analysis
886 )
887
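A hedged sketch of how the functorFile YAML described in the class docstring is consumed: __init__ loads it with CompositeFunctor.from_file and merges in the PostprocessAnalysis defaults. Source.yaml is just the stock functor file shipped with pipe_tasks; any YAML following the layout above would do.

import os

from lsst.pipe.tasks.functors import CompositeFunctor
from lsst.pipe.tasks.postprocess import PostprocessAnalysis

functor_file = os.path.expandvars(os.path.join("$PIPE_TASKS_DIR", "schemas", "Source.yaml"))
funcs = CompositeFunctor.from_file(functor_file)
funcs.update(dict(PostprocessAnalysis._defaultFuncs))  # same merge as TransformCatalogBaseTask.__init__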
888
889class TransformObjectCatalogConnections(pipeBase.PipelineTaskConnections,
890 defaultTemplates={"coaddName": "deep"},
891 dimensions=("tract", "patch", "skymap")):
892 inputCatalog = connectionTypes.Input(
893 doc="The vertical concatenation of the {coaddName}_{meas|forced_src|psfs_multiprofit} catalogs, "
894 "stored as a DataFrame with a multi-level column index per-patch.",
895 dimensions=("tract", "patch", "skymap"),
896 storageClass="DataFrame",
897 name="{coaddName}Coadd_obj",
898 deferLoad=True,
899 )
900 inputCatalogRef = connectionTypes.Input(
901 doc="Catalog marking the primary detection (which band provides a good shape and position)"
902 "for each detection in deepCoadd_mergeDet.",
903 dimensions=("tract", "patch", "skymap"),
904 storageClass="SourceCatalog",
905 name="{coaddName}Coadd_ref",
906 deferLoad=True,
907 )
908 inputCatalogSersicMultiprofit = connectionTypes.Input(
909 doc="Catalog of source measurements on the deepCoadd.",
910 dimensions=("tract", "patch", "skymap"),
911 storageClass="ArrowAstropy",
912 name="{coaddName}Coadd_Sersic_multiprofit",
913 deferLoad=True,
914 )
915 inputCatalogEpoch = connectionTypes.Input(
916 doc="Catalog of mean epochs for each object per band.",
917 dimensions=("tract", "patch", "skymap"),
918 storageClass="ArrowAstropy",
919 name="object_epoch",
920 deferLoad=True,
921 )
922 outputCatalog = connectionTypes.Output(
923 doc="Per-Patch Object Table of columns transformed from the deepCoadd_obj table per the standard "
924 "data model.",
925 dimensions=("tract", "patch", "skymap"),
926 storageClass="ArrowAstropy",
927 name="objectTable"
928 )
929
930 def __init__(self, *, config=None):
931 super().__init__(config=config)
932 if config.multilevelOutput:
933 self.outputCatalog = dataclasses.replace(self.outputCatalog, storageClass="DataFrame")
934
935
936class TransformObjectCatalogConfig(TransformCatalogBaseConfig,
937 pipelineConnections=TransformObjectCatalogConnections):
938 coaddName = pexConfig.Field(
939 dtype=str,
940 default="deep",
941 doc="Name of coadd"
942 )
943 outputBands = pexConfig.ListField(
944 dtype=str,
945 default=None,
946 optional=True,
947 doc=("These bands and only these bands will appear in the output,"
948 " NaN-filled if the input does not include them."
949 " If None, then use all bands found in the input.")
950 )
951 camelCase = pexConfig.Field(
952 dtype=bool,
953 default=False,
954 doc=("Write per-band columns names with camelCase, else underscore "
955 "For example: gPsFlux instead of g_PsFlux.")
956 )
957 multilevelOutput = pexConfig.Field(
958 dtype=bool,
959 default=False,
960 doc=("Whether results dataframe should have a multilevel column index (True) or be flat "
961 "and name-munged (False). If True, the output storage class will be "
962 "set to DataFrame, since astropy tables do not support multi-level indexing."),
963 deprecated="Support for multi-level outputs is deprecated and will be removed after v29.",
964 )
965 goodFlags = pexConfig.ListField(
966 dtype=str,
967 default=[],
968 doc=("List of 'good' flags that should be set False when populating empty tables. "
969 "All other flags are considered to be 'bad' flags and will be set to True.")
970 )
971 floatFillValue = pexConfig.Field(
972 dtype=float,
973 default=np.nan,
974 doc="Fill value for float fields when populating empty tables."
975 )
976 integerFillValue = pexConfig.Field(
977 dtype=int,
978 default=-1,
979 doc="Fill value for integer fields when populating empty tables."
980 )
981
982 def setDefaults(self):
983 super().setDefaults()
984 self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Object.yaml")
985 self.primaryKey = "objectId"
986 self.columnsFromDataId = ["tract", "patch"]
987 self.goodFlags = ["calib_astrometry_used",
988 "calib_photometry_reserved",
989 "calib_photometry_used",
990 "calib_psf_candidate",
991 "calib_psf_reserved",
992 "calib_psf_used"]
993
994
995class TransformObjectCatalogTask(TransformCatalogBaseTask):
996 """Produce a flattened Object Table to match the format specified in
997 sdm_schemas.
998
999 Do the same set of postprocessing calculations on all bands.
1000
1001 This is identical to `TransformCatalogBaseTask`, except that it does
1002 the specified functor calculations for all filters present in the
1003 input `deepCoadd_obj` table. Any specific ``"filt"`` keywords specified
1004 by the YAML file will be superseded.
1005 """
1006 _DefaultName = "transformObjectCatalog"
1007 ConfigClass = TransformObjectCatalogConfig
1008
1009 datasets_multiband = ("epoch", "ref", "Sersic_multiprofit")
1010
1011 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1012 inputs = butlerQC.get(inputRefs)
1013 if self.funcs is None:
1014 raise ValueError("config.functorFile is None. "
1015 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
1016 result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
1017 dataId=dict(outputRefs.outputCatalog.dataId.mapping),
1018 handle_epoch=inputs["inputCatalogEpoch"],
1019 handle_ref=inputs["inputCatalogRef"],
1020 handle_Sersic_multiprofit=inputs["inputCatalogSersicMultiprofit"],
1021 )
1022 butlerQC.put(result, outputRefs)
1023
1024 def run(self, handle, funcs=None, dataId=None, band=None, **kwargs):
1025 # NOTE: band kwarg is ignored here.
1026 # TODO: Document and improve funcs argument usage in DM-48895
1027 # self.getAnalysis only supports list, dict and CompositeFunctor
1028 if isinstance(funcs, CompositeFunctor):
1029 funcDict_in = funcs.funcDict
1030 elif isinstance(funcs, dict):
1031 funcDict_in = funcs
1032 elif isinstance(funcs, list):
1033 funcDict_in = {idx: v for idx, v in enumerate(funcs)}
1034 else:
1035 raise TypeError(f"Unsupported {type(funcs)=}")
1036
1037 handles_multi = {}
1038 funcDicts_multiband = {}
1039 for dataset in self.datasets_multiband:
1040 if (handle_multi := kwargs.get(f"handle_{dataset}")) is None:
1041 raise RuntimeError(f"Missing required handle_{dataset} kwarg")
1042 handles_multi[dataset] = handle_multi
1043 funcDicts_multiband[dataset] = {}
1044
1045 dfDict = {}
1046 analysisDict = {}
1047 templateDf = pd.DataFrame()
1048
1049 columns = handle.get(component="columns")
1050 inputBands = columns.unique(level=1).values
1051
1052 outputBands = self.config.outputBands if self.config.outputBands else inputBands
1053
1054 # Split up funcs for per-band and multiband tables
1055 funcDict_band = {}
1056
1057 for name, func in funcDict_in.items():
1058 if func.dataset in funcDicts_multiband:
1059 # This is something like a MultibandColumn
1060 if band := getattr(func, "band_to_check", None):
1061 if band not in outputBands:
1062 continue
1063 # This is something like a ReferenceBand that has configurable bands
1064 elif hasattr(func, "bands"):
1065 # TODO: Determine if this can be avoided DM-48895
1066 # This will work fine if the init doesn't manipulate bands
1067 # If it does, then one would need to make a new functor
1068 # Determining the (kw)args is tricky in that case
1069 func.bands = tuple(inputBands)
1070
1071 funcDict = funcDicts_multiband.get(func.dataset, funcDict_band)
1072 funcDict[name] = func
1073
1074 funcs_band = CompositeFunctor(funcDict_band)
1075
1076 # Perform transform for data of filters that exist in the handle dataframe.
1077 for inputBand in inputBands:
1078 if inputBand not in outputBands:
1079 self.log.info("Ignoring %s band data in the input", inputBand)
1080 continue
1081 self.log.info("Transforming the catalog of band %s", inputBand)
1082 result = self.transform(inputBand, handle, funcs_band, dataId)
1083 dfDict[inputBand] = result.df
1084 analysisDict[inputBand] = result.analysis
1085 if templateDf.empty:
1086 templateDf = result.df
1087
1088 # Put filler values in columns of other wanted bands
1089 for filt in outputBands:
1090 if filt not in dfDict:
1091 self.log.info("Adding empty columns for band %s", filt)
1092 dfTemp = templateDf.copy()
1093 for col in dfTemp.columns:
1094 testValue = dfTemp[col].values[0]
1095 if isinstance(testValue, (np.bool_, pd.BooleanDtype)):
1096 # Boolean flag type, check if it is a "good" flag
1097 if col in self.config.goodFlags:
1098 fillValue = False
1099 else:
1100 fillValue = True
1101 elif isinstance(testValue, numbers.Integral):
1102 # Checking numbers.Integral catches all flavors
1103 # of python, numpy, pandas, etc. integers.
1104 # We must ensure this is not an unsigned integer.
1105 if isinstance(testValue, np.unsignedinteger):
1106 raise ValueError("Parquet tables may not have unsigned integer columns.")
1107 else:
1108 fillValue = self.config.integerFillValue
1109 else:
1110 fillValue = self.config.floatFillValue
1111 dfTemp[col].values[:] = fillValue
1112 dfDict[filt] = dfTemp
1113
1114 # This makes a multilevel column index, with band as first level
1115 df = pd.concat(dfDict, axis=1, names=["band", "column"])
1116 name_index = df.index.name
1117
1118 # TODO: Remove in DM-48895
1119 if not self.config.multilevelOutput:
1120 noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
1121 if self.config.primaryKey in noDupCols:
1122 noDupCols.remove(self.config.primaryKey)
1123 if dataId and self.config.columnsFromDataId:
1124 noDupCols += self.config.columnsFromDataId
1125 df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase,
1126 inputBands=inputBands)
1127
1128 # Apply per-dataset functors to each multiband dataset in turn
1129 for dataset, funcDict in funcDicts_multiband.items():
1130 handle_multiband = handles_multi[dataset]
1131 df_dataset = handle_multiband.get()
1132 if isinstance(df_dataset, astropy.table.Table):
1133 # Allow astropy table inputs to already have the output index
1134 if name_index not in df_dataset.colnames:
1135 if self.config.primaryKey in df_dataset.colnames:
1136 name_index_ap = self.config.primaryKey
1137 else:
1138 raise RuntimeError(
1139 f"Neither of {name_index=} nor {self.config.primaryKey=} appear in"
1140 f" {df_dataset.colnames=} for {dataset=}"
1141 )
1142 else:
1143 name_index_ap = name_index
1144 df_dataset = df_dataset.to_pandas().set_index(name_index_ap, drop=False)
1145 elif isinstance(df_dataset, afwTable.SourceCatalog):
1146 df_dataset = df_dataset.asAstropy().to_pandas().set_index(name_index, drop=False)
1147 # TODO: should funcDict have noDup funcs removed?
1148 # noDup was intended for per-band tables.
1149 result = self.transform(
1150 None,
1151 pipeBase.InMemoryDatasetHandle(df_dataset, storageClass="DataFrame"),
1152 CompositeFunctor(funcDict),
1153 dataId,
1154 )
1155 result.df.index.name = name_index
1156 # Drop columns from dataId if present (patch, tract)
1157 if self.config.columnsFromDataId:
1158 columns_drop = [column for column in self.config.columnsFromDataId if column in result.df]
1159 if columns_drop:
1160 result.df.drop(columns_drop, axis=1, inplace=True)
1161 # Make the same multi-index for the multiband table if needed
1162 # This might end up making copies, one of several reasons to avoid
1163 # using multilevel indexes, or DataFrames at all
1164 to_concat = pd.concat(
1165 {band: result.df for band in self.config.outputBands}, axis=1, names=["band", "column"]
1166 ) if self.config.multilevelOutput else result.df
1167 df = pd.concat([df, to_concat], axis=1)
1168 analysisDict[dataset] = result.analysis
1169 del result
1170
1171 df.index.name = self.config.primaryKey
1172
1173 if not self.config.multilevelOutput:
1174 tbl = pandas_to_astropy(df)
1175 else:
1176 tbl = df
1177
1178 self.log.info("Made a table of %d columns and %d rows", len(tbl.columns), len(tbl))
1179
1180 return pipeBase.Struct(outputCatalog=tbl)
1181
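A configuration sketch for the per-band flattening described above; the band list is just an example. With camelCase=False (the default) columns come out like g_PsFlux, with camelCase=True like gPsFlux, and bands requested in outputBands but absent from the input are filled with the configured fill values.

from lsst.pipe.tasks.postprocess import TransformObjectCatalogConfig

config = TransformObjectCatalogConfig()
config.outputBands = ["g", "r", "i"]   # bands missing from the input get filler columns
config.camelCase = True                # e.g. gPsFlux rather than g_PsFlux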
1182
1183class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections,
1184 dimensions=("tract", "skymap")):
1185 inputCatalogs = connectionTypes.Input(
1186 doc="Per-Patch objectTables conforming to the standard data model.",
1187 name="objectTable",
1188 storageClass="ArrowAstropy",
1189 dimensions=("tract", "patch", "skymap"),
1190 multiple=True,
1191 deferLoad=True,
1192 )
1193 outputCatalog = connectionTypes.Output(
1194 doc="Pre-tract horizontal concatenation of the input objectTables",
1195 name="objectTable_tract",
1196 storageClass="ArrowAstropy",
1197 dimensions=("tract", "skymap"),
1198 )
1199
1200
1201class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig,
1202 pipelineConnections=ConsolidateObjectTableConnections):
1203 coaddName = pexConfig.Field(
1204 dtype=str,
1205 default="deep",
1206 doc="Name of coadd"
1207 )
1208
1209
1210class ConsolidateObjectTableTask(pipeBase.PipelineTask):
1211 """Write patch-merged source tables to a tract-level DataFrame Parquet file.
1212
1213 Concatenates `objectTable` list into a per-visit `objectTable_tract`.
1214 """
1215 _DefaultName = "consolidateObjectTable"
1216 ConfigClass = ConsolidateObjectTableConfig
1217
1218 inputDataset = "objectTable"
1219 outputDataset = "objectTable_tract"
1220
1221 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1222 inputs = butlerQC.get(inputRefs)
1223 self.log.info("Concatenating %s per-patch Object Tables",
1224 len(inputs["inputCatalogs"]))
1225 table = TableVStack.vstack_handles(inputs["inputCatalogs"])
1226 butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)
1227
1228
1229class TransformSourceTableConnections(pipeBase.PipelineTaskConnections,
1230 defaultTemplates={"catalogType": ""},
1231 dimensions=("instrument", "visit", "detector")):
1232
1233 inputCatalog = connectionTypes.Input(
1234 doc="Wide input catalog of sources produced by WriteSourceTableTask",
1235 name="{catalogType}source",
1236 storageClass="DataFrame",
1237 dimensions=("instrument", "visit", "detector"),
1238 deferLoad=True
1239 )
1240 outputCatalog = connectionTypes.Output(
1241 doc="Narrower, per-detector Source Table transformed and converted per a "
1242 "specified set of functors",
1243 name="{catalogType}sourceTable",
1244 storageClass="ArrowAstropy",
1245 dimensions=("instrument", "visit", "detector")
1246 )
1247
1248
1249class TransformSourceTableConfig(TransformCatalogBaseConfig,
1250 pipelineConnections=TransformSourceTableConnections):
1251
1252 def setDefaults(self):
1253 super().setDefaults()
1254 self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Source.yaml")
1255 self.primaryKey = "sourceId"
1256 self.columnsFromDataId = ["visit", "detector", "band", "physical_filter"]
1257
1258
1259class TransformSourceTableTask(TransformCatalogBaseTask):
1260 """Transform/standardize a source catalog
1261 """
1262 _DefaultName = "transformSourceTable"
1263 ConfigClass = TransformSourceTableConfig
1264
1265
1266class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections,
1267 dimensions=("instrument", "visit",),
1268 defaultTemplates={"calexpType": ""}):
1269 calexp = connectionTypes.Input(
1270 doc="Processed exposures used for metadata",
1271 name="calexp",
1272 storageClass="ExposureF",
1273 dimensions=("instrument", "visit", "detector"),
1274 deferLoad=True,
1275 multiple=True,
1276 )
1277 visitSummary = connectionTypes.Output(
1278 doc=("Per-visit consolidated exposure metadata. These catalogs use "
1279 "detector id for the id and are sorted for fast lookups of a "
1280 "detector."),
1281 name="visitSummary",
1282 storageClass="ExposureCatalog",
1283 dimensions=("instrument", "visit"),
1284 )
1285 visitSummarySchema = connectionTypes.InitOutput(
1286 doc="Schema of the visitSummary catalog",
1287 name="visitSummary_schema",
1288 storageClass="ExposureCatalog",
1289 )
1290
1291
1292class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig,
1293 pipelineConnections=ConsolidateVisitSummaryConnections):
1294 """Config for ConsolidateVisitSummaryTask"""
1295
1296 full = pexConfig.Field(
1297 "Whether to propate all exposure components. "
1298 "This adds PSF, aperture correction map, transmission curve, and detector, which can increase file "
1299 "size by more than factor of 10, but it makes the visit summaries produced by this task fully usable"
1300 "by tasks that were designed to run downstream of lsst.drp.tasks.UpdateVisitSummaryTask.",
1301 dtype=bool,
1302 default=False,
1303 )
1304
1305
1306class ConsolidateVisitSummaryTask(pipeBase.PipelineTask):
1307 """Task to consolidate per-detector visit metadata.
1308
1309 This task aggregates the following metadata from all the detectors in a
1310 single visit into an exposure catalog:
1311 - The visitInfo.
1312 - The wcs.
1313 - The photoCalib.
1314 - The physical_filter and band (if available).
1315 - The detector.
1316 - The PSF model.
1317 - The aperture correction map.
1318 - The transmission curve.
1319 - The psf size, shape, and effective area at the center of the detector.
1320 - The corners of the bounding box in right ascension/declination.
1321
1322 Tests for this task are performed in ci_hsc_gen3.
1323 """
1324 _DefaultName = "consolidateVisitSummary"
1325 ConfigClass = ConsolidateVisitSummaryConfig
1326
1327 def __init__(self, **kwargs):
1328 super().__init__(**kwargs)
1329 self.schema = afwTable.ExposureTable.makeMinimalSchema()
1330 self.schema.addField("visit", type="L", doc="Visit number")
1331 self.schema.addField("physical_filter", type="String", size=32, doc="Physical filter")
1332 self.schema.addField("band", type="String", size=32, doc="Name of band")
1333 ExposureSummaryStats.update_schema(self.schema)
1334 self.visitSummarySchema = afwTable.ExposureCatalog(self.schema)
1335
1336 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1337 dataRefs = butlerQC.get(inputRefs.calexp)
1338 visit = dataRefs[0].dataId["visit"]
1339
1340 self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
1341 len(dataRefs), visit)
1342
1343 expCatalog = self._combineExposureMetadata(visit, dataRefs)
1344
1345 butlerQC.put(expCatalog, outputRefs.visitSummary)
1346
1347 def _combineExposureMetadata(self, visit, dataRefs):
1348 """Make a combined exposure catalog from a list of dataRefs.
1349 These dataRefs must point to exposures with wcs, summaryStats,
1350 and other visit metadata.
1351
1352 Parameters
1353 ----------
1354 visit : `int`
1355 Visit identification number.
1356 dataRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
1357 List of dataRefs in visit.
1358
1359 Returns
1360 -------
1361 visitSummary : `lsst.afw.table.ExposureCatalog`
1362 Exposure catalog with per-detector summary information.
1363 """
1364 cat = afwTable.ExposureCatalog(self.schema)
1365 cat.resize(len(dataRefs))
1366
1367 cat["visit"] = visit
1368
1369 for i, dataRef in enumerate(dataRefs):
1370 visitInfo = dataRef.get(component="visitInfo")
1371 filterLabel = dataRef.get(component="filter")
1372 summaryStats = dataRef.get(component="summaryStats")
1373 detector = dataRef.get(component="detector")
1374 wcs = dataRef.get(component="wcs")
1375 photoCalib = dataRef.get(component="photoCalib")
1376 bbox = dataRef.get(component="bbox")
1377 validPolygon = dataRef.get(component="validPolygon")
1378
1379 rec = cat[i]
1380 rec.setBBox(bbox)
1381 rec.setVisitInfo(visitInfo)
1382 rec.setWcs(wcs)
1383 rec.setPhotoCalib(photoCalib)
1384 rec.setValidPolygon(validPolygon)
1385
1386 if self.config.full:
1387 rec.setDetector(dataRef.get(component="detector"))
1388 rec.setPsf(dataRef.get(component="psf"))
1389 rec.setApCorrMap(dataRef.get(component="apCorrMap"))
1390 rec.setTransmissionCurve(dataRef.get(component="transmissionCurve"))
1391
1392 rec["physical_filter"] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else ""
1393 rec["band"] = filterLabel.bandLabel if filterLabel.hasBandLabel() else ""
1394 rec.setId(detector.getId())
1395 summaryStats.update_record(rec)
1396
1397 if not cat:
1398 raise pipeBase.NoWorkFound(
1399 "No detectors had sufficient information to make a visit summary row."
1400 )
1401
1402 metadata = dafBase.PropertyList()
1403 metadata.add("COMMENT", "Catalog id is detector id, sorted.")
1404 # We are looping over existing datarefs, so the following is true
1405 metadata.add("COMMENT", "Only detectors with data have entries.")
1406 cat.setMetadata(metadata)
1407
1408 cat.sort()
1409 return cat
1410
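A small helper sketch showing the intended lookup pattern for the sorted catalog written above: record ids are detector ids, so a per-detector row can be fetched with find(), as prepareCalibratedExposure does earlier in this file. The function name is hypothetical.

import lsst.afw.table as afwTable


def getDetectorCalibs(visitSummary: afwTable.ExposureCatalog, detectorId: int):
    """Return (wcs, photoCalib) for one detector from a sorted visit summary."""
    row = visitSummary.find(detectorId)
    if row is None:
        return None, None
    return row.getWcs(), row.getPhotoCalib()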
1411
1412class ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections,
1413 defaultTemplates={"catalogType": ""},
1414 dimensions=("instrument", "visit")):
1415 inputCatalogs = connectionTypes.Input(
1416 doc="Input per-detector Source Tables",
1417 name="{catalogType}sourceTable",
1418 storageClass="ArrowAstropy",
1419 dimensions=("instrument", "visit", "detector"),
1420 multiple=True,
1421 deferLoad=True,
1422 )
1423 outputCatalog = connectionTypes.Output(
1424 doc="Per-visit concatenation of Source Table",
1425 name="{catalogType}sourceTable_visit",
1426 storageClass="ArrowAstropy",
1427 dimensions=("instrument", "visit")
1428 )
1429
1430
1431class ConsolidateSourceTableConfig(pipeBase.PipelineTaskConfig,
1432 pipelineConnections=ConsolidateSourceTableConnections):
1433 pass
1434
1435
1436class ConsolidateSourceTableTask(pipeBase.PipelineTask):
1437 """Concatenate `sourceTable` list into a per-visit `sourceTable_visit`
1438 """
1439 _DefaultName = "consolidateSourceTable"
1440 ConfigClass = ConsolidateSourceTableConfig
1441
1442 inputDataset = "sourceTable"
1443 outputDataset = "sourceTable_visit"
1444
1445 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1446 from .makeWarp import reorderRefs
1447
1448 detectorOrder = [ref.dataId["detector"] for ref in inputRefs.inputCatalogs]
1449 detectorOrder.sort()
1450 inputRefs = reorderRefs(inputRefs, detectorOrder, dataIdKey="detector")
1451 inputs = butlerQC.get(inputRefs)
1452 self.log.info("Concatenating %s per-detector Source Tables",
1453 len(inputs["inputCatalogs"]))
1454 table = TableVStack.vstack_handles(inputs["inputCatalogs"])
1455 butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)
1456
1457
1458class MakeCcdVisitTableConnections(pipeBase.PipelineTaskConnections,
1459 dimensions=("instrument",),
1460 defaultTemplates={"calexpType": ""}):
1461 visitSummaryRefs = connectionTypes.Input(
1462 doc="Data references for per-visit consolidated exposure metadata",
1463 name="finalVisitSummary",
1464 storageClass="ExposureCatalog",
1465 dimensions=("instrument", "visit"),
1466 multiple=True,
1467 deferLoad=True,
1468 )
1469 outputCatalog = connectionTypes.Output(
1470 doc="CCD and Visit metadata table",
1471 name="ccdVisitTable",
1472 storageClass="ArrowAstropy",
1473 dimensions=("instrument",)
1474 )
1475
1476
1477class MakeCcdVisitTableConfig(pipeBase.PipelineTaskConfig,
1478 pipelineConnections=MakeCcdVisitTableConnections):
1479 idGenerator = DetectorVisitIdGeneratorConfig.make_field()
1480
1481
1482class MakeCcdVisitTableTask(pipeBase.PipelineTask):
1483 """Produce a `ccdVisitTable` from the visit summary exposure catalogs.
1484 """
1485 _DefaultName = "makeCcdVisitTable"
1486 ConfigClass = MakeCcdVisitTableConfig
1487
1488 def run(self, visitSummaryRefs):
1489 """Make a table of ccd information from the visit summary catalogs.
1490
1491 Parameters
1492 ----------
1493 visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
1494 List of DeferredDatasetHandles pointing to exposure catalogs with
1495 per-detector summary information.
1496
1497 Returns
1498 -------
1499 result : `~lsst.pipe.base.Struct`
1500 Results struct with attribute:
1501
1502 ``outputCatalog``
1503 Catalog of ccd and visit information.
1504 """
1505 ccdEntries = []
1506 for visitSummaryRef in visitSummaryRefs:
1507 visitSummary = visitSummaryRef.get()
1508 if not visitSummary:
1509 continue
1510 visitInfo = visitSummary[0].getVisitInfo()
1511
1512 # Strip provenance to prevent merge confusion.
1513 strip_provenance_from_fits_header(visitSummary.metadata)
1514
1515 ccdEntry = {}
1516 summaryTable = visitSummary.asAstropy()
1517 selectColumns = ["id", "visit", "physical_filter", "band", "ra", "dec",
1518 "pixelScale", "zenithDistance",
1519 "expTime", "zeroPoint", "psfSigma", "skyBg", "skyNoise",
1520 "astromOffsetMean", "astromOffsetStd", "nPsfStar",
1521 "psfStarDeltaE1Median", "psfStarDeltaE2Median",
1522 "psfStarDeltaE1Scatter", "psfStarDeltaE2Scatter",
1523 "psfStarDeltaSizeMedian", "psfStarDeltaSizeScatter",
1524 "psfStarScaledDeltaSizeScatter", "psfTraceRadiusDelta",
1525 "psfApFluxDelta", "psfApCorrSigmaScaledDelta",
1526 "maxDistToNearestPsf",
1527 "effTime", "effTimePsfSigmaScale",
1528 "effTimeSkyBgScale", "effTimeZeroPointScale",
1529 "magLim"]
1530 ccdEntry = summaryTable[selectColumns]
1531 # 'visit' is the human readable visit number.
1532 # 'visitId' is the key to the visit table. They are the same.
1533 # Technically you should join to get the visit from the visit
1534 # table.
1535 ccdEntry.rename_column("visit", "visitId")
1536 ccdEntry.rename_column("id", "detectorId")
1537
1538 # RFC-924: Temporarily keep a duplicate "decl" entry for backwards
1539 # compatibility. To be removed after September 2023.
1540 ccdEntry["decl"] = ccdEntry["dec"]
1541
1542 ccdEntry["ccdVisitId"] = [
1543 self.config.idGenerator.apply(
1544 visitSummaryRef.dataId,
1545 detector=detector_id,
1546 is_exposure=False,
1547 ).catalog_id # The "catalog ID" here is the ccdVisit ID
1548 # because it's usually the ID for a whole catalog
1549 # with a {visit, detector}, and that's the main
1550 # use case for IdGenerator. This usage for a
1551 # summary table is rare.
1552 for detector_id in summaryTable["id"]
1553 ]
1554 ccdEntry["detector"] = summaryTable["id"]
1555 ccdEntry["seeing"] = (
1556 visitSummary["psfSigma"] * visitSummary["pixelScale"] * np.sqrt(8 * np.log(2))
1557 )
1558 ccdEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
1559 ccdEntry["expMidpt"] = np.datetime64(visitInfo.date.nsecs(scale=dafBase.DateTime.TAI), "ns")
1560 ccdEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
1561 expTime = visitInfo.getExposureTime()
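# Observation start is the exposure midpoint minus half the exposure time;
# expTime is in seconds and is converted to integer nanoseconds for the
# datetime64 arithmetic.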
1562 ccdEntry["obsStart"] = (
1563 ccdEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns")
1564 )
1565 expTime_days = expTime / (60*60*24)
1566 ccdEntry["obsStartMJD"] = ccdEntry["expMidptMJD"] - 0.5 * expTime_days
1567 ccdEntry["darkTime"] = visitInfo.getDarkTime()
1568 ccdEntry["xSize"] = summaryTable["bbox_max_x"] - summaryTable["bbox_min_x"]
1569 ccdEntry["ySize"] = summaryTable["bbox_max_y"] - summaryTable["bbox_min_y"]
1570 ccdEntry["llcra"] = summaryTable["raCorners"][:, 0]
1571 ccdEntry["llcdec"] = summaryTable["decCorners"][:, 0]
1572 ccdEntry["ulcra"] = summaryTable["raCorners"][:, 1]
1573 ccdEntry["ulcdec"] = summaryTable["decCorners"][:, 1]
1574 ccdEntry["urcra"] = summaryTable["raCorners"][:, 2]
1575 ccdEntry["urcdec"] = summaryTable["decCorners"][:, 2]
1576 ccdEntry["lrcra"] = summaryTable["raCorners"][:, 3]
1577 ccdEntry["lrcdec"] = summaryTable["decCorners"][:, 3]
1578 # TODO: DM-30618, Add raftName, nExposures, ccdTemp, binX, binY,
1579 # and flags, and decide if WCS, and llcx, llcy, ulcx, ulcy, etc.
1580 # values are actually wanted.
1581 ccdEntries.append(ccdEntry)
1582
1583 outputCatalog = astropy.table.vstack(ccdEntries, join_type="exact")
1584 return pipeBase.Struct(outputCatalog=outputCatalog)
1585
1586
1587class MakeVisitTableConnections(pipeBase.PipelineTaskConnections,
1588 dimensions=("instrument",),
1589 defaultTemplates={"calexpType": ""}):
1590 visitSummaries = connectionTypes.Input(
1591 doc="Per-visit consolidated exposure metadata",
1592 name="finalVisitSummary",
1593 storageClass="ExposureCatalog",
1594 dimensions=("instrument", "visit",),
1595 multiple=True,
1596 deferLoad=True,
1597 )
1598 outputCatalog = connectionTypes.Output(
1599 doc="Visit metadata table",
1600 name="visitTable",
1601 storageClass="ArrowAstropy",
1602 dimensions=("instrument",)
1603 )
1604
1605
1606class MakeVisitTableConfig(pipeBase.PipelineTaskConfig,
1607 pipelineConnections=MakeVisitTableConnections):
1608 pass
1609
1610
1611class MakeVisitTableTask(pipeBase.PipelineTask):
1612 """Produce a `visitTable` from the visit summary exposure catalogs.
1613 """
1614 _DefaultName = "makeVisitTable"
1615 ConfigClass = MakeVisitTableConfig
1616
1617 def run(self, visitSummaries):
1618 """Make a table of visit information from the visit summary catalogs.
1619
1620 Parameters
1621 ----------
1622 visitSummaries : `list` of `lsst.daf.butler.DeferredDatasetHandle`
1623 List of DeferredDatasetHandles pointing to exposure catalogs with per-detector summary information.

1624 Returns
1625 -------
1626 result : `~lsst.pipe.base.Struct`
1627 Results struct with attribute:
1628
1629 ``outputCatalog``
1630 Catalog of visit information.
1631 """
1632 visitEntries = []
1633 for visitSummary in visitSummaries:
1634 visitSummary = visitSummary.get()
1635 if not visitSummary:
1636 continue
1637 visitRow = visitSummary[0]
1638 visitInfo = visitRow.getVisitInfo()
1639
1640 visitEntry = {}
1641 visitEntry["visitId"] = visitRow["visit"]
1642 visitEntry["visit"] = visitRow["visit"]
1643 visitEntry["physical_filter"] = visitRow["physical_filter"]
1644 visitEntry["band"] = visitRow["band"]
1645 raDec = visitInfo.getBoresightRaDec()
1646 visitEntry["ra"] = raDec.getRa().asDegrees()
1647 visitEntry["dec"] = raDec.getDec().asDegrees()
1648
1649 # RFC-924: Temporarily keep a duplicate "decl" entry for backwards
1650 # compatibility. To be removed after September 2023.
1651 visitEntry["decl"] = visitEntry["dec"]
1652
1653 visitEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
1654 azAlt = visitInfo.getBoresightAzAlt()
1655 visitEntry["azimuth"] = azAlt.getLongitude().asDegrees()
1656 visitEntry["altitude"] = azAlt.getLatitude().asDegrees()
1657 visitEntry["zenithDistance"] = 90 - azAlt.getLatitude().asDegrees()
1658 visitEntry["airmass"] = visitInfo.getBoresightAirmass()
1659 expTime = visitInfo.getExposureTime()
1660 visitEntry["expTime"] = expTime
1661 visitEntry["expMidpt"] = np.datetime64(visitInfo.date.nsecs(scale=dafBase.DateTime.TAI), "ns")
1662 visitEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
1663 visitEntry["obsStart"] = visitEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns")
1664 expTime_days = expTime / (60*60*24)
1665 visitEntry["obsStartMJD"] = visitEntry["expMidptMJD"] - 0.5 * expTime_days
1666 visitEntries.append(visitEntry)
1667
1668 # TODO: DM-30623, Add programId, exposureType, cameraTemp,
1669 # mirror1Temp, mirror2Temp, mirror3Temp, domeTemp, externalTemp,
1670 # dimmSeeing, pwvGPS, pwvMW, flags, nExposures.
1671
1672 outputCatalog = astropy.table.Table(rows=visitEntries)
1673 return pipeBase.Struct(outputCatalog=outputCatalog)
1674
1675
1676class WriteForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
1677 dimensions=("instrument", "visit", "detector", "skymap", "tract")):
1678
1679 inputCatalog = connectionTypes.Input(
1680 doc="Primary per-detector, single-epoch forced-photometry catalog. "
1681 "By default, it is the output of ForcedPhotCcdTask on calexps",
1682 name="forced_src",
1683 storageClass="SourceCatalog",
1684 dimensions=("instrument", "visit", "detector", "skymap", "tract")
1685 )
1686 inputCatalogDiff = connectionTypes.Input(
1687 doc="Secondary multi-epoch, per-detector, forced photometry catalog. "
1688 "By default, it is the output of ForcedPhotCcdTask run on image differences.",
1689 name="forced_diff",
1690 storageClass="SourceCatalog",
1691 dimensions=("instrument", "visit", "detector", "skymap", "tract")
1692 )
1693 outputCatalog = connectionTypes.Output(
1694 doc="InputCatalogs horizonatally joined on `objectId` in DataFrame parquet format",
1695 name="mergedForcedSource",
1696 storageClass="DataFrame",
1697 dimensions=("instrument", "visit", "detector", "skymap", "tract")
1698 )
1699
1700
1701class WriteForcedSourceTableConfig(pipeBase.PipelineTaskConfig,
1702 pipelineConnections=WriteForcedSourceTableConnections):
1704 doc="Column on which to join the two input tables on and make the primary key of the output",
1705 dtype=str,
1706 default="objectId",
1707 )
1708
1709
1710class WriteForcedSourceTableTask(pipeBase.PipelineTask):
1711 """Merge and convert per-detector forced source catalogs to DataFrame Parquet format.
1712
1713 Because the predecessor ForcedPhotCcdTask operates per-detector and
1714 per-tract (i.e., it has tract in its dimensions), detectors
1715 on the tract boundary may have multiple forced source catalogs.
1716
1717 The successor task TransformForcedSourceTable runs per-patch
1718 and temporally aggregates the overlapping mergedForcedSource catalogs
1719 from all available epochs.
1720 """
1721 _DefaultName = "writeForcedSourceTable"
1722 ConfigClass = WriteForcedSourceTableConfig
1723
1724 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1725 inputs = butlerQC.get(inputRefs)
1726 inputs["visit"] = butlerQC.quantum.dataId["visit"]
1727 inputs["detector"] = butlerQC.quantum.dataId["detector"]
1728 inputs["band"] = butlerQC.quantum.dataId["band"]
1729 outputs = self.run(**inputs)
1730 butlerQC.put(outputs, outputRefs)
1731
1732 def run(self, inputCatalog, inputCatalogDiff, visit, detector, band=None):
1733 dfs = []
1734 for table, dataset, in zip((inputCatalog, inputCatalogDiff), ("calexp", "diff")):
1735 df = table.asAstropy().to_pandas().set_index(self.config.key, drop=False)
1736 df = df.reindex(sorted(df.columns), axis=1)
1737 df["visit"] = visit
1738 # int16 instead of uint8 because databases don't like unsigned bytes.
1739 df["detector"] = np.int16(detector)
1740 df["band"] = band if band else pd.NA
1741 df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
1742 names=("dataset", "column"))
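# The frame now carries a two-level column index, (dataset, column) with
# dataset in {"calexp", "diff"}, so the same measurement from the direct and
# difference images stays distinguishable after the join below.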
1743
1744 dfs.append(df)
1745
1746 outputCatalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
1747 return pipeBase.Struct(outputCatalog=outputCatalog)
1748
1749
1750class TransformForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
1751 dimensions=("instrument", "skymap", "patch", "tract")):
1752
1753 inputCatalogs = connectionTypes.Input(
1754 doc="DataFrames of merged ForcedSources produced by WriteForcedSourceTableTask",
1755 name="mergedForcedSource",
1756 storageClass="DataFrame",
1757 dimensions=("instrument", "visit", "detector", "skymap", "tract"),
1758 multiple=True,
1759 deferLoad=True
1760 )
1761 referenceCatalog = connectionTypes.Input(
1762 doc="Reference catalog which was used to seed the forcedPhot. Columns "
1763 "objectId, detect_isPrimary, detect_isTractInner, detect_isPatchInner "
1764 "are expected.",
1765 name="objectTable",
1766 storageClass="DataFrame",
1767 dimensions=("tract", "patch", "skymap"),
1768 deferLoad=True
1769 )
1770 outputCatalog = connectionTypes.Output(
1771 doc="Narrower, temporally-aggregated, per-patch ForcedSource Table transformed and converted per a "
1772 "specified set of functors",
1773 name="forcedSourceTable",
1774 storageClass="ArrowAstropy",
1775 dimensions=("tract", "patch", "skymap")
1776 )
1777
1778
1779class TransformForcedSourceTableConfig(TransformCatalogBaseConfig,
1780 pipelineConnections=TransformForcedSourceTableConnections):
1781 referenceColumns = pexConfig.ListField(
1782 dtype=str,
1783 default=["detect_isPrimary", "detect_isTractInner", "detect_isPatchInner"],
1784 optional=True,
1785 doc="Columns to pull from reference catalog",
1786 )
1787 keyRef = lsst.pex.config.Field(
1788 doc="Column on which to join the two input tables on and make the primary key of the output",
1789 dtype=str,
1790 default="objectId",
1791 )
1793 doc="Rename the output DataFrame index to this name",
1794 dtype=str,
1795 default="forcedSourceId",
1796 )
1797
1798 def setDefaults(self):
1799 super().setDefaults()
1800 self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "ForcedSource.yaml")
1801 self.columnsFromDataId = ["tract", "patch"]
1802
1803
1804class TransformForcedSourceTableTask(TransformCatalogBaseTask):
1805 """Transform/standardize a ForcedSource catalog
1806
1807 Transforms each wide, per-detector forcedSource DataFrame per the
1808 specification file (per-camera defaults found in ForcedSource.yaml).
1809 All epochs that overlap the patch are aggregated into one per-patch
1810 narrow-DataFrame file.
1811
1812 No de-duplication of rows is performed. Duplicate-resolution flags are
1813 pulled in from the referenceCatalog: `detect_isPrimary`,
1814 `detect_isTractInner`, `detect_isPatchInner`, so that the user may
1815 de-duplicate for analysis or compare duplicates for QA.
1816
1817 The resulting table includes multiple bands. Epochs (MJDs) and other
1818 useful per-visit metadata can be retrieved by joining with the
1819 CcdVisitTable on ccdVisitId.
1820 """
1821 _DefaultName = "transformForcedSourceTable"
1822 ConfigClass = TransformForcedSourceTableConfig
1823
1824 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1825 inputs = butlerQC.get(inputRefs)
1826 if self.funcs is None:
1827 raise ValueError("config.functorFile is None. "
1828 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
1829 outputs = self.run(inputs["inputCatalogs"], inputs["referenceCatalog"], funcs=self.funcs,
1830 dataId=dict(outputRefs.outputCatalog.dataId.mapping))
1831
1832 butlerQC.put(outputs, outputRefs)
1833
1834 def run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=None):
1835 dfs = []
1836 refColumns = list(self.config.referenceColumns)
1837 refColumns.append(self.config.keyRef)
1838 ref = referenceCatalog.get(parameters={"columns": refColumns})
1839 if ref.index.name != self.config.keyRef:
1840 # If the DataFrame we loaded was originally written as some other
1841 # Parquet type, it probably doesn't have the index set. If it was
1842 # written as a DataFrame, the index should already be set and
1843 # trying to set it again would be an error, since it doesn't exist
1844 # as a regular column anymore.
1845 ref.set_index(self.config.keyRef, inplace=True)
1846 self.log.info("Aggregating %d input catalogs", len(inputCatalogs))
1847 for handle in inputCatalogs:
1848 result = self.transform(None, handle, funcs, dataId)
1849 # Filter for only rows that were detected on (overlap) the patch
1850 dfs.append(result.df.join(ref, how="inner"))
1851
1852 outputCatalog = pd.concat(dfs)
1853
1854 if outputCatalog.empty:
1855 raise NoWorkFound(f"No forced photometry rows for {dataId}.")
1856
1857 # Now that we are done joining on config.keyRef, rename the index so
1858 # that it can be restored as a regular column.
1859 outputCatalog.index.rename(self.config.keyRef, inplace=True)
1860 # Add config.keyRef back to the column list.
1861 outputCatalog.reset_index(inplace=True)
1862 # Set forcedSourceId as the index; this column is specified in
1863 # ForcedSource.yaml.
1864 outputCatalog.set_index("forcedSourceId", inplace=True, verify_integrity=True)
1865 # Rename the index to config.key.
1866 outputCatalog.index.rename(self.config.key, inplace=True)
1867
1868 self.log.info("Made a table of %d columns and %d rows",
1869 len(outputCatalog.columns), len(outputCatalog))
1870 return pipeBase.Struct(outputCatalog=pandas_to_astropy(outputCatalog))
1871
1872
1873class ConsolidateTractConnections(pipeBase.PipelineTaskConnections,
1874 defaultTemplates={"catalogType": ""},
1875 dimensions=("instrument", "tract")):
1876 inputCatalogs = connectionTypes.Input(
1877 doc="Input per-patch DataFrame Tables to be concatenated",
1878 name="{catalogType}ForcedSourceTable",
1879 storageClass="DataFrame",
1880 dimensions=("tract", "patch", "skymap"),
1881 multiple=True,
1882 )
1883
1884 outputCatalog = connectionTypes.Output(
1885 doc="Output per-tract concatenation of DataFrame Tables",
1886 name="{catalogType}ForcedSourceTable_tract",
1887 storageClass="DataFrame",
1888 dimensions=("tract", "skymap"),
1889 )
1890
1891
1892class ConsolidateTractConfig(pipeBase.PipelineTaskConfig,
1893 pipelineConnections=ConsolidateTractConnections):
1894 pass
1895
1896
1897class ConsolidateTractTask(pipeBase.PipelineTask):
1898 """Concatenate any per-patch, dataframe list into a single
1899 per-tract DataFrame.
1900 """
1901 _DefaultName = "ConsolidateTract"
1902 ConfigClass = ConsolidateTractConfig
1903
1904 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1905 inputs = butlerQC.get(inputRefs)
1906 # Not checking that at least one inputCatalog exists, because if none
1907 # did the quantum graph would be empty and this would never run.
1908 self.log.info("Concatenating %s per-patch %s Tables",
1909 len(inputs["inputCatalogs"]),
1910 inputRefs.inputCatalogs[0].datasetType.name)
1911 df = pd.concat(inputs["inputCatalogs"])
1912 butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)