__all__ = [
    "WriteObjectTableConfig",
    "WriteObjectTableTask",
    "WriteSourceTableConfig",
    "WriteSourceTableTask",
    "WriteRecalibratedSourceTableConfig",
    "WriteRecalibratedSourceTableTask",
    "PostprocessAnalysis",
    "TransformCatalogBaseConfig",
    "TransformCatalogBaseTask",
    "TransformObjectCatalogConfig",
    "TransformObjectCatalogTask",
    "ConsolidateObjectTableConfig",
    "ConsolidateObjectTableTask",
    "TransformSourceTableConfig",
    "TransformSourceTableTask",
    "ConsolidateVisitSummaryConfig",
    "ConsolidateVisitSummaryTask",
    "ConsolidateSourceTableConfig",
    "ConsolidateSourceTableTask",
    "MakeCcdVisitTableConfig",
    "MakeCcdVisitTableTask",
    "MakeVisitTableConfig",
    "MakeVisitTableTask",
    "WriteForcedSourceTableConfig",
    "WriteForcedSourceTableTask",
    "TransformForcedSourceTableConfig",
    "TransformForcedSourceTableTask",
    "ConsolidateTractConfig",
    "ConsolidateTractTask",
]
import dataclasses
import functools
import logging
import numbers
import os

import numpy as np
import pandas as pd
import astropy.table
from collections import defaultdict

from astro_metadata_translator.headers import merge_headers

import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
import lsst.daf.base as dafBase
import lsst.afw.table as afwTable
from lsst.pipe.base import NoWorkFound, connectionTypes
from lsst.daf.butler.formatters.parquet import pandas_to_astropy
from lsst.afw.image import ExposureSummaryStats, ExposureF
from lsst.meas.base import SingleFrameMeasurementTask, DetectorVisitIdGeneratorConfig
from lsst.obs.base.utils import strip_provenance_from_fits_header

from .functors import CompositeFunctor, Column

log = logging.getLogger(__name__)


def flattenFilters(df, noDupCols=["coord_ra", "coord_dec"], camelCase=False, inputBands=None):
    """Flatten a DataFrame with a (band, column) multilevel column index."""
    newDf = pd.DataFrame()
    # band is the level-0 column index.
    dfBands = df.columns.unique(level=0).values
    for band in dfBands:
        subdf = df[band]
        columnFormat = "{0}{1}" if camelCase else "{0}_{1}"
        newColumns = {c: columnFormat.format(band, c)
                      for c in subdf.columns if c not in noDupCols}
        cols = list(newColumns.keys())
        newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)

    # Get the unexploded (band-independent) columns from any present band.
    presentBands = dfBands if inputBands is None else list(set(inputBands).intersection(dfBands))
    noDupDf = df[presentBands[0]][noDupCols]
    newDf = pd.concat([noDupDf, newDf], axis=1)
    return newDf
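

# Illustrative sketch (not part of the pipeline): how `flattenFilters` turns a
# (band, column) MultiIndex DataFrame into flat, band-prefixed columns.  The
# toy column names and values below are hypothetical.
def _example_flatten_filters():
    import pandas as pd

    columns = pd.MultiIndex.from_tuples(
        [("g", "psfFlux"), ("g", "coord_ra"), ("g", "coord_dec"),
         ("r", "psfFlux"), ("r", "coord_ra"), ("r", "coord_dec")],
        names=("band", "column"),
    )
    df = pd.DataFrame([[1.0, 10.0, -5.0, 2.0, 10.0, -5.0]], columns=columns)
    flat = flattenFilters(df)
    # Per-band columns become "g_psfFlux"/"r_psfFlux" (or "gPsfFlux" with
    # camelCase=True), while coord_ra/coord_dec are kept once, unprefixed.
    return flat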
89 """A helper class for stacking astropy tables without having them all in
95 Full size of the final table.
99 Unlike `astropy.table.vstack`, this class requires all tables to have the
100 exact same columns (it's slightly more strict than even the
101 ``join_type="exact"`` argument to `astropy.table.vstack`).
111 """Construct from an iterable of
112 `lsst.daf.butler.DeferredDatasetHandle`.
116 handles : `~collections.abc.Iterable` [ \
117 `lsst.daf.butler.DeferredDatasetHandle` ]
118 Iterable of handles. Must have a storage class that supports the
119 "rowcount" component, which is all that will be fetched.
123 vstack : `TableVStack`
124 An instance of this class, initialized with capacity equal to the
125 sum of the rowcounts of all the given table handles.
127 capacity = sum(handle.get(component=
"rowcount")
for handle
in handles)
128 return cls(capacity=capacity)
131 """Add a single table to the stack.
135 table : `astropy.table.Table`
136 An astropy table instance.
139 self.
result = astropy.table.Table()
140 for name
in table.colnames:
142 column_cls = type(column)
143 self.
result[name] = column_cls.info.new_like([column], self.
capacity, name=name)
144 self.
result[name][:len(table)] = column
145 self.
index = len(table)
146 self.
result.meta = table.meta.copy()
148 next_index = self.
index + len(table)
149 if set(self.
result.colnames) != set(table.colnames):
151 "Inconsistent columns in concatentation: "
152 f
"{set(self.result.colnames).symmetric_difference(table.colnames)}"
154 for name
in table.colnames:
155 out_col = self.
result[name]
157 if out_col.dtype != in_col.dtype:
158 raise TypeError(f
"Type mismatch on column {name!r}: {out_col.dtype} != {in_col.dtype}.")
159 self.
result[name][self.
index:next_index] = table[name]
160 self.
index = next_index
164 self.
result.meta = merge_headers([self.
result.meta, table.meta], mode=
"drop")
165 strip_provenance_from_fits_header(self.
result.meta)
169 """Vertically stack tables represented by deferred dataset handles.
173 handles : `~collections.abc.Iterable` [ \
174 `lsst.daf.butler.DeferredDatasetHandle` ]
175 Iterable of handles. Must have the "ArrowAstropy" storage class
176 and identical columns.
180 table : `astropy.table.Table`
181 Concatenated table with the same columns as each input table and
182 the rows of all of them.
184 handles = tuple(handles)
186 rowcount = tuple(handle.get(component=
"rowcount")
for handle
in handles)
187 handles = tuple(handle
for handle, count
in zip(handles, rowcount)
if count > 0)
190 for handle
in handles:
191 vstack.extend(handle.get())
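

# Illustrative sketch (not part of the pipeline): the preallocate-then-fill
# pattern used by `TableVStack`, driven directly through `extend` rather than
# through butler handles.  The toy tables below are hypothetical.
def _example_table_vstack():
    import astropy.table

    tbl1 = astropy.table.Table({"objectId": [1, 2], "flux": [10.0, 20.0]})
    tbl2 = astropy.table.Table({"objectId": [3], "flux": [30.0]})
    stacker = TableVStack(capacity=len(tbl1) + len(tbl2))
    stacker.extend(tbl1)
    stacker.extend(tbl2)
    # stacker.result now holds all three rows, with each column preallocated
    # at full size instead of being copied by astropy.table.vstack.
    return stacker.result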


class WriteObjectTableConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"coaddName": "deep"},
                                  dimensions=("tract", "patch", "skymap")):
    inputCatalogMeas = connectionTypes.Input(
        doc="Catalog of source measurements on the deepCoadd.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_meas",
        multiple=True,
    )
    inputCatalogForcedSrc = connectionTypes.Input(
        doc="Catalog of forced measurements (shape and position parameters held fixed) on the deepCoadd.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_forced_src",
        multiple=True,
    )
    inputCatalogPsfsMultiprofit = connectionTypes.Input(
        doc="Catalog of Gaussian mixture model fit parameters for the PSF model at each object centroid.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="ArrowAstropy",
        name="{coaddName}Coadd_psfs_multiprofit",
        multiple=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="A vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
            "stored as a DataFrame with a multi-level column index per-patch.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="{coaddName}Coadd_obj",
    )


class WriteObjectTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteObjectTableConnections):
    coaddName = pexConfig.Field(dtype=str, default="deep", doc="Name of coadd")


class WriteObjectTableTask(pipeBase.PipelineTask):
    """Write filter-merged object tables as a DataFrame in parquet format."""

    _DefaultName = "writeObjectTable"
    ConfigClass = WriteObjectTableConfig

    outputDataset = "obj"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        catalogs = defaultdict(dict)
        for dataset, connection in (
            ("meas", "inputCatalogMeas"),
            ("forced_src", "inputCatalogForcedSrc"),
            ("psfs_multiprofit", "inputCatalogPsfsMultiprofit"),
        ):
            for ref, cat in zip(getattr(inputRefs, connection), inputs[connection]):
                catalogs[ref.dataId["band"]][dataset] = cat

        dataId = butlerQC.quantum.dataId
        df = self.run(catalogs=catalogs, tract=dataId["tract"], patch=dataId["patch"])
        outputs = pipeBase.Struct(outputCatalog=df)
        butlerQC.put(outputs, outputRefs)

    def run(self, catalogs, tract, patch):
        """Merge multiple catalogs.

        Parameters
        ----------
        catalogs : `dict`
            Mapping from filter names to dict of catalogs.
        tract : `int`
            tractId to use for the tractId column.
        patch : `int`
            patchId to use for the patchId column.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Merged dataframe.

        Raises
        ------
        ValueError
            Raised if any of the catalogs is of an unsupported type.
        """
        dfs = []
        for filt, tableDict in catalogs.items():
            for dataset, table in tableDict.items():
                # Convert afw or astropy tables to pandas if needed.
                if isinstance(table, pd.DataFrame):
                    df = table
                elif isinstance(table, afwTable.SourceCatalog):
                    df = table.asAstropy().to_pandas()
                elif isinstance(table, astropy.table.Table):
                    df = table.to_pandas()
                else:
                    raise ValueError(f"{dataset=} has unsupported {type(table)=}")
                df.set_index("id", drop=True, inplace=True)

                # Sort columns by name, to ensure matching schema among patches.
                df = df.reindex(sorted(df.columns), axis=1)
                df = df.assign(tractId=tract, patchId=patch)

                # Make columns a 3-level MultiIndex.
                df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
                                                       names=("dataset", "band", "column"))
                dfs.append(df)

        # Pairwise joins keep peak memory lower than a single concatenation.
        catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return catalog
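

# Illustrative sketch (not part of the pipeline): the merge pattern used by
# `WriteObjectTableTask.run`, in which each per-band/per-dataset DataFrame gets
# a (dataset, band, column) MultiIndex and the pieces are then joined on their
# shared "id" index.  The toy data below are hypothetical.
def _example_object_table_merge():
    import functools

    import pandas as pd

    dfs = []
    for dataset, band, values in (("meas", "g", [1.0, 2.0]), ("forced_src", "g", [3.0, 4.0])):
        df = pd.DataFrame({"flux": values}, index=pd.Index([11, 12], name="id"))
        df.columns = pd.MultiIndex.from_tuples(
            [(dataset, band, c) for c in df.columns], names=("dataset", "band", "column")
        )
        dfs.append(df)
    # Pairwise joins on the shared index, as in the task above.
    return functools.reduce(lambda d1, d2: d1.join(d2), dfs)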


class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"catalogType": ""},
                                  dimensions=("instrument", "visit", "detector")):
    catalog = connectionTypes.Input(
        doc="Input full-depth catalog of sources produced by CalibrateTask",
        name="{catalogType}src",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector"),
    )
    outputCatalog = connectionTypes.Output(
        doc="Catalog of sources, `src` in Astropy/Parquet format.  Columns are unchanged.",
        name="{catalogType}source",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )


class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteSourceTableConnections):
    pass


class WriteSourceTableTask(pipeBase.PipelineTask):
    """Write source table to DataFrame Parquet format."""

    _DefaultName = "writeSourceTable"
    ConfigClass = WriteSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        inputs["visit"] = butlerQC.quantum.dataId["visit"]
        inputs["detector"] = butlerQC.quantum.dataId["detector"]
        result = self.run(**inputs)
        outputs = pipeBase.Struct(outputCatalog=result.table)
        butlerQC.put(outputs, outputRefs)

    def run(self, catalog, visit, detector, **kwargs):
        """Convert `src` catalog to an Astropy table.

        Parameters
        ----------
        catalog : `afwTable.SourceCatalog`
            Catalog to be converted.
        visit, detector : `int`
            Visit and detector ids to be added as columns.
        **kwargs
            Additional keyword arguments are ignored as a convenience for
            subclasses that pass the same arguments to several different
            methods.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            ``table``
                `astropy.table.Table` version of the input catalog.
        """
        self.log.info("Generating DataFrame from src catalog visit,detector=%i,%i", visit, detector)
        tbl = catalog.asAstropy()
        tbl["visit"] = visit
        # int16 instead of uint8 because databases don't like unsigned bytes.
        tbl["detector"] = np.int16(detector)

        return pipeBase.Struct(table=tbl)


class WriteRecalibratedSourceTableConnections(WriteSourceTableConnections,
                                              defaultTemplates={"catalogType": ""},
                                              dimensions=("instrument", "visit", "detector", "skymap")):
    visitSummary = connectionTypes.Input(
        doc="Input visit-summary catalog with updated calibration objects.",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit",),
    )

    def __init__(self, config):
        super().__init__(config=config)
        # Defer the graph constraint from the input catalog connection.
        self.catalog = dataclasses.replace(self.catalog, deferGraphConstraint=True)


class WriteRecalibratedSourceTableConfig(WriteSourceTableConfig,
                                         pipelineConnections=WriteRecalibratedSourceTableConnections):
    doReevaluatePhotoCalib = pexConfig.Field(
        dtype=bool,
        default=True,
        doc=("Add or replace local photoCalib columns"),
    )
    doReevaluateSkyWcs = pexConfig.Field(
        dtype=bool,
        default=True,
        doc=("Add or replace local WCS columns and update the coord columns, coord_ra and coord_dec"),
    )


class WriteRecalibratedSourceTableTask(WriteSourceTableTask):
    """Write source table to DataFrame Parquet format."""

    _DefaultName = "writeRecalibratedSourceTable"
    ConfigClass = WriteRecalibratedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        inputs["visit"] = butlerQC.quantum.dataId["visit"]
        inputs["detector"] = butlerQC.quantum.dataId["detector"]

        if self.config.doReevaluatePhotoCalib or self.config.doReevaluateSkyWcs:
            exposure = ExposureF()
            inputs["exposure"] = self.prepareCalibratedExposure(
                exposure=exposure,
                visitSummary=inputs["visitSummary"],
                detectorId=butlerQC.quantum.dataId["detector"],
            )
            inputs["catalog"] = self.addCalibColumns(**inputs)

        result = self.run(**inputs)
        outputs = pipeBase.Struct(outputCatalog=result.table)
        butlerQC.put(outputs, outputRefs)

    def prepareCalibratedExposure(self, exposure, detectorId, visitSummary=None):
        """Prepare a calibrated exposure and apply external calibrations.

        Parameters
        ----------
        exposure : `lsst.afw.image.exposure.Exposure`
            Input exposure to adjust calibrations.  May be an empty Exposure.
        detectorId : `int`
            Detector ID associated with the exposure.
        visitSummary : `lsst.afw.table.ExposureCatalog`, optional
            Exposure catalog with all calibration objects.  WCS and PhotoCalib
            are always applied if ``visitSummary`` is provided and those
            components are not `None`.

        Returns
        -------
        exposure : `lsst.afw.image.exposure.Exposure`
            Exposure with adjusted calibrations.
        """
        if visitSummary is not None:
            row = visitSummary.find(detectorId)
            if row is None:
                raise pipeBase.NoWorkFound(f"Visit summary for detector {detectorId} is missing.")
            if (photoCalib := row.getPhotoCalib()) is None:
                self.log.warning("Detector id %s has None for photoCalib in visit summary; "
                                 "skipping reevaluation of photoCalib.", detectorId)
                exposure.setPhotoCalib(None)
            else:
                exposure.setPhotoCalib(photoCalib)
            if (skyWcs := row.getWcs()) is None:
                self.log.warning("Detector id %s has None for skyWcs in visit summary; "
                                 "skipping reevaluation of skyWcs.", detectorId)
                exposure.setWcs(None)
            else:
                exposure.setWcs(skyWcs)

        return exposure

    def addCalibColumns(self, catalog, exposure, **kwargs):
        """Add or replace columns with calibs evaluated at each centroid.

        Add or replace 'base_LocalWcs' and 'base_LocalPhotoCalib' columns in
        a source catalog, by rerunning the plugins.

        Parameters
        ----------
        catalog : `lsst.afw.table.SourceCatalog`
            Catalog to which calib columns will be added.
        exposure : `lsst.afw.image.exposure.Exposure`
            Exposure with attached PhotoCalibs and SkyWcs attributes to be
            reevaluated at local centroids.  Pixels are not required.
        **kwargs
            Additional keyword arguments are ignored to facilitate passing the
            same arguments to several methods.

        Returns
        -------
        newCat : `lsst.afw.table.SourceCatalog`
            Source Catalog with requested local calib columns.
        """
        measureConfig = SingleFrameMeasurementTask.ConfigClass()
        measureConfig.doReplaceWithNoise = False

        # Clear all slots; only the local-calib plugins are rerun.
        for slot in measureConfig.slots:
            setattr(measureConfig.slots, slot, None)

        measureConfig.plugins.names = []
        if self.config.doReevaluateSkyWcs:
            measureConfig.plugins.names.add("base_LocalWcs")
            self.log.info("Re-evaluating base_LocalWcs plugin")
        if self.config.doReevaluatePhotoCalib:
            measureConfig.plugins.names.add("base_LocalPhotoCalib")
            self.log.info("Re-evaluating base_LocalPhotoCalib plugin")
        pluginsNotToCopy = tuple(measureConfig.plugins.names)

        # Copy all columns from the input catalog except the ones that are
        # being recomputed.
        aliasMap = catalog.schema.getAliasMap()
        mapper = afwTable.SchemaMapper(catalog.schema)
        for item in catalog.schema:
            if not item.field.getName().startswith(pluginsNotToCopy):
                mapper.addMapping(item.key)

        schema = mapper.getOutputSchema()
        measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
        schema.setAliasMap(aliasMap)
        newCat = afwTable.SourceCatalog(schema)
        newCat.extend(catalog, mapper=mapper)

        if self.config.doReevaluateSkyWcs and exposure.wcs is not None:
            afwTable.updateSourceCoords(exposure.wcs, newCat)
            wcsPlugin = measurement.plugins["base_LocalWcs"]
        else:
            wcsPlugin = None

        if self.config.doReevaluatePhotoCalib and exposure.getPhotoCalib() is not None:
            pcPlugin = measurement.plugins["base_LocalPhotoCalib"]
        else:
            pcPlugin = None

        for row in newCat:
            if wcsPlugin is not None:
                wcsPlugin.measure(row, exposure)
            if pcPlugin is not None:
                pcPlugin.measure(row, exposure)

        return newCat


class PostprocessAnalysis(object):
    """Calculate columns from DataFrames or handles storing DataFrames.

    This object manages and organizes an arbitrary set of computations
    on a catalog.  The catalog is defined by a
    `DeferredDatasetHandle` or `InMemoryDatasetHandle` object
    (or list thereof), such as a ``deepCoadd_obj`` dataset, and the
    computations are defined by a collection of
    `~lsst.pipe.tasks.functors.Functor` objects (or, equivalently, a
    ``CompositeFunctor``).

    After the object is initialized, accessing the ``.df`` attribute (which
    holds the `pandas.DataFrame` containing the results of the calculations)
    triggers computation of said dataframe.

    One of the conveniences of using this object is the ability to define a
    desired common filter for all functors.  This enables the same functor
    collection to be passed to several different `PostprocessAnalysis` objects
    without having to change the original functor collection, since the ``filt``
    keyword argument of this object triggers an overwrite of the ``filt``
    property for all functors in the collection.

    This object also allows a list of refFlags to be passed, and defines a set
    of default refFlags that are always included even if not requested.

    If a list of DataFrames or Handles is passed, rather than a single one,
    then the calculations will be mapped over all the input catalogs.  In
    principle, it should be straightforward to parallelize this activity, but
    initial tests have failed (see TODO in code comments).

    Parameters
    ----------
    handles : `~lsst.daf.butler.DeferredDatasetHandle` or
              `~lsst.pipe.base.InMemoryDatasetHandle` or
              `list` of these
        Source catalog(s) for computation.
    functors : `list`, `dict`, or `~lsst.pipe.tasks.functors.CompositeFunctor`
        Computations to do (functors that act on ``handles``).
        If a dict, the output DataFrame will have columns keyed accordingly.
        If a list, the column keys will come from the
        ``.shortname`` attribute of each functor.
    filt : `str`, optional
        Filter in which to calculate.  If provided,
        this will overwrite any existing ``.filt`` attribute
        of the provided functors.
    flags : `list`, optional
        List of flags (per-band) to include in output table.
        Taken from the ``meas`` dataset if applied to a multilevel Object Table.
    refFlags : `list`, optional
        List of refFlags (only reference band) to include in output table.
    forcedFlags : `list`, optional
        List of flags (per-band) to include in output table.
        Taken from the ``forced_src`` dataset if applied to a
        multilevel Object Table.  Intended for flags from measurement plugins
        only run during multi-band forced photometry.
    """
    _defaultRefFlags = []
    _defaultFuncs = ()

    def __init__(self, handles, functors, filt=None, flags=None, refFlags=None, forcedFlags=None):
        self.handles = handles
        self.functors = functors

        self.filt = filt
        self.flags = list(flags) if flags is not None else []
        self.forcedFlags = list(forcedFlags) if forcedFlags is not None else []
        self.refFlags = list(self._defaultRefFlags)
        if refFlags is not None:
            self.refFlags += list(refFlags)

        self._df = None

    @property
    def defaultFuncs(self):
        funcs = dict(self._defaultFuncs)
        return funcs

    @property
    def func(self):
        additionalFuncs = self.defaultFuncs
        additionalFuncs.update({flag: Column(flag, dataset="forced_src") for flag in self.forcedFlags})
        additionalFuncs.update({flag: Column(flag, dataset="ref") for flag in self.refFlags})
        additionalFuncs.update({flag: Column(flag, dataset="meas") for flag in self.flags})

        if isinstance(self.functors, CompositeFunctor):
            func = self.functors
        else:
            func = CompositeFunctor(self.functors)

        func.funcDict.update(additionalFuncs)
        func.filt = self.filt

        return func

    @property
    def noDupCols(self):
        return [name for name, func in self.func.funcDict.items() if func.noDup]

    def compute(self, dropna=False, pool=None):
        # Map over multiple handles if a list or tuple was given.
        if type(self.handles) in (list, tuple):
            if pool is None:
                dflist = [self.func(handle, dropna=dropna) for handle in self.handles]
            else:
                # TODO: parallel evaluation has not worked in initial tests
                # (see the class docstring).
                dflist = pool.map(functools.partial(self.func, dropna=dropna), self.handles)
            self._df = pd.concat(dflist)
        else:
            self._df = self.func(self.handles, dropna=dropna)

        return self._df
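

# Illustrative sketch (not part of the pipeline), assuming `Column` functors
# can read a single-level, source-style DataFrame wrapped in an
# `InMemoryDatasetHandle`: build a small functor collection and let
# `PostprocessAnalysis` evaluate it.  Column names below are hypothetical.
def _example_postprocess_analysis():
    import pandas as pd

    handle = pipeBase.InMemoryDatasetHandle(
        pd.DataFrame({"slot_PsfFlux_instFlux": [1.0, 2.0], "coord_ra": [0.1, 0.2]}),
        storageClass="DataFrame",
    )
    functors = {"psfFlux": Column("slot_PsfFlux_instFlux"), "ra": Column("coord_ra")}
    analysis = PostprocessAnalysis(handle, functors)
    # Returns a DataFrame whose columns are keyed by the functor dict.
    return analysis.compute()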


class TransformCatalogBaseConnections(pipeBase.PipelineTaskConnections,
                                      dimensions=()):
    """Expected Connections for subclasses of TransformCatalogBaseTask."""

    inputCatalog = connectionTypes.Input(
        storageClass="DataFrame",
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        storageClass="ArrowAstropy",
    )


class TransformCatalogBaseConfig(pipeBase.PipelineTaskConfig,
                                 pipelineConnections=TransformCatalogBaseConnections):
    functorFile = pexConfig.Field(
        dtype=str,
        doc="Path to YAML file specifying Science Data Model functors to use "
            "when copying columns and computing calibrated values.",
        default=None,
        optional=True,
    )
    primaryKey = pexConfig.Field(
        dtype=str,
        doc="Name of column to be set as the DataFrame index.  If None, the index "
            "will be named `id`.",
        default=None,
        optional=True,
    )
    columnsFromDataId = pexConfig.ListField(
        dtype=str,
        doc="Columns to extract from the dataId",
        optional=True,
    )


class TransformCatalogBaseTask(pipeBase.PipelineTask):
    """Base class for transforming/standardizing a catalog by applying functors
    that convert units and apply calibrations.

    The purpose of this task is to perform a set of computations on an input
    ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` that holds a
    ``DataFrame`` dataset (such as ``deepCoadd_obj``), and write the results to
    a new dataset (which needs to be declared in an ``outputDataset``
    attribute).

    The calculations to be performed are defined in a YAML file that specifies
    a set of functors to be computed, provided as a ``--functorFile`` config
    parameter.  An example of such a YAML file is the following:

        funcs:
            x:
                functor: Column
                args: slot_Centroid_x
            y:
                functor: Column
                args: slot_Centroid_y
            psfFlux:
                functor: LocalNanojansky
                args:
                    - slot_PsfFlux_instFlux
                    - slot_PsfFlux_instFluxErr
                    - base_LocalPhotoCalib
                    - base_LocalPhotoCalibErr
            psfFluxErr:
                functor: LocalNanojanskyErr
                args:
                    - slot_PsfFlux_instFlux
                    - slot_PsfFlux_instFluxErr
                    - base_LocalPhotoCalib
                    - base_LocalPhotoCalibErr

    The names for each entry under "funcs" will become the names of columns in
    the output dataset.  All the functors referenced are defined in
    `~lsst.pipe.tasks.functors`.  Positional arguments to be passed to each
    functor are in the ``args`` list, and any additional entries for each column
    other than "functor" or "args" (e.g., ``'filt'``, ``'dataset'``) are
    treated as keyword arguments to be passed to the functor initialization.

    The "flags" entry is the default shortcut for `Column` functors.
    All columns listed under "flags" will be copied to the output table
    untransformed.  They can be of any datatype.
    In the special case of transforming a multi-level object table with
    band and dataset indices (deepCoadd_obj), these will be taken from the
    ``meas`` dataset and exploded out per band.

    There are two special shortcuts that only apply when transforming
    multi-level Object (deepCoadd_obj) tables:

    - The "refFlags" entry is a shortcut for `Column` functors
      taken from the ``ref`` dataset if transforming an ObjectTable.
    - The "forcedFlags" entry is a shortcut for `Column` functors
      taken from the ``forced_src`` dataset if transforming an ObjectTable.
      These are expanded out per band.

    This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
    to organize and execute the calculations.
    """

    @property
    def _DefaultName(self):
        raise NotImplementedError("Subclass must define the \"_DefaultName\" attribute.")

    @property
    def outputDataset(self):
        raise NotImplementedError("Subclass must define the \"outputDataset\" attribute.")

    @property
    def inputDataset(self):
        raise NotImplementedError("Subclass must define \"inputDataset\" attribute.")

    @property
    def ConfigClass(self):
        raise NotImplementedError("Subclass must define \"ConfigClass\" attribute.")

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if self.config.functorFile:
            self.log.info("Loading transform functor definitions from %s",
                          self.config.functorFile)
            self.funcs = CompositeFunctor.from_file(self.config.functorFile)
            self.funcs.update(dict(PostprocessAnalysis._defaultFuncs))
        else:
            self.funcs = None

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
                          dataId=dict(outputRefs.outputCatalog.dataId.mapping))
        butlerQC.put(result, outputRefs)

    def run(self, handle, funcs=None, dataId=None, band=None):
        """Do postprocessing calculations.

        Takes a ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` or
        ``DataFrame`` object and dataId,
        returns a dataframe with results of postprocessing calculations.

        Parameters
        ----------
        handle : `~lsst.daf.butler.DeferredDatasetHandle` or
                 `~lsst.pipe.base.InMemoryDatasetHandle` or
                 `~pandas.DataFrame`, or list of these.
            DataFrames from which calculations are done.
        funcs : `~lsst.pipe.tasks.functors.Functor`
            Functors to apply to the table's columns.
        dataId : `dict`, optional
            Used to add a ``patchId`` column to the output dataframe.
        band : `str`, optional
            Filter band that is being processed.

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            Result struct, with a single ``outputCatalog`` attribute holding
            the transformed catalog.
        """
        self.log.info("Transforming/standardizing the source table dataId: %s", dataId)

        df = self.transform(band, handle, funcs, dataId).df
        self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
        result = pipeBase.Struct(outputCatalog=pandas_to_astropy(df))
        return result

    def getFunctors(self):
        return self.funcs

    def getAnalysis(self, handles, funcs=None, band=None):
        if funcs is None:
            funcs = self.funcs
        analysis = PostprocessAnalysis(handles, funcs, filt=band)
        return analysis

    def transform(self, band, handles, funcs, dataId):
        analysis = self.getAnalysis(handles, funcs=funcs, band=band)
        df = analysis.df
        if dataId and self.config.columnsFromDataId:
            for key in self.config.columnsFromDataId:
                if key in dataId:
                    if key == "detector":
                        # int16 instead of uint8 because databases don't like
                        # unsigned bytes.
                        df[key] = np.int16(dataId[key])
                    else:
                        df[key] = dataId[key]
                else:
                    raise ValueError(f"'{key}' in config.columnsFromDataId not found in dataId: {dataId}")

        if self.config.primaryKey:
            if df.index.name != self.config.primaryKey and self.config.primaryKey in df:
                df.reset_index(inplace=True, drop=True)
                df.set_index(self.config.primaryKey, inplace=True)

        return pipeBase.Struct(
            df=df,
            analysis=analysis,
        )


class TransformObjectCatalogConnections(pipeBase.PipelineTaskConnections,
                                        defaultTemplates={"coaddName": "deep"},
                                        dimensions=("tract", "patch", "skymap")):
    inputCatalog = connectionTypes.Input(
        doc="The vertical concatenation of the {coaddName}_{meas|forced_src|psfs_multiprofit} catalogs, "
            "stored as a DataFrame with a multi-level column index per-patch.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="{coaddName}Coadd_obj",
        deferLoad=True,
    )
    inputCatalogRef = connectionTypes.Input(
        doc="Catalog marking the primary detection (which band provides a good shape and position) "
            "for each detection in deepCoadd_mergeDet.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_ref",
        deferLoad=True,
    )
    inputCatalogSersicMultiprofit = connectionTypes.Input(
        doc="Catalog of source measurements on the deepCoadd.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="ArrowAstropy",
        name="{coaddName}Coadd_Sersic_multiprofit",
        deferLoad=True,
    )
    inputCatalogEpoch = connectionTypes.Input(
        doc="Catalog of mean epochs for each object per band.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="ArrowAstropy",
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-Patch Object Table of columns transformed from the deepCoadd_obj table per the standard "
            "data model.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="ArrowAstropy",
    )

    def __init__(self, *, config=None):
        super().__init__(config=config)
        if config.multilevelOutput:
            self.outputCatalog = dataclasses.replace(self.outputCatalog, storageClass="DataFrame")


class TransformObjectCatalogConfig(TransformCatalogBaseConfig,
                                   pipelineConnections=TransformObjectCatalogConnections):
    coaddName = pexConfig.Field(dtype=str, default="deep", doc="Name of coadd")
    outputBands = pexConfig.ListField(
        dtype=str,
        default=None,
        optional=True,
        doc=("These bands and only these bands will appear in the output,"
             " NaN-filled if the input does not include them."
             " If None, then use all bands found in the input."),
    )
    camelCase = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Write per-band column names with camelCase, else underscore. "
             "For example: gPsFlux instead of g_PsFlux."),
    )
    multilevelOutput = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Whether results dataframe should have a multilevel column index (True) or be flat "
             "and name-munged (False).  If True, the output storage class will be "
             "set to DataFrame, since astropy tables do not support multi-level indexing."),
        deprecated="Support for multi-level outputs is deprecated and will be removed after v29.",
    )
    goodFlags = pexConfig.ListField(
        dtype=str,
        default=[],
        doc=("List of 'good' flags that should be set False when populating empty tables. "
             "All other flags are considered to be 'bad' flags and will be set to True."),
    )
    floatFillValue = pexConfig.Field(
        dtype=float,
        default=np.nan,
        doc="Fill value for float fields when populating empty tables.",
    )
    integerFillValue = pexConfig.Field(
        dtype=int,
        default=-1,
        doc="Fill value for integer fields when populating empty tables.",
    )

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Object.yaml")
        self.primaryKey = "objectId"
        self.columnsFromDataId = ["tract", "patch"]
        self.goodFlags = ["calib_astrometry_used",
                          "calib_photometry_reserved",
                          "calib_photometry_used",
                          "calib_psf_candidate",
                          "calib_psf_reserved"]


class TransformObjectCatalogTask(TransformCatalogBaseTask):
    """Produce a flattened Object Table to match the format specified in
    sdm_schemas.

    Do the same set of postprocessing calculations on all bands.

    This is identical to `TransformCatalogBaseTask`, except that it does
    the specified functor calculations for all filters present in the
    input `deepCoadd_obj` table.  Any specific ``"filt"`` keywords specified
    by the YAML file will be superseded.
    """
    _DefaultName = "transformObjectCatalog"
    ConfigClass = TransformObjectCatalogConfig

    datasets_multiband = ("epoch", "ref", "Sersic_multiprofit")

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
                          dataId=dict(outputRefs.outputCatalog.dataId.mapping),
                          handle_epoch=inputs["inputCatalogEpoch"],
                          handle_ref=inputs["inputCatalogRef"],
                          handle_Sersic_multiprofit=inputs["inputCatalogSersicMultiprofit"],
                          )
        butlerQC.put(result, outputRefs)

    def run(self, handle, funcs=None, dataId=None, band=None, **kwargs):
        # NOTE: the band kwarg is ignored here; all bands are processed.
        if isinstance(funcs, CompositeFunctor):
            funcDict_in = funcs.funcDict
        elif isinstance(funcs, dict):
            funcDict_in = funcs
        elif isinstance(funcs, list):
            funcDict_in = {idx: v for idx, v in enumerate(funcs)}
        else:
            raise TypeError(f"Unsupported {type(funcs)=}")

        handles_multi = {}
        funcDicts_multiband = {}
        for dataset in self.datasets_multiband:
            if (handle_multi := kwargs.get(f"handle_{dataset}")) is None:
                raise RuntimeError(f"Missing required handle_{dataset} kwarg")
            handles_multi[dataset] = handle_multi
            funcDicts_multiband[dataset] = {}

        dfDict = {}
        analysisDict = {}
        templateDf = pd.DataFrame()

        columns = handle.get(component="columns")
        inputBands = columns.unique(level=1).values

        outputBands = self.config.outputBands if self.config.outputBands else inputBands

        # Split the functors into per-band and multiband collections.
        funcDict_band = {}
        for name, func in funcDict_in.items():
            if func.dataset in funcDicts_multiband:
                # Skip multiband functors whose band is not in the output.
                if band := getattr(func, "band_to_check", None):
                    if band not in outputBands:
                        continue
                # Otherwise have multiband functors operate on the input bands.
                elif hasattr(func, "bands"):
                    func.bands = tuple(inputBands)

            funcDict = funcDicts_multiband.get(func.dataset, funcDict_band)
            funcDict[name] = func

        # Perform the transform for data of filters that exist in the handle.
        funcs_band = CompositeFunctor(funcDict_band)
        for inputBand in inputBands:
            if inputBand not in outputBands:
                self.log.info("Ignoring %s band data in the input", inputBand)
                continue
            self.log.info("Transforming the catalog of band %s", inputBand)
            result = self.transform(inputBand, handle, funcs_band, dataId)
            dfDict[inputBand] = result.df
            analysisDict[inputBand] = result.analysis
            if templateDf.empty:
                templateDf = result.df

        # Put filler values in columns of other wanted bands.
        for filt in outputBands:
            if filt not in dfDict:
                self.log.info("Adding empty columns for band %s", filt)
                dfTemp = templateDf.copy()
                for col in dfTemp.columns:
                    testValue = dfTemp[col].values[0]
                    if isinstance(testValue, (np.bool_, pd.BooleanDtype)):
                        # Boolean flag: "good" flags fill with False, all
                        # others with True.
                        if col in self.config.goodFlags:
                            fillValue = False
                        else:
                            fillValue = True
                    elif isinstance(testValue, numbers.Integral):
                        # numbers.Integral catches python, numpy, pandas, etc.
                        # integers; unsigned integers are not supported.
                        if isinstance(testValue, np.unsignedinteger):
                            raise ValueError("Parquet tables may not have unsigned integer columns.")
                        else:
                            fillValue = self.config.integerFillValue
                    else:
                        fillValue = self.config.floatFillValue
                    dfTemp[col].values[:] = fillValue
                dfDict[filt] = dfTemp

        df = pd.concat(dfDict, axis=1, names=["band", "column"])
        name_index = df.index.name

        if not self.config.multilevelOutput:
            noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
            if self.config.primaryKey in noDupCols:
                noDupCols.remove(self.config.primaryKey)
            if dataId and self.config.columnsFromDataId:
                noDupCols += self.config.columnsFromDataId
            df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase,
                                inputBands=inputBands)

        # Apply the per-dataset functors to each multiband dataset in turn.
        for dataset, funcDict in funcDicts_multiband.items():
            handle_multiband = handles_multi[dataset]
            df_dataset = handle_multiband.get()
            if isinstance(df_dataset, astropy.table.Table):
                # Find a usable index column before converting to pandas.
                if name_index not in df_dataset.colnames:
                    if self.config.primaryKey in df_dataset.colnames:
                        name_index_ap = self.config.primaryKey
                    else:
                        raise RuntimeError(
                            f"Neither of {name_index=} nor {self.config.primaryKey=} appear in"
                            f" {df_dataset.colnames=} for {dataset=}"
                        )
                else:
                    name_index_ap = name_index
                df_dataset = df_dataset.to_pandas().set_index(name_index_ap, drop=False)
            elif isinstance(df_dataset, afwTable.SourceCatalog):
                df_dataset = df_dataset.asAstropy().to_pandas().set_index(name_index, drop=False)
            result = self.transform(
                None,
                pipeBase.InMemoryDatasetHandle(df_dataset, storageClass="DataFrame"),
                CompositeFunctor(funcDict),
                dataId,
            )
            result.df.index.name = name_index
            # Drop the dataId columns, which the per-band table already has.
            if self.config.columnsFromDataId:
                columns_drop = [column for column in self.config.columnsFromDataId if column in result.df]
                result.df.drop(columns_drop, axis=1, inplace=True)

            # Make the same multilevel column index for the multiband table
            # if needed; otherwise concatenate the flat columns directly.
            to_concat = pd.concat(
                {band: result.df for band in self.config.outputBands}, axis=1, names=["band", "column"]
            ) if self.config.multilevelOutput else result.df
            df = pd.concat([df, to_concat], axis=1)
            analysisDict[dataset] = result.analysis

        df.index.name = self.config.primaryKey

        if not self.config.multilevelOutput:
            tbl = pandas_to_astropy(df)
        else:
            tbl = df

        self.log.info("Made a table of %d columns and %d rows", len(tbl.columns), len(tbl))

        return pipeBase.Struct(outputCatalog=tbl)
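

# Illustrative sketch (not part of the pipeline): the dtype-based fill-value
# choice used above when padding bands that are absent from the input.  The
# default fill values here are hypothetical stand-ins for the config fields.
def _example_fill_value(testValue, goodFlag=False, floatFill=float("nan"), intFill=-1):
    import numbers

    import numpy as np
    import pandas as pd

    if isinstance(testValue, (np.bool_, pd.BooleanDtype)):
        # "Good" flags are filled with False; all other flags with True.
        return False if goodFlag else True
    elif isinstance(testValue, numbers.Integral):
        if isinstance(testValue, np.unsignedinteger):
            raise ValueError("Parquet tables may not have unsigned integer columns.")
        return intFill
    return floatFill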


class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("tract", "skymap")):
    inputCatalogs = connectionTypes.Input(
        doc="Per-Patch objectTables conforming to the standard data model.",
        storageClass="ArrowAstropy",
        dimensions=("tract", "patch", "skymap"),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-tract horizontal concatenation of the input objectTables",
        name="objectTable_tract",
        storageClass="ArrowAstropy",
        dimensions=("tract", "skymap"),
    )


class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=ConsolidateObjectTableConnections):
    coaddName = pexConfig.Field(dtype=str, default="deep", doc="Name of coadd")


class ConsolidateObjectTableTask(pipeBase.PipelineTask):
    """Write patch-merged source tables to a tract-level DataFrame Parquet file.

    Concatenates the `objectTable` list into a per-tract `objectTable_tract`.
    """
    _DefaultName = "consolidateObjectTable"
    ConfigClass = ConsolidateObjectTableConfig

    inputDataset = "objectTable"
    outputDataset = "objectTable_tract"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-patch Object Tables",
                      len(inputs["inputCatalogs"]))
        table = TableVStack.vstack_handles(inputs["inputCatalogs"])
        butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)


class TransformSourceTableConnections(pipeBase.PipelineTaskConnections,
                                      defaultTemplates={"catalogType": ""},
                                      dimensions=("instrument", "visit", "detector")):
    inputCatalog = connectionTypes.Input(
        doc="Wide input catalog of sources produced by WriteSourceTableTask",
        name="{catalogType}source",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Narrower, per-detector Source Table transformed and converted per a "
            "specified set of functors",
        name="{catalogType}sourceTable",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )


class TransformSourceTableConfig(TransformCatalogBaseConfig,
                                 pipelineConnections=TransformSourceTableConnections):

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Source.yaml")
        self.primaryKey = "sourceId"
        self.columnsFromDataId = ["visit", "detector", "band", "physical_filter"]


class TransformSourceTableTask(TransformCatalogBaseTask):
    """Transform/standardize a source catalog."""

    _DefaultName = "transformSourceTable"
    ConfigClass = TransformSourceTableConfig


class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections,
                                         dimensions=("instrument", "visit",),
                                         defaultTemplates={"calexpType": ""}):
    calexp = connectionTypes.Input(
        doc="Processed exposures used for metadata",
        storageClass="ExposureF",
        dimensions=("instrument", "visit", "detector"),
        multiple=True,
        deferLoad=True,
    )
    visitSummary = connectionTypes.Output(
        doc=("Per-visit consolidated exposure metadata.  These catalogs use "
             "detector id for the id and are sorted for fast lookups of a "
             "detector."),
        name="visitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
    )
    visitSummarySchema = connectionTypes.InitOutput(
        doc="Schema of the visitSummary catalog",
        name="visitSummary_schema",
        storageClass="ExposureCatalog",
    )


class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig,
                                    pipelineConnections=ConsolidateVisitSummaryConnections):
    """Config for ConsolidateVisitSummaryTask"""


class ConsolidateVisitSummaryTask(pipeBase.PipelineTask):
    """Task to consolidate per-detector visit metadata.

    This task aggregates the following metadata from all the detectors in a
    single visit into an exposure catalog:

    - The physical_filter and band (if available).
    - The psf size, shape, and effective area at the center of the detector.
    - The corners of the bounding box in right ascension/declination.

    Other quantities such as Detector, Psf, ApCorrMap, and TransmissionCurve
    are not persisted here because of storage concerns, and because of their
    limited utility as summary statistics.

    Tests for this task are performed in ci_hsc_gen3.
    """
    _DefaultName = "consolidateVisitSummary"
    ConfigClass = ConsolidateVisitSummaryConfig

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.schema = afwTable.ExposureTable.makeMinimalSchema()
        self.schema.addField("visit", type="L", doc="Visit number")
        self.schema.addField("physical_filter", type="String", size=32, doc="Physical filter")
        self.schema.addField("band", type="String", size=32, doc="Name of band")
        ExposureSummaryStats.update_schema(self.schema)

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        dataRefs = butlerQC.get(inputRefs.calexp)
        visit = dataRefs[0].dataId["visit"]

        self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
                       len(dataRefs), visit)

        expCatalog = self._combineExposureMetadata(visit, dataRefs)

        butlerQC.put(expCatalog, outputRefs.visitSummary)

    def _combineExposureMetadata(self, visit, dataRefs):
        """Make a combined exposure catalog from a list of dataRefs.
        These dataRefs must point to exposures with wcs, summaryStats,
        and other visit metadata.

        Parameters
        ----------
        visit : `int`
            Visit identification number.
        dataRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            List of dataRefs in visit.

        Returns
        -------
        visitSummary : `lsst.afw.table.ExposureCatalog`
            Exposure catalog with per-detector summary information.
        """
        cat = afwTable.ExposureCatalog(self.schema)
        cat.resize(len(dataRefs))

        cat["visit"] = visit

        for i, dataRef in enumerate(dataRefs):
            visitInfo = dataRef.get(component="visitInfo")
            filterLabel = dataRef.get(component="filter")
            summaryStats = dataRef.get(component="summaryStats")
            detector = dataRef.get(component="detector")
            wcs = dataRef.get(component="wcs")
            photoCalib = dataRef.get(component="photoCalib")
            bbox = dataRef.get(component="bbox")
            validPolygon = dataRef.get(component="validPolygon")

            rec = cat[i]
            rec.setBBox(bbox)
            rec.setVisitInfo(visitInfo)
            rec.setWcs(wcs)
            rec.setPhotoCalib(photoCalib)
            rec.setValidPolygon(validPolygon)

            rec["physical_filter"] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else ""
            rec["band"] = filterLabel.bandLabel if filterLabel.hasBandLabel() else ""
            rec.setId(detector.getId())
            summaryStats.update_record(rec)

        if not cat:
            raise pipeBase.NoWorkFound(
                "No detectors had sufficient information to make a visit summary row."
            )

        metadata = dafBase.PropertyList()
        metadata.add("COMMENT", "Catalog id is detector id, sorted.")
        metadata.add("COMMENT", "Only detectors with data have entries.")
        cat.setMetadata(metadata)

        cat.sort()
        return cat


class ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        defaultTemplates={"catalogType": ""},
                                        dimensions=("instrument", "visit")):
    inputCatalogs = connectionTypes.Input(
        doc="Input per-detector Source Tables",
        name="{catalogType}sourceTable",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-visit concatenation of Source Table",
        name="{catalogType}sourceTable_visit",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit"),
    )


class ConsolidateSourceTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=ConsolidateSourceTableConnections):
    pass


class ConsolidateSourceTableTask(pipeBase.PipelineTask):
    """Concatenate the `sourceTable` list into a per-visit `sourceTable_visit`."""

    _DefaultName = "consolidateSourceTable"
    ConfigClass = ConsolidateSourceTableConfig

    inputDataset = "sourceTable"
    outputDataset = "sourceTable_visit"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        from .makeWarp import reorderRefs

        detectorOrder = [ref.dataId["detector"] for ref in inputRefs.inputCatalogs]
        detectorOrder.sort()
        inputRefs = reorderRefs(inputRefs, detectorOrder, dataIdKey="detector")
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-detector Source Tables",
                      len(inputs["inputCatalogs"]))
        table = TableVStack.vstack_handles(inputs["inputCatalogs"])
        butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)


class MakeCcdVisitTableConnections(pipeBase.PipelineTaskConnections,
                                   dimensions=("instrument",),
                                   defaultTemplates={"calexpType": ""}):
    visitSummaryRefs = connectionTypes.Input(
        doc="Data references for per-visit consolidated exposure metadata",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="CCD and Visit metadata table",
        name="ccdVisitTable",
        storageClass="ArrowAstropy",
        dimensions=("instrument",),
    )


class MakeCcdVisitTableConfig(pipeBase.PipelineTaskConfig,
                              pipelineConnections=MakeCcdVisitTableConnections):
    idGenerator = DetectorVisitIdGeneratorConfig.make_field()


class MakeCcdVisitTableTask(pipeBase.PipelineTask):
    """Produce a `ccdVisitTable` from the visit summary exposure catalogs."""

    _DefaultName = "makeCcdVisitTable"
    ConfigClass = MakeCcdVisitTableConfig

    def run(self, visitSummaryRefs):
        """Make a table of ccd information from the visit summary catalogs.

        Parameters
        ----------
        visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            List of DeferredDatasetHandles pointing to exposure catalogs with
            per-detector summary information.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            Results struct with attribute:

            ``outputCatalog``
                Catalog of ccd and visit information.
        """
        ccdEntries = []
        for visitSummaryRef in visitSummaryRefs:
            visitSummary = visitSummaryRef.get()
            if not visitSummary:
                continue
            visitInfo = visitSummary[0].getVisitInfo()

            # Strip provenance to prevent merge confusion.
            strip_provenance_from_fits_header(visitSummary.metadata)

            summaryTable = visitSummary.asAstropy()
            selectColumns = ["id", "visit", "physical_filter", "band", "ra", "dec",
                             "pixelScale", "zenithDistance",
                             "expTime", "zeroPoint", "psfSigma", "skyBg", "skyNoise",
                             "astromOffsetMean", "astromOffsetStd", "nPsfStar",
                             "psfStarDeltaE1Median", "psfStarDeltaE2Median",
                             "psfStarDeltaE1Scatter", "psfStarDeltaE2Scatter",
                             "psfStarDeltaSizeMedian", "psfStarDeltaSizeScatter",
                             "psfStarScaledDeltaSizeScatter", "psfTraceRadiusDelta",
                             "psfApFluxDelta", "psfApCorrSigmaScaledDelta",
                             "maxDistToNearestPsf",
                             "effTime", "effTimePsfSigmaScale",
                             "effTimeSkyBgScale", "effTimeZeroPointScale"]
            ccdEntry = summaryTable[selectColumns]

            # Rename columns to match the output data model.
            ccdEntry.rename_column("visit", "visitId")
            ccdEntry.rename_column("id", "detectorId")

            # Duplicate "dec" as "decl" for backwards compatibility.
            ccdEntry["decl"] = ccdEntry["dec"]

            ccdEntry["ccdVisitId"] = [
                self.config.idGenerator.apply(
                    visitSummaryRef.dataId,
                    detector=detector_id,
                ).catalog_id
                for detector_id in summaryTable["id"]
            ]
            ccdEntry["detector"] = summaryTable["id"]
            ccdEntry["seeing"] = (
                visitSummary["psfSigma"] * visitSummary["pixelScale"] * np.sqrt(8 * np.log(2))
            )
            ccdEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
            ccdEntry["expMidpt"] = np.datetime64(visitInfo.getDate().toPython(), "ns")
            ccdEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
            expTime = visitInfo.getExposureTime()
            ccdEntry["obsStart"] = (
                ccdEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns")
            )
            expTime_days = expTime / (60*60*24)
            ccdEntry["obsStartMJD"] = ccdEntry["expMidptMJD"] - 0.5 * expTime_days
            ccdEntry["darkTime"] = visitInfo.getDarkTime()
            ccdEntry["xSize"] = summaryTable["bbox_max_x"] - summaryTable["bbox_min_x"]
            ccdEntry["ySize"] = summaryTable["bbox_max_y"] - summaryTable["bbox_min_y"]
            ccdEntry["llcra"] = summaryTable["raCorners"][:, 0]
            ccdEntry["llcdec"] = summaryTable["decCorners"][:, 0]
            ccdEntry["ulcra"] = summaryTable["raCorners"][:, 1]
            ccdEntry["ulcdec"] = summaryTable["decCorners"][:, 1]
            ccdEntry["urcra"] = summaryTable["raCorners"][:, 2]
            ccdEntry["urcdec"] = summaryTable["decCorners"][:, 2]
            ccdEntry["lrcra"] = summaryTable["raCorners"][:, 3]
            ccdEntry["lrcdec"] = summaryTable["decCorners"][:, 3]

            ccdEntries.append(ccdEntry)

        outputCatalog = astropy.table.vstack(ccdEntries, join_type="exact")
        return pipeBase.Struct(outputCatalog=outputCatalog)
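

# Illustrative sketch (not part of the pipeline): the Gaussian sigma-to-FWHM
# conversion behind the "seeing" column above, with hypothetical input values.
def _example_seeing_fwhm(psfSigma=2.0, pixelScale=0.2):
    import numpy as np

    # FWHM = sigma * sqrt(8 * ln 2); pixelScale converts pixels to arcseconds.
    return psfSigma * pixelScale * np.sqrt(8 * np.log(2))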


class MakeVisitTableConnections(pipeBase.PipelineTaskConnections,
                                dimensions=("instrument",),
                                defaultTemplates={"calexpType": ""}):
    visitSummaries = connectionTypes.Input(
        doc="Per-visit consolidated exposure metadata",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit",),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Visit metadata table",
        storageClass="ArrowAstropy",
        dimensions=("instrument",),
    )


class MakeVisitTableConfig(pipeBase.PipelineTaskConfig,
                           pipelineConnections=MakeVisitTableConnections):
    pass


class MakeVisitTableTask(pipeBase.PipelineTask):
    """Produce a `visitTable` from the visit summary exposure catalogs."""

    _DefaultName = "makeVisitTable"
    ConfigClass = MakeVisitTableConfig

    def run(self, visitSummaries):
        """Make a table of visit information from the visit summary catalogs.

        Parameters
        ----------
        visitSummaries : `list` of `lsst.afw.table.ExposureCatalog`
            List of exposure catalogs with per-detector summary information.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            Results struct with attribute:

            ``outputCatalog``
                Catalog of visit information.
        """
        visitEntries = []
        for visitSummary in visitSummaries:
            visitSummary = visitSummary.get()
            if not visitSummary:
                continue
            visitRow = visitSummary[0]
            visitInfo = visitRow.getVisitInfo()

            visitEntry = {}
            visitEntry["visitId"] = visitRow["visit"]
            visitEntry["visit"] = visitRow["visit"]
            visitEntry["physical_filter"] = visitRow["physical_filter"]
            visitEntry["band"] = visitRow["band"]
            raDec = visitInfo.getBoresightRaDec()
            visitEntry["ra"] = raDec.getRa().asDegrees()
            visitEntry["dec"] = raDec.getDec().asDegrees()

            # Duplicate "dec" as "decl" for backwards compatibility.
            visitEntry["decl"] = visitEntry["dec"]

            visitEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
            azAlt = visitInfo.getBoresightAzAlt()
            visitEntry["azimuth"] = azAlt.getLongitude().asDegrees()
            visitEntry["altitude"] = azAlt.getLatitude().asDegrees()
            visitEntry["zenithDistance"] = 90 - azAlt.getLatitude().asDegrees()
            visitEntry["airmass"] = visitInfo.getBoresightAirmass()
            expTime = visitInfo.getExposureTime()
            visitEntry["expTime"] = expTime
            visitEntry["expMidpt"] = np.datetime64(visitInfo.getDate().toPython(), "ns")
            visitEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
            visitEntry["obsStart"] = visitEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns")
            expTime_days = expTime / (60*60*24)
            visitEntry["obsStartMJD"] = visitEntry["expMidptMJD"] - 0.5 * expTime_days
            visitEntries.append(visitEntry)

        outputCatalog = astropy.table.Table(rows=visitEntries)
        return pipeBase.Struct(outputCatalog=outputCatalog)
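

# Illustrative sketch (not part of the pipeline): the datetime arithmetic used
# above to derive the exposure start from its midpoint, with hypothetical
# values.
def _example_obs_start(expTime=60.0):
    import numpy as np

    expMidpt = np.datetime64("2024-01-01T00:00:30", "ns")
    # Subtract half the exposure time, expressed in integer nanoseconds.
    obsStart = expMidpt - 0.5 * np.timedelta64(int(expTime * 1E9), "ns")
    expTime_days = expTime / (60 * 60 * 24)
    return obsStart, expTime_days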


class WriteForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("instrument", "visit", "detector", "skymap", "tract")):
    inputCatalog = connectionTypes.Input(
        doc="Primary per-detector, single-epoch forced-photometry catalog. "
            "By default, it is the output of ForcedPhotCcdTask on calexps",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
    )
    inputCatalogDiff = connectionTypes.Input(
        doc="Secondary multi-epoch, per-detector, forced-photometry catalog. "
            "By default, it is the output of ForcedPhotCcdTask run on image differences.",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
    )
    outputCatalog = connectionTypes.Output(
        doc="InputCatalogs horizontally joined on `objectId` in DataFrame parquet format",
        name="mergedForcedSource",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
    )


class WriteForcedSourceTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=WriteForcedSourceTableConnections):
    key = pexConfig.Field(
        doc="Column on which to join the two input tables and make the primary key of the output",
        dtype=str,
        default="objectId",
    )


class WriteForcedSourceTableTask(pipeBase.PipelineTask):
    """Merge and convert per-detector forced source catalogs to DataFrame Parquet format.

    Because the predecessor ForcedPhotCcdTask operates per-detector,
    per-tract (i.e., it has tract in its dimensions), detectors
    on the tract boundary may have multiple forced source catalogs.

    The successor task TransformForcedSourceTable runs per-patch
    and temporally aggregates overlapping mergedForcedSource catalogs from all
    available epochs.
    """
    _DefaultName = "writeForcedSourceTable"
    ConfigClass = WriteForcedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        inputs["visit"] = butlerQC.quantum.dataId["visit"]
        inputs["detector"] = butlerQC.quantum.dataId["detector"]
        inputs["band"] = butlerQC.quantum.dataId["band"]
        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)

    def run(self, inputCatalog, inputCatalogDiff, visit, detector, band=None):
        dfs = []
        for table, dataset in zip((inputCatalog, inputCatalogDiff), ("calexp", "diff")):
            df = table.asAstropy().to_pandas().set_index(self.config.key, drop=False)
            df = df.reindex(sorted(df.columns), axis=1)
            df["visit"] = visit
            # int16 instead of uint8 because databases don't like unsigned bytes.
            df["detector"] = np.int16(detector)
            df["band"] = band if band else pd.NA
            df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
                                                   names=("dataset", "column"))
            dfs.append(df)

        outputCatalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return pipeBase.Struct(outputCatalog=outputCatalog)


class TransformForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                            dimensions=("instrument", "skymap", "patch", "tract")):
    inputCatalogs = connectionTypes.Input(
        doc="DataFrames of merged ForcedSources produced by WriteForcedSourceTableTask",
        name="mergedForcedSource",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
        multiple=True,
        deferLoad=True,
    )
    referenceCatalog = connectionTypes.Input(
        doc="Reference catalog which was used to seed the forcedPhot. Columns "
            "objectId, detect_isPrimary, detect_isTractInner, detect_isPatchInner "
            "are expected.",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Narrower, temporally-aggregated, per-patch ForcedSource Table transformed and converted per a "
            "specified set of functors",
        name="forcedSourceTable",
        storageClass="ArrowAstropy",
        dimensions=("tract", "patch", "skymap"),
    )


class TransformForcedSourceTableConfig(TransformCatalogBaseConfig,
                                       pipelineConnections=TransformForcedSourceTableConnections):
    referenceColumns = pexConfig.ListField(
        dtype=str,
        default=["detect_isPrimary", "detect_isTractInner", "detect_isPatchInner"],
        doc="Columns to pull from reference catalog",
    )
    keyRef = pexConfig.Field(
        doc="Column on which to join the two input tables and make the primary key of the output",
        dtype=str,
        default="objectId",
    )
    key = pexConfig.Field(
        doc="Rename the output DataFrame index to this name",
        dtype=str,
        default="forcedSourceId",
    )

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "ForcedSource.yaml")
        self.columnsFromDataId = ["tract", "patch"]


class TransformForcedSourceTableTask(TransformCatalogBaseTask):
    """Transform/standardize a ForcedSource catalog.

    Transforms each wide, per-detector forcedSource DataFrame per the
    specification file (per-camera defaults found in ForcedSource.yaml).
    All epochs that overlap the patch are aggregated into one per-patch,
    narrow DataFrame file.

    No de-duplication of rows is performed.  Duplicate-resolution flags are
    pulled in from the referenceCatalog (`detect_isPrimary`,
    `detect_isTractInner`, `detect_isPatchInner`), so that the user may
    de-duplicate for analysis or compare duplicates for QA.

    The resulting table includes multiple bands.  Epochs (MJDs) and other
    useful per-visit rows can be retrieved by joining with the CcdVisitTable
    on ccdVisitId.
    """
    _DefaultName = "transformForcedSourceTable"
    ConfigClass = TransformForcedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        outputs = self.run(inputs["inputCatalogs"], inputs["referenceCatalog"], funcs=self.funcs,
                           dataId=dict(outputRefs.outputCatalog.dataId.mapping))

        butlerQC.put(outputs, outputRefs)

    def run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=None):
        dfs = []
        refColumns = list(self.config.referenceColumns)
        refColumns.append(self.config.keyRef)
        ref = referenceCatalog.get(parameters={"columns": refColumns})
        if ref.index.name != self.config.keyRef:
            ref.set_index(self.config.keyRef, inplace=True)
        self.log.info("Aggregating %s input catalogs", len(inputCatalogs))
        for handle in inputCatalogs:
            result = self.transform(None, handle, funcs, dataId)
            # Keep only rows that match an object in the reference catalog.
            dfs.append(result.df.join(ref, how="inner"))

        outputCatalog = pd.concat(dfs)

        if outputCatalog.empty:
            raise NoWorkFound(f"No forced photometry rows for {dataId}.")

        # Re-index the output on forcedSourceId, keeping the objectId join key
        # as a regular column, then rename the index per config.key.
        outputCatalog.index.rename(self.config.keyRef, inplace=True)
        outputCatalog.reset_index(inplace=True)
        outputCatalog.set_index("forcedSourceId", inplace=True, verify_integrity=True)
        outputCatalog.index.rename(self.config.key, inplace=True)

        self.log.info("Made a table of %d columns and %d rows",
                      len(outputCatalog.columns), len(outputCatalog))
        return pipeBase.Struct(outputCatalog=pandas_to_astropy(outputCatalog))


class ConsolidateTractConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"catalogType": ""},
                                  dimensions=("instrument", "tract")):
    inputCatalogs = connectionTypes.Input(
        doc="Input per-patch DataFrame Tables to be concatenated",
        name="{catalogType}ForcedSourceTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        multiple=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Output per-tract concatenation of DataFrame Tables",
        name="{catalogType}ForcedSourceTable_tract",
        storageClass="DataFrame",
        dimensions=("tract", "skymap"),
    )


class ConsolidateTractConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=ConsolidateTractConnections):
    pass


class ConsolidateTractTask(pipeBase.PipelineTask):
    """Concatenate any per-patch DataFrame list into a single
    per-tract DataFrame.
    """
    _DefaultName = "ConsolidateTract"
    ConfigClass = ConsolidateTractConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-patch %s Tables",
                      len(inputs["inputCatalogs"]),
                      inputRefs.inputCatalogs[0].datasetType.name)
        df = pd.concat(inputs["inputCatalogs"])
        butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)