postprocess.py
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22__all__ = ["WriteObjectTableConfig", "WriteObjectTableTask",
23 "WriteSourceTableConfig", "WriteSourceTableTask",
24 "WriteRecalibratedSourceTableConfig", "WriteRecalibratedSourceTableTask",
25 "PostprocessAnalysis",
26 "TransformCatalogBaseConfig", "TransformCatalogBaseTask",
27 "TransformObjectCatalogConfig", "TransformObjectCatalogTask",
28 "ConsolidateObjectTableConfig", "ConsolidateObjectTableTask",
29 "TransformSourceTableConfig", "TransformSourceTableTask",
30 "ConsolidateVisitSummaryConfig", "ConsolidateVisitSummaryTask",
31 "ConsolidateSourceTableConfig", "ConsolidateSourceTableTask",
32 "MakeCcdVisitTableConfig", "MakeCcdVisitTableTask",
33 "MakeVisitTableConfig", "MakeVisitTableTask",
34 "WriteForcedSourceTableConfig", "WriteForcedSourceTableTask",
35 "TransformForcedSourceTableConfig", "TransformForcedSourceTableTask",
36 "ConsolidateTractConfig", "ConsolidateTractTask"]
37
38from collections import defaultdict
39import dataclasses
40import functools
41import logging
42import numbers
43import os
44
45import numpy as np
46import pandas as pd
47import astropy.table
48from astro_metadata_translator.headers import merge_headers
49
50import lsst.geom
51import lsst.pex.config as pexConfig
52import lsst.pipe.base as pipeBase
53import lsst.daf.base as dafBase
54from lsst.daf.butler.formatters.parquet import pandas_to_astropy
55from lsst.pipe.base import connectionTypes
56import lsst.afw.table as afwTable
57from lsst.afw.image import ExposureSummaryStats, ExposureF
58from lsst.meas.base import SingleFrameMeasurementTask, DetectorVisitIdGeneratorConfig
59from lsst.obs.base.utils import strip_provenance_from_fits_header
60
61from .functors import CompositeFunctor, Column
62
63log = logging.getLogger(__name__)
64
65
66def flattenFilters(df, noDupCols=["coord_ra", "coord_dec"], camelCase=False, inputBands=None):
67 """Flattens a dataframe with multilevel column index.
68 """
69 newDf = pd.DataFrame()
70 # band is the level 0 index
71 dfBands = df.columns.unique(level=0).values
72 for band in dfBands:
73 subdf = df[band]
74 columnFormat = "{0}{1}" if camelCase else "{0}_{1}"
75 newColumns = {c: columnFormat.format(band, c)
76 for c in subdf.columns if c not in noDupCols}
77 cols = list(newColumns.keys())
78 newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)
79
80 # Band must be present in the input and output or else column is all NaN:
81 presentBands = dfBands if inputBands is None else list(set(inputBands).intersection(dfBands))
82 # Get the unexploded columns from any present band's partition
83 noDupDf = df[presentBands[0]][noDupCols]
84 newDf = pd.concat([noDupDf, newDf], axis=1)
85 return newDf
86
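# Illustrative sketch (not part of the pipeline; the band names and values are
# invented for demonstration): flattenFilters turns a two-level ("band",
# "column") column index into flat, band-prefixed names, keeping a single copy
# of the shared coordinate columns.
def _flattenFilters_example():
    columns = pd.MultiIndex.from_tuples(
        [(band, col) for band in ("g", "r") for col in ("coord_ra", "coord_dec", "psfFlux")],
        names=("band", "column"))
    df = pd.DataFrame([[10.0, -5.0, 1.0, 10.0, -5.0, 2.0]], columns=columns)
    flat = flattenFilters(df)
    # flat.columns -> ["coord_ra", "coord_dec", "g_psfFlux", "r_psfFlux"]
    return flat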
87
88class TableVStack:
89 """A helper class for stacking astropy tables without having them all in
90 memory at once.
91
92 Parameters
93 ----------
94 capacity : `int`
95 Full size of the final table.
96
97 Notes
98 -----
99 Unlike `astropy.table.vstack`, this class requires all tables to have the
100 exact same columns (it's slightly more strict than even the
101 ``join_type="exact"`` argument to `astropy.table.vstack`).
102 """
103
104 def __init__(self, capacity):
105 self.index = 0
106 self.capacity = capacity
107 self.result = None
108
109 @classmethod
110 def from_handles(cls, handles):
111 """Construct from an iterable of
112 `lsst.daf.butler.DeferredDatasetHandle`.
113
114 Parameters
115 ----------
116 handles : `~collections.abc.Iterable` [ \
117 `lsst.daf.butler.DeferredDatasetHandle` ]
118 Iterable of handles. Must have a storage class that supports the
119 "rowcount" component, which is all that will be fetched.
120
121 Returns
122 -------
123 vstack : `TableVStack`
124 An instance of this class, initialized with capacity equal to the
125 sum of the rowcounts of all the given table handles.
126 """
127 capacity = sum(handle.get(component="rowcount") for handle in handles)
128 return cls(capacity=capacity)
129
130 def extend(self, table):
131 """Add a single table to the stack.
132
133 Parameters
134 ----------
135 table : `astropy.table.Table`
136 An astropy table instance.
137 """
138 if self.result is None:
139 self.result = astropy.table.Table()
140 for name in table.colnames:
141 column = table[name]
142 column_cls = type(column)
143 self.result[name] = column_cls.info.new_like([column], self.capacity, name=name)
144 self.result[name][:len(table)] = column
145 self.index = len(table)
146 self.result.meta = table.meta.copy()
147 else:
148 next_index = self.index + len(table)
149 if set(self.result.colnames) != set(table.colnames):
150 raise TypeError(
151 "Inconsistent columns in concatentation: "
152 f"{set(self.result.colnames).symmetric_difference(table.colnames)}"
153 )
154 for name in table.colnames:
155 out_col = self.result[name]
156 in_col = table[name]
157 if out_col.dtype != in_col.dtype:
158 raise TypeError(f"Type mismatch on column {name!r}: {out_col.dtype} != {in_col.dtype}.")
159 self.result[name][self.index:next_index] = table[name]
160 self.index = next_index
161 # Butler provenance should be stripped on merge. It will be
162 # added by butler on write. No attempt is made here to combine
163 # provenance from multiple input tables.
164 self.result.meta = merge_headers([self.result.meta, table.meta], mode="drop")
165 strip_provenance_from_fits_header(self.result.meta)
166
167 @classmethod
168 def vstack_handles(cls, handles):
169 """Vertically stack tables represented by deferred dataset handles.
170
171 Parameters
172 ----------
173 handles : `~collections.abc.Iterable` [ \
174 `lsst.daf.butler.DeferredDatasetHandle` ]
175 Iterable of handles. Must have the "ArrowAstropy" storage class
176 and identical columns.
177
178 Returns
179 -------
180 table : `astropy.table.Table`
181 Concatenated table with the same columns as each input table and
182 the rows of all of them.
183 """
184 handles = tuple(handles) # guard against single-pass iterators
185 # Ensure that zero length catalogs are not included
186 rowcount = tuple(handle.get(component="rowcount") for handle in handles)
187 handles = tuple(handle for handle, count in zip(handles, rowcount) if count > 0)
188
189 vstack = cls.from_handles(handles)
190 for handle in handles:
191 vstack.extend(handle.get())
192 return vstack.result
193
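# Illustrative sketch (not part of the pipeline): the incremental stacking
# pattern above, driven by small in-memory tables instead of butler handles.
# The column names and values are invented for demonstration.
def _tableVStack_example():
    tables = [
        astropy.table.Table({"id": np.array([1, 2], dtype=np.int64),
                             "flux": np.array([0.5, 1.5])}),
        astropy.table.Table({"id": np.array([3], dtype=np.int64),
                             "flux": np.array([2.5])}),
    ]
    # Preallocate the full-size output, then copy each table into place.
    vstack = TableVStack(capacity=sum(len(t) for t in tables))
    for table in tables:
        vstack.extend(table)
    # vstack.result is a single three-row table with columns "id" and "flux".
    return vstack.result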
194
195class WriteObjectTableConnections(pipeBase.PipelineTaskConnections,
196 defaultTemplates={"coaddName": "deep"},
197 dimensions=("tract", "patch", "skymap")):
198 inputCatalogMeas = connectionTypes.Input(
199 doc="Catalog of source measurements on the deepCoadd.",
200 dimensions=("tract", "patch", "band", "skymap"),
201 storageClass="SourceCatalog",
202 name="{coaddName}Coadd_meas",
203 multiple=True
204 )
205 inputCatalogForcedSrc = connectionTypes.Input(
206 doc="Catalog of forced measurements (shape and position parameters held fixed) on the deepCoadd.",
207 dimensions=("tract", "patch", "band", "skymap"),
208 storageClass="SourceCatalog",
209 name="{coaddName}Coadd_forced_src",
210 multiple=True
211 )
212 inputCatalogPsfsMultiprofit = connectionTypes.Input(
213 doc="Catalog of Gaussian mixture model fit parameters for the PSF model at each object centroid.",
214 dimensions=("tract", "patch", "band", "skymap"),
215 storageClass="ArrowAstropy",
216 name="{coaddName}Coadd_psfs_multiprofit",
217 multiple=True,
218 )
219 outputCatalog = connectionTypes.Output(
220 doc="A vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
221 "stored as a DataFrame with a multi-level column index per-patch.",
222 dimensions=("tract", "patch", "skymap"),
223 storageClass="DataFrame",
224 name="{coaddName}Coadd_obj"
225 )
226
227
228class WriteObjectTableConfig(pipeBase.PipelineTaskConfig,
229 pipelineConnections=WriteObjectTableConnections):
230 coaddName = pexConfig.Field(
231 dtype=str,
232 default="deep",
233 doc="Name of coadd"
234 )
235
236
237class WriteObjectTableTask(pipeBase.PipelineTask):
238 """Write filter-merged object tables as a DataFrame in parquet format.
239 """
240 _DefaultName = "writeObjectTable"
241 ConfigClass = WriteObjectTableConfig
242
243 # Tag of output dataset written by `MergeSourcesTask.write`
244 outputDataset = "obj"
245
246 def runQuantum(self, butlerQC, inputRefs, outputRefs):
247 inputs = butlerQC.get(inputRefs)
248
249 catalogs = defaultdict(dict)
250 for dataset, connection in (
251 ("meas", "inputCatalogMeas"),
252 ("forced_src", "inputCatalogForcedSrc"),
253 ("psfs_multiprofit", "inputCatalogPsfsMultiprofit"),
254 ):
255 for ref, cat in zip(getattr(inputRefs, connection), inputs[connection]):
256 catalogs[ref.dataId["band"]][dataset] = cat
257
258 dataId = butlerQC.quantum.dataId
259 df = self.run(catalogs=catalogs, tract=dataId["tract"], patch=dataId["patch"])
260 outputs = pipeBase.Struct(outputCatalog=df)
261 butlerQC.put(outputs, outputRefs)
262
263 def run(self, catalogs, tract, patch):
264 """Merge multiple catalogs.
265
266 Parameters
267 ----------
268 catalogs : `dict`
269 Mapping from filter names to dict of catalogs.
270 tract : `int`
271 Tract id to use for the tractId column.
272 patch : `str`
273 Patch id to use for the patchId column.
274
275 Returns
276 -------
277 catalog : `pandas.DataFrame`
278 Merged dataframe.
279
280 Raises
281 ------
282 ValueError
283 Raised if any of the catalogs is of an unsupported type.
284 """
285 dfs = []
286 for filt, tableDict in catalogs.items():
287 for dataset, table in tableDict.items():
288 # Convert afwTable to pandas DataFrame if needed
289 if isinstance(table, pd.DataFrame):
290 df = table
291 elif isinstance(table, afwTable.SourceCatalog):
292 df = table.asAstropy().to_pandas()
293 elif isinstance(table, astropy.table.Table):
294 df = table.to_pandas()
295 else:
296 raise ValueError(f"{dataset=} has unsupported {type(table)=}")
297 df.set_index("id", drop=True, inplace=True)
298
299 # Sort columns by name, to ensure matching schema among patches
300 df = df.reindex(sorted(df.columns), axis=1)
301 df = df.assign(tractId=tract, patchId=patch)
302
303 # Make columns a 3-level MultiIndex
304 df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
305 names=("dataset", "band", "column"))
306 dfs.append(df)
307
308 # We do this dance and not `pd.concat(dfs)` because the pandas
309 # concatenation uses infinite memory.
310 catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
311 return catalog
312
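# Illustrative sketch (not part of the pipeline; the values are invented for
# demonstration): the per-patch output of WriteObjectTableTask.run is a wide
# DataFrame indexed by object id with a three-level ("dataset", "band",
# "column") column index.
def _objectTable_columns_example():
    df = pd.DataFrame(
        [[1.0, 2.0]],
        columns=pd.MultiIndex.from_tuples(
            [("meas", "g", "base_PsfFlux_instFlux"),
             ("forced_src", "r", "base_PsfFlux_instFlux")],
            names=("dataset", "band", "column")),
        index=pd.Index([42], name="id"))
    # Selecting one dataset and band drops down to plain column names.
    return df["meas"]["g"]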
313
314class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
315 defaultTemplates={"catalogType": ""},
316 dimensions=("instrument", "visit", "detector")):
317
318 catalog = connectionTypes.Input(
319 doc="Input full-depth catalog of sources produced by CalibrateTask",
320 name="{catalogType}src",
321 storageClass="SourceCatalog",
322 dimensions=("instrument", "visit", "detector")
323 )
324 outputCatalog = connectionTypes.Output(
325 doc="Catalog of sources, `src` in Astropy/Parquet format. Columns are unchanged.",
326 name="{catalogType}source",
327 storageClass="ArrowAstropy",
328 dimensions=("instrument", "visit", "detector")
329 )
330
331
332class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
333 pipelineConnections=WriteSourceTableConnections):
334 pass
335
336
337class WriteSourceTableTask(pipeBase.PipelineTask):
338 """Write source table to DataFrame Parquet format.
339 """
340 _DefaultName = "writeSourceTable"
341 ConfigClass = WriteSourceTableConfig
342
343 def runQuantum(self, butlerQC, inputRefs, outputRefs):
344 inputs = butlerQC.get(inputRefs)
345 inputs["visit"] = butlerQC.quantum.dataId["visit"]
346 inputs["detector"] = butlerQC.quantum.dataId["detector"]
347 result = self.run(**inputs)
348 outputs = pipeBase.Struct(outputCatalog=result.table)
349 butlerQC.put(outputs, outputRefs)
350
351 def run(self, catalog, visit, detector, **kwargs):
352 """Convert `src` catalog to an Astropy table.
353
354 Parameters
355 ----------
356 catalog : `afwTable.SourceCatalog`
357 Catalog to be converted.
358 visit, detector : `int`
359 Visit and detector ids to be added as columns.
360 **kwargs
361 Additional keyword arguments are ignored as a convenience for
362 subclasses that pass the same arguments to several different
363 methods.
364
365 Returns
366 -------
367 result : `~lsst.pipe.base.Struct`
368 ``table``
369 `astropy.table.Table` version of the input catalog
370 """
371 self.log.info("Generating DataFrame from src catalog visit,detector=%i,%i", visit, detector)
372 tbl = catalog.asAstropy()
373 tbl["visit"] = visit
374 # int16 instead of uint8 because databases don't like unsigned bytes.
375 tbl["detector"] = np.int16(detector)
376
377 return pipeBase.Struct(table=tbl)
378
379
380class WriteRecalibratedSourceTableConnections(WriteSourceTableConnections,
381 defaultTemplates={"catalogType": ""},
382 dimensions=("instrument", "visit", "detector", "skymap")):
383 visitSummary = connectionTypes.Input(
384 doc="Input visit-summary catalog with updated calibration objects.",
385 name="finalVisitSummary",
386 storageClass="ExposureCatalog",
387 dimensions=("instrument", "visit",),
388 )
389
390 def __init__(self, config):
391 # We don't want the input catalog here to be an initial existence
392 # constraint in QG generation, because that can unfortunately limit the
393 # set of data IDs of inputs to other tasks, even those that run earlier
394 # (e.g. updateVisitSummary), when the input 'src' catalog is not
395 # produced. It's safer to just use 'visitSummary' existence as an
396 # initial constraint, and then let the graph prune out the detectors
397 # that don't have a 'src' for this task only.
398 self.catalog = dataclasses.replace(self.catalog, deferGraphConstraint=True)
399
400
401class WriteRecalibratedSourceTableConfig(WriteSourceTableConfig,
402 pipelineConnections=WriteRecalibratedSourceTableConnections):
403
404 doReevaluatePhotoCalib = pexConfig.Field(
405 dtype=bool,
406 default=True,
407 doc=("Add or replace local photoCalib columns"),
408 )
409 doReevaluateSkyWcs = pexConfig.Field(
410 dtype=bool,
411 default=True,
412 doc=("Add or replace local WCS columns and update the coord columns, coord_ra and coord_dec"),
413 )
414
415
416class WriteRecalibratedSourceTableTask(WriteSourceTableTask):
417 """Write source table to DataFrame Parquet format.
418 """
419 _DefaultName = "writeRecalibratedSourceTable"
420 ConfigClass = WriteRecalibratedSourceTableConfig
421
422 def runQuantum(self, butlerQC, inputRefs, outputRefs):
423 inputs = butlerQC.get(inputRefs)
424
425 inputs["visit"] = butlerQC.quantum.dataId["visit"]
426 inputs["detector"] = butlerQC.quantum.dataId["detector"]
427
428 if self.config.doReevaluatePhotoCalib or self.config.doReevaluateSkyWcs:
429 exposure = ExposureF()
430 inputs["exposure"] = self.prepareCalibratedExposure(
431 exposure=exposure,
432 visitSummary=inputs["visitSummary"],
433 detectorId=butlerQC.quantum.dataId["detector"]
434 )
435 inputs["catalog"] = self.addCalibColumns(**inputs)
436
437 result = self.run(**inputs)
438 outputs = pipeBase.Struct(outputCatalog=result.table)
439 butlerQC.put(outputs, outputRefs)
440
441 def prepareCalibratedExposure(self, exposure, detectorId, visitSummary=None):
442 """Prepare a calibrated exposure and apply external calibrations
443 if so configured.
444
445 Parameters
446 ----------
447 exposure : `lsst.afw.image.exposure.Exposure`
448 Input exposure to adjust calibrations. May be an empty Exposure.
449 detectorId : `int`
450 Detector ID associated with the exposure.
451 visitSummary : `lsst.afw.table.ExposureCatalog`, optional
452 Exposure catalog with all calibration objects. WCS and PhotoCalib
453 are always applied if ``visitSummary`` is provided and those
454 components are not `None`.
455
456 Returns
457 -------
458 exposure : `lsst.afw.image.exposure.Exposure`
459 Exposure with adjusted calibrations.
460 """
461 if visitSummary is not None:
462 row = visitSummary.find(detectorId)
463 if row is None:
464 raise pipeBase.NoWorkFound(f"Visit summary for detector {detectorId} is missing.")
465 if (photoCalib := row.getPhotoCalib()) is None:
466 self.log.warning("Detector id %s has None for photoCalib in visit summary; "
467 "skipping reevaluation of photoCalib.", detectorId)
468 exposure.setPhotoCalib(None)
469 else:
470 exposure.setPhotoCalib(photoCalib)
471 if (skyWcs := row.getWcs()) is None:
472 self.log.warning("Detector id %s has None for skyWcs in visit summary; "
473 "skipping reevaluation of skyWcs.", detectorId)
474 exposure.setWcs(None)
475 else:
476 exposure.setWcs(skyWcs)
477
478 return exposure
479
480 def addCalibColumns(self, catalog, exposure, **kwargs):
481 """Add replace columns with calibs evaluated at each centroid
482
483 Add or replace 'base_LocalWcs' and 'base_LocalPhotoCalib' columns in
484 a source catalog, by rerunning the plugins.
485
486 Parameters
487 ----------
488 catalog : `lsst.afw.table.SourceCatalog`
489 Catalog to which calib columns will be added.
490 exposure : `lsst.afw.image.exposure.Exposure`
491 Exposure with attached PhotoCalibs and SkyWcs attributes to be
492 reevaluated at local centroids. Pixels are not required.
493 **kwargs
494 Additional keyword arguments are ignored to facilitate passing the
495 same arguments to several methods.
496
497 Returns
498 -------
499 newCat : `lsst.afw.table.SourceCatalog`
500 Source catalog with requested local calib columns.
501 """
502 measureConfig = SingleFrameMeasurementTask.ConfigClass()
503 measureConfig.doReplaceWithNoise = False
504
505 # Clear all slots, because we aren't running the relevant plugins.
506 for slot in measureConfig.slots:
507 setattr(measureConfig.slots, slot, None)
508
509 measureConfig.plugins.names = []
510 if self.config.doReevaluateSkyWcs:
511 measureConfig.plugins.names.add("base_LocalWcs")
512 self.log.info("Re-evaluating base_LocalWcs plugin")
513 if self.config.doReevaluatePhotoCalib:
514 measureConfig.plugins.names.add("base_LocalPhotoCalib")
515 self.log.info("Re-evaluating base_LocalPhotoCalib plugin")
516 pluginsNotToCopy = tuple(measureConfig.plugins.names)
517
518 # Create a new schema and catalog
519 # Copy all columns from original except for the ones to reevaluate
520 aliasMap = catalog.schema.getAliasMap()
521 mapper = afwTable.SchemaMapper(catalog.schema)
522 for item in catalog.schema:
523 if not item.field.getName().startswith(pluginsNotToCopy):
524 mapper.addMapping(item.key)
525
526 schema = mapper.getOutputSchema()
527 measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
528 schema.setAliasMap(aliasMap)
529 newCat = afwTable.SourceCatalog(schema)
530 newCat.extend(catalog, mapper=mapper)
531
532 # Fluxes in sourceCatalogs are in counts, so there are no fluxes to
533 # update here. LocalPhotoCalibs are applied during transform tasks.
534 # Update coord_ra/coord_dec, which are expected to be positions on the
535 # sky and are used as such in sdm tables without transform
536 if self.config.doReevaluateSkyWcs and exposure.wcs is not None:
537 afwTable.updateSourceCoords(exposure.wcs, newCat)
538 wcsPlugin = measurement.plugins["base_LocalWcs"]
539 else:
540 wcsPlugin = None
541
542 if self.config.doReevaluatePhotoCalib and exposure.getPhotoCalib() is not None:
543 pcPlugin = measurement.plugins["base_LocalPhotoCalib"]
544 else:
545 pcPlugin = None
546
547 for row in newCat:
548 if wcsPlugin is not None:
549 wcsPlugin.measure(row, exposure)
550 if pcPlugin is not None:
551 pcPlugin.measure(row, exposure)
552
553 return newCat
554
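# Illustrative sketch (not part of the pipeline): the SchemaMapper pattern used
# in addCalibColumns above, reduced to copying every field of a catalog except
# those whose names start with a given prefix. The prefix is invented for
# demonstration.
def _copy_catalog_without_prefix(catalog, prefix="base_LocalWcs"):
    mapper = afwTable.SchemaMapper(catalog.schema)
    for item in catalog.schema:
        if not item.field.getName().startswith(prefix):
            mapper.addMapping(item.key)
    newCat = afwTable.SourceCatalog(mapper.getOutputSchema())
    newCat.extend(catalog, mapper=mapper)
    return newCat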
555
556class PostprocessAnalysis(object):
557 """Calculate columns from DataFrames or handles storing DataFrames.
558
559 This object manages and organizes an arbitrary set of computations
560 on a catalog. The catalog is defined by a
561 `DeferredDatasetHandle` or `InMemoryDatasetHandle` object
562 (or list thereof), such as a ``deepCoadd_obj`` dataset, and the
563 computations are defined by a collection of
564 `~lsst.pipe.tasks.functors.Functor` objects (or, equivalently, a
565 ``CompositeFunctor``).
566
567 After the object is initialized, accessing the ``.df`` attribute (which
568 holds the `pandas.DataFrame` containing the results of the calculations)
569 triggers computation of said dataframe.
570
571 One of the conveniences of using this object is the ability to define a
572 desired common filter for all functors. This enables the same functor
573 collection to be passed to several different `PostprocessAnalysis` objects
574 without having to change the original functor collection, since the ``filt``
575 keyword argument of this object triggers an overwrite of the ``filt``
576 property for all functors in the collection.
577
578 This object also allows a list of refFlags to be passed, and defines a set
579 of default refFlags that are always included even if not requested.
580
581 If a list of DataFrames or Handles is passed, rather than a single one,
582 then the calculations will be mapped over all the input catalogs. In
583 principle, it should be straightforward to parallelize this activity, but
584 initial tests have failed (see TODO in code comments).
585
586 Parameters
587 ----------
588 handles : `~lsst.daf.butler.DeferredDatasetHandle` or
589 `~lsst.pipe.base.InMemoryDatasetHandle` or
590 list of these.
591 Source catalog(s) for computation.
592 functors : `list`, `dict`, or `~lsst.pipe.tasks.functors.CompositeFunctor`
593 Computations to do (functors that act on ``handles``).
594 If a dict, the output
595 DataFrame will have columns keyed accordingly.
596 If a list, the column keys will come from the
597 ``.shortname`` attribute of each functor.
598
599 filt : `str`, optional
600 Filter in which to calculate. If provided,
601 this will overwrite any existing ``.filt`` attribute
602 of the provided functors.
603
604 flags : `list`, optional
605 List of flags (per-band) to include in output table.
606 Taken from the ``meas`` dataset if applied to a multilevel Object Table.
607
608 refFlags : `list`, optional
609 List of refFlags (only reference band) to include in output table.
610
611 forcedFlags : `list`, optional
612 List of flags (per-band) to include in output table.
613 Taken from the ``forced_src`` dataset if applied to a
614 multilevel Object Table. Intended for flags from measurement plugins
615 only run during multi-band forced-photometry.
616 """
617 _defaultRefFlags = []
618 _defaultFuncs = ()
619
620 def __init__(self, handles, functors, filt=None, flags=None, refFlags=None, forcedFlags=None):
621 self.handles = handles
622 self.functors = functors
623
624 self.filt = filt
625 self.flags = list(flags) if flags is not None else []
626 self.forcedFlags = list(forcedFlags) if forcedFlags is not None else []
627 self.refFlags = list(self._defaultRefFlags)
628 if refFlags is not None:
629 self.refFlags += list(refFlags)
630
631 self._df = None
632
633 @property
634 def defaultFuncs(self):
635 funcs = dict(self._defaultFuncs)
636 return funcs
637
638 @property
639 def func(self):
640 additionalFuncs = self.defaultFuncs
641 additionalFuncs.update({flag: Column(flag, dataset="forced_src") for flag in self.forcedFlags})
642 additionalFuncs.update({flag: Column(flag, dataset="ref") for flag in self.refFlags})
643 additionalFuncs.update({flag: Column(flag, dataset="meas") for flag in self.flags})
644
645 if isinstance(self.functors, CompositeFunctor):
646 func = self.functors
647 else:
648 func = CompositeFunctor(self.functors)
649
650 func.funcDict.update(additionalFuncs)
651 func.filt = self.filt
652
653 return func
654
655 @property
656 def noDupCols(self):
657 return [name for name, func in self.func.funcDict.items() if func.noDup]
658
659 @property
660 def df(self):
661 if self._df is None:
662 self.compute()
663 return self._df
664
665 def compute(self, dropna=False, pool=None):
666 # map over multiple handles
667 if type(self.handles) in (list, tuple):
668 if pool is None:
669 dflist = [self.func(handle, dropna=dropna) for handle in self.handles]
670 else:
671 # TODO: Figure out why this doesn't work (pyarrow pickling
672 # issues?)
673 dflist = pool.map(functools.partial(self.func, dropna=dropna), self.handles)
674 self._df = pd.concat(dflist)
675 else:
676 self._df = self.func(self.handles, dropna=dropna)
677
678 return self._df
679
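# Illustrative sketch (not part of the pipeline; the column names are invented
# for demonstration): computing a couple of columns from an in-memory, flat
# DataFrame with PostprocessAnalysis. Production functor collections are
# normally loaded from a YAML file (see TransformCatalogBaseTask below).
def _postprocessAnalysis_example():
    df = pd.DataFrame({"coord_ra": [1.0, 2.0], "coord_dec": [-1.0, -2.0]})
    handle = pipeBase.InMemoryDatasetHandle(df, storageClass="DataFrame")
    funcs = {"ra": Column("coord_ra"), "dec": Column("coord_dec")}
    analysis = PostprocessAnalysis(handle, funcs)
    # Accessing .df triggers the computation and returns a DataFrame with
    # columns "ra" and "dec".
    return analysis.df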
680
681class TransformCatalogBaseConnections(pipeBase.PipelineTaskConnections,
682 dimensions=()):
683 """Expected Connections for subclasses of TransformCatalogBaseTask.
684
685 Must be subclassed.
686 """
687 inputCatalog = connectionTypes.Input(
688 name="",
689 storageClass="DataFrame",
690 )
691 outputCatalog = connectionTypes.Output(
692 name="",
693 storageClass="ArrowAstropy",
694 )
695
696
697class TransformCatalogBaseConfig(pipeBase.PipelineTaskConfig,
698 pipelineConnections=TransformCatalogBaseConnections):
699 functorFile = pexConfig.Field(
700 dtype=str,
701 doc="Path to YAML file specifying Science Data Model functors to use "
702 "when copying columns and computing calibrated values.",
703 default=None,
704 optional=True
705 )
706 primaryKey = pexConfig.Field(
707 dtype=str,
708 doc="Name of column to be set as the DataFrame index. If None, the index"
709 "will be named `id`",
710 default=None,
711 optional=True
712 )
713 columnsFromDataId = pexConfig.ListField(
714 dtype=str,
715 default=None,
716 optional=True,
717 doc="Columns to extract from the dataId",
718 )
719
720
721class TransformCatalogBaseTask(pipeBase.PipelineTask):
722 """Base class for transforming/standardizing a catalog by applying functors
723 that convert units and apply calibrations.
724
725 The purpose of this task is to perform a set of computations on an input
726 ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` that holds a
727 ``DataFrame`` dataset (such as ``deepCoadd_obj``), and write the results to
728 a new dataset (which needs to be declared in an ``outputDataset``
729 attribute).
730
731 The calculations to be performed are defined in a YAML file that specifies
732 a set of functors to be computed, provided as the ``functorFile`` config
733 parameter. An example of such a YAML file is the following:
734
735 funcs:
736 sourceId:
737 functor: Index
738 x:
739 functor: Column
740 args: slot_Centroid_x
741 y:
742 functor: Column
743 args: slot_Centroid_y
744 psfFlux:
745 functor: LocalNanojansky
746 args:
747 - slot_PsfFlux_instFlux
748 - slot_PsfFlux_instFluxErr
749 - base_LocalPhotoCalib
750 - base_LocalPhotoCalibErr
751 psfFluxErr:
752 functor: LocalNanojanskyErr
753 args:
754 - slot_PsfFlux_instFlux
755 - slot_PsfFlux_instFluxErr
756 - base_LocalPhotoCalib
757 - base_LocalPhotoCalibErr
758 flags:
759 - detect_isPrimary
760
761 The names for each entry under "funcs" will become the names of columns in
762 the output dataset. All the functors referenced are defined in
763 `~lsst.pipe.tasks.functors`. Positional arguments to be passed to each
764 functor are in the `args` list, and any additional entries for each column
765 other than "functor" or "args" (e.g., ``'filt'``, ``'dataset'``) are
766 treated as keyword arguments to be passed to the functor initialization.
767
768 The "flags" entry is the default shortcut for `Column` functors.
769 All columns listed under "flags" will be copied to the output table
770 untransformed. They can be of any datatype.
771 In the special case of transforming a multi-level object table with
772 band and dataset indices (deepCoadd_obj), these will be taken from the
773 ``meas`` dataset and exploded out per band.
774
775 There are two special shortcuts that only apply when transforming
776 multi-level Object (deepCoadd_obj) tables:
777 - The "refFlags" entry is shortcut for `Column` functor
778 taken from the ``ref`` dataset if transforming an ObjectTable.
779 - The "forcedFlags" entry is shortcut for `Column` functors.
780 taken from the ``forced_src`` dataset if transforming an ObjectTable.
781 These are expanded out per band.
782
783
784 This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
785 to organize and execute the calculations.
786 """
787 @property
788 def _DefaultName(self):
789 raise NotImplementedError("Subclass must define the \"_DefaultName\" attribute.")
790
791 @property
792 def outputDataset(self):
793 raise NotImplementedError("Subclass must define the \"outputDataset\" attribute.")
794
795 @property
796 def inputDataset(self):
797 raise NotImplementedError("Subclass must define \"inputDataset\" attribute.")
798
799 @property
800 def ConfigClass(self):
801 raise NotImplementedError("Subclass must define \"ConfigClass\" attribute.")
802
803 def __init__(self, *args, **kwargs):
804 super().__init__(*args, **kwargs)
805 if self.config.functorFile:
806 self.log.info("Loading tranform functor definitions from %s",
807 self.config.functorFile)
808 self.funcs = CompositeFunctor.from_file(self.config.functorFile)
809 self.funcs.update(dict(PostprocessAnalysis._defaultFuncs))
810 else:
811 self.funcs = None
812
813 def runQuantum(self, butlerQC, inputRefs, outputRefs):
814 inputs = butlerQC.get(inputRefs)
815 if self.funcs is None:
816 raise ValueError("config.functorFile is None. "
817 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
818 result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
819 dataId=dict(outputRefs.outputCatalog.dataId.mapping))
820 butlerQC.put(result, outputRefs)
821
822 def run(self, handle, funcs=None, dataId=None, band=None):
823 """Do postprocessing calculations
824
825 Takes a ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` or
826 ``DataFrame`` object and dataId,
827 returns a dataframe with results of postprocessing calculations.
828
829 Parameters
830 ----------
831 handle : `~lsst.daf.butler.DeferredDatasetHandle` or
832 `~lsst.pipe.base.InMemoryDatasetHandle` or
833 `~pandas.DataFrame`, or list of these.
834 DataFrames from which calculations are done.
835 funcs : `~lsst.pipe.tasks.functors.Functor`
836 Functors to apply to the table's columns
837 dataId : `dict`, optional
838 Used to add the columns specified by ``config.columnsFromDataId`` to the output dataframe.
839 band : `str`, optional
840 Filter band that is being processed.
841
842 Returns
843 -------
844 result : `lsst.pipe.base.Struct`
845 Result struct, with a single ``outputCatalog`` attribute holding
846 the transformed catalog.
847 """
848 self.log.info("Transforming/standardizing the source table dataId: %s", dataId)
849
850 df = self.transform(band, handle, funcs, dataId).df
851 self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
852 result = pipeBase.Struct(outputCatalog=pandas_to_astropy(df))
853 return result
854
855 def getFunctors(self):
856 return self.funcs
857
858 def getAnalysis(self, handles, funcs=None, band=None):
859 if funcs is None:
860 funcs = self.funcs
861 analysis = PostprocessAnalysis(handles, funcs, filt=band)
862 return analysis
863
864 def transform(self, band, handles, funcs, dataId):
865 analysis = self.getAnalysis(handles, funcs=funcs, band=band)
866 df = analysis.df
867 if dataId and self.config.columnsFromDataId:
868 for key in self.config.columnsFromDataId:
869 if key in dataId:
870 if key == "detector":
871 # int16 instead of uint8 because databases don't like unsigned bytes.
872 df[key] = np.int16(dataId[key])
873 else:
874 df[key] = dataId[key]
875 else:
876 raise ValueError(f"'{key}' in config.columnsFromDataId not found in dataId: {dataId}")
877
878 if self.config.primaryKey:
879 if df.index.name != self.config.primaryKey and self.config.primaryKey in df:
880 df.reset_index(inplace=True, drop=True)
881 df.set_index(self.config.primaryKey, inplace=True)
882
883 return pipeBase.Struct(
884 df=df,
885 analysis=analysis
886 )
887
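# Illustrative sketch (not part of the pipeline): how a functor file such as
# the YAML in the class docstring above is turned into a functor collection;
# this mirrors TransformCatalogBaseTask.__init__. The path is hypothetical.
def _load_functors_example(functorFile="/tmp/functors.yaml"):
    funcs = CompositeFunctor.from_file(functorFile)
    funcs.update(dict(PostprocessAnalysis._defaultFuncs))
    return funcs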
888
889class TransformObjectCatalogConnections(pipeBase.PipelineTaskConnections,
890 defaultTemplates={"coaddName": "deep"},
891 dimensions=("tract", "patch", "skymap")):
892 inputCatalog = connectionTypes.Input(
893 doc="The vertical concatenation of the {coaddName}_{meas|forced_src|psfs_multiprofit} catalogs, "
894 "stored as a DataFrame with a multi-level column index per-patch.",
895 dimensions=("tract", "patch", "skymap"),
896 storageClass="DataFrame",
897 name="{coaddName}Coadd_obj",
898 deferLoad=True,
899 )
900 inputCatalogRef = connectionTypes.Input(
901 doc="Catalog marking the primary detection (which band provides a good shape and position)"
902 "for each detection in deepCoadd_mergeDet.",
903 dimensions=("tract", "patch", "skymap"),
904 storageClass="SourceCatalog",
905 name="{coaddName}Coadd_ref",
906 deferLoad=True,
907 )
908 inputCatalogSersicMultiprofit = connectionTypes.Input(
909 doc="Catalog of source measurements on the deepCoadd.",
910 dimensions=("tract", "patch", "skymap"),
911 storageClass="ArrowAstropy",
912 name="{coaddName}Coadd_Sersic_multiprofit",
913 deferLoad=True,
914 )
915 outputCatalog = connectionTypes.Output(
916 doc="Per-Patch Object Table of columns transformed from the deepCoadd_obj table per the standard "
917 "data model.",
918 dimensions=("tract", "patch", "skymap"),
919 storageClass="ArrowAstropy",
920 name="objectTable"
921 )
922
923 def __init__(self, *, config=None):
924 super().__init__(config=config)
925 if config.multilevelOutput:
926 self.outputCatalog = dataclasses.replace(self.outputCatalog, storageClass="DataFrame")
927
928
929class TransformObjectCatalogConfig(TransformCatalogBaseConfig,
930 pipelineConnections=TransformObjectCatalogConnections):
931 coaddName = pexConfig.Field(
932 dtype=str,
933 default="deep",
934 doc="Name of coadd"
935 )
936 outputBands = pexConfig.ListField(
937 dtype=str,
938 default=None,
939 optional=True,
940 doc=("These bands and only these bands will appear in the output,"
941 " NaN-filled if the input does not include them."
942 " If None, then use all bands found in the input.")
943 )
944 camelCase = pexConfig.Field(
945 dtype=bool,
946 default=False,
947 doc=("Write per-band columns names with camelCase, else underscore "
948 "For example: gPsFlux instead of g_PsFlux.")
949 )
950 multilevelOutput = pexConfig.Field(
951 dtype=bool,
952 default=False,
953 doc=("Whether results dataframe should have a multilevel column index (True) or be flat "
954 "and name-munged (False). If True, the output storage class will be "
955 "set to DataFrame, since astropy tables do not support multi-level indexing."),
956 deprecated="Support for multi-level outputs is deprecated and will be removed after v29.",
957 )
958 goodFlags = pexConfig.ListField(
959 dtype=str,
960 default=[],
961 doc=("List of 'good' flags that should be set False when populating empty tables. "
962 "All other flags are considered to be 'bad' flags and will be set to True.")
963 )
964 floatFillValue = pexConfig.Field(
965 dtype=float,
966 default=np.nan,
967 doc="Fill value for float fields when populating empty tables."
968 )
969 integerFillValue = pexConfig.Field(
970 dtype=int,
971 default=-1,
972 doc="Fill value for integer fields when populating empty tables."
973 )
974
975 def setDefaults(self):
976 super().setDefaults()
977 self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Object.yaml")
978 self.primaryKey = "objectId"
979 self.columnsFromDataId = ["tract", "patch"]
980 self.goodFlags = ["calib_astrometry_used",
981 "calib_photometry_reserved",
982 "calib_photometry_used",
983 "calib_psf_candidate",
984 "calib_psf_reserved",
985 "calib_psf_used"]
986
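# Illustrative sketch (not part of the pipeline; the band list is invented for
# demonstration): typical overrides for this config, e.g. in a pipeline file or
# a config file passed to pipetask.
def _transformObjectCatalog_config_example():
    config = TransformObjectCatalogConfig()
    config.outputBands = ["g", "r", "i"]  # restrict (and NaN-pad) the output bands
    config.camelCase = False              # keep underscore-separated column names
    return config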
987
988class TransformObjectCatalogTask(TransformCatalogBaseTask):
989 """Produce a flattened Object Table to match the format specified in
990 sdm_schemas.
991
992 Do the same set of postprocessing calculations on all bands.
993
994 This is identical to `TransformCatalogBaseTask`, except that it does
995 the specified functor calculations for all filters present in the
996 input `deepCoadd_obj` table. Any specific ``"filt"`` keywords specified
997 by the YAML file will be superseded.
998 """
999 _DefaultName = "transformObjectCatalog"
1000 ConfigClass = TransformObjectCatalogConfig
1001
1002 datasets_multiband = ("ref", "Sersic_multiprofit")
1003
1004 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1005 inputs = butlerQC.get(inputRefs)
1006 if self.funcs is None:
1007 raise ValueError("config.functorFile is None. "
1008 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
1009 result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
1010 dataId=dict(outputRefs.outputCatalog.dataId.mapping),
1011 handle_ref=inputs["inputCatalogRef"],
1012 handle_Sersic_multiprofit=inputs["inputCatalogSersicMultiprofit"],
1013 )
1014 butlerQC.put(result, outputRefs)
1015
1016 def run(self, handle, funcs=None, dataId=None, band=None, **kwargs):
1017 # NOTE: band kwarg is ignored here.
1018 # TODO: Document and improve funcs argument usage in DM-48895
1019 # self.getAnalysis only supports list, dict and CompositeFunctor
1020 if isinstance(funcs, CompositeFunctor):
1021 funcDict_in = funcs.funcDict
1022 elif isinstance(funcs, dict):
1023 funcDict_in = funcs
1024 elif isinstance(funcs, list):
1025 funcDict_in = {idx: v for idx, v in enumerate(funcs)}
1026 else:
1027 raise TypeError(f"Unsupported {type(funcs)=}")
1028
1029 handles_multi = {}
1030 funcDicts_multiband = {}
1031 for dataset in self.datasets_multiband:
1032 if (handle_multi := kwargs.get(f"handle_{dataset}")) is None:
1033 raise RuntimeError(f"Missing required handle_{dataset} kwarg")
1034 handles_multi[dataset] = handle_multi
1035 funcDicts_multiband[dataset] = {}
1036
1037 dfDict = {}
1038 analysisDict = {}
1039 templateDf = pd.DataFrame()
1040
1041 columns = handle.get(component="columns")
1042 inputBands = columns.unique(level=1).values
1043
1044 outputBands = self.config.outputBands if self.config.outputBands else inputBands
1045
1046 # Split up funcs for per-band and multiband tables
1047 funcDict_band = {}
1048
1049 for name, func in funcDict_in.items():
1050 if func.dataset in funcDicts_multiband:
1051 # This is something like a MultibandColumn
1052 if band := getattr(func, "band_to_check", None):
1053 if band not in outputBands:
1054 continue
1055 # This is something like a ReferenceBand that has configurable bands
1056 elif hasattr(func, "bands"):
1057 # TODO: Determine if this can be avoided DM-48895
1058 # This will work fine if the init doesn't manipulate bands
1059 # If it does, then one would need to make a new functor
1060 # Determining the (kw)args is tricky in that case
1061 func.bands = tuple(inputBands)
1062
1063 funcDict = funcDicts_multiband.get(func.dataset, funcDict_band)
1064 funcDict[name] = func
1065
1066 funcs_band = CompositeFunctor(funcDict_band)
1067
1068 # Perform transform for data of filters that exist in the handle dataframe.
1069 for inputBand in inputBands:
1070 if inputBand not in outputBands:
1071 self.log.info("Ignoring %s band data in the input", inputBand)
1072 continue
1073 self.log.info("Transforming the catalog of band %s", inputBand)
1074 result = self.transform(inputBand, handle, funcs_band, dataId)
1075 dfDict[inputBand] = result.df
1076 analysisDict[inputBand] = result.analysis
1077 if templateDf.empty:
1078 templateDf = result.df
1079
1080 # Put filler values in columns of other wanted bands
1081 for filt in outputBands:
1082 if filt not in dfDict:
1083 self.log.info("Adding empty columns for band %s", filt)
1084 dfTemp = templateDf.copy()
1085 for col in dfTemp.columns:
1086 testValue = dfTemp[col].values[0]
1087 if isinstance(testValue, (np.bool_, pd.BooleanDtype)):
1088 # Boolean flag type, check if it is a "good" flag
1089 if col in self.config.goodFlags:
1090 fillValue = False
1091 else:
1092 fillValue = True
1093 elif isinstance(testValue, numbers.Integral):
1094 # Checking numbers.Integral catches all flavors
1095 # of python, numpy, pandas, etc. integers.
1096 # We must ensure this is not an unsigned integer.
1097 if isinstance(testValue, np.unsignedinteger):
1098 raise ValueError("Parquet tables may not have unsigned integer columns.")
1099 else:
1100 fillValue = self.config.integerFillValue
1101 else:
1102 fillValue = self.config.floatFillValue
1103 dfTemp[col].values[:] = fillValue
1104 dfDict[filt] = dfTemp
1105
1106 # This makes a multilevel column index, with band as first level
1107 df = pd.concat(dfDict, axis=1, names=["band", "column"])
1108 name_index = df.index.name
1109
1110 # TODO: Remove in DM-48895
1111 if not self.config.multilevelOutput:
1112 noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
1113 if self.config.primaryKey in noDupCols:
1114 noDupCols.remove(self.config.primaryKey)
1115 if dataId and self.config.columnsFromDataId:
1116 noDupCols += self.config.columnsFromDataId
1117 df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase,
1118 inputBands=inputBands)
1119
1120 # Apply per-dataset functors to each multiband dataset in turn
1121 for dataset, funcDict in funcDicts_multiband.items():
1122 handle_multiband = handles_multi[dataset]
1123 df_dataset = handle_multiband.get()
1124 if isinstance(df_dataset, astropy.table.Table):
1125 df_dataset = df_dataset.to_pandas().set_index(name_index, drop=False)
1126 elif isinstance(df_dataset, afwTable.SourceCatalog):
1127 df_dataset = df_dataset.asAstropy().to_pandas().set_index(name_index, drop=False)
1128 # TODO: should funcDict have noDup funcs removed?
1129 # noDup was intended for per-band tables.
1130 result = self.transform(
1131 None,
1132 pipeBase.InMemoryDatasetHandle(df_dataset, storageClass="DataFrame"),
1133 CompositeFunctor(funcDict),
1134 dataId,
1135 )
1136 result.df.index.name = name_index
1137 # Drop columns from dataId if present (patch, tract)
1138 if self.config.columnsFromDataId:
1139 columns_drop = [column for column in self.config.columnsFromDataId if column in result.df]
1140 if columns_drop:
1141 result.df.drop(columns_drop, axis=1, inplace=True)
1142 # Make the same multi-index for the multiband table if needed
1143 # This might end up making copies, one of several reasons to avoid
1144 # using multilevel indexes, or DataFrames at all
1145 to_concat = pd.concat(
1146 {band: result.df for band in self.config.outputBands}, axis=1, names=["band", "column"]
1147 ) if self.config.multilevelOutput else result.df
1148 df = pd.concat([df, to_concat], axis=1)
1149 analysisDict[dataset] = result.analysis
1150 del result
1151
1152 df.index.name = self.config.primaryKey
1153
1154 if not self.config.multilevelOutput:
1155 tbl = pandas_to_astropy(df)
1156 else:
1157 tbl = df
1158
1159 self.log.info("Made a table of %d columns and %d rows", len(tbl.columns), len(tbl))
1160
1161 return pipeBase.Struct(outputCatalog=tbl)
1162
1163
1164class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections,
1165 dimensions=("tract", "skymap")):
1166 inputCatalogs = connectionTypes.Input(
1167 doc="Per-Patch objectTables conforming to the standard data model.",
1168 name="objectTable",
1169 storageClass="ArrowAstropy",
1170 dimensions=("tract", "patch", "skymap"),
1171 multiple=True,
1172 deferLoad=True,
1173 )
1174 outputCatalog = connectionTypes.Output(
1175 doc="Pre-tract horizontal concatenation of the input objectTables",
1176 name="objectTable_tract",
1177 storageClass="ArrowAstropy",
1178 dimensions=("tract", "skymap"),
1179 )
1180
1181
1182class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig,
1183 pipelineConnections=ConsolidateObjectTableConnections):
1184 coaddName = pexConfig.Field(
1185 dtype=str,
1186 default="deep",
1187 doc="Name of coadd"
1188 )
1189
1190
1191class ConsolidateObjectTableTask(pipeBase.PipelineTask):
1192 """Write patch-merged source tables to a tract-level DataFrame Parquet file.
1193
1194 Concatenates `objectTable` list into a per-visit `objectTable_tract`.
1195 """
1196 _DefaultName = "consolidateObjectTable"
1197 ConfigClass = ConsolidateObjectTableConfig
1198
1199 inputDataset = "objectTable"
1200 outputDataset = "objectTable_tract"
1201
1202 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1203 inputs = butlerQC.get(inputRefs)
1204 self.log.info("Concatenating %s per-patch Object Tables",
1205 len(inputs["inputCatalogs"]))
1206 table = TableVStack.vstack_handles(inputs["inputCatalogs"])
1207 butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)
1208
1209
1210class TransformSourceTableConnections(pipeBase.PipelineTaskConnections,
1211 defaultTemplates={"catalogType": ""},
1212 dimensions=("instrument", "visit", "detector")):
1213
1214 inputCatalog = connectionTypes.Input(
1215 doc="Wide input catalog of sources produced by WriteSourceTableTask",
1216 name="{catalogType}source",
1217 storageClass="DataFrame",
1218 dimensions=("instrument", "visit", "detector"),
1219 deferLoad=True
1220 )
1221 outputCatalog = connectionTypes.Output(
1222 doc="Narrower, per-detector Source Table transformed and converted per a "
1223 "specified set of functors",
1224 name="{catalogType}sourceTable",
1225 storageClass="ArrowAstropy",
1226 dimensions=("instrument", "visit", "detector")
1227 )
1228
1229
1230class TransformSourceTableConfig(TransformCatalogBaseConfig,
1231 pipelineConnections=TransformSourceTableConnections):
1232
1233 def setDefaults(self):
1234 super().setDefaults()
1235 self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Source.yaml")
1236 self.primaryKey = "sourceId"
1237 self.columnsFromDataId = ["visit", "detector", "band", "physical_filter"]
1238
1239
1240class TransformSourceTableTask(TransformCatalogBaseTask):
1241 """Transform/standardize a source catalog
1242 """
1243 _DefaultName = "transformSourceTable"
1244 ConfigClass = TransformSourceTableConfig
1245
1246
1247class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections,
1248 dimensions=("instrument", "visit",),
1249 defaultTemplates={"calexpType": ""}):
1250 calexp = connectionTypes.Input(
1251 doc="Processed exposures used for metadata",
1252 name="calexp",
1253 storageClass="ExposureF",
1254 dimensions=("instrument", "visit", "detector"),
1255 deferLoad=True,
1256 multiple=True,
1257 )
1258 visitSummary = connectionTypes.Output(
1259 doc=("Per-visit consolidated exposure metadata. These catalogs use "
1260 "detector id for the id and are sorted for fast lookups of a "
1261 "detector."),
1262 name="visitSummary",
1263 storageClass="ExposureCatalog",
1264 dimensions=("instrument", "visit"),
1265 )
1266 visitSummarySchema = connectionTypes.InitOutput(
1267 doc="Schema of the visitSummary catalog",
1268 name="visitSummary_schema",
1269 storageClass="ExposureCatalog",
1270 )
1271
1272
1273class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig,
1274 pipelineConnections=ConsolidateVisitSummaryConnections):
1275 """Config for ConsolidateVisitSummaryTask"""
1276 pass
1277
1278
1279class ConsolidateVisitSummaryTask(pipeBase.PipelineTask):
1280 """Task to consolidate per-detector visit metadata.
1281
1282 This task aggregates the following metadata from all the detectors in a
1283 single visit into an exposure catalog:
1284 - The visitInfo.
1285 - The wcs.
1286 - The photoCalib.
1287 - The physical_filter and band (if available).
1288 - The psf size, shape, and effective area at the center of the detector.
1289 - The corners of the bounding box in right ascension/declination.
1290
1291 Other quantities such as Detector, Psf, ApCorrMap, and TransmissionCurve
1292 are not persisted here because of storage concerns, and because of their
1293 limited utility as summary statistics.
1294
1295 Tests for this task are performed in ci_hsc_gen3.
1296 """
1297 _DefaultName = "consolidateVisitSummary"
1298 ConfigClass = ConsolidateVisitSummaryConfig
1299
1300 def __init__(self, **kwargs):
1301 super().__init__(**kwargs)
1302 self.schema = afwTable.ExposureTable.makeMinimalSchema()
1303 self.schema.addField("visit", type="L", doc="Visit number")
1304 self.schema.addField("physical_filter", type="String", size=32, doc="Physical filter")
1305 self.schema.addField("band", type="String", size=32, doc="Name of band")
1306 ExposureSummaryStats.update_schema(self.schema)
1307 self.visitSummarySchema = afwTable.ExposureCatalog(self.schema)
1308
1309 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1310 dataRefs = butlerQC.get(inputRefs.calexp)
1311 visit = dataRefs[0].dataId["visit"]
1312
1313 self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
1314 len(dataRefs), visit)
1315
1316 expCatalog = self._combineExposureMetadata(visit, dataRefs)
1317
1318 butlerQC.put(expCatalog, outputRefs.visitSummary)
1319
1320 def _combineExposureMetadata(self, visit, dataRefs):
1321 """Make a combined exposure catalog from a list of dataRefs.
1322 These dataRefs must point to exposures with wcs, summaryStats,
1323 and other visit metadata.
1324
1325 Parameters
1326 ----------
1327 visit : `int`
1328 Visit identification number.
1329 dataRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
1330 List of dataRefs in visit.
1331
1332 Returns
1333 -------
1334 visitSummary : `lsst.afw.table.ExposureCatalog`
1335 Exposure catalog with per-detector summary information.
1336 """
1337 cat = afwTable.ExposureCatalog(self.schema)
1338 cat.resize(len(dataRefs))
1339
1340 cat["visit"] = visit
1341
1342 for i, dataRef in enumerate(dataRefs):
1343 visitInfo = dataRef.get(component="visitInfo")
1344 filterLabel = dataRef.get(component="filter")
1345 summaryStats = dataRef.get(component="summaryStats")
1346 detector = dataRef.get(component="detector")
1347 wcs = dataRef.get(component="wcs")
1348 photoCalib = dataRef.get(component="photoCalib")
1350 bbox = dataRef.get(component="bbox")
1351 validPolygon = dataRef.get(component="validPolygon")
1352
1353 rec = cat[i]
1354 rec.setBBox(bbox)
1355 rec.setVisitInfo(visitInfo)
1356 rec.setWcs(wcs)
1357 rec.setPhotoCalib(photoCalib)
1358 rec.setValidPolygon(validPolygon)
1359
1360 rec["physical_filter"] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else ""
1361 rec["band"] = filterLabel.bandLabel if filterLabel.hasBandLabel() else ""
1362 rec.setId(detector.getId())
1363 summaryStats.update_record(rec)
1364
1365 if not cat:
1366 raise pipeBase.NoWorkFound(
1367 "No detectors had sufficient information to make a visit summary row."
1368 )
1369
1370 metadata = dafBase.PropertyList()
1371 metadata.add("COMMENT", "Catalog id is detector id, sorted.")
1372 # We are looping over existing datarefs, so the following is true
1373 metadata.add("COMMENT", "Only detectors with data have entries.")
1374 cat.setMetadata(metadata)
1375
1376 cat.sort()
1377 return cat
1378
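# Illustrative sketch (not part of the pipeline; the detector id is invented
# for demonstration): because the catalog above is sorted by detector id, a
# per-detector row can be looked up directly, as WriteRecalibratedSourceTableTask
# does when applying the final calibrations.
def _visitSummary_lookup_example(visitSummary, detectorId=42):
    row = visitSummary.find(detectorId)
    if row is None:
        return None
    return row.getWcs(), row.getPhotoCalib()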
1379
1380class ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections,
1381 defaultTemplates={"catalogType": ""},
1382 dimensions=("instrument", "visit")):
1383 inputCatalogs = connectionTypes.Input(
1384 doc="Input per-detector Source Tables",
1385 name="{catalogType}sourceTable",
1386 storageClass="ArrowAstropy",
1387 dimensions=("instrument", "visit", "detector"),
1388 multiple=True,
1389 deferLoad=True,
1390 )
1391 outputCatalog = connectionTypes.Output(
1392 doc="Per-visit concatenation of Source Table",
1393 name="{catalogType}sourceTable_visit",
1394 storageClass="ArrowAstropy",
1395 dimensions=("instrument", "visit")
1396 )
1397
1398
1399class ConsolidateSourceTableConfig(pipeBase.PipelineTaskConfig,
1400 pipelineConnections=ConsolidateSourceTableConnections):
1401 pass
1402
1403
1404class ConsolidateSourceTableTask(pipeBase.PipelineTask):
1405 """Concatenate `sourceTable` list into a per-visit `sourceTable_visit`
1406 """
1407 _DefaultName = "consolidateSourceTable"
1408 ConfigClass = ConsolidateSourceTableConfig
1409
1410 inputDataset = "sourceTable"
1411 outputDataset = "sourceTable_visit"
1412
1413 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1414 from .makeWarp import reorderRefs
1415
1416 detectorOrder = [ref.dataId["detector"] for ref in inputRefs.inputCatalogs]
1417 detectorOrder.sort()
1418 inputRefs = reorderRefs(inputRefs, detectorOrder, dataIdKey="detector")
1419 inputs = butlerQC.get(inputRefs)
1420 self.log.info("Concatenating %s per-detector Source Tables",
1421 len(inputs["inputCatalogs"]))
1422 table = TableVStack.vstack_handles(inputs["inputCatalogs"])
1423 butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)
1424
1425
1426class MakeCcdVisitTableConnections(pipeBase.PipelineTaskConnections,
1427 dimensions=("instrument",),
1428 defaultTemplates={"calexpType": ""}):
1429 visitSummaryRefs = connectionTypes.Input(
1430 doc="Data references for per-visit consolidated exposure metadata",
1431 name="finalVisitSummary",
1432 storageClass="ExposureCatalog",
1433 dimensions=("instrument", "visit"),
1434 multiple=True,
1435 deferLoad=True,
1436 )
1437 outputCatalog = connectionTypes.Output(
1438 doc="CCD and Visit metadata table",
1439 name="ccdVisitTable",
1440 storageClass="ArrowAstropy",
1441 dimensions=("instrument",)
1442 )
1443
1444
1445class MakeCcdVisitTableConfig(pipeBase.PipelineTaskConfig,
1446 pipelineConnections=MakeCcdVisitTableConnections):
1447 idGenerator = DetectorVisitIdGeneratorConfig.make_field()
1448
1449
1450class MakeCcdVisitTableTask(pipeBase.PipelineTask):
1451 """Produce a `ccdVisitTable` from the visit summary exposure catalogs.
1452 """
1453 _DefaultName = "makeCcdVisitTable"
1454 ConfigClass = MakeCcdVisitTableConfig
1455
1456 def run(self, visitSummaryRefs):
1457 """Make a table of ccd information from the visit summary catalogs.
1458
1459 Parameters
1460 ----------
1461 visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
1462 List of DeferredDatasetHandles pointing to exposure catalogs with
1463 per-detector summary information.
1464
1465 Returns
1466 -------
1467 result : `~lsst.pipe.base.Struct`
1468 Results struct with attribute:
1469
1470 ``outputCatalog``
1471 Catalog of ccd and visit information.
1472 """
1473 ccdEntries = []
1474 for visitSummaryRef in visitSummaryRefs:
1475 visitSummary = visitSummaryRef.get()
1476 if not visitSummary:
1477 continue
1478 visitInfo = visitSummary[0].getVisitInfo()
1479
1480 # Strip provenance to prevent merge confusion.
1481 strip_provenance_from_fits_header(visitSummary.metadata)
1482
1483 ccdEntry = {}
1484 summaryTable = visitSummary.asAstropy()
1485 selectColumns = ["id", "visit", "physical_filter", "band", "ra", "dec",
1486 "pixelScale", "zenithDistance",
1487 "expTime", "zeroPoint", "psfSigma", "skyBg", "skyNoise",
1488 "astromOffsetMean", "astromOffsetStd", "nPsfStar",
1489 "psfStarDeltaE1Median", "psfStarDeltaE2Median",
1490 "psfStarDeltaE1Scatter", "psfStarDeltaE2Scatter",
1491 "psfStarDeltaSizeMedian", "psfStarDeltaSizeScatter",
1492 "psfStarScaledDeltaSizeScatter", "psfTraceRadiusDelta",
1493 "psfApFluxDelta", "psfApCorrSigmaScaledDelta",
1494 "maxDistToNearestPsf",
1495 "effTime", "effTimePsfSigmaScale",
1496 "effTimeSkyBgScale", "effTimeZeroPointScale",
1497 "magLim"]
1498 ccdEntry = summaryTable[selectColumns]
1499 # 'visit' is the human readable visit number.
1500 # 'visitId' is the key to the visitId table. They are the same.
1501 # Technically you should join to get the visit from the visit
1502 # table.
1503 ccdEntry.rename_column("visit", "visitId")
1504 ccdEntry.rename_column("id", "detectorId")
1505
1506 # RFC-924: Temporarily keep a duplicate "decl" entry for backwards
1507 # compatibility. To be removed after September 2023.
1508 ccdEntry["decl"] = ccdEntry["dec"]
1509
1510 ccdEntry["ccdVisitId"] = [
1511 self.config.idGenerator.apply(
1512 visitSummaryRef.dataId,
1513 detector=detector_id,
1514 is_exposure=False,
1515 ).catalog_id # The "catalog ID" here is the ccdVisit ID
1516 # because it's usually the ID for a whole catalog
1517 # with a {visit, detector}, and that's the main
1518 # use case for IdGenerator. This usage for a
1519 # summary table is rare.
1520 for detector_id in summaryTable["id"]
1521 ]
1522 ccdEntry["detector"] = summaryTable["id"]
1523 ccdEntry["seeing"] = (
1524 visitSummary["psfSigma"] * visitSummary["pixelScale"] * np.sqrt(8 * np.log(2))
1525 )
1526 ccdEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
1527 ccdEntry["expMidpt"] = np.datetime64(visitInfo.getDate().toPython(), "ns")
1528 ccdEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
1529 expTime = visitInfo.getExposureTime()
1530 ccdEntry["obsStart"] = (
1531 ccdEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns")
1532 )
1533 expTime_days = expTime / (60*60*24)
1534 ccdEntry["obsStartMJD"] = ccdEntry["expMidptMJD"] - 0.5 * expTime_days
1535 ccdEntry["darkTime"] = visitInfo.getDarkTime()
1536 ccdEntry["xSize"] = summaryTable["bbox_max_x"] - summaryTable["bbox_min_x"]
1537 ccdEntry["ySize"] = summaryTable["bbox_max_y"] - summaryTable["bbox_min_y"]
1538 ccdEntry["llcra"] = summaryTable["raCorners"][:, 0]
1539 ccdEntry["llcdec"] = summaryTable["decCorners"][:, 0]
1540 ccdEntry["ulcra"] = summaryTable["raCorners"][:, 1]
1541 ccdEntry["ulcdec"] = summaryTable["decCorners"][:, 1]
1542 ccdEntry["urcra"] = summaryTable["raCorners"][:, 2]
1543 ccdEntry["urcdec"] = summaryTable["decCorners"][:, 2]
1544 ccdEntry["lrcra"] = summaryTable["raCorners"][:, 3]
1545 ccdEntry["lrcdec"] = summaryTable["decCorners"][:, 3]
1546 # TODO: DM-30618, Add raftName, nExposures, ccdTemp, binX, binY,
1547 # and flags, and decide if WCS, and llcx, llcy, ulcx, ulcy, etc.
1548 # values are actually wanted.
1549 ccdEntries.append(ccdEntry)
1550
1551 outputCatalog = astropy.table.vstack(ccdEntries, join_type="exact")
1552 return pipeBase.Struct(outputCatalog=outputCatalog)
1553
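# Editor's note: the sketch below is illustrative and not part of
# postprocess.py.  It reproduces two of the derived ccdVisitTable quantities
# computed in MakeCcdVisitTableTask.run: "seeing" converts the PSF Gaussian
# sigma (pixels) to a FWHM in arcseconds via sqrt(8 ln 2) ~= 2.3548, and
# obsStart/obsStartMJD step back half the exposure time from the exposure
# midpoint.  All input values are made up.
def _example_ccdvisit_derived_quantities():
    import numpy as np

    psfSigma = 1.7      # PSF Gaussian sigma in pixels (hypothetical)
    pixelScale = 0.2    # arcsec per pixel (hypothetical)
    seeing = psfSigma * pixelScale * np.sqrt(8 * np.log(2))  # ~0.80 arcsec FWHM

    expTime = 30.0      # seconds (hypothetical)
    expMidpt = np.datetime64("2024-01-01T00:00:15", "ns")
    expMidptMJD = 60310.0001736                              # MJD of the midpoint (hypothetical)
    obsStart = expMidpt - 0.5 * np.timedelta64(int(expTime * 1e9), "ns")
    obsStartMJD = expMidptMJD - 0.5 * expTime / 86400.0
    return seeing, obsStart, obsStartMJD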
1554
1555class MakeVisitTableConnections(pipeBase.PipelineTaskConnections,
1556 dimensions=("instrument",),
1557 defaultTemplates={"calexpType": ""}):
1558 visitSummaries = connectionTypes.Input(
1559 doc="Per-visit consolidated exposure metadata",
1560 name="finalVisitSummary",
1561 storageClass="ExposureCatalog",
1562 dimensions=("instrument", "visit",),
1563 multiple=True,
1564 deferLoad=True,
1565 )
1566 outputCatalog = connectionTypes.Output(
1567 doc="Visit metadata table",
1568 name="visitTable",
1569 storageClass="ArrowAstropy",
1570 dimensions=("instrument",)
1571 )
1572
1573
1574class MakeVisitTableConfig(pipeBase.PipelineTaskConfig,
1575 pipelineConnections=MakeVisitTableConnections):
1576 pass
1577
1578
1579class MakeVisitTableTask(pipeBase.PipelineTask):
1580 """Produce a `visitTable` from the visit summary exposure catalogs.
1581 """
1582 _DefaultName = "makeVisitTable"
1583 ConfigClass = MakeVisitTableConfig
1584
1585 def run(self, visitSummaries):
1586 """Make a table of visit information from the visit summary catalogs.
1587
1588 Parameters
1589 ----------
1590 visitSummaries : `list` of `lsst.afw.table.ExposureCatalog`
1591 List of exposure catalogs with per-detector summary information.
1592 Returns
1593 -------
1594 result : `~lsst.pipe.base.Struct`
1595 Results struct with attribute:
1596
1597 ``outputCatalog``
1598 Catalog of visit information.
1599 """
1600 visitEntries = []
1601 for visitSummary in visitSummaries:
1602 visitSummary = visitSummary.get()
1603 if not visitSummary:
1604 continue
1605 visitRow = visitSummary[0]
1606 visitInfo = visitRow.getVisitInfo()
1607
1608 visitEntry = {}
1609 visitEntry["visitId"] = visitRow["visit"]
1610 visitEntry["visit"] = visitRow["visit"]
1611 visitEntry["physical_filter"] = visitRow["physical_filter"]
1612 visitEntry["band"] = visitRow["band"]
1613 raDec = visitInfo.getBoresightRaDec()
1614 visitEntry["ra"] = raDec.getRa().asDegrees()
1615 visitEntry["dec"] = raDec.getDec().asDegrees()
1616
1617 # RFC-924: Temporarily keep a duplicate "decl" entry for backwards
1618 # compatibility. To be removed after September 2023.
1619 visitEntry["decl"] = visitEntry["dec"]
1620
1621 visitEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
1622 azAlt = visitInfo.getBoresightAzAlt()
1623 visitEntry["azimuth"] = azAlt.getLongitude().asDegrees()
1624 visitEntry["altitude"] = azAlt.getLatitude().asDegrees()
1625 visitEntry["zenithDistance"] = 90 - azAlt.getLatitude().asDegrees()
1626 visitEntry["airmass"] = visitInfo.getBoresightAirmass()
1627 expTime = visitInfo.getExposureTime()
1628 visitEntry["expTime"] = expTime
1629 visitEntry["expMidpt"] = np.datetime64(visitInfo.getDate().toPython(), "ns")
1630 visitEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
1631 visitEntry["obsStart"] = visitEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns")
1632 expTime_days = expTime / (60*60*24)
1633 visitEntry["obsStartMJD"] = visitEntry["expMidptMJD"] - 0.5 * expTime_days
1634 visitEntries.append(visitEntry)
1635
1636 # TODO: DM-30623, Add programId, exposureType, cameraTemp,
1637 # mirror1Temp, mirror2Temp, mirror3Temp, domeTemp, externalTemp,
1638 # dimmSeeing, pwvGPS, pwvMW, flags, nExposures.
1639
1640 outputCatalog = astropy.table.Table(rows=visitEntries)
1641 return pipeBase.Struct(outputCatalog=outputCatalog)
1642
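# Editor's note: the sketch below is illustrative and not part of
# postprocess.py.  MakeVisitTableTask builds one dict of scalar values per
# visit and lets astropy assemble the visitTable from the list of row dicts;
# zenithDistance is simply the complement of the boresight altitude.  The
# values are made up.
def _example_build_visit_table():
    import astropy.table

    rows = [
        {"visitId": 1001, "band": "r", "altitude": 62.0, "zenithDistance": 90 - 62.0},
        {"visitId": 1002, "band": "i", "altitude": 55.5, "zenithDistance": 90 - 55.5},
    ]
    visit_table = astropy.table.Table(rows=rows)
    assert len(visit_table) == 2 and "zenithDistance" in visit_table.colnames
    return visit_table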
1643
1644class WriteForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
1645 dimensions=("instrument", "visit", "detector", "skymap", "tract")):
1646
1647 inputCatalog = connectionTypes.Input(
1648 doc="Primary per-detector, single-epoch forced-photometry catalog. "
1649 "By default, it is the output of ForcedPhotCcdTask on calexps",
1650 name="forced_src",
1651 storageClass="SourceCatalog",
1652 dimensions=("instrument", "visit", "detector", "skymap", "tract")
1653 )
1654 inputCatalogDiff = connectionTypes.Input(
1655 doc="Secondary multi-epoch, per-detector, forced photometry catalog. "
1656 "By default, it is the output of ForcedPhotCcdTask run on image differences.",
1657 name="forced_diff",
1658 storageClass="SourceCatalog",
1659 dimensions=("instrument", "visit", "detector", "skymap", "tract")
1660 )
1661 outputCatalog = connectionTypes.Output(
1662 doc="InputCatalogs horizonatally joined on `objectId` in DataFrame parquet format",
1663 name="mergedForcedSource",
1664 storageClass="DataFrame",
1665 dimensions=("instrument", "visit", "detector", "skymap", "tract")
1666 )
1667
1668
1669class WriteForcedSourceTableConfig(pipeBase.PipelineTaskConfig,
1670 pipelineConnections=WriteForcedSourceTableConnections):
1672 doc="Column on which to join the two input tables on and make the primary key of the output",
1673 dtype=str,
1674 default="objectId",
1675 )
1676
1677
1678class WriteForcedSourceTableTask(pipeBase.PipelineTask):
1679 """Merge and convert per-detector forced source catalogs to DataFrame Parquet format.
1680
1681 Because the predecessor ForcedPhotCcdTask operates per detector and
1682 per tract (i.e., it has tract in its dimensions), detectors on the
1683 tract boundary may have multiple forced source catalogs.
1684
1685 The successor task TransformForcedSourceTable runs per patch and
1686 temporally aggregates the overlapping mergedForcedSource catalogs from
1687 all available epochs.
1688 """
1689 _DefaultName = "writeForcedSourceTable"
1690 ConfigClass = WriteForcedSourceTableConfig
1691
1692 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1693 inputs = butlerQC.get(inputRefs)
1694 inputs["visit"] = butlerQC.quantum.dataId["visit"]
1695 inputs["detector"] = butlerQC.quantum.dataId["detector"]
1696 inputs["band"] = butlerQC.quantum.dataId["band"]
1697 outputs = self.run(**inputs)
1698 butlerQC.put(outputs, outputRefs)
1699
1700 def run(self, inputCatalog, inputCatalogDiff, visit, detector, band=None):
1701 dfs = []
1702 for table, dataset in zip((inputCatalog, inputCatalogDiff), ("calexp", "diff")):
1703 df = table.asAstropy().to_pandas().set_index(self.config.key, drop=False)
1704 df = df.reindex(sorted(df.columns), axis=1)
1705 df["visit"] = visit
1706 # int16 instead of uint8 because databases don't like unsigned bytes.
1707 df["detector"] = np.int16(detector)
1708 df["band"] = band if band else pd.NA
1709 df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
1710 names=("dataset", "column"))
1711
1712 dfs.append(df)
1713
1714 outputCatalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
1715 return pipeBase.Struct(outputCatalog=outputCatalog)
1716
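# Editor's note: the sketch below is illustrative and not part of
# postprocess.py.  WriteForcedSourceTableTask.run indexes the calexp and diff
# forced catalogs on the same key (objectId by default), prefixes their
# columns with a ("dataset", "column") MultiIndex, and joins them
# horizontally.  The toy DataFrames here stand in for the afw SourceCatalogs.
def _example_merge_forced_catalogs():
    import functools
    import pandas as pd

    calexp = pd.DataFrame({"objectId": [1, 2], "flux": [10.0, 20.0]})
    diff = pd.DataFrame({"objectId": [1, 2], "flux": [0.5, -0.3]})

    dfs = []
    for df, dataset in zip((calexp, diff), ("calexp", "diff")):
        df = df.set_index("objectId", drop=False)
        df.columns = pd.MultiIndex.from_tuples(
            [(dataset, c) for c in df.columns], names=("dataset", "column")
        )
        dfs.append(df)

    merged = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
    # One row per objectId, with ("calexp", ...) and ("diff", ...) columns.
    assert merged.shape == (2, 4)
    return merged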
1717
1718class TransformForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
1719 dimensions=("instrument", "skymap", "patch", "tract")):
1720
1721 inputCatalogs = connectionTypes.Input(
1722 doc="DataFrames of merged ForcedSources produced by WriteForcedSourceTableTask",
1723 name="mergedForcedSource",
1724 storageClass="DataFrame",
1725 dimensions=("instrument", "visit", "detector", "skymap", "tract"),
1726 multiple=True,
1727 deferLoad=True
1728 )
1729 referenceCatalog = connectionTypes.Input(
1730 doc="Reference catalog which was used to seed the forcedPhot. Columns "
1731 "objectId, detect_isPrimary, detect_isTractInner, detect_isPatchInner "
1732 "are expected.",
1733 name="objectTable",
1734 storageClass="DataFrame",
1735 dimensions=("tract", "patch", "skymap"),
1736 deferLoad=True
1737 )
1738 outputCatalog = connectionTypes.Output(
1739 doc="Narrower, temporally-aggregated, per-patch ForcedSource Table transformed and converted per a "
1740 "specified set of functors",
1741 name="forcedSourceTable",
1742 storageClass="DataFrame",
1743 dimensions=("tract", "patch", "skymap")
1744 )
1745
1746
1747class TransformForcedSourceTableConfig(TransformCatalogBaseConfig,
1748 pipelineConnections=TransformForcedSourceTableConnections):
1749 referenceColumns = pexConfig.ListField(
1750 dtype=str,
1751 default=["detect_isPrimary", "detect_isTractInner", "detect_isPatchInner"],
1752 optional=True,
1753 doc="Columns to pull from reference catalog",
1754 )
1755 keyRef = lsst.pex.config.Field(
1756 doc="Column on which to join the two input tables on and make the primary key of the output",
1757 dtype=str,
1758 default="objectId",
1759 )
1761 doc="Rename the output DataFrame index to this name",
1762 dtype=str,
1763 default="forcedSourceId",
1764 )
1765
1766 def setDefaults(self):
1767 super().setDefaults()
1768 self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "ForcedSource.yaml")
1769 self.columnsFromDataId = ["tract", "patch"]
1770
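# Editor's note: the sketch below is illustrative and not part of
# postprocess.py.  The defaults above point functorFile at the
# ForcedSource.yaml functor specification shipped with pipe_tasks and carry
# tract/patch through from the data ID; like any pex_config fields they can
# be overridden in a pipeline or config file.  The file path and column
# choice below are hypothetical.
def _example_override_transform_forced_source_config():
    config = TransformForcedSourceTableConfig()
    # Point at a custom functor specification (hypothetical path).
    config.functorFile = "/path/to/my_forced_source_functors.yaml"
    # Pull only the primary-detection flag from the reference objectTable.
    config.referenceColumns = ["detect_isPrimary"]
    return config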
1771
1772class TransformForcedSourceTableTask(TransformCatalogBaseTask):
1773 """Transform/standardize a ForcedSource catalog
1774
1775 Transforms each wide, per-detector forcedSource DataFrame per the
1776 specification file (per-camera defaults found in ForcedSource.yaml).
1777 All epochs that overlap the patch are aggregated into a single narrow,
1778 per-patch DataFrame.
1779
1780 No de-duplication of rows is performed. Duplicate-resolution flags are
1781 pulled in from the referenceCatalog: `detect_isPrimary`,
1782 `detect_isTractInner`, `detect_isPatchInner`, so that the user may
1783 de-duplicate for analysis or compare duplicates for QA.
1784
1785 The resulting table includes multiple bands. Epochs (MJDs) and other
1786 useful per-visit information can be retrieved by joining with the
1787 CcdVisitTable on ccdVisitId.
1788 """
1789 _DefaultName = "transformForcedSourceTable"
1790 ConfigClass = TransformForcedSourceTableConfig
1791
1792 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1793 inputs = butlerQC.get(inputRefs)
1794 if self.funcs is None:
1795 raise ValueError("config.functorFile is None. "
1796 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
1797 outputs = self.run(inputs["inputCatalogs"], inputs["referenceCatalog"], funcs=self.funcs,
1798 dataId=dict(outputRefs.outputCatalog.dataId.mapping))
1799
1800 butlerQC.put(outputs, outputRefs)
1801
1802 def run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=None):
1803 dfs = []
1804 refColumns = list(self.config.referenceColumns)
1805 refColumns.append(self.config.keyRef)
1806 ref = referenceCatalog.get(parameters={"columns": refColumns})
1807 if ref.index.name != self.config.keyRef:
1808 # If the DataFrame we loaded was originally written as some other
1809 # Parquet type, it probably doesn't have the index set. If it was
1810 # written as a DataFrame, the index should already be set and
1811 # trying to set it again would be an error, since it doesn't exist
1812 # as a regular column anymore.
1813 ref.set_index(self.config.keyRef, inplace=True)
1814 self.log.info("Aggregating %s input catalogs" % (len(inputCatalogs)))
1815 for handle in inputCatalogs:
1816 result = self.transform(None, handle, funcs, dataId)
1817 # Filter for only rows that were detected on (overlap) the patch
1818 dfs.append(result.df.join(ref, how="inner"))
1819
1820 outputCatalog = pd.concat(dfs)
1821
1822 # Now that we are done joining on config.keyRef, change the index
1823 # over to config.key in the following steps:
1824 outputCatalog.index.rename(self.config.keyRef, inplace=True)
1825 # Add config.keyRef to the column list
1826 outputCatalog.reset_index(inplace=True)
1827 # Set forcedSourceId as the index; this column is specified in
1828 # ForcedSource.yaml.
1829 outputCatalog.set_index("forcedSourceId", inplace=True, verify_integrity=True)
1830 # Rename it to the config.key
1831 outputCatalog.index.rename(self.config.key, inplace=True)
1832
1833 self.log.info("Made a table of %d columns and %d rows",
1834 len(outputCatalog.columns), len(outputCatalog))
1835 return pipeBase.Struct(outputCatalog=outputCatalog)
1836
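# Editor's note: the sketch below is illustrative and not part of
# postprocess.py.  The inner join in TransformForcedSourceTableTask.run keeps
# only forced sources whose objectId appears in the per-patch reference
# objectTable, after which the output is re-indexed on forcedSourceId.  The
# IDs and flux values here are made up.
def _example_filter_and_reindex_forced_sources():
    import pandas as pd

    forced = pd.DataFrame(
        {"forcedSourceId": [11, 12, 13], "psfFlux": [1.0, 2.0, 3.0]},
        index=pd.Index([101, 102, 103], name="objectId"),
    )
    # Reference catalog: only objects 101 and 103 belong to this patch.
    ref = pd.DataFrame(
        {"detect_isPrimary": [True, True]},
        index=pd.Index([101, 103], name="objectId"),
    )

    out = forced.join(ref, how="inner")   # drops objectId 102
    out.reset_index(inplace=True)         # objectId becomes a regular column
    out.set_index("forcedSourceId", inplace=True, verify_integrity=True)
    assert list(out.index) == [11, 13]
    return out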
1837
1838class ConsolidateTractConnections(pipeBase.PipelineTaskConnections,
1839 defaultTemplates={"catalogType": ""},
1840 dimensions=("instrument", "tract")):
1841 inputCatalogs = connectionTypes.Input(
1842 doc="Input per-patch DataFrame Tables to be concatenated",
1843 name="{catalogType}ForcedSourceTable",
1844 storageClass="DataFrame",
1845 dimensions=("tract", "patch", "skymap"),
1846 multiple=True,
1847 )
1848
1849 outputCatalog = connectionTypes.Output(
1850 doc="Output per-tract concatenation of DataFrame Tables",
1851 name="{catalogType}ForcedSourceTable_tract",
1852 storageClass="DataFrame",
1853 dimensions=("tract", "skymap"),
1854 )
1855
1856
1857class ConsolidateTractConfig(pipeBase.PipelineTaskConfig,
1858 pipelineConnections=ConsolidateTractConnections):
1859 pass
1860
1861
1862class ConsolidateTractTask(pipeBase.PipelineTask):
1863 """Concatenate any per-patch, dataframe list into a single
1864 per-tract DataFrame.
1865 """
1866 _DefaultName = "ConsolidateTract"
1867 ConfigClass = ConsolidateTractConfig
1868
1869 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1870 inputs = butlerQC.get(inputRefs)
1871 # No check that at least one inputCatalog exists: if none did, the
1872 # quantum graph (QG) would be empty and this quantum would never run.
1873 self.log.info("Concatenating %s per-patch %s Tables",
1874 len(inputs["inputCatalogs"]),
1875 inputRefs.inputCatalogs[0].datasetType.name)
1876 df = pd.concat(inputs["inputCatalogs"])
1877 butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)
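# Editor's note: the sketch below is illustrative and not part of
# postprocess.py.  ConsolidateTractTask is a plain row-wise concatenation of
# the per-patch DataFrames; no de-duplication or re-indexing happens at this
# stage.  The two hypothetical patch tables stand in for inputs["inputCatalogs"].
def _example_consolidate_tract():
    import pandas as pd

    patch_41 = pd.DataFrame({"forcedSourceId": [11, 12], "patch": [41, 41]})
    patch_42 = pd.DataFrame({"forcedSourceId": [13, 14], "patch": [42, 42]})
    tract_table = pd.concat([patch_41, patch_42])
    assert len(tract_table) == 4
    return tract_table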