__all__ = [
    "WriteObjectTableConfig",
    "WriteObjectTableTask",
    "WriteSourceTableConfig",
    "WriteSourceTableTask",
    "WriteRecalibratedSourceTableConfig",
    "WriteRecalibratedSourceTableTask",
    "PostprocessAnalysis",
    "TransformCatalogBaseConfig",
    "TransformCatalogBaseTask",
    "TransformObjectCatalogConfig",
    "TransformObjectCatalogTask",
    "ConsolidateObjectTableConfig",
    "ConsolidateObjectTableTask",
    "TransformSourceTableConfig",
    "TransformSourceTableTask",
    "ConsolidateVisitSummaryConfig",
    "ConsolidateVisitSummaryTask",
    "ConsolidateSourceTableConfig",
    "ConsolidateSourceTableTask",
    "MakeCcdVisitTableConfig",
    "MakeCcdVisitTableTask",
    "MakeVisitTableConfig",
    "MakeVisitTableTask",
    "WriteForcedSourceTableConfig",
    "WriteForcedSourceTableTask",
    "TransformForcedSourceTableConfig",
    "TransformForcedSourceTableTask",
    "ConsolidateTractConfig",
    "ConsolidateTractTask",
]
from collections import defaultdict
import dataclasses
import functools
import logging
import numbers
import os

import numpy as np
import pandas as pd
import astropy.table
from astro_metadata_translator.headers import merge_headers

import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
import lsst.daf.base as dafBase
import lsst.afw.table as afwTable
from lsst.pipe.base import NoWorkFound, connectionTypes
from lsst.daf.butler.formatters.parquet import pandas_to_astropy
from lsst.afw.image import ExposureSummaryStats, ExposureF
from lsst.meas.base import SingleFrameMeasurementTask, DetectorVisitIdGeneratorConfig
from lsst.obs.base.utils import strip_provenance_from_fits_header

from .functors import CompositeFunctor, Column

log = logging.getLogger(__name__)


def flattenFilters(df, noDupCols=["coord_ra", "coord_dec"], camelCase=False, inputBands=None):
    """Flattens a dataframe with multilevel column index.
    """
    newDf = pd.DataFrame()
    # band is the level-0 index of the input columns
    dfBands = df.columns.unique(level=0).values
    for band in dfBands:
        subdf = df[band]
        columnFormat = "{0}{1}" if camelCase else "{0}_{1}"
        newColumns = {c: columnFormat.format(band, c)
                      for c in subdf.columns if c not in noDupCols}
        cols = list(newColumns.keys())
        newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)

    # Band must be present in the input and output, otherwise the column is all NaN.
    presentBands = dfBands if inputBands is None else list(set(inputBands).intersection(dfBands))
    # Get the unexploded columns from any present band's partition.
    noDupDf = df[presentBands[0]][noDupCols]
    newDf = pd.concat([noDupDf, newDf], axis=1)
    return newDf
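

# A minimal sketch of how `flattenFilters` is used, on a synthetic multilevel
# DataFrame rather than a real `deepCoadd_obj`.  The band and column names are
# illustrative only; the function itself is the one defined above.  This helper
# is never called at import time.
def _example_flatten_filters():
    df = pd.DataFrame({
        ("g", "coord_ra"): [10.0],
        ("g", "coord_dec"): [-5.0],
        ("g", "PsfFlux"): [1.0],
        ("r", "coord_ra"): [10.0],
        ("r", "coord_dec"): [-5.0],
        ("r", "PsfFlux"): [2.0],
    })
    # Default: per-band columns become g_PsfFlux, r_PsfFlux; coord_ra/coord_dec
    # are kept once.  With camelCase=True they become gPsfFlux, rPsfFlux.
    flat = flattenFilters(df)
    flatCamel = flattenFilters(df, camelCase=True)
    return flat, flatCamel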
89 """A helper class for stacking astropy tables without having them all in
95 Full size of the final table.
99 Unlike `astropy.table.vstack`, this class requires all tables to have the
100 exact same columns (it's slightly more strict than even the
101 ``join_type="exact"`` argument to `astropy.table.vstack`).
111 """Construct from an iterable of
112 `lsst.daf.butler.DeferredDatasetHandle`.
116 handles : `~collections.abc.Iterable` [ \
117 `lsst.daf.butler.DeferredDatasetHandle` ]
118 Iterable of handles. Must have a storage class that supports the
119 "rowcount" component, which is all that will be fetched.
123 vstack : `TableVStack`
124 An instance of this class, initialized with capacity equal to the
125 sum of the rowcounts of all the given table handles.
127 capacity = sum(handle.get(component=
"rowcount")
for handle
in handles)
128 return cls(capacity=capacity)
131 """Add a single table to the stack.
135 table : `astropy.table.Table`
136 An astropy table instance.
139 self.
result = astropy.table.Table()
140 for name
in table.colnames:
142 column_cls = type(column)
143 self.
result[name] = column_cls.info.new_like([column], self.
capacity, name=name)
144 self.
result[name][:len(table)] = column
145 self.
index = len(table)
146 self.
result.meta = table.meta.copy()
148 next_index = self.
index + len(table)
149 if set(self.
result.colnames) != set(table.colnames):
151 "Inconsistent columns in concatentation: "
152 f
"{set(self.result.colnames).symmetric_difference(table.colnames)}"
154 for name
in table.colnames:
155 out_col = self.
result[name]
157 if out_col.dtype != in_col.dtype:
158 raise TypeError(f
"Type mismatch on column {name!r}: {out_col.dtype} != {in_col.dtype}.")
159 self.
result[name][self.
index:next_index] = table[name]
160 self.
index = next_index
164 self.
result.meta = merge_headers([self.
result.meta, table.meta], mode=
"drop")
165 strip_provenance_from_fits_header(self.
result.meta)
169 """Vertically stack tables represented by deferred dataset handles.
173 handles : `~collections.abc.Iterable` [ \
174 `lsst.daf.butler.DeferredDatasetHandle` ]
175 Iterable of handles. Must have the "ArrowAstropy" storage class
176 and identical columns.
180 table : `astropy.table.Table`
181 Concatenated table with the same columns as each input table and
182 the rows of all of them.
184 handles = tuple(handles)
186 rowcount = tuple(handle.get(component=
"rowcount")
for handle
in handles)
187 handles = tuple(handle
for handle, count
in zip(handles, rowcount)
if count > 0)
190 for handle
in handles:
191 vstack.extend(handle.get())
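

# Hedged sketch of how `TableVStack.vstack_handles` is meant to be driven.
# Real callers pass `lsst.daf.butler.DeferredDatasetHandle`s with the
# "ArrowAstropy" storage class; `_FakeHandle` below is a hypothetical stand-in
# that only provides the two accessors this class relies on, so the sketch can
# be read without a butler.  It is never called at import time.
def _example_table_vstack():
    class _FakeHandle:
        def __init__(self, table):
            self._table = table

        def get(self, component=None):
            if component == "rowcount":
                return len(self._table)
            return self._table

    tables = [
        astropy.table.Table({"a": [1, 2], "b": [0.5, 1.5]}),
        astropy.table.Table({"a": [3], "b": [2.5]}),
    ]
    stacked = TableVStack.vstack_handles(_FakeHandle(t) for t in tables)
    return stacked  # three rows, same columns as the inputs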


class WriteObjectTableConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"coaddName": "deep"},
                                  dimensions=("tract", "patch", "skymap")):
    inputCatalogMeas = connectionTypes.Input(
        doc="Catalog of source measurements on the deepCoadd.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_meas",
        multiple=True,
    )
    inputCatalogForcedSrc = connectionTypes.Input(
        doc="Catalog of forced measurements (shape and position parameters held fixed) on the deepCoadd.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_forced_src",
        multiple=True,
    )
    inputCatalogPsfsMultiprofit = connectionTypes.Input(
        doc="Catalog of Gaussian mixture model fit parameters for the PSF model at each object centroid.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="ArrowAstropy",
        name="{coaddName}Coadd_psfs_multiprofit",
        multiple=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="A vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
            "stored as a DataFrame with a multi-level column index per-patch.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="{coaddName}Coadd_obj",
    )


class WriteObjectTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteObjectTableConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd",
    )


class WriteObjectTableTask(pipeBase.PipelineTask):
    """Write filter-merged object tables as a DataFrame in parquet format.
    """
    _DefaultName = "writeObjectTable"
    ConfigClass = WriteObjectTableConfig

    outputDataset = "obj"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        catalogs = defaultdict(dict)
        for dataset, connection in (
            ("meas", "inputCatalogMeas"),
            ("forced_src", "inputCatalogForcedSrc"),
            ("psfs_multiprofit", "inputCatalogPsfsMultiprofit"),
        ):
            for ref, cat in zip(getattr(inputRefs, connection), inputs[connection]):
                catalogs[ref.dataId["band"]][dataset] = cat

        dataId = butlerQC.quantum.dataId
        df = self.run(catalogs=catalogs, tract=dataId["tract"], patch=dataId["patch"])
        outputs = pipeBase.Struct(outputCatalog=df)
        butlerQC.put(outputs, outputRefs)

    def run(self, catalogs, tract, patch):
        """Merge multiple catalogs.

        Parameters
        ----------
        catalogs : `dict`
            Mapping from filter names to dict of catalogs.
        tract : `int`
            tractId to use for the tractId column.
        patch : `str`
            patchId to use for the patchId column.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Merged dataframe.

        Raises
        ------
        ValueError
            Raised if any of the catalogs is of an unsupported type.
        """
        dfs = []
        for filt, tableDict in catalogs.items():
            for dataset, table in tableDict.items():
                # Convert afwTable and astropy catalogs to pandas.
                if isinstance(table, pd.DataFrame):
                    df = table
                elif isinstance(table, afwTable.SourceCatalog):
                    df = table.asAstropy().to_pandas()
                elif isinstance(table, astropy.table.Table):
                    df = table.to_pandas()
                else:
                    raise ValueError(f"{dataset=} has unsupported {type(table)=}")
                df.set_index("id", drop=True, inplace=True)

                # Sort columns by name, to ensure matching schema among patches.
                df = df.reindex(sorted(df.columns), axis=1)
                df = df.assign(tractId=tract, patchId=patch)

                # Make columns a 3-level MultiIndex.
                df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
                                                       names=("dataset", "band", "column"))
                dfs.append(df)

        # Join pairwise rather than concatenating everything at once, which
        # uses far less memory for wide tables.
        catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return catalog
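

# Sketch of the column layout produced by `WriteObjectTableTask.run`: each
# per-band, per-dataset catalog becomes a block of columns under a
# (dataset, band, column) MultiIndex, and the blocks are joined pairwise on the
# object id index.  The tiny frames below are illustrative only; this helper is
# never called at import time.
def _example_multilevel_object_columns():
    meas_g = pd.DataFrame({"id": [1, 2], "flux": [1.0, 2.0]}).set_index("id")
    meas_g.columns = pd.MultiIndex.from_tuples(
        [("meas", "g", c) for c in meas_g.columns], names=("dataset", "band", "column")
    )
    forced_g = pd.DataFrame({"id": [1, 2], "flux": [1.1, 2.1]}).set_index("id")
    forced_g.columns = pd.MultiIndex.from_tuples(
        [("forced_src", "g", c) for c in forced_g.columns], names=("dataset", "band", "column")
    )
    # Same pairwise join used in `run`; avoids the memory blowup of one big concat.
    return functools.reduce(lambda d1, d2: d1.join(d2), [meas_g, forced_g])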


class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"catalogType": ""},
                                  dimensions=("instrument", "visit", "detector")):
    catalog = connectionTypes.Input(
        doc="Input full-depth catalog of sources produced by CalibrateTask",
        name="{catalogType}src",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector"),
    )
    outputCatalog = connectionTypes.Output(
        doc="Catalog of sources, `src` in Astropy/Parquet format. Columns are unchanged.",
        name="{catalogType}source",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )


class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteSourceTableConnections):
    pass


class WriteSourceTableTask(pipeBase.PipelineTask):
    """Write source table to DataFrame Parquet format.
    """
    _DefaultName = "writeSourceTable"
    ConfigClass = WriteSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        inputs["visit"] = butlerQC.quantum.dataId["visit"]
        inputs["detector"] = butlerQC.quantum.dataId["detector"]
        result = self.run(**inputs)
        outputs = pipeBase.Struct(outputCatalog=result.table)
        butlerQC.put(outputs, outputRefs)

    def run(self, catalog, visit, detector, **kwargs):
        """Convert `src` catalog to an Astropy table.

        Parameters
        ----------
        catalog : `afwTable.SourceCatalog`
            catalog to be converted
        visit, detector : `int`
            Visit and detector ids to be added as columns.
        **kwargs
            Additional keyword arguments are ignored as a convenience for
            subclasses that pass the same arguments to several different
            methods.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            ``table``
                `astropy.table.Table` version of the input catalog
        """
        self.log.info("Generating DataFrame from src catalog visit,detector=%i,%i", visit, detector)
        tbl = catalog.asAstropy()
        tbl["visit"] = visit
        # int16 instead of uint8 because databases don't like unsigned bytes.
        tbl["detector"] = np.int16(detector)

        return pipeBase.Struct(table=tbl)


class WriteRecalibratedSourceTableConnections(WriteSourceTableConnections,
                                              defaultTemplates={"catalogType": ""},
                                              dimensions=("instrument", "visit", "detector", "skymap")):
    visitSummary = connectionTypes.Input(
        doc="Input visit-summary catalog with updated calibration objects.",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit",),
    )

    def __init__(self, config):
        super().__init__(config=config)
        # Defer the graph constraint from the input src catalog so that its
        # absence for some detectors does not over-constrain the graph.
        self.catalog = dataclasses.replace(self.catalog, deferGraphConstraint=True)


class WriteRecalibratedSourceTableConfig(WriteSourceTableConfig,
                                         pipelineConnections=WriteRecalibratedSourceTableConnections):
    doReevaluatePhotoCalib = pexConfig.Field(
        dtype=bool,
        default=True,
        doc=("Add or replace local photoCalib columns"),
    )
    doReevaluateSkyWcs = pexConfig.Field(
        dtype=bool,
        default=True,
        doc=("Add or replace local WCS columns and update the coord columns, coord_ra and coord_dec"),
    )


class WriteRecalibratedSourceTableTask(WriteSourceTableTask):
    """Write source table to DataFrame Parquet format.
    """
    _DefaultName = "writeRecalibratedSourceTable"
    ConfigClass = WriteRecalibratedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        inputs["visit"] = butlerQC.quantum.dataId["visit"]
        inputs["detector"] = butlerQC.quantum.dataId["detector"]

        if self.config.doReevaluatePhotoCalib or self.config.doReevaluateSkyWcs:
            exposure = ExposureF()
            inputs["exposure"] = self.prepareCalibratedExposure(
                exposure=exposure,
                visitSummary=inputs["visitSummary"],
                detectorId=butlerQC.quantum.dataId["detector"],
            )
            inputs["catalog"] = self.addCalibColumns(**inputs)

        result = self.run(**inputs)
        outputs = pipeBase.Struct(outputCatalog=result.table)
        butlerQC.put(outputs, outputRefs)

    def prepareCalibratedExposure(self, exposure, detectorId, visitSummary=None):
        """Prepare a calibrated exposure and apply external calibrations
        if so configured.

        Parameters
        ----------
        exposure : `lsst.afw.image.exposure.Exposure`
            Input exposure to adjust calibrations. May be an empty Exposure.
        detectorId : `int`
            Detector ID associated with the exposure.
        visitSummary : `lsst.afw.table.ExposureCatalog`, optional
            Exposure catalog with all calibration objects. WCS and PhotoCalib
            are always applied if ``visitSummary`` is provided and those
            components are not `None`.

        Returns
        -------
        exposure : `lsst.afw.image.exposure.Exposure`
            Exposure with adjusted calibrations.
        """
        if visitSummary is not None:
            row = visitSummary.find(detectorId)
            if row is None:
                raise pipeBase.NoWorkFound(f"Visit summary for detector {detectorId} is missing.")
            if (photoCalib := row.getPhotoCalib()) is None:
                self.log.warning("Detector id %s has None for photoCalib in visit summary; "
                                 "skipping reevaluation of photoCalib.", detectorId)
                exposure.setPhotoCalib(None)
            else:
                exposure.setPhotoCalib(photoCalib)
            if (skyWcs := row.getWcs()) is None:
                self.log.warning("Detector id %s has None for skyWcs in visit summary; "
                                 "skipping reevaluation of skyWcs.", detectorId)
                exposure.setWcs(None)
            else:
                exposure.setWcs(skyWcs)

        return exposure

    def addCalibColumns(self, catalog, exposure, **kwargs):
        """Add or replace columns with calibs evaluated at each centroid.

        Add or replace 'base_LocalWcs' and 'base_LocalPhotoCalib' columns in
        a source catalog, by rerunning the plugins.

        Parameters
        ----------
        catalog : `lsst.afw.table.SourceCatalog`
            catalog to which calib columns will be added
        exposure : `lsst.afw.image.exposure.Exposure`
            Exposure with attached PhotoCalibs and SkyWcs attributes to be
            reevaluated at local centroids. Pixels are not required.
        **kwargs
            Additional keyword arguments are ignored to facilitate passing the
            same arguments to several methods.

        Returns
        -------
        newCat : `lsst.afw.table.SourceCatalog`
            Source Catalog with requested local calib columns
        """
        measureConfig = SingleFrameMeasurementTask.ConfigClass()
        measureConfig.doReplaceWithNoise = False

        # Clear all slots, because we aren't running the relevant plugins.
        for slot in measureConfig.slots:
            setattr(measureConfig.slots, slot, None)

        measureConfig.plugins.names = []
        if self.config.doReevaluateSkyWcs:
            measureConfig.plugins.names.add("base_LocalWcs")
            self.log.info("Re-evaluating base_LocalWcs plugin")
        if self.config.doReevaluatePhotoCalib:
            measureConfig.plugins.names.add("base_LocalPhotoCalib")
            self.log.info("Re-evaluating base_LocalPhotoCalib plugin")
        pluginsNotToCopy = tuple(measureConfig.plugins.names)

        # Create a new schema and catalog; copy all fields from the original
        # except for the ones produced by the plugins we are about to rerun.
        aliasMap = catalog.schema.getAliasMap()
        mapper = afwTable.SchemaMapper(catalog.schema)
        for item in catalog.schema:
            if not item.field.getName().startswith(pluginsNotToCopy):
                mapper.addMapping(item.key)
        schema = mapper.getOutputSchema()

        measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
        schema.setAliasMap(aliasMap)
        newCat = afwTable.SourceCatalog(schema)
        newCat.extend(catalog, mapper=mapper)

        # Re-evaluate the requested plugins at each source centroid; the coord
        # columns are only updated when a WCS is available.
        if self.config.doReevaluateSkyWcs and exposure.wcs is not None:
            afwTable.updateSourceCoords(exposure.wcs, newCat, include_covariance=False)
            wcsPlugin = measurement.plugins["base_LocalWcs"]
        else:
            wcsPlugin = None

        if self.config.doReevaluatePhotoCalib and exposure.getPhotoCalib() is not None:
            pcPlugin = measurement.plugins["base_LocalPhotoCalib"]
        else:
            pcPlugin = None

        for row in newCat:
            if wcsPlugin is not None:
                wcsPlugin.measure(row, exposure)
            if pcPlugin is not None:
                pcPlugin.measure(row, exposure)

        return newCat
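

# Hedged sketch of the SchemaMapper pattern used in `addCalibColumns`: copy
# every field of an input schema except those produced by the plugins about to
# be re-run.  The field names here are illustrative, not taken from a real
# catalog; this helper is never called at import time.
def _example_schema_copy_without_plugins():
    schema = afwTable.SourceTable.makeMinimalSchema()
    schema.addField("base_LocalWcs_example", type="D", doc="field that will be re-measured")
    schema.addField("keep_me", type="D", doc="field that will be copied unchanged")
    mapper = afwTable.SchemaMapper(schema)
    for item in schema:
        if not item.field.getName().startswith(("base_LocalWcs",)):
            mapper.addMapping(item.key)
    return mapper.getOutputSchema()  # contains keep_me but not base_LocalWcs_example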


class PostprocessAnalysis(object):
    """Calculate columns from DataFrames or handles storing DataFrames.

    This object manages and organizes an arbitrary set of computations
    on a catalog.  The catalog is defined by a
    `DeferredDatasetHandle` or `InMemoryDatasetHandle` object
    (or list thereof), such as a ``deepCoadd_obj`` dataset, and the
    computations are defined by a collection of
    `~lsst.pipe.tasks.functors.Functor` objects (or, equivalently, a
    ``CompositeFunctor``).

    After the object is initialized, accessing the ``.df`` attribute (which
    holds the `pandas.DataFrame` containing the results of the calculations)
    triggers computation of said dataframe.

    One of the conveniences of using this object is the ability to define a
    desired common filter for all functors.  This enables the same functor
    collection to be passed to several different `PostprocessAnalysis` objects
    without having to change the original functor collection, since the
    ``filt`` keyword argument of this object triggers an overwrite of the
    ``filt`` property for all functors in the collection.

    This object also allows a list of refFlags to be passed, and defines a set
    of default refFlags that are always included even if not requested.

    If a list of DataFrames or Handles is passed, rather than a single one,
    then the calculations will be mapped over all the input catalogs.  In
    principle, it should be straightforward to parallelize this activity, but
    initial tests have failed (see TODO in code comments).

    Parameters
    ----------
    handles : `~lsst.daf.butler.DeferredDatasetHandle` or
              `~lsst.pipe.base.InMemoryDatasetHandle` or list of these
        Source catalog(s) for computation.
    functors : `list`, `dict`, or `~lsst.pipe.tasks.functors.CompositeFunctor`
        Computations to do (functors that act on ``handles``).
        If a dict, the output DataFrame will have columns keyed accordingly.
        If a list, the column keys will come from the
        ``.shortname`` attribute of each functor.
    filt : `str`, optional
        Filter in which to calculate.  If provided,
        this will overwrite any existing ``.filt`` attribute
        of the provided functors.
    flags : `list`, optional
        List of flags (per-band) to include in output table.
        Taken from the ``meas`` dataset if applied to a multilevel Object Table.
    refFlags : `list`, optional
        List of refFlags (only reference band) to include in output table.
    forcedFlags : `list`, optional
        List of flags (per-band) to include in output table.
        Taken from the ``forced_src`` dataset if applied to a
        multilevel Object Table. Intended for flags from measurement plugins
        only run during multi-band forced-photometry.
    """
    _defaultRefFlags = []
    _defaultFuncs = ()

    def __init__(self, handles, functors, filt=None, flags=None, refFlags=None, forcedFlags=None):
        self.handles = handles
        self.functors = functors

        self.filt = filt
        self.flags = list(flags) if flags is not None else []
        self.forcedFlags = list(forcedFlags) if forcedFlags is not None else []
        self.refFlags = list(self._defaultRefFlags)
        if refFlags is not None:
            self.refFlags += list(refFlags)

        self._df = None

    @property
    def defaultFuncs(self):
        funcs = dict(self._defaultFuncs)
        return funcs

    @property
    def func(self):
        additionalFuncs = self.defaultFuncs
        additionalFuncs.update({flag: Column(flag, dataset="forced_src") for flag in self.forcedFlags})
        additionalFuncs.update({flag: Column(flag, dataset="ref") for flag in self.refFlags})
        additionalFuncs.update({flag: Column(flag, dataset="meas") for flag in self.flags})

        if isinstance(self.functors, CompositeFunctor):
            func = self.functors
        else:
            func = CompositeFunctor(self.functors)

        func.funcDict.update(additionalFuncs)
        func.filt = self.filt

        return func

    @property
    def noDupCols(self):
        return [name for name, func in self.func.funcDict.items() if func.noDup]

    @property
    def df(self):
        if self._df is None:
            self.compute()
        return self._df

    def compute(self, dropna=False, pool=None):
        # Map over multiple handles if a list or tuple was given.
        if type(self.handles) in (list, tuple):
            if pool is None:
                dflist = [self.func(handle, dropna=dropna) for handle in self.handles]
            else:
                # TODO: Figure out why this doesn't work (pickling issues?)
                dflist = pool.map(functools.partial(self.func, dropna=dropna), self.handles)
            self._df = pd.concat(dflist)
        else:
            self._df = self.func(self.handles, dropna=dropna)

        return self._df
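

# Minimal sketch of driving `PostprocessAnalysis` directly with `Column`
# functors on an in-memory catalog; real pipelines normally go through
# `TransformCatalogBaseTask` instead.  The column names and tiny DataFrame are
# illustrative only; this helper is never called at import time.
def _example_postprocess_analysis():
    handle = pipeBase.InMemoryDatasetHandle(
        pd.DataFrame({"coord_ra": [10.0], "coord_dec": [-5.0]}), storageClass="DataFrame"
    )
    functors = {"ra": Column("coord_ra"), "dec": Column("coord_dec")}
    analysis = PostprocessAnalysis(handle, functors)
    return analysis.df  # accessing .df triggers the computation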


class TransformCatalogBaseConnections(pipeBase.PipelineTaskConnections,
                                      defaultTemplates={"name": ""},
                                      dimensions=()):
    """Expected Connections for subclasses of TransformCatalogBaseTask.

    Must be subclassed.
    """
    inputCatalog = connectionTypes.Input(
        name="{name}",
        storageClass="DataFrame",
    )
    outputCatalog = connectionTypes.Output(
        name="{name}",
        storageClass="ArrowAstropy",
    )


class TransformCatalogBaseConfig(pipeBase.PipelineTaskConfig,
                                 pipelineConnections=TransformCatalogBaseConnections):
    functorFile = pexConfig.Field(
        dtype=str,
        doc="Path to YAML file specifying Science Data Model functors to use "
            "when copying columns and computing calibrated values.",
        default=None,
        optional=True,
    )
    primaryKey = pexConfig.Field(
        dtype=str,
        doc="Name of column to be set as the DataFrame index. If None, the index "
            "will be named `id`",
        default=None,
        optional=True,
    )
    columnsFromDataId = pexConfig.ListField(
        dtype=str,
        default=None,
        optional=True,
        doc="Columns to extract from the dataId",
    )


class TransformCatalogBaseTask(pipeBase.PipelineTask):
    """Base class for transforming/standardizing a catalog by applying functors
    that convert units and apply calibrations.

    The purpose of this task is to perform a set of computations on an input
    ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` that holds a
    ``DataFrame`` dataset (such as ``deepCoadd_obj``), and write the results to
    a new dataset (which needs to be declared in an ``outputDataset``
    attribute).

    The calculations to be performed are defined in a YAML file that specifies
    a set of functors to be computed, provided as a ``--functorFile`` config
    parameter. An example of such a YAML file is the following:

        funcs:
            x:
                functor: Column
                args: slot_Centroid_x
            y:
                functor: Column
                args: slot_Centroid_y
            psfFlux:
                functor: LocalNanojansky
                args:
                    - slot_PsfFlux_instFlux
                    - slot_PsfFlux_instFluxErr
                    - base_LocalPhotoCalib
                    - base_LocalPhotoCalibErr
            psfFluxErr:
                functor: LocalNanojanskyErr
                args:
                    - slot_PsfFlux_instFlux
                    - slot_PsfFlux_instFluxErr
                    - base_LocalPhotoCalib
                    - base_LocalPhotoCalibErr

    The names for each entry under "funcs" will become the names of columns in
    the output dataset.  All the functors referenced are defined in
    `~lsst.pipe.tasks.functors`.  Positional arguments to be passed to each
    functor are in the ``args`` list, and any additional entries for each column
    other than "functor" or "args" (e.g., ``'filt'``, ``'dataset'``) are
    treated as keyword arguments to be passed to the functor initialization.

    The "flags" entry is the default shortcut for `Column` functors.
    All columns listed under "flags" will be copied to the output table
    untransformed.  They can be of any datatype.
    In the special case of transforming a multi-level object table with
    band and dataset indices (deepCoadd_obj), these will be taken from the
    ``meas`` dataset and exploded out per band.

    There are two special shortcuts that only apply when transforming
    multi-level Object (deepCoadd_obj) tables:

    - The "refFlags" entry is a shortcut for `Column` functors
      taken from the ``ref`` dataset if transforming an ObjectTable.
    - The "forcedFlags" entry is a shortcut for `Column` functors
      taken from the ``forced_src`` dataset if transforming an ObjectTable.
      These are expanded out per band.

    This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
    to organize and execute the calculations.
    """

    @property
    def _DefaultName(self):
        raise NotImplementedError("Subclass must define the \"_DefaultName\" attribute.")

    @property
    def outputDataset(self):
        raise NotImplementedError("Subclass must define the \"outputDataset\" attribute.")

    @property
    def inputDataset(self):
        raise NotImplementedError("Subclass must define \"inputDataset\" attribute.")

    @property
    def ConfigClass(self):
        raise NotImplementedError("Subclass must define \"ConfigClass\" attribute.")

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if self.config.functorFile:
            self.log.info("Loading transform functor definitions from %s",
                          self.config.functorFile)
            self.funcs = CompositeFunctor.from_file(self.config.functorFile)
            self.funcs.update(dict(PostprocessAnalysis._defaultFuncs))
        else:
            self.funcs = None

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
                          dataId=dict(outputRefs.outputCatalog.dataId.mapping))
        butlerQC.put(result, outputRefs)

    def run(self, handle, funcs=None, dataId=None, band=None):
        """Do postprocessing calculations.

        Takes a ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` or
        ``DataFrame`` object and dataId,
        returns a dataframe with results of postprocessing calculations.

        Parameters
        ----------
        handle : `~lsst.daf.butler.DeferredDatasetHandle` or
                 `~lsst.pipe.base.InMemoryDatasetHandle` or
                 `~pandas.DataFrame`, or list of these.
            DataFrames from which calculations are done.
        funcs : `~lsst.pipe.tasks.functors.Functor`
            Functors to apply to the table's columns.
        dataId : `dict`, optional
            Used to add a `patchId` column to the output dataframe.
        band : `str`, optional
            Filter band that is being processed.

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            Result struct, with a single ``outputCatalog`` attribute holding
            the transformed catalog.
        """
        self.log.info("Transforming/standardizing the source table dataId: %s", dataId)

        df = self.transform(band, handle, funcs, dataId).df
        self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
        result = pipeBase.Struct(outputCatalog=pandas_to_astropy(df))
        return result

    def getFunctors(self):
        return self.funcs

    def getAnalysis(self, handles, funcs=None, band=None):
        if funcs is None:
            funcs = self.funcs
        analysis = PostprocessAnalysis(handles, funcs, filt=band)
        return analysis

    def transform(self, band, handles, funcs, dataId):
        analysis = self.getAnalysis(handles, funcs=funcs, band=band)
        df = analysis.df
        if dataId and self.config.columnsFromDataId:
            for key in self.config.columnsFromDataId:
                if key in dataId:
                    if key == "detector":
                        # int16 instead of uint8 because databases don't like unsigned bytes.
                        df[key] = np.int16(dataId[key])
                    else:
                        df[key] = dataId[key]
                else:
                    raise ValueError(f"'{key}' in config.columnsFromDataId not found in dataId: {dataId}")

        if self.config.primaryKey:
            if df.index.name != self.config.primaryKey and self.config.primaryKey in df:
                df.reset_index(inplace=True, drop=True)
                df.set_index(self.config.primaryKey, inplace=True)

        return pipeBase.Struct(
            df=df,
            analysis=analysis,
        )
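

# Hedged sketch of how a functor specification is loaded outside the task
# framework.  It reuses the same pattern as `TransformCatalogBaseTask.__init__`
# and points at the default Source.yaml shipped with pipe_tasks (the file
# `TransformSourceTableConfig.setDefaults` uses); the environment variable is
# resolved by `CompositeFunctor.from_file`.  Never called at import time.
def _example_load_functor_file():
    functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Source.yaml")
    funcs = CompositeFunctor.from_file(functorFile)
    funcs.update(dict(PostprocessAnalysis._defaultFuncs))
    return funcs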


class TransformObjectCatalogConnections(pipeBase.PipelineTaskConnections,
                                        defaultTemplates={"coaddName": "deep"},
                                        dimensions=("tract", "patch", "skymap")):
    inputCatalog = connectionTypes.Input(
        doc="The vertical concatenation of the {coaddName}_{meas|forced_src|psfs_multiprofit} catalogs, "
            "stored as a DataFrame with a multi-level column index per-patch.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="{coaddName}Coadd_obj",
        deferLoad=True,
    )
    inputCatalogRef = connectionTypes.Input(
        doc="Catalog marking the primary detection (which band provides a good shape and position) "
            "for each detection in deepCoadd_mergeDet.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_ref",
        deferLoad=True,
    )
    inputCatalogSersicMultiprofit = connectionTypes.Input(
        doc="Catalog of source measurements on the deepCoadd.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="ArrowAstropy",
        name="{coaddName}Coadd_Sersic_multiprofit",
        deferLoad=True,
    )
    inputCatalogEpoch = connectionTypes.Input(
        doc="Catalog of mean epochs for each object per band.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="ArrowAstropy",
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-Patch Object Table of columns transformed from the deepCoadd_obj table per the standard "
            "data model.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="ArrowAstropy",
        name="objectTable",
    )

    def __init__(self, *, config=None):
        super().__init__(config=config)
        if config.multilevelOutput:
            self.outputCatalog = dataclasses.replace(self.outputCatalog, storageClass="DataFrame")


class TransformObjectCatalogConfig(TransformCatalogBaseConfig,
                                   pipelineConnections=TransformObjectCatalogConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd",
    )
    outputBands = pexConfig.ListField(
        dtype=str,
        default=None,
        optional=True,
        doc=("These bands and only these bands will appear in the output,"
             " NaN-filled if the input does not include them."
             " If None, then use all bands found in the input."),
    )
    camelCase = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Write per-band columns names with camelCase, else underscore. "
             "For example: gPsFlux instead of g_PsFlux."),
    )
    multilevelOutput = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Whether results dataframe should have a multilevel column index (True) or be flat "
             "and name-munged (False). If True, the output storage class will be "
             "set to DataFrame, since astropy tables do not support multi-level indexing."),
        deprecated="Support for multi-level outputs is deprecated and will be removed after v29.",
    )
    goodFlags = pexConfig.ListField(
        dtype=str,
        default=[],
        doc=("List of 'good' flags that should be set False when populating empty tables. "
             "All other flags are considered to be 'bad' flags and will be set to True."),
    )
    floatFillValue = pexConfig.Field(
        dtype=float,
        default=np.nan,
        doc="Fill value for float fields when populating empty tables.",
    )
    integerFillValue = pexConfig.Field(
        dtype=int,
        default=-1,
        doc="Fill value for integer fields when populating empty tables.",
    )

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Object.yaml")
        self.primaryKey = "objectId"
        self.columnsFromDataId = ["tract", "patch"]
        self.goodFlags = ["calib_astrometry_used",
                          "calib_photometry_reserved",
                          "calib_photometry_used",
                          "calib_psf_candidate",
                          "calib_psf_reserved",
                          "calib_psf_used"]


class TransformObjectCatalogTask(TransformCatalogBaseTask):
    """Produce a flattened Object Table to match the format specified in
    sdm_schemas.

    Do the same set of postprocessing calculations on all bands.

    This is identical to `TransformCatalogBaseTask`, except that it does
    the specified functor calculations for all filters present in the
    input `deepCoadd_obj` table.  Any specific ``"filt"`` keywords specified
    by the YAML file will be superseded.
    """
    _DefaultName = "transformObjectCatalog"
    ConfigClass = TransformObjectCatalogConfig

    datasets_multiband = ("epoch", "ref", "Sersic_multiprofit")

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
                          dataId=dict(outputRefs.outputCatalog.dataId.mapping),
                          handle_epoch=inputs["inputCatalogEpoch"],
                          handle_ref=inputs["inputCatalogRef"],
                          handle_Sersic_multiprofit=inputs["inputCatalogSersicMultiprofit"],
                          )
        butlerQC.put(result, outputRefs)

    def run(self, handle, funcs=None, dataId=None, band=None, **kwargs):
        # NOTE: The band kwarg is ignored here; all bands in the input are
        # processed.
        if isinstance(funcs, CompositeFunctor):
            funcDict_in = funcs.funcDict
        elif isinstance(funcs, dict):
            funcDict_in = funcs
        elif isinstance(funcs, list):
            funcDict_in = {idx: v for idx, v in enumerate(funcs)}
        else:
            raise TypeError(f"Unsupported {type(funcs)=}")

        handles_multi = {}
        funcDicts_multiband = {}
        for dataset in self.datasets_multiband:
            if (handle_multi := kwargs.get(f"handle_{dataset}")) is None:
                raise RuntimeError(f"Missing required handle_{dataset} kwarg")
            handles_multi[dataset] = handle_multi
            funcDicts_multiband[dataset] = {}

        dfDict = {}
        analysisDict = {}
        templateDf = pd.DataFrame()

        columns = handle.get(component="columns")
        inputBands = columns.unique(level=1).values

        outputBands = self.config.outputBands if self.config.outputBands else inputBands

        # Split the functors up between the per-band table and the
        # multiband datasets.
        funcDict_band = {}
        for name, func in funcDict_in.items():
            if func.dataset in funcDicts_multiband:
                if band := getattr(func, "band_to_check", None):
                    if band not in outputBands:
                        continue
                elif hasattr(func, "bands"):
                    # Set bands to match the input, so that the functor
                    # doesn't try to use bands that aren't there.
                    func.bands = tuple(inputBands)

            funcDict = funcDicts_multiband.get(func.dataset, funcDict_band)
            funcDict[name] = func

        funcs_band = CompositeFunctor(funcDict_band)

        # Perform transform for data of filters that exist in the handle dataframe.
        for inputBand in inputBands:
            if inputBand not in outputBands:
                self.log.info("Ignoring %s band data in the input", inputBand)
                continue
            self.log.info("Transforming the catalog of band %s", inputBand)
            result = self.transform(inputBand, handle, funcs_band, dataId)
            dfDict[inputBand] = result.df
            analysisDict[inputBand] = result.analysis
            if templateDf.empty:
                templateDf = result.df

        # Put filler values in columns of other wanted bands.
        for filt in outputBands:
            if filt not in dfDict:
                self.log.info("Adding empty columns for band %s", filt)
                dfTemp = templateDf.copy()
                for col in dfTemp.columns:
                    testValue = dfTemp[col].values[0]
                    if isinstance(testValue, (np.bool_, pd.BooleanDtype)):
                        # Boolean flag type: check if it is a "good" flag.
                        if col in self.config.goodFlags:
                            fillValue = False
                        else:
                            fillValue = True
                    elif isinstance(testValue, numbers.Integral):
                        # numbers.Integral catches python, numpy, and pandas
                        # integers; unsigned integers are not allowed.
                        if isinstance(testValue, np.unsignedinteger):
                            raise ValueError("Parquet tables may not have unsigned integer columns.")
                        else:
                            fillValue = self.config.integerFillValue
                    else:
                        fillValue = self.config.floatFillValue
                    dfTemp[col].values[:] = fillValue
                dfDict[filt] = dfTemp

        # This makes a multilevel column index, with band as first level.
        df = pd.concat(dfDict, axis=1, names=["band", "column"])
        name_index = df.index.name

        if not self.config.multilevelOutput:
            noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
            if self.config.primaryKey in noDupCols:
                noDupCols.remove(self.config.primaryKey)
            if dataId and self.config.columnsFromDataId:
                noDupCols += self.config.columnsFromDataId
            df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase,
                                inputBands=inputBands)

        # Apply the per-dataset functors to each multiband dataset in turn.
        for dataset, funcDict in funcDicts_multiband.items():
            handle_multiband = handles_multi[dataset]
            df_dataset = handle_multiband.get()
            if isinstance(df_dataset, astropy.table.Table):
                # Allow the astropy table to use either the index name or the
                # configured primaryKey as its id column.
                if name_index not in df_dataset.colnames:
                    if self.config.primaryKey in df_dataset.colnames:
                        name_index_ap = self.config.primaryKey
                    else:
                        raise RuntimeError(
                            f"Neither of {name_index=} nor {self.config.primaryKey=} appear in"
                            f" {df_dataset.colnames=} for {dataset=}"
                        )
                else:
                    name_index_ap = name_index
                df_dataset = df_dataset.to_pandas().set_index(name_index_ap, drop=False)
            elif isinstance(df_dataset, afwTable.SourceCatalog):
                df_dataset = df_dataset.asAstropy().to_pandas().set_index(name_index, drop=False)
            result = self.transform(
                None,
                pipeBase.InMemoryDatasetHandle(df_dataset, storageClass="DataFrame"),
                CompositeFunctor(funcDict),
                dataId,
            )
            result.df.index.name = name_index
            # Drop columns from the dataId if present (e.g. tract, patch).
            if self.config.columnsFromDataId:
                columns_drop = [column for column in self.config.columnsFromDataId if column in result.df]
                result.df.drop(columns_drop, axis=1, inplace=True)
            # Make the same multi-index for the multiband table if needed.
            to_concat = pd.concat(
                {band: result.df for band in self.config.outputBands}, axis=1, names=["band", "column"]
            ) if self.config.multilevelOutput else result.df
            df = pd.concat([df, to_concat], axis=1)
            analysisDict[dataset] = result.analysis

        df.index.name = self.config.primaryKey

        if not self.config.multilevelOutput:
            tbl = pandas_to_astropy(df)
        else:
            tbl = df

        self.log.info("Made a table of %d columns and %d rows", len(tbl.columns), len(tbl))

        return pipeBase.Struct(outputCatalog=tbl)


class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("tract", "skymap")):
    inputCatalogs = connectionTypes.Input(
        doc="Per-Patch objectTables conforming to the standard data model.",
        name="objectTable",
        storageClass="ArrowAstropy",
        dimensions=("tract", "patch", "skymap"),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-tract concatenation of the input objectTables",
        name="objectTable_tract",
        storageClass="ArrowAstropy",
        dimensions=("tract", "skymap"),
    )


class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=ConsolidateObjectTableConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd",
    )


class ConsolidateObjectTableTask(pipeBase.PipelineTask):
    """Write patch-merged source tables to a tract-level DataFrame Parquet file.

    Concatenates the `objectTable` list into a per-tract `objectTable_tract`.
    """
    _DefaultName = "consolidateObjectTable"
    ConfigClass = ConsolidateObjectTableConfig

    inputDataset = "objectTable"
    outputDataset = "objectTable_tract"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-patch Object Tables",
                      len(inputs["inputCatalogs"]))
        table = TableVStack.vstack_handles(inputs["inputCatalogs"])
        butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)


class TransformSourceTableConnections(pipeBase.PipelineTaskConnections,
                                      defaultTemplates={"catalogType": ""},
                                      dimensions=("instrument", "visit", "detector")):
    inputCatalog = connectionTypes.Input(
        doc="Wide input catalog of sources produced by WriteSourceTableTask",
        name="{catalogType}source",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Narrower, per-detector Source Table transformed and converted per a "
            "specified set of functors",
        name="{catalogType}sourceTable",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )


class TransformSourceTableConfig(TransformCatalogBaseConfig,
                                 pipelineConnections=TransformSourceTableConnections):

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Source.yaml")
        self.primaryKey = "sourceId"
        self.columnsFromDataId = ["visit", "detector", "band", "physical_filter"]


class TransformSourceTableTask(TransformCatalogBaseTask):
    """Transform/standardize a source catalog.
    """
    _DefaultName = "transformSourceTable"
    ConfigClass = TransformSourceTableConfig
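

# Sketch of the dataId-column handling performed in `transform` when
# `columnsFromDataId` is configured (as TransformSourceTableConfig does):
# scalar dataId values are broadcast to every row, with "detector" narrowed to
# int16 because downstream databases dislike unsigned bytes.  The dataId and
# DataFrame below are illustrative; this helper is never called at import time.
def _example_columns_from_dataid():
    df = pd.DataFrame({"sourceId": [1, 2], "flux": [1.0, 2.0]})
    dataId = {"visit": 1234, "detector": 42, "band": "r"}
    for key, value in dataId.items():
        df[key] = np.int16(value) if key == "detector" else value
    return df.set_index("sourceId")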


class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections,
                                         dimensions=("instrument", "visit",),
                                         defaultTemplates={"calexpType": ""}):
    calexp = connectionTypes.Input(
        doc="Processed exposures used for metadata",
        name="calexp",
        storageClass="ExposureF",
        dimensions=("instrument", "visit", "detector"),
        multiple=True,
        deferLoad=True,
    )
    visitSummary = connectionTypes.Output(
        doc=("Per-visit consolidated exposure metadata. These catalogs use "
             "detector id for the id and are sorted for fast lookups of a "
             "detector."),
        name="visitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
    )
    visitSummarySchema = connectionTypes.InitOutput(
        doc="Schema of the visitSummary catalog",
        name="visitSummary_schema",
        storageClass="ExposureCatalog",
    )


class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig,
                                    pipelineConnections=ConsolidateVisitSummaryConnections):
    """Config for ConsolidateVisitSummaryTask"""

    full = pexConfig.Field(
        "Whether to propagate all exposure components. "
        "This adds PSF, aperture correction map, transmission curve, and detector, which can increase file "
        "size by more than a factor of 10, but it makes the visit summaries produced by this task fully "
        "usable by tasks that were designed to run downstream of lsst.drp.tasks.UpdateVisitSummaryTask.",
        dtype=bool,
        default=False,
    )


class ConsolidateVisitSummaryTask(pipeBase.PipelineTask):
    """Task to consolidate per-detector visit metadata.

    This task aggregates the following metadata from all the detectors in a
    single visit into an exposure catalog:

    - The visitInfo.
    - The wcs.
    - The photoCalib.
    - The physical_filter and band (if available).
    - The aperture correction map.
    - The transmission curve.
    - The psf size, shape, and effective area at the center of the detector.
    - The corners of the bounding box in right ascension/declination.

    Tests for this task are performed in ci_hsc_gen3.
    """
    _DefaultName = "consolidateVisitSummary"
    ConfigClass = ConsolidateVisitSummaryConfig

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.schema = afwTable.ExposureTable.makeMinimalSchema()
        self.schema.addField("visit", type="L", doc="Visit number")
        self.schema.addField("physical_filter", type="String", size=32, doc="Physical filter")
        self.schema.addField("band", type="String", size=32, doc="Name of band")
        ExposureSummaryStats.update_schema(self.schema)
        self.visitSummarySchema = afwTable.ExposureCatalog(self.schema)

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        dataRefs = butlerQC.get(inputRefs.calexp)
        visit = dataRefs[0].dataId["visit"]

        self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
                       len(dataRefs), visit)

        expCatalog = self._combineExposureMetadata(visit, dataRefs)

        butlerQC.put(expCatalog, outputRefs.visitSummary)

    def _combineExposureMetadata(self, visit, dataRefs):
        """Make a combined exposure catalog from a list of dataRefs.
        These dataRefs must point to exposures with wcs, summaryStats,
        and other visit metadata.

        Parameters
        ----------
        visit : `int`
            Visit identification number.
        dataRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            List of dataRefs in visit.

        Returns
        -------
        visitSummary : `lsst.afw.table.ExposureCatalog`
            Exposure catalog with per-detector summary information.
        """
        cat = afwTable.ExposureCatalog(self.schema)
        cat.resize(len(dataRefs))

        cat["visit"] = visit

        for i, dataRef in enumerate(dataRefs):
            visitInfo = dataRef.get(component="visitInfo")
            filterLabel = dataRef.get(component="filter")
            summaryStats = dataRef.get(component="summaryStats")
            detector = dataRef.get(component="detector")
            wcs = dataRef.get(component="wcs")
            photoCalib = dataRef.get(component="photoCalib")
            bbox = dataRef.get(component="bbox")
            validPolygon = dataRef.get(component="validPolygon")

            rec = cat[i]
            rec.setBBox(bbox)
            rec.setVisitInfo(visitInfo)
            rec.setWcs(wcs)
            rec.setPhotoCalib(photoCalib)
            rec.setValidPolygon(validPolygon)

            if self.config.full:
                rec.setDetector(dataRef.get(component="detector"))
                rec.setPsf(dataRef.get(component="psf"))
                rec.setApCorrMap(dataRef.get(component="apCorrMap"))
                rec.setTransmissionCurve(dataRef.get(component="transmissionCurve"))

            rec["physical_filter"] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else ""
            rec["band"] = filterLabel.bandLabel if filterLabel.hasBandLabel() else ""
            rec.setId(detector.getId())
            summaryStats.update_record(rec)

        if not cat:
            raise pipeBase.NoWorkFound(
                "No detectors had sufficient information to make a visit summary row."
            )

        # Sort the catalog by detector id for fast lookups.
        cat.sort()

        metadata = dafBase.PropertyList()
        metadata.add("COMMENT", "Catalog id is detector id, sorted.")
        metadata.add("COMMENT", "Only detectors with data have entries.")
        cat.setMetadata(metadata)

        return cat
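

# Hedged sketch of why the visit summary is sorted by detector id: callers such
# as `WriteRecalibratedSourceTableTask.prepareCalibratedExposure` rely on
# `ExposureCatalog.find` for fast per-detector lookups.  The detector ids are
# illustrative; this helper is never called at import time.
def _example_visit_summary_lookup():
    schema = afwTable.ExposureTable.makeMinimalSchema()
    cat = afwTable.ExposureCatalog(schema)
    for detectorId in (3, 1, 2):
        cat.addNew().setId(detectorId)
    cat.sort()
    return cat.find(2)  # the record with id == 2, or None if absent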


class ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        defaultTemplates={"catalogType": ""},
                                        dimensions=("instrument", "visit")):
    inputCatalogs = connectionTypes.Input(
        doc="Input per-detector Source Tables",
        name="{catalogType}sourceTable",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-visit concatenation of Source Table",
        name="{catalogType}sourceTable_visit",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit"),
    )


class ConsolidateSourceTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=ConsolidateSourceTableConnections):
    pass


class ConsolidateSourceTableTask(pipeBase.PipelineTask):
    """Concatenate the `sourceTable` list into a per-visit `sourceTable_visit`.
    """
    _DefaultName = "consolidateSourceTable"
    ConfigClass = ConsolidateSourceTableConfig

    inputDataset = "sourceTable"
    outputDataset = "sourceTable_visit"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        from .makeWarp import reorderRefs

        detectorOrder = [ref.dataId["detector"] for ref in inputRefs.inputCatalogs]
        detectorOrder.sort()
        inputRefs = reorderRefs(inputRefs, detectorOrder, dataIdKey="detector")
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-detector Source Tables",
                      len(inputs["inputCatalogs"]))
        table = TableVStack.vstack_handles(inputs["inputCatalogs"])
        butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)


class MakeCcdVisitTableConnections(pipeBase.PipelineTaskConnections,
                                   dimensions=("instrument",),
                                   defaultTemplates={"calexpType": ""}):
    visitSummaryRefs = connectionTypes.Input(
        doc="Data references for per-visit consolidated exposure metadata",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="CCD and Visit metadata table",
        name="ccdVisitTable",
        storageClass="ArrowAstropy",
        dimensions=("instrument",),
    )


class MakeCcdVisitTableConfig(pipeBase.PipelineTaskConfig,
                              pipelineConnections=MakeCcdVisitTableConnections):
    idGenerator = DetectorVisitIdGeneratorConfig.make_field()


class MakeCcdVisitTableTask(pipeBase.PipelineTask):
    """Produce a `ccdVisitTable` from the visit summary exposure catalogs.
    """
    _DefaultName = "makeCcdVisitTable"
    ConfigClass = MakeCcdVisitTableConfig

    def run(self, visitSummaryRefs):
        """Make a table of ccd information from the visit summary catalogs.

        Parameters
        ----------
        visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            List of DeferredDatasetHandles pointing to exposure catalogs with
            per-detector summary information.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            Results struct with attribute:

            ``outputCatalog``
                Catalog of ccd and visit information.
        """
        ccdEntries = []
        for visitSummaryRef in visitSummaryRefs:
            visitSummary = visitSummaryRef.get()
            if not visitSummary:
                continue
            visitInfo = visitSummary[0].getVisitInfo()

            # Strip provenance to avoid merge confusion downstream.
            strip_provenance_from_fits_header(visitSummary.metadata)

            summaryTable = visitSummary.asAstropy()
            selectColumns = ["id", "visit", "physical_filter", "band", "ra", "dec",
                             "pixelScale", "zenithDistance",
                             "expTime", "zeroPoint", "psfSigma", "skyBg", "skyNoise",
                             "astromOffsetMean", "astromOffsetStd", "nPsfStar",
                             "psfStarDeltaE1Median", "psfStarDeltaE2Median",
                             "psfStarDeltaE1Scatter", "psfStarDeltaE2Scatter",
                             "psfStarDeltaSizeMedian", "psfStarDeltaSizeScatter",
                             "psfStarScaledDeltaSizeScatter", "psfTraceRadiusDelta",
                             "psfApFluxDelta", "psfApCorrSigmaScaledDelta",
                             "maxDistToNearestPsf",
                             "effTime", "effTimePsfSigmaScale",
                             "effTimeSkyBgScale", "effTimeZeroPointScale",
                             ]
            ccdEntry = summaryTable[selectColumns]
            ccdEntry.rename_column("visit", "visitId")
            ccdEntry.rename_column("id", "detectorId")

            # Duplicate "dec" as "decl" for backwards compatibility.
            ccdEntry["decl"] = ccdEntry["dec"]

            ccdEntry["ccdVisitId"] = [
                self.config.idGenerator.apply(
                    visitSummaryRef.dataId,
                    detector=detector_id,
                ).catalog_id
                for detector_id in summaryTable["id"]
            ]
            ccdEntry["detector"] = summaryTable["id"]
            ccdEntry["seeing"] = (
                visitSummary["psfSigma"] * visitSummary["pixelScale"] * np.sqrt(8 * np.log(2))
            )
            ccdEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
            ccdEntry["expMidpt"] = np.datetime64(visitInfo.date.nsecs(scale=dafBase.DateTime.TAI), "ns")
            ccdEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
            expTime = visitInfo.getExposureTime()
            ccdEntry["obsStart"] = (
                ccdEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns")
            )
            expTime_days = expTime / (60*60*24)
            ccdEntry["obsStartMJD"] = ccdEntry["expMidptMJD"] - 0.5 * expTime_days
            ccdEntry["darkTime"] = visitInfo.getDarkTime()
            ccdEntry["xSize"] = summaryTable["bbox_max_x"] - summaryTable["bbox_min_x"]
            ccdEntry["ySize"] = summaryTable["bbox_max_y"] - summaryTable["bbox_min_y"]
            ccdEntry["llcra"] = summaryTable["raCorners"][:, 0]
            ccdEntry["llcdec"] = summaryTable["decCorners"][:, 0]
            ccdEntry["ulcra"] = summaryTable["raCorners"][:, 1]
            ccdEntry["ulcdec"] = summaryTable["decCorners"][:, 1]
            ccdEntry["urcra"] = summaryTable["raCorners"][:, 2]
            ccdEntry["urcdec"] = summaryTable["decCorners"][:, 2]
            ccdEntry["lrcra"] = summaryTable["raCorners"][:, 3]
            ccdEntry["lrcdec"] = summaryTable["decCorners"][:, 3]

            ccdEntries.append(ccdEntry)

        outputCatalog = astropy.table.vstack(ccdEntries, join_type="exact")
        return pipeBase.Struct(outputCatalog=outputCatalog)
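

# Worked example of the "seeing" column computed above: the Gaussian PSF sigma
# (pixels) times the pixel scale (arcsec/pixel) times the sigma-to-FWHM factor
# sqrt(8 ln 2) ~= 2.355.  The numbers are illustrative; this helper is never
# called at import time.
def _example_seeing_fwhm():
    psfSigma = 1.7      # pixels
    pixelScale = 0.2    # arcsec per pixel
    return psfSigma * pixelScale * np.sqrt(8 * np.log(2))  # ~0.80 arcsec FWHM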


class MakeVisitTableConnections(pipeBase.PipelineTaskConnections,
                                dimensions=("instrument",),
                                defaultTemplates={"calexpType": ""}):
    visitSummaries = connectionTypes.Input(
        doc="Per-visit consolidated exposure metadata",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit",),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Visit metadata table",
        name="visitTable",
        storageClass="ArrowAstropy",
        dimensions=("instrument",),
    )


class MakeVisitTableConfig(pipeBase.PipelineTaskConfig,
                           pipelineConnections=MakeVisitTableConnections):
    pass


class MakeVisitTableTask(pipeBase.PipelineTask):
    """Produce a `visitTable` from the visit summary exposure catalogs.
    """
    _DefaultName = "makeVisitTable"
    ConfigClass = MakeVisitTableConfig

    def run(self, visitSummaries):
        """Make a table of visit information from the visit summary catalogs.

        Parameters
        ----------
        visitSummaries : `list` of `lsst.afw.table.ExposureCatalog`
            List of exposure catalogs with per-detector summary information.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            Results struct with attribute:

            ``outputCatalog``
                Catalog of visit information.
        """
        visitEntries = []
        for visitSummary in visitSummaries:
            visitSummary = visitSummary.get()
            if not visitSummary:
                continue
            visitRow = visitSummary[0]
            visitInfo = visitRow.getVisitInfo()

            visitEntry = {}
            visitEntry["visitId"] = visitRow["visit"]
            visitEntry["visit"] = visitRow["visit"]
            visitEntry["physical_filter"] = visitRow["physical_filter"]
            visitEntry["band"] = visitRow["band"]
            raDec = visitInfo.getBoresightRaDec()
            visitEntry["ra"] = raDec.getRa().asDegrees()
            visitEntry["dec"] = raDec.getDec().asDegrees()

            # Duplicate "dec" as "decl" for backwards compatibility.
            visitEntry["decl"] = visitEntry["dec"]

            visitEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
            azAlt = visitInfo.getBoresightAzAlt()
            visitEntry["azimuth"] = azAlt.getLongitude().asDegrees()
            visitEntry["altitude"] = azAlt.getLatitude().asDegrees()
            visitEntry["zenithDistance"] = 90 - azAlt.getLatitude().asDegrees()
            visitEntry["airmass"] = visitInfo.getBoresightAirmass()
            expTime = visitInfo.getExposureTime()
            visitEntry["expTime"] = expTime
            visitEntry["expMidpt"] = np.datetime64(visitInfo.date.nsecs(scale=dafBase.DateTime.TAI), "ns")
            visitEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
            visitEntry["obsStart"] = visitEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns")
            expTime_days = expTime / (60*60*24)
            visitEntry["obsStartMJD"] = visitEntry["expMidptMJD"] - 0.5 * expTime_days
            visitEntries.append(visitEntry)

        outputCatalog = astropy.table.Table(rows=visitEntries)
        return pipeBase.Struct(outputCatalog=outputCatalog)


class WriteForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("instrument", "visit", "detector", "skymap", "tract")):
    inputCatalog = connectionTypes.Input(
        doc="Primary per-detector, single-epoch forced-photometry catalog. "
            "By default, it is the output of ForcedPhotCcdTask on calexps",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
    )
    inputCatalogDiff = connectionTypes.Input(
        doc="Secondary multi-epoch, per-detector, forced photometry catalog. "
            "By default, it is the output of ForcedPhotCcdTask run on image differences.",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
    )
    outputCatalog = connectionTypes.Output(
        doc="InputCatalogs horizontally joined on `objectId` in DataFrame parquet format",
        name="mergedForcedSource",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
    )


class WriteForcedSourceTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=WriteForcedSourceTableConnections):
    key = pexConfig.Field(
        doc="Column on which to join the two input tables and make the primary key of the output",
        dtype=str,
        default="objectId",
    )


class WriteForcedSourceTableTask(pipeBase.PipelineTask):
    """Merge and convert per-detector forced source catalogs to DataFrame Parquet format.

    Because the predecessor ForcedPhotCcdTask operates per-detector,
    per-tract (i.e., it has tract in its dimensions), detectors
    on the tract boundary may have multiple forced source catalogs.

    The successor task TransformForcedSourceTable runs per-patch
    and temporally aggregates overlapping mergedForcedSource catalogs from all
    available epochs.
    """
    _DefaultName = "writeForcedSourceTable"
    ConfigClass = WriteForcedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        inputs["visit"] = butlerQC.quantum.dataId["visit"]
        inputs["detector"] = butlerQC.quantum.dataId["detector"]
        inputs["band"] = butlerQC.quantum.dataId["band"]
        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)

    def run(self, inputCatalog, inputCatalogDiff, visit, detector, band=None):
        dfs = []
        for table, dataset in zip((inputCatalog, inputCatalogDiff), ("calexp", "diff")):
            df = table.asAstropy().to_pandas().set_index(self.config.key, drop=False)
            df = df.reindex(sorted(df.columns), axis=1)
            df["visit"] = visit
            # int16 instead of uint8 because databases don't like unsigned bytes.
            df["detector"] = np.int16(detector)
            df["band"] = band if band else pd.NA
            df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
                                                   names=("dataset", "column"))

            dfs.append(df)

        outputCatalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return pipeBase.Struct(outputCatalog=outputCatalog)


class TransformForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                            dimensions=("instrument", "skymap", "patch", "tract")):
    inputCatalogs = connectionTypes.Input(
        doc="DataFrames of merged ForcedSources produced by WriteForcedSourceTableTask",
        name="mergedForcedSource",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
        multiple=True,
        deferLoad=True,
    )
    referenceCatalog = connectionTypes.Input(
        doc="Reference catalog which was used to seed the forcedPhot. Columns "
            "objectId, detect_isPrimary, detect_isTractInner, detect_isPatchInner "
            "are expected.",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Narrower, temporally-aggregated, per-patch ForcedSource Table transformed and converted per a "
            "specified set of functors",
        name="forcedSourceTable",
        storageClass="ArrowAstropy",
        dimensions=("tract", "patch", "skymap"),
    )


class TransformForcedSourceTableConfig(TransformCatalogBaseConfig,
                                       pipelineConnections=TransformForcedSourceTableConnections):
    referenceColumns = pexConfig.ListField(
        dtype=str,
        default=["detect_isPrimary", "detect_isTractInner", "detect_isPatchInner"],
        doc="Columns to pull from reference catalog",
    )
    keyRef = pexConfig.Field(
        doc="Column on which to join the two input tables and make the primary key of the output",
        dtype=str,
        default="objectId",
    )
    key = pexConfig.Field(
        doc="Rename the output DataFrame index to this name",
        dtype=str,
        default="forcedSourceId",
    )

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "ForcedSource.yaml")
        self.columnsFromDataId = ["tract", "patch"]


class TransformForcedSourceTableTask(TransformCatalogBaseTask):
    """Transform/standardize a ForcedSource catalog.

    Transforms each wide, per-detector forcedSource DataFrame per the
    specification file (per-camera defaults found in ForcedSource.yaml).
    All epochs that overlap the patch are aggregated into one per-patch
    narrow-DataFrame file.

    No de-duplication of rows is performed. Duplicate-resolution flags are
    pulled in from the referenceCatalog: `detect_isPrimary`,
    `detect_isTractInner`, `detect_isPatchInner`, so that the user may
    de-duplicate for analysis or compare duplicates for QA.

    The resulting table includes multiple bands. Epochs (MJDs) and other useful
    per-visit rows can be retrieved by joining with the CcdVisitTable on
    ccdVisitId.
    """
    _DefaultName = "transformForcedSourceTable"
    ConfigClass = TransformForcedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        outputs = self.run(inputs["inputCatalogs"], inputs["referenceCatalog"], funcs=self.funcs,
                           dataId=dict(outputRefs.outputCatalog.dataId.mapping))

        butlerQC.put(outputs, outputRefs)

    def run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=None):
        dfs = []
        refColumns = list(self.config.referenceColumns)
        refColumns.append(self.config.keyRef)
        ref = referenceCatalog.get(parameters={"columns": refColumns})
        if ref.index.name != self.config.keyRef:
            # The reference catalog may not have its index set; use the key
            # column as the index for the join below.
            ref.set_index(self.config.keyRef, inplace=True)
        self.log.info("Aggregating %s input catalogs", len(inputCatalogs))
        for handle in inputCatalogs:
            result = self.transform(None, handle, funcs, dataId)
            # Keep only rows whose objects belong to this patch's reference
            # catalog.
            dfs.append(result.df.join(ref, how="inner"))

        outputCatalog = pd.concat(dfs)

        if outputCatalog.empty:
            raise NoWorkFound(f"No forced photometry rows for {dataId}.")

        # Now that the join on config.keyRef is done, switch the index to
        # config.key (forcedSourceId) and keep keyRef as a regular column.
        outputCatalog.index.rename(self.config.keyRef, inplace=True)
        outputCatalog.reset_index(inplace=True)
        outputCatalog.set_index("forcedSourceId", inplace=True, verify_integrity=True)
        outputCatalog.index.rename(self.config.key, inplace=True)

        self.log.info("Made a table of %d columns and %d rows",
                      len(outputCatalog.columns), len(outputCatalog))
        return pipeBase.Struct(outputCatalog=pandas_to_astropy(outputCatalog))
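

# Sketch of the inner join in `TransformForcedSourceTableTask.run`: joining the
# transformed forced sources against the patch's reference catalog on objectId
# both attaches the duplicate-resolution flags and drops rows whose objects
# are not in the patch.  The tiny frames are illustrative; this helper is never
# called at import time.
def _example_reference_join():
    forced = pd.DataFrame({"objectId": [1, 2, 3], "psfFlux": [1.0, 2.0, 3.0]}).set_index("objectId")
    ref = pd.DataFrame({"objectId": [1, 2], "detect_isPrimary": [True, False]}).set_index("objectId")
    return forced.join(ref, how="inner")  # objectId 3 is not in the patch and is dropped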


class ConsolidateTractConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"catalogType": ""},
                                  dimensions=("instrument", "tract")):
    inputCatalogs = connectionTypes.Input(
        doc="Input per-patch DataFrame Tables to be concatenated",
        name="{catalogType}ForcedSourceTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        multiple=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Output per-tract concatenation of DataFrame Tables",
        name="{catalogType}ForcedSourceTable_tract",
        storageClass="DataFrame",
        dimensions=("tract", "skymap"),
    )


class ConsolidateTractConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=ConsolidateTractConnections):
    pass


class ConsolidateTractTask(pipeBase.PipelineTask):
    """Concatenate any per-patch, dataframe list into a single
    per-tract DataFrame.
    """
    _DefaultName = "ConsolidateTract"
    ConfigClass = ConsolidateTractConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-patch %s Tables",
                      len(inputs["inputCatalogs"]),
                      inputRefs.inputCatalogs[0].datasetType.name)
        df = pd.concat(inputs["inputCatalogs"])
        butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)