__all__ = ["WriteObjectTableConfig", "WriteObjectTableTask",
           "WriteSourceTableConfig", "WriteSourceTableTask",
           "WriteRecalibratedSourceTableConfig", "WriteRecalibratedSourceTableTask",
           "PostprocessAnalysis",
           "TransformCatalogBaseConfig", "TransformCatalogBaseTask",
           "TransformObjectCatalogConfig", "TransformObjectCatalogTask",
           "ConsolidateObjectTableConfig", "ConsolidateObjectTableTask",
           "TransformSourceTableConfig", "TransformSourceTableTask",
           "ConsolidateVisitSummaryConfig", "ConsolidateVisitSummaryTask",
           "ConsolidateSourceTableConfig", "ConsolidateSourceTableTask",
           "MakeCcdVisitTableConfig", "MakeCcdVisitTableTask",
           "MakeVisitTableConfig", "MakeVisitTableTask",
           "WriteForcedSourceTableConfig", "WriteForcedSourceTableTask",
           "TransformForcedSourceTableConfig", "TransformForcedSourceTableTask",
           "ConsolidateTractConfig", "ConsolidateTractTask"]
import functools
import logging
import numbers
import os

import numpy as np
import pandas as pd

import lsst.afw.table as afwTable
import lsst.daf.base as dafBase
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
from lsst.afw.image import ExposureSummaryStats
from lsst.meas.base import SingleFrameMeasurementTask, DetectorVisitIdGeneratorConfig
from lsst.pipe.base import connectionTypes

from .functors import CompositeFunctor, Column

log = logging.getLogger(__name__)
def flattenFilters(df, noDupCols=['coord_ra', 'coord_dec'], camelCase=False, inputBands=None):
    """Flattens a dataframe with multilevel column index.
    """
    newDf = pd.DataFrame()
    # band is the level 0 index
    dfBands = df.columns.unique(level=0).values
    for band in dfBands:
        subdf = df[band]
        columnFormat = '{0}{1}' if camelCase else '{0}_{1}'
        newColumns = {c: columnFormat.format(band, c)
                      for c in subdf.columns if c not in noDupCols}
        cols = list(newColumns.keys())
        newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)

    # Band must be present in the input and in inputBands (if given).
    presentBands = dfBands if inputBands is None else list(set(inputBands).intersection(dfBands))
    # Copy the non-duplicated columns from one of the present bands.
    noDupDf = df[presentBands[0]][noDupCols]
    newDf = pd.concat([noDupDf, newDf], axis=1)
    return newDf
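
# Illustrative sketch (not executed): how flattenFilters renames columns. The
# band and column names below are hypothetical.
#
#     columns = pd.MultiIndex.from_tuples(
#         [("g", "psfFlux"), ("r", "psfFlux"), ("g", "coord_ra"), ("r", "coord_ra")])
#     df = pd.DataFrame(np.zeros((2, 4)), columns=columns)
#     flat = flattenFilters(df, noDupCols=["coord_ra"], inputBands=["g", "r"])
#     # flat has columns: coord_ra, g_psfFlux, r_psfFlux
#     # (gPsfFlux, rPsfFlux instead if camelCase=True)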
class WriteObjectTableConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"coaddName": "deep"},
                                  dimensions=("tract", "patch", "skymap")):
    inputCatalogMeas = connectionTypes.Input(
        doc="Catalog of source measurements on the deepCoadd.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_meas",
        multiple=True
    )
    inputCatalogForcedSrc = connectionTypes.Input(
        doc="Catalog of forced measurements (shape and position parameters held fixed) on the deepCoadd.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_forced_src",
        multiple=True
    )
    inputCatalogRef = connectionTypes.Input(
        doc="Catalog marking the primary detection (which band provides a good shape and position) "
            "for each detection in deepCoadd_mergeDet.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_ref"
    )
    outputCatalog = connectionTypes.Output(
        doc="A vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
            "stored as a DataFrame with a multi-level column index per-patch.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="{coaddName}Coadd_obj"
    )
class WriteObjectTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteObjectTableConnections):
    engine = pexConfig.Field(
        dtype=str,
        default="pyarrow",
        doc="Parquet engine for writing (pyarrow or fastparquet)",
        deprecated="This config is no longer used, and will be removed after v26."
    )
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )
class WriteObjectTableTask(pipeBase.PipelineTask):
    """Write filter-merged source tables as a DataFrame in parquet format.
    """
    _DefaultName = "writeObjectTable"
    ConfigClass = WriteObjectTableConfig

    # Names of table datasets to be merged
    inputDatasets = ('forced_src', 'meas', 'ref')

    # Tag of output dataset
    outputDataset = 'obj'

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        measDict = {ref.dataId['band']: {'meas': cat} for ref, cat in
                    zip(inputRefs.inputCatalogMeas, inputs['inputCatalogMeas'])}
        forcedSourceDict = {ref.dataId['band']: {'forced_src': cat} for ref, cat in
                            zip(inputRefs.inputCatalogForcedSrc, inputs['inputCatalogForcedSrc'])}

        catalogs = {}
        for band in measDict.keys():
            catalogs[band] = {'meas': measDict[band]['meas'],
                              'forced_src': forcedSourceDict[band]['forced_src'],
                              'ref': inputs['inputCatalogRef']}
        dataId = butlerQC.quantum.dataId
        df = self.run(catalogs=catalogs, tract=dataId['tract'], patch=dataId['patch'])
        outputs = pipeBase.Struct(outputCatalog=df)
        butlerQC.put(outputs, outputRefs)
    def run(self, catalogs, tract, patch):
        """Merge multiple catalogs.

        Parameters
        ----------
        catalogs : `dict`
            Mapping from filter names to dict of catalogs.
        tract : `int`
            tractId to use for the tractId column.
        patch : `str`
            patchId to use for the patchId column.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Merged dataframe.
        """
        dfs = []
        for filt, tableDict in catalogs.items():
            for dataset, table in tableDict.items():
                # Convert afwTable to pandas DataFrame
                df = table.asAstropy().to_pandas().set_index('id', drop=True)

                # Sort columns by name, to ensure matching schema among patches
                df = df.reindex(sorted(df.columns), axis=1)
                df = df.assign(tractId=tract, patchId=patch)

                # Make columns a 3-level MultiIndex
                df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
                                                       names=('dataset', 'band', 'column'))
                dfs.append(df)

        catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return catalog
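
# Illustrative sketch (not executed): selecting from the three-level column
# index written by WriteObjectTableTask. The butler call, dataId, band, and
# column names are hypothetical.
#
#     obj = butler.get("deepCoadd_obj", tract=0, patch=42, skymap="hypothetical")
#     gMeas = obj["meas"]["g"]                 # per-band measurement columns
#     refRa = obj[("ref", "g", "coord_ra")]    # a single (dataset, band, column) entry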
class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"catalogType": ""},
                                  dimensions=("instrument", "visit", "detector")):
    catalog = connectionTypes.Input(
        doc="Input full-depth catalog of sources produced by CalibrateTask",
        name="{catalogType}src",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector")
    )
    outputCatalog = connectionTypes.Output(
        doc="Catalog of sources, `src` in DataFrame/Parquet format. The 'id' column is "
            "replaced with an index; all other columns are unchanged.",
        name="{catalogType}source",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector")
    )
class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteSourceTableConnections):
    idGenerator = DetectorVisitIdGeneratorConfig.make_field()


class WriteSourceTableTask(pipeBase.PipelineTask):
    """Write source table to DataFrame Parquet format.
    """
    _DefaultName = "writeSourceTable"
    ConfigClass = WriteSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        inputs['ccdVisitId'] = self.config.idGenerator.apply(butlerQC.quantum.dataId).catalog_id
        result = self.run(**inputs)
        outputs = pipeBase.Struct(outputCatalog=result.table)
        butlerQC.put(outputs, outputRefs)

    def run(self, catalog, ccdVisitId=None, **kwargs):
        """Convert `src` catalog to DataFrame.

        Parameters
        ----------
        catalog : `afwTable.SourceCatalog`
            Catalog to be converted.
        ccdVisitId : `int`, optional
            ccdVisitId to be added as a column.
        **kwargs
            Additional keyword arguments are ignored as a convenience for
            subclasses that pass the same arguments to several different
            methods.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            ``table``
                `DataFrame` version of the input catalog.
        """
        self.log.info("Generating DataFrame from src catalog ccdVisitId=%s", ccdVisitId)
        df = catalog.asAstropy().to_pandas().set_index('id', drop=True)
        df['ccdVisitId'] = ccdVisitId

        return pipeBase.Struct(table=df)
class WriteRecalibratedSourceTableConnections(WriteSourceTableConnections,
                                              defaultTemplates={"catalogType": ""},
                                              dimensions=("instrument", "visit", "detector", "skymap")):
    exposure = connectionTypes.Input(
        doc="Input exposure to perform photometry on.",
        name="calexp",
        storageClass="ExposureF",
        dimensions=["instrument", "visit", "detector"],
        deprecated=(
            "Deprecated, as the `calexp` is not needed and just creates unnecessary i/o. "
            "Will be removed after v26."
        ),
    )
    visitSummary = connectionTypes.Input(
        doc="Input visit-summary catalog with updated calibration objects.",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit",),
    )


class WriteRecalibratedSourceTableConfig(WriteSourceTableConfig,
                                         pipelineConnections=WriteRecalibratedSourceTableConnections):

    doReevaluatePhotoCalib = pexConfig.Field(
        dtype=bool,
        doc=("Add or replace local photoCalib columns"),
    )
    doReevaluateSkyWcs = pexConfig.Field(
        dtype=bool,
        doc=("Add or replace local WCS columns and update the coord columns, coord_ra and coord_dec"),
    )
    idGenerator = DetectorVisitIdGeneratorConfig.make_field()
class WriteRecalibratedSourceTableTask(WriteSourceTableTask):
    """Write source table to DataFrame Parquet format.
    """
    _DefaultName = "writeRecalibratedSourceTable"
    ConfigClass = WriteRecalibratedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        idGenerator = self.config.idGenerator.apply(butlerQC.quantum.dataId)
        inputs['idGenerator'] = idGenerator
        inputs['ccdVisitId'] = idGenerator.catalog_id

        if self.config.doReevaluatePhotoCalib or self.config.doReevaluateSkyWcs:
            inputs['exposure'] = self.prepareCalibratedExposure(
                exposure=inputs["exposure"],
                visitSummary=inputs["visitSummary"],
                detectorId=butlerQC.quantum.dataId["detector"]
            )
            inputs['catalog'] = self.addCalibColumns(**inputs)

        result = self.run(**inputs)
        outputs = pipeBase.Struct(outputCatalog=result.table)
        butlerQC.put(outputs, outputRefs)
    def prepareCalibratedExposure(self, exposure, detectorId, visitSummary=None):
        """Prepare a calibrated exposure and apply external calibrations
        if so configured.

        Parameters
        ----------
        exposure : `lsst.afw.image.exposure.Exposure`
            Input exposure to adjust calibrations. May be empty.
        detectorId : `int`
            Detector ID associated with the exposure.
        visitSummary : `lsst.afw.table.ExposureCatalog`, optional
            Exposure catalog with all calibration objects. WCS and PhotoCalib
            are always applied if ``visitSummary`` is provided and those
            components are not `None`.

        Returns
        -------
        exposure : `lsst.afw.image.exposure.Exposure`
            Exposure with adjusted calibrations.
        """
        if visitSummary is not None:
            row = visitSummary.find(detectorId)
            if row is None:
                raise RuntimeError(f"Visit summary for detector {detectorId} is unexpectedly missing.")
            if (photoCalib := row.getPhotoCalib()) is None:
                self.log.warning("Detector id %s has None for photoCalib in visit summary; "
                                 "skipping reevaluation of photoCalib.", detectorId)
                exposure.setPhotoCalib(None)
            else:
                exposure.setPhotoCalib(photoCalib)
            if (skyWcs := row.getWcs()) is None:
                self.log.warning("Detector id %s has None for skyWcs in visit summary; "
                                 "skipping reevaluation of skyWcs.", detectorId)
                exposure.setWcs(None)
            else:
                exposure.setWcs(skyWcs)

        return exposure
    def addCalibColumns(self, catalog, exposure, idGenerator, **kwargs):
        """Add or replace columns with calibs evaluated at each centroid.

        Add or replace 'base_LocalWcs' and 'base_LocalPhotoCalib' columns in
        a source catalog, by rerunning the plugins.

        Parameters
        ----------
        catalog : `lsst.afw.table.SourceCatalog`
            Catalog to which calib columns will be added.
        exposure : `lsst.afw.image.exposure.Exposure`
            Exposure with attached PhotoCalibs and SkyWcs attributes to be
            reevaluated at local centroids. Pixels are not required.
        idGenerator : `lsst.meas.base.IdGenerator`
            Object that generates Source IDs and random seeds.
        **kwargs
            Additional keyword arguments are ignored to facilitate passing the
            same arguments to several methods.

        Returns
        -------
        newCat : `lsst.afw.table.SourceCatalog`
            Source Catalog with requested local calib columns.
        """
        measureConfig = SingleFrameMeasurementTask.ConfigClass()
        measureConfig.doReplaceWithNoise = False

        # Clear all slots, because we aren't running the relevant plugins.
        for slot in measureConfig.slots:
            setattr(measureConfig.slots, slot, None)

        measureConfig.plugins.names = []
        if self.config.doReevaluateSkyWcs:
            measureConfig.plugins.names.add("base_LocalWcs")
            self.log.info("Re-evaluating base_LocalWcs plugin")
        if self.config.doReevaluatePhotoCalib:
            measureConfig.plugins.names.add("base_LocalPhotoCalib")
            self.log.info("Re-evaluating base_LocalPhotoCalib plugin")
        pluginsNotToCopy = tuple(measureConfig.plugins.names)

        # Create a new schema and catalog, copying all columns from the
        # original except for the ones to reevaluate.
        aliasMap = catalog.schema.getAliasMap()
        mapper = afwTable.SchemaMapper(catalog.schema)
        for item in catalog.schema:
            if not item.field.getName().startswith(pluginsNotToCopy):
                mapper.addMapping(item.key)

        schema = mapper.getOutputSchema()
        measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
        schema.setAliasMap(aliasMap)
        newCat = afwTable.SourceCatalog(schema)
        newCat.extend(catalog, mapper=mapper)

        wcsPlugin = None
        if self.config.doReevaluateSkyWcs and exposure.wcs is not None:
            afwTable.updateSourceCoords(exposure.wcs, newCat)
            wcsPlugin = measurement.plugins["base_LocalWcs"]

        pcPlugin = None
        if self.config.doReevaluatePhotoCalib and exposure.getPhotoCalib() is not None:
            pcPlugin = measurement.plugins["base_LocalPhotoCalib"]

        for row in newCat:
            if wcsPlugin is not None:
                wcsPlugin.measure(row, exposure)
            if pcPlugin is not None:
                pcPlugin.measure(row, exposure)

        return newCat
442 """Calculate columns from DataFrames or handles storing DataFrames.
444 This object manages and organizes an arbitrary set of computations
445 on a catalog. The catalog is defined by a
446 `DeferredDatasetHandle` or `InMemoryDatasetHandle` object
447 (or list thereof), such as a ``deepCoadd_obj`` dataset, and the
448 computations are defined by a collection of
449 `~lsst.pipe.tasks.functors.Functor` objects (or, equivalently, a
450 ``CompositeFunctor``).
452 After the object is initialized, accessing the ``.df`` attribute (which
453 holds the `pandas.DataFrame` containing the results of the calculations)
454 triggers computation of said dataframe.
456 One of the conveniences of using this object is the ability to define a
457 desired common filter for all functors. This enables the same functor
458 collection to be passed to several different `PostprocessAnalysis` objects
459 without having to change the original functor collection, since the ``filt``
460 keyword argument of this object triggers an overwrite of the ``filt``
461 property for all functors in the collection.
463 This object also allows a list of refFlags to be passed, and defines a set
464 of default refFlags that are always included even if not requested.
466 If a list of DataFrames or Handles is passed, rather than a single one,
467 then the calculations will be mapped over all the input catalogs. In
468 principle, it should be straightforward to parallelize this activity, but
469 initial tests have failed (see TODO in code comments).
473 handles : `~lsst.daf.butler.DeferredDatasetHandle` or
474 `~lsst.pipe.base.InMemoryDatasetHandle` or
476 Source catalog(s) for computation.
477 functors : `list`, `dict`, or `~lsst.pipe.tasks.functors.CompositeFunctor`
478 Computations to do (functors that act on ``handles``).
479 If a dict, the output
480 DataFrame will have columns keyed accordingly.
481 If a list, the column keys will come from the
482 ``.shortname`` attribute of each functor.
484 filt : `str`, optional
485 Filter in which to calculate. If provided,
486 this will overwrite any existing ``.filt`` attribute
487 of the provided functors.
489 flags : `list`, optional
490 List of flags (per-band) to include in output table.
491 Taken from the ``meas`` dataset if applied to a multilevel Object Table.
493 refFlags : `list`, optional
494 List of refFlags (only reference band) to include in output table.
496 forcedFlags : `list`, optional
497 List of flags (per-band) to include in output table.
498 Taken from the ``forced_src`` dataset if applied to a
499 multilevel Object Table. Intended for flags from measurement plugins
500 only run during multi-band forced-photometry.
502 _defaultRefFlags = []
    def __init__(self, handles, functors, filt=None, flags=None, refFlags=None, forcedFlags=None):
        self.handles = handles
        self.functors = functors

        self.filt = filt
        self.flags = list(flags) if flags is not None else []
        self.forcedFlags = list(forcedFlags) if forcedFlags is not None else []
        self.refFlags = list(self._defaultRefFlags)
        if refFlags is not None:
            self.refFlags += list(refFlags)

        self._df = None
    @property
    def func(self):
        additionalFuncs = dict(self._defaultFuncs)
        additionalFuncs.update({flag: Column(flag, dataset='forced_src') for flag in self.forcedFlags})
        additionalFuncs.update({flag: Column(flag, dataset='ref') for flag in self.refFlags})
        additionalFuncs.update({flag: Column(flag, dataset='meas') for flag in self.flags})

        if isinstance(self.functors, CompositeFunctor):
            func = self.functors
        else:
            func = CompositeFunctor(self.functors)

        func.funcDict.update(additionalFuncs)
        func.filt = self.filt

        return func
    @property
    def noDupCols(self):
        return [name for name, func in self.func.funcDict.items() if func.noDup or func.dataset == 'ref']

    @property
    def df(self):
        if self._df is None:
            self.compute()
        return self._df

    def compute(self, dropna=False, pool=None):
        # Map over multiple handles if a list was provided.
        if type(self.handles) in (list, tuple):
            if pool is None:
                dflist = [self.func(handle, dropna=dropna) for handle in self.handles]
            else:
                # TODO: parallelization via pool has not worked in initial tests.
                dflist = pool.map(functools.partial(self.func, dropna=dropna), self.handles)
            self._df = pd.concat(dflist)
        else:
            self._df = self.func(self.handles, dropna=dropna)

        return self._df
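
# Illustrative sketch (not executed): building a PostprocessAnalysis by hand.
# The handle, column names, and flag name below are hypothetical.
#
#     funcs = {"ra": Column("coord_ra", dataset="ref"),
#              "dec": Column("coord_dec", dataset="ref")}
#     analysis = PostprocessAnalysis(handle, funcs, filt="r",
#                                    flags=["base_SdssShape_flag"])
#     df = analysis.df   # accessing .df triggers compute()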
568 """Expected Connections for subclasses of TransformCatalogBaseTask.
572 inputCatalog = connectionTypes.Input(
574 storageClass=
"DataFrame",
576 outputCatalog = connectionTypes.Output(
578 storageClass=
"DataFrame",
583 pipelineConnections=TransformCatalogBaseConnections):
584 functorFile = pexConfig.Field(
586 doc=
"Path to YAML file specifying Science Data Model functors to use "
587 "when copying columns and computing calibrated values.",
591 primaryKey = pexConfig.Field(
593 doc=
"Name of column to be set as the DataFrame index. If None, the index"
594 "will be named `id`",
598 columnsFromDataId = pexConfig.ListField(
602 doc=
"Columns to extract from the dataId",
607 """Base class for transforming/standardizing a catalog by applying functors
608 that convert units and apply calibrations.
610 The purpose of this task is to perform a set of computations on an input
611 ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` that holds a
612 ``DataFrame`` dataset (such as ``deepCoadd_obj``), and write the results to
613 a new dataset (which needs to be declared in an ``outputDataset``
616 The calculations to be performed are defined in a YAML file that specifies
617 a set of functors to be computed, provided as a ``--functorFile`` config
618 parameter. An example of such a YAML file is the following:
625 args: slot_Centroid_x
628 args: slot_Centroid_y
630 functor: LocalNanojansky
632 - slot_PsfFlux_instFlux
633 - slot_PsfFlux_instFluxErr
634 - base_LocalPhotoCalib
635 - base_LocalPhotoCalibErr
637 functor: LocalNanojanskyErr
639 - slot_PsfFlux_instFlux
640 - slot_PsfFlux_instFluxErr
641 - base_LocalPhotoCalib
642 - base_LocalPhotoCalibErr
646 The names for each entry under "func" will become the names of columns in
647 the output dataset. All the functors referenced are defined in
648 `~lsst.pipe.tasks.functors`. Positional arguments to be passed to each
649 functor are in the `args` list, and any additional entries for each column
650 other than "functor" or "args" (e.g., ``'filt'``, ``'dataset'``) are
651 treated as keyword arguments to be passed to the functor initialization.
653 The "flags" entry is the default shortcut for `Column` functors.
654 All columns listed under "flags" will be copied to the output table
655 untransformed. They can be of any datatype.
656 In the special case of transforming a multi-level oject table with
657 band and dataset indices (deepCoadd_obj), these will be taked from the
658 `meas` dataset and exploded out per band.
660 There are two special shortcuts that only apply when transforming
661 multi-level Object (deepCoadd_obj) tables:
662 - The "refFlags" entry is shortcut for `Column` functor
663 taken from the `'ref'` dataset if transforming an ObjectTable.
664 - The "forcedFlags" entry is shortcut for `Column` functors.
665 taken from the ``forced_src`` dataset if transforming an ObjectTable.
666 These are expanded out per band.
669 This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
670 to organize and excecute the calculations.
674 raise NotImplementedError(
'Subclass must define "_DefaultName" attribute')
678 raise NotImplementedError(
'Subclass must define "outputDataset" attribute')
682 raise NotImplementedError(
'Subclass must define "inputDataset" attribute')
686 raise NotImplementedError(
'Subclass must define "ConfigClass" attribute')
689 super().__init__(*args, **kwargs)
690 if self.config.functorFile:
691 self.log.info(
'Loading tranform functor definitions from %s',
692 self.config.functorFile)
693 self.
funcs = CompositeFunctor.from_file(self.config.functorFile)
694 self.
funcs.update(dict(PostprocessAnalysis._defaultFuncs))
699 inputs = butlerQC.get(inputRefs)
700 if self.
funcs is None:
701 raise ValueError(
"config.functorFile is None. "
702 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
703 result = self.
run(handle=inputs[
'inputCatalog'], funcs=self.
funcs,
704 dataId=dict(outputRefs.outputCatalog.dataId.mapping))
705 outputs = pipeBase.Struct(outputCatalog=result)
706 butlerQC.put(outputs, outputRefs)
    def run(self, handle, funcs=None, dataId=None, band=None):
        """Do postprocessing calculations.

        Takes a ``DeferredDatasetHandle`` or ``InMemoryDatasetHandle`` or
        ``DataFrame`` object and dataId,
        returns a dataframe with results of postprocessing calculations.

        Parameters
        ----------
        handle : `~lsst.daf.butler.DeferredDatasetHandle` or
                 `~lsst.pipe.base.InMemoryDatasetHandle` or
                 `~pandas.DataFrame`, or list of these.
            DataFrames from which calculations are done.
        funcs : `~lsst.pipe.tasks.functors.Functor`
            Functors to apply to the table's columns.
        dataId : dict, optional
            Used to add a `patchId` column to the output dataframe.
        band : `str`, optional
            Filter band that is being processed.

        Returns
        -------
        df : `pandas.DataFrame`
        """
        self.log.info("Transforming/standardizing the source table dataId: %s", dataId)

        df = self.transform(band, handle, funcs, dataId).df
        self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
        return df

    def getAnalysis(self, handles, funcs=None, band=None):
        if funcs is None:
            funcs = self.funcs
        analysis = PostprocessAnalysis(handles, funcs, filt=band)
        return analysis

    def transform(self, band, handles, funcs, dataId):
        analysis = self.getAnalysis(handles, funcs=funcs, band=band)
        df = analysis.df
        if dataId and self.config.columnsFromDataId:
            for key in self.config.columnsFromDataId:
                if key in dataId:
                    df[key] = dataId[key]
                else:
                    raise ValueError(f"'{key}' in config.columnsFromDataId not found in dataId: {dataId}")

        if self.config.primaryKey:
            if df.index.name != self.config.primaryKey and self.config.primaryKey in df:
                df.reset_index(inplace=True, drop=True)
                df.set_index(self.config.primaryKey, inplace=True)

        return pipeBase.Struct(
            df=df,
            analysis=analysis
        )
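
# Illustrative sketch (not executed): applying a functor file to a catalog
# handle outside the pipeline. The file name and dataId values are hypothetical.
#
#     funcs = CompositeFunctor.from_file("myFunctors.yaml")  # YAML in the format shown above
#     config = TransformSourceTableConfig()
#     task = TransformSourceTableTask(config=config)
#     struct = task.transform(None, handle, funcs,
#                             dataId={"visit": 1234, "detector": 42,
#                                     "band": "r", "physical_filter": "HSC-R"})
#     df = struct.df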
class TransformObjectCatalogConnections(pipeBase.PipelineTaskConnections,
                                        defaultTemplates={"coaddName": "deep"},
                                        dimensions=("tract", "patch", "skymap")):
    inputCatalog = connectionTypes.Input(
        doc="The vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
            "stored as a DataFrame with a multi-level column index per-patch.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="{coaddName}Coadd_obj",
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-Patch Object Table of columns transformed from the deepCoadd_obj table per the standard "
            "data model.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="objectTable",
    )
class TransformObjectCatalogConfig(TransformCatalogBaseConfig,
                                   pipelineConnections=TransformObjectCatalogConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )
    filterMap = pexConfig.DictField(
        keytype=str,
        itemtype=str,
        default={},
        doc=("Dictionary mapping full filter name to short one for column name munging. "
             "These filters determine the output columns no matter what filters the "
             "input data actually contain."),
        deprecated=("Coadds are now identified by the band, so this transform is unused. "
                    "Will be removed after v22.")
    )
    outputBands = pexConfig.ListField(
        dtype=str,
        default=None,
        optional=True,
        doc=("These bands and only these bands will appear in the output,"
             " NaN-filled if the input does not include them."
             " If None, then use all bands found in the input.")
    )
    camelCase = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Write per-band columns names with camelCase, else underscore "
             "For example: gPsFlux instead of g_PsFlux.")
    )
    multilevelOutput = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Whether results dataframe should have a multilevel column index (True) or be flat "
             "and name-munged (False).")
    )
    goodFlags = pexConfig.ListField(
        dtype=str,
        default=[],
        doc=("List of 'good' flags that should be set False when populating empty tables. "
             "All other flags are considered to be 'bad' flags and will be set to True.")
    )
    floatFillValue = pexConfig.Field(
        dtype=float,
        default=np.nan,
        doc="Fill value for float fields when populating empty tables."
    )
    integerFillValue = pexConfig.Field(
        dtype=int,
        default=-1,
        doc="Fill value for integer fields when populating empty tables."
    )

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join('$PIPE_TASKS_DIR', 'schemas', 'Object.yaml')
        self.primaryKey = 'objectId'
        self.columnsFromDataId = ['tract', 'patch']
        self.goodFlags = ['calib_astrometry_used',
                          'calib_photometry_reserved',
                          'calib_photometry_used',
                          'calib_psf_candidate',
                          'calib_psf_reserved',
                          'calib_psf_used']
857 """Produce a flattened Object Table to match the format specified in
860 Do the same set of postprocessing calculations on all bands.
862 This is identical to `TransformCatalogBaseTask`, except for that it does
863 the specified functor calculations for all filters present in the
864 input `deepCoadd_obj` table. Any specific ``"filt"`` keywords specified
865 by the YAML file will be superceded.
867 _DefaultName =
"transformObjectCatalog"
868 ConfigClass = TransformObjectCatalogConfig
    def run(self, handle, funcs=None, dataId=None, band=None):
        # NOTE: the band kwarg is ignored here; all bands are processed.
        dfDict = {}
        analysisDict = {}
        templateDf = pd.DataFrame()

        columns = handle.get(component='columns')
        inputBands = columns.unique(level=1).values

        outputBands = self.config.outputBands if self.config.outputBands else inputBands

        # Perform transform for data of filters that exist in the handle dataframe.
        for inputBand in inputBands:
            if inputBand not in outputBands:
                self.log.info("Ignoring %s band data in the input", inputBand)
                continue
            self.log.info("Transforming the catalog of band %s", inputBand)
            result = self.transform(inputBand, handle, funcs, dataId)
            dfDict[inputBand] = result.df
            analysisDict[inputBand] = result.analysis
            if templateDf.empty:
                templateDf = result.df

        # Put filler values in columns of other wanted bands.
        for filt in outputBands:
            if filt not in dfDict:
                self.log.info("Adding empty columns for band %s", filt)
                dfTemp = templateDf.copy()
                for col in dfTemp.columns:
                    testValue = dfTemp[col].values[0]
                    if isinstance(testValue, (np.bool_, pd.BooleanDtype)):
                        # Boolean flag type: "good" flags fill with False,
                        # all other flags fill with True.
                        if col in self.config.goodFlags:
                            fillValue = False
                        else:
                            fillValue = True
                    elif isinstance(testValue, numbers.Integral):
                        # numbers.Integral catches python, numpy, and pandas
                        # integers; unsigned integers are not supported.
                        if isinstance(testValue, np.unsignedinteger):
                            raise ValueError("Parquet tables may not have unsigned integer columns.")
                        else:
                            fillValue = self.config.integerFillValue
                    else:
                        fillValue = self.config.floatFillValue
                    dfTemp[col].values[:] = fillValue
                dfDict[filt] = dfTemp

        # This makes a multilevel column index, with band as first level.
        df = pd.concat(dfDict, axis=1, names=['band', 'column'])

        if not self.config.multilevelOutput:
            noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
            if self.config.primaryKey in noDupCols:
                noDupCols.remove(self.config.primaryKey)
            if dataId and self.config.columnsFromDataId:
                noDupCols += self.config.columnsFromDataId
            df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase,
                                inputBands=inputBands)

        self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))

        return df
class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("tract", "skymap")):
    inputCatalogs = connectionTypes.Input(
        doc="Per-Patch objectTables conforming to the standard data model.",
        name="objectTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        multiple=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-tract horizontal concatenation of the input objectTables",
        name="objectTable_tract",
        storageClass="DataFrame",
        dimensions=("tract", "skymap"),
    )


class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=ConsolidateObjectTableConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )


class ConsolidateObjectTableTask(pipeBase.PipelineTask):
    """Write patch-merged source tables to a tract-level DataFrame Parquet file.

    Concatenates the `objectTable` list into a per-tract `objectTable_tract`.
    """
    _DefaultName = "consolidateObjectTable"
    ConfigClass = ConsolidateObjectTableConfig

    inputDataset = 'objectTable'
    outputDataset = 'objectTable_tract'

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-patch Object Tables",
                      len(inputs['inputCatalogs']))
        df = pd.concat(inputs['inputCatalogs'])
        butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)
class TransformSourceTableConnections(pipeBase.PipelineTaskConnections,
                                      defaultTemplates={"catalogType": ""},
                                      dimensions=("instrument", "visit", "detector")):
    inputCatalog = connectionTypes.Input(
        doc="Wide input catalog of sources produced by WriteSourceTableTask",
        name="{catalogType}source",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
        deferLoad=True
    )
    outputCatalog = connectionTypes.Output(
        doc="Narrower, per-detector Source Table transformed and converted per a "
            "specified set of functors",
        name="{catalogType}sourceTable",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector")
    )


class TransformSourceTableConfig(TransformCatalogBaseConfig,
                                 pipelineConnections=TransformSourceTableConnections):

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join('$PIPE_TASKS_DIR', 'schemas', 'Source.yaml')
        self.primaryKey = 'sourceId'
        self.columnsFromDataId = ['visit', 'detector', 'band', 'physical_filter']


class TransformSourceTableTask(TransformCatalogBaseTask):
    """Transform/standardize a source catalog.
    """
    _DefaultName = "transformSourceTable"
    ConfigClass = TransformSourceTableConfig
class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections,
                                         dimensions=("instrument", "visit",),
                                         defaultTemplates={"calexpType": ""}):
    calexp = connectionTypes.Input(
        doc="Processed exposures used for metadata",
        name="{calexpType}calexp",
        storageClass="ExposureF",
        dimensions=("instrument", "visit", "detector"),
        deferLoad=True,
        multiple=True,
    )
    visitSummary = connectionTypes.Output(
        doc=("Per-visit consolidated exposure metadata. These catalogs use "
             "detector id for the id and are sorted for fast lookups of a "
             "detector."),
        name="visitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
    )
    visitSummarySchema = connectionTypes.InitOutput(
        doc="Schema of the visitSummary catalog",
        name="visitSummary_schema",
        storageClass="ExposureCatalog",
    )


class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig,
                                    pipelineConnections=ConsolidateVisitSummaryConnections):
    """Config for ConsolidateVisitSummaryTask"""


class ConsolidateVisitSummaryTask(pipeBase.PipelineTask):
    """Task to consolidate per-detector visit metadata.

    This task aggregates the following metadata from all the detectors in a
    single visit into an exposure catalog:
    - The visitInfo.
    - The wcs.
    - The photoCalib.
    - The physical_filter and band (if available).
    - The psf size, shape, and effective area at the center of the detector.
    - The corners of the bounding box in right ascension/declination.

    Other quantities such as Detector, Psf, ApCorrMap, and TransmissionCurve
    are not persisted here because of storage concerns, and because of their
    limited utility as summary statistics.

    Tests for this task are performed in ci_hsc_gen3.
    """
    _DefaultName = "consolidateVisitSummary"
    ConfigClass = ConsolidateVisitSummaryConfig

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.schema = afwTable.ExposureTable.makeMinimalSchema()
        self.schema.addField('visit', type='L', doc='Visit number')
        self.schema.addField('physical_filter', type='String', size=32, doc='Physical filter')
        self.schema.addField('band', type='String', size=32, doc='Name of band')
        ExposureSummaryStats.update_schema(self.schema)
        self.visitSummarySchema = afwTable.ExposureCatalog(self.schema)
    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        dataRefs = butlerQC.get(inputRefs.calexp)
        visit = dataRefs[0].dataId['visit']

        self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
                       len(dataRefs), visit)

        expCatalog = self._combineExposureMetadata(visit, dataRefs)

        butlerQC.put(expCatalog, outputRefs.visitSummary)

    def _combineExposureMetadata(self, visit, dataRefs):
        """Make a combined exposure catalog from a list of dataRefs.
        These dataRefs must point to exposures with wcs, summaryStats,
        and other visit metadata.

        Parameters
        ----------
        visit : `int`
            Visit identification number.
        dataRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            List of dataRefs in visit.

        Returns
        -------
        visitSummary : `lsst.afw.table.ExposureCatalog`
            Exposure catalog with per-detector summary information.
        """
        cat = afwTable.ExposureCatalog(self.schema)
        cat.resize(len(dataRefs))

        cat['visit'] = visit

        for i, dataRef in enumerate(dataRefs):
            visitInfo = dataRef.get(component='visitInfo')
            filterLabel = dataRef.get(component='filter')
            summaryStats = dataRef.get(component='summaryStats')
            detector = dataRef.get(component='detector')
            wcs = dataRef.get(component='wcs')
            photoCalib = dataRef.get(component='photoCalib')
            bbox = dataRef.get(component='bbox')
            validPolygon = dataRef.get(component='validPolygon')

            rec = cat[i]
            rec.setBBox(bbox)
            rec.setVisitInfo(visitInfo)
            rec.setWcs(wcs)
            rec.setPhotoCalib(photoCalib)
            rec.setValidPolygon(validPolygon)

            rec['physical_filter'] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else ""
            rec['band'] = filterLabel.bandLabel if filterLabel.hasBandLabel() else ""
            rec.setId(detector.getId())
            summaryStats.update_record(rec)

        metadata = dafBase.PropertyList()
        metadata.add("COMMENT", "Catalog id is detector id, sorted.")
        metadata.add("COMMENT", "Only detectors with data have entries.")
        cat.setMetadata(metadata)

        cat.sort()
        return cat
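
# Illustrative sketch (not executed): looking up one detector's calibrations in
# a consolidated visit summary. The butler call, instrument, and ids are
# hypothetical.
#
#     visitSummary = butler.get("visitSummary", instrument="Hypothetical", visit=1234)
#     row = visitSummary.find(42)          # rows are indexed/sorted by detector id
#     wcs = row.getWcs()
#     photoCalib = row.getPhotoCalib()
#     # row also carries the ExposureSummaryStats columns, e.g. row["psfSigma"]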
class ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        defaultTemplates={"catalogType": ""},
                                        dimensions=("instrument", "visit")):
    inputCatalogs = connectionTypes.Input(
        doc="Input per-detector Source Tables",
        name="{catalogType}sourceTable",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
        multiple=True
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-visit concatenation of Source Table",
        name="{catalogType}sourceTable_visit",
        storageClass="DataFrame",
        dimensions=("instrument", "visit")
    )


class ConsolidateSourceTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=ConsolidateSourceTableConnections):
    pass


class ConsolidateSourceTableTask(pipeBase.PipelineTask):
    """Concatenate `sourceTable` list into a per-visit `sourceTable_visit`.
    """
    _DefaultName = 'consolidateSourceTable'
    ConfigClass = ConsolidateSourceTableConfig

    inputDataset = 'sourceTable'
    outputDataset = 'sourceTable_visit'

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        from .makeWarp import reorderRefs

        # Sort the refs by detector so the output row order is deterministic.
        detectorOrder = [ref.dataId['detector'] for ref in inputRefs.inputCatalogs]
        detectorOrder.sort()
        inputRefs = reorderRefs(inputRefs, detectorOrder, dataIdKey='detector')
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-detector Source Tables",
                      len(inputs['inputCatalogs']))
        df = pd.concat(inputs['inputCatalogs'])
        butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)
class MakeCcdVisitTableConnections(pipeBase.PipelineTaskConnections,
                                   dimensions=("instrument",),
                                   defaultTemplates={"calexpType": ""}):
    visitSummaryRefs = connectionTypes.Input(
        doc="Data references for per-visit consolidated exposure metadata",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="CCD and Visit metadata table",
        name="ccdVisitTable",
        storageClass="DataFrame",
        dimensions=("instrument",)
    )


class MakeCcdVisitTableConfig(pipeBase.PipelineTaskConfig,
                              pipelineConnections=MakeCcdVisitTableConnections):
    idGenerator = DetectorVisitIdGeneratorConfig.make_field()


class MakeCcdVisitTableTask(pipeBase.PipelineTask):
    """Produce a `ccdVisitTable` from the visit summary exposure catalogs.
    """
    _DefaultName = 'makeCcdVisitTable'
    ConfigClass = MakeCcdVisitTableConfig
    def run(self, visitSummaryRefs):
        """Make a table of ccd information from the visit summary catalogs.

        Parameters
        ----------
        visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            List of DeferredDatasetHandles pointing to exposure catalogs with
            per-detector summary information.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            Results struct with attribute:

            ``outputCatalog``
                Catalog of ccd and visit information.
        """
        ccdEntries = []
        for visitSummaryRef in visitSummaryRefs:
            visitSummary = visitSummaryRef.get()
            visitInfo = visitSummary[0].getVisitInfo()

            summaryTable = visitSummary.asAstropy()
            selectColumns = ['id', 'visit', 'physical_filter', 'band', 'ra', 'dec',
                             'zenithDistance', 'zeroPoint', 'psfSigma', 'skyBg', 'skyNoise',
                             'astromOffsetMean', 'astromOffsetStd', 'nPsfStar',
                             'psfStarDeltaE1Median', 'psfStarDeltaE2Median',
                             'psfStarDeltaE1Scatter', 'psfStarDeltaE2Scatter',
                             'psfStarDeltaSizeMedian', 'psfStarDeltaSizeScatter',
                             'psfStarScaledDeltaSizeScatter',
                             'psfTraceRadiusDelta', 'maxDistToNearestPsf',
                             'effTime', 'effTimePsfSigmaScale',
                             'effTimeSkyBgScale', 'effTimeZeroPointScale']
            ccdEntry = summaryTable[selectColumns].to_pandas().set_index('id')

            # Rename the 'visit' column to 'visitId' to match the visit table.
            ccdEntry = ccdEntry.rename(columns={"visit": "visitId"})

            # Duplicate the declination under the "decl" name as well.
            ccdEntry["decl"] = ccdEntry.loc[:, "dec"]

            ccdEntry['ccdVisitId'] = [
                self.config.idGenerator.apply(
                    visitSummaryRef.dataId,
                    detector=detector_id,
                ).catalog_id
                for detector_id in summaryTable['id']
            ]
            ccdEntry['detector'] = summaryTable['id']
            pixToArcseconds = np.array([vR.getWcs().getPixelScale().asArcseconds() if vR.getWcs()
                                        else np.nan for vR in visitSummary])
            ccdEntry["seeing"] = visitSummary['psfSigma'] * np.sqrt(8 * np.log(2)) * pixToArcseconds

            ccdEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
            ccdEntry["expMidpt"] = visitInfo.getDate().toPython()
            ccdEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
            expTime = visitInfo.getExposureTime()
            ccdEntry['expTime'] = expTime
            ccdEntry["obsStart"] = ccdEntry["expMidpt"] - 0.5 * pd.Timedelta(seconds=expTime)
            expTime_days = expTime / (60*60*24)
            ccdEntry["obsStartMJD"] = ccdEntry["expMidptMJD"] - 0.5 * expTime_days
            ccdEntry['darkTime'] = visitInfo.getDarkTime()
            ccdEntry['xSize'] = summaryTable['bbox_max_x'] - summaryTable['bbox_min_x']
            ccdEntry['ySize'] = summaryTable['bbox_max_y'] - summaryTable['bbox_min_y']
            ccdEntry['llcra'] = summaryTable['raCorners'][:, 0]
            ccdEntry['llcdec'] = summaryTable['decCorners'][:, 0]
            ccdEntry['ulcra'] = summaryTable['raCorners'][:, 1]
            ccdEntry['ulcdec'] = summaryTable['decCorners'][:, 1]
            ccdEntry['urcra'] = summaryTable['raCorners'][:, 2]
            ccdEntry['urcdec'] = summaryTable['decCorners'][:, 2]
            ccdEntry['lrcra'] = summaryTable['raCorners'][:, 3]
            ccdEntry['lrcdec'] = summaryTable['decCorners'][:, 3]

            ccdEntries.append(ccdEntry)

        outputCatalog = pd.concat(ccdEntries)
        outputCatalog.set_index('ccdVisitId', inplace=True, verify_integrity=True)
        return pipeBase.Struct(outputCatalog=outputCatalog)
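
# Note on the "seeing" column above: psfSigma is the Gaussian sigma of the PSF
# in pixels, so multiplying by sqrt(8 * ln 2) ~= 2.355 converts it to a FWHM in
# pixels, and the per-detector pixel scale (arcsec/pixel) converts that to
# arcseconds. A quick check with hypothetical numbers:
#
#     psfSigma = 2.0                              # pixels
#     pixelScale = 0.2                            # arcsec / pixel
#     fwhm = psfSigma * np.sqrt(8 * np.log(2))    # ~4.71 pixels
#     seeing = fwhm * pixelScale                  # ~0.94 arcsec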
class MakeVisitTableConnections(pipeBase.PipelineTaskConnections,
                                dimensions=("instrument",),
                                defaultTemplates={"calexpType": ""}):
    visitSummaries = connectionTypes.Input(
        doc="Per-visit consolidated exposure metadata",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit",),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Visit metadata table",
        name="visitTable",
        storageClass="DataFrame",
        dimensions=("instrument",)
    )


class MakeVisitTableConfig(pipeBase.PipelineTaskConfig,
                           pipelineConnections=MakeVisitTableConnections):
    pass


class MakeVisitTableTask(pipeBase.PipelineTask):
    """Produce a `visitTable` from the visit summary exposure catalogs.
    """
    _DefaultName = 'makeVisitTable'
    ConfigClass = MakeVisitTableConfig
    def run(self, visitSummaries):
        """Make a table of visit information from the visit summary catalogs.

        Parameters
        ----------
        visitSummaries : `list` of `lsst.afw.table.ExposureCatalog`
            List of exposure catalogs with per-detector summary information.

        Returns
        -------
        result : `~lsst.pipe.base.Struct`
            Results struct with attribute:

            ``outputCatalog``
                Catalog of visit information.
        """
        visitEntries = []
        for visitSummary in visitSummaries:
            visitSummary = visitSummary.get()
            visitRow = visitSummary[0]
            visitInfo = visitRow.getVisitInfo()

            visitEntry = {}
            visitEntry["visitId"] = visitRow['visit']
            visitEntry["visit"] = visitRow['visit']
            visitEntry["physical_filter"] = visitRow['physical_filter']
            visitEntry["band"] = visitRow['band']
            raDec = visitInfo.getBoresightRaDec()
            visitEntry["ra"] = raDec.getRa().asDegrees()
            visitEntry["dec"] = raDec.getDec().asDegrees()

            # Duplicate the declination under the "decl" name as well.
            visitEntry["decl"] = visitEntry["dec"]

            visitEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
            azAlt = visitInfo.getBoresightAzAlt()
            visitEntry["azimuth"] = azAlt.getLongitude().asDegrees()
            visitEntry["altitude"] = azAlt.getLatitude().asDegrees()
            visitEntry["zenithDistance"] = 90 - azAlt.getLatitude().asDegrees()
            visitEntry["airmass"] = visitInfo.getBoresightAirmass()
            expTime = visitInfo.getExposureTime()
            visitEntry["expTime"] = expTime
            visitEntry["expMidpt"] = visitInfo.getDate().toPython()
            visitEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
            visitEntry["obsStart"] = visitEntry["expMidpt"] - 0.5 * pd.Timedelta(seconds=expTime)
            expTime_days = expTime / (60*60*24)
            visitEntry["obsStartMJD"] = visitEntry["expMidptMJD"] - 0.5 * expTime_days
            visitEntries.append(visitEntry)

        outputCatalog = pd.DataFrame(data=visitEntries)
        outputCatalog.set_index('visitId', inplace=True, verify_integrity=True)
        return pipeBase.Struct(outputCatalog=outputCatalog)
class WriteForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("instrument", "visit", "detector", "skymap", "tract")):
    inputCatalog = connectionTypes.Input(
        doc="Primary per-detector, single-epoch forced-photometry catalog. "
            "By default, it is the output of ForcedPhotCcdTask on calexps",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector", "skymap", "tract")
    )
    inputCatalogDiff = connectionTypes.Input(
        doc="Secondary multi-epoch, per-detector, forced photometry catalog. "
            "By default, it is the output of ForcedPhotCcdTask run on image differences.",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector", "skymap", "tract")
    )
    outputCatalog = connectionTypes.Output(
        doc="InputCatalogs horizontally joined on `objectId` in DataFrame parquet format",
        name="mergedForcedSource",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector", "skymap", "tract")
    )


class WriteForcedSourceTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=WriteForcedSourceTableConnections):
    key = pexConfig.Field(
        doc="Column on which to join the two input tables on and make the primary key of the output",
        dtype=str,
        default="objectId",
    )
    idGenerator = DetectorVisitIdGeneratorConfig.make_field()
class WriteForcedSourceTableTask(pipeBase.PipelineTask):
    """Merge and convert per-detector forced source catalogs to DataFrame Parquet format.

    Because the predecessor ForcedPhotCcdTask operates per-detector,
    per-tract (i.e., it has tract in its dimensions), detectors
    on the tract boundary may have multiple forced source catalogs.

    The successor task TransformForcedSourceTable runs per-patch
    and temporally aggregates overlapping mergedForcedSource catalogs from all
    available epochs.
    """
    _DefaultName = "writeForcedSourceTable"
    ConfigClass = WriteForcedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        idGenerator = self.config.idGenerator.apply(butlerQC.quantum.dataId)
        inputs['ccdVisitId'] = idGenerator.catalog_id
        inputs['band'] = butlerQC.quantum.dataId['band']
        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)

    def run(self, inputCatalog, inputCatalogDiff, ccdVisitId=None, band=None):
        dfs = []
        for table, dataset in zip((inputCatalog, inputCatalogDiff), ('calexp', 'diff')):
            df = table.asAstropy().to_pandas().set_index(self.config.key, drop=False)
            df = df.reindex(sorted(df.columns), axis=1)
            df['ccdVisitId'] = ccdVisitId if ccdVisitId else pd.NA
            df['band'] = band if band else pd.NA
            # Make columns a 2-level MultiIndex keyed by (dataset, column).
            df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
                                                   names=('dataset', 'column'))
            dfs.append(df)

        outputCatalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return pipeBase.Struct(outputCatalog=outputCatalog)
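
# Illustrative sketch (not executed): reading the merged forced-source table.
# The two inputs joined above share a two-level column index, so the calexp and
# difference-image measurements can be pulled apart again. The butler call,
# dataId, and flux column name are hypothetical.
#
#     merged = butler.get("mergedForcedSource", dataId)
#     calexpCols = merged["calexp"]     # forced photometry on the calexp
#     diffCols = merged["diff"]         # forced photometry on the image difference
#     flux = merged[("calexp", "base_PsfFlux_instFlux")]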
class TransformForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                            dimensions=("instrument", "skymap", "patch", "tract")):
    inputCatalogs = connectionTypes.Input(
        doc="DataFrames of merged ForcedSources produced by WriteForcedSourceTableTask",
        name="mergedForcedSource",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
        multiple=True,
        deferLoad=True
    )
    referenceCatalog = connectionTypes.Input(
        doc="Reference catalog which was used to seed the forcedPhot. Columns "
            "objectId, detect_isPrimary, detect_isTractInner, detect_isPatchInner "
            "are expected.",
        name="objectTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        deferLoad=True
    )
    outputCatalog = connectionTypes.Output(
        doc="Narrower, temporally-aggregated, per-patch ForcedSource Table transformed and converted per a "
            "specified set of functors",
        name="forcedSourceTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap")
    )


class TransformForcedSourceTableConfig(TransformCatalogBaseConfig,
                                       pipelineConnections=TransformForcedSourceTableConnections):
    referenceColumns = pexConfig.ListField(
        dtype=str,
        default=["detect_isPrimary", "detect_isTractInner", "detect_isPatchInner"],
        doc="Columns to pull from reference catalog",
    )
    keyRef = pexConfig.Field(
        doc="Column on which to join the two input tables on and make the primary key of the output",
        dtype=str,
        default="objectId",
    )
    key = pexConfig.Field(
        doc="Rename the output DataFrame index to this name",
        dtype=str,
        default="forcedSourceId",
    )

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join('$PIPE_TASKS_DIR', 'schemas', 'ForcedSource.yaml')
        self.columnsFromDataId = ['tract', 'patch']
1529 """Transform/standardize a ForcedSource catalog
1531 Transforms each wide, per-detector forcedSource DataFrame per the
1532 specification file (per-camera defaults found in ForcedSource.yaml).
1533 All epochs that overlap the patch are aggregated into one per-patch
1534 narrow-DataFrame file.
1536 No de-duplication of rows is performed. Duplicate resolutions flags are
1537 pulled in from the referenceCatalog: `detect_isPrimary`,
1538 `detect_isTractInner`,`detect_isPatchInner`, so that user may de-duplicate
1539 for analysis or compare duplicates for QA.
1541 The resulting table includes multiple bands. Epochs (MJDs) and other useful
1542 per-visit rows can be retreived by joining with the CcdVisitTable on
1545 _DefaultName =
"transformForcedSourceTable"
1546 ConfigClass = TransformForcedSourceTableConfig
    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        outputs = self.run(inputs['inputCatalogs'], inputs['referenceCatalog'], funcs=self.funcs,
                           dataId=dict(outputRefs.outputCatalog.dataId.mapping))

        butlerQC.put(outputs, outputRefs)

    def run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=None):
        dfs = []
        ref = referenceCatalog.get(parameters={"columns": self.config.referenceColumns})
        self.log.info("Aggregating %s input catalogs", len(inputCatalogs))
        for handle in inputCatalogs:
            result = self.transform(None, handle, funcs, dataId)
            # Keep only rows that match an entry in the reference catalog.
            dfs.append(result.df.join(ref, how='inner'))

        outputCatalog = pd.concat(dfs)

        # The joins above used config.keyRef as the index; expose it as a
        # regular column and make config.key the new index.
        outputCatalog.index.rename(self.config.keyRef, inplace=True)
        outputCatalog.reset_index(inplace=True)
        outputCatalog.set_index("forcedSourceId", inplace=True, verify_integrity=True)
        outputCatalog.index.rename(self.config.key, inplace=True)

        self.log.info("Made a table of %d columns and %d rows",
                      len(outputCatalog.columns), len(outputCatalog))
        return pipeBase.Struct(outputCatalog=outputCatalog)
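
# Illustrative sketch (not executed): de-duplicating the per-patch forced-source
# table using the reference flags pulled in above. The butler call and dataId
# are hypothetical.
#
#     forced = butler.get("forcedSourceTable", tract=0, patch=42, skymap="hypothetical")
#     primary = forced[forced["detect_isPrimary"]]   # drop duplicate/secondary rows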
class ConsolidateTractConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"catalogType": ""},
                                  dimensions=("instrument", "tract")):
    inputCatalogs = connectionTypes.Input(
        doc="Input per-patch DataFrame Tables to be concatenated",
        name="{catalogType}ForcedSourceTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        multiple=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Output per-tract concatenation of DataFrame Tables",
        name="{catalogType}ForcedSourceTable_tract",
        storageClass="DataFrame",
        dimensions=("tract", "skymap"),
    )


class ConsolidateTractConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=ConsolidateTractConnections):
    pass


class ConsolidateTractTask(pipeBase.PipelineTask):
    """Concatenate any per-patch, dataframe list into a single
    per-tract DataFrame.
    """
    _DefaultName = 'ConsolidateTract'
    ConfigClass = ConsolidateTractConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        # There is always at least one input; otherwise this quantum would not
        # have been generated.
        self.log.info("Concatenating %s per-patch %s Tables",
                      len(inputs['inputCatalogs']),
                      inputRefs.inputCatalogs[0].datasetType.name)
        df = pd.concat(inputs['inputCatalogs'])
        butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)