__all__ = [
    "WriteObjectTableConfig",
    "WriteObjectTableTask",
    "WriteSourceTableConfig",
    "WriteSourceTableTask",
    "WriteRecalibratedSourceTableConfig",
    "WriteRecalibratedSourceTableTask",
    "PostprocessAnalysis",
    "TransformCatalogBaseConfig",
    "TransformCatalogBaseTask",
    "TransformObjectCatalogConfig",
    "TransformObjectCatalogTask",
    "ConsolidateObjectTableConfig",
    "ConsolidateObjectTableTask",
    "TransformSourceTableConfig",
    "TransformSourceTableTask",
    "ConsolidateVisitSummaryConfig",
    "ConsolidateVisitSummaryTask",
    "ConsolidateSourceTableConfig",
    "ConsolidateSourceTableTask",
    "MakeCcdVisitTableConfig",
    "MakeCcdVisitTableTask",
    "MakeVisitTableConfig",
    "MakeVisitTableTask",
    "WriteForcedSourceTableConfig",
    "WriteForcedSourceTableTask",
    "TransformForcedSourceTableConfig",
    "TransformForcedSourceTableTask",
    "ConsolidateTractConfig",
    "ConsolidateTractTask",
]
from collections import defaultdict
import dataclasses
import functools
import logging
import numbers
import os

import astropy.table
import numpy as np
import pandas as pd

from deprecated.sphinx import deprecated

import lsst.afw.table as afwTable
import lsst.daf.base as dafBase
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
from lsst.daf.butler.formatters.parquet import pandas_to_astropy
from lsst.pipe.base import NoWorkFound, UpstreamFailureNoWorkFound, connectionTypes
from lsst.afw.image import ExposureSummaryStats, ExposureF
from lsst.meas.base import SingleFrameMeasurementTask, DetectorVisitIdGeneratorConfig
from lsst.obs.base.utils import strip_provenance_from_fits_header, TableVStack

from .coaddBase import reorderRefs
from .functors import CompositeFunctor, Column

log = logging.getLogger(__name__)
def flattenFilters(df, noDupCols=["coord_ra", "coord_dec"], camelCase=False, inputBands=None):
    """Flatten a DataFrame with a multilevel column index."""
    newDf = pd.DataFrame()
    # band is the level 0 index
    dfBands = df.columns.unique(level=0).values
    for band in dfBands:
        subdf = df[band]
        columnFormat = "{0}{1}" if camelCase else "{0}_{1}"
        newColumns = {c: columnFormat.format(band, c)
                      for c in subdf.columns if c not in noDupCols}
        cols = list(newColumns.keys())
        newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)

    # Columns that are not per-band (e.g. coordinates) are taken from the
    # first band present in both the input and the output.
    presentBands = dfBands if inputBands is None else list(set(inputBands).intersection(dfBands))
    noDupDf = df[presentBands[0]][noDupCols]
    newDf = pd.concat([noDupDf, newDf], axis=1)
    return newDf

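# Illustrative sketch (not part of the pipeline code): flattenFilters turns a
# band-keyed multilevel column index into flat, band-prefixed column names.
# The column values below are made up for demonstration.
#
#     df = pd.DataFrame({
#         ("g", "psfFlux"): [1.0], ("g", "coord_ra"): [10.0], ("g", "coord_dec"): [-5.0],
#         ("r", "psfFlux"): [2.0], ("r", "coord_ra"): [10.0], ("r", "coord_dec"): [-5.0],
#     })
#     flattenFilters(df)                  # columns: coord_ra, coord_dec, g_psfFlux, r_psfFlux
#     flattenFilters(df, camelCase=True)  # columns: coord_ra, coord_dec, gpsfFlux, rpsfFlux
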
class WriteObjectTableConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"coaddName": "deep"},
                                  dimensions=("tract", "patch", "skymap")):
    inputCatalogMeas = connectionTypes.Input(
        doc="Catalog of source measurements on the deepCoadd.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_meas",
    )
    inputCatalogForcedSrc = connectionTypes.Input(
        doc="Catalog of forced measurements (shape and position parameters held fixed) on the deepCoadd.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_forced_src",
    )
    inputCatalogPsfsMultiprofit = connectionTypes.Input(
        doc="Catalog of Gaussian mixture model fit parameters for the PSF model at each object centroid.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="ArrowAstropy",
        name="{coaddName}Coadd_psfs_multiprofit",
    )
    outputCatalog = connectionTypes.Output(
        doc="A vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
            "stored as a DataFrame with a multi-level column index per-patch.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="{coaddName}Coadd_obj",
    )

class WriteObjectTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteObjectTableConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd",
    )

class WriteObjectTableTask(pipeBase.PipelineTask):
    """Write filter-merged object tables as a DataFrame in parquet format."""

    _DefaultName = "writeObjectTable"
    ConfigClass = WriteObjectTableConfig

    # Name of the table dataset written by this task.
    outputDataset = "obj"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        catalogs = defaultdict(dict)
        for dataset, connection in (
            ("meas", "inputCatalogMeas"),
            ("forced_src", "inputCatalogForcedSrc"),
            ("psfs_multiprofit", "inputCatalogPsfsMultiprofit"),
        ):
            for ref, cat in zip(getattr(inputRefs, connection), inputs[connection]):
                catalogs[ref.dataId["band"]][dataset] = cat

        dataId = butlerQC.quantum.dataId
        df = self.run(catalogs=catalogs, tract=dataId["tract"], patch=dataId["patch"])
        outputs = pipeBase.Struct(outputCatalog=df)
        butlerQC.put(outputs, outputRefs)
    def run(self, catalogs, tract, patch):
        """Merge multiple catalogs.

        Parameters
        ----------
        catalogs : `dict`
            Mapping from filter names to dict of catalogs.
        tract : `int`
            tractId to use for the tractId column.
        patch : `int`
            patchId to use for the patchId column.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Merged dataframe.

        Raises
        ------
        ValueError
            Raised if any of the catalogs is of an unsupported type.
        """
        dfs = []
        for filt, tableDict in catalogs.items():
            for dataset, table in tableDict.items():
                # Convert afw/astropy tables to pandas if needed.
                if isinstance(table, pd.DataFrame):
                    df = table
                elif isinstance(table, afwTable.SourceCatalog):
                    df = table.asAstropy().to_pandas()
                elif isinstance(table, astropy.table.Table):
                    df = table.to_pandas()
                else:
                    raise ValueError(f"{dataset=} has unsupported {type(table)=}")
                df.set_index("id", drop=True, inplace=True)

                # Sort columns by name, to ensure matching schema among patches.
                df = df.reindex(sorted(df.columns), axis=1)
                df = df.assign(tractId=tract, patchId=patch)

                # Make columns a 3-level MultiIndex.
                df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
                                                       names=("dataset", "band", "column"))
                dfs.append(df)

        # Join the per-dataset/per-band frames pair-wise on the shared index.
        catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return catalog

class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"catalogType": ""},
                                  dimensions=("instrument", "visit", "detector")):
    catalog = connectionTypes.Input(
        doc="Input full-depth catalog of sources produced by CalibrateTask",
        name="{catalogType}src",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector"),
    )
    outputCatalog = connectionTypes.Output(
        doc="Catalog of sources, `src` in Astropy/Parquet format. Columns are unchanged.",
        name="{catalogType}source",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )

class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteSourceTableConnections):
    pass


class WriteSourceTableTask(pipeBase.PipelineTask):
    """Write source table to DataFrame Parquet format."""

    _DefaultName = "writeSourceTable"
    ConfigClass = WriteSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        inputs["visit"] = butlerQC.quantum.dataId["visit"]
        inputs["detector"] = butlerQC.quantum.dataId["detector"]
        result = self.run(**inputs)
        outputs = pipeBase.Struct(outputCatalog=result.table)
        butlerQC.put(outputs, outputRefs)
    def run(self, catalog, visit, detector, **kwargs):
        """Convert `src` catalog to an Astropy table.

        Parameters
        ----------
        catalog : `afwTable.SourceCatalog`
            Catalog to be converted.
        visit, detector : `int`
            Visit and detector ids to be added as columns.
        **kwargs
            Additional keyword arguments are ignored as a convenience for
            subclasses that pass the same arguments to several different
            methods.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            ``table``
                `astropy.table.Table` version of the input catalog.
        """
        self.log.info("Generating DataFrame from src catalog visit,detector=%i,%i", visit, detector)
        tbl = catalog.asAstropy()
        tbl["visit"] = visit
        # int16 instead of uint8 because databases don't like unsigned bytes.
        tbl["detector"] = np.int16(detector)

        return pipeBase.Struct(table=tbl)

class WriteRecalibratedSourceTableConnections(WriteSourceTableConnections,
                                              defaultTemplates={"catalogType": ""},
                                              dimensions=("instrument", "visit", "detector", "skymap")):
    visitSummary = connectionTypes.Input(
        doc="Input visit-summary catalog with updated calibration objects.",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit",),
    )

    def __init__(self, config):
        # Don't use the input source catalog as an initial existence
        # constraint during quantum-graph generation.
        self.catalog = dataclasses.replace(self.catalog, deferGraphConstraint=True)

class WriteRecalibratedSourceTableConfig(WriteSourceTableConfig,
                                         pipelineConnections=WriteRecalibratedSourceTableConnections):
    doReevaluatePhotoCalib = pexConfig.Field(
        dtype=bool,
        default=True,
        doc=("Add or replace local photoCalib columns"),
    )
    doReevaluateSkyWcs = pexConfig.Field(
        dtype=bool,
        default=True,
        doc=("Add or replace local WCS columns and update the coord columns, coord_ra and coord_dec"),
    )


class WriteRecalibratedSourceTableTask(WriteSourceTableTask):
    """Write source table to DataFrame Parquet format."""

    _DefaultName = "writeRecalibratedSourceTable"
    ConfigClass = WriteRecalibratedSourceTableConfig
    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        inputs["visit"] = butlerQC.quantum.dataId["visit"]
        inputs["detector"] = butlerQC.quantum.dataId["detector"]

        if self.config.doReevaluatePhotoCalib or self.config.doReevaluateSkyWcs:
            exposure = ExposureF()
            inputs["exposure"] = self.prepareCalibratedExposure(
                exposure=exposure,
                visitSummary=inputs["visitSummary"],
                detectorId=butlerQC.quantum.dataId["detector"],
            )
            inputs["catalog"] = self.addCalibColumns(**inputs)

        result = self.run(**inputs)
        outputs = pipeBase.Struct(outputCatalog=result.table)
        butlerQC.put(outputs, outputRefs)
    def prepareCalibratedExposure(self, exposure, detectorId, visitSummary=None):
        """Prepare a calibrated exposure and apply external calibrations.

        Parameters
        ----------
        exposure : `lsst.afw.image.exposure.Exposure`
            Input exposure to adjust calibrations. May be an empty Exposure.
        detectorId : `int`
            Detector ID associated with the exposure.
        visitSummary : `lsst.afw.table.ExposureCatalog`, optional
            Exposure catalog with all calibration objects. WCS and PhotoCalib
            are always applied if ``visitSummary`` is provided and those
            components are not `None`.

        Returns
        -------
        exposure : `lsst.afw.image.exposure.Exposure`
            Exposure with adjusted calibrations.
        """
        if visitSummary is not None:
            row = visitSummary.find(detectorId)
            if row is None:
                raise pipeBase.NoWorkFound(f"Visit summary for detector {detectorId} is missing.")
            if (photoCalib := row.getPhotoCalib()) is None:
                self.log.warning("Detector id %s has None for photoCalib in visit summary; "
                                 "skipping reevaluation of photoCalib.", detectorId)
                exposure.setPhotoCalib(None)
            else:
                exposure.setPhotoCalib(photoCalib)
            if (skyWcs := row.getWcs()) is None:
                self.log.warning("Detector id %s has None for skyWcs in visit summary; "
                                 "skipping reevaluation of skyWcs.", detectorId)
                exposure.setWcs(None)
            else:
                exposure.setWcs(skyWcs)

        return exposure
    def addCalibColumns(self, catalog, exposure, **kwargs):
        """Add or replace columns with calibs evaluated at each centroid.

        Add or replace 'base_LocalWcs' and 'base_LocalPhotoCalib' columns in
        a source catalog, by rerunning the plugins.

        Parameters
        ----------
        catalog : `lsst.afw.table.SourceCatalog`
            Catalog to which calib columns will be added.
        exposure : `lsst.afw.image.exposure.Exposure`
            Exposure with attached PhotoCalibs and SkyWcs attributes to be
            reevaluated at local centroids. Pixels are not required.
        **kwargs
            Additional keyword arguments are ignored to facilitate passing the
            same arguments to several methods.

        Returns
        -------
        newCat : `lsst.afw.table.SourceCatalog`
            Source Catalog with requested local calib columns.
        """
        measureConfig = SingleFrameMeasurementTask.ConfigClass()
        measureConfig.doReplaceWithNoise = False

        # Clear all slots; only the requested local-calib plugins are run.
        for slot in measureConfig.slots:
            setattr(measureConfig.slots, slot, None)

        measureConfig.plugins.names = []
        if self.config.doReevaluateSkyWcs:
            measureConfig.plugins.names.add("base_LocalWcs")
            self.log.info("Re-evaluating base_LocalWcs plugin")
        if self.config.doReevaluatePhotoCalib:
            measureConfig.plugins.names.add("base_LocalPhotoCalib")
            self.log.info("Re-evaluating base_LocalPhotoCalib plugin")
        pluginsNotToCopy = tuple(measureConfig.plugins.names)

        # Copy all columns except those being recomputed into a new catalog.
        mapper = afwTable.SchemaMapper(catalog.schema)
        aliasMap = catalog.schema.getAliasMap()
        for item in catalog.schema:
            if not item.field.getName().startswith(pluginsNotToCopy):
                mapper.addMapping(item.key)

        schema = mapper.getOutputSchema()
        measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
        schema.setAliasMap(aliasMap)
        newCat = afwTable.SourceCatalog(schema)
        newCat.extend(catalog, mapper=mapper)

        if self.config.doReevaluateSkyWcs and exposure.wcs is not None:
            afwTable.updateSourceCoords(exposure.wcs, newCat)
            wcsPlugin = measurement.plugins["base_LocalWcs"]
        else:
            wcsPlugin = None

        if self.config.doReevaluatePhotoCalib and exposure.getPhotoCalib() is not None:
            pcPlugin = measurement.plugins["base_LocalPhotoCalib"]
        else:
            pcPlugin = None

        for row in newCat:
            if wcsPlugin is not None:
                wcsPlugin.measure(row, exposure)
            if pcPlugin is not None:
                pcPlugin.measure(row, exposure)

        return newCat

class PostprocessAnalysis(object):
    """Calculate columns from DataFrames or handles storing DataFrames.

    This object manages and organizes an arbitrary set of computations
    on a catalog. The catalog is defined by a
    `DeferredDatasetHandle` or `InMemoryDatasetHandle` object
    (or list thereof), such as a ``deepCoadd_obj`` dataset, and the
    computations are defined by a collection of
    `~lsst.pipe.tasks.functors.Functor` objects (or, equivalently, a
    ``CompositeFunctor``).

    After the object is initialized, accessing the ``.df`` attribute (which
    holds the `pandas.DataFrame` containing the results of the calculations)
    triggers computation of said dataframe.

    One of the conveniences of using this object is the ability to define a
    desired common filter for all functors. This enables the same functor
    collection to be passed to several different `PostprocessAnalysis` objects
    without having to change the original functor collection, since the
    ``filt`` keyword argument of this object triggers an overwrite of the
    ``filt`` property for all functors in the collection.

    This object also allows a list of refFlags to be passed, and defines a set
    of default refFlags that are always included even if not requested.

    If a list of DataFrames or Handles is passed, rather than a single one,
    then the calculations will be mapped over all the input catalogs. In
    principle, it should be straightforward to parallelize this activity, but
    initial tests have failed (see TODO in code comments).

    Parameters
    ----------
    handles : `~lsst.daf.butler.DeferredDatasetHandle` or
              `~lsst.pipe.base.InMemoryDatasetHandle` or
              list of these.
        Source catalog(s) for computation.
    functors : `list`, `dict`, or `~lsst.pipe.tasks.functors.CompositeFunctor`
        Computations to do (functors that act on ``handles``).
        If a dict, the output DataFrame will have columns keyed accordingly.
        If a list, the column keys will come from the ``.shortname`` attribute
        of each functor.
    filt : `str`, optional
        Filter in which to calculate. If provided, this will overwrite any
        existing ``.filt`` attribute of the provided functors.
    flags : `list`, optional
        List of flags (per-band) to include in output table.
        Taken from the ``meas`` dataset if applied to a multilevel Object
        Table.
    refFlags : `list`, optional
        List of refFlags (only reference band) to include in output table.
    forcedFlags : `list`, optional
        List of flags (per-band) to include in output table.
        Taken from the ``forced_src`` dataset if applied to a
        multilevel Object Table. Intended for flags from measurement plugins
        only run during multi-band forced-photometry.
    """
    _defaultRefFlags = []
    _defaultFuncs = ()
    def __init__(self, handles, functors, filt=None, flags=None, refFlags=None, forcedFlags=None):
        self.handles = handles
        self.functors = functors

        self.filt = filt
        self.flags = list(flags) if flags is not None else []
        self.forcedFlags = list(forcedFlags) if forcedFlags is not None else []
        self.refFlags = list(self._defaultRefFlags)
        if refFlags is not None:
            self.refFlags += list(refFlags)

        self._df = None

    @property
    def defaultFuncs(self):
        funcs = dict(self._defaultFuncs)
        return funcs

    @property
    def func(self):
        additionalFuncs = self.defaultFuncs
        additionalFuncs.update({flag: Column(flag, dataset="forced_src") for flag in self.forcedFlags})
        additionalFuncs.update({flag: Column(flag, dataset="ref") for flag in self.refFlags})
        additionalFuncs.update({flag: Column(flag, dataset="meas") for flag in self.flags})

        if isinstance(self.functors, CompositeFunctor):
            func = self.functors
        else:
            func = CompositeFunctor(self.functors)

        func.funcDict.update(additionalFuncs)
        func.filt = self.filt

        return func

    @property
    def noDupCols(self):
        return [name for name, func in self.func.funcDict.items() if func.noDup]

    @property
    def df(self):
        if self._df is None:
            self.compute()
        return self._df

    def compute(self, dropna=False, pool=None):
        # Map the computation over a list of handles, if one was given.
        if type(self.handles) in (list, tuple):
            if pool is None:
                dflist = [self.func(handle, dropna=dropna) for handle in self.handles]
            else:
                # TODO: parallelization of this loop has not yet worked.
                dflist = pool.map(functools.partial(self.func, dropna=dropna), self.handles)
            self._df = pd.concat(dflist)
        else:
            self._df = self.func(self.handles, dropna=dropna)

        return self._df

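# Illustrative usage sketch (the handle and functor choices below are
# hypothetical, not defaults of this module):
#
#     funcs = {"ra": Column("coord_ra"), "dec": Column("coord_dec")}
#     analysis = PostprocessAnalysis(handle, funcs, filt="i", refFlags=["detect_isPrimary"])
#     df = analysis.df   # first access triggers compute() and caches the result
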
class TransformCatalogBaseConnections(pipeBase.PipelineTaskConnections,
                                      dimensions=()):
    """Expected Connections for subclasses of TransformCatalogBaseTask.

    Must be subclassed.
    """
    inputCatalog = connectionTypes.Input(
        name="",
        storageClass="DataFrame",
    )
    outputCatalog = connectionTypes.Output(
        name="",
        storageClass="ArrowAstropy",
    )


class TransformCatalogBaseConfig(pipeBase.PipelineTaskConfig,
                                 pipelineConnections=TransformCatalogBaseConnections):
    functorFile = pexConfig.Field(
        dtype=str,
        doc="Path to YAML file specifying Science Data Model functors to use "
            "when copying columns and computing calibrated values.",
        default=None,
        optional=True,
    )
    primaryKey = pexConfig.Field(
        dtype=str,
        doc="Name of column to be set as the DataFrame index. If None, the index"
            " will be named `id`.",
        default=None,
        optional=True,
    )
    columnsFromDataId = pexConfig.ListField(
        dtype=str,
        default=None,
        optional=True,
        doc="Columns to extract from the dataId",
    )

class TransformCatalogBaseTask(pipeBase.PipelineTask):
    """Base class for transforming/standardizing a catalog by applying functors
    that convert units and apply calibrations.

    The purpose of this task is to perform a set of computations on an input
    ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` that holds a
    ``DataFrame`` dataset (such as ``deepCoadd_obj``), and write the results to
    a new dataset (which needs to be declared in an ``outputDataset``
    attribute).

    The calculations to be performed are defined in a YAML file that specifies
    a set of functors to be computed, provided as a ``--functorFile`` config
    parameter. An example of such a YAML file is the following:

        funcs:
            sourceId:
                functor: Index
            x:
                functor: Column
                args: slot_Centroid_x
            y:
                functor: Column
                args: slot_Centroid_y
            psfFlux:
                functor: LocalNanojansky
                args:
                    - slot_PsfFlux_instFlux
                    - slot_PsfFlux_instFluxErr
                    - base_LocalPhotoCalib
                    - base_LocalPhotoCalibErr
            psfFluxErr:
                functor: LocalNanojanskyErr
                args:
                    - slot_PsfFlux_instFlux
                    - slot_PsfFlux_instFluxErr
                    - base_LocalPhotoCalib
                    - base_LocalPhotoCalibErr
        flags:
            - detect_isPrimary

    The names for each entry under "func" will become the names of columns in
    the output dataset. All the functors referenced are defined in
    `~lsst.pipe.tasks.functors`. Positional arguments to be passed to each
    functor are in the `args` list, and any additional entries for each column
    other than "functor" or "args" (e.g., ``'filt'``, ``'dataset'``) are
    treated as keyword arguments to be passed to the functor initialization.

    The "flags" entry is the default shortcut for `Column` functors.
    All columns listed under "flags" will be copied to the output table
    untransformed. They can be of any datatype.
    In the special case of transforming a multi-level object table with
    band and dataset indices (deepCoadd_obj), these will be taken from the
    ``meas`` dataset and exploded out per band.

    There are two special shortcuts that only apply when transforming
    multi-level Object (deepCoadd_obj) tables:
    - The "refFlags" entry is a shortcut for `Column` functors
      taken from the ``ref`` dataset if transforming an ObjectTable.
    - The "forcedFlags" entry is a shortcut for `Column` functors
      taken from the ``forced_src`` dataset if transforming an ObjectTable.
      These are expanded out per band.

    This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
    to organize and execute the calculations.
    """
    @property
    def _DefaultName(self):
        raise NotImplementedError("Subclass must define the \"_DefaultName\" attribute.")

    @property
    def outputDataset(self):
        raise NotImplementedError("Subclass must define the \"outputDataset\" attribute.")

    @property
    def inputDataset(self):
        raise NotImplementedError("Subclass must define \"inputDataset\" attribute.")

    @property
    def ConfigClass(self):
        raise NotImplementedError("Subclass must define \"ConfigClass\" attribute.")
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if self.config.functorFile:
            self.log.info("Loading transform functor definitions from %s",
                          self.config.functorFile)
            self.funcs = CompositeFunctor.from_file(self.config.functorFile)
            self.funcs.update(dict(PostprocessAnalysis._defaultFuncs))
        else:
            self.funcs = None

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
                          dataId=dict(outputRefs.outputCatalog.dataId.mapping))
        butlerQC.put(result, outputRefs)
    def run(self, handle, funcs=None, dataId=None, band=None):
        """Do postprocessing calculations.

        Takes a ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` or
        ``DataFrame`` object and dataId, and returns a dataframe with results
        of postprocessing calculations.

        Parameters
        ----------
        handle : `~lsst.daf.butler.DeferredDatasetHandle` or
                 `~lsst.pipe.base.InMemoryDatasetHandle` or
                 `~pandas.DataFrame`, or list of these.
            DataFrames from which calculations are done.
        funcs : `~lsst.pipe.tasks.functors.Functor`
            Functors to apply to the table's columns.
        dataId : `dict`, optional
            Used to add a ``patchId`` column to the output dataframe.
        band : `str`, optional
            Filter band that is being processed.

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            Result struct, with a single ``outputCatalog`` attribute holding
            the transformed catalog.
        """
        self.log.info("Transforming/standardizing the source table dataId: %s", dataId)

        df = self.transform(band, handle, funcs, dataId).df
        self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))

        if len(df) == 0:
            raise UpstreamFailureNoWorkFound(
                "Input catalog is empty, so there is nothing to transform/standardize",
            )

        result = pipeBase.Struct(outputCatalog=pandas_to_astropy(df))
        return result
    def getFunctors(self):
        return self.funcs

    def getAnalysis(self, handles, funcs=None, band=None):
        if funcs is None:
            funcs = self.funcs
        analysis = PostprocessAnalysis(handles, funcs, filt=band)
        return analysis

    def transform(self, band, handles, funcs, dataId):
        analysis = self.getAnalysis(handles, funcs=funcs, band=band)
        df = analysis.df
        if dataId and self.config.columnsFromDataId:
            for key in self.config.columnsFromDataId:
                if key in dataId:
                    if key == "detector":
                        # int16 instead of uint8 because databases don't like unsigned bytes.
                        df[key] = np.int16(dataId[key])
                    else:
                        df[key] = dataId[key]
                else:
                    raise ValueError(f"'{key}' in config.columnsFromDataId not found in dataId: {dataId}")

        if self.config.primaryKey:
            if df.index.name != self.config.primaryKey and self.config.primaryKey in df:
                df.reset_index(inplace=True, drop=True)
                df.set_index(self.config.primaryKey, inplace=True)

        return pipeBase.Struct(
            df=df,
            analysis=analysis,
        )

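# Minimal subclass sketch (illustrative only; the class names and functor file
# below are hypothetical, not part of this module). A concrete subclass
# supplies a Connections class with ``inputCatalog`` and ``outputCatalog``
# and a config whose ``functorFile`` points at the YAML functor specification
# described in the base-class docstring above.
#
#     class MyTransformCatalogTask(TransformCatalogBaseTask):
#         _DefaultName = "myTransformCatalog"
#         ConfigClass = MyTransformCatalogConfig  # sets functorFile, primaryKey, etc.
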
class TransformObjectCatalogConnections(pipeBase.PipelineTaskConnections,
                                        defaultTemplates={"coaddName": "deep"},
                                        dimensions=("tract", "patch", "skymap")):
    inputCatalog = connectionTypes.Input(
        doc="The vertical concatenation of the {coaddName}_{meas|forced_src|psfs_multiprofit} catalogs, "
            "stored as a DataFrame with a multi-level column index per-patch.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="{coaddName}Coadd_obj",
        deferLoad=True,
    )
    inputCatalogRef = connectionTypes.Input(
        doc="Catalog marking the primary detection (which band provides a good shape and position) "
            "for each detection in deepCoadd_mergeDet.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_ref",
        deferLoad=True,
    )
    inputCatalogExpMultiprofit = connectionTypes.Input(
        doc="Catalog of multiband Exponential fits.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="ArrowAstropy",
        name="{coaddName}Coadd_Exp_multiprofit",
        deferLoad=True,
    )
    inputCatalogSersicMultiprofit = connectionTypes.Input(
        doc="Catalog of multiband Sersic fits.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="ArrowAstropy",
        name="{coaddName}Coadd_Sersic_multiprofit",
        deferLoad=True,
    )
    inputCatalogEpoch = connectionTypes.Input(
        doc="Catalog of mean epochs for each object per band.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="ArrowAstropy",
        name="object_epoch",
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-Patch Object Table of columns transformed from the deepCoadd_obj table per the standard "
            "data model.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="ArrowAstropy",
        name="objectTable",
    )

    def __init__(self, *, config=None):
        super().__init__(config=config)
        if config.multilevelOutput:
            self.outputCatalog = dataclasses.replace(self.outputCatalog, storageClass="DataFrame")

class TransformObjectCatalogConfig(TransformCatalogBaseConfig,
                                   pipelineConnections=TransformObjectCatalogConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd",
    )
    outputBands = pexConfig.ListField(
        dtype=str,
        default=None,
        optional=True,
        doc=("These bands and only these bands will appear in the output,"
             " NaN-filled if the input does not include them."
             " If None, then use all bands found in the input."),
    )
    camelCase = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Write per-band column names with camelCase, else underscore. "
             "For example: gPsFlux instead of g_PsFlux."),
    )
    multilevelOutput = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Whether results dataframe should have a multilevel column index (True) or be flat "
             "and name-munged (False). If True, the output storage class will be "
             "set to DataFrame, since astropy tables do not support multi-level indexing."),
        deprecated="Support for multi-level outputs is deprecated and will be removed after v29.",
    )
    goodFlags = pexConfig.ListField(
        dtype=str,
        default=[],
        doc=("List of 'good' flags that should be set False when populating empty tables. "
             "All other flags are considered to be 'bad' flags and will be set to True."),
    )
    floatFillValue = pexConfig.Field(
        dtype=float,
        default=np.nan,
        doc="Fill value for float fields when populating empty tables.",
    )
    integerFillValue = pexConfig.Field(
        dtype=int,
        default=-1,
        doc="Fill value for integer fields when populating empty tables.",
    )

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Object.yaml")
        self.primaryKey = "objectId"
        self.columnsFromDataId = ["tract", "patch"]
        self.goodFlags = ["calib_astrometry_used",
                          "calib_photometry_reserved",
                          "calib_photometry_used",
                          "calib_psf_candidate",
                          "calib_psf_reserved",
                          ]

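# Illustrative config override (values are examples, not defaults mandated by
# this module): restrict the output Object Table to a fixed band list and use
# camelCase column names.
#
#     config.outputBands = ["u", "g", "r", "i", "z", "y"]
#     config.camelCase = True   # g_psfFlux -> gpsfFlux
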
class TransformObjectCatalogTask(TransformCatalogBaseTask):
    """Produce a flattened Object Table to match the format specified in
    sdm_schemas.

    Do the same set of postprocessing calculations on all bands.

    This is identical to `TransformCatalogBaseTask`, except for that it does
    the specified functor calculations for all filters present in the
    input `deepCoadd_obj` table. Any specific ``"filt"`` keywords specified
    by the YAML file will be superseded.
    """
    _DefaultName = "transformObjectCatalog"
    ConfigClass = TransformObjectCatalogConfig

    datasets_multiband = ("epoch", "ref", "Exp_multiprofit", "Sersic_multiprofit")

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
                          dataId=dict(outputRefs.outputCatalog.dataId.mapping),
                          handle_epoch=inputs["inputCatalogEpoch"],
                          handle_ref=inputs["inputCatalogRef"],
                          handle_Exp_multiprofit=inputs["inputCatalogExpMultiprofit"],
                          handle_Sersic_multiprofit=inputs["inputCatalogSersicMultiprofit"],
                          )
        butlerQC.put(result, outputRefs)
    def run(self, handle, funcs=None, dataId=None, band=None, **kwargs):
        # NOTE: ``band`` is ignored here; all bands in the input are processed.
        if isinstance(funcs, CompositeFunctor):
            funcDict_in = funcs.funcDict
        elif isinstance(funcs, dict):
            funcDict_in = funcs
        elif isinstance(funcs, list):
            funcDict_in = {idx: v for idx, v in enumerate(funcs)}
        else:
            raise TypeError(f"Unsupported {type(funcs)=}")

        handles_multi = {}
        funcDicts_multiband = {}
        for dataset in self.datasets_multiband:
            if (handle_multi := kwargs.get(f"handle_{dataset}")) is None:
                raise RuntimeError(f"Missing required handle_{dataset} kwarg")
            handles_multi[dataset] = handle_multi
            funcDicts_multiband[dataset] = {}

        dfDict = {}
        analysisDict = {}
        templateDf = pd.DataFrame()

        columns = handle.get(component="columns")
        inputBands = columns.unique(level=1).values

        outputBands = self.config.outputBands if self.config.outputBands else inputBands

        # Split the functors into per-band and multiband groups.
        funcDict_band = {}
        for name, func in funcDict_in.items():
            if func.dataset in funcDicts_multiband:
                # Skip band-specific functors for bands not in the output.
                if band := getattr(func, "band_to_check", None):
                    if band not in outputBands:
                        continue
                # Ensure the functor's band coverage matches the input.
                elif hasattr(func, "bands"):
                    func.bands = tuple(inputBands)

            funcDict = funcDicts_multiband.get(func.dataset, funcDict_band)
            funcDict[name] = func

        funcs_band = CompositeFunctor(funcDict_band)

        # Perform transform for data of filters that exist in the handle dataframe.
        for inputBand in inputBands:
            if inputBand not in outputBands:
                self.log.info("Ignoring %s band data in the input", inputBand)
                continue
            self.log.info("Transforming the catalog of band %s", inputBand)
            result = self.transform(inputBand, handle, funcs_band, dataId)
            dfDict[inputBand] = result.df
            analysisDict[inputBand] = result.analysis
            if templateDf.empty:
                templateDf = result.df

        # Put filler values in columns of other wanted bands.
        for filt in outputBands:
            if filt not in dfDict:
                self.log.info("Adding empty columns for band %s", filt)
                dfTemp = templateDf.copy()
                for col in dfTemp.columns:
                    testValue = dfTemp[col].values[0]
                    if isinstance(testValue, (np.bool_, pd.BooleanDtype)):
                        # Boolean flag: "good" flags are filled as False,
                        # everything else as True.
                        if col in self.config.goodFlags:
                            fillValue = False
                        else:
                            fillValue = True
                    elif isinstance(testValue, numbers.Integral):
                        # numbers.Integral catches python, numpy, and pandas
                        # integers alike; unsigned integers are not allowed.
                        if isinstance(testValue, np.unsignedinteger):
                            raise ValueError("Parquet tables may not have unsigned integer columns.")
                        else:
                            fillValue = self.config.integerFillValue
                    else:
                        fillValue = self.config.floatFillValue
                    dfTemp[col].values[:] = fillValue
                dfDict[filt] = dfTemp

        # This makes a multilevel column index, with band as first level.
        df = pd.concat(dfDict, axis=1, names=["band", "column"])
        name_index = df.index.name

        if not self.config.multilevelOutput:
            noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
            if self.config.primaryKey in noDupCols:
                noDupCols.remove(self.config.primaryKey)
            if dataId and self.config.columnsFromDataId:
                noDupCols += self.config.columnsFromDataId
            df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase,
                                inputBands=inputBands)

        # Apply the per-dataset functors to each multiband dataset in turn.
        for dataset, funcDict in funcDicts_multiband.items():
            handle_multiband = handles_multi[dataset]
            df_dataset = handle_multiband.get()
            if isinstance(df_dataset, astropy.table.Table):
                if name_index not in df_dataset.colnames:
                    if self.config.primaryKey in df_dataset.colnames:
                        name_index_ap = self.config.primaryKey
                    else:
                        raise RuntimeError(
                            f"Neither of {name_index=} nor {self.config.primaryKey=} appear in"
                            f" {df_dataset.colnames=} for {dataset=}"
                        )
                else:
                    name_index_ap = name_index
                df_dataset = df_dataset.to_pandas().set_index(name_index_ap, drop=False)
            elif isinstance(df_dataset, afwTable.SourceCatalog):
                df_dataset = df_dataset.asAstropy().to_pandas().set_index(name_index, drop=False)

            result = self.transform(
                None,
                pipeBase.InMemoryDatasetHandle(df_dataset, storageClass="DataFrame"),
                CompositeFunctor(funcDict),
                dataId,
            )
            result.df.index.name = name_index
            # Drop columns duplicated from the dataId (e.g. tract, patch).
            if self.config.columnsFromDataId:
                columns_drop = [column for column in self.config.columnsFromDataId
                                if column in result.df]
                if columns_drop:
                    result.df.drop(columns_drop, axis=1, inplace=True)

            # Replicate the band index for the multiband table if needed.
            to_concat = pd.concat(
                {band: result.df for band in self.config.outputBands}, axis=1, names=["band", "column"]
            ) if self.config.multilevelOutput else result.df

            df = pd.concat([df, to_concat], axis=1)
            analysisDict[dataset] = result.analysis

        df.index.name = self.config.primaryKey

        if not self.config.multilevelOutput:
            tbl = pandas_to_astropy(df)
        else:
            tbl = df

        self.log.info("Made a table of %d columns and %d rows", len(tbl.columns), len(tbl))

        return pipeBase.Struct(outputCatalog=tbl)

class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("tract", "skymap")):
    inputCatalogs = connectionTypes.Input(
        doc="Per-Patch objectTables conforming to the standard data model.",
        name="objectTable",
        storageClass="ArrowAstropy",
        dimensions=("tract", "patch", "skymap"),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-tract vertical concatenation of the input objectTables",
        name="objectTable_tract",
        storageClass="ArrowAstropy",
        dimensions=("tract", "skymap"),
    )


class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=ConsolidateObjectTableConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd",
    )


class ConsolidateObjectTableTask(pipeBase.PipelineTask):
    """Write patch-merged source tables to a tract-level DataFrame Parquet file.

    Concatenates the `objectTable` list into a per-tract `objectTable_tract`.
    """
    _DefaultName = "consolidateObjectTable"
    ConfigClass = ConsolidateObjectTableConfig

    inputDataset = "objectTable"
    outputDataset = "objectTable_tract"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-patch Object Tables",
                      len(inputs["inputCatalogs"]))
        table = TableVStack.vstack_handles(inputs["inputCatalogs"])
        butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)

class TransformSourceTableConnections(pipeBase.PipelineTaskConnections,
                                      defaultTemplates={"catalogType": ""},
                                      dimensions=("instrument", "visit", "detector")):
    inputCatalog = connectionTypes.Input(
        doc="Wide input catalog of sources produced by WriteSourceTableTask",
        name="{catalogType}source",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Narrower, per-detector Source Table transformed and converted per a "
            "specified set of functors",
        name="{catalogType}sourceTable",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )


class TransformSourceTableConfig(TransformCatalogBaseConfig,
                                 pipelineConnections=TransformSourceTableConnections):

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Source.yaml")
        self.primaryKey = "sourceId"
        self.columnsFromDataId = ["visit", "detector", "band", "physical_filter"]

class TransformSourceTableTask(TransformCatalogBaseTask):
    """Transform/standardize a source catalog."""

    _DefaultName = "transformSourceTable"
    ConfigClass = TransformSourceTableConfig

class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections,
                                         dimensions=("instrument", "visit",),
                                         defaultTemplates={"calexpType": ""}):
    calexp = connectionTypes.Input(
        doc="Processed exposures used for metadata",
        name="calexp",
        storageClass="ExposureF",
        dimensions=("instrument", "visit", "detector"),
        multiple=True,
        deferLoad=True,
    )
    visitSummary = connectionTypes.Output(
        doc=("Per-visit consolidated exposure metadata. These catalogs use "
             "detector id for the id and are sorted for fast lookups of a "
             "detector."),
        name="visitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
    )
    visitSummarySchema = connectionTypes.InitOutput(
        doc="Schema of the visitSummary catalog",
        name="visitSummary_schema",
        storageClass="ExposureCatalog",
    )

class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig,
                                    pipelineConnections=ConsolidateVisitSummaryConnections):
    """Config for ConsolidateVisitSummaryTask"""

    full = pexConfig.Field(
        "Whether to propagate all exposure components. "
        "This adds PSF, aperture correction map, transmission curve, and detector, which can increase "
        "file size by more than a factor of 10, but it makes the visit summaries produced by this task "
        "fully usable by tasks that were designed to run downstream of lsst.drp.tasks.UpdateVisitSummaryTask.",
        dtype=bool,
        default=False,
    )

class ConsolidateVisitSummaryTask(pipeBase.PipelineTask):
    """Task to consolidate per-detector visit metadata.

    This task aggregates the following metadata from all the detectors in a
    single visit into an exposure catalog:
    - The physical_filter and band (if available).
    - The aperture correction map.
    - The transmission curve.
    - The psf size, shape, and effective area at the center of the detector.
    - The corners of the bounding box in right ascension/declination.

    Tests for this task are performed in ci_hsc_gen3.
    """
    _DefaultName = "consolidateVisitSummary"
    ConfigClass = ConsolidateVisitSummaryConfig

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.schema = afwTable.ExposureTable.makeMinimalSchema()
        self.schema.addField("visit", type="L", doc="Visit number")
        self.schema.addField("physical_filter", type="String", size=32, doc="Physical filter")
        self.schema.addField("band", type="String", size=32, doc="Name of band")
        ExposureSummaryStats.update_schema(self.schema)
        self.visitSummarySchema = afwTable.ExposureCatalog(self.schema)
    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        dataRefs = butlerQC.get(inputRefs.calexp)
        visit = dataRefs[0].dataId["visit"]

        self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
                       len(dataRefs), visit)

        expCatalog = self._combineExposureMetadata(visit, dataRefs)

        butlerQC.put(expCatalog, outputRefs.visitSummary)
    def _combineExposureMetadata(self, visit, dataRefs):
        """Make a combined exposure catalog from a list of dataRefs.

        These dataRefs must point to exposures with wcs, summaryStats,
        and other visit metadata.

        Parameters
        ----------
        visit : `int`
            Visit identification number.
        dataRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            List of dataRefs in visit.

        Returns
        -------
        visitSummary : `lsst.afw.table.ExposureCatalog`
            Exposure catalog with per-detector summary information.
        """
        cat = afwTable.ExposureCatalog(self.schema)
        cat.resize(len(dataRefs))

        cat["visit"] = visit

        for i, dataRef in enumerate(dataRefs):
            visitInfo = dataRef.get(component="visitInfo")
            filterLabel = dataRef.get(component="filter")
            summaryStats = dataRef.get(component="summaryStats")
            detector = dataRef.get(component="detector")
            wcs = dataRef.get(component="wcs")
            photoCalib = dataRef.get(component="photoCalib")
            bbox = dataRef.get(component="bbox")
            validPolygon = dataRef.get(component="validPolygon")

            rec = cat[i]
            rec.setBBox(bbox)
            rec.setVisitInfo(visitInfo)
            rec.setWcs(wcs)
            rec.setPhotoCalib(photoCalib)
            rec.setValidPolygon(validPolygon)

            if self.config.full:
                rec.setPsf(dataRef.get(component="psf"))
                rec.setApCorrMap(dataRef.get(component="apCorrMap"))
                rec.setTransmissionCurve(dataRef.get(component="transmissionCurve"))

            rec["physical_filter"] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else ""
            rec["band"] = filterLabel.bandLabel if filterLabel.hasBandLabel() else ""
            rec.setId(detector.getId())
            summaryStats.update_record(rec)

        if not cat:
            raise pipeBase.NoWorkFound(
                "No detectors had sufficient information to make a visit summary row."
            )

        metadata = dafBase.PropertyList()
        metadata.add("COMMENT", "Catalog id is detector id, sorted.")
        metadata.add("COMMENT", "Only detectors with data have entries.")
        cat.setMetadata(metadata)

        cat.sort()
        return cat

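# Illustrative sketch (not part of this module): because the record id is the
# detector id and the catalog is sorted, per-detector rows can be looked up
# directly with ``ExposureCatalog.find``. The butler call and data ID below
# are hypothetical.
#
#     visitSummary = butler.get("visitSummary", instrument="HSC", visit=1228)
#     row = visitSummary.find(42)   # summary row for detector 42, or None
#     psfSigma = row["psfSigma"] if row is not None else None
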
class ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        defaultTemplates={"catalogType": ""},
                                        dimensions=("instrument", "visit")):
    inputCatalogs = connectionTypes.Input(
        doc="Input per-detector Source Tables",
        name="{catalogType}sourceTable",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-visit concatenation of Source Table",
        name="{catalogType}sourceTable_visit",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit"),
    )


class ConsolidateSourceTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=ConsolidateSourceTableConnections):
    pass


class ConsolidateSourceTableTask(pipeBase.PipelineTask):
    """Concatenate the `sourceTable` list into a per-visit `sourceTable_visit`."""

    _DefaultName = "consolidateSourceTable"
    ConfigClass = ConsolidateSourceTableConfig

    inputDataset = "sourceTable"
    outputDataset = "sourceTable_visit"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        # Order the input references by detector id for a deterministic output.
        detectorOrder = [ref.dataId["detector"] for ref in inputRefs.inputCatalogs]
        detectorOrder.sort()
        inputRefs = reorderRefs(inputRefs, detectorOrder, dataIdKey="detector")
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-detector Source Tables",
                      len(inputs["inputCatalogs"]))
        table = TableVStack.vstack_handles(inputs["inputCatalogs"])
        butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)

class MakeCcdVisitTableConnections(pipeBase.PipelineTaskConnections,
                                   dimensions=("instrument",),
                                   defaultTemplates={"calexpType": ""}):
    visitSummaryRefs = connectionTypes.Input(
        doc="Data references for per-visit consolidated exposure metadata",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="CCD and Visit metadata table",
        name="ccdVisitTable",
        storageClass="ArrowAstropy",
        dimensions=("instrument",),
    )


class MakeCcdVisitTableConfig(pipeBase.PipelineTaskConfig,
                              pipelineConnections=MakeCcdVisitTableConnections):
    idGenerator = DetectorVisitIdGeneratorConfig.make_field()

class MakeCcdVisitTableTask(pipeBase.PipelineTask):
    """Produce a `ccdVisitTable` from the visit summary exposure catalogs."""

    _DefaultName = "makeCcdVisitTable"
    ConfigClass = MakeCcdVisitTableConfig

    def run(self, visitSummaryRefs):
        """Make a table of ccd information from the visit summary catalogs.

        Parameters
        ----------
        visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            List of DeferredDatasetHandles pointing to exposure catalogs with
            per-detector summary information.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            Results struct with attribute:

            ``outputCatalog``
                Catalog of ccd and visit information.
        """
        ccdEntries = []
        for visitSummaryRef in visitSummaryRefs:
            visitSummary = visitSummaryRef.get()
            if not visitSummary:
                continue
            visitInfo = visitSummary[0].getVisitInfo()

            # Strip HDU provenance keywords from the catalog metadata.
            strip_provenance_from_fits_header(visitSummary.metadata)

            summaryTable = visitSummary.asAstropy()
            selectColumns = ["id", "visit", "physical_filter", "band", "ra", "dec",
                             "pixelScale", "zenithDistance",
                             "expTime", "zeroPoint", "psfSigma", "skyBg", "skyNoise",
                             "astromOffsetMean", "astromOffsetStd", "nPsfStar",
                             "psfStarDeltaE1Median", "psfStarDeltaE2Median",
                             "psfStarDeltaE1Scatter", "psfStarDeltaE2Scatter",
                             "psfStarDeltaSizeMedian", "psfStarDeltaSizeScatter",
                             "psfStarScaledDeltaSizeScatter", "psfTraceRadiusDelta",
                             "psfApFluxDelta", "psfApCorrSigmaScaledDelta",
                             "maxDistToNearestPsf",
                             "effTime", "effTimePsfSigmaScale",
                             "effTimeSkyBgScale", "effTimeZeroPointScale",
                             ]
            ccdEntry = summaryTable[selectColumns]

            ccdEntry.rename_column("visit", "visitId")
            ccdEntry.rename_column("id", "detectorId")

            # Duplicate 'dec' as 'decl' for backwards compatibility.
            ccdEntry["decl"] = ccdEntry["dec"]

            ccdEntry["ccdVisitId"] = [
                self.config.idGenerator.apply(
                    visitSummaryRef.dataId,
                    detector=detector_id,
                ).catalog_id
                for detector_id in summaryTable["id"]
            ]
            ccdEntry["detector"] = summaryTable["id"]
            # Convert the PSF Gaussian sigma (pixels) to a FWHM in arcsec.
            ccdEntry["seeing"] = (
                visitSummary["psfSigma"] * visitSummary["pixelScale"] * np.sqrt(8 * np.log(2))
            )
            ccdEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
            ccdEntry["expMidpt"] = np.datetime64(visitInfo.date.nsecs(scale=dafBase.DateTime.TAI), "ns")
            ccdEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
            expTime = visitInfo.getExposureTime()
            ccdEntry["obsStart"] = (
                ccdEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns")
            )
            expTime_days = expTime / (60*60*24)
            ccdEntry["obsStartMJD"] = ccdEntry["expMidptMJD"] - 0.5 * expTime_days
            ccdEntry["darkTime"] = visitInfo.getDarkTime()
            ccdEntry["xSize"] = summaryTable["bbox_max_x"] - summaryTable["bbox_min_x"]
            ccdEntry["ySize"] = summaryTable["bbox_max_y"] - summaryTable["bbox_min_y"]
            ccdEntry["llcra"] = summaryTable["raCorners"][:, 0]
            ccdEntry["llcdec"] = summaryTable["decCorners"][:, 0]
            ccdEntry["ulcra"] = summaryTable["raCorners"][:, 1]
            ccdEntry["ulcdec"] = summaryTable["decCorners"][:, 1]
            ccdEntry["urcra"] = summaryTable["raCorners"][:, 2]
            ccdEntry["urcdec"] = summaryTable["decCorners"][:, 2]
            ccdEntry["lrcra"] = summaryTable["raCorners"][:, 3]
            ccdEntry["lrcdec"] = summaryTable["decCorners"][:, 3]

            ccdEntries.append(ccdEntry)

        outputCatalog = astropy.table.vstack(ccdEntries, join_type="exact")
        return pipeBase.Struct(outputCatalog=outputCatalog)

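# Worked example of the "seeing" column above (illustrative numbers only): the
# summary stores the PSF width as a Gaussian sigma in pixels, so the FWHM in
# arcsec is psfSigma * pixelScale * sqrt(8 * ln 2).
#
#     psfSigma, pixelScale = 1.7, 0.2                           # pixels, arcsec/pixel
#     seeing = psfSigma * pixelScale * np.sqrt(8 * np.log(2))   # ~0.80 arcsec FWHM
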
class MakeVisitTableConnections(pipeBase.PipelineTaskConnections,
                                dimensions=("instrument",),
                                defaultTemplates={"calexpType": ""}):
    visitSummaries = connectionTypes.Input(
        doc="Per-visit consolidated exposure metadata",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit",),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Visit metadata table",
        name="visitTable",
        storageClass="ArrowAstropy",
        dimensions=("instrument",),
    )


class MakeVisitTableConfig(pipeBase.PipelineTaskConfig,
                           pipelineConnections=MakeVisitTableConnections):
    pass


class MakeVisitTableTask(pipeBase.PipelineTask):
    """Produce a `visitTable` from the visit summary exposure catalogs."""

    _DefaultName = "makeVisitTable"
    ConfigClass = MakeVisitTableConfig
    def run(self, visitSummaries):
        """Make a table of visit information from the visit summary catalogs.

        Parameters
        ----------
        visitSummaries : `list` of `lsst.afw.table.ExposureCatalog`
            List of exposure catalogs with per-detector summary information.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            Results struct with attribute:

            ``outputCatalog``
                Catalog of visit information.
        """
        visitEntries = []
        for visitSummary in visitSummaries:
            visitSummary = visitSummary.get()
            if not visitSummary:
                continue
            visitRow = visitSummary[0]
            visitInfo = visitRow.getVisitInfo()

            visitEntry = {}
            visitEntry["visitId"] = visitRow["visit"]
            visitEntry["visit"] = visitRow["visit"]
            visitEntry["physical_filter"] = visitRow["physical_filter"]
            visitEntry["band"] = visitRow["band"]
            raDec = visitInfo.getBoresightRaDec()
            visitEntry["ra"] = raDec.getRa().asDegrees()
            visitEntry["dec"] = raDec.getDec().asDegrees()

            # Duplicate 'dec' as 'decl' for backwards compatibility.
            visitEntry["decl"] = visitEntry["dec"]

            visitEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
            azAlt = visitInfo.getBoresightAzAlt()
            visitEntry["azimuth"] = azAlt.getLongitude().asDegrees()
            visitEntry["altitude"] = azAlt.getLatitude().asDegrees()
            visitEntry["zenithDistance"] = 90 - azAlt.getLatitude().asDegrees()
            visitEntry["airmass"] = visitInfo.getBoresightAirmass()
            expTime = visitInfo.getExposureTime()
            visitEntry["expTime"] = expTime
            visitEntry["expMidpt"] = np.datetime64(visitInfo.date.nsecs(scale=dafBase.DateTime.TAI), "ns")
            visitEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
            visitEntry["obsStart"] = visitEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns")
            expTime_days = expTime / (60*60*24)
            visitEntry["obsStartMJD"] = visitEntry["expMidptMJD"] - 0.5 * expTime_days
            visitEntries.append(visitEntry)

        outputCatalog = astropy.table.Table(rows=visitEntries)
        return pipeBase.Struct(outputCatalog=outputCatalog)

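# Worked example of the exposure-midpoint bookkeeping above (illustrative
# numbers only): obsStart is half the exposure time before the midpoint.
#
#     expTime = 30.0                            # seconds
#     expTime_days = expTime / (60 * 60 * 24)   # ~3.47e-4 days
#     # obsStartMJD = expMidptMJD - 0.5 * expTime_days, i.e. ~1.74e-4 days earlier
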
@deprecated(reason="This task is replaced by lsst.pipe.tasks.ForcedPhotCcdTask. "
                   "This task will be removed after v30.",
            version="v29.0", category=FutureWarning)
class WriteForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("instrument", "visit", "detector", "skymap", "tract")):
    inputCatalog = connectionTypes.Input(
        doc="Primary per-detector, single-epoch forced-photometry catalog. "
            "By default, it is the output of ForcedPhotCcdTask on calexps",
        name="forced_src",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
    )
    inputCatalogDiff = connectionTypes.Input(
        doc="Secondary multi-epoch, per-detector, forced photometry catalog. "
            "By default, it is the output of ForcedPhotCcdTask run on image differences.",
        name="forced_diff",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
    )
    outputCatalog = connectionTypes.Output(
        doc="InputCatalogs horizontally joined on `objectId` in DataFrame parquet format",
        name="mergedForcedSource",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
    )

@deprecated(reason="This task is replaced by lsst.pipe.tasks.ForcedPhotCcdTask. "
                   "This task will be removed after v30.",
            version="v29.0", category=FutureWarning)
class WriteForcedSourceTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=WriteForcedSourceTableConnections):
    key = pexConfig.Field(
        doc="Column on which to join the two input tables on and make the primary key of the output",
        dtype=str,
        default="objectId",
    )

@deprecated(reason="This task is replaced by lsst.pipe.tasks.ForcedPhotCcdTask. "
                   "This task will be removed after v30.",
            version="v29.0", category=FutureWarning)
class WriteForcedSourceTableTask(pipeBase.PipelineTask):
    """Merge and convert per-detector forced source catalogs to DataFrame Parquet format.

    Because the predecessor ForcedPhotCcdTask operates per-detector,
    per-tract (i.e., it has tract in its dimensions), detectors
    on the tract boundary may have multiple forced source catalogs.

    The successor task TransformForcedSourceTable runs per-patch
    and temporally aggregates overlapping mergedForcedSource catalogs from all
    available epochs.
    """
    _DefaultName = "writeForcedSourceTable"
    ConfigClass = WriteForcedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        inputs["visit"] = butlerQC.quantum.dataId["visit"]
        inputs["detector"] = butlerQC.quantum.dataId["detector"]
        inputs["band"] = butlerQC.quantum.dataId["band"]
        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)

    def run(self, inputCatalog, inputCatalogDiff, visit, detector, band=None):
        dfs = []
        for table, dataset in zip((inputCatalog, inputCatalogDiff), ("calexp", "diff")):
            df = table.asAstropy().to_pandas().set_index(self.config.key, drop=False)
            df = df.reindex(sorted(df.columns), axis=1)
            df["visit"] = visit
            # int16 instead of uint8 because databases don't like unsigned bytes.
            df["detector"] = np.int16(detector)
            df["band"] = band if band else pd.NA
            df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
                                                   names=("dataset", "column"))

            dfs.append(df)

        outputCatalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return pipeBase.Struct(outputCatalog=outputCatalog)

class TransformForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                            dimensions=("instrument", "skymap", "patch", "tract")):
    inputCatalogs = connectionTypes.Input(
        doc="DataFrames of merged ForcedSources produced by WriteForcedSourceTableTask",
        name="mergedForcedSource",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
        multiple=True,
        deferLoad=True,
    )
    referenceCatalog = connectionTypes.Input(
        doc="Reference catalog which was used to seed the forcedPhot. Columns "
            "objectId, detect_isPrimary, detect_isTractInner, detect_isPatchInner "
            "are expected.",
        name="objectTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Narrower, temporally-aggregated, per-patch ForcedSource Table transformed and converted per a "
            "specified set of functors",
        name="forcedSourceTable",
        storageClass="ArrowAstropy",
        dimensions=("tract", "patch", "skymap"),
    )

class TransformForcedSourceTableConfig(TransformCatalogBaseConfig,
                                       pipelineConnections=TransformForcedSourceTableConnections):
    referenceColumns = pexConfig.ListField(
        dtype=str,
        default=["detect_isPrimary", "detect_isTractInner", "detect_isPatchInner"],
        optional=True,
        doc="Columns to pull from reference catalog",
    )
    keyRef = pexConfig.Field(
        dtype=str,
        default="objectId",
        doc="Column on which to join the two input tables on and make the primary key of the output",
    )
    key = pexConfig.Field(
        dtype=str,
        doc="Rename the output DataFrame index to this name",
        default="forcedSourceId",
    )

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "ForcedSource.yaml")
        self.columnsFromDataId = ["tract", "patch"]

class TransformForcedSourceTableTask(TransformCatalogBaseTask):
    """Transform/standardize a ForcedSource catalog.

    Transforms each wide, per-detector forcedSource DataFrame per the
    specification file (per-camera defaults found in ForcedSource.yaml).
    All epochs that overlap the patch are aggregated into one per-patch
    narrow-DataFrame file.

    No de-duplication of rows is performed. Duplicate-resolution flags are
    pulled in from the referenceCatalog (`detect_isPrimary`,
    `detect_isTractInner`, `detect_isPatchInner`), so that the user may
    de-duplicate for analysis or compare duplicates for QA.

    The resulting table includes multiple bands. Epochs (MJDs) and other useful
    per-visit rows can be retrieved by joining with the CcdVisitTable on
    ccdVisitId.
    """
    _DefaultName = "transformForcedSourceTable"
    ConfigClass = TransformForcedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        outputs = self.run(inputs["inputCatalogs"], inputs["referenceCatalog"], funcs=self.funcs,
                           dataId=dict(outputRefs.outputCatalog.dataId.mapping))

        butlerQC.put(outputs, outputRefs)
    def run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=None):
        dfs = []
        refColumns = list(self.config.referenceColumns)
        refColumns.append(self.config.keyRef)
        ref = referenceCatalog.get(parameters={"columns": refColumns})
        if ref.index.name != self.config.keyRef:
            # The index may not be set if the reference catalog was written as
            # a plain Parquet table rather than a DataFrame.
            ref.set_index(self.config.keyRef, inplace=True)
        self.log.info("Aggregating %s input catalogs", len(inputCatalogs))
        for handle in inputCatalogs:
            result = self.transform(None, handle, funcs, dataId)
            # Keep only rows whose objects overlap the patch (inner join on
            # the reference catalog).
            dfs.append(result.df.join(ref, how="inner"))

        outputCatalog = pd.concat(dfs)

        if outputCatalog.empty:
            raise NoWorkFound(f"No forced photometry rows for {dataId}.")

        # Expose the join key (config.keyRef) as a regular column, then make
        # forcedSourceId the unique index and rename it to config.key.
        outputCatalog.index.rename(self.config.keyRef, inplace=True)
        outputCatalog.reset_index(inplace=True)
        outputCatalog.set_index("forcedSourceId", inplace=True, verify_integrity=True)
        outputCatalog.index.rename(self.config.key, inplace=True)

        self.log.info("Made a table of %d columns and %d rows",
                      len(outputCatalog.columns), len(outputCatalog))
        return pipeBase.Struct(outputCatalog=pandas_to_astropy(outputCatalog))

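# Illustrative sketch (not part of this module): per-visit quantities such as
# the exposure midpoint MJD can be attached to the forced sources by joining
# with the ccdVisitTable on ccdVisitId. ``forced`` and ``ccdVisits`` below are
# hypothetical pandas DataFrames loaded by the user.
#
#     merged = forced.join(
#         ccdVisits.set_index("ccdVisitId")[["expMidptMJD", "zeroPoint"]],
#         on="ccdVisitId",
#     )
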
class ConsolidateTractConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"catalogType": ""},
                                  dimensions=("instrument", "tract")):
    inputCatalogs = connectionTypes.Input(
        doc="Input per-patch DataFrame Tables to be concatenated",
        name="{catalogType}ForcedSourceTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        multiple=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Output per-tract concatenation of DataFrame Tables",
        name="{catalogType}ForcedSourceTable_tract",
        storageClass="DataFrame",
        dimensions=("tract", "skymap"),
    )


class ConsolidateTractConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=ConsolidateTractConnections):
    pass


class ConsolidateTractTask(pipeBase.PipelineTask):
    """Concatenate any per-patch, dataframe list into a single
    per-tract DataFrame.
    """
    _DefaultName = "ConsolidateTract"
    ConfigClass = ConsolidateTractConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-patch %s Tables",
                      len(inputs["inputCatalogs"]),
                      inputRefs.inputCatalogs[0].datasetType.name)
        df = pd.concat(inputs["inputCatalogs"])
        butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)