postprocess.py
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22__all__ = ["WriteObjectTableConfig", "WriteObjectTableTask",
23 "WriteSourceTableConfig", "WriteSourceTableTask",
24 "WriteRecalibratedSourceTableConfig", "WriteRecalibratedSourceTableTask",
25 "PostprocessAnalysis",
26 "TransformCatalogBaseConfig", "TransformCatalogBaseTask",
27 "TransformObjectCatalogConfig", "TransformObjectCatalogTask",
28 "ConsolidateObjectTableConfig", "ConsolidateObjectTableTask",
29 "TransformSourceTableConfig", "TransformSourceTableTask",
30 "ConsolidateVisitSummaryConfig", "ConsolidateVisitSummaryTask",
31 "ConsolidateSourceTableConfig", "ConsolidateSourceTableTask",
32 "MakeCcdVisitTableConfig", "MakeCcdVisitTableTask",
33 "MakeVisitTableConfig", "MakeVisitTableTask",
34 "WriteForcedSourceTableConfig", "WriteForcedSourceTableTask",
35 "TransformForcedSourceTableConfig", "TransformForcedSourceTableTask",
36 "ConsolidateTractConfig", "ConsolidateTractTask"]
37
38from collections import defaultdict
39import dataclasses
40from deprecated.sphinx import deprecated
41import functools
42import logging
43import numbers
44import os
45
46import numpy as np
47import pandas as pd
48import astropy.table
49from astro_metadata_translator.headers import merge_headers
50
51import lsst.geom
52import lsst.pex.config as pexConfig
53import lsst.pipe.base as pipeBase
54import lsst.daf.base as dafBase
55from lsst.daf.butler.formatters.parquet import pandas_to_astropy
56from lsst.pipe.base import NoWorkFound, UpstreamFailureNoWorkFound, connectionTypes
57import lsst.afw.table as afwTable
58from lsst.afw.image import ExposureSummaryStats, ExposureF
59from lsst.meas.base import SingleFrameMeasurementTask, DetectorVisitIdGeneratorConfig
60from lsst.obs.base.utils import strip_provenance_from_fits_header
61
62from .coaddBase import reorderRefs
63from .functors import CompositeFunctor, Column
64
65log = logging.getLogger(__name__)
66
67
68def flattenFilters(df, noDupCols=["coord_ra", "coord_dec"], camelCase=False, inputBands=None):
69 """Flattens a dataframe with multilevel column index.
70 """
71 newDf = pd.DataFrame()
72 # band is the level 0 index
73 dfBands = df.columns.unique(level=0).values
74 for band in dfBands:
75 subdf = df[band]
76 columnFormat = "{0}{1}" if camelCase else "{0}_{1}"
77 newColumns = {c: columnFormat.format(band, c)
78 for c in subdf.columns if c not in noDupCols}
79 cols = list(newColumns.keys())
80 newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)
81
82 # Band must be present in the input and output or else column is all NaN:
83 presentBands = dfBands if inputBands is None else list(set(inputBands).intersection(dfBands))
84 # Get the unexploded columns from any present band's partition
85 noDupDf = df[presentBands[0]][noDupCols]
86 newDf = pd.concat([noDupDf, newDf], axis=1)
87 return newDf
88
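# Illustrative sketch (not part of the module): flattenFilters turns a
# two-level (band, column) index into flat, band-prefixed column names,
# keeping the noDupCols only once. The toy DataFrame below is hypothetical.
#
#   import pandas as pd
#   toy = pd.DataFrame(
#       {("g", "coord_ra"): [1.0], ("g", "coord_dec"): [2.0], ("g", "flux"): [10.0],
#        ("r", "coord_ra"): [1.0], ("r", "coord_dec"): [2.0], ("r", "flux"): [20.0]})
#   toy.columns = pd.MultiIndex.from_tuples(toy.columns, names=("band", "column"))
#   list(flattenFilters(toy).columns)
#   # ['coord_ra', 'coord_dec', 'g_flux', 'r_flux']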
89
90class TableVStack:
91 """A helper class for stacking astropy tables without having them all in
92 memory at once.
93
94 Parameters
95 ----------
96 capacity : `int`
97 Full size of the final table.
98
99 Notes
100 -----
101 Unlike `astropy.table.vstack`, this class requires all tables to have the
102 exact same columns (it's slightly more strict than even the
103 ``join_type="exact"`` argument to `astropy.table.vstack`).
104 """
105
106 def __init__(self, capacity):
107 self.index = 0
108 self.capacity = capacity
109 self.result = None
110
111 @classmethod
112 def from_handles(cls, handles):
113 """Construct from an iterable of
114 `lsst.daf.butler.DeferredDatasetHandle`.
115
116 Parameters
117 ----------
118 handles : `~collections.abc.Iterable` [ \
119 `lsst.daf.butler.DeferredDatasetHandle` ]
120 Iterable of handles. Must have a storage class that supports the
121 "rowcount" component, which is all that will be fetched.
122
123 Returns
124 -------
125 vstack : `TableVStack`
126 An instance of this class, initialized with capacity equal to the
127 sum of the rowcounts of all the given table handles.
128 """
129 capacity = sum(handle.get(component="rowcount") for handle in handles)
130 return cls(capacity=capacity)
131
132 def extend(self, table):
133 """Add a single table to the stack.
134
135 Parameters
136 ----------
137 table : `astropy.table.Table`
138 An astropy table instance.
139 """
140 if self.result is None:
141 self.result = astropy.table.Table()
142 for name in table.colnames:
143 column = table[name]
144 column_cls = type(column)
145 self.result[name] = column_cls.info.new_like([column], self.capacity, name=name)
146 self.result[name][:len(table)] = column
147 self.index = len(table)
148 self.result.meta = table.meta.copy()
149 else:
150 next_index = self.index + len(table)
151 if set(self.result.colnames) != set(table.colnames):
152 raise TypeError(
153 "Inconsistent columns in concatentation: "
154 f"{set(self.result.colnames).symmetric_difference(table.colnames)}"
155 )
156 for name in table.colnames:
157 out_col = self.result[name]
158 in_col = table[name]
159 if out_col.dtype != in_col.dtype:
160 raise TypeError(f"Type mismatch on column {name!r}: {out_col.dtype} != {in_col.dtype}.")
161 self.result[name][self.index:next_index] = table[name]
162 self.index = next_index
163 # Butler provenance should be stripped on merge. It will be
164 # added by butler on write. No attempt is made here to combine
165 # provenance from multiple input tables.
166 self.result.meta = merge_headers([self.result.meta, table.meta], mode="drop")
167 strip_provenance_from_fits_header(self.result.meta)
168
169 @classmethod
170 def vstack_handles(cls, handles):
171 """Vertically stack tables represented by deferred dataset handles.
172
173 Parameters
174 ----------
175 handles : `~collections.abc.Iterable` [ \
176 `lsst.daf.butler.DeferredDatasetHandle` ]
177 Iterable of handles. Must have the "ArrowAstropy" storage class
178 and identical columns.
179
180 Returns
181 -------
182 table : `astropy.table.Table`
183 Concatenated table with the same columns as each input table and
184 the rows of all of them.
185 """
186 handles = tuple(handles) # guard against single-pass iterators
187 # Ensure that zero length catalogs are not included
188 rowcount = tuple(handle.get(component="rowcount") for handle in handles)
189 handles = tuple(handle for handle, count in zip(handles, rowcount) if count > 0)
190
191 vstack = cls.from_handles(handles)
192 for handle in handles:
193 vstack.extend(handle.get())
194 return vstack.result
195
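# Illustrative sketch (not part of the module): besides vstack_handles, the
# stacker can be fed in-memory tables directly via extend(); the toy tables
# below are hypothetical.
#
#   from astropy.table import Table
#   t1 = Table({"a": [1, 2]})
#   t2 = Table({"a": [3]})
#   stack = TableVStack(capacity=len(t1) + len(t2))
#   stack.extend(t1)
#   stack.extend(t2)
#   # stack.result is now a single 3-row table with the same columns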
196
197class WriteObjectTableConnections(pipeBase.PipelineTaskConnections,
198 defaultTemplates={"coaddName": "deep"},
199 dimensions=("tract", "patch", "skymap")):
200 inputCatalogMeas = connectionTypes.Input(
201 doc="Catalog of source measurements on the deepCoadd.",
202 dimensions=("tract", "patch", "band", "skymap"),
203 storageClass="SourceCatalog",
204 name="{coaddName}Coadd_meas",
205 multiple=True
206 )
207 inputCatalogForcedSrc = connectionTypes.Input(
208 doc="Catalog of forced measurements (shape and position parameters held fixed) on the deepCoadd.",
209 dimensions=("tract", "patch", "band", "skymap"),
210 storageClass="SourceCatalog",
211 name="{coaddName}Coadd_forced_src",
212 multiple=True
213 )
214 inputCatalogPsfsMultiprofit = connectionTypes.Input(
215 doc="Catalog of Gaussian mixture model fit parameters for the PSF model at each object centroid.",
216 dimensions=("tract", "patch", "band", "skymap"),
217 storageClass="ArrowAstropy",
218 name="{coaddName}Coadd_psfs_multiprofit",
219 multiple=True,
220 )
221 outputCatalog = connectionTypes.Output(
222 doc="A vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
223 "stored as a DataFrame with a multi-level column index per-patch.",
224 dimensions=("tract", "patch", "skymap"),
225 storageClass="DataFrame",
226 name="{coaddName}Coadd_obj"
227 )
228
229
230class WriteObjectTableConfig(pipeBase.PipelineTaskConfig,
231 pipelineConnections=WriteObjectTableConnections):
232 coaddName = pexConfig.Field(
233 dtype=str,
234 default="deep",
235 doc="Name of coadd"
236 )
237
238
239class WriteObjectTableTask(pipeBase.PipelineTask):
240 """Write filter-merged object tables as a DataFrame in parquet format.
241 """
242 _DefaultName = "writeObjectTable"
243 ConfigClass = WriteObjectTableConfig
244
245 # Tag of output dataset written by `MergeSourcesTask.write`
246 outputDataset = "obj"
247
248 def runQuantum(self, butlerQC, inputRefs, outputRefs):
249 inputs = butlerQC.get(inputRefs)
250
251 catalogs = defaultdict(dict)
252 for dataset, connection in (
253 ("meas", "inputCatalogMeas"),
254 ("forced_src", "inputCatalogForcedSrc"),
255 ("psfs_multiprofit", "inputCatalogPsfsMultiprofit"),
256 ):
257 for ref, cat in zip(getattr(inputRefs, connection), inputs[connection]):
258 catalogs[ref.dataId["band"]][dataset] = cat
259
260 dataId = butlerQC.quantum.dataId
261 df = self.run(catalogs=catalogs, tract=dataId["tract"], patch=dataId["patch"])
262 outputs = pipeBase.Struct(outputCatalog=df)
263 butlerQC.put(outputs, outputRefs)
264
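# Illustrative note (not part of the module): the catalogs mapping assembled
# above has the form {band: {"meas": ..., "forced_src": ..., "psfs_multiprofit": ...}},
# so e.g. catalogs["r"]["meas"] is the r-band measurement catalog for this patch.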
265 def run(self, catalogs, tract, patch):
266 """Merge multiple catalogs.
267
268 Parameters
269 ----------
270 catalogs : `dict`
271 Mapping from filter names to dict of catalogs.
272 tract : `int`
273 tractId to use for the tractId column.
274 patch : `str`
275 patchId to use for the patchId column.
276
277 Returns
278 -------
279 catalog : `pandas.DataFrame`
280 Merged dataframe.
281
282 Raises
283 ------
284 ValueError
285 Raised if any of the catalogs is of an unsupported type.
286 """
287 dfs = []
288 for filt, tableDict in catalogs.items():
289 for dataset, table in tableDict.items():
290 # Convert afwTable to pandas DataFrame if needed
291 if isinstance(table, pd.DataFrame):
292 df = table
293 elif isinstance(table, afwTable.SourceCatalog):
294 df = table.asAstropy().to_pandas()
295 elif isinstance(table, astropy.table.Table):
296 df = table.to_pandas()
297 else:
298 raise ValueError(f"{dataset=} has unsupported {type(table)=}")
299 df.set_index("id", drop=True, inplace=True)
300
301 # Sort columns by name, to ensure matching schema among patches
302 df = df.reindex(sorted(df.columns), axis=1)
303 df = df.assign(tractId=tract, patchId=patch)
304
305 # Make columns a 3-level MultiIndex
306 df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
307 names=("dataset", "band", "column"))
308 dfs.append(df)
309
310 # We do this dance and not `pd.concat(dfs)` because the pandas
311 # concatenation uses infinite memory.
312 catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
313 return catalog
314
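# Illustrative sketch (not part of the module) of the reduce/join pattern used
# above: frames that share an index are joined column-wise one at a time. The
# flat-column toy frames below are a simplification of the real 3-level case.
#
#   import functools
#   import pandas as pd
#   left = pd.DataFrame({"a": [1.0]}, index=[42])
#   right = pd.DataFrame({"b": [2.0]}, index=[42])
#   merged = functools.reduce(lambda d1, d2: d1.join(d2), [left, right])
#   # merged has columns "a" and "b", aligned on the shared index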
315
316class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
317 defaultTemplates={"catalogType": ""},
318 dimensions=("instrument", "visit", "detector")):
319
320 catalog = connectionTypes.Input(
321 doc="Input full-depth catalog of sources produced by CalibrateTask",
322 name="{catalogType}src",
323 storageClass="SourceCatalog",
324 dimensions=("instrument", "visit", "detector")
325 )
326 outputCatalog = connectionTypes.Output(
327 doc="Catalog of sources, `src` in Astropy/Parquet format. Columns are unchanged.",
328 name="{catalogType}source",
329 storageClass="ArrowAstropy",
330 dimensions=("instrument", "visit", "detector")
331 )
332
333
334class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
335 pipelineConnections=WriteSourceTableConnections):
336 pass
337
338
339class WriteSourceTableTask(pipeBase.PipelineTask):
340 """Write source table to DataFrame Parquet format.
341 """
342 _DefaultName = "writeSourceTable"
343 ConfigClass = WriteSourceTableConfig
344
345 def runQuantum(self, butlerQC, inputRefs, outputRefs):
346 inputs = butlerQC.get(inputRefs)
347 inputs["visit"] = butlerQC.quantum.dataId["visit"]
348 inputs["detector"] = butlerQC.quantum.dataId["detector"]
349 result = self.run(**inputs)
350 outputs = pipeBase.Struct(outputCatalog=result.table)
351 butlerQC.put(outputs, outputRefs)
352
353 def run(self, catalog, visit, detector, **kwargs):
354 """Convert `src` catalog to an Astropy table.
355
356 Parameters
357 ----------
358 catalog : `lsst.afw.table.SourceCatalog`
359 Catalog to be converted.
360 visit, detector : `int`
361 Visit and detector ids to be added as columns.
362 **kwargs
363 Additional keyword arguments are ignored as a convenience for
364 subclasses that pass the same arguments to several different
365 methods.
366
367 Returns
368 -------
369 result : `~lsst.pipe.base.Struct`
370 ``table``
371 `astropy.table.Table` version of the input catalog
372 """
373 self.log.info("Generating DataFrame from src catalog visit,detector=%i,%i", visit, detector)
374 tbl = catalog.asAstropy()
375 tbl["visit"] = visit
376 # int16 instead of uint8 because databases don't like unsigned bytes.
377 tbl["detector"] = np.int16(detector)
378
379 return pipeBase.Struct(table=tbl)
380
381
382class WriteRecalibratedSourceTableConnections(WriteSourceTableConnections,
383 defaultTemplates={"catalogType": ""},
384 dimensions=("instrument", "visit", "detector", "skymap")):
385 visitSummary = connectionTypes.Input(
386 doc="Input visit-summary catalog with updated calibration objects.",
387 name="finalVisitSummary",
388 storageClass="ExposureCatalog",
389 dimensions=("instrument", "visit",),
390 )
391
392 def __init__(self, config):
393 # We don't want the input catalog here to be an initial existence
394 # constraint in QG generation, because that can unfortunately limit the
395 # set of data IDs of inputs to other tasks, even those that run earlier
396 # (e.g. updateVisitSummary), when the input 'src' catalog is not
397 # produced. It's safer to just use 'visitSummary' existence as an
398 # initial constraint, and then let the graph prune out the detectors
399 # that don't have a 'src' for this task only.
400 self.catalog = dataclasses.replace(self.catalog, deferGraphConstraint=True)
401
402
403class WriteRecalibratedSourceTableConfig(WriteSourceTableConfig,
404 pipelineConnections=WriteRecalibratedSourceTableConnections):
405
406 doReevaluatePhotoCalib = pexConfig.Field(
407 dtype=bool,
408 default=True,
409 doc=("Add or replace local photoCalib columns"),
410 )
411 doReevaluateSkyWcs = pexConfig.Field(
412 dtype=bool,
413 default=True,
414 doc=("Add or replace local WCS columns and update the coord columns, coord_ra and coord_dec"),
415 )
416
417
418class WriteRecalibratedSourceTableTask(WriteSourceTableTask):
419 """Write source table to DataFrame Parquet format.
420 """
421 _DefaultName = "writeRecalibratedSourceTable"
422 ConfigClass = WriteRecalibratedSourceTableConfig
423
424 def runQuantum(self, butlerQC, inputRefs, outputRefs):
425 inputs = butlerQC.get(inputRefs)
426
427 inputs["visit"] = butlerQC.quantum.dataId["visit"]
428 inputs["detector"] = butlerQC.quantum.dataId["detector"]
429
430 if self.config.doReevaluatePhotoCalib or self.config.doReevaluateSkyWcs:
431 exposure = ExposureF()
432 inputs["exposure"] = self.prepareCalibratedExposure(
433 exposure=exposure,
434 visitSummary=inputs["visitSummary"],
435 detectorId=butlerQC.quantum.dataId["detector"]
436 )
437 inputs["catalog"] = self.addCalibColumns(**inputs)
438
439 result = self.run(**inputs)
440 outputs = pipeBase.Struct(outputCatalog=result.table)
441 butlerQC.put(outputs, outputRefs)
442
443 def prepareCalibratedExposure(self, exposure, detectorId, visitSummary=None):
444 """Prepare a calibrated exposure and apply external calibrations
445 if so configured.
446
447 Parameters
448 ----------
449 exposure : `lsst.afw.image.exposure.Exposure`
450 Input exposure to adjust calibrations. May be an empty Exposure.
451 detectorId : `int`
452 Detector ID associated with the exposure.
453 visitSummary : `lsst.afw.table.ExposureCatalog`, optional
454 Exposure catalog with all calibration objects. WCS and PhotoCalib
455 are always applied if ``visitSummary`` is provided and those
456 components are not `None`.
457
458 Returns
459 -------
460 exposure : `lsst.afw.image.exposure.Exposure`
461 Exposure with adjusted calibrations.
462 """
463 if visitSummary is not None:
464 row = visitSummary.find(detectorId)
465 if row is None:
466 raise pipeBase.NoWorkFound(f"Visit summary for detector {detectorId} is missing.")
467 if (photoCalib := row.getPhotoCalib()) is None:
468 self.log.warning("Detector id %s has None for photoCalib in visit summary; "
469 "skipping reevaluation of photoCalib.", detectorId)
470 exposure.setPhotoCalib(None)
471 else:
472 exposure.setPhotoCalib(photoCalib)
473 if (skyWcs := row.getWcs()) is None:
474 self.log.warning("Detector id %s has None for skyWcs in visit summary; "
475 "skipping reevaluation of skyWcs.", detectorId)
476 exposure.setWcs(None)
477 else:
478 exposure.setWcs(skyWcs)
479
480 return exposure
481
482 def addCalibColumns(self, catalog, exposure, **kwargs):
483 """Add replace columns with calibs evaluated at each centroid
484
485 Add or replace 'base_LocalWcs' and 'base_LocalPhotoCalib' columns in
486 a source catalog, by rerunning the plugins.
487
488 Parameters
489 ----------
490 catalog : `lsst.afw.table.SourceCatalog`
491 Catalog to which calib columns will be added.
492 exposure : `lsst.afw.image.exposure.Exposure`
493 Exposure with attached PhotoCalibs and SkyWcs attributes to be
494 reevaluated at local centroids. Pixels are not required.
495 **kwargs
496 Additional keyword arguments are ignored to facilitate passing the
497 same arguments to several methods.
498
499 Returns
500 -------
501 newCat : `lsst.afw.table.SourceCatalog`
502 Source catalog with the requested local calib columns.
503 """
504 measureConfig = SingleFrameMeasurementTask.ConfigClass()
505 measureConfig.doReplaceWithNoise = False
506
507 # Clear all slots, because we aren't running the relevant plugins.
508 for slot in measureConfig.slots:
509 setattr(measureConfig.slots, slot, None)
510
511 measureConfig.plugins.names = []
512 if self.config.doReevaluateSkyWcs:
513 measureConfig.plugins.names.add("base_LocalWcs")
514 self.log.info("Re-evaluating base_LocalWcs plugin")
515 if self.config.doReevaluatePhotoCalib:
516 measureConfig.plugins.names.add("base_LocalPhotoCalib")
517 self.log.info("Re-evaluating base_LocalPhotoCalib plugin")
518 pluginsNotToCopy = tuple(measureConfig.plugins.names)
519
520 # Create a new schema and catalog
521 # Copy all columns from original except for the ones to reevaluate
522 aliasMap = catalog.schema.getAliasMap()
523 mapper = afwTable.SchemaMapper(catalog.schema)
524 for item in catalog.schema:
525 if not item.field.getName().startswith(pluginsNotToCopy):
526 mapper.addMapping(item.key)
527
528 schema = mapper.getOutputSchema()
529 measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
530 schema.setAliasMap(aliasMap)
531 newCat = afwTable.SourceCatalog(schema)
532 newCat.extend(catalog, mapper=mapper)
533
534 # Fluxes in sourceCatalogs are in counts, so there are no fluxes to
535 # update here. LocalPhotoCalibs are applied during transform tasks.
536 # Update coord_ra/coord_dec, which are expected to be positions on the
537 # sky and are used as such in sdm tables without transform
538 if self.config.doReevaluateSkyWcs and exposure.wcs is not None:
539 afwTable.updateSourceCoords(exposure.wcs, newCat)
540 wcsPlugin = measurement.plugins["base_LocalWcs"]
541 else:
542 wcsPlugin = None
543
544 if self.config.doReevaluatePhotoCalib and exposure.getPhotoCalib() is not None:
545 pcPlugin = measurement.plugins["base_LocalPhotoCalib"]
546 else:
547 pcPlugin = None
548
549 for row in newCat:
550 if wcsPlugin is not None:
551 wcsPlugin.measure(row, exposure)
552 if pcPlugin is not None:
553 pcPlugin.measure(row, exposure)
554
555 return newCat
556
557
558class PostprocessAnalysis(object):
559 """Calculate columns from DataFrames or handles storing DataFrames.
560
561 This object manages and organizes an arbitrary set of computations
562 on a catalog. The catalog is defined by a
563 `DeferredDatasetHandle` or `InMemoryDatasetHandle` object
564 (or list thereof), such as a ``deepCoadd_obj`` dataset, and the
565 computations are defined by a collection of
566 `~lsst.pipe.tasks.functors.Functor` objects (or, equivalently, a
567 ``CompositeFunctor``).
568
569 After the object is initialized, accessing the ``.df`` attribute (which
570 holds the `pandas.DataFrame` containing the results of the calculations)
571 triggers computation of said dataframe.
572
573 One of the conveniences of using this object is the ability to define a
574 desired common filter for all functors. This enables the same functor
575 collection to be passed to several different `PostprocessAnalysis` objects
576 without having to change the original functor collection, since the ``filt``
577 keyword argument of this object triggers an overwrite of the ``filt``
578 property for all functors in the collection.
579
580 This object also allows a list of refFlags to be passed, and defines a set
581 of default refFlags that are always included even if not requested.
582
583 If a list of DataFrames or Handles is passed, rather than a single one,
584 then the calculations will be mapped over all the input catalogs. In
585 principle, it should be straightforward to parallelize this activity, but
586 initial tests have failed (see TODO in code comments).
587
588 Parameters
589 ----------
590 handles : `~lsst.daf.butler.DeferredDatasetHandle` or
591 `~lsst.pipe.base.InMemoryDatasetHandle` or
592 list of these.
593 Source catalog(s) for computation.
594 functors : `list`, `dict`, or `~lsst.pipe.tasks.functors.CompositeFunctor`
595 Computations to do (functors that act on ``handles``).
596 If a dict, the output
597 DataFrame will have columns keyed accordingly.
598 If a list, the column keys will come from the
599 ``.shortname`` attribute of each functor.
600
601 filt : `str`, optional
602 Filter in which to calculate. If provided,
603 this will overwrite any existing ``.filt`` attribute
604 of the provided functors.
605
606 flags : `list`, optional
607 List of flags (per-band) to include in output table.
608 Taken from the ``meas`` dataset if applied to a multilevel Object Table.
609
610 refFlags : `list`, optional
611 List of refFlags (only reference band) to include in output table.
612
613 forcedFlags : `list`, optional
614 List of flags (per-band) to include in output table.
615 Taken from the ``forced_src`` dataset if applied to a
616 multilevel Object Table. Intended for flags from measurement plugins
617 only run during multi-band forced-photometry.
618 """
619 _defaultRefFlags = []
620 _defaultFuncs = ()
621
622 def __init__(self, handles, functors, filt=None, flags=None, refFlags=None, forcedFlags=None):
623 self.handles = handles
624 self.functors = functors
625
626 self.filt = filt
627 self.flags = list(flags) if flags is not None else []
628 self.forcedFlags = list(forcedFlags) if forcedFlags is not None else []
629 self.refFlags = list(self._defaultRefFlags)
630 if refFlags is not None:
631 self.refFlags += list(refFlags)
632
633 self._df = None
634
635 @property
636 def defaultFuncs(self):
637 funcs = dict(self._defaultFuncs)
638 return funcs
639
640 @property
641 def func(self):
642 additionalFuncs = self.defaultFuncs
643 additionalFuncs.update({flag: Column(flag, dataset="forced_src") for flag in self.forcedFlags})
644 additionalFuncs.update({flag: Column(flag, dataset="ref") for flag in self.refFlags})
645 additionalFuncs.update({flag: Column(flag, dataset="meas") for flag in self.flags})
646
647 if isinstance(self.functors, CompositeFunctor):
648 func = self.functors
649 else:
650 func = CompositeFunctor(self.functors)
651
652 func.funcDict.update(additionalFuncs)
653 func.filt = self.filt
654
655 return func
656
657 @property
658 def noDupCols(self):
659 return [name for name, func in self.func.funcDict.items() if func.noDup]
660
661 @property
662 def df(self):
663 if self._df is None:
664 self.compute()
665 return self._df
666
667 def compute(self, dropna=False, pool=None):
668 # map over multiple handles
669 if type(self.handles) in (list, tuple):
670 if pool is None:
671 dflist = [self.func(handle, dropna=dropna) for handle in self.handles]
672 else:
673 # TODO: Figure out why this doesn't work (pyarrow pickling
674 # issues?)
675 dflist = pool.map(functools.partial(self.func, dropna=dropna), self.handles)
676 self._df = pd.concat(dflist)
677 else:
678 self._df = self.func(self.handles, dropna=dropna)
679
680 return self._df
681
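# Illustrative sketch (not part of the module): a PostprocessAnalysis pairs a
# catalog handle with functors; the toy DataFrame and column name below are
# hypothetical.
#
#   import pandas as pd
#   from lsst.pipe.base import InMemoryDatasetHandle
#   from lsst.pipe.tasks.functors import Column
#   toy = pd.DataFrame({"slot_Centroid_x": [1.5, 2.5]})
#   handle = InMemoryDatasetHandle(toy, storageClass="DataFrame")
#   analysis = PostprocessAnalysis(handle, {"x": Column("slot_Centroid_x")})
#   result = analysis.df  # accessing .df triggers compute()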
682
683class TransformCatalogBaseConnections(pipeBase.PipelineTaskConnections,
684 dimensions=()):
685 """Expected Connections for subclasses of TransformCatalogBaseTask.
686
687 Must be subclassed.
688 """
689 inputCatalog = connectionTypes.Input(
690 name="",
691 storageClass="DataFrame",
692 )
693 outputCatalog = connectionTypes.Output(
694 name="",
695 storageClass="ArrowAstropy",
696 )
697
698
699class TransformCatalogBaseConfig(pipeBase.PipelineTaskConfig,
700 pipelineConnections=TransformCatalogBaseConnections):
701 functorFile = pexConfig.Field(
702 dtype=str,
703 doc="Path to YAML file specifying Science Data Model functors to use "
704 "when copying columns and computing calibrated values.",
705 default=None,
706 optional=True
707 )
708 primaryKey = pexConfig.Field(
709 dtype=str,
710 doc="Name of column to be set as the DataFrame index. If None, the index"
711 "will be named `id`",
712 default=None,
713 optional=True
714 )
715 columnsFromDataId = pexConfig.ListField(
716 dtype=str,
717 default=None,
718 optional=True,
719 doc="Columns to extract from the dataId",
720 )
721
722
723class TransformCatalogBaseTask(pipeBase.PipelineTask):
724 """Base class for transforming/standardizing a catalog by applying functors
725 that convert units and apply calibrations.
726
727 The purpose of this task is to perform a set of computations on an input
728 ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` that holds a
729 ``DataFrame`` dataset (such as ``deepCoadd_obj``), and write the results to
730 a new dataset (which needs to be declared in an ``outputDataset``
731 attribute).
732
733 The calculations to be performed are defined in a YAML file that specifies
734 a set of functors to be computed, provided as a ``--functorFile`` config
735 parameter. An example of such a YAML file is the following:
736
737 funcs:
738 sourceId:
739 functor: Index
740 x:
741 functor: Column
742 args: slot_Centroid_x
743 y:
744 functor: Column
745 args: slot_Centroid_y
746 psfFlux:
747 functor: LocalNanojansky
748 args:
749 - slot_PsfFlux_instFlux
750 - slot_PsfFlux_instFluxErr
751 - base_LocalPhotoCalib
752 - base_LocalPhotoCalibErr
753 psfFluxErr:
754 functor: LocalNanojanskyErr
755 args:
756 - slot_PsfFlux_instFlux
757 - slot_PsfFlux_instFluxErr
758 - base_LocalPhotoCalib
759 - base_LocalPhotoCalibErr
760 flags:
761 - detect_isPrimary
762
763 The names for each entry under "funcs" will become the names of columns in
764 the output dataset. All the functors referenced are defined in
765 `~lsst.pipe.tasks.functors`. Positional arguments to be passed to each
766 functor are in the `args` list, and any additional entries for each column
767 other than "functor" or "args" (e.g., ``'filt'``, ``'dataset'``) are
768 treated as keyword arguments to be passed to the functor initialization.
769
770 The "flags" entry is the default shortcut for `Column` functors.
771 All columns listed under "flags" will be copied to the output table
772 untransformed. They can be of any datatype.
773 In the special case of transforming a multi-level object table with
774 band and dataset indices (deepCoadd_obj), these will be taken from the
775 ``meas`` dataset and exploded out per band.
776
777 There are two special shortcuts that only apply when transforming
778 multi-level Object (deepCoadd_obj) tables:
779 - The "refFlags" entry is shortcut for `Column` functor
780 taken from the ``ref`` dataset if transforming an ObjectTable.
781 - The "forcedFlags" entry is shortcut for `Column` functors.
782 taken from the ``forced_src`` dataset if transforming an ObjectTable.
783 These are expanded out per band.
784
785
786 This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
787 to organize and execute the calculations.
788 """
789 @property
790 def _DefaultName(self):
791 raise NotImplementedError("Subclass must define the \"_DefaultName\" attribute.")
792
793 @property
794 def outputDataset(self):
795 raise NotImplementedError("Subclass must define the \"outputDataset\" attribute.")
796
797 @property
798 def inputDataset(self):
799 raise NotImplementedError("Subclass must define \"inputDataset\" attribute.")
800
801 @property
802 def ConfigClass(self):
803 raise NotImplementedError("Subclass must define \"ConfigClass\" attribute.")
804
805 def __init__(self, *args, **kwargs):
806 super().__init__(*args, **kwargs)
807 if self.config.functorFile:
808 self.log.info("Loading tranform functor definitions from %s",
809 self.config.functorFile)
810 self.funcs = CompositeFunctor.from_file(self.config.functorFile)
811 self.funcs.update(dict(PostprocessAnalysis._defaultFuncs))
812 else:
813 self.funcs = None
814
815 def runQuantum(self, butlerQC, inputRefs, outputRefs):
816 inputs = butlerQC.get(inputRefs)
817 if self.funcs is None:
818 raise ValueError("config.functorFile is None. "
819 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
820 result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
821 dataId=dict(outputRefs.outputCatalog.dataId.mapping))
822 butlerQC.put(result, outputRefs)
823
824 def run(self, handle, funcs=None, dataId=None, band=None):
825 """Do postprocessing calculations
826
827 Takes a ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` or
828 ``DataFrame`` object and dataId,
829 returns a dataframe with results of postprocessing calculations.
830
831 Parameters
832 ----------
833 handle : `~lsst.daf.butler.DeferredDatasetHandle` or
834 `~lsst.pipe.base.InMemoryDatasetHandle` or
835 `~pandas.DataFrame`, or list of these.
836 DataFrame(s) from which calculations are done.
837 funcs : `~lsst.pipe.tasks.functors.Functor`
838 Functors to apply to the table's columns.
839 dataId : `dict`, optional
840 Used to add columns listed in ``columnsFromDataId`` to the output dataframe.
841 band : `str`, optional
842 Filter band that is being processed.
843
844 Returns
845 -------
846 result : `lsst.pipe.base.Struct`
847 Result struct, with a single ``outputCatalog`` attribute holding
848 the transformed catalog.
849 """
850 self.log.info("Transforming/standardizing the source table dataId: %s", dataId)
851
852 df = self.transform(band, handle, funcs, dataId).df
853 self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
854
855 if len(df) == 0:
856 raise UpstreamFailureNoWorkFound(
857 "Input catalog is empty, so there is nothing to transform/standardize",
858 )
859
860 result = pipeBase.Struct(outputCatalog=pandas_to_astropy(df))
861 return result
862
863 def getFunctors(self):
864 return self.funcs
865
866 def getAnalysis(self, handles, funcs=None, band=None):
867 if funcs is None:
868 funcs = self.funcs
869 analysis = PostprocessAnalysis(handles, funcs, filt=band)
870 return analysis
871
872 def transform(self, band, handles, funcs, dataId):
873 analysis = self.getAnalysis(handles, funcs=funcs, band=band)
874 df = analysis.df
875 if dataId and self.config.columnsFromDataId:
876 for key in self.config.columnsFromDataId:
877 if key in dataId:
878 if key == "detector":
879 # int16 instead of uint8 because databases don't like unsigned bytes.
880 df[key] = np.int16(dataId[key])
881 else:
882 df[key] = dataId[key]
883 else:
884 raise ValueError(f"'{key}' in config.columnsFromDataId not found in dataId: {dataId}")
885
886 if self.config.primaryKey:
887 if df.index.name != self.config.primaryKey and self.config.primaryKey in df:
888 df.reset_index(inplace=True, drop=True)
889 df.set_index(self.config.primaryKey, inplace=True)
890
891 return pipeBase.Struct(
892 df=df,
893 analysis=analysis
894 )
895
896
897class TransformObjectCatalogConnections(pipeBase.PipelineTaskConnections,
898 defaultTemplates={"coaddName": "deep"},
899 dimensions=("tract", "patch", "skymap")):
900 inputCatalog = connectionTypes.Input(
901 doc="The vertical concatenation of the {coaddName}_{meas|forced_src|psfs_multiprofit} catalogs, "
902 "stored as a DataFrame with a multi-level column index per-patch.",
903 dimensions=("tract", "patch", "skymap"),
904 storageClass="DataFrame",
905 name="{coaddName}Coadd_obj",
906 deferLoad=True,
907 )
908 inputCatalogRef = connectionTypes.Input(
909 doc="Catalog marking the primary detection (which band provides a good shape and position)"
910 "for each detection in deepCoadd_mergeDet.",
911 dimensions=("tract", "patch", "skymap"),
912 storageClass="SourceCatalog",
913 name="{coaddName}Coadd_ref",
914 deferLoad=True,
915 )
916 inputCatalogSersicMultiprofit = connectionTypes.Input(
917 doc="Catalog of source measurements on the deepCoadd.",
918 dimensions=("tract", "patch", "skymap"),
919 storageClass="ArrowAstropy",
920 name="{coaddName}Coadd_Sersic_multiprofit",
921 deferLoad=True,
922 )
923 inputCatalogEpoch = connectionTypes.Input(
924 doc="Catalog of mean epochs for each object per band.",
925 dimensions=("tract", "patch", "skymap"),
926 storageClass="ArrowAstropy",
927 name="object_epoch",
928 deferLoad=True,
929 )
930 outputCatalog = connectionTypes.Output(
931 doc="Per-Patch Object Table of columns transformed from the deepCoadd_obj table per the standard "
932 "data model.",
933 dimensions=("tract", "patch", "skymap"),
934 storageClass="ArrowAstropy",
935 name="objectTable"
936 )
937
938 def __init__(self, *, config=None):
939 super().__init__(config=config)
940 if config.multilevelOutput:
941 self.outputCatalog = dataclasses.replace(self.outputCatalog, storageClass="DataFrame")
942
943
944class TransformObjectCatalogConfig(TransformCatalogBaseConfig,
945 pipelineConnections=TransformObjectCatalogConnections):
946 coaddName = pexConfig.Field(
947 dtype=str,
948 default="deep",
949 doc="Name of coadd"
950 )
951 outputBands = pexConfig.ListField(
952 dtype=str,
953 default=None,
954 optional=True,
955 doc=("These bands and only these bands will appear in the output,"
956 " NaN-filled if the input does not include them."
957 " If None, then use all bands found in the input.")
958 )
959 camelCase = pexConfig.Field(
960 dtype=bool,
961 default=False,
962 doc=("Write per-band columns names with camelCase, else underscore "
963 "For example: gPsFlux instead of g_PsFlux.")
964 )
965 multilevelOutput = pexConfig.Field(
966 dtype=bool,
967 default=False,
968 doc=("Whether results dataframe should have a multilevel column index (True) or be flat "
969 "and name-munged (False). If True, the output storage class will be "
970 "set to DataFrame, since astropy tables do not support multi-level indexing."),
971 deprecated="Support for multi-level outputs is deprecated and will be removed after v29.",
972 )
973 goodFlags = pexConfig.ListField(
974 dtype=str,
975 default=[],
976 doc=("List of 'good' flags that should be set False when populating empty tables. "
977 "All other flags are considered to be 'bad' flags and will be set to True.")
978 )
979 floatFillValue = pexConfig.Field(
980 dtype=float,
981 default=np.nan,
982 doc="Fill value for float fields when populating empty tables."
983 )
984 integerFillValue = pexConfig.Field(
985 dtype=int,
986 default=-1,
987 doc="Fill value for integer fields when populating empty tables."
988 )
989
990 def setDefaults(self):
991 super().setDefaults()
992 self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Object.yaml")
993 self.primaryKey = "objectId"
994 self.columnsFromDataId = ["tract", "patch"]
995 self.goodFlags = ["calib_astrometry_used",
996 "calib_photometry_reserved",
997 "calib_photometry_used",
998 "calib_psf_candidate",
999 "calib_psf_reserved",
1000 "calib_psf_used"]
1001
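# Illustrative sketch (not part of the module): typical config overrides for
# this task, e.g. in a pipeline YAML or a config override file; the band list
# below is hypothetical.
#
#   config.outputBands = ["g", "r", "i"]   # restrict/NaN-fill the output bands
#   config.camelCase = True                # gPsFlux-style instead of g_PsFlux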
1002
1003class TransformObjectCatalogTask(TransformCatalogBaseTask):
1004 """Produce a flattened Object Table to match the format specified in
1005 sdm_schemas.
1006
1007 Do the same set of postprocessing calculations on all bands.
1008
1009 This is identical to `TransformCatalogBaseTask`, except that it does
1010 the specified functor calculations for all filters present in the
1011 input `deepCoadd_obj` table. Any specific ``"filt"`` keywords specified
1012 by the YAML file will be superseded.
1013 """
1014 _DefaultName = "transformObjectCatalog"
1015 ConfigClass = TransformObjectCatalogConfig
1016
1017 datasets_multiband = ("epoch", "ref", "Sersic_multiprofit")
1018
1019 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1020 inputs = butlerQC.get(inputRefs)
1021 if self.funcs is None:
1022 raise ValueError("config.functorFile is None. "
1023 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
1024 result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
1025 dataId=dict(outputRefs.outputCatalog.dataId.mapping),
1026 handle_epoch=inputs["inputCatalogEpoch"],
1027 handle_ref=inputs["inputCatalogRef"],
1028 handle_Sersic_multiprofit=inputs["inputCatalogSersicMultiprofit"],
1029 )
1030 butlerQC.put(result, outputRefs)
1031
1032 def run(self, handle, funcs=None, dataId=None, band=None, **kwargs):
1033 # NOTE: band kwarg is ignored here.
1034 # TODO: Document and improve funcs argument usage in DM-48895
1035 # self.getAnalysis only supports list, dict and CompositeFunctor
1036 if isinstance(funcs, CompositeFunctor):
1037 funcDict_in = funcs.funcDict
1038 elif isinstance(funcs, dict):
1039 funcDict_in = funcs
1040 elif isinstance(funcs, list):
1041 funcDict_in = {idx: v for idx, v in enumerate(funcs)}
1042 else:
1043 raise TypeError(f"Unsupported {type(funcs)=}")
1044
1045 handles_multi = {}
1046 funcDicts_multiband = {}
1047 for dataset in self.datasets_multiband:
1048 if (handle_multi := kwargs.get(f"handle_{dataset}")) is None:
1049 raise RuntimeError(f"Missing required handle_{dataset} kwarg")
1050 handles_multi[dataset] = handle_multi
1051 funcDicts_multiband[dataset] = {}
1052
1053 dfDict = {}
1054 analysisDict = {}
1055 templateDf = pd.DataFrame()
1056
1057 columns = handle.get(component="columns")
1058 inputBands = columns.unique(level=1).values
1059
1060 outputBands = self.config.outputBands if self.config.outputBands else inputBands
1061
1062 # Split up funcs for per-band and multiband tables
1063 funcDict_band = {}
1064
1065 for name, func in funcDict_in.items():
1066 if func.dataset in funcDicts_multiband:
1067 # This is something like a MultibandColumn
1068 if band := getattr(func, "band_to_check", None):
1069 if band not in outputBands:
1070 continue
1071 # This is something like a ReferenceBand that has configurable bands
1072 elif hasattr(func, "bands"):
1073 # TODO: Determine if this can be avoided DM-48895
1074 # This will work fine if the init doesn't manipulate bands
1075 # If it does, then one would need to make a new functor
1076 # Determining the (kw)args is tricky in that case
1077 func.bands = tuple(inputBands)
1078
1079 funcDict = funcDicts_multiband.get(func.dataset, funcDict_band)
1080 funcDict[name] = func
1081
1082 funcs_band = CompositeFunctor(funcDict_band)
1083
1084 # Perform transform for data of filters that exist in the handle dataframe.
1085 for inputBand in inputBands:
1086 if inputBand not in outputBands:
1087 self.log.info("Ignoring %s band data in the input", inputBand)
1088 continue
1089 self.log.info("Transforming the catalog of band %s", inputBand)
1090 result = self.transform(inputBand, handle, funcs_band, dataId)
1091 dfDict[inputBand] = result.df
1092 analysisDict[inputBand] = result.analysis
1093 if templateDf.empty:
1094 templateDf = result.df
1095
1096 # Put filler values in columns of other wanted bands
1097 for filt in outputBands:
1098 if filt not in dfDict:
1099 self.log.info("Adding empty columns for band %s", filt)
1100 dfTemp = templateDf.copy()
1101 for col in dfTemp.columns:
1102 testValue = dfTemp[col].values[0]
1103 if isinstance(testValue, (np.bool_, pd.BooleanDtype)):
1104 # Boolean flag type, check if it is a "good" flag
1105 if col in self.config.goodFlags:
1106 fillValue = False
1107 else:
1108 fillValue = True
1109 elif isinstance(testValue, numbers.Integral):
1110 # Checking numbers.Integral catches all flavors
1111 # of python, numpy, pandas, etc. integers.
1112 # We must ensure this is not an unsigned integer.
1113 if isinstance(testValue, np.unsignedinteger):
1114 raise ValueError("Parquet tables may not have unsigned integer columns.")
1115 else:
1116 fillValue = self.config.integerFillValue
1117 else:
1118 fillValue = self.config.floatFillValue
1119 dfTemp[col].values[:] = fillValue
1120 dfDict[filt] = dfTemp
1121
1122 # This makes a multilevel column index, with band as first level
1123 df = pd.concat(dfDict, axis=1, names=["band", "column"])
1124 name_index = df.index.name
1125
1126 # TODO: Remove in DM-48895
1127 if not self.config.multilevelOutput:
1128 noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
1129 if self.config.primaryKey in noDupCols:
1130 noDupCols.remove(self.config.primaryKey)
1131 if dataId and self.config.columnsFromDataId:
1132 noDupCols += self.config.columnsFromDataId
1133 df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase,
1134 inputBands=inputBands)
1135
1136 # Apply per-dataset functors to each multiband dataset in turn
1137 for dataset, funcDict in funcDicts_multiband.items():
1138 handle_multiband = handles_multi[dataset]
1139 df_dataset = handle_multiband.get()
1140 if isinstance(df_dataset, astropy.table.Table):
1141 # Allow astropy table inputs to already have the output index
1142 if name_index not in df_dataset.colnames:
1143 if self.config.primaryKey in df_dataset.colnames:
1144 name_index_ap = self.config.primaryKey
1145 else:
1146 raise RuntimeError(
1147 f"Neither of {name_index=} nor {self.config.primaryKey=} appear in"
1148 f" {df_dataset.colnames=} for {dataset=}"
1149 )
1150 else:
1151 name_index_ap = name_index
1152 df_dataset = df_dataset.to_pandas().set_index(name_index_ap, drop=False)
1153 elif isinstance(df_dataset, afwTable.SourceCatalog):
1154 df_dataset = df_dataset.asAstropy().to_pandas().set_index(name_index, drop=False)
1155 # TODO: should funcDict have noDup funcs removed?
1156 # noDup was intended for per-band tables.
1157 result = self.transform(
1158 None,
1159 pipeBase.InMemoryDatasetHandle(df_dataset, storageClass="DataFrame"),
1160 CompositeFunctor(funcDict),
1161 dataId,
1162 )
1163 result.df.index.name = name_index
1164 # Drop columns from dataId if present (patch, tract)
1165 if self.config.columnsFromDataId:
1166 columns_drop = [column for column in self.config.columnsFromDataId if column in result.df]
1167 if columns_drop:
1168 result.df.drop(columns_drop, axis=1, inplace=True)
1169 # Make the same multi-index for the multiband table if needed
1170 # This might end up making copies, one of several reasons to avoid
1171 # using multilevel indexes, or DataFrames at all
1172 to_concat = pd.concat(
1173 {band: result.df for band in self.config.outputBands}, axis=1, names=["band", "column"]
1174 ) if self.config.multilevelOutput else result.df
1175 df = pd.concat([df, to_concat], axis=1)
1176 analysisDict[dataset] = result.analysis
1177 del result
1178
1179 df.index.name = self.config.primaryKey
1180
1181 if not self.config.multilevelOutput:
1182 tbl = pandas_to_astropy(df)
1183 else:
1184 tbl = df
1185
1186 self.log.info("Made a table of %d columns and %d rows", len(tbl.columns), len(tbl))
1187
1188 return pipeBase.Struct(outputCatalog=tbl)
1189
1190
1191class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections,
1192 dimensions=("tract", "skymap")):
1193 inputCatalogs = connectionTypes.Input(
1194 doc="Per-Patch objectTables conforming to the standard data model.",
1195 name="objectTable",
1196 storageClass="ArrowAstropy",
1197 dimensions=("tract", "patch", "skymap"),
1198 multiple=True,
1199 deferLoad=True,
1200 )
1201 outputCatalog = connectionTypes.Output(
1202 doc="Pre-tract horizontal concatenation of the input objectTables",
1203 name="objectTable_tract",
1204 storageClass="ArrowAstropy",
1205 dimensions=("tract", "skymap"),
1206 )
1207
1208
1209class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig,
1210 pipelineConnections=ConsolidateObjectTableConnections):
1211 coaddName = pexConfig.Field(
1212 dtype=str,
1213 default="deep",
1214 doc="Name of coadd"
1215 )
1216
1217
1218class ConsolidateObjectTableTask(pipeBase.PipelineTask):
1219 """Write patch-merged source tables to a tract-level DataFrame Parquet file.
1220
1221 Concatenates `objectTable` list into a per-visit `objectTable_tract`.
1222 """
1223 _DefaultName = "consolidateObjectTable"
1224 ConfigClass = ConsolidateObjectTableConfig
1225
1226 inputDataset = "objectTable"
1227 outputDataset = "objectTable_tract"
1228
1229 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1230 inputs = butlerQC.get(inputRefs)
1231 self.log.info("Concatenating %s per-patch Object Tables",
1232 len(inputs["inputCatalogs"]))
1233 table = TableVStack.vstack_handles(inputs["inputCatalogs"])
1234 butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)
1235
1236
1237class TransformSourceTableConnections(pipeBase.PipelineTaskConnections,
1238 defaultTemplates={"catalogType": ""},
1239 dimensions=("instrument", "visit", "detector")):
1240
1241 inputCatalog = connectionTypes.Input(
1242 doc="Wide input catalog of sources produced by WriteSourceTableTask",
1243 name="{catalogType}source",
1244 storageClass="DataFrame",
1245 dimensions=("instrument", "visit", "detector"),
1246 deferLoad=True
1247 )
1248 outputCatalog = connectionTypes.Output(
1249 doc="Narrower, per-detector Source Table transformed and converted per a "
1250 "specified set of functors",
1251 name="{catalogType}sourceTable",
1252 storageClass="ArrowAstropy",
1253 dimensions=("instrument", "visit", "detector")
1254 )
1255
1256
1257class TransformSourceTableConfig(TransformCatalogBaseConfig,
1258 pipelineConnections=TransformSourceTableConnections):
1259
1260 def setDefaults(self):
1261 super().setDefaults()
1262 self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Source.yaml")
1263 self.primaryKey = "sourceId"
1264 self.columnsFromDataId = ["visit", "detector", "band", "physical_filter"]
1265
1266
1267class TransformSourceTableTask(TransformCatalogBaseTask):
1268 """Transform/standardize a source catalog
1269 """
1270 _DefaultName = "transformSourceTable"
1271 ConfigClass = TransformSourceTableConfig
1272
1273
1274class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections,
1275 dimensions=("instrument", "visit",),
1276 defaultTemplates={"calexpType": ""}):
1277 calexp = connectionTypes.Input(
1278 doc="Processed exposures used for metadata",
1279 name="calexp",
1280 storageClass="ExposureF",
1281 dimensions=("instrument", "visit", "detector"),
1282 deferLoad=True,
1283 multiple=True,
1284 )
1285 visitSummary = connectionTypes.Output(
1286 doc=("Per-visit consolidated exposure metadata. These catalogs use "
1287 "detector id for the id and are sorted for fast lookups of a "
1288 "detector."),
1289 name="visitSummary",
1290 storageClass="ExposureCatalog",
1291 dimensions=("instrument", "visit"),
1292 )
1293 visitSummarySchema = connectionTypes.InitOutput(
1294 doc="Schema of the visitSummary catalog",
1295 name="visitSummary_schema",
1296 storageClass="ExposureCatalog",
1297 )
1298
1299
1300class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig,
1301 pipelineConnections=ConsolidateVisitSummaryConnections):
1302 """Config for ConsolidateVisitSummaryTask"""
1303
1304 full = pexConfig.Field(
1305 "Whether to propate all exposure components. "
1306 "This adds PSF, aperture correction map, transmission curve, and detector, which can increase file "
1307 "size by more than factor of 10, but it makes the visit summaries produced by this task fully usable"
1308 "by tasks that were designed to run downstream of lsst.drp.tasks.UpdateVisitSummaryTask.",
1309 dtype=bool,
1310 default=False,
1311 )
1312
1313
1314class ConsolidateVisitSummaryTask(pipeBase.PipelineTask):
1315 """Task to consolidate per-detector visit metadata.
1316
1317 This task aggregates the following metadata from all the detectors in a
1318 single visit into an exposure catalog:
1319 - The visitInfo.
1320 - The wcs.
1321 - The photoCalib.
1322 - The physical_filter and band (if available).
1323 - The PSF model.
1324 - The aperture correction map.
1325 - The transmission curve.
1326 - The psf size, shape, and effective area at the center of the detector.
1327 - The corners of the bounding box in right ascension/declination.
1328
1329 Tests for this task are performed in ci_hsc_gen3.
1330 """
1331 _DefaultName = "consolidateVisitSummary"
1332 ConfigClass = ConsolidateVisitSummaryConfig
1333
1334 def __init__(self, **kwargs):
1335 super().__init__(**kwargs)
1336 self.schema = afwTable.ExposureTable.makeMinimalSchema()
1337 self.schema.addField("visit", type="L", doc="Visit number")
1338 self.schema.addField("physical_filter", type="String", size=32, doc="Physical filter")
1339 self.schema.addField("band", type="String", size=32, doc="Name of band")
1340 ExposureSummaryStats.update_schema(self.schema)
1341 self.visitSummarySchema = afwTable.ExposureCatalog(self.schema)
1342
1343 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1344 dataRefs = butlerQC.get(inputRefs.calexp)
1345 visit = dataRefs[0].dataId["visit"]
1346
1347 self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
1348 len(dataRefs), visit)
1349
1350 expCatalog = self._combineExposureMetadata(visit, dataRefs)
1351
1352 butlerQC.put(expCatalog, outputRefs.visitSummary)
1353
1354 def _combineExposureMetadata(self, visit, dataRefs):
1355 """Make a combined exposure catalog from a list of dataRefs.
1356 These dataRefs must point to exposures with wcs, summaryStats,
1357 and other visit metadata.
1358
1359 Parameters
1360 ----------
1361 visit : `int`
1362 Visit identification number.
1363 dataRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
1364 List of dataRefs in visit.
1365
1366 Returns
1367 -------
1368 visitSummary : `lsst.afw.table.ExposureCatalog`
1369 Exposure catalog with per-detector summary information.
1370 """
1371 cat = afwTable.ExposureCatalog(self.schema)
1372 cat.resize(len(dataRefs))
1373
1374 cat["visit"] = visit
1375
1376 for i, dataRef in enumerate(dataRefs):
1377 visitInfo = dataRef.get(component="visitInfo")
1378 filterLabel = dataRef.get(component="filter")
1379 summaryStats = dataRef.get(component="summaryStats")
1380 detector = dataRef.get(component="detector")
1381 wcs = dataRef.get(component="wcs")
1382 photoCalib = dataRef.get(component="photoCalib")
1383 bbox = dataRef.get(component="bbox")
1384 validPolygon = dataRef.get(component="validPolygon")
1385
1386 rec = cat[i]
1387 rec.setBBox(bbox)
1388 rec.setVisitInfo(visitInfo)
1389 rec.setWcs(wcs)
1390 rec.setPhotoCalib(photoCalib)
1391 rec.setValidPolygon(validPolygon)
1392
1393 if self.config.full:
1394 rec.setPsf(dataRef.get(component="psf"))
1395 rec.setApCorrMap(dataRef.get(component="apCorrMap"))
1396 rec.setTransmissionCurve(dataRef.get(component="transmissionCurve"))
1397
1398 rec["physical_filter"] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else ""
1399 rec["band"] = filterLabel.bandLabel if filterLabel.hasBandLabel() else ""
1400 rec.setId(detector.getId())
1401 summaryStats.update_record(rec)
1402
1403 if not cat:
1404 raise pipeBase.NoWorkFound(
1405 "No detectors had sufficient information to make a visit summary row."
1406 )
1407
1408 metadata = dafBase.PropertyList()
1409 metadata.add("COMMENT", "Catalog id is detector id, sorted.")
1410 # We are looping over existing datarefs, so the following is true
1411 metadata.add("COMMENT", "Only detectors with data have entries.")
1412 cat.setMetadata(metadata)
1413
1414 cat.sort()
1415 return cat
1416
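# Illustrative sketch (not part of the module): the consolidated visitSummary
# is an ExposureCatalog sorted by detector id, so downstream code can look up
# per-detector calibrations directly; detectorId below is hypothetical.
#
#   row = visitSummary.find(detectorId)
#   wcs = row.getWcs()
#   photoCalib = row.getPhotoCalib()
#   psfSigma = row["psfSigma"]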
1417
1418class ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections,
1419 defaultTemplates={"catalogType": ""},
1420 dimensions=("instrument", "visit")):
1421 inputCatalogs = connectionTypes.Input(
1422 doc="Input per-detector Source Tables",
1423 name="{catalogType}sourceTable",
1424 storageClass="ArrowAstropy",
1425 dimensions=("instrument", "visit", "detector"),
1426 multiple=True,
1427 deferLoad=True,
1428 )
1429 outputCatalog = connectionTypes.Output(
1430 doc="Per-visit concatenation of Source Table",
1431 name="{catalogType}sourceTable_visit",
1432 storageClass="ArrowAstropy",
1433 dimensions=("instrument", "visit")
1434 )
1435
1436
1437class ConsolidateSourceTableConfig(pipeBase.PipelineTaskConfig,
1438 pipelineConnections=ConsolidateSourceTableConnections):
1439 pass
1440
1441
1442class ConsolidateSourceTableTask(pipeBase.PipelineTask):
1443 """Concatenate `sourceTable` list into a per-visit `sourceTable_visit`
1444 """
1445 _DefaultName = "consolidateSourceTable"
1446 ConfigClass = ConsolidateSourceTableConfig
1447
1448 inputDataset = "sourceTable"
1449 outputDataset = "sourceTable_visit"
1450
1451 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1452 # Docstring inherited.
1453 detectorOrder = [ref.dataId["detector"] for ref in inputRefs.inputCatalogs]
1454 detectorOrder.sort()
1455 inputRefs = reorderRefs(inputRefs, detectorOrder, dataIdKey="detector")
1456 inputs = butlerQC.get(inputRefs)
1457 self.log.info("Concatenating %s per-detector Source Tables",
1458 len(inputs["inputCatalogs"]))
1459 table = TableVStack.vstack_handles(inputs["inputCatalogs"])
1460 butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)
1461
1462
1463class MakeCcdVisitTableConnections(pipeBase.PipelineTaskConnections,
1464 dimensions=("instrument",),
1465 defaultTemplates={"calexpType": ""}):
1466 visitSummaryRefs = connectionTypes.Input(
1467 doc="Data references for per-visit consolidated exposure metadata",
1468 name="finalVisitSummary",
1469 storageClass="ExposureCatalog",
1470 dimensions=("instrument", "visit"),
1471 multiple=True,
1472 deferLoad=True,
1473 )
1474 outputCatalog = connectionTypes.Output(
1475 doc="CCD and Visit metadata table",
1476 name="ccdVisitTable",
1477 storageClass="ArrowAstropy",
1478 dimensions=("instrument",)
1479 )
1480
1481
1482class MakeCcdVisitTableConfig(pipeBase.PipelineTaskConfig,
1483 pipelineConnections=MakeCcdVisitTableConnections):
1484 idGenerator = DetectorVisitIdGeneratorConfig.make_field()
1485
1486
1487class MakeCcdVisitTableTask(pipeBase.PipelineTask):
1488 """Produce a `ccdVisitTable` from the visit summary exposure catalogs.
1489 """
1490 _DefaultName = "makeCcdVisitTable"
1491 ConfigClass = MakeCcdVisitTableConfig
1492
1493 def run(self, visitSummaryRefs):
1494 """Make a table of ccd information from the visit summary catalogs.
1495
1496 Parameters
1497 ----------
1498 visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
1499 List of DeferredDatasetHandles pointing to exposure catalogs with
1500 per-detector summary information.
1501
1502 Returns
1503 -------
1504 result : `~lsst.pipe.base.Struct`
1505 Results struct with attribute:
1506
1507 ``outputCatalog``
1508 Catalog of ccd and visit information.
1509 """
1510 ccdEntries = []
1511 for visitSummaryRef in visitSummaryRefs:
1512 visitSummary = visitSummaryRef.get()
1513 if not visitSummary:
1514 continue
1515 visitInfo = visitSummary[0].getVisitInfo()
1516
1517 # Strip provenance to prevent merge confusion.
1518 strip_provenance_from_fits_header(visitSummary.metadata)
1519
1520 ccdEntry = {}
1521 summaryTable = visitSummary.asAstropy()
1522 selectColumns = ["id", "visit", "physical_filter", "band", "ra", "dec",
1523 "pixelScale", "zenithDistance",
1524 "expTime", "zeroPoint", "psfSigma", "skyBg", "skyNoise",
1525 "astromOffsetMean", "astromOffsetStd", "nPsfStar",
1526 "psfStarDeltaE1Median", "psfStarDeltaE2Median",
1527 "psfStarDeltaE1Scatter", "psfStarDeltaE2Scatter",
1528 "psfStarDeltaSizeMedian", "psfStarDeltaSizeScatter",
1529 "psfStarScaledDeltaSizeScatter", "psfTraceRadiusDelta",
1530 "psfApFluxDelta", "psfApCorrSigmaScaledDelta",
1531 "maxDistToNearestPsf",
1532 "effTime", "effTimePsfSigmaScale",
1533 "effTimeSkyBgScale", "effTimeZeroPointScale",
1534 "magLim"]
1535 ccdEntry = summaryTable[selectColumns]
1536 # 'visit' is the human-readable visit number.
1537 # 'visitId' is the key into the visit table; the two are identical.
1538 # Strictly, the visit number should be obtained by joining to the
1539 # visit table.
1540 ccdEntry.rename_column("visit", "visitId")
1541 ccdEntry.rename_column("id", "detectorId")
1542
1543 # RFC-924: Temporarily keep a duplicate "decl" entry for backwards
1544 # compatibility. To be removed after September 2023.
1545 ccdEntry["decl"] = ccdEntry["dec"]
1546
1547 ccdEntry["ccdVisitId"] = [
1548 self.config.idGenerator.apply(
1549 visitSummaryRef.dataId,
1550 detector=detector_id,
1551 is_exposure=False,
1552 ).catalog_id # The "catalog ID" here is the ccdVisit ID
1553 # because it's usually the ID for a whole catalog
1554 # with a {visit, detector}, and that's the main
1555 # use case for IdGenerator. This usage for a
1556 # summary table is rare.
1557 for detector_id in summaryTable["id"]
1558 ]
1559 ccdEntry["detector"] = summaryTable["id"]
1560 ccdEntry["seeing"] = (
1561 visitSummary["psfSigma"] * visitSummary["pixelScale"] * np.sqrt(8 * np.log(2))
1562 )
1563 ccdEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
1564 ccdEntry["expMidpt"] = np.datetime64(visitInfo.date.nsecs(scale=dafBase.DateTime.TAI), "ns")
1565 ccdEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
1566 expTime = visitInfo.getExposureTime()
1567 ccdEntry["obsStart"] = (
1568 ccdEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns")
1569 )
1570 expTime_days = expTime / (60*60*24)
1571 ccdEntry["obsStartMJD"] = ccdEntry["expMidptMJD"] - 0.5 * expTime_days
1572 ccdEntry["darkTime"] = visitInfo.getDarkTime()
1573 ccdEntry["xSize"] = summaryTable["bbox_max_x"] - summaryTable["bbox_min_x"]
1574 ccdEntry["ySize"] = summaryTable["bbox_max_y"] - summaryTable["bbox_min_y"]
1575 ccdEntry["llcra"] = summaryTable["raCorners"][:, 0]
1576 ccdEntry["llcdec"] = summaryTable["decCorners"][:, 0]
1577 ccdEntry["ulcra"] = summaryTable["raCorners"][:, 1]
1578 ccdEntry["ulcdec"] = summaryTable["decCorners"][:, 1]
1579 ccdEntry["urcra"] = summaryTable["raCorners"][:, 2]
1580 ccdEntry["urcdec"] = summaryTable["decCorners"][:, 2]
1581 ccdEntry["lrcra"] = summaryTable["raCorners"][:, 3]
1582 ccdEntry["lrcdec"] = summaryTable["decCorners"][:, 3]
1583 # TODO: DM-30618, Add raftName, nExposures, ccdTemp, binX, binY,
1584 # and flags, and decide if WCS, and llcx, llcy, ulcx, ulcy, etc.
1585 # values are actually wanted.
1586 ccdEntries.append(ccdEntry)
1587
1588 outputCatalog = astropy.table.vstack(ccdEntries, join_type="exact")
1589 return pipeBase.Struct(outputCatalog=outputCatalog)
1590
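# Editor's illustrative sketch, not part of pipe_tasks: the "seeing" column
# above converts the per-detector Gaussian PSF width to an on-sky FWHM via
# FWHM = psfSigma * pixelScale * sqrt(8 ln 2), with psfSigma nominally in
# pixels and pixelScale in arcsec/pixel.  The defaults below are made up and
# the helper name is hypothetical.
def _example_seeing_fwhm(psf_sigma_pixels=2.0, pixel_scale_arcsec=0.2):
    # sqrt(8 ln 2) ~ 2.355 is the Gaussian sigma-to-FWHM conversion factor.
    return psf_sigma_pixels * pixel_scale_arcsec * np.sqrt(8 * np.log(2))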
1591
1592class MakeVisitTableConnections(pipeBase.PipelineTaskConnections,
1593 dimensions=("instrument",),
1594 defaultTemplates={"calexpType": ""}):
1595 visitSummaries = connectionTypes.Input(
1596 doc="Per-visit consolidated exposure metadata",
1597 name="finalVisitSummary",
1598 storageClass="ExposureCatalog",
1599 dimensions=("instrument", "visit",),
1600 multiple=True,
1601 deferLoad=True,
1602 )
1603 outputCatalog = connectionTypes.Output(
1604 doc="Visit metadata table",
1605 name="visitTable",
1606 storageClass="ArrowAstropy",
1607 dimensions=("instrument",)
1608 )
1609
1610
1611class MakeVisitTableConfig(pipeBase.PipelineTaskConfig,
1612 pipelineConnections=MakeVisitTableConnections):
1613 pass
1614
1615
1616class MakeVisitTableTask(pipeBase.PipelineTask):
1617 """Produce a `visitTable` from the visit summary exposure catalogs.
1618 """
1619 _DefaultName = "makeVisitTable"
1620 ConfigClass = MakeVisitTableConfig
1621
1622 def run(self, visitSummaries):
1623 """Make a table of visit information from the visit summary catalogs.
1624
1625 Parameters
1626 ----------
1627 visitSummaries : `list` of `lsst.afw.table.ExposureCatalog`
1628 List of exposure catalogs with per-detector summary information.

1629 Returns
1630 -------
1631 result : `~lsst.pipe.base.Struct`
1632 Results struct with attribute:
1633
1634 ``outputCatalog``
1635 Catalog of visit information.
1636 """
1637 visitEntries = []
1638 for visitSummary in visitSummaries:
1639 visitSummary = visitSummary.get()
1640 if not visitSummary:
1641 continue
1642 visitRow = visitSummary[0]
1643 visitInfo = visitRow.getVisitInfo()
1644
1645 visitEntry = {}
1646 visitEntry["visitId"] = visitRow["visit"]
1647 visitEntry["visit"] = visitRow["visit"]
1648 visitEntry["physical_filter"] = visitRow["physical_filter"]
1649 visitEntry["band"] = visitRow["band"]
1650 raDec = visitInfo.getBoresightRaDec()
1651 visitEntry["ra"] = raDec.getRa().asDegrees()
1652 visitEntry["dec"] = raDec.getDec().asDegrees()
1653
1654 # RFC-924: Temporarily keep a duplicate "decl" entry for backwards
1655 # compatibility. To be removed after September 2023.
1656 visitEntry["decl"] = visitEntry["dec"]
1657
1658 visitEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
1659 azAlt = visitInfo.getBoresightAzAlt()
1660 visitEntry["azimuth"] = azAlt.getLongitude().asDegrees()
1661 visitEntry["altitude"] = azAlt.getLatitude().asDegrees()
1662 visitEntry["zenithDistance"] = 90 - azAlt.getLatitude().asDegrees()
1663 visitEntry["airmass"] = visitInfo.getBoresightAirmass()
1664 expTime = visitInfo.getExposureTime()
1665 visitEntry["expTime"] = expTime
1666 visitEntry["expMidpt"] = np.datetime64(visitInfo.date.nsecs(scale=dafBase.DateTime.TAI), "ns")
1667 visitEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
1668 visitEntry["obsStart"] = visitEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns")
1669 expTime_days = expTime / (60*60*24)
1670 visitEntry["obsStartMJD"] = visitEntry["expMidptMJD"] - 0.5 * expTime_days
1671 visitEntries.append(visitEntry)
1672
1673 # TODO: DM-30623, Add programId, exposureType, cameraTemp,
1674 # mirror1Temp, mirror2Temp, mirror3Temp, domeTemp, externalTemp,
1675 # dimmSeeing, pwvGPS, pwvMW, flags, nExposures.
1676
1677 outputCatalog = astropy.table.Table(rows=visitEntries)
1678 return pipeBase.Struct(outputCatalog=outputCatalog)
1679
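# Editor's illustrative sketch, not part of pipe_tasks: the observation start
# above is derived by stepping back half the exposure time from the TAI
# midpoint, once as a nanosecond-resolution datetime64 and once in MJD (days).
# The default values below are made up and the helper name is hypothetical.
def _example_obs_start(exp_midpt=np.datetime64("2024-01-01T00:00:30", "ns"),
                       exp_midpt_mjd=60310.00035, exp_time=30.0):
    obs_start = exp_midpt - 0.5 * np.timedelta64(int(exp_time * 1E9), "ns")
    obs_start_mjd = exp_midpt_mjd - 0.5 * exp_time / (60 * 60 * 24)
    return obs_start, obs_start_mjd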
1680
1681@deprecated(reason="This task is replaced by lsst.pipe.tasks.ForcedPhotCcdTask. "
1682 "This task will be removed after v30.",
1683 version="v29.0", category=FutureWarning)
1684class WriteForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
1685 dimensions=("instrument", "visit", "detector", "skymap", "tract")):
1686
1687 inputCatalog = connectionTypes.Input(
1688 doc="Primary per-detector, single-epoch forced-photometry catalog. "
1689 "By default, it is the output of ForcedPhotCcdTask on calexps",
1690 name="forced_src",
1691 storageClass="SourceCatalog",
1692 dimensions=("instrument", "visit", "detector", "skymap", "tract")
1693 )
1694 inputCatalogDiff = connectionTypes.Input(
1695 doc="Secondary multi-epoch, per-detector, forced photometry catalog. "
1696 "By default, it is the output of ForcedPhotCcdTask run on image differences.",
1697 name="forced_diff",
1698 storageClass="SourceCatalog",
1699 dimensions=("instrument", "visit", "detector", "skymap", "tract")
1700 )
1701 outputCatalog = connectionTypes.Output(
1702 doc="Input catalogs horizontally joined on `objectId` in DataFrame Parquet format",
1703 name="mergedForcedSource",
1704 storageClass="DataFrame",
1705 dimensions=("instrument", "visit", "detector", "skymap", "tract")
1706 )
1707
1708
1709@deprecated(reason="This task is replaced by lsst.pipe.tasks.ForcedPhotCcdTask. "
1710 "This task will be removed after v30.",
1711 version="v29.0", category=FutureWarning)
1712class WriteForcedSourceTableConfig(pipeBase.PipelineTaskConfig,
1713 pipelineConnections=WriteForcedSourceTableConnections):
1714 key = lsst.pex.config.Field(
1715 doc="Column on which to join the two input tables and which becomes the primary key of the output",
1716 dtype=str,
1717 default="objectId",
1718 )
1719
1720
1721@deprecated(reason="This task is replaced by lsst.pipe.tasks.ForcedPhotCcdTask. "
1722 "This task will be removed after v30.",
1723 version="v29.0", category=FutureWarning)
1724class WriteForcedSourceTableTask(pipeBase.PipelineTask):
1725 """Merge and convert per-detector forced source catalogs to DataFrame Parquet format.
1726
1727 Because the predecessor ForcedPhotCcdTask operates per detector and
1728 per tract (i.e., it has tract in its dimensions), detectors on a
1729 tract boundary may have multiple forced source catalogs.
1730
1731 The successor task TransformForcedSourceTable runs per patch and
1732 temporally aggregates the overlapping mergedForcedSource catalogs
1733 from all available epochs.
1734 """
1735 _DefaultName = "writeForcedSourceTable"
1736 ConfigClass = WriteForcedSourceTableConfig
1737
1738 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1739 inputs = butlerQC.get(inputRefs)
1740 inputs["visit"] = butlerQC.quantum.dataId["visit"]
1741 inputs["detector"] = butlerQC.quantum.dataId["detector"]
1742 inputs["band"] = butlerQC.quantum.dataId["band"]
1743 outputs = self.run(**inputs)
1744 butlerQC.put(outputs, outputRefs)
1745
1746 def run(self, inputCatalog, inputCatalogDiff, visit, detector, band=None):
1747 dfs = []
1748 for table, dataset in zip((inputCatalog, inputCatalogDiff), ("calexp", "diff")):
1749 df = table.asAstropy().to_pandas().set_index(self.config.key, drop=False)
1750 df = df.reindex(sorted(df.columns), axis=1)
1751 df["visit"] = visit
1752 # int16 instead of uint8 because databases don't like unsigned bytes.
1753 df["detector"] = np.int16(detector)
1754 df["band"] = band if band else pd.NA
1755 df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
1756 names=("dataset", "column"))
1757
1758 dfs.append(df)
1759
1760 outputCatalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
1761 return pipeBase.Struct(outputCatalog=outputCatalog)
1762
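# Editor's illustrative sketch, not part of pipe_tasks: the merge above labels
# each input with a top-level "dataset" column level and then joins the two
# frames on their shared objectId index.  A minimal pandas analogue with
# made-up columns (helper name hypothetical):
def _example_merge_forced_source_frames():
    calexp = pd.DataFrame({"objectId": [1, 2], "flux": [10.0, 20.0]}).set_index("objectId", drop=False)
    diff = pd.DataFrame({"objectId": [1, 2], "flux": [0.3, -0.1]}).set_index("objectId", drop=False)
    calexp.columns = pd.MultiIndex.from_tuples([("calexp", c) for c in calexp.columns],
                                               names=("dataset", "column"))
    diff.columns = pd.MultiIndex.from_tuples([("diff", c) for c in diff.columns],
                                             names=("dataset", "column"))
    # Column names no longer collide because each carries its dataset label.
    return calexp.join(diff)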
1763
1764class TransformForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
1765 dimensions=("instrument", "skymap", "patch", "tract")):
1766
1767 inputCatalogs = connectionTypes.Input(
1768 doc="DataFrames of merged ForcedSources produced by WriteForcedSourceTableTask",
1769 name="mergedForcedSource",
1770 storageClass="DataFrame",
1771 dimensions=("instrument", "visit", "detector", "skymap", "tract"),
1772 multiple=True,
1773 deferLoad=True
1774 )
1775 referenceCatalog = connectionTypes.Input(
1776 doc="Reference catalog which was used to seed the forcedPhot. Columns "
1777 "objectId, detect_isPrimary, detect_isTractInner, detect_isPatchInner "
1778 "are expected.",
1779 name="objectTable",
1780 storageClass="DataFrame",
1781 dimensions=("tract", "patch", "skymap"),
1782 deferLoad=True
1783 )
1784 outputCatalog = connectionTypes.Output(
1785 doc="Narrower, temporally-aggregated, per-patch ForcedSource Table transformed and converted per a "
1786 "specified set of functors",
1787 name="forcedSourceTable",
1788 storageClass="ArrowAstropy",
1789 dimensions=("tract", "patch", "skymap")
1790 )
1791
1792
1793class TransformForcedSourceTableConfig(TransformCatalogBaseConfig,
1794 pipelineConnections=TransformForcedSourceTableConnections):
1795 referenceColumns = pexConfig.ListField(
1796 dtype=str,
1797 default=["detect_isPrimary", "detect_isTractInner", "detect_isPatchInner"],
1798 optional=True,
1799 doc="Columns to pull from reference catalog",
1800 )
1801 keyRef = lsst.pex.config.Field(
1802 doc="Column on which to join the two input tables and which becomes the primary key of the output",
1803 dtype=str,
1804 default="objectId",
1805 )
1806 key = lsst.pex.config.Field(
1807 doc="Rename the output DataFrame index to this name",
1808 dtype=str,
1809 default="forcedSourceId",
1810 )
1811
1812 def setDefaults(self):
1813 super().setDefaults()
1814 self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "ForcedSource.yaml")
1815 self.columnsFromDataId = ["tract", "patch"]
1816
1817
1818class TransformForcedSourceTableTask(TransformCatalogBaseTask):
1819 """Transform/standardize a ForcedSource catalog
1820
1821 Transforms each wide, per-detector forcedSource DataFrame per the
1822 specification file (per-camera defaults found in ForcedSource.yaml).
1823 All epochs that overlap the patch are aggregated into one per-patch
1824 narrow-DataFrame file.
1825
1826 No de-duplication of rows is performed. Duplicate-resolution flags are
1827 pulled in from the referenceCatalog (`detect_isPrimary`,
1828 `detect_isTractInner`, `detect_isPatchInner`) so that the user may
1829 de-duplicate for analysis or compare duplicates for QA.
1830
1831 The resulting table includes multiple bands. Epochs (MJDs) and other
1832 useful per-visit metadata can be retrieved by joining with the
1833 CcdVisitTable on ccdVisitId.
1834 """
1835 _DefaultName = "transformForcedSourceTable"
1836 ConfigClass = TransformForcedSourceTableConfig
1837
1838 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1839 inputs = butlerQC.get(inputRefs)
1840 if self.funcs is None:
1841 raise ValueError("config.functorFile is None. "
1842 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
1843 outputs = self.run(inputs["inputCatalogs"], inputs["referenceCatalog"], funcs=self.funcs,
1844 dataId=dict(outputRefs.outputCatalog.dataId.mapping))
1845
1846 butlerQC.put(outputs, outputRefs)
1847
1848 def run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=None):
1849 dfs = []
1850 refColumns = list(self.config.referenceColumns)
1851 refColumns.append(self.config.keyRef)
1852 ref = referenceCatalog.get(parameters={"columns": refColumns})
1853 if ref.index.name != self.config.keyRef:
1854 # If the DataFrame we loaded was originally written as some other
1855 # Parquet type, it probably doesn't have the index set. If it was
1856 # written as a DataFrame, the index should already be set and
1857 # trying to set it again would be an error, since it doesn't exist
1858 # as a regular column anymore.
1859 ref.set_index(self.config.keyRef, inplace=True)
1860 self.log.info("Aggregating %s input catalogs", len(inputCatalogs))
1861 for handle in inputCatalogs:
1862 result = self.transform(None, handle, funcs, dataId)
1863 # Filter for only rows that were detected on (overlap) the patch
1864 dfs.append(result.df.join(ref, how="inner"))
1865
1866 outputCatalog = pd.concat(dfs)
1867
1868 if outputCatalog.empty:
1869 raise NoWorkFound(f"No forced photometry rows for {dataId}.")
1870
1871 # Now that we are done joining on config.keyRef, give the index its
1872 # keyRef name and then switch the index over to config.key:
1873 outputCatalog.index.rename(self.config.keyRef, inplace=True)
1874 # Restore config.keyRef as a regular column.
1875 outputCatalog.reset_index(inplace=True)
1876 # Make forcedSourceId (computed per ForcedSource.yaml) the index,
1877 # verifying that it is unique.
1878 outputCatalog.set_index("forcedSourceId", inplace=True, verify_integrity=True)
1879 # Finally, rename the index to config.key.
1880 outputCatalog.index.rename(self.config.key, inplace=True)
1881
1882 self.log.info("Made a table of %d columns and %d rows",
1883 len(outputCatalog.columns), len(outputCatalog))
1884 return pipeBase.Struct(outputCatalog=pandas_to_astropy(outputCatalog))
1885
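# Editor's illustrative sketch, not part of pipe_tasks: the inner join above
# keeps only forced-source rows whose objectId appears in the reference object
# table, carrying along the de-duplication flags.  A minimal pandas analogue
# with made-up rows (helper name hypothetical):
def _example_join_reference_flags():
    forced = pd.DataFrame({"objectId": [1, 2, 3], "psfFlux": [5.0, 6.0, 7.0]}).set_index("objectId")
    ref = pd.DataFrame({"objectId": [1, 3], "detect_isPrimary": [True, False]}).set_index("objectId")
    # objectId 2 has no reference entry, so its row is dropped.
    return forced.join(ref, how="inner")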
1886
1887class ConsolidateTractConnections(pipeBase.PipelineTaskConnections,
1888 defaultTemplates={"catalogType": ""},
1889 dimensions=("instrument", "tract")):
1890 inputCatalogs = connectionTypes.Input(
1891 doc="Input per-patch DataFrame Tables to be concatenated",
1892 name="{catalogType}ForcedSourceTable",
1893 storageClass="DataFrame",
1894 dimensions=("tract", "patch", "skymap"),
1895 multiple=True,
1896 )
1897
1898 outputCatalog = connectionTypes.Output(
1899 doc="Output per-tract concatenation of DataFrame Tables",
1900 name="{catalogType}ForcedSourceTable_tract",
1901 storageClass="DataFrame",
1902 dimensions=("tract", "skymap"),
1903 )
1904
1905
1906class ConsolidateTractConfig(pipeBase.PipelineTaskConfig,
1907 pipelineConnections=ConsolidateTractConnections):
1908 pass
1909
1910
1911class ConsolidateTractTask(pipeBase.PipelineTask):
1912 """Concatenate a list of per-patch DataFrames into a single
1913 per-tract DataFrame.
1914 """
1915 _DefaultName = "ConsolidateTract"
1916 ConfigClass = ConsolidateTractConfig
1917
1918 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1919 inputs = butlerQC.get(inputRefs)
1920 # We do not check that at least one inputCatalog exists: if none
1921 # did, the quantum graph (QG) would have been empty.
1922 self.log.info("Concatenating %s per-patch %s Tables",
1923 len(inputs["inputCatalogs"]),
1924 inputRefs.inputCatalogs[0].datasetType.name)
1925 df = pd.concat(inputs["inputCatalogs"])
1926 butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)
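# Editor's illustrative sketch, not part of pipe_tasks: the per-tract
# consolidation above is a plain row-wise pandas concatenation of the
# per-patch DataFrames.  Made-up example (helper name hypothetical):
def _example_concat_patches():
    patch_0 = pd.DataFrame({"forcedSourceId": [1, 2], "patch": [0, 0]})
    patch_1 = pd.DataFrame({"forcedSourceId": [3], "patch": [1]})
    return pd.concat([patch_0, patch_1])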