__all__ = [
    "WriteObjectTableConfig",
    "WriteObjectTableTask",
    "WriteSourceTableConfig",
    "WriteSourceTableTask",
    "WriteRecalibratedSourceTableConfig",
    "WriteRecalibratedSourceTableTask",
    "PostprocessAnalysis",
    "TransformCatalogBaseConfig",
    "TransformCatalogBaseTask",
    "TransformObjectCatalogConfig",
    "TransformObjectCatalogTask",
    "ConsolidateObjectTableConfig",
    "ConsolidateObjectTableTask",
    "TransformSourceTableConfig",
    "TransformSourceTableTask",
    "ConsolidateVisitSummaryConfig",
    "ConsolidateVisitSummaryTask",
    "ConsolidateSourceTableConfig",
    "ConsolidateSourceTableTask",
    "MakeCcdVisitTableConfig",
    "MakeCcdVisitTableTask",
    "MakeVisitTableConfig",
    "MakeVisitTableTask",
    "WriteForcedSourceTableConfig",
    "WriteForcedSourceTableTask",
    "TransformForcedSourceTableConfig",
    "TransformForcedSourceTableTask",
    "ConsolidateTractConfig",
    "ConsolidateTractTask",
]
import dataclasses
import functools
import logging
import numbers
import os

import astropy.table
import numpy as np
import pandas as pd

from collections import defaultdict
from deprecated.sphinx import deprecated

import lsst.afw.table as afwTable
import lsst.daf.base as dafBase
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
from lsst.daf.butler.formatters.parquet import pandas_to_astropy
from lsst.pipe.base import NoWorkFound, UpstreamFailureNoWorkFound, connectionTypes
from lsst.afw.image import ExposureSummaryStats, ExposureF
from lsst.meas.base import SingleFrameMeasurementTask, DetectorVisitIdGeneratorConfig
from lsst.obs.base.utils import strip_provenance_from_fits_header, TableVStack
from .coaddBase import reorderRefs
from .functors import CompositeFunctor, Column

log = logging.getLogger(__name__)


def flattenFilters(df, noDupCols=["coord_ra", "coord_dec"], camelCase=False, inputBands=None):
    """Flatten a dataframe with a multilevel column index.
    """
    newDf = pd.DataFrame()
    # band is the level 0 index
    dfBands = df.columns.unique(level=0).values
    for band in dfBands:
        subdf = df[band]
        columnFormat = "{0}{1}" if camelCase else "{0}_{1}"
        newColumns = {c: columnFormat.format(band, c)
                      for c in subdf.columns if c not in noDupCols}
        cols = list(newColumns.keys())
        newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)

    # Band must be present in the input and output or else column is all NaN.
    presentBands = dfBands if inputBands is None else list(set(inputBands).intersection(dfBands))
    # Get the unexploded columns from any present band's partition.
    noDupDf = df[presentBands[0]][noDupCols]
    newDf = pd.concat([noDupDf, newDf], axis=1)
    return newDf
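
# Illustrative sketch (hypothetical data, not part of the pipeline): given a
# DataFrame with a two-level (band, column) index,
#
#     df = pd.DataFrame({
#         ("g", "coord_ra"): [1.0], ("g", "coord_dec"): [2.0], ("g", "psfFlux"): [3.0],
#         ("r", "coord_ra"): [1.0], ("r", "coord_dec"): [2.0], ("r", "psfFlux"): [4.0]})
#
# flattenFilters(df) yields flat columns coord_ra, coord_dec, g_psfFlux and
# r_psfFlux; with camelCase=True the per-band names become gPsfFlux, rPsfFlux.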


class WriteObjectTableConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"coaddName": "deep"},
                                  dimensions=("tract", "patch", "skymap")):
    inputCatalogMeas = connectionTypes.Input(
        doc="Catalog of source measurements on the deepCoadd.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_meas",
        multiple=True
    )
    inputCatalogForcedSrc = connectionTypes.Input(
        doc="Catalog of forced measurements (shape and position parameters held fixed) on the deepCoadd.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_forced_src",
        multiple=True
    )
    inputCatalogPsfsMultiprofit = connectionTypes.Input(
        doc="Catalog of Gaussian mixture model fit parameters for the PSF model at each object centroid.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="ArrowAstropy",
        name="{coaddName}Coadd_psfs_multiprofit",
        multiple=True
    )
    outputCatalog = connectionTypes.Output(
        doc="A vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
            "stored as a DataFrame with a multi-level column index per-patch.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="{coaddName}Coadd_obj"
    )


class WriteObjectTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteObjectTableConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )


class WriteObjectTableTask(pipeBase.PipelineTask):
    """Write filter-merged object tables as a DataFrame in parquet format.
    """
    _DefaultName = "writeObjectTable"
    ConfigClass = WriteObjectTableConfig

    # Tag of the output dataset written by this task.
    outputDataset = "obj"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        catalogs = defaultdict(dict)
        for dataset, connection in (
            ("meas", "inputCatalogMeas"),
            ("forced_src", "inputCatalogForcedSrc"),
            ("psfs_multiprofit", "inputCatalogPsfsMultiprofit"),
        ):
            for ref, cat in zip(getattr(inputRefs, connection), inputs[connection]):
                catalogs[ref.dataId["band"]][dataset] = cat

        dataId = butlerQC.quantum.dataId
        df = self.run(catalogs=catalogs, tract=dataId["tract"], patch=dataId["patch"])
        outputs = pipeBase.Struct(outputCatalog=df)
        butlerQC.put(outputs, outputRefs)

    def run(self, catalogs, tract, patch):
        """Merge multiple catalogs.

        Parameters
        ----------
        catalogs : `dict`
            Mapping from filter names to dict of catalogs.
        tract : `int`
            tractId to use for the tractId column.
        patch : `str`
            patchId to use for the patchId column.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Merged dataframe.

        Raises
        ------
        ValueError
            Raised if any of the catalogs is of an unsupported type.
        """
        dfs = []
        for filt, tableDict in catalogs.items():
            for dataset, table in tableDict.items():
                # Convert afw or astropy tables to pandas DataFrames.
                if isinstance(table, pd.DataFrame):
                    df = table
                elif isinstance(table, afwTable.SourceCatalog):
                    df = table.asAstropy().to_pandas()
                elif isinstance(table, astropy.table.Table):
                    df = table.to_pandas()
                else:
                    raise ValueError(f"{dataset=} has unsupported {type(table)=}")
                df.set_index("id", drop=True, inplace=True)

                # Sort columns by name, to ensure matching schema among patches.
                df = df.reindex(sorted(df.columns), axis=1)
                df = df.assign(tractId=tract, patchId=patch)

                # Make columns a 3-level MultiIndex.
                df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
                                                       names=("dataset", "band", "column"))
                dfs.append(df)

        # Join pairwise rather than pd.concat(dfs), which uses much more memory.
        catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return catalog
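
    # Illustrative note: the merged DataFrame has a three-level column index
    # (dataset, band, column), e.g. ("meas", "g", "base_SdssCentroid_x") or
    # ("forced_src", "r", "base_PsfFlux_instFlux"), with one row per object id
    # (column names hypothetical).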


class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"catalogType": ""},
                                  dimensions=("instrument", "visit", "detector")):
    catalog = connectionTypes.Input(
        doc="Input full-depth catalog of sources produced by CalibrateTask",
        name="{catalogType}src",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector")
    )
    outputCatalog = connectionTypes.Output(
        doc="Catalog of sources, `src` in Astropy/Parquet format. Columns are unchanged.",
        name="{catalogType}source",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector")
    )


class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteSourceTableConnections):
    pass


class WriteSourceTableTask(pipeBase.PipelineTask):
    """Write source table to DataFrame Parquet format.
    """
    _DefaultName = "writeSourceTable"
    ConfigClass = WriteSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        inputs["visit"] = butlerQC.quantum.dataId["visit"]
        inputs["detector"] = butlerQC.quantum.dataId["detector"]
        result = self.run(**inputs)
        outputs = pipeBase.Struct(outputCatalog=result.table)
        butlerQC.put(outputs, outputRefs)

    def run(self, catalog, visit, detector, **kwargs):
        """Convert `src` catalog to an Astropy table.

        Parameters
        ----------
        catalog : `lsst.afw.table.SourceCatalog`
            catalog to be converted
        visit, detector : `int`
            Visit and detector ids to be added as columns.
        **kwargs
            Additional keyword arguments are ignored as a convenience for
            subclasses that pass the same arguments to several different
            methods.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            ``table``
                `astropy.table.Table` version of the input catalog
        """
        self.log.info("Generating DataFrame from src catalog visit,detector=%i,%i", visit, detector)
        tbl = catalog.asAstropy()
        tbl["visit"] = visit
        # int16 instead of uint8 because databases don't like unsigned bytes.
        tbl["detector"] = np.int16(detector)

        return pipeBase.Struct(table=tbl)
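
    # Hypothetical usage sketch (illustrative, outside the pipeline flow):
    #
    #     task = WriteSourceTableTask()
    #     result = task.run(srcCat, visit=1234, detector=42)
    #     tbl = result.table  # input columns plus "visit" and "detector"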


class WriteRecalibratedSourceTableConnections(WriteSourceTableConnections,
                                              defaultTemplates={"catalogType": ""},
                                              dimensions=("instrument", "visit", "detector", "skymap")):
    visitSummary = connectionTypes.Input(
        doc="Input visit-summary catalog with updated calibration objects.",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit",),
    )

    def __init__(self, config):
        # Defer the graph constraint on the input src catalog so that quantum
        # graph generation does not require it to exist for every detector.
        self.catalog = dataclasses.replace(self.catalog, deferGraphConstraint=True)


class WriteRecalibratedSourceTableConfig(WriteSourceTableConfig,
                                         pipelineConnections=WriteRecalibratedSourceTableConnections):
    doReevaluatePhotoCalib = pexConfig.Field(
        dtype=bool,
        default=True,
        doc=("Add or replace local photoCalib columns"),
    )
    doReevaluateSkyWcs = pexConfig.Field(
        dtype=bool,
        default=True,
        doc=("Add or replace local WCS columns and update the coord columns, coord_ra and coord_dec"),
    )


class WriteRecalibratedSourceTableTask(WriteSourceTableTask):
    """Write source table to DataFrame Parquet format.
    """
    _DefaultName = "writeRecalibratedSourceTable"
    ConfigClass = WriteRecalibratedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        inputs["visit"] = butlerQC.quantum.dataId["visit"]
        inputs["detector"] = butlerQC.quantum.dataId["detector"]

        if self.config.doReevaluatePhotoCalib or self.config.doReevaluateSkyWcs:
            exposure = ExposureF()
            inputs["exposure"] = self.prepareCalibratedExposure(
                exposure=exposure,
                visitSummary=inputs["visitSummary"],
                detectorId=butlerQC.quantum.dataId["detector"]
            )
            inputs["catalog"] = self.addCalibColumns(**inputs)

        result = self.run(**inputs)
        outputs = pipeBase.Struct(outputCatalog=result.table)
        butlerQC.put(outputs, outputRefs)

    def prepareCalibratedExposure(self, exposure, detectorId, visitSummary=None):
        """Prepare a calibrated exposure and apply external calibrations
        if so configured.

        Parameters
        ----------
        exposure : `lsst.afw.image.exposure.Exposure`
            Input exposure to adjust calibrations. May be an empty Exposure.
        detectorId : `int`
            Detector ID associated with the exposure.
        visitSummary : `lsst.afw.table.ExposureCatalog`, optional
            Exposure catalog with all calibration objects. WCS and PhotoCalib
            are always applied if ``visitSummary`` is provided and those
            components are not `None`.

        Returns
        -------
        exposure : `lsst.afw.image.exposure.Exposure`
            Exposure with adjusted calibrations.
        """
        if visitSummary is not None:
            row = visitSummary.find(detectorId)
            if row is None:
                raise pipeBase.NoWorkFound(f"Visit summary for detector {detectorId} is missing.")
            if (photoCalib := row.getPhotoCalib()) is None:
                self.log.warning("Detector id %s has None for photoCalib in visit summary; "
                                 "skipping reevaluation of photoCalib.", detectorId)
                exposure.setPhotoCalib(None)
            else:
                exposure.setPhotoCalib(photoCalib)
            if (skyWcs := row.getWcs()) is None:
                self.log.warning("Detector id %s has None for skyWcs in visit summary; "
                                 "skipping reevaluation of skyWcs.", detectorId)
                exposure.setWcs(None)
            else:
                exposure.setWcs(skyWcs)

        return exposure

    def addCalibColumns(self, catalog, exposure, **kwargs):
        """Add or replace columns with calibs evaluated at each centroid.

        Add or replace 'base_LocalWcs' and 'base_LocalPhotoCalib' columns in
        a source catalog, by rerunning the plugins.

        Parameters
        ----------
        catalog : `lsst.afw.table.SourceCatalog`
            catalog to which calib columns will be added
        exposure : `lsst.afw.image.exposure.Exposure`
            Exposure with attached PhotoCalibs and SkyWcs attributes to be
            reevaluated at local centroids. Pixels are not required.
        **kwargs
            Additional keyword arguments are ignored to facilitate passing the
            same arguments to several methods.

        Returns
        -------
        newCat : `lsst.afw.table.SourceCatalog`
            Source Catalog with requested local calib columns
        """
        measureConfig = SingleFrameMeasurementTask.ConfigClass()
        measureConfig.doReplaceWithNoise = False

        # Clear all slots; the relevant plugins are configured explicitly below.
        for slot in measureConfig.slots:
            setattr(measureConfig.slots, slot, None)

        measureConfig.plugins.names = []
        if self.config.doReevaluateSkyWcs:
            measureConfig.plugins.names.add("base_LocalWcs")
            self.log.info("Re-evaluating base_LocalWcs plugin")
        if self.config.doReevaluatePhotoCalib:
            measureConfig.plugins.names.add("base_LocalPhotoCalib")
            self.log.info("Re-evaluating base_LocalPhotoCalib plugin")
        pluginsNotToCopy = tuple(measureConfig.plugins.names)

        # Create a new schema and catalog, copying all columns except those
        # produced by the plugins being rerun.
        mapper = afwTable.SchemaMapper(catalog.schema)
        aliasMap = catalog.schema.getAliasMap()
        for item in catalog.schema:
            if not item.field.getName().startswith(pluginsNotToCopy):
                mapper.addMapping(item.key)

        schema = mapper.getOutputSchema()
        measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
        schema.setAliasMap(aliasMap)
        newCat = afwTable.SourceCatalog(schema)
        newCat.extend(catalog, mapper=mapper)

        # Update the coord columns first if the SkyWcs is being reevaluated.
        if self.config.doReevaluateSkyWcs and exposure.wcs is not None:
            afwTable.updateSourceCoords(exposure.wcs, newCat)
            wcsPlugin = measurement.plugins["base_LocalWcs"]
        else:
            wcsPlugin = None

        if self.config.doReevaluatePhotoCalib and exposure.getPhotoCalib() is not None:
            pcPlugin = measurement.plugins["base_LocalPhotoCalib"]
        else:
            pcPlugin = None

        for row in newCat:
            if wcsPlugin is not None:
                wcsPlugin.measure(row, exposure)
            if pcPlugin is not None:
                pcPlugin.measure(row, exposure)

        return newCat


class PostprocessAnalysis(object):
    """Calculate columns from DataFrames or handles storing DataFrames.

    This object manages and organizes an arbitrary set of computations
    on a catalog. The catalog is defined by a
    `DeferredDatasetHandle` or `InMemoryDatasetHandle` object
    (or list thereof), such as a ``deepCoadd_obj`` dataset, and the
    computations are defined by a collection of
    `~lsst.pipe.tasks.functors.Functor` objects (or, equivalently, a
    ``CompositeFunctor``).

    After the object is initialized, accessing the ``.df`` attribute (which
    holds the `pandas.DataFrame` containing the results of the calculations)
    triggers computation of said dataframe.

    One of the conveniences of using this object is the ability to define a
    desired common filter for all functors. This enables the same functor
    collection to be passed to several different `PostprocessAnalysis` objects
    without having to change the original functor collection, since the
    ``filt`` keyword argument of this object triggers an overwrite of the
    ``filt`` property for all functors in the collection.

    This object also allows a list of refFlags to be passed, and defines a
    set of default refFlags that are always included even if not requested.

    If a list of DataFrames or Handles is passed, rather than a single one,
    then the calculations will be mapped over all the input catalogs. In
    principle, it should be straightforward to parallelize this activity, but
    initial tests have failed (see TODO in code comments).

    Parameters
    ----------
    handles : `~lsst.daf.butler.DeferredDatasetHandle` or
              `~lsst.pipe.base.InMemoryDatasetHandle` or
              list of these.
        Source catalog(s) for computation.
    functors : `list`, `dict`, or `~lsst.pipe.tasks.functors.CompositeFunctor`
        Computations to do (functors that act on ``handles``).
        If a dict, the output
        DataFrame will have columns keyed accordingly.
        If a list, the column keys will come from the
        ``.shortname`` attribute of each functor.
    filt : `str`, optional
        Filter in which to calculate. If provided,
        this will overwrite any existing ``.filt`` attribute
        of the provided functors.
    flags : `list`, optional
        List of flags (per-band) to include in output table.
        Taken from the ``meas`` dataset if applied to a multilevel Object
        Table.
    refFlags : `list`, optional
        List of refFlags (only reference band) to include in output table.
    forcedFlags : `list`, optional
        List of flags (per-band) to include in output table.
        Taken from the ``forced_src`` dataset if applied to a
        multilevel Object Table. Intended for flags from measurement plugins
        only run during multi-band forced-photometry.
    """
    _defaultRefFlags = []
    _defaultFuncs = ()

    def __init__(self, handles, functors, filt=None, flags=None, refFlags=None, forcedFlags=None):
        self.handles = handles
        self.functors = functors

        self.filt = filt
        self.flags = list(flags) if flags is not None else []
        self.forcedFlags = list(forcedFlags) if forcedFlags is not None else []
        self.refFlags = list(self._defaultRefFlags)
        if refFlags is not None:
            self.refFlags += list(refFlags)

        self._df = None

    @property
    def defaultFuncs(self):
        funcs = dict(self._defaultFuncs)
        return funcs

    @property
    def func(self):
        additionalFuncs = self.defaultFuncs
        additionalFuncs.update({flag: Column(flag, dataset="forced_src") for flag in self.forcedFlags})
        additionalFuncs.update({flag: Column(flag, dataset="ref") for flag in self.refFlags})
        additionalFuncs.update({flag: Column(flag, dataset="meas") for flag in self.flags})

        if isinstance(self.functors, CompositeFunctor):
            func = self.functors
        else:
            func = CompositeFunctor(self.functors)

        func.funcDict.update(additionalFuncs)
        func.filt = self.filt
        return func

    @property
    def noDupCols(self):
        return [name for name, func in self.func.funcDict.items() if func.noDup]

    @property
    def df(self):
        if self._df is None:
            self.compute()
        return self._df

    def compute(self, dropna=False, pool=None):
        # Map over multiple handles if a list or tuple was provided.
        if type(self.handles) in (list, tuple):
            if pool is None:
                dflist = [self.func(handle, dropna=dropna) for handle in self.handles]
            else:
                # TODO: Figure out why parallelizing this fails (see the class
                # docstring).
                dflist = pool.map(functools.partial(self.func, dropna=dropna), self.handles)
            self._df = pd.concat(dflist)
        else:
            self._df = self.func(self.handles, dropna=dropna)

        return self._df
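
    # Hypothetical usage sketch (names illustrative): compute a couple of
    # columns from a deepCoadd_obj handle interactively.
    #
    #     funcs = {"ra": Column("coord_ra", dataset="ref"),
    #              "dec": Column("coord_dec", dataset="ref")}
    #     analysis = PostprocessAnalysis(handle, funcs, filt="i")
    #     df = analysis.df  # first access triggers the computation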


class TransformCatalogBaseConnections(pipeBase.PipelineTaskConnections,
                                      dimensions=()):
    """Expected Connections for subclasses of TransformCatalogBaseTask.

    Must be subclassed.
    """
    inputCatalog = connectionTypes.Input(
        name="",
        storageClass="DataFrame",
    )
    outputCatalog = connectionTypes.Output(
        name="",
        storageClass="ArrowAstropy",
    )


class TransformCatalogBaseConfig(pipeBase.PipelineTaskConfig,
                                 pipelineConnections=TransformCatalogBaseConnections):
    functorFile = pexConfig.Field(
        dtype=str,
        doc="Path to YAML file specifying Science Data Model functors to use "
            "when copying columns and computing calibrated values.",
        default=None,
        optional=True
    )
    primaryKey = pexConfig.Field(
        dtype=str,
        doc="Name of column to be set as the DataFrame index. If None, the "
            "index will be named `id`.",
        default=None,
        optional=True
    )
    columnsFromDataId = pexConfig.ListField(
        dtype=str,
        default=None,
        optional=True,
        doc="Columns to extract from the dataId",
    )


class TransformCatalogBaseTask(pipeBase.PipelineTask):
    """Base class for transforming/standardizing a catalog by applying functors
    that convert units and apply calibrations.

    The purpose of this task is to perform a set of computations on an input
    ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` that holds a
    ``DataFrame`` dataset (such as ``deepCoadd_obj``), and write the results to
    a new dataset (which needs to be declared in an ``outputDataset``
    attribute).

    The calculations to be performed are defined in a YAML file that specifies
    a set of functors to be computed, provided as a ``--functorFile`` config
    parameter. An example of such a YAML file is the following:

        funcs:
            sourceId:
                functor: Index
            x:
                functor: Column
                args: slot_Centroid_x
            y:
                functor: Column
                args: slot_Centroid_y
            psfFlux:
                functor: LocalNanojansky
                args:
                    - slot_PsfFlux_instFlux
                    - slot_PsfFlux_instFluxErr
                    - base_LocalPhotoCalib
                    - base_LocalPhotoCalibErr
            psfFluxErr:
                functor: LocalNanojanskyErr
                args:
                    - slot_PsfFlux_instFlux
                    - slot_PsfFlux_instFluxErr
                    - base_LocalPhotoCalib
                    - base_LocalPhotoCalibErr
        flags:
            - detect_isPrimary

    The names for each entry under "func" will become the names of columns in
    the output dataset. All the functors referenced are defined in
    `~lsst.pipe.tasks.functors`. Positional arguments to be passed to each
    functor are in the `args` list, and any additional entries for each column
    other than "functor" or "args" (e.g., ``'filt'``, ``'dataset'``) are
    treated as keyword arguments to be passed to the functor initialization.

    The "flags" entry is the default shortcut for `Column` functors.
    All columns listed under "flags" will be copied to the output table
    untransformed. They can be of any datatype.
    In the special case of transforming a multi-level object table with
    band and dataset indices (deepCoadd_obj), these will be taken from the
    ``meas`` dataset and exploded out per band.

    There are two special shortcuts that only apply when transforming
    multi-level Object (deepCoadd_obj) tables:
    - The "refFlags" entry is a shortcut for `Column` functors
      taken from the ``ref`` dataset if transforming an ObjectTable.
    - The "forcedFlags" entry is a shortcut for `Column` functors
      taken from the ``forced_src`` dataset if transforming an ObjectTable.
      These are expanded out per band.

    This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
    to organize and execute the calculations.
    """
    @property
    def _DefaultName(self):
        raise NotImplementedError("Subclass must define the \"_DefaultName\" attribute.")

    @property
    def outputDataset(self):
        raise NotImplementedError("Subclass must define the \"outputDataset\" attribute.")

    @property
    def inputDataset(self):
        raise NotImplementedError("Subclass must define \"inputDataset\" attribute.")

    @property
    def ConfigClass(self):
        raise NotImplementedError("Subclass must define \"ConfigClass\" attribute.")

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if self.config.functorFile:
            self.log.info("Loading transform functor definitions from %s",
                          self.config.functorFile)
            self.funcs = CompositeFunctor.from_file(self.config.functorFile)
            self.funcs.update(dict(PostprocessAnalysis._defaultFuncs))
        else:
            self.funcs = None

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
                          dataId=dict(outputRefs.outputCatalog.dataId.mapping))
        butlerQC.put(result, outputRefs)

    def run(self, handle, funcs=None, dataId=None, band=None):
        """Do postprocessing calculations.

        Takes a ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` or
        ``DataFrame`` object and dataId,
        returns a dataframe with results of postprocessing calculations.

        Parameters
        ----------
        handle : `~lsst.daf.butler.DeferredDatasetHandle` or
                 `~lsst.pipe.base.InMemoryDatasetHandle` or
                 `~pandas.DataFrame`, or list of these.
            DataFrames from which calculations are done.
        funcs : `~lsst.pipe.tasks.functors.Functor`
            Functors to apply to the table's columns.
        dataId : `dict`, optional
            Used to add a `patchId` column to the output dataframe.
        band : `str`, optional
            Filter band that is being processed.

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            Result struct, with a single ``outputCatalog`` attribute holding
            the transformed catalog.
        """
        self.log.info("Transforming/standardizing the source table dataId: %s", dataId)

        df = self.transform(band, handle, funcs, dataId).df
        self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
        if len(df) == 0:
            raise UpstreamFailureNoWorkFound(
                "Input catalog is empty, so there is nothing to transform/standardize",
            )

        result = pipeBase.Struct(outputCatalog=pandas_to_astropy(df))
        return result

    def getFunctors(self):
        return self.funcs

    def getAnalysis(self, handles, funcs=None, band=None):
        if funcs is None:
            funcs = self.funcs
        analysis = PostprocessAnalysis(handles, funcs, filt=band)
        return analysis

    def transform(self, band, handles, funcs, dataId):
        analysis = self.getAnalysis(handles, funcs=funcs, band=band)
        df = analysis.df
        if dataId and self.config.columnsFromDataId:
            for key in self.config.columnsFromDataId:
                if key in dataId:
                    if key == "detector":
                        # int16 instead of uint8 because databases don't like unsigned bytes.
                        df[key] = np.int16(dataId[key])
                    else:
                        df[key] = dataId[key]
                else:
                    raise ValueError(f"'{key}' in config.columnsFromDataId not found in dataId: {dataId}")

        if self.config.primaryKey:
            if df.index.name != self.config.primaryKey and self.config.primaryKey in df:
                df.reset_index(inplace=True, drop=True)
                df.set_index(self.config.primaryKey, inplace=True)

        return pipeBase.Struct(
            df=df,
            analysis=analysis
        )
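
    # Hypothetical interactive sketch (names illustrative): subclasses can be
    # run on an in-memory DataFrame by wrapping it in a handle.
    #
    #     task = TransformSourceTableTask()
    #     handle = pipeBase.InMemoryDatasetHandle(df, storageClass="DataFrame")
    #     result = task.run(handle, funcs=task.getFunctors(),
    #                       dataId={"visit": 1234, "detector": 42,
    #                               "band": "r", "physical_filter": "HSC-R"})
    #     tbl = result.outputCatalog  # transformed astropy table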


class TransformObjectCatalogConnections(pipeBase.PipelineTaskConnections,
                                        defaultTemplates={"coaddName": "deep"},
                                        dimensions=("tract", "patch", "skymap")):
    inputCatalog = connectionTypes.Input(
        doc="The vertical concatenation of the {coaddName}_{meas|forced_src|psfs_multiprofit} catalogs, "
            "stored as a DataFrame with a multi-level column index per-patch.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="{coaddName}Coadd_obj",
        deferLoad=True,
    )
    inputCatalogRef = connectionTypes.Input(
        doc="Catalog marking the primary detection (which band provides a good shape and position) "
            "for each detection in deepCoadd_mergeDet.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_ref",
        deferLoad=True,
    )
    inputCatalogSersicMultiprofit = connectionTypes.Input(
        doc="Catalog of source measurements on the deepCoadd.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="ArrowAstropy",
        name="{coaddName}Coadd_Sersic_multiprofit",
        deferLoad=True,
    )
    inputCatalogEpoch = connectionTypes.Input(
        doc="Catalog of mean epochs for each object per band.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="ArrowAstropy",
        name="object_epoch",
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-Patch Object Table of columns transformed from the deepCoadd_obj table per the standard "
            "data model.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="ArrowAstropy",
        name="objectTable",
    )

    def __init__(self, *, config=None):
        super().__init__(config=config)
        if config.multilevelOutput:
            self.outputCatalog = dataclasses.replace(self.outputCatalog, storageClass="DataFrame")


class TransformObjectCatalogConfig(TransformCatalogBaseConfig,
                                   pipelineConnections=TransformObjectCatalogConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )
    outputBands = pexConfig.ListField(
        dtype=str,
        default=None,
        optional=True,
        doc=("These bands and only these bands will appear in the output,"
             " NaN-filled if the input does not include them."
             " If None, then use all bands found in the input.")
    )
    camelCase = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Write per-band columns names with camelCase, else underscore. "
             "For example: gPsFlux instead of g_PsFlux.")
    )
    multilevelOutput = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Whether results dataframe should have a multilevel column index (True) or be flat "
             "and name-munged (False). If True, the output storage class will be "
             "set to DataFrame, since astropy tables do not support multi-level indexing."),
        deprecated="Support for multi-level outputs is deprecated and will be removed after v29.",
    )
    goodFlags = pexConfig.ListField(
        dtype=str,
        default=[],
        doc=("List of 'good' flags that should be set False when populating empty tables. "
             "All other flags are considered to be 'bad' flags and will be set to True.")
    )
    floatFillValue = pexConfig.Field(
        dtype=float,
        default=np.nan,
        doc="Fill value for float fields when populating empty tables.",
    )
    integerFillValue = pexConfig.Field(
        dtype=int,
        default=-1,
        doc="Fill value for integer fields when populating empty tables.",
    )

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Object.yaml")
        self.primaryKey = "objectId"
        self.columnsFromDataId = ["tract", "patch"]
        self.goodFlags = ["calib_astrometry_used",
                          "calib_photometry_reserved",
                          "calib_photometry_used",
                          "calib_psf_candidate",
                          "calib_psf_reserved",
                          "calib_psf_used"]


class TransformObjectCatalogTask(TransformCatalogBaseTask):
    """Produce a flattened Object Table to match the format specified in
    sdm_schemas.

    Do the same set of postprocessing calculations on all bands.

    This is identical to `TransformCatalogBaseTask`, except that it does
    the specified functor calculations for all filters present in the
    input `deepCoadd_obj` table. Any specific ``"filt"`` keywords specified
    by the YAML file will be superseded.
    """
    _DefaultName = "transformObjectCatalog"
    ConfigClass = TransformObjectCatalogConfig

    datasets_multiband = ("epoch", "ref", "Sersic_multiprofit")

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
                          dataId=dict(outputRefs.outputCatalog.dataId.mapping),
                          handle_epoch=inputs["inputCatalogEpoch"],
                          handle_ref=inputs["inputCatalogRef"],
                          handle_Sersic_multiprofit=inputs["inputCatalogSersicMultiprofit"],
                          )
        butlerQC.put(result, outputRefs)

    def run(self, handle, funcs=None, dataId=None, band=None, **kwargs):
        # NOTE: the band kwarg is ignored here; all bands in the input are
        # processed.
        if isinstance(funcs, CompositeFunctor):
            funcDict_in = funcs.funcDict
        elif isinstance(funcs, dict):
            funcDict_in = funcs
        elif isinstance(funcs, list):
            funcDict_in = {idx: v for idx, v in enumerate(funcs)}
        else:
            raise TypeError(f"Unsupported {type(funcs)=}")

        handles_multi = {}
        funcDicts_multiband = {}
        for dataset in self.datasets_multiband:
            if (handle_multi := kwargs.get(f"handle_{dataset}")) is None:
                raise RuntimeError(f"Missing required handle_{dataset} kwarg")
            handles_multi[dataset] = handle_multi
            funcDicts_multiband[dataset] = {}

        dfDict = {}
        analysisDict = {}
        templateDf = pd.DataFrame()

        columns = handle.get(component="columns")
        inputBands = columns.unique(level=1).values

        outputBands = self.config.outputBands if self.config.outputBands else inputBands

        # Split the input functors into per-band and multiband groups.
        funcDict_band = {}
        for name, func in funcDict_in.items():
            if func.dataset in funcDicts_multiband:
                if band := getattr(func, "band_to_check", None):
                    if band not in outputBands:
                        continue
                elif hasattr(func, "bands"):
                    # Set bands so the functor is evaluated for all input bands.
                    func.bands = tuple(inputBands)

            funcDict = funcDicts_multiband.get(func.dataset, funcDict_band)
            funcDict[name] = func

        funcs_band = CompositeFunctor(funcDict_band)

        # Perform transform for data of filters that exist in the handle dataframe.
        for inputBand in inputBands:
            if inputBand not in outputBands:
                self.log.info("Ignoring %s band data in the input", inputBand)
                continue
            self.log.info("Transforming the catalog of band %s", inputBand)
            result = self.transform(inputBand, handle, funcs_band, dataId)
            dfDict[inputBand] = result.df
            analysisDict[inputBand] = result.analysis
            if templateDf.empty:
                templateDf = result.df

        # Put filler values in columns of other wanted bands.
        for filt in outputBands:
            if filt not in dfDict:
                self.log.info("Adding empty columns for band %s", filt)
                dfTemp = templateDf.copy()
                for col in dfTemp.columns:
                    testValue = dfTemp[col].values[0]
                    if isinstance(testValue, (np.bool_, pd.BooleanDtype)):
                        # Boolean flag type: fill based on whether it is a
                        # "good" flag.
                        if col in self.config.goodFlags:
                            fillValue = False
                        else:
                            fillValue = True
                    elif isinstance(testValue, numbers.Integral):
                        # numbers.Integral catches python, numpy, and pandas
                        # integers alike, but unsigned integers cannot be
                        # stored.
                        if isinstance(testValue, np.unsignedinteger):
                            raise ValueError("Parquet tables may not have unsigned integer columns.")
                        else:
                            fillValue = self.config.integerFillValue
                    else:
                        fillValue = self.config.floatFillValue
                    dfTemp[col].values[:] = fillValue
                dfDict[filt] = dfTemp

        # This makes a multilevel column index, with band as the first level.
        df = pd.concat(dfDict, axis=1, names=["band", "column"])
        name_index = df.index.name

        if not self.config.multilevelOutput:
            noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
            if self.config.primaryKey in noDupCols:
                noDupCols.remove(self.config.primaryKey)
            if dataId and self.config.columnsFromDataId:
                noDupCols += self.config.columnsFromDataId
            df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase,
                                inputBands=inputBands)

        # Apply the functors for each of the multiband (per-dataset) tables.
        for dataset, funcDict in funcDicts_multiband.items():
            handle_multiband = handles_multi[dataset]
            df_dataset = handle_multiband.get()
            if isinstance(df_dataset, astropy.table.Table):
                # Allow the astropy table to carry a different index name.
                if name_index not in df_dataset.colnames:
                    if self.config.primaryKey in df_dataset.colnames:
                        name_index_ap = self.config.primaryKey
                    else:
                        raise RuntimeError(
                            f"Neither of {name_index=} nor {self.config.primaryKey=} appear in"
                            f" {df_dataset.colnames=} for {dataset=}"
                        )
                else:
                    name_index_ap = name_index
                df_dataset = df_dataset.to_pandas().set_index(name_index_ap, drop=False)
            else:
                df_dataset = df_dataset.asAstropy().to_pandas().set_index(name_index, drop=False)

            result = self.transform(
                None,
                pipeBase.InMemoryDatasetHandle(df_dataset, storageClass="DataFrame"),
                CompositeFunctor(funcDict),
                dataId,
            )
            result.df.index.name = name_index
            # Drop columns from dataId (e.g. tract, patch) if already present.
            if self.config.columnsFromDataId:
                columns_drop = [column for column in self.config.columnsFromDataId
                                if column in result.df]
                if columns_drop:
                    result.df.drop(columns_drop, axis=1, inplace=True)

            # Make the same multilevel column index for the multiband table,
            # if needed.
            to_concat = pd.concat(
                {band: result.df for band in self.config.outputBands}, axis=1, names=["band", "column"]
            ) if self.config.multilevelOutput else result.df
            df = pd.concat([df, to_concat], axis=1)
            analysisDict[dataset] = result.analysis

        df.index.name = self.config.primaryKey

        if not self.config.multilevelOutput:
            tbl = pandas_to_astropy(df)
        else:
            tbl = df

        self.log.info("Made a table of %d columns and %d rows", len(tbl.columns), len(tbl))

        return pipeBase.Struct(outputCatalog=tbl)
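
    # Illustrative note: with multilevelOutput=False (the default), the
    # per-band columns are name-munged by flattenFilters, e.g. the
    # ("g", "PsFlux") column becomes "g_PsFlux", or "gPsFlux" when
    # config.camelCase is True.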


class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("tract", "skymap")):
    inputCatalogs = connectionTypes.Input(
        doc="Per-Patch objectTables conforming to the standard data model.",
        name="objectTable",
        storageClass="ArrowAstropy",
        dimensions=("tract", "patch", "skymap"),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-tract vertical concatenation of the input objectTables",
        name="objectTable_tract",
        storageClass="ArrowAstropy",
        dimensions=("tract", "skymap"),
    )


class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=ConsolidateObjectTableConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )


class ConsolidateObjectTableTask(pipeBase.PipelineTask):
    """Write patch-merged source tables to a tract-level DataFrame Parquet file.

    Concatenates the `objectTable` list into a per-tract `objectTable_tract`.
    """
    _DefaultName = "consolidateObjectTable"
    ConfigClass = ConsolidateObjectTableConfig

    inputDataset = "objectTable"
    outputDataset = "objectTable_tract"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-patch Object Tables",
                      len(inputs["inputCatalogs"]))
        table = TableVStack.vstack_handles(inputs["inputCatalogs"])
        butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)


class TransformSourceTableConnections(pipeBase.PipelineTaskConnections,
                                      defaultTemplates={"catalogType": ""},
                                      dimensions=("instrument", "visit", "detector")):
    inputCatalog = connectionTypes.Input(
        doc="Wide input catalog of sources produced by WriteSourceTableTask",
        name="{catalogType}source",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Narrower, per-detector Source Table transformed and converted per a "
            "specified set of functors",
        name="{catalogType}sourceTable",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector")
    )


class TransformSourceTableConfig(TransformCatalogBaseConfig,
                                 pipelineConnections=TransformSourceTableConnections):

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Source.yaml")
        self.primaryKey = "sourceId"
        self.columnsFromDataId = ["visit", "detector", "band", "physical_filter"]


class TransformSourceTableTask(TransformCatalogBaseTask):
    """Transform/standardize a source catalog.
    """
    _DefaultName = "transformSourceTable"
    ConfigClass = TransformSourceTableConfig


class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections,
                                         dimensions=("instrument", "visit",),
                                         defaultTemplates={"calexpType": ""}):
    calexp = connectionTypes.Input(
        doc="Processed exposures used for metadata",
        name="calexp",
        storageClass="ExposureF",
        dimensions=("instrument", "visit", "detector"),
        deferLoad=True,
        multiple=True,
    )
    visitSummary = connectionTypes.Output(
        doc=("Per-visit consolidated exposure metadata. These catalogs use "
             "detector id for the id and are sorted for fast lookups of a "
             "detector."),
        name="visitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
    )
    visitSummarySchema = connectionTypes.InitOutput(
        doc="Schema of the visitSummary catalog",
        name="visitSummary_schema",
        storageClass="ExposureCatalog",
    )


class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig,
                                    pipelineConnections=ConsolidateVisitSummaryConnections):
    """Config for ConsolidateVisitSummaryTask"""

    full = pexConfig.Field(
        "Whether to propagate all exposure components. "
        "This adds PSF, aperture correction map, transmission curve, and detector, which can increase "
        "file size by more than a factor of 10, but it makes the visit summaries produced by this task "
        "fully usable by tasks that were designed to run downstream of "
        "lsst.drp.tasks.UpdateVisitSummaryTask.",
        dtype=bool,
        default=False,
    )


class ConsolidateVisitSummaryTask(pipeBase.PipelineTask):
    """Task to consolidate per-detector visit metadata.

    This task aggregates the following metadata from all the detectors in a
    single visit into an exposure catalog:
    - The visitInfo.
    - The wcs.
    - The photoCalib.
    - The physical_filter and band (if available).
    - The validPolygon.
    - The aperture correction map.
    - The transmission curve.
    - The psf size, shape, and effective area at the center of the detector.
    - The corners of the bounding box in right ascension/declination.

    Tests for this task are performed in ci_hsc_gen3.
    """
    _DefaultName = "consolidateVisitSummary"
    ConfigClass = ConsolidateVisitSummaryConfig

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.schema = afwTable.ExposureTable.makeMinimalSchema()
        self.schema.addField("visit", type="L", doc="Visit number")
        self.schema.addField("physical_filter", type="String", size=32, doc="Physical filter")
        self.schema.addField("band", type="String", size=32, doc="Name of band")
        ExposureSummaryStats.update_schema(self.schema)
        self.visitSummarySchema = afwTable.ExposureCatalog(self.schema)

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        dataRefs = butlerQC.get(inputRefs.calexp)
        visit = dataRefs[0].dataId["visit"]

        self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
                       len(dataRefs), visit)

        expCatalog = self._combineExposureMetadata(visit, dataRefs)

        butlerQC.put(expCatalog, outputRefs.visitSummary)

    def _combineExposureMetadata(self, visit, dataRefs):
        """Make a combined exposure catalog from a list of dataRefs.

        These dataRefs must point to exposures with wcs, summaryStats,
        and other visit metadata.

        Parameters
        ----------
        visit : `int`
            Visit identification number.
        dataRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            List of dataRefs in visit.

        Returns
        -------
        visitSummary : `lsst.afw.table.ExposureCatalog`
            Exposure catalog with per-detector summary information.
        """
        cat = afwTable.ExposureCatalog(self.schema)
        cat.resize(len(dataRefs))

        cat["visit"] = visit

        for i, dataRef in enumerate(dataRefs):
            visitInfo = dataRef.get(component="visitInfo")
            filterLabel = dataRef.get(component="filter")
            summaryStats = dataRef.get(component="summaryStats")
            detector = dataRef.get(component="detector")
            wcs = dataRef.get(component="wcs")
            photoCalib = dataRef.get(component="photoCalib")
            bbox = dataRef.get(component="bbox")
            validPolygon = dataRef.get(component="validPolygon")

            rec = cat[i]
            rec.setBBox(bbox)
            rec.setVisitInfo(visitInfo)
            rec.setWcs(wcs)
            rec.setPhotoCalib(photoCalib)
            rec.setValidPolygon(validPolygon)

            if self.config.full:
                rec.setPsf(dataRef.get(component="psf"))
                rec.setApCorrMap(dataRef.get(component="apCorrMap"))
                rec.setTransmissionCurve(dataRef.get(component="transmissionCurve"))
                rec.setDetector(detector)

            rec["physical_filter"] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else ""
            rec["band"] = filterLabel.bandLabel if filterLabel.hasBandLabel() else ""
            rec.setId(detector.getId())
            summaryStats.update_record(rec)

        if not cat:
            raise pipeBase.NoWorkFound(
                "No detectors had sufficient information to make a visit summary row."
            )

        metadata = dafBase.PropertyList()
        metadata.add("COMMENT", "Catalog id is detector id, sorted.")
        metadata.add("COMMENT", "Only detectors with data have entries.")
        cat.setMetadata(metadata)

        cat.sort()
        return cat
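
    # Hypothetical downstream sketch: because the catalog is sorted by
    # detector id, a row can be looked up directly, e.g.
    #
    #     visitSummary = butler.get("visitSummary", visit=1234, instrument="HSC")
    #     row = visitSummary.find(detectorId)
    #     wcs, photoCalib = row.getWcs(), row.getPhotoCalib()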


class ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        defaultTemplates={"catalogType": ""},
                                        dimensions=("instrument", "visit")):
    inputCatalogs = connectionTypes.Input(
        doc="Input per-detector Source Tables",
        name="{catalogType}sourceTable",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-visit concatenation of Source Table",
        name="{catalogType}sourceTable_visit",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit")
    )


class ConsolidateSourceTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=ConsolidateSourceTableConnections):
    pass


class ConsolidateSourceTableTask(pipeBase.PipelineTask):
    """Concatenate `sourceTable` list into a per-visit `sourceTable_visit`.
    """
    _DefaultName = "consolidateSourceTable"
    ConfigClass = ConsolidateSourceTableConfig

    inputDataset = "sourceTable"
    outputDataset = "sourceTable_visit"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        # Sort inputRefs by detector id for a deterministic row order.
        detectorOrder = [ref.dataId["detector"] for ref in inputRefs.inputCatalogs]
        detectorOrder.sort()
        inputRefs = reorderRefs(inputRefs, detectorOrder, dataIdKey="detector")
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-detector Source Tables",
                      len(inputs["inputCatalogs"]))
        table = TableVStack.vstack_handles(inputs["inputCatalogs"])
        butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)


class MakeCcdVisitTableConnections(pipeBase.PipelineTaskConnections,
                                   dimensions=("instrument",),
                                   defaultTemplates={"calexpType": ""}):
    visitSummaryRefs = connectionTypes.Input(
        doc="Data references for per-visit consolidated exposure metadata",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="CCD and Visit metadata table",
        name="ccdVisitTable",
        storageClass="ArrowAstropy",
        dimensions=("instrument",)
    )


class MakeCcdVisitTableConfig(pipeBase.PipelineTaskConfig,
                              pipelineConnections=MakeCcdVisitTableConnections):
    idGenerator = DetectorVisitIdGeneratorConfig.make_field()


class MakeCcdVisitTableTask(pipeBase.PipelineTask):
    """Produce a `ccdVisitTable` from the visit summary exposure catalogs.
    """
    _DefaultName = "makeCcdVisitTable"
    ConfigClass = MakeCcdVisitTableConfig

    def run(self, visitSummaryRefs):
        """Make a table of ccd information from the visit summary catalogs.

        Parameters
        ----------
        visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            List of DeferredDatasetHandles pointing to exposure catalogs with
            per-detector summary information.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            Results struct with attribute:

            ``outputCatalog``
                Catalog of ccd and visit information.
        """
        ccdEntries = []
        for visitSummaryRef in visitSummaryRefs:
            visitSummary = visitSummaryRef.get()
            if not visitSummary:
                continue
            visitInfo = visitSummary[0].getVisitInfo()

            # Strip provenance so it is not propagated to the output table.
            strip_provenance_from_fits_header(visitSummary.metadata)

            summaryTable = visitSummary.asAstropy()
            selectColumns = ["id", "visit", "physical_filter", "band", "ra", "dec",
                             "pixelScale", "zenithDistance",
                             "expTime", "zeroPoint", "psfSigma", "skyBg", "skyNoise",
                             "astromOffsetMean", "astromOffsetStd", "nPsfStar",
                             "psfStarDeltaE1Median", "psfStarDeltaE2Median",
                             "psfStarDeltaE1Scatter", "psfStarDeltaE2Scatter",
                             "psfStarDeltaSizeMedian", "psfStarDeltaSizeScatter",
                             "psfStarScaledDeltaSizeScatter", "psfTraceRadiusDelta",
                             "psfApFluxDelta", "psfApCorrSigmaScaledDelta",
                             "maxDistToNearestPsf",
                             "effTime", "effTimePsfSigmaScale",
                             "effTimeSkyBgScale", "effTimeZeroPointScale"]
            ccdEntry = summaryTable[selectColumns]

            ccdEntry.rename_column("visit", "visitId")
            ccdEntry.rename_column("id", "detectorId")

            # Duplicate 'dec' as 'decl' for backwards compatibility.
            ccdEntry["decl"] = ccdEntry["dec"]

            ccdEntry["ccdVisitId"] = [
                self.config.idGenerator.apply(
                    visitSummaryRef.dataId,
                    detector=detector_id,
                ).catalog_id
                for detector_id in summaryTable["id"]
            ]
            ccdEntry["detector"] = summaryTable["id"]
            ccdEntry["seeing"] = (
                visitSummary["psfSigma"] * visitSummary["pixelScale"] * np.sqrt(8 * np.log(2))
            )
            ccdEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
            ccdEntry["expMidpt"] = np.datetime64(visitInfo.date.nsecs(scale=dafBase.DateTime.TAI), "ns")
            ccdEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
            expTime = visitInfo.getExposureTime()
            ccdEntry["obsStart"] = (
                ccdEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns")
            )
            expTime_days = expTime / (60*60*24)
            ccdEntry["obsStartMJD"] = ccdEntry["expMidptMJD"] - 0.5 * expTime_days
            ccdEntry["darkTime"] = visitInfo.getDarkTime()
            ccdEntry["xSize"] = summaryTable["bbox_max_x"] - summaryTable["bbox_min_x"]
            ccdEntry["ySize"] = summaryTable["bbox_max_y"] - summaryTable["bbox_min_y"]
            ccdEntry["llcra"] = summaryTable["raCorners"][:, 0]
            ccdEntry["llcdec"] = summaryTable["decCorners"][:, 0]
            ccdEntry["ulcra"] = summaryTable["raCorners"][:, 1]
            ccdEntry["ulcdec"] = summaryTable["decCorners"][:, 1]
            ccdEntry["urcra"] = summaryTable["raCorners"][:, 2]
            ccdEntry["urcdec"] = summaryTable["decCorners"][:, 2]
            ccdEntry["lrcra"] = summaryTable["raCorners"][:, 3]
            ccdEntry["lrcdec"] = summaryTable["decCorners"][:, 3]

            ccdEntries.append(ccdEntry)

        outputCatalog = astropy.table.vstack(ccdEntries, join_type="exact")
        return pipeBase.Struct(outputCatalog=outputCatalog)


class MakeVisitTableConnections(pipeBase.PipelineTaskConnections,
                                dimensions=("instrument",),
                                defaultTemplates={"calexpType": ""}):
    visitSummaries = connectionTypes.Input(
        doc="Per-visit consolidated exposure metadata",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit",),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Visit metadata table",
        name="visitTable",
        storageClass="ArrowAstropy",
        dimensions=("instrument",)
    )


class MakeVisitTableConfig(pipeBase.PipelineTaskConfig,
                           pipelineConnections=MakeVisitTableConnections):
    pass


class MakeVisitTableTask(pipeBase.PipelineTask):
    """Produce a `visitTable` from the visit summary exposure catalogs.
    """
    _DefaultName = "makeVisitTable"
    ConfigClass = MakeVisitTableConfig

    def run(self, visitSummaries):
        """Make a table of visit information from the visit summary catalogs.

        Parameters
        ----------
        visitSummaries : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            List of handles for exposure catalogs with per-detector summary
            information.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            Results struct with attribute:

            ``outputCatalog``
                Catalog of visit information.
        """
        visitEntries = []
        for visitSummary in visitSummaries:
            visitSummary = visitSummary.get()
            if not visitSummary:
                continue
            visitRow = visitSummary[0]
            visitInfo = visitRow.getVisitInfo()

            visitEntry = {}
            visitEntry["visitId"] = visitRow["visit"]
            visitEntry["visit"] = visitRow["visit"]
            visitEntry["physical_filter"] = visitRow["physical_filter"]
            visitEntry["band"] = visitRow["band"]
            raDec = visitInfo.getBoresightRaDec()
            visitEntry["ra"] = raDec.getRa().asDegrees()
            visitEntry["dec"] = raDec.getDec().asDegrees()

            # Duplicate 'dec' as 'decl' for backwards compatibility.
            visitEntry["decl"] = visitEntry["dec"]

            visitEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
            azAlt = visitInfo.getBoresightAzAlt()
            visitEntry["azimuth"] = azAlt.getLongitude().asDegrees()
            visitEntry["altitude"] = azAlt.getLatitude().asDegrees()
            visitEntry["zenithDistance"] = 90 - azAlt.getLatitude().asDegrees()
            visitEntry["airmass"] = visitInfo.getBoresightAirmass()
            expTime = visitInfo.getExposureTime()
            visitEntry["expTime"] = expTime
            visitEntry["expMidpt"] = np.datetime64(visitInfo.date.nsecs(scale=dafBase.DateTime.TAI), "ns")
            visitEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
            visitEntry["obsStart"] = visitEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns")
            expTime_days = expTime / (60*60*24)
            visitEntry["obsStartMJD"] = visitEntry["expMidptMJD"] - 0.5 * expTime_days
            visitEntries.append(visitEntry)

        outputCatalog = astropy.table.Table(rows=visitEntries)
        return pipeBase.Struct(outputCatalog=outputCatalog)


@deprecated(reason="This task is replaced by lsst.pipe.tasks.ForcedPhotCcdTask. "
                   "This task will be removed after v30.",
            version="v29.0", category=FutureWarning)
class WriteForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("instrument", "visit", "detector", "skymap", "tract")):
    inputCatalog = connectionTypes.Input(
        doc="Primary per-detector, single-epoch forced-photometry catalog. "
            "By default, it is the output of ForcedPhotCcdTask on calexps",
        name="forced_src",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector", "skymap", "tract")
    )
    inputCatalogDiff = connectionTypes.Input(
        doc="Secondary multi-epoch, per-detector, forced photometry catalog. "
            "By default, it is the output of ForcedPhotCcdTask run on image differences.",
        name="forced_diff",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector", "skymap", "tract")
    )
    outputCatalog = connectionTypes.Output(
        doc="InputCatalogs horizontally joined on `objectId` in DataFrame parquet format",
        name="mergedForcedSource",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector", "skymap", "tract")
    )


@deprecated(reason="This task is replaced by lsst.pipe.tasks.ForcedPhotCcdTask. "
                   "This task will be removed after v30.",
            version="v29.0", category=FutureWarning)
class WriteForcedSourceTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=WriteForcedSourceTableConnections):
    key = pexConfig.Field(
        doc="Column on which to join the two input tables and make the primary key of the output",
        dtype=str,
        default="objectId",
    )


@deprecated(reason="This task is replaced by lsst.pipe.tasks.ForcedPhotCcdTask. "
                   "This task will be removed after v30.",
            version="v29.0", category=FutureWarning)
class WriteForcedSourceTableTask(pipeBase.PipelineTask):
    """Merge and convert per-detector forced source catalogs to DataFrame Parquet format.

    Because the predecessor ForcedPhotCcdTask operates per-detector,
    per-tract (i.e., it has tract in its dimensions), detectors
    on the tract boundary may have multiple forced source catalogs.

    The successor task TransformForcedSourceTable runs per-patch
    and temporally aggregates overlapping mergedForcedSource catalogs from
    all available epochs.
    """
    _DefaultName = "writeForcedSourceTable"
    ConfigClass = WriteForcedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        inputs["visit"] = butlerQC.quantum.dataId["visit"]
        inputs["detector"] = butlerQC.quantum.dataId["detector"]
        inputs["band"] = butlerQC.quantum.dataId["band"]
        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)

    def run(self, inputCatalog, inputCatalogDiff, visit, detector, band=None):
        dfs = []
        for table, dataset in zip((inputCatalog, inputCatalogDiff), ("calexp", "diff")):
            df = table.asAstropy().to_pandas().set_index(self.config.key, drop=False)
            df = df.reindex(sorted(df.columns), axis=1)
            df["visit"] = visit
            # int16 instead of uint8 because databases don't like unsigned bytes.
            df["detector"] = np.int16(detector)
            df["band"] = band if band else pd.NA
            df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
                                                   names=("dataset", "column"))

            dfs.append(df)

        outputCatalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return pipeBase.Struct(outputCatalog=outputCatalog)
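
    # Illustrative note: the joined output has a two-level (dataset, column)
    # index, so the same measurement appears under both datasets, e.g.
    # ("calexp", "psfFlux") and ("diff", "psfFlux") for each objectId row
    # (column name hypothetical).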


class TransformForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                            dimensions=("instrument", "skymap", "patch", "tract")):
    inputCatalogs = connectionTypes.Input(
        doc="DataFrames of merged ForcedSources produced by WriteForcedSourceTableTask",
        name="mergedForcedSource",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
        multiple=True,
        deferLoad=True,
    )
    referenceCatalog = connectionTypes.Input(
        doc="Reference catalog which was used to seed the forcedPhot. Columns "
            "objectId, detect_isPrimary, detect_isTractInner, detect_isPatchInner "
            "are expected.",
        name="objectTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Narrower, temporally-aggregated, per-patch ForcedSource Table transformed and converted per a "
            "specified set of functors",
        name="forcedSourceTable",
        storageClass="ArrowAstropy",
        dimensions=("tract", "patch", "skymap")
    )


class TransformForcedSourceTableConfig(TransformCatalogBaseConfig,
                                       pipelineConnections=TransformForcedSourceTableConnections):
    referenceColumns = pexConfig.ListField(
        dtype=str,
        default=["detect_isPrimary", "detect_isTractInner", "detect_isPatchInner"],
        optional=True,
        doc="Columns to pull from reference catalog",
    )
    keyRef = pexConfig.Field(
        doc="Column on which to join the two input tables and make the primary key of the output",
        dtype=str,
        default="objectId",
    )
    key = pexConfig.Field(
        doc="Rename the output DataFrame index to this name",
        dtype=str,
        default="forcedSourceId",
    )

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "ForcedSource.yaml")
        self.columnsFromDataId = ["tract", "patch"]


class TransformForcedSourceTableTask(TransformCatalogBaseTask):
    """Transform/standardize a ForcedSource catalog.

    Transforms each wide, per-detector forcedSource DataFrame per the
    specification file (per-camera defaults found in ForcedSource.yaml).
    All epochs that overlap the patch are aggregated into one per-patch
    narrow-DataFrame file.

    No de-duplication of rows is performed. Duplicate resolution flags are
    pulled in from the referenceCatalog: `detect_isPrimary`,
    `detect_isTractInner`, `detect_isPatchInner`, so that the user may
    de-duplicate for analysis or compare duplicates for QA.

    The resulting table includes multiple bands. Epochs (MJDs) and other
    useful per-visit quantities can be retrieved by joining with the
    CcdVisitTable on ccdVisitId.
    """
    _DefaultName = "transformForcedSourceTable"
    ConfigClass = TransformForcedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        outputs = self.run(inputs["inputCatalogs"], inputs["referenceCatalog"], funcs=self.funcs,
                           dataId=dict(outputRefs.outputCatalog.dataId.mapping))

        butlerQC.put(outputs, outputRefs)

    def run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=None):
        dfs = []
        refColumns = list(self.config.referenceColumns)
        refColumns.append(self.config.keyRef)
        ref = referenceCatalog.get(parameters={"columns": refColumns})
        if ref.index.name != self.config.keyRef:
            # The loaded reference catalog may carry a meaningless index;
            # make sure the join key is the index.
            ref.set_index(self.config.keyRef, inplace=True)
        self.log.info("Aggregating %s input catalogs", len(inputCatalogs))
        for handle in inputCatalogs:
            result = self.transform(None, handle, funcs, dataId)
            # Keep only rows seeded by the reference catalog (inner join).
            dfs.append(result.df.join(ref, how="inner"))

        outputCatalog = pd.concat(dfs)
        if outputCatalog.empty:
            raise NoWorkFound(f"No forced photometry rows for {dataId}.")

        # The joins above leave the object key as the index; rename it and
        # move it back to a regular column.
        outputCatalog.index.rename(self.config.keyRef, inplace=True)
        outputCatalog.reset_index(inplace=True)
        # Set forcedSourceId as the index (specified in ForcedSource.yaml).
        outputCatalog.set_index("forcedSourceId", inplace=True, verify_integrity=True)
        # Rename it to the config.key.
        outputCatalog.index.rename(self.config.key, inplace=True)

        self.log.info("Made a table of %d columns and %d rows",
                      len(outputCatalog.columns), len(outputCatalog))
        return pipeBase.Struct(outputCatalog=pandas_to_astropy(outputCatalog))


class ConsolidateTractConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"catalogType": ""},
                                  dimensions=("instrument", "tract")):
    inputCatalogs = connectionTypes.Input(
        doc="Input per-patch DataFrame Tables to be concatenated",
        name="{catalogType}ForcedSourceTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        multiple=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Output per-tract concatenation of DataFrame Tables",
        name="{catalogType}ForcedSourceTable_tract",
        storageClass="DataFrame",
        dimensions=("tract", "skymap"),
    )


class ConsolidateTractConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=ConsolidateTractConnections):
    pass


class ConsolidateTractTask(pipeBase.PipelineTask):
    """Concatenate any per-patch, dataframe list into a single
    per-tract DataFrame.
    """
    _DefaultName = "ConsolidateTract"
    ConfigClass = ConsolidateTractConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-patch %s Tables",
                      len(inputs["inputCatalogs"]),
                      inputRefs.inputCatalogs[0].datasetType.name)
        df = pd.concat(inputs["inputCatalogs"])
        butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)