import functools
from collections import defaultdict

import numpy as np
import pandas as pd

import lsst.geom
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
import lsst.daf.base as dafBase
import lsst.afw.table as afwTable
from lsst.meas.base import SingleFrameMeasurementTask
from lsst.pipe.base import CmdLineTask, ArgumentParser, DataIdContainer
from lsst.pipe.base import connectionTypes
from lsst.daf.butler import DeferredDatasetHandle

from .parquetTable import ParquetTable
from .multiBandUtils import makeMergeArgumentParser, MergeSourcesRunner
from .functors import CompositeFunctor, RAColumn, DecColumn, Column

def flattenFilters(df, noDupCols=['coord_ra', 'coord_dec'], camelCase=False, inputBands=None):
    """Flatten a dataframe with a multilevel column index.
    """
    newDf = pd.DataFrame()
    # band is the level-0 column index
    dfBands = df.columns.unique(level=0).values
    for band in dfBands:
        subdf = df[band]
        columnFormat = '{0}{1}' if camelCase else '{0}_{1}'
        newColumns = {c: columnFormat.format(band, c)
                      for c in subdf.columns if c not in noDupCols}
        cols = list(newColumns.keys())
        newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)

    # Only propagate bands present in both the input data and the requested set
    presentBands = dfBands if inputBands is None else list(set(inputBands).intersection(dfBands))
    # Take the non-duplicated columns (e.g. coordinates) from a single present band
    noDupDf = df[presentBands[0]][noDupCols]
    newDf = pd.concat([noDupDf, newDf], axis=1)
    return newDf
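# Illustrative sketch (the column names below are made up, not from this module):
# given a DataFrame whose columns form a (band, column) MultiIndex such as
#     ('g', 'coord_ra'), ('g', 'psfFlux'), ('r', 'coord_ra'), ('r', 'psfFlux')
# flattenFilters returns a flat frame with columns
#     coord_ra, g_psfFlux, r_psfFlux   (or gPsfFlux/rPsfFlux with camelCase=True)
#
#     bands = ['g', 'r']
#     cols = pd.MultiIndex.from_product([bands, ['coord_ra', 'psfFlux']])
#     df = pd.DataFrame(np.ones((2, 4)), columns=cols)
#     flat = flattenFilters(df, noDupCols=['coord_ra'])
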
class WriteObjectTableConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"coaddName": "deep"},
                                  dimensions=("tract", "patch", "skymap")):
    inputCatalogMeas = connectionTypes.Input(
        doc="Catalog of source measurements on the deepCoadd.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_meas",
    )
    inputCatalogForcedSrc = connectionTypes.Input(
        doc="Catalog of forced measurements (shape and position parameters held fixed) on the deepCoadd.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_forced_src",
    )
    inputCatalogRef = connectionTypes.Input(
        doc="Catalog marking the primary detection (which band provides a good shape and position) "
            "for each detection in deepCoadd_mergeDet.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_ref",
    )
    outputCatalog = connectionTypes.Output(
        doc="A vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
            "stored as a DataFrame with a multi-level column index per-patch.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="{coaddName}Coadd_obj",
    )

class WriteObjectTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteObjectTableConnections):
    engine = pexConfig.Field(
        dtype=str,
        default="pyarrow",
        doc="Parquet engine for writing (pyarrow or fastparquet)"
    )
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )


class WriteObjectTableTask(CmdLineTask, pipeBase.PipelineTask):
    """Write filter-merged source tables to parquet.
    """
    _DefaultName = "writeObjectTable"
    ConfigClass = WriteObjectTableConfig
    RunnerClass = MergeSourcesRunner

    # Names of table datasets to be merged
    inputDatasets = ('forced_src', 'meas', 'ref')

    # Suffix of the output dataset written by `write`
    outputDataset = 'obj'
    def __init__(self, butler=None, schema=None, **kwargs):
        super().__init__(**kwargs)

    def runDataRef(self, patchRefList):
        """!
        @brief Merge coadd sources from multiple bands. Calls @ref `run` which must be defined in
        subclasses that inherit from MergeSourcesTask.
        @param[in] patchRefList list of data references for each filter
        """
        catalogs = dict(self.readCatalog(patchRef) for patchRef in patchRefList)
        dataId = patchRefList[0].dataId
        mergedCatalog = self.run(catalogs, tract=dataId['tract'], patch=dataId['patch'])
        self.write(patchRefList[0], ParquetTable(dataFrame=mergedCatalog))
    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        measDict = {ref.dataId['band']: {'meas': cat} for ref, cat in
                    zip(inputRefs.inputCatalogMeas, inputs['inputCatalogMeas'])}
        forcedSourceDict = {ref.dataId['band']: {'forced_src': cat} for ref, cat in
                            zip(inputRefs.inputCatalogForcedSrc, inputs['inputCatalogForcedSrc'])}

        catalogs = {}
        for band in measDict.keys():
            catalogs[band] = {'meas': measDict[band]['meas'],
                              'forced_src': forcedSourceDict[band]['forced_src'],
                              'ref': inputs['inputCatalogRef']}
        dataId = butlerQC.quantum.dataId
        df = self.run(catalogs=catalogs, tract=dataId['tract'], patch=dataId['patch'])
        outputs = pipeBase.Struct(outputCatalog=df)
        butlerQC.put(outputs, outputRefs)
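    # The `catalogs` argument assembled above has the form (sketch):
    #
    #     {'g': {'meas': <SourceCatalog>, 'forced_src': <SourceCatalog>, 'ref': <SourceCatalog>},
    #      'r': {...},
    #      ...}
    #
    # i.e. one entry per band, each holding the three per-patch input catalogs
    # (the 'ref' catalog is the same object for every band).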
    @classmethod
    def _makeArgumentParser(cls):
        """Create a suitable ArgumentParser.

        We will use the ArgumentParser to get a list of data
        references for patches; the RunnerClass will sort them into lists
        of data references for the same patch.

        References the first of self.inputDatasets, rather than
        self.inputDataset.
        """
        return makeMergeArgumentParser(cls._DefaultName, cls.inputDatasets[0])
    def readCatalog(self, patchRef):
        """Read input catalogs.

        Read all the input datasets given by the 'inputDatasets'
        attribute.

        Parameters
        ----------
        patchRef : `lsst.daf.persistence.ButlerDataRef`
            Data reference for patch.

        Returns
        -------
        Tuple consisting of band name and a dict of catalogs, keyed by
        dataset name.
        """
        band = patchRef.get(self.config.coaddName + "Coadd_filterLabel", immediate=True).bandLabel
        catalogDict = {}
        for dataset in self.inputDatasets:
            catalog = patchRef.get(self.config.coaddName + "Coadd_" + dataset, immediate=True)
            self.log.info("Read %d sources from %s for band %s: %s",
                          len(catalog), dataset, band, patchRef.dataId)
            catalogDict[dataset] = catalog
        return band, catalogDict
    def run(self, catalogs, tract, patch):
        """Merge multiple catalogs.

        Parameters
        ----------
        catalogs : `dict`
            Mapping from filter names to dict of catalogs.
        tract : `int`
            tractId to use for the tractId column.
        patch : `str`
            patchId to use for the patchId column.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Merged dataframe.
        """
        dfs = []
        for filt, tableDict in catalogs.items():
            for dataset, table in tableDict.items():
                # Convert afwTable to pandas DataFrame, indexed by source id
                df = table.asAstropy().to_pandas().set_index('id', drop=True)

                # Sort columns by name, to ensure matching schema among patches
                df = df.reindex(sorted(df.columns), axis=1)
                df['tractId'] = tract
                df['patchId'] = patch

                # Make columns a 3-level MultiIndex
                df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
                                                       names=('dataset', 'band', 'column'))
                dfs.append(df)

        catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return catalog
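    # Illustrative sketch (the tract/patch values and the measurement column
    # name are hypothetical): the DataFrame returned by `run` has a
    # ('dataset', 'band', 'column') MultiIndex, so one band's measurement
    # column can be selected with a tuple key:
    #
    #     df = task.run(catalogs, tract=9813, patch='4,4')
    #     gPsfFlux = df[('meas', 'g', 'base_PsfFlux_instFlux')]
    #     refRa = df[('ref', 'g', 'coord_ra')]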
    def write(self, patchRef, catalog):
        """Write the output.

        Parameters
        ----------
        catalog : `ParquetTable`
            Catalog to write.
        patchRef : `lsst.daf.persistence.ButlerDataRef`
            Data reference for patch.
        """
        patchRef.put(catalog, self.config.coaddName + "Coadd_" + self.outputDataset)
        # The filter is not part of the data ID for this dataset, so drop it
        # from the log message to avoid confusion.
        mergeDataId = patchRef.dataId.copy()
        del mergeDataId["filter"]
        self.log.info("Wrote merged catalog: %s", mergeDataId)

    def writeMetadata(self, dataRefList):
        """No metadata to write, and not sure how to write it for a list of dataRefs.
        """
        pass

class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
                                  dimensions=("instrument", "visit", "detector")):
    catalog = connectionTypes.Input(
        doc="Input full-depth catalog of sources produced by CalibrateTask",
        name="src",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector")
    )
    outputCatalog = connectionTypes.Output(
        doc="Catalog of sources, `src` in Parquet format",
        name="source",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector")
    )

class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteSourceTableConnections):
    doApplyExternalPhotoCalib = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Add local photoCalib columns from the calexp.photoCalib? Should only be set True if "
             "generating Source Tables from older src tables which do not already have local calib columns")
    )
    doApplyExternalSkyWcs = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Add local WCS columns from the calexp.wcs? Should only be set True if "
             "generating Source Tables from older src tables which do not already have local calib columns")
    )

class WriteSourceTableTask(CmdLineTask, pipeBase.PipelineTask):
    """Write source table to parquet.
    """
    _DefaultName = "writeSourceTable"
    ConfigClass = WriteSourceTableConfig

    def runDataRef(self, dataRef):
        src = dataRef.get('src')
        if self.config.doApplyExternalPhotoCalib or self.config.doApplyExternalSkyWcs:
            src = self.addCalibColumns(src, dataRef)

        ccdVisitId = dataRef.get('ccdExposureId')
        result = self.run(src, ccdVisitId=ccdVisitId)
        dataRef.put(result.table, 'source')
    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        inputs['ccdVisitId'] = butlerQC.quantum.dataId.pack("visit_detector")
        result = self.run(**inputs).table
        outputs = pipeBase.Struct(outputCatalog=result.toDataFrame())
        butlerQC.put(outputs, outputRefs)
    def run(self, catalog, ccdVisitId=None):
        """Convert `src` catalog to parquet.

        Parameters
        ----------
        catalog : `afwTable.SourceCatalog`
            Catalog to be converted.
        ccdVisitId : `int`
            ccdVisitId to be added as a column.

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            ``table``
                `ParquetTable` version of the input catalog.
        """
        self.log.info("Generating parquet table from src catalog %s", ccdVisitId)
        df = catalog.asAstropy().to_pandas().set_index('id', drop=True)
        df['ccdVisitId'] = ccdVisitId
        return pipeBase.Struct(table=ParquetTable(dataFrame=df))
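    # Sketch of using the result (the variable names here are illustrative):
    # the returned Struct holds a ParquetTable whose DataFrame is indexed by
    # source id, with all afw columns plus a constant `ccdVisitId` column.
    #
    #     result = task.run(src, ccdVisitId=ccdVisitId)
    #     df = result.table.toDataFrame()
    #     assert 'ccdVisitId' in df.columns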
    def addCalibColumns(self, catalog, dataRef):
        """Add columns with local calibration evaluated at each centroid

        for backwards compatibility with old repos.
        This exists for the purpose of converting old src catalogs
        (which don't have the expected local calib columns) to Source Tables.

        Parameters
        ----------
        catalog : `afwTable.SourceCatalog`
            Catalog to which calib columns will be added.
        dataRef : `lsst.daf.persistence.ButlerDataRef`
            Data reference for fetching the calibs from disk.

        Returns
        -------
        newCat : `afwTable.SourceCatalog`
            Source Catalog with requested local calib columns.
        """
        measureConfig = SingleFrameMeasurementTask.ConfigClass()
        measureConfig.doReplaceWithNoise = False

        # Only the attached WCS/PhotoCalib are needed, so read a minimal cutout
        exposure = dataRef.get('calexp_sub',
                               bbox=lsst.geom.Box2I(lsst.geom.Point2I(0, 0), lsst.geom.Point2I(0, 0)))

        mapper = afwTable.SchemaMapper(catalog.schema)
        mapper.addMinimalSchema(catalog.schema, True)
        schema = mapper.getOutputSchema()

        exposureIdInfo = dataRef.get("expIdInfo")
        measureConfig.plugins.names = []
        if self.config.doApplyExternalSkyWcs:
            plugin = 'base_LocalWcs'
            if plugin in catalog.schema:
                raise RuntimeError(f"{plugin} already in src catalog. Set doApplyExternalSkyWcs=False")
            measureConfig.plugins.names.add(plugin)

        if self.config.doApplyExternalPhotoCalib:
            plugin = 'base_LocalPhotoCalib'
            if plugin in catalog.schema:
                raise RuntimeError(f"{plugin} already in src catalog. Set doApplyExternalPhotoCalib=False")
            measureConfig.plugins.names.add(plugin)

        measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
        newCat = afwTable.SourceCatalog(schema)
        newCat.extend(catalog, mapper=mapper)
        measurement.run(measCat=newCat, exposure=exposure, exposureId=exposureIdInfo.expId)
        return newCat
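    # Config sketch (values illustrative): when converting old src catalogs
    # that lack the local calib columns, enable the calib-column options:
    #
    #     config.doApplyExternalPhotoCalib = True
    #     config.doApplyExternalSkyWcs = True
    #
    # With both left False, `addCalibColumns` is never called by runDataRef.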
    def writeMetadata(self, dataRef):
        """No metadata to write.
        """
        pass

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)
        parser.add_id_argument("--id", 'src',
                               help="data ID, e.g. --id visit=12345 ccd=0")
        return parser

class PostprocessAnalysis(object):
    """Calculate columns from a ParquetTable.

    This object manages and organizes an arbitrary set of computations
    on a catalog. The catalog is defined by a
    `lsst.pipe.tasks.parquetTable.ParquetTable` object (or list thereof), such as a
    `deepCoadd_obj` dataset, and the computations are defined by a collection
    of `lsst.pipe.tasks.functors.Functor` objects (or, equivalently,
    a `CompositeFunctor`).

    After the object is initialized, accessing the `.df` attribute (which
    holds the `pandas.DataFrame` containing the results of the calculations)
    triggers computation of said dataframe.

    One of the conveniences of using this object is the ability to define a
    desired common filter for all functors. This enables the same functor
    collection to be passed to several different `PostprocessAnalysis` objects
    without having to change the original functor collection, since the `filt`
    keyword argument of this object triggers an overwrite of the `filt`
    property for all functors in the collection.

    This object also allows a list of refFlags to be passed, and defines a set
    of default refFlags that are always included even if not requested.

    If a list of `ParquetTable` objects is passed, rather than a single one,
    then the calculations will be mapped over all the input catalogs. In
    principle, it should be straightforward to parallelize this activity, but
    initial tests have failed (see TODO in code comments).

    Parameters
    ----------
    parq : `lsst.pipe.tasks.ParquetTable` (or list of such)
        Source catalog(s) for computation.
    functors : `list`, `dict`, or `lsst.pipe.tasks.functors.CompositeFunctor`
        Computations to do (functors that act on `parq`).
        If a dict, the output DataFrame will have columns keyed accordingly.
        If a list, the column keys will come from the
        `.shortname` attribute of each functor.
    filt : `str`, optional
        Filter in which to calculate. If provided,
        this will overwrite any existing `.filt` attribute
        of the provided functors.
    flags : `list`, optional
        List of flags (per-band) to include in output table.
    refFlags : `list`, optional
        List of refFlags (only reference band) to include in output table.
    """
    _defaultRefFlags = []
    _defaultFuncs = (('coord_ra', RAColumn()),
                     ('coord_dec', DecColumn()))
    def __init__(self, parq, functors, filt=None, flags=None, refFlags=None):
        self.parq = parq
        self.functors = functors

        self.filt = filt
        self.flags = list(flags) if flags is not None else []
        self.refFlags = list(self._defaultRefFlags)
        if refFlags is not None:
            self.refFlags += list(refFlags)

        self._df = None

    @property
    def defaultFuncs(self):
        funcs = dict(self._defaultFuncs)
        return funcs

    @property
    def func(self):
        additionalFuncs = self.defaultFuncs
        additionalFuncs.update({flag: Column(flag, dataset='ref') for flag in self.refFlags})
        additionalFuncs.update({flag: Column(flag, dataset='meas') for flag in self.flags})

        if isinstance(self.functors, CompositeFunctor):
            func = self.functors
        else:
            func = CompositeFunctor(self.functors)

        func.funcDict.update(additionalFuncs)
        func.filt = self.filt

        return func

    @property
    def noDupCols(self):
        return [name for name, func in self.func.funcDict.items() if func.noDup or func.dataset == 'ref']

    @property
    def df(self):
        if self._df is None:
            self.compute()
        return self._df

    def compute(self, dropna=False, pool=None):
        # Map over multiple ParquetTables
        if type(self.parq) in (list, tuple):
            if pool is None:
                dflist = [self.func(parq, dropna=dropna) for parq in self.parq]
            else:
                # TODO: parallelizing this call has not worked in initial tests
                dflist = pool.map(functools.partial(self.func, dropna=dropna), self.parq)
            self._df = pd.concat(dflist)
        else:
            self._df = self.func(self.parq, dropna=dropna)

        return self._df
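    # Example usage (a sketch; the dataId values, flag name, and `butler.get`
    # call are illustrative, not taken from this module):
    #
    #     parq = butler.get('deepCoadd_obj', tract=9813, patch='4,4')
    #     funcs = {'ra': RAColumn(), 'dec': DecColumn()}
    #     analysis = PostprocessAnalysis(parq, funcs, filt='i',
    #                                    refFlags=['detect_isPrimary'])
    #     df = analysis.df   # accessing .df triggers the computation
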
class TransformCatalogBaseConnections(pipeBase.PipelineTaskConnections,
                                      dimensions=()):
    """Expected Connections for subclasses of TransformCatalogBaseTask.

    Must be subclassed.
    """
    inputCatalog = connectionTypes.Input(
        name="",
        storageClass="DataFrame",
    )
    outputCatalog = connectionTypes.Output(
        name="",
        storageClass="DataFrame",
    )


class TransformCatalogBaseConfig(pipeBase.PipelineTaskConfig,
                                 pipelineConnections=TransformCatalogBaseConnections):
    functorFile = pexConfig.Field(
        dtype=str,
        doc='Path to YAML file specifying functors to be computed',
        default=None,
        optional=True
    )

class TransformCatalogBaseTask(CmdLineTask, pipeBase.PipelineTask):
    """Base class for transforming/standardizing a catalog

    by applying functors that convert units and apply calibrations.
    The purpose of this task is to perform a set of computations on
    an input `ParquetTable` dataset (such as `deepCoadd_obj`) and write the
    results to a new dataset (which needs to be declared in an `outputDataset`
    attribute).

    The calculations to be performed are defined in a YAML file that specifies
    a set of functors to be computed, provided as
    a `--functorFile` config parameter. An example of such a YAML file
    is the following:

        funcs:
            count:
                functor: Column
                args:
                    - base_InputCount_value
                dataset: meas
            deconvolved_moments:
                functor: DeconvolvedMoments
                dataset: forced_src
        refFlags:
            - merge_measurement_i
            - merge_measurement_r
            - merge_measurement_z
            - merge_measurement_y
            - merge_measurement_g
            - base_PixelFlags_flag_inexact_psfCenter

    The names for each entry under "funcs" will become the names of columns in the
    output dataset. All the functors referenced are defined in `lsst.pipe.tasks.functors`.
    Positional arguments to be passed to each functor are in the `args` list,
    and any additional entries for each column other than "functor" or "args" (e.g., `'filt'`,
    `'dataset'`) are treated as keyword arguments to be passed to the functor initialization.

    The "refFlags" entry is a shortcut for a set of `Column` functors that keep the original
    column names, taken from the `'ref'` dataset.

    The "flags" entry behaves the same way, but will be expanded out per band.

    This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
    to organize and execute the calculations.
    """
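    # Sketch (the file name and the butler call are hypothetical): the same
    # functor definitions can also be applied outside the task via
    # `CompositeFunctor.from_file`:
    #
    #     funcs = CompositeFunctor.from_file('myFunctorFile.yaml')
    #     funcs.update(dict(PostprocessAnalysis._defaultFuncs))
    #     parq = butler.get('deepCoadd_obj', dataId)
    #     df = funcs(parq)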
    @property
    def _DefaultName(self):
        raise NotImplementedError('Subclass must define "_DefaultName" attribute')

    @property
    def outputDataset(self):
        raise NotImplementedError('Subclass must define "outputDataset" attribute')

    @property
    def inputDataset(self):
        raise NotImplementedError('Subclass must define "inputDataset" attribute')

    @property
    def ConfigClass(self):
        raise NotImplementedError('Subclass must define "ConfigClass" attribute')

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if self.config.functorFile:
            self.log.info('Loading transform functor definitions from %s',
                          self.config.functorFile)
            self.funcs = CompositeFunctor.from_file(self.config.functorFile)
            self.funcs.update(dict(PostprocessAnalysis._defaultFuncs))
        else:
            self.funcs = None

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        result = self.run(parq=inputs['inputCatalog'], funcs=self.funcs,
                          dataId=outputRefs.outputCatalog.dataId.full)
        outputs = pipeBase.Struct(outputCatalog=result)
        butlerQC.put(outputs, outputRefs)

    def runDataRef(self, dataRef):
        parq = dataRef.get(self.inputDataset)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run as a CommandlineTask.")
        df = self.run(parq, funcs=self.funcs, dataId=dataRef.dataId)
        self.write(df, dataRef)
    def run(self, parq, funcs=None, dataId=None, band=None):
        """Do postprocessing calculations

        Takes a `ParquetTable` object and dataId,
        returns a dataframe with results of postprocessing calculations.

        Parameters
        ----------
        parq : `lsst.pipe.tasks.parquetTable.ParquetTable`
            ParquetTable from which calculations are done.
        funcs : `lsst.pipe.tasks.functors.Functors`
            Functors to apply to the table's columns.
        dataId : `dict`, optional
            Used to add a `patchId` column to the output dataframe.
        band : `str`, optional
            Filter band that is being processed.

        Returns
        -------
        df : `pandas.DataFrame`
        """
        self.log.info("Transforming/standardizing the source table dataId: %s", dataId)

        df = self.transform(band, parq, funcs, dataId).df
        self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
        return df

    def getFunctors(self):
        return self.funcs

    def getAnalysis(self, parq, funcs=None, band=None):
        if funcs is None:
            funcs = self.funcs
        analysis = PostprocessAnalysis(parq, funcs, filt=band)
        return analysis

    def transform(self, band, parq, funcs, dataId):
        analysis = self.getAnalysis(parq, funcs=funcs, band=band)
        df = analysis.df
        if dataId is not None:
            for key, value in dataId.items():
                df[str(key)] = value

        return pipeBase.Struct(
            df=df,
            analysis=analysis
        )

    def write(self, df, parqRef):
        parqRef.put(ParquetTable(dataFrame=df), self.outputDataset)

    def writeMetadata(self, dataRef):
        """No metadata to write.
        """
        pass

class TransformObjectCatalogConnections(pipeBase.PipelineTaskConnections,
                                        defaultTemplates={"coaddName": "deep"},
                                        dimensions=("tract", "patch", "skymap")):
    inputCatalog = connectionTypes.Input(
        doc="The vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
            "stored as a DataFrame with a multi-level column index per-patch.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="{coaddName}Coadd_obj",
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-Patch Object Table of columns transformed from the deepCoadd_obj table per the standard "
            "data model.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="objectTable"
    )

class TransformObjectCatalogConfig(TransformCatalogBaseConfig,
                                   pipelineConnections=TransformObjectCatalogConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )
    filterMap = pexConfig.DictField(
        keytype=str,
        itemtype=str,
        default={},
        doc=("Dictionary mapping full filter name to short one for column name munging. "
             "These filters determine the output columns no matter what filters the "
             "input data actually contain."),
        deprecated=("Coadds are now identified by the band, so this transform is unused. "
                    "Will be removed after v22.")
    )
    outputBands = pexConfig.ListField(
        dtype=str,
        default=None,
        optional=True,
        doc=("These bands and only these bands will appear in the output,"
             " NaN-filled if the input does not include them."
             " If None, then use all bands found in the input.")
    )
    camelCase = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Write per-band column names with camelCase, else underscore. "
             "For example: gPsFlux instead of g_PsFlux.")
    )
    multilevelOutput = pexConfig.Field(
        dtype=bool,
        default=True,
        doc=("Whether results dataframe should have a multilevel column index (True) or be flat "
             "and name-munged (False).")
    )

class TransformObjectCatalogTask(TransformCatalogBaseTask):
    """Produce a flattened Object Table to match the format specified in
    the standard data model.

    Do the same set of postprocessing calculations on all bands.

    This is identical to `TransformCatalogBaseTask`, except that it does the
    specified functor calculations for all filters present in the
    input `deepCoadd_obj` table. Any specific `"filt"` keywords specified
    by the YAML file will be superseded.
    """
    _DefaultName = "transformObjectCatalog"
    ConfigClass = TransformObjectCatalogConfig

    inputDataset = 'deepCoadd_obj'
    outputDataset = 'objectTable'

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)
        parser.add_id_argument("--id", cls.inputDataset,
                               ContainerClass=CoaddDataIdContainer,
                               help="data ID, e.g. --id tract=12345 patch=1,2")
        return parser
    def run(self, parq, funcs=None, dataId=None, band=None):
        dfDict = {}
        analysisDict = {}
        templateDf = pd.DataFrame()

        if isinstance(parq, DeferredDatasetHandle):
            columns = parq.get(component='columns')
            inputBands = columns.unique(level=1).values
        else:
            inputBands = parq.columnLevelNames['band']

        outputBands = self.config.outputBands if self.config.outputBands else inputBands

        # Perform transform for data of filters that exist in parq
        for inputBand in inputBands:
            if inputBand not in outputBands:
                self.log.info("Ignoring %s band data in the input", inputBand)
                continue
            self.log.info("Transforming the catalog of band %s", inputBand)
            result = self.transform(inputBand, parq, funcs, dataId)
            dfDict[inputBand] = result.df
            analysisDict[inputBand] = result.analysis
            if templateDf.empty:
                templateDf = result.df

        # Put NaN-filled columns in for any wanted bands that are missing
        for filt in outputBands:
            if filt not in dfDict:
                self.log.info("Adding empty columns for band %s", filt)
                dfDict[filt] = pd.DataFrame().reindex_like(templateDf)

        # This makes a multilevel column index, with band as first level
        df = pd.concat(dfDict, axis=1, names=['band', 'column'])

        if not self.config.multilevelOutput:
            noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
            if dataId is not None:
                noDupCols += list(dataId.keys())
            df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase,
                                inputBands=inputBands)

        self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
        return df

class TractObjectDataIdContainer(CoaddDataIdContainer):

    def makeDataRefList(self, namespace):
        """Make self.refList from self.idList

        Generate a list of data references given tract and/or patch.
        This was adapted from `TractQADataIdContainer`, which was
        `TractDataIdContainer` modified to not require "filter".
        Only existing dataRefs are returned.
        """
        def getPatchRefList(tract):
            return [namespace.butler.dataRef(datasetType=self.datasetType,
                                             tract=tract.getId(),
                                             patch="%d,%d" % patch.getIndex())
                    for patch in tract]

        tractRefs = defaultdict(list)
        for dataId in self.idList:
            skymap = self.getSkymap(namespace)

            if "tract" in dataId:
                tractId = dataId["tract"]
                if "patch" in dataId:
                    tractRefs[tractId].append(namespace.butler.dataRef(datasetType=self.datasetType,
                                                                       tract=tractId,
                                                                       patch=dataId['patch']))
                else:
                    tractRefs[tractId] += getPatchRefList(skymap[tractId])
            else:
                tractRefs = dict((tract.getId(), tractRefs.get(tract.getId(), []) + getPatchRefList(tract))
                                 for tract in skymap)

        outputRefList = []
        for tractRefList in tractRefs.values():
            existingRefs = [ref for ref in tractRefList if ref.datasetExists()]
            outputRefList.append(existingRefs)

        self.refList = outputRefList

class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("tract", "skymap")):
    inputCatalogs = connectionTypes.Input(
        doc="Per-Patch objectTables conforming to the standard data model.",
        name="objectTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        multiple=True
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-tract concatenation of the input objectTables",
        name="objectTable_tract",
        storageClass="DataFrame",
        dimensions=("tract", "skymap"),
    )

class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=ConsolidateObjectTableConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )


class ConsolidateObjectTableTask(CmdLineTask, pipeBase.PipelineTask):
    """Write patch-merged source tables to a tract-level parquet file.

    Concatenates the `objectTable` list into a per-tract `objectTable_tract`.
    """
    _DefaultName = "consolidateObjectTable"
    ConfigClass = ConsolidateObjectTableConfig

    inputDataset = 'objectTable'
    outputDataset = 'objectTable_tract'
    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-patch Object Tables",
                      len(inputs['inputCatalogs']))
        df = pd.concat(inputs['inputCatalogs'])
        butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)

        parser.add_id_argument("--id", cls.inputDataset,
                               help="data ID, e.g. --id tract=12345",
                               ContainerClass=TractObjectDataIdContainer)
        return parser

    def runDataRef(self, patchRefList):
        df = pd.concat([patchRef.get().toDataFrame() for patchRef in patchRefList])
        patchRefList[0].put(ParquetTable(dataFrame=df), self.outputDataset)
    def writeMetadata(self, dataRef):
        """No metadata to write.
        """
        pass


class TransformSourceTableConnections(pipeBase.PipelineTaskConnections,
                                      dimensions=("instrument", "visit", "detector")):
    inputCatalog = connectionTypes.Input(
        doc="Wide input catalog of sources produced by WriteSourceTableTask",
        name="source",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
    )
    outputCatalog = connectionTypes.Output(
        doc="Narrower, per-detector Source Table transformed and converted per a "
            "specified set of functors",
        name="sourceTable",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector")
    )


class TransformSourceTableConfig(TransformCatalogBaseConfig,
                                 pipelineConnections=TransformSourceTableConnections):
    pass


class TransformSourceTableTask(TransformCatalogBaseTask):
    """Transform/standardize a source catalog.
    """
    _DefaultName = "transformSourceTable"
    ConfigClass = TransformSourceTableConfig

    inputDataset = 'source'
    outputDataset = 'sourceTable'

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)
        parser.add_id_argument("--id", datasetType=cls.inputDataset,
                               help="data ID, e.g. --id visit=12345 ccd=0")
        return parser

    def runDataRef(self, dataRef):
        """Override to specify band label to run()."""
        parq = dataRef.get(self.inputDataset)
        funcs = self.getFunctors()
        band = dataRef.get("calexp_filterLabel", immediate=True).bandLabel
        df = self.run(parq, funcs=funcs, dataId=dataRef.dataId, band=band)
        self.write(df, dataRef)

class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections,
                                         dimensions=("instrument", "visit",),
                                         defaultTemplates={}):
    calexp = connectionTypes.Input(
        doc="Processed exposures used for metadata",
        name="calexp",
        storageClass="ExposureF",
        dimensions=("instrument", "visit", "detector"),
        deferLoad=True,
        multiple=True,
    )
    visitSummary = connectionTypes.Output(
        doc=("Per-visit consolidated exposure metadata. These catalogs use "
             "detector id for the id and are sorted for fast lookups of a "
             "detector."),
        name="visitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
    )


class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig,
                                    pipelineConnections=ConsolidateVisitSummaryConnections):
    """Config for ConsolidateVisitSummaryTask"""
    pass

class ConsolidateVisitSummaryTask(pipeBase.PipelineTask, pipeBase.CmdLineTask):
    """Task to consolidate per-detector visit metadata.

    This task aggregates the following metadata from all the detectors in a
    single visit into an exposure catalog:
    - The visitInfo.
    - The wcs.
    - The photoCalib.
    - The physical_filter and band (if available).
    - The psf size, shape, and effective area at the center of the detector.
    - The corners of the bounding box in right ascension/declination.

    Other quantities such as Detector, Psf, ApCorrMap, and TransmissionCurve
    are not persisted here because of storage concerns, and because of their
    limited utility as summary statistics.

    Tests for this task are performed in ci_hsc_gen3.
    """
    _DefaultName = "consolidateVisitSummary"
    ConfigClass = ConsolidateVisitSummaryConfig

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)

        parser.add_id_argument("--id", "calexp",
                               help="data ID, e.g. --id visit=12345",
                               ContainerClass=VisitDataIdContainer)
        return parser
    def writeMetadata(self, dataRef):
        """No metadata to persist, so override to remove metadata persistence.
        """
        pass

    def writeConfig(self, butler, clobber=False, doBackup=True):
        """No config to persist, so override to remove config persistence.
        """
        pass

    def runDataRef(self, dataRefList):
        visit = dataRefList[0].dataId['visit']

        self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
                       len(dataRefList), visit)

        expCatalog = self._combineExposureMetadata(visit, dataRefList, isGen3=False)

        dataRefList[0].put(expCatalog, 'visitSummary', visit=visit)

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        dataRefs = butlerQC.get(inputRefs.calexp)
        visit = dataRefs[0].dataId.byName()['visit']

        self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
                       len(dataRefs), visit)

        expCatalog = self._combineExposureMetadata(visit, dataRefs)

        butlerQC.put(expCatalog, outputRefs.visitSummary)
    def _combineExposureMetadata(self, visit, dataRefs, isGen3=True):
        """Make a combined exposure catalog from a list of dataRefs.

        Parameters
        ----------
        visit : `int`
            Visit identification number.
        dataRefs : `list`
            List of calexp dataRefs in visit. May be list of
            `lsst.daf.persistence.ButlerDataRef` (Gen2) or
            `lsst.daf.butler.DeferredDatasetHandle` (Gen3).
        isGen3 : `bool`, optional
            Specifies if this is a Gen3 list of datarefs.

        Returns
        -------
        visitSummary : `lsst.afw.table.ExposureCatalog`
            Exposure catalog with per-detector summary information.
        """
        schema = afwTable.ExposureTable.makeMinimalSchema()
        schema.addField('visit', type='I', doc='Visit number')
        schema.addField('physical_filter', type='String', size=32, doc='Physical filter')
        schema.addField('band', type='String', size=32, doc='Name of band')
        schema.addField('psfSigma', type='F',
                        doc='PSF model second-moments determinant radius (center of chip) (pixel)')
        schema.addField('psfArea', type='F',
                        doc='PSF model effective area (center of chip) (pixel**2)')
        schema.addField('psfIxx', type='F',
                        doc='PSF model Ixx (center of chip) (pixel**2)')
        schema.addField('psfIyy', type='F',
                        doc='PSF model Iyy (center of chip) (pixel**2)')
        schema.addField('psfIxy', type='F',
                        doc='PSF model Ixy (center of chip) (pixel**2)')
        schema.addField('raCorners', type='ArrayD', size=4,
                        doc='Right Ascension of bounding box corners (degrees)')
        schema.addField('decCorners', type='ArrayD', size=4,
                        doc='Declination of bounding box corners (degrees)')

        cat = afwTable.ExposureCatalog(schema)
        cat.resize(len(dataRefs))

        cat['visit'] = visit

        for i, dataRef in enumerate(dataRefs):
            if isGen3:
                visitInfo = dataRef.get(component='visitInfo')
                filterLabel = dataRef.get(component='filterLabel')
                psf = dataRef.get(component='psf')
                wcs = dataRef.get(component='wcs')
                photoCalib = dataRef.get(component='photoCalib')
                detector = dataRef.get(component='detector')
                bbox = dataRef.get(component='bbox')
                validPolygon = dataRef.get(component='validPolygon')
            else:
                # Gen2: read a minimal cutout; only the attached metadata is needed
                gen2_read_bbox = lsst.geom.Box2I(lsst.geom.Point2I(0, 0), lsst.geom.Extent2I(1, 1))
                exp = dataRef.get(datasetType='calexp_sub', bbox=gen2_read_bbox)
                visitInfo = exp.getInfo().getVisitInfo()
                filterLabel = dataRef.get("calexp_filterLabel")
                psf = exp.getPsf()
                wcs = exp.getWcs()
                photoCalib = exp.getPhotoCalib()
                detector = exp.getDetector()
                bbox = dataRef.get(datasetType='calexp_bbox')
                validPolygon = exp.getInfo().getValidPolygon()

            rec = cat[i]
            rec.setBBox(bbox)
            rec.setVisitInfo(visitInfo)
            rec.setWcs(wcs)
            rec.setPhotoCalib(photoCalib)
            rec.setValidPolygon(validPolygon)

            rec['physical_filter'] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else ""
            rec['band'] = filterLabel.bandLabel if filterLabel.hasBandLabel() else ""
            rec.setId(detector.getId())
            shape = psf.computeShape(bbox.getCenter())
            rec['psfSigma'] = shape.getDeterminantRadius()
            rec['psfIxx'] = shape.getIxx()
            rec['psfIyy'] = shape.getIyy()
            rec['psfIxy'] = shape.getIxy()
            im = psf.computeKernelImage(bbox.getCenter())
            # Effective PSF area computed from the kernel image
            rec['psfArea'] = np.sum(im.array)/np.sum(im.array**2.)

            sph_pts = [wcs.pixelToSky(pt) for pt in lsst.geom.Box2D(bbox).getCorners()]
            rec['raCorners'][:] = [sph.getRa().asDegrees() for sph in sph_pts]
            rec['decCorners'][:] = [sph.getDec().asDegrees() for sph in sph_pts]

        metadata = dafBase.PropertyList()
        metadata.add("COMMENT", "Catalog id is detector id, sorted.")
        metadata.add("COMMENT", "Only detectors with data have entries.")
        cat.setMetadata(metadata)

        cat.sort()
        return cat
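    # Sketch of reading the output back (illustrative, not part of this task):
    # each row of the visitSummary summarizes one detector of the visit.
    #
    #     visitSummary = butler.get('visitSummary', visit=12345)
    #     for rec in visitSummary:
    #         print(rec.getId(), rec['band'], rec['psfSigma'])
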
class VisitDataIdContainer(DataIdContainer):
    """DataIdContainer that groups sensor-level ids by visit.
    """

    def makeDataRefList(self, namespace):
        """Make self.refList from self.idList

        Generate a list of data references grouped by visit.

        Parameters
        ----------
        namespace : `argparse.Namespace`
            Namespace used by `lsst.pipe.base.CmdLineTask` to parse command line arguments.
        """
        # Group refs by visit
        visitRefs = defaultdict(list)
        for dataId in self.idList:
            if "visit" in dataId:
                visitId = dataId["visit"]
                # Find existing datasets for this visit
                subset = namespace.butler.subset(self.datasetType, dataId=dataId)
                visitRefs[visitId].extend([dataRef for dataRef in subset])

        outputRefList = []
        for refList in visitRefs.values():
            existingRefs = [ref for ref in refList if ref.datasetExists()]
            if existingRefs:
                outputRefList.append(existingRefs)

        self.refList = outputRefList

class ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("instrument", "visit")):
    inputCatalogs = connectionTypes.Input(
        doc="Input per-detector Source Tables",
        name="sourceTable",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
        multiple=True
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-visit concatenation of Source Table",
        name="sourceTable_visit",
        storageClass="DataFrame",
        dimensions=("instrument", "visit")
    )


class ConsolidateSourceTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=ConsolidateSourceTableConnections):
    pass


class ConsolidateSourceTableTask(CmdLineTask, pipeBase.PipelineTask):
    """Concatenate the `sourceTable` list into a per-visit `sourceTable_visit`.
    """
    _DefaultName = 'consolidateSourceTable'
    ConfigClass = ConsolidateSourceTableConfig

    inputDataset = 'sourceTable'
    outputDataset = 'sourceTable_visit'
    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-detector Source Tables",
                      len(inputs['inputCatalogs']))
        df = pd.concat(inputs['inputCatalogs'])
        butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)

    def runDataRef(self, dataRefList):
        self.log.info("Concatenating %s per-detector Source Tables", len(dataRefList))
        df = pd.concat([dataRef.get().toDataFrame() for dataRef in dataRefList])
        dataRefList[0].put(ParquetTable(dataFrame=df), self.outputDataset)

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)

        parser.add_id_argument("--id", cls.inputDataset,
                               help="data ID, e.g. --id visit=12345",
                               ContainerClass=VisitDataIdContainer)
        return parser

    def writeMetadata(self, dataRef):
        """No metadata to write.
        """
        pass

    def writeConfig(self, butler, clobber=False, doBackup=True):
        """No config to write.
        """
        pass