postprocess.py
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22__all__ = ["WriteObjectTableConfig", "WriteObjectTableTask",
23 "WriteSourceTableConfig", "WriteSourceTableTask",
24 "WriteRecalibratedSourceTableConfig", "WriteRecalibratedSourceTableTask",
25 "PostprocessAnalysis",
26 "TransformCatalogBaseConfig", "TransformCatalogBaseTask",
27 "TransformObjectCatalogConfig", "TransformObjectCatalogTask",
28 "ConsolidateObjectTableConfig", "ConsolidateObjectTableTask",
29 "TransformSourceTableConfig", "TransformSourceTableTask",
30 "ConsolidateVisitSummaryConfig", "ConsolidateVisitSummaryTask",
31 "ConsolidateSourceTableConfig", "ConsolidateSourceTableTask",
32 "MakeCcdVisitTableConfig", "MakeCcdVisitTableTask",
33 "MakeVisitTableConfig", "MakeVisitTableTask",
34 "WriteForcedSourceTableConfig", "WriteForcedSourceTableTask",
35 "TransformForcedSourceTableConfig", "TransformForcedSourceTableTask",
36 "ConsolidateTractConfig", "ConsolidateTractTask"]
37
38from collections import defaultdict
39import dataclasses
40from deprecated.sphinx import deprecated
41import functools
42import logging
43import numbers
44import os
45
46import numpy as np
47import pandas as pd
48import astropy.table
49
50import lsst.geom
51import lsst.pex.config as pexConfig
52import lsst.pipe.base as pipeBase
53import lsst.daf.base as dafBase
54from lsst.daf.butler.formatters.parquet import pandas_to_astropy
55from lsst.pipe.base import NoWorkFound, UpstreamFailureNoWorkFound, connectionTypes
56import lsst.afw.table as afwTable
57from lsst.afw.image import ExposureSummaryStats, ExposureF
58from lsst.meas.base import SingleFrameMeasurementTask, DetectorVisitIdGeneratorConfig
59from lsst.obs.base.utils import strip_provenance_from_fits_header, TableVStack
60
61from .coaddBase import reorderRefs
62from .functors import CompositeFunctor, Column
63
64log = logging.getLogger(__name__)
65
66
67def flattenFilters(df, noDupCols=["coord_ra", "coord_dec"], camelCase=False, inputBands=None):
68 """Flattens a dataframe with multilevel column index.
69 """
70 newDf = pd.DataFrame()
71 # band is the level 0 index
72 dfBands = df.columns.unique(level=0).values
73 for band in dfBands:
74 subdf = df[band]
75 columnFormat = "{0}{1}" if camelCase else "{0}_{1}"
76 newColumns = {c: columnFormat.format(band, c)
77 for c in subdf.columns if c not in noDupCols}
78 cols = list(newColumns.keys())
79 newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)
80
81 # Band must be present in the input and output or else column is all NaN:
82 presentBands = dfBands if inputBands is None else list(set(inputBands).intersection(dfBands))
83 # Get the unexploded columns from any present band's partition
84 noDupDf = df[presentBands[0]][noDupCols]
85 newDf = pd.concat([noDupDf, newDf], axis=1)
86 return newDf
87
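# Illustrative sketch of flattenFilters (the "psfFlux" column name below is
# made up, not a real schema column): a two-level (band, column) DataFrame is
# flattened into band-prefixed columns, with the noDupCols kept once, taken
# from the first present band.
#
#     df = pd.DataFrame({
#         ("g", "coord_ra"): [10.0], ("g", "coord_dec"): [-5.0], ("g", "psfFlux"): [1.0],
#         ("r", "coord_ra"): [10.0], ("r", "coord_dec"): [-5.0], ("r", "psfFlux"): [2.0],
#     })
#     list(flattenFilters(df).columns)
#     # -> ['coord_ra', 'coord_dec', 'g_psfFlux', 'r_psfFlux']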
88
89class WriteObjectTableConnections(pipeBase.PipelineTaskConnections,
90 defaultTemplates={"coaddName": "deep"},
91 dimensions=("tract", "patch", "skymap")):
92 inputCatalogMeas = connectionTypes.Input(
93 doc="Catalog of source measurements on the deepCoadd.",
94 dimensions=("tract", "patch", "band", "skymap"),
95 storageClass="SourceCatalog",
96 name="{coaddName}Coadd_meas",
97 multiple=True
98 )
99 inputCatalogForcedSrc = connectionTypes.Input(
100 doc="Catalog of forced measurements (shape and position parameters held fixed) on the deepCoadd.",
101 dimensions=("tract", "patch", "band", "skymap"),
102 storageClass="SourceCatalog",
103 name="{coaddName}Coadd_forced_src",
104 multiple=True
105 )
106 inputCatalogPsfsMultiprofit = connectionTypes.Input(
107 doc="Catalog of Gaussian mixture model fit parameters for the PSF model at each object centroid.",
108 dimensions=("tract", "patch", "band", "skymap"),
109 storageClass="ArrowAstropy",
110 name="{coaddName}Coadd_psfs_multiprofit",
111 multiple=True,
112 )
113 outputCatalog = connectionTypes.Output(
114 doc="A vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
115 "stored as a DataFrame with a multi-level column index per-patch.",
116 dimensions=("tract", "patch", "skymap"),
117 storageClass="DataFrame",
118 name="{coaddName}Coadd_obj"
119 )
120
121
122class WriteObjectTableConfig(pipeBase.PipelineTaskConfig,
123 pipelineConnections=WriteObjectTableConnections):
124 coaddName = pexConfig.Field(
125 dtype=str,
126 default="deep",
127 doc="Name of coadd"
128 )
129
130
131class WriteObjectTableTask(pipeBase.PipelineTask):
132 """Write filter-merged object tables as a DataFrame in parquet format.
133 """
134 _DefaultName = "writeObjectTable"
135 ConfigClass = WriteObjectTableConfig
136
137 # Tag of output dataset written by `MergeSourcesTask.write`
138 outputDataset = "obj"
139
140 def runQuantum(self, butlerQC, inputRefs, outputRefs):
141 inputs = butlerQC.get(inputRefs)
142
143 catalogs = defaultdict(dict)
144 for dataset, connection in (
145 ("meas", "inputCatalogMeas"),
146 ("forced_src", "inputCatalogForcedSrc"),
147 ("psfs_multiprofit", "inputCatalogPsfsMultiprofit"),
148 ):
149 for ref, cat in zip(getattr(inputRefs, connection), inputs[connection]):
150 catalogs[ref.dataId["band"]][dataset] = cat
151
152 dataId = butlerQC.quantum.dataId
153 df = self.run(catalogs=catalogs, tract=dataId["tract"], patch=dataId["patch"])
154 outputs = pipeBase.Struct(outputCatalog=df)
155 butlerQC.put(outputs, outputRefs)
156
157 def run(self, catalogs, tract, patch):
158 """Merge multiple catalogs.
159
160 Parameters
161 ----------
162 catalogs : `dict`
163 Mapping from filter names to dict of catalogs.
164 tract : `int`
165 Tract id to use for the tractId column.
166 patch : `str`
167 Patch id to use for the patchId column.
168
169 Returns
170 -------
171 catalog : `pandas.DataFrame`
172 Merged dataframe.
173
174 Raises
175 ------
176 ValueError
177 Raised if any of the catalogs is of an unsupported type.
178 """
179 dfs = []
180 for filt, tableDict in catalogs.items():
181 for dataset, table in tableDict.items():
182 # Convert afwTable to pandas DataFrame if needed
183 if isinstance(table, pd.DataFrame):
184 df = table
185 elif isinstance(table, afwTable.SourceCatalog):
186 df = table.asAstropy().to_pandas()
187 elif isinstance(table, astropy.table.Table):
188 df = table.to_pandas()
189 else:
190 raise ValueError(f"{dataset=} has unsupported {type(table)=}")
191 df.set_index("id", drop=True, inplace=True)
192
193 # Sort columns by name, to ensure matching schema among patches
194 df = df.reindex(sorted(df.columns), axis=1)
195 df = df.assign(tractId=tract, patchId=patch)
196
197 # Make columns a 3-level MultiIndex
198 df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
199 names=("dataset", "band", "column"))
200 dfs.append(df)
201
202 # We do this dance and not `pd.concat(dfs)` because the pandas
203 # concatenation uses infinite memory.
204 catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
205 return catalog
206
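# Illustrative sketch of the layout produced by WriteObjectTableTask.run,
# assuming a configured task instance `task` and a `catalogs` dict built as in
# runQuantum (the measurement column name below is hypothetical): the merged
# catalog carries a three-level (dataset, band, column) column index, so
# slices can be taken with tuple keys.
#
#     merged = task.run(catalogs, tract=9813, patch=42)
#     merged["meas", "i"]                        # all meas columns for band i
#     merged["meas", "i", "base_SdssShape_xx"]   # a single column as a Series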
207
208class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
209 defaultTemplates={"catalogType": ""},
210 dimensions=("instrument", "visit", "detector")):
211
212 catalog = connectionTypes.Input(
213 doc="Input full-depth catalog of sources produced by CalibrateTask",
214 name="{catalogType}src",
215 storageClass="SourceCatalog",
216 dimensions=("instrument", "visit", "detector")
217 )
218 outputCatalog = connectionTypes.Output(
219 doc="Catalog of sources, `src` in Astropy/Parquet format. Columns are unchanged.",
220 name="{catalogType}source",
221 storageClass="ArrowAstropy",
222 dimensions=("instrument", "visit", "detector")
223 )
224
225
226class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
227 pipelineConnections=WriteSourceTableConnections):
228 pass
229
230
231class WriteSourceTableTask(pipeBase.PipelineTask):
232 """Write source table to DataFrame Parquet format.
233 """
234 _DefaultName = "writeSourceTable"
235 ConfigClass = WriteSourceTableConfig
236
237 def runQuantum(self, butlerQC, inputRefs, outputRefs):
238 inputs = butlerQC.get(inputRefs)
239 inputs["visit"] = butlerQC.quantum.dataId["visit"]
240 inputs["detector"] = butlerQC.quantum.dataId["detector"]
241 result = self.run(**inputs)
242 outputs = pipeBase.Struct(outputCatalog=result.table)
243 butlerQC.put(outputs, outputRefs)
244
245 def run(self, catalog, visit, detector, **kwargs):
246 """Convert `src` catalog to an Astropy table.
247
248 Parameters
249 ----------
250 catalog : `lsst.afw.table.SourceCatalog`
251 Catalog to be converted.
252 visit, detector : `int`
253 Visit and detector ids to be added as columns.
254 **kwargs
255 Additional keyword arguments are ignored as a convenience for
256 subclasses that pass the same arguments to several different
257 methods.
258
259 Returns
260 -------
261 result : `~lsst.pipe.base.Struct`
262 ``table``
263 `astropy.table.Table` version of the input catalog
264 """
265 self.log.info("Generating DataFrame from src catalog visit,detector=%i,%i", visit, detector)
266 tbl = catalog.asAstropy()
267 tbl["visit"] = visit
268 # int16 instead of uint8 because databases don't like unsigned bytes.
269 tbl["detector"] = np.int16(detector)
270
271 return pipeBase.Struct(table=tbl)
272
273
274class WriteRecalibratedSourceTableConnections(WriteSourceTableConnections,
275 defaultTemplates={"catalogType": ""},
276 dimensions=("instrument", "visit", "detector", "skymap")):
277 visitSummary = connectionTypes.Input(
278 doc="Input visit-summary catalog with updated calibration objects.",
279 name="finalVisitSummary",
280 storageClass="ExposureCatalog",
281 dimensions=("instrument", "visit",),
282 )
283
284 def __init__(self, config):
285 # We don't want the input catalog here to be an initial existence
286 # constraint in QG generation, because that can unfortunately limit the
287 # set of data IDs of inputs to other tasks, even those that run earlier
288 # (e.g. updateVisitSummary), when the input 'src' catalog is not
289 # produced. It's safer to just use 'visitSummary' existence as an
290 # initial constraint, and then let the graph prune out the detectors
291 # that don't have a 'src' for this task only.
292 self.catalog = dataclasses.replace(self.catalog, deferGraphConstraint=True)
293
294
295class WriteRecalibratedSourceTableConfig(WriteSourceTableConfig,
296 pipelineConnections=WriteRecalibratedSourceTableConnections):
297
298 doReevaluatePhotoCalib = pexConfig.Field(
299 dtype=bool,
300 default=True,
301 doc=("Add or replace local photoCalib columns"),
302 )
303 doReevaluateSkyWcs = pexConfig.Field(
304 dtype=bool,
305 default=True,
306 doc=("Add or replace local WCS columns and update the coord columns, coord_ra and coord_dec"),
307 )
308
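# A pipeline config override can turn either re-evaluation off independently,
# e.g. (an illustrative sketch, not a recommended setting):
#
#     config.doReevaluateSkyWcs = False
#     config.doReevaluatePhotoCalib = True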
309
310class WriteRecalibratedSourceTableTask(WriteSourceTableTask):
311 """Write source table to DataFrame Parquet format.
312 """
313 _DefaultName = "writeRecalibratedSourceTable"
314 ConfigClass = WriteRecalibratedSourceTableConfig
315
316 def runQuantum(self, butlerQC, inputRefs, outputRefs):
317 inputs = butlerQC.get(inputRefs)
318
319 inputs["visit"] = butlerQC.quantum.dataId["visit"]
320 inputs["detector"] = butlerQC.quantum.dataId["detector"]
321
322 if self.config.doReevaluatePhotoCalib or self.config.doReevaluateSkyWcs:
323 exposure = ExposureF()
324 inputs["exposure"] = self.prepareCalibratedExposure(
325 exposure=exposure,
326 visitSummary=inputs["visitSummary"],
327 detectorId=butlerQC.quantum.dataId["detector"]
328 )
329 inputs["catalog"] = self.addCalibColumns(**inputs)
330
331 result = self.run(**inputs)
332 outputs = pipeBase.Struct(outputCatalog=result.table)
333 butlerQC.put(outputs, outputRefs)
334
335 def prepareCalibratedExposure(self, exposure, detectorId, visitSummary=None):
336 """Prepare a calibrated exposure and apply external calibrations
337 if so configured.
338
339 Parameters
340 ----------
341 exposure : `lsst.afw.image.exposure.Exposure`
342 Input exposure to adjust calibrations. May be an empty Exposure.
343 detectorId : `int`
344 Detector ID associated with the exposure.
345 visitSummary : `lsst.afw.table.ExposureCatalog`, optional
346 Exposure catalog with all calibration objects. WCS and PhotoCalib
347 are always applied if ``visitSummary`` is provided and those
348 components are not `None`.
349
350 Returns
351 -------
352 exposure : `lsst.afw.image.exposure.Exposure`
353 Exposure with adjusted calibrations.
354 """
355 if visitSummary is not None:
356 row = visitSummary.find(detectorId)
357 if row is None:
358 raise pipeBase.NoWorkFound(f"Visit summary for detector {detectorId} is missing.")
359 if (photoCalib := row.getPhotoCalib()) is None:
360 self.log.warning("Detector id %s has None for photoCalib in visit summary; "
361 "skipping reevaluation of photoCalib.", detectorId)
362 exposure.setPhotoCalib(None)
363 else:
364 exposure.setPhotoCalib(photoCalib)
365 if (skyWcs := row.getWcs()) is None:
366 self.log.warning("Detector id %s has None for skyWcs in visit summary; "
367 "skipping reevaluation of skyWcs.", detectorId)
368 exposure.setWcs(None)
369 else:
370 exposure.setWcs(skyWcs)
371
372 return exposure
373
374 def addCalibColumns(self, catalog, exposure, **kwargs):
375 """Add replace columns with calibs evaluated at each centroid
376
377 Add or replace 'base_LocalWcs' and 'base_LocalPhotoCalib' columns in
378 a source catalog, by rerunning the plugins.
379
380 Parameters
381 ----------
382 catalog : `lsst.afw.table.SourceCatalog`
383 catalog to which calib columns will be added
384 exposure : `lsst.afw.image.exposure.Exposure`
385 Exposure with attached PhotoCalibs and SkyWcs attributes to be
386 reevaluated at local centroids. Pixels are not required.
387 **kwargs
388 Additional keyword arguments are ignored to facilitate passing the
389 same arguments to several methods.
390
391 Returns
392 -------
393 newCat : `lsst.afw.table.SourceCatalog`
394 Source catalog with the requested local calib columns.
395 """
396 measureConfig = SingleFrameMeasurementTask.ConfigClass()
397 measureConfig.doReplaceWithNoise = False
398
399 # Clear all slots, because we aren't running the relevant plugins.
400 for slot in measureConfig.slots:
401 setattr(measureConfig.slots, slot, None)
402
403 measureConfig.plugins.names = []
404 if self.config.doReevaluateSkyWcs:
405 measureConfig.plugins.names.add("base_LocalWcs")
406 self.log.info("Re-evaluating base_LocalWcs plugin")
407 if self.config.doReevaluatePhotoCalib:
408 measureConfig.plugins.names.add("base_LocalPhotoCalib")
409 self.log.info("Re-evaluating base_LocalPhotoCalib plugin")
410 pluginsNotToCopy = tuple(measureConfig.plugins.names)
411
412 # Create a new schema and catalog
413 # Copy all columns from original except for the ones to reevaluate
414 aliasMap = catalog.schema.getAliasMap()
415 mapper = afwTable.SchemaMapper(catalog.schema)
416 for item in catalog.schema:
417 if not item.field.getName().startswith(pluginsNotToCopy):
418 mapper.addMapping(item.key)
419
420 schema = mapper.getOutputSchema()
421 measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
422 schema.setAliasMap(aliasMap)
423 newCat = afwTable.SourceCatalog(schema)
424 newCat.extend(catalog, mapper=mapper)
425
426 # Fluxes in sourceCatalogs are in counts, so there are no fluxes to
427 # update here. LocalPhotoCalibs are applied during transform tasks.
428 # Update coord_ra/coord_dec, which are expected to be positions on the
429 # sky and are used as such in sdm tables without transform
430 if self.config.doReevaluateSkyWcs and exposure.wcs is not None:
431 afwTable.updateSourceCoords(exposure.wcs, newCat)
432 wcsPlugin = measurement.plugins["base_LocalWcs"]
433 else:
434 wcsPlugin = None
435
436 if self.config.doReevaluatePhotoCalib and exposure.getPhotoCalib() is not None:
437 pcPlugin = measurement.plugins["base_LocalPhotoCalib"]
438 else:
439 pcPlugin = None
440
441 for row in newCat:
442 if wcsPlugin is not None:
443 wcsPlugin.measure(row, exposure)
444 if pcPlugin is not None:
445 pcPlugin.measure(row, exposure)
446
447 return newCat
448
449
450class PostprocessAnalysis(object):
451 """Calculate columns from DataFrames or handles storing DataFrames.
452
453 This object manages and organizes an arbitrary set of computations
454 on a catalog. The catalog is defined by a
455 `DeferredDatasetHandle` or `InMemoryDatasetHandle` object
456 (or list thereof), such as a ``deepCoadd_obj`` dataset, and the
457 computations are defined by a collection of
458 `~lsst.pipe.tasks.functors.Functor` objects (or, equivalently, a
459 ``CompositeFunctor``).
460
461 After the object is initialized, accessing the ``.df`` attribute (which
462 holds the `pandas.DataFrame` containing the results of the calculations)
463 triggers computation of said dataframe.
464
465 One of the conveniences of using this object is the ability to define a
466 desired common filter for all functors. This enables the same functor
467 collection to be passed to several different `PostprocessAnalysis` objects
468 without having to change the original functor collection, since the ``filt``
469 keyword argument of this object triggers an overwrite of the ``filt``
470 property for all functors in the collection.
471
472 This object also allows a list of refFlags to be passed, and defines a set
473 of default refFlags that are always included even if not requested.
474
475 If a list of DataFrames or Handles is passed, rather than a single one,
476 then the calculations will be mapped over all the input catalogs. In
477 principle, it should be straightforward to parallelize this activity, but
478 initial tests have failed (see TODO in code comments).
479
480 Parameters
481 ----------
482 handles : `~lsst.daf.butler.DeferredDatasetHandle` or
483 `~lsst.pipe.base.InMemoryDatasetHandle` or
484 list of these.
485 Source catalog(s) for computation.
486 functors : `list`, `dict`, or `~lsst.pipe.tasks.functors.CompositeFunctor`
487 Computations to do (functors that act on ``handles``).
488 If a dict, the output
489 DataFrame will have columns keyed accordingly.
490 If a list, the column keys will come from the
491 ``.shortname`` attribute of each functor.
492
493 filt : `str`, optional
494 Filter in which to calculate. If provided,
495 this will overwrite any existing ``.filt`` attribute
496 of the provided functors.
497
498 flags : `list`, optional
499 List of flags (per-band) to include in output table.
500 Taken from the ``meas`` dataset if applied to a multilevel Object Table.
501
502 refFlags : `list`, optional
503 List of refFlags (only reference band) to include in output table.
504
505 forcedFlags : `list`, optional
506 List of flags (per-band) to include in output table.
507 Taken from the ``forced_src`` dataset if applied to a
508 multilevel Object Table. Intended for flags from measurement plugins
509 only run during multi-band forced-photometry.
510 """
511 _defaultRefFlags = []
512 _defaultFuncs = ()
513
514 def __init__(self, handles, functors, filt=None, flags=None, refFlags=None, forcedFlags=None):
515 self.handles = handles
516 self.functors = functors
517
518 self.filt = filt
519 self.flags = list(flags) if flags is not None else []
520 self.forcedFlags = list(forcedFlags) if forcedFlags is not None else []
521 self.refFlags = list(self._defaultRefFlags)
522 if refFlags is not None:
523 self.refFlags += list(refFlags)
524
525 self._df = None
526
527 @property
528 def defaultFuncs(self):
529 funcs = dict(self._defaultFuncs)
530 return funcs
531
532 @property
533 def func(self):
534 additionalFuncs = self.defaultFuncs
535 additionalFuncs.update({flag: Column(flag, dataset="forced_src") for flag in self.forcedFlags})
536 additionalFuncs.update({flag: Column(flag, dataset="ref") for flag in self.refFlags})
537 additionalFuncs.update({flag: Column(flag, dataset="meas") for flag in self.flags})
538
539 if isinstance(self.functors, CompositeFunctor):
540 func = self.functors
541 else:
542 func = CompositeFunctor(self.functors)
543
544 func.funcDict.update(additionalFuncs)
545 func.filt = self.filt
546
547 return func
548
549 @property
550 def noDupCols(self):
551 return [name for name, func in self.func.funcDict.items() if func.noDup]
552
553 @property
554 def df(self):
555 if self._df is None:
556 self.compute()
557 return self._df
558
559 def compute(self, dropna=False, pool=None):
560 # map over multiple handles
561 if type(self.handles) in (list, tuple):
562 if pool is None:
563 dflist = [self.func(handle, dropna=dropna) for handle in self.handles]
564 else:
565 # TODO: Figure out why this doesn't work (pyarrow pickling
566 # issues?)
567 dflist = pool.map(functools.partial(self.func, dropna=dropna), self.handles)
568 self._df = pd.concat(dflist)
569 else:
570 self._df = self.func(self.handles, dropna=dropna)
571
572 return self._df
573
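# Illustrative sketch of using PostprocessAnalysis directly (assumes `df` is a
# pandas DataFrame with coord_ra/coord_dec columns; the output column names
# "ra" and "dec" are arbitrary):
#
#     handle = pipeBase.InMemoryDatasetHandle(df, storageClass="DataFrame")
#     analysis = PostprocessAnalysis(handle, {"ra": Column("coord_ra"),
#                                             "dec": Column("coord_dec")})
#     result = analysis.df  # the DataFrame is computed lazily on first access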
574
575class TransformCatalogBaseConnections(pipeBase.PipelineTaskConnections,
576 dimensions=()):
577 """Expected Connections for subclasses of TransformCatalogBaseTask.
578
579 Must be subclassed.
580 """
581 inputCatalog = connectionTypes.Input(
582 name="",
583 storageClass="DataFrame",
584 )
585 outputCatalog = connectionTypes.Output(
586 name="",
587 storageClass="ArrowAstropy",
588 )
589
590
591class TransformCatalogBaseConfig(pipeBase.PipelineTaskConfig,
592 pipelineConnections=TransformCatalogBaseConnections):
593 functorFile = pexConfig.Field(
594 dtype=str,
595 doc="Path to YAML file specifying Science Data Model functors to use "
596 "when copying columns and computing calibrated values.",
597 default=None,
598 optional=True
599 )
600 primaryKey = pexConfig.Field(
601 dtype=str,
602 doc="Name of column to be set as the DataFrame index. If None, the index"
603 "will be named `id`",
604 default=None,
605 optional=True
606 )
607 columnsFromDataId = pexConfig.ListField(
608 dtype=str,
609 default=None,
610 optional=True,
611 doc="Columns to extract from the dataId",
612 )
613
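# Subclass configs typically point functorFile at a schema-specific YAML and
# set the primary key in setDefaults, e.g. (taken from
# TransformSourceTableConfig.setDefaults further down in this file):
#
#     self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Source.yaml")
#     self.primaryKey = "sourceId"
#     self.columnsFromDataId = ["visit", "detector", "band", "physical_filter"]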
614
615class TransformCatalogBaseTask(pipeBase.PipelineTask):
616 """Base class for transforming/standardizing a catalog by applying functors
617 that convert units and apply calibrations.
618
619 The purpose of this task is to perform a set of computations on an input
620 ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` that holds a
621 ``DataFrame`` dataset (such as ``deepCoadd_obj``), and write the results to
622 a new dataset (which needs to be declared in an ``outputDataset``
623 attribute).
624
625 The calculations to be performed are defined in a YAML file that specifies
626 a set of functors to be computed, provided via the ``functorFile`` config
627 parameter. An example of such a YAML file is the following:
628
629 funcs:
630 sourceId:
631 functor: Index
632 x:
633 functor: Column
634 args: slot_Centroid_x
635 y:
636 functor: Column
637 args: slot_Centroid_y
638 psfFlux:
639 functor: LocalNanojansky
640 args:
641 - slot_PsfFlux_instFlux
642 - slot_PsfFlux_instFluxErr
643 - base_LocalPhotoCalib
644 - base_LocalPhotoCalibErr
645 psfFluxErr:
646 functor: LocalNanojanskyErr
647 args:
648 - slot_PsfFlux_instFlux
649 - slot_PsfFlux_instFluxErr
650 - base_LocalPhotoCalib
651 - base_LocalPhotoCalibErr
652 flags:
653 - detect_isPrimary
654
655 The names for each entry under "funcs" will become the names of columns in
656 the output dataset. All the functors referenced are defined in
657 `~lsst.pipe.tasks.functors`. Positional arguments to be passed to each
658 functor are in the `args` list, and any additional entries for each column
659 other than "functor" or "args" (e.g., ``'filt'``, ``'dataset'``) are
660 treated as keyword arguments to be passed to the functor initialization.
661
662 The "flags" entry is the default shortcut for `Column` functors.
663 All columns listed under "flags" will be copied to the output table
664 untransformed. They can be of any datatype.
665 In the special case of transforming a multi-level object table with
666 band and dataset indices (deepCoadd_obj), these will be taken from the
667 ``meas`` dataset and exploded out per band.
668
669 There are two special shortcuts that only apply when transforming
670 multi-level Object (deepCoadd_obj) tables:
671 - The "refFlags" entry is shortcut for `Column` functor
672 taken from the ``ref`` dataset if transforming an ObjectTable.
673 - The "forcedFlags" entry is shortcut for `Column` functors.
674 taken from the ``forced_src`` dataset if transforming an ObjectTable.
675 These are expanded out per band.
676
677
678 This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
679 to organize and execute the calculations.
680 """
681 @property
682 def _DefaultName(self):
683 raise NotImplementedError("Subclass must define the \"_DefaultName\" attribute.")
684
685 @property
686 def outputDataset(self):
687 raise NotImplementedError("Subclass must define the \"outputDataset\" attribute.")
688
689 @property
690 def inputDataset(self):
691 raise NotImplementedError("Subclass must define \"inputDataset\" attribute.")
692
693 @property
694 def ConfigClass(self):
695 raise NotImplementedError("Subclass must define \"ConfigClass\" attribute.")
696
697 def __init__(self, *args, **kwargs):
698 super().__init__(*args, **kwargs)
699 if self.config.functorFile:
700 self.log.info("Loading tranform functor definitions from %s",
701 self.config.functorFile)
702 self.funcs = CompositeFunctor.from_file(self.config.functorFile)
703 self.funcs.update(dict(PostprocessAnalysis._defaultFuncs))
704 else:
705 self.funcs = None
706
707 def runQuantum(self, butlerQC, inputRefs, outputRefs):
708 inputs = butlerQC.get(inputRefs)
709 if self.funcs is None:
710 raise ValueError("config.functorFile is None. "
711 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
712 result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
713 dataId=dict(outputRefs.outputCatalog.dataId.mapping))
714 butlerQC.put(result, outputRefs)
715
716 def run(self, handle, funcs=None, dataId=None, band=None):
717 """Do postprocessing calculations
718
719 Takes a ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` or
720 ``DataFrame`` object and dataId,
721 returns a dataframe with results of postprocessing calculations.
722
723 Parameters
724 ----------
725 handle : `~lsst.daf.butler.DeferredDatasetHandle` or
726 `~lsst.pipe.base.InMemoryDatasetHandle` or
727 `~pandas.DataFrame`, or list of these.
728 DataFrames from which calculations are done.
729 funcs : `~lsst.pipe.tasks.functors.Functor`
730 Functors to apply to the table's columns
731 dataId : `dict`, optional
732 Used to add columns named in ``columnsFromDataId`` to the output dataframe.
733 band : `str`, optional
734 Filter band that is being processed.
735
736 Returns
737 -------
738 result : `lsst.pipe.base.Struct`
739 Result struct, with a single ``outputCatalog`` attribute holding
740 the transformed catalog.
741 """
742 self.log.info("Transforming/standardizing the source table dataId: %s", dataId)
743
744 df = self.transform(band, handle, funcs, dataId).df
745 self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
746
747 if len(df) == 0:
748 raise UpstreamFailureNoWorkFound(
749 "Input catalog is empty, so there is nothing to transform/standardize",
750 )
751
752 result = pipeBase.Struct(outputCatalog=pandas_to_astropy(df))
753 return result
754
755 def getFunctors(self):
756 return self.funcs
757
758 def getAnalysis(self, handles, funcs=None, band=None):
759 if funcs is None:
760 funcs = self.funcs
761 analysis = PostprocessAnalysis(handles, funcs, filt=band)
762 return analysis
763
764 def transform(self, band, handles, funcs, dataId):
765 analysis = self.getAnalysis(handles, funcs=funcs, band=band)
766 df = analysis.df
767 if dataId and self.config.columnsFromDataId:
768 for key in self.config.columnsFromDataId:
769 if key in dataId:
770 if key == "detector":
771 # int16 instead of uint8 because databases don't like unsigned bytes.
772 df[key] = np.int16(dataId[key])
773 else:
774 df[key] = dataId[key]
775 else:
776 raise ValueError(f"'{key}' in config.columnsFromDataId not found in dataId: {dataId}")
777
778 if self.config.primaryKey:
779 if df.index.name != self.config.primaryKey and self.config.primaryKey in df:
780 df.reset_index(inplace=True, drop=True)
781 df.set_index(self.config.primaryKey, inplace=True)
782
783 return pipeBase.Struct(
784 df=df,
785 analysis=analysis
786 )
787
788
789class TransformObjectCatalogConnections(pipeBase.PipelineTaskConnections,
790 defaultTemplates={"coaddName": "deep"},
791 dimensions=("tract", "patch", "skymap")):
792 inputCatalog = connectionTypes.Input(
793 doc="The vertical concatenation of the {coaddName}_{meas|forced_src|psfs_multiprofit} catalogs, "
794 "stored as a DataFrame with a multi-level column index per-patch.",
795 dimensions=("tract", "patch", "skymap"),
796 storageClass="DataFrame",
797 name="{coaddName}Coadd_obj",
798 deferLoad=True,
799 )
800 inputCatalogRef = connectionTypes.Input(
801 doc="Catalog marking the primary detection (which band provides a good shape and position)"
802 "for each detection in deepCoadd_mergeDet.",
803 dimensions=("tract", "patch", "skymap"),
804 storageClass="SourceCatalog",
805 name="{coaddName}Coadd_ref",
806 deferLoad=True,
807 )
808 inputCatalogExpMultiprofit = connectionTypes.Input(
809 doc="Catalog of multiband Exponential fits.",
810 dimensions=("tract", "patch", "skymap"),
811 storageClass="ArrowAstropy",
812 name="{coaddName}Coadd_Exp_multiprofit",
813 deferLoad=True,
814 )
815 inputCatalogSersicMultiprofit = connectionTypes.Input(
816 doc="Catalog of multiband Sersic fits.",
817 dimensions=("tract", "patch", "skymap"),
818 storageClass="ArrowAstropy",
819 name="{coaddName}Coadd_Sersic_multiprofit",
820 deferLoad=True,
821 )
822 inputCatalogEpoch = connectionTypes.Input(
823 doc="Catalog of mean epochs for each object per band.",
824 dimensions=("tract", "patch", "skymap"),
825 storageClass="ArrowAstropy",
826 name="object_epoch",
827 deferLoad=True,
828 )
829 outputCatalog = connectionTypes.Output(
830 doc="Per-Patch Object Table of columns transformed from the deepCoadd_obj table per the standard "
831 "data model.",
832 dimensions=("tract", "patch", "skymap"),
833 storageClass="ArrowAstropy",
834 name="objectTable"
835 )
836
837 def __init__(self, *, config=None):
838 super().__init__(config=config)
839 if config.multilevelOutput:
840 self.outputCatalog = dataclasses.replace(self.outputCatalog, storageClass="DataFrame")
841
842
843class TransformObjectCatalogConfig(TransformCatalogBaseConfig,
844 pipelineConnections=TransformObjectCatalogConnections):
845 coaddName = pexConfig.Field(
846 dtype=str,
847 default="deep",
848 doc="Name of coadd"
849 )
850 outputBands = pexConfig.ListField(
851 dtype=str,
852 default=None,
853 optional=True,
854 doc=("These bands and only these bands will appear in the output,"
855 " NaN-filled if the input does not include them."
856 " If None, then use all bands found in the input.")
857 )
858 camelCase = pexConfig.Field(
859 dtype=bool,
860 default=False,
861 doc=("Write per-band columns names with camelCase, else underscore "
862 "For example: gPsFlux instead of g_PsFlux.")
863 )
864 multilevelOutput = pexConfig.Field(
865 dtype=bool,
866 default=False,
867 doc=("Whether results dataframe should have a multilevel column index (True) or be flat "
868 "and name-munged (False). If True, the output storage class will be "
869 "set to DataFrame, since astropy tables do not support multi-level indexing."),
870 deprecated="Support for multi-level outputs is deprecated and will be removed after v29.",
871 )
872 goodFlags = pexConfig.ListField(
873 dtype=str,
874 default=[],
875 doc=("List of 'good' flags that should be set False when populating empty tables. "
876 "All other flags are considered to be 'bad' flags and will be set to True.")
877 )
878 floatFillValue = pexConfig.Field(
879 dtype=float,
880 default=np.nan,
881 doc="Fill value for float fields when populating empty tables."
882 )
883 integerFillValue = pexConfig.Field(
884 dtype=int,
885 default=-1,
886 doc="Fill value for integer fields when populating empty tables."
887 )
888
889 def setDefaults(self):
890 super().setDefaults()
891 self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Object.yaml")
892 self.primaryKey = "objectId"
893 self.columnsFromDataId = ["tract", "patch"]
894 self.goodFlags = ["calib_astrometry_used",
895 "calib_photometry_reserved",
896 "calib_photometry_used",
897 "calib_psf_candidate",
898 "calib_psf_reserved",
899 "calib_psf_used"]
900
901
902class TransformObjectCatalogTask(TransformCatalogBaseTask):
903 """Produce a flattened Object Table to match the format specified in
904 sdm_schemas.
905
906 Do the same set of postprocessing calculations on all bands.
907
908 This is identical to `TransformCatalogBaseTask`, except that it does
909 the specified functor calculations for all filters present in the
910 input `deepCoadd_obj` table. Any specific ``"filt"`` keywords specified
911 by the YAML file will be superseded.
912 """
913 _DefaultName = "transformObjectCatalog"
914 ConfigClass = TransformObjectCatalogConfig
915
916 datasets_multiband = ("epoch", "ref", "Exp_multiprofit", "Sersic_multiprofit")
917
918 def runQuantum(self, butlerQC, inputRefs, outputRefs):
919 inputs = butlerQC.get(inputRefs)
920 if self.funcs is None:
921 raise ValueError("config.functorFile is None. "
922 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
923 result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
924 dataId=dict(outputRefs.outputCatalog.dataId.mapping),
925 handle_epoch=inputs["inputCatalogEpoch"],
926 handle_ref=inputs["inputCatalogRef"],
927 handle_Exp_multiprofit=inputs["inputCatalogExpMultiprofit"],
928 handle_Sersic_multiprofit=inputs["inputCatalogSersicMultiprofit"],
929 )
930 butlerQC.put(result, outputRefs)
931
932 def run(self, handle, funcs=None, dataId=None, band=None, **kwargs):
933 # NOTE: band kwarg is ignored here.
934 # TODO: Document and improve funcs argument usage in DM-48895
935 # self.getAnalysis only supports list, dict and CompositeFunctor
936 if isinstance(funcs, CompositeFunctor):
937 funcDict_in = funcs.funcDict
938 elif isinstance(funcs, dict):
939 funcDict_in = funcs
940 elif isinstance(funcs, list):
941 funcDict_in = {idx: v for idx, v in enumerate(funcs)}
942 else:
943 raise TypeError(f"Unsupported {type(funcs)=}")
944
945 handles_multi = {}
946 funcDicts_multiband = {}
947 for dataset in self.datasets_multiband:
948 if (handle_multi := kwargs.get(f"handle_{dataset}")) is None:
949 raise RuntimeError(f"Missing required handle_{dataset} kwarg")
950 handles_multi[dataset] = handle_multi
951 funcDicts_multiband[dataset] = {}
952
953 dfDict = {}
954 analysisDict = {}
955 templateDf = pd.DataFrame()
956
957 columns = handle.get(component="columns")
958 inputBands = columns.unique(level=1).values
959
960 outputBands = self.config.outputBands if self.config.outputBands else inputBands
961
962 # Split up funcs for per-band and multiband tables
963 funcDict_band = {}
964
965 for name, func in funcDict_in.items():
966 if func.dataset in funcDicts_multiband:
967 # This is something like a MultibandColumn
968 if band := getattr(func, "band_to_check", None):
969 if band not in outputBands:
970 continue
971 # This is something like a ReferenceBand that has configurable bands
972 elif hasattr(func, "bands"):
973 # TODO: Determine if this can be avoided DM-48895
974 # This will work fine if the init doesn't manipulate bands
975 # If it does, then one would need to make a new functor
976 # Determining the (kw)args is tricky in that case
977 func.bands = tuple(inputBands)
978
979 funcDict = funcDicts_multiband.get(func.dataset, funcDict_band)
980 funcDict[name] = func
981
982 funcs_band = CompositeFunctor(funcDict_band)
983
984 # Perform transform for data of filters that exist in the handle dataframe.
985 for inputBand in inputBands:
986 if inputBand not in outputBands:
987 self.log.info("Ignoring %s band data in the input", inputBand)
988 continue
989 self.log.info("Transforming the catalog of band %s", inputBand)
990 result = self.transform(inputBand, handle, funcs_band, dataId)
991 dfDict[inputBand] = result.df
992 analysisDict[inputBand] = result.analysis
993 if templateDf.empty:
994 templateDf = result.df
995
996 # Put filler values in columns of other wanted bands
997 for filt in outputBands:
998 if filt not in dfDict:
999 self.log.info("Adding empty columns for band %s", filt)
1000 dfTemp = templateDf.copy()
1001 for col in dfTemp.columns:
1002 testValue = dfTemp[col].values[0]
1003 if isinstance(testValue, (np.bool_, pd.BooleanDtype)):
1004 # Boolean flag type, check if it is a "good" flag
1005 if col in self.config.goodFlags:
1006 fillValue = False
1007 else:
1008 fillValue = True
1009 elif isinstance(testValue, numbers.Integral):
1010 # Checking numbers.Integral catches all flavors
1011 # of python, numpy, pandas, etc. integers.
1012 # We must ensure this is not an unsigned integer.
1013 if isinstance(testValue, np.unsignedinteger):
1014 raise ValueError("Parquet tables may not have unsigned integer columns.")
1015 else:
1016 fillValue = self.config.integerFillValue
1017 else:
1018 fillValue = self.config.floatFillValue
1019 dfTemp[col].values[:] = fillValue
1020 dfDict[filt] = dfTemp
1021
1022 # This makes a multilevel column index, with band as first level
1023 df = pd.concat(dfDict, axis=1, names=["band", "column"])
1024 name_index = df.index.name
1025
1026 # TODO: Remove in DM-48895
1027 if not self.config.multilevelOutput:
1028 noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
1029 if self.config.primaryKey in noDupCols:
1030 noDupCols.remove(self.config.primaryKey)
1031 if dataId and self.config.columnsFromDataId:
1032 noDupCols += self.config.columnsFromDataId
1033 df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase,
1034 inputBands=inputBands)
1035
1036 # Apply per-dataset functors to each multiband dataset in turn
1037 for dataset, funcDict in funcDicts_multiband.items():
1038 handle_multiband = handles_multi[dataset]
1039 df_dataset = handle_multiband.get()
1040 if isinstance(df_dataset, astropy.table.Table):
1041 # Allow astropy table inputs to already have the output index
1042 if name_index not in df_dataset.colnames:
1043 if self.config.primaryKey in df_dataset.colnames:
1044 name_index_ap = self.config.primaryKey
1045 else:
1046 raise RuntimeError(
1047 f"Neither of {name_index=} nor {self.config.primaryKey=} appear in"
1048 f" {df_dataset.colnames=} for {dataset=}"
1049 )
1050 else:
1051 name_index_ap = name_index
1052 df_dataset = df_dataset.to_pandas().set_index(name_index_ap, drop=False)
1053 elif isinstance(df_dataset, afwTable.SourceCatalog):
1054 df_dataset = df_dataset.asAstropy().to_pandas().set_index(name_index, drop=False)
1055 # TODO: should funcDict have noDup funcs removed?
1056 # noDup was intended for per-band tables.
1057 result = self.transform(
1058 None,
1059 pipeBase.InMemoryDatasetHandle(df_dataset, storageClass="DataFrame"),
1060 CompositeFunctor(funcDict),
1061 dataId,
1062 )
1063 result.df.index.name = name_index
1064 # Drop columns from dataId if present (patch, tract)
1065 if self.config.columnsFromDataId:
1066 columns_drop = [column for column in self.config.columnsFromDataId if column in result.df]
1067 if columns_drop:
1068 result.df.drop(columns_drop, axis=1, inplace=True)
1069 # Make the same multi-index for the multiband table if needed
1070 # This might end up making copies, one of several reasons to avoid
1071 # using multilevel indexes, or DataFrames at all
1072 to_concat = pd.concat(
1073 {band: result.df for band in self.config.outputBands}, axis=1, names=["band", "column"]
1074 ) if self.config.multilevelOutput else result.df
1075 df = pd.concat([df, to_concat], axis=1)
1076 analysisDict[dataset] = result.analysis
1077 del result
1078
1079 df.index.name = self.config.primaryKey
1080
1081 if not self.config.multilevelOutput:
1082 tbl = pandas_to_astropy(df)
1083 else:
1084 tbl = df
1085
1086 self.log.info("Made a table of %d columns and %d rows", len(tbl.columns), len(tbl))
1087
1088 return pipeBase.Struct(outputCatalog=tbl)
1089
1090
1091class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections,
1092 dimensions=("tract", "skymap")):
1093 inputCatalogs = connectionTypes.Input(
1094 doc="Per-Patch objectTables conforming to the standard data model.",
1095 name="objectTable",
1096 storageClass="ArrowAstropy",
1097 dimensions=("tract", "patch", "skymap"),
1098 multiple=True,
1099 deferLoad=True,
1100 )
1101 outputCatalog = connectionTypes.Output(
1102 doc="Pre-tract horizontal concatenation of the input objectTables",
1103 name="objectTable_tract",
1104 storageClass="ArrowAstropy",
1105 dimensions=("tract", "skymap"),
1106 )
1107
1108
1109class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig,
1110 pipelineConnections=ConsolidateObjectTableConnections):
1111 coaddName = pexConfig.Field(
1112 dtype=str,
1113 default="deep",
1114 doc="Name of coadd"
1115 )
1116
1117
1118class ConsolidateObjectTableTask(pipeBase.PipelineTask):
1119 """Write patch-merged source tables to a tract-level DataFrame Parquet file.
1120
1121 Concatenates `objectTable` list into a per-visit `objectTable_tract`.
1122 """
1123 _DefaultName = "consolidateObjectTable"
1124 ConfigClass = ConsolidateObjectTableConfig
1125
1126 inputDataset = "objectTable"
1127 outputDataset = "objectTable_tract"
1128
1129 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1130 inputs = butlerQC.get(inputRefs)
1131 self.log.info("Concatenating %s per-patch Object Tables",
1132 len(inputs["inputCatalogs"]))
1133 table = TableVStack.vstack_handles(inputs["inputCatalogs"])
1134 butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)
1135
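# TableVStack.vstack_handles consumes the deferred-load handles and returns a
# single astropy table; ignoring whatever memory optimizations it applies, the
# result is roughly equivalent to this sketch:
#
#     astropy.table.vstack([handle.get() for handle in inputs["inputCatalogs"]])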
1136
1137class TransformSourceTableConnections(pipeBase.PipelineTaskConnections,
1138 defaultTemplates={"catalogType": ""},
1139 dimensions=("instrument", "visit", "detector")):
1140
1141 inputCatalog = connectionTypes.Input(
1142 doc="Wide input catalog of sources produced by WriteSourceTableTask",
1143 name="{catalogType}source",
1144 storageClass="DataFrame",
1145 dimensions=("instrument", "visit", "detector"),
1146 deferLoad=True
1147 )
1148 outputCatalog = connectionTypes.Output(
1149 doc="Narrower, per-detector Source Table transformed and converted per a "
1150 "specified set of functors",
1151 name="{catalogType}sourceTable",
1152 storageClass="ArrowAstropy",
1153 dimensions=("instrument", "visit", "detector")
1154 )
1155
1156
1157class TransformSourceTableConfig(TransformCatalogBaseConfig,
1158 pipelineConnections=TransformSourceTableConnections):
1159
1160 def setDefaults(self):
1161 super().setDefaults()
1162 self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Source.yaml")
1163 self.primaryKey = "sourceId"
1164 self.columnsFromDataId = ["visit", "detector", "band", "physical_filter"]
1165
1166
1167class TransformSourceTableTask(TransformCatalogBaseTask):
1168 """Transform/standardize a source catalog
1169 """
1170 _DefaultName = "transformSourceTable"
1171 ConfigClass = TransformSourceTableConfig
1172
1173
1174class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections,
1175 dimensions=("instrument", "visit",),
1176 defaultTemplates={"calexpType": ""}):
1177 calexp = connectionTypes.Input(
1178 doc="Processed exposures used for metadata",
1179 name="calexp",
1180 storageClass="ExposureF",
1181 dimensions=("instrument", "visit", "detector"),
1182 deferLoad=True,
1183 multiple=True,
1184 )
1185 visitSummary = connectionTypes.Output(
1186 doc=("Per-visit consolidated exposure metadata. These catalogs use "
1187 "detector id for the id and are sorted for fast lookups of a "
1188 "detector."),
1189 name="visitSummary",
1190 storageClass="ExposureCatalog",
1191 dimensions=("instrument", "visit"),
1192 )
1193 visitSummarySchema = connectionTypes.InitOutput(
1194 doc="Schema of the visitSummary catalog",
1195 name="visitSummary_schema",
1196 storageClass="ExposureCatalog",
1197 )
1198
1199
1200class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig,
1201 pipelineConnections=ConsolidateVisitSummaryConnections):
1202 """Config for ConsolidateVisitSummaryTask"""
1203
1204 full = pexConfig.Field(
1205 "Whether to propate all exposure components. "
1206 "This adds PSF, aperture correction map, transmission curve, and detector, which can increase file "
1207 "size by more than factor of 10, but it makes the visit summaries produced by this task fully usable"
1208 "by tasks that were designed to run downstream of lsst.drp.tasks.UpdateVisitSummaryTask.",
1209 dtype=bool,
1210 default=False,
1211 )
1212
1213
1214class ConsolidateVisitSummaryTask(pipeBase.PipelineTask):
1215 """Task to consolidate per-detector visit metadata.
1216
1217 This task aggregates the following metadata from all the detectors in a
1218 single visit into an exposure catalog:
1219 - The visitInfo.
1220 - The wcs.
1221 - The photoCalib.
1222 - The physical_filter and band (if available).
1223 - The PSF model.
1224 - The aperture correction map.
1225 - The transmission curve.
1226 - The psf size, shape, and effective area at the center of the detector.
1227 - The corners of the bounding box in right ascension/declination.
1228
1229 Tests for this task are performed in ci_hsc_gen3.
1230 """
1231 _DefaultName = "consolidateVisitSummary"
1232 ConfigClass = ConsolidateVisitSummaryConfig
1233
1234 def __init__(self, **kwargs):
1235 super().__init__(**kwargs)
1236 self.schema = afwTable.ExposureTable.makeMinimalSchema()
1237 self.schema.addField("visit", type="L", doc="Visit number")
1238 self.schema.addField("physical_filter", type="String", size=32, doc="Physical filter")
1239 self.schema.addField("band", type="String", size=32, doc="Name of band")
1240 ExposureSummaryStats.update_schema(self.schema)
1241 self.visitSummarySchema = afwTable.ExposureCatalog(self.schema)
1242
1243 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1244 dataRefs = butlerQC.get(inputRefs.calexp)
1245 visit = dataRefs[0].dataId["visit"]
1246
1247 self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
1248 len(dataRefs), visit)
1249
1250 expCatalog = self._combineExposureMetadata(visit, dataRefs)
1251
1252 butlerQC.put(expCatalog, outputRefs.visitSummary)
1253
1254 def _combineExposureMetadata(self, visit, dataRefs):
1255 """Make a combined exposure catalog from a list of dataRefs.
1256 These dataRefs must point to exposures with wcs, summaryStats,
1257 and other visit metadata.
1258
1259 Parameters
1260 ----------
1261 visit : `int`
1262 Visit identification number.
1263 dataRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
1264 List of dataRefs in visit.
1265
1266 Returns
1267 -------
1268 visitSummary : `lsst.afw.table.ExposureCatalog`
1269 Exposure catalog with per-detector summary information.
1270 """
1271 cat = afwTable.ExposureCatalog(self.schema)
1272 cat.resize(len(dataRefs))
1273
1274 cat["visit"] = visit
1275
1276 for i, dataRef in enumerate(dataRefs):
1277 visitInfo = dataRef.get(component="visitInfo")
1278 filterLabel = dataRef.get(component="filter")
1279 summaryStats = dataRef.get(component="summaryStats")
1280 detector = dataRef.get(component="detector")
1281 wcs = dataRef.get(component="wcs")
1282 photoCalib = dataRef.get(component="photoCalib")
1283 bbox = dataRef.get(component="bbox")
1284 validPolygon = dataRef.get(component="validPolygon")
1285
1286 rec = cat[i]
1287 rec.setBBox(bbox)
1288 rec.setVisitInfo(visitInfo)
1289 rec.setWcs(wcs)
1290 rec.setPhotoCalib(photoCalib)
1291 rec.setValidPolygon(validPolygon)
1292
1293 if self.config.full:
1294 rec.setPsf(dataRef.get(component="psf"))
1295 rec.setApCorrMap(dataRef.get(component="apCorrMap"))
1296 rec.setTransmissionCurve(dataRef.get(component="transmissionCurve"))
1297
1298 rec["physical_filter"] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else ""
1299 rec["band"] = filterLabel.bandLabel if filterLabel.hasBandLabel() else ""
1300 rec.setId(detector.getId())
1301 summaryStats.update_record(rec)
1302
1303 if not cat:
1304 raise pipeBase.NoWorkFound(
1305 "No detectors had sufficient information to make a visit summary row."
1306 )
1307
1308 metadata = dafBase.PropertyList()
1309 metadata.add("COMMENT", "Catalog id is detector id, sorted.")
1310 # We are looping over existing datarefs, so the following is true
1311 metadata.add("COMMENT", "Only detectors with data have entries.")
1312 cat.setMetadata(metadata)
1313
1314 cat.sort()
1315 return cat
1316
1317
1318class ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections,
1319 defaultTemplates={"catalogType": ""},
1320 dimensions=("instrument", "visit")):
1321 inputCatalogs = connectionTypes.Input(
1322 doc="Input per-detector Source Tables",
1323 name="{catalogType}sourceTable",
1324 storageClass="ArrowAstropy",
1325 dimensions=("instrument", "visit", "detector"),
1326 multiple=True,
1327 deferLoad=True,
1328 )
1329 outputCatalog = connectionTypes.Output(
1330 doc="Per-visit concatenation of Source Table",
1331 name="{catalogType}sourceTable_visit",
1332 storageClass="ArrowAstropy",
1333 dimensions=("instrument", "visit")
1334 )
1335
1336
1337class ConsolidateSourceTableConfig(pipeBase.PipelineTaskConfig,
1338 pipelineConnections=ConsolidateSourceTableConnections):
1339 pass
1340
1341
1342class ConsolidateSourceTableTask(pipeBase.PipelineTask):
1343 """Concatenate `sourceTable` list into a per-visit `sourceTable_visit`
1344 """
1345 _DefaultName = "consolidateSourceTable"
1346 ConfigClass = ConsolidateSourceTableConfig
1347
1348 inputDataset = "sourceTable"
1349 outputDataset = "sourceTable_visit"
1350
1351 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1352 # Docstring inherited.
1353 detectorOrder = [ref.dataId["detector"] for ref in inputRefs.inputCatalogs]
1354 detectorOrder.sort()
1355 inputRefs = reorderRefs(inputRefs, detectorOrder, dataIdKey="detector")
1356 inputs = butlerQC.get(inputRefs)
1357 self.log.info("Concatenating %s per-detector Source Tables",
1358 len(inputs["inputCatalogs"]))
1359 table = TableVStack.vstack_handles(inputs["inputCatalogs"])
1360 butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)
1361
1362
1363class MakeCcdVisitTableConnections(pipeBase.PipelineTaskConnections,
1364 dimensions=("instrument",),
1365 defaultTemplates={"calexpType": ""}):
1366 visitSummaryRefs = connectionTypes.Input(
1367 doc="Data references for per-visit consolidated exposure metadata",
1368 name="finalVisitSummary",
1369 storageClass="ExposureCatalog",
1370 dimensions=("instrument", "visit"),
1371 multiple=True,
1372 deferLoad=True,
1373 )
1374 outputCatalog = connectionTypes.Output(
1375 doc="CCD and Visit metadata table",
1376 name="ccdVisitTable",
1377 storageClass="ArrowAstropy",
1378 dimensions=("instrument",)
1379 )
1380
1381
1382class MakeCcdVisitTableConfig(pipeBase.PipelineTaskConfig,
1383 pipelineConnections=MakeCcdVisitTableConnections):
1384 idGenerator = DetectorVisitIdGeneratorConfig.make_field()
1385
1386
1387class MakeCcdVisitTableTask(pipeBase.PipelineTask):
1388 """Produce a `ccdVisitTable` from the visit summary exposure catalogs.
1389 """
1390 _DefaultName = "makeCcdVisitTable"
1391 ConfigClass = MakeCcdVisitTableConfig
1392
1393 def run(self, visitSummaryRefs):
1394 """Make a table of ccd information from the visit summary catalogs.
1395
1396 Parameters
1397 ----------
1398 visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
1399 List of DeferredDatasetHandles pointing to exposure catalogs with
1400 per-detector summary information.
1401
1402 Returns
1403 -------
1404 result : `~lsst.pipe.base.Struct`
1405 Results struct with attribute:
1406
1407 ``outputCatalog``
1408 Catalog of ccd and visit information.
1409 """
1410 ccdEntries = []
1411 for visitSummaryRef in visitSummaryRefs:
1412 visitSummary = visitSummaryRef.get()
1413 if not visitSummary:
1414 continue
1415 visitInfo = visitSummary[0].getVisitInfo()
1416
1417 # Strip provenance to prevent merge confusion.
1418 strip_provenance_from_fits_header(visitSummary.metadata)
1419
1420 ccdEntry = {}
1421 summaryTable = visitSummary.asAstropy()
1422 selectColumns = ["id", "visit", "physical_filter", "band", "ra", "dec",
1423 "pixelScale", "zenithDistance",
1424 "expTime", "zeroPoint", "psfSigma", "skyBg", "skyNoise",
1425 "astromOffsetMean", "astromOffsetStd", "nPsfStar",
1426 "psfStarDeltaE1Median", "psfStarDeltaE2Median",
1427 "psfStarDeltaE1Scatter", "psfStarDeltaE2Scatter",
1428 "psfStarDeltaSizeMedian", "psfStarDeltaSizeScatter",
1429 "psfStarScaledDeltaSizeScatter", "psfTraceRadiusDelta",
1430 "psfApFluxDelta", "psfApCorrSigmaScaledDelta",
1431 "maxDistToNearestPsf",
1432 "effTime", "effTimePsfSigmaScale",
1433 "effTimeSkyBgScale", "effTimeZeroPointScale",
1434 "magLim"]
1435 ccdEntry = summaryTable[selectColumns]
1436 # 'visit' is the human readable visit number.
1437 # 'visitId' is the key to the visit table. They are the same.
1438 # Technically you should join to get the visit from the visit
1439 # table.
1440 ccdEntry.rename_column("visit", "visitId")
1441 ccdEntry.rename_column("id", "detectorId")
1442
1443 # RFC-924: Temporarily keep a duplicate "decl" entry for backwards
1444 # compatibility. To be removed after September 2023.
1445 ccdEntry["decl"] = ccdEntry["dec"]
1446
1447 ccdEntry["ccdVisitId"] = [
1448 self.config.idGenerator.apply(
1449 visitSummaryRef.dataId,
1450 detector=detector_id,
1451 is_exposure=False,
1452 ).catalog_id # The "catalog ID" here is the ccdVisit ID
1453 # because it's usually the ID for a whole catalog
1454 # with a {visit, detector}, and that's the main
1455 # use case for IdGenerator. This usage for a
1456 # summary table is rare.
1457 for detector_id in summaryTable["id"]
1458 ]
1459 ccdEntry["detector"] = summaryTable["id"]
1460 ccdEntry["seeing"] = (
1461 visitSummary["psfSigma"] * visitSummary["pixelScale"] * np.sqrt(8 * np.log(2))
1462 )
1463 ccdEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
1464 ccdEntry["expMidpt"] = np.datetime64(visitInfo.date.nsecs(scale=dafBase.DateTime.TAI), "ns")
1465 ccdEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
1466 expTime = visitInfo.getExposureTime()
1467 ccdEntry["obsStart"] = (
1468 ccdEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns")
1469 )
1470 expTime_days = expTime / (60*60*24)
1471 ccdEntry["obsStartMJD"] = ccdEntry["expMidptMJD"] - 0.5 * expTime_days
1472 ccdEntry["darkTime"] = visitInfo.getDarkTime()
1473 ccdEntry["xSize"] = summaryTable["bbox_max_x"] - summaryTable["bbox_min_x"]
1474 ccdEntry["ySize"] = summaryTable["bbox_max_y"] - summaryTable["bbox_min_y"]
1475 ccdEntry["llcra"] = summaryTable["raCorners"][:, 0]
1476 ccdEntry["llcdec"] = summaryTable["decCorners"][:, 0]
1477 ccdEntry["ulcra"] = summaryTable["raCorners"][:, 1]
1478 ccdEntry["ulcdec"] = summaryTable["decCorners"][:, 1]
1479 ccdEntry["urcra"] = summaryTable["raCorners"][:, 2]
1480 ccdEntry["urcdec"] = summaryTable["decCorners"][:, 2]
1481 ccdEntry["lrcra"] = summaryTable["raCorners"][:, 3]
1482 ccdEntry["lrcdec"] = summaryTable["decCorners"][:, 3]
1483 # TODO: DM-30618, Add raftName, nExposures, ccdTemp, binX, binY,
1484 # and flags, and decide if WCS, and llcx, llcy, ulcx, ulcy, etc.
1485 # values are actually wanted.
1486 ccdEntries.append(ccdEntry)
1487
1488 outputCatalog = astropy.table.vstack(ccdEntries, join_type="exact")
1489 return pipeBase.Struct(outputCatalog=outputCatalog)
1490
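# A minimal usage sketch, not part of this module: summarizing the per-detector
# rows assembled above once the table has been written. It assumes a configured
# `lsst.daf.butler.Butler`, that the output of this task is registered under the
# dataset type name "ccdVisitTable", and that it round-trips as an
# `astropy.table.Table`; all three are assumptions, not defined in this file.
def _example_median_seeing_per_visit(butler, instrument):
    """Return the median per-detector seeing (PSF FWHM, arcsec) for each visit."""
    ccd_visits = butler.get("ccdVisitTable", instrument=instrument)
    # Group by visit and aggregate the "seeing" column across detectors.
    grouped = ccd_visits.group_by("visitId")
    return grouped["visitId", "seeing"].groups.aggregate(np.median)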
1491
1492class MakeVisitTableConnections(pipeBase.PipelineTaskConnections,
1493 dimensions=("instrument",),
1494 defaultTemplates={"calexpType": ""}):
1495 visitSummaries = connectionTypes.Input(
1496 doc="Per-visit consolidated exposure metadata",
1497 name="finalVisitSummary",
1498 storageClass="ExposureCatalog",
1499 dimensions=("instrument", "visit",),
1500 multiple=True,
1501 deferLoad=True,
1502 )
1503 outputCatalog = connectionTypes.Output(
1504 doc="Visit metadata table",
1505 name="visitTable",
1506 storageClass="ArrowAstropy",
1507 dimensions=("instrument",)
1508 )
1509
1510
1511class MakeVisitTableConfig(pipeBase.PipelineTaskConfig,
1512 pipelineConnections=MakeVisitTableConnections):
1513 pass
1514
1515
1516class MakeVisitTableTask(pipeBase.PipelineTask):
1517 """Produce a `visitTable` from the visit summary exposure catalogs.
1518 """
1519 _DefaultName = "makeVisitTable"
1520 ConfigClass = MakeVisitTableConfig
1521
1522 def run(self, visitSummaries):
1523 """Make a table of visit information from the visit summary catalogs.
1524
1525 Parameters
1526 ----------
1527 visitSummaries : `list` of `lsst.daf.butler.DeferredDatasetHandle`
1528 List of handles to exposure catalogs with per-detector summary information.
1529 Returns
1530 -------
1531 result : `~lsst.pipe.base.Struct`
1532 Results struct with attribute:
1533
1534 ``outputCatalog``
1535 Catalog of visit information.
1536 """
1537 visitEntries = []
1538 for visitSummary in visitSummaries:
1539 visitSummary = visitSummary.get()
1540 if not visitSummary:
1541 continue
1542 visitRow = visitSummary[0]
1543 visitInfo = visitRow.getVisitInfo()
1544
1545 visitEntry = {}
1546 visitEntry["visitId"] = visitRow["visit"]
1547 visitEntry["visit"] = visitRow["visit"]
1548 visitEntry["physical_filter"] = visitRow["physical_filter"]
1549 visitEntry["band"] = visitRow["band"]
1550 raDec = visitInfo.getBoresightRaDec()
1551 visitEntry["ra"] = raDec.getRa().asDegrees()
1552 visitEntry["dec"] = raDec.getDec().asDegrees()
1553
1554 # RFC-924: Temporarily keep a duplicate "decl" entry for backwards
1555 # compatibility. To be removed after September 2023.
1556 visitEntry["decl"] = visitEntry["dec"]
1557
1558 visitEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
1559 azAlt = visitInfo.getBoresightAzAlt()
1560 visitEntry["azimuth"] = azAlt.getLongitude().asDegrees()
1561 visitEntry["altitude"] = azAlt.getLatitude().asDegrees()
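# Zenith distance is the complement of the boresight altitude, in degrees.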
1562 visitEntry["zenithDistance"] = 90 - azAlt.getLatitude().asDegrees()
1563 visitEntry["airmass"] = visitInfo.getBoresightAirmass()
1564 expTime = visitInfo.getExposureTime()
1565 visitEntry["expTime"] = expTime
1566 visitEntry["expMidpt"] = np.datetime64(visitInfo.date.nsecs(scale=dafBase.DateTime.TAI), "ns")
1567 visitEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
1568 visitEntry["obsStart"] = visitEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns")
1569 expTime_days = expTime / (60*60*24)
1570 visitEntry["obsStartMJD"] = visitEntry["expMidptMJD"] - 0.5 * expTime_days
1571 visitEntries.append(visitEntry)
1572
1573 # TODO: DM-30623, Add programId, exposureType, cameraTemp,
1574 # mirror1Temp, mirror2Temp, mirror3Temp, domeTemp, externalTemp,
1575 # dimmSeeing, pwvGPS, pwvMW, flags, nExposures.
1576
1577 outputCatalog = astropy.table.Table(rows=visitEntries)
1578 return pipeBase.Struct(outputCatalog=outputCatalog)
1579
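# A minimal usage sketch, not part of this module: reading back the visit table
# produced above. It assumes a configured `lsst.daf.butler.Butler`; the dataset
# type name "visitTable" matches the output connection defined above, and the
# table is assumed to round-trip as an `astropy.table.Table`.
def _example_visits_by_airmass(butler, instrument, band):
    """Return visits of the requested band, sorted by increasing airmass."""
    visits = butler.get("visitTable", instrument=instrument)
    selected = visits[visits["band"] == band]
    selected.sort("airmass")
    return selected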
1580
1581@deprecated(reason="This task is replaced by lsst.pipe.tasks.ForcedPhotCcdTask. "
1582 "This task will be removed after v30.",
1583 version="v29.0", category=FutureWarning)
1584class WriteForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
1585 dimensions=("instrument", "visit", "detector", "skymap", "tract")):
1586
1587 inputCatalog = connectionTypes.Input(
1588 doc="Primary per-detector, single-epoch forced-photometry catalog. "
1589 "By default, it is the output of ForcedPhotCcdTask on calexps",
1590 name="forced_src",
1591 storageClass="SourceCatalog",
1592 dimensions=("instrument", "visit", "detector", "skymap", "tract")
1593 )
1594 inputCatalogDiff = connectionTypes.Input(
1595 doc="Secondary multi-epoch, per-detector, forced photometry catalog. "
1596 "By default, it is the output of ForcedPhotCcdTask run on image differences.",
1597 name="forced_diff",
1598 storageClass="SourceCatalog",
1599 dimensions=("instrument", "visit", "detector", "skymap", "tract")
1600 )
1601 outputCatalog = connectionTypes.Output(
1602 doc="Input catalogs horizontally joined on `objectId`, in DataFrame Parquet format",
1603 name="mergedForcedSource",
1604 storageClass="DataFrame",
1605 dimensions=("instrument", "visit", "detector", "skymap", "tract")
1606 )
1607
1608
1609@deprecated(reason="This task is replaced by lsst.pipe.tasks.ForcedPhotCcdTask. "
1610 "This task will be removed after v30.",
1611 version="v29.0", category=FutureWarning)
1612class WriteForcedSourceTableConfig(pipeBase.PipelineTaskConfig,
1613 pipelineConnections=WriteForcedSourceTableConnections):
1614 key = lsst.pex.config.Field(
1615 doc="Column on which to join the two input tables and make the primary key of the output",
1616 dtype=str,
1617 default="objectId",
1618 )
1619
1620
1621@deprecated(reason="This task is replaced by lsst.pipe.tasks.ForcedPhotCcdTask. "
1622 "This task will be removed after v30.",
1623 version="v29.0", category=FutureWarning)
1624class WriteForcedSourceTableTask(pipeBase.PipelineTask):
1625 """Merge and convert per-detector forced source catalogs to DataFrame Parquet format.
1626
1627 Because the predecessor ForcedPhotCcdTask operates per-detector and
1628 per-tract (i.e., it has tract in its dimensions), detectors
1629 on the tract boundary may have multiple forced source catalogs.
1630
1631 The successor task TransformForcedSourceTable runs per-patch and
1632 temporally aggregates the overlapping mergedForcedSource catalogs
1633 from all available epochs.
1634 """
1635 _DefaultName = "writeForcedSourceTable"
1636 ConfigClass = WriteForcedSourceTableConfig
1637
1638 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1639 inputs = butlerQC.get(inputRefs)
1640 inputs["visit"] = butlerQC.quantum.dataId["visit"]
1641 inputs["detector"] = butlerQC.quantum.dataId["detector"]
1642 inputs["band"] = butlerQC.quantum.dataId["band"]
1643 outputs = self.run(**inputs)
1644 butlerQC.put(outputs, outputRefs)
1645
1646 def run(self, inputCatalog, inputCatalogDiff, visit, detector, band=None):
1647 dfs = []
1648 for table, dataset in zip((inputCatalog, inputCatalogDiff), ("calexp", "diff")):
1649 df = table.asAstropy().to_pandas().set_index(self.config.key, drop=False)
1650 df = df.reindex(sorted(df.columns), axis=1)
1651 df["visit"] = visit
1652 # int16 instead of uint8 because databases don't like unsigned bytes.
1653 df["detector"] = np.int16(detector)
1654 df["band"] = band if band else pd.NA
1655 df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
1656 names=("dataset", "column"))
1657
1658 dfs.append(df)
1659
1660 outputCatalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
1661 return pipeBase.Struct(outputCatalog=outputCatalog)
1662
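# A minimal sketch, not part of this module, illustrating the column layout
# produced by WriteForcedSourceTableTask.run above: the two input catalogs are
# horizontally joined on objectId and namespaced by a ("dataset", "column")
# MultiIndex, so the calexp- and diff-forced measurements can be pulled apart
# again downstream. `merged` stands for the task's outputCatalog.
def _example_split_by_dataset(merged):
    """Split a merged forced-source DataFrame back into its two inputs."""
    # Selecting a top-level key drops that level, leaving plain column names.
    calexp_forced = merged["calexp"]
    diff_forced = merged["diff"]
    return calexp_forced, diff_forced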
1663
1664class TransformForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
1665 dimensions=("instrument", "skymap", "patch", "tract")):
1666
1667 inputCatalogs = connectionTypes.Input(
1668 doc="DataFrames of merged ForcedSources produced by WriteForcedSourceTableTask",
1669 name="mergedForcedSource",
1670 storageClass="DataFrame",
1671 dimensions=("instrument", "visit", "detector", "skymap", "tract"),
1672 multiple=True,
1673 deferLoad=True
1674 )
1675 referenceCatalog = connectionTypes.Input(
1676 doc="Reference catalog which was used to seed the forcedPhot. Columns "
1677 "objectId, detect_isPrimary, detect_isTractInner, detect_isPatchInner "
1678 "are expected.",
1679 name="objectTable",
1680 storageClass="DataFrame",
1681 dimensions=("tract", "patch", "skymap"),
1682 deferLoad=True
1683 )
1684 outputCatalog = connectionTypes.Output(
1685 doc="Narrower, temporally-aggregated, per-patch ForcedSource Table transformed and converted per a "
1686 "specified set of functors",
1687 name="forcedSourceTable",
1688 storageClass="ArrowAstropy",
1689 dimensions=("tract", "patch", "skymap")
1690 )
1691
1692
1693class TransformForcedSourceTableConfig(TransformCatalogBaseConfig,
1694 pipelineConnections=TransformForcedSourceTableConnections):
1695 referenceColumns = pexConfig.ListField(
1696 dtype=str,
1697 default=["detect_isPrimary", "detect_isTractInner", "detect_isPatchInner"],
1698 optional=True,
1699 doc="Columns to pull from reference catalog",
1700 )
1701 keyRef = lsst.pex.config.Field(
1702 doc="Column on which to join the two input tables and make the primary key of the output",
1703 dtype=str,
1704 default="objectId",
1705 )
1706 key = lsst.pex.config.Field(
1707 doc="Rename the output DataFrame index to this name",
1708 dtype=str,
1709 default="forcedSourceId",
1710 )
1711
1712 def setDefaults(self):
1713 super().setDefaults()
1714 self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "ForcedSource.yaml")
1715 self.columnsFromDataId = ["tract", "patch"]
1716
1717
1718class TransformForcedSourceTableTask(TransformCatalogBaseTask):
1719 """Transform/standardize a ForcedSource catalog
1720
1721 Transforms each wide, per-detector forcedSource DataFrame per the
1722 specification file (per-camera defaults found in ForcedSource.yaml).
1723 All epochs that overlap the patch are aggregated into one narrow,
1724 per-patch DataFrame file.
1725
1726 No de-duplication of rows is performed. Duplicate-resolution flags are
1727 pulled in from the referenceCatalog: `detect_isPrimary`,
1728 `detect_isTractInner`, `detect_isPatchInner`, so that users may
1729 de-duplicate for analysis or compare duplicates for QA.
1730
1731 The resulting table includes multiple bands. Epochs (MJDs) and other
1732 useful per-visit metadata can be retrieved by joining with the
1733 CcdVisitTable on ccdVisitId.
1734 """
1735 _DefaultName = "transformForcedSourceTable"
1736 ConfigClass = TransformForcedSourceTableConfig
1737
1738 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1739 inputs = butlerQC.get(inputRefs)
1740 if self.funcs is None:
1741 raise ValueError("config.functorFile is None. "
1742 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
1743 outputs = self.run(inputs["inputCatalogs"], inputs["referenceCatalog"], funcs=self.funcs,
1744 dataId=dict(outputRefs.outputCatalog.dataId.mapping))
1745
1746 butlerQC.put(outputs, outputRefs)
1747
1748 def run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=None):
1749 dfs = []
1750 refColumns = list(self.config.referenceColumns)
1751 refColumns.append(self.config.keyRef)
1752 ref = referenceCatalog.get(parameters={"columns": refColumns})
1753 if ref.index.name != self.config.keyRef:
1754 # If the DataFrame we loaded was originally written as some other
1755 # Parquet type, it probably doesn't have the index set. If it was
1756 # written as a DataFrame, the index should already be set and
1757 # trying to set it again would be an error, since it doesn't exist
1758 # as a regular column anymore.
1759 ref.set_index(self.config.keyRef, inplace=True)
1760 self.log.info("Aggregating %d input catalogs", len(inputCatalogs))
1761 for handle in inputCatalogs:
1762 result = self.transform(None, handle, funcs, dataId)
1763 # Filter for only rows that were detected on (overlap) the patch
1764 dfs.append(result.df.join(ref, how="inner"))
1765
1766 outputCatalog = pd.concat(dfs)
1767
1768 if outputCatalog.empty:
1769 raise NoWorkFound(f"No forced photometry rows for {dataId}.")
1770
1771 # Now that the join on config.keyRef is done, give the index its
1772 # name back ...
1773 outputCatalog.index.rename(self.config.keyRef, inplace=True)
1774 # ... and move config.keyRef back into the regular columns.
1775 outputCatalog.reset_index(inplace=True)
1776 # Make forcedSourceId (computed per the spec in ForcedSource.yaml)
1777 # the index, verifying that it is unique.
1778 outputCatalog.set_index("forcedSourceId", inplace=True, verify_integrity=True)
1779 # Finally, rename the index to config.key.
1780 outputCatalog.index.rename(self.config.key, inplace=True)
1781
1782 self.log.info("Made a table of %d columns and %d rows",
1783 len(outputCatalog.columns), len(outputCatalog))
1784 return pipeBase.Struct(outputCatalog=pandas_to_astropy(outputCatalog))
1785
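# A minimal sketch of the join described in the docstring above: attaching
# per-visit epochs to forced-source rows via ccdVisitId. Assumptions: both
# tables have already been loaded (e.g. from the butler as dataset types
# "forcedSourceTable" and "ccdVisitTable", the latter name not defined in this
# file) and are `astropy.table.Table` objects.
def _example_attach_epochs(forced_sources, ccd_visits):
    """Left-join forced sources with the CcdVisit table to recover MJD epochs."""
    epochs = ccd_visits["ccdVisitId", "expMidptMJD", "band"]
    return astropy.table.join(forced_sources, epochs, keys="ccdVisitId",
                              join_type="left")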
1786
1787class ConsolidateTractConnections(pipeBase.PipelineTaskConnections,
1788 defaultTemplates={"catalogType": ""},
1789 dimensions=("instrument", "tract")):
1790 inputCatalogs = connectionTypes.Input(
1791 doc="Input per-patch DataFrame Tables to be concatenated",
1792 name="{catalogType}ForcedSourceTable",
1793 storageClass="DataFrame",
1794 dimensions=("tract", "patch", "skymap"),
1795 multiple=True,
1796 )
1797
1798 outputCatalog = connectionTypes.Output(
1799 doc="Output per-tract concatenation of DataFrame Tables",
1800 name="{catalogType}ForcedSourceTable_tract",
1801 storageClass="DataFrame",
1802 dimensions=("tract", "skymap"),
1803 )
1804
1805
1806class ConsolidateTractConfig(pipeBase.PipelineTaskConfig,
1807 pipelineConnections=ConsolidateTractConnections):
1808 pass
1809
1810
1811class ConsolidateTractTask(pipeBase.PipelineTask):
1812 """Concatenate a list of per-patch DataFrames into a single
1813 per-tract DataFrame.
1814 """
1815 _DefaultName = "ConsolidateTract"
1816 ConfigClass = ConsolidateTractConfig
1817
1818 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1819 inputs = butlerQC.get(inputRefs)
1820 # Not checking that at least one inputCatalog exists, because that
1821 # would mean an empty quantum graph.
1822 self.log.info("Concatenating %s per-patch %s Tables",
1823 len(inputs["inputCatalogs"]),
1824 inputRefs.inputCatalogs[0].datasetType.name)
1825 df = pd.concat(inputs["inputCatalogs"])
1826 butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)