# Standard-library, third-party, and LSST modules referenced throughout this file.
from collections import defaultdict
import functools
import numbers

import numpy as np
import pandas as pd

import lsst.geom
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
import lsst.daf.base as dafBase
import lsst.afw.table as afwTable
from lsst.meas.base import SingleFrameMeasurementTask
from lsst.pipe.base import connectionTypes
from lsst.pipe.base import CmdLineTask, ArgumentParser, DataIdContainer
from lsst.coadd.utils.coaddDataIdContainer import CoaddDataIdContainer
from lsst.daf.butler import DeferredDatasetHandle, DataCoordinate

from .parquetTable import ParquetTable
from .multiBandUtils import makeMergeArgumentParser, MergeSourcesRunner
from .functors import CompositeFunctor, Column
def flattenFilters(df, noDupCols=['coord_ra', 'coord_dec'], camelCase=False, inputBands=None):
    """Flattens a dataframe with multilevel column index.
    """
    newDf = pd.DataFrame()
    # band is the level 0 index
    dfBands = df.columns.unique(level=0).values
    for band in dfBands:
        subdf = df[band]
        columnFormat = '{0}{1}' if camelCase else '{0}_{1}'
        newColumns = {c: columnFormat.format(band, c)
                      for c in subdf.columns if c not in noDupCols}
        cols = list(newColumns.keys())
        newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)

    # Band must be present in both the input and requested output bands,
    # otherwise its columns would be all NaN.
    presentBands = dfBands if inputBands is None else list(set(inputBands).intersection(dfBands))
    # Take the non-duplicated columns from the first present band's partition.
    noDupDf = df[presentBands[0]][noDupCols]
    newDf = pd.concat([noDupDf, newDf], axis=1)
    return newDf
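
# A minimal, pandas-only illustration of what `flattenFilters` does to a
# two-band, multilevel-column DataFrame: per-band columns get band-prefixed
# names while the `noDupCols` appear once. The column names are illustrative.
def _exampleFlattenFilters():
    columns = pd.MultiIndex.from_product([['g', 'r'], ['coord_ra', 'coord_dec', 'psfFlux']],
                                         names=('band', 'column'))
    df = pd.DataFrame(np.ones((2, 6)), columns=columns)
    flat = flattenFilters(df, camelCase=True)
    # Resulting columns: coord_ra, coord_dec, gpsfFlux, rpsfFlux
    return flat
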
class WriteObjectTableConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"coaddName": "deep"},
                                  dimensions=("tract", "patch", "skymap")):
    inputCatalogMeas = connectionTypes.Input(
        doc="Catalog of source measurements on the deepCoadd.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_meas",
        multiple=True
    )
    inputCatalogForcedSrc = connectionTypes.Input(
        doc="Catalog of forced measurements (shape and position parameters held fixed) on the deepCoadd.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_forced_src",
        multiple=True
    )
    inputCatalogRef = connectionTypes.Input(
        doc="Catalog marking the primary detection (which band provides a good shape and position) "
            "for each detection in deepCoadd_mergeDet.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_ref"
    )
    outputCatalog = connectionTypes.Output(
        doc="A vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
            "stored as a DataFrame with a multi-level column index per-patch.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="{coaddName}Coadd_obj"
    )

class WriteObjectTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteObjectTableConnections):
    engine = pexConfig.Field(
        dtype=str,
        default="pyarrow",
        doc="Parquet engine for writing (pyarrow or fastparquet)"
    )
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )


class WriteObjectTableTask(CmdLineTask, pipeBase.PipelineTask):
    """Write filter-merged source tables to parquet
    """
    _DefaultName = "writeObjectTable"
    ConfigClass = WriteObjectTableConfig
    RunnerClass = MergeSourcesRunner

    # Names of table datasets to be merged
    inputDatasets = ('forced_src', 'meas', 'ref')

    # Tag of output dataset written by this task
    outputDataset = 'obj'
    def __init__(self, butler=None, schema=None, **kwargs):
        # The butler and schema arguments are accepted (and ignored) for
        # compatibility with the MergeSourcesRunner call signature.
        super().__init__(**kwargs)

    def runDataRef(self, patchRefList):
        """!
        @brief Merge coadd sources from multiple bands. Calls @ref `run` which
        must be defined in subclasses that inherit from MergeSourcesTask.
        @param[in] patchRefList list of data references for each filter
        """
        catalogs = dict(self.readCatalog(patchRef) for patchRef in patchRefList)
        dataId = patchRefList[0].dataId
        mergedCatalog = self.run(catalogs, tract=dataId['tract'], patch=dataId['patch'])
        self.write(patchRefList[0], ParquetTable(dataFrame=mergedCatalog))
    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        measDict = {ref.dataId['band']: {'meas': cat} for ref, cat in
                    zip(inputRefs.inputCatalogMeas, inputs['inputCatalogMeas'])}
        forcedSourceDict = {ref.dataId['band']: {'forced_src': cat} for ref, cat in
                            zip(inputRefs.inputCatalogForcedSrc, inputs['inputCatalogForcedSrc'])}

        catalogs = {}
        for band in measDict.keys():
            catalogs[band] = {'meas': measDict[band]['meas'],
                              'forced_src': forcedSourceDict[band]['forced_src'],
                              'ref': inputs['inputCatalogRef']}
        dataId = butlerQC.quantum.dataId
        df = self.run(catalogs=catalogs, tract=dataId['tract'], patch=dataId['patch'])
        outputs = pipeBase.Struct(outputCatalog=df)
        butlerQC.put(outputs, outputRefs)
    @classmethod
    def _makeArgumentParser(cls):
        """Create a suitable ArgumentParser.

        We will use the ArgumentParser to get a list of data
        references for patches; the RunnerClass will sort them into lists
        of data references for the same patch.

        References the first of self.inputDatasets, rather than
        self.outputDataset.
        """
        return makeMergeArgumentParser(cls._DefaultName, cls.inputDatasets[0])
175 """Read input catalogs
177 Read all the input datasets given by the 'inputDatasets'
182 patchRef : `lsst.daf.persistence.ButlerDataRef`
183 Data reference for patch
187 Tuple consisting of band name and a dict of catalogs, keyed by
190 band = patchRef.get(self.config.coaddName +
"Coadd_filterLabel", immediate=
True).bandLabel
192 for dataset
in self.inputDatasets:
193 catalog = patchRef.get(self.config.coaddName +
"Coadd_" + dataset, immediate=
True)
194 self.log.
info(
"Read %d sources from %s for band %s: %s",
195 len(catalog), dataset, band, patchRef.dataId)
196 catalogDict[dataset] = catalog
197 return band, catalogDict
    def run(self, catalogs, tract, patch):
        """Merge multiple catalogs.

        Parameters
        ----------
        catalogs : `dict`
            Mapping from filter names to dict of catalogs.
        tract : `int`
            tractId to use for the tractId column
        patch : `str`
            patchId to use for the patchId column

        Returns
        -------
        catalog : `pandas.DataFrame`
            Merged dataframe
        """
        dfs = []
        for filt, tableDict in catalogs.items():
            for dataset, table in tableDict.items():
                # Convert afwTable to pandas DataFrame, indexed by source id
                df = table.asAstropy().to_pandas().set_index('id', drop=True)

                # Sort columns by name, to ensure matching schema among patches
                df = df.reindex(sorted(df.columns), axis=1)
                df['tractId'] = tract
                df['patchId'] = patch

                # Make the columns a 3-level MultiIndex
                df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
                                                       names=('dataset', 'band', 'column'))
                dfs.append(df)

        catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return catalog
    def write(self, patchRef, catalog):
        """Write the output.

        Parameters
        ----------
        catalog : `ParquetTable`
            Catalog to write
        patchRef : `lsst.daf.persistence.ButlerDataRef`
            Data reference for patch
        """
        patchRef.put(catalog, self.config.coaddName + "Coadd_" + self.outputDataset)
        # The filter is not part of the output dataset's data ID, so drop it
        # before logging.
        mergeDataId = patchRef.dataId.copy()
        del mergeDataId["filter"]
        self.log.info("Wrote merged catalog: %s", mergeDataId)

    def writeMetadata(self, dataRefList):
        """No metadata to write, and not sure how to write it for a list of
        dataRefs.
        """
        pass
class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"catalogType": ""},
                                  dimensions=("instrument", "visit", "detector")):

    catalog = connectionTypes.Input(
        doc="Input full-depth catalog of sources produced by CalibrateTask",
        name="{catalogType}src",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector")
    )
    outputCatalog = connectionTypes.Output(
        doc="Catalog of sources, `src` in Parquet format. The 'id' column is "
            "replaced with an index; all other columns are unchanged.",
        name="{catalogType}source",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector")
    )

class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteSourceTableConnections):
    doApplyExternalPhotoCalib = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Add local photoCalib columns from the calexp.photoCalib? Should only set True if "
             "generating Source Tables from older src tables which do not already have local calib columns")
    )
    doApplyExternalSkyWcs = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Add local WCS columns from the calexp.wcs? Should only set True if "
             "generating Source Tables from older src tables which do not already have local calib columns")
    )

class WriteSourceTableTask(CmdLineTask, pipeBase.PipelineTask):
    """Write source table to parquet
    """
    _DefaultName = "writeSourceTable"
    ConfigClass = WriteSourceTableConfig

    def runDataRef(self, dataRef):
        src = dataRef.get('src')
        if self.config.doApplyExternalPhotoCalib or self.config.doApplyExternalSkyWcs:
            src = self.addCalibColumns(src, dataRef)

        ccdVisitId = dataRef.get('ccdExposureId')
        result = self.run(src, ccdVisitId=ccdVisitId)
        dataRef.put(result.table, 'source')

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        inputs['ccdVisitId'] = butlerQC.quantum.dataId.pack("visit_detector")
        result = self.run(**inputs).table
        outputs = pipeBase.Struct(outputCatalog=result.toDataFrame())
        butlerQC.put(outputs, outputRefs)
    def run(self, catalog, ccdVisitId=None):
        """Convert `src` catalog to parquet

        Parameters
        ----------
        catalog: `afwTable.SourceCatalog`
            catalog to be converted
        ccdVisitId: `int`
            ccdVisitId to be added as a column

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            ``table``
                `ParquetTable` version of the input catalog
        """
        self.log.info("Generating parquet table from src catalog %s", ccdVisitId)
        df = catalog.asAstropy().to_pandas().set_index('id', drop=True)
        df['ccdVisitId'] = ccdVisitId
        return pipeBase.Struct(table=ParquetTable(dataFrame=df))
    def addCalibColumns(self, catalog, dataRef):
        """Add columns with local calibration evaluated at each centroid

        for backwards compatibility with old repos.
        This exists for the purpose of converting old src catalogs
        (which don't have the expected local calib columns) to Source Tables.

        Parameters
        ----------
        catalog: `afwTable.SourceCatalog`
            catalog to which calib columns will be added
        dataRef: `lsst.daf.persistence.ButlerDataRef`
            for fetching the calibs from disk.

        Returns
        -------
        newCat: `afwTable.SourceCatalog`
            Source Catalog with requested local calib columns
        """
        measureConfig = SingleFrameMeasurementTask.ConfigClass()
        measureConfig.doReplaceWithNoise = False

        # Only the exposure's attached calibs are needed, so read a small
        # sub-region of the calexp. (The exact bbox is an assumption; a
        # one-pixel region is sufficient for the attached metadata.)
        exposure = dataRef.get('calexp_sub',
                               bbox=lsst.geom.Box2I(lsst.geom.Point2I(0, 0),
                                                    lsst.geom.Extent2I(1, 1)))

        mapper = afwTable.SchemaMapper(catalog.schema)
        mapper.addMinimalSchema(catalog.schema, True)
        schema = mapper.getOutputSchema()

        exposureIdInfo = dataRef.get("expIdInfo")
        measureConfig.plugins.names = []
        if self.config.doApplyExternalSkyWcs:
            plugin = 'base_LocalWcs'
            if plugin in catalog.schema:
                raise RuntimeError(f"{plugin} already in src catalog. Set doApplyExternalSkyWcs=False")
            else:
                measureConfig.plugins.names.add(plugin)

        if self.config.doApplyExternalPhotoCalib:
            plugin = 'base_LocalPhotoCalib'
            if plugin in catalog.schema:
                raise RuntimeError(f"{plugin} already in src catalog. Set doApplyExternalPhotoCalib=False")
            else:
                measureConfig.plugins.names.add(plugin)

        measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
        newCat = afwTable.SourceCatalog(schema)
        newCat.extend(catalog, mapper=mapper)
        measurement.run(measCat=newCat, exposure=exposure, exposureId=exposureIdInfo.expId)
        return newCat
391 """No metadata to write.
396 def _makeArgumentParser(cls):
397 parser = ArgumentParser(name=cls._DefaultName)
398 parser.add_id_argument(
"--id",
'src',
399 help=
"data ID, e.g. --id visit=12345 ccd=0")
class PostprocessAnalysis(object):
    """Calculate columns from ParquetTable

    This object manages and organizes an arbitrary set of computations
    on a catalog. The catalog is defined by a
    `lsst.pipe.tasks.parquetTable.ParquetTable` object (or list thereof), such as a
    `deepCoadd_obj` dataset, and the computations are defined by a collection
    of `lsst.pipe.tasks.functor.Functor` objects (or, equivalently,
    a `CompositeFunctor`).

    After the object is initialized, accessing the `.df` attribute (which
    holds the `pandas.DataFrame` containing the results of the calculations) triggers
    computation of said dataframe.

    One of the conveniences of using this object is the ability to define a desired common
    filter for all functors. This enables the same functor collection to be passed to
    several different `PostprocessAnalysis` objects without having to change the original
    functor collection, since the `filt` keyword argument of this object triggers an
    overwrite of the `filt` property for all functors in the collection.

    This object also allows a list of refFlags to be passed, and defines a set of default
    refFlags that are always included even if not requested.

    If a list of `ParquetTable` objects is passed, rather than a single one, then the
    calculations will be mapped over all the input catalogs. In principle, it should
    be straightforward to parallelize this activity, but initial tests have failed
    (see TODO in code comments).

    Parameters
    ----------
    parq : `lsst.pipe.tasks.ParquetTable` (or list of such)
        Source catalog(s) for computation

    functors : `list`, `dict`, or `lsst.pipe.tasks.functors.CompositeFunctor`
        Computations to do (functors that act on `parq`).
        If a dict, the output
        DataFrame will have columns keyed accordingly.
        If a list, the column keys will come from the
        `.shortname` attribute of each functor.

    filt : `str` (optional)
        Filter in which to calculate. If provided,
        this will overwrite any existing `.filt` attribute
        of the provided functors.

    flags : `list` (optional)
        List of flags (per-band) to include in output table.
        Taken from the `meas` dataset if applied to a multilevel Object Table.

    refFlags : `list` (optional)
        List of refFlags (only reference band) to include in output table.

    forcedFlags : `list` (optional)
        List of flags (per-band) to include in output table.
        Taken from the ``forced_src`` dataset if applied to a
        multilevel Object Table. Intended for flags from measurement plugins
        only run during multi-band forced-photometry.
    """
    _defaultRefFlags = []
    # Default functors applied in addition to any user-supplied ones
    # (assumed empty here).
    _defaultFuncs = {}
    def __init__(self, parq, functors, filt=None, flags=None, refFlags=None, forcedFlags=None):
        self.parq = parq
        self.functors = functors

        self.filt = filt
        self.flags = list(flags) if flags is not None else []
        self.forcedFlags = list(forcedFlags) if forcedFlags is not None else []
        self.refFlags = list(self._defaultRefFlags)
        if refFlags is not None:
            self.refFlags += list(refFlags)

        self._df = None
    @property
    def defaultFuncs(self):
        funcs = dict(self._defaultFuncs)
        return funcs

    @property
    def func(self):
        additionalFuncs = self.defaultFuncs
        additionalFuncs.update({flag: Column(flag, dataset='forced_src') for flag in self.forcedFlags})
        additionalFuncs.update({flag: Column(flag, dataset='ref') for flag in self.refFlags})
        additionalFuncs.update({flag: Column(flag, dataset='meas') for flag in self.flags})

        if isinstance(self.functors, CompositeFunctor):
            func = self.functors
        else:
            func = CompositeFunctor(self.functors)

        func.funcDict.update(additionalFuncs)
        func.filt = self.filt

        return func

    @property
    def noDupCols(self):
        return [name for name, func in self.func.funcDict.items()
                if func.noDup or func.dataset == 'ref']

    @property
    def df(self):
        if self._df is None:
            self.compute()
        return self._df
    def compute(self, dropna=False, pool=None):
        # Map over multiple parquet tables if a list was provided.
        if type(self.parq) in (list, tuple):
            if pool is None:
                dflist = [self.func(parq, dropna=dropna) for parq in self.parq]
            else:
                # TODO: Figure out why this doesn't work (see class docstring).
                dflist = pool.map(functools.partial(self.func, dropna=dropna), self.parq)
            self._df = pd.concat(dflist)
        else:
            self._df = self.func(self.parq, dropna=dropna)

        return self._df
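
# A usage sketch for PostprocessAnalysis. `parq` and `funcs` are placeholders
# for a ParquetTable (e.g. one deepCoadd_obj partition) and a functor
# collection; the flag name passed below is illustrative.
def _examplePostprocessAnalysis(parq, funcs):
    analysis = PostprocessAnalysis(parq, funcs, filt='i', flags=['base_PixelFlags_flag'])
    # Accessing .df triggers the computation described in the class docstring.
    return analysis.df
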
526 """Expected Connections for subclasses of TransformCatalogBaseTask.
530 inputCatalog = connectionTypes.Input(
532 storageClass=
"DataFrame",
534 outputCatalog = connectionTypes.Output(
536 storageClass=
"DataFrame",
541 pipelineConnections=TransformCatalogBaseConnections):
542 functorFile = pexConfig.Field(
544 doc=
"Path to YAML file specifying Science Data Model functors to use "
545 "when copying columns and computing calibrated values.",
549 primaryKey = pexConfig.Field(
551 doc=
"Name of column to be set as the DataFrame index. If None, the index"
552 "will be named `id`",
559 """Base class for transforming/standardizing a catalog
561 by applying functors that convert units and apply calibrations.
562 The purpose of this task is to perform a set of computations on
563 an input `ParquetTable` dataset (such as `deepCoadd_obj`) and write the
564 results to a new dataset (which needs to be declared in an `outputDataset`
567 The calculations to be performed are defined in a YAML file that specifies
568 a set of functors to be computed, provided as
569 a `--functorFile` config parameter. An example of such a YAML file
594 - base_InputCount_value
597 functor: DeconvolvedMoments
602 - merge_measurement_i
603 - merge_measurement_r
604 - merge_measurement_z
605 - merge_measurement_y
606 - merge_measurement_g
607 - base_PixelFlags_flag_inexact_psfCenter
610 The names for each entry under "func" will become the names of columns in the
611 output dataset. All the functors referenced are defined in `lsst.pipe.tasks.functors`.
612 Positional arguments to be passed to each functor are in the `args` list,
613 and any additional entries for each column other than "functor" or "args" (e.g., `'filt'`,
614 `'dataset'`) are treated as keyword arguments to be passed to the functor initialization.
616 The "flags" entry is the default shortcut for `Column` functors.
617 All columns listed under "flags" will be copied to the output table
618 untransformed. They can be of any datatype.
619 In the special case of transforming a multi-level oject table with
620 band and dataset indices (deepCoadd_obj), these will be taked from the
621 `meas` dataset and exploded out per band.
623 There are two special shortcuts that only apply when transforming
624 multi-level Object (deepCoadd_obj) tables:
625 - The "refFlags" entry is shortcut for `Column` functor
626 taken from the `'ref'` dataset if transforming an ObjectTable.
627 - The "forcedFlags" entry is shortcut for `Column` functors.
628 taken from the ``forced_src`` dataset if transforming an ObjectTable.
629 These are expanded out per band.
632 This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
633 to organize and excecute the calculations.
    @property
    def _DefaultName(self):
        raise NotImplementedError('Subclass must define "_DefaultName" attribute')

    @property
    def outputDataset(self):
        raise NotImplementedError('Subclass must define "outputDataset" attribute')

    @property
    def inputDataset(self):
        raise NotImplementedError('Subclass must define "inputDataset" attribute')

    @property
    def ConfigClass(self):
        raise NotImplementedError('Subclass must define "ConfigClass" attribute')

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if self.config.functorFile:
            self.log.info('Loading transform functor definitions from %s',
                          self.config.functorFile)
            self.funcs = CompositeFunctor.from_file(self.config.functorFile)
            self.funcs.update(dict(PostprocessAnalysis._defaultFuncs))
        else:
            self.funcs = None

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        result = self.run(parq=inputs['inputCatalog'], funcs=self.funcs,
                          dataId=outputRefs.outputCatalog.dataId.full)
        outputs = pipeBase.Struct(outputCatalog=result)
        butlerQC.put(outputs, outputRefs)

    def runDataRef(self, dataRef):
        parq = dataRef.get()
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run as a CommandlineTask.")
        df = self.run(parq, funcs=self.funcs, dataId=dataRef.dataId)
        self.write(df, dataRef)
    def run(self, parq, funcs=None, dataId=None, band=None):
        """Do postprocessing calculations

        Takes a `ParquetTable` object and dataId,
        returns a dataframe with results of postprocessing calculations.

        Parameters
        ----------
        parq : `lsst.pipe.tasks.parquetTable.ParquetTable`
            ParquetTable from which calculations are done.
        funcs : `lsst.pipe.tasks.functors.Functors`
            Functors to apply to the table's columns
        dataId : dict, optional
            Used to add a `patchId` column to the output dataframe.
        band : `str`, optional
            Filter band that is being processed.

        Returns
        -------
        df : `pandas.DataFrame`
        """
        self.log.info("Transforming/standardizing the source table dataId: %s", dataId)

        df = self.transform(band, parq, funcs, dataId).df
        self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
        return df

    def getFunctors(self):
        return self.funcs

    def getAnalysis(self, parq, funcs=None, band=None):
        if funcs is None:
            funcs = self.funcs
        analysis = PostprocessAnalysis(parq, funcs, filt=band)
        return analysis

    def transform(self, band, parq, funcs, dataId):
        analysis = self.getAnalysis(parq, funcs=funcs, band=band)
        df = analysis.df
        if dataId is not None:
            for key, value in dataId.items():
                df[key] = value

        if self.config.primaryKey:
            if df.index.name != self.config.primaryKey and self.config.primaryKey in df:
                df.reset_index(inplace=True, drop=True)
                df.set_index(self.config.primaryKey, inplace=True)

        return pipeBase.Struct(
            df=df,
            analysis=analysis
        )

    def write(self, df, parqRef):
        parqRef.put(ParquetTable(dataFrame=df), self.outputDataset)

    def writeMetadata(self, dataRef):
        """No metadata to write.
        """
        pass
class TransformObjectCatalogConnections(pipeBase.PipelineTaskConnections,
                                        defaultTemplates={"coaddName": "deep"},
                                        dimensions=("tract", "patch", "skymap")):
    inputCatalog = connectionTypes.Input(
        doc="The vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
            "stored as a DataFrame with a multi-level column index per-patch.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="{coaddName}Coadd_obj",
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-Patch Object Table of columns transformed from the deepCoadd_obj table "
            "per the standard data model.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="objectTable",
    )

class TransformObjectCatalogConfig(TransformCatalogBaseConfig,
                                   pipelineConnections=TransformObjectCatalogConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )
    filterMap = pexConfig.DictField(
        keytype=str,
        itemtype=str,
        default={},
        doc=("Dictionary mapping full filter name to short one for column name munging. "
             "These filters determine the output columns no matter what filters the "
             "input data actually contain."),
        deprecated=("Coadds are now identified by the band, so this transform is unused. "
                    "Will be removed after v22.")
    )
    outputBands = pexConfig.ListField(
        dtype=str,
        default=None,
        optional=True,
        doc=("These bands and only these bands will appear in the output,"
             " NaN-filled if the input does not include them."
             " If None, then use all bands found in the input.")
    )
    camelCase = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Write per-band column names with camelCase, else underscore. "
             "For example: gPsFlux instead of g_PsFlux.")
    )
    multilevelOutput = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Whether results dataframe should have a multilevel column index (True) or be flat "
             "and name-munged (False).")
    )
    goodFlags = pexConfig.ListField(
        dtype=str,
        default=[],
        doc=("List of 'good' flags that should be set False when populating empty tables. "
             "All other flags are considered to be 'bad' flags and will be set to True.")
    )
    floatFillValue = pexConfig.Field(
        dtype=float,
        default=np.nan,
        doc="Fill value for float fields when populating empty tables."
    )
    integerFillValue = pexConfig.Field(
        dtype=int,
        default=-1,
        doc="Fill value for integer fields when populating empty tables."
    )

    def setDefaults(self):
        super().setDefaults()
        self.primaryKey = 'objectId'
        self.goodFlags = ['calib_astrometry_used',
                          'calib_photometry_reserved',
                          'calib_photometry_used',
                          'calib_psf_candidate',
                          'calib_psf_reserved']

class TransformObjectCatalogTask(TransformCatalogBaseTask):
    """Produce a flattened Object Table to match the format specified in
    the DPDD.

    Do the same set of postprocessing calculations on all bands.

    This is identical to `TransformCatalogBaseTask`, except that it does the
    specified functor calculations for all filters present in the
    input `deepCoadd_obj` table. Any specific `"filt"` keywords specified
    by the YAML file will be superseded.
    """
    _DefaultName = "transformObjectCatalog"
    ConfigClass = TransformObjectCatalogConfig

    inputDataset = 'deepCoadd_obj'
    outputDataset = 'objectTable'

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)
        parser.add_id_argument("--id", cls.inputDataset,
                               ContainerClass=CoaddDataIdContainer,
                               help="data ID, e.g. --id tract=12345 patch=1,2")
        return parser
    def run(self, parq, funcs=None, dataId=None, band=None):
        # NOTE: the band kwarg is ignored here; all bands are processed.
        dfDict = {}
        analysisDict = {}
        templateDf = pd.DataFrame()

        if isinstance(parq, DeferredDatasetHandle):
            columns = parq.get(component='columns')
            inputBands = columns.unique(level=1).values
        else:
            inputBands = parq.columnLevelNames['band']

        outputBands = self.config.outputBands if self.config.outputBands else inputBands

        # Perform transform for data of filters that exist in parq.
        for inputBand in inputBands:
            if inputBand not in outputBands:
                self.log.info("Ignoring %s band data in the input", inputBand)
                continue
            self.log.info("Transforming the catalog of band %s", inputBand)
            result = self.transform(inputBand, parq, funcs, dataId)
            dfDict[inputBand] = result.df
            analysisDict[inputBand] = result.analysis
            if templateDf.empty:
                templateDf = result.df

        # Put filler values in columns of other wanted bands.
        for filt in outputBands:
            if filt not in dfDict:
                self.log.info("Adding empty columns for band %s", filt)
                dfTemp = templateDf.copy()
                for col in dfTemp.columns:
                    testValue = dfTemp[col].values[0]
                    if isinstance(testValue, (np.bool_, pd.BooleanDtype)):
                        # Boolean flag type: check whether it is a "good" flag.
                        if col in self.config.goodFlags:
                            fillValue = False
                        else:
                            fillValue = True
                    elif isinstance(testValue, numbers.Integral):
                        # numbers.Integral catches python, numpy, and pandas
                        # integer flavors; unsigned integers are not allowed.
                        if isinstance(testValue, np.unsignedinteger):
                            raise ValueError("Parquet tables may not have unsigned integer columns.")
                        else:
                            fillValue = self.config.integerFillValue
                    else:
                        fillValue = self.config.floatFillValue
                    dfTemp[col].values[:] = fillValue
                dfDict[filt] = dfTemp

        # This makes a multilevel column index, with band as the first level.
        df = pd.concat(dfDict, axis=1, names=['band', 'column'])

        if not self.config.multilevelOutput:
            noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
            if self.config.primaryKey in noDupCols:
                noDupCols.remove(self.config.primaryKey)
            if dataId is not None:
                noDupCols += list(dataId.keys())
            df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase,
                                inputBands=inputBands)

        self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))

        return df
class TractObjectDataIdContainer(CoaddDataIdContainer):

    def makeDataRefList(self, namespace):
        """Make self.refList from self.idList

        Generate a list of data references given tract and/or patch.
        This was adapted from `TractQADataIdContainer`, which was
        `TractDataIdContainer` modified to not require "filter".
        Only existing dataRefs are returned.
        """
        def getPatchRefList(tract):
            return [namespace.butler.dataRef(datasetType=self.datasetType,
                                             tract=tract.getId(),
                                             patch="%d,%d" % patch.getIndex())
                    for patch in tract]

        tractRefs = defaultdict(list)  # Data references for each tract
        for dataId in self.idList:
            skymap = self.getSkymap(namespace)

            if "tract" in dataId:
                tractId = dataId["tract"]
                if "patch" in dataId:
                    tractRefs[tractId].append(namespace.butler.dataRef(datasetType=self.datasetType,
                                                                       tract=tractId,
                                                                       patch=dataId['patch']))
                else:
                    tractRefs[tractId] += getPatchRefList(skymap[tractId])
            else:
                tractRefs = dict((tract.getId(), tractRefs.get(tract.getId(), []) + getPatchRefList(tract))
                                 for tract in skymap)

        outputRefList = []
        for tractRefList in tractRefs.values():
            existingRefs = [ref for ref in tractRefList if ref.datasetExists()]
            outputRefList.append(existingRefs)

        self.refList = outputRefList

class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("tract", "skymap")):
    inputCatalogs = connectionTypes.Input(
        doc="Per-Patch objectTables conforming to the standard data model.",
        name="objectTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        multiple=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-tract concatenation of the input objectTables",
        name="objectTable_tract",
        storageClass="DataFrame",
        dimensions=("tract", "skymap"),
    )


class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=ConsolidateObjectTableConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )

class ConsolidateObjectTableTask(CmdLineTask, pipeBase.PipelineTask):
    """Write patch-merged source tables to a tract-level parquet file

    Concatenates the `objectTable` list into a per-tract `objectTable_tract`.
    """
    _DefaultName = "consolidateObjectTable"
    ConfigClass = ConsolidateObjectTableConfig

    inputDataset = 'objectTable'
    outputDataset = 'objectTable_tract'

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-patch Object Tables",
                      len(inputs['inputCatalogs']))
        df = pd.concat(inputs['inputCatalogs'])
        butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)

        parser.add_id_argument("--id", cls.inputDataset,
                               help="data ID, e.g. --id tract=12345",
                               ContainerClass=TractObjectDataIdContainer)
        return parser

    def runDataRef(self, patchRefList):
        df = pd.concat([patchRef.get().toDataFrame() for patchRef in patchRefList])
        patchRefList[0].put(ParquetTable(dataFrame=df), self.outputDataset)

    def writeMetadata(self, dataRef):
        """No metadata to write.
        """
        pass

class TransformSourceTableConnections(pipeBase.PipelineTaskConnections,
                                      defaultTemplates={"catalogType": ""},
                                      dimensions=("instrument", "visit", "detector")):

    inputCatalog = connectionTypes.Input(
        doc="Wide input catalog of sources produced by WriteSourceTableTask",
        name="{catalogType}source",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
    )
    outputCatalog = connectionTypes.Output(
        doc="Narrower, per-detector Source Table transformed and converted per a "
            "specified set of functors",
        name="{catalogType}sourceTable",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector")
    )

class TransformSourceTableConfig(TransformCatalogBaseConfig,
                                 pipelineConnections=TransformSourceTableConnections):

    def setDefaults(self):
        super().setDefaults()
        self.primaryKey = 'sourceId'


class TransformSourceTableTask(TransformCatalogBaseTask):
    """Transform/standardize a source catalog
    """
    _DefaultName = "transformSourceTable"
    ConfigClass = TransformSourceTableConfig

    inputDataset = 'source'
    outputDataset = 'sourceTable'

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)
        parser.add_id_argument("--id", datasetType=cls.inputDataset,
                               help="data ID, e.g. --id visit=12345 ccd=0")
        return parser

    def runDataRef(self, dataRef):
        """Override to specify band label to run()."""
        parq = dataRef.get()
        funcs = self.getFunctors()
        band = dataRef.get("calexp_filterLabel", immediate=True).bandLabel
        df = self.run(parq, funcs=funcs, dataId=dataRef.dataId, band=band)
        self.write(df, dataRef)

class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections,
                                         dimensions=("instrument", "visit",),
                                         defaultTemplates={"calexpType": ""}):
    calexp = connectionTypes.Input(
        doc="Processed exposures used for metadata",
        name="{calexpType}calexp",
        storageClass="ExposureF",
        dimensions=("instrument", "visit", "detector"),
        deferLoad=True,
        multiple=True,
    )
    visitSummary = connectionTypes.Output(
        doc=("Per-visit consolidated exposure metadata. These catalogs use "
             "detector id for the id and are sorted for fast lookups of a "
             "detector."),
        name="{calexpType}visitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
    )


class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig,
                                    pipelineConnections=ConsolidateVisitSummaryConnections):
    """Config for ConsolidateVisitSummaryTask"""
    pass

class ConsolidateVisitSummaryTask(pipeBase.PipelineTask, pipeBase.CmdLineTask):
    """Task to consolidate per-detector visit metadata.

    This task aggregates the following metadata from all the detectors in a
    single visit into an exposure catalog:
    - The physical_filter and band (if available).
    - The psf size, shape, and effective area at the center of the detector.
    - The corners of the bounding box in right ascension/declination.

    Other quantities such as Detector, Psf, ApCorrMap, and TransmissionCurve
    are not persisted here because of storage concerns, and because of their
    limited utility as summary statistics.

    Tests for this task are performed in ci_hsc_gen3.
    """
    _DefaultName = "consolidateVisitSummary"
    ConfigClass = ConsolidateVisitSummaryConfig
    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)

        parser.add_id_argument("--id", "calexp",
                               help="data ID, e.g. --id visit=12345",
                               ContainerClass=VisitDataIdContainer)
        return parser

    def writeMetadata(self, dataRef):
        """No metadata to persist, so override to remove metadata persistence.
        """
        pass

    def writeConfig(self, butler, clobber=False, doBackup=True):
        """No config to persist, so override to remove config persistence.
        """
        pass

    def runDataRef(self, dataRefList):
        visit = dataRefList[0].dataId['visit']

        self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
                       len(dataRefList), visit)

        expCatalog = self._combineExposureMetadata(visit, dataRefList, isGen3=False)

        dataRefList[0].put(expCatalog, 'visitSummary', visit=visit)

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        dataRefs = butlerQC.get(inputRefs.calexp)
        visit = dataRefs[0].dataId.byName()['visit']

        self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
                       len(dataRefs), visit)

        expCatalog = self._combineExposureMetadata(visit, dataRefs)

        butlerQC.put(expCatalog, outputRefs.visitSummary)
    def _combineExposureMetadata(self, visit, dataRefs, isGen3=True):
        """Make a combined exposure catalog from a list of dataRefs.

        These dataRefs must point to exposures with wcs, summaryStats,
        and other visit metadata.

        Parameters
        ----------
        visit : `int`
            Visit identification number.
        dataRefs : `list`
            List of dataRefs in visit. May be list of
            `lsst.daf.persistence.ButlerDataRef` (Gen2) or
            `lsst.daf.butler.DeferredDatasetHandle` (Gen3).
        isGen3 : `bool`, optional
            Specifies if this is a Gen3 list of datarefs.

        Returns
        -------
        visitSummary : `lsst.afw.table.ExposureCatalog`
            Exposure catalog with per-detector summary information.
        """
        schema = self._makeVisitSummarySchema()
        cat = afwTable.ExposureCatalog(schema)
        cat.resize(len(dataRefs))

        cat['visit'] = visit

        for i, dataRef in enumerate(dataRefs):
            if isGen3:
                visitInfo = dataRef.get(component='visitInfo')
                filterLabel = dataRef.get(component='filterLabel')
                summaryStats = dataRef.get(component='summaryStats')
                detector = dataRef.get(component='detector')
                wcs = dataRef.get(component='wcs')
                photoCalib = dataRef.get(component='photoCalib')
                bbox = dataRef.get(component='bbox')
                validPolygon = dataRef.get(component='validPolygon')
            else:
                # Gen2: read a small sub-region of the calexp to get at the
                # attached metadata. (The exact bbox is an assumption; only
                # the metadata is needed.)
                gen2_read_bbox = lsst.geom.Box2I(lsst.geom.Point2I(0, 0),
                                                 lsst.geom.Extent2I(1, 1))
                exp = dataRef.get(datasetType='calexp_sub', bbox=gen2_read_bbox)
                visitInfo = exp.getInfo().getVisitInfo()
                filterLabel = dataRef.get("calexp_filterLabel")
                summaryStats = exp.getInfo().getSummaryStats()
                wcs = exp.getWcs()
                photoCalib = exp.getPhotoCalib()
                detector = exp.getDetector()
                bbox = dataRef.get(datasetType='calexp_bbox')
                validPolygon = exp.getInfo().getValidPolygon()

            rec = cat[i]
            rec.setBBox(bbox)
            rec.setVisitInfo(visitInfo)
            rec.setWcs(wcs)
            rec.setPhotoCalib(photoCalib)
            rec.setValidPolygon(validPolygon)

            rec['physical_filter'] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else ""
            rec['band'] = filterLabel.bandLabel if filterLabel.hasBandLabel() else ""
            rec.setId(detector.getId())
            rec['psfSigma'] = summaryStats.psfSigma
            rec['psfIxx'] = summaryStats.psfIxx
            rec['psfIyy'] = summaryStats.psfIyy
            rec['psfIxy'] = summaryStats.psfIxy
            rec['psfArea'] = summaryStats.psfArea
            rec['raCorners'][:] = summaryStats.raCorners
            rec['decCorners'][:] = summaryStats.decCorners
            rec['ra'] = summaryStats.ra
            rec['decl'] = summaryStats.decl
            rec['zenithDistance'] = summaryStats.zenithDistance
            rec['zeroPoint'] = summaryStats.zeroPoint
            rec['skyBg'] = summaryStats.skyBg
            rec['skyNoise'] = summaryStats.skyNoise
            rec['meanVar'] = summaryStats.meanVar
            rec['astromOffsetMean'] = summaryStats.astromOffsetMean
            rec['astromOffsetStd'] = summaryStats.astromOffsetStd
            rec['nPsfStar'] = summaryStats.nPsfStar
            rec['psfStarDeltaE1Median'] = summaryStats.psfStarDeltaE1Median
            rec['psfStarDeltaE2Median'] = summaryStats.psfStarDeltaE2Median
            rec['psfStarDeltaE1Scatter'] = summaryStats.psfStarDeltaE1Scatter
            rec['psfStarDeltaE2Scatter'] = summaryStats.psfStarDeltaE2Scatter
            rec['psfStarDeltaSizeMedian'] = summaryStats.psfStarDeltaSizeMedian
            rec['psfStarDeltaSizeScatter'] = summaryStats.psfStarDeltaSizeScatter
            rec['psfStarScaledDeltaSizeScatter'] = summaryStats.psfStarScaledDeltaSizeScatter

        metadata = dafBase.PropertyList()
        metadata.add("COMMENT", "Catalog id is detector id, sorted.")
        metadata.add("COMMENT", "Only detectors with data have entries.")
        cat.setMetadata(metadata)

        cat.sort()
        return cat
    def _makeVisitSummarySchema(self):
        """Make the schema for the visitSummary catalog."""
        schema = afwTable.ExposureTable.makeMinimalSchema()
        schema.addField('visit', type='I', doc='Visit number')
        schema.addField('physical_filter', type='String', size=32, doc='Physical filter')
        schema.addField('band', type='String', size=32, doc='Name of band')
        schema.addField('psfSigma', type='F',
                        doc='PSF model second-moments determinant radius (center of chip) (pixel)')
        schema.addField('psfArea', type='F',
                        doc='PSF model effective area (center of chip) (pixel**2)')
        schema.addField('psfIxx', type='F',
                        doc='PSF model Ixx (center of chip) (pixel**2)')
        schema.addField('psfIyy', type='F',
                        doc='PSF model Iyy (center of chip) (pixel**2)')
        schema.addField('psfIxy', type='F',
                        doc='PSF model Ixy (center of chip) (pixel**2)')
        schema.addField('raCorners', type='ArrayD', size=4,
                        doc='Right Ascension of bounding box corners (degrees)')
        schema.addField('decCorners', type='ArrayD', size=4,
                        doc='Declination of bounding box corners (degrees)')
        schema.addField('ra', type='D',
                        doc='Right Ascension of bounding box center (degrees)')
        schema.addField('decl', type='D',
                        doc='Declination of bounding box center (degrees)')
        schema.addField('zenithDistance', type='F',
                        doc='Zenith distance of bounding box center (degrees)')
        schema.addField('zeroPoint', type='F',
                        doc='Mean zeropoint in detector (mag)')
        schema.addField('skyBg', type='F',
                        doc='Average sky background (ADU)')
        schema.addField('skyNoise', type='F',
                        doc='Average sky noise (ADU)')
        schema.addField('meanVar', type='F',
                        doc='Mean variance of the weight plane (ADU**2)')
        schema.addField('astromOffsetMean', type='F',
                        doc='Mean offset of astrometric calibration matches (arcsec)')
        schema.addField('astromOffsetStd', type='F',
                        doc='Standard deviation of offsets of astrometric calibration matches (arcsec)')
        schema.addField('nPsfStar', type='I', doc='Number of stars used for PSF model')
        schema.addField('psfStarDeltaE1Median', type='F',
                        doc='Median E1 residual (starE1 - psfE1) for psf stars')
        schema.addField('psfStarDeltaE2Median', type='F',
                        doc='Median E2 residual (starE2 - psfE2) for psf stars')
        schema.addField('psfStarDeltaE1Scatter', type='F',
                        doc='Scatter (via MAD) of E1 residual (starE1 - psfE1) for psf stars')
        schema.addField('psfStarDeltaE2Scatter', type='F',
                        doc='Scatter (via MAD) of E2 residual (starE2 - psfE2) for psf stars')
        schema.addField('psfStarDeltaSizeMedian', type='F',
                        doc='Median size residual (starSize - psfSize) for psf stars (pixel)')
        schema.addField('psfStarDeltaSizeScatter', type='F',
                        doc='Scatter (via MAD) of size residual (starSize - psfSize) for psf stars (pixel)')
        schema.addField('psfStarScaledDeltaSizeScatter', type='F',
                        doc='Scatter (via MAD) of size residual scaled by median size squared')

        return schema
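
# A minimal sketch of how a schema produced as above is turned into a catalog,
# using only calls that already appear in this module (ExposureTable schema,
# addField, ExposureCatalog, resize, column assignment). Values are illustrative.
def _exampleVisitSummaryCatalog(visit=12345, nDetectors=2):
    schema = afwTable.ExposureTable.makeMinimalSchema()
    schema.addField('visit', type='I', doc='Visit number')
    cat = afwTable.ExposureCatalog(schema)
    cat.resize(nDetectors)
    cat['visit'] = visit
    return cat
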
class VisitDataIdContainer(DataIdContainer):
    """DataIdContainer that groups sensor-level id's by visit
    """

    def makeDataRefList(self, namespace):
        """Make self.refList from self.idList

        Generate a list of data references grouped by visit.

        Parameters
        ----------
        namespace : `argparse.Namespace`
            Namespace used by `lsst.pipe.base.CmdLineTask` to parse command line arguments
        """
        # Group by visits
        visitRefs = defaultdict(list)
        for dataId in self.idList:
            if "visit" in dataId:
                visitId = dataId["visit"]
                # Find all references matching this data ID
                subset = namespace.butler.subset(self.datasetType, dataId=dataId)
                visitRefs[visitId].extend([dataRef for dataRef in subset])

        outputRefList = []
        for refList in visitRefs.values():
            existingRefs = [ref for ref in refList if ref.datasetExists()]
            if existingRefs:
                outputRefList.append(existingRefs)

        self.refList = outputRefList

class ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        defaultTemplates={"catalogType": ""},
                                        dimensions=("instrument", "visit")):
    inputCatalogs = connectionTypes.Input(
        doc="Input per-detector Source Tables",
        name="{catalogType}sourceTable",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
        multiple=True
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-visit concatenation of Source Table",
        name="{catalogType}sourceTable_visit",
        storageClass="DataFrame",
        dimensions=("instrument", "visit")
    )


class ConsolidateSourceTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=ConsolidateSourceTableConnections):
    pass

class ConsolidateSourceTableTask(CmdLineTask, pipeBase.PipelineTask):
    """Concatenate `sourceTable` list into a per-visit `sourceTable_visit`
    """
    _DefaultName = 'consolidateSourceTable'
    ConfigClass = ConsolidateSourceTableConfig

    inputDataset = 'sourceTable'
    outputDataset = 'sourceTable_visit'

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-detector Source Tables",
                      len(inputs['inputCatalogs']))
        df = pd.concat(inputs['inputCatalogs'])
        butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)

    def runDataRef(self, dataRefList):
        self.log.info("Concatenating %s per-detector Source Tables", len(dataRefList))
        df = pd.concat([dataRef.get().toDataFrame() for dataRef in dataRefList])
        dataRefList[0].put(ParquetTable(dataFrame=df), self.outputDataset)

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)

        parser.add_id_argument("--id", cls.inputDataset,
                               help="data ID, e.g. --id visit=12345",
                               ContainerClass=VisitDataIdContainer)
        return parser

    def writeMetadata(self, dataRef):
        """No metadata to write.
        """
        pass

    def writeConfig(self, butler, clobber=False, doBackup=True):
        """No config to write.
        """
        pass

class MakeCcdVisitTableConnections(pipeBase.PipelineTaskConnections,
                                   dimensions=("instrument",),
                                   defaultTemplates={"calexpType": ""}):
    visitSummaryRefs = connectionTypes.Input(
        doc="Data references for per-visit consolidated exposure metadata from ConsolidateVisitSummaryTask",
        name="{calexpType}visitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="CCD and Visit metadata table",
        name="ccdVisitTable",
        storageClass="DataFrame",
        dimensions=("instrument",)
    )


class MakeCcdVisitTableConfig(pipeBase.PipelineTaskConfig,
                              pipelineConnections=MakeCcdVisitTableConnections):
    pass


class MakeCcdVisitTableTask(CmdLineTask, pipeBase.PipelineTask):
    """Produce a `ccdVisitTable` from the `visitSummary` exposure catalogs.
    """
    _DefaultName = 'makeCcdVisitTable'
    ConfigClass = MakeCcdVisitTableConfig
    def run(self, visitSummaryRefs):
        """Make a table of ccd information from the `visitSummary` catalogs.

        Parameters
        ----------
        visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            List of DeferredDatasetHandles pointing to exposure catalogs with
            per-detector summary information.

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            Results struct with attribute:

            ``outputCatalog``
                Catalog of ccd and visit information.
        """
        ccdEntries = []
        for visitSummaryRef in visitSummaryRefs:
            visitSummary = visitSummaryRef.get()
            visitInfo = visitSummary[0].getVisitInfo()

            summaryTable = visitSummary.asAstropy()
            selectColumns = ['id', 'visit', 'physical_filter', 'band', 'ra', 'decl', 'zenithDistance',
                             'zeroPoint', 'psfSigma', 'skyBg', 'skyNoise']
            ccdEntry = summaryTable[selectColumns].to_pandas().set_index('id')
            ccdEntry = ccdEntry.rename(columns={"visit": "visitId"})
            dataIds = [DataCoordinate.standardize(visitSummaryRef.dataId, detector=id) for id in
                       summaryTable['id']]
            packer = visitSummaryRef.dataId.universe.makePacker('visit_detector', visitSummaryRef.dataId)
            ccdVisitIds = [packer.pack(dataId) for dataId in dataIds]
            ccdEntry['ccdVisitId'] = ccdVisitIds
            ccdEntry['detector'] = summaryTable['id']
            pixToArcseconds = np.array([vR.getWcs().getPixelScale().asArcseconds() for vR in visitSummary])
            ccdEntry["seeing"] = visitSummary['psfSigma'] * np.sqrt(8 * np.log(2)) * pixToArcseconds

            ccdEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
            ccdEntry["expMidpt"] = visitInfo.getDate().toPython()
            expTime = visitInfo.getExposureTime()
            ccdEntry['expTime'] = expTime
            ccdEntry["obsStart"] = ccdEntry["expMidpt"] - 0.5 * pd.Timedelta(seconds=expTime)
            ccdEntry['darkTime'] = visitInfo.getDarkTime()
            ccdEntry['xSize'] = summaryTable['bbox_max_x'] - summaryTable['bbox_min_x']
            ccdEntry['ySize'] = summaryTable['bbox_max_y'] - summaryTable['bbox_min_y']
            ccdEntry['llcra'] = summaryTable['raCorners'][:, 0]
            ccdEntry['llcdec'] = summaryTable['decCorners'][:, 0]
            ccdEntry['ulcra'] = summaryTable['raCorners'][:, 1]
            ccdEntry['ulcdec'] = summaryTable['decCorners'][:, 1]
            ccdEntry['urcra'] = summaryTable['raCorners'][:, 2]
            ccdEntry['urcdec'] = summaryTable['decCorners'][:, 2]
            ccdEntry['lrcra'] = summaryTable['raCorners'][:, 3]
            ccdEntry['lrcdec'] = summaryTable['decCorners'][:, 3]

            ccdEntries.append(ccdEntry)

        outputCatalog = pd.concat(ccdEntries)
        outputCatalog.set_index('ccdVisitId', inplace=True, verify_integrity=True)
        return pipeBase.Struct(outputCatalog=outputCatalog)
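
# A small numerical sketch (numpy only) of the seeing conversion used above:
# a Gaussian PSF sigma in pixels becomes a FWHM in arcseconds via the constant
# sqrt(8 ln 2) and the pixel scale. The default values are illustrative.
def _exampleSeeingFromPsfSigma(psfSigmaPixels=2.0, pixelScaleArcsec=0.2):
    fwhmPixels = psfSigmaPixels * np.sqrt(8 * np.log(2))  # sigma -> FWHM
    return fwhmPixels * pixelScaleArcsec  # FWHM in arcseconds ("seeing")
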
class MakeVisitTableConnections(pipeBase.PipelineTaskConnections,
                                dimensions=("instrument",),
                                defaultTemplates={"calexpType": ""}):
    visitSummaries = connectionTypes.Input(
        doc="Per-visit consolidated exposure metadata from ConsolidateVisitSummaryTask",
        name="{calexpType}visitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit",),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Visit metadata table",
        name="visitTable",
        storageClass="DataFrame",
        dimensions=("instrument",)
    )


class MakeVisitTableConfig(pipeBase.PipelineTaskConfig,
                           pipelineConnections=MakeVisitTableConnections):
    pass


class MakeVisitTableTask(CmdLineTask, pipeBase.PipelineTask):
    """Produce a `visitTable` from the `visitSummary` exposure catalogs.
    """
    _DefaultName = 'makeVisitTable'
    ConfigClass = MakeVisitTableConfig
    def run(self, visitSummaries):
        """Make a table of visit information from the `visitSummary` catalogs.

        Parameters
        ----------
        visitSummaries : list of `lsst.afw.table.ExposureCatalog`
            List of exposure catalogs with per-detector summary information.

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            Results struct with attribute:

            ``outputCatalog``
                Catalog of visit information.
        """
        visitEntries = []
        for visitSummary in visitSummaries:
            visitSummary = visitSummary.get()
            visitRow = visitSummary[0]
            visitInfo = visitRow.getVisitInfo()

            visitEntry = {}
            visitEntry["visitId"] = visitRow['visit']
            visitEntry["visit"] = visitRow['visit']
            visitEntry["physical_filter"] = visitRow['physical_filter']
            visitEntry["band"] = visitRow['band']
            raDec = visitInfo.getBoresightRaDec()
            visitEntry["ra"] = raDec.getRa().asDegrees()
            visitEntry["decl"] = raDec.getDec().asDegrees()
            visitEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
            azAlt = visitInfo.getBoresightAzAlt()
            visitEntry["azimuth"] = azAlt.getLongitude().asDegrees()
            visitEntry["altitude"] = azAlt.getLatitude().asDegrees()
            visitEntry["zenithDistance"] = 90 - azAlt.getLatitude().asDegrees()
            visitEntry["airmass"] = visitInfo.getBoresightAirmass()
            visitEntry["obsStart"] = visitInfo.getDate().toPython()
            visitEntry["expTime"] = visitInfo.getExposureTime()
            visitEntries.append(visitEntry)

        outputCatalog = pd.DataFrame(data=visitEntries)
        outputCatalog.set_index('visitId', inplace=True, verify_integrity=True)
        return pipeBase.Struct(outputCatalog=outputCatalog)
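
# A pandas-only sketch of the final assembly step above: per-visit dicts are
# collected into a DataFrame and indexed by a unique visit identifier
# (verify_integrity raises if two rows claim the same visitId). Values are
# illustrative.
def _exampleVisitTableAssembly():
    visitEntries = [{'visitId': 1, 'expTime': 30.0}, {'visitId': 2, 'expTime': 15.0}]
    outputCatalog = pd.DataFrame(data=visitEntries)
    outputCatalog.set_index('visitId', inplace=True, verify_integrity=True)
    return outputCatalog
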
class WriteForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("instrument", "visit", "detector", "skymap", "tract")):

    inputCatalog = connectionTypes.Input(
        doc="Primary per-detector, single-epoch forced-photometry catalog. "
            "By default, it is the output of ForcedPhotCcdTask on calexps",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector", "skymap", "tract")
    )
    inputCatalogDiff = connectionTypes.Input(
        doc="Secondary multi-epoch, per-detector, forced photometry catalog. "
            "By default, it is the output of ForcedPhotCcdTask run on image differences.",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector", "skymap", "tract")
    )
    outputCatalog = connectionTypes.Output(
        doc="InputCatalogs horizontally joined on `objectId` in Parquet format",
        name="mergedForcedSource",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector", "skymap", "tract")
    )

class WriteForcedSourceTableConfig(WriteSourceTableConfig,
                                   pipelineConnections=WriteForcedSourceTableConnections):
    key = pexConfig.Field(
        doc="Column on which to join the two input tables on and make the primary key of the output",
        dtype=str,
        default="objectId",
    )


class WriteForcedSourceTableTask(pipeBase.PipelineTask):
    """Merge and convert per-detector forced source catalogs to parquet

    Because the predecessor ForcedPhotCcdTask operates per-detector,
    per-tract, (i.e., it has tract in its dimensions), detectors
    on the tract boundary may have multiple forced source catalogs.

    The successor task TransformForcedSourceTable runs per-patch
    and temporally-aggregates overlapping mergedForcedSource catalogs from all
    available multiple epochs.
    """
    _DefaultName = "writeForcedSourceTable"
    ConfigClass = WriteForcedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        # Add ccdVisitId to allow joining with CcdVisitTable
        inputs['ccdVisitId'] = butlerQC.quantum.dataId.pack("visit_detector")
        inputs['band'] = butlerQC.quantum.dataId.full['band']
        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)

    def run(self, inputCatalog, inputCatalogDiff, ccdVisitId=None, band=None):
        dfs = []
        for table, dataset, in zip((inputCatalog, inputCatalogDiff), ('calexp', 'diff')):
            df = table.asAstropy().to_pandas().set_index(self.config.key, drop=False)
            df = df.reindex(sorted(df.columns), axis=1)
            df['ccdVisitId'] = ccdVisitId if ccdVisitId else pd.NA
            df['band'] = band if band else pd.NA
            df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
                                                   names=('dataset', 'column'))

            dfs.append(df)

        outputCatalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return pipeBase.Struct(outputCatalog=outputCatalog)
class TransformForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                            dimensions=("instrument", "skymap", "patch", "tract")):

    inputCatalogs = connectionTypes.Input(
        doc="Parquet table of merged ForcedSources produced by WriteForcedSourceTableTask",
        name="mergedForcedSource",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
        multiple=True,
        deferLoad=True
    )
    referenceCatalog = connectionTypes.Input(
        doc="Reference catalog which was used to seed the forcedPhot. Columns "
            "objectId, detect_isPrimary, detect_isTractInner, detect_isPatchInner "
            "are expected.",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        deferLoad=True
    )
    outputCatalog = connectionTypes.Output(
        doc="Narrower, temporally-aggregated, per-patch ForcedSource Table transformed and converted per a "
            "specified set of functors",
        name="forcedSourceTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap")
    )

class TransformForcedSourceTableConfig(TransformCatalogBaseConfig,
                                       pipelineConnections=TransformForcedSourceTableConnections):
    referenceColumns = pexConfig.ListField(
        dtype=str,
        default=["detect_isPrimary", "detect_isTractInner", "detect_isPatchInner"],
        doc="Columns to pull from reference catalog",
    )
    keyRef = pexConfig.Field(
        dtype=str,
        doc="Column on which to join the two input tables on and make the primary key of the output",
        default="objectId",
    )
    key = pexConfig.Field(
        dtype=str,
        doc="Rename the output DataFrame index to this name",
        default="forcedSourceId",
    )

class TransformForcedSourceTableTask(TransformCatalogBaseTask):
    """Transform/standardize a ForcedSource catalog

    Transforms each wide, per-detector forcedSource parquet table per the
    specification file (per-camera defaults found in ForcedSource.yaml).
    All epochs that overlap the patch are aggregated into one per-patch
    narrow-parquet file.

    No de-duplication of rows is performed. Duplicate resolution flags are
    pulled in from the referenceCatalog: `detect_isPrimary`,
    `detect_isTractInner`, `detect_isPatchInner`, so that the user may
    de-duplicate for analysis or compare duplicates for QA.

    The resulting table includes multiple bands. Epochs (MJDs) and other useful
    per-visit rows can be retrieved by joining with the CcdVisitTable on
    ccdVisitId.
    """
    _DefaultName = "transformForcedSourceTable"
    ConfigClass = TransformForcedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        outputs = self.run(inputs['inputCatalogs'], inputs['referenceCatalog'], funcs=self.funcs,
                           dataId=outputRefs.outputCatalog.dataId.full)

        butlerQC.put(outputs, outputRefs)

    def run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=None):
        dfs = []
        ref = referenceCatalog.get(parameters={"columns": self.config.referenceColumns})
        self.log.info("Aggregating %s input catalogs" % (len(inputCatalogs)))
        for handle in inputCatalogs:
            result = self.transform(None, handle, funcs, dataId)
            # Keep only rows that match the per-patch reference catalog.
            dfs.append(result.df.join(ref, how='inner'))

        outputCatalog = pd.concat(dfs)

        # The joins above were done on the reference key; name the index after
        # it, move it into a column, and promote the per-source id to the index.
        outputCatalog.index.rename(self.config.keyRef, inplace=True)
        outputCatalog.reset_index(inplace=True)
        outputCatalog.set_index("forcedSourceId", inplace=True, verify_integrity=True)
        outputCatalog.index.rename(self.config.key, inplace=True)

        self.log.info("Made a table of %d columns and %d rows",
                      len(outputCatalog.columns), len(outputCatalog))
        return pipeBase.Struct(outputCatalog=outputCatalog)
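
# A pandas-only sketch of the de-duplication that the class docstring leaves to
# the user: rows flagged detect_isPrimary can be selected after the fact. The
# argument is assumed to be a DataFrame containing that column, as pulled in
# from the reference catalog above.
def _exampleSelectPrimary(forcedSourceTable):
    return forcedSourceTable[forcedSourceTable['detect_isPrimary']]
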
class ConsolidateTractConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"catalogType": ""},
                                  dimensions=("instrument", "tract")):
    inputCatalogs = connectionTypes.Input(
        doc="Input per-patch DataFrame Tables to be concatenated",
        name="{catalogType}ForcedSourceTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        multiple=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Output per-tract concatenation of DataFrame Tables",
        name="{catalogType}ForcedSourceTable_tract",
        storageClass="DataFrame",
        dimensions=("tract", "skymap"),
    )


class ConsolidateTractConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=ConsolidateTractConnections):
    pass


class ConsolidateTractTask(CmdLineTask, pipeBase.PipelineTask):
    """Concatenate any per-patch, dataframe list into a single
    per-tract DataFrame.
    """
    _DefaultName = 'ConsolidateTract'
    ConfigClass = ConsolidateTractConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        self.log.info("Concatenating %s per-patch %s Tables",
                      len(inputs['inputCatalogs']),
                      inputRefs.inputCatalogs[0].datasetType.name)
        df = pd.concat(inputs['inputCatalogs'])
        butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)