import functools
from collections import defaultdict

import numpy as np
import pandas as pd

import lsst.geom
import lsst.daf.base as dafBase
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
import lsst.afw.table as afwTable
from lsst.pipe.base import CmdLineTask, ArgumentParser, DataIdContainer, connectionTypes
from lsst.meas.base import SingleFrameMeasurementTask
from lsst.daf.butler import DeferredDatasetHandle, DataCoordinate

from .coaddBase import CoaddDataIdContainer
from .parquetTable import ParquetTable
from .multiBandUtils import makeMergeArgumentParser, MergeSourcesRunner
from .functors import CompositeFunctor, Column

def flattenFilters(df, noDupCols=['coord_ra', 'coord_dec'], camelCase=False, inputBands=None):
    """Flatten a dataframe with a multilevel column index.
    """
    newDf = pd.DataFrame()
    # band is the level-0 column index
    dfBands = df.columns.unique(level=0).values
    for band in dfBands:
        subdf = df[band]
        columnFormat = '{0}{1}' if camelCase else '{0}_{1}'
        newColumns = {c: columnFormat.format(band, c)
                      for c in subdf.columns if c not in noDupCols}
        cols = list(newColumns.keys())
        newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)

    # The band must be present in both the input and the output, or the column is all NaN
    presentBands = dfBands if inputBands is None else list(set(inputBands).intersection(dfBands))
    # Take the non-duplicated columns from any present band's partition
    noDupDf = df[presentBands[0]][noDupCols]
    newDf = pd.concat([noDupDf, newDf], axis=1)
    return newDf

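# Illustrative sketch (not part of the pipeline): what ``flattenFilters`` does to a
# small two-band, multilevel-column table.  The band and column names below are
# hypothetical and exist only for demonstration.
def _exampleFlattenFilters():
    columns = pd.MultiIndex.from_product([['g', 'r'], ['coord_ra', 'coord_dec', 'psfFlux']],
                                         names=('band', 'column'))
    df = pd.DataFrame([[10.0, -5.0, 1.1, 10.0, -5.0, 2.2]], columns=columns)
    # Yields single-level columns: coord_ra, coord_dec, g_psfFlux, r_psfFlux
    return flattenFilters(df)
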
class WriteObjectTableConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"coaddName": "deep"},
                                  dimensions=("tract", "patch", "skymap")):
    inputCatalogMeas = connectionTypes.Input(
        doc="Catalog of source measurements on the deepCoadd.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_meas",
        multiple=True
    )
    inputCatalogForcedSrc = connectionTypes.Input(
        doc="Catalog of forced measurements (shape and position parameters held fixed) on the "
            "deepCoadd.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_forced_src",
        multiple=True
    )
    inputCatalogRef = connectionTypes.Input(
        doc="Catalog marking the primary detection (which band provides a good shape and position) "
            "for each detection in deepCoadd_mergeDet.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_ref"
    )
    outputCatalog = connectionTypes.Output(
        doc="A vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
            "stored as a DataFrame with a multi-level column index per-patch.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="{coaddName}Coadd_obj"
    )

class WriteObjectTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteObjectTableConnections):
    engine = pexConfig.Field(
        dtype=str,
        default="pyarrow",
        doc="Parquet engine for writing (pyarrow or fastparquet)"
    )
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )

class WriteObjectTableTask(CmdLineTask, pipeBase.PipelineTask):
    """Write filter-merged source tables to parquet.
    """
    _DefaultName = "writeObjectTable"
    ConfigClass = WriteObjectTableConfig
    RunnerClass = MergeSourcesRunner

    # Names of table datasets to be merged
    inputDatasets = ('forced_src', 'meas', 'ref')

    # Tag of the merged output dataset
    outputDataset = 'obj'

    def __init__(self, butler=None, schema=None, **kwargs):
        super().__init__(**kwargs)

    def runDataRef(self, patchRefList):
        """!
        @brief Merge coadd sources from multiple bands. Calls @ref `run` which must be defined in
        subclasses that inherit from MergeSourcesTask.
        @param[in] patchRefList list of data references for each filter
        """
        catalogs = dict(self.readCatalog(patchRef) for patchRef in patchRefList)
        dataId = patchRefList[0].dataId
        mergedCatalog = self.run(catalogs, tract=dataId['tract'], patch=dataId['patch'])
        self.write(patchRefList[0], ParquetTable(dataFrame=mergedCatalog))

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        measDict = {ref.dataId['band']: {'meas': cat} for ref, cat in
                    zip(inputRefs.inputCatalogMeas, inputs['inputCatalogMeas'])}
        forcedSourceDict = {ref.dataId['band']: {'forced_src': cat} for ref, cat in
                            zip(inputRefs.inputCatalogForcedSrc, inputs['inputCatalogForcedSrc'])}

        catalogs = {}
        for band in measDict.keys():
            catalogs[band] = {'meas': measDict[band]['meas'],
                              'forced_src': forcedSourceDict[band]['forced_src'],
                              'ref': inputs['inputCatalogRef']}
        dataId = butlerQC.quantum.dataId
        df = self.run(catalogs=catalogs, tract=dataId['tract'], patch=dataId['patch'])
        outputs = pipeBase.Struct(outputCatalog=df)
        butlerQC.put(outputs, outputRefs)

    @classmethod
    def _makeArgumentParser(cls):
        """Create a suitable ArgumentParser.

        We will use the ArgumentParser to get a list of data
        references for patches; the RunnerClass will sort them into lists
        of data references for the same patch.

        References the first of self.inputDatasets, rather than
        self.inputDataset.
        """
        return makeMergeArgumentParser(cls._DefaultName, cls.inputDatasets[0])

    def readCatalog(self, patchRef):
        """Read input catalogs.

        Read all the input datasets given by the 'inputDatasets'
        attribute.

        Parameters
        ----------
        patchRef : `lsst.daf.persistence.ButlerDataRef`
            Data reference for patch.

        Returns
        -------
        Tuple consisting of band name and a dict of catalogs, keyed by
        dataset name.
        """
        band = patchRef.get(self.config.coaddName + "Coadd_filterLabel", immediate=True).bandLabel
        catalogDict = {}
        for dataset in self.inputDatasets:
            catalog = patchRef.get(self.config.coaddName + "Coadd_" + dataset, immediate=True)
            self.log.info("Read %d sources from %s for band %s: %s",
                          len(catalog), dataset, band, patchRef.dataId)
            catalogDict[dataset] = catalog
        return band, catalogDict

    def run(self, catalogs, tract, patch):
        """Merge multiple catalogs.

        Parameters
        ----------
        catalogs : `dict`
            Mapping from filter names to dict of catalogs.
        tract : `int`
            tractId to use for the tractId column.
        patch : `str`
            patchId to use for the patchId column.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Merged dataframe.
        """
        dfs = []
        for filt, tableDict in catalogs.items():
            for dataset, table in tableDict.items():
                # Convert afwTable to pandas DataFrame, indexed on the source id
                df = table.asAstropy().to_pandas().set_index('id', drop=True)

                # Sort columns by name, to ensure matching schema among patches
                df = df.reindex(sorted(df.columns), axis=1)
                df['tractId'] = tract
                df['patchId'] = patch

                # Make columns a 3-level MultiIndex
                df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
                                                       names=('dataset', 'band', 'column'))
                dfs.append(df)

        catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return catalog

    def write(self, patchRef, catalog):
        """Write the output.

        Parameters
        ----------
        catalog : `ParquetTable`
            Catalog to write.
        patchRef : `lsst.daf.persistence.ButlerDataRef`
            Data reference for patch.
        """
        patchRef.put(catalog, self.config.coaddName + "Coadd_" + self.outputDataset)
        # "filter" is not part of the output data ID, so drop it from the log message
        mergeDataId = patchRef.dataId.copy()
        del mergeDataId["filter"]
        self.log.info("Wrote merged catalog: %s", mergeDataId)

    def writeMetadata(self, dataRefList):
        """No metadata to write, and not sure how to write it for a list of dataRefs.
        """
        pass

class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"catalogType": ""},
                                  dimensions=("instrument", "visit", "detector")):
    catalog = connectionTypes.Input(
        doc="Input full-depth catalog of sources produced by CalibrateTask",
        name="{catalogType}src",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector")
    )
    outputCatalog = connectionTypes.Output(
        doc="Catalog of sources, `src` in Parquet format. The 'id' column is "
            "replaced with an index; all other columns are unchanged.",
        name="{catalogType}source",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector")
    )

class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteSourceTableConnections):
    doApplyExternalPhotoCalib = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Add local photoCalib columns from the calexp.photoCalib? Should only be set to True "
             "when generating Source Tables from older src tables that do not already have local "
             "calib columns.")
    )
    doApplyExternalSkyWcs = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Add local WCS columns from the calexp.wcs? Should only be set to True "
             "when generating Source Tables from older src tables that do not already have local "
             "calib columns.")
    )

class WriteSourceTableTask(CmdLineTask, pipeBase.PipelineTask):
    """Write source table to parquet.
    """
    _DefaultName = "writeSourceTable"
    ConfigClass = WriteSourceTableConfig

    def runDataRef(self, dataRef):
        src = dataRef.get('src')
        if self.config.doApplyExternalPhotoCalib or self.config.doApplyExternalSkyWcs:
            src = self.addCalibColumns(src, dataRef)

        ccdVisitId = dataRef.get('ccdExposureId')
        result = self.run(src, ccdVisitId=ccdVisitId)
        dataRef.put(result.table, 'source')

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        inputs['ccdVisitId'] = butlerQC.quantum.dataId.pack("visit_detector")
        result = self.run(**inputs).table
        outputs = pipeBase.Struct(outputCatalog=result.toDataFrame())
        butlerQC.put(outputs, outputRefs)

    def run(self, catalog, ccdVisitId=None):
        """Convert `src` catalog to parquet.

        Parameters
        ----------
        catalog : `afwTable.SourceCatalog`
            Catalog to be converted.
        ccdVisitId : `int`
            ccdVisitId to be added as a column.

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            ``table``
                `ParquetTable` version of the input catalog.
        """
        self.log.info("Generating parquet table from src catalog %s", ccdVisitId)
        df = catalog.asAstropy().to_pandas().set_index('id', drop=True)
        df['ccdVisitId'] = ccdVisitId
        return pipeBase.Struct(table=ParquetTable(dataFrame=df))

    def addCalibColumns(self, catalog, dataRef):
        """Add columns with local calibration evaluated at each centroid

        for backwards compatibility with old repos.
        This exists for the purpose of converting old src catalogs
        (which don't have the expected local calib columns) to Source Tables.

        Parameters
        ----------
        catalog : `afwTable.SourceCatalog`
            Catalog to which calib columns will be added.
        dataRef : `lsst.daf.persistence.ButlerDataRef`
            Data reference for fetching the calibs from disk.

        Returns
        -------
        newCat : `afwTable.SourceCatalog`
            Source Catalog with requested local calib columns.
        """
        mapper = afwTable.SchemaMapper(catalog.schema)
        measureConfig = SingleFrameMeasurementTask.ConfigClass()
        measureConfig.doReplaceWithNoise = False

        # Read a small calexp cutout to provide the attached photoCalib and wcs.
        # (The exact bbox argument used in the original source is elided here.)
        exposure = dataRef.get('calexp_sub',
                               bbox=lsst.geom.Box2I(lsst.geom.Point2I(0, 0), lsst.geom.Point2I(1, 1)))

        mapper.addMinimalSchema(catalog.schema, True)
        schema = mapper.getOutputSchema()

        exposureIdInfo = dataRef.get("expIdInfo")
        measureConfig.plugins.names = []
        if self.config.doApplyExternalSkyWcs:
            plugin = 'base_LocalWcs'
            # Guard reconstructed: the exact membership test used in the original is elided
            if any(name.startswith(plugin) for name in schema.getNames()):
                raise RuntimeError(f"{plugin} already in src catalog. Set doApplyExternalSkyWcs=False")
            else:
                measureConfig.plugins.names.add(plugin)

        if self.config.doApplyExternalPhotoCalib:
            plugin = 'base_LocalPhotoCalib'
            if any(name.startswith(plugin) for name in schema.getNames()):
                raise RuntimeError(f"{plugin} already in src catalog. Set doApplyExternalPhotoCalib=False")
            else:
                measureConfig.plugins.names.add(plugin)

        measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
        newCat = afwTable.SourceCatalog(schema)
        newCat.extend(catalog, mapper=mapper)
        measurement.run(measCat=newCat, exposure=exposure, exposureId=exposureIdInfo.expId)
        return newCat

390 """No metadata to write.
395 def _makeArgumentParser(cls):
396 parser = ArgumentParser(name=cls._DefaultName)
397 parser.add_id_argument(
"--id",
'src',
398 help=
"data ID, e.g. --id visit=12345 ccd=0")
class PostprocessAnalysis(object):
    """Calculate columns from a ParquetTable.

    This object manages and organizes an arbitrary set of computations
    on a catalog. The catalog is defined by a
    `lsst.pipe.tasks.parquetTable.ParquetTable` object (or list thereof), such as a
    `deepCoadd_obj` dataset, and the computations are defined by a collection
    of `lsst.pipe.tasks.functor.Functor` objects (or, equivalently,
    a `CompositeFunctor`).

    After the object is initialized, accessing the `.df` attribute (which
    holds the `pandas.DataFrame` containing the results of the calculations) triggers
    computation of said dataframe.

    One of the conveniences of using this object is the ability to define a desired common
    filter for all functors. This enables the same functor collection to be passed to
    several different `PostprocessAnalysis` objects without having to change the original
    functor collection, since the `filt` keyword argument of this object triggers an
    overwrite of the `filt` property for all functors in the collection.

    This object also allows a list of refFlags to be passed, and defines a set of default
    refFlags that are always included even if not requested.

    If a list of `ParquetTable` objects is passed, rather than a single one, then the
    calculations will be mapped over all the input catalogs. In principle, it should
    be straightforward to parallelize this activity, but initial tests have failed
    (see TODO in code comments).

    Parameters
    ----------
    parq : `lsst.pipe.tasks.ParquetTable` (or list of such)
        Source catalog(s) for computation.
    functors : `list`, `dict`, or `lsst.pipe.tasks.functors.CompositeFunctor`
        Computations to do (functors that act on `parq`).
        If a dict, the output
        DataFrame will have columns keyed accordingly.
        If a list, the column keys will come from the
        `.shortname` attribute of each functor.
    filt : `str`, optional
        Filter in which to calculate. If provided,
        this will overwrite any existing `.filt` attribute
        of the provided functors.
    flags : `list`, optional
        List of flags (per-band) to include in output table.
        Taken from the `meas` dataset if applied to a multilevel Object Table.
    refFlags : `list`, optional
        List of refFlags (only reference band) to include in output table.
    forcedFlags : `list`, optional
        List of flags (per-band) to include in output table.
        Taken from the ``forced_src`` dataset if applied to a
        multilevel Object Table. Intended for flags from measurement plugins
        only run during multi-band forced photometry.
    """
    _defaultRefFlags = []

    def __init__(self, parq, functors, filt=None, flags=None, refFlags=None, forcedFlags=None):
        self.parq = parq
        self.functors = functors

        self.filt = filt
        self.flags = list(flags) if flags is not None else []
        self.forcedFlags = list(forcedFlags) if forcedFlags is not None else []
        self.refFlags = list(self._defaultRefFlags)
        if refFlags is not None:
            self.refFlags += list(refFlags)

        self._df = None

    @property
    def defaultFuncs(self):
        funcs = dict(self._defaultFuncs)
        return funcs

    @property
    def func(self):
        additionalFuncs = self.defaultFuncs
        additionalFuncs.update({flag: Column(flag, dataset='forced_src') for flag in self.forcedFlags})
        additionalFuncs.update({flag: Column(flag, dataset='ref') for flag in self.refFlags})
        additionalFuncs.update({flag: Column(flag, dataset='meas') for flag in self.flags})

        if isinstance(self.functors, CompositeFunctor):
            func = self.functors
        else:
            func = CompositeFunctor(self.functors)

        func.funcDict.update(additionalFuncs)
        func.filt = self.filt

        return func

    @property
    def noDupCols(self):
        return [name for name, func in self.func.funcDict.items() if func.noDup or func.dataset == 'ref']

    @property
    def df(self):
        if self._df is None:
            self.compute()
        return self._df

    def compute(self, dropna=False, pool=None):
        # Map over multiple parquet tables, if given
        if type(self.parq) in (list, tuple):
            if pool is None:
                dflist = [self.func(parq, dropna=dropna) for parq in self.parq]
            else:
                # TODO: parallelization of this step has not yet worked
                dflist = pool.map(functools.partial(self.func, dropna=dropna), self.parq)
            self._df = pd.concat(dflist)
        else:
            self._df = self.func(self.parq, dropna=dropna)

        return self._df

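# Illustrative sketch (not part of the pipeline): a minimal PostprocessAnalysis run
# over a deepCoadd_obj-style ParquetTable, assuming ``parq`` has already been read
# (e.g. via the butler).  The flag and column names are hypothetical.
def _examplePostprocessAnalysis(parq):
    funcs = {'ra': Column('coord_ra', dataset='ref'),
             'dec': Column('coord_dec', dataset='ref')}
    analysis = PostprocessAnalysis(parq, funcs,
                                   filt='i',
                                   flags=['base_PixelFlags_flag'],
                                   refFlags=['detect_isPrimary'])
    # Accessing .df triggers the computation and returns a pandas DataFrame
    return analysis.df
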
525 """Expected Connections for subclasses of TransformCatalogBaseTask.
529 inputCatalog = connectionTypes.Input(
531 storageClass=
"DataFrame",
533 outputCatalog = connectionTypes.Output(
535 storageClass=
"DataFrame",
540 pipelineConnections=TransformCatalogBaseConnections):
541 functorFile = pexConfig.Field(
543 doc=
"Path to YAML file specifying Science Data Model functors to use "
544 "when copying columns and computing calibrated values.",
548 primaryKey = pexConfig.Field(
550 doc=
"Name of column to be set as the DataFrame index. If None, the index"
551 "will be named `id`",
558 """Base class for transforming/standardizing a catalog
560 by applying functors that convert units and apply calibrations.
561 The purpose of this task is to perform a set of computations on
562 an input `ParquetTable` dataset (such as `deepCoadd_obj`) and write the
563 results to a new dataset (which needs to be declared in an `outputDataset`
566 The calculations to be performed are defined in a YAML file that specifies
567 a set of functors to be computed, provided as
568 a `--functorFile` config parameter. An example of such a YAML file
593 - base_InputCount_value
596 functor: DeconvolvedMoments
601 - merge_measurement_i
602 - merge_measurement_r
603 - merge_measurement_z
604 - merge_measurement_y
605 - merge_measurement_g
606 - base_PixelFlags_flag_inexact_psfCenter
609 The names for each entry under "func" will become the names of columns in the
610 output dataset. All the functors referenced are defined in `lsst.pipe.tasks.functors`.
611 Positional arguments to be passed to each functor are in the `args` list,
612 and any additional entries for each column other than "functor" or "args" (e.g., `'filt'`,
613 `'dataset'`) are treated as keyword arguments to be passed to the functor initialization.
615 The "flags" entry is the default shortcut for `Column` functors.
616 All columns listed under "flags" will be copied to the output table
617 untransformed. They can be of any datatype.
618 In the special case of transforming a multi-level oject table with
619 band and dataset indices (deepCoadd_obj), these will be taked from the
620 `meas` dataset and exploded out per band.
622 There are two special shortcuts that only apply when transforming
623 multi-level Object (deepCoadd_obj) tables:
624 - The "refFlags" entry is shortcut for `Column` functor
625 taken from the `'ref'` dataset if transforming an ObjectTable.
626 - The "forcedFlags" entry is shortcut for `Column` functors.
627 taken from the ``forced_src`` dataset if transforming an ObjectTable.
628 These are expanded out per band.
631 This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
632 to organize and excecute the calculations.
636 def _DefaultName(self):
637 raise NotImplementedError(
'Subclass must define "_DefaultName" attribute')
641 raise NotImplementedError(
'Subclass must define "outputDataset" attribute')
645 raise NotImplementedError(
'Subclass must define "inputDataset" attribute')
649 raise NotImplementedError(
'Subclass must define "ConfigClass" attribute')
653 if self.config.functorFile:
654 self.log.
info(
'Loading tranform functor definitions from %s',
655 self.config.functorFile)
656 self.
funcsfuncs = CompositeFunctor.from_file(self.config.functorFile)
657 self.
funcsfuncs.update(dict(PostprocessAnalysis._defaultFuncs))
659 self.
funcsfuncs =
None
662 inputs = butlerQC.get(inputRefs)
663 if self.
funcsfuncs
is None:
664 raise ValueError(
"config.functorFile is None. "
665 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
666 result = self.
runrun(parq=inputs[
'inputCatalog'], funcs=self.
funcsfuncs,
667 dataId=outputRefs.outputCatalog.dataId.full)
668 outputs = pipeBase.Struct(outputCatalog=result)
669 butlerQC.put(outputs, outputRefs)
673 if self.
funcsfuncs
is None:
674 raise ValueError(
"config.functorFile is None. "
675 "Must be a valid path to yaml in order to run as a CommandlineTask.")
676 df = self.
runrun(parq, funcs=self.
funcsfuncs, dataId=dataRef.dataId)
677 self.
writewrite(df, dataRef)
    def run(self, parq, funcs=None, dataId=None, band=None):
        """Do postprocessing calculations.

        Takes a `ParquetTable` object and dataId,
        returns a dataframe with results of postprocessing calculations.

        Parameters
        ----------
        parq : `lsst.pipe.tasks.parquetTable.ParquetTable`
            ParquetTable from which calculations are done.
        funcs : `lsst.pipe.tasks.functors.Functors`
            Functors to apply to the table's columns.
        dataId : `dict`, optional
            Used to add a `patchId` column to the output dataframe.
        band : `str`, optional
            Filter band that is being processed.

        Returns
        -------
        df : `pandas.DataFrame`
        """
        self.log.info("Transforming/standardizing the source table dataId: %s", dataId)

        df = self.transform(band, parq, funcs, dataId).df
        self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
        return df

    def getFunctors(self):
        return self.funcs

    def getAnalysis(self, parq, funcs=None, band=None):
        if funcs is None:
            funcs = self.funcs
        analysis = PostprocessAnalysis(parq, funcs, filt=band)
        return analysis

    def transform(self, band, parq, funcs, dataId):
        analysis = self.getAnalysis(parq, funcs=funcs, band=band)
        df = analysis.df
        if dataId is not None:
            for key, value in dataId.items():
                df[str(key)] = value

        if self.config.primaryKey:
            if df.index.name != self.config.primaryKey and self.config.primaryKey in df:
                df.reset_index(inplace=True, drop=True)
                df.set_index(self.config.primaryKey, inplace=True)

        return pipeBase.Struct(
            df=df,
            analysis=analysis
        )

    def write(self, df, parqRef):
        parqRef.put(ParquetTable(dataFrame=df), self.outputDataset)

    def writeMetadata(self, dataRef):
        """No metadata to write.
        """
        pass

class TransformObjectCatalogConnections(pipeBase.PipelineTaskConnections,
                                        defaultTemplates={"coaddName": "deep"},
                                        dimensions=("tract", "patch", "skymap")):
    inputCatalog = connectionTypes.Input(
        doc="The vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
            "stored as a DataFrame with a multi-level column index per-patch.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="{coaddName}Coadd_obj",
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-Patch Object Table of columns transformed from the deepCoadd_obj table per the standard "
            "data model.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="objectTable"
    )

class TransformObjectCatalogConfig(TransformCatalogBaseConfig,
                                   pipelineConnections=TransformObjectCatalogConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )
    filterMap = pexConfig.DictField(
        keytype=str,
        itemtype=str,
        default={},
        doc=("Dictionary mapping full filter name to short one for column name munging. "
             "These filters determine the output columns no matter what filters the "
             "input data actually contain."),
        deprecated=("Coadds are now identified by the band, so this transform is unused. "
                    "Will be removed after v22.")
    )
    outputBands = pexConfig.ListField(
        dtype=str,
        default=None,
        optional=True,
        doc=("These bands and only these bands will appear in the output,"
             " NaN-filled if the input does not include them."
             " If None, then use all bands found in the input.")
    )
    camelCase = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Write per-band column names with camelCase, else underscore. "
             "For example: gPsFlux instead of g_PsFlux.")
    )
    multilevelOutput = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Whether results dataframe should have a multilevel column index (True) or be flat "
             "and name-munged (False).")
    )

    def setDefaults(self):
        super().setDefaults()
        self.primaryKey = 'objectId'

class TransformObjectCatalogTask(TransformCatalogBaseTask):
    """Produce a flattened Object Table to match the format specified in
    the standard data model.

    Do the same set of postprocessing calculations on all bands.

    This is identical to `TransformCatalogBaseTask`, except that it does the
    specified functor calculations for all filters present in the
    input `deepCoadd_obj` table. Any specific `"filt"` keywords specified
    by the YAML file will be superseded.
    """
    _DefaultName = "transformObjectCatalog"
    ConfigClass = TransformObjectCatalogConfig

    # Used by Gen 2 runDataRef only:
    inputDataset = 'deepCoadd_obj'
    outputDataset = 'objectTable'

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)
        parser.add_id_argument("--id", cls.inputDataset,
                               ContainerClass=CoaddDataIdContainer,
                               help="data ID, e.g. --id tract=12345 patch=1,2")
        return parser

    def run(self, parq, funcs=None, dataId=None, band=None):
        # The 'band' kwarg is ignored here; all input bands are processed.
        dfDict = {}
        analysisDict = {}
        templateDf = pd.DataFrame()

        if isinstance(parq, DeferredDatasetHandle):
            columns = parq.get(component='columns')
            inputBands = columns.unique(level=1).values
        else:
            inputBands = parq.columnLevelNames['band']

        outputBands = self.config.outputBands if self.config.outputBands else inputBands

        # Perform transform for data of filters that exist in parq
        for inputBand in inputBands:
            if inputBand not in outputBands:
                self.log.info("Ignoring %s band data in the input", inputBand)
                continue
            self.log.info("Transforming the catalog of band %s", inputBand)
            result = self.transform(inputBand, parq, funcs, dataId)
            dfDict[inputBand] = result.df
            analysisDict[inputBand] = result.analysis
            if templateDf.empty:
                templateDf = result.df

        # Fill NaNs in columns of other wanted bands
        for filt in outputBands:
            if filt not in dfDict:
                self.log.info("Adding empty columns for band %s", filt)
                dfDict[filt] = pd.DataFrame().reindex_like(templateDf)

        # This makes a multilevel column index, with band as first level
        df = pd.concat(dfDict, axis=1, names=['band', 'column'])

        if not self.config.multilevelOutput:
            noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
            if self.config.primaryKey in noDupCols:
                noDupCols.remove(self.config.primaryKey)
            if dataId is not None:
                noDupCols += list(dataId.keys())
            df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase,
                                inputBands=inputBands)

        self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
        return df

class TractObjectDataIdContainer(CoaddDataIdContainer):

    def makeDataRefList(self, namespace):
        """Make self.refList from self.idList.

        Generate a list of data references given tract and/or patch.
        This was adapted from `TractQADataIdContainer`, which was
        `TractDataIdContainer` modified to not require "filter".
        Only existing dataRefs are returned.
        """
        def getPatchRefList(tract):
            return [namespace.butler.dataRef(datasetType=self.datasetType,
                                             tract=tract.getId(),
                                             patch="%d,%d" % patch.getIndex())
                    for patch in tract]

        tractRefs = defaultdict(list)  # Data references for each tract
        for dataId in self.idList:
            skymap = self.getSkymap(namespace)

            if "tract" in dataId:
                tractId = dataId["tract"]
                if "patch" in dataId:
                    tractRefs[tractId].append(namespace.butler.dataRef(datasetType=self.datasetType,
                                                                       tract=tractId,
                                                                       patch=dataId['patch']))
                else:
                    tractRefs[tractId] += getPatchRefList(skymap[tractId])
            else:
                tractRefs = dict((tract.getId(), tractRefs.get(tract.getId(), []) + getPatchRefList(tract))
                                 for tract in skymap)

        outputRefList = []
        for tractRefList in tractRefs.values():
            existingRefs = [ref for ref in tractRefList if ref.datasetExists()]
            outputRefList.append(existingRefs)

        self.refList = outputRefList

class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("tract", "skymap")):
    inputCatalogs = connectionTypes.Input(
        doc="Per-Patch objectTables conforming to the standard data model.",
        name="objectTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        multiple=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-tract vertical concatenation of the input objectTables",
        name="objectTable_tract",
        storageClass="DataFrame",
        dimensions=("tract", "skymap"),
    )


class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=ConsolidateObjectTableConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )


class ConsolidateObjectTableTask(CmdLineTask, pipeBase.PipelineTask):
    """Write patch-merged source tables to a tract-level parquet file.

    Concatenates the `objectTable` list into a per-tract `objectTable_tract`.
    """
    _DefaultName = "consolidateObjectTable"
    ConfigClass = ConsolidateObjectTableConfig

    inputDataset = 'objectTable'
    outputDataset = 'objectTable_tract'

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-patch Object Tables",
                      len(inputs['inputCatalogs']))
        df = pd.concat(inputs['inputCatalogs'])
        butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)

        parser.add_id_argument("--id", cls.inputDataset,
                               help="data ID, e.g. --id tract=12345",
                               ContainerClass=TractObjectDataIdContainer)
        return parser

    def runDataRef(self, patchRefList):
        df = pd.concat([patchRef.get().toDataFrame() for patchRef in patchRefList])
        patchRefList[0].put(ParquetTable(dataFrame=df), self.outputDataset)

    def writeMetadata(self, dataRef):
        """No metadata to write.
        """
        pass

class TransformSourceTableConnections(pipeBase.PipelineTaskConnections,
                                      defaultTemplates={"catalogType": ""},
                                      dimensions=("instrument", "visit", "detector")):
    inputCatalog = connectionTypes.Input(
        doc="Wide input catalog of sources produced by WriteSourceTableTask",
        name="{catalogType}source",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
    )
    outputCatalog = connectionTypes.Output(
        doc="Narrower, per-detector Source Table transformed and converted per a "
            "specified set of functors",
        name="{catalogType}sourceTable",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector")
    )


class TransformSourceTableConfig(TransformCatalogBaseConfig,
                                 pipelineConnections=TransformSourceTableConnections):

    def setDefaults(self):
        super().setDefaults()
        self.primaryKey = 'sourceId'


class TransformSourceTableTask(TransformCatalogBaseTask):
    """Transform/standardize a source catalog.
    """
    _DefaultName = "transformSourceTable"
    ConfigClass = TransformSourceTableConfig

    inputDataset = 'source'
    outputDataset = 'sourceTable'

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)
        parser.add_id_argument("--id", datasetType=cls.inputDataset,
                               help="data ID, e.g. --id visit=12345 ccd=0")
        return parser

    def runDataRef(self, dataRef):
        """Override to specify band label to run()."""
        parq = dataRef.get()
        funcs = self.getFunctors()
        band = dataRef.get("calexp_filterLabel", immediate=True).bandLabel
        df = self.run(parq, funcs=funcs, dataId=dataRef.dataId, band=band)
        self.write(df, dataRef)

class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections,
                                         dimensions=("instrument", "visit",),
                                         defaultTemplates={"calexpType": ""}):
    calexp = connectionTypes.Input(
        doc="Processed exposures used for metadata",
        name="{calexpType}calexp",
        storageClass="ExposureF",
        dimensions=("instrument", "visit", "detector"),
        deferLoad=True,
        multiple=True,
    )
    visitSummary = connectionTypes.Output(
        doc=("Per-visit consolidated exposure metadata. These catalogs use "
             "detector id for the id and are sorted for fast lookups of a "
             "detector."),
        name="{calexpType}visitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
    )


class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig,
                                    pipelineConnections=ConsolidateVisitSummaryConnections):
    """Config for ConsolidateVisitSummaryTask"""
    pass


class ConsolidateVisitSummaryTask(pipeBase.PipelineTask, pipeBase.CmdLineTask):
    """Task to consolidate per-detector visit metadata.

    This task aggregates the following metadata from all the detectors in a
    single visit into an exposure catalog:
    - The physical_filter and band (if available).
    - The psf size, shape, and effective area at the center of the detector.
    - The corners of the bounding box in right ascension/declination.

    Other quantities such as Detector, Psf, ApCorrMap, and TransmissionCurve
    are not persisted here because of storage concerns, and because of their
    limited utility as summary statistics.

    Tests for this task are performed in ci_hsc_gen3.
    """
    _DefaultName = "consolidateVisitSummary"
    ConfigClass = ConsolidateVisitSummaryConfig

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)

        parser.add_id_argument("--id", "calexp",
                               help="data ID, e.g. --id visit=12345",
                               ContainerClass=VisitDataIdContainer)
        return parser

    def writeMetadata(self, dataRef):
        """No metadata to persist, so override to remove metadata persistence.
        """
        pass

    def writeConfig(self, butler, clobber=False, doBackup=True):
        """No config to persist, so override to remove config persistence.
        """
        pass

    def runDataRef(self, dataRefList):
        visit = dataRefList[0].dataId['visit']

        self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
                       len(dataRefList), visit)

        expCatalog = self._combineExposureMetadata(visit, dataRefList, isGen3=False)

        dataRefList[0].put(expCatalog, 'visitSummary', visit=visit)

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        dataRefs = butlerQC.get(inputRefs.calexp)
        visit = dataRefs[0].dataId.byName()['visit']

        self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
                       len(dataRefs), visit)

        expCatalog = self._combineExposureMetadata(visit, dataRefs)

        butlerQC.put(expCatalog, outputRefs.visitSummary)

    def _combineExposureMetadata(self, visit, dataRefs, isGen3=True):
        """Make a combined exposure catalog from a list of dataRefs.
        These dataRefs must point to exposures with wcs, summaryStats,
        and other visit metadata.

        Parameters
        ----------
        visit : `int`
            Visit identification number.
        dataRefs : `list`
            List of dataRefs in visit. May be list of
            `lsst.daf.persistence.ButlerDataRef` (Gen2) or
            `lsst.daf.butler.DeferredDatasetHandle` (Gen3).
        isGen3 : `bool`, optional
            Specifies if this is a Gen3 list of datarefs.

        Returns
        -------
        visitSummary : `lsst.afw.table.ExposureCatalog`
            Exposure catalog with per-detector summary information.
        """
        schema = self._makeVisitSummarySchema()
        cat = afwTable.ExposureCatalog(schema)
        cat.resize(len(dataRefs))

        cat['visit'] = visit

        for i, dataRef in enumerate(dataRefs):
            if isGen3:
                visitInfo = dataRef.get(component='visitInfo')
                filterLabel = dataRef.get(component='filterLabel')
                summaryStats = dataRef.get(component='summaryStats')
                detector = dataRef.get(component='detector')
                wcs = dataRef.get(component='wcs')
                photoCalib = dataRef.get(component='photoCalib')
                bbox = dataRef.get(component='bbox')
                validPolygon = dataRef.get(component='validPolygon')
            else:
                # Gen2: read a small calexp cutout to extract the metadata.
                # (The exact bbox used in the original source is elided here.)
                gen2_read_bbox = lsst.geom.Box2I(lsst.geom.Point2I(0, 0), lsst.geom.Point2I(1, 1))
                exp = dataRef.get(datasetType='calexp_sub', bbox=gen2_read_bbox)
                visitInfo = exp.getInfo().getVisitInfo()
                filterLabel = dataRef.get("calexp_filterLabel")
                summaryStats = exp.getInfo().getSummaryStats()
                wcs = exp.getWcs()
                photoCalib = exp.getPhotoCalib()
                detector = exp.getDetector()
                bbox = dataRef.get(datasetType='calexp_bbox')
                validPolygon = exp.getInfo().getValidPolygon()

            rec = cat[i]
            rec.setBBox(bbox)
            rec.setVisitInfo(visitInfo)
            rec.setWcs(wcs)
            rec.setPhotoCalib(photoCalib)
            rec.setValidPolygon(validPolygon)

            rec['physical_filter'] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else ""
            rec['band'] = filterLabel.bandLabel if filterLabel.hasBandLabel() else ""
            rec.setId(detector.getId())
            rec['psfSigma'] = summaryStats.psfSigma
            rec['psfIxx'] = summaryStats.psfIxx
            rec['psfIyy'] = summaryStats.psfIyy
            rec['psfIxy'] = summaryStats.psfIxy
            rec['psfArea'] = summaryStats.psfArea
            rec['raCorners'][:] = summaryStats.raCorners
            rec['decCorners'][:] = summaryStats.decCorners
            rec['ra'] = summaryStats.ra
            rec['decl'] = summaryStats.decl
            rec['zenithDistance'] = summaryStats.zenithDistance
            rec['zeroPoint'] = summaryStats.zeroPoint
            rec['skyBg'] = summaryStats.skyBg
            rec['skyNoise'] = summaryStats.skyNoise
            rec['meanVar'] = summaryStats.meanVar
            rec['astromOffsetMean'] = summaryStats.astromOffsetMean
            rec['astromOffsetStd'] = summaryStats.astromOffsetStd

        metadata = dafBase.PropertyList()
        metadata.add("COMMENT", "Catalog id is detector id, sorted.")
        # We are looping over existing datarefs, so the following is true
        metadata.add("COMMENT", "Only detectors with data have entries.")
        cat.setMetadata(metadata)

        cat.sort()
        return cat

    def _makeVisitSummarySchema(self):
        """Make the schema for the visitSummary catalog."""
        schema = afwTable.ExposureTable.makeMinimalSchema()
        schema.addField('visit', type='I', doc='Visit number')
        schema.addField('physical_filter', type='String', size=32, doc='Physical filter')
        schema.addField('band', type='String', size=32, doc='Name of band')
        schema.addField('psfSigma', type='F',
                        doc='PSF model second-moments determinant radius (center of chip) (pixel)')
        schema.addField('psfArea', type='F',
                        doc='PSF model effective area (center of chip) (pixel**2)')
        schema.addField('psfIxx', type='F',
                        doc='PSF model Ixx (center of chip) (pixel**2)')
        schema.addField('psfIyy', type='F',
                        doc='PSF model Iyy (center of chip) (pixel**2)')
        schema.addField('psfIxy', type='F',
                        doc='PSF model Ixy (center of chip) (pixel**2)')
        schema.addField('raCorners', type='ArrayD', size=4,
                        doc='Right Ascension of bounding box corners (degrees)')
        schema.addField('decCorners', type='ArrayD', size=4,
                        doc='Declination of bounding box corners (degrees)')
        schema.addField('ra', type='D',
                        doc='Right Ascension of bounding box center (degrees)')
        schema.addField('decl', type='D',
                        doc='Declination of bounding box center (degrees)')
        schema.addField('zenithDistance', type='F',
                        doc='Zenith distance of bounding box center (degrees)')
        schema.addField('zeroPoint', type='F',
                        doc='Mean zeropoint in detector (mag)')
        schema.addField('skyBg', type='F',
                        doc='Average sky background (ADU)')
        schema.addField('skyNoise', type='F',
                        doc='Average sky noise (ADU)')
        schema.addField('meanVar', type='F',
                        doc='Mean variance of the weight plane (ADU**2)')
        schema.addField('astromOffsetMean', type='F',
                        doc='Mean offset of astrometric calibration matches (arcsec)')
        schema.addField('astromOffsetStd', type='F',
                        doc='Standard deviation of offsets of astrometric calibration matches (arcsec)')

        return schema

class VisitDataIdContainer(DataIdContainer):
    """DataIdContainer that groups sensor-level ids by visit.
    """

    def makeDataRefList(self, namespace):
        """Make self.refList from self.idList.

        Generate a list of data references grouped by visit.

        Parameters
        ----------
        namespace : `argparse.Namespace`
            Namespace used by `lsst.pipe.base.CmdLineTask` to parse command line arguments.
        """
        # Group by visits
        visitRefs = defaultdict(list)
        for dataId in self.idList:
            if "visit" in dataId:
                visitId = dataId["visit"]
                # Expand the dataId to per-sensor data references
                subset = namespace.butler.subset(self.datasetType, dataId=dataId)
                visitRefs[visitId].extend([dataRef for dataRef in subset])

        outputRefList = []
        for refList in visitRefs.values():
            existingRefs = [ref for ref in refList if ref.datasetExists()]
            if existingRefs:
                outputRefList.append(existingRefs)

        self.refList = outputRefList

class ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        defaultTemplates={"catalogType": ""},
                                        dimensions=("instrument", "visit")):
    inputCatalogs = connectionTypes.Input(
        doc="Input per-detector Source Tables",
        name="{catalogType}sourceTable",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
        multiple=True
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-visit concatenation of Source Table",
        name="{catalogType}sourceTable_visit",
        storageClass="DataFrame",
        dimensions=("instrument", "visit")
    )


class ConsolidateSourceTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=ConsolidateSourceTableConnections):
    pass


class ConsolidateSourceTableTask(CmdLineTask, pipeBase.PipelineTask):
    """Concatenate a `sourceTable` list into a per-visit `sourceTable_visit`.
    """
    _DefaultName = 'consolidateSourceTable'
    ConfigClass = ConsolidateSourceTableConfig

    inputDataset = 'sourceTable'
    outputDataset = 'sourceTable_visit'

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-detector Source Tables",
                      len(inputs['inputCatalogs']))
        df = pd.concat(inputs['inputCatalogs'])
        butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)

    def runDataRef(self, dataRefList):
        self.log.info("Concatenating %s per-detector Source Tables", len(dataRefList))
        df = pd.concat([dataRef.get().toDataFrame() for dataRef in dataRefList])
        dataRefList[0].put(ParquetTable(dataFrame=df), self.outputDataset)

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)

        parser.add_id_argument("--id", cls.inputDataset,
                               help="data ID, e.g. --id visit=12345",
                               ContainerClass=VisitDataIdContainer)
        return parser

    def writeMetadata(self, dataRef):
        """No metadata to write.
        """
        pass

    def writeConfig(self, butler, clobber=False, doBackup=True):
        """No config to write.
        """
        pass

class MakeCcdVisitTableConnections(pipeBase.PipelineTaskConnections,
                                   dimensions=("instrument",),
                                   defaultTemplates={}):
    visitSummaryRefs = connectionTypes.Input(
        doc="Data references for per-visit consolidated exposure metadata from ConsolidateVisitSummaryTask",
        name="visitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="CCD and Visit metadata table",
        name="ccdVisitTable",
        storageClass="DataFrame",
        dimensions=("instrument",)
    )


class MakeCcdVisitTableConfig(pipeBase.PipelineTaskConfig,
                              pipelineConnections=MakeCcdVisitTableConnections):
    pass


class MakeCcdVisitTableTask(CmdLineTask, pipeBase.PipelineTask):
    """Produce a `ccdVisitTable` from the `visitSummary` exposure catalogs.
    """
    _DefaultName = 'makeCcdVisitTable'
    ConfigClass = MakeCcdVisitTableConfig

    def run(self, visitSummaryRefs):
        """Make a table of ccd information from the `visitSummary` catalogs.

        Parameters
        ----------
        visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            List of DeferredDatasetHandles pointing to exposure catalogs with
            per-detector summary information.

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            Results struct with attribute:
            ``outputCatalog``
                Catalog of ccd and visit information.
        """
        ccdEntries = []
        for visitSummaryRef in visitSummaryRefs:
            visitSummary = visitSummaryRef.get()
            visitInfo = visitSummary[0].getVisitInfo()

            summaryTable = visitSummary.asAstropy()
            selectColumns = ['id', 'visit', 'physical_filter', 'band', 'ra', 'decl', 'zenithDistance',
                             'zeroPoint', 'psfSigma', 'skyBg', 'skyNoise']
            ccdEntry = summaryTable[selectColumns].to_pandas().set_index('id')
            # 'visit' is the human-readable visit number; rename to 'visitId',
            # the key used for joining to the visit table.
            ccdEntry = ccdEntry.rename(columns={"visit": "visitId"})
            dataIds = [DataCoordinate.standardize(visitSummaryRef.dataId, detector=id) for id in
                       summaryTable['id']]
            packer = visitSummaryRef.dataId.universe.makePacker('visit_detector', visitSummaryRef.dataId)
            ccdVisitIds = [packer.pack(dataId) for dataId in dataIds]
            ccdEntry['ccdVisitId'] = ccdVisitIds
            ccdEntry['detector'] = summaryTable['id']
            pixToArcseconds = np.array([vR.getWcs().getPixelScale().asArcseconds() for vR in visitSummary])
            ccdEntry["seeing"] = visitSummary['psfSigma'] * np.sqrt(8 * np.log(2)) * pixToArcseconds

            ccdEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
            ccdEntry["expMidpt"] = visitInfo.getDate().toPython()
            expTime = visitInfo.getExposureTime()
            ccdEntry['expTime'] = expTime
            ccdEntry["obsStart"] = ccdEntry["expMidpt"] - 0.5 * pd.Timedelta(seconds=expTime)
            ccdEntry['darkTime'] = visitInfo.getDarkTime()
            ccdEntry['xSize'] = summaryTable['bbox_max_x'] - summaryTable['bbox_min_x']
            ccdEntry['ySize'] = summaryTable['bbox_max_y'] - summaryTable['bbox_min_y']
            ccdEntry['llcra'] = summaryTable['raCorners'][:, 0]
            ccdEntry['llcdec'] = summaryTable['decCorners'][:, 0]
            ccdEntry['ulcra'] = summaryTable['raCorners'][:, 1]
            ccdEntry['ulcdec'] = summaryTable['decCorners'][:, 1]
            ccdEntry['urcra'] = summaryTable['raCorners'][:, 2]
            ccdEntry['urcdec'] = summaryTable['decCorners'][:, 2]
            ccdEntry['lrcra'] = summaryTable['raCorners'][:, 3]
            ccdEntry['lrcdec'] = summaryTable['decCorners'][:, 3]
            ccdEntries.append(ccdEntry)

        outputCatalog = pd.concat(ccdEntries)
        outputCatalog.set_index('ccdVisitId', inplace=True, verify_integrity=True)
        return pipeBase.Struct(outputCatalog=outputCatalog)

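# Illustrative sketch (not part of the pipeline): the seeing estimate used above
# converts the PSF Gaussian-equivalent sigma (in pixels) to a FWHM in arcseconds,
# FWHM = sigma * sqrt(8 ln 2) * pixelScale.  The input values are hypothetical.
def _exampleSeeingFromPsfSigma(psfSigmaPixels=2.0, pixelScaleArcsec=0.2):
    return psfSigmaPixels * np.sqrt(8 * np.log(2)) * pixelScaleArcsec  # ~0.94 arcsec
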
class MakeVisitTableConnections(pipeBase.PipelineTaskConnections,
                                dimensions=("instrument",),
                                defaultTemplates={}):
    visitSummaries = connectionTypes.Input(
        doc="Per-visit consolidated exposure metadata from ConsolidateVisitSummaryTask",
        name="visitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit",),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Visit metadata table",
        name="visitTable",
        storageClass="DataFrame",
        dimensions=("instrument",)
    )


class MakeVisitTableConfig(pipeBase.PipelineTaskConfig,
                           pipelineConnections=MakeVisitTableConnections):
    pass


class MakeVisitTableTask(CmdLineTask, pipeBase.PipelineTask):
    """Produce a `visitTable` from the `visitSummary` exposure catalogs.
    """
    _DefaultName = 'makeVisitTable'
    ConfigClass = MakeVisitTableConfig

    def run(self, visitSummaries):
        """Make a table of visit information from the `visitSummary` catalogs.

        Parameters
        ----------
        visitSummaries : `list` of `lsst.afw.table.ExposureCatalog`
            List of exposure catalogs with per-detector summary information.

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            Results struct with attribute:
            ``outputCatalog``
                Catalog of visit information.
        """
        visitEntries = []
        for visitSummary in visitSummaries:
            visitSummary = visitSummary.get()
            visitRow = visitSummary[0]
            visitInfo = visitRow.getVisitInfo()

            visitEntry = {}
            visitEntry["visitId"] = visitRow['visit']
            visitEntry["visit"] = visitRow['visit']
            visitEntry["physical_filter"] = visitRow['physical_filter']
            visitEntry["band"] = visitRow['band']
            raDec = visitInfo.getBoresightRaDec()
            visitEntry["ra"] = raDec.getRa().asDegrees()
            visitEntry["decl"] = raDec.getDec().asDegrees()
            visitEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
            azAlt = visitInfo.getBoresightAzAlt()
            visitEntry["azimuth"] = azAlt.getLongitude().asDegrees()
            visitEntry["altitude"] = azAlt.getLatitude().asDegrees()
            visitEntry["zenithDistance"] = 90 - azAlt.getLatitude().asDegrees()
            visitEntry["airmass"] = visitInfo.getBoresightAirmass()
            visitEntry["obsStart"] = visitInfo.getDate().toPython()
            visitEntry["expTime"] = visitInfo.getExposureTime()
            visitEntries.append(visitEntry)

        outputCatalog = pd.DataFrame(data=visitEntries)
        outputCatalog.set_index('visitId', inplace=True, verify_integrity=True)
        return pipeBase.Struct(outputCatalog=outputCatalog)

class WriteForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("instrument", "visit", "detector", "skymap", "tract")):
    inputCatalog = connectionTypes.Input(
        doc="Primary per-detector, single-epoch forced-photometry catalog. "
            "By default, it is the output of ForcedPhotCcdTask on calexps",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector", "skymap", "tract")
    )
    inputCatalogDiff = connectionTypes.Input(
        doc="Secondary multi-epoch, per-detector, forced photometry catalog. "
            "By default, it is the output of ForcedPhotCcdTask run on image differences.",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector", "skymap", "tract")
    )
    outputCatalog = connectionTypes.Output(
        doc="InputCatalogs horizontally joined on `objectId` in Parquet format",
        name="forcedSource",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector")
    )


class WriteForcedSourceTableConfig(WriteSourceTableConfig,
                                   pipelineConnections=WriteForcedSourceTableConnections):
    key = pexConfig.Field(
        doc="Column on which to join the two input tables and make the primary key of the output",
        dtype=str,
        default="objectId",
    )


class WriteForcedSourceTableTask(pipeBase.PipelineTask):
    """Merge and convert per-detector forced source catalogs to parquet.
    """
    _DefaultName = "writeForcedSourceTable"
    ConfigClass = WriteForcedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        # Add ccdVisitId and band columns so the output can be joined to the ccdVisitTable
        inputs['ccdVisitId'] = butlerQC.quantum.dataId.pack("visit_detector")
        inputs['band'] = butlerQC.quantum.dataId.full['band']
        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)

    def run(self, inputCatalog, inputCatalogDiff, ccdVisitId=None, band=None):
        dfs = []
        for table, dataset in zip((inputCatalog, inputCatalogDiff), ('calexp', 'diff')):
            df = table.asAstropy().to_pandas().set_index(self.config.key, drop=False)
            df = df.reindex(sorted(df.columns), axis=1)
            df['ccdVisitId'] = ccdVisitId if ccdVisitId else pd.NA
            df['band'] = band if band else pd.NA
            df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
                                                   names=('dataset', 'column'))

            dfs.append(df)

        outputCatalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return pipeBase.Struct(outputCatalog=outputCatalog)

class TransformForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                            dimensions=("instrument", "skymap", "patch", "tract")):
    inputCatalogs = connectionTypes.Input(
        doc="Parquet table of merged ForcedSources produced by WriteForcedSourceTableTask",
        name="forcedSource",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
        multiple=True,
        deferLoad=True
    )
    referenceCatalog = connectionTypes.Input(
        doc="Reference catalog which was used to seed the forcedPhot. Columns "
            "objectId, detect_isPrimary, detect_isTractInner, detect_isPatchInner "
            "are expected.",
        name="objectTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        deferLoad=True
    )
    outputCatalog = connectionTypes.Output(
        doc="Narrower, temporally-aggregated, per-patch ForcedSource Table transformed and converted per a "
            "specified set of functors",
        name="forcedSourceTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap")
    )


class TransformForcedSourceTableConfig(TransformCatalogBaseConfig,
                                       pipelineConnections=TransformForcedSourceTableConnections):
    referenceColumns = pexConfig.ListField(
        dtype=str,
        default=["detect_isPrimary", "detect_isTractInner", "detect_isPatchInner"],
        doc="Columns to pull from reference catalog",
    )
    keyRef = pexConfig.Field(
        doc="Column on which to join the two input tables and make the primary key of the output",
        dtype=str,
        default="objectId",
    )
    key = pexConfig.Field(
        doc="Rename the output DataFrame index to this name",
        dtype=str,
        default="forcedSourceId",
    )


class TransformForcedSourceTableTask(TransformCatalogBaseTask):
    """Transform/standardize a ForcedSource catalog.

    Transforms each wide, per-detector forcedSource parquet table per the
    specification file (per-camera defaults found in ForcedSource.yaml).
    All epochs that overlap the patch are aggregated into one per-patch
    narrow-parquet file.

    No de-duplication of rows is performed. Duplicate resolution flags are
    pulled in from the referenceCatalog: `detect_isPrimary`,
    `detect_isTractInner`, `detect_isPatchInner`, so that the user may de-duplicate
    for analysis or compare duplicates for QA.

    The resulting table includes multiple bands. Epochs (MJDs) and other useful
    per-visit quantities can be retrieved by joining with the CcdVisitTable on
    ``ccdVisitId``.
    """
    _DefaultName = "transformForcedSourceTable"
    ConfigClass = TransformForcedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        outputs = self.run(inputs['inputCatalogs'], inputs['referenceCatalog'], funcs=self.funcs,
                           dataId=outputRefs.outputCatalog.dataId.full)

        butlerQC.put(outputs, outputRefs)

    def run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=None):
        dfs = []
        ref = referenceCatalog.get(parameters={"columns": self.config.referenceColumns})
        self.log.info("Aggregating %s input catalogs", len(inputCatalogs))
        for handle in inputCatalogs:
            result = self.transform(None, handle, funcs, dataId)
            # Keep only rows that match objects detected on (overlapping) the patch
            dfs.append(result.df.join(ref, how='inner'))

        outputCatalog = pd.concat(dfs)

        # The join above was done on config.keyRef (e.g. objectId); make that key a
        # regular column again, then index on forcedSourceId as specified in ForcedSource.yaml
        outputCatalog.index.rename(self.config.keyRef, inplace=True)
        outputCatalog.reset_index(inplace=True)
        outputCatalog.set_index("forcedSourceId", inplace=True, verify_integrity=True)
        # Finally, rename the index to config.key
        outputCatalog.index.rename(self.config.key, inplace=True)

        self.log.info("Made a table of %d columns and %d rows",
                      len(outputCatalog.columns), len(outputCatalog))
        return pipeBase.Struct(outputCatalog=outputCatalog)

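# Illustrative sketch (not part of the pipeline): joining per-epoch measurements
# against reference-table flags on a shared objectId index, then keeping only
# primary detections, mirroring the inner join performed in
# ``TransformForcedSourceTableTask.run``.  All values are hypothetical.
def _exampleJoinReferenceFlags():
    forced = pd.DataFrame({'psfFlux': [1.0, 2.0, 3.0]},
                          index=pd.Index([101, 102, 103], name='objectId'))
    ref = pd.DataFrame({'detect_isPrimary': [True, False]},
                       index=pd.Index([101, 102], name='objectId'))
    joined = forced.join(ref, how='inner')  # drops 103, which is not in the reference patch
    return joined[joined['detect_isPrimary']]
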
class ConsolidateTractConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"catalogType": ""},
                                  dimensions=("instrument", "tract")):
    inputCatalogs = connectionTypes.Input(
        doc="Input per-patch DataFrame Tables to be concatenated",
        name="{catalogType}ForcedSourceTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        multiple=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Output per-tract concatenation of DataFrame Tables",
        name="{catalogType}ForcedSourceTable_tract",
        storageClass="DataFrame",
        dimensions=("tract", "skymap"),
    )


class ConsolidateTractConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=ConsolidateTractConnections):
    pass


class ConsolidateTractTask(CmdLineTask, pipeBase.PipelineTask):
    """Concatenate any per-patch, dataframe list into a single
    per-tract DataFrame.
    """
    _DefaultName = 'ConsolidateTract'
    ConfigClass = ConsolidateTractConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        self.log.info("Concatenating %s per-patch %s Tables",
                      len(inputs['inputCatalogs']),
                      inputRefs.inputCatalogs[0].datasetType.name)
        df = pd.concat(inputs['inputCatalogs'])
        butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)