postprocess.py
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22__all__ = ["WriteObjectTableConfig", "WriteObjectTableTask",
23 "WriteSourceTableConfig", "WriteSourceTableTask",
24 "WriteRecalibratedSourceTableConfig", "WriteRecalibratedSourceTableTask",
25 "PostprocessAnalysis",
26 "TransformCatalogBaseConfig", "TransformCatalogBaseTask",
27 "TransformObjectCatalogConfig", "TransformObjectCatalogTask",
28 "ConsolidateObjectTableConfig", "ConsolidateObjectTableTask",
29 "TransformSourceTableConfig", "TransformSourceTableTask",
30 "ConsolidateVisitSummaryConfig", "ConsolidateVisitSummaryTask",
31 "ConsolidateSourceTableConfig", "ConsolidateSourceTableTask",
32 "MakeCcdVisitTableConfig", "MakeCcdVisitTableTask",
33 "MakeVisitTableConfig", "MakeVisitTableTask",
34 "WriteForcedSourceTableConfig", "WriteForcedSourceTableTask",
35 "TransformForcedSourceTableConfig", "TransformForcedSourceTableTask",
36 "ConsolidateTractConfig", "ConsolidateTractTask"]
37
38from collections import defaultdict
39import dataclasses
40from deprecated.sphinx import deprecated
41import functools
42import logging
43import numbers
44import os
45
46import numpy as np
47import pandas as pd
48import astropy.table
49
50import lsst.geom
51import lsst.pex.config as pexConfig
52import lsst.pipe.base as pipeBase
53import lsst.daf.base as dafBase
54from lsst.daf.butler.formatters.parquet import pandas_to_astropy
55from lsst.pipe.base import NoWorkFound, UpstreamFailureNoWorkFound, connectionTypes
56import lsst.afw.table as afwTable
57from lsst.afw.image import ExposureSummaryStats, ExposureF
58from lsst.meas.base import SingleFrameMeasurementTask, DetectorVisitIdGeneratorConfig
59from lsst.obs.base.utils import strip_provenance_from_fits_header, TableVStack
60
61from .coaddBase import reorderRefs
62from .functors import CompositeFunctor, Column
63
64log = logging.getLogger(__name__)
65
66
67def flattenFilters(df, noDupCols=["coord_ra", "coord_dec"], camelCase=False, inputBands=None):
68 """Flattens a dataframe with multilevel column index.
69 """
70 newDf = pd.DataFrame()
71 # band is the level 0 index
72 dfBands = df.columns.unique(level=0).values
73 for band in dfBands:
74 subdf = df[band]
75 columnFormat = "{0}{1}" if camelCase else "{0}_{1}"
76 newColumns = {c: columnFormat.format(band, c)
77 for c in subdf.columns if c not in noDupCols}
78 cols = list(newColumns.keys())
79 newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)
80
81 # Band must be present in the input and output or else column is all NaN:
82 presentBands = dfBands if inputBands is None else list(set(inputBands).intersection(dfBands))
83 # Get the unexploded columns from any present band's partition
84 noDupDf = df[presentBands[0]][noDupCols]
85 newDf = pd.concat([noDupDf, newDf], axis=1)
86 return newDf
87
88
89class WriteObjectTableConnections(pipeBase.PipelineTaskConnections,
90 defaultTemplates={"coaddName": "deep"},
91 dimensions=("tract", "patch", "skymap")):
92 inputCatalogMeas = connectionTypes.Input(
93 doc="Catalog of source measurements on the deepCoadd.",
94 dimensions=("tract", "patch", "band", "skymap"),
95 storageClass="SourceCatalog",
96 name="{coaddName}Coadd_meas",
97 multiple=True
98 )
99 inputCatalogForcedSrc = connectionTypes.Input(
100 doc="Catalog of forced measurements (shape and position parameters held fixed) on the deepCoadd.",
101 dimensions=("tract", "patch", "band", "skymap"),
102 storageClass="SourceCatalog",
103 name="{coaddName}Coadd_forced_src",
104 multiple=True
105 )
106 inputCatalogPsfsMultiprofit = connectionTypes.Input(
107 doc="Catalog of Gaussian mixture model fit parameters for the PSF model at each object centroid.",
108 dimensions=("tract", "patch", "band", "skymap"),
109 storageClass="ArrowAstropy",
110 name="{coaddName}Coadd_psfs_multiprofit",
111 multiple=True,
112 )
113 outputCatalog = connectionTypes.Output(
114 doc="A vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
115 "stored as a DataFrame with a multi-level column index per-patch.",
116 dimensions=("tract", "patch", "skymap"),
117 storageClass="DataFrame",
118 name="{coaddName}Coadd_obj"
119 )
120
121
122class WriteObjectTableConfig(pipeBase.PipelineTaskConfig,
123 pipelineConnections=WriteObjectTableConnections):
124 coaddName = pexConfig.Field(
125 dtype=str,
126 default="deep",
127 doc="Name of coadd"
128 )
129
130
131class WriteObjectTableTask(pipeBase.PipelineTask):
132 """Write filter-merged object tables as a DataFrame in parquet format.
133 """
134 _DefaultName = "writeObjectTable"
135 ConfigClass = WriteObjectTableConfig
136
137 # Tag of output dataset written by `MergeSourcesTask.write`
138 outputDataset = "obj"
139
140 def runQuantum(self, butlerQC, inputRefs, outputRefs):
141 inputs = butlerQC.get(inputRefs)
142
143 catalogs = defaultdict(dict)
144 for dataset, connection in (
145 ("meas", "inputCatalogMeas"),
146 ("forced_src", "inputCatalogForcedSrc"),
147 ("psfs_multiprofit", "inputCatalogPsfsMultiprofit"),
148 ):
149 for ref, cat in zip(getattr(inputRefs, connection), inputs[connection]):
150 catalogs[ref.dataId["band"]][dataset] = cat
151
152 dataId = butlerQC.quantum.dataId
153 df = self.run(catalogs=catalogs, tract=dataId["tract"], patch=dataId["patch"])
154 outputs = pipeBase.Struct(outputCatalog=df)
155 butlerQC.put(outputs, outputRefs)
156
157 def run(self, catalogs, tract, patch):
158 """Merge multiple catalogs.
159
160 Parameters
161 ----------
162 catalogs : `dict`
163 Mapping from filter names to dict of catalogs.
164 tract : `int`
165 Tract id to use for the tractId column.
166 patch : `str`
167 Patch id to use for the patchId column.
168
169 Returns
170 -------
171 catalog : `pandas.DataFrame`
172 Merged dataframe.
173
174 Raises
175 ------
176 ValueError
177 Raised if any of the catalogs is of an unsupported type.
178 """
179 dfs = []
180 for filt, tableDict in catalogs.items():
181 for dataset, table in tableDict.items():
182 # Convert afwTable to pandas DataFrame if needed
183 if isinstance(table, pd.DataFrame):
184 df = table
185 elif isinstance(table, afwTable.SourceCatalog):
186 df = table.asAstropy().to_pandas()
187 elif isinstance(table, astropy.table.Table):
188 df = table.to_pandas()
189 else:
190 raise ValueError(f"{dataset=} has unsupported {type(table)=}")
191 df.set_index("id", drop=True, inplace=True)
192
193 # Sort columns by name, to ensure matching schema among patches
194 df = df.reindex(sorted(df.columns), axis=1)
195 df = df.assign(tractId=tract, patchId=patch)
196
197 # Make columns a 3-level MultiIndex
198 df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
199 names=("dataset", "band", "column"))
200 dfs.append(df)
201
202 # We do this dance and not `pd.concat(dfs)` because the pandas
203 # concatenation can use an excessive amount of memory.
204 catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
205 return catalog
206
207
208class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
209 defaultTemplates={"catalogType": ""},
210 dimensions=("instrument", "visit", "detector")):
211
212 catalog = connectionTypes.Input(
213 doc="Input full-depth catalog of sources produced by CalibrateTask",
214 name="{catalogType}src",
215 storageClass="SourceCatalog",
216 dimensions=("instrument", "visit", "detector")
217 )
218 outputCatalog = connectionTypes.Output(
219 doc="Catalog of sources, `src` in Astropy/Parquet format. Columns are unchanged.",
220 name="{catalogType}source",
221 storageClass="ArrowAstropy",
222 dimensions=("instrument", "visit", "detector")
223 )
224
225
226class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
227 pipelineConnections=WriteSourceTableConnections):
228 pass
229
230
231class WriteSourceTableTask(pipeBase.PipelineTask):
232 """Write source table to DataFrame Parquet format.
233 """
234 _DefaultName = "writeSourceTable"
235 ConfigClass = WriteSourceTableConfig
236
237 def runQuantum(self, butlerQC, inputRefs, outputRefs):
238 inputs = butlerQC.get(inputRefs)
239 inputs["visit"] = butlerQC.quantum.dataId["visit"]
240 inputs["detector"] = butlerQC.quantum.dataId["detector"]
241 result = self.run(**inputs)
242 outputs = pipeBase.Struct(outputCatalog=result.table)
243 butlerQC.put(outputs, outputRefs)
244
245 def run(self, catalog, visit, detector, **kwargs):
246 """Convert `src` catalog to an Astropy table.
247
248 Parameters
249 ----------
250 catalog : `lsst.afw.table.SourceCatalog`
251 Catalog to be converted.
252 visit, detector : `int`
253 Visit and detector ids to be added as columns.
254 **kwargs
255 Additional keyword arguments are ignored as a convenience for
256 subclasses that pass the same arguments to several different
257 methods.
258
259 Returns
260 -------
261 result : `~lsst.pipe.base.Struct`
262 ``table``
263 `astropy.table.Table` version of the input catalog
264 """
265 self.log.info("Generating DataFrame from src catalog visit,detector=%i,%i", visit, detector)
266 tbl = catalog.asAstropy()
267 tbl["visit"] = visit
268 # int16 instead of uint8 because databases don't like unsigned bytes.
269 tbl["detector"] = np.int16(detector)
270
271 return pipeBase.Struct(table=tbl)
272
273
274class WriteRecalibratedSourceTableConnections(WriteSourceTableConnections,
275 defaultTemplates={"catalogType": ""},
276 dimensions=("instrument", "visit", "detector", "skymap")):
277 visitSummary = connectionTypes.Input(
278 doc="Input visit-summary catalog with updated calibration objects.",
279 name="finalVisitSummary",
280 storageClass="ExposureCatalog",
281 dimensions=("instrument", "visit",),
282 )
283
284 def __init__(self, config):
285 # We don't want the input catalog here to be an initial existence
286 # constraint in QG generation, because that can unfortunately limit the
287 # set of data IDs of inputs to other tasks, even those that run earlier
288 # (e.g. updateVisitSummary), when the input 'src' catalog is not
289 # produced. It's safer to just use 'visitSummary' existence as an
290 # initial constraint, and then let the graph prune out the detectors
291 # that don't have a 'src' for this task only.
292 self.catalog = dataclasses.replace(self.catalog, deferGraphConstraint=True)
293
294
295class WriteRecalibratedSourceTableConfig(WriteSourceTableConfig,
296 pipelineConnections=WriteRecalibratedSourceTableConnections):
297
298 doReevaluatePhotoCalib = pexConfig.Field(
299 dtype=bool,
300 default=True,
301 doc=("Add or replace local photoCalib columns"),
302 )
303 doReevaluateSkyWcs = pexConfig.Field(
304 dtype=bool,
305 default=True,
306 doc=("Add or replace local WCS columns and update the coord columns, coord_ra and coord_dec"),
307 )
308
309
310class WriteRecalibratedSourceTableTask(WriteSourceTableTask):
311 """Write source table to DataFrame Parquet format.
312 """
313 _DefaultName = "writeRecalibratedSourceTable"
314 ConfigClass = WriteRecalibratedSourceTableConfig
315
316 def runQuantum(self, butlerQC, inputRefs, outputRefs):
317 inputs = butlerQC.get(inputRefs)
318
319 inputs["visit"] = butlerQC.quantum.dataId["visit"]
320 inputs["detector"] = butlerQC.quantum.dataId["detector"]
321
322 if self.config.doReevaluatePhotoCalib or self.config.doReevaluateSkyWcs:
323 exposure = ExposureF()
324 inputs["exposure"] = self.prepareCalibratedExposure(
325 exposure=exposure,
326 visitSummary=inputs["visitSummary"],
327 detectorId=butlerQC.quantum.dataId["detector"]
328 )
329 inputs["catalog"] = self.addCalibColumns(**inputs)
330
331 result = self.run(**inputs)
332 outputs = pipeBase.Struct(outputCatalog=result.table)
333 butlerQC.put(outputs, outputRefs)
334
335 def prepareCalibratedExposure(self, exposure, detectorId, visitSummary=None):
336 """Prepare a calibrated exposure and apply external calibrations
337 if so configured.
338
339 Parameters
340 ----------
341 exposure : `lsst.afw.image.exposure.Exposure`
342 Input exposure to adjust calibrations. May be an empty Exposure.
343 detectorId : `int`
344 Detector ID associated with the exposure.
345 visitSummary : `lsst.afw.table.ExposureCatalog`, optional
346 Exposure catalog with all calibration objects. WCS and PhotoCalib
347 are always applied if ``visitSummary`` is provided and those
348 components are not `None`.
349
350 Returns
351 -------
352 exposure : `lsst.afw.image.exposure.Exposure`
353 Exposure with adjusted calibrations.
354 """
355 if visitSummary is not None:
356 row = visitSummary.find(detectorId)
357 if row is None:
358 raise pipeBase.NoWorkFound(f"Visit summary for detector {detectorId} is missing.")
359 if (photoCalib := row.getPhotoCalib()) is None:
360 self.log.warning("Detector id %s has None for photoCalib in visit summary; "
361 "skipping reevaluation of photoCalib.", detectorId)
362 exposure.setPhotoCalib(None)
363 else:
364 exposure.setPhotoCalib(photoCalib)
365 if (skyWcs := row.getWcs()) is None:
366 self.log.warning("Detector id %s has None for skyWcs in visit summary; "
367 "skipping reevaluation of skyWcs.", detectorId)
368 exposure.setWcs(None)
369 else:
370 exposure.setWcs(skyWcs)
371
372 return exposure
373
374 def addCalibColumns(self, catalog, exposure, **kwargs):
375 """Add replace columns with calibs evaluated at each centroid
376
377 Add or replace 'base_LocalWcs' and 'base_LocalPhotoCalib' columns in
378 a source catalog, by rerunning the plugins.
379
380 Parameters
381 ----------
382 catalog : `lsst.afw.table.SourceCatalog`
383 Catalog to which calib columns will be added.
384 exposure : `lsst.afw.image.exposure.Exposure`
385 Exposure with attached PhotoCalibs and SkyWcs attributes to be
386 reevaluated at local centroids. Pixels are not required.
387 **kwargs
388 Additional keyword arguments are ignored to facilitate passing the
389 same arguments to several methods.
390
391 Returns
392 -------
393 newCat : `lsst.afw.table.SourceCatalog`
394 Source catalog with the requested local calib columns.
395 """
396 measureConfig = SingleFrameMeasurementTask.ConfigClass()
397 measureConfig.doReplaceWithNoise = False
398
399 # Clear all slots, because we aren't running the relevant plugins.
400 for slot in measureConfig.slots:
401 setattr(measureConfig.slots, slot, None)
402
403 measureConfig.plugins.names = []
404 if self.config.doReevaluateSkyWcs:
405 measureConfig.plugins.names.add("base_LocalWcs")
406 self.log.info("Re-evaluating base_LocalWcs plugin")
407 if self.config.doReevaluatePhotoCalib:
408 measureConfig.plugins.names.add("base_LocalPhotoCalib")
409 self.log.info("Re-evaluating base_LocalPhotoCalib plugin")
410 pluginsNotToCopy = tuple(measureConfig.plugins.names)
411
412 # Create a new schema and catalog
413 # Copy all columns from original except for the ones to reevaluate
414 aliasMap = catalog.schema.getAliasMap()
415 mapper = afwTable.SchemaMapper(catalog.schema)
416 for item in catalog.schema:
417 if not item.field.getName().startswith(pluginsNotToCopy):
418 mapper.addMapping(item.key)
419
420 schema = mapper.getOutputSchema()
421 measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
422 schema.setAliasMap(aliasMap)
423 newCat = afwTable.SourceCatalog(schema)
424 newCat.extend(catalog, mapper=mapper)
425
426 # Fluxes in sourceCatalogs are in counts, so there are no fluxes to
427 # update here. LocalPhotoCalibs are applied during transform tasks.
428 # Update coord_ra/coord_dec, which are expected to be positions on the
429 # sky and are used as such in sdm tables without transform
430 if self.config.doReevaluateSkyWcs and exposure.wcs is not None:
431 afwTable.updateSourceCoords(exposure.wcs, newCat)
432 wcsPlugin = measurement.plugins["base_LocalWcs"]
433 else:
434 wcsPlugin = None
435
436 if self.config.doReevaluatePhotoCalib and exposure.getPhotoCalib() is not None:
437 pcPlugin = measurement.plugins["base_LocalPhotoCalib"]
438 else:
439 pcPlugin = None
440
441 for row in newCat:
442 if wcsPlugin is not None:
443 wcsPlugin.measure(row, exposure)
444 if pcPlugin is not None:
445 pcPlugin.measure(row, exposure)
446
447 return newCat
448
449
450class PostprocessAnalysis(object):
451 """Calculate columns from DataFrames or handles storing DataFrames.
452
453 This object manages and organizes an arbitrary set of computations
454 on a catalog. The catalog is defined by a
455 `DeferredDatasetHandle` or `InMemoryDatasetHandle` object
456 (or list thereof), such as a ``deepCoadd_obj`` dataset, and the
457 computations are defined by a collection of
458 `~lsst.pipe.tasks.functors.Functor` objects (or, equivalently, a
459 ``CompositeFunctor``).
460
461 After the object is initialized, accessing the ``.df`` attribute (which
462 holds the `pandas.DataFrame` containing the results of the calculations)
463 triggers computation of said dataframe.
464
465 One of the conveniences of using this object is the ability to define a
466 desired common filter for all functors. This enables the same functor
467 collection to be passed to several different `PostprocessAnalysis` objects
468 without having to change the original functor collection, since the ``filt``
469 keyword argument of this object triggers an overwrite of the ``filt``
470 property for all functors in the collection.
471
472 This object also allows a list of refFlags to be passed, and defines a set
473 of default refFlags that are always included even if not requested.
474
475 If a list of DataFrames or Handles is passed, rather than a single one,
476 then the calculations will be mapped over all the input catalogs. In
477 principle, it should be straightforward to parallelize this activity, but
478 initial tests have failed (see TODO in code comments).
479
480 Parameters
481 ----------
482 handles : `~lsst.daf.butler.DeferredDatasetHandle` or
483 `~lsst.pipe.base.InMemoryDatasetHandle` or
484 list of these.
485 Source catalog(s) for computation.
486 functors : `list`, `dict`, or `~lsst.pipe.tasks.functors.CompositeFunctor`
487 Computations to do (functors that act on ``handles``).
488 If a dict, the output
489 DataFrame will have columns keyed accordingly.
490 If a list, the column keys will come from the
491 ``.shortname`` attribute of each functor.
492
493 filt : `str`, optional
494 Filter in which to calculate. If provided,
495 this will overwrite any existing ``.filt`` attribute
496 of the provided functors.
497
498 flags : `list`, optional
499 List of flags (per-band) to include in output table.
500 Taken from the ``meas`` dataset if applied to a multilevel Object Table.
501
502 refFlags : `list`, optional
503 List of refFlags (only reference band) to include in output table.
504
505 forcedFlags : `list`, optional
506 List of flags (per-band) to include in output table.
507 Taken from the ``forced_src`` dataset if applied to a
508 multilevel Object Table. Intended for flags from measurement plugins
509 only run during multi-band forced-photometry.
510 """
511 _defaultRefFlags = []
512 _defaultFuncs = ()
513
514 def __init__(self, handles, functors, filt=None, flags=None, refFlags=None, forcedFlags=None):
515 self.handles = handles
516 self.functors = functors
517
518 self.filt = filt
519 self.flags = list(flags) if flags is not None else []
520 self.forcedFlags = list(forcedFlags) if forcedFlags is not None else []
521 self.refFlags = list(self._defaultRefFlags)
522 if refFlags is not None:
523 self.refFlags += list(refFlags)
524
525 self._df = None
526
527 @property
528 def defaultFuncs(self):
529 funcs = dict(self._defaultFuncs)
530 return funcs
531
532 @property
533 def func(self):
534 additionalFuncs = self.defaultFuncs
535 additionalFuncs.update({flag: Column(flag, dataset="forced_src") for flag in self.forcedFlags})
536 additionalFuncs.update({flag: Column(flag, dataset="ref") for flag in self.refFlags})
537 additionalFuncs.update({flag: Column(flag, dataset="meas") for flag in self.flags})
538
539 if isinstance(self.functors, CompositeFunctor):
540 func = self.functors
541 else:
542 func = CompositeFunctor(self.functors)
543
544 func.funcDict.update(additionalFuncs)
545 func.filt = self.filt
546
547 return func
548
549 @property
550 def noDupCols(self):
551 return [name for name, func in self.func.funcDict.items() if func.noDup]
552
553 @property
554 def df(self):
555 if self._df is None:
556 self.compute()
557 return self._df
558
559 def compute(self, dropna=False, pool=None):
560 # map over multiple handles
561 if type(self.handles) in (list, tuple):
562 if pool is None:
563 dflist = [self.func(handle, dropna=dropna) for handle in self.handles]
564 else:
565 # TODO: Figure out why this doesn't work (pyarrow pickling
566 # issues?)
567 dflist = pool.map(functools.partial(self.func, dropna=dropna), self.handles)
568 self._df = pd.concat(dflist)
569 else:
570 self._df = self.func(self.handles, dropna=dropna)
571
572 return self._df
573
574
575class TransformCatalogBaseConnections(pipeBase.PipelineTaskConnections,
576 dimensions=()):
577 """Expected Connections for subclasses of TransformCatalogBaseTask.
578
579 Must be subclassed.
580 """
581 inputCatalog = connectionTypes.Input(
582 name="",
583 storageClass="DataFrame",
584 )
585 outputCatalog = connectionTypes.Output(
586 name="",
587 storageClass="ArrowAstropy",
588 )
589
590
591class TransformCatalogBaseConfig(pipeBase.PipelineTaskConfig,
592 pipelineConnections=TransformCatalogBaseConnections):
593 functorFile = pexConfig.Field(
594 dtype=str,
595 doc="Path to YAML file specifying Science Data Model functors to use "
596 "when copying columns and computing calibrated values.",
597 default=None,
598 optional=True
599 )
600 primaryKey = pexConfig.Field(
601 dtype=str,
602 doc="Name of column to be set as the DataFrame index. If None, the index"
603 "will be named `id`",
604 default=None,
605 optional=True
606 )
607 columnsFromDataId = pexConfig.ListField(
608 dtype=str,
609 default=None,
610 optional=True,
611 doc="Columns to extract from the dataId",
612 )
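# Example of how a subclass typically fills in these fields (values taken from
# TransformSourceTableConfig.setDefaults below; shown here only as a sketch):
#     self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Source.yaml")
#     self.primaryKey = "sourceId"
#     self.columnsFromDataId = ["visit", "detector", "band", "physical_filter"]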
613
614
615class TransformCatalogBaseTask(pipeBase.PipelineTask):
616 """Base class for transforming/standardizing a catalog by applying functors
617 that convert units and apply calibrations.
618
619 The purpose of this task is to perform a set of computations on an input
620 ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` that holds a
621 ``DataFrame`` dataset (such as ``deepCoadd_obj``), and write the results to
622 a new dataset (which needs to be declared in an ``outputDataset``
623 attribute).
624
625 The calculations to be performed are defined in a YAML file that specifies
626 a set of functors to be computed, provided via the ``functorFile`` config
627 parameter. An example of such a YAML file is the following:
628
629 funcs:
630 sourceId:
631 functor: Index
632 x:
633 functor: Column
634 args: slot_Centroid_x
635 y:
636 functor: Column
637 args: slot_Centroid_y
638 psfFlux:
639 functor: LocalNanojansky
640 args:
641 - slot_PsfFlux_instFlux
642 - slot_PsfFlux_instFluxErr
643 - base_LocalPhotoCalib
644 - base_LocalPhotoCalibErr
645 psfFluxErr:
646 functor: LocalNanojanskyErr
647 args:
648 - slot_PsfFlux_instFlux
649 - slot_PsfFlux_instFluxErr
650 - base_LocalPhotoCalib
651 - base_LocalPhotoCalibErr
652 flags:
653 - detect_isPrimary
654
655 The names for each entry under "funcs" will become the names of columns in
656 the output dataset. All the functors referenced are defined in
657 `~lsst.pipe.tasks.functors`. Positional arguments to be passed to each
658 functor are in the `args` list, and any additional entries for each column
659 other than "functor" or "args" (e.g., ``'filt'``, ``'dataset'``) are
660 treated as keyword arguments to be passed to the functor initialization.
661
662 The "flags" entry is the default shortcut for `Column` functors.
663 All columns listed under "flags" will be copied to the output table
664 untransformed. They can be of any datatype.
665 In the special case of transforming a multi-level object table with
666 band and dataset indices (deepCoadd_obj), these will be taken from the
667 ``meas`` dataset and exploded out per band.
668
669 There are two special shortcuts that only apply when transforming
670 multi-level Object (deepCoadd_obj) tables:
671 - The "refFlags" entry is shortcut for `Column` functor
672 taken from the ``ref`` dataset if transforming an ObjectTable.
673 - The "forcedFlags" entry is shortcut for `Column` functors.
674 taken from the ``forced_src`` dataset if transforming an ObjectTable.
675 These are expanded out per band.
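 For example (the column names here are illustrative only):

 refFlags:
 - detect_isPatchInner
 forcedFlags:
 - base_PixelFlags_flag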
676
677
678 This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
679 to organize and execute the calculations.
680 """
681 @property
682 def _DefaultName(self):
683 raise NotImplementedError("Subclass must define the \"_DefaultName\" attribute.")
684
685 @property
686 def outputDataset(self):
687 raise NotImplementedError("Subclass must define the \"outputDataset\" attribute.")
688
689 @property
690 def inputDataset(self):
691 raise NotImplementedError("Subclass must define \"inputDataset\" attribute.")
692
693 @property
694 def ConfigClass(self):
695 raise NotImplementedError("Subclass must define \"ConfigClass\" attribute.")
696
697 def __init__(self, *args, **kwargs):
698 super().__init__(*args, **kwargs)
699 if self.config.functorFile:
700 self.log.info("Loading tranform functor definitions from %s",
701 self.config.functorFile)
702 self.funcs = CompositeFunctor.from_file(self.config.functorFile)
703 self.funcs.update(dict(PostprocessAnalysis._defaultFuncs))
704 else:
705 self.funcs = None
706
707 def runQuantum(self, butlerQC, inputRefs, outputRefs):
708 inputs = butlerQC.get(inputRefs)
709 if self.funcs is None:
710 raise ValueError("config.functorFile is None. "
711 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
712 result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
713 dataId=dict(outputRefs.outputCatalog.dataId.mapping))
714 butlerQC.put(result, outputRefs)
715
716 def run(self, handle, funcs=None, dataId=None, band=None):
717 """Do postprocessing calculations
718
719 Takes a ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` or
720 ``DataFrame`` object and dataId,
721 returns a dataframe with results of postprocessing calculations.
722
723 Parameters
724 ----------
725 handle : `~lsst.daf.butler.DeferredDatasetHandle` or
726 `~lsst.pipe.base.InMemoryDatasetHandle` or
727 `~pandas.DataFrame`, or list of these.
728 DataFrames from which calculations are done.
729 funcs : `~lsst.pipe.tasks.functors.Functor`
730 Functors to apply to the table's columns
731 dataId : dict, optional
732 Used to add a `patchId` column to the output dataframe.
733 band : `str`, optional
734 Filter band that is being processed.
735
736 Returns
737 -------
738 result : `lsst.pipe.base.Struct`
739 Result struct, with a single ``outputCatalog`` attribute holding
740 the transformed catalog.
741 """
742 self.log.info("Transforming/standardizing the source table dataId: %s", dataId)
743
744 df = self.transform(band, handle, funcs, dataId).df
745 self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
746
747 if len(df) == 0:
748 raise UpstreamFailureNoWorkFound(
749 "Input catalog is empty, so there is nothing to transform/standardize",
750 )
751
752 result = pipeBase.Struct(outputCatalog=pandas_to_astropy(df))
753 return result
754
755 def getFunctors(self):
756 return self.funcs
757
758 def getAnalysis(self, handles, funcs=None, band=None):
759 if funcs is None:
760 funcs = self.funcs
761 analysis = PostprocessAnalysis(handles, funcs, filt=band)
762 return analysis
763
764 def transform(self, band, handles, funcs, dataId):
765 analysis = self.getAnalysis(handles, funcs=funcs, band=band)
766 df = analysis.df
767 if dataId and self.config.columnsFromDataId:
768 for key in self.config.columnsFromDataId:
769 if key in dataId:
770 if key == "detector":
771 # int16 instead of uint8 because databases don't like unsigned bytes.
772 df[key] = np.int16(dataId[key])
773 else:
774 df[key] = dataId[key]
775 else:
776 raise ValueError(f"'{key}' in config.columnsFromDataId not found in dataId: {dataId}")
777
778 if self.config.primaryKey:
779 if df.index.name != self.config.primaryKey and self.config.primaryKey in df:
780 df.reset_index(inplace=True, drop=True)
781 df.set_index(self.config.primaryKey, inplace=True)
782
783 return pipeBase.Struct(
784 df=df,
785 analysis=analysis
786 )
787
788
789class TransformObjectCatalogConnections(pipeBase.PipelineTaskConnections,
790 defaultTemplates={"coaddName": "deep"},
791 dimensions=("tract", "patch", "skymap")):
792 inputCatalog = connectionTypes.Input(
793 doc="The vertical concatenation of the {coaddName}_{meas|forced_src|psfs_multiprofit} catalogs, "
794 "stored as a DataFrame with a multi-level column index per-patch.",
795 dimensions=("tract", "patch", "skymap"),
796 storageClass="DataFrame",
797 name="{coaddName}Coadd_obj",
798 deferLoad=True,
799 )
800 inputCatalogRef = connectionTypes.Input(
801 doc="Catalog marking the primary detection (which band provides a good shape and position)"
802 "for each detection in deepCoadd_mergeDet.",
803 dimensions=("tract", "patch", "skymap"),
804 storageClass="SourceCatalog",
805 name="{coaddName}Coadd_ref",
806 deferLoad=True,
807 )
808 inputCatalogSersicMultiprofit = connectionTypes.Input(
809 doc="Catalog of source measurements on the deepCoadd.",
810 dimensions=("tract", "patch", "skymap"),
811 storageClass="ArrowAstropy",
812 name="{coaddName}Coadd_Sersic_multiprofit",
813 deferLoad=True,
814 )
815 inputCatalogEpoch = connectionTypes.Input(
816 doc="Catalog of mean epochs for each object per band.",
817 dimensions=("tract", "patch", "skymap"),
818 storageClass="ArrowAstropy",
819 name="object_epoch",
820 deferLoad=True,
821 )
822 outputCatalog = connectionTypes.Output(
823 doc="Per-Patch Object Table of columns transformed from the deepCoadd_obj table per the standard "
824 "data model.",
825 dimensions=("tract", "patch", "skymap"),
826 storageClass="ArrowAstropy",
827 name="objectTable"
828 )
829
830 def __init__(self, *, config=None):
831 super().__init__(config=config)
832 if config.multilevelOutput:
833 self.outputCatalog = dataclasses.replace(self.outputCatalog, storageClass="DataFrame")
834
835
836class TransformObjectCatalogConfig(TransformCatalogBaseConfig,
837 pipelineConnections=TransformObjectCatalogConnections):
838 coaddName = pexConfig.Field(
839 dtype=str,
840 default="deep",
841 doc="Name of coadd"
842 )
843 outputBands = pexConfig.ListField(
844 dtype=str,
845 default=None,
846 optional=True,
847 doc=("These bands and only these bands will appear in the output,"
848 " NaN-filled if the input does not include them."
849 " If None, then use all bands found in the input.")
850 )
851 camelCase = pexConfig.Field(
852 dtype=bool,
853 default=False,
854 doc=("Write per-band columns names with camelCase, else underscore "
855 "For example: gPsFlux instead of g_PsFlux.")
856 )
857 multilevelOutput = pexConfig.Field(
858 dtype=bool,
859 default=False,
860 doc=("Whether results dataframe should have a multilevel column index (True) or be flat "
861 "and name-munged (False). If True, the output storage class will be "
862 "set to DataFrame, since astropy tables do not support multi-level indexing."),
863 deprecated="Support for multi-level outputs is deprecated and will be removed after v29.",
864 )
865 goodFlags = pexConfig.ListField(
866 dtype=str,
867 default=[],
868 doc=("List of 'good' flags that should be set False when populating empty tables. "
869 "All other flags are considered to be 'bad' flags and will be set to True.")
870 )
871 floatFillValue = pexConfig.Field(
872 dtype=float,
873 default=np.nan,
874 doc="Fill value for float fields when populating empty tables."
875 )
876 integerFillValue = pexConfig.Field(
877 dtype=int,
878 default=-1,
879 doc="Fill value for integer fields when populating empty tables."
880 )
881
882 def setDefaults(self):
883 super().setDefaults()
884 self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Object.yaml")
885 self.primaryKey = "objectId"
886 self.columnsFromDataId = ["tract", "patch"]
887 self.goodFlags = ["calib_astrometry_used",
888 "calib_photometry_reserved",
889 "calib_photometry_used",
890 "calib_psf_candidate",
891 "calib_psf_reserved",
892 "calib_psf_used"]
893
894
895class TransformObjectCatalogTask(TransformCatalogBaseTask):
896 """Produce a flattened Object Table to match the format specified in
897 sdm_schemas.
898
899 Do the same set of postprocessing calculations on all bands.
900
901 This is identical to `TransformCatalogBaseTask`, except that it does
902 the specified functor calculations for all filters present in the
903 input `deepCoadd_obj` table. Any specific ``"filt"`` keywords specified
904 by the YAML file will be superseded.
905 """
906 _DefaultName = "transformObjectCatalog"
907 ConfigClass = TransformObjectCatalogConfig
908
909 datasets_multiband = ("epoch", "ref", "Sersic_multiprofit")
910
911 def runQuantum(self, butlerQC, inputRefs, outputRefs):
912 inputs = butlerQC.get(inputRefs)
913 if self.funcs is None:
914 raise ValueError("config.functorFile is None. "
915 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
916 result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
917 dataId=dict(outputRefs.outputCatalog.dataId.mapping),
918 handle_epoch=inputs["inputCatalogEpoch"],
919 handle_ref=inputs["inputCatalogRef"],
920 handle_Sersic_multiprofit=inputs["inputCatalogSersicMultiprofit"],
921 )
922 butlerQC.put(result, outputRefs)
923
924 def run(self, handle, funcs=None, dataId=None, band=None, **kwargs):
925 # NOTE: band kwarg is ignored here.
926 # TODO: Document and improve funcs argument usage in DM-48895
927 # self.getAnalysis only supports list, dict and CompositeFunctor
928 if isinstance(funcs, CompositeFunctor):
929 funcDict_in = funcs.funcDict
930 elif isinstance(funcs, dict):
931 funcDict_in = funcs
932 elif isinstance(funcs, list):
933 funcDict_in = {idx: v for idx, v in enumerate(funcs)}
934 else:
935 raise TypeError(f"Unsupported {type(funcs)=}")
936
937 handles_multi = {}
938 funcDicts_multiband = {}
939 for dataset in self.datasets_multiband:
940 if (handle_multi := kwargs.get(f"handle_{dataset}")) is None:
941 raise RuntimeError(f"Missing required handle_{dataset} kwarg")
942 handles_multi[dataset] = handle_multi
943 funcDicts_multiband[dataset] = {}
944
945 dfDict = {}
946 analysisDict = {}
947 templateDf = pd.DataFrame()
948
949 columns = handle.get(component="columns")
950 inputBands = columns.unique(level=1).values
951
952 outputBands = self.config.outputBands if self.config.outputBands else inputBands
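        # e.g. inputBands might be ("g", "r", "i") while config.outputBands can
        # request additional bands, which are then filled with the configured
        # fill values below (band names here are illustrative).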
953
954 # Split up funcs for per-band and multiband tables
955 funcDict_band = {}
956
957 for name, func in funcDict_in.items():
958 if func.dataset in funcDicts_multiband:
959 # This is something like a MultibandColumn
960 if band := getattr(func, "band_to_check", None):
961 if band not in outputBands:
962 continue
963 # This is something like a ReferenceBand that has configurable bands
964 elif hasattr(func, "bands"):
965 # TODO: Determine if this can be avoided DM-48895
966 # This will work fine if the init doesn't manipulate bands
967 # If it does, then one would need to make a new functor
968 # Determining the (kw)args is tricky in that case
969 func.bands = tuple(inputBands)
970
971 funcDict = funcDicts_multiband.get(func.dataset, funcDict_band)
972 funcDict[name] = func
973
974 funcs_band = CompositeFunctor(funcDict_band)
975
976 # Perform transform for data of filters that exist in the handle dataframe.
977 for inputBand in inputBands:
978 if inputBand not in outputBands:
979 self.log.info("Ignoring %s band data in the input", inputBand)
980 continue
981 self.log.info("Transforming the catalog of band %s", inputBand)
982 result = self.transform(inputBand, handle, funcs_band, dataId)
983 dfDict[inputBand] = result.df
984 analysisDict[inputBand] = result.analysis
985 if templateDf.empty:
986 templateDf = result.df
987
988 # Put filler values in columns of other wanted bands
989 for filt in outputBands:
990 if filt not in dfDict:
991 self.log.info("Adding empty columns for band %s", filt)
992 dfTemp = templateDf.copy()
993 for col in dfTemp.columns:
994 testValue = dfTemp[col].values[0]
995 if isinstance(testValue, (np.bool_, pd.BooleanDtype)):
996 # Boolean flag type, check if it is a "good" flag
997 if col in self.config.goodFlags:
998 fillValue = False
999 else:
1000 fillValue = True
1001 elif isinstance(testValue, numbers.Integral):
1002 # Checking numbers.Integral catches all flavors
1003 # of python, numpy, pandas, etc. integers.
1004 # We must ensure this is not an unsigned integer.
1005 if isinstance(testValue, np.unsignedinteger):
1006 raise ValueError("Parquet tables may not have unsigned integer columns.")
1007 else:
1008 fillValue = self.config.integerFillValue
1009 else:
1010 fillValue = self.config.floatFillValue
1011 dfTemp[col].values[:] = fillValue
1012 dfDict[filt] = dfTemp
1013
1014 # This makes a multilevel column index, with band as first level
1015 df = pd.concat(dfDict, axis=1, names=["band", "column"])
1016 name_index = df.index.name
1017
1018 # TODO: Remove in DM-48895
1019 if not self.config.multilevelOutput:
1020 noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
1021 if self.config.primaryKey in noDupCols:
1022 noDupCols.remove(self.config.primaryKey)
1023 if dataId and self.config.columnsFromDataId:
1024 noDupCols += self.config.columnsFromDataId
1025 df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase,
1026 inputBands=inputBands)
1027
1028 # Apply per-dataset functors to each multiband dataset in turn
1029 for dataset, funcDict in funcDicts_multiband.items():
1030 handle_multiband = handles_multi[dataset]
1031 df_dataset = handle_multiband.get()
1032 if isinstance(df_dataset, astropy.table.Table):
1033 # Allow astropy table inputs to already have the output index
1034 if name_index not in df_dataset.colnames:
1035 if self.config.primaryKey in df_dataset.colnames:
1036 name_index_ap = self.config.primaryKey
1037 else:
1038 raise RuntimeError(
1039 f"Neither of {name_index=} nor {self.config.primaryKey=} appear in"
1040 f" {df_dataset.colnames=} for {dataset=}"
1041 )
1042 else:
1043 name_index_ap = name_index
1044 df_dataset = df_dataset.to_pandas().set_index(name_index_ap, drop=False)
1045 elif isinstance(df_dataset, afwTable.SourceCatalog):
1046 df_dataset = df_dataset.asAstropy().to_pandas().set_index(name_index, drop=False)
1047 # TODO: should funcDict have noDup funcs removed?
1048 # noDup was intended for per-band tables.
1049 result = self.transform(
1050 None,
1051 pipeBase.InMemoryDatasetHandle(df_dataset, storageClass="DataFrame"),
1052 CompositeFunctor(funcDict),
1053 dataId,
1054 )
1055 result.df.index.name = name_index
1056 # Drop columns from dataId if present (patch, tract)
1057 if self.config.columnsFromDataId:
1058 columns_drop = [column for column in self.config.columnsFromDataId if column in result.df]
1059 if columns_drop:
1060 result.df.drop(columns_drop, axis=1, inplace=True)
1061 # Make the same multi-index for the multiband table if needed
1062 # This might end up making copies, one of several reasons to avoid
1063 # using multilevel indexes, or DataFrames at all
1064 to_concat = pd.concat(
1065 {band: result.df for band in self.config.outputBands}, axis=1, names=["band", "column"]
1066 ) if self.config.multilevelOutput else result.df
1067 df = pd.concat([df, to_concat], axis=1)
1068 analysisDict[dataset] = result.analysis
1069 del result
1070
1071 df.index.name = self.config.primaryKey
1072
1073 if not self.config.multilevelOutput:
1074 tbl = pandas_to_astropy(df)
1075 else:
1076 tbl = df
1077
1078 self.log.info("Made a table of %d columns and %d rows", len(tbl.columns), len(tbl))
1079
1080 return pipeBase.Struct(outputCatalog=tbl)
1081
1082
1083class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections,
1084 dimensions=("tract", "skymap")):
1085 inputCatalogs = connectionTypes.Input(
1086 doc="Per-Patch objectTables conforming to the standard data model.",
1087 name="objectTable",
1088 storageClass="ArrowAstropy",
1089 dimensions=("tract", "patch", "skymap"),
1090 multiple=True,
1091 deferLoad=True,
1092 )
1093 outputCatalog = connectionTypes.Output(
1094 doc="Pre-tract horizontal concatenation of the input objectTables",
1095 name="objectTable_tract",
1096 storageClass="ArrowAstropy",
1097 dimensions=("tract", "skymap"),
1098 )
1099
1100
1101class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig,
1102 pipelineConnections=ConsolidateObjectTableConnections):
1103 coaddName = pexConfig.Field(
1104 dtype=str,
1105 default="deep",
1106 doc="Name of coadd"
1107 )
1108
1109
1110class ConsolidateObjectTableTask(pipeBase.PipelineTask):
1111 """Write patch-merged source tables to a tract-level DataFrame Parquet file.
1112
1113 Concatenates `objectTable` list into a per-visit `objectTable_tract`.
1114 """
1115 _DefaultName = "consolidateObjectTable"
1116 ConfigClass = ConsolidateObjectTableConfig
1117
1118 inputDataset = "objectTable"
1119 outputDataset = "objectTable_tract"
1120
1121 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1122 inputs = butlerQC.get(inputRefs)
1123 self.log.info("Concatenating %s per-patch Object Tables",
1124 len(inputs["inputCatalogs"]))
1125 table = TableVStack.vstack_handles(inputs["inputCatalogs"])
1126 butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)
1127
1128
1129class TransformSourceTableConnections(pipeBase.PipelineTaskConnections,
1130 defaultTemplates={"catalogType": ""},
1131 dimensions=("instrument", "visit", "detector")):
1132
1133 inputCatalog = connectionTypes.Input(
1134 doc="Wide input catalog of sources produced by WriteSourceTableTask",
1135 name="{catalogType}source",
1136 storageClass="DataFrame",
1137 dimensions=("instrument", "visit", "detector"),
1138 deferLoad=True
1139 )
1140 outputCatalog = connectionTypes.Output(
1141 doc="Narrower, per-detector Source Table transformed and converted per a "
1142 "specified set of functors",
1143 name="{catalogType}sourceTable",
1144 storageClass="ArrowAstropy",
1145 dimensions=("instrument", "visit", "detector")
1146 )
1147
1148
1149class TransformSourceTableConfig(TransformCatalogBaseConfig,
1150 pipelineConnections=TransformSourceTableConnections):
1151
1152 def setDefaults(self):
1153 super().setDefaults()
1154 self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Source.yaml")
1155 self.primaryKey = "sourceId"
1156 self.columnsFromDataId = ["visit", "detector", "band", "physical_filter"]
1157
1158
1159class TransformSourceTableTask(TransformCatalogBaseTask):
1160 """Transform/standardize a source catalog
1161 """
1162 _DefaultName = "transformSourceTable"
1163 ConfigClass = TransformSourceTableConfig
1164
1165
1166class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections,
1167 dimensions=("instrument", "visit",),
1168 defaultTemplates={"calexpType": ""}):
1169 calexp = connectionTypes.Input(
1170 doc="Processed exposures used for metadata",
1171 name="calexp",
1172 storageClass="ExposureF",
1173 dimensions=("instrument", "visit", "detector"),
1174 deferLoad=True,
1175 multiple=True,
1176 )
1177 visitSummary = connectionTypes.Output(
1178 doc=("Per-visit consolidated exposure metadata. These catalogs use "
1179 "detector id for the id and are sorted for fast lookups of a "
1180 "detector."),
1181 name="visitSummary",
1182 storageClass="ExposureCatalog",
1183 dimensions=("instrument", "visit"),
1184 )
1185 visitSummarySchema = connectionTypes.InitOutput(
1186 doc="Schema of the visitSummary catalog",
1187 name="visitSummary_schema",
1188 storageClass="ExposureCatalog",
1189 )
1190
1191
1192class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig,
1193 pipelineConnections=ConsolidateVisitSummaryConnections):
1194 """Config for ConsolidateVisitSummaryTask"""
1195
1196 full = pexConfig.Field(
1197 "Whether to propate all exposure components. "
1198 "This adds PSF, aperture correction map, transmission curve, and detector, which can increase file "
1199 "size by more than factor of 10, but it makes the visit summaries produced by this task fully usable"
1200 "by tasks that were designed to run downstream of lsst.drp.tasks.UpdateVisitSummaryTask.",
1201 dtype=bool,
1202 default=False,
1203 )
1204
1205
1206class ConsolidateVisitSummaryTask(pipeBase.PipelineTask):
1207 """Task to consolidate per-detector visit metadata.
1208
1209 This task aggregates the following metadata from all the detectors in a
1210 single visit into an exposure catalog:
1211 - The visitInfo.
1212 - The wcs.
1213 - The photoCalib.
1214 - The physical_filter and band (if available).
1215 - The PSF model.
1216 - The aperture correction map.
1217 - The transmission curve.
1218 - The psf size, shape, and effective area at the center of the detector.
1219 - The corners of the bounding box in right ascension/declination.
1220
1221 Tests for this task are performed in ci_hsc_gen3.
1222 """
1223 _DefaultName = "consolidateVisitSummary"
1224 ConfigClass = ConsolidateVisitSummaryConfig
1225
1226 def __init__(self, **kwargs):
1227 super().__init__(**kwargs)
1228 self.schema = afwTable.ExposureTable.makeMinimalSchema()
1229 self.schema.addField("visit", type="L", doc="Visit number")
1230 self.schema.addField("physical_filter", type="String", size=32, doc="Physical filter")
1231 self.schema.addField("band", type="String", size=32, doc="Name of band")
1232 ExposureSummaryStats.update_schema(self.schema)
1233 self.visitSummarySchema = afwTable.ExposureCatalog(self.schema)
1234
1235 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1236 dataRefs = butlerQC.get(inputRefs.calexp)
1237 visit = dataRefs[0].dataId["visit"]
1238
1239 self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
1240 len(dataRefs), visit)
1241
1242 expCatalog = self._combineExposureMetadata(visit, dataRefs)
1243
1244 butlerQC.put(expCatalog, outputRefs.visitSummary)
1245
1246 def _combineExposureMetadata(self, visit, dataRefs):
1247 """Make a combined exposure catalog from a list of dataRefs.
1248 These dataRefs must point to exposures with wcs, summaryStats,
1249 and other visit metadata.
1250
1251 Parameters
1252 ----------
1253 visit : `int`
1254 Visit identification number.
1255 dataRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
1256 List of dataRefs in visit.
1257
1258 Returns
1259 -------
1260 visitSummary : `lsst.afw.table.ExposureCatalog`
1261 Exposure catalog with per-detector summary information.
1262 """
1263 cat = afwTable.ExposureCatalog(self.schema)
1264 cat.resize(len(dataRefs))
1265
1266 cat["visit"] = visit
1267
1268 for i, dataRef in enumerate(dataRefs):
1269 visitInfo = dataRef.get(component="visitInfo")
1270 filterLabel = dataRef.get(component="filter")
1271 summaryStats = dataRef.get(component="summaryStats")
1272 detector = dataRef.get(component="detector")
1273 wcs = dataRef.get(component="wcs")
1274 photoCalib = dataRef.get(component="photoCalib")
1275 bbox = dataRef.get(component="bbox")
1276 validPolygon = dataRef.get(component="validPolygon")
1277
1278 rec = cat[i]
1279 rec.setBBox(bbox)
1280 rec.setVisitInfo(visitInfo)
1281 rec.setWcs(wcs)
1282 rec.setPhotoCalib(photoCalib)
1283 rec.setValidPolygon(validPolygon)
1284
1285 if self.config.full:
1286 rec.setPsf(dataRef.get(component="psf"))
1287 rec.setApCorrMap(dataRef.get(component="apCorrMap"))
1288 rec.setTransmissionCurve(dataRef.get(component="transmissionCurve"))
1289
1290 rec["physical_filter"] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else ""
1291 rec["band"] = filterLabel.bandLabel if filterLabel.hasBandLabel() else ""
1292 rec.setId(detector.getId())
1293 summaryStats.update_record(rec)
1294
1295 if not cat:
1296 raise pipeBase.NoWorkFound(
1297 "No detectors had sufficient information to make a visit summary row."
1298 )
1299
1300 metadata = dafBase.PropertyList()
1301 metadata.add("COMMENT", "Catalog id is detector id, sorted.")
1302 # We are looping over existing datarefs, so the following is true
1303 metadata.add("COMMENT", "Only detectors with data have entries.")
1304 cat.setMetadata(metadata)
1305
1306 cat.sort()
1307 return cat
1308
1309
1310class ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections,
1311 defaultTemplates={"catalogType": ""},
1312 dimensions=("instrument", "visit")):
1313 inputCatalogs = connectionTypes.Input(
1314 doc="Input per-detector Source Tables",
1315 name="{catalogType}sourceTable",
1316 storageClass="ArrowAstropy",
1317 dimensions=("instrument", "visit", "detector"),
1318 multiple=True,
1319 deferLoad=True,
1320 )
1321 outputCatalog = connectionTypes.Output(
1322 doc="Per-visit concatenation of Source Table",
1323 name="{catalogType}sourceTable_visit",
1324 storageClass="ArrowAstropy",
1325 dimensions=("instrument", "visit")
1326 )
1327
1328
1329class ConsolidateSourceTableConfig(pipeBase.PipelineTaskConfig,
1330 pipelineConnections=ConsolidateSourceTableConnections):
1331 pass
1332
1333
1334class ConsolidateSourceTableTask(pipeBase.PipelineTask):
1335 """Concatenate `sourceTable` list into a per-visit `sourceTable_visit`
1336 """
1337 _DefaultName = "consolidateSourceTable"
1338 ConfigClass = ConsolidateSourceTableConfig
1339
1340 inputDataset = "sourceTable"
1341 outputDataset = "sourceTable_visit"
1342
1343 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1344 # Docstring inherited.
1345 detectorOrder = [ref.dataId["detector"] for ref in inputRefs.inputCatalogs]
1346 detectorOrder.sort()
1347 inputRefs = reorderRefs(inputRefs, detectorOrder, dataIdKey="detector")
1348 inputs = butlerQC.get(inputRefs)
1349 self.log.info("Concatenating %s per-detector Source Tables",
1350 len(inputs["inputCatalogs"]))
1351 table = TableVStack.vstack_handles(inputs["inputCatalogs"])
1352 butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)
1353
1354
1355class MakeCcdVisitTableConnections(pipeBase.PipelineTaskConnections,
1356 dimensions=("instrument",),
1357 defaultTemplates={"calexpType": ""}):
1358 visitSummaryRefs = connectionTypes.Input(
1359 doc="Data references for per-visit consolidated exposure metadata",
1360 name="finalVisitSummary",
1361 storageClass="ExposureCatalog",
1362 dimensions=("instrument", "visit"),
1363 multiple=True,
1364 deferLoad=True,
1365 )
1366 outputCatalog = connectionTypes.Output(
1367 doc="CCD and Visit metadata table",
1368 name="ccdVisitTable",
1369 storageClass="ArrowAstropy",
1370 dimensions=("instrument",)
1371 )
1372
1373
1374class MakeCcdVisitTableConfig(pipeBase.PipelineTaskConfig,
1375 pipelineConnections=MakeCcdVisitTableConnections):
1376 idGenerator = DetectorVisitIdGeneratorConfig.make_field()
1377
1378
1379class MakeCcdVisitTableTask(pipeBase.PipelineTask):
1380 """Produce a `ccdVisitTable` from the visit summary exposure catalogs.
1381 """
1382 _DefaultName = "makeCcdVisitTable"
1383 ConfigClass = MakeCcdVisitTableConfig
1384
1385 def run(self, visitSummaryRefs):
1386 """Make a table of ccd information from the visit summary catalogs.
1387
1388 Parameters
1389 ----------
1390 visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
1391 List of DeferredDatasetHandles pointing to exposure catalogs with
1392 per-detector summary information.
1393
1394 Returns
1395 -------
1396 result : `~lsst.pipe.base.Struct`
1397 Results struct with attribute:
1398
1399 ``outputCatalog``
1400 Catalog of ccd and visit information.
1401 """
1402 ccdEntries = []
1403 for visitSummaryRef in visitSummaryRefs:
1404 visitSummary = visitSummaryRef.get()
1405 if not visitSummary:
1406 continue
1407 visitInfo = visitSummary[0].getVisitInfo()
1408
1409 # Strip provenance to prevent merge confusion.
1410 strip_provenance_from_fits_header(visitSummary.metadata)
1411
1412 ccdEntry = {}
1413 summaryTable = visitSummary.asAstropy()
1414 selectColumns = ["id", "visit", "physical_filter", "band", "ra", "dec",
1415 "pixelScale", "zenithDistance",
1416 "expTime", "zeroPoint", "psfSigma", "skyBg", "skyNoise",
1417 "astromOffsetMean", "astromOffsetStd", "nPsfStar",
1418 "psfStarDeltaE1Median", "psfStarDeltaE2Median",
1419 "psfStarDeltaE1Scatter", "psfStarDeltaE2Scatter",
1420 "psfStarDeltaSizeMedian", "psfStarDeltaSizeScatter",
1421 "psfStarScaledDeltaSizeScatter", "psfTraceRadiusDelta",
1422 "psfApFluxDelta", "psfApCorrSigmaScaledDelta",
1423 "maxDistToNearestPsf",
1424 "effTime", "effTimePsfSigmaScale",
1425 "effTimeSkyBgScale", "effTimeZeroPointScale",
1426 "magLim"]
1427 ccdEntry = summaryTable[selectColumns]
1428 # 'visit' is the human readable visit number.
1429 # 'visitId' is the key to the visit table. They are the same.
1430 # Technically you should join to get the visit from the visit
1431 # table.
1432 ccdEntry.rename_column("visit", "visitId")
1433 ccdEntry.rename_column("id", "detectorId")
1434
1435 # RFC-924: Temporarily keep a duplicate "decl" entry for backwards
1436 # compatibility. To be removed after September 2023.
1437 ccdEntry["decl"] = ccdEntry["dec"]
1438
1439 ccdEntry["ccdVisitId"] = [
1440 self.config.idGenerator.apply(
1441 visitSummaryRef.dataId,
1442 detector=detector_id,
1443 is_exposure=False,
1444 ).catalog_id # The "catalog ID" here is the ccdVisit ID
1445 # because it's usually the ID for a whole catalog
1446 # with a {visit, detector}, and that's the main
1447 # use case for IdGenerator. This usage for a
1448 # summary table is rare.
1449 for detector_id in summaryTable["id"]
1450 ]
1451 ccdEntry["detector"] = summaryTable["id"]
1452 ccdEntry["seeing"] = (
1453 visitSummary["psfSigma"] * visitSummary["pixelScale"] * np.sqrt(8 * np.log(2))
1454 )
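            # psfSigma (Gaussian sigma in pixels) times pixelScale (arcsec/pixel)
            # times sqrt(8*ln(2)) ~= 2.355 converts to a PSF FWHM in arcsec.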
1455 ccdEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
1456 ccdEntry["expMidpt"] = np.datetime64(visitInfo.date.nsecs(scale=dafBase.DateTime.TAI), "ns")
1457 ccdEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
1458 expTime = visitInfo.getExposureTime()
1459 ccdEntry["obsStart"] = (
1460 ccdEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns")
1461 )
1462 expTime_days = expTime / (60*60*24)
1463 ccdEntry["obsStartMJD"] = ccdEntry["expMidptMJD"] - 0.5 * expTime_days
1464 ccdEntry["darkTime"] = visitInfo.getDarkTime()
1465 ccdEntry["xSize"] = summaryTable["bbox_max_x"] - summaryTable["bbox_min_x"]
1466 ccdEntry["ySize"] = summaryTable["bbox_max_y"] - summaryTable["bbox_min_y"]
1467 ccdEntry["llcra"] = summaryTable["raCorners"][:, 0]
1468 ccdEntry["llcdec"] = summaryTable["decCorners"][:, 0]
1469 ccdEntry["ulcra"] = summaryTable["raCorners"][:, 1]
1470 ccdEntry["ulcdec"] = summaryTable["decCorners"][:, 1]
1471 ccdEntry["urcra"] = summaryTable["raCorners"][:, 2]
1472 ccdEntry["urcdec"] = summaryTable["decCorners"][:, 2]
1473 ccdEntry["lrcra"] = summaryTable["raCorners"][:, 3]
1474 ccdEntry["lrcdec"] = summaryTable["decCorners"][:, 3]
1475 # TODO: DM-30618, Add raftName, nExposures, ccdTemp, binX, binY,
1476 # and flags, and decide if WCS, and llcx, llcy, ulcx, ulcy, etc.
1477 # values are actually wanted.
1478 ccdEntries.append(ccdEntry)
1479
1480 outputCatalog = astropy.table.vstack(ccdEntries, join_type="exact")
1481 return pipeBase.Struct(outputCatalog=outputCatalog)
1482
1483
1484class MakeVisitTableConnections(pipeBase.PipelineTaskConnections,
1485 dimensions=("instrument",),
1486 defaultTemplates={"calexpType": ""}):
1487 visitSummaries = connectionTypes.Input(
1488 doc="Per-visit consolidated exposure metadata",
1489 name="finalVisitSummary",
1490 storageClass="ExposureCatalog",
1491 dimensions=("instrument", "visit",),
1492 multiple=True,
1493 deferLoad=True,
1494 )
1495 outputCatalog = connectionTypes.Output(
1496 doc="Visit metadata table",
1497 name="visitTable",
1498 storageClass="ArrowAstropy",
1499 dimensions=("instrument",)
1500 )
1501
1502
1503class MakeVisitTableConfig(pipeBase.PipelineTaskConfig,
1504 pipelineConnections=MakeVisitTableConnections):
1505 pass
1506
1507
1508class MakeVisitTableTask(pipeBase.PipelineTask):
1509 """Produce a `visitTable` from the visit summary exposure catalogs.
1510 """
1511 _DefaultName = "makeVisitTable"
1512 ConfigClass = MakeVisitTableConfig
1513
1514 def run(self, visitSummaries):
1515 """Make a table of visit information from the visit summary catalogs.
1516
1517 Parameters
1518 ----------
1519 visitSummaries : `list` of `lsst.daf.butler.DeferredDatasetHandle`
1520 Handles for exposure catalogs with per-detector summary information.
1521 Returns
1522 -------
1523 result : `~lsst.pipe.base.Struct`
1524 Results struct with attribute:
1525
1526 ``outputCatalog``
1527 Catalog of visit information.
1528 """
1529 visitEntries = []
1530 for visitSummary in visitSummaries:
1531 visitSummary = visitSummary.get()
1532 if not visitSummary:
1533 continue
1534 visitRow = visitSummary[0]
1535 visitInfo = visitRow.getVisitInfo()
1536
1537 visitEntry = {}
1538 visitEntry["visitId"] = visitRow["visit"]
1539 visitEntry["visit"] = visitRow["visit"]
1540 visitEntry["physical_filter"] = visitRow["physical_filter"]
1541 visitEntry["band"] = visitRow["band"]
1542 raDec = visitInfo.getBoresightRaDec()
1543 visitEntry["ra"] = raDec.getRa().asDegrees()
1544 visitEntry["dec"] = raDec.getDec().asDegrees()
1545
1546 # RFC-924: Temporarily keep a duplicate "decl" entry for backwards
1547 # compatibility. To be removed after September 2023.
1548 visitEntry["decl"] = visitEntry["dec"]
1549
1550 visitEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
1551 azAlt = visitInfo.getBoresightAzAlt()
1552 visitEntry["azimuth"] = azAlt.getLongitude().asDegrees()
1553 visitEntry["altitude"] = azAlt.getLatitude().asDegrees()
1554 visitEntry["zenithDistance"] = 90 - azAlt.getLatitude().asDegrees()
1555 visitEntry["airmass"] = visitInfo.getBoresightAirmass()
1556 expTime = visitInfo.getExposureTime()
1557 visitEntry["expTime"] = expTime
1558 visitEntry["expMidpt"] = np.datetime64(visitInfo.date.nsecs(scale=dafBase.DateTime.TAI), "ns")
1559 visitEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
1560 visitEntry["obsStart"] = visitEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns")
1561 expTime_days = expTime / (60*60*24)
1562 visitEntry["obsStartMJD"] = visitEntry["expMidptMJD"] - 0.5 * expTime_days
1563 visitEntries.append(visitEntry)
1564
1565 # TODO: DM-30623, Add programId, exposureType, cameraTemp,
1566 # mirror1Temp, mirror2Temp, mirror3Temp, domeTemp, externalTemp,
1567 # dimmSeeing, pwvGPS, pwvMW, flags, nExposures.
1568
1569 outputCatalog = astropy.table.Table(rows=visitEntries)
1570 return pipeBase.Struct(outputCatalog=outputCatalog)
1571
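# Illustrative sketch, not part of the pipeline: one way to exercise
# MakeVisitTableTask interactively on finalVisitSummary datasets fetched as
# deferred handles from a butler. The repository path, collection, and
# instrument below are hypothetical placeholders.
def _exampleMakeVisitTable(repo="/repo/example", collections=("hypothetical/run",)):
    from lsst.daf.butler import Butler

    butler = Butler(repo, collections=list(collections))
    refs = butler.registry.queryDatasets("finalVisitSummary", instrument="LSSTCam")
    # run() expects deferred handles (the connection uses deferLoad=True) and
    # loads each catalog itself.
    handles = [butler.getDeferred(ref) for ref in refs]
    return MakeVisitTableTask().run(handles).outputCatalog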
1572
1573@deprecated(reason="This task is replaced by lsst.pipe.tasks.ForcedPhotCcdTask. "
1574 "This task will be removed after v30.",
1575 version="v29.0", category=FutureWarning)
1576class WriteForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
1577 dimensions=("instrument", "visit", "detector", "skymap", "tract")):
1578
1579 inputCatalog = connectionTypes.Input(
1580 doc="Primary per-detector, single-epoch forced-photometry catalog. "
1581 "By default, it is the output of ForcedPhotCcdTask on calexps",
1582 name="forced_src",
1583 storageClass="SourceCatalog",
1584 dimensions=("instrument", "visit", "detector", "skymap", "tract")
1585 )
1586 inputCatalogDiff = connectionTypes.Input(
1587 doc="Secondary multi-epoch, per-detector, forced photometry catalog. "
1588 "By default, it is the output of ForcedPhotCcdTask run on image differences.",
1589 name="forced_diff",
1590 storageClass="SourceCatalog",
1591 dimensions=("instrument", "visit", "detector", "skymap", "tract")
1592 )
1593 outputCatalog = connectionTypes.Output(
1594 doc="InputCatalogs horizonatally joined on `objectId` in DataFrame parquet format",
1595 name="mergedForcedSource",
1596 storageClass="DataFrame",
1597 dimensions=("instrument", "visit", "detector", "skymap", "tract")
1598 )
1599
1600
1601@deprecated(reason="This task is replaced by lsst.pipe.tasks.ForcedPhotCcdTask. "
1602 "This task will be removed after v30.",
1603 version="v29.0", category=FutureWarning)
1604class WriteForcedSourceTableConfig(pipeBase.PipelineTaskConfig,
1605 pipelineConnections=WriteForcedSourceTableConnections):
1607 doc="Column on which to join the two input tables on and make the primary key of the output",
1608 dtype=str,
1609 default="objectId",
1610 )
1611
1612
1613@deprecated(reason="This task is replaced by lsst.pipe.tasks.ForcedPhotCcdTask. "
1614 "This task will be removed after v30.",
1615 version="v29.0", category=FutureWarning)
1616class WriteForcedSourceTableTask(pipeBase.PipelineTask):
1617 """Merge and convert per-detector forced source catalogs to DataFrame Parquet format.
1618
1619 Because the predecessor ForcedPhotCcdTask operates per-detector and
1620 per-tract (i.e., it has tract in its dimensions), detectors
1621 on the tract boundary may have multiple forced source catalogs.
1622
1623 The successor task TransformForcedSourceTable runs per-patch
1624 and temporally aggregates the overlapping mergedForcedSource catalogs
1625 from all available epochs.
1626 """
1627 _DefaultName = "writeForcedSourceTable"
1628 ConfigClass = WriteForcedSourceTableConfig
1629
1630 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1631 inputs = butlerQC.get(inputRefs)
1632 inputs["visit"] = butlerQC.quantum.dataId["visit"]
1633 inputs["detector"] = butlerQC.quantum.dataId["detector"]
1634 inputs["band"] = butlerQC.quantum.dataId["band"]
1635 outputs = self.run(**inputs)
1636 butlerQC.put(outputs, outputRefs)
1637
1638 def run(self, inputCatalog, inputCatalogDiff, visit, detector, band=None):
1639 dfs = []
1640 for table, dataset in zip((inputCatalog, inputCatalogDiff), ("calexp", "diff")):
1641 df = table.asAstropy().to_pandas().set_index(self.config.key, drop=False)
1642 df = df.reindex(sorted(df.columns), axis=1)
1643 df["visit"] = visit
1644 # int16 instead of uint8 because databases don't like unsigned bytes.
1645 df["detector"] = np.int16(detector)
1646 df["band"] = band if band else pd.NA
1647 df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
1648 names=("dataset", "column"))
1649
1650 dfs.append(df)
1651
1652 outputCatalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
1653 return pipeBase.Struct(outputCatalog=outputCatalog)
1654
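# Illustrative sketch, not part of the pipeline: the mergedForcedSource
# DataFrame produced by WriteForcedSourceTableTask.run has two-level columns
# keyed by ("dataset", "column"), so the calexp- and difference-image forced
# measurements for the same objectId sit side by side. The visit/detector
# values and the flux column name below are hypothetical examples.
def _exampleMergedForcedSourceColumns(inputCatalog, inputCatalogDiff):
    task = WriteForcedSourceTableTask()
    merged = task.run(
        inputCatalog, inputCatalogDiff, visit=12345, detector=42, band="r"
    ).outputCatalog
    # Select the same (hypothetical) measurement from both datasets via the
    # column MultiIndex.
    return merged[[("calexp", "base_PsfFlux_instFlux"), ("diff", "base_PsfFlux_instFlux")]]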
1655
1656class TransformForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
1657 dimensions=("instrument", "skymap", "patch", "tract")):
1658
1659 inputCatalogs = connectionTypes.Input(
1660 doc="DataFrames of merged ForcedSources produced by WriteForcedSourceTableTask",
1661 name="mergedForcedSource",
1662 storageClass="DataFrame",
1663 dimensions=("instrument", "visit", "detector", "skymap", "tract"),
1664 multiple=True,
1665 deferLoad=True
1666 )
1667 referenceCatalog = connectionTypes.Input(
1668 doc="Reference catalog which was used to seed the forcedPhot. Columns "
1669 "objectId, detect_isPrimary, detect_isTractInner, detect_isPatchInner "
1670 "are expected.",
1671 name="objectTable",
1672 storageClass="DataFrame",
1673 dimensions=("tract", "patch", "skymap"),
1674 deferLoad=True
1675 )
1676 outputCatalog = connectionTypes.Output(
1677 doc="Narrower, temporally-aggregated, per-patch ForcedSource Table transformed and converted per a "
1678 "specified set of functors",
1679 name="forcedSourceTable",
1680 storageClass="ArrowAstropy",
1681 dimensions=("tract", "patch", "skymap")
1682 )
1683
1684
1685class TransformForcedSourceTableConfig(TransformCatalogBaseConfig,
1686 pipelineConnections=TransformForcedSourceTableConnections):
1687 referenceColumns = pexConfig.ListField(
1688 dtype=str,
1689 default=["detect_isPrimary", "detect_isTractInner", "detect_isPatchInner"],
1690 optional=True,
1691 doc="Columns to pull from reference catalog",
1692 )
1693 keyRef = lsst.pex.config.Field(
1694 doc="Column on which to join the two input tables on and make the primary key of the output",
1695 dtype=str,
1696 default="objectId",
1697 )
1699 doc="Rename the output DataFrame index to this name",
1700 dtype=str,
1701 default="forcedSourceId",
1702 )
1703
1704 def setDefaults(self):
1705 super().setDefaults()
1706 self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "ForcedSource.yaml")
1707 self.columnsFromDataId = ["tract", "patch"]
1708
1709
1710class TransformForcedSourceTableTask(TransformCatalogBaseTask):
1711 """Transform/standardize a ForcedSource catalog
1712
1713 Transforms each wide, per-detector forcedSource DataFrame per the
1714 specification file (per-camera defaults found in ForcedSource.yaml).
1715 All epochs that overlap the patch are aggregated into one per-patch
1716 narrow-DataFrame file.
1717
1718 No de-duplication of rows is performed. Duplicate resolution flags are
1719 pulled in from the referenceCatalog: `detect_isPrimary`,
1720 `detect_isTractInner`, `detect_isPatchInner`, so that the user may
1721 de-duplicate for analysis or compare duplicates for QA.
1722
1723 The resulting table includes multiple bands. Epochs (MJDs) and other useful
1724 per-visit columns can be retrieved by joining with the CcdVisitTable on
1725 ccdVisitId.
1726 """
1727 _DefaultName = "transformForcedSourceTable"
1728 ConfigClass = TransformForcedSourceTableConfig
1729
1730 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1731 inputs = butlerQC.get(inputRefs)
1732 if self.funcs is None:
1733 raise ValueError("config.functorFile is None. "
1734 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
1735 outputs = self.run(inputs["inputCatalogs"], inputs["referenceCatalog"], funcs=self.funcs,
1736 dataId=dict(outputRefs.outputCatalog.dataId.mapping))
1737
1738 butlerQC.put(outputs, outputRefs)
1739
1740 def run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=None):
1741 dfs = []
1742 refColumns = list(self.config.referenceColumns)
1743 refColumns.append(self.config.keyRef)
1744 ref = referenceCatalog.get(parameters={"columns": refColumns})
1745 if ref.index.name != self.config.keyRef:
1746 # If the DataFrame we loaded was originally written as some other
1747 # Parquet type, it probably doesn't have the index set. If it was
1748 # written as a DataFrame, the index should already be set and
1749 # trying to set it again would be an error, since it doesn't exist
1750 # as a regular column anymore.
1751 ref.set_index(self.config.keyRef, inplace=True)
1752 self.log.info("Aggregating %s input catalogs", len(inputCatalogs))
1753 for handle in inputCatalogs:
1754 result = self.transform(None, handle, funcs, dataId)
1755 # Filter for only rows that were detected on (overlap) the patch
1756 dfs.append(result.df.join(ref, how="inner"))
1757
1758 outputCatalog = pd.concat(dfs)
1759
1760 if outputCatalog.empty:
1761 raise NoWorkFound(f"No forced photometry rows for {dataId}.")
1762
1763 # Now that the join on config.keyRef is done, name the current index
1764 # after config.keyRef ...
1765 outputCatalog.index.rename(self.config.keyRef, inplace=True)
1766 # ... then move it back into the regular columns,
1767 outputCatalog.reset_index(inplace=True)
1768 # make forcedSourceId (computed per ForcedSource.yaml) the new, unique
1769 # index,
1770 outputCatalog.set_index("forcedSourceId", inplace=True, verify_integrity=True)
1771 # and finally rename the index to config.key.
1772 outputCatalog.index.rename(self.config.key, inplace=True)
1773
1774 self.log.info("Made a table of %d columns and %d rows",
1775 len(outputCatalog.columns), len(outputCatalog))
1776 return pipeBase.Struct(outputCatalog=pandas_to_astropy(outputCatalog))
1777
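# Illustrative sketch, not part of the pipeline: as described in the docstring
# above, forcedSourceTable rows are not de-duplicated and carry no per-visit
# metadata of their own. Assuming the forced source and ccdVisit tables have
# been loaded as pandas DataFrames with the column names used below, a typical
# analysis-side de-duplication and join might look like this.
def _exampleDeduplicateForcedSources(forcedSources, ccdVisits):
    # Keep only primary detections to drop tract/patch-overlap duplicates.
    primary = forcedSources[forcedSources["detect_isPrimary"]]
    # Attach per-visit quantities (e.g. expMidptMJD) via the ccdVisit key.
    return primary.merge(ccdVisits, on="ccdVisitId", how="left", suffixes=("", "_ccdVisit"))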
1778
1779class ConsolidateTractConnections(pipeBase.PipelineTaskConnections,
1780 defaultTemplates={"catalogType": ""},
1781 dimensions=("instrument", "tract")):
1782 inputCatalogs = connectionTypes.Input(
1783 doc="Input per-patch DataFrame Tables to be concatenated",
1784 name="{catalogType}ForcedSourceTable",
1785 storageClass="DataFrame",
1786 dimensions=("tract", "patch", "skymap"),
1787 multiple=True,
1788 )
1789
1790 outputCatalog = connectionTypes.Output(
1791 doc="Output per-tract concatenation of DataFrame Tables",
1792 name="{catalogType}ForcedSourceTable_tract",
1793 storageClass="DataFrame",
1794 dimensions=("tract", "skymap"),
1795 )
1796
1797
1798class ConsolidateTractConfig(pipeBase.PipelineTaskConfig,
1799 pipelineConnections=ConsolidateTractConnections):
1800 pass
1801
1802
1803class ConsolidateTractTask(pipeBase.PipelineTask):
1804 """Concatenate any per-patch, dataframe list into a single
1805 per-tract DataFrame.
1806 """
1807 _DefaultName = "ConsolidateTract"
1808 ConfigClass = ConsolidateTractConfig
1809
1810 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1811 inputs = butlerQC.get(inputRefs)
1812 # Not checking that at least one inputCatalog exists because that
1813 # would mean an empty quantum graph.
1814 self.log.info("Concatenating %s per-patch %s Tables",
1815 len(inputs["inputCatalogs"]),
1816 inputRefs.inputCatalogs[0].datasetType.name)
1817 df = pd.concat(inputs["inputCatalogs"])
1818 butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)
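# Illustrative sketch, not part of the pipeline: outside of a quantum, the
# equivalent of ConsolidateTractTask on already-loaded per-patch DataFrames is
# a plain pandas concatenation.
def _exampleConsolidatePatches(perPatchTables):
    # perPatchTables: list of per-patch pandas DataFrames for one tract.
    return pd.concat(perPatchTables)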