__all__ = [
    "WriteObjectTableConfig",
    "WriteObjectTableTask",
    "WriteSourceTableConfig",
    "WriteSourceTableTask",
    "WriteRecalibratedSourceTableConfig",
    "WriteRecalibratedSourceTableTask",
    "PostprocessAnalysis",
    "TransformCatalogBaseConfig",
    "TransformCatalogBaseTask",
    "TransformObjectCatalogConfig",
    "TransformObjectCatalogTask",
    "ConsolidateObjectTableConfig",
    "ConsolidateObjectTableTask",
    "TransformSourceTableConfig",
    "TransformSourceTableTask",
    "ConsolidateVisitSummaryConfig",
    "ConsolidateVisitSummaryTask",
    "ConsolidateSourceTableConfig",
    "ConsolidateSourceTableTask",
    "MakeCcdVisitTableConfig",
    "MakeCcdVisitTableTask",
    "MakeVisitTableConfig",
    "MakeVisitTableTask",
    "WriteForcedSourceTableConfig",
    "WriteForcedSourceTableTask",
    "TransformForcedSourceTableConfig",
    "TransformForcedSourceTableTask",
    "ConsolidateTractConfig",
    "ConsolidateTractTask",
]

import dataclasses
import functools
import logging
import numbers
import os

from collections import defaultdict

import astropy.table
import numpy as np
import pandas as pd
from deprecated.sphinx import deprecated

from astro_metadata_translator.headers import merge_headers

import lsst.afw.table as afwTable
import lsst.daf.base as dafBase
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
from lsst.daf.butler.formatters.parquet import pandas_to_astropy
from lsst.pipe.base import NoWorkFound, UpstreamFailureNoWorkFound, connectionTypes
from lsst.afw.image import ExposureSummaryStats, ExposureF
from lsst.meas.base import SingleFrameMeasurementTask, DetectorVisitIdGeneratorConfig
from lsst.obs.base.utils import strip_provenance_from_fits_header

from .coaddBase import reorderRefs
from .functors import CompositeFunctor, Column

log = logging.getLogger(__name__)


def flattenFilters(df, noDupCols=["coord_ra", "coord_dec"], camelCase=False, inputBands=None):
    """Flatten a DataFrame with a multilevel (band, column) column index into
    a single-level index with per-band column names.
    """
    newDf = pd.DataFrame()
    # band is the level 0 index
    dfBands = df.columns.unique(level=0).values
    for band in dfBands:
        subdf = df[band]
        columnFormat = "{0}{1}" if camelCase else "{0}_{1}"
        newColumns = {c: columnFormat.format(band, c)
                      for c in subdf.columns if c not in noDupCols}
        cols = list(newColumns.keys())
        newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)

    # Band must be present in the input and output or else column is all NaN:
    presentBands = dfBands if inputBands is None else list(set(inputBands).intersection(dfBands))
    # Get the unexploded columns from any present band's partition.
    noDupDf = df[presentBands[0]][noDupCols]
    newDf = pd.concat([noDupDf, newDf], axis=1)
    return newDf
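
# Usage sketch for ``flattenFilters`` (illustrative only; the band names and
# values below are invented, only the two-level column structure matters):
#
#     >>> import pandas as pd
#     >>> columns = pd.MultiIndex.from_tuples(
#     ...     [("g", "coord_ra"), ("g", "coord_dec"), ("g", "PsfFlux"),
#     ...      ("r", "coord_ra"), ("r", "coord_dec"), ("r", "PsfFlux")])
#     >>> df = pd.DataFrame([[10.0, -5.0, 1.0, 10.0, -5.0, 2.0]], columns=columns)
#     >>> flattenFilters(df).columns.tolist()
#     ['coord_ra', 'coord_dec', 'g_PsfFlux', 'r_PsfFlux']
#     >>> flattenFilters(df, camelCase=True).columns.tolist()
#     ['coord_ra', 'coord_dec', 'gPsfFlux', 'rPsfFlux']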
91 """A helper class for stacking astropy tables without having them all in
97 Full size of the final table.
101 Unlike `astropy.table.vstack`, this class requires all tables to have the
102 exact same columns (it's slightly more strict than even the
103 ``join_type="exact"`` argument to `astropy.table.vstack`).
113 """Construct from an iterable of
114 `lsst.daf.butler.DeferredDatasetHandle`.
118 handles : `~collections.abc.Iterable` [ \
119 `lsst.daf.butler.DeferredDatasetHandle` ]
120 Iterable of handles. Must have a storage class that supports the
121 "rowcount" component, which is all that will be fetched.
125 vstack : `TableVStack`
126 An instance of this class, initialized with capacity equal to the
127 sum of the rowcounts of all the given table handles.
129 capacity = sum(handle.get(component=
"rowcount")
for handle
in handles)
130 return cls(capacity=capacity)
133 """Add a single table to the stack.
137 table : `astropy.table.Table`
138 An astropy table instance.
141 self.
result = astropy.table.Table()
142 for name
in table.colnames:
144 column_cls = type(column)
145 self.
result[name] = column_cls.info.new_like([column], self.
capacity, name=name)
146 self.
result[name][:len(table)] = column
147 self.
index = len(table)
148 self.
result.meta = table.meta.copy()
150 next_index = self.
index + len(table)
151 if set(self.
result.colnames) != set(table.colnames):
153 "Inconsistent columns in concatentation: "
154 f
"{set(self.result.colnames).symmetric_difference(table.colnames)}"
156 for name
in table.colnames:
157 out_col = self.
result[name]
159 if out_col.dtype != in_col.dtype:
160 raise TypeError(f
"Type mismatch on column {name!r}: {out_col.dtype} != {in_col.dtype}.")
161 self.
result[name][self.
index:next_index] = table[name]
162 self.
index = next_index
166 self.
result.meta = merge_headers([self.
result.meta, table.meta], mode=
"drop")
167 strip_provenance_from_fits_header(self.
result.meta)
171 """Vertically stack tables represented by deferred dataset handles.
175 handles : `~collections.abc.Iterable` [ \
176 `lsst.daf.butler.DeferredDatasetHandle` ]
177 Iterable of handles. Must have the "ArrowAstropy" storage class
178 and identical columns.
182 table : `astropy.table.Table`
183 Concatenated table with the same columns as each input table and
184 the rows of all of them.
186 handles = tuple(handles)
188 rowcount = tuple(handle.get(component=
"rowcount")
for handle
in handles)
189 handles = tuple(handle
for handle, count
in zip(handles, rowcount)
if count > 0)
192 for handle
in handles:
193 vstack.extend(handle.get())
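
# Usage sketch (illustrative, not part of the pipeline): incrementally stack
# two astropy tables with identical columns.  The column names and values are
# invented; in production the rows come from deferred dataset handles via
# ``TableVStack.vstack_handles``.  Assumes empty table metadata merges cleanly.
#
#     >>> import astropy.table
#     >>> t1 = astropy.table.Table({"a": [1, 2], "b": [3.0, 4.0]})
#     >>> t2 = astropy.table.Table({"a": [5], "b": [6.0]})
#     >>> stacker = TableVStack(capacity=len(t1) + len(t2))
#     >>> stacker.extend(t1)
#     >>> stacker.extend(t2)
#     >>> len(stacker.result), int(stacker.result["a"][-1])
#     (3, 5)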


class WriteObjectTableConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"coaddName": "deep"},
                                  dimensions=("tract", "patch", "skymap")):
    inputCatalogMeas = connectionTypes.Input(
        doc="Catalog of source measurements on the deepCoadd.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_meas",
        multiple=True
    )
    inputCatalogForcedSrc = connectionTypes.Input(
        doc="Catalog of forced measurements (shape and position parameters held fixed) on the deepCoadd.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_forced_src",
        multiple=True
    )
    inputCatalogPsfsMultiprofit = connectionTypes.Input(
        doc="Catalog of Gaussian mixture model fit parameters for the PSF model at each object centroid.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="ArrowAstropy",
        name="{coaddName}Coadd_psfs_multiprofit",
        multiple=True
    )
    outputCatalog = connectionTypes.Output(
        doc="A vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
            "stored as a DataFrame with a multi-level column index per-patch.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="{coaddName}Coadd_obj"
    )


class WriteObjectTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteObjectTableConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )


class WriteObjectTableTask(pipeBase.PipelineTask):
    """Write filter-merged object tables as a DataFrame in parquet format."""

    _DefaultName = "writeObjectTable"
    ConfigClass = WriteObjectTableConfig

    # Tag of the output dataset written by this task.
    outputDataset = "obj"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        catalogs = defaultdict(dict)
        for dataset, connection in (
            ("meas", "inputCatalogMeas"),
            ("forced_src", "inputCatalogForcedSrc"),
            ("psfs_multiprofit", "inputCatalogPsfsMultiprofit"),
        ):
            for ref, cat in zip(getattr(inputRefs, connection), inputs[connection]):
                catalogs[ref.dataId["band"]][dataset] = cat

        dataId = butlerQC.quantum.dataId
        df = self.run(catalogs=catalogs, tract=dataId["tract"], patch=dataId["patch"])
        outputs = pipeBase.Struct(outputCatalog=df)
        butlerQC.put(outputs, outputRefs)

    def run(self, catalogs, tract, patch):
        """Merge multiple catalogs.

        Parameters
        ----------
        catalogs : `dict`
            Mapping from filter names to dict of catalogs.
        tract : `int`
            tractId to use for the tractId column.
        patch : `str`
            patchId to use for the patchId column.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Merged dataframe.

        Raises
        ------
        ValueError
            Raised if any of the catalogs is of an unsupported type.
        """
        dfs = []
        for filt, tableDict in catalogs.items():
            for dataset, table in tableDict.items():
                # Convert afwTable or astropy inputs to pandas DataFrames.
                if isinstance(table, pd.DataFrame):
                    df = table
                elif isinstance(table, afwTable.SourceCatalog):
                    df = table.asAstropy().to_pandas()
                elif isinstance(table, astropy.table.Table):
                    df = table.to_pandas()
                else:
                    raise ValueError(f"{dataset=} has unsupported {type(table)=}")
                df.set_index("id", drop=True, inplace=True)

                # Sort columns by name, to ensure matching schema among patches.
                df = df.reindex(sorted(df.columns), axis=1)
                df = df.assign(tractId=tract, patchId=patch)

                # Make columns a 3-level MultiIndex keyed by (dataset, band, column).
                df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
                                                       names=("dataset", "band", "column"))
                dfs.append(df)

        # Join on the shared index rather than pd.concat to keep peak memory use down.
        catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return catalog
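
# Illustration (invented values): the per-patch object DataFrame produced by
# ``WriteObjectTableTask.run`` uses a three-level column index keyed by
# (dataset, band, column), so a single column is addressed as, e.g.,
# ``df[("meas", "g", "PsfFlux")]``.
#
#     >>> import pandas as pd
#     >>> cols = pd.MultiIndex.from_tuples(
#     ...     [("meas", "g", "PsfFlux"), ("forced_src", "g", "PsfFlux")],
#     ...     names=("dataset", "band", "column"))
#     >>> pd.DataFrame([[1.0, 2.0]], columns=cols).columns.names
#     FrozenList(['dataset', 'band', 'column'])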


class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"catalogType": ""},
                                  dimensions=("instrument", "visit", "detector")):
    catalog = connectionTypes.Input(
        doc="Input full-depth catalog of sources produced by CalibrateTask",
        name="{catalogType}src",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector")
    )
    outputCatalog = connectionTypes.Output(
        doc="Catalog of sources, `src` in Astropy/Parquet format.  Columns are unchanged.",
        name="{catalogType}source",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector")
    )


class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteSourceTableConnections):
    pass


class WriteSourceTableTask(pipeBase.PipelineTask):
    """Write source table to DataFrame Parquet format."""

    _DefaultName = "writeSourceTable"
    ConfigClass = WriteSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        inputs["visit"] = butlerQC.quantum.dataId["visit"]
        inputs["detector"] = butlerQC.quantum.dataId["detector"]
        result = self.run(**inputs)
        outputs = pipeBase.Struct(outputCatalog=result.table)
        butlerQC.put(outputs, outputRefs)

    def run(self, catalog, visit, detector, **kwargs):
        """Convert `src` catalog to an Astropy table.

        Parameters
        ----------
        catalog : `afwTable.SourceCatalog`
            Catalog to be converted.
        visit, detector : `int`
            Visit and detector ids to be added as columns.
        **kwargs
            Additional keyword arguments are ignored as a convenience for
            subclasses that pass the same arguments to several different
            methods.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            ``table``
                `astropy.table.Table` version of the input catalog.
        """
        self.log.info("Generating DataFrame from src catalog visit,detector=%i,%i", visit, detector)
        tbl = catalog.asAstropy()
        tbl["visit"] = visit
        # int16 instead of uint8 because databases don't like unsigned bytes.
        tbl["detector"] = np.int16(detector)

        return pipeBase.Struct(table=tbl)
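
# Minimal sketch of what ``WriteSourceTableTask.run`` does to its input
# (illustrative; a one-row catalog is used here just to show the conversion):
#
#     >>> import lsst.afw.table as afwTable
#     >>> import numpy as np
#     >>> schema = afwTable.SourceTable.makeMinimalSchema()
#     >>> catalog = afwTable.SourceCatalog(schema)
#     >>> record = catalog.addNew()      # one empty record so the table has a row
#     >>> tbl = catalog.asAstropy()      # SourceCatalog -> astropy Table
#     >>> tbl["visit"] = 123456          # add visit/detector id columns
#     >>> tbl["detector"] = np.int16(42)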


class WriteRecalibratedSourceTableConnections(WriteSourceTableConnections,
                                              defaultTemplates={"catalogType": ""},
                                              dimensions=("instrument", "visit", "detector", "skymap")):
    visitSummary = connectionTypes.Input(
        doc="Input visit-summary catalog with updated calibration objects.",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit",),
    )

    def __init__(self, config):
        # Defer the graph constraint on the input 'src' catalog so that
        # detectors without one only prune this task's quanta rather than
        # constraining the rest of the graph.
        self.catalog = dataclasses.replace(self.catalog, deferGraphConstraint=True)


class WriteRecalibratedSourceTableConfig(WriteSourceTableConfig,
                                         pipelineConnections=WriteRecalibratedSourceTableConnections):
    doReevaluatePhotoCalib = pexConfig.Field(
        dtype=bool,
        default=True,
        doc=("Add or replace local photoCalib columns"),
    )
    doReevaluateSkyWcs = pexConfig.Field(
        dtype=bool,
        default=True,
        doc=("Add or replace local WCS columns and update the coord columns, coord_ra and coord_dec"),
    )


class WriteRecalibratedSourceTableTask(WriteSourceTableTask):
    """Write recalibrated source table to DataFrame Parquet format."""

    _DefaultName = "writeRecalibratedSourceTable"
    ConfigClass = WriteRecalibratedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        inputs["visit"] = butlerQC.quantum.dataId["visit"]
        inputs["detector"] = butlerQC.quantum.dataId["detector"]

        if self.config.doReevaluatePhotoCalib or self.config.doReevaluateSkyWcs:
            exposure = ExposureF()
            inputs["exposure"] = self.prepareCalibratedExposure(
                exposure=exposure,
                visitSummary=inputs["visitSummary"],
                detectorId=butlerQC.quantum.dataId["detector"]
            )
            inputs["catalog"] = self.addCalibColumns(**inputs)

        result = self.run(**inputs)
        outputs = pipeBase.Struct(outputCatalog=result.table)
        butlerQC.put(outputs, outputRefs)

    def prepareCalibratedExposure(self, exposure, detectorId, visitSummary=None):
        """Prepare a calibrated exposure and apply external calibrations
        if so configured.

        Parameters
        ----------
        exposure : `lsst.afw.image.exposure.Exposure`
            Input exposure to adjust calibrations.  May be an empty Exposure.
        detectorId : `int`
            Detector ID associated with the exposure.
        visitSummary : `lsst.afw.table.ExposureCatalog`, optional
            Exposure catalog with all calibration objects.  WCS and PhotoCalib
            are always applied if ``visitSummary`` is provided and those
            components are not `None`.

        Returns
        -------
        exposure : `lsst.afw.image.exposure.Exposure`
            Exposure with adjusted calibrations.
        """
        if visitSummary is not None:
            row = visitSummary.find(detectorId)
            if row is None:
                raise pipeBase.NoWorkFound(f"Visit summary for detector {detectorId} is missing.")
            if (photoCalib := row.getPhotoCalib()) is None:
                self.log.warning("Detector id %s has None for photoCalib in visit summary; "
                                 "skipping reevaluation of photoCalib.", detectorId)
                exposure.setPhotoCalib(None)
            else:
                exposure.setPhotoCalib(photoCalib)
            if (skyWcs := row.getWcs()) is None:
                self.log.warning("Detector id %s has None for skyWcs in visit summary; "
                                 "skipping reevaluation of skyWcs.", detectorId)
                exposure.setWcs(None)
            else:
                exposure.setWcs(skyWcs)

        return exposure

    def addCalibColumns(self, catalog, exposure, **kwargs):
        """Add or replace columns with calibs evaluated at each centroid.

        Add or replace 'base_LocalWcs' and 'base_LocalPhotoCalib' columns in
        a source catalog, by rerunning the plugins.

        Parameters
        ----------
        catalog : `lsst.afw.table.SourceCatalog`
            Catalog to which calib columns will be added.
        exposure : `lsst.afw.image.exposure.Exposure`
            Exposure with attached PhotoCalibs and SkyWcs attributes to be
            reevaluated at local centroids.  Pixels are not required.
        **kwargs
            Additional keyword arguments are ignored to facilitate passing the
            same arguments to several methods.

        Returns
        -------
        newCat : `lsst.afw.table.SourceCatalog`
            Source catalog with requested local calib columns.
        """
        measureConfig = SingleFrameMeasurementTask.ConfigClass()
        measureConfig.doReplaceWithNoise = False

        # Clear all slots; only the local-calib plugins are run.
        for slot in measureConfig.slots:
            setattr(measureConfig.slots, slot, None)

        measureConfig.plugins.names = []
        if self.config.doReevaluateSkyWcs:
            measureConfig.plugins.names.add("base_LocalWcs")
            self.log.info("Re-evaluating base_LocalWcs plugin")
        if self.config.doReevaluatePhotoCalib:
            measureConfig.plugins.names.add("base_LocalPhotoCalib")
            self.log.info("Re-evaluating base_LocalPhotoCalib plugin")
        pluginsNotToCopy = tuple(measureConfig.plugins.names)

        # Create a new schema and catalog, copying all columns from the input
        # catalog except those that the measurement plugins will recompute.
        aliasMap = catalog.schema.getAliasMap()
        mapper = afwTable.SchemaMapper(catalog.schema)
        for item in catalog.schema:
            if not item.field.getName().startswith(pluginsNotToCopy):
                mapper.addMapping(item.key)

        schema = mapper.getOutputSchema()
        measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
        schema.setAliasMap(aliasMap)
        newCat = afwTable.SourceCatalog(schema)
        newCat.extend(catalog, mapper=mapper)

        # Update coord_ra/coord_dec, which are expected to be positions on the
        # sky and are used as such in sdm tables without transform.
        if self.config.doReevaluateSkyWcs and exposure.wcs is not None:
            afwTable.updateSourceCoords(exposure.wcs, newCat)
            wcsPlugin = measurement.plugins["base_LocalWcs"]
        else:
            wcsPlugin = None

        if self.config.doReevaluatePhotoCalib and exposure.getPhotoCalib() is not None:
            pcPlugin = measurement.plugins["base_LocalPhotoCalib"]
        else:
            pcPlugin = None

        for row in newCat:
            if wcsPlugin is not None:
                wcsPlugin.measure(row, exposure)
            if pcPlugin is not None:
                pcPlugin.measure(row, exposure)

        return newCat


class PostprocessAnalysis(object):
    """Calculate columns from DataFrames or handles storing DataFrames.

    This object manages and organizes an arbitrary set of computations
    on a catalog.  The catalog is defined by a
    `DeferredDatasetHandle` or `InMemoryDatasetHandle` object
    (or list thereof), such as a ``deepCoadd_obj`` dataset, and the
    computations are defined by a collection of
    `~lsst.pipe.tasks.functors.Functor` objects (or, equivalently, a
    ``CompositeFunctor``).

    After the object is initialized, accessing the ``.df`` attribute (which
    holds the `pandas.DataFrame` containing the results of the calculations)
    triggers computation of said dataframe.

    One of the conveniences of using this object is the ability to define a
    desired common filter for all functors.  This enables the same functor
    collection to be passed to several different `PostprocessAnalysis` objects
    without having to change the original functor collection, since the
    ``filt`` keyword argument of this object triggers an overwrite of the
    ``filt`` property for all functors in the collection.

    This object also allows a list of refFlags to be passed, and defines a set
    of default refFlags that are always included even if not requested.

    If a list of DataFrames or Handles is passed, rather than a single one,
    then the calculations will be mapped over all the input catalogs.  In
    principle, it should be straightforward to parallelize this activity, but
    initial tests have failed (see TODO in code comments).

    Parameters
    ----------
    handles : `~lsst.daf.butler.DeferredDatasetHandle` or
              `~lsst.pipe.base.InMemoryDatasetHandle` or
              list of these.
        Source catalog(s) for computation.
    functors : `list`, `dict`, or `~lsst.pipe.tasks.functors.CompositeFunctor`
        Computations to do (functors that act on ``handles``).
        If a dict, the output DataFrame will have columns keyed accordingly.
        If a list, the column keys will come from the ``.shortname``
        attribute of each functor.
    filt : `str`, optional
        Filter in which to calculate.  If provided, this will overwrite any
        existing ``.filt`` attribute of the provided functors.
    flags : `list`, optional
        List of flags (per-band) to include in output table.
        Taken from the ``meas`` dataset if applied to a multilevel Object Table.
    refFlags : `list`, optional
        List of refFlags (only reference band) to include in output table.
    forcedFlags : `list`, optional
        List of flags (per-band) to include in output table.
        Taken from the ``forced_src`` dataset if applied to a
        multilevel Object Table.  Intended for flags from measurement plugins
        only run during multi-band forced photometry.
    """
    _defaultRefFlags = []
    _defaultFuncs = ()

    def __init__(self, handles, functors, filt=None, flags=None, refFlags=None, forcedFlags=None):
        self.handles = handles
        self.functors = functors

        self.filt = filt
        self.flags = list(flags) if flags is not None else []
        self.forcedFlags = list(forcedFlags) if forcedFlags is not None else []
        self.refFlags = list(self._defaultRefFlags)
        if refFlags is not None:
            self.refFlags += list(refFlags)

        self._df = None

    @property
    def defaultFuncs(self):
        funcs = dict(self._defaultFuncs)
        return funcs

    @property
    def func(self):
        additionalFuncs = self.defaultFuncs
        additionalFuncs.update({flag: Column(flag, dataset="forced_src") for flag in self.forcedFlags})
        additionalFuncs.update({flag: Column(flag, dataset="ref") for flag in self.refFlags})
        additionalFuncs.update({flag: Column(flag, dataset="meas") for flag in self.flags})

        if isinstance(self.functors, CompositeFunctor):
            func = self.functors
        else:
            func = CompositeFunctor(self.functors)

        func.funcDict.update(additionalFuncs)
        func.filt = self.filt

        return func

    @property
    def noDupCols(self):
        return [name for name, func in self.func.funcDict.items() if func.noDup]

    @property
    def df(self):
        if self._df is None:
            self.compute()
        return self._df

    def compute(self, dropna=False, pool=None):
        # Map over multiple handles if a list or tuple was provided.
        if type(self.handles) in (list, tuple):
            if pool is None:
                dflist = [self.func(handle, dropna=dropna) for handle in self.handles]
            else:
                # TODO: parallel evaluation via pool has not worked reliably;
                # see the class docstring.
                dflist = pool.map(functools.partial(self.func, dropna=dropna), self.handles)
            self._df = pd.concat(dflist)
        else:
            self._df = self.func(self.handles, dropna=dropna)

        return self._df
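
# Usage sketch (hypothetical handle and functors): compute a set of functor
# columns for one band.  ``handle`` is assumed to be a DeferredDatasetHandle
# (or InMemoryDatasetHandle) for a ``deepCoadd_obj`` DataFrame, and ``funcs``
# a CompositeFunctor, e.g. loaded with ``CompositeFunctor.from_file(...)``.
#
#     >>> analysis = PostprocessAnalysis(handle, funcs, filt="i",          # doctest: +SKIP
#     ...                                refFlags=["detect_isPrimary"])
#     >>> df = analysis.df  # accessing .df triggers the computation       # doctest: +SKIP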


class TransformCatalogBaseConnections(pipeBase.PipelineTaskConnections,
                                      dimensions=()):
    """Expected Connections for subclasses of TransformCatalogBaseTask.

    Must be subclassed.
    """
    inputCatalog = connectionTypes.Input(
        name="",
        storageClass="DataFrame",
    )
    outputCatalog = connectionTypes.Output(
        name="",
        storageClass="ArrowAstropy",
    )


class TransformCatalogBaseConfig(pipeBase.PipelineTaskConfig,
                                 pipelineConnections=TransformCatalogBaseConnections):
    functorFile = pexConfig.Field(
        dtype=str,
        doc="Path to YAML file specifying Science Data Model functors to use "
            "when copying columns and computing calibrated values.",
        default=None,
        optional=True
    )
    primaryKey = pexConfig.Field(
        dtype=str,
        doc="Name of column to be set as the DataFrame index.  If None, the "
            "index will be named `id`.",
        default=None,
        optional=True
    )
    columnsFromDataId = pexConfig.ListField(
        dtype=str,
        default=None,
        optional=True,
        doc="Columns to extract from the dataId",
    )


class TransformCatalogBaseTask(pipeBase.PipelineTask):
    """Base class for transforming/standardizing a catalog by applying functors
    that convert units and apply calibrations.

    The purpose of this task is to perform a set of computations on an input
    ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` that holds a
    ``DataFrame`` dataset (such as ``deepCoadd_obj``), and write the results to
    a new dataset (which needs to be declared in an ``outputDataset``
    attribute).

    The calculations to be performed are defined in a YAML file that specifies
    a set of functors to be computed, provided as a ``--functorFile`` config
    parameter.  An example of such a YAML file is the following:

        funcs:
            sourceId:
                functor: Index
            x:
                functor: Column
                args: slot_Centroid_x
            y:
                functor: Column
                args: slot_Centroid_y
            psfFlux:
                functor: LocalNanojansky
                args:
                    - slot_PsfFlux_instFlux
                    - slot_PsfFlux_instFluxErr
                    - base_LocalPhotoCalib
                    - base_LocalPhotoCalibErr
            psfFluxErr:
                functor: LocalNanojanskyErr
                args:
                    - slot_PsfFlux_instFlux
                    - slot_PsfFlux_instFluxErr
                    - base_LocalPhotoCalib
                    - base_LocalPhotoCalibErr
        flags:
            - detect_isPrimary

    The names for each entry under "funcs" will become the names of columns in
    the output dataset.  All the functors referenced are defined in
    `~lsst.pipe.tasks.functors`.  Positional arguments to be passed to each
    functor are in the ``args`` list, and any additional entries for each column
    other than "functor" or "args" (e.g., ``'filt'``, ``'dataset'``) are
    treated as keyword arguments to be passed to the functor initialization.

    The "flags" entry is the default shortcut for `Column` functors.
    All columns listed under "flags" will be copied to the output table
    untransformed.  They can be of any datatype.
    In the special case of transforming a multi-level object table with
    band and dataset indices (deepCoadd_obj), these will be taken from the
    ``meas`` dataset and exploded out per band.

    There are two special shortcuts that only apply when transforming
    multi-level Object (deepCoadd_obj) tables:
    - The "refFlags" entry is a shortcut for `Column` functors
      taken from the ``ref`` dataset if transforming an ObjectTable.
    - The "forcedFlags" entry is a shortcut for `Column` functors
      taken from the ``forced_src`` dataset if transforming an ObjectTable.
      These are expanded out per band.

    This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
    to organize and execute the calculations.
    """
    @property
    def _DefaultName(self):
        raise NotImplementedError("Subclass must define the \"_DefaultName\" attribute.")

    @property
    def outputDataset(self):
        raise NotImplementedError("Subclass must define the \"outputDataset\" attribute.")

    @property
    def inputDataset(self):
        raise NotImplementedError("Subclass must define \"inputDataset\" attribute.")

    @property
    def ConfigClass(self):
        raise NotImplementedError("Subclass must define \"ConfigClass\" attribute.")

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if self.config.functorFile:
            self.log.info("Loading transform functor definitions from %s",
                          self.config.functorFile)
            self.funcs = CompositeFunctor.from_file(self.config.functorFile)
            self.funcs.update(dict(PostprocessAnalysis._defaultFuncs))
        else:
            self.funcs = None

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
                          dataId=dict(outputRefs.outputCatalog.dataId.mapping))
        butlerQC.put(result, outputRefs)

    def run(self, handle, funcs=None, dataId=None, band=None):
        """Do postprocessing calculations.

        Takes a ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` or
        ``DataFrame`` object and a dataId, and returns a dataframe with the
        results of the postprocessing calculations.

        Parameters
        ----------
        handle : `~lsst.daf.butler.DeferredDatasetHandle` or
                 `~lsst.pipe.base.InMemoryDatasetHandle` or
                 `~pandas.DataFrame`, or list of these.
            DataFrames from which calculations are done.
        funcs : `~lsst.pipe.tasks.functors.Functor`
            Functors to apply to the table's columns.
        dataId : `dict`, optional
            Used to add a `patchId` column to the output dataframe.
        band : `str`, optional
            Filter band that is being processed.

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            Result struct, with a single ``outputCatalog`` attribute holding
            the transformed catalog.
        """
        self.log.info("Transforming/standardizing the source table dataId: %s", dataId)

        df = self.transform(band, handle, funcs, dataId).df
        self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
        if len(df) == 0:
            raise UpstreamFailureNoWorkFound(
                "Input catalog is empty, so there is nothing to transform/standardize",
            )

        result = pipeBase.Struct(outputCatalog=pandas_to_astropy(df))
        return result

    def getFunctors(self):
        return self.funcs

    def getAnalysis(self, handles, funcs=None, band=None):
        if funcs is None:
            funcs = self.funcs
        analysis = PostprocessAnalysis(handles, funcs, filt=band)
        return analysis

    def transform(self, band, handles, funcs, dataId):
        analysis = self.getAnalysis(handles, funcs=funcs, band=band)
        df = analysis.df
        if dataId and self.config.columnsFromDataId:
            for key in self.config.columnsFromDataId:
                if key in dataId:
                    if key == "detector":
                        # int16 instead of uint8 because databases don't like unsigned bytes.
                        df[key] = np.int16(dataId[key])
                    else:
                        df[key] = dataId[key]
                else:
                    raise ValueError(f"'{key}' in config.columnsFromDataId not found in dataId: {dataId}")

        if self.config.primaryKey:
            if df.index.name != self.config.primaryKey and self.config.primaryKey in df:
                df.reset_index(inplace=True, drop=True)
                df.set_index(self.config.primaryKey, inplace=True)

        return pipeBase.Struct(
            df=df,
            analysis=analysis
        )


class TransformObjectCatalogConnections(pipeBase.PipelineTaskConnections,
                                         defaultTemplates={"coaddName": "deep"},
                                         dimensions=("tract", "patch", "skymap")):
    inputCatalog = connectionTypes.Input(
        doc="The vertical concatenation of the {coaddName}_{meas|forced_src|psfs_multiprofit} catalogs, "
            "stored as a DataFrame with a multi-level column index per-patch.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="{coaddName}Coadd_obj",
        deferLoad=True,
    )
    inputCatalogRef = connectionTypes.Input(
        doc="Catalog marking the primary detection (which band provides a good shape and position) "
            "for each detection in deepCoadd_mergeDet.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_ref",
        deferLoad=True,
    )
    inputCatalogSersicMultiprofit = connectionTypes.Input(
        doc="Catalog of source measurements on the deepCoadd.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="ArrowAstropy",
        name="{coaddName}Coadd_Sersic_multiprofit",
        deferLoad=True,
    )
    inputCatalogEpoch = connectionTypes.Input(
        doc="Catalog of mean epochs for each object per band.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="ArrowAstropy",
        name="object_epoch",
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-Patch Object Table of columns transformed from the deepCoadd_obj table per the standard "
            "data model.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="ArrowAstropy",
        name="objectTable",
    )

    def __init__(self, *, config=None):
        super().__init__(config=config)
        if config.multilevelOutput:
            self.outputCatalog = dataclasses.replace(self.outputCatalog, storageClass="DataFrame")


class TransformObjectCatalogConfig(TransformCatalogBaseConfig,
                                   pipelineConnections=TransformObjectCatalogConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )
    outputBands = pexConfig.ListField(
        dtype=str,
        default=None,
        optional=True,
        doc=("These bands and only these bands will appear in the output,"
             " NaN-filled if the input does not include them."
             " If None, then use all bands found in the input.")
    )
    camelCase = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Write per-band column names with camelCase, else underscore. "
             "For example: gPsFlux instead of g_PsFlux.")
    )
    multilevelOutput = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Whether results dataframe should have a multilevel column index (True) or be flat "
             "and name-munged (False).  If True, the output storage class will be "
             "set to DataFrame, since astropy tables do not support multi-level indexing."),
        deprecated="Support for multi-level outputs is deprecated and will be removed after v29.",
    )
    goodFlags = pexConfig.ListField(
        dtype=str,
        default=[],
        doc=("List of 'good' flags that should be set False when populating empty tables. "
             "All other flags are considered to be 'bad' flags and will be set to True.")
    )
    floatFillValue = pexConfig.Field(
        dtype=float,
        default=np.nan,
        doc="Fill value for float fields when populating empty tables."
    )
    integerFillValue = pexConfig.Field(
        dtype=int,
        default=-1,
        doc="Fill value for integer fields when populating empty tables."
    )

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Object.yaml")
        self.primaryKey = "objectId"
        self.columnsFromDataId = ["tract", "patch"]
        self.goodFlags = ["calib_astrometry_used",
                          "calib_photometry_reserved",
                          "calib_photometry_used",
                          "calib_psf_candidate",
                          "calib_psf_reserved",
                          ]


class TransformObjectCatalogTask(TransformCatalogBaseTask):
    """Produce a flattened Object Table to match the format specified in
    sdm_schemas.

    Do the same set of postprocessing calculations on all bands.

    This is identical to `TransformCatalogBaseTask`, except that it does
    the specified functor calculations for all filters present in the
    input `deepCoadd_obj` table.  Any specific ``"filt"`` keywords specified
    by the YAML file will be superseded.
    """
    _DefaultName = "transformObjectCatalog"
    ConfigClass = TransformObjectCatalogConfig

    datasets_multiband = ("epoch", "ref", "Sersic_multiprofit")

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
                          dataId=dict(outputRefs.outputCatalog.dataId.mapping),
                          handle_epoch=inputs["inputCatalogEpoch"],
                          handle_ref=inputs["inputCatalogRef"],
                          handle_Sersic_multiprofit=inputs["inputCatalogSersicMultiprofit"],
                          )
        butlerQC.put(result, outputRefs)

    def run(self, handle, funcs=None, dataId=None, band=None, **kwargs):
        # NOTE: The ``band`` kwarg is ignored here; all bands present in the
        # input are processed.
        if isinstance(funcs, CompositeFunctor):
            funcDict_in = funcs.funcDict
        elif isinstance(funcs, dict):
            funcDict_in = funcs
        elif isinstance(funcs, list):
            funcDict_in = {idx: v for idx, v in enumerate(funcs)}
        else:
            raise TypeError(f"Unsupported {type(funcs)=}")

        handles_multi = {}
        funcDicts_multiband = {}
        for dataset in self.datasets_multiband:
            if (handle_multi := kwargs.get(f"handle_{dataset}")) is None:
                raise RuntimeError(f"Missing required handle_{dataset} kwarg")
            handles_multi[dataset] = handle_multi
            funcDicts_multiband[dataset] = {}

        dfDict = {}
        analysisDict = {}
        templateDf = pd.DataFrame()

        columns = handle.get(component="columns")
        inputBands = columns.unique(level=1).values

        outputBands = self.config.outputBands if self.config.outputBands else inputBands

        # Split the functors up between the per-band and multiband tables.
        funcDict_band = {}
        for name, func in funcDict_in.items():
            if func.dataset in funcDicts_multiband:
                # This functor is tied to a specific band; skip it if that
                # band is not going to be output.
                if band := getattr(func, "band_to_check", None):
                    if band not in outputBands:
                        continue
                # This is something like a multiband epoch.
                elif hasattr(func, "bands"):
                    # Replace the bands with the ones present in the input.
                    func.bands = tuple(inputBands)

            funcDict = funcDicts_multiband.get(func.dataset, funcDict_band)
            funcDict[name] = func

        funcs_band = CompositeFunctor(funcDict_band)

        # Perform the transform for data of filters that exist in the handle dataframe.
        for inputBand in inputBands:
            if inputBand not in outputBands:
                self.log.info("Ignoring %s band data in the input", inputBand)
                continue
            self.log.info("Transforming the catalog of band %s", inputBand)
            result = self.transform(inputBand, handle, funcs_band, dataId)
            dfDict[inputBand] = result.df
            analysisDict[inputBand] = result.analysis
            if templateDf.empty:
                templateDf = result.df

        # Put filler values in columns of other wanted bands.
        for filt in outputBands:
            if filt not in dfDict:
                self.log.info("Adding empty columns for band %s", filt)
                dfTemp = templateDf.copy()
                for col in dfTemp.columns:
                    testValue = dfTemp[col].values[0]
                    if isinstance(testValue, (np.bool_, pd.BooleanDtype)):
                        # Boolean flag type: "good" flags are filled with
                        # False, all others with True.
                        if col in self.config.goodFlags:
                            fillValue = False
                        else:
                            fillValue = True
                    elif isinstance(testValue, numbers.Integral):
                        # Checking numbers.Integral catches all flavors of
                        # python, numpy, pandas, etc. integers; unsigned
                        # integers are not supported by Parquet.
                        if isinstance(testValue, np.unsignedinteger):
                            raise ValueError("Parquet tables may not have unsigned integer columns.")
                        else:
                            fillValue = self.config.integerFillValue
                    else:
                        fillValue = self.config.floatFillValue
                    dfTemp[col].values[:] = fillValue
                dfDict[filt] = dfTemp

        # This makes a multilevel column index, with band as the first level.
        df = pd.concat(dfDict, axis=1, names=["band", "column"])
        name_index = df.index.name

        # Flatten the multilevel index unless multilevel output (deprecated)
        # was requested.
        if not self.config.multilevelOutput:
            noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
            if self.config.primaryKey in noDupCols:
                noDupCols.remove(self.config.primaryKey)
            if dataId and self.config.columnsFromDataId:
                noDupCols += self.config.columnsFromDataId
            df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase,
                                inputBands=inputBands)

        # Apply the per-dataset functors to each multiband dataset in turn.
        for dataset, funcDict in funcDicts_multiband.items():
            handle_multiband = handles_multi[dataset]
            df_dataset = handle_multiband.get()
            if isinstance(df_dataset, astropy.table.Table):
                # Allow astropy table inputs to use a different index name.
                if name_index not in df_dataset.colnames:
                    if self.config.primaryKey in df_dataset.colnames:
                        name_index_ap = self.config.primaryKey
                    else:
                        raise RuntimeError(
                            f"Neither of {name_index=} nor {self.config.primaryKey=} appear in"
                            f" {df_dataset.colnames=} for {dataset=}"
                        )
                else:
                    name_index_ap = name_index
                df_dataset = df_dataset.to_pandas().set_index(name_index_ap, drop=False)
            else:
                df_dataset = df_dataset.asAstropy().to_pandas().set_index(name_index, drop=False)
            result = self.transform(
                band,
                pipeBase.InMemoryDatasetHandle(df_dataset, storageClass="DataFrame"),
                CompositeFunctor(funcDict),
                dataId,
            )
            result.df.index.name = name_index
            # Drop columns from the dataId if present (patch, tract).
            if self.config.columnsFromDataId:
                columns_drop = [column for column in self.config.columnsFromDataId if column in result.df]
                if columns_drop:
                    result.df.drop(columns_drop, axis=1, inplace=True)
            # Make the same multi-index for the multiband table if needed.
            to_concat = pd.concat(
                {band: result.df for band in self.config.outputBands}, axis=1, names=["band", "column"]
            ) if self.config.multilevelOutput else result.df
            df = pd.concat([df, to_concat], axis=1)
            analysisDict[dataset] = result.analysis

        df.index.name = self.config.primaryKey

        if not self.config.multilevelOutput:
            tbl = pandas_to_astropy(df)
        else:
            tbl = df

        self.log.info("Made a table of %d columns and %d rows", len(tbl.columns), len(tbl))

        return pipeBase.Struct(outputCatalog=tbl)
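
# Config sketch (values invented): restrict the flattened Object Table to a
# subset of bands and use camelCase per-band column names.  In production these
# would typically be set as pipeline config overrides rather than in Python.
#
#     >>> config = TransformObjectCatalogConfig()
#     >>> config.outputBands = ["g", "r", "i"]
#     >>> config.camelCase = True
#     >>> config.primaryKey       # set in setDefaults()
#     'objectId'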


class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("tract", "skymap")):
    inputCatalogs = connectionTypes.Input(
        doc="Per-Patch objectTables conforming to the standard data model.",
        name="objectTable",
        storageClass="ArrowAstropy",
        dimensions=("tract", "patch", "skymap"),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-tract vertical concatenation of the input objectTables",
        name="objectTable_tract",
        storageClass="ArrowAstropy",
        dimensions=("tract", "skymap"),
    )


class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=ConsolidateObjectTableConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )


class ConsolidateObjectTableTask(pipeBase.PipelineTask):
    """Write patch-merged object tables to a tract-level Parquet file.

    Concatenates the `objectTable` list into a per-tract `objectTable_tract`.
    """
    _DefaultName = "consolidateObjectTable"
    ConfigClass = ConsolidateObjectTableConfig

    inputDataset = "objectTable"
    outputDataset = "objectTable_tract"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-patch Object Tables",
                      len(inputs["inputCatalogs"]))
        table = TableVStack.vstack_handles(inputs["inputCatalogs"])
        butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)
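
# Retrieval sketch (repository path, collection, and data ID values are
# placeholders): read the consolidated per-tract Object Table back through the
# Butler as an astropy table.
#
#     >>> from lsst.daf.butler import Butler                               # doctest: +SKIP
#     >>> butler = Butler("/path/to/repo", collections=["my/collection"])  # doctest: +SKIP
#     >>> tbl = butler.get("objectTable_tract", tract=9813,                # doctest: +SKIP
#     ...                  skymap="hsc_rings_v1")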


class TransformSourceTableConnections(pipeBase.PipelineTaskConnections,
                                      defaultTemplates={"catalogType": ""},
                                      dimensions=("instrument", "visit", "detector")):
    inputCatalog = connectionTypes.Input(
        doc="Wide input catalog of sources produced by WriteSourceTableTask",
        name="{catalogType}source",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
        deferLoad=True
    )
    outputCatalog = connectionTypes.Output(
        doc="Narrower, per-detector Source Table transformed and converted per a "
            "specified set of functors",
        name="{catalogType}sourceTable",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector")
    )


class TransformSourceTableConfig(TransformCatalogBaseConfig,
                                 pipelineConnections=TransformSourceTableConnections):

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Source.yaml")
        self.primaryKey = "sourceId"
        self.columnsFromDataId = ["visit", "detector", "band", "physical_filter"]


class TransformSourceTableTask(TransformCatalogBaseTask):
    """Transform/standardize a source catalog."""

    _DefaultName = "transformSourceTable"
    ConfigClass = TransformSourceTableConfig


class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections,
                                         dimensions=("instrument", "visit",),
                                         defaultTemplates={"calexpType": ""}):
    calexp = connectionTypes.Input(
        doc="Processed exposures used for metadata",
        name="{calexpType}calexp",
        storageClass="ExposureF",
        dimensions=("instrument", "visit", "detector"),
        deferLoad=True,
        multiple=True,
    )
    visitSummary = connectionTypes.Output(
        doc=("Per-visit consolidated exposure metadata.  These catalogs use "
             "detector id for the id and are sorted for fast lookups of a "
             "detector."),
        name="visitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
    )
    visitSummarySchema = connectionTypes.InitOutput(
        doc="Schema of the visitSummary catalog",
        name="visitSummary_schema",
        storageClass="ExposureCatalog",
    )


class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig,
                                    pipelineConnections=ConsolidateVisitSummaryConnections):
    """Config for ConsolidateVisitSummaryTask"""

    full = pexConfig.Field(
        dtype=bool,
        default=True,
        doc="Whether to propagate all exposure components. "
            "This adds PSF, aperture correction map, transmission curve, and detector, which can increase "
            "file size by more than a factor of 10, but it makes the visit summaries produced by this task "
            "fully usable by tasks that were designed to run downstream of "
            "lsst.drp.tasks.UpdateVisitSummaryTask.",
    )


class ConsolidateVisitSummaryTask(pipeBase.PipelineTask):
    """Task to consolidate per-detector visit metadata.

    This task aggregates the following metadata from all the detectors in a
    single visit into an exposure catalog:
    - The visitInfo.
    - The wcs.
    - The photoCalib.
    - The physical_filter and band (if available).
    - The aperture correction map.
    - The transmission curve.
    - The psf size, shape, and effective area at the center of the detector.
    - The corners of the bounding box in right ascension/declination.

    Tests for this task are performed in ci_hsc_gen3.
    """
    _DefaultName = "consolidateVisitSummary"
    ConfigClass = ConsolidateVisitSummaryConfig

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.schema = afwTable.ExposureTable.makeMinimalSchema()
        self.schema.addField("visit", type="L", doc="Visit number")
        self.schema.addField("physical_filter", type="String", size=32, doc="Physical filter")
        self.schema.addField("band", type="String", size=32, doc="Name of band")
        ExposureSummaryStats.update_schema(self.schema)
        self.visitSummarySchema = afwTable.ExposureCatalog(self.schema)

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        dataRefs = butlerQC.get(inputRefs.calexp)
        visit = dataRefs[0].dataId["visit"]

        self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
                       len(dataRefs), visit)

        expCatalog = self._combineExposureMetadata(visit, dataRefs)

        butlerQC.put(expCatalog, outputRefs.visitSummary)

    def _combineExposureMetadata(self, visit, dataRefs):
        """Make a combined exposure catalog from a list of dataRefs.

        These dataRefs must point to exposures with wcs, summaryStats,
        and other visit metadata.

        Parameters
        ----------
        visit : `int`
            Visit identification number.
        dataRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            List of dataRefs in visit.

        Returns
        -------
        visitSummary : `lsst.afw.table.ExposureCatalog`
            Exposure catalog with per-detector summary information.
        """
        cat = afwTable.ExposureCatalog(self.schema)
        cat.resize(len(dataRefs))

        cat["visit"] = visit

        for i, dataRef in enumerate(dataRefs):
            visitInfo = dataRef.get(component="visitInfo")
            filterLabel = dataRef.get(component="filter")
            summaryStats = dataRef.get(component="summaryStats")
            detector = dataRef.get(component="detector")
            wcs = dataRef.get(component="wcs")
            photoCalib = dataRef.get(component="photoCalib")
            bbox = dataRef.get(component="bbox")
            validPolygon = dataRef.get(component="validPolygon")

            rec = cat[i]
            rec.setBBox(bbox)
            rec.setVisitInfo(visitInfo)
            rec.setWcs(wcs)
            rec.setPhotoCalib(photoCalib)
            rec.setValidPolygon(validPolygon)

            if self.config.full:
                rec.setPsf(dataRef.get(component="psf"))
                rec.setApCorrMap(dataRef.get(component="apCorrMap"))
                rec.setTransmissionCurve(dataRef.get(component="transmissionCurve"))

            rec["physical_filter"] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else ""
            rec["band"] = filterLabel.bandLabel if filterLabel.hasBandLabel() else ""
            rec.setId(detector.getId())
            summaryStats.update_record(rec)

        if not cat:
            raise pipeBase.NoWorkFound(
                "No detectors had sufficient information to make a visit summary row."
            )

        metadata = dafBase.PropertyList()
        metadata.add("COMMENT", "Catalog id is detector id, sorted.")
        metadata.add("COMMENT", "Only detectors with data have entries.")
        cat.setMetadata(metadata)

        cat.sort()
        return cat
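
# Access sketch (data ID values are placeholders): look up one detector's
# summary record from a ``visitSummary`` catalog and read a few quantities.
# ``butler`` is assumed to be an initialized lsst.daf.butler.Butler.
#
#     >>> visitSummary = butler.get("visitSummary", instrument="HSC",  # doctest: +SKIP
#     ...                           visit=123456)
#     >>> row = visitSummary.find(42)   # record for detector id 42    # doctest: +SKIP
#     >>> row["psfSigma"], row.getWcs() is not None                    # doctest: +SKIP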


class ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        defaultTemplates={"catalogType": ""},
                                        dimensions=("instrument", "visit")):
    inputCatalogs = connectionTypes.Input(
        doc="Input per-detector Source Tables",
        name="{catalogType}sourceTable",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-visit concatenation of Source Table",
        name="{catalogType}sourceTable_visit",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit")
    )


class ConsolidateSourceTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=ConsolidateSourceTableConnections):
    pass


class ConsolidateSourceTableTask(pipeBase.PipelineTask):
    """Concatenate the `sourceTable` list into a per-visit `sourceTable_visit`."""

    _DefaultName = "consolidateSourceTable"
    ConfigClass = ConsolidateSourceTableConfig

    inputDataset = "sourceTable"
    outputDataset = "sourceTable_visit"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        # Sort the input refs by detector id so the output order is deterministic.
        detectorOrder = [ref.dataId["detector"] for ref in inputRefs.inputCatalogs]
        detectorOrder.sort()
        inputRefs = reorderRefs(inputRefs, detectorOrder, dataIdKey="detector")
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-detector Source Tables",
                      len(inputs["inputCatalogs"]))
        table = TableVStack.vstack_handles(inputs["inputCatalogs"])
        butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)


class MakeCcdVisitTableConnections(pipeBase.PipelineTaskConnections,
                                   dimensions=("instrument",),
                                   defaultTemplates={"calexpType": ""}):
    visitSummaryRefs = connectionTypes.Input(
        doc="Data references for per-visit consolidated exposure metadata",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="CCD and Visit metadata table",
        name="ccdVisitTable",
        storageClass="ArrowAstropy",
        dimensions=("instrument",)
    )


class MakeCcdVisitTableConfig(pipeBase.PipelineTaskConfig,
                              pipelineConnections=MakeCcdVisitTableConnections):
    idGenerator = DetectorVisitIdGeneratorConfig.make_field()


class MakeCcdVisitTableTask(pipeBase.PipelineTask):
    """Produce a `ccdVisitTable` from the visit summary exposure catalogs."""

    _DefaultName = "makeCcdVisitTable"
    ConfigClass = MakeCcdVisitTableConfig

    def run(self, visitSummaryRefs):
        """Make a table of ccd information from the visit summary catalogs.

        Parameters
        ----------
        visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            List of DeferredDatasetHandles pointing to exposure catalogs with
            per-detector summary information.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            Results struct with attribute:

            ``outputCatalog``
                Catalog of ccd and visit information.
        """
        ccdEntries = []
        for visitSummaryRef in visitSummaryRefs:
            visitSummary = visitSummaryRef.get()
            if not visitSummary:
                continue
            visitInfo = visitSummary[0].getVisitInfo()

            # Strip provenance to prevent merge confusion.
            strip_provenance_from_fits_header(visitSummary.metadata)

            summaryTable = visitSummary.asAstropy()
            selectColumns = ["id", "visit", "physical_filter", "band", "ra", "dec",
                             "pixelScale", "zenithDistance",
                             "expTime", "zeroPoint", "psfSigma", "skyBg", "skyNoise",
                             "astromOffsetMean", "astromOffsetStd", "nPsfStar",
                             "psfStarDeltaE1Median", "psfStarDeltaE2Median",
                             "psfStarDeltaE1Scatter", "psfStarDeltaE2Scatter",
                             "psfStarDeltaSizeMedian", "psfStarDeltaSizeScatter",
                             "psfStarScaledDeltaSizeScatter", "psfTraceRadiusDelta",
                             "psfApFluxDelta", "psfApCorrSigmaScaledDelta",
                             "maxDistToNearestPsf",
                             "effTime", "effTimePsfSigmaScale",
                             "effTimeSkyBgScale", "effTimeZeroPointScale",
                             ]
            ccdEntry = summaryTable[selectColumns]

            # 'visit' is the human-readable visit number, while 'visitId' is
            # the key into the visit table; they are currently identical.
            ccdEntry.rename_column("visit", "visitId")
            ccdEntry.rename_column("id", "detectorId")

            # Keep a duplicate 'decl' column for backwards compatibility.
            ccdEntry["decl"] = ccdEntry["dec"]

            ccdEntry["ccdVisitId"] = [
                self.config.idGenerator.apply(
                    visitSummaryRef.dataId,
                    detector=detector_id,
                ).catalog_id
                for detector_id in summaryTable["id"]
            ]
            ccdEntry["detector"] = summaryTable["id"]
            ccdEntry["seeing"] = (
                visitSummary["psfSigma"] * visitSummary["pixelScale"] * np.sqrt(8 * np.log(2))
            )
            ccdEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
            ccdEntry["expMidpt"] = np.datetime64(visitInfo.date.nsecs(scale=dafBase.DateTime.TAI), "ns")
            ccdEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
            expTime = visitInfo.getExposureTime()
            ccdEntry["obsStart"] = (
                ccdEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns")
            )
            expTime_days = expTime / (60*60*24)
            ccdEntry["obsStartMJD"] = ccdEntry["expMidptMJD"] - 0.5 * expTime_days
            ccdEntry["darkTime"] = visitInfo.getDarkTime()
            ccdEntry["xSize"] = summaryTable["bbox_max_x"] - summaryTable["bbox_min_x"]
            ccdEntry["ySize"] = summaryTable["bbox_max_y"] - summaryTable["bbox_min_y"]
            ccdEntry["llcra"] = summaryTable["raCorners"][:, 0]
            ccdEntry["llcdec"] = summaryTable["decCorners"][:, 0]
            ccdEntry["ulcra"] = summaryTable["raCorners"][:, 1]
            ccdEntry["ulcdec"] = summaryTable["decCorners"][:, 1]
            ccdEntry["urcra"] = summaryTable["raCorners"][:, 2]
            ccdEntry["urcdec"] = summaryTable["decCorners"][:, 2]
            ccdEntry["lrcra"] = summaryTable["raCorners"][:, 3]
            ccdEntry["lrcdec"] = summaryTable["decCorners"][:, 3]

            ccdEntries.append(ccdEntry)

        outputCatalog = astropy.table.vstack(ccdEntries, join_type="exact")
        return pipeBase.Struct(outputCatalog=outputCatalog)
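
# Numerical note: the "seeing" column above converts the Gaussian sigma of the
# PSF (pixels) to a FWHM in arcseconds, FWHM = psfSigma * pixelScale * sqrt(8 ln 2).
#
#     >>> import numpy as np
#     >>> round(float(np.sqrt(8 * np.log(2))), 4)   # the sigma-to-FWHM factor
#     2.3548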


class MakeVisitTableConnections(pipeBase.PipelineTaskConnections,
                                dimensions=("instrument",),
                                defaultTemplates={"calexpType": ""}):
    visitSummaries = connectionTypes.Input(
        doc="Per-visit consolidated exposure metadata",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit",),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Visit metadata table",
        name="visitTable",
        storageClass="ArrowAstropy",
        dimensions=("instrument",)
    )


class MakeVisitTableConfig(pipeBase.PipelineTaskConfig,
                           pipelineConnections=MakeVisitTableConnections):
    pass


class MakeVisitTableTask(pipeBase.PipelineTask):
    """Produce a `visitTable` from the visit summary exposure catalogs."""

    _DefaultName = "makeVisitTable"
    ConfigClass = MakeVisitTableConfig

    def run(self, visitSummaries):
        """Make a table of visit information from the visit summary catalogs.

        Parameters
        ----------
        visitSummaries : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            List of handles to exposure catalogs with per-detector summary
            information.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            Results struct with attribute:

            ``outputCatalog``
                Catalog of visit information.
        """
        visitEntries = []
        for visitSummary in visitSummaries:
            visitSummary = visitSummary.get()
            if not visitSummary:
                continue
            visitRow = visitSummary[0]
            visitInfo = visitRow.getVisitInfo()

            visitEntry = {}
            visitEntry["visitId"] = visitRow["visit"]
            visitEntry["visit"] = visitRow["visit"]
            visitEntry["physical_filter"] = visitRow["physical_filter"]
            visitEntry["band"] = visitRow["band"]
            raDec = visitInfo.getBoresightRaDec()
            visitEntry["ra"] = raDec.getRa().asDegrees()
            visitEntry["dec"] = raDec.getDec().asDegrees()

            # Keep a duplicate 'decl' entry for backwards compatibility.
            visitEntry["decl"] = visitEntry["dec"]

            visitEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
            azAlt = visitInfo.getBoresightAzAlt()
            visitEntry["azimuth"] = azAlt.getLongitude().asDegrees()
            visitEntry["altitude"] = azAlt.getLatitude().asDegrees()
            visitEntry["zenithDistance"] = 90 - azAlt.getLatitude().asDegrees()
            visitEntry["airmass"] = visitInfo.getBoresightAirmass()
            expTime = visitInfo.getExposureTime()
            visitEntry["expTime"] = expTime
            visitEntry["expMidpt"] = np.datetime64(visitInfo.date.nsecs(scale=dafBase.DateTime.TAI), "ns")
            visitEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
            visitEntry["obsStart"] = visitEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns")
            expTime_days = expTime / (60*60*24)
            visitEntry["obsStartMJD"] = visitEntry["expMidptMJD"] - 0.5 * expTime_days
            visitEntries.append(visitEntry)

        outputCatalog = astropy.table.Table(rows=visitEntries)
        return pipeBase.Struct(outputCatalog=outputCatalog)
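
# Arithmetic sketch (times invented): ``obsStart`` above is derived by stepping
# back half an exposure from the TAI midpoint using nanosecond timedeltas.
#
#     >>> import numpy as np
#     >>> midpt = np.datetime64("2024-01-01T00:00:15", "ns")
#     >>> midpt - 0.5 * np.timedelta64(int(30.0 * 1E9), "ns")
#     numpy.datetime64('2024-01-01T00:00:00.000000000')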


@deprecated(reason="This task is replaced by lsst.pipe.tasks.ForcedPhotCcdTask. "
                   "This task will be removed after v30.",
            version="v29.0", category=FutureWarning)
class WriteForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("instrument", "visit", "detector", "skymap", "tract")):
    inputCatalog = connectionTypes.Input(
        doc="Primary per-detector, single-epoch forced-photometry catalog. "
            "By default, it is the output of ForcedPhotCcdTask on calexps",
        name="forced_src",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector", "skymap", "tract")
    )
    inputCatalogDiff = connectionTypes.Input(
        doc="Secondary multi-epoch, per-detector, forced photometry catalog. "
            "By default, it is the output of ForcedPhotCcdTask run on image differences.",
        name="forced_diff",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector", "skymap", "tract")
    )
    outputCatalog = connectionTypes.Output(
        doc="InputCatalogs horizontally joined on `objectId` in DataFrame parquet format",
        name="mergedForcedSource",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector", "skymap", "tract")
    )


@deprecated(reason="This task is replaced by lsst.pipe.tasks.ForcedPhotCcdTask. "
                   "This task will be removed after v30.",
            version="v29.0", category=FutureWarning)
class WriteForcedSourceTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=WriteForcedSourceTableConnections):
    key = pexConfig.Field(
        doc="Column on which to join the two input tables on and make the primary key of the output",
        dtype=str,
        default="objectId",
    )


@deprecated(reason="This task is replaced by lsst.pipe.tasks.ForcedPhotCcdTask. "
                   "This task will be removed after v30.",
            version="v29.0", category=FutureWarning)
class WriteForcedSourceTableTask(pipeBase.PipelineTask):
    """Merge and convert per-detector forced source catalogs to DataFrame Parquet format.

    Because the predecessor ForcedPhotCcdTask operates per-detector,
    per-tract (i.e., it has tract in its dimensions), detectors
    on the tract boundary may have multiple forced source catalogs.

    The successor task TransformForcedSourceTable runs per-patch
    and temporally aggregates overlapping mergedForcedSource catalogs from all
    available epochs.
    """
    _DefaultName = "writeForcedSourceTable"
    ConfigClass = WriteForcedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        inputs["visit"] = butlerQC.quantum.dataId["visit"]
        inputs["detector"] = butlerQC.quantum.dataId["detector"]
        inputs["band"] = butlerQC.quantum.dataId["band"]
        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)

    def run(self, inputCatalog, inputCatalogDiff, visit, detector, band=None):
        dfs = []
        for table, dataset in zip((inputCatalog, inputCatalogDiff), ("calexp", "diff")):
            df = table.asAstropy().to_pandas().set_index(self.config.key, drop=False)
            df = df.reindex(sorted(df.columns), axis=1)
            df["visit"] = visit
            # int16 instead of uint8 because databases don't like unsigned bytes.
            df["detector"] = np.int16(detector)
            df["band"] = band if band else pd.NA
            df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
                                                   names=("dataset", "column"))

            dfs.append(df)

        outputCatalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return pipeBase.Struct(outputCatalog=outputCatalog)


class TransformForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                            dimensions=("instrument", "skymap", "patch", "tract")):
    inputCatalogs = connectionTypes.Input(
        doc="DataFrames of merged ForcedSources produced by WriteForcedSourceTableTask",
        name="mergedForcedSource",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
        multiple=True,
        deferLoad=True
    )
    referenceCatalog = connectionTypes.Input(
        doc="Reference catalog which was used to seed the forcedPhot. Columns "
            "objectId, detect_isPrimary, detect_isTractInner, detect_isPatchInner "
            "are expected.",
        name="objectTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        deferLoad=True
    )
    outputCatalog = connectionTypes.Output(
        doc="Narrower, temporally-aggregated, per-patch ForcedSource Table transformed and converted per a "
            "specified set of functors",
        name="forcedSourceTable",
        storageClass="ArrowAstropy",
        dimensions=("tract", "patch", "skymap")
    )


class TransformForcedSourceTableConfig(TransformCatalogBaseConfig,
                                       pipelineConnections=TransformForcedSourceTableConnections):
    referenceColumns = pexConfig.ListField(
        dtype=str,
        default=["detect_isPrimary", "detect_isTractInner", "detect_isPatchInner"],
        optional=True,
        doc="Columns to pull from reference catalog",
    )
    keyRef = pexConfig.Field(
        dtype=str,
        default="objectId",
        doc="Column on which to join the two input tables on and make the primary key of the output",
    )
    key = pexConfig.Field(
        dtype=str,
        doc="Rename the output DataFrame index to this name",
        default="forcedSourceId",
    )

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "ForcedSource.yaml")
        self.columnsFromDataId = ["tract", "patch"]


class TransformForcedSourceTableTask(TransformCatalogBaseTask):
    """Transform/standardize a ForcedSource catalog.

    Transforms each wide, per-detector forcedSource DataFrame per the
    specification file (per-camera defaults found in ForcedSource.yaml).
    All epochs that overlap the patch are aggregated into one per-patch,
    narrow DataFrame file.

    No de-duplication of rows is performed.  Duplicate-resolution flags are
    pulled in from the referenceCatalog (`detect_isPrimary`,
    `detect_isTractInner`, `detect_isPatchInner`) so that users may de-duplicate
    for analysis or compare duplicates for QA.

    The resulting table includes multiple bands.  Epochs (MJDs) and other useful
    per-visit rows can be retrieved by joining with the CcdVisitTable on
    ccdVisitId.
    """
    _DefaultName = "transformForcedSourceTable"
    ConfigClass = TransformForcedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        outputs = self.run(inputs["inputCatalogs"], inputs["referenceCatalog"], funcs=self.funcs,
                           dataId=dict(outputRefs.outputCatalog.dataId.mapping))

        butlerQC.put(outputs, outputRefs)

    def run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=None):
        dfs = []
        refColumns = list(self.config.referenceColumns)
        refColumns.append(self.config.keyRef)
        ref = referenceCatalog.get(parameters={"columns": refColumns})
        if ref.index.name != self.config.keyRef:
            # The reference catalog may not have been written with the keyRef
            # column as its index, so set it explicitly.
            ref.set_index(self.config.keyRef, inplace=True)
        self.log.info("Aggregating %s input catalogs", len(inputCatalogs))
        for handle in inputCatalogs:
            result = self.transform(None, handle, funcs, dataId)
            # Keep only rows whose object overlaps the patch (inner join on
            # the reference catalog).
            dfs.append(result.df.join(ref, how="inner"))

        outputCatalog = pd.concat(dfs)

        if outputCatalog.empty:
            raise NoWorkFound(f"No forced photometry rows for {dataId}.")

        # Now that the join on config.keyRef is done, change the index to
        # config.key.
        outputCatalog.index.rename(self.config.keyRef, inplace=True)
        # Add config.keyRef to the column list.
        outputCatalog.reset_index(inplace=True)
        # Set forcedSourceId as the index; it is specified in ForcedSource.yaml.
        outputCatalog.set_index("forcedSourceId", inplace=True, verify_integrity=True)
        # Rename the index to config.key.
        outputCatalog.index.rename(self.config.key, inplace=True)

        self.log.info("Made a table of %d columns and %d rows",
                      len(outputCatalog.columns), len(outputCatalog))
        return pipeBase.Struct(outputCatalog=pandas_to_astropy(outputCatalog))


class ConsolidateTractConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"catalogType": ""},
                                  dimensions=("instrument", "tract")):
    inputCatalogs = connectionTypes.Input(
        doc="Input per-patch DataFrame Tables to be concatenated",
        name="{catalogType}ForcedSourceTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        multiple=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Output per-tract concatenation of DataFrame Tables",
        name="{catalogType}ForcedSourceTable_tract",
        storageClass="DataFrame",
        dimensions=("tract", "skymap"),
    )


class ConsolidateTractConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=ConsolidateTractConnections):
    pass


class ConsolidateTractTask(pipeBase.PipelineTask):
    """Concatenate any per-patch DataFrame list into a single
    per-tract DataFrame.
    """
    _DefaultName = "ConsolidateTract"
    ConfigClass = ConsolidateTractConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        # No need to check that at least one inputCatalog exists; an empty
        # input would produce an empty quantum graph.
        self.log.info("Concatenating %s per-patch %s Tables",
                      len(inputs["inputCatalogs"]),
                      inputRefs.inputCatalogs[0].datasetType.name)
        df = pd.concat(inputs["inputCatalogs"])
        butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)