__all__ = ["WriteObjectTableConfig", "WriteObjectTableTask",
           "WriteSourceTableConfig", "WriteSourceTableTask",
           "WriteRecalibratedSourceTableConfig", "WriteRecalibratedSourceTableTask",
           "PostprocessAnalysis",
           "TransformCatalogBaseConfig", "TransformCatalogBaseTask",
           "TransformObjectCatalogConfig", "TransformObjectCatalogTask",
           "ConsolidateObjectTableConfig", "ConsolidateObjectTableTask",
           "TransformSourceTableConfig", "TransformSourceTableTask",
           "ConsolidateVisitSummaryConfig", "ConsolidateVisitSummaryTask",
           "ConsolidateSourceTableConfig", "ConsolidateSourceTableTask",
           "MakeCcdVisitTableConfig", "MakeCcdVisitTableTask",
           "MakeVisitTableConfig", "MakeVisitTableTask",
           "WriteForcedSourceTableConfig", "WriteForcedSourceTableTask",
           "TransformForcedSourceTableConfig", "TransformForcedSourceTableTask",
           "ConsolidateTractConfig", "ConsolidateTractTask"]

import functools
import logging
import numbers
import os

import numpy as np
import pandas as pd

import lsst.afw.table as afwTable
import lsst.daf.base as dafBase
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
from lsst.afw.image import ExposureF, ExposureSummaryStats
from lsst.meas.base import SingleFrameMeasurementTask, DetectorVisitIdGeneratorConfig
from lsst.pipe.base import connectionTypes

from .functors import CompositeFunctor, Column

log = logging.getLogger(__name__)


def flattenFilters(df, noDupCols=["coord_ra", "coord_dec"], camelCase=False, inputBands=None):
    """Flattens a dataframe with multilevel column index.
    """
    newDf = pd.DataFrame()
    # band is the level 0 index
    dfBands = df.columns.unique(level=0).values
    for band in dfBands:
        subdf = df[band]
        columnFormat = "{0}{1}" if camelCase else "{0}_{1}"
        newColumns = {c: columnFormat.format(band, c)
                      for c in subdf.columns if c not in noDupCols}
        cols = list(newColumns.keys())
        newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)

    # Only use the requested bands that are actually present in the input.
    presentBands = dfBands if inputBands is None else list(set(inputBands).intersection(dfBands))

    noDupDf = df[presentBands[0]][noDupCols]
    newDf = pd.concat([noDupDf, newDf], axis=1)
    return newDf
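
# Illustrative use of ``flattenFilters`` on a toy two-band table (a hedged
# sketch for documentation only; the column names here are hypothetical):
#
#     cols = pd.MultiIndex.from_product([["g", "r"], ["coord_ra", "coord_dec", "PsFlux"]])
#     df = pd.DataFrame([[0.1, 0.2, 1.0, 0.1, 0.2, 2.0]], columns=cols)
#     flat = flattenFilters(df, camelCase=True)
#     # flat.columns -> ["coord_ra", "coord_dec", "gPsFlux", "rPsFlux"]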


class WriteObjectTableConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"coaddName": "deep"},
                                  dimensions=("tract", "patch", "skymap")):
    inputCatalogMeas = connectionTypes.Input(
        doc="Catalog of source measurements on the deepCoadd.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_meas",
        multiple=True,
    )
    inputCatalogForcedSrc = connectionTypes.Input(
        doc="Catalog of forced measurements (shape and position parameters held fixed) on the deepCoadd.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_forced_src",
        multiple=True,
    )
    inputCatalogRef = connectionTypes.Input(
        doc="Catalog marking the primary detection (which band provides a good shape and position) "
            "for each detection in deepCoadd_mergeDet.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_ref",
    )
    outputCatalog = connectionTypes.Output(
        doc="A vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
            "stored as a DataFrame with a multi-level column index per-patch.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="{coaddName}Coadd_obj",
    )


class WriteObjectTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteObjectTableConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd",
    )


class WriteObjectTableTask(pipeBase.PipelineTask):
    """Write filter-merged source tables as a DataFrame in parquet format.
    """
    _DefaultName = "writeObjectTable"
    ConfigClass = WriteObjectTableConfig

    # Names of table datasets to be merged
    inputDatasets = ("forced_src", "meas", "ref")

    # Tag of output dataset written by this task
    outputDataset = "obj"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        measDict = {ref.dataId["band"]: {"meas": cat} for ref, cat in
                    zip(inputRefs.inputCatalogMeas, inputs["inputCatalogMeas"])}
        forcedSourceDict = {ref.dataId["band"]: {"forced_src": cat} for ref, cat in
                            zip(inputRefs.inputCatalogForcedSrc, inputs["inputCatalogForcedSrc"])}

        catalogs = {}
        for band in measDict.keys():
            catalogs[band] = {"meas": measDict[band]["meas"],
                              "forced_src": forcedSourceDict[band]["forced_src"],
                              "ref": inputs["inputCatalogRef"]}
        dataId = butlerQC.quantum.dataId
        df = self.run(catalogs=catalogs, tract=dataId["tract"], patch=dataId["patch"])
        outputs = pipeBase.Struct(outputCatalog=df)
        butlerQC.put(outputs, outputRefs)

    def run(self, catalogs, tract, patch):
        """Merge multiple catalogs.

        Parameters
        ----------
        catalogs : `dict`
            Mapping from filter names to dict of catalogs.
        tract : `int`
            tractId to use for the tractId column.
        patch : `str`
            patchId to use for the patchId column.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Merged dataframe.
        """
        dfs = []
        for filt, tableDict in catalogs.items():
            for dataset, table in tableDict.items():
                # Convert the afw table to a pandas DataFrame indexed on "id".
                df = table.asAstropy().to_pandas().set_index("id", drop=True)

                # Sort columns by name, to ensure matching schema among patches.
                df = df.reindex(sorted(df.columns), axis=1)
                df = df.assign(tractId=tract, patchId=patch)

                # Make columns a 3-level MultiIndex.
                df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
                                                       names=("dataset", "band", "column"))
                dfs.append(df)

        catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return catalog
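
# A column label in the merged ``deepCoadd_obj`` DataFrame produced by
# ``WriteObjectTableTask.run`` is a (dataset, band, column) tuple, matching
# the ``names`` passed to ``pd.MultiIndex.from_tuples`` above, e.g.
# (illustrative column name only):
#
#     ("meas", "g", "base_PsfFlux_instFlux")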


class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"catalogType": ""},
                                  dimensions=("instrument", "visit", "detector")):
    catalog = connectionTypes.Input(
        doc="Input full-depth catalog of sources produced by CalibrateTask",
        name="{catalogType}src",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector"),
    )
    outputCatalog = connectionTypes.Output(
        doc="Catalog of sources, `src` in DataFrame/Parquet format. The 'id' column is "
            "replaced with an index; all other columns are unchanged.",
        name="{catalogType}source",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
    )


class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteSourceTableConnections):
    pass


class WriteSourceTableTask(pipeBase.PipelineTask):
    """Write source table to DataFrame Parquet format.
    """
    _DefaultName = "writeSourceTable"
    ConfigClass = WriteSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        inputs["visit"] = butlerQC.quantum.dataId["visit"]
        inputs["detector"] = butlerQC.quantum.dataId["detector"]
        result = self.run(**inputs)
        outputs = pipeBase.Struct(outputCatalog=result.table)
        butlerQC.put(outputs, outputRefs)

    def run(self, catalog, visit, detector, **kwargs):
        """Convert `src` catalog to DataFrame

        Parameters
        ----------
        catalog : `afwTable.SourceCatalog`
            catalog to be converted
        visit, detector : `int`
            Visit and detector ids to be added as columns.
        **kwargs
            Additional keyword arguments are ignored as a convenience for
            subclasses that pass the same arguments to several different
            methods.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            ``table``
                `DataFrame` version of the input catalog
        """
        self.log.info("Generating DataFrame from src catalog visit,detector=%i,%i", visit, detector)
        df = catalog.asAstropy().to_pandas().set_index("id", drop=True)
        df["visit"] = visit
        # int16 instead of uint8 because databases don't like unsigned bytes.
        df["detector"] = np.int16(detector)

        return pipeBase.Struct(table=df)


class WriteRecalibratedSourceTableConnections(WriteSourceTableConnections,
                                              defaultTemplates={"catalogType": ""},
                                              dimensions=("instrument", "visit", "detector", "skymap")):
    visitSummary = connectionTypes.Input(
        doc="Input visit-summary catalog with updated calibration objects.",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit",),
    )


class WriteRecalibratedSourceTableConfig(WriteSourceTableConfig,
                                         pipelineConnections=WriteRecalibratedSourceTableConnections):

    doReevaluatePhotoCalib = pexConfig.Field(
        dtype=bool,
        default=True,
        doc=("Add or replace local photoCalib columns"),
    )
    doReevaluateSkyWcs = pexConfig.Field(
        dtype=bool,
        default=True,
        doc=("Add or replace local WCS columns and update the coord columns, coord_ra and coord_dec"),
    )


class WriteRecalibratedSourceTableTask(WriteSourceTableTask):
    """Write source table to DataFrame Parquet format.
    """
    _DefaultName = "writeRecalibratedSourceTable"
    ConfigClass = WriteRecalibratedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        inputs["visit"] = butlerQC.quantum.dataId["visit"]
        inputs["detector"] = butlerQC.quantum.dataId["detector"]

        if self.config.doReevaluatePhotoCalib or self.config.doReevaluateSkyWcs:
            exposure = ExposureF()
            inputs["exposure"] = self.prepareCalibratedExposure(
                exposure=exposure,
                visitSummary=inputs["visitSummary"],
                detectorId=butlerQC.quantum.dataId["detector"]
            )
            inputs["catalog"] = self.addCalibColumns(**inputs)

        result = self.run(**inputs)
        outputs = pipeBase.Struct(outputCatalog=result.table)
        butlerQC.put(outputs, outputRefs)

    def prepareCalibratedExposure(self, exposure, detectorId, visitSummary=None):
        """Prepare a calibrated exposure and apply external calibrations
        if so configured.

        Parameters
        ----------
        exposure : `lsst.afw.image.exposure.Exposure`
            Input exposure to adjust calibrations. May be an empty Exposure.
        detectorId : `int`
            Detector ID associated with the exposure.
        visitSummary : `lsst.afw.table.ExposureCatalog`, optional
            Exposure catalog with all calibration objects. WCS and PhotoCalib
            are always applied if ``visitSummary`` is provided and those
            components are not `None`.

        Returns
        -------
        exposure : `lsst.afw.image.exposure.Exposure`
            Exposure with adjusted calibrations.
        """
        if visitSummary is not None:
            row = visitSummary.find(detectorId)
            if row is None:
                raise RuntimeError(f"Visit summary for detector {detectorId} is unexpectedly missing.")
            if (photoCalib := row.getPhotoCalib()) is None:
                self.log.warning("Detector id %s has None for photoCalib in visit summary; "
                                 "skipping reevaluation of photoCalib.", detectorId)
                exposure.setPhotoCalib(None)
            else:
                exposure.setPhotoCalib(photoCalib)
            if (skyWcs := row.getWcs()) is None:
                self.log.warning("Detector id %s has None for skyWcs in visit summary; "
                                 "skipping reevaluation of skyWcs.", detectorId)
                exposure.setWcs(None)
            else:
                exposure.setWcs(skyWcs)

        return exposure

    def addCalibColumns(self, catalog, exposure, **kwargs):
        """Add or replace columns with calibs evaluated at each centroid

        Add or replace 'base_LocalWcs' and 'base_LocalPhotoCalib' columns in
        a source catalog, by rerunning the plugins.

        Parameters
        ----------
        catalog : `lsst.afw.table.SourceCatalog`
            catalog to which calib columns will be added
        exposure : `lsst.afw.image.exposure.Exposure`
            Exposure with attached PhotoCalibs and SkyWcs attributes to be
            reevaluated at local centroids. Pixels are not required.
        **kwargs
            Additional keyword arguments are ignored to facilitate passing the
            same arguments to several methods.

        Returns
        -------
        newCat : `lsst.afw.table.SourceCatalog`
            Source Catalog with requested local calib columns
        """
        measureConfig = SingleFrameMeasurementTask.ConfigClass()
        measureConfig.doReplaceWithNoise = False

        # Clear all slots; the relevant plugins are re-run explicitly below.
        for slot in measureConfig.slots:
            setattr(measureConfig.slots, slot, None)

        measureConfig.plugins.names = []
        if self.config.doReevaluateSkyWcs:
            measureConfig.plugins.names.add("base_LocalWcs")
            self.log.info("Re-evaluating base_LocalWcs plugin")
        if self.config.doReevaluatePhotoCalib:
            measureConfig.plugins.names.add("base_LocalPhotoCalib")
            self.log.info("Re-evaluating base_LocalPhotoCalib plugin")
        pluginsNotToCopy = tuple(measureConfig.plugins.names)

        # Create a new schema and catalog, copying every input column except
        # those produced by the plugins being re-run.
        aliasMap = catalog.schema.getAliasMap()
        mapper = afwTable.SchemaMapper(catalog.schema)
        for item in catalog.schema:
            if not item.field.getName().startswith(pluginsNotToCopy):
                mapper.addMapping(item.key)

        schema = mapper.getOutputSchema()
        measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
        schema.setAliasMap(aliasMap)
        newCat = afwTable.SourceCatalog(schema)
        newCat.extend(catalog, mapper=mapper)

        wcsPlugin = None
        pcPlugin = None
        if self.config.doReevaluateSkyWcs and exposure.wcs is not None:
            wcsPlugin = measurement.plugins["base_LocalWcs"]
        if self.config.doReevaluatePhotoCalib and exposure.getPhotoCalib() is not None:
            pcPlugin = measurement.plugins["base_LocalPhotoCalib"]

        for row in newCat:
            if wcsPlugin is not None:
                wcsPlugin.measure(row, exposure)
            if pcPlugin is not None:
                pcPlugin.measure(row, exposure)

        return newCat


class PostprocessAnalysis(object):
    """Calculate columns from DataFrames or handles storing DataFrames.

    This object manages and organizes an arbitrary set of computations
    on a catalog. The catalog is defined by a
    `DeferredDatasetHandle` or `InMemoryDatasetHandle` object
    (or list thereof), such as a ``deepCoadd_obj`` dataset, and the
    computations are defined by a collection of
    `~lsst.pipe.tasks.functors.Functor` objects (or, equivalently, a
    ``CompositeFunctor``).

    After the object is initialized, accessing the ``.df`` attribute (which
    holds the `pandas.DataFrame` containing the results of the calculations)
    triggers computation of said dataframe.

    One of the conveniences of using this object is the ability to define a
    desired common filter for all functors. This enables the same functor
    collection to be passed to several different `PostprocessAnalysis` objects
    without having to change the original functor collection, since the ``filt``
    keyword argument of this object triggers an overwrite of the ``filt``
    property for all functors in the collection.

    This object also allows a list of refFlags to be passed, and defines a set
    of default refFlags that are always included even if not requested.

    If a list of DataFrames or Handles is passed, rather than a single one,
    then the calculations will be mapped over all the input catalogs. In
    principle, it should be straightforward to parallelize this activity, but
    initial tests have failed (see TODO in code comments).

    Parameters
    ----------
    handles : `~lsst.daf.butler.DeferredDatasetHandle` or
              `~lsst.pipe.base.InMemoryDatasetHandle` or
              list of these.
        Source catalog(s) for computation.
    functors : `list`, `dict`, or `~lsst.pipe.tasks.functors.CompositeFunctor`
        Computations to do (functors that act on ``handles``).
        If a dict, the output
        DataFrame will have columns keyed accordingly.
        If a list, the column keys will come from the
        ``.shortname`` attribute of each functor.

    filt : `str`, optional
        Filter in which to calculate. If provided,
        this will overwrite any existing ``.filt`` attribute
        of the provided functors.

    flags : `list`, optional
        List of flags (per-band) to include in output table.
        Taken from the ``meas`` dataset if applied to a multilevel Object Table.

    refFlags : `list`, optional
        List of refFlags (only reference band) to include in output table.

    forcedFlags : `list`, optional
        List of flags (per-band) to include in output table.
        Taken from the ``forced_src`` dataset if applied to a
        multilevel Object Table. Intended for flags from measurement plugins
        only run during multi-band forced-photometry.
    """
    _defaultRefFlags = []
    _defaultFuncs = ()
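
    # Typical usage (a hedged sketch; the column and flag names below are
    # hypothetical, and ``handle`` is assumed to be a DeferredDatasetHandle or
    # InMemoryDatasetHandle as described in the docstring):
    #
    #     funcs = {"ra": Column("coord_ra"), "dec": Column("coord_dec")}
    #     analysis = PostprocessAnalysis(handle, funcs, filt="r",
    #                                    flags=["base_PixelFlags_flag"])
    #     df = analysis.df  # accessing .df triggers the computation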

    def __init__(self, handles, functors, filt=None, flags=None, refFlags=None, forcedFlags=None):
        self.handles = handles
        self.functors = functors

        self.filt = filt
        self.flags = list(flags) if flags is not None else []
        self.forcedFlags = list(forcedFlags) if forcedFlags is not None else []
        self.refFlags = list(self._defaultRefFlags)
        if refFlags is not None:
            self.refFlags += list(refFlags)

    @property
    def defaultFuncs(self):
        funcs = dict(self._defaultFuncs)
        return funcs

    @property
    def func(self):
        additionalFuncs = self.defaultFuncs
        additionalFuncs.update({flag: Column(flag, dataset="forced_src") for flag in self.forcedFlags})
        additionalFuncs.update({flag: Column(flag, dataset="ref") for flag in self.refFlags})
        additionalFuncs.update({flag: Column(flag, dataset="meas") for flag in self.flags})

        if isinstance(self.functors, CompositeFunctor):
            func = self.functors
        else:
            func = CompositeFunctor(self.functors)

        func.funcDict.update(additionalFuncs)
        func.filt = self.filt

        return func

    @property
    def noDupCols(self):
        return [name for name, func in self.func.funcDict.items()
                if func.noDup or func.dataset == "ref"]

    def compute(self, dropna=False, pool=None):
        # Map over a list or tuple of handles.
        if type(self.handles) in (list, tuple):
            if pool is None:
                dflist = [self.func(handle, dropna=dropna) for handle in self.handles]
            else:
                # TODO: parallelization with a pool currently fails
                # (see the class docstring).
                dflist = pool.map(functools.partial(self.func, dropna=dropna), self.handles)
            self._df = pd.concat(dflist)
        else:
            self._df = self.func(self.handles, dropna=dropna)

        return self._df


class TransformCatalogBaseConnections(pipeBase.PipelineTaskConnections,
                                      dimensions=()):
    """Expected Connections for subclasses of TransformCatalogBaseTask.
    """
    inputCatalog = connectionTypes.Input(
        name="",
        storageClass="DataFrame",
    )
    outputCatalog = connectionTypes.Output(
        name="",
        storageClass="DataFrame",
    )


class TransformCatalogBaseConfig(pipeBase.PipelineTaskConfig,
                                 pipelineConnections=TransformCatalogBaseConnections):
    functorFile = pexConfig.Field(
        dtype=str,
        doc="Path to YAML file specifying Science Data Model functors to use "
            "when copying columns and computing calibrated values.",
        default=None,
        optional=True,
    )
    primaryKey = pexConfig.Field(
        dtype=str,
        doc="Name of column to be set as the DataFrame index. If None, the index "
            "will be named `id`.",
        default=None,
        optional=True,
    )
    columnsFromDataId = pexConfig.ListField(
        dtype=str,
        default=None,
        optional=True,
        doc="Columns to extract from the dataId",
    )


class TransformCatalogBaseTask(pipeBase.PipelineTask):
    """Base class for transforming/standardizing a catalog by applying functors
    that convert units and apply calibrations.

    The purpose of this task is to perform a set of computations on an input
    ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` that holds a
    ``DataFrame`` dataset (such as ``deepCoadd_obj``), and write the results to
    a new dataset (which needs to be declared in an ``outputDataset``
    attribute).

    The calculations to be performed are defined in a YAML file that specifies
    a set of functors to be computed, provided as a ``--functorFile`` config
    parameter. An example of such a YAML file is the following:

        funcs:
            x:
                functor: Column
                args: slot_Centroid_x
            y:
                functor: Column
                args: slot_Centroid_y
            psfFlux:
                functor: LocalNanojansky
                args:
                    - slot_PsfFlux_instFlux
                    - slot_PsfFlux_instFluxErr
                    - base_LocalPhotoCalib
                    - base_LocalPhotoCalibErr
            psfFluxErr:
                functor: LocalNanojanskyErr
                args:
                    - slot_PsfFlux_instFlux
                    - slot_PsfFlux_instFluxErr
                    - base_LocalPhotoCalib
                    - base_LocalPhotoCalibErr

    The names for each entry under "funcs" will become the names of columns in
    the output dataset. All the functors referenced are defined in
    `~lsst.pipe.tasks.functors`. Positional arguments to be passed to each
    functor are in the `args` list, and any additional entries for each column
    other than "functor" or "args" (e.g., ``'filt'``, ``'dataset'``) are
    treated as keyword arguments to be passed to the functor initialization.

    The "flags" entry is the default shortcut for `Column` functors.
    All columns listed under "flags" will be copied to the output table
    untransformed. They can be of any datatype.
    In the special case of transforming a multi-level object table with
    band and dataset indices (deepCoadd_obj), these will be taken from the
    ``meas`` dataset and exploded out per band.

    There are two special shortcuts that only apply when transforming
    multi-level Object (deepCoadd_obj) tables:

    - The "refFlags" entry is a shortcut for `Column` functors
      taken from the ``ref`` dataset if transforming an ObjectTable.
    - The "forcedFlags" entry is a shortcut for `Column` functors
      taken from the ``forced_src`` dataset if transforming an ObjectTable.
      These are expanded out per band.

    This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
    to organize and execute the calculations.
    """
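
    # Example of invoking the transform programmatically (a hedged sketch;
    # ``MyTransformSubclassTask``, ``handle``, and the dataId values are
    # hypothetical, not part of this module):
    #
    #     task = MyTransformSubclassTask(config=config)
    #     df = task.run(handle, funcs=task.funcs, dataId={"tract": 0, "patch": 3})
    #     # ``df`` is the transformed pandas.DataFrame returned by ``run``.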

    @property
    def _DefaultName(self):
        raise NotImplementedError("Subclass must define the \"_DefaultName\" attribute.")

    @property
    def outputDataset(self):
        raise NotImplementedError("Subclass must define the \"outputDataset\" attribute.")

    @property
    def inputDataset(self):
        raise NotImplementedError("Subclass must define \"inputDataset\" attribute.")

    @property
    def ConfigClass(self):
        raise NotImplementedError("Subclass must define \"ConfigClass\" attribute.")

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if self.config.functorFile:
            self.log.info("Loading transform functor definitions from %s",
                          self.config.functorFile)
            self.funcs = CompositeFunctor.from_file(self.config.functorFile)
            self.funcs.update(dict(PostprocessAnalysis._defaultFuncs))
        else:
            self.funcs = None

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
                          dataId=dict(outputRefs.outputCatalog.dataId.mapping))
        outputs = pipeBase.Struct(outputCatalog=result)
        butlerQC.put(outputs, outputRefs)

    def run(self, handle, funcs=None, dataId=None, band=None):
        """Do postprocessing calculations

        Takes a ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` or
        ``DataFrame`` object and dataId,
        returns a dataframe with results of postprocessing calculations.

        Parameters
        ----------
        handle : `~lsst.daf.butler.DeferredDatasetHandle` or
                 `~lsst.pipe.base.InMemoryDatasetHandle` or
                 `~pandas.DataFrame`, or list of these.
            DataFrames from which calculations are done.
        funcs : `~lsst.pipe.tasks.functors.Functor`
            Functors to apply to the table's columns
        dataId : `dict`, optional
            Used to add a `patchId` column to the output dataframe.
        band : `str`, optional
            Filter band that is being processed.

        Returns
        -------
        df : `pandas.DataFrame`
        """
        self.log.info("Transforming/standardizing the source table dataId: %s", dataId)

        df = self.transform(band, handle, funcs, dataId).df
        self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
        return df

    def getFunctors(self):
        return self.funcs

    def getAnalysis(self, handles, funcs=None, band=None):
        if funcs is None:
            funcs = self.funcs
        analysis = PostprocessAnalysis(handles, funcs, filt=band)
        return analysis

    def transform(self, band, handles, funcs, dataId):
        analysis = self.getAnalysis(handles, funcs=funcs, band=band)
        df = analysis.df
        if dataId and self.config.columnsFromDataId:
            for key in self.config.columnsFromDataId:
                if key in dataId:
                    if key == "detector":
                        # int16 instead of uint8 because databases don't like unsigned bytes.
                        df[key] = np.int16(dataId[key])
                    else:
                        df[key] = dataId[key]
                else:
                    raise ValueError(f"'{key}' in config.columnsFromDataId not found in dataId: {dataId}")

        if self.config.primaryKey:
            if df.index.name != self.config.primaryKey and self.config.primaryKey in df:
                df.reset_index(inplace=True, drop=True)
                df.set_index(self.config.primaryKey, inplace=True)

        return pipeBase.Struct(
            df=df,
            analysis=analysis,
        )


class TransformObjectCatalogConnections(pipeBase.PipelineTaskConnections,
                                        defaultTemplates={"coaddName": "deep"},
                                        dimensions=("tract", "patch", "skymap")):
    inputCatalog = connectionTypes.Input(
        doc="The vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
            "stored as a DataFrame with a multi-level column index per-patch.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="{coaddName}Coadd_obj",
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-Patch Object Table of columns transformed from the deepCoadd_obj table per the standard "
            "Data Products Definition Document (DPDD).",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="objectTable",
    )


class TransformObjectCatalogConfig(TransformCatalogBaseConfig,
                                   pipelineConnections=TransformObjectCatalogConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd",
    )
    outputBands = pexConfig.ListField(
        dtype=str,
        default=None,
        optional=True,
        doc=("These bands and only these bands will appear in the output,"
             " NaN-filled if the input does not include them."
             " If None, then use all bands found in the input.")
    )
    camelCase = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Write per-band columns names with camelCase, else underscore. "
             "For example: gPsFlux instead of g_PsFlux.")
    )
    multilevelOutput = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Whether results dataframe should have a multilevel column index (True) or be flat "
             "and name-munged (False).")
    )
    goodFlags = pexConfig.ListField(
        dtype=str,
        default=[],
        doc=("List of 'good' flags that should be set False when populating empty tables. "
             "All other flags are considered to be 'bad' flags and will be set to True.")
    )
    floatFillValue = pexConfig.Field(
        dtype=float,
        default=np.nan,
        doc="Fill value for float fields when populating empty tables.",
    )
    integerFillValue = pexConfig.Field(
        dtype=int,
        default=-1,
        doc="Fill value for integer fields when populating empty tables.",
    )

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Object.yaml")
        self.primaryKey = "objectId"
        self.columnsFromDataId = ["tract", "patch"]
        self.goodFlags = ["calib_astrometry_used",
                          "calib_photometry_reserved",
                          "calib_photometry_used",
                          "calib_psf_candidate",
                          "calib_psf_reserved"]


class TransformObjectCatalogTask(TransformCatalogBaseTask):
    """Produce a flattened Object Table to match the format specified in
    the DPDD.

    Do the same set of postprocessing calculations on all bands.

    This is identical to `TransformCatalogBaseTask`, except that it does
    the specified functor calculations for all filters present in the
    input `deepCoadd_obj` table. Any specific ``"filt"`` keywords specified
    by the YAML file will be superseded.
    """
    _DefaultName = "transformObjectCatalog"
    ConfigClass = TransformObjectCatalogConfig

    def run(self, handle, funcs=None, dataId=None, band=None):
        # NOTE: the ``band`` kwarg is ignored; all bands are processed.
        dfDict = {}
        analysisDict = {}
        templateDf = pd.DataFrame()

        columns = handle.get(component="columns")
        inputBands = columns.unique(level=1).values

        outputBands = self.config.outputBands if self.config.outputBands else inputBands

        # Perform transform for data of filters that exist in the handle dataframe.
        for inputBand in inputBands:
            if inputBand not in outputBands:
                self.log.info("Ignoring %s band data in the input", inputBand)
                continue
            self.log.info("Transforming the catalog of band %s", inputBand)
            result = self.transform(inputBand, handle, funcs, dataId)
            dfDict[inputBand] = result.df
            analysisDict[inputBand] = result.analysis
            if templateDf.empty:
                templateDf = result.df

        # Put filler values in columns of other wanted bands.
        for filt in outputBands:
            if filt not in dfDict:
                self.log.info("Adding empty columns for band %s", filt)
                dfTemp = templateDf.copy()
                for col in dfTemp.columns:
                    testValue = dfTemp[col].values[0]
                    if isinstance(testValue, (np.bool_, pd.BooleanDtype)):
                        # Boolean flag type: 'good' flags are set False; all others True.
                        if col in self.config.goodFlags:
                            fillValue = False
                        else:
                            fillValue = True
                    elif isinstance(testValue, numbers.Integral):
                        # Integer types (unsigned integers are not supported in Parquet).
                        if isinstance(testValue, np.unsignedinteger):
                            raise ValueError("Parquet tables may not have unsigned integer columns.")
                        else:
                            fillValue = self.config.integerFillValue
                    else:
                        fillValue = self.config.floatFillValue
                    dfTemp[col].values[:] = fillValue
                dfDict[filt] = dfTemp

        # This makes a multilevel column index, with band as the first level.
        df = pd.concat(dfDict, axis=1, names=["band", "column"])

        if not self.config.multilevelOutput:
            noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
            if self.config.primaryKey in noDupCols:
                noDupCols.remove(self.config.primaryKey)
            if dataId and self.config.columnsFromDataId:
                noDupCols += self.config.columnsFromDataId
            df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase,
                                inputBands=inputBands)

        self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))

        return df


class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("tract", "skymap")):
    inputCatalogs = connectionTypes.Input(
        doc="Per-Patch objectTables conforming to the standard data model.",
        name="objectTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        multiple=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-tract horizontal concatenation of the input objectTables",
        name="objectTable_tract",
        storageClass="DataFrame",
        dimensions=("tract", "skymap"),
    )


class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=ConsolidateObjectTableConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd",
    )


class ConsolidateObjectTableTask(pipeBase.PipelineTask):
    """Write patch-merged source tables to a tract-level DataFrame Parquet file.

    Concatenates the `objectTable` list into a per-tract `objectTable_tract`.
    """
    _DefaultName = "consolidateObjectTable"
    ConfigClass = ConsolidateObjectTableConfig

    inputDataset = "objectTable"
    outputDataset = "objectTable_tract"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-patch Object Tables",
                      len(inputs["inputCatalogs"]))
        df = pd.concat(inputs["inputCatalogs"])
        butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)


class TransformSourceTableConnections(pipeBase.PipelineTaskConnections,
                                      defaultTemplates={"catalogType": ""},
                                      dimensions=("instrument", "visit", "detector")):

    inputCatalog = connectionTypes.Input(
        doc="Wide input catalog of sources produced by WriteSourceTableTask",
        name="{catalogType}source",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Narrower, per-detector Source Table transformed and converted per a "
            "specified set of functors",
        name="{catalogType}sourceTable",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
    )


class TransformSourceTableConfig(TransformCatalogBaseConfig,
                                 pipelineConnections=TransformSourceTableConnections):

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "Source.yaml")
        self.primaryKey = "sourceId"
        self.columnsFromDataId = ["visit", "detector", "band", "physical_filter"]


class TransformSourceTableTask(TransformCatalogBaseTask):
    """Transform/standardize a source catalog
    """
    _DefaultName = "transformSourceTable"
    ConfigClass = TransformSourceTableConfig


class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections,
                                         dimensions=("instrument", "visit",),
                                         defaultTemplates={"calexpType": ""}):
    calexp = connectionTypes.Input(
        doc="Processed exposures used for metadata",
        name="calexp",
        storageClass="ExposureF",
        dimensions=("instrument", "visit", "detector"),
        deferLoad=True,
        multiple=True,
    )
    visitSummary = connectionTypes.Output(
        doc=("Per-visit consolidated exposure metadata. These catalogs use "
             "detector id for the id and are sorted for fast lookups of a "
             "detector."),
        name="visitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
    )
    visitSummarySchema = connectionTypes.InitOutput(
        doc="Schema of the visitSummary catalog",
        name="visitSummary_schema",
        storageClass="ExposureCatalog",
    )


class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig,
                                    pipelineConnections=ConsolidateVisitSummaryConnections):
    """Config for ConsolidateVisitSummaryTask"""


class ConsolidateVisitSummaryTask(pipeBase.PipelineTask):
    """Task to consolidate per-detector visit metadata.

    This task aggregates the following metadata from all the detectors in a
    single visit into an exposure catalog:

    - The visitInfo.
    - The wcs.
    - The photoCalib.
    - The physical_filter and band (if available).
    - The psf size, shape, and effective area at the center of the detector.
    - The corners of the bounding box in right ascension/declination.

    Other quantities such as Detector, Psf, ApCorrMap, and TransmissionCurve
    are not persisted here because of storage concerns, and because of their
    limited utility as summary statistics.

    Tests for this task are performed in ci_hsc_gen3.
    """
    _DefaultName = "consolidateVisitSummary"
    ConfigClass = ConsolidateVisitSummaryConfig

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.schema = afwTable.ExposureTable.makeMinimalSchema()
        self.schema.addField("visit", type="L", doc="Visit number")
        self.schema.addField("physical_filter", type="String", size=32, doc="Physical filter")
        self.schema.addField("band", type="String", size=32, doc="Name of band")
        ExposureSummaryStats.update_schema(self.schema)
        self.visitSummarySchema = afwTable.ExposureCatalog(self.schema)

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        dataRefs = butlerQC.get(inputRefs.calexp)
        visit = dataRefs[0].dataId["visit"]

        self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
                       len(dataRefs), visit)

        expCatalog = self._combineExposureMetadata(visit, dataRefs)

        butlerQC.put(expCatalog, outputRefs.visitSummary)

    def _combineExposureMetadata(self, visit, dataRefs):
        """Make a combined exposure catalog from a list of dataRefs.
        These dataRefs must point to exposures with wcs, summaryStats,
        and other visit metadata.

        Parameters
        ----------
        visit : `int`
            Visit identification number.
        dataRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            List of dataRefs in visit.

        Returns
        -------
        visitSummary : `lsst.afw.table.ExposureCatalog`
            Exposure catalog with per-detector summary information.
        """
        cat = afwTable.ExposureCatalog(self.schema)
        cat.resize(len(dataRefs))

        cat["visit"] = visit

        for i, dataRef in enumerate(dataRefs):
            visitInfo = dataRef.get(component="visitInfo")
            filterLabel = dataRef.get(component="filter")
            summaryStats = dataRef.get(component="summaryStats")
            detector = dataRef.get(component="detector")
            wcs = dataRef.get(component="wcs")
            photoCalib = dataRef.get(component="photoCalib")
            bbox = dataRef.get(component="bbox")
            validPolygon = dataRef.get(component="validPolygon")

            rec = cat[i]
            rec.setBBox(bbox)
            rec.setVisitInfo(visitInfo)
            rec.setWcs(wcs)
            rec.setPhotoCalib(photoCalib)
            rec.setValidPolygon(validPolygon)

            rec["physical_filter"] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else ""
            rec["band"] = filterLabel.bandLabel if filterLabel.hasBandLabel() else ""
            rec.setId(detector.getId())
            summaryStats.update_record(rec)

        metadata = dafBase.PropertyList()
        metadata.add("COMMENT", "Catalog id is detector id, sorted.")
        metadata.add("COMMENT", "Only detectors with data have entries.")
        cat.setMetadata(metadata)

        cat.sort()
        return cat


class ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        defaultTemplates={"catalogType": ""},
                                        dimensions=("instrument", "visit")):
    inputCatalogs = connectionTypes.Input(
        doc="Input per-detector Source Tables",
        name="{catalogType}sourceTable",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
        multiple=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-visit concatenation of Source Table",
        name="{catalogType}sourceTable_visit",
        storageClass="DataFrame",
        dimensions=("instrument", "visit")
    )


class ConsolidateSourceTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=ConsolidateSourceTableConnections):
    pass


class ConsolidateSourceTableTask(pipeBase.PipelineTask):
    """Concatenate `sourceTable` list into a per-visit `sourceTable_visit`
    """
    _DefaultName = "consolidateSourceTable"
    ConfigClass = ConsolidateSourceTableConfig

    inputDataset = "sourceTable"
    outputDataset = "sourceTable_visit"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        from .makeWarp import reorderRefs

        detectorOrder = [ref.dataId["detector"] for ref in inputRefs.inputCatalogs]
        detectorOrder.sort()
        inputRefs = reorderRefs(inputRefs, detectorOrder, dataIdKey="detector")
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-detector Source Tables",
                      len(inputs["inputCatalogs"]))
        df = pd.concat(inputs["inputCatalogs"])
        butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)


class MakeCcdVisitTableConnections(pipeBase.PipelineTaskConnections,
                                   dimensions=("instrument",),
                                   defaultTemplates={"calexpType": ""}):
    visitSummaryRefs = connectionTypes.Input(
        doc="Data references for per-visit consolidated exposure metadata",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="CCD and Visit metadata table",
        name="ccdVisitTable",
        storageClass="DataFrame",
        dimensions=("instrument",)
    )


class MakeCcdVisitTableConfig(pipeBase.PipelineTaskConfig,
                              pipelineConnections=MakeCcdVisitTableConnections):
    idGenerator = DetectorVisitIdGeneratorConfig.make_field()


class MakeCcdVisitTableTask(pipeBase.PipelineTask):
    """Produce a `ccdVisitTable` from the visit summary exposure catalogs.
    """
    _DefaultName = "makeCcdVisitTable"
    ConfigClass = MakeCcdVisitTableConfig

    def run(self, visitSummaryRefs):
        """Make a table of ccd information from the visit summary catalogs.

        Parameters
        ----------
        visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            List of DeferredDatasetHandles pointing to exposure catalogs with
            per-detector summary information.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            Results struct with attribute:

            ``outputCatalog``
                Catalog of ccd and visit information.
        """
        ccdEntries = []
        for visitSummaryRef in visitSummaryRefs:
            visitSummary = visitSummaryRef.get()
            visitInfo = visitSummary[0].getVisitInfo()

            summaryTable = visitSummary.asAstropy()
            selectColumns = ["id", "visit", "physical_filter", "band", "ra", "dec",
                             "pixelScale", "zenithDistance",
                             "expTime", "zeroPoint", "psfSigma", "skyBg", "skyNoise",
                             "astromOffsetMean", "astromOffsetStd", "nPsfStar",
                             "psfStarDeltaE1Median", "psfStarDeltaE2Median",
                             "psfStarDeltaE1Scatter", "psfStarDeltaE2Scatter",
                             "psfStarDeltaSizeMedian", "psfStarDeltaSizeScatter",
                             "psfStarScaledDeltaSizeScatter", "psfTraceRadiusDelta",
                             "psfApFluxDelta", "psfApCorrSigmaScaledDelta",
                             "maxDistToNearestPsf",
                             "effTime", "effTimePsfSigmaScale",
                             "effTimeSkyBgScale", "effTimeZeroPointScale"]
            ccdEntry = summaryTable[selectColumns].to_pandas().set_index("id")

            # The per-detector "visit" column becomes "visitId" in the output.
            ccdEntry = ccdEntry.rename(columns={"visit": "visitId"})

            # Duplicate the "dec" column as "decl".
            ccdEntry["decl"] = ccdEntry.loc[:, "dec"]

            ccdEntry["ccdVisitId"] = [
                self.config.idGenerator.apply(
                    visitSummaryRef.dataId,
                    detector=detector_id,
                ).catalog_id
                for detector_id in summaryTable["id"]
            ]
            ccdEntry["detector"] = summaryTable["id"]
            # PSF FWHM in arcsec: sigma (pixels) * pixel scale * 2*sqrt(2*ln 2).
            ccdEntry["seeing"] = (
                visitSummary["psfSigma"] * visitSummary["pixelScale"] * np.sqrt(8 * np.log(2))
            )

            ccdEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
            ccdEntry["expMidpt"] = visitInfo.getDate().toPython()
            ccdEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
            ccdEntry["obsStart"] = (
                ccdEntry["expMidpt"] - 0.5 * pd.Timedelta(seconds=ccdEntry["expTime"].values[0])
            )
            expTime_days = ccdEntry["expTime"] / (60*60*24)
            ccdEntry["obsStartMJD"] = ccdEntry["expMidptMJD"] - 0.5 * expTime_days
            ccdEntry["darkTime"] = visitInfo.getDarkTime()
            ccdEntry["xSize"] = summaryTable["bbox_max_x"] - summaryTable["bbox_min_x"]
            ccdEntry["ySize"] = summaryTable["bbox_max_y"] - summaryTable["bbox_min_y"]
            ccdEntry["llcra"] = summaryTable["raCorners"][:, 0]
            ccdEntry["llcdec"] = summaryTable["decCorners"][:, 0]
            ccdEntry["ulcra"] = summaryTable["raCorners"][:, 1]
            ccdEntry["ulcdec"] = summaryTable["decCorners"][:, 1]
            ccdEntry["urcra"] = summaryTable["raCorners"][:, 2]
            ccdEntry["urcdec"] = summaryTable["decCorners"][:, 2]
            ccdEntry["lrcra"] = summaryTable["raCorners"][:, 3]
            ccdEntry["lrcdec"] = summaryTable["decCorners"][:, 3]

            ccdEntries.append(ccdEntry)

        outputCatalog = pd.concat(ccdEntries)
        outputCatalog.set_index("ccdVisitId", inplace=True, verify_integrity=True)
        return pipeBase.Struct(outputCatalog=outputCatalog)


class MakeVisitTableConnections(pipeBase.PipelineTaskConnections,
                                dimensions=("instrument",),
                                defaultTemplates={"calexpType": ""}):
    visitSummaries = connectionTypes.Input(
        doc="Per-visit consolidated exposure metadata",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit",),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Visit metadata table",
        name="visitTable",
        storageClass="DataFrame",
        dimensions=("instrument",)
    )


class MakeVisitTableConfig(pipeBase.PipelineTaskConfig,
                           pipelineConnections=MakeVisitTableConnections):
    pass


class MakeVisitTableTask(pipeBase.PipelineTask):
    """Produce a `visitTable` from the visit summary exposure catalogs.
    """
    _DefaultName = "makeVisitTable"
    ConfigClass = MakeVisitTableConfig

    def run(self, visitSummaries):
        """Make a table of visit information from the visit summary catalogs.

        Parameters
        ----------
        visitSummaries : `list` of `lsst.afw.table.ExposureCatalog`
            List of exposure catalogs with per-detector summary information.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            Results struct with attribute:

            ``outputCatalog``
                Catalog of visit information.
        """
        visitEntries = []
        for visitSummary in visitSummaries:
            visitSummary = visitSummary.get()
            visitRow = visitSummary[0]
            visitInfo = visitRow.getVisitInfo()

            visitEntry = {}
            visitEntry["visitId"] = visitRow["visit"]
            visitEntry["visit"] = visitRow["visit"]
            visitEntry["physical_filter"] = visitRow["physical_filter"]
            visitEntry["band"] = visitRow["band"]
            raDec = visitInfo.getBoresightRaDec()
            visitEntry["ra"] = raDec.getRa().asDegrees()
            visitEntry["dec"] = raDec.getDec().asDegrees()

            # Duplicate the "dec" column as "decl".
            visitEntry["decl"] = visitEntry["dec"]

            visitEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
            azAlt = visitInfo.getBoresightAzAlt()
            visitEntry["azimuth"] = azAlt.getLongitude().asDegrees()
            visitEntry["altitude"] = azAlt.getLatitude().asDegrees()
            visitEntry["zenithDistance"] = 90 - azAlt.getLatitude().asDegrees()
            visitEntry["airmass"] = visitInfo.getBoresightAirmass()
            expTime = visitInfo.getExposureTime()
            visitEntry["expTime"] = expTime
            visitEntry["expMidpt"] = visitInfo.getDate().toPython()
            visitEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
            visitEntry["obsStart"] = visitEntry["expMidpt"] - 0.5 * pd.Timedelta(seconds=expTime)
            expTime_days = expTime / (60*60*24)
            visitEntry["obsStartMJD"] = visitEntry["expMidptMJD"] - 0.5 * expTime_days
            visitEntries.append(visitEntry)

        outputCatalog = pd.DataFrame(data=visitEntries)
        outputCatalog.set_index("visitId", inplace=True, verify_integrity=True)
        return pipeBase.Struct(outputCatalog=outputCatalog)


class WriteForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("instrument", "visit", "detector", "skymap", "tract")):

    inputCatalog = connectionTypes.Input(
        doc="Primary per-detector, single-epoch forced-photometry catalog. "
            "By default, it is the output of ForcedPhotCcdTask on calexps",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector", "skymap", "tract")
    )
    inputCatalogDiff = connectionTypes.Input(
        doc="Secondary multi-epoch, per-detector, forced photometry catalog. "
            "By default, it is the output of ForcedPhotCcdTask run on image differences.",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector", "skymap", "tract")
    )
    outputCatalog = connectionTypes.Output(
        doc="InputCatalogs horizontally joined on `objectId` in DataFrame parquet format",
        name="mergedForcedSource",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector", "skymap", "tract")
    )


class WriteForcedSourceTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=WriteForcedSourceTableConnections):
    key = pexConfig.Field(
        doc="Column on which to join the two input tables on and make the primary key of the output",
        dtype=str,
        default="objectId",
    )


class WriteForcedSourceTableTask(pipeBase.PipelineTask):
    """Merge and convert per-detector forced source catalogs to DataFrame Parquet format.

    Because the predecessor ForcedPhotCcdTask operates per-detector,
    per-tract (i.e., it has tract in its dimensions), detectors
    on the tract boundary may have multiple forced source catalogs.

    The successor task TransformForcedSourceTable runs per-patch
    and temporally aggregates overlapping mergedForcedSource catalogs from all
    available epochs.
    """
    _DefaultName = "writeForcedSourceTable"
    ConfigClass = WriteForcedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        inputs["visit"] = butlerQC.quantum.dataId["visit"]
        inputs["detector"] = butlerQC.quantum.dataId["detector"]
        inputs["band"] = butlerQC.quantum.dataId["band"]
        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)

    def run(self, inputCatalog, inputCatalogDiff, visit, detector, band=None):
        dfs = []
        for table, dataset in zip((inputCatalog, inputCatalogDiff), ("calexp", "diff")):
            df = table.asAstropy().to_pandas().set_index(self.config.key, drop=False)
            df = df.reindex(sorted(df.columns), axis=1)
            df["visit"] = visit
            # int16 instead of uint8 because databases don't like unsigned bytes.
            df["detector"] = np.int16(detector)
            df["band"] = band if band else pd.NA
            df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
                                                   names=("dataset", "column"))

            dfs.append(df)

        outputCatalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return pipeBase.Struct(outputCatalog=outputCatalog)


class TransformForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                            dimensions=("instrument", "skymap", "patch", "tract")):

    inputCatalogs = connectionTypes.Input(
        doc="DataFrames of merged ForcedSources produced by WriteForcedSourceTableTask",
        name="mergedForcedSource",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
        multiple=True,
        deferLoad=True,
    )
    referenceCatalog = connectionTypes.Input(
        doc="Reference catalog which was used to seed the forcedPhot. Columns "
            "objectId, detect_isPrimary, detect_isTractInner, detect_isPatchInner "
            "are expected.",
        name="objectTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Narrower, temporally-aggregated, per-patch ForcedSource Table transformed and converted per a "
            "specified set of functors",
        name="forcedSourceTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap")
    )


class TransformForcedSourceTableConfig(TransformCatalogBaseConfig,
                                       pipelineConnections=TransformForcedSourceTableConnections):
    referenceColumns = pexConfig.ListField(
        dtype=str,
        default=["detect_isPrimary", "detect_isTractInner", "detect_isPatchInner"],
        optional=True,
        doc="Columns to pull from reference catalog",
    )
    keyRef = pexConfig.Field(
        doc="Column on which to join the two input tables on and make the primary key of the output",
        dtype=str,
        default="objectId",
    )
    key = pexConfig.Field(
        doc="Rename the output DataFrame index to this name",
        dtype=str,
        default="forcedSourceId",
    )

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join("$PIPE_TASKS_DIR", "schemas", "ForcedSource.yaml")
        self.columnsFromDataId = ["tract", "patch"]


class TransformForcedSourceTableTask(TransformCatalogBaseTask):
    """Transform/standardize a ForcedSource catalog

    Transforms each wide, per-detector forcedSource DataFrame per the
    specification file (per-camera defaults found in ForcedSource.yaml).
    All epochs that overlap the patch are aggregated into one per-patch
    narrow-DataFrame file.

    No de-duplication of rows is performed. Duplicate resolution flags are
    pulled in from the referenceCatalog: `detect_isPrimary`,
    `detect_isTractInner`, `detect_isPatchInner`, so that the user may
    de-duplicate for analysis or compare duplicates for QA.

    The resulting table includes multiple bands. Epochs (MJDs) and other useful
    per-visit rows can be retrieved by joining with the CcdVisitTable on
    ``ccdVisitId``.
    """
    _DefaultName = "transformForcedSourceTable"
    ConfigClass = TransformForcedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        outputs = self.run(inputs["inputCatalogs"], inputs["referenceCatalog"], funcs=self.funcs,
                           dataId=dict(outputRefs.outputCatalog.dataId.mapping))

        butlerQC.put(outputs, outputRefs)

    def run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=None):
        dfs = []
        ref = referenceCatalog.get(parameters={"columns": self.config.referenceColumns})
        self.log.info("Aggregating %s input catalogs" % (len(inputCatalogs)))
        for handle in inputCatalogs:
            result = self.transform(None, handle, funcs, dataId)
            # Keep only the rows that have a match in the reference catalog.
            dfs.append(result.df.join(ref, how="inner"))

        outputCatalog = pd.concat(dfs)

        # The index at this point is the join key; rename it to keyRef
        # ("objectId" by default), keep it as a column, and index the table by
        # forcedSourceId instead.
        outputCatalog.index.rename(self.config.keyRef, inplace=True)
        # Do not drop original objectIds.
        outputCatalog.reset_index(inplace=True)
        # The DataFrame contains many rows per objectId; make forcedSourceId
        # the unique index.
        outputCatalog.set_index("forcedSourceId", inplace=True, verify_integrity=True)
        outputCatalog.index.rename(self.config.key, inplace=True)

        self.log.info("Made a table of %d columns and %d rows",
                      len(outputCatalog.columns), len(outputCatalog))
        return pipeBase.Struct(outputCatalog=outputCatalog)
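
# Example downstream use (a hedged sketch; it assumes both tables have been
# read into plain pandas DataFrames, e.g. via the butler, and that the forced
# source table carries a ccdVisitId column as described in the docstring):
#
#     # Attach per-visit metadata (MJDs, seeing, ...) by joining on
#     # ccdVisitId, which indexes the ccdVisitTable built above.
#     joined = forcedSourceTable.merge(ccdVisitTable,
#                                      left_on="ccdVisitId", right_index=True)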


class ConsolidateTractConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"catalogType": ""},
                                  dimensions=("instrument", "tract")):
    inputCatalogs = connectionTypes.Input(
        doc="Input per-patch DataFrame Tables to be concatenated",
        name="{catalogType}ForcedSourceTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        multiple=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Output per-tract concatenation of DataFrame Tables",
        name="{catalogType}ForcedSourceTable_tract",
        storageClass="DataFrame",
        dimensions=("tract", "skymap"),
    )


class ConsolidateTractConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=ConsolidateTractConnections):
    pass


class ConsolidateTractTask(pipeBase.PipelineTask):
    """Concatenate any per-patch, dataframe list into a single
    per-tract DataFrame.
    """
    _DefaultName = "ConsolidateTract"
    ConfigClass = ConsolidateTractConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-patch %s Tables",
                      len(inputs["inputCatalogs"]),
                      inputRefs.inputCatalogs[0].datasetType.name)
        df = pd.concat(inputs["inputCatalogs"])
        butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)