__all__ = ["WriteObjectTableConfig", "WriteObjectTableTask",
           "WriteSourceTableConfig", "WriteSourceTableTask",
           "WriteRecalibratedSourceTableConfig", "WriteRecalibratedSourceTableTask",
           "PostprocessAnalysis",
           "TransformCatalogBaseConfig", "TransformCatalogBaseTask",
           "TransformObjectCatalogConfig", "TransformObjectCatalogTask",
           "ConsolidateObjectTableConfig", "ConsolidateObjectTableTask",
           "TransformSourceTableConfig", "TransformSourceTableTask",
           "ConsolidateVisitSummaryConfig", "ConsolidateVisitSummaryTask",
           "ConsolidateSourceTableConfig", "ConsolidateSourceTableTask",
           "MakeCcdVisitTableConfig", "MakeCcdVisitTableTask",
           "MakeVisitTableConfig", "MakeVisitTableTask",
           "WriteForcedSourceTableConfig", "WriteForcedSourceTableTask",
           "TransformForcedSourceTableConfig", "TransformForcedSourceTableTask",
           "ConsolidateTractConfig", "ConsolidateTractTask"]
import dataclasses
import functools
import logging
import numbers
import os

from collections import defaultdict

import astropy.table
import numpy as np
import pandas as pd
from astro_metadata_translator.headers import merge_headers

import lsst.afw.table as afwTable
import lsst.daf.base as dafBase
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
from lsst.afw.image import ExposureF, ExposureSummaryStats
from lsst.daf.butler.formatters.parquet import pandas_to_astropy
from lsst.meas.base import DetectorVisitIdGeneratorConfig, SingleFrameMeasurementTask
from lsst.obs.base.utils import strip_provenance_from_fits_header
from lsst.pipe.base import NoWorkFound, UpstreamFailureNoWorkFound, connectionTypes

from .coaddBase import reorderRefs
from .functors import CompositeFunctor, Column

log = logging.getLogger(__name__)
def flattenFilters(df, noDupCols=["coord_ra", "coord_dec"], camelCase=False, inputBands=None):
    """Flatten a dataframe with a multilevel column index."""
    newDf = pd.DataFrame()
    # band is the level-0 column index.
    dfBands = df.columns.unique(level=0).values
    for band in dfBands:
        subdf = df[band]
        columnFormat = "{0}{1}" if camelCase else "{0}_{1}"
        newColumns = {c: columnFormat.format(band, c)
                      for c in subdf.columns if c not in noDupCols}
        cols = list(newColumns.keys())
        newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)

    # If inputBands is given, only keep bands that are actually present.
    presentBands = dfBands if inputBands is None else list(set(inputBands).intersection(dfBands))
    # Copy the non-duplicated columns (e.g. coordinates) from the first present band.
    noDupDf = df[presentBands[0]][noDupCols]
    newDf = pd.concat([noDupDf, newDf], axis=1)
    return newDf
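
# Illustrative sketch (not part of the original module): build a tiny
# DataFrame with a (band, column) MultiIndex and flatten it.  The band
# names, ids, and values below are invented for demonstration only.
def _exampleFlattenFilters():
    columns = pd.MultiIndex.from_tuples(
        [("g", "coord_ra"), ("g", "coord_dec"), ("g", "psfFlux"),
         ("r", "coord_ra"), ("r", "coord_dec"), ("r", "psfFlux")],
        names=("band", "column"),
    )
    df = pd.DataFrame([[10.0, -5.0, 1.0, 10.0, -5.0, 2.0]], columns=columns)
    # Produces columns coord_ra, coord_dec, g_psfFlux, r_psfFlux
    # (or gPsfFlux/rPsfFlux with camelCase=True).
    return flattenFilters(df, camelCase=False)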
90 """A helper class for stacking astropy tables without having them all in
96 Full size of the final table.
100 Unlike `astropy.table.vstack`, this class requires all tables to have the
101 exact same columns (it's slightly more strict than even the
102 ``join_type="exact"`` argument to `astropy.table.vstack`).
112 """Construct from an iterable of
113 `lsst.daf.butler.DeferredDatasetHandle`.
117 handles : `~collections.abc.Iterable` [ \
118 `lsst.daf.butler.DeferredDatasetHandle` ]
119 Iterable of handles. Must have a storage class that supports the
120 "rowcount" component, which is all that will be fetched.
124 vstack : `TableVStack`
125 An instance of this class, initialized with capacity equal to the
126 sum of the rowcounts of all the given table handles.
128 capacity = sum(handle.get(component=
"rowcount")
for handle
in handles)
129 return cls(capacity=capacity)
132 """Add a single table to the stack.
136 table : `astropy.table.Table`
137 An astropy table instance.
140 self.
result = astropy.table.Table()
141 for name
in table.colnames:
143 column_cls = type(column)
144 self.
result[name] = column_cls.info.new_like([column], self.
capacity, name=name)
145 self.
result[name][:len(table)] = column
146 self.
index = len(table)
147 self.
result.meta = table.meta.copy()
149 next_index = self.
index + len(table)
150 if set(self.
result.colnames) != set(table.colnames):
152 "Inconsistent columns in concatentation: "
153 f
"{set(self.result.colnames).symmetric_difference(table.colnames)}"
155 for name
in table.colnames:
156 out_col = self.
result[name]
158 if out_col.dtype != in_col.dtype:
159 raise TypeError(f
"Type mismatch on column {name!r}: {out_col.dtype} != {in_col.dtype}.")
160 self.
result[name][self.
index:next_index] = table[name]
161 self.
index = next_index
165 self.
result.meta = merge_headers([self.
result.meta, table.meta], mode=
"drop")
166 strip_provenance_from_fits_header(self.
result.meta)
170 """Vertically stack tables represented by deferred dataset handles.
174 handles : `~collections.abc.Iterable` [ \
175 `lsst.daf.butler.DeferredDatasetHandle` ]
176 Iterable of handles. Must have the "ArrowAstropy" storage class
177 and identical columns.
181 table : `astropy.table.Table`
182 Concatenated table with the same columns as each input table and
183 the rows of all of them.
185 handles = tuple(handles)
187 rowcount = tuple(handle.get(component=
"rowcount")
for handle
in handles)
188 handles = tuple(handle
for handle, count
in zip(handles, rowcount)
if count > 0)
191 for handle
in handles:
192 vstack.extend(handle.get())
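
# Illustrative sketch (not part of the original module): stack two small
# in-memory astropy tables directly with TableVStack.  In the pipeline the
# usual entry point is ``TableVStack.vstack_handles(handles)``, where
# ``handles`` are butler ``DeferredDatasetHandle`` objects.
def _exampleTableVStack():
    table1 = astropy.table.Table({"a": np.array([1, 2]), "b": np.array([0.5, 1.5])})
    table2 = astropy.table.Table({"a": np.array([3]), "b": np.array([2.5])})
    vstack = TableVStack(capacity=len(table1) + len(table2))
    vstack.extend(table1)
    vstack.extend(table2)
    return vstack.result  # 3-row table with columns "a" and "b"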
class WriteObjectTableConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"coaddName": "deep"},
                                  dimensions=("tract", "patch", "skymap")):
    inputCatalogMeas = connectionTypes.Input(
        doc="Catalog of source measurements on the deepCoadd.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_meas",
        multiple=True,
    )
    inputCatalogForcedSrc = connectionTypes.Input(
        doc="Catalog of forced measurements (shape and position parameters held fixed) on the deepCoadd.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_forced_src",
        multiple=True,
    )
    inputCatalogPsfsMultiprofit = connectionTypes.Input(
        doc="Catalog of Gaussian mixture model fit parameters for the PSF model at each object centroid.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="ArrowAstropy",
        name="{coaddName}Coadd_psfs_multiprofit",
        multiple=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="A vertical concatenation of the deepCoadd_{meas|forced_src|psfs_multiprofit} catalogs, "
            "stored as a DataFrame with a multi-level column index per-patch.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="{coaddName}Coadd_obj",
    )


class WriteObjectTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteObjectTableConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd",
    )


class WriteObjectTableTask(pipeBase.PipelineTask):
    """Write filter-merged object tables as a DataFrame in parquet format."""

    _DefaultName = "writeObjectTable"
    ConfigClass = WriteObjectTableConfig

    outputDataset = "obj"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        catalogs = defaultdict(dict)
        for dataset, connection in (
            ("meas", "inputCatalogMeas"),
            ("forced_src", "inputCatalogForcedSrc"),
            ("psfs_multiprofit", "inputCatalogPsfsMultiprofit"),
        ):
            for ref, cat in zip(getattr(inputRefs, connection), inputs[connection]):
                catalogs[ref.dataId["band"]][dataset] = cat

        dataId = butlerQC.quantum.dataId
        df = self.run(catalogs=catalogs, tract=dataId["tract"], patch=dataId["patch"])
        outputs = pipeBase.Struct(outputCatalog=df)
        butlerQC.put(outputs, outputRefs)
    def run(self, catalogs, tract, patch):
        """Merge multiple catalogs.

        Parameters
        ----------
        catalogs : `dict`
            Mapping from filter names to dict of catalogs.
        tract : `int`
            tractId to use for the tractId column.
        patch : `str`
            patchId to use for the patchId column.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Merged dataframe.

        Raises
        ------
        ValueError
            Raised if any of the catalogs is of an unsupported type.
        """
        dfs = []
        for filt, tableDict in catalogs.items():
            for dataset, table in tableDict.items():
                # Convert afw and astropy tables to pandas.
                if isinstance(table, pd.DataFrame):
                    df = table
                elif isinstance(table, afwTable.SourceCatalog):
                    df = table.asAstropy().to_pandas()
                elif isinstance(table, astropy.table.Table):
                    df = table.to_pandas()
                else:
                    raise ValueError(f"{dataset=} has unsupported {type(table)=}")
                df.set_index("id", drop=True, inplace=True)

                # Sort columns by name to ensure a matching schema among patches.
                df = df.reindex(sorted(df.columns), axis=1)
                df = df.assign(tractId=tract, patchId=patch)

                # Make the columns a 3-level MultiIndex.
                df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
                                                       names=("dataset", "band", "column"))
                dfs.append(df)

        # Join all per-dataset/per-band frames on their common object index.
        catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return catalog
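
# Illustrative sketch (not part of the original module): merge two tiny
# per-band astropy tables into the multi-level-column DataFrame produced by
# WriteObjectTableTask.run.  Band names, ids, and values are invented.
def _exampleWriteObjectTable():
    task = WriteObjectTableTask(config=WriteObjectTableConfig())
    meas_g = astropy.table.Table({"id": [1, 2], "flux": [10.0, 20.0]})
    meas_r = astropy.table.Table({"id": [1, 2], "flux": [11.0, 21.0]})
    catalogs = {
        "g": {"meas": meas_g},
        "r": {"meas": meas_r},
    }
    # The result has a ("dataset", "band", "column") MultiIndex, e.g.
    # ("meas", "g", "flux"), plus tractId/patchId columns per dataset/band.
    return task.run(catalogs=catalogs, tract=0, patch=42)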
class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"catalogType": ""},
                                  dimensions=("instrument", "visit", "detector")):
    catalog = connectionTypes.Input(
        doc="Input full-depth catalog of sources produced by CalibrateTask",
        name="{catalogType}src",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector"),
    )
    outputCatalog = connectionTypes.Output(
        doc="Catalog of sources, `src` in Astropy/Parquet format.  Columns are unchanged.",
        name="{catalogType}source",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )


class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteSourceTableConnections):
    pass


class WriteSourceTableTask(pipeBase.PipelineTask):
    """Write source table to DataFrame Parquet format."""

    _DefaultName = "writeSourceTable"
    ConfigClass = WriteSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        inputs["visit"] = butlerQC.quantum.dataId["visit"]
        inputs["detector"] = butlerQC.quantum.dataId["detector"]
        result = self.run(**inputs)
        outputs = pipeBase.Struct(outputCatalog=result.table)
        butlerQC.put(outputs, outputRefs)

    def run(self, catalog, visit, detector, **kwargs):
        """Convert `src` catalog to an Astropy table.

        Parameters
        ----------
        catalog : `afwTable.SourceCatalog`
            Catalog to be converted.
        visit, detector : `int`
            Visit and detector ids to be added as columns.
        **kwargs
            Additional keyword arguments are ignored as a convenience for
            subclasses that pass the same arguments to several different
            methods.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            ``table``
                `astropy.table.Table` version of the input catalog.
        """
        self.log.info("Generating DataFrame from src catalog visit,detector=%i,%i", visit, detector)
        tbl = catalog.asAstropy()
        tbl["visit"] = visit
        # int16 instead of uint8 because databases don't like unsigned bytes.
        tbl["detector"] = np.int16(detector)

        return pipeBase.Struct(table=tbl)
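
# Illustrative sketch (not part of the original module): run the conversion
# on a minimal, empty-schema afw SourceCatalog.  The visit/detector values
# below are arbitrary.
def _exampleWriteSourceTable():
    schema = afwTable.SourceTable.makeMinimalSchema()
    catalog = afwTable.SourceCatalog(schema)
    catalog.addNew()  # one blank record
    task = WriteSourceTableTask(config=WriteSourceTableConfig())
    result = task.run(catalog=catalog, visit=12345, detector=7)
    return result.table  # astropy table with added "visit" and "detector" columns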
class WriteRecalibratedSourceTableConnections(WriteSourceTableConnections,
                                              defaultTemplates={"catalogType": ""},
                                              dimensions=("instrument", "visit", "detector", "skymap")):
    visitSummary = connectionTypes.Input(
        doc="Input visit-summary catalog with updated calibration objects.",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit",),
    )

    def __init__(self, config):
        super().__init__(config=config)
        # Don't let the input catalog constrain the initial data ID query for
        # the quantum graph; only defer it.
        self.catalog = dataclasses.replace(self.catalog, deferGraphConstraint=True)
class WriteRecalibratedSourceTableConfig(WriteSourceTableConfig,
                                         pipelineConnections=WriteRecalibratedSourceTableConnections):

    doReevaluatePhotoCalib = pexConfig.Field(
        dtype=bool,
        default=True,
        doc=("Add or replace local photoCalib columns"),
    )
    doReevaluateSkyWcs = pexConfig.Field(
        dtype=bool,
        default=True,
        doc=("Add or replace local WCS columns and update the coord columns, coord_ra and coord_dec"),
    )


class WriteRecalibratedSourceTableTask(WriteSourceTableTask):
    """Write source table to DataFrame Parquet format."""

    _DefaultName = "writeRecalibratedSourceTable"
    ConfigClass = WriteRecalibratedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        inputs["visit"] = butlerQC.quantum.dataId["visit"]
        inputs["detector"] = butlerQC.quantum.dataId["detector"]

        if self.config.doReevaluatePhotoCalib or self.config.doReevaluateSkyWcs:
            exposure = ExposureF()
            inputs["exposure"] = self.prepareCalibratedExposure(
                exposure=exposure,
                visitSummary=inputs["visitSummary"],
                detectorId=butlerQC.quantum.dataId["detector"],
            )
            inputs["catalog"] = self.addCalibColumns(**inputs)

        result = self.run(**inputs)
        outputs = pipeBase.Struct(outputCatalog=result.table)
        butlerQC.put(outputs, outputRefs)
    def prepareCalibratedExposure(self, exposure, detectorId, visitSummary=None):
        """Prepare a calibrated exposure and apply external calibrations
        if so configured.

        Parameters
        ----------
        exposure : `lsst.afw.image.exposure.Exposure`
            Input exposure to adjust calibrations.  May be an empty Exposure.
        detectorId : `int`
            Detector ID associated with the exposure.
        visitSummary : `lsst.afw.table.ExposureCatalog`, optional
            Exposure catalog with all calibration objects.  WCS and PhotoCalib
            are always applied if ``visitSummary`` is provided and those
            components are not `None`.

        Returns
        -------
        exposure : `lsst.afw.image.exposure.Exposure`
            Exposure with adjusted calibrations.
        """
        if visitSummary is not None:
            row = visitSummary.find(detectorId)
            if row is None:
                raise pipeBase.NoWorkFound(f"Visit summary for detector {detectorId} is missing.")
            if (photoCalib := row.getPhotoCalib()) is None:
                self.log.warning("Detector id %s has None for photoCalib in visit summary; "
                                 "skipping reevaluation of photoCalib.", detectorId)
                exposure.setPhotoCalib(None)
            else:
                exposure.setPhotoCalib(photoCalib)
            if (skyWcs := row.getWcs()) is None:
                self.log.warning("Detector id %s has None for skyWcs in visit summary; "
                                 "skipping reevaluation of skyWcs.", detectorId)
                exposure.setWcs(None)
            else:
                exposure.setWcs(skyWcs)

        return exposure
    def addCalibColumns(self, catalog, exposure, **kwargs):
        """Add or replace columns with calibs evaluated at each centroid.

        Add or replace 'base_LocalWcs' and 'base_LocalPhotoCalib' columns in
        a source catalog, by rerunning the plugins.

        Parameters
        ----------
        catalog : `lsst.afw.table.SourceCatalog`
            Catalog to which calib columns will be added.
        exposure : `lsst.afw.image.exposure.Exposure`
            Exposure with attached PhotoCalibs and SkyWcs attributes to be
            reevaluated at local centroids.  Pixels are not required.
        **kwargs
            Additional keyword arguments are ignored to facilitate passing the
            same arguments to several methods.

        Returns
        -------
        newCat : `lsst.afw.table.SourceCatalog`
            Source catalog with requested local calib columns.
        """
        measureConfig = SingleFrameMeasurementTask.ConfigClass()
        measureConfig.doReplaceWithNoise = False

        # Clear all slots; only the local calib plugins are run here.
        for slot in measureConfig.slots:
            setattr(measureConfig.slots, slot, None)

        measureConfig.plugins.names = []
        if self.config.doReevaluateSkyWcs:
            measureConfig.plugins.names.add("base_LocalWcs")
            self.log.info("Re-evaluating base_LocalWcs plugin")
        if self.config.doReevaluatePhotoCalib:
            measureConfig.plugins.names.add("base_LocalPhotoCalib")
            self.log.info("Re-evaluating base_LocalPhotoCalib plugin")
        pluginsNotToCopy = tuple(measureConfig.plugins.names)

        # Create a new schema and catalog, copying every column from the
        # input except the ones that will be re-evaluated.
        mapper = afwTable.SchemaMapper(catalog.schema)
        aliasMap = catalog.schema.getAliasMap()
        for item in catalog.schema:
            if not item.field.getName().startswith(pluginsNotToCopy):
                mapper.addMapping(item.key)

        schema = mapper.getOutputSchema()
        measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
        schema.setAliasMap(aliasMap)
        newCat = afwTable.SourceCatalog(schema)
        newCat.extend(catalog, mapper=mapper)

        # Update coord_ra/coord_dec from the (possibly replaced) WCS; the
        # local calib columns are filled by the plugins below.
        if self.config.doReevaluateSkyWcs and exposure.wcs is not None:
            afwTable.updateSourceCoords(exposure.wcs, newCat, include_covariance=False)
            wcsPlugin = measurement.plugins["base_LocalWcs"]
        else:
            wcsPlugin = None

        if self.config.doReevaluatePhotoCalib and exposure.getPhotoCalib() is not None:
            pcPlugin = measurement.plugins["base_LocalPhotoCalib"]
        else:
            pcPlugin = None

        for row in newCat:
            if wcsPlugin is not None:
                wcsPlugin.measure(row, exposure)
            if pcPlugin is not None:
                pcPlugin.measure(row, exposure)

        return newCat
class PostprocessAnalysis(object):
    """Calculate columns from DataFrames or handles storing DataFrames.

    This object manages and organizes an arbitrary set of computations
    on a catalog.  The catalog is defined by a
    `DeferredDatasetHandle` or `InMemoryDatasetHandle` object
    (or list thereof), such as a ``deepCoadd_obj`` dataset, and the
    computations are defined by a collection of
    `~lsst.pipe.tasks.functors.Functor` objects (or, equivalently, a
    ``CompositeFunctor``).

    After the object is initialized, accessing the ``.df`` attribute (which
    holds the `pandas.DataFrame` containing the results of the calculations)
    triggers computation of said dataframe.

    One of the conveniences of using this object is the ability to define a
    desired common filter for all functors.  This enables the same functor
    collection to be passed to several different `PostprocessAnalysis` objects
    without having to change the original functor collection, since the
    ``filt`` keyword argument of this object triggers an overwrite of the
    ``filt`` property for all functors in the collection.

    This object also allows a list of refFlags to be passed, and defines a set
    of default refFlags that are always included even if not requested.

    If a list of DataFrames or Handles is passed, rather than a single one,
    then the calculations will be mapped over all the input catalogs.  In
    principle, it should be straightforward to parallelize this activity, but
    initial tests have failed (see TODO in code comments).

    Parameters
    ----------
    handles : `~lsst.daf.butler.DeferredDatasetHandle` or
              `~lsst.pipe.base.InMemoryDatasetHandle` or
              list of these.
        Source catalog(s) for computation.
    functors : `list`, `dict`, or `~lsst.pipe.tasks.functors.CompositeFunctor`
        Computations to do (functors that act on ``handles``).
        If a dict, the output
        DataFrame will have columns keyed accordingly.
        If a list, the column keys will come from the
        ``.shortname`` attribute of each functor.
    filt : `str`, optional
        Filter in which to calculate.  If provided,
        this will overwrite any existing ``.filt`` attribute
        of the provided functors.
    flags : `list`, optional
        List of flags (per-band) to include in output table.
        Taken from the ``meas`` dataset if applied to a multilevel Object Table.
    refFlags : `list`, optional
        List of refFlags (only reference band) to include in output table.
    forcedFlags : `list`, optional
        List of flags (per-band) to include in output table.
        Taken from the ``forced_src`` dataset if applied to a
        multilevel Object Table.  Intended for flags from measurement plugins
        only run during multi-band forced photometry.
    """
    _defaultRefFlags = []
    _defaultFuncs = ()

    def __init__(self, handles, functors, filt=None, flags=None, refFlags=None, forcedFlags=None):
        self.handles = handles
        self.functors = functors

        self.filt = filt
        self.flags = list(flags) if flags is not None else []
        self.forcedFlags = list(forcedFlags) if forcedFlags is not None else []
        self.refFlags = list(self._defaultRefFlags)
        if refFlags is not None:
            self.refFlags += list(refFlags)

        self._df = None

    @property
    def defaultFuncs(self):
        funcs = dict(self._defaultFuncs)
        return funcs

    @property
    def func(self):
        additionalFuncs = self.defaultFuncs
        additionalFuncs.update({flag: Column(flag, dataset="forced_src") for flag in self.forcedFlags})
        additionalFuncs.update({flag: Column(flag, dataset="ref") for flag in self.refFlags})
        additionalFuncs.update({flag: Column(flag, dataset="meas") for flag in self.flags})

        if isinstance(self.functors, CompositeFunctor):
            func = self.functors
        else:
            func = CompositeFunctor(self.functors)

        func.funcDict.update(additionalFuncs)
        func.filt = self.filt

        return func

    @property
    def noDupCols(self):
        return [name for name, func in self.func.funcDict.items() if func.noDup]

    @property
    def df(self):
        if self._df is None:
            self.compute()
        return self._df

    def compute(self, dropna=False, pool=None):
        # Map over multiple handles if a list/tuple was passed.
        if type(self.handles) in (list, tuple):
            if pool is None:
                dflist = [self.func(handle, dropna=dropna) for handle in self.handles]
            else:
                # TODO: Figure out why this doesn't work (pickling issues?)
                dflist = pool.map(functools.partial(self.func, dropna=dropna), self.handles)
            self._df = pd.concat(dflist)
        else:
            self._df = self.func(self.handles, dropna=dropna)

        return self._df
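
# Illustrative sketch (not part of the original module): compute two columns
# from an in-memory, single-level DataFrame using Column functors.  The
# column names and values are invented.
def _examplePostprocessAnalysis():
    df = pd.DataFrame({"coord_ra": [1.0, 2.0], "base_PsfFlux_instFlux": [10.0, 20.0]})
    handle = pipeBase.InMemoryDatasetHandle(df, storageClass="DataFrame")
    functors = {"ra": Column("coord_ra"), "psfFlux": Column("base_PsfFlux_instFlux")}
    analysis = PostprocessAnalysis(handle, functors)
    return analysis.df  # DataFrame with columns "ra" and "psfFlux"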
class TransformCatalogBaseConnections(pipeBase.PipelineTaskConnections,
                                      dimensions=()):
    """Expected Connections for subclasses of TransformCatalogBaseTask.
    """
    inputCatalog = connectionTypes.Input(
        storageClass="DataFrame",
    )
    outputCatalog = connectionTypes.Output(
        storageClass="ArrowAstropy",
    )


class TransformCatalogBaseConfig(pipeBase.PipelineTaskConfig,
                                 pipelineConnections=TransformCatalogBaseConnections):
    functorFile = pexConfig.Field(
        dtype=str,
        doc="Path to YAML file specifying Science Data Model functors to use "
            "when copying columns and computing calibrated values.",
        default=None,
        optional=True,
    )
    primaryKey = pexConfig.Field(
        dtype=str,
        doc="Name of column to be set as the DataFrame index. If None, the index "
            "will be named `id`.",
        default=None,
        optional=True,
    )
    columnsFromDataId = pexConfig.ListField(
        dtype=str,
        default=None,
        optional=True,
        doc="Columns to extract from the dataId",
    )
class TransformCatalogBaseTask(pipeBase.PipelineTask):
    """Base class for transforming/standardizing a catalog by applying functors
    that convert units and apply calibrations.

    The purpose of this task is to perform a set of computations on an input
    ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` that holds a
    ``DataFrame`` dataset (such as ``deepCoadd_obj``), and write the results to
    a new dataset (which needs to be declared in an ``outputDataset``
    attribute).

    The calculations to be performed are defined in a YAML file that specifies
    a set of functors to be computed, provided as a ``--functorFile`` config
    parameter.  An example of such a YAML file is the following:

        funcs:
            sourceId:
                functor: Index
            x:
                functor: Column
                args: slot_Centroid_x
            y:
                functor: Column
                args: slot_Centroid_y
            f:
                functor: LocalNanojansky
                args:
                    - slot_PsfFlux_instFlux
                    - slot_PsfFlux_instFluxErr
                    - base_LocalPhotoCalib
                    - base_LocalPhotoCalibErr
            fErr:
                functor: LocalNanojanskyErr
                args:
                    - slot_PsfFlux_instFlux
                    - slot_PsfFlux_instFluxErr
                    - base_LocalPhotoCalib
                    - base_LocalPhotoCalibErr
        flags:
            - detect_isPrimary

    The names for each entry under "func" will become the names of columns in
    the output dataset.  All the functors referenced are defined in
    `~lsst.pipe.tasks.functors`.  Positional arguments to be passed to each
    functor are in the `args` list, and any additional entries for each column
    other than "functor" or "args" (e.g., ``'filt'``, ``'dataset'``) are
    treated as keyword arguments to be passed to the functor initialization.

    The "flags" entry is the default shortcut for `Column` functors.
    All columns listed under "flags" will be copied to the output table
    untransformed.  They can be of any datatype.
    In the special case of transforming a multi-level object table with
    band and dataset indices (deepCoadd_obj), these will be taken from the
    ``meas`` dataset and exploded out per band.

    There are two special shortcuts that only apply when transforming
    multi-level Object (deepCoadd_obj) tables:
    - The "refFlags" entry is a shortcut for `Column` functors
      taken from the ``ref`` dataset if transforming an ObjectTable.
    - The "forcedFlags" entry is a shortcut for `Column` functors
      taken from the ``forced_src`` dataset if transforming an ObjectTable.
      These are expanded out per band.

    This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
    to organize and execute the calculations.
    """

    @property
    def _DefaultName(self):
        raise NotImplementedError("Subclass must define the \"_DefaultName\" attribute.")

    @property
    def outputDataset(self):
        raise NotImplementedError("Subclass must define the \"outputDataset\" attribute.")

    @property
    def inputDataset(self):
        raise NotImplementedError("Subclass must define \"inputDataset\" attribute.")

    @property
    def ConfigClass(self):
        raise NotImplementedError("Subclass must define \"ConfigClass\" attribute.")

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if self.config.functorFile:
            self.log.info("Loading transform functor definitions from %s",
                          self.config.functorFile)
            self.funcs = CompositeFunctor.from_file(self.config.functorFile)
            self.funcs.update(dict(PostprocessAnalysis._defaultFuncs))
        else:
            self.funcs = None

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
                          dataId=dict(outputRefs.outputCatalog.dataId.mapping))
        butlerQC.put(result, outputRefs)

    def run(self, handle, funcs=None, dataId=None, band=None):
        """Do postprocessing calculations.

        Takes a ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` or
        ``DataFrame`` object and dataId,
        and returns a dataframe with results of postprocessing calculations.

        Parameters
        ----------
        handle : `~lsst.daf.butler.DeferredDatasetHandle` or
                 `~lsst.pipe.base.InMemoryDatasetHandle` or
                 `~pandas.DataFrame`, or list of these.
            DataFrames from which calculations are done.
        funcs : `~lsst.pipe.tasks.functors.Functor`
            Functors to apply to the table's columns.
        dataId : `dict`, optional
            Used to add columns named in ``config.columnsFromDataId`` to the
            output dataframe.
        band : `str`, optional
            Filter band that is being processed.

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            Result struct, with a single ``outputCatalog`` attribute holding
            the transformed catalog.
        """
        self.log.info("Transforming/standardizing the source table dataId: %s", dataId)

        df = self.transform(band, handle, funcs, dataId).df
        self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))

        if len(df) == 0:
            raise UpstreamFailureNoWorkFound(
                "Input catalog is empty, so there is nothing to transform/standardize",
            )

        result = pipeBase.Struct(outputCatalog=pandas_to_astropy(df))
        return result

    def getFunctors(self):
        return self.funcs

    def getAnalysis(self, handles, funcs=None, band=None):
        if funcs is None:
            funcs = self.funcs
        analysis = PostprocessAnalysis(handles, funcs, filt=band)
        return analysis

    def transform(self, band, handles, funcs, dataId):
        analysis = self.getAnalysis(handles, funcs=funcs, band=band)
        df = analysis.df
        if dataId and self.config.columnsFromDataId:
            for key in self.config.columnsFromDataId:
                if key in dataId:
                    if key == "detector":
                        # int16 instead of uint8 because databases don't like unsigned bytes.
                        df[key] = np.int16(dataId[key])
                    else:
                        df[key] = dataId[key]
                else:
                    raise ValueError(f"'{key}' in config.columnsFromDataId not found in dataId: {dataId}")

        if self.config.primaryKey:
            if df.index.name != self.config.primaryKey and self.config.primaryKey in df:
                df.reset_index(inplace=True, drop=True)
                df.set_index(self.config.primaryKey, inplace=True)

        return pipeBase.Struct(
            df=df,
            analysis=analysis,
        )
class TransformObjectCatalogConnections(pipeBase.PipelineTaskConnections,
                                        defaultTemplates={"coaddName": "deep"},
                                        dimensions=("tract", "patch", "skymap")):
    inputCatalog = connectionTypes.Input(
        doc="The vertical concatenation of the {coaddName}_{meas|forced_src|psfs_multiprofit} catalogs, "
            "stored as a DataFrame with a multi-level column index per-patch.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="{coaddName}Coadd_obj",
        deferLoad=True,
    )
    inputCatalogRef = connectionTypes.Input(
        doc="Catalog marking the primary detection (which band provides a good shape and position) "
            "for each detection in deepCoadd_mergeDet.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_ref",
        deferLoad=True,
    )
    inputCatalogSersicMultiprofit = connectionTypes.Input(
        doc="Catalog of Sersic model fit parameters from MultiProFit on the deepCoadd.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="ArrowAstropy",
        name="{coaddName}Coadd_Sersic_multiprofit",
        deferLoad=True,
    )
    inputCatalogEpoch = connectionTypes.Input(
        doc="Catalog of mean epochs for each object per band.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="ArrowAstropy",
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-Patch Object Table of columns transformed from the deepCoadd_obj table per the standard "
            "data model.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="ArrowAstropy",
    )

    def __init__(self, *, config=None):
        super().__init__(config=config)
        if config.multilevelOutput:
            self.outputCatalog = dataclasses.replace(self.outputCatalog, storageClass="DataFrame")


class TransformObjectCatalogConfig(TransformCatalogBaseConfig,
                                   pipelineConnections=TransformObjectCatalogConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd",
    )
    outputBands = pexConfig.ListField(
        dtype=str,
        default=None,
        optional=True,
        doc=("These bands and only these bands will appear in the output,"
             " NaN-filled if the input does not include them."
             " If None, then use all bands found in the input."),
    )
    camelCase = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Write per-band column names with camelCase, else underscore. "
             "For example: gPsFlux instead of g_PsFlux."),
    )
    multilevelOutput = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Whether results dataframe should have a multilevel column index (True) or be flat "
             "and name-munged (False). If True, the output storage class will be "
             "set to DataFrame, since astropy tables do not support multi-level indexing."),
        deprecated="Support for multi-level outputs is deprecated and will be removed after v29.",
    )
    goodFlags = pexConfig.ListField(
        dtype=str,
        default=[],
        doc=("List of 'good' flags that should be set False when populating empty tables. "
             "All other flags are considered to be 'bad' flags and will be set to True."),
    )
    floatFillValue = pexConfig.Field(
        dtype=float,
        default=np.nan,
        doc="Fill value for float fields when populating empty tables.",
    )
    integerFillValue = pexConfig.Field(
        dtype=int,
        default=-1,
        doc="Fill value for integer fields when populating empty tables.",
    )

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Object.yaml")
        self.primaryKey = "objectId"
        self.columnsFromDataId = ["tract", "patch"]
        self.goodFlags = ["calib_astrometry_used",
                          "calib_photometry_reserved",
                          "calib_photometry_used",
                          "calib_psf_candidate",
                          "calib_psf_reserved",
                          ]
class TransformObjectCatalogTask(TransformCatalogBaseTask):
    """Produce a flattened Object Table to match the format specified in
    the standard data model.

    Do the same set of postprocessing calculations on all bands.

    This is identical to `TransformCatalogBaseTask`, except that it does
    the specified functor calculations for all filters present in the
    input `deepCoadd_obj` table.  Any specific ``"filt"`` keywords specified
    by the YAML file will be superseded.
    """
    _DefaultName = "transformObjectCatalog"
    ConfigClass = TransformObjectCatalogConfig

    datasets_multiband = ("epoch", "ref", "Sersic_multiprofit")

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
                          dataId=dict(outputRefs.outputCatalog.dataId.mapping),
                          handle_epoch=inputs["inputCatalogEpoch"],
                          handle_ref=inputs["inputCatalogRef"],
                          handle_Sersic_multiprofit=inputs["inputCatalogSersicMultiprofit"],
                          )
        butlerQC.put(result, outputRefs)

    def run(self, handle, funcs=None, dataId=None, band=None, **kwargs):
        # The band argument is unused here; all bands in the input are processed.
        if isinstance(funcs, CompositeFunctor):
            funcDict_in = funcs.funcDict
        elif isinstance(funcs, dict):
            funcDict_in = funcs
        elif isinstance(funcs, list):
            funcDict_in = {idx: v for idx, v in enumerate(funcs)}
        else:
            raise TypeError(f"Unsupported {type(funcs)=}")

        handles_multi = {}
        funcDicts_multiband = {}
        for dataset in self.datasets_multiband:
            if (handle_multi := kwargs.get(f"handle_{dataset}")) is None:
                raise RuntimeError(f"Missing required handle_{dataset} kwarg")
            handles_multi[dataset] = handle_multi
            funcDicts_multiband[dataset] = {}

        dfDict = {}
        analysisDict = {}
        templateDf = pd.DataFrame()

        columns = handle.get(component="columns")
        inputBands = columns.unique(level=1).values

        outputBands = self.config.outputBands if self.config.outputBands else inputBands

        # Split the functors into per-band and multiband (dataset-level) sets.
        funcDict_band = {}

        for name, func in funcDict_in.items():
            if func.dataset in funcDicts_multiband:
                if band := getattr(func, "band_to_check", None):
                    if band not in outputBands:
                        continue
                elif hasattr(func, "bands"):
                    # Restrict the functor to the bands actually present in
                    # the input.
                    func.bands = tuple(inputBands)

            funcDict = funcDicts_multiband.get(func.dataset, funcDict_band)
            funcDict[name] = func

        funcs_band = CompositeFunctor(funcDict_band)

        # Perform the transform for each band present in the input.
        for inputBand in inputBands:
            if inputBand not in outputBands:
                self.log.info("Ignoring %s band data in the input", inputBand)
                continue
            self.log.info("Transforming the catalog of band %s", inputBand)
            result = self.transform(inputBand, handle, funcs_band, dataId)
            dfDict[inputBand] = result.df
            analysisDict[inputBand] = result.analysis
            if templateDf.empty:
                templateDf = result.df

        # Put filler values in columns of other wanted bands.
        for filt in outputBands:
            if filt not in dfDict:
                self.log.info("Adding empty columns for band %s", filt)
                dfTemp = templateDf.copy()
                for col in dfTemp.columns:
                    testValue = dfTemp[col].values[0]
                    if isinstance(testValue, (np.bool_, pd.BooleanDtype)):
                        # Boolean flags are filled with True unless they are
                        # in the list of "good" flags.
                        if col in self.config.goodFlags:
                            fillValue = False
                        else:
                            fillValue = True
                    elif isinstance(testValue, numbers.Integral):
                        # numbers.Integral catches python, numpy, and pandas
                        # integer types.
                        if isinstance(testValue, np.unsignedinteger):
                            raise ValueError("Parquet tables may not have unsigned integer columns.")
                        else:
                            fillValue = self.config.integerFillValue
                    else:
                        fillValue = self.config.floatFillValue
                    dfTemp[col].values[:] = fillValue
                dfDict[filt] = dfTemp

        # This makes a multilevel column index, with band as the first level.
        df = pd.concat(dfDict, axis=1, names=["band", "column"])
        name_index = df.index.name

        if not self.config.multilevelOutput:
            noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
            if self.config.primaryKey in noDupCols:
                noDupCols.remove(self.config.primaryKey)
            if dataId and self.config.columnsFromDataId:
                noDupCols += self.config.columnsFromDataId
            df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase,
                                inputBands=inputBands)

        # Apply the multiband (dataset-level) functors and join the results.
        for dataset, funcDict in funcDicts_multiband.items():
            handle_multiband = handles_multi[dataset]
            df_dataset = handle_multiband.get()
            if isinstance(df_dataset, astropy.table.Table):
                if name_index not in df_dataset.colnames:
                    if self.config.primaryKey in df_dataset.colnames:
                        name_index_ap = self.config.primaryKey
                    else:
                        raise RuntimeError(
                            f"Neither of {name_index=} nor {self.config.primaryKey=} appear in"
                            f" {df_dataset.colnames=} for {dataset=}"
                        )
                else:
                    name_index_ap = name_index
                df_dataset = df_dataset.to_pandas().set_index(name_index_ap, drop=False)
            elif isinstance(df_dataset, afwTable.SourceCatalog):
                df_dataset = df_dataset.asAstropy().to_pandas().set_index(name_index, drop=False)
            result = self.transform(
                None,
                pipeBase.InMemoryDatasetHandle(df_dataset, storageClass="DataFrame"),
                CompositeFunctor(funcDict),
                dataId,
            )
            result.df.index.name = name_index
            # Drop columns from the dataId if present (patch, tract).
            if self.config.columnsFromDataId:
                columns_drop = [column for column in self.config.columnsFromDataId if column in result.df]
                if columns_drop:
                    result.df.drop(columns_drop, axis=1, inplace=True)
            # Duplicate the dataset-level columns per band only if a
            # multilevel output was requested; otherwise keep them flat.
            to_concat = pd.concat(
                {band: result.df for band in self.config.outputBands}, axis=1, names=["band", "column"]
            ) if self.config.multilevelOutput else result.df
            df = pd.concat([df, to_concat], axis=1)
            analysisDict[dataset] = result.analysis

        df.index.name = self.config.primaryKey

        if not self.config.multilevelOutput:
            tbl = pandas_to_astropy(df)
        else:
            tbl = df

        self.log.info("Made a table of %d columns and %d rows", len(tbl.columns), len(tbl))

        return pipeBase.Struct(outputCatalog=tbl)
class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("tract", "skymap")):
    inputCatalogs = connectionTypes.Input(
        doc="Per-Patch objectTables conforming to the standard data model.",
        name="objectTable",
        storageClass="ArrowAstropy",
        dimensions=("tract", "patch", "skymap"),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-tract vertical concatenation of the input objectTables",
        name="objectTable_tract",
        storageClass="ArrowAstropy",
        dimensions=("tract", "skymap"),
    )


class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=ConsolidateObjectTableConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd",
    )


class ConsolidateObjectTableTask(pipeBase.PipelineTask):
    """Write patch-merged object tables to a tract-level Parquet file.

    Concatenates the `objectTable` list into a per-tract `objectTable_tract`.
    """
    _DefaultName = "consolidateObjectTable"
    ConfigClass = ConsolidateObjectTableConfig

    inputDataset = "objectTable"
    outputDataset = "objectTable_tract"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-patch Object Tables",
                      len(inputs["inputCatalogs"]))
        table = TableVStack.vstack_handles(inputs["inputCatalogs"])
        butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)
class TransformSourceTableConnections(pipeBase.PipelineTaskConnections,
                                      defaultTemplates={"catalogType": ""},
                                      dimensions=("instrument", "visit", "detector")):
    inputCatalog = connectionTypes.Input(
        doc="Wide input catalog of sources produced by WriteSourceTableTask",
        name="{catalogType}source",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Narrower, per-detector Source Table transformed and converted per a "
            "specified set of functors",
        name="{catalogType}sourceTable",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )


class TransformSourceTableConfig(TransformCatalogBaseConfig,
                                 pipelineConnections=TransformSourceTableConnections):

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Source.yaml")
        self.primaryKey = "sourceId"
        self.columnsFromDataId = ["visit", "detector", "band", "physical_filter"]


class TransformSourceTableTask(TransformCatalogBaseTask):
    """Transform/standardize a source catalog."""

    _DefaultName = "transformSourceTable"
    ConfigClass = TransformSourceTableConfig
class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections,
                                         dimensions=("instrument", "visit",),
                                         defaultTemplates={"calexpType": ""}):
    calexp = connectionTypes.Input(
        doc="Processed exposures used for metadata",
        name="calexp",
        storageClass="ExposureF",
        dimensions=("instrument", "visit", "detector"),
        deferLoad=True,
        multiple=True,
    )
    visitSummary = connectionTypes.Output(
        doc=("Per-visit consolidated exposure metadata.  These catalogs use "
             "detector id for the id and are sorted for fast lookups of a "
             "detector."),
        name="visitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
    )
    visitSummarySchema = connectionTypes.InitOutput(
        doc="Schema of the visitSummary catalog",
        name="visitSummary_schema",
        storageClass="ExposureCatalog",
    )


class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig,
                                    pipelineConnections=ConsolidateVisitSummaryConnections):
    """Config for ConsolidateVisitSummaryTask"""

    full = pexConfig.Field(
        "Whether to propagate all exposure components. "
        "This adds PSF, aperture correction map, transmission curve, and detector, which can increase "
        "file size by more than a factor of 10, but it makes the visit summaries produced by this task "
        "fully usable by tasks that were designed to run downstream of lsst.drp.tasks.UpdateVisitSummaryTask.",
        dtype=bool,
        default=False,
    )
class ConsolidateVisitSummaryTask(pipeBase.PipelineTask):
    """Task to consolidate per-detector visit metadata.

    This task aggregates the following metadata from all the detectors in a
    single visit into an exposure catalog:
    - The visitInfo.
    - The wcs.
    - The photoCalib.
    - The physical_filter and band (if available).
    - The aperture correction map.
    - The transmission curve.
    - The psf size, shape, and effective area at the center of the detector.
    - The corners of the bounding box in right ascension/declination.

    Tests for this task are performed in ci_hsc_gen3.
    """
    _DefaultName = "consolidateVisitSummary"
    ConfigClass = ConsolidateVisitSummaryConfig

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.schema = afwTable.ExposureTable.makeMinimalSchema()
        self.schema.addField("visit", type="L", doc="Visit number")
        self.schema.addField("physical_filter", type="String", size=32, doc="Physical filter")
        self.schema.addField("band", type="String", size=32, doc="Name of band")
        ExposureSummaryStats.update_schema(self.schema)
        self.visitSummarySchema = afwTable.ExposureCatalog(self.schema)

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        dataRefs = butlerQC.get(inputRefs.calexp)
        visit = dataRefs[0].dataId["visit"]

        self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
                       len(dataRefs), visit)

        expCatalog = self._combineExposureMetadata(visit, dataRefs)

        butlerQC.put(expCatalog, outputRefs.visitSummary)

    def _combineExposureMetadata(self, visit, dataRefs):
        """Make a combined exposure catalog from a list of dataRefs.
        These dataRefs must point to exposures with wcs, summaryStats,
        and other visit metadata.

        Parameters
        ----------
        visit : `int`
            Visit identification number.
        dataRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            List of dataRefs in visit.

        Returns
        -------
        visitSummary : `lsst.afw.table.ExposureCatalog`
            Exposure catalog with per-detector summary information.
        """
        cat = afwTable.ExposureCatalog(self.schema)
        cat.resize(len(dataRefs))

        cat["visit"] = visit

        for i, dataRef in enumerate(dataRefs):
            visitInfo = dataRef.get(component="visitInfo")
            filterLabel = dataRef.get(component="filter")
            summaryStats = dataRef.get(component="summaryStats")
            detector = dataRef.get(component="detector")
            wcs = dataRef.get(component="wcs")
            photoCalib = dataRef.get(component="photoCalib")
            bbox = dataRef.get(component="bbox")
            validPolygon = dataRef.get(component="validPolygon")

            rec = cat[i]
            rec.setBBox(bbox)
            rec.setVisitInfo(visitInfo)
            rec.setWcs(wcs)
            rec.setPhotoCalib(photoCalib)
            rec.setValidPolygon(validPolygon)

            if self.config.full:
                rec.setPsf(dataRef.get(component="psf"))
                rec.setApCorrMap(dataRef.get(component="apCorrMap"))
                rec.setTransmissionCurve(dataRef.get(component="transmissionCurve"))

            rec["physical_filter"] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else ""
            rec["band"] = filterLabel.bandLabel if filterLabel.hasBandLabel() else ""
            rec.setId(detector.getId())
            summaryStats.update_record(rec)

        if not cat:
            raise pipeBase.NoWorkFound(
                "No detectors had sufficient information to make a visit summary row."
            )

        metadata = dafBase.PropertyList()
        metadata.add("COMMENT", "Catalog id is detector id, sorted.")
        metadata.add("COMMENT", "Only detectors with data have entries.")
        cat.setMetadata(metadata)

        cat.sort()
        return cat
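
# Usage sketch (not part of the original module): look up the per-detector
# row of a consolidated visit summary.  The repo path, collection, and data
# ID below are hypothetical.
def _exampleReadVisitSummary():
    from lsst.daf.butler import Butler

    butler = Butler("/path/to/repo", collections=["hypothetical/collection"])
    visitSummary = butler.get("visitSummary", instrument="HSC", visit=1228)
    row = visitSummary.find(42)  # catalog is indexed and sorted by detector id
    return row.getWcs(), row.getPhotoCalib()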
class ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        defaultTemplates={"catalogType": ""},
                                        dimensions=("instrument", "visit")):
    inputCatalogs = connectionTypes.Input(
        doc="Input per-detector Source Tables",
        name="{catalogType}sourceTable",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-visit concatenation of Source Table",
        name="{catalogType}sourceTable_visit",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit"),
    )


class ConsolidateSourceTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=ConsolidateSourceTableConnections):
    pass


class ConsolidateSourceTableTask(pipeBase.PipelineTask):
    """Concatenate `sourceTable` list into a per-visit `sourceTable_visit`."""

    _DefaultName = "consolidateSourceTable"
    ConfigClass = ConsolidateSourceTableConfig

    inputDataset = "sourceTable"
    outputDataset = "sourceTable_visit"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        # Sort the input references by detector so the output is deterministic.
        detectorOrder = [ref.dataId["detector"] for ref in inputRefs.inputCatalogs]
        detectorOrder.sort()
        inputRefs = reorderRefs(inputRefs, detectorOrder, dataIdKey="detector")
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-detector Source Tables",
                      len(inputs["inputCatalogs"]))
        table = TableVStack.vstack_handles(inputs["inputCatalogs"])
        butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)
class MakeCcdVisitTableConnections(pipeBase.PipelineTaskConnections,
                                   dimensions=("instrument",),
                                   defaultTemplates={"calexpType": ""}):
    visitSummaryRefs = connectionTypes.Input(
        doc="Data references for per-visit consolidated exposure metadata",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="CCD and Visit metadata table",
        name="ccdVisitTable",
        storageClass="ArrowAstropy",
        dimensions=("instrument",),
    )


class MakeCcdVisitTableConfig(pipeBase.PipelineTaskConfig,
                              pipelineConnections=MakeCcdVisitTableConnections):
    idGenerator = DetectorVisitIdGeneratorConfig.make_field()
class MakeCcdVisitTableTask(pipeBase.PipelineTask):
    """Produce a `ccdVisitTable` from the visit summary exposure catalogs."""

    _DefaultName = "makeCcdVisitTable"
    ConfigClass = MakeCcdVisitTableConfig

    def run(self, visitSummaryRefs):
        """Make a table of ccd information from the visit summary catalogs.

        Parameters
        ----------
        visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            List of DeferredDatasetHandles pointing to exposure catalogs with
            per-detector summary information.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            Results struct with attribute:

            ``outputCatalog``
                Catalog of ccd and visit information.
        """
        ccdEntries = []
        for visitSummaryRef in visitSummaryRefs:
            visitSummary = visitSummaryRef.get()
            if not visitSummary:
                continue
            visitInfo = visitSummary[0].getVisitInfo()

            # Strip butler provenance from the metadata before converting.
            strip_provenance_from_fits_header(visitSummary.metadata)

            summaryTable = visitSummary.asAstropy()
            selectColumns = ["id", "visit", "physical_filter", "band", "ra", "dec",
                             "pixelScale", "zenithDistance",
                             "expTime", "zeroPoint", "psfSigma", "skyBg", "skyNoise",
                             "astromOffsetMean", "astromOffsetStd", "nPsfStar",
                             "psfStarDeltaE1Median", "psfStarDeltaE2Median",
                             "psfStarDeltaE1Scatter", "psfStarDeltaE2Scatter",
                             "psfStarDeltaSizeMedian", "psfStarDeltaSizeScatter",
                             "psfStarScaledDeltaSizeScatter", "psfTraceRadiusDelta",
                             "psfApFluxDelta", "psfApCorrSigmaScaledDelta",
                             "maxDistToNearestPsf",
                             "effTime", "effTimePsfSigmaScale",
                             "effTimeSkyBgScale", "effTimeZeroPointScale"]
            ccdEntry = summaryTable[selectColumns]

            # 'visit' is the human-readable visit number, and 'visitId' is the
            # key into the visit table; here they are the same.
            ccdEntry.rename_column("visit", "visitId")
            ccdEntry.rename_column("id", "detectorId")

            # Keep a duplicate "decl" column for backwards compatibility.
            ccdEntry["decl"] = ccdEntry["dec"]

            ccdEntry["ccdVisitId"] = [
                self.config.idGenerator.apply(
                    visitSummaryRef.dataId,
                    detector=detector_id,
                ).catalog_id
                for detector_id in summaryTable["id"]
            ]
            ccdEntry["detector"] = summaryTable["id"]
            # Seeing is the PSF FWHM in arcseconds.
            ccdEntry["seeing"] = (
                visitSummary["psfSigma"] * visitSummary["pixelScale"] * np.sqrt(8 * np.log(2))
            )
            ccdEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
            ccdEntry["expMidpt"] = np.datetime64(visitInfo.date.nsecs(scale=dafBase.DateTime.TAI), "ns")
            ccdEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
            expTime = visitInfo.getExposureTime()
            ccdEntry["obsStart"] = (
                ccdEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns")
            )
            expTime_days = expTime / (60*60*24)
            ccdEntry["obsStartMJD"] = ccdEntry["expMidptMJD"] - 0.5 * expTime_days
            ccdEntry["darkTime"] = visitInfo.getDarkTime()
            ccdEntry["xSize"] = summaryTable["bbox_max_x"] - summaryTable["bbox_min_x"]
            ccdEntry["ySize"] = summaryTable["bbox_max_y"] - summaryTable["bbox_min_y"]
            ccdEntry["llcra"] = summaryTable["raCorners"][:, 0]
            ccdEntry["llcdec"] = summaryTable["decCorners"][:, 0]
            ccdEntry["ulcra"] = summaryTable["raCorners"][:, 1]
            ccdEntry["ulcdec"] = summaryTable["decCorners"][:, 1]
            ccdEntry["urcra"] = summaryTable["raCorners"][:, 2]
            ccdEntry["urcdec"] = summaryTable["decCorners"][:, 2]
            ccdEntry["lrcra"] = summaryTable["raCorners"][:, 3]
            ccdEntry["lrcdec"] = summaryTable["decCorners"][:, 3]

            ccdEntries.append(ccdEntry)

        outputCatalog = astropy.table.vstack(ccdEntries, join_type="exact")
        return pipeBase.Struct(outputCatalog=outputCatalog)
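
# Worked example (not part of the original module): the "seeing" column is
# the PSF FWHM in arcseconds, converted from the Gaussian sigma in pixels.
# The numbers below are invented.
def _exampleSeeing(psfSigma=2.0, pixelScale=0.2):
    # FWHM = sigma * sqrt(8 * ln 2) ~ 2.355 * sigma; times the pixel scale
    # (arcsec/pixel) this gives ~0.94 arcsec for these inputs.
    return psfSigma * pixelScale * np.sqrt(8 * np.log(2))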
class MakeVisitTableConnections(pipeBase.PipelineTaskConnections,
                                dimensions=("instrument",),
                                defaultTemplates={"calexpType": ""}):
    visitSummaries = connectionTypes.Input(
        doc="Per-visit consolidated exposure metadata",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit",),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Visit metadata table",
        name="visitTable",
        storageClass="ArrowAstropy",
        dimensions=("instrument",),
    )


class MakeVisitTableConfig(pipeBase.PipelineTaskConfig,
                           pipelineConnections=MakeVisitTableConnections):
    pass
class MakeVisitTableTask(pipeBase.PipelineTask):
    """Produce a `visitTable` from the visit summary exposure catalogs."""

    _DefaultName = "makeVisitTable"
    ConfigClass = MakeVisitTableConfig

    def run(self, visitSummaries):
        """Make a table of visit information from the visit summary catalogs.

        Parameters
        ----------
        visitSummaries : `list` of `lsst.afw.table.ExposureCatalog`
            List of exposure catalogs with per-detector summary information.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            Results struct with attribute:

            ``outputCatalog``
                Catalog of visit information.
        """
        visitEntries = []
        for visitSummary in visitSummaries:
            visitSummary = visitSummary.get()
            if not visitSummary:
                continue
            visitRow = visitSummary[0]
            visitInfo = visitRow.getVisitInfo()

            visitEntry = {}
            visitEntry["visitId"] = visitRow["visit"]
            visitEntry["visit"] = visitRow["visit"]
            visitEntry["physical_filter"] = visitRow["physical_filter"]
            visitEntry["band"] = visitRow["band"]
            raDec = visitInfo.getBoresightRaDec()
            visitEntry["ra"] = raDec.getRa().asDegrees()
            visitEntry["dec"] = raDec.getDec().asDegrees()

            # Keep a duplicate "decl" entry for backwards compatibility.
            visitEntry["decl"] = visitEntry["dec"]

            visitEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
            azAlt = visitInfo.getBoresightAzAlt()
            visitEntry["azimuth"] = azAlt.getLongitude().asDegrees()
            visitEntry["altitude"] = azAlt.getLatitude().asDegrees()
            visitEntry["zenithDistance"] = 90 - azAlt.getLatitude().asDegrees()
            visitEntry["airmass"] = visitInfo.getBoresightAirmass()
            expTime = visitInfo.getExposureTime()
            visitEntry["expTime"] = expTime
            visitEntry["expMidpt"] = np.datetime64(visitInfo.date.nsecs(scale=dafBase.DateTime.TAI), "ns")
            visitEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
            visitEntry["obsStart"] = visitEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns")
            expTime_days = expTime / (60*60*24)
            visitEntry["obsStartMJD"] = visitEntry["expMidptMJD"] - 0.5 * expTime_days
            visitEntries.append(visitEntry)

        outputCatalog = astropy.table.Table(rows=visitEntries)
        return pipeBase.Struct(outputCatalog=outputCatalog)
class WriteForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("instrument", "visit", "detector", "skymap", "tract")):
    inputCatalog = connectionTypes.Input(
        doc="Primary per-detector, single-epoch forced-photometry catalog. "
            "By default, it is the output of ForcedPhotCcdTask on calexps",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
    )
    inputCatalogDiff = connectionTypes.Input(
        doc="Secondary multi-epoch, per-detector, forced-photometry catalog. "
            "By default, it is the output of ForcedPhotCcdTask run on image differences.",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
    )
    outputCatalog = connectionTypes.Output(
        doc="InputCatalogs horizontally joined on `objectId` in DataFrame parquet format",
        name="mergedForcedSource",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
    )


class WriteForcedSourceTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=WriteForcedSourceTableConnections):
    key = pexConfig.Field(
        doc="Column on which to join the two input tables and make the primary key of the output",
        dtype=str,
        default="objectId",
    )
class WriteForcedSourceTableTask(pipeBase.PipelineTask):
    """Merge and convert per-detector forced source catalogs to DataFrame Parquet format.

    Because the predecessor ForcedPhotCcdTask operates per detector and per
    tract (i.e., it has tract in its dimensions), detectors on the tract
    boundary may have multiple forced source catalogs.

    The successor task TransformForcedSourceTable runs per patch and
    temporally aggregates overlapping mergedForcedSource catalogs from all
    available epochs.
    """
    _DefaultName = "writeForcedSourceTable"
    ConfigClass = WriteForcedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        inputs["visit"] = butlerQC.quantum.dataId["visit"]
        inputs["detector"] = butlerQC.quantum.dataId["detector"]
        inputs["band"] = butlerQC.quantum.dataId["band"]
        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)

    def run(self, inputCatalog, inputCatalogDiff, visit, detector, band=None):
        dfs = []
        for table, dataset in zip((inputCatalog, inputCatalogDiff), ("calexp", "diff")):
            df = table.asAstropy().to_pandas().set_index(self.config.key, drop=False)
            df = df.reindex(sorted(df.columns), axis=1)
            df["visit"] = visit
            # int16 instead of uint8 because databases don't like unsigned bytes.
            df["detector"] = np.int16(detector)
            df["band"] = band if band else pd.NA
            df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
                                                   names=("dataset", "column"))

            dfs.append(df)

        outputCatalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return pipeBase.Struct(outputCatalog=outputCatalog)
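
# Illustrative sketch (not part of the original module): the per-dataset
# frames are joined on objectId, giving a ("dataset", "column") MultiIndex.
# The ids and fluxes below are invented.
def _exampleMergedForcedSource():
    calexp = pd.DataFrame({"objectId": [1, 2], "flux": [10.0, 20.0]}).set_index("objectId", drop=False)
    diff = pd.DataFrame({"objectId": [1, 2], "flux": [0.1, -0.2]}).set_index("objectId", drop=False)
    frames = []
    for dataset, df in (("calexp", calexp), ("diff", diff)):
        df = df.copy()
        df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
                                               names=("dataset", "column"))
        frames.append(df)
    return functools.reduce(lambda d1, d2: d1.join(d2), frames)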
class TransformForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                            dimensions=("instrument", "skymap", "patch", "tract")):
    inputCatalogs = connectionTypes.Input(
        doc="DataFrames of merged ForcedSources produced by WriteForcedSourceTableTask",
        name="mergedForcedSource",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
        multiple=True,
        deferLoad=True,
    )
    referenceCatalog = connectionTypes.Input(
        doc="Reference catalog which was used to seed the forcedPhot. Columns "
            "objectId, detect_isPrimary, detect_isTractInner, detect_isPatchInner "
            "are expected.",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Narrower, temporally-aggregated, per-patch ForcedSource Table transformed and converted per a "
            "specified set of functors",
        name="forcedSourceTable",
        storageClass="ArrowAstropy",
        dimensions=("tract", "patch", "skymap"),
    )


class TransformForcedSourceTableConfig(TransformCatalogBaseConfig,
                                       pipelineConnections=TransformForcedSourceTableConnections):
    referenceColumns = pexConfig.ListField(
        dtype=str,
        default=["detect_isPrimary", "detect_isTractInner", "detect_isPatchInner"],
        doc="Columns to pull from reference catalog",
    )
    keyRef = pexConfig.Field(
        doc="Column on which to join the two input tables and make the primary key of the output",
        dtype=str,
        default="objectId",
    )
    key = pexConfig.Field(
        doc="Rename the output DataFrame index to this name",
        dtype=str,
        default="forcedSourceId",
    )

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "ForcedSource.yaml")
        self.columnsFromDataId = ["tract", "patch"]
class TransformForcedSourceTableTask(TransformCatalogBaseTask):
    """Transform/standardize a ForcedSource catalog.

    Transforms each wide, per-detector forcedSource DataFrame per the
    specification file (per-camera defaults found in ForcedSource.yaml).
    All epochs that overlap the patch are aggregated into one per-patch,
    narrow DataFrame file.

    No de-duplication of rows is performed.  Duplicate-resolution flags are
    pulled in from the referenceCatalog (`detect_isPrimary`,
    `detect_isTractInner`, `detect_isPatchInner`), so that the user may
    de-duplicate for analysis or compare duplicates for QA.

    The resulting table includes multiple bands.  Epochs (MJDs) and other
    useful per-visit quantities can be retrieved by joining with the
    CcdVisitTable on ccdVisitId.
    """
    _DefaultName = "transformForcedSourceTable"
    ConfigClass = TransformForcedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        outputs = self.run(inputs["inputCatalogs"], inputs["referenceCatalog"], funcs=self.funcs,
                           dataId=dict(outputRefs.outputCatalog.dataId.mapping))

        butlerQC.put(outputs, outputRefs)

    def run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=None):
        dfs = []
        refColumns = list(self.config.referenceColumns)
        refColumns.append(self.config.keyRef)
        ref = referenceCatalog.get(parameters={"columns": refColumns})
        if ref.index.name != self.config.keyRef:
            # The reference catalog may not have its index set; set it here.
            ref.set_index(self.config.keyRef, inplace=True)
        self.log.info("Aggregating %s input catalogs", len(inputCatalogs))
        for handle in inputCatalogs:
            result = self.transform(None, handle, funcs, dataId)
            # Keep only rows whose object appears in the reference catalog,
            # i.e. detections that overlap the patch.
            dfs.append(result.df.join(ref, how="inner"))

        outputCatalog = pd.concat(dfs)

        if outputCatalog.empty:
            raise NoWorkFound(f"No forced photometry rows for {dataId}.")

        # The join above was on config.keyRef; now switch the index to
        # config.key.
        outputCatalog.index.rename(self.config.keyRef, inplace=True)
        # Move config.keyRef back into the column list.
        outputCatalog.reset_index(inplace=True)
        # Set forcedSourceId as the index (as specified in ForcedSource.yaml)
        # and rename it to config.key.
        outputCatalog.set_index("forcedSourceId", inplace=True, verify_integrity=True)
        outputCatalog.index.rename(self.config.key, inplace=True)

        self.log.info("Made a table of %d columns and %d rows",
                      len(outputCatalog.columns), len(outputCatalog))
        return pipeBase.Struct(outputCatalog=pandas_to_astropy(outputCatalog))
class ConsolidateTractConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"catalogType": ""},
                                  dimensions=("instrument", "tract")):
    inputCatalogs = connectionTypes.Input(
        doc="Input per-patch DataFrame Tables to be concatenated",
        name="{catalogType}ForcedSourceTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        multiple=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Output per-tract concatenation of DataFrame Tables",
        name="{catalogType}ForcedSourceTable_tract",
        storageClass="DataFrame",
        dimensions=("tract", "skymap"),
    )


class ConsolidateTractConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=ConsolidateTractConnections):
    pass


class ConsolidateTractTask(pipeBase.PipelineTask):
    """Concatenate any per-patch, dataframe list into a single
    per-tract DataFrame.
    """
    _DefaultName = "ConsolidateTract"
    ConfigClass = ConsolidateTractConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-patch %s Tables",
                      len(inputs["inputCatalogs"]),
                      inputRefs.inputCatalogs[0].datasetType.name)
        df = pd.concat(inputs["inputCatalogs"])
        butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)