postprocess.py
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22__all__ = ["WriteObjectTableConfig", "WriteObjectTableTask",
23 "WriteSourceTableConfig", "WriteSourceTableTask",
24 "WriteRecalibratedSourceTableConfig", "WriteRecalibratedSourceTableTask",
25 "PostprocessAnalysis",
26 "TransformCatalogBaseConfig", "TransformCatalogBaseTask",
27 "TransformObjectCatalogConfig", "TransformObjectCatalogTask",
28 "ConsolidateObjectTableConfig", "ConsolidateObjectTableTask",
29 "TransformSourceTableConfig", "TransformSourceTableTask",
30 "ConsolidateVisitSummaryConfig", "ConsolidateVisitSummaryTask",
31 "ConsolidateSourceTableConfig", "ConsolidateSourceTableTask",
32 "MakeCcdVisitTableConfig", "MakeCcdVisitTableTask",
33 "MakeVisitTableConfig", "MakeVisitTableTask",
34 "WriteForcedSourceTableConfig", "WriteForcedSourceTableTask",
35 "TransformForcedSourceTableConfig", "TransformForcedSourceTableTask",
36 "ConsolidateTractConfig", "ConsolidateTractTask"]
37
38from collections import defaultdict
39import dataclasses
40import functools
41import logging
42import numbers
43import os
44
45import numpy as np
46import pandas as pd
47import astropy.table
48from astro_metadata_translator.headers import merge_headers
49
50import lsst.geom
51import lsst.pex.config as pexConfig
52import lsst.pipe.base as pipeBase
53import lsst.daf.base as dafBase
54from lsst.daf.butler.formatters.parquet import pandas_to_astropy
55from lsst.pipe.base import NoWorkFound, UpstreamFailureNoWorkFound, connectionTypes
56import lsst.afw.table as afwTable
57from lsst.afw.image import ExposureSummaryStats, ExposureF
58from lsst.meas.base import SingleFrameMeasurementTask, DetectorVisitIdGeneratorConfig
59from lsst.obs.base.utils import strip_provenance_from_fits_header
60
61from .coaddBase import reorderRefs
62from .functors import CompositeFunctor, Column
63
64log = logging.getLogger(__name__)
65
66
67def flattenFilters(df, noDupCols=["coord_ra", "coord_dec"], camelCase=False, inputBands=None):
68 """Flattens a dataframe with multilevel column index.
69 """
70 newDf = pd.DataFrame()
71 # band is the level 0 index
72 dfBands = df.columns.unique(level=0).values
73 for band in dfBands:
74 subdf = df[band]
75 columnFormat = "{0}{1}" if camelCase else "{0}_{1}"
76 newColumns = {c: columnFormat.format(band, c)
77 for c in subdf.columns if c not in noDupCols}
78 cols = list(newColumns.keys())
79 newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)
80
81 # Band must be present in the input and output or else column is all NaN:
82 presentBands = dfBands if inputBands is None else list(set(inputBands).intersection(dfBands))
83 # Get the unexploded columns from any present band's partition
84 noDupDf = df[presentBands[0]][noDupCols]
85 newDf = pd.concat([noDupDf, newDf], axis=1)
86 return newDf
87
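# Illustrative sketch (not part of the pipeline, never called): flattenFilters
# turns a two-level (band, column) column index into flat, band-prefixed
# column names, keeping the noDupCols only once. Column names and values here
# are made up.
def _exampleFlattenFilters():
    df = pd.DataFrame({
        ("g", "coord_ra"): [1.0, 2.0],
        ("g", "coord_dec"): [3.0, 4.0],
        ("g", "psfFlux"): [10.0, 20.0],
        ("r", "coord_ra"): [1.0, 2.0],
        ("r", "coord_dec"): [3.0, 4.0],
        ("r", "psfFlux"): [30.0, 40.0],
    })
    df.columns = pd.MultiIndex.from_tuples(df.columns, names=("band", "column"))
    # Result columns: coord_ra, coord_dec, g_psfFlux, r_psfFlux.
    return flattenFilters(df)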
88
89class TableVStack:
90 """A helper class for stacking astropy tables without having them all in
91 memory at once.
92
93 Parameters
94 ----------
95 capacity : `int`
96 Full size of the final table.
97
98 Notes
99 -----
100 Unlike `astropy.table.vstack`, this class requires all tables to have the
101 exact same columns (it's slightly more strict than even the
102 ``join_type="exact"`` argument to `astropy.table.vstack`).
103 """
104
105 def __init__(self, capacity):
106 self.index = 0
107 self.capacity = capacity
108 self.result = None
109
110 @classmethod
111 def from_handles(cls, handles):
112 """Construct from an iterable of
113 `lsst.daf.butler.DeferredDatasetHandle`.
114
115 Parameters
116 ----------
117 handles : `~collections.abc.Iterable` [ \
118 `lsst.daf.butler.DeferredDatasetHandle` ]
119 Iterable of handles. Must have a storage class that supports the
120 "rowcount" component, which is all that will be fetched.
121
122 Returns
123 -------
124 vstack : `TableVStack`
125 An instance of this class, initialized with capacity equal to the
126 sum of the rowcounts of all the given table handles.
127 """
128 capacity = sum(handle.get(component="rowcount") for handle in handles)
129 return cls(capacity=capacity)
130
131 def extend(self, table):
132 """Add a single table to the stack.
133
134 Parameters
135 ----------
136 table : `astropy.table.Table`
137 An astropy table instance.
138 """
139 if self.result is None:
140 self.result = astropy.table.Table()
141 for name in table.colnames:
142 column = table[name]
143 column_cls = type(column)
144 self.result[name] = column_cls.info.new_like([column], self.capacity, name=name)
145 self.result[name][:len(table)] = column
146 self.index = len(table)
147 self.result.meta = table.meta.copy()
148 else:
149 next_index = self.index + len(table)
150 if set(self.result.colnames) != set(table.colnames):
151 raise TypeError(
152 "Inconsistent columns in concatentation: "
153 f"{set(self.result.colnames).symmetric_difference(table.colnames)}"
154 )
155 for name in table.colnames:
156 out_col = self.result[name]
157 in_col = table[name]
158 if out_col.dtype != in_col.dtype:
159 raise TypeError(f"Type mismatch on column {name!r}: {out_col.dtype} != {in_col.dtype}.")
160 self.result[name][self.index:next_index] = table[name]
161 self.index = next_index
162 # Butler provenance should be stripped on merge. It will be
163 # added by butler on write. No attempt is made here to combine
164 # provenance from multiple input tables.
165 self.result.meta = merge_headers([self.result.meta, table.meta], mode="drop")
166 strip_provenance_from_fits_header(self.result.meta)
167
168 @classmethod
169 def vstack_handles(cls, handles):
170 """Vertically stack tables represented by deferred dataset handles.
171
172 Parameters
173 ----------
174 handles : `~collections.abc.Iterable` [ \
175 `lsst.daf.butler.DeferredDatasetHandle` ]
176 Iterable of handles. Must have the "ArrowAstropy" storage class
177 and identical columns.
178
179 Returns
180 -------
181 table : `astropy.table.Table`
182 Concatenated table with the same columns as each input table and
183 the rows of all of them.
184 """
185 handles = tuple(handles) # guard against single-pass iterators
186 # Ensure that zero length catalogs are not included
187 rowcount = tuple(handle.get(component="rowcount") for handle in handles)
188 handles = tuple(handle for handle, count in zip(handles, rowcount) if count > 0)
189
190 vstack = cls.from_handles(handles)
191 for handle in handles:
192 vstack.extend(handle.get())
193 return vstack.result
194
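# Illustrative sketch (not part of the pipeline, never called): TableVStack
# preallocates the output once and fills it in place, rather than holding all
# of the inputs plus the concatenation in memory at the same time. Column
# names and values are made up; the pipeline itself goes through
# vstack_handles with deferred dataset handles instead.
def _exampleTableVStack():
    tables = [
        astropy.table.Table({"a": np.arange(3), "b": np.ones(3)}),
        astropy.table.Table({"a": np.arange(3, 5), "b": np.zeros(2)}),
    ]
    vstack = TableVStack(capacity=sum(len(table) for table in tables))
    for table in tables:
        vstack.extend(table)
    # vstack.result is a 5-row table with columns "a" and "b".
    return vstack.result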
195
196class WriteObjectTableConnections(pipeBase.PipelineTaskConnections,
197 defaultTemplates={"coaddName": "deep"},
198 dimensions=("tract", "patch", "skymap")):
199 inputCatalogMeas = connectionTypes.Input(
200 doc="Catalog of source measurements on the deepCoadd.",
201 dimensions=("tract", "patch", "band", "skymap"),
202 storageClass="SourceCatalog",
203 name="{coaddName}Coadd_meas",
204 multiple=True
205 )
206 inputCatalogForcedSrc = connectionTypes.Input(
207 doc="Catalog of forced measurements (shape and position parameters held fixed) on the deepCoadd.",
208 dimensions=("tract", "patch", "band", "skymap"),
209 storageClass="SourceCatalog",
210 name="{coaddName}Coadd_forced_src",
211 multiple=True
212 )
213 inputCatalogPsfsMultiprofit = connectionTypes.Input(
214 doc="Catalog of Gaussian mixture model fit parameters for the PSF model at each object centroid.",
215 dimensions=("tract", "patch", "band", "skymap"),
216 storageClass="ArrowAstropy",
217 name="{coaddName}Coadd_psfs_multiprofit",
218 multiple=True,
219 )
220 outputCatalog = connectionTypes.Output(
221 doc="A vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
222 "stored as a DataFrame with a multi-level column index per-patch.",
223 dimensions=("tract", "patch", "skymap"),
224 storageClass="DataFrame",
225 name="{coaddName}Coadd_obj"
226 )
227
228
229class WriteObjectTableConfig(pipeBase.PipelineTaskConfig,
230 pipelineConnections=WriteObjectTableConnections):
231 coaddName = pexConfig.Field(
232 dtype=str,
233 default="deep",
234 doc="Name of coadd"
235 )
236
237
238class WriteObjectTableTask(pipeBase.PipelineTask):
239 """Write filter-merged object tables as a DataFrame in parquet format.
240 """
241 _DefaultName = "writeObjectTable"
242 ConfigClass = WriteObjectTableConfig
243
244 # Tag of output dataset written by `MergeSourcesTask.write`
245 outputDataset = "obj"
246
247 def runQuantum(self, butlerQC, inputRefs, outputRefs):
248 inputs = butlerQC.get(inputRefs)
249
250 catalogs = defaultdict(dict)
251 for dataset, connection in (
252 ("meas", "inputCatalogMeas"),
253 ("forced_src", "inputCatalogForcedSrc"),
254 ("psfs_multiprofit", "inputCatalogPsfsMultiprofit"),
255 ):
256 for ref, cat in zip(getattr(inputRefs, connection), inputs[connection]):
257 catalogs[ref.dataId["band"]][dataset] = cat
258
259 dataId = butlerQC.quantum.dataId
260 df = self.run(catalogs=catalogs, tract=dataId["tract"], patch=dataId["patch"])
261 outputs = pipeBase.Struct(outputCatalog=df)
262 butlerQC.put(outputs, outputRefs)
263
264 def run(self, catalogs, tract, patch):
265 """Merge multiple catalogs.
266
267 Parameters
268 ----------
269 catalogs : `dict`
270 Mapping from filter names to dict of catalogs.
271 tract : `int`
272 Tract ID to use for the tractId column.
273 patch : `str`
274 Patch ID to use for the patchId column.
275
276 Returns
277 -------
278 catalog : `pandas.DataFrame`
279 Merged dataframe.
280
281 Raises
282 ------
283 ValueError
284 Raised if any of the catalogs is of an unsupported type.
285 """
286 dfs = []
287 for filt, tableDict in catalogs.items():
288 for dataset, table in tableDict.items():
289 # Convert afwTable to pandas DataFrame if needed
290 if isinstance(table, pd.DataFrame):
291 df = table
292 elif isinstance(table, afwTable.SourceCatalog):
293 df = table.asAstropy().to_pandas()
294 elif isinstance(table, astropy.table.Table):
295 df = table.to_pandas()
296 else:
297 raise ValueError(f"{dataset=} has unsupported {type(table)=}")
298 df.set_index("id", drop=True, inplace=True)
299
300 # Sort columns by name, to ensure matching schema among patches
301 df = df.reindex(sorted(df.columns), axis=1)
302 df = df.assign(tractId=tract, patchId=patch)
303
304 # Make columns a 3-level MultiIndex
305 df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
306 names=("dataset", "band", "column"))
307 dfs.append(df)
308
309 # We do this dance and not `pd.concat(dfs)` because the pandas
310 # concatenation uses an enormous amount of memory.
311 catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
312 return catalog
313
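# Illustrative sketch (not part of the pipeline, never called): the per-patch
# deepCoadd_obj written by WriteObjectTableTask.run carries a three-level
# (dataset, band, column) column index, so one dataset/band slice can be
# selected directly. Column names and values here are made up.
def _exampleObjectTableIndex():
    df = pd.DataFrame({"base_PsfFlux_instFlux": [1.0], "tractId": [1]})
    df.columns = pd.MultiIndex.from_tuples(
        [("meas", "r", c) for c in df.columns],
        names=("dataset", "band", "column"),
    )
    # A plain, single-level view of the r-band measurement columns.
    return df["meas"]["r"]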
314
315class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
316 defaultTemplates={"catalogType": ""},
317 dimensions=("instrument", "visit", "detector")):
318
319 catalog = connectionTypes.Input(
320 doc="Input full-depth catalog of sources produced by CalibrateTask",
321 name="{catalogType}src",
322 storageClass="SourceCatalog",
323 dimensions=("instrument", "visit", "detector")
324 )
325 outputCatalog = connectionTypes.Output(
326 doc="Catalog of sources, `src` in Astropy/Parquet format. Columns are unchanged.",
327 name="{catalogType}source",
328 storageClass="ArrowAstropy",
329 dimensions=("instrument", "visit", "detector")
330 )
331
332
333class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
334 pipelineConnections=WriteSourceTableConnections):
335 pass
336
337
338class WriteSourceTableTask(pipeBase.PipelineTask):
339 """Write source table to DataFrame Parquet format.
340 """
341 _DefaultName = "writeSourceTable"
342 ConfigClass = WriteSourceTableConfig
343
344 def runQuantum(self, butlerQC, inputRefs, outputRefs):
345 inputs = butlerQC.get(inputRefs)
346 inputs["visit"] = butlerQC.quantum.dataId["visit"]
347 inputs["detector"] = butlerQC.quantum.dataId["detector"]
348 result = self.run(**inputs)
349 outputs = pipeBase.Struct(outputCatalog=result.table)
350 butlerQC.put(outputs, outputRefs)
351
352 def run(self, catalog, visit, detector, **kwargs):
353 """Convert `src` catalog to an Astropy table.
354
355 Parameters
356 ----------
357 catalog : `lsst.afw.table.SourceCatalog`
358 Catalog to be converted.
359 visit, detector : `int`
360 Visit and detector ids to be added as columns.
361 **kwargs
362 Additional keyword arguments are ignored as a convenience for
363 subclasses that pass the same arguments to several different
364 methods.
365
366 Returns
367 -------
368 result : `~lsst.pipe.base.Struct`
369 ``table``
370 `astropy.table.Table` version of the input catalog
371 """
372 self.log.info("Generating DataFrame from src catalog visit,detector=%i,%i", visit, detector)
373 tbl = catalog.asAstropy()
374 tbl["visit"] = visit
375 # int16 instead of uint8 because databases don't like unsigned bytes.
376 tbl["detector"] = np.int16(detector)
377
378 return pipeBase.Struct(table=tbl)
379
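# Illustrative sketch (not part of the pipeline, never called): running the
# task directly on a minimal source catalog; the visit and detector values are
# hypothetical.
def _exampleWriteSourceTable():
    schema = afwTable.SourceTable.makeMinimalSchema()
    catalog = afwTable.SourceCatalog(schema)
    catalog.addNew()  # one empty record is enough for the sketch
    task = WriteSourceTableTask()
    result = task.run(catalog=catalog, visit=12345, detector=42)
    # result.table is an astropy table with added "visit" and "detector" columns.
    return result.table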
380
381class WriteRecalibratedSourceTableConnections(WriteSourceTableConnections,
382 defaultTemplates={"catalogType": ""},
383 dimensions=("instrument", "visit", "detector", "skymap")):
384 visitSummary = connectionTypes.Input(
385 doc="Input visit-summary catalog with updated calibration objects.",
386 name="finalVisitSummary",
387 storageClass="ExposureCatalog",
388 dimensions=("instrument", "visit",),
389 )
390
391 def __init__(self, config):
392 # We don't want the input catalog here to be an initial existence
393 # constraint in QG generation, because that can unfortunately limit the
394 # set of data IDs of inputs to other tasks, even those that run earlier
395 # (e.g. updateVisitSummary), when the input 'src' catalog is not
396 # produced. It's safer to just use 'visitSummary' existence as an
397 # initial constraint, and then let the graph prune out the detectors
398 # that don't have a 'src' for this task only.
399 self.catalog = dataclasses.replace(self.catalog, deferGraphConstraint=True)
400
401
402class WriteRecalibratedSourceTableConfig(WriteSourceTableConfig,
403 pipelineConnections=WriteRecalibratedSourceTableConnections):
404
405 doReevaluatePhotoCalib = pexConfig.Field(
406 dtype=bool,
407 default=True,
408 doc=("Add or replace local photoCalib columns"),
409 )
410 doReevaluateSkyWcs = pexConfig.Field(
411 dtype=bool,
412 default=True,
413 doc=("Add or replace local WCS columns and update the coord columns, coord_ra and coord_dec"),
414 )
415
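# Illustrative sketch (not part of the pipeline, never called): a config
# override that keeps the original WCS columns while still re-evaluating the
# local photometric calibration.
def _exampleRecalibratedSourceTableConfig():
    config = WriteRecalibratedSourceTableConfig()
    config.doReevaluateSkyWcs = False
    config.doReevaluatePhotoCalib = True
    return config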
416
417class WriteRecalibratedSourceTableTask(WriteSourceTableTask):
418 """Write source table to DataFrame Parquet format.
419 """
420 _DefaultName = "writeRecalibratedSourceTable"
421 ConfigClass = WriteRecalibratedSourceTableConfig
422
423 def runQuantum(self, butlerQC, inputRefs, outputRefs):
424 inputs = butlerQC.get(inputRefs)
425
426 inputs["visit"] = butlerQC.quantum.dataId["visit"]
427 inputs["detector"] = butlerQC.quantum.dataId["detector"]
428
429 if self.config.doReevaluatePhotoCalib or self.config.doReevaluateSkyWcs:
430 exposure = ExposureF()
431 inputs["exposure"] = self.prepareCalibratedExposure(
432 exposure=exposure,
433 visitSummary=inputs["visitSummary"],
434 detectorId=butlerQC.quantum.dataId["detector"]
435 )
436 inputs["catalog"] = self.addCalibColumns(**inputs)
437
438 result = self.run(**inputs)
439 outputs = pipeBase.Struct(outputCatalog=result.table)
440 butlerQC.put(outputs, outputRefs)
441
442 def prepareCalibratedExposure(self, exposure, detectorId, visitSummary=None):
443 """Prepare a calibrated exposure and apply external calibrations
444 if so configured.
445
446 Parameters
447 ----------
448 exposure : `lsst.afw.image.exposure.Exposure`
449 Input exposure to adjust calibrations. May be an empty Exposure.
450 detectorId : `int`
451 Detector ID associated with the exposure.
452 visitSummary : `lsst.afw.table.ExposureCatalog`, optional
453 Exposure catalog with all calibration objects. WCS and PhotoCalib
454 are always applied if ``visitSummary`` is provided and those
455 components are not `None`.
456
457 Returns
458 -------
459 exposure : `lsst.afw.image.exposure.Exposure`
460 Exposure with adjusted calibrations.
461 """
462 if visitSummary is not None:
463 row = visitSummary.find(detectorId)
464 if row is None:
465 raise pipeBase.NoWorkFound(f"Visit summary for detector {detectorId} is missing.")
466 if (photoCalib := row.getPhotoCalib()) is None:
467 self.log.warning("Detector id %s has None for photoCalib in visit summary; "
468 "skipping reevaluation of photoCalib.", detectorId)
469 exposure.setPhotoCalib(None)
470 else:
471 exposure.setPhotoCalib(photoCalib)
472 if (skyWcs := row.getWcs()) is None:
473 self.log.warning("Detector id %s has None for skyWcs in visit summary; "
474 "skipping reevaluation of skyWcs.", detectorId)
475 exposure.setWcs(None)
476 else:
477 exposure.setWcs(skyWcs)
478
479 return exposure
480
481 def addCalibColumns(self, catalog, exposure, **kwargs):
482 """Add replace columns with calibs evaluated at each centroid
483
484 Add or replace 'base_LocalWcs' and 'base_LocalPhotoCalib' columns in
485 a source catalog, by rerunning the plugins.
486
487 Parameters
488 ----------
489 catalog : `lsst.afw.table.SourceCatalog`
490 Catalog to which calib columns will be added.
491 exposure : `lsst.afw.image.exposure.Exposure`
492 Exposure with attached PhotoCalibs and SkyWcs attributes to be
493 reevaluated at local centroids. Pixels are not required.
494 **kwargs
495 Additional keyword arguments are ignored to facilitate passing the
496 same arguments to several methods.
497
498 Returns
499 -------
500 newCat : `lsst.afw.table.SourceCatalog`
501 Source catalog with the requested local calib columns.
502 """
503 measureConfig = SingleFrameMeasurementTask.ConfigClass()
504 measureConfig.doReplaceWithNoise = False
505
506 # Clear all slots, because we aren't running the relevant plugins.
507 for slot in measureConfig.slots:
508 setattr(measureConfig.slots, slot, None)
509
510 measureConfig.plugins.names = []
511 if self.config.doReevaluateSkyWcs:
512 measureConfig.plugins.names.add("base_LocalWcs")
513 self.log.info("Re-evaluating base_LocalWcs plugin")
514 if self.config.doReevaluatePhotoCalib:
515 measureConfig.plugins.names.add("base_LocalPhotoCalib")
516 self.log.info("Re-evaluating base_LocalPhotoCalib plugin")
517 pluginsNotToCopy = tuple(measureConfig.plugins.names)
518
519 # Create a new schema and catalog
520 # Copy all columns from original except for the ones to reevaluate
521 aliasMap = catalog.schema.getAliasMap()
522 mapper = afwTable.SchemaMapper(catalog.schema)
523 for item in catalog.schema:
524 if not item.field.getName().startswith(pluginsNotToCopy):
525 mapper.addMapping(item.key)
526
527 schema = mapper.getOutputSchema()
528 measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
529 schema.setAliasMap(aliasMap)
530 newCat = afwTable.SourceCatalog(schema)
531 newCat.extend(catalog, mapper=mapper)
532
533 # Fluxes in sourceCatalogs are in counts, so there are no fluxes to
534 # update here. LocalPhotoCalibs are applied during transform tasks.
535 # Update coord_ra/coord_dec, which are expected to be positions on the
536 # sky and are used as such in sdm tables without transform
537 if self.config.doReevaluateSkyWcs and exposure.wcs is not None:
538 afwTable.updateSourceCoords(exposure.wcs, newCat)
539 wcsPlugin = measurement.plugins["base_LocalWcs"]
540 else:
541 wcsPlugin = None
542
543 if self.config.doReevaluatePhotoCalib and exposure.getPhotoCalib() is not None:
544 pcPlugin = measurement.plugins["base_LocalPhotoCalib"]
545 else:
546 pcPlugin = None
547
548 for row in newCat:
549 if wcsPlugin is not None:
550 wcsPlugin.measure(row, exposure)
551 if pcPlugin is not None:
552 pcPlugin.measure(row, exposure)
553
554 return newCat
555
556
557class PostprocessAnalysis(object):
558 """Calculate columns from DataFrames or handles storing DataFrames.
559
560 This object manages and organizes an arbitrary set of computations
561 on a catalog. The catalog is defined by a
562 `DeferredDatasetHandle` or `InMemoryDatasetHandle` object
563 (or list thereof), such as a ``deepCoadd_obj`` dataset, and the
564 computations are defined by a collection of
565 `~lsst.pipe.tasks.functors.Functor` objects (or, equivalently, a
566 ``CompositeFunctor``).
567
568 After the object is initialized, accessing the ``.df`` attribute (which
569 holds the `pandas.DataFrame` containing the results of the calculations)
570 triggers computation of said dataframe.
571
572 One of the conveniences of using this object is the ability to define a
573 desired common filter for all functors. This enables the same functor
574 collection to be passed to several different `PostprocessAnalysis` objects
575 without having to change the original functor collection, since the ``filt``
576 keyword argument of this object triggers an overwrite of the ``filt``
577 property for all functors in the collection.
578
579 This object also allows a list of refFlags to be passed, and defines a set
580 of default refFlags that are always included even if not requested.
581
582 If a list of DataFrames or Handles is passed, rather than a single one,
583 then the calculations will be mapped over all the input catalogs. In
584 principle, it should be straightforward to parallelize this activity, but
585 initial tests have failed (see TODO in code comments).
586
587 Parameters
588 ----------
589 handles : `~lsst.daf.butler.DeferredDatasetHandle` or
590 `~lsst.pipe.base.InMemoryDatasetHandle` or
591 list of these.
592 Source catalog(s) for computation.
593 functors : `list`, `dict`, or `~lsst.pipe.tasks.functors.CompositeFunctor`
594 Computations to do (functors that act on ``handles``).
595 If a dict, the output
596 DataFrame will have columns keyed accordingly.
597 If a list, the column keys will come from the
598 ``.shortname`` attribute of each functor.
599
600 filt : `str`, optional
601 Filter in which to calculate. If provided,
602 this will overwrite any existing ``.filt`` attribute
603 of the provided functors.
604
605 flags : `list`, optional
606 List of flags (per-band) to include in output table.
607 Taken from the ``meas`` dataset if applied to a multilevel Object Table.
608
609 refFlags : `list`, optional
610 List of refFlags (only reference band) to include in output table.
611
612 forcedFlags : `list`, optional
613 List of flags (per-band) to include in output table.
614 Taken from the ``forced_src`` dataset if applied to a
615 multilevel Object Table. Intended for flags from measurement plugins
616 only run during multi-band forced-photometry.
617 """
618 _defaultRefFlags = []
619 _defaultFuncs = ()
620
621 def __init__(self, handles, functors, filt=None, flags=None, refFlags=None, forcedFlags=None):
622 self.handles = handles
623 self.functors = functors
624
625 self.filt = filt
626 self.flags = list(flags) if flags is not None else []
627 self.forcedFlags = list(forcedFlags) if forcedFlags is not None else []
628 self.refFlags = list(self._defaultRefFlags)
629 if refFlags is not None:
630 self.refFlags += list(refFlags)
631
632 self._df = None
633
634 @property
635 def defaultFuncs(self):
636 funcs = dict(self._defaultFuncs)
637 return funcs
638
639 @property
640 def func(self):
641 additionalFuncs = self.defaultFuncs
642 additionalFuncs.update({flag: Column(flag, dataset="forced_src") for flag in self.forcedFlags})
643 additionalFuncs.update({flag: Column(flag, dataset="ref") for flag in self.refFlags})
644 additionalFuncs.update({flag: Column(flag, dataset="meas") for flag in self.flags})
645
646 if isinstance(self.functors, CompositeFunctor):
647 func = self.functors
648 else:
649 func = CompositeFunctor(self.functors)
650
651 func.funcDict.update(additionalFuncs)
652 func.filt = self.filt
653
654 return func
655
656 @property
657 def noDupCols(self):
658 return [name for name, func in self.func.funcDict.items() if func.noDup]
659
660 @property
661 def df(self):
662 if self._df is None:
663 self.compute()
664 return self._df
665
666 def compute(self, dropna=False, pool=None):
667 # map over multiple handles
668 if type(self.handles) in (list, tuple):
669 if pool is None:
670 dflist = [self.func(handle, dropna=dropna) for handle in self.handles]
671 else:
672 # TODO: Figure out why this doesn't work (pyarrow pickling
673 # issues?)
674 dflist = pool.map(functools.partial(self.func, dropna=dropna), self.handles)
675 self._df = pd.concat(dflist)
676 else:
677 self._df = self.func(self.handles, dropna=dropna)
678
679 return self._df
680
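# Illustrative sketch (not part of the pipeline, never called): computing a
# single Column functor over an in-memory DataFrame handle. The column names
# are hypothetical; real usage goes through the transform tasks below.
def _examplePostprocessAnalysis():
    handle = pipeBase.InMemoryDatasetHandle(
        pd.DataFrame({"slot_PsfFlux_instFlux": [1.0, 2.0]}),
        storageClass="DataFrame",
    )
    analysis = PostprocessAnalysis(handle, {"psfFlux": Column("slot_PsfFlux_instFlux")})
    # Accessing .df triggers compute(); the result has a "psfFlux" column.
    return analysis.df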
681
682class TransformCatalogBaseConnections(pipeBase.PipelineTaskConnections,
683 dimensions=()):
684 """Expected Connections for subclasses of TransformCatalogBaseTask.
685
686 Must be subclassed.
687 """
688 inputCatalog = connectionTypes.Input(
689 name="",
690 storageClass="DataFrame",
691 )
692 outputCatalog = connectionTypes.Output(
693 name="",
694 storageClass="ArrowAstropy",
695 )
696
697
698class TransformCatalogBaseConfig(pipeBase.PipelineTaskConfig,
699 pipelineConnections=TransformCatalogBaseConnections):
700 functorFile = pexConfig.Field(
701 dtype=str,
702 doc="Path to YAML file specifying Science Data Model functors to use "
703 "when copying columns and computing calibrated values.",
704 default=None,
705 optional=True
706 )
707 primaryKey = pexConfig.Field(
708 dtype=str,
709 doc="Name of column to be set as the DataFrame index. If None, the index"
710 "will be named `id`",
711 default=None,
712 optional=True
713 )
714 columnsFromDataId = pexConfig.ListField(
715 dtype=str,
716 default=None,
717 optional=True,
718 doc="Columns to extract from the dataId",
719 )
720
721
722class TransformCatalogBaseTask(pipeBase.PipelineTask):
723 """Base class for transforming/standardizing a catalog by applying functors
724 that convert units and apply calibrations.
725
726 The purpose of this task is to perform a set of computations on an input
727 ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` that holds a
728 ``DataFrame`` dataset (such as ``deepCoadd_obj``), and write the results to
729 a new dataset (which needs to be declared in an ``outputDataset``
730 attribute).
731
732 The calculations to be performed are defined in a YAML file that specifies
733 a set of functors to be computed, provided via the ``functorFile`` config
734 parameter. An example of such a YAML file is the following:
735
736 funcs:
737 sourceId:
738 functor: Index
739 x:
740 functor: Column
741 args: slot_Centroid_x
742 y:
743 functor: Column
744 args: slot_Centroid_y
745 psfFlux:
746 functor: LocalNanojansky
747 args:
748 - slot_PsfFlux_instFlux
749 - slot_PsfFlux_instFluxErr
750 - base_LocalPhotoCalib
751 - base_LocalPhotoCalibErr
752 psfFluxErr:
753 functor: LocalNanojanskyErr
754 args:
755 - slot_PsfFlux_instFlux
756 - slot_PsfFlux_instFluxErr
757 - base_LocalPhotoCalib
758 - base_LocalPhotoCalibErr
759 flags:
760 - detect_isPrimary
761
762 The names for each entry under "funcs" will become the names of columns in
763 the output dataset. All the functors referenced are defined in
764 `~lsst.pipe.tasks.functors`. Positional arguments to be passed to each
765 functor are in the `args` list, and any additional entries for each column
766 other than "functor" or "args" (e.g., ``'filt'``, ``'dataset'``) are
767 treated as keyword arguments to be passed to the functor initialization.
768
769 The "flags" entry is the default shortcut for `Column` functors.
770 All columns listed under "flags" will be copied to the output table
771 untransformed. They can be of any datatype.
772 In the special case of transforming a multi-level object table with
773 band and dataset indices (deepCoadd_obj), these will be taken from the
774 ``meas`` dataset and exploded out per band.
775
776 There are two special shortcuts that only apply when transforming
777 multi-level Object (deepCoadd_obj) tables:
778 - The "refFlags" entry is shortcut for `Column` functor
779 taken from the ``ref`` dataset if transforming an ObjectTable.
780 - The "forcedFlags" entry is shortcut for `Column` functors.
781 taken from the ``forced_src`` dataset if transforming an ObjectTable.
782 These are expanded out per band.
783
784
785 This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
786 to organize and execute the calculations.
787 """
788 @property
789 def _DefaultName(self):
790 raise NotImplementedError("Subclass must define the \"_DefaultName\" attribute.")
791
792 @property
793 def outputDataset(self):
794 raise NotImplementedError("Subclass must define the \"outputDataset\" attribute.")
795
796 @property
797 def inputDataset(self):
798 raise NotImplementedError("Subclass must define \"inputDataset\" attribute.")
799
800 @property
801 def ConfigClass(self):
802 raise NotImplementedError("Subclass must define \"ConfigClass\" attribute.")
803
804 def __init__(self, *args, **kwargs):
805 super().__init__(*args, **kwargs)
806 if self.config.functorFile:
807 self.log.info("Loading tranform functor definitions from %s",
808 self.config.functorFile)
809 self.funcs = CompositeFunctor.from_file(self.config.functorFile)
810 self.funcs.update(dict(PostprocessAnalysis._defaultFuncs))
811 else:
812 self.funcs = None
813
814 def runQuantum(self, butlerQC, inputRefs, outputRefs):
815 inputs = butlerQC.get(inputRefs)
816 if self.funcs is None:
817 raise ValueError("config.functorFile is None. "
818 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
819 result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
820 dataId=dict(outputRefs.outputCatalog.dataId.mapping))
821 butlerQC.put(result, outputRefs)
822
823 def run(self, handle, funcs=None, dataId=None, band=None):
824 """Do postprocessing calculations
825
826 Takes a ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` or
827 ``DataFrame`` object and dataId,
828 returns a dataframe with results of postprocessing calculations.
829
830 Parameters
831 ----------
832 handle : `~lsst.daf.butler.DeferredDatasetHandle` or
833 `~lsst.pipe.base.InMemoryDatasetHandle` or
834 `~pandas.DataFrame`, or list of these.
835 DataFrames from which calculations are done.
836 funcs : `~lsst.pipe.tasks.functors.Functor`
837 Functors to apply to the table's columns
838 dataId : `dict`, optional
839 Used to add columns named in ``columnsFromDataId`` to the output dataframe.
840 band : `str`, optional
841 Filter band that is being processed.
842
843 Returns
844 -------
845 result : `lsst.pipe.base.Struct`
846 Result struct, with a single ``outputCatalog`` attribute holding
847 the transformed catalog.
848 """
849 self.log.info("Transforming/standardizing the source table dataId: %s", dataId)
850
851 df = self.transform(band, handle, funcs, dataId).df
852 self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
853
854 if len(df) == 0:
855 raise UpstreamFailureNoWorkFound(
856 "Input catalog is empty, so there is nothing to transform/standardize",
857 )
858
859 result = pipeBase.Struct(outputCatalog=pandas_to_astropy(df))
860 return result
861
862 def getFunctors(self):
863 return self.funcs
864
865 def getAnalysis(self, handles, funcs=None, band=None):
866 if funcs is None:
867 funcs = self.funcs
868 analysis = PostprocessAnalysis(handles, funcs, filt=band)
869 return analysis
870
871 def transform(self, band, handles, funcs, dataId):
872 analysis = self.getAnalysis(handles, funcs=funcs, band=band)
873 df = analysis.df
874 if dataId and self.config.columnsFromDataId:
875 for key in self.config.columnsFromDataId:
876 if key in dataId:
877 if key == "detector":
878 # int16 instead of uint8 because databases don't like unsigned bytes.
879 df[key] = np.int16(dataId[key])
880 else:
881 df[key] = dataId[key]
882 else:
883 raise ValueError(f"'{key}' in config.columnsFromDataId not found in dataId: {dataId}")
884
885 if self.config.primaryKey:
886 if df.index.name != self.config.primaryKey and self.config.primaryKey in df:
887 df.reset_index(inplace=True, drop=True)
888 df.set_index(self.config.primaryKey, inplace=True)
889
890 return pipeBase.Struct(
891 df=df,
892 analysis=analysis
893 )
894
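# Illustrative sketch (not part of the pipeline, never called): a hand-built
# CompositeFunctor roughly equivalent to the YAML shown in the
# TransformCatalogBaseTask docstring. LocalNanojansky lives in
# lsst.pipe.tasks.functors alongside Column; the column names follow the
# docstring example.
def _exampleHandBuiltFunctors():
    from .functors import LocalNanojansky

    return CompositeFunctor({
        "x": Column("slot_Centroid_x"),
        "y": Column("slot_Centroid_y"),
        "psfFlux": LocalNanojansky(
            "slot_PsfFlux_instFlux",
            "slot_PsfFlux_instFluxErr",
            "base_LocalPhotoCalib",
            "base_LocalPhotoCalibErr",
        ),
        "detect_isPrimary": Column("detect_isPrimary"),
    })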
895
896class TransformObjectCatalogConnections(pipeBase.PipelineTaskConnections,
897 defaultTemplates={"coaddName": "deep"},
898 dimensions=("tract", "patch", "skymap")):
899 inputCatalog = connectionTypes.Input(
900 doc="The vertical concatenation of the {coaddName}_{meas|forced_src|psfs_multiprofit} catalogs, "
901 "stored as a DataFrame with a multi-level column index per-patch.",
902 dimensions=("tract", "patch", "skymap"),
903 storageClass="DataFrame",
904 name="{coaddName}Coadd_obj",
905 deferLoad=True,
906 )
907 inputCatalogRef = connectionTypes.Input(
908 doc="Catalog marking the primary detection (which band provides a good shape and position)"
909 "for each detection in deepCoadd_mergeDet.",
910 dimensions=("tract", "patch", "skymap"),
911 storageClass="SourceCatalog",
912 name="{coaddName}Coadd_ref",
913 deferLoad=True,
914 )
915 inputCatalogSersicMultiprofit = connectionTypes.Input(
916 doc="Catalog of source measurements on the deepCoadd.",
917 dimensions=("tract", "patch", "skymap"),
918 storageClass="ArrowAstropy",
919 name="{coaddName}Coadd_Sersic_multiprofit",
920 deferLoad=True,
921 )
922 inputCatalogEpoch = connectionTypes.Input(
923 doc="Catalog of mean epochs for each object per band.",
924 dimensions=("tract", "patch", "skymap"),
925 storageClass="ArrowAstropy",
926 name="object_epoch",
927 deferLoad=True,
928 )
929 outputCatalog = connectionTypes.Output(
930 doc="Per-Patch Object Table of columns transformed from the deepCoadd_obj table per the standard "
931 "data model.",
932 dimensions=("tract", "patch", "skymap"),
933 storageClass="ArrowAstropy",
934 name="objectTable"
935 )
936
937 def __init__(self, *, config=None):
938 super().__init__(config=config)
939 if config.multilevelOutput:
940 self.outputCatalog = dataclasses.replace(self.outputCatalog, storageClass="DataFrame")
941
942
943class TransformObjectCatalogConfig(TransformCatalogBaseConfig,
944 pipelineConnections=TransformObjectCatalogConnections):
945 coaddName = pexConfig.Field(
946 dtype=str,
947 default="deep",
948 doc="Name of coadd"
949 )
950 outputBands = pexConfig.ListField(
951 dtype=str,
952 default=None,
953 optional=True,
954 doc=("These bands and only these bands will appear in the output,"
955 " NaN-filled if the input does not include them."
956 " If None, then use all bands found in the input.")
957 )
958 camelCase = pexConfig.Field(
959 dtype=bool,
960 default=False,
961 doc=("Write per-band columns names with camelCase, else underscore "
962 "For example: gPsFlux instead of g_PsFlux.")
963 )
964 multilevelOutput = pexConfig.Field(
965 dtype=bool,
966 default=False,
967 doc=("Whether results dataframe should have a multilevel column index (True) or be flat "
968 "and name-munged (False). If True, the output storage class will be "
969 "set to DataFrame, since astropy tables do not support multi-level indexing."),
970 deprecated="Support for multi-level outputs is deprecated and will be removed after v29.",
971 )
972 goodFlags = pexConfig.ListField(
973 dtype=str,
974 default=[],
975 doc=("List of 'good' flags that should be set False when populating empty tables. "
976 "All other flags are considered to be 'bad' flags and will be set to True.")
977 )
978 floatFillValue = pexConfig.Field(
979 dtype=float,
980 default=np.nan,
981 doc="Fill value for float fields when populating empty tables."
982 )
983 integerFillValue = pexConfig.Field(
984 dtype=int,
985 default=-1,
986 doc="Fill value for integer fields when populating empty tables."
987 )
988
989 def setDefaults(self):
990 super().setDefaults()
991 self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Object.yaml")
992 self.primaryKey = "objectId"
993 self.columnsFromDataId = ["tract", "patch"]
994 self.goodFlags = ["calib_astrometry_used",
995 "calib_photometry_reserved",
996 "calib_photometry_used",
997 "calib_psf_candidate",
998 "calib_psf_reserved",
999 "calib_psf_used"]
1000
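# Illustrative sketch (not part of the pipeline, never called): a config
# override restricting the flattened object table to a few bands with
# camelCase per-band column names. The band list is hypothetical.
def _exampleTransformObjectCatalogConfig():
    config = TransformObjectCatalogConfig()
    config.outputBands = ["g", "r", "i"]
    config.camelCase = True
    return config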
1001
1002class TransformObjectCatalogTask(TransformCatalogBaseTask):
1003 """Produce a flattened Object Table to match the format specified in
1004 sdm_schemas.
1005
1006 Do the same set of postprocessing calculations on all bands.
1007
1008 This is identical to `TransformCatalogBaseTask`, except that it does
1009 the specified functor calculations for all filters present in the
1010 input `deepCoadd_obj` table. Any specific ``"filt"`` keywords specified
1011 by the YAML file will be superseded.
1012 """
1013 _DefaultName = "transformObjectCatalog"
1014 ConfigClass = TransformObjectCatalogConfig
1015
1016 datasets_multiband = ("epoch", "ref", "Sersic_multiprofit")
1017
1018 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1019 inputs = butlerQC.get(inputRefs)
1020 if self.funcs is None:
1021 raise ValueError("config.functorFile is None. "
1022 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
1023 result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
1024 dataId=dict(outputRefs.outputCatalog.dataId.mapping),
1025 handle_epoch=inputs["inputCatalogEpoch"],
1026 handle_ref=inputs["inputCatalogRef"],
1027 handle_Sersic_multiprofit=inputs["inputCatalogSersicMultiprofit"],
1028 )
1029 butlerQC.put(result, outputRefs)
1030
1031 def run(self, handle, funcs=None, dataId=None, band=None, **kwargs):
1032 # NOTE: band kwarg is ignored here.
1033 # TODO: Document and improve funcs argument usage in DM-48895
1034 # self.getAnalysis only supports list, dict and CompositeFunctor
1035 if isinstance(funcs, CompositeFunctor):
1036 funcDict_in = funcs.funcDict
1037 elif isinstance(funcs, dict):
1038 funcDict_in = funcs
1039 elif isinstance(funcs, list):
1040 funcDict_in = {idx: v for idx, v in enumerate(funcs)}
1041 else:
1042 raise TypeError(f"Unsupported {type(funcs)=}")
1043
1044 handles_multi = {}
1045 funcDicts_multiband = {}
1046 for dataset in self.datasets_multiband:
1047 if (handle_multi := kwargs.get(f"handle_{dataset}")) is None:
1048 raise RuntimeError(f"Missing required handle_{dataset} kwarg")
1049 handles_multi[dataset] = handle_multi
1050 funcDicts_multiband[dataset] = {}
1051
1052 dfDict = {}
1053 analysisDict = {}
1054 templateDf = pd.DataFrame()
1055
1056 columns = handle.get(component="columns")
1057 inputBands = columns.unique(level=1).values
1058
1059 outputBands = self.config.outputBands if self.config.outputBands else inputBands
1060
1061 # Split up funcs for per-band and multiband tables
1062 funcDict_band = {}
1063
1064 for name, func in funcDict_in.items():
1065 if func.dataset in funcDicts_multiband:
1066 # This is something like a MultibandColumn
1067 if band := getattr(func, "band_to_check", None):
1068 if band not in outputBands:
1069 continue
1070 # This is something like a ReferenceBand that has configurable bands
1071 elif hasattr(func, "bands"):
1072 # TODO: Determine if this can be avoided DM-48895
1073 # This will work fine if the init doesn't manipulate bands
1074 # If it does, then one would need to make a new functor
1075 # Determining the (kw)args is tricky in that case
1076 func.bands = tuple(inputBands)
1077
1078 funcDict = funcDicts_multiband.get(func.dataset, funcDict_band)
1079 funcDict[name] = func
1080
1081 funcs_band = CompositeFunctor(funcDict_band)
1082
1083 # Perform transform for data of filters that exist in the handle dataframe.
1084 for inputBand in inputBands:
1085 if inputBand not in outputBands:
1086 self.log.info("Ignoring %s band data in the input", inputBand)
1087 continue
1088 self.log.info("Transforming the catalog of band %s", inputBand)
1089 result = self.transform(inputBand, handle, funcs_band, dataId)
1090 dfDict[inputBand] = result.df
1091 analysisDict[inputBand] = result.analysis
1092 if templateDf.empty:
1093 templateDf = result.df
1094
1095 # Put filler values in columns of other wanted bands
1096 for filt in outputBands:
1097 if filt not in dfDict:
1098 self.log.info("Adding empty columns for band %s", filt)
1099 dfTemp = templateDf.copy()
1100 for col in dfTemp.columns:
1101 testValue = dfTemp[col].values[0]
1102 if isinstance(testValue, (np.bool_, pd.BooleanDtype)):
1103 # Boolean flag type, check if it is a "good" flag
1104 if col in self.config.goodFlags:
1105 fillValue = False
1106 else:
1107 fillValue = True
1108 elif isinstance(testValue, numbers.Integral):
1109 # Checking numbers.Integral catches all flavors
1110 # of python, numpy, pandas, etc. integers.
1111 # We must ensure this is not an unsigned integer.
1112 if isinstance(testValue, np.unsignedinteger):
1113 raise ValueError("Parquet tables may not have unsigned integer columns.")
1114 else:
1115 fillValue = self.config.integerFillValue
1116 else:
1117 fillValue = self.config.floatFillValue
1118 dfTemp[col].values[:] = fillValue
1119 dfDict[filt] = dfTemp
1120
1121 # This makes a multilevel column index, with band as first level
1122 df = pd.concat(dfDict, axis=1, names=["band", "column"])
1123 name_index = df.index.name
1124
1125 # TODO: Remove in DM-48895
1126 if not self.config.multilevelOutput:
1127 noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
1128 if self.config.primaryKey in noDupCols:
1129 noDupCols.remove(self.config.primaryKey)
1130 if dataId and self.config.columnsFromDataId:
1131 noDupCols += self.config.columnsFromDataId
1132 df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase,
1133 inputBands=inputBands)
1134
1135 # Apply per-dataset functors to each multiband dataset in turn
1136 for dataset, funcDict in funcDicts_multiband.items():
1137 handle_multiband = handles_multi[dataset]
1138 df_dataset = handle_multiband.get()
1139 if isinstance(df_dataset, astropy.table.Table):
1140 # Allow astropy table inputs to already have the output index
1141 if name_index not in df_dataset.colnames:
1142 if self.config.primaryKey in df_dataset.colnames:
1143 name_index_ap = self.config.primaryKey
1144 else:
1145 raise RuntimeError(
1146 f"Neither of {name_index=} nor {self.config.primaryKey=} appear in"
1147 f" {df_dataset.colnames=} for {dataset=}"
1148 )
1149 else:
1150 name_index_ap = name_index
1151 df_dataset = df_dataset.to_pandas().set_index(name_index_ap, drop=False)
1152 elif isinstance(df_dataset, afwTable.SourceCatalog):
1153 df_dataset = df_dataset.asAstropy().to_pandas().set_index(name_index, drop=False)
1154 # TODO: should funcDict have noDup funcs removed?
1155 # noDup was intended for per-band tables.
1156 result = self.transform(
1157 None,
1158 pipeBase.InMemoryDatasetHandle(df_dataset, storageClass="DataFrame"),
1159 CompositeFunctor(funcDict),
1160 dataId,
1161 )
1162 result.df.index.name = name_index
1163 # Drop columns from dataId if present (patch, tract)
1164 if self.config.columnsFromDataId:
1165 columns_drop = [column for column in self.config.columnsFromDataId if column in result.df]
1166 if columns_drop:
1167 result.df.drop(columns_drop, axis=1, inplace=True)
1168 # Make the same multi-index for the multiband table if needed
1169 # This might end up making copies, one of several reasons to avoid
1170 # using multilevel indexes, or DataFrames at all
1171 to_concat = pd.concat(
1172 {band: result.df for band in self.config.outputBands}, axis=1, names=["band", "column"]
1173 ) if self.config.multilevelOutput else result.df
1174 df = pd.concat([df, to_concat], axis=1)
1175 analysisDict[dataset] = result.analysis
1176 del result
1177
1178 df.index.name = self.config.primaryKey
1179
1180 if not self.config.multilevelOutput:
1181 tbl = pandas_to_astropy(df)
1182 else:
1183 tbl = df
1184
1185 self.log.info("Made a table of %d columns and %d rows", len(tbl.columns), len(tbl))
1186
1187 return pipeBase.Struct(outputCatalog=tbl)
1188
1189
1190class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections,
1191 dimensions=("tract", "skymap")):
1192 inputCatalogs = connectionTypes.Input(
1193 doc="Per-Patch objectTables conforming to the standard data model.",
1194 name="objectTable",
1195 storageClass="ArrowAstropy",
1196 dimensions=("tract", "patch", "skymap"),
1197 multiple=True,
1198 deferLoad=True,
1199 )
1200 outputCatalog = connectionTypes.Output(
1201 doc="Pre-tract horizontal concatenation of the input objectTables",
1202 name="objectTable_tract",
1203 storageClass="ArrowAstropy",
1204 dimensions=("tract", "skymap"),
1205 )
1206
1207
1208class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig,
1209 pipelineConnections=ConsolidateObjectTableConnections):
1210 coaddName = pexConfig.Field(
1211 dtype=str,
1212 default="deep",
1213 doc="Name of coadd"
1214 )
1215
1216
1217class ConsolidateObjectTableTask(pipeBase.PipelineTask):
1218 """Write patch-merged source tables to a tract-level DataFrame Parquet file.
1219
1220 Concatenates `objectTable` list into a per-visit `objectTable_tract`.
1221 """
1222 _DefaultName = "consolidateObjectTable"
1223 ConfigClass = ConsolidateObjectTableConfig
1224
1225 inputDataset = "objectTable"
1226 outputDataset = "objectTable_tract"
1227
1228 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1229 inputs = butlerQC.get(inputRefs)
1230 self.log.info("Concatenating %s per-patch Object Tables",
1231 len(inputs["inputCatalogs"]))
1232 table = TableVStack.vstack_handles(inputs["inputCatalogs"])
1233 butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)
1234
1235
1236class TransformSourceTableConnections(pipeBase.PipelineTaskConnections,
1237 defaultTemplates={"catalogType": ""},
1238 dimensions=("instrument", "visit", "detector")):
1239
1240 inputCatalog = connectionTypes.Input(
1241 doc="Wide input catalog of sources produced by WriteSourceTableTask",
1242 name="{catalogType}source",
1243 storageClass="DataFrame",
1244 dimensions=("instrument", "visit", "detector"),
1245 deferLoad=True
1246 )
1247 outputCatalog = connectionTypes.Output(
1248 doc="Narrower, per-detector Source Table transformed and converted per a "
1249 "specified set of functors",
1250 name="{catalogType}sourceTable",
1251 storageClass="ArrowAstropy",
1252 dimensions=("instrument", "visit", "detector")
1253 )
1254
1255
1256class TransformSourceTableConfig(TransformCatalogBaseConfig,
1257 pipelineConnections=TransformSourceTableConnections):
1258
1259 def setDefaults(self):
1260 super().setDefaults()
1261 self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Source.yaml")
1262 self.primaryKey = "sourceId"
1263 self.columnsFromDataId = ["visit", "detector", "band", "physical_filter"]
1264
1265
1266class TransformSourceTableTask(TransformCatalogBaseTask):
1267 """Transform/standardize a source catalog
1268 """
1269 _DefaultName = "transformSourceTable"
1270 ConfigClass = TransformSourceTableConfig
1271
1272
1273class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections,
1274 dimensions=("instrument", "visit",),
1275 defaultTemplates={"calexpType": ""}):
1276 calexp = connectionTypes.Input(
1277 doc="Processed exposures used for metadata",
1278 name="calexp",
1279 storageClass="ExposureF",
1280 dimensions=("instrument", "visit", "detector"),
1281 deferLoad=True,
1282 multiple=True,
1283 )
1284 visitSummary = connectionTypes.Output(
1285 doc=("Per-visit consolidated exposure metadata. These catalogs use "
1286 "detector id for the id and are sorted for fast lookups of a "
1287 "detector."),
1288 name="visitSummary",
1289 storageClass="ExposureCatalog",
1290 dimensions=("instrument", "visit"),
1291 )
1292 visitSummarySchema = connectionTypes.InitOutput(
1293 doc="Schema of the visitSummary catalog",
1294 name="visitSummary_schema",
1295 storageClass="ExposureCatalog",
1296 )
1297
1298
1299class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig,
1300 pipelineConnections=ConsolidateVisitSummaryConnections):
1301 """Config for ConsolidateVisitSummaryTask"""
1302
1303 full = pexConfig.Field(
1304 "Whether to propate all exposure components. "
1305 "This adds PSF, aperture correction map, transmission curve, and detector, which can increase file "
1306 "size by more than factor of 10, but it makes the visit summaries produced by this task fully usable"
1307 "by tasks that were designed to run downstream of lsst.drp.tasks.UpdateVisitSummaryTask.",
1308 dtype=bool,
1309 default=False,
1310 )
1311
1312
1313class ConsolidateVisitSummaryTask(pipeBase.PipelineTask):
1314 """Task to consolidate per-detector visit metadata.
1315
1316 This task aggregates the following metadata from all the detectors in a
1317 single visit into an exposure catalog:
1318 - The visitInfo.
1319 - The wcs.
1320 - The photoCalib.
1321 - The physical_filter and band (if available).
1322 - The PSF model.
1323 - The aperture correction map.
1324 - The transmission curve.
1325 - The psf size, shape, and effective area at the center of the detector.
1326 - The corners of the bounding box in right ascension/declination.
1327
1328 Tests for this task are performed in ci_hsc_gen3.
1329 """
1330 _DefaultName = "consolidateVisitSummary"
1331 ConfigClass = ConsolidateVisitSummaryConfig
1332
1333 def __init__(self, **kwargs):
1334 super().__init__(**kwargs)
1335 self.schema = afwTable.ExposureTable.makeMinimalSchema()
1336 self.schema.addField("visit", type="L", doc="Visit number")
1337 self.schema.addField("physical_filter", type="String", size=32, doc="Physical filter")
1338 self.schema.addField("band", type="String", size=32, doc="Name of band")
1339 ExposureSummaryStats.update_schema(self.schema)
1340 self.visitSummarySchema = afwTable.ExposureCatalog(self.schema)
1341
1342 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1343 dataRefs = butlerQC.get(inputRefs.calexp)
1344 visit = dataRefs[0].dataId["visit"]
1345
1346 self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
1347 len(dataRefs), visit)
1348
1349 expCatalog = self._combineExposureMetadata(visit, dataRefs)
1350
1351 butlerQC.put(expCatalog, outputRefs.visitSummary)
1352
1353 def _combineExposureMetadata(self, visit, dataRefs):
1354 """Make a combined exposure catalog from a list of dataRefs.
1355 These dataRefs must point to exposures with wcs, summaryStats,
1356 and other visit metadata.
1357
1358 Parameters
1359 ----------
1360 visit : `int`
1361 Visit identification number.
1362 dataRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
1363 List of dataRefs in visit.
1364
1365 Returns
1366 -------
1367 visitSummary : `lsst.afw.table.ExposureCatalog`
1368 Exposure catalog with per-detector summary information.
1369 """
1370 cat = afwTable.ExposureCatalog(self.schema)
1371 cat.resize(len(dataRefs))
1372
1373 cat["visit"] = visit
1374
1375 for i, dataRef in enumerate(dataRefs):
1376 visitInfo = dataRef.get(component="visitInfo")
1377 filterLabel = dataRef.get(component="filter")
1378 summaryStats = dataRef.get(component="summaryStats")
1379 detector = dataRef.get(component="detector")
1380 wcs = dataRef.get(component="wcs")
1381 photoCalib = dataRef.get(component="photoCalib")
1382 bbox = dataRef.get(component="bbox")
1383 validPolygon = dataRef.get(component="validPolygon")
1384
1385 rec = cat[i]
1386 rec.setBBox(bbox)
1387 rec.setVisitInfo(visitInfo)
1388 rec.setWcs(wcs)
1389 rec.setPhotoCalib(photoCalib)
1390 rec.setValidPolygon(validPolygon)
1391
1392 if self.config.full:
1393 rec.setPsf(dataRef.get(component="psf"))
1394 rec.setApCorrMap(dataRef.get(component="apCorrMap"))
1395 rec.setTransmissionCurve(dataRef.get(component="transmissionCurve"))
1396
1397 rec["physical_filter"] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else ""
1398 rec["band"] = filterLabel.bandLabel if filterLabel.hasBandLabel() else ""
1399 rec.setId(detector.getId())
1400 summaryStats.update_record(rec)
1401
1402 if not cat:
1403 raise pipeBase.NoWorkFound(
1404 "No detectors had sufficient information to make a visit summary row."
1405 )
1406
1407 metadata = dafBase.PropertyList()
1408 metadata.add("COMMENT", "Catalog id is detector id, sorted.")
1409 # We are looping over existing datarefs, so the following is true
1410 metadata.add("COMMENT", "Only detectors with data have entries.")
1411 cat.setMetadata(metadata)
1412
1413 cat.sort()
1414 return cat
1415
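# Illustrative sketch (not part of the pipeline, never called): the visit
# summary written above is an ExposureCatalog keyed and sorted by detector id,
# so per-detector calibration objects can be looked up directly, as
# WriteRecalibratedSourceTableTask.prepareCalibratedExposure does.
# ``visitSummary`` and ``detectorId`` are placeholders supplied by the caller.
def _exampleVisitSummaryLookup(visitSummary, detectorId):
    row = visitSummary.find(detectorId)  # None if the detector has no row
    return row.getWcs(), row.getPhotoCalib()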
1416
1417class ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections,
1418 defaultTemplates={"catalogType": ""},
1419 dimensions=("instrument", "visit")):
1420 inputCatalogs = connectionTypes.Input(
1421 doc="Input per-detector Source Tables",
1422 name="{catalogType}sourceTable",
1423 storageClass="ArrowAstropy",
1424 dimensions=("instrument", "visit", "detector"),
1425 multiple=True,
1426 deferLoad=True,
1427 )
1428 outputCatalog = connectionTypes.Output(
1429 doc="Per-visit concatenation of Source Table",
1430 name="{catalogType}sourceTable_visit",
1431 storageClass="ArrowAstropy",
1432 dimensions=("instrument", "visit")
1433 )
1434
1435
1436class ConsolidateSourceTableConfig(pipeBase.PipelineTaskConfig,
1437 pipelineConnections=ConsolidateSourceTableConnections):
1438 pass
1439
1440
1441class ConsolidateSourceTableTask(pipeBase.PipelineTask):
1442 """Concatenate `sourceTable` list into a per-visit `sourceTable_visit`
1443 """
1444 _DefaultName = "consolidateSourceTable"
1445 ConfigClass = ConsolidateSourceTableConfig
1446
1447 inputDataset = "sourceTable"
1448 outputDataset = "sourceTable_visit"
1449
1450 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1451 # Docstring inherited.
1452 detectorOrder = [ref.dataId["detector"] for ref in inputRefs.inputCatalogs]
1453 detectorOrder.sort()
1454 inputRefs = reorderRefs(inputRefs, detectorOrder, dataIdKey="detector")
1455 inputs = butlerQC.get(inputRefs)
1456 self.log.info("Concatenating %s per-detector Source Tables",
1457 len(inputs["inputCatalogs"]))
1458 table = TableVStack.vstack_handles(inputs["inputCatalogs"])
1459 butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)
1460
1461
1462class MakeCcdVisitTableConnections(pipeBase.PipelineTaskConnections,
1463 dimensions=("instrument",),
1464 defaultTemplates={"calexpType": ""}):
1465 visitSummaryRefs = connectionTypes.Input(
1466 doc="Data references for per-visit consolidated exposure metadata",
1467 name="finalVisitSummary",
1468 storageClass="ExposureCatalog",
1469 dimensions=("instrument", "visit"),
1470 multiple=True,
1471 deferLoad=True,
1472 )
1473 outputCatalog = connectionTypes.Output(
1474 doc="CCD and Visit metadata table",
1475 name="ccdVisitTable",
1476 storageClass="ArrowAstropy",
1477 dimensions=("instrument",)
1478 )
1479
1480
1481class MakeCcdVisitTableConfig(pipeBase.PipelineTaskConfig,
1482 pipelineConnections=MakeCcdVisitTableConnections):
1483 idGenerator = DetectorVisitIdGeneratorConfig.make_field()
1484
1485
1486class MakeCcdVisitTableTask(pipeBase.PipelineTask):
1487 """Produce a `ccdVisitTable` from the visit summary exposure catalogs.
1488 """
1489 _DefaultName = "makeCcdVisitTable"
1490 ConfigClass = MakeCcdVisitTableConfig
1491
1492 def run(self, visitSummaryRefs):
1493 """Make a table of ccd information from the visit summary catalogs.
1494
1495 Parameters
1496 ----------
1497 visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
1498 List of DeferredDatasetHandles pointing to exposure catalogs with
1499 per-detector summary information.
1500
1501 Returns
1502 -------
1503 result : `~lsst.pipe.base.Struct`
1504 Results struct with attribute:
1505
1506 ``outputCatalog``
1507 Catalog of ccd and visit information.
1508 """
1509 ccdEntries = []
1510 for visitSummaryRef in visitSummaryRefs:
1511 visitSummary = visitSummaryRef.get()
1512 if not visitSummary:
1513 continue
1514 visitInfo = visitSummary[0].getVisitInfo()
1515
1516 # Strip provenance to prevent merge confusion.
1517 strip_provenance_from_fits_header(visitSummary.metadata)
1518
1519 ccdEntry = {}
1520 summaryTable = visitSummary.asAstropy()
1521 selectColumns = ["id", "visit", "physical_filter", "band", "ra", "dec",
1522 "pixelScale", "zenithDistance",
1523 "expTime", "zeroPoint", "psfSigma", "skyBg", "skyNoise",
1524 "astromOffsetMean", "astromOffsetStd", "nPsfStar",
1525 "psfStarDeltaE1Median", "psfStarDeltaE2Median",
1526 "psfStarDeltaE1Scatter", "psfStarDeltaE2Scatter",
1527 "psfStarDeltaSizeMedian", "psfStarDeltaSizeScatter",
1528 "psfStarScaledDeltaSizeScatter", "psfTraceRadiusDelta",
1529 "psfApFluxDelta", "psfApCorrSigmaScaledDelta",
1530 "maxDistToNearestPsf",
1531 "effTime", "effTimePsfSigmaScale",
1532 "effTimeSkyBgScale", "effTimeZeroPointScale",
1533 "magLim"]
1534 ccdEntry = summaryTable[selectColumns]
1535            # 'visit' is the human-readable visit number.
1536            # 'visitId' is the key to the visit table. They are the same.
1537            # Technically you should join to get the visit from the visit
1538            # table.
1539 ccdEntry.rename_column("visit", "visitId")
1540 ccdEntry.rename_column("id", "detectorId")
1541
1542 # RFC-924: Temporarily keep a duplicate "decl" entry for backwards
1543 # compatibility. To be removed after September 2023.
1544 ccdEntry["decl"] = ccdEntry["dec"]
1545
1546 ccdEntry["ccdVisitId"] = [
1547 self.config.idGenerator.apply(
1548 visitSummaryRef.dataId,
1549 detector=detector_id,
1550 is_exposure=False,
1551 ).catalog_id # The "catalog ID" here is the ccdVisit ID
1552                # because it's usually the ID for a whole catalog
1553                # with a {visit, detector} data ID, and that's the
1554                # main use case for IdGenerator. This usage for a
1555                # summary table is rare.
1556 for detector_id in summaryTable["id"]
1557 ]
1558 ccdEntry["detector"] = summaryTable["id"]
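            # Convert the PSF model Gaussian sigma to a FWHM-based seeing value:
            # FWHM = sigma * sqrt(8 ln 2) ~= 2.355 * sigma, scaled by the pixel
            # scale to express the result in arcseconds.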
1559 ccdEntry["seeing"] = (
1560 visitSummary["psfSigma"] * visitSummary["pixelScale"] * np.sqrt(8 * np.log(2))
1561 )
1562 ccdEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
1563 ccdEntry["expMidpt"] = np.datetime64(visitInfo.date.nsecs(scale=dafBase.DateTime.TAI), "ns")
1564 ccdEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
1565 expTime = visitInfo.getExposureTime()
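            # Observation start is the exposure midpoint minus half the exposure
            # time (nanoseconds for the timestamp, days for the MJD column below).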
1566 ccdEntry["obsStart"] = (
1567 ccdEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns")
1568 )
1569 expTime_days = expTime / (60*60*24)
1570 ccdEntry["obsStartMJD"] = ccdEntry["expMidptMJD"] - 0.5 * expTime_days
1571 ccdEntry["darkTime"] = visitInfo.getDarkTime()
1572 ccdEntry["xSize"] = summaryTable["bbox_max_x"] - summaryTable["bbox_min_x"]
1573 ccdEntry["ySize"] = summaryTable["bbox_max_y"] - summaryTable["bbox_min_y"]
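            # Name the detector corners; the raCorners/decCorners arrays are
            # assumed to be ordered lower-left, upper-left, upper-right,
            # lower-right.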
1574 ccdEntry["llcra"] = summaryTable["raCorners"][:, 0]
1575 ccdEntry["llcdec"] = summaryTable["decCorners"][:, 0]
1576 ccdEntry["ulcra"] = summaryTable["raCorners"][:, 1]
1577 ccdEntry["ulcdec"] = summaryTable["decCorners"][:, 1]
1578 ccdEntry["urcra"] = summaryTable["raCorners"][:, 2]
1579 ccdEntry["urcdec"] = summaryTable["decCorners"][:, 2]
1580 ccdEntry["lrcra"] = summaryTable["raCorners"][:, 3]
1581 ccdEntry["lrcdec"] = summaryTable["decCorners"][:, 3]
1582 # TODO: DM-30618, Add raftName, nExposures, ccdTemp, binX, binY,
1583 # and flags, and decide if WCS, and llcx, llcy, ulcx, ulcy, etc.
1584 # values are actually wanted.
1585 ccdEntries.append(ccdEntry)
1586
1587 outputCatalog = astropy.table.vstack(ccdEntries, join_type="exact")
1588 return pipeBase.Struct(outputCatalog=outputCatalog)
1589
1590
1591class MakeVisitTableConnections(pipeBase.PipelineTaskConnections,
1592 dimensions=("instrument",),
1593 defaultTemplates={"calexpType": ""}):
1594 visitSummaries = connectionTypes.Input(
1595 doc="Per-visit consolidated exposure metadata",
1596 name="finalVisitSummary",
1597 storageClass="ExposureCatalog",
1598 dimensions=("instrument", "visit",),
1599 multiple=True,
1600 deferLoad=True,
1601 )
1602 outputCatalog = connectionTypes.Output(
1603 doc="Visit metadata table",
1604 name="visitTable",
1605 storageClass="ArrowAstropy",
1606 dimensions=("instrument",)
1607 )
1608
1609
1610class MakeVisitTableConfig(pipeBase.PipelineTaskConfig,
1611 pipelineConnections=MakeVisitTableConnections):
1612 pass
1613
1614
1615class MakeVisitTableTask(pipeBase.PipelineTask):
1616 """Produce a `visitTable` from the visit summary exposure catalogs.
1617 """
1618 _DefaultName = "makeVisitTable"
1619 ConfigClass = MakeVisitTableConfig
1620
1621 def run(self, visitSummaries):
1622 """Make a table of visit information from the visit summary catalogs.
1623
1624 Parameters
1625 ----------
1626 visitSummaries : `list` of `lsst.afw.table.ExposureCatalog`
1627 List of exposure catalogs with per-detector summary information.

1628        Returns
1629 -------
1630 result : `~lsst.pipe.base.Struct`
1631 Results struct with attribute:
1632
1633 ``outputCatalog``
1634 Catalog of visit information.
1635 """
1636 visitEntries = []
1637 for visitSummary in visitSummaries:
1638 visitSummary = visitSummary.get()
1639 if not visitSummary:
1640 continue
1641 visitRow = visitSummary[0]
1642 visitInfo = visitRow.getVisitInfo()
1643
1644 visitEntry = {}
1645 visitEntry["visitId"] = visitRow["visit"]
1646 visitEntry["visit"] = visitRow["visit"]
1647 visitEntry["physical_filter"] = visitRow["physical_filter"]
1648 visitEntry["band"] = visitRow["band"]
1649 raDec = visitInfo.getBoresightRaDec()
1650 visitEntry["ra"] = raDec.getRa().asDegrees()
1651 visitEntry["dec"] = raDec.getDec().asDegrees()
1652
1653 # RFC-924: Temporarily keep a duplicate "decl" entry for backwards
1654 # compatibility. To be removed after September 2023.
1655 visitEntry["decl"] = visitEntry["dec"]
1656
1657 visitEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
1658 azAlt = visitInfo.getBoresightAzAlt()
1659 visitEntry["azimuth"] = azAlt.getLongitude().asDegrees()
1660 visitEntry["altitude"] = azAlt.getLatitude().asDegrees()
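            # Zenith distance is the complement of the boresight altitude.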
1661 visitEntry["zenithDistance"] = 90 - azAlt.getLatitude().asDegrees()
1662 visitEntry["airmass"] = visitInfo.getBoresightAirmass()
1663 expTime = visitInfo.getExposureTime()
1664 visitEntry["expTime"] = expTime
1665 visitEntry["expMidpt"] = np.datetime64(visitInfo.date.nsecs(scale=dafBase.DateTime.TAI), "ns")
1666 visitEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
1667 visitEntry["obsStart"] = visitEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns")
1668 expTime_days = expTime / (60*60*24)
1669 visitEntry["obsStartMJD"] = visitEntry["expMidptMJD"] - 0.5 * expTime_days
1670 visitEntries.append(visitEntry)
1671
1672 # TODO: DM-30623, Add programId, exposureType, cameraTemp,
1673 # mirror1Temp, mirror2Temp, mirror3Temp, domeTemp, externalTemp,
1674 # dimmSeeing, pwvGPS, pwvMW, flags, nExposures.
1675
1676 outputCatalog = astropy.table.Table(rows=visitEntries)
1677 return pipeBase.Struct(outputCatalog=outputCatalog)
1678
1679
1680class WriteForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
1681 dimensions=("instrument", "visit", "detector", "skymap", "tract")):
1682
1683 inputCatalog = connectionTypes.Input(
1684 doc="Primary per-detector, single-epoch forced-photometry catalog. "
1685 "By default, it is the output of ForcedPhotCcdTask on calexps",
1686 name="forced_src",
1687 storageClass="SourceCatalog",
1688 dimensions=("instrument", "visit", "detector", "skymap", "tract")
1689 )
1690 inputCatalogDiff = connectionTypes.Input(
1691 doc="Secondary multi-epoch, per-detector, forced photometry catalog. "
1692 "By default, it is the output of ForcedPhotCcdTask run on image differences.",
1693 name="forced_diff",
1694 storageClass="SourceCatalog",
1695 dimensions=("instrument", "visit", "detector", "skymap", "tract")
1696 )
1697 outputCatalog = connectionTypes.Output(
1698        doc="InputCatalogs horizontally joined on `objectId` in DataFrame Parquet format",
1699 name="mergedForcedSource",
1700 storageClass="DataFrame",
1701 dimensions=("instrument", "visit", "detector", "skymap", "tract")
1702 )
1703
1704
1705class WriteForcedSourceTableConfig(pipeBase.PipelineTaskConfig,
1706 pipelineConnections=WriteForcedSourceTableConnections):
1707    key = lsst.pex.config.Field(
1708        doc="Column on which to join the two input tables and make the primary key of the output",
1709 dtype=str,
1710 default="objectId",
1711 )
1712
1713
1714class WriteForcedSourceTableTask(pipeBase.PipelineTask):
1715 """Merge and convert per-detector forced source catalogs to DataFrame Parquet format.
1716
1717    Because the predecessor ForcedPhotCcdTask operates per detector and
1718    per tract (i.e., it has tract in its dimensions), detectors on the
1719    tract boundary may have multiple forced source catalogs.
1720
1721    The successor task TransformForcedSourceTable runs per patch and
1722    temporally aggregates the overlapping mergedForcedSource catalogs from
1723    all available epochs.
1724 """
1725 _DefaultName = "writeForcedSourceTable"
1726 ConfigClass = WriteForcedSourceTableConfig
1727
1728 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1729 inputs = butlerQC.get(inputRefs)
1730 inputs["visit"] = butlerQC.quantum.dataId["visit"]
1731 inputs["detector"] = butlerQC.quantum.dataId["detector"]
1732 inputs["band"] = butlerQC.quantum.dataId["band"]
1733 outputs = self.run(**inputs)
1734 butlerQC.put(outputs, outputRefs)
1735
1736 def run(self, inputCatalog, inputCatalogDiff, visit, detector, band=None):
1737 dfs = []
1738        for table, dataset in zip((inputCatalog, inputCatalogDiff), ("calexp", "diff")):
1739 df = table.asAstropy().to_pandas().set_index(self.config.key, drop=False)
1740 df = df.reindex(sorted(df.columns), axis=1)
1741 df["visit"] = visit
1742 # int16 instead of uint8 because databases don't like unsigned bytes.
1743 df["detector"] = np.int16(detector)
1744 df["band"] = band if band else pd.NA
1745 df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
1746 names=("dataset", "column"))
1747
1748 dfs.append(df)
1749
1750 outputCatalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
1751 return pipeBase.Struct(outputCatalog=outputCatalog)
1752
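# Illustrative sketch only (hypothetical toy data, not part of the task above):
# how the per-dataset MultiIndex columns built in WriteForcedSourceTableTask.run
# line up after the horizontal join on objectId.
def _example_merged_forced_source():
    calexp = pd.DataFrame({"objectId": [1, 2], "psfFlux": [10.0, 20.0]})
    diff = pd.DataFrame({"objectId": [1, 2], "psfFlux": [0.5, -0.2]})
    dfs = []
    for df, dataset in zip((calexp, diff), ("calexp", "diff")):
        df = df.set_index("objectId", drop=False)
        # Label every column with its originating dataset, as the task does.
        df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
                                               names=("dataset", "column"))
        dfs.append(df)
    # Rows align on the shared objectId index; columns stay grouped by dataset.
    return functools.reduce(lambda d1, d2: d1.join(d2), dfs)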
1753
1754class TransformForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
1755 dimensions=("instrument", "skymap", "patch", "tract")):
1756
1757 inputCatalogs = connectionTypes.Input(
1758 doc="DataFrames of merged ForcedSources produced by WriteForcedSourceTableTask",
1759 name="mergedForcedSource",
1760 storageClass="DataFrame",
1761 dimensions=("instrument", "visit", "detector", "skymap", "tract"),
1762 multiple=True,
1763 deferLoad=True
1764 )
1765 referenceCatalog = connectionTypes.Input(
1766 doc="Reference catalog which was used to seed the forcedPhot. Columns "
1767 "objectId, detect_isPrimary, detect_isTractInner, detect_isPatchInner "
1768 "are expected.",
1769 name="objectTable",
1770 storageClass="DataFrame",
1771 dimensions=("tract", "patch", "skymap"),
1772 deferLoad=True
1773 )
1774 outputCatalog = connectionTypes.Output(
1775 doc="Narrower, temporally-aggregated, per-patch ForcedSource Table transformed and converted per a "
1776 "specified set of functors",
1777 name="forcedSourceTable",
1778 storageClass="ArrowAstropy",
1779 dimensions=("tract", "patch", "skymap")
1780 )
1781
1782
1783class TransformForcedSourceTableConfig(TransformCatalogBaseConfig,
1784 pipelineConnections=TransformForcedSourceTableConnections):
1785 referenceColumns = pexConfig.ListField(
1786 dtype=str,
1787 default=["detect_isPrimary", "detect_isTractInner", "detect_isPatchInner"],
1788 optional=True,
1789 doc="Columns to pull from reference catalog",
1790 )
1791 keyRef = lsst.pex.config.Field(
1792        doc="Column on which to join the two input tables and make the primary key of the output",
1793 dtype=str,
1794 default="objectId",
1795 )
1796    key = lsst.pex.config.Field(
1797        doc="Rename the output DataFrame index to this name",
1798 dtype=str,
1799 default="forcedSourceId",
1800 )
1801
1802 def setDefaults(self):
1803 super().setDefaults()
1804 self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "ForcedSource.yaml")
1805 self.columnsFromDataId = ["tract", "patch"]
1806
1807
1808class TransformForcedSourceTableTask(TransformCatalogBaseTask):
1809 """Transform/standardize a ForcedSource catalog
1810
1811 Transforms each wide, per-detector forcedSource DataFrame per the
1812 specification file (per-camera defaults found in ForcedSource.yaml).
1813 All epochs that overlap the patch are aggregated into one per-patch
1814 narrow-DataFrame file.
1815
1816    No de-duplication of rows is performed. Duplicate resolution flags are
1817    pulled in from the referenceCatalog: `detect_isPrimary`,
1818    `detect_isTractInner`, `detect_isPatchInner`, so that the user may
1819    de-duplicate for analysis or compare duplicates for QA.
1820
1821    The resulting table includes multiple bands. Epochs (MJDs) and other useful
1822    per-visit quantities can be retrieved by joining with the CcdVisitTable on
1823    ccdVisitId.
1824 """
1825 _DefaultName = "transformForcedSourceTable"
1826 ConfigClass = TransformForcedSourceTableConfig
1827
1828 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1829 inputs = butlerQC.get(inputRefs)
1830 if self.funcs is None:
1831 raise ValueError("config.functorFile is None. "
1832 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
1833 outputs = self.run(inputs["inputCatalogs"], inputs["referenceCatalog"], funcs=self.funcs,
1834 dataId=dict(outputRefs.outputCatalog.dataId.mapping))
1835
1836 butlerQC.put(outputs, outputRefs)
1837
1838 def run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=None):
1839 dfs = []
1840 refColumns = list(self.config.referenceColumns)
1841 refColumns.append(self.config.keyRef)
1842 ref = referenceCatalog.get(parameters={"columns": refColumns})
1843 if ref.index.name != self.config.keyRef:
1844 # If the DataFrame we loaded was originally written as some other
1845 # Parquet type, it probably doesn't have the index set. If it was
1846 # written as a DataFrame, the index should already be set and
1847            # trying to set it again would be an error, since it doesn't exist
1848 # as a regular column anymore.
1849 ref.set_index(self.config.keyRef, inplace=True)
1850        self.log.info("Aggregating %s input catalogs", len(inputCatalogs))
1851 for handle in inputCatalogs:
1852 result = self.transform(None, handle, funcs, dataId)
1853 # Filter for only rows that were detected on (overlap) the patch
1854 dfs.append(result.df.join(ref, how="inner"))
1855
1856 outputCatalog = pd.concat(dfs)
1857
1858 if outputCatalog.empty:
1859 raise NoWorkFound(f"No forced photometry rows for {dataId}.")
1860
1861        # Now that we are done joining on config.keyRef, switch the index
1862        # over to config.key:
1863        outputCatalog.index.rename(self.config.keyRef, inplace=True)
1864        # Add config.keyRef back to the column list.
1865        outputCatalog.reset_index(inplace=True)
1866        # Set forcedSourceId as the index; it is defined in the
1867        # ForcedSource.yaml functor specification.
1868        outputCatalog.set_index("forcedSourceId", inplace=True, verify_integrity=True)
1869        # Rename the index to config.key.
1870        outputCatalog.index.rename(self.config.key, inplace=True)
1871
1872 self.log.info("Made a table of %d columns and %d rows",
1873 len(outputCatalog.columns), len(outputCatalog))
1874 return pipeBase.Struct(outputCatalog=pandas_to_astropy(outputCatalog))
1875
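# Illustrative sketch only (hypothetical column subset, not part of the task
# above): a downstream consumer de-duplicating a forcedSourceTable with the
# reference-catalog flags described in the docstring, then attaching per-visit
# epochs by joining to the ccdVisitTable on ccdVisitId.
def _example_deduplicate_forced_sources(forced, ccd_visits):
    """Return primary-only forced sources with visit midpoints attached.

    Both arguments are plain pandas DataFrames assumed to carry the
    ``detect_isPrimary``, ``ccdVisitId``, and ``expMidptMJD`` columns produced
    by the tasks in this module.
    """
    primary = forced[forced["detect_isPrimary"]]
    return primary.merge(ccd_visits[["ccdVisitId", "expMidptMJD"]],
                         on="ccdVisitId", how="left")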
1876
1877class ConsolidateTractConnections(pipeBase.PipelineTaskConnections,
1878 defaultTemplates={"catalogType": ""},
1879 dimensions=("instrument", "tract")):
1880 inputCatalogs = connectionTypes.Input(
1881 doc="Input per-patch DataFrame Tables to be concatenated",
1882 name="{catalogType}ForcedSourceTable",
1883 storageClass="DataFrame",
1884 dimensions=("tract", "patch", "skymap"),
1885 multiple=True,
1886 )
1887
1888 outputCatalog = connectionTypes.Output(
1889 doc="Output per-tract concatenation of DataFrame Tables",
1890 name="{catalogType}ForcedSourceTable_tract",
1891 storageClass="DataFrame",
1892 dimensions=("tract", "skymap"),
1893 )
1894
1895
1896class ConsolidateTractConfig(pipeBase.PipelineTaskConfig,
1897 pipelineConnections=ConsolidateTractConnections):
1898 pass
1899
1900
1901class ConsolidateTractTask(pipeBase.PipelineTask):
1902    """Concatenate a list of per-patch DataFrames into a single
1903    per-tract DataFrame.
1904 """
1905 _DefaultName = "ConsolidateTract"
1906 ConfigClass = ConsolidateTractConfig
1907
1908 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1909 inputs = butlerQC.get(inputRefs)
1910        # No check that at least one inputCatalog exists: if none did, the
1911        # quantum graph would be empty and this task would never run.
1912 self.log.info("Concatenating %s per-patch %s Tables",
1913 len(inputs["inputCatalogs"]),
1914 inputRefs.inputCatalogs[0].datasetType.name)
1915 df = pd.concat(inputs["inputCatalogs"])
1916 butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)