__all__ = ["WriteObjectTableConfig", "WriteObjectTableTask",
           "WriteSourceTableConfig", "WriteSourceTableTask",
           "WriteRecalibratedSourceTableConfig", "WriteRecalibratedSourceTableTask",
           "PostprocessAnalysis",
           "TransformCatalogBaseConfig", "TransformCatalogBaseTask",
           "TransformObjectCatalogConfig", "TransformObjectCatalogTask",
           "ConsolidateObjectTableConfig", "ConsolidateObjectTableTask",
           "TransformSourceTableConfig", "TransformSourceTableTask",
           "ConsolidateVisitSummaryConfig", "ConsolidateVisitSummaryTask",
           "ConsolidateSourceTableConfig", "ConsolidateSourceTableTask",
           "MakeCcdVisitTableConfig", "MakeCcdVisitTableTask",
           "MakeVisitTableConfig", "MakeVisitTableTask",
           "WriteForcedSourceTableConfig", "WriteForcedSourceTableTask",
           "TransformForcedSourceTableConfig", "TransformForcedSourceTableTask",
           "ConsolidateTractConfig", "ConsolidateTractTask"]
# Imports beyond those present in the original excerpt are inferred from usage
# elsewhere in this module.
import dataclasses
import functools
import logging
import numbers
import os
from collections import defaultdict

import astropy.table
import numpy as np
import pandas as pd

import lsst.afw.table as afwTable
import lsst.daf.base as dafBase
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
from astro_metadata_translator.headers import merge_headers
from lsst.afw.image import ExposureF, ExposureSummaryStats
from lsst.daf.butler.formatters.parquet import pandas_to_astropy
from lsst.meas.base import SingleFrameMeasurementTask, DetectorVisitIdGeneratorConfig
from lsst.obs.base.utils import strip_provenance_from_fits_header
from lsst.pipe.base import connectionTypes

from .functors import CompositeFunctor, Column
log = logging.getLogger(__name__)
def flattenFilters(df, noDupCols=["coord_ra", "coord_dec"], camelCase=False, inputBands=None):
    """Flatten a DataFrame with a multilevel column index."""
    newDf = pd.DataFrame()
    # band is the level-0 column index
    dfBands = df.columns.unique(level=0).values
    for band in dfBands:
        subdf = df[band]
        columnFormat = "{0}{1}" if camelCase else "{0}_{1}"
        newColumns = {c: columnFormat.format(band, c)
                      for c in subdf.columns if c not in noDupCols}
        cols = list(newColumns.keys())
        newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)

    # Only keep requested bands that are actually present in the input.
    presentBands = dfBands if inputBands is None else list(set(inputBands).intersection(dfBands))
    # Copy the unduplicated columns (e.g. coordinates) from one present band.
    noDupDf = df[presentBands[0]][noDupCols]
    newDf = pd.concat([noDupDf, newDf], axis=1)
    return newDf
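

# Illustrative sketch (not part of the pipeline; the toy column names are
# hypothetical): flattenFilters turns a two-level (band, column) DataFrame into
# flat, band-prefixed columns, keeping one copy of the no-duplicate columns.
#
#   bands = pd.MultiIndex.from_product([["g", "r"], ["PsfFlux", "coord_ra", "coord_dec"]])
#   toy = pd.DataFrame(np.zeros((2, len(bands))), columns=bands)
#   flat = flattenFilters(toy, camelCase=True)
#   # flat.columns -> coord_ra, coord_dec, gPsfFlux, rPsfFlux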
89 """A helper class for stacking astropy tables without having them all in
95 Full size of the final table.
99 Unlike `astropy.table.vstack`, this class requires all tables to have the
100 exact same columns (it's slightly more strict than even the
101 ``join_type="exact"`` argument to `astropy.table.vstack`).
111 """Construct from an iterable of
112 `lsst.daf.butler.DeferredDatasetHandle`.
116 handles : `~collections.abc.Iterable` [ \
117 `lsst.daf.butler.DeferredDatasetHandle` ]
118 Iterable of handles. Must have a storage class that supports the
119 "rowcount" component, which is all that will be fetched.
123 vstack : `TableVStack`
124 An instance of this class, initialized with capacity equal to the
125 sum of the rowcounts of all the given table handles.
127 capacity = sum(handle.get(component=
"rowcount")
for handle
in handles)
128 return cls(capacity=capacity)
131 """Add a single table to the stack.
135 table : `astropy.table.Table`
136 An astropy table instance.
139 self.
result = astropy.table.Table()
140 for name
in table.colnames:
142 column_cls = type(column)
143 self.
result[name] = column_cls.info.new_like([column], self.
capacity, name=name)
144 self.
result[name][:len(table)] = column
145 self.
index = len(table)
146 self.
result.meta = table.meta.copy()
148 next_index = self.
index + len(table)
149 if set(self.
result.colnames) != set(table.colnames):
151 "Inconsistent columns in concatentation: "
152 f
"{set(self.result.colnames).symmetric_difference(table.colnames)}"
154 for name
in table.colnames:
155 out_col = self.
result[name]
157 if out_col.dtype != in_col.dtype:
158 raise TypeError(f
"Type mismatch on column {name!r}: {out_col.dtype} != {in_col.dtype}.")
159 self.
result[name][self.
index:next_index] = table[name]
160 self.
index = next_index
164 self.
result.meta = merge_headers([self.
result.meta, table.meta], mode=
"drop")
165 strip_provenance_from_fits_header(self.
result.meta)
169 """Vertically stack tables represented by deferred dataset handles.
173 handles : `~collections.abc.Iterable` [ \
174 `lsst.daf.butler.DeferredDatasetHandle` ]
175 Iterable of handles. Must have the "ArrowAstropy" storage class
176 and identical columns.
180 table : `astropy.table.Table`
181 Concatenated table with the same columns as each input table and
182 the rows of all of them.
184 handles = tuple(handles)
186 rowcount = tuple(handle.get(component=
"rowcount")
for handle
in handles)
187 handles = tuple(handle
for handle, count
in zip(handles, rowcount)
if count > 0)
190 for handle
in handles:
191 vstack.extend(handle.get())
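

# Usage sketch (hypothetical handles): given butler deferred handles whose
# storage class exposes the "rowcount" component, the whole stack is built as
#
#   table = TableVStack.vstack_handles(handles)
#
# i.e. the output is preallocated from the summed rowcounts and filled table by
# table, instead of materializing every input at once the way
# astropy.table.vstack would.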
class WriteObjectTableConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"coaddName": "deep"},
                                  dimensions=("tract", "patch", "skymap")):
    inputCatalogMeas = connectionTypes.Input(
        doc="Catalog of source measurements on the deepCoadd.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_meas",
        multiple=True,
    )
    inputCatalogForcedSrc = connectionTypes.Input(
        doc="Catalog of forced measurements (shape and position parameters held fixed) on the deepCoadd.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_forced_src",
        multiple=True,
    )
    inputCatalogPsfsMultiprofit = connectionTypes.Input(
        doc="Catalog of Gaussian mixture model fit parameters for the PSF model at each object centroid.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="ArrowAstropy",
        name="{coaddName}Coadd_psfs_multiprofit",
        multiple=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="A vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
            "stored as a DataFrame with a multi-level column index per-patch.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="{coaddName}Coadd_obj",
    )


class WriteObjectTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteObjectTableConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd",
    )
class WriteObjectTableTask(pipeBase.PipelineTask):
    """Write filter-merged object tables as a DataFrame in parquet format."""

    _DefaultName = "writeObjectTable"
    ConfigClass = WriteObjectTableConfig

    # Tag of the output dataset written by this task.
    outputDataset = "obj"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        catalogs = defaultdict(dict)
        for dataset, connection in (
            ("meas", "inputCatalogMeas"),
            ("forced_src", "inputCatalogForcedSrc"),
            ("psfs_multiprofit", "inputCatalogPsfsMultiprofit"),
        ):
            for ref, cat in zip(getattr(inputRefs, connection), inputs[connection]):
                catalogs[ref.dataId["band"]][dataset] = cat

        dataId = butlerQC.quantum.dataId
        df = self.run(catalogs=catalogs, tract=dataId["tract"], patch=dataId["patch"])
        outputs = pipeBase.Struct(outputCatalog=df)
        butlerQC.put(outputs, outputRefs)

    def run(self, catalogs, tract, patch):
        """Merge multiple catalogs.

        Parameters
        ----------
        catalogs : `dict`
            Mapping from filter names to dict of catalogs.
        tract : `int`
            tractId to use for the tractId column.
        patch : `str`
            patchId to use for the patchId column.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Merged dataframe.

        Raises
        ------
        ValueError
            Raised if any of the catalogs is of an unsupported type.
        """
        dfs = []
        for filt, tableDict in catalogs.items():
            for dataset, table in tableDict.items():
                # Convert afw and astropy catalogs to pandas.
                if isinstance(table, pd.DataFrame):
                    df = table
                elif isinstance(table, afwTable.SourceCatalog):
                    df = table.asAstropy().to_pandas()
                elif isinstance(table, astropy.table.Table):
                    df = table.to_pandas()
                else:
                    raise ValueError(f"{dataset=} has unsupported {type(table)=}")
                df.set_index("id", drop=True, inplace=True)

                # Sort columns by name, to ensure matching schema among patches.
                df = df.reindex(sorted(df.columns), axis=1)
                df = df.assign(tractId=tract, patchId=patch)

                # Make columns a 3-level MultiIndex.
                df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
                                                       names=("dataset", "band", "column"))
                dfs.append(df)

        # Join via reduce rather than pd.concat to keep peak memory use down.
        catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return catalog
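

# Resulting column-index sketch: each column of the merged per-patch DataFrame
# is addressed by a (dataset, band, column) tuple, e.g. (hypothetical column
# and variable names)
#
#   objTable["meas"]["r"]["base_PsfFlux_instFlux"]
#
# which is the structure downstream transform tasks rely on when exploding
# per-band columns.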
class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"catalogType": ""},
                                  dimensions=("instrument", "visit", "detector")):
    catalog = connectionTypes.Input(
        doc="Input full-depth catalog of sources produced by CalibrateTask",
        name="{catalogType}src",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector"),
    )
    outputCatalog = connectionTypes.Output(
        doc="Catalog of sources, `src` in Astropy/Parquet format.  Columns are unchanged.",
        name="{catalogType}source",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )


class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteSourceTableConnections):
    pass
class WriteSourceTableTask(pipeBase.PipelineTask):
    """Write source table to DataFrame Parquet format."""

    _DefaultName = "writeSourceTable"
    ConfigClass = WriteSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        inputs["visit"] = butlerQC.quantum.dataId["visit"]
        inputs["detector"] = butlerQC.quantum.dataId["detector"]
        result = self.run(**inputs)
        outputs = pipeBase.Struct(outputCatalog=result.table)
        butlerQC.put(outputs, outputRefs)

    def run(self, catalog, visit, detector, **kwargs):
        """Convert `src` catalog to an Astropy table.

        Parameters
        ----------
        catalog : `afwTable.SourceCatalog`
            Catalog to be converted.
        visit, detector : `int`
            Visit and detector ids to be added as columns.
        **kwargs
            Additional keyword arguments are ignored as a convenience for
            subclasses that pass the same arguments to several different
            methods.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            ``table``
                `astropy.table.Table` version of the input catalog.
        """
        self.log.info("Generating DataFrame from src catalog visit,detector=%i,%i", visit, detector)
        tbl = catalog.asAstropy()
        tbl["visit"] = visit
        # int16 instead of uint8 because Parquet tables don't support unsigned bytes.
        tbl["detector"] = np.int16(detector)

        return pipeBase.Struct(table=tbl)
class WriteRecalibratedSourceTableConnections(WriteSourceTableConnections,
                                              defaultTemplates={"catalogType": ""},
                                              dimensions=("instrument", "visit", "detector", "skymap")):
    visitSummary = connectionTypes.Input(
        doc="Input visit-summary catalog with updated calibration objects.",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit",),
    )

    def __init__(self, config):
        # Defer the existence constraint on the input catalog during
        # quantum-graph generation.
        self.catalog = dataclasses.replace(self.catalog, deferGraphConstraint=True)
class WriteRecalibratedSourceTableConfig(WriteSourceTableConfig,
                                         pipelineConnections=WriteRecalibratedSourceTableConnections):
    doReevaluatePhotoCalib = pexConfig.Field(
        dtype=bool,
        default=True,
        doc=("Add or replace local photoCalib columns"),
    )
    doReevaluateSkyWcs = pexConfig.Field(
        dtype=bool,
        default=True,
        doc=("Add or replace local WCS columns and update the coord columns, coord_ra and coord_dec"),
    )
class WriteRecalibratedSourceTableTask(WriteSourceTableTask):
    """Write source table to DataFrame Parquet format."""

    _DefaultName = "writeRecalibratedSourceTable"
    ConfigClass = WriteRecalibratedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        inputs["visit"] = butlerQC.quantum.dataId["visit"]
        inputs["detector"] = butlerQC.quantum.dataId["detector"]

        if self.config.doReevaluatePhotoCalib or self.config.doReevaluateSkyWcs:
            exposure = ExposureF()
            inputs["exposure"] = self.prepareCalibratedExposure(
                exposure=exposure,
                visitSummary=inputs["visitSummary"],
                detectorId=butlerQC.quantum.dataId["detector"],
            )
            inputs["catalog"] = self.addCalibColumns(**inputs)

        result = self.run(**inputs)
        outputs = pipeBase.Struct(outputCatalog=result.table)
        butlerQC.put(outputs, outputRefs)

    def prepareCalibratedExposure(self, exposure, detectorId, visitSummary=None):
        """Prepare a calibrated exposure and apply external calibrations
        if so configured.

        Parameters
        ----------
        exposure : `lsst.afw.image.exposure.Exposure`
            Input exposure to adjust calibrations.  May be an empty Exposure.
        detectorId : `int`
            Detector ID associated with the exposure.
        visitSummary : `lsst.afw.table.ExposureCatalog`, optional
            Exposure catalog with all calibration objects.  WCS and PhotoCalib
            are always applied if ``visitSummary`` is provided and those
            components are not `None`.

        Returns
        -------
        exposure : `lsst.afw.image.exposure.Exposure`
            Exposure with adjusted calibrations.
        """
        if visitSummary is not None:
            row = visitSummary.find(detectorId)
            if row is None:
                raise pipeBase.NoWorkFound(f"Visit summary for detector {detectorId} is missing.")
            if (photoCalib := row.getPhotoCalib()) is None:
                self.log.warning("Detector id %s has None for photoCalib in visit summary; "
                                 "skipping reevaluation of photoCalib.", detectorId)
                exposure.setPhotoCalib(None)
            else:
                exposure.setPhotoCalib(photoCalib)
            if (skyWcs := row.getWcs()) is None:
                self.log.warning("Detector id %s has None for skyWcs in visit summary; "
                                 "skipping reevaluation of skyWcs.", detectorId)
                exposure.setWcs(None)
            else:
                exposure.setWcs(skyWcs)

        return exposure
    def addCalibColumns(self, catalog, exposure, **kwargs):
        """Add or replace columns with calibs evaluated at each centroid.

        Add or replace 'base_LocalWcs' and 'base_LocalPhotoCalib' columns in
        a source catalog, by rerunning the plugins.

        Parameters
        ----------
        catalog : `lsst.afw.table.SourceCatalog`
            Catalog to which calib columns will be added.
        exposure : `lsst.afw.image.exposure.Exposure`
            Exposure with attached PhotoCalibs and SkyWcs attributes to be
            reevaluated at local centroids.  Pixels are not required.
        **kwargs
            Additional keyword arguments are ignored to facilitate passing the
            same arguments to several methods.

        Returns
        -------
        newCat : `lsst.afw.table.SourceCatalog`
            Source Catalog with requested local calib columns.
        """
        measureConfig = SingleFrameMeasurementTask.ConfigClass()
        measureConfig.doReplaceWithNoise = False

        # Clear all slots; only the local-calib plugins are run.
        for slot in measureConfig.slots:
            setattr(measureConfig.slots, slot, None)

        measureConfig.plugins.names = []
        if self.config.doReevaluateSkyWcs:
            measureConfig.plugins.names.add("base_LocalWcs")
            self.log.info("Re-evaluating base_LocalWcs plugin")
        if self.config.doReevaluatePhotoCalib:
            measureConfig.plugins.names.add("base_LocalPhotoCalib")
            self.log.info("Re-evaluating base_LocalPhotoCalib plugin")
        pluginsNotToCopy = tuple(measureConfig.plugins.names)

        # Copy the old catalog into a new one, omitting the columns that the
        # re-run plugins will recreate.
        mapper = afwTable.SchemaMapper(catalog.schema)
        aliasMap = catalog.schema.getAliasMap()
        for item in catalog.schema:
            if not item.field.getName().startswith(pluginsNotToCopy):
                mapper.addMapping(item.key)

        schema = mapper.getOutputSchema()
        measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
        schema.setAliasMap(aliasMap)
        newCat = afwTable.SourceCatalog(schema)
        newCat.extend(catalog, mapper=mapper)

        if self.config.doReevaluateSkyWcs and exposure.wcs is not None:
            wcsPlugin = measurement.plugins["base_LocalWcs"]
        else:
            wcsPlugin = None

        if self.config.doReevaluatePhotoCalib and exposure.getPhotoCalib() is not None:
            pcPlugin = measurement.plugins["base_LocalPhotoCalib"]
        else:
            pcPlugin = None

        for row in newCat:
            if wcsPlugin is not None:
                wcsPlugin.measure(row, exposure)
            if pcPlugin is not None:
                pcPlugin.measure(row, exposure)

        return newCat
class PostprocessAnalysis(object):
    """Calculate columns from DataFrames or handles storing DataFrames.

    This object manages and organizes an arbitrary set of computations
    on a catalog.  The catalog is defined by a
    `DeferredDatasetHandle` or `InMemoryDatasetHandle` object
    (or list thereof), such as a ``deepCoadd_obj`` dataset, and the
    computations are defined by a collection of
    `~lsst.pipe.tasks.functors.Functor` objects (or, equivalently, a
    ``CompositeFunctor``).

    After the object is initialized, accessing the ``.df`` attribute (which
    holds the `pandas.DataFrame` containing the results of the calculations)
    triggers computation of said dataframe.

    One of the conveniences of using this object is the ability to define a
    desired common filter for all functors.  This enables the same functor
    collection to be passed to several different `PostprocessAnalysis` objects
    without having to change the original functor collection, since the
    ``filt`` keyword argument of this object triggers an overwrite of the
    ``filt`` property for all functors in the collection.

    This object also allows a list of refFlags to be passed, and defines a set
    of default refFlags that are always included even if not requested.

    If a list of DataFrames or Handles is passed, rather than a single one,
    then the calculations will be mapped over all the input catalogs.  In
    principle, it should be straightforward to parallelize this activity, but
    initial tests have failed (see TODO in code comments).

    Parameters
    ----------
    handles : `~lsst.daf.butler.DeferredDatasetHandle` or
              `~lsst.pipe.base.InMemoryDatasetHandle` or list of these.
        Source catalog(s) for computation.
    functors : `list`, `dict`, or `~lsst.pipe.tasks.functors.CompositeFunctor`
        Computations to do (functors that act on ``handles``).
        If a dict, the output DataFrame will have columns keyed accordingly.
        If a list, the column keys will come from the
        ``.shortname`` attribute of each functor.
    filt : `str`, optional
        Filter in which to calculate.  If provided,
        this will overwrite any existing ``.filt`` attribute
        of the provided functors.
    flags : `list`, optional
        List of flags (per-band) to include in output table.
        Taken from the ``meas`` dataset if applied to a multilevel Object Table.
    refFlags : `list`, optional
        List of refFlags (only reference band) to include in output table.
    forcedFlags : `list`, optional
        List of flags (per-band) to include in output table.
        Taken from the ``forced_src`` dataset if applied to a
        multilevel Object Table.  Intended for flags from measurement plugins
        only run during multi-band forced photometry.
    """
    _defaultRefFlags = []
    _defaultFuncs = ()

    def __init__(self, handles, functors, filt=None, flags=None, refFlags=None, forcedFlags=None):
        self.handles = handles
        self.functors = functors

        self.filt = filt
        self.flags = list(flags) if flags is not None else []
        self.forcedFlags = list(forcedFlags) if forcedFlags is not None else []
        self.refFlags = list(self._defaultRefFlags)
        if refFlags is not None:
            self.refFlags += list(refFlags)

        self._df = None

    @property
    def defaultFuncs(self):
        funcs = dict(self._defaultFuncs)
        return funcs

    @property
    def func(self):
        additionalFuncs = self.defaultFuncs
        additionalFuncs.update({flag: Column(flag, dataset="forced_src") for flag in self.forcedFlags})
        additionalFuncs.update({flag: Column(flag, dataset="ref") for flag in self.refFlags})
        additionalFuncs.update({flag: Column(flag, dataset="meas") for flag in self.flags})

        if isinstance(self.functors, CompositeFunctor):
            func = self.functors
        else:
            func = CompositeFunctor(self.functors)

        func.funcDict.update(additionalFuncs)
        func.filt = self.filt

        return func

    @property
    def noDupCols(self):
        return [name for name, func in self.func.funcDict.items() if func.noDup]

    @property
    def df(self):
        if self._df is None:
            self.compute()
        return self._df

    def compute(self, dropna=False, pool=None):
        # Map over multiple handles if a list or tuple was given.
        if type(self.handles) in (list, tuple):
            if pool is None:
                dflist = [self.func(handle, dropna=dropna) for handle in self.handles]
            else:
                # TODO: parallel evaluation with ``pool`` currently fails
                # (see the class docstring).
                dflist = pool.map(functools.partial(self.func, dropna=dropna), self.handles)
            self._df = pd.concat(dflist)
        else:
            self._df = self.func(self.handles, dropna=dropna)

        return self._df
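

# Usage sketch (hypothetical functor and flag names): computing a couple of
# columns from a deepCoadd_obj handle in one band.  Accessing ``.df`` triggers
# the computation.
#
#   funcs = {"ra": Column("coord_ra", dataset="ref"),
#            "dec": Column("coord_dec", dataset="ref")}
#   analysis = PostprocessAnalysis(handle, funcs, filt="r",
#                                  flags=["base_PixelFlags_flag_saturated"])
#   df = analysis.df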
class TransformCatalogBaseConnections(pipeBase.PipelineTaskConnections,
                                      dimensions=()):
    """Expected Connections for subclasses of TransformCatalogBaseTask.
    """
    inputCatalog = connectionTypes.Input(
        name="",
        storageClass="DataFrame",
    )
    outputCatalog = connectionTypes.Output(
        name="",
        storageClass="ArrowAstropy",
    )


class TransformCatalogBaseConfig(pipeBase.PipelineTaskConfig,
                                 pipelineConnections=TransformCatalogBaseConnections):
    functorFile = pexConfig.Field(
        dtype=str,
        doc="Path to YAML file specifying Science Data Model functors to use "
            "when copying columns and computing calibrated values.",
        default=None,
        optional=True,
    )
    primaryKey = pexConfig.Field(
        dtype=str,
        doc="Name of column to be set as the DataFrame index.  If None, the index "
            "will be named `id`.",
        default=None,
        optional=True,
    )
    columnsFromDataId = pexConfig.ListField(
        dtype=str,
        default=None,
        optional=True,
        doc="Columns to extract from the dataId",
    )
class TransformCatalogBaseTask(pipeBase.PipelineTask):
    """Base class for transforming/standardizing a catalog by applying functors
    that convert units and apply calibrations.

    The purpose of this task is to perform a set of computations on an input
    ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` that holds a
    ``DataFrame`` dataset (such as ``deepCoadd_obj``), and write the results to
    a new dataset (which needs to be declared in an ``outputDataset``
    attribute).

    The calculations to be performed are defined in a YAML file that specifies
    a set of functors to be computed, provided as a ``functorFile`` config
    parameter.  An example of such a YAML file is the following
    (column-name keys such as ``x`` and ``psfFlux`` are illustrative):

        funcs:
            x:
                functor: Column
                args: slot_Centroid_x
            y:
                functor: Column
                args: slot_Centroid_y
            psfFlux:
                functor: LocalNanojansky
                args:
                    - slot_PsfFlux_instFlux
                    - slot_PsfFlux_instFluxErr
                    - base_LocalPhotoCalib
                    - base_LocalPhotoCalibErr
            psfFluxErr:
                functor: LocalNanojanskyErr
                args:
                    - slot_PsfFlux_instFlux
                    - slot_PsfFlux_instFluxErr
                    - base_LocalPhotoCalib
                    - base_LocalPhotoCalibErr

    The names for each entry under "funcs" will become the names of columns in
    the output dataset.  All the functors referenced are defined in
    `~lsst.pipe.tasks.functors`.  Positional arguments to be passed to each
    functor are in the `args` list, and any additional entries for each column
    other than "functor" or "args" (e.g., ``'filt'``, ``'dataset'``) are
    treated as keyword arguments to be passed to the functor initialization.

    The "flags" entry is the default shortcut for `Column` functors.
    All columns listed under "flags" will be copied to the output table
    untransformed.  They can be of any datatype.
    In the special case of transforming a multi-level object table with
    band and dataset indices (deepCoadd_obj), these will be taken from the
    ``meas`` dataset and exploded out per band.

    There are two special shortcuts that only apply when transforming
    multi-level Object (deepCoadd_obj) tables:

    - The "refFlags" entry is a shortcut for `Column` functors
      taken from the ``ref`` dataset if transforming an ObjectTable.
    - The "forcedFlags" entry is a shortcut for `Column` functors
      taken from the ``forced_src`` dataset if transforming an ObjectTable.
      These are expanded out per band.

    This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
    to organize and execute the calculations.
    """

    @property
    def _DefaultName(self):
        raise NotImplementedError("Subclass must define the \"_DefaultName\" attribute.")

    @property
    def outputDataset(self):
        raise NotImplementedError("Subclass must define the \"outputDataset\" attribute.")

    @property
    def inputDataset(self):
        raise NotImplementedError("Subclass must define \"inputDataset\" attribute.")

    @property
    def ConfigClass(self):
        raise NotImplementedError("Subclass must define \"ConfigClass\" attribute.")

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if self.config.functorFile:
            self.log.info("Loading transform functor definitions from %s",
                          self.config.functorFile)
            self.funcs = CompositeFunctor.from_file(self.config.functorFile)
            self.funcs.update(dict(PostprocessAnalysis._defaultFuncs))
        else:
            self.funcs = None
    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
                          dataId=dict(outputRefs.outputCatalog.dataId.mapping))
        butlerQC.put(result, outputRefs)

    def run(self, handle, funcs=None, dataId=None, band=None):
        """Do postprocessing calculations.

        Takes a ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` or
        ``DataFrame`` object and dataId,
        returns a dataframe with results of postprocessing calculations.

        Parameters
        ----------
        handle : `~lsst.daf.butler.DeferredDatasetHandle` or
                 `~lsst.pipe.base.InMemoryDatasetHandle` or
                 `~pandas.DataFrame`, or list of these.
            DataFrames from which calculations are done.
        funcs : `~lsst.pipe.tasks.functors.Functor`
            Functors to apply to the table's columns.
        dataId : dict, optional
            Used to add a `patchId` column to the output dataframe.
        band : `str`, optional
            Filter band that is being processed.

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            Result struct, with a single ``outputCatalog`` attribute holding
            the transformed catalog.
        """
        self.log.info("Transforming/standardizing the source table dataId: %s", dataId)

        df = self.transform(band, handle, funcs, dataId).df
        self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
        result = pipeBase.Struct(outputCatalog=pandas_to_astropy(df))
        return result

    def getFunctors(self):
        return self.funcs

    def getAnalysis(self, handles, funcs=None, band=None):
        if funcs is None:
            funcs = self.funcs
        analysis = PostprocessAnalysis(handles, funcs, filt=band)
        return analysis

    def transform(self, band, handles, funcs, dataId):
        analysis = self.getAnalysis(handles, funcs=funcs, band=band)
        df = analysis.df
        if dataId and self.config.columnsFromDataId:
            for key in self.config.columnsFromDataId:
                if key in dataId:
                    if key == "detector":
                        # int16 instead of uint8 because Parquet tables don't
                        # support unsigned bytes.
                        df[key] = np.int16(dataId[key])
                    else:
                        df[key] = dataId[key]
                else:
                    raise ValueError(f"'{key}' in config.columnsFromDataId not found in dataId: {dataId}")

        if self.config.primaryKey:
            if df.index.name != self.config.primaryKey and self.config.primaryKey in df:
                df.reset_index(inplace=True, drop=True)
                df.set_index(self.config.primaryKey, inplace=True)

        return pipeBase.Struct(
            df=df,
            analysis=analysis,
        )
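

# Config sketch for a concrete subclass (values mirror the
# TransformObjectCatalogConfig.setDefaults below): the functors are read from
# the named YAML file and the resulting table is indexed by the primary key.
#
#   config.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Object.yaml")
#   config.primaryKey = "objectId"
#   config.columnsFromDataId = ["tract", "patch"]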
class TransformObjectCatalogConnections(pipeBase.PipelineTaskConnections,
                                        defaultTemplates={"coaddName": "deep"},
                                        dimensions=("tract", "patch", "skymap")):
    inputCatalog = connectionTypes.Input(
        doc="The vertical concatenation of the {coaddName}_{meas|forced_src|psfs_multiprofit} catalogs, "
            "stored as a DataFrame with a multi-level column index per-patch.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="{coaddName}Coadd_obj",
        deferLoad=True,
    )
    inputCatalogRef = connectionTypes.Input(
        doc="Catalog marking the primary detection (which band provides a good shape and position) "
            "for each detection in deepCoadd_mergeDet.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_ref",
        deferLoad=True,
    )
    inputCatalogSersicMultiprofit = connectionTypes.Input(
        doc="Catalog of source measurements on the deepCoadd.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="ArrowAstropy",
        name="{coaddName}Coadd_Sersic_multiprofit",
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-Patch Object Table of columns transformed from the deepCoadd_obj table per the standard "
            "data model.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="ArrowAstropy",
        name="objectTable",
    )

    def __init__(self, *, config=None):
        super().__init__(config=config)
        if config.multilevelOutput:
            self.outputCatalog = dataclasses.replace(self.outputCatalog, storageClass="DataFrame")
class TransformObjectCatalogConfig(TransformCatalogBaseConfig,
                                   pipelineConnections=TransformObjectCatalogConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd",
    )
    outputBands = pexConfig.ListField(
        dtype=str,
        default=None,
        optional=True,
        doc=("These bands and only these bands will appear in the output,"
             " NaN-filled if the input does not include them."
             " If None, then use all bands found in the input."),
    )
    camelCase = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Write per-band column names with camelCase, else underscore. "
             "For example: gPsFlux instead of g_PsFlux."),
    )
    multilevelOutput = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Whether results dataframe should have a multilevel column index (True) or be flat "
             "and name-munged (False). If True, the output storage class will be "
             "set to DataFrame, since astropy tables do not support multi-level indexing."),
        deprecated="Support for multi-level outputs is deprecated and will be removed after v29.",
    )
    goodFlags = pexConfig.ListField(
        dtype=str,
        default=[],
        doc=("List of 'good' flags that should be set False when populating empty tables. "
             "All other flags are considered to be 'bad' flags and will be set to True."),
    )
    floatFillValue = pexConfig.Field(
        dtype=float,
        default=np.nan,
        doc="Fill value for float fields when populating empty tables.",
    )
    integerFillValue = pexConfig.Field(
        dtype=int,
        default=-1,
        doc="Fill value for integer fields when populating empty tables.",
    )

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Object.yaml")
        self.primaryKey = "objectId"
        self.columnsFromDataId = ["tract", "patch"]
        self.goodFlags = ["calib_astrometry_used",
                          "calib_photometry_reserved",
                          "calib_photometry_used",
                          "calib_psf_candidate",
                          "calib_psf_reserved",
                          ]
class TransformObjectCatalogTask(TransformCatalogBaseTask):
    """Produce a flattened Object Table to match the format specified in
    the standard data model.

    Do the same set of postprocessing calculations on all bands.

    This is identical to `TransformCatalogBaseTask`, except that it does
    the specified functor calculations for all filters present in the
    input `deepCoadd_obj` table.  Any specific ``"filt"`` keywords specified
    by the YAML file will be superseded.
    """
    _DefaultName = "transformObjectCatalog"
    ConfigClass = TransformObjectCatalogConfig

    datasets_multiband = ("ref", "Sersic_multiprofit")

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
                          dataId=dict(outputRefs.outputCatalog.dataId.mapping),
                          handle_ref=inputs["inputCatalogRef"],
                          handle_Sersic_multiprofit=inputs["inputCatalogSersicMultiprofit"],
                          )
        butlerQC.put(result, outputRefs)
    def run(self, handle, funcs=None, dataId=None, band=None, **kwargs):
        # NOTE: the ``band`` kwarg is ignored; all bands present in the input
        # are processed (see the class docstring).
        if isinstance(funcs, CompositeFunctor):
            funcDict_in = funcs.funcDict
        elif isinstance(funcs, dict):
            funcDict_in = funcs
        elif isinstance(funcs, list):
            funcDict_in = {idx: v for idx, v in enumerate(funcs)}
        else:
            raise TypeError(f"Unsupported {type(funcs)=}")

        handles_multi = {}
        funcDicts_multiband = {}
        for dataset in self.datasets_multiband:
            if (handle_multi := kwargs.get(f"handle_{dataset}")) is None:
                raise RuntimeError(f"Missing required handle_{dataset} kwarg")
            handles_multi[dataset] = handle_multi
            funcDicts_multiband[dataset] = {}

        dfDict = {}
        analysisDict = {}
        templateDf = pd.DataFrame()

        columns = handle.get(component="columns")
        inputBands = columns.unique(level=1).values

        outputBands = self.config.outputBands if self.config.outputBands else inputBands

        # Split the functors up: multiband-dataset functors are evaluated once,
        # everything else is evaluated per band.
        funcDict_band = {}
        for name, func in funcDict_in.items():
            if func.dataset in funcDicts_multiband:
                if band := getattr(func, "band_to_check", None):
                    if band not in outputBands:
                        continue
                elif hasattr(func, "bands"):
                    # Force per-band functors onto the input bands.
                    func.bands = tuple(inputBands)

            funcDict = funcDicts_multiband.get(func.dataset, funcDict_band)
            funcDict[name] = func

        funcs_band = CompositeFunctor(funcDict_band)

        # Transform the data for each band that exists in the input handle.
        for inputBand in inputBands:
            if inputBand not in outputBands:
                self.log.info("Ignoring %s band data in the input", inputBand)
                continue
            self.log.info("Transforming the catalog of band %s", inputBand)
            result = self.transform(inputBand, handle, funcs_band, dataId)
            dfDict[inputBand] = result.df
            analysisDict[inputBand] = result.analysis
            if templateDf.empty:
                templateDf = result.df

        # Put filler values in columns of requested bands that were absent.
        for filt in outputBands:
            if filt not in dfDict:
                self.log.info("Adding empty columns for band %s", filt)
                dfTemp = templateDf.copy()
                for col in dfTemp.columns:
                    testValue = dfTemp[col].values[0]
                    if isinstance(testValue, (np.bool_, pd.BooleanDtype)):
                        # Boolean flags: "good" flags fill as False, all other
                        # (bad) flags as True.
                        if col in self.config.goodFlags:
                            fillValue = False
                        else:
                            fillValue = True
                    elif isinstance(testValue, numbers.Integral):
                        # numbers.Integral catches python, numpy, and pandas
                        # integer types.
                        if isinstance(testValue, np.unsignedinteger):
                            raise ValueError("Parquet tables may not have unsigned integer columns.")
                        else:
                            fillValue = self.config.integerFillValue
                    else:
                        fillValue = self.config.floatFillValue
                    dfTemp[col].values[:] = fillValue
                dfDict[filt] = dfTemp

        # This makes a multilevel column index, with band as the first level.
        df = pd.concat(dfDict, axis=1, names=["band", "column"])
        name_index = df.index.name

        # If we are not using the multilevel output, flatten the per-band columns.
        if not self.config.multilevelOutput:
            noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
            if self.config.primaryKey in noDupCols:
                noDupCols.remove(self.config.primaryKey)
            if dataId and self.config.columnsFromDataId:
                noDupCols += self.config.columnsFromDataId
            df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase,
                                inputBands=inputBands)

        # Apply the multiband-dataset functors and join the results in.
        for dataset, funcDict in funcDicts_multiband.items():
            handle_multiband = handles_multi[dataset]
            df_dataset = handle_multiband.get()
            if isinstance(df_dataset, astropy.table.Table):
                df_dataset = df_dataset.to_pandas().set_index(name_index, drop=False)
            elif isinstance(df_dataset, afwTable.SourceCatalog):
                df_dataset = df_dataset.asAstropy().to_pandas().set_index(name_index, drop=False)
            result = self.transform(
                None,
                pipeBase.InMemoryDatasetHandle(df_dataset, storageClass="DataFrame"),
                CompositeFunctor(funcDict),
                dataId,
            )
            result.df.index.name = name_index
            # Drop columns from the dataId (e.g. tract, patch) if present; they
            # are already in the per-band table.
            if self.config.columnsFromDataId:
                columns_drop = [column for column in self.config.columnsFromDataId
                                if column in result.df]
                if columns_drop:
                    result.df.drop(columns_drop, axis=1, inplace=True)
            # Match the multilevel column index if needed, otherwise use the
            # flat result directly.
            to_concat = pd.concat(
                {band: result.df for band in self.config.outputBands}, axis=1, names=["band", "column"]
            ) if self.config.multilevelOutput else result.df
            df = pd.concat([df, to_concat], axis=1)
            analysisDict[dataset] = result.analysis

        df.index.name = self.config.primaryKey

        if not self.config.multilevelOutput:
            tbl = pandas_to_astropy(df)
        else:
            tbl = df

        self.log.info("Made a table of %d columns and %d rows", len(tbl.columns), len(tbl))

        return pipeBase.Struct(outputCatalog=tbl)
class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("tract", "skymap")):
    inputCatalogs = connectionTypes.Input(
        doc="Per-Patch objectTables conforming to the standard data model.",
        name="objectTable",
        storageClass="ArrowAstropy",
        dimensions=("tract", "patch", "skymap"),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-tract vertical concatenation of the input objectTables",
        name="objectTable_tract",
        storageClass="ArrowAstropy",
        dimensions=("tract", "skymap"),
    )


class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=ConsolidateObjectTableConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd",
    )


class ConsolidateObjectTableTask(pipeBase.PipelineTask):
    """Write patch-merged source tables to a tract-level DataFrame Parquet file.

    Concatenates the `objectTable` list into a per-tract `objectTable_tract`.
    """
    _DefaultName = "consolidateObjectTable"
    ConfigClass = ConsolidateObjectTableConfig

    inputDataset = "objectTable"
    outputDataset = "objectTable_tract"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-patch Object Tables",
                      len(inputs["inputCatalogs"]))
        table = TableVStack.vstack_handles(inputs["inputCatalogs"])
        butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)
class TransformSourceTableConnections(pipeBase.PipelineTaskConnections,
                                      defaultTemplates={"catalogType": ""},
                                      dimensions=("instrument", "visit", "detector")):
    inputCatalog = connectionTypes.Input(
        doc="Wide input catalog of sources produced by WriteSourceTableTask",
        name="{catalogType}source",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Narrower, per-detector Source Table transformed and converted per a "
            "specified set of functors",
        name="{catalogType}sourceTable",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
    )


class TransformSourceTableConfig(TransformCatalogBaseConfig,
                                 pipelineConnections=TransformSourceTableConnections):

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Source.yaml")
        self.primaryKey = "sourceId"
        self.columnsFromDataId = ["visit", "detector", "band", "physical_filter"]


class TransformSourceTableTask(TransformCatalogBaseTask):
    """Transform/standardize a source catalog."""

    _DefaultName = "transformSourceTable"
    ConfigClass = TransformSourceTableConfig
class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections,
                                         dimensions=("instrument", "visit",),
                                         defaultTemplates={"calexpType": ""}):
    calexp = connectionTypes.Input(
        doc="Processed exposures used for metadata",
        name="{calexpType}calexp",
        storageClass="ExposureF",
        dimensions=("instrument", "visit", "detector"),
        deferLoad=True,
        multiple=True,
    )
    visitSummary = connectionTypes.Output(
        doc=("Per-visit consolidated exposure metadata.  These catalogs use "
             "detector id for the id and are sorted for fast lookups of a "
             "detector."),
        name="visitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
    )
    visitSummarySchema = connectionTypes.InitOutput(
        doc="Schema of the visitSummary catalog",
        name="visitSummary_schema",
        storageClass="ExposureCatalog",
    )


class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig,
                                    pipelineConnections=ConsolidateVisitSummaryConnections):
    """Config for ConsolidateVisitSummaryTask"""

    pass
class ConsolidateVisitSummaryTask(pipeBase.PipelineTask):
    """Task to consolidate per-detector visit metadata.

    This task aggregates the following metadata from all the detectors in a
    single visit into an exposure catalog:

    - The visitInfo, wcs, and photoCalib.
    - The physical_filter and band (if available).
    - The psf size, shape, and effective area at the center of the detector.
    - The corners of the bounding box in right ascension/declination.

    Other quantities such as Detector, Psf, ApCorrMap, and TransmissionCurve
    are not persisted here because of storage concerns, and because of their
    limited utility as summary statistics.

    Tests for this task are performed in ci_hsc_gen3.
    """
    _DefaultName = "consolidateVisitSummary"
    ConfigClass = ConsolidateVisitSummaryConfig

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.schema = afwTable.ExposureTable.makeMinimalSchema()
        self.schema.addField("visit", type="L", doc="Visit number")
        self.schema.addField("physical_filter", type="String", size=32, doc="Physical filter")
        self.schema.addField("band", type="String", size=32, doc="Name of band")
        ExposureSummaryStats.update_schema(self.schema)
        self.visitSummarySchema = afwTable.ExposureCatalog(self.schema)

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        dataRefs = butlerQC.get(inputRefs.calexp)
        visit = dataRefs[0].dataId["visit"]

        self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
                       len(dataRefs), visit)

        expCatalog = self._combineExposureMetadata(visit, dataRefs)

        butlerQC.put(expCatalog, outputRefs.visitSummary)

    def _combineExposureMetadata(self, visit, dataRefs):
        """Make a combined exposure catalog from a list of dataRefs.
        These dataRefs must point to exposures with wcs, summaryStats,
        and other visit metadata.

        Parameters
        ----------
        visit : `int`
            Visit identification number.
        dataRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            List of dataRefs in visit.

        Returns
        -------
        visitSummary : `lsst.afw.table.ExposureCatalog`
            Exposure catalog with per-detector summary information.
        """
        cat = afwTable.ExposureCatalog(self.schema)
        cat.resize(len(dataRefs))

        cat["visit"] = visit

        for i, dataRef in enumerate(dataRefs):
            visitInfo = dataRef.get(component="visitInfo")
            filterLabel = dataRef.get(component="filter")
            summaryStats = dataRef.get(component="summaryStats")
            detector = dataRef.get(component="detector")
            wcs = dataRef.get(component="wcs")
            photoCalib = dataRef.get(component="photoCalib")
            bbox = dataRef.get(component="bbox")
            validPolygon = dataRef.get(component="validPolygon")

            rec = cat[i]
            rec.setBBox(bbox)
            rec.setVisitInfo(visitInfo)
            rec.setWcs(wcs)
            rec.setPhotoCalib(photoCalib)
            rec.setValidPolygon(validPolygon)

            rec["physical_filter"] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else ""
            rec["band"] = filterLabel.bandLabel if filterLabel.hasBandLabel() else ""
            rec.setId(detector.getId())
            summaryStats.update_record(rec)

        if not cat:
            raise pipeBase.NoWorkFound(
                "No detectors had sufficient information to make a visit summary row."
            )

        # Sort by detector id for fast lookups.
        cat.sort()

        metadata = dafBase.PropertyList()
        metadata.add("COMMENT", "Catalog id is detector id, sorted.")
        metadata.add("COMMENT", "Only detectors with data have entries.")
        cat.setMetadata(metadata)

        return cat
class ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        defaultTemplates={"catalogType": ""},
                                        dimensions=("instrument", "visit")):
    inputCatalogs = connectionTypes.Input(
        doc="Input per-detector Source Tables",
        name="{catalogType}sourceTable",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit", "detector"),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-visit concatenation of Source Table",
        name="{catalogType}sourceTable_visit",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit"),
    )


class ConsolidateSourceTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=ConsolidateSourceTableConnections):
    pass


class ConsolidateSourceTableTask(pipeBase.PipelineTask):
    """Concatenate the `sourceTable` list into a per-visit `sourceTable_visit`."""

    _DefaultName = "consolidateSourceTable"
    ConfigClass = ConsolidateSourceTableConfig

    inputDataset = "sourceTable"
    outputDataset = "sourceTable_visit"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        from .makeWarp import reorderRefs

        detectorOrder = [ref.dataId["detector"] for ref in inputRefs.inputCatalogs]
        detectorOrder.sort()
        inputRefs = reorderRefs(inputRefs, detectorOrder, dataIdKey="detector")
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-detector Source Tables",
                      len(inputs["inputCatalogs"]))
        table = TableVStack.vstack_handles(inputs["inputCatalogs"])
        butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs)
class MakeCcdVisitTableConnections(pipeBase.PipelineTaskConnections,
                                   dimensions=("instrument",),
                                   defaultTemplates={"calexpType": ""}):
    visitSummaryRefs = connectionTypes.Input(
        doc="Data references for per-visit consolidated exposure metadata",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="CCD and Visit metadata table",
        name="ccdVisitTable",
        storageClass="ArrowAstropy",
        dimensions=("instrument",),
    )


class MakeCcdVisitTableConfig(pipeBase.PipelineTaskConfig,
                              pipelineConnections=MakeCcdVisitTableConnections):
    idGenerator = DetectorVisitIdGeneratorConfig.make_field()


class MakeCcdVisitTableTask(pipeBase.PipelineTask):
    """Produce a `ccdVisitTable` from the visit summary exposure catalogs."""

    _DefaultName = "makeCcdVisitTable"
    ConfigClass = MakeCcdVisitTableConfig
    def run(self, visitSummaryRefs):
        """Make a table of ccd information from the visit summary catalogs.

        Parameters
        ----------
        visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            List of DeferredDatasetHandles pointing to exposure catalogs with
            per-detector summary information.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            Results struct with attribute:

            ``outputCatalog``
                Catalog of ccd and visit information.
        """
        ccdEntries = []
        for visitSummaryRef in visitSummaryRefs:
            visitSummary = visitSummaryRef.get()
            if not visitSummary:
                continue
            visitInfo = visitSummary[0].getVisitInfo()

            # Strip butler provenance from the metadata before conversion.
            strip_provenance_from_fits_header(visitSummary.metadata)

            summaryTable = visitSummary.asAstropy()
            selectColumns = ["id", "visit", "physical_filter", "band", "ra", "dec",
                             "pixelScale", "zenithDistance",
                             "expTime", "zeroPoint", "psfSigma", "skyBg", "skyNoise",
                             "astromOffsetMean", "astromOffsetStd", "nPsfStar",
                             "psfStarDeltaE1Median", "psfStarDeltaE2Median",
                             "psfStarDeltaE1Scatter", "psfStarDeltaE2Scatter",
                             "psfStarDeltaSizeMedian", "psfStarDeltaSizeScatter",
                             "psfStarScaledDeltaSizeScatter", "psfTraceRadiusDelta",
                             "psfApFluxDelta", "psfApCorrSigmaScaledDelta",
                             "maxDistToNearestPsf",
                             "effTime", "effTimePsfSigmaScale",
                             "effTimeSkyBgScale", "effTimeZeroPointScale",
                             ]
            ccdEntry = summaryTable[selectColumns]

            # Rename columns to match the ccdVisit data model.
            ccdEntry.rename_column("visit", "visitId")
            ccdEntry.rename_column("id", "detectorId")

            # Duplicate 'dec' as 'decl' for backwards compatibility.
            ccdEntry["decl"] = ccdEntry["dec"]

            ccdEntry["ccdVisitId"] = [
                self.config.idGenerator.apply(
                    visitSummaryRef.dataId,
                    detector=detector_id,
                ).catalog_id
                for detector_id in summaryTable["id"]
            ]
            ccdEntry["detector"] = summaryTable["id"]
            # Seeing is the PSF FWHM in arcseconds: sigma * pixelScale * sqrt(8 ln 2).
            ccdEntry["seeing"] = (
                visitSummary["psfSigma"] * visitSummary["pixelScale"] * np.sqrt(8 * np.log(2))
            )
            ccdEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
            ccdEntry["expMidpt"] = np.datetime64(visitInfo.getDate().toPython(), "ns")
            ccdEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
            expTime = visitInfo.getExposureTime()
            ccdEntry["obsStart"] = (
                ccdEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns")
            )
            expTime_days = expTime / (60*60*24)
            ccdEntry["obsStartMJD"] = ccdEntry["expMidptMJD"] - 0.5 * expTime_days
            ccdEntry["darkTime"] = visitInfo.getDarkTime()
            ccdEntry["xSize"] = summaryTable["bbox_max_x"] - summaryTable["bbox_min_x"]
            ccdEntry["ySize"] = summaryTable["bbox_max_y"] - summaryTable["bbox_min_y"]
            ccdEntry["llcra"] = summaryTable["raCorners"][:, 0]
            ccdEntry["llcdec"] = summaryTable["decCorners"][:, 0]
            ccdEntry["ulcra"] = summaryTable["raCorners"][:, 1]
            ccdEntry["ulcdec"] = summaryTable["decCorners"][:, 1]
            ccdEntry["urcra"] = summaryTable["raCorners"][:, 2]
            ccdEntry["urcdec"] = summaryTable["decCorners"][:, 2]
            ccdEntry["lrcra"] = summaryTable["raCorners"][:, 3]
            ccdEntry["lrcdec"] = summaryTable["decCorners"][:, 3]

            ccdEntries.append(ccdEntry)

        outputCatalog = astropy.table.vstack(ccdEntries, join_type="exact")
        return pipeBase.Struct(outputCatalog=outputCatalog)
class MakeVisitTableConnections(pipeBase.PipelineTaskConnections,
                                dimensions=("instrument",),
                                defaultTemplates={"calexpType": ""}):
    visitSummaries = connectionTypes.Input(
        doc="Per-visit consolidated exposure metadata",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit",),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Visit metadata table",
        name="visitTable",
        storageClass="ArrowAstropy",
        dimensions=("instrument",),
    )


class MakeVisitTableConfig(pipeBase.PipelineTaskConfig,
                           pipelineConnections=MakeVisitTableConnections):
    pass


class MakeVisitTableTask(pipeBase.PipelineTask):
    """Produce a `visitTable` from the visit summary exposure catalogs."""

    _DefaultName = "makeVisitTable"
    ConfigClass = MakeVisitTableConfig
    def run(self, visitSummaries):
        """Make a table of visit information from the visit summary catalogs.

        Parameters
        ----------
        visitSummaries : `list` of `lsst.afw.table.ExposureCatalog`
            List of exposure catalogs with per-detector summary information.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            Results struct with attribute:

            ``outputCatalog``
                Catalog of visit information.
        """
        visitEntries = []
        for visitSummary in visitSummaries:
            visitSummary = visitSummary.get()
            if not visitSummary:
                continue
            visitRow = visitSummary[0]
            visitInfo = visitRow.getVisitInfo()

            visitEntry = {}
            visitEntry["visitId"] = visitRow["visit"]
            visitEntry["visit"] = visitRow["visit"]
            visitEntry["physical_filter"] = visitRow["physical_filter"]
            visitEntry["band"] = visitRow["band"]
            raDec = visitInfo.getBoresightRaDec()
            visitEntry["ra"] = raDec.getRa().asDegrees()
            visitEntry["dec"] = raDec.getDec().asDegrees()

            # Duplicate 'dec' as 'decl' for backwards compatibility.
            visitEntry["decl"] = visitEntry["dec"]

            visitEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
            azAlt = visitInfo.getBoresightAzAlt()
            visitEntry["azimuth"] = azAlt.getLongitude().asDegrees()
            visitEntry["altitude"] = azAlt.getLatitude().asDegrees()
            visitEntry["zenithDistance"] = 90 - azAlt.getLatitude().asDegrees()
            visitEntry["airmass"] = visitInfo.getBoresightAirmass()
            expTime = visitInfo.getExposureTime()
            visitEntry["expTime"] = expTime
            visitEntry["expMidpt"] = np.datetime64(visitInfo.getDate().toPython(), "ns")
            visitEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
            visitEntry["obsStart"] = visitEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns")
            expTime_days = expTime / (60*60*24)
            visitEntry["obsStartMJD"] = visitEntry["expMidptMJD"] - 0.5 * expTime_days
            visitEntries.append(visitEntry)

        outputCatalog = astropy.table.Table(rows=visitEntries)
        return pipeBase.Struct(outputCatalog=outputCatalog)
class WriteForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("instrument", "visit", "detector", "skymap", "tract")):
    inputCatalog = connectionTypes.Input(
        doc="Primary per-detector, single-epoch forced-photometry catalog. "
            "By default, it is the output of ForcedPhotCcdTask on calexps",
        name="forced_src",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
    )
    inputCatalogDiff = connectionTypes.Input(
        doc="Secondary multi-epoch, per-detector, forced photometry catalog. "
            "By default, it is the output of ForcedPhotCcdTask run on image differences.",
        name="forced_diff",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
    )
    outputCatalog = connectionTypes.Output(
        doc="InputCatalogs horizontally joined on `objectId` in DataFrame parquet format",
        name="mergedForcedSource",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
    )


class WriteForcedSourceTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=WriteForcedSourceTableConnections):
    key = pexConfig.Field(
        doc="Column on which to join the two input tables on and make the primary key of the output",
        dtype=str,
        default="objectId",
    )


class WriteForcedSourceTableTask(pipeBase.PipelineTask):
    """Merge and convert per-detector forced source catalogs to DataFrame Parquet format.

    Because the predecessor ForcedPhotCcdTask operates per-detector,
    per-tract (i.e., it has tract in its dimensions), detectors
    on the tract boundary may have multiple forced source catalogs.

    The successor task TransformForcedSourceTable runs per-patch
    and temporally aggregates overlapping mergedForcedSource catalogs from all
    available epochs.
    """
    _DefaultName = "writeForcedSourceTable"
    ConfigClass = WriteForcedSourceTableConfig
    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        inputs["visit"] = butlerQC.quantum.dataId["visit"]
        inputs["detector"] = butlerQC.quantum.dataId["detector"]
        inputs["band"] = butlerQC.quantum.dataId["band"]
        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)

    def run(self, inputCatalog, inputCatalogDiff, visit, detector, band=None):
        dfs = []
        for table, dataset in zip((inputCatalog, inputCatalogDiff), ("calexp", "diff")):
            df = table.asAstropy().to_pandas().set_index(self.config.key, drop=False)
            df = df.reindex(sorted(df.columns), axis=1)
            df["visit"] = visit
            # int16 instead of uint8 because Parquet tables don't support unsigned bytes.
            df["detector"] = np.int16(detector)
            df["band"] = band if band else pd.NA
            df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
                                                   names=("dataset", "column"))

            dfs.append(df)

        outputCatalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return pipeBase.Struct(outputCatalog=outputCatalog)
class TransformForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                            dimensions=("instrument", "skymap", "patch", "tract")):
    inputCatalogs = connectionTypes.Input(
        doc="DataFrames of merged ForcedSources produced by WriteForcedSourceTableTask",
        name="mergedForcedSource",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
        multiple=True,
        deferLoad=True,
    )
    referenceCatalog = connectionTypes.Input(
        doc="Reference catalog which was used to seed the forcedPhot. Columns "
            "objectId, detect_isPrimary, detect_isTractInner, detect_isPatchInner "
            "are expected.",
        name="objectTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Narrower, temporally-aggregated, per-patch ForcedSource Table transformed and converted per a "
            "specified set of functors",
        name="forcedSourceTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
    )


class TransformForcedSourceTableConfig(TransformCatalogBaseConfig,
                                       pipelineConnections=TransformForcedSourceTableConnections):
    referenceColumns = pexConfig.ListField(
        dtype=str,
        default=["detect_isPrimary", "detect_isTractInner", "detect_isPatchInner"],
        optional=True,
        doc="Columns to pull from reference catalog",
    )
    keyRef = pexConfig.Field(
        doc="Column on which to join the two input tables on and make the primary key of the output",
        dtype=str,
        default="objectId",
    )
    key = pexConfig.Field(
        doc="Rename the output DataFrame index to this name",
        dtype=str,
        default="forcedSourceId",
    )

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "ForcedSource.yaml")
        self.columnsFromDataId = ["tract", "patch"]
class TransformForcedSourceTableTask(TransformCatalogBaseTask):
    """Transform/standardize a ForcedSource catalog.

    Transforms each wide, per-detector forcedSource DataFrame per the
    specification file (per-camera defaults found in ForcedSource.yaml).
    All epochs that overlap the patch are aggregated into one per-patch,
    narrow DataFrame file.

    No de-duplication of rows is performed.  Duplicate-resolution flags are
    pulled in from the referenceCatalog: `detect_isPrimary`,
    `detect_isTractInner`, `detect_isPatchInner`, so that the user may
    de-duplicate for analysis or compare duplicates for QA.

    The resulting table includes multiple bands.  Epochs (MJDs) and other
    useful per-visit columns can be retrieved by joining with the CcdVisitTable
    on ccdVisitId.
    """
    _DefaultName = "transformForcedSourceTable"
    ConfigClass = TransformForcedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        outputs = self.run(inputs["inputCatalogs"], inputs["referenceCatalog"], funcs=self.funcs,
                           dataId=dict(outputRefs.outputCatalog.dataId.mapping))

        butlerQC.put(outputs, outputRefs)

    def run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=None):
        dfs = []
        refColumns = list(self.config.referenceColumns)
        refColumns.append(self.config.keyRef)
        ref = referenceCatalog.get(parameters={"columns": refColumns})
        if ref.index.name != self.config.keyRef:
            # Make sure the reference catalog is indexed by the join key.
            ref.set_index(self.config.keyRef, inplace=True)
        self.log.info("Aggregating %s input catalogs", len(inputCatalogs))
        for handle in inputCatalogs:
            result = self.transform(None, handle, funcs, dataId)
            # Inner join keeps only forced sources whose reference object
            # belongs to this patch.
            dfs.append(result.df.join(ref, how="inner"))

        outputCatalog = pd.concat(dfs)

        # The index is currently the reference key; expose it as a column and
        # re-index by forcedSourceId instead.
        outputCatalog.index.rename(self.config.keyRef, inplace=True)
        outputCatalog.reset_index(inplace=True)
        outputCatalog.set_index("forcedSourceId", inplace=True, verify_integrity=True)
        outputCatalog.index.rename(self.config.key, inplace=True)

        self.log.info("Made a table of %d columns and %d rows",
                      len(outputCatalog.columns), len(outputCatalog))
        return pipeBase.Struct(outputCatalog=outputCatalog)
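

# Join sketch (hypothetical variable names): per-visit metadata such as the
# exposure midpoint can be attached to the per-patch forced sources by joining
# on ccdVisitId, as the class docstring above suggests.
#
#   joined = forcedSourceTable.merge(ccdVisitTable[["ccdVisitId", "expMidptMJD"]],
#                                    on="ccdVisitId", how="left")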
class ConsolidateTractConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"catalogType": ""},
                                  dimensions=("instrument", "tract")):
    inputCatalogs = connectionTypes.Input(
        doc="Input per-patch DataFrame Tables to be concatenated",
        name="{catalogType}ForcedSourceTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        multiple=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Output per-tract concatenation of DataFrame Tables",
        name="{catalogType}ForcedSourceTable_tract",
        storageClass="DataFrame",
        dimensions=("tract", "skymap"),
    )


class ConsolidateTractConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=ConsolidateTractConnections):
    pass


class ConsolidateTractTask(pipeBase.PipelineTask):
    """Concatenate any per-patch dataframe list into a single
    per-tract DataFrame.
    """
    _DefaultName = "ConsolidateTract"
    ConfigClass = ConsolidateTractConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-patch %s Tables",
                      len(inputs["inputCatalogs"]),
                      inputRefs.inputCatalogs[0].datasetType.name)
        df = pd.concat(inputs["inputCatalogs"])
        butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)