__all__ = [
    "WriteObjectTableConfig",
    "WriteObjectTableTask",
    "WriteSourceTableConfig",
    "WriteSourceTableTask",
    "WriteRecalibratedSourceTableConfig",
    "WriteRecalibratedSourceTableTask",
    "PostprocessAnalysis",
    "TransformCatalogBaseConfig",
    "TransformCatalogBaseTask",
    "TransformObjectCatalogConfig",
    "TransformObjectCatalogTask",
    "ConsolidateObjectTableConfig",
    "ConsolidateObjectTableTask",
    "TransformSourceTableConfig",
    "TransformSourceTableTask",
    "ConsolidateVisitSummaryConfig",
    "ConsolidateVisitSummaryTask",
    "ConsolidateSourceTableConfig",
    "ConsolidateSourceTableTask",
    "MakeCcdVisitTableConfig",
    "MakeCcdVisitTableTask",
    "MakeVisitTableConfig",
    "MakeVisitTableTask",
    "WriteForcedSourceTableConfig",
    "WriteForcedSourceTableTask",
    "TransformForcedSourceTableConfig",
    "TransformForcedSourceTableTask",
    "ConsolidateTractConfig",
    "ConsolidateTractTask",
]
from collections import defaultdict
import dataclasses
import functools
import logging
import numbers
import os

import numpy as np
import pandas as pd
import astropy.table

from astro_metadata_translator.headers import merge_headers

import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
import lsst.daf.base as dafBase
import lsst.afw.table as afwTable
from lsst.afw.image import ExposureF, ExposureSummaryStats
from lsst.daf.butler.formatters.parquet import pandas_to_astropy
from lsst.meas.base import SingleFrameMeasurementTask, DetectorVisitIdGeneratorConfig
from lsst.obs.base.utils import strip_provenance_from_fits_header
from lsst.pipe.base import connectionTypes

from .functors import CompositeFunctor, Column

log = logging.getLogger(__name__)
def flattenFilters(df, noDupCols=["coord_ra", "coord_dec"], camelCase=False, inputBands=None):
    """Flatten a dataframe with a multilevel column index.
    """
    newDf = pd.DataFrame()
    # band is the level-0 column index
    dfBands = df.columns.unique(level=0).values
    for band in dfBands:
        subdf = df[band]
        columnFormat = "{0}{1}" if camelCase else "{0}_{1}"
        newColumns = {c: columnFormat.format(band, c)
                      for c in subdf.columns if c not in noDupCols}
        cols = list(newColumns.keys())
        newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)

    # Band must be present in the input and output or else column is all NaN:
    presentBands = dfBands if inputBands is None else list(set(inputBands).intersection(dfBands))
    # Get the unexploded columns from any present band's partition
    noDupDf = df[presentBands[0]][noDupCols]
    newDf = pd.concat([noDupDf, newDf], axis=1)
    return newDf
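# Illustrative sketch only (not used by the pipeline): flattenFilters applied
# to a toy two-band multilevel DataFrame.  The band and column names below are
# made up for demonstration.
def _flattenFiltersExample():
    df = pd.DataFrame({
        ("g", "coord_ra"): [1.0, 2.0],
        ("g", "coord_dec"): [3.0, 4.0],
        ("g", "psfFlux"): [10.0, 20.0],
        ("r", "coord_ra"): [1.0, 2.0],
        ("r", "coord_dec"): [3.0, 4.0],
        ("r", "psfFlux"): [11.0, 21.0],
    })
    # Result has flat columns: coord_ra, coord_dec, g_psfFlux, r_psfFlux.
    return flattenFilters(df)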
89 """A helper class for stacking astropy tables without having them all in
95 Full size of the final table.
99 Unlike `astropy.table.vstack`, this class requires all tables to have the
100 exact same columns (it's slightly more strict than even the
101 ``join_type="exact"`` argument to `astropy.table.vstack`).
111 """Construct from an iterable of
112 `lsst.daf.butler.DeferredDatasetHandle`.
116 handles : `~collections.abc.Iterable` [ \
117 `lsst.daf.butler.DeferredDatasetHandle` ]
118 Iterable of handles. Must have a storage class that supports the
119 "rowcount" component, which is all that will be fetched.
123 vstack : `TableVStack`
124 An instance of this class, initialized with capacity equal to the
125 sum of the rowcounts of all the given table handles.
127 capacity = sum(handle.get(component=
"rowcount")
for handle
in handles)
128 return cls(capacity=capacity)
131 """Add a single table to the stack.
135 table : `astropy.table.Table`
136 An astropy table instance.
139 self.
result = astropy.table.Table()
140 for name
in table.colnames:
142 column_cls = type(column)
143 self.
result[name] = column_cls.info.new_like([column], self.
capacity, name=name)
144 self.
result[name][:len(table)] = column
145 self.
index = len(table)
146 self.
result.meta = table.meta.copy()
148 next_index = self.
index + len(table)
149 if set(self.
result.colnames) != set(table.colnames):
151 "Inconsistent columns in concatentation: "
152 f
"{set(self.result.colnames).symmetric_difference(table.colnames)}"
154 for name
in table.colnames:
155 out_col = self.
result[name]
157 if out_col.dtype != in_col.dtype:
158 raise TypeError(f
"Type mismatch on column {name!r}: {out_col.dtype} != {in_col.dtype}.")
159 self.
result[name][self.
index:next_index] = table[name]
160 self.
index = next_index
164 self.
result.meta = merge_headers([self.
result.meta, table.meta], mode=
"drop")
165 strip_provenance_from_fits_header(self.
result.meta)
169 """Vertically stack tables represented by deferred dataset handles.
173 handles : `~collections.abc.Iterable` [ \
174 `lsst.daf.butler.DeferredDatasetHandle` ]
175 Iterable of handles. Must have the "ArrowAstropy" storage class
176 and identical columns.
180 table : `astropy.table.Table`
181 Concatenated table with the same columns as each input table and
182 the rows of all of them.
184 handles = tuple(handles)
186 for handle
in handles:
187 vstack.extend(handle.get())
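# Illustrative sketch only (not part of the pipeline): TableVStack can also be
# driven directly with in-memory astropy tables via ``extend``; the column
# name and values below are made up.
def _tableVStackExample():
    t1 = astropy.table.Table({"a": np.array([1, 2], dtype=np.int64)})
    t2 = astropy.table.Table({"a": np.array([3], dtype=np.int64)})
    stacker = TableVStack(capacity=len(t1) + len(t2))
    stacker.extend(t1)
    stacker.extend(t2)
    # stacker.result is now a 3-row astropy table with column "a".
    return stacker.result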
class WriteObjectTableConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"coaddName": "deep"},
                                  dimensions=("tract", "patch", "skymap")):
    inputCatalogMeas = connectionTypes.Input(
        doc="Catalog of source measurements on the deepCoadd.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_meas",
        multiple=True,
    )
    inputCatalogForcedSrc = connectionTypes.Input(
        doc="Catalog of forced measurements (shape and position parameters held fixed) on the deepCoadd.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_forced_src",
        multiple=True,
    )
    inputCatalogPsfsMultiprofit = connectionTypes.Input(
        doc="Catalog of Gaussian mixture model fit parameters for the PSF model at each object centroid.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="ArrowAstropy",
        name="{coaddName}Coadd_psfs_multiprofit",
        multiple=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="A vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
            "stored as a DataFrame with a multi-level column index per-patch.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="{coaddName}Coadd_obj",
    )
class WriteObjectTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteObjectTableConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd",
    )
class WriteObjectTableTask(pipeBase.PipelineTask):
    """Write filter-merged object tables as a DataFrame in parquet format.
    """
    _DefaultName = "writeObjectTable"
    ConfigClass = WriteObjectTableConfig

    # Tag of the output dataset written by this task.
    outputDataset = "obj"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        catalogs = defaultdict(dict)
        for dataset, connection in (
            ("meas", "inputCatalogMeas"),
            ("forced_src", "inputCatalogForcedSrc"),
            ("psfs_multiprofit", "inputCatalogPsfsMultiprofit"),
        ):
            for ref, cat in zip(getattr(inputRefs, connection), inputs[connection]):
                catalogs[ref.dataId["band"]][dataset] = cat

        dataId = butlerQC.quantum.dataId
        df = self.run(catalogs=catalogs, tract=dataId["tract"], patch=dataId["patch"])
        outputs = pipeBase.Struct(outputCatalog=df)
        butlerQC.put(outputs, outputRefs)
    def run(self, catalogs, tract, patch):
        """Merge multiple catalogs.

        Parameters
        ----------
        catalogs : `dict`
            Mapping from filter names to dict of catalogs.
        tract : `int`
            tractId to use for the tractId column.
        patch : `str`
            patchId to use for the patchId column.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Merged dataframe.

        Raises
        ------
        ValueError
            Raised if any of the catalogs is of an unsupported type.
        """
        dfs = []
        for filt, tableDict in catalogs.items():
            for dataset, table in tableDict.items():
                # Convert afw/astropy catalogs to a pandas DataFrame as needed.
                if isinstance(table, pd.DataFrame):
                    df = table
                elif isinstance(table, afwTable.SourceCatalog):
                    df = table.asAstropy().to_pandas()
                elif isinstance(table, astropy.table.Table):
                    df = table.to_pandas()
                else:
                    raise ValueError(f"{dataset=} has unsupported {type(table)=}")
                df.set_index("id", drop=True, inplace=True)

                # Sort columns by name, to ensure matching schema among patches.
                df = df.reindex(sorted(df.columns), axis=1)
                df = df.assign(tractId=tract, patchId=patch)

                # Make columns a 3-level MultiIndex.
                df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
                                                       names=("dataset", "band", "column"))
                dfs.append(df)

        # Join the per-band frames pairwise rather than concatenating them all
        # at once, to keep peak memory use down.
        catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return catalog
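# For reference, an illustrative sketch of the ``catalogs`` mapping expected by
# WriteObjectTableTask.run (band keys, variable names, and the tract/patch
# values are examples only):
#
#     catalogs = {
#         "g": {"meas": gMeas, "forced_src": gForced, "psfs_multiprofit": gPsfs},
#         "r": {"meas": rMeas, "forced_src": rForced, "psfs_multiprofit": rPsfs},
#     }
#     df = task.run(catalogs, tract=9813, patch=42)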
class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"catalogType": ""},
                                  dimensions=("instrument", "visit", "detector")):
    catalog = connectionTypes.Input(
        doc="Input full-depth catalog of sources produced by CalibrateTask",
        name="{catalogType}src",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector"),
    )
    outputCatalog = connectionTypes.Output(
        doc="Catalog of sources, `src` in Astropy/Parquet format.  Columns are unchanged.",
        name="{catalogType}source",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )
class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteSourceTableConnections):
    pass
class WriteSourceTableTask(pipeBase.PipelineTask):
    """Write source table to DataFrame Parquet format.
    """
    _DefaultName = "writeSourceTable"
    ConfigClass = WriteSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        inputs["visit"] = butlerQC.quantum.dataId["visit"]
        inputs["detector"] = butlerQC.quantum.dataId["detector"]
        result = self.run(**inputs)
        outputs = pipeBase.Struct(outputCatalog=result.table)
        butlerQC.put(outputs, outputRefs)

    def run(self, catalog, visit, detector, **kwargs):
        """Convert `src` catalog to an Astropy table.

        Parameters
        ----------
        catalog : `afwTable.SourceCatalog`
            Catalog to be converted.
        visit, detector : `int`
            Visit and detector ids to be added as columns.
        **kwargs
            Additional keyword arguments are ignored as a convenience for
            subclasses that pass the same arguments to several different
            methods.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            ``table``
                `astropy.table.Table` version of the input catalog.
        """
        self.log.info("Generating DataFrame from src catalog visit,detector=%i,%i", visit, detector)
        tbl = catalog.asAstropy()
        tbl["visit"] = visit
        # int16 instead of uint8 because databases don't like unsigned bytes.
        tbl["detector"] = np.int16(detector)

        return pipeBase.Struct(table=tbl)
class WriteRecalibratedSourceTableConnections(WriteSourceTableConnections,
                                              defaultTemplates={"catalogType": ""},
                                              dimensions=("instrument", "visit", "detector", "skymap")):
    visitSummary = connectionTypes.Input(
        doc="Input visit-summary catalog with updated calibration objects.",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit",),
    )

    def __init__(self, config):
        # Defer the existence constraint on the input catalog during
        # quantum-graph generation.
        self.catalog = dataclasses.replace(self.catalog, deferGraphConstraint=True)
class WriteRecalibratedSourceTableConfig(WriteSourceTableConfig,
                                         pipelineConnections=WriteRecalibratedSourceTableConnections):
    doReevaluatePhotoCalib = pexConfig.Field(
        dtype=bool,
        default=True,
        doc=("Add or replace local photoCalib columns"),
    )
    doReevaluateSkyWcs = pexConfig.Field(
        dtype=bool,
        default=True,
        doc=("Add or replace local WCS columns and update the coord columns, coord_ra and coord_dec"),
    )
class WriteRecalibratedSourceTableTask(WriteSourceTableTask):
    """Write source table to DataFrame Parquet format, optionally re-evaluating
    local calibration columns from the final visit summary.
    """
    _DefaultName = "writeRecalibratedSourceTable"
    ConfigClass = WriteRecalibratedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        inputs["visit"] = butlerQC.quantum.dataId["visit"]
        inputs["detector"] = butlerQC.quantum.dataId["detector"]

        if self.config.doReevaluatePhotoCalib or self.config.doReevaluateSkyWcs:
            exposure = ExposureF()
            inputs["exposure"] = self.prepareCalibratedExposure(
                exposure=exposure,
                visitSummary=inputs["visitSummary"],
                detectorId=butlerQC.quantum.dataId["detector"],
            )
            inputs["catalog"] = self.addCalibColumns(**inputs)

        result = self.run(**inputs)
        outputs = pipeBase.Struct(outputCatalog=result.table)
        butlerQC.put(outputs, outputRefs)
    def prepareCalibratedExposure(self, exposure, detectorId, visitSummary=None):
        """Prepare a calibrated exposure and apply external calibrations.

        Parameters
        ----------
        exposure : `lsst.afw.image.exposure.Exposure`
            Input exposure to adjust calibrations. May be an empty Exposure.
        detectorId : `int`
            Detector ID associated with the exposure.
        visitSummary : `lsst.afw.table.ExposureCatalog`, optional
            Exposure catalog with all calibration objects. WCS and PhotoCalib
            are always applied if ``visitSummary`` is provided and those
            components are not `None`.

        Returns
        -------
        exposure : `lsst.afw.image.exposure.Exposure`
            Exposure with adjusted calibrations.
        """
        if visitSummary is not None:
            row = visitSummary.find(detectorId)
            if row is None:
                raise pipeBase.NoWorkFound(f"Visit summary for detector {detectorId} is missing.")
            if (photoCalib := row.getPhotoCalib()) is None:
                self.log.warning("Detector id %s has None for photoCalib in visit summary; "
                                 "skipping reevaluation of photoCalib.", detectorId)
                exposure.setPhotoCalib(None)
            else:
                exposure.setPhotoCalib(photoCalib)
            if (skyWcs := row.getWcs()) is None:
                self.log.warning("Detector id %s has None for skyWcs in visit summary; "
                                 "skipping reevaluation of skyWcs.", detectorId)
                exposure.setWcs(None)
            else:
                exposure.setWcs(skyWcs)

        return exposure
    def addCalibColumns(self, catalog, exposure, **kwargs):
        """Add or replace columns with calibs evaluated at each centroid.

        Add or replace 'base_LocalWcs' and 'base_LocalPhotoCalib' columns in
        a source catalog, by rerunning the plugins.

        Parameters
        ----------
        catalog : `lsst.afw.table.SourceCatalog`
            Catalog to which calib columns will be added.
        exposure : `lsst.afw.image.exposure.Exposure`
            Exposure with attached PhotoCalibs and SkyWcs attributes to be
            reevaluated at local centroids. Pixels are not required.
        **kwargs
            Additional keyword arguments are ignored to facilitate passing the
            same arguments to several methods.

        Returns
        -------
        newCat : `lsst.afw.table.SourceCatalog`
            Source Catalog with requested local calib columns.
        """
        measureConfig = SingleFrameMeasurementTask.ConfigClass()
        measureConfig.doReplaceWithNoise = False

        # Clear all slots, because we aren't running the relevant plugins.
        for slot in measureConfig.slots:
            setattr(measureConfig.slots, slot, None)

        measureConfig.plugins.names = []
        if self.config.doReevaluateSkyWcs:
            measureConfig.plugins.names.add("base_LocalWcs")
            self.log.info("Re-evaluating base_LocalWcs plugin")
        if self.config.doReevaluatePhotoCalib:
            measureConfig.plugins.names.add("base_LocalPhotoCalib")
            self.log.info("Re-evaluating base_LocalPhotoCalib plugin")
        pluginsNotToCopy = tuple(measureConfig.plugins.names)

        # Create a new schema and catalog, copying all columns from the
        # original except the ones that will be re-evaluated.
        aliasMap = catalog.schema.getAliasMap()
        mapper = afwTable.SchemaMapper(catalog.schema)
        for item in catalog.schema:
            if not item.field.getName().startswith(pluginsNotToCopy):
                mapper.addMapping(item.key)

        schema = mapper.getOutputSchema()
        measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
        schema.setAliasMap(aliasMap)
        newCat = afwTable.SourceCatalog(schema)
        newCat.extend(catalog, mapper=mapper)

        # Update coord_ra/coord_dec, which are expected to be positions on the
        # sky and are used as such in sdm tables without transform.
        if self.config.doReevaluateSkyWcs and exposure.wcs is not None:
            afwTable.updateSourceCoords(exposure.wcs, newCat)
            wcsPlugin = measurement.plugins["base_LocalWcs"]
        else:
            wcsPlugin = None

        if self.config.doReevaluatePhotoCalib and exposure.getPhotoCalib() is not None:
            pcPlugin = measurement.plugins["base_LocalPhotoCalib"]
        else:
            pcPlugin = None

        for row in newCat:
            if wcsPlugin is not None:
                wcsPlugin.measure(row, exposure)
            if pcPlugin is not None:
                pcPlugin.measure(row, exposure)

        return newCat
class PostprocessAnalysis(object):
    """Calculate columns from DataFrames or handles storing DataFrames.

    This object manages and organizes an arbitrary set of computations
    on a catalog.  The catalog is defined by a
    `DeferredDatasetHandle` or `InMemoryDatasetHandle` object
    (or list thereof), such as a ``deepCoadd_obj`` dataset, and the
    computations are defined by a collection of
    `~lsst.pipe.tasks.functors.Functor` objects (or, equivalently, a
    ``CompositeFunctor``).

    After the object is initialized, accessing the ``.df`` attribute (which
    holds the `pandas.DataFrame` containing the results of the calculations)
    triggers computation of said dataframe.

    One of the conveniences of using this object is the ability to define a
    desired common filter for all functors.  This enables the same functor
    collection to be passed to several different `PostprocessAnalysis` objects
    without having to change the original functor collection, since the
    ``filt`` keyword argument of this object triggers an overwrite of the
    ``filt`` property for all functors in the collection.

    This object also allows a list of refFlags to be passed, and defines a set
    of default refFlags that are always included even if not requested.

    If a list of DataFrames or Handles is passed, rather than a single one,
    then the calculations will be mapped over all the input catalogs.  In
    principle, it should be straightforward to parallelize this activity, but
    initial tests have failed (see TODO in code comments).

    Parameters
    ----------
    handles : `~lsst.daf.butler.DeferredDatasetHandle` or
              `~lsst.pipe.base.InMemoryDatasetHandle` or
              list of these.
        Source catalog(s) for computation.
    functors : `list`, `dict`, or `~lsst.pipe.tasks.functors.CompositeFunctor`
        Computations to do (functors that act on ``handles``).
        If a dict, the output DataFrame will have columns keyed accordingly.
        If a list, the column keys will come from the ``.shortname`` attribute
        of each functor.
    filt : `str`, optional
        Filter in which to calculate.  If provided, this will overwrite any
        existing ``.filt`` attribute of the provided functors.
    flags : `list`, optional
        List of flags (per-band) to include in output table.
        Taken from the ``meas`` dataset if applied to a multilevel Object
        Table.
    refFlags : `list`, optional
        List of refFlags (only reference band) to include in output table.
    forcedFlags : `list`, optional
        List of flags (per-band) to include in output table.
        Taken from the ``forced_src`` dataset if applied to a
        multilevel Object Table.  Intended for flags from measurement plugins
        only run during multi-band forced-photometry.
    """
    _defaultRefFlags = []
    _defaultFuncs = ()  # concrete default functors not shown in this excerpt

    def __init__(self, handles, functors, filt=None, flags=None, refFlags=None, forcedFlags=None):
        self.handles = handles
        self.functors = functors

        self.filt = filt
        self.flags = list(flags) if flags is not None else []
        self.forcedFlags = list(forcedFlags) if forcedFlags is not None else []
        self.refFlags = list(self._defaultRefFlags)
        if refFlags is not None:
            self.refFlags += list(refFlags)

        self._df = None
    @property
    def defaultFuncs(self):
        funcs = dict(self._defaultFuncs)
        return funcs

    @property
    def func(self):
        additionalFuncs = self.defaultFuncs
        additionalFuncs.update({flag: Column(flag, dataset="forced_src") for flag in self.forcedFlags})
        additionalFuncs.update({flag: Column(flag, dataset="ref") for flag in self.refFlags})
        additionalFuncs.update({flag: Column(flag, dataset="meas") for flag in self.flags})

        if isinstance(self.functors, CompositeFunctor):
            func = self.functors
        else:
            func = CompositeFunctor(self.functors)

        func.funcDict.update(additionalFuncs)
        func.filt = self.filt

        return func

    @property
    def noDupCols(self):
        return [name for name, func in self.func.funcDict.items() if func.noDup]

    @property
    def df(self):
        if self._df is None:
            self.compute()
        return self._df
    def compute(self, dropna=False, pool=None):
        # Map over multiple handles if a list or tuple was provided.
        if type(self.handles) in (list, tuple):
            if pool is None:
                dflist = [self.func(handle, dropna=dropna) for handle in self.handles]
            else:
                # TODO: parallelization of this step has not yet worked.
                dflist = pool.map(functools.partial(self.func, dropna=dropna), self.handles)
            self._df = pd.concat(dflist)
        else:
            self._df = self.func(self.handles, dropna=dropna)

        return self._df
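# Illustrative sketch only (not used by the pipeline): computing a couple of
# raw columns from an in-memory DataFrame.  The column names are made up, and
# wrapping the DataFrame in an InMemoryDatasetHandle with the "DataFrame"
# storage class is an assumption for this example.
def _postprocessAnalysisExample(df):
    handle = pipeBase.InMemoryDatasetHandle(df, storageClass="DataFrame")
    functors = {"ra": Column("coord_ra"), "dec": Column("coord_dec")}
    return PostprocessAnalysis(handle, functors).df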
class TransformCatalogBaseConnections(pipeBase.PipelineTaskConnections,
                                      dimensions=()):
    """Expected Connections for subclasses of TransformCatalogBaseTask.
    """
    inputCatalog = connectionTypes.Input(
        name="",
        storageClass="DataFrame",
    )
    outputCatalog = connectionTypes.Output(
        name="",
        storageClass="ArrowAstropy",
    )
class TransformCatalogBaseConfig(pipeBase.PipelineTaskConfig,
                                 pipelineConnections=TransformCatalogBaseConnections):
    functorFile = pexConfig.Field(
        dtype=str,
        doc="Path to YAML file specifying Science Data Model functors to use "
            "when copying columns and computing calibrated values.",
        default=None,
        optional=True,
    )
    primaryKey = pexConfig.Field(
        dtype=str,
        doc="Name of column to be set as the DataFrame index. If None, the index "
            "will be named `id`.",
        default=None,
        optional=True,
    )
    columnsFromDataId = pexConfig.ListField(
        dtype=str,
        default=None,
        optional=True,
        doc="Columns to extract from the dataId",
    )
class TransformCatalogBaseTask(pipeBase.PipelineTask):
    """Base class for transforming/standardizing a catalog by applying functors
    that convert units and apply calibrations.

    The purpose of this task is to perform a set of computations on an input
    ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` that holds a
    ``DataFrame`` dataset (such as ``deepCoadd_obj``), and write the results to
    a new dataset (which needs to be declared in an ``outputDataset``
    attribute).

    The calculations to be performed are defined in a YAML file that specifies
    a set of functors to be computed, provided as a ``--functorFile`` config
    parameter.  An example of such a YAML file is the following (the entry
    names are illustrative):

        funcs:
            x:
                functor: Column
                args: slot_Centroid_x
            y:
                functor: Column
                args: slot_Centroid_y
            psfFlux:
                functor: LocalNanojansky
                args:
                    - slot_PsfFlux_instFlux
                    - slot_PsfFlux_instFluxErr
                    - base_LocalPhotoCalib
                    - base_LocalPhotoCalibErr
            psfFluxErr:
                functor: LocalNanojanskyErr
                args:
                    - slot_PsfFlux_instFlux
                    - slot_PsfFlux_instFluxErr
                    - base_LocalPhotoCalib
                    - base_LocalPhotoCalibErr
718 """Base class for transforming/standardizing a catalog by applying functors …
    The names for each entry under "funcs" will become the names of columns in
    the output dataset.  All the functors referenced are defined in
    `~lsst.pipe.tasks.functors`.  Positional arguments to be passed to each
    functor are in the `args` list, and any additional entries for each column
    other than "functor" or "args" (e.g., ``'filt'``, ``'dataset'``) are
    treated as keyword arguments to be passed to the functor initialization.

    The "flags" entry is the default shortcut for `Column` functors.
    All columns listed under "flags" will be copied to the output table
    untransformed.  They can be of any datatype.
    In the special case of transforming a multi-level object table with
    band and dataset indices (deepCoadd_obj), these will be taken from the
    ``meas`` dataset and exploded out per band.

    There are two special shortcuts that only apply when transforming
    multi-level Object (deepCoadd_obj) tables:

    - The "refFlags" entry is a shortcut for `Column` functors
      taken from the ``ref`` dataset if transforming an ObjectTable.
    - The "forcedFlags" entry is a shortcut for `Column` functors
      taken from the ``forced_src`` dataset if transforming an ObjectTable.
      These are expanded out per band.

    This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
    to organize and execute the calculations.
    """
    @property
    def _DefaultName(self):
        raise NotImplementedError("Subclass must define the \"_DefaultName\" attribute.")

    @property
    def outputDataset(self):
        raise NotImplementedError("Subclass must define the \"outputDataset\" attribute.")

    @property
    def inputDataset(self):
        raise NotImplementedError("Subclass must define \"inputDataset\" attribute.")

    @property
    def ConfigClass(self):
        raise NotImplementedError("Subclass must define \"ConfigClass\" attribute.")

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if self.config.functorFile:
            self.log.info("Loading transform functor definitions from %s",
                          self.config.functorFile)
            self.funcs = CompositeFunctor.from_file(self.config.functorFile)
            self.funcs.update(dict(PostprocessAnalysis._defaultFuncs))
        else:
            self.funcs = None
    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
                          dataId=dict(outputRefs.outputCatalog.dataId.mapping))
        butlerQC.put(result, outputRefs)
    def run(self, handle, funcs=None, dataId=None, band=None):
        """Do postprocessing calculations.

        Takes a ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` or
        ``DataFrame`` object and dataId, and returns a dataframe with results
        of postprocessing calculations.

        Parameters
        ----------
        handle : `~lsst.daf.butler.DeferredDatasetHandle` or
                 `~lsst.pipe.base.InMemoryDatasetHandle` or
                 `~pandas.DataFrame`, or list of these.
            DataFrames from which calculations are done.
        funcs : `~lsst.pipe.tasks.functors.Functor`
            Functors to apply to the table's columns.
        dataId : `dict`, optional
            Used to add a `patchId` column to the output dataframe.
        band : `str`, optional
            Filter band that is being processed.

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            Result struct, with a single ``outputCatalog`` attribute holding
            the transformed catalog.
        """
        self.log.info("Transforming/standardizing the source table dataId: %s", dataId)

        df = self.transform(band, handle, funcs, dataId).df
        self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
        result = pipeBase.Struct(outputCatalog=pandas_to_astropy(df))
        return result
    def getFunctors(self):
        return self.funcs

    def getAnalysis(self, handles, funcs=None, band=None):
        if funcs is None:
            funcs = self.funcs
        analysis = PostprocessAnalysis(handles, funcs, filt=band)
        return analysis

    def transform(self, band, handles, funcs, dataId):
        analysis = self.getAnalysis(handles, funcs=funcs, band=band)
        df = analysis.df
        if dataId and self.config.columnsFromDataId:
            for key in self.config.columnsFromDataId:
                if key in dataId:
                    if key == "detector":
                        # int16 instead of uint8 because databases don't like unsigned bytes.
                        df[key] = np.int16(dataId[key])
                    else:
                        df[key] = dataId[key]
                else:
                    raise ValueError(f"'{key}' in config.columnsFromDataId not found in dataId: {dataId}")

        if self.config.primaryKey:
            if df.index.name != self.config.primaryKey and self.config.primaryKey in df:
                df.reset_index(inplace=True, drop=True)
                df.set_index(self.config.primaryKey, inplace=True)

        return pipeBase.Struct(
            df=df,
            analysis=analysis,
        )
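# Illustrative sketch only, relating to the functor-file YAML example in the
# class docstring above: the same YAML a subclass points its ``functorFile``
# config at can be loaded outside the task machinery.  The ``path`` argument
# is supplied by the caller.
def _loadFunctorsExample(path):
    funcs = CompositeFunctor.from_file(path)
    funcs.update(dict(PostprocessAnalysis._defaultFuncs))
    return funcs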
class TransformObjectCatalogConnections(pipeBase.PipelineTaskConnections,
                                        defaultTemplates={"coaddName": "deep"},
                                        dimensions=("tract", "patch", "skymap")):
    inputCatalog = connectionTypes.Input(
        doc="The vertical concatenation of the {coaddName}_{meas|forced_src|psfs_multiprofit} catalogs, "
            "stored as a DataFrame with a multi-level column index per-patch.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="{coaddName}Coadd_obj",
        deferLoad=True,
    )
    inputCatalogRef = connectionTypes.Input(
        doc="Catalog marking the primary detection (which band provides a good shape and position) "
            "for each detection in deepCoadd_mergeDet.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_ref",
        deferLoad=True,
    )
    inputCatalogSersicMultiprofit = connectionTypes.Input(
        doc="Catalog of Sersic model fit parameters measured on the deepCoadd.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="ArrowAstropy",
        name="{coaddName}Coadd_Sersic_multiprofit",
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-Patch Object Table of columns transformed from the deepCoadd_obj table per the standard "
            "data model.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="ArrowAstropy",
        name="objectTable",
    )

    def __init__(self, *, config=None):
        super().__init__(config=config)
        if config.multilevelOutput:
            self.outputCatalog = dataclasses.replace(self.outputCatalog, storageClass="DataFrame")
class TransformObjectCatalogConfig(TransformCatalogBaseConfig,
                                   pipelineConnections=TransformObjectCatalogConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd",
    )
    outputBands = pexConfig.ListField(
        dtype=str,
        default=None,
        optional=True,
        doc=("These bands and only these bands will appear in the output,"
             " NaN-filled if the input does not include them."
             " If None, then use all bands found in the input."),
    )
    camelCase = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Write per-band column names with camelCase, else underscore. "
             "For example: gPsFlux instead of g_PsFlux."),
    )
    multilevelOutput = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Whether results dataframe should have a multilevel column index (True) or be flat "
             "and name-munged (False). If True, the output storage class will be "
             "set to DataFrame, since astropy tables do not support multi-level indexing."),
        deprecated="Support for multi-level outputs is deprecated and will be removed after v29.",
    )
    goodFlags = pexConfig.ListField(
        dtype=str,
        default=[],
        doc=("List of 'good' flags that should be set False when populating empty tables. "
             "All other flags are considered to be 'bad' flags and will be set to True."),
    )
    floatFillValue = pexConfig.Field(
        dtype=float,
        default=np.nan,
        doc="Fill value for float fields when populating empty tables.",
    )
    integerFillValue = pexConfig.Field(
        dtype=int,
        default=-1,
        doc="Fill value for integer fields when populating empty tables.",
    )

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Object.yaml")
        self.primaryKey = "objectId"
        self.columnsFromDataId = ["tract", "patch"]
        self.goodFlags = ["calib_astrometry_used",
                          "calib_photometry_reserved",
                          "calib_photometry_used",
                          "calib_psf_candidate",
                          "calib_psf_reserved",
                          ]
class TransformObjectCatalogTask(TransformCatalogBaseTask):
    """Produce a flattened Object Table to match the format specified in
    the standard data model.

    Do the same set of postprocessing calculations on all bands.

    This is identical to `TransformCatalogBaseTask`, except that it does
    the specified functor calculations for all filters present in the
    input `deepCoadd_obj` table.  Any specific ``"filt"`` keywords specified
    by the YAML file will be superseded.
    """
    _DefaultName = "transformObjectCatalog"
    ConfigClass = TransformObjectCatalogConfig

    datasets_multiband = ("ref", "Sersic_multiprofit")

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
                          dataId=dict(outputRefs.outputCatalog.dataId.mapping),
                          handle_ref=inputs["inputCatalogRef"],
                          handle_Sersic_multiprofit=inputs["inputCatalogSersicMultiprofit"],
                          )
        butlerQC.put(result, outputRefs)
    def run(self, handle, funcs=None, dataId=None, band=None, **kwargs):
        # NOTE: The ``band`` kwarg is ignored here; all bands present in the
        # input are processed.
        if isinstance(funcs, CompositeFunctor):
            funcDict_in = funcs.funcDict
        elif isinstance(funcs, dict):
            funcDict_in = funcs
        elif isinstance(funcs, list):
            funcDict_in = {idx: v for idx, v in enumerate(funcs)}
        else:
            raise TypeError(f"Unsupported {type(funcs)=}")

        handles_multi = {}
        funcDicts_multiband = {}
        for dataset in self.datasets_multiband:
            if (handle_multi := kwargs.get(f"handle_{dataset}")) is None:
                raise RuntimeError(f"Missing required handle_{dataset} kwarg")
            handles_multi[dataset] = handle_multi
            funcDicts_multiband[dataset] = {}

        dfDict = {}
        analysisDict = {}
        templateDf = pd.DataFrame()

        columns = handle.get(component="columns")
        inputBands = columns.unique(level=1).values
        outputBands = self.config.outputBands if self.config.outputBands else inputBands

        # Split up the functors by dataset: per-band vs. multiband tables.
        funcDict_band = {}
        for name, func in funcDict_in.items():
            if func.dataset in funcDicts_multiband:
                # Skip band-specific functors whose band is not in the output.
                if band := getattr(func, "band_to_check", None):
                    if band not in outputBands:
                        continue
                # Band-dependent functors on multiband datasets get all of the
                # input bands.
                elif hasattr(func, "bands"):
                    func.bands = tuple(inputBands)

            funcDict = funcDicts_multiband.get(func.dataset, funcDict_band)
            funcDict[name] = func

        funcs_band = CompositeFunctor(funcDict_band)

        # Perform transform for data of filters that exist in the handle dataframe.
        for inputBand in inputBands:
            if inputBand not in outputBands:
                self.log.info("Ignoring %s band data in the input", inputBand)
                continue
            self.log.info("Transforming the catalog of band %s", inputBand)
            result = self.transform(inputBand, handle, funcs_band, dataId)
            dfDict[inputBand] = result.df
            analysisDict[inputBand] = result.analysis
            if templateDf.empty:
                templateDf = result.df

        # Put filler values in columns of other wanted bands.
        for filt in outputBands:
            if filt not in dfDict:
                self.log.info("Adding empty columns for band %s", filt)
                dfTemp = templateDf.copy()
                for col in dfTemp.columns:
                    testValue = dfTemp[col].values[0]
                    if isinstance(testValue, (np.bool_, pd.BooleanDtype)):
                        # Boolean flag type: "good" flags are set False when
                        # empty, all other flags are set True.
                        if col in self.config.goodFlags:
                            fillValue = False
                        else:
                            fillValue = True
                    elif isinstance(testValue, numbers.Integral):
                        # Checking numbers.Integral catches all flavors
                        # of python, numpy, pandas, etc. integers.
                        # We must ensure this is not an unsigned integer.
                        if isinstance(testValue, np.unsignedinteger):
                            raise ValueError("Parquet tables may not have unsigned integer columns.")
                        else:
                            fillValue = self.config.integerFillValue
                    else:
                        fillValue = self.config.floatFillValue
                    dfTemp[col].values[:] = fillValue
                dfDict[filt] = dfTemp

        # This makes a multilevel column index, with band as first level.
        df = pd.concat(dfDict, axis=1, names=["band", "column"])
        name_index = df.index.name

        if not self.config.multilevelOutput:
            noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
            if self.config.primaryKey in noDupCols:
                noDupCols.remove(self.config.primaryKey)
            if dataId and self.config.columnsFromDataId:
                noDupCols += self.config.columnsFromDataId
            df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase,
                                inputBands=inputBands)

        # Apply the per-dataset functors to each multiband dataset in turn.
        for dataset, funcDict in funcDicts_multiband.items():
            handle_multiband = handles_multi[dataset]
            df_dataset = handle_multiband.get()
            if isinstance(df_dataset, astropy.table.Table):
                df_dataset = df_dataset.to_pandas().set_index(name_index, drop=False)
            elif isinstance(df_dataset, afwTable.SourceCatalog):
                df_dataset = df_dataset.asAstropy().to_pandas().set_index(name_index, drop=False)
            result = self.transform(
                None,
                pipeBase.InMemoryDatasetHandle(df_dataset, storageClass="DataFrame"),
                CompositeFunctor(funcDict),
                dataId,
            )
            result.df.index.name = name_index
            # Drop columns from the dataId if present (e.g. tract, patch).
            if self.config.columnsFromDataId:
                columns_drop = [column for column in self.config.columnsFromDataId if column in result.df]
                if columns_drop:
                    result.df.drop(columns_drop, axis=1, inplace=True)
            # Make the same multi-index for the multiband table if needed.
            to_concat = pd.concat(
                {band: result.df for band in self.config.outputBands}, axis=1, names=["band", "column"]
            ) if self.config.multilevelOutput else result.df
            df = pd.concat([df, to_concat], axis=1)
            analysisDict[dataset] = result.analysis

        df.index.name = self.config.primaryKey

        if not self.config.multilevelOutput:
            tbl = pandas_to_astropy(df)
        else:
            tbl = df

        self.log.info("Made a table of %d columns and %d rows", len(tbl.columns), len(tbl))

        return pipeBase.Struct(outputCatalog=tbl)
class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("tract", "skymap")):
    inputCatalogs = connectionTypes.Input(
        doc="Per-Patch objectTables conforming to the standard data model.",
        name="objectTable",
        storageClass="ArrowAstropy",
        dimensions=("tract", "patch", "skymap"),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-tract vertical concatenation of the input objectTables",
        name="objectTable_tract",
        storageClass="ArrowAstropy",
        dimensions=("tract", "skymap"),
    )
class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=ConsolidateObjectTableConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd",
    )


class ConsolidateObjectTableTask(pipeBase.PipelineTask):
    """Write patch-merged source tables to a tract-level DataFrame Parquet file.

    Concatenates the `objectTable` list into a per-tract `objectTable_tract`.
    """
    _DefaultName = "consolidateObjectTable"
    ConfigClass = ConsolidateObjectTableConfig

    inputDataset = "objectTable"
    outputDataset = "objectTable_tract"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-patch Object Tables",
                      len(inputs["inputCatalogs"]))
        table = TableVStack.vstack_handles(inputs["inputCatalogs"])
        butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)
class TransformSourceTableConnections(pipeBase.PipelineTaskConnections,
                                      defaultTemplates={"catalogType": ""},
                                      dimensions=("instrument", "visit", "detector")):
    inputCatalog = connectionTypes.Input(
        doc="Wide input catalog of sources produced by WriteSourceTableTask",
        name="{catalogType}source",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Narrower, per-detector Source Table transformed and converted per a "
            "specified set of functors",
        name="{catalogType}sourceTable",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )
class TransformSourceTableConfig(TransformCatalogBaseConfig,
                                 pipelineConnections=TransformSourceTableConnections):

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Source.yaml")
        self.primaryKey = "sourceId"
        self.columnsFromDataId = ["visit", "detector", "band", "physical_filter"]


class TransformSourceTableTask(TransformCatalogBaseTask):
    """Transform/standardize a source catalog.
    """
    _DefaultName = "transformSourceTable"
    ConfigClass = TransformSourceTableConfig
class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections,
                                         dimensions=("instrument", "visit",),
                                         defaultTemplates={"calexpType": ""}):
    calexp = connectionTypes.Input(
        doc="Processed exposures used for metadata",
        name="calexp",
        storageClass="ExposureF",
        dimensions=("instrument", "visit", "detector"),
        deferLoad=True,
        multiple=True,
    )
    visitSummary = connectionTypes.Output(
        doc=("Per-visit consolidated exposure metadata.  These catalogs use "
             "detector id for the id and are sorted for fast lookups of a "
             "detector."),
        name="visitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
    )
    visitSummarySchema = connectionTypes.InitOutput(
        doc="Schema of the visitSummary catalog",
        name="visitSummary_schema",
        storageClass="ExposureCatalog",
    )
class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig,
                                    pipelineConnections=ConsolidateVisitSummaryConnections):
    """Config for ConsolidateVisitSummaryTask"""
    pass
class ConsolidateVisitSummaryTask(pipeBase.PipelineTask):
    """Task to consolidate per-detector visit metadata.

    This task aggregates the following metadata from all the detectors in a
    single visit into an exposure catalog:

    - The physical_filter and band (if available).
    - The psf size, shape, and effective area at the center of the detector.
    - The corners of the bounding box in right ascension/declination.

    Other quantities such as Detector, Psf, ApCorrMap, and TransmissionCurve
    are not persisted here because of storage concerns, and because of their
    limited utility as summary statistics.

    Tests for this task are performed in ci_hsc_gen3.
    """
    _DefaultName = "consolidateVisitSummary"
    ConfigClass = ConsolidateVisitSummaryConfig

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.schema = afwTable.ExposureTable.makeMinimalSchema()
        self.schema.addField("visit", type="L", doc="Visit number")
        self.schema.addField("physical_filter", type="String", size=32, doc="Physical filter")
        self.schema.addField("band", type="String", size=32, doc="Name of band")
        ExposureSummaryStats.update_schema(self.schema)
        self.visitSummarySchema = afwTable.ExposureCatalog(self.schema)

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        dataRefs = butlerQC.get(inputRefs.calexp)
        visit = dataRefs[0].dataId["visit"]

        self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
                       len(dataRefs), visit)

        expCatalog = self._combineExposureMetadata(visit, dataRefs)

        butlerQC.put(expCatalog, outputRefs.visitSummary)
    def _combineExposureMetadata(self, visit, dataRefs):
        """Make a combined exposure catalog from a list of dataRefs.

        These dataRefs must point to exposures with wcs, summaryStats,
        and other visit metadata.

        Parameters
        ----------
        visit : `int`
            Visit identification number.
        dataRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            List of dataRefs in visit.

        Returns
        -------
        visitSummary : `lsst.afw.table.ExposureCatalog`
            Exposure catalog with per-detector summary information.
        """
        cat = afwTable.ExposureCatalog(self.schema)
        cat.resize(len(dataRefs))

        cat["visit"] = visit

        for i, dataRef in enumerate(dataRefs):
            visitInfo = dataRef.get(component="visitInfo")
            filterLabel = dataRef.get(component="filter")
            summaryStats = dataRef.get(component="summaryStats")
            detector = dataRef.get(component="detector")
            wcs = dataRef.get(component="wcs")
            photoCalib = dataRef.get(component="photoCalib")
            bbox = dataRef.get(component="bbox")
            validPolygon = dataRef.get(component="validPolygon")

            rec = cat[i]
            rec.setBBox(bbox)
            rec.setVisitInfo(visitInfo)
            rec.setWcs(wcs)
            rec.setPhotoCalib(photoCalib)
            rec.setValidPolygon(validPolygon)

            rec["physical_filter"] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else ""
            rec["band"] = filterLabel.bandLabel if filterLabel.hasBandLabel() else ""
            rec.setId(detector.getId())
            summaryStats.update_record(rec)

        if not cat:
            raise pipeBase.NoWorkFound(
                "No detectors had sufficient information to make a visit summary row."
            )

        metadata = dafBase.PropertyList()
        metadata.add("COMMENT", "Catalog id is detector id, sorted.")
        # We are looping over existing datarefs, so the following is true.
        metadata.add("COMMENT", "Only detectors with data have entries.")
        cat.setMetadata(metadata)

        cat.sort()
        return cat
class ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        defaultTemplates={"catalogType": ""},
                                        dimensions=("instrument", "visit")):
    inputCatalogs = connectionTypes.Input(
        doc="Input per-detector Source Tables",
        name="{catalogType}sourceTable",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-visit concatenation of Source Table",
        name="{catalogType}sourceTable_visit",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit"),
    )


class ConsolidateSourceTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=ConsolidateSourceTableConnections):
    pass


class ConsolidateSourceTableTask(pipeBase.PipelineTask):
    """Concatenate the `sourceTable` list into a per-visit `sourceTable_visit`.
    """
    _DefaultName = "consolidateSourceTable"
    ConfigClass = ConsolidateSourceTableConfig

    inputDataset = "sourceTable"
    outputDataset = "sourceTable_visit"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        from .makeWarp import reorderRefs

        detectorOrder = [ref.dataId["detector"] for ref in inputRefs.inputCatalogs]
        detectorOrder.sort()
        inputRefs = reorderRefs(inputRefs, detectorOrder, dataIdKey="detector")
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-detector Source Tables",
                      len(inputs["inputCatalogs"]))
        table = TableVStack.vstack_handles(inputs["inputCatalogs"])
        butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)
class MakeCcdVisitTableConnections(pipeBase.PipelineTaskConnections,
                                   dimensions=("instrument",),
                                   defaultTemplates={"calexpType": ""}):
    visitSummaryRefs = connectionTypes.Input(
        doc="Data references for per-visit consolidated exposure metadata",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="CCD and Visit metadata table",
        name="ccdVisitTable",
        storageClass="ArrowAstropy",
        dimensions=("instrument",),
    )


class MakeCcdVisitTableConfig(pipeBase.PipelineTaskConfig,
                              pipelineConnections=MakeCcdVisitTableConnections):
    idGenerator = DetectorVisitIdGeneratorConfig.make_field()


class MakeCcdVisitTableTask(pipeBase.PipelineTask):
    """Produce a `ccdVisitTable` from the visit summary exposure catalogs.
    """
    _DefaultName = "makeCcdVisitTable"
    ConfigClass = MakeCcdVisitTableConfig
    def run(self, visitSummaryRefs):
        """Make a table of ccd information from the visit summary catalogs.

        Parameters
        ----------
        visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            List of DeferredDatasetHandles pointing to exposure catalogs with
            per-detector summary information.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            Results struct with attribute:

            ``outputCatalog``
                Catalog of ccd and visit information.
        """
        ccdEntries = []
        for visitSummaryRef in visitSummaryRefs:
            visitSummary = visitSummaryRef.get()
            if not visitSummary:
                continue
            visitInfo = visitSummary[0].getVisitInfo()

            # Strip butler provenance from the per-visit metadata.
            strip_provenance_from_fits_header(visitSummary.metadata)

            summaryTable = visitSummary.asAstropy()
            selectColumns = ["id", "visit", "physical_filter", "band", "ra", "dec",
                             "pixelScale", "zenithDistance",
                             "expTime", "zeroPoint", "psfSigma", "skyBg", "skyNoise",
                             "astromOffsetMean", "astromOffsetStd", "nPsfStar",
                             "psfStarDeltaE1Median", "psfStarDeltaE2Median",
                             "psfStarDeltaE1Scatter", "psfStarDeltaE2Scatter",
                             "psfStarDeltaSizeMedian", "psfStarDeltaSizeScatter",
                             "psfStarScaledDeltaSizeScatter", "psfTraceRadiusDelta",
                             "psfApFluxDelta", "psfApCorrSigmaScaledDelta",
                             "maxDistToNearestPsf",
                             "effTime", "effTimePsfSigmaScale",
                             "effTimeSkyBgScale", "effTimeZeroPointScale"]
            ccdEntry = summaryTable[selectColumns]

            # Rename columns to match the output data model.
            ccdEntry.rename_column("visit", "visitId")
            ccdEntry.rename_column("id", "detectorId")

            # Keep a duplicate "decl" column for backwards compatibility.
            ccdEntry["decl"] = ccdEntry["dec"]

            ccdEntry["ccdVisitId"] = [
                self.config.idGenerator.apply(
                    visitSummaryRef.dataId,
                    detector=detector_id,
                ).catalog_id
                for detector_id in summaryTable["id"]
            ]
            ccdEntry["detector"] = summaryTable["id"]
            # Convert the PSF Gaussian sigma (pixels) to a FWHM in arcseconds.
            ccdEntry["seeing"] = (
                visitSummary["psfSigma"] * visitSummary["pixelScale"] * np.sqrt(8 * np.log(2))
            )
            ccdEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
            ccdEntry["expMidpt"] = np.datetime64(visitInfo.getDate().toPython(), "ns")
            ccdEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
            expTime = visitInfo.getExposureTime()
            ccdEntry["obsStart"] = (
                ccdEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns")
            )
            expTime_days = expTime / (60*60*24)
            ccdEntry["obsStartMJD"] = ccdEntry["expMidptMJD"] - 0.5 * expTime_days
            ccdEntry["darkTime"] = visitInfo.getDarkTime()
            ccdEntry["xSize"] = summaryTable["bbox_max_x"] - summaryTable["bbox_min_x"]
            ccdEntry["ySize"] = summaryTable["bbox_max_y"] - summaryTable["bbox_min_y"]
            ccdEntry["llcra"] = summaryTable["raCorners"][:, 0]
            ccdEntry["llcdec"] = summaryTable["decCorners"][:, 0]
            ccdEntry["ulcra"] = summaryTable["raCorners"][:, 1]
            ccdEntry["ulcdec"] = summaryTable["decCorners"][:, 1]
            ccdEntry["urcra"] = summaryTable["raCorners"][:, 2]
            ccdEntry["urcdec"] = summaryTable["decCorners"][:, 2]
            ccdEntry["lrcra"] = summaryTable["raCorners"][:, 3]
            ccdEntry["lrcdec"] = summaryTable["decCorners"][:, 3]

            ccdEntries.append(ccdEntry)

        outputCatalog = astropy.table.vstack(ccdEntries, join_type="exact")
        return pipeBase.Struct(outputCatalog=outputCatalog)
class MakeVisitTableConnections(pipeBase.PipelineTaskConnections,
                                dimensions=("instrument",),
                                defaultTemplates={"calexpType": ""}):
    visitSummaries = connectionTypes.Input(
        doc="Per-visit consolidated exposure metadata",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit",),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Visit metadata table",
        name="visitTable",
        storageClass="ArrowAstropy",
        dimensions=("instrument",),
    )


class MakeVisitTableConfig(pipeBase.PipelineTaskConfig,
                           pipelineConnections=MakeVisitTableConnections):
    pass


class MakeVisitTableTask(pipeBase.PipelineTask):
    """Produce a `visitTable` from the visit summary exposure catalogs.
    """
    _DefaultName = "makeVisitTable"
    ConfigClass = MakeVisitTableConfig
    def run(self, visitSummaries):
        """Make a table of visit information from the visit summary catalogs.

        Parameters
        ----------
        visitSummaries : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            List of handles pointing to exposure catalogs with per-detector
            summary information.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            Results struct with attribute:

            ``outputCatalog``
                Catalog of visit information.
        """
        visitEntries = []
        for visitSummary in visitSummaries:
            visitSummary = visitSummary.get()
            if not visitSummary:
                continue
            visitRow = visitSummary[0]
            visitInfo = visitRow.getVisitInfo()

            visitEntry = {}
            visitEntry["visitId"] = visitRow["visit"]
            visitEntry["visit"] = visitRow["visit"]
            visitEntry["physical_filter"] = visitRow["physical_filter"]
            visitEntry["band"] = visitRow["band"]
            raDec = visitInfo.getBoresightRaDec()
            visitEntry["ra"] = raDec.getRa().asDegrees()
            visitEntry["dec"] = raDec.getDec().asDegrees()

            # Keep a duplicate "decl" column for backwards compatibility.
            visitEntry["decl"] = visitEntry["dec"]

            visitEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
            azAlt = visitInfo.getBoresightAzAlt()
            visitEntry["azimuth"] = azAlt.getLongitude().asDegrees()
            visitEntry["altitude"] = azAlt.getLatitude().asDegrees()
            visitEntry["zenithDistance"] = 90 - azAlt.getLatitude().asDegrees()
            visitEntry["airmass"] = visitInfo.getBoresightAirmass()
            expTime = visitInfo.getExposureTime()
            visitEntry["expTime"] = expTime
            visitEntry["expMidpt"] = np.datetime64(visitInfo.getDate().toPython(), "ns")
            visitEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
            visitEntry["obsStart"] = visitEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns")
            expTime_days = expTime / (60*60*24)
            visitEntry["obsStartMJD"] = visitEntry["expMidptMJD"] - 0.5 * expTime_days
            visitEntries.append(visitEntry)

        outputCatalog = astropy.table.Table(rows=visitEntries)
        return pipeBase.Struct(outputCatalog=outputCatalog)
class WriteForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("instrument", "visit", "detector", "skymap", "tract")):
    inputCatalog = connectionTypes.Input(
        doc="Primary per-detector, single-epoch forced-photometry catalog. "
            "By default, it is the output of ForcedPhotCcdTask on calexps",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
    )
    inputCatalogDiff = connectionTypes.Input(
        doc="Secondary multi-epoch, per-detector, forced photometry catalog. "
            "By default, it is the output of ForcedPhotCcdTask run on image differences.",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
    )
    outputCatalog = connectionTypes.Output(
        doc="InputCatalogs horizontally joined on `objectId` in DataFrame parquet format",
        name="mergedForcedSource",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
    )


class WriteForcedSourceTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=WriteForcedSourceTableConnections):
    key = pexConfig.Field(
        doc="Column on which to join the two input tables on and make the primary key of the output",
        dtype=str,
        default="objectId",
    )
class WriteForcedSourceTableTask(pipeBase.PipelineTask):
    """Merge and convert per-detector forced source catalogs to DataFrame Parquet format.

    Because the predecessor ForcedPhotCcdTask operates per-detector and
    per-tract (i.e., it has tract in its dimensions), detectors
    on the tract boundary may have multiple forced source catalogs.

    The successor task TransformForcedSourceTable runs per-patch
    and temporally aggregates overlapping mergedForcedSource catalogs from all
    available epochs.
    """
    _DefaultName = "writeForcedSourceTable"
    ConfigClass = WriteForcedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        inputs["visit"] = butlerQC.quantum.dataId["visit"]
        inputs["detector"] = butlerQC.quantum.dataId["detector"]
        inputs["band"] = butlerQC.quantum.dataId["band"]
        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)

    def run(self, inputCatalog, inputCatalogDiff, visit, detector, band=None):
        dfs = []
        for table, dataset in zip((inputCatalog, inputCatalogDiff), ("calexp", "diff")):
            df = table.asAstropy().to_pandas().set_index(self.config.key, drop=False)
            df = df.reindex(sorted(df.columns), axis=1)
            df["visit"] = visit
            # int16 instead of uint8 because databases don't like unsigned bytes.
            df["detector"] = np.int16(detector)
            df["band"] = band if band else pd.NA
            df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
                                                   names=("dataset", "column"))

            dfs.append(df)

        outputCatalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return pipeBase.Struct(outputCatalog=outputCatalog)
class TransformForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                            dimensions=("instrument", "skymap", "patch", "tract")):
    inputCatalogs = connectionTypes.Input(
        doc="DataFrames of merged ForcedSources produced by WriteForcedSourceTableTask",
        name="mergedForcedSource",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
        multiple=True,
        deferLoad=True,
    )
    referenceCatalog = connectionTypes.Input(
        doc="Reference catalog which was used to seed the forcedPhot. Columns "
            "objectId, detect_isPrimary, detect_isTractInner, detect_isPatchInner "
            "are expected.",
        name="objectTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Narrower, temporally-aggregated, per-patch ForcedSource Table transformed and converted per a "
            "specified set of functors",
        name="forcedSourceTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
    )


class TransformForcedSourceTableConfig(TransformCatalogBaseConfig,
                                       pipelineConnections=TransformForcedSourceTableConnections):
    referenceColumns = pexConfig.ListField(
        dtype=str,
        default=["detect_isPrimary", "detect_isTractInner", "detect_isPatchInner"],
        doc="Columns to pull from reference catalog",
    )
    keyRef = pexConfig.Field(
        doc="Column on which to join the two input tables on and make the primary key of the output",
        dtype=str,
        default="objectId",
    )
    key = pexConfig.Field(
        doc="Rename the output DataFrame index to this name",
        dtype=str,
        default="forcedSourceId",
    )

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "ForcedSource.yaml")
        self.columnsFromDataId = ["tract", "patch"]
class TransformForcedSourceTableTask(TransformCatalogBaseTask):
    """Transform/standardize a ForcedSource catalog.

    Transforms each wide, per-detector forcedSource DataFrame per the
    specification file (per-camera defaults found in ForcedSource.yaml).
    All epochs that overlap the patch are aggregated into one per-patch
    narrow-DataFrame file.

    No de-duplication of rows is performed. Duplicate resolution flags are
    pulled in from the referenceCatalog: `detect_isPrimary`,
    `detect_isTractInner`, `detect_isPatchInner`, so that the user may
    de-duplicate for analysis or compare duplicates for QA.

    The resulting table includes multiple bands. Epochs (MJDs) and other useful
    per-visit rows can be retrieved by joining with the CcdVisitTable on
    ccdVisitId.
    """
    _DefaultName = "transformForcedSourceTable"
    ConfigClass = TransformForcedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        outputs = self.run(inputs["inputCatalogs"], inputs["referenceCatalog"], funcs=self.funcs,
                           dataId=dict(outputRefs.outputCatalog.dataId.mapping))

        butlerQC.put(outputs, outputRefs)

    def run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=None):
        dfs = []
        refColumns = list(self.config.referenceColumns)
        refColumns.append(self.config.keyRef)
        ref = referenceCatalog.get(parameters={"columns": refColumns})
        if ref.index.name != self.config.keyRef:
            # The reference DataFrame may not have its index set, depending on
            # how it was written; set it here if needed.
            ref.set_index(self.config.keyRef, inplace=True)
        self.log.info("Aggregating %s input catalogs", len(inputCatalogs))
        for handle in inputCatalogs:
            result = self.transform(None, handle, funcs, dataId)
            # Keep only rows that match (overlap) the reference patch catalog.
            dfs.append(result.df.join(ref, how="inner"))

        outputCatalog = pd.concat(dfs)

        # The joins above were on config.keyRef; rename the index, move it to
        # a regular column, then re-index on the forcedSourceId defined in
        # ForcedSource.yaml and rename that index to config.key.
        outputCatalog.index.rename(self.config.keyRef, inplace=True)
        outputCatalog.reset_index(inplace=True)
        outputCatalog.set_index("forcedSourceId", inplace=True, verify_integrity=True)
        outputCatalog.index.rename(self.config.key, inplace=True)

        self.log.info("Made a table of %d columns and %d rows",
                      len(outputCatalog.columns), len(outputCatalog))
        return pipeBase.Struct(outputCatalog=outputCatalog)
class ConsolidateTractConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"catalogType": ""},
                                  dimensions=("instrument", "tract")):
    inputCatalogs = connectionTypes.Input(
        doc="Input per-patch DataFrame Tables to be concatenated",
        name="{catalogType}ForcedSourceTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        multiple=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Output per-tract concatenation of DataFrame Tables",
        name="{catalogType}ForcedSourceTable_tract",
        storageClass="DataFrame",
        dimensions=("tract", "skymap"),
    )


class ConsolidateTractConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=ConsolidateTractConnections):
    pass


class ConsolidateTractTask(pipeBase.PipelineTask):
    """Concatenate any per-patch DataFrame list into a single
    per-tract DataFrame.
    """
    _DefaultName = "ConsolidateTract"
    ConfigClass = ConsolidateTractConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-patch %s Tables",
                      len(inputs["inputCatalogs"]),
                      inputRefs.inputCatalogs[0].datasetType.name)
        df = pd.concat(inputs["inputCatalogs"])
        butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)