import functools
import logging
import numbers
import os

import numpy as np
import pandas as pd

from collections import defaultdict

import lsst.geom
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
import lsst.daf.base as dafBase
import lsst.afw.table as afwTable
from lsst.pipe.base import CmdLineTask, ArgumentParser, DataIdContainer
from lsst.pipe.base import connectionTypes
from lsst.daf.butler import DeferredDatasetHandle, DataCoordinate
from lsst.obs.base import ExposureIdInfo
from lsst.meas.base import SingleFrameMeasurementTask
from lsst.skymap import BaseSkyMap
from lsst.coadd.utils.coaddDataIdContainer import CoaddDataIdContainer

from .parquetTable import ParquetTable
from .multiBandUtils import makeMergeArgumentParser, MergeSourcesRunner
from .functors import CompositeFunctor, Column

log = logging.getLogger(__name__)
def flattenFilters(df, noDupCols=['coord_ra', 'coord_dec'], camelCase=False, inputBands=None):
    """Flatten a dataframe with a multilevel column index.
    """
    newDf = pd.DataFrame()
    # band is the level-0 column index
    dfBands = df.columns.unique(level=0).values
    for band in dfBands:
        subdf = df[band]
        columnFormat = '{0}{1}' if camelCase else '{0}_{1}'
        newColumns = {c: columnFormat.format(band, c)
                      for c in subdf.columns if c not in noDupCols}
        cols = list(newColumns.keys())
        newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)

    # Only bands present in both the input data and the requested inputBands
    # (if given) contribute columns.
    presentBands = dfBands if inputBands is None else list(set(inputBands).intersection(dfBands))
    # Take the non-duplicated columns (e.g. coordinates) from the first
    # present band only.
    noDupDf = df[presentBands[0]][noDupCols]
    newDf = pd.concat([noDupDf, newDf], axis=1)
    return newDf
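
# Illustrative sketch (not part of the pipeline): how a small multilevel
# DataFrame is flattened by flattenFilters. The values and column names below
# are made up purely for demonstration.
def _demoFlattenFilters():
    bands = ['g', 'r']
    columns = pd.MultiIndex.from_product([bands, ['coord_ra', 'coord_dec', 'PsFlux']])
    df = pd.DataFrame(np.arange(12.0).reshape(2, 6), columns=columns)
    # With camelCase=True the per-band columns become e.g. 'gPsFlux', while
    # 'coord_ra'/'coord_dec' are taken once from the first present band.
    flat = flattenFilters(df, camelCase=True, inputBands=bands)
    return flat  # columns: coord_ra, coord_dec, gPsFlux, rPsFlux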
class WriteObjectTableConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"coaddName": "deep"},
                                  dimensions=("tract", "patch", "skymap")):
    inputCatalogMeas = connectionTypes.Input(
        doc="Catalog of source measurements on the deepCoadd.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_meas",
        multiple=True
    )
    inputCatalogForcedSrc = connectionTypes.Input(
        doc="Catalog of forced measurements (shape and position parameters held fixed) on the deepCoadd.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_forced_src",
        multiple=True
    )
    inputCatalogRef = connectionTypes.Input(
        doc="Catalog marking the primary detection (which band provides a good shape and position) "
            "for each detection in deepCoadd_mergeDet.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_ref"
    )
    outputCatalog = connectionTypes.Output(
        doc="A vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
            "stored as a DataFrame with a multi-level column index per-patch.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="{coaddName}Coadd_obj"
    )
class WriteObjectTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteObjectTableConnections):
    engine = pexConfig.Field(
        dtype=str,
        default="pyarrow",
        doc="Parquet engine for writing (pyarrow or fastparquet)"
    )
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )
class WriteObjectTableTask(CmdLineTask, pipeBase.PipelineTask):
    """Write filter-merged source tables to parquet.
    """
    _DefaultName = "writeObjectTable"
    ConfigClass = WriteObjectTableConfig
    RunnerClass = MergeSourcesRunner

    # Names of the table datasets to be merged
    inputDatasets = ('forced_src', 'meas', 'ref')

    # Tag of the output dataset
    outputDataset = 'obj'

    def __init__(self, butler=None, schema=None, **kwargs):
        # The butler and schema arguments are accepted for compatibility with
        # the Gen2 task runner; they are not used.
        super().__init__(**kwargs)

    def runDataRef(self, patchRefList):
        """Merge coadd sources from multiple bands. Calls `run`, which
        must be defined in subclasses that inherit from MergeSourcesTask.

        Parameters
        ----------
        patchRefList : `list`
            List of data references, one for each filter.
        """
        catalogs = dict(self.readCatalog(patchRef) for patchRef in patchRefList)
        dataId = patchRefList[0].dataId
        mergedCatalog = self.run(catalogs, tract=dataId['tract'], patch=dataId['patch'])
        self.write(patchRefList[0], ParquetTable(dataFrame=mergedCatalog))
    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        measDict = {ref.dataId['band']: {'meas': cat} for ref, cat in
                    zip(inputRefs.inputCatalogMeas, inputs['inputCatalogMeas'])}
        forcedSourceDict = {ref.dataId['band']: {'forced_src': cat} for ref, cat in
                            zip(inputRefs.inputCatalogForcedSrc, inputs['inputCatalogForcedSrc'])}

        catalogs = {}
        for band in measDict.keys():
            catalogs[band] = {'meas': measDict[band]['meas'],
                              'forced_src': forcedSourceDict[band]['forced_src'],
                              'ref': inputs['inputCatalogRef']}

        dataId = butlerQC.quantum.dataId
        df = self.run(catalogs=catalogs, tract=dataId['tract'], patch=dataId['patch'])
        outputs = pipeBase.Struct(outputCatalog=df)
        butlerQC.put(outputs, outputRefs)
    @classmethod
    def _makeArgumentParser(cls):
        """Create a suitable ArgumentParser.

        We will use the ArgumentParser to get a list of data
        references for patches; the RunnerClass will sort them into lists
        of data references for the same patch.

        References the first of self.inputDatasets, rather than
        self.inputDataset.
        """
        return makeMergeArgumentParser(cls._DefaultName, cls.inputDatasets[0])

    def readCatalog(self, patchRef):
        """Read input catalogs.

        Read all the input datasets given by the 'inputDatasets'
        attribute.

        Parameters
        ----------
        patchRef : `lsst.daf.persistence.ButlerDataRef`
            Data reference for patch.

        Returns
        -------
        Tuple consisting of band name and a dict of catalogs, keyed by
        dataset name.
        """
        band = patchRef.get(self.config.coaddName + "Coadd_filter", immediate=True).bandLabel
        catalogDict = {}
        for dataset in self.inputDatasets:
            catalog = patchRef.get(self.config.coaddName + "Coadd_" + dataset, immediate=True)
            self.log.info("Read %d sources from %s for band %s: %s",
                          len(catalog), dataset, band, patchRef.dataId)
            catalogDict[dataset] = catalog
        return band, catalogDict
    def run(self, catalogs, tract, patch):
        """Merge multiple catalogs.

        Parameters
        ----------
        catalogs : `dict`
            Mapping from filter names to dict of catalogs.
        tract : `int`
            tractId to use for the tractId column.
        patch : `str`
            patchId to use for the patchId column.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Merged dataframe.
        """
        dfs = []
        for filt, tableDict in catalogs.items():
            for dataset, table in tableDict.items():
                # Convert the afw table to a pandas DataFrame, indexed by id.
                df = table.asAstropy().to_pandas().set_index('id', drop=True)

                # Sort columns by name, to ensure matching schema among patches.
                df = df.reindex(sorted(df.columns), axis=1)
                df['tractId'] = tract
                df['patchId'] = patch

                # Make the columns a 3-level MultiIndex.
                df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
                                                       names=('dataset', 'band', 'column'))
                dfs.append(df)

        catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return catalog
    def write(self, patchRef, catalog):
        """Write the output.

        Parameters
        ----------
        catalog : `ParquetTable`
            Catalog to write.
        patchRef : `lsst.daf.persistence.ButlerDataRef`
            Data reference for patch.
        """
        patchRef.put(catalog, self.config.coaddName + "Coadd_" + self.outputDataset)
        # The filter is not part of the output dataId, so drop it from the
        # log message to avoid confusion.
        mergeDataId = patchRef.dataId.copy()
        del mergeDataId["filter"]
        self.log.info("Wrote merged catalog: %s", mergeDataId)

    def writeMetadata(self, dataRefList):
        """No metadata to write, and not sure how to write it for a list of
        dataRefs.
        """
        pass
class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"catalogType": ""},
                                  dimensions=("instrument", "visit", "detector")):
    catalog = connectionTypes.Input(
        doc="Input full-depth catalog of sources produced by CalibrateTask",
        name="{catalogType}src",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector")
    )
    outputCatalog = connectionTypes.Output(
        doc="Catalog of sources, `src` in Parquet format. The 'id' column is "
            "replaced with an index; all other columns are unchanged.",
        name="{catalogType}source",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector")
    )
class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteSourceTableConnections):
    pass


class WriteSourceTableTask(CmdLineTask, pipeBase.PipelineTask):
    """Write source table to parquet.
    """
    _DefaultName = "writeSourceTable"
    ConfigClass = WriteSourceTableConfig
    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        inputs['ccdVisitId'] = butlerQC.quantum.dataId.pack("visit_detector")
        result = self.run(**inputs).table
        outputs = pipeBase.Struct(outputCatalog=result.toDataFrame())
        butlerQC.put(outputs, outputRefs)

    def run(self, catalog, ccdVisitId=None, **kwargs):
        """Convert the `src` catalog to parquet.

        Parameters
        ----------
        catalog : `lsst.afw.table.SourceCatalog`
            Catalog to be converted.
        ccdVisitId : `int`
            ccdVisitId to be added as a column.

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            ``table``
                `ParquetTable` version of the input catalog.
        """
        self.log.info("Generating parquet table from src catalog ccdVisitId=%s", ccdVisitId)
        df = catalog.asAstropy().to_pandas().set_index('id', drop=True)
        df['ccdVisitId'] = ccdVisitId
        return pipeBase.Struct(table=ParquetTable(dataFrame=df))
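
# Illustrative sketch (not part of the pipeline): the DataFrame produced by
# WriteSourceTableTask.run carries the packed ccdVisitId in every row and is
# ultimately persisted as Parquet. A plain-pandas stand-in with made-up data;
# pyarrow is assumed to be installed.
def _demoSourceTableParquet(tmpPath="/tmp/demo_source.parq"):
    df = pd.DataFrame({'flux': [1.0, 2.0]}, index=pd.Index([100, 101], name='id'))
    df['ccdVisitId'] = 123456789  # made-up packed visit+detector id
    df.to_parquet(tmpPath, engine='pyarrow')  # one of the engines named in the engine config doc
    return pd.read_parquet(tmpPath)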
class WriteRecalibratedSourceTableConnections(WriteSourceTableConnections,
                                              defaultTemplates={"catalogType": "",
                                                                "skyWcsName": "jointcal",
                                                                "photoCalibName": "fgcm"},
                                              dimensions=("instrument", "visit", "detector", "skymap")):
    skyMap = connectionTypes.Input(
        doc="skyMap needed to choose which tract-level calibrations to use when multiple available",
        name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
        storageClass="SkyMap",
        dimensions=("skymap",),
    )
    exposure = connectionTypes.Input(
        doc="Input exposure to perform photometry on.",
        storageClass="ExposureF",
        dimensions=["instrument", "visit", "detector"],
    )
    externalSkyWcsTractCatalog = connectionTypes.Input(
        doc=("Per-tract, per-visit wcs calibrations. These catalogs use the detector "
             "id for the catalog id, sorted on id for fast lookup."),
        name="{skyWcsName}SkyWcsCatalog",
        storageClass="ExposureCatalog",
        dimensions=["instrument", "visit", "tract"],
    )
    externalSkyWcsGlobalCatalog = connectionTypes.Input(
        doc=("Per-visit wcs calibrations computed globally (with no tract information). "
             "These catalogs use the detector id for the catalog id, sorted on id for "
             "fast lookup."),
        name="{skyWcsName}SkyWcsCatalog",
        storageClass="ExposureCatalog",
        dimensions=["instrument", "visit"],
    )
    externalPhotoCalibTractCatalog = connectionTypes.Input(
        doc=("Per-tract, per-visit photometric calibrations. These catalogs use the "
             "detector id for the catalog id, sorted on id for fast lookup."),
        name="{photoCalibName}PhotoCalibCatalog",
        storageClass="ExposureCatalog",
        dimensions=["instrument", "visit", "tract"],
    )
    externalPhotoCalibGlobalCatalog = connectionTypes.Input(
        doc=("Per-visit photometric calibrations computed globally (with no tract "
             "information). These catalogs use the detector id for the catalog id, "
             "sorted on id for fast lookup."),
        name="{photoCalibName}PhotoCalibCatalog",
        storageClass="ExposureCatalog",
        dimensions=["instrument", "visit"],
    )
    def __init__(self, *, config=None):
        super().__init__(config=config)
        # Keep only the external calibration inputs that the config actually
        # requests; remove the unused global/tract variants.
        if config.doApplyExternalSkyWcs and config.doReevaluateSkyWcs:
            if config.useGlobalExternalSkyWcs:
                self.inputs.remove("externalSkyWcsTractCatalog")
            else:
                self.inputs.remove("externalSkyWcsGlobalCatalog")
        else:
            self.inputs.remove("externalSkyWcsTractCatalog")
            self.inputs.remove("externalSkyWcsGlobalCatalog")
        if config.doApplyExternalPhotoCalib and config.doReevaluatePhotoCalib:
            if config.useGlobalExternalPhotoCalib:
                self.inputs.remove("externalPhotoCalibTractCatalog")
            else:
                self.inputs.remove("externalPhotoCalibGlobalCatalog")
        else:
            self.inputs.remove("externalPhotoCalibTractCatalog")
            self.inputs.remove("externalPhotoCalibGlobalCatalog")
class WriteRecalibratedSourceTableConfig(WriteSourceTableConfig,
                                         pipelineConnections=WriteRecalibratedSourceTableConnections):
    doReevaluatePhotoCalib = pexConfig.Field(
        dtype=bool,
        doc=("Add or replace local photoCalib columns from either the calexp.photoCalib or jointcal/FGCM"),
    )
    doReevaluateSkyWcs = pexConfig.Field(
        dtype=bool,
        doc=("Add or replace local WCS columns from either the calexp.wcs or jointcal"),
    )
    doApplyExternalPhotoCalib = pexConfig.Field(
        dtype=bool,
        doc=("Whether to apply external photometric calibration via an "
             "`lsst.afw.image.PhotoCalib` object. Uses the "
             "``externalPhotoCalibName`` field to determine which calibration "
             "to load."),
    )
    doApplyExternalSkyWcs = pexConfig.Field(
        dtype=bool,
        doc=("Whether to apply external astrometric calibration via an "
             "`lsst.afw.geom.SkyWcs` object. Uses ``externalSkyWcsName`` "
             "field to determine which calibration to load."),
    )
    useGlobalExternalPhotoCalib = pexConfig.Field(
        dtype=bool,
        doc=("When using doApplyExternalPhotoCalib, use 'global' calibrations "
             "that are not run per-tract. When False, use per-tract photometric "
             "calibration files.")
    )
    useGlobalExternalSkyWcs = pexConfig.Field(
        dtype=bool,
        doc=("When using doApplyExternalSkyWcs, use 'global' calibrations "
             "that are not run per-tract. When False, use per-tract wcs "
             "files.")
    )

    def validate(self):
        super().validate()
        if self.doApplyExternalSkyWcs and not self.doReevaluateSkyWcs:
            log.warning("doApplyExternalSkyWcs=True but doReevaluateSkyWcs=False. "
                        "External SkyWcs will not be read or evaluated.")
        if self.doApplyExternalPhotoCalib and not self.doReevaluatePhotoCalib:
            log.warning("doApplyExternalPhotoCalib=True but doReevaluatePhotoCalib=False. "
                        "External PhotoCalib will not be read or evaluated.")
class WriteRecalibratedSourceTableTask(WriteSourceTableTask):
    """Write a recalibrated source table to parquet.
    """
    _DefaultName = "writeRecalibratedSourceTable"
    ConfigClass = WriteRecalibratedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        inputs['ccdVisitId'] = butlerQC.quantum.dataId.pack("visit_detector")
        inputs['exposureIdInfo'] = ExposureIdInfo.fromDataId(butlerQC.quantum.dataId, "visit_detector")

        if self.config.doReevaluatePhotoCalib or self.config.doReevaluateSkyWcs:
            if self.config.doApplyExternalPhotoCalib or self.config.doApplyExternalSkyWcs:
                inputs['exposure'] = self.attachCalibs(inputRefs, **inputs)
            inputs['catalog'] = self.addCalibColumns(**inputs)

        result = self.run(**inputs).table
        outputs = pipeBase.Struct(outputCatalog=result.toDataFrame())
        butlerQC.put(outputs, outputRefs)
    def attachCalibs(self, inputRefs, skyMap, exposure, externalSkyWcsGlobalCatalog=None,
                     externalSkyWcsTractCatalog=None, externalPhotoCalibGlobalCatalog=None,
                     externalPhotoCalibTractCatalog=None, **kwargs):
        """Apply external calibrations to the exposure per the configuration.

        When multiple tract-level calibrations overlap, select the one with the
        center closest to the detector.

        Parameters
        ----------
        inputRefs : `lsst.pipe.base.InputQuantizedConnection`
            Input connections, used for the dataIds of the tract-level catalogs.
        skyMap : `lsst.skymap.SkyMap`
            skyMap to look up tract geometry.
        exposure : `lsst.afw.image.exposure.Exposure`
            Input exposure to adjust calibrations.
        externalSkyWcsGlobalCatalog : `lsst.afw.table.ExposureCatalog`, optional
            Exposure catalog with external skyWcs to be applied per config.
        externalSkyWcsTractCatalog : `lsst.afw.table.ExposureCatalog`, optional
            Exposure catalog with external skyWcs to be applied per config.
        externalPhotoCalibGlobalCatalog : `lsst.afw.table.ExposureCatalog`, optional
            Exposure catalog with external photoCalib to be applied per config.
        externalPhotoCalibTractCatalog : `lsst.afw.table.ExposureCatalog`, optional
            Exposure catalog with external photoCalib to be applied per config.

        Returns
        -------
        exposure : `lsst.afw.image.exposure.Exposure`
            Exposure with adjusted calibrations.
        """
        if not self.config.doApplyExternalSkyWcs:
            # Keep the original exposure's SkyWcs.
            externalSkyWcsCatalog = None
        elif self.config.useGlobalExternalSkyWcs:
            externalSkyWcsCatalog = externalSkyWcsGlobalCatalog
            self.log.info('Applying global SkyWcs')
        else:
            inputRef = getattr(inputRefs, 'externalSkyWcsTractCatalog')
            tracts = [ref.dataId['tract'] for ref in inputRef]
            if len(tracts) == 1:
                ind = 0
                self.log.info('Applying tract-level SkyWcs from tract %s', tracts[ind])
            else:
                ind = self.getClosestTract(tracts, skyMap,
                                           exposure.getBBox(), exposure.getWcs())
                self.log.info('Multiple overlapping externalSkyWcsTractCatalogs found (%s). '
                              'Applying closest to detector center: tract=%s', str(tracts), tracts[ind])
            externalSkyWcsCatalog = externalSkyWcsTractCatalog[ind]

        if not self.config.doApplyExternalPhotoCalib:
            # Keep the original exposure's PhotoCalib.
            externalPhotoCalibCatalog = None
        elif self.config.useGlobalExternalPhotoCalib:
            externalPhotoCalibCatalog = externalPhotoCalibGlobalCatalog
            self.log.info('Applying global PhotoCalib')
        else:
            inputRef = getattr(inputRefs, 'externalPhotoCalibTractCatalog')
            tracts = [ref.dataId['tract'] for ref in inputRef]
            if len(tracts) == 1:
                ind = 0
                self.log.info('Applying tract-level PhotoCalib from tract %s', tracts[ind])
            else:
                ind = self.getClosestTract(tracts, skyMap,
                                           exposure.getBBox(), exposure.getWcs())
                self.log.info('Multiple overlapping externalPhotoCalibTractCatalogs found (%s). '
                              'Applying closest to detector center: tract=%s', str(tracts), tracts[ind])
            externalPhotoCalibCatalog = externalPhotoCalibTractCatalog[ind]

        return self.prepareCalibratedExposure(exposure, externalSkyWcsCatalog, externalPhotoCalibCatalog)
    def getClosestTract(self, tracts, skyMap, bbox, wcs):
        """Find the index of the tract closest to the detector from a list of tractIds.

        Parameters
        ----------
        tracts : `list` [`int`]
            Iterable of integer tractIds.
        skyMap : `lsst.skymap.SkyMap`
            skyMap to look up tract geometry and wcs.
        bbox : `lsst.geom.Box2I`
            Detector bbox, the center of which will be compared to tract centers.
        wcs : `lsst.afw.geom.SkyWcs`
            Detector Wcs object to map the detector center to sky coordinates.

        Returns
        -------
        index : `int`
            Index of the closest tract.
        """
        center = wcs.pixelToSky(bbox.getCenter())
        sep = []
        for tractId in tracts:
            tract = skyMap[tractId]
            tractCenter = tract.getWcs().pixelToSky(tract.getBBox().getCenter())
            sep.append(center.separation(tractCenter))

        return np.argmin(sep)
    def prepareCalibratedExposure(self, exposure, externalSkyWcsCatalog=None, externalPhotoCalibCatalog=None):
        """Prepare a calibrated exposure and apply external calibrations if so configured.

        Parameters
        ----------
        exposure : `lsst.afw.image.exposure.Exposure`
            Input exposure to adjust calibrations.
        externalSkyWcsCatalog : `lsst.afw.table.ExposureCatalog`, optional
            Exposure catalog with external skyWcs to be applied
            if config.doApplyExternalSkyWcs=True. Catalog uses the detector id
            for the catalog id, sorted on id for fast lookup.
        externalPhotoCalibCatalog : `lsst.afw.table.ExposureCatalog`, optional
            Exposure catalog with external photoCalib to be applied
            if config.doApplyExternalPhotoCalib=True. Catalog uses the detector
            id for the catalog id, sorted on id for fast lookup.

        Returns
        -------
        exposure : `lsst.afw.image.exposure.Exposure`
            Exposure with adjusted calibrations.
        """
        detectorId = exposure.getInfo().getDetector().getId()

        if externalPhotoCalibCatalog is not None:
            row = externalPhotoCalibCatalog.find(detectorId)
            if row is None:
                self.log.warning("Detector id %s not found in externalPhotoCalibCatalog; "
                                 "Using original photoCalib.", detectorId)
            else:
                photoCalib = row.getPhotoCalib()
                if photoCalib is None:
                    self.log.warning("Detector id %s has None for photoCalib in externalPhotoCalibCatalog; "
                                     "Using original photoCalib.", detectorId)
                else:
                    exposure.setPhotoCalib(photoCalib)

        if externalSkyWcsCatalog is not None:
            row = externalSkyWcsCatalog.find(detectorId)
            if row is None:
                self.log.warning("Detector id %s not found in externalSkyWcsCatalog; "
                                 "Using original skyWcs.", detectorId)
            else:
                skyWcs = row.getWcs()
                if skyWcs is None:
                    self.log.warning("Detector id %s has None for skyWcs in externalSkyWcsCatalog; "
                                     "Using original skyWcs.", detectorId)
                else:
                    exposure.setWcs(skyWcs)

        return exposure
    def addCalibColumns(self, catalog, exposure, exposureIdInfo, **kwargs):
        """Add or replace columns with calibs evaluated at each centroid.

        Add or replace 'base_LocalWcs' and 'base_LocalPhotoCalib' columns in a
        source catalog, by rerunning the plugins.

        Parameters
        ----------
        catalog : `lsst.afw.table.SourceCatalog`
            Catalog to which calib columns will be added.
        exposure : `lsst.afw.image.exposure.Exposure`
            Exposure with attached PhotoCalib and SkyWcs attributes to be
            reevaluated at local centroids. Pixels are not required.
        exposureIdInfo : `lsst.obs.base.ExposureIdInfo`

        Returns
        -------
        newCat : `lsst.afw.table.SourceCatalog`
            Source Catalog with requested local calib columns.
        """
        measureConfig = SingleFrameMeasurementTask.ConfigClass()
        measureConfig.doReplaceWithNoise = False

        # Clear all plugins and re-run only the local-calib plugins requested.
        measureConfig.plugins.names = []
        if self.config.doReevaluateSkyWcs:
            measureConfig.plugins.names.add('base_LocalWcs')
            self.log.info("Re-evaluating base_LocalWcs plugin")
        if self.config.doReevaluatePhotoCalib:
            measureConfig.plugins.names.add('base_LocalPhotoCalib')
            self.log.info("Re-evaluating base_LocalPhotoCalib plugin")
        pluginsNotToCopy = tuple(measureConfig.plugins.names)

        # Create a new schema and a new catalog: copy the old one, excluding
        # the columns produced by the plugins that are about to be re-run.
        mapper = afwTable.SchemaMapper(catalog.schema)
        aliasMap = catalog.schema.getAliasMap()
        for item in catalog.schema:
            if not item.field.getName().startswith(pluginsNotToCopy):
                mapper.addMapping(item.key)
        schema = mapper.getOutputSchema()

        measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
        schema.setAliasMap(aliasMap)
        newCat = afwTable.SourceCatalog(schema)
        newCat.extend(catalog, mapper=mapper)

        measurement.run(measCat=newCat, exposure=exposure, exposureId=exposureIdInfo.expId)
        return newCat
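
# Illustrative sketch (not part of the pipeline): the closest-tract choice in
# getClosestTract is an argmin over angular separations between the detector
# center and each tract center. A plain-numpy stand-in with made-up
# coordinates (degrees):
def _demoClosestTract():
    detRa, detDec = np.radians(150.1), np.radians(2.2)          # detector center
    tractRa = np.radians(np.array([149.8, 150.3, 151.0]))       # tract centers
    tractDec = np.radians(np.array([2.0, 2.1, 2.9]))
    # Great-circle separation via the spherical law of cosines.
    sep = np.arccos(np.sin(detDec) * np.sin(tractDec)
                    + np.cos(detDec) * np.cos(tractDec) * np.cos(detRa - tractRa))
    return int(np.argmin(sep))  # index of the closest tract (1 here)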
class PostprocessAnalysis(object):
    """Calculate columns from a ParquetTable.

    This object manages and organizes an arbitrary set of computations
    on a catalog. The catalog is defined by a `ParquetTable` object (or list
    of such), such as a `deepCoadd_obj` dataset, and the computations are
    defined by a collection of `lsst.pipe.tasks.functor.Functor` objects (or,
    equivalently, a `CompositeFunctor`).

    After the object is initialized, accessing the `.df` attribute (which
    holds the `pandas.DataFrame` containing the results of the calculations)
    triggers computation of said dataframe.

    One of the conveniences of using this object is the ability to define a
    desired common filter for all functors. This enables the same functor
    collection to be passed to several different `PostprocessAnalysis` objects
    without having to change the original functor collection, since the `filt`
    keyword argument of this object triggers an overwrite of the `filt`
    property for all functors in the collection.

    This object also allows a list of refFlags to be passed, and defines a set
    of default refFlags that are always included even if not requested.

    If a list of `ParquetTable` objects is passed, rather than a single one,
    then the calculations will be mapped over all the input catalogs. In
    principle, it should be straightforward to parallelize this activity, but
    initial tests have failed (see TODO in code comments).

    Parameters
    ----------
    parq : `lsst.pipe.tasks.ParquetTable` (or list of such)
        Source catalog(s) for computation.
    functors : `list`, `dict`, or `lsst.pipe.tasks.functors.CompositeFunctor`
        Computations to do (functors that act on `parq`).
        If a dict, the output
        DataFrame will have columns keyed accordingly.
        If a list, the column keys will come from the
        `.shortname` attribute of each functor.

    filt : `str`, optional
        Filter in which to calculate. If provided,
        this will overwrite any existing `.filt` attribute
        of the provided functors.

    flags : `list`, optional
        List of flags (per-band) to include in output table.
        Taken from the `meas` dataset if applied to a multilevel Object Table.

    refFlags : `list`, optional
        List of refFlags (only reference band) to include in output table.

    forcedFlags : `list`, optional
        List of flags (per-band) to include in output table.
        Taken from the ``forced_src`` dataset if applied to a
        multilevel Object Table. Intended for flags from measurement plugins
        only run during multi-band forced-photometry.
    """
    _defaultRefFlags = []
    _defaultFuncs = ()
    def __init__(self, parq, functors, filt=None, flags=None, refFlags=None, forcedFlags=None):
        self.parq = parq
        self.functors = functors

        self.filt = filt
        self.flags = list(flags) if flags is not None else []
        self.forcedFlags = list(forcedFlags) if forcedFlags is not None else []
        self.refFlags = list(self._defaultRefFlags)
        if refFlags is not None:
            self.refFlags += list(refFlags)

        self._df = None

    @property
    def defaultFuncs(self):
        funcs = dict(self._defaultFuncs)
        return funcs

    @property
    def func(self):
        additionalFuncs = self.defaultFuncs
        additionalFuncs.update({flag: Column(flag, dataset='forced_src') for flag in self.forcedFlags})
        additionalFuncs.update({flag: Column(flag, dataset='ref') for flag in self.refFlags})
        additionalFuncs.update({flag: Column(flag, dataset='meas') for flag in self.flags})

        if isinstance(self.functors, CompositeFunctor):
            func = self.functors
        else:
            func = CompositeFunctor(self.functors)

        func.funcDict.update(additionalFuncs)
        func.filt = self.filt

        return func

    @property
    def noDupCols(self):
        return [name for name, func in self.func.funcDict.items() if func.noDup or func.dataset == 'ref']

    @property
    def df(self):
        if self._df is None:
            self.compute()
        return self._df

    def compute(self, dropna=False, pool=None):
        # Map the functor over one or more parquet tables.
        if type(self.parq) in (list, tuple):
            if pool is None:
                dflist = [self.func(parq, dropna=dropna) for parq in self.parq]
            else:
                # TODO: parallelizing this mapping has not worked in initial
                # tests (see the class docstring).
                dflist = pool.map(functools.partial(self.func, dropna=dropna), self.parq)
            self._df = pd.concat(dflist)
        else:
            self._df = self.func(self.parq, dropna=dropna)

        return self._df
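
# Illustrative sketch (not part of the pipeline): the essence of the functor
# pattern used by PostprocessAnalysis, with plain callables standing in for
# lsst.pipe.tasks.functors objects. The column names and formulas are made up.
def _demoFunctorPattern():
    parq = pd.DataFrame({'base_PsfFlux_instFlux': [100.0, 200.0], 'coord_ra': [1.0, 2.0]})
    funcs = {
        'ra': lambda df: df['coord_ra'],
        'psfMag': lambda df: -2.5 * np.log10(df['base_PsfFlux_instFlux']),  # toy zeropoint of 0
    }
    # A dict of functors keys the output columns, as described in the
    # PostprocessAnalysis docstring.
    return pd.DataFrame({name: f(parq) for name, f in funcs.items()})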
806 """Expected Connections for subclasses of TransformCatalogBaseTask.
810 inputCatalog = connectionTypes.Input(
812 storageClass=
"DataFrame",
814 outputCatalog = connectionTypes.Output(
816 storageClass=
"DataFrame",
class TransformCatalogBaseConfig(pipeBase.PipelineTaskConfig,
                                 pipelineConnections=TransformCatalogBaseConnections):
    functorFile = pexConfig.Field(
        dtype=str,
        doc="Path to YAML file specifying Science Data Model functors to use "
            "when copying columns and computing calibrated values.",
        default=None,
        optional=True
    )
    primaryKey = pexConfig.Field(
        dtype=str,
        doc="Name of column to be set as the DataFrame index. If None, the index "
            "will be named `id`.",
        default=None,
        optional=True
    )
    columnsFromDataId = pexConfig.ListField(
        dtype=str,
        default=None,
        optional=True,
        doc="Columns to extract from the dataId",
    )
845 """Base class for transforming/standardizing a catalog
847 by applying functors that convert units and apply calibrations.
848 The purpose of this task
is to perform a set of computations on
849 an input `ParquetTable` dataset (such
as `deepCoadd_obj`)
and write the
850 results to a new dataset (which needs to be declared
in an `outputDataset`
853 The calculations to be performed are defined
in a YAML file that specifies
854 a set of functors to be computed, provided
as
855 a `--functorFile` config parameter. An example of such a YAML file
880 - base_InputCount_value
883 functor: DeconvolvedMoments
888 - merge_measurement_i
889 - merge_measurement_r
890 - merge_measurement_z
891 - merge_measurement_y
892 - merge_measurement_g
893 - base_PixelFlags_flag_inexact_psfCenter
896 The names
for each entry under
"func" will become the names of columns
in
897 the output dataset. All the functors referenced are defined
in
899 functor are
in the `args` list,
and any additional entries
for each column
900 other than
"functor" or "args" (e.g., `
'filt'`, `
'dataset'`) are treated
as
901 keyword arguments to be passed to the functor initialization.
903 The
"flags" entry
is the default shortcut
for `Column` functors.
904 All columns listed under
"flags" will be copied to the output table
905 untransformed. They can be of any datatype.
906 In the special case of transforming a multi-level oject table
with
907 band
and dataset indices (deepCoadd_obj), these will be taked
from the
908 `meas` dataset
and exploded out per band.
910 There are two special shortcuts that only apply when transforming
911 multi-level Object (deepCoadd_obj) tables:
912 - The
"refFlags" entry
is shortcut
for `Column` functor
913 taken
from the `
'ref'` dataset
if transforming an ObjectTable.
914 - The
"forcedFlags" entry
is shortcut
for `Column` functors.
915 taken
from the ``forced_src`` dataset
if transforming an ObjectTable.
916 These are expanded out per band.
919 This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
920 to organize
and excecute the calculations.
    @property
    def _DefaultName(self):
        raise NotImplementedError('Subclass must define "_DefaultName" attribute')

    @property
    def outputDataset(self):
        raise NotImplementedError('Subclass must define "outputDataset" attribute')

    @property
    def inputDataset(self):
        raise NotImplementedError('Subclass must define "inputDataset" attribute')

    @property
    def ConfigClass(self):
        raise NotImplementedError('Subclass must define "ConfigClass" attribute')
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if self.config.functorFile:
            self.log.info('Loading transform functor definitions from %s',
                          self.config.functorFile)
            self.funcs = CompositeFunctor.from_file(self.config.functorFile)
            self.funcs.update(dict(PostprocessAnalysis._defaultFuncs))
        else:
            self.funcs = None

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        result = self.run(parq=inputs['inputCatalog'], funcs=self.funcs,
                          dataId=outputRefs.outputCatalog.dataId.full)
        outputs = pipeBase.Struct(outputCatalog=result)
        butlerQC.put(outputs, outputRefs)
    def run(self, parq, funcs=None, dataId=None, band=None):
        """Do postprocessing calculations.

        Takes a `ParquetTable` object and dataId,
        returns a dataframe with results of postprocessing calculations.

        Parameters
        ----------
        parq : `lsst.pipe.tasks.parquetTable.ParquetTable`
            ParquetTable from which calculations are done.
        funcs : `lsst.pipe.tasks.functors.Functors`
            Functors to apply to the table's columns.
        dataId : `dict`, optional
            Used to add a `patchId` column to the output dataframe.
        band : `str`, optional
            Filter band that is being processed.

        Returns
        -------
        df : `pandas.DataFrame`
        """
        self.log.info("Transforming/standardizing the source table dataId: %s", dataId)

        df = self.transform(band, parq, funcs, dataId).df
        self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
        return df

    def getFunctors(self):
        return self.funcs

    def getAnalysis(self, parq, funcs=None, band=None):
        if funcs is None:
            funcs = self.funcs
        analysis = PostprocessAnalysis(parq, funcs, filt=band)
        return analysis

    def transform(self, band, parq, funcs, dataId):
        analysis = self.getAnalysis(parq, funcs=funcs, band=band)
        df = analysis.df
        if dataId and self.config.columnsFromDataId:
            for key in self.config.columnsFromDataId:
                if key in dataId:
                    df[str(key)] = dataId[key]
                else:
                    raise ValueError(f"'{key}' in config.columnsFromDataId not found in dataId: {dataId}")

        if self.config.primaryKey:
            if df.index.name != self.config.primaryKey and self.config.primaryKey in df:
                df.reset_index(inplace=True, drop=True)
                df.set_index(self.config.primaryKey, inplace=True)

        return pipeBase.Struct(
            df=df,
            analysis=analysis
        )

    def writeMetadata(self, dataRef):
        """No metadata to write.
        """
        pass
class TransformObjectCatalogConnections(pipeBase.PipelineTaskConnections,
                                        defaultTemplates={"coaddName": "deep"},
                                        dimensions=("tract", "patch", "skymap")):
    inputCatalog = connectionTypes.Input(
        doc="The vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
            "stored as a DataFrame with a multi-level column index per-patch.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="{coaddName}Coadd_obj",
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-Patch Object Table of columns transformed from the deepCoadd_obj table per the standard "
            "data model.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="objectTable",
    )
class TransformObjectCatalogConfig(TransformCatalogBaseConfig,
                                   pipelineConnections=TransformObjectCatalogConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )
    filterMap = pexConfig.DictField(
        keytype=str,
        itemtype=str,
        default={},
        doc=("Dictionary mapping full filter name to short one for column name munging. "
             "These filters determine the output columns no matter what filters the "
             "input data actually contain."),
        deprecated=("Coadds are now identified by the band, so this transform is unused. "
                    "Will be removed after v22.")
    )
    outputBands = pexConfig.ListField(
        dtype=str,
        default=None,
        optional=True,
        doc=("These bands and only these bands will appear in the output,"
             " NaN-filled if the input does not include them."
             " If None, then use all bands found in the input.")
    )
    camelCase = pexConfig.Field(
        dtype=bool,
        default=True,
        doc=("Write per-band column names with camelCase, else underscore. "
             "For example: gPsFlux instead of g_PsFlux.")
    )
    multilevelOutput = pexConfig.Field(
        dtype=bool,
        default=False,
        doc=("Whether the results dataframe should have a multilevel column index (True) or be flat "
             "and name-munged (False).")
    )
    goodFlags = pexConfig.ListField(
        dtype=str,
        default=[],
        doc=("List of 'good' flags that should be set False when populating empty tables. "
             "All other flags are considered to be 'bad' flags and will be set to True.")
    )
    floatFillValue = pexConfig.Field(
        dtype=float,
        default=np.nan,
        doc="Fill value for float fields when populating empty tables."
    )
    integerFillValue = pexConfig.Field(
        dtype=int,
        default=-1,
        doc="Fill value for integer fields when populating empty tables."
    )

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join('$PIPE_TASKS_DIR', 'schemas', 'Object.yaml')
        self.primaryKey = 'objectId'
        self.columnsFromDataId = ['tract', 'patch']
        self.goodFlags = ['calib_astrometry_used',
                          'calib_photometry_reserved',
                          'calib_photometry_used',
                          'calib_psf_candidate',
                          'calib_psf_reserved']
1112 """Produce a flattened Object Table to match the format specified in
1115 Do the same set of postprocessing calculations on all bands.
1117 This is identical to `TransformCatalogBaseTask`,
except for that it does
1118 the specified functor calculations
for all filters present
in the
1119 input `deepCoadd_obj` table. Any specific `
"filt"` keywords specified
1120 by the YAML file will be superceded.
1122 _DefaultName = "transformObjectCatalog"
1123 ConfigClass = TransformObjectCatalogConfig
1125 def run(self, parq, funcs=None, dataId=None, band=None):
        # NOTE: the band kwarg is ignored here.
        dfDict = {}
        analysisDict = {}
        templateDf = pd.DataFrame()

        if isinstance(parq, DeferredDatasetHandle):
            columns = parq.get(component='columns')
            inputBands = columns.unique(level=1).values
        else:
            inputBands = parq.columnLevelNames['band']

        outputBands = self.config.outputBands if self.config.outputBands else inputBands

        # Perform the transform for each band present in the input.
        for inputBand in inputBands:
            if inputBand not in outputBands:
                self.log.info("Ignoring %s band data in the input", inputBand)
                continue
            self.log.info("Transforming the catalog of band %s", inputBand)
            result = self.transform(inputBand, parq, funcs, dataId)
            dfDict[inputBand] = result.df
            analysisDict[inputBand] = result.analysis
            if templateDf.empty:
                templateDf = result.df

        # Put filler values in the columns of requested bands that are absent
        # from the input.
        for filt in outputBands:
            if filt not in dfDict:
                self.log.info("Adding empty columns for band %s", filt)
                dfTemp = templateDf.copy()
                for col in dfTemp.columns:
                    testValue = dfTemp[col].values[0]
                    if isinstance(testValue, (np.bool_, pd.BooleanDtype)):
                        # Boolean flag: only "good" flags are filled with False.
                        if col in self.config.goodFlags:
                            fillValue = False
                        else:
                            fillValue = True
                    elif isinstance(testValue, numbers.Integral):
                        # numbers.Integral catches python, numpy, and pandas
                        # integers; unsigned integers are not supported.
                        if isinstance(testValue, np.unsignedinteger):
                            raise ValueError("Parquet tables may not have unsigned integer columns.")
                        else:
                            fillValue = self.config.integerFillValue
                    else:
                        fillValue = self.config.floatFillValue
                    dfTemp[col].values[:] = fillValue
                dfDict[filt] = dfTemp

        # This makes a multilevel column index, with band as the first level.
        df = pd.concat(dfDict, axis=1, names=['band', 'column'])

        if not self.config.multilevelOutput:
            noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
            if self.config.primaryKey in noDupCols:
                noDupCols.remove(self.config.primaryKey)
            if dataId and self.config.columnsFromDataId:
                noDupCols += self.config.columnsFromDataId
            df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase,
                                inputBands=inputBands)

        self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
        return df
class TractObjectDataIdContainer(CoaddDataIdContainer):

    def makeDataRefList(self, namespace):
        """Make self.refList from self.idList.

        Generate a list of data references given tract and/or patch.
        This was adapted from `TractQADataIdContainer`, which was
        `TractDataIdContainer` modified to not require "filter".
        Only existing dataRefs are returned.
        """
        def getPatchRefList(tract):
            return [namespace.butler.dataRef(datasetType=self.datasetType,
                                             tract=tract.getId(),
                                             patch="%d,%d" % patch.getIndex())
                    for patch in tract]

        tractRefs = defaultdict(list)  # Data references for each tract
        for dataId in self.idList:
            skymap = self.getSkymap(namespace)

            if "tract" in dataId:
                tractId = dataId["tract"]
                if "patch" in dataId:
                    tractRefs[tractId].append(namespace.butler.dataRef(datasetType=self.datasetType,
                                                                       tract=tractId,
                                                                       patch=dataId['patch']))
                else:
                    tractRefs[tractId] += getPatchRefList(skymap[tractId])
            else:
                tractRefs = dict((tract.getId(), tractRefs.get(tract.getId(), []) + getPatchRefList(tract))
                                 for tract in skymap)

        outputRefList = []
        for tractRefList in tractRefs.values():
            existingRefs = [ref for ref in tractRefList if ref.datasetExists()]
            outputRefList.append(existingRefs)

        self.refList = outputRefList
class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("tract", "skymap")):
    inputCatalogs = connectionTypes.Input(
        doc="Per-Patch objectTables conforming to the standard data model.",
        name="objectTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        multiple=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-tract horizontal concatenation of the input objectTables",
        name="objectTable_tract",
        storageClass="DataFrame",
        dimensions=("tract", "skymap"),
    )


class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=ConsolidateObjectTableConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )
class ConsolidateObjectTableTask(CmdLineTask, pipeBase.PipelineTask):
    """Write patch-merged source tables to a tract-level parquet file.

    Concatenates the `objectTable` list into a per-tract `objectTable_tract`.
    """
    _DefaultName = "consolidateObjectTable"
    ConfigClass = ConsolidateObjectTableConfig

    inputDataset = 'objectTable'
    outputDataset = 'objectTable_tract'

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-patch Object Tables",
                      len(inputs['inputCatalogs']))
        df = pd.concat(inputs['inputCatalogs'])
        butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)

        parser.add_id_argument("--id", cls.inputDataset,
                               help="data ID, e.g. --id tract=12345",
                               ContainerClass=TractObjectDataIdContainer)
        return parser

    def runDataRef(self, patchRefList):
        df = pd.concat([patchRef.get().toDataFrame() for patchRef in patchRefList])
        patchRefList[0].put(ParquetTable(dataFrame=df), self.outputDataset)

    def writeMetadata(self, dataRefList):
        """No metadata to write.
        """
        pass
class TransformSourceTableConnections(pipeBase.PipelineTaskConnections,
                                      defaultTemplates={"catalogType": ""},
                                      dimensions=("instrument", "visit", "detector")):
    inputCatalog = connectionTypes.Input(
        doc="Wide input catalog of sources produced by WriteSourceTableTask",
        name="{catalogType}source",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
        deferLoad=True
    )
    outputCatalog = connectionTypes.Output(
        doc="Narrower, per-detector Source Table transformed and converted per a "
            "specified set of functors",
        name="{catalogType}sourceTable",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector")
    )


class TransformSourceTableConfig(TransformCatalogBaseConfig,
                                 pipelineConnections=TransformSourceTableConnections):

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join('$PIPE_TASKS_DIR', 'schemas', 'Source.yaml')
        self.primaryKey = 'sourceId'
        self.columnsFromDataId = ['visit', 'detector', 'band', 'physical_filter']
1326 """Transform/standardize a source catalog
1328 _DefaultName = "transformSourceTable"
1329 ConfigClass = TransformSourceTableConfig
class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections,
                                         dimensions=("instrument", "visit",),
                                         defaultTemplates={"calexpType": ""}):
    calexp = connectionTypes.Input(
        doc="Processed exposures used for metadata",
        name="{calexpType}calexp",
        storageClass="ExposureF",
        dimensions=("instrument", "visit", "detector"),
        deferLoad=True,
        multiple=True,
    )
    visitSummary = connectionTypes.Output(
        doc=("Per-visit consolidated exposure metadata. These catalogs use "
             "detector id for the id and are sorted for fast lookups of a "
             "detector."),
        name="{calexpType}visitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
    )


class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig,
                                    pipelineConnections=ConsolidateVisitSummaryConnections):
    """Config for ConsolidateVisitSummaryTask"""
    pass
class ConsolidateVisitSummaryTask(pipeBase.PipelineTask, pipeBase.CmdLineTask):
    """Task to consolidate per-detector visit metadata.

    This task aggregates the following metadata from all the detectors in a
    single visit into an exposure catalog:
    - The visitInfo.
    - The wcs.
    - The photoCalib.
    - The physical_filter and band (if available).
    - The psf size, shape, and effective area at the center of the detector.
    - The corners of the bounding box in right ascension/declination.

    Other quantities such as Detector, Psf, ApCorrMap, and TransmissionCurve
    are not persisted here because of storage concerns, and because of their
    limited utility as summary statistics.

    Tests for this task are performed in ci_hsc_gen3.
    """
    _DefaultName = "consolidateVisitSummary"
    ConfigClass = ConsolidateVisitSummaryConfig

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)

        parser.add_id_argument("--id", "calexp",
                               help="data ID, e.g. --id visit=12345",
                               ContainerClass=VisitDataIdContainer)
        return parser

    def writeMetadata(self, dataRef):
        """No metadata to persist, so override to remove metadata persistence.
        """
        pass

    def writeConfig(self, butler, clobber=False, doBackup=True):
        """No config to persist, so override to remove config persistence.
        """
        pass

    def runDataRef(self, dataRefList):
        visit = dataRefList[0].dataId['visit']

        self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
                       len(dataRefList), visit)

        expCatalog = self._combineExposureMetadata(visit, dataRefList, isGen3=False)

        dataRefList[0].put(expCatalog, 'visitSummary', visit=visit)

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        dataRefs = butlerQC.get(inputRefs.calexp)
        visit = dataRefs[0].dataId.byName()['visit']

        self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
                       len(dataRefs), visit)

        expCatalog = self._combineExposureMetadata(visit, dataRefs)

        butlerQC.put(expCatalog, outputRefs.visitSummary)
    def _combineExposureMetadata(self, visit, dataRefs, isGen3=True):
        """Make a combined exposure catalog from a list of dataRefs.
        These dataRefs must point to exposures with wcs, summaryStats,
        and other visit metadata.

        Parameters
        ----------
        visit : `int`
            Visit identification number.
        dataRefs : `list`
            List of dataRefs in visit.  May be a list of
            `lsst.daf.persistence.ButlerDataRef` (Gen2) or
            `lsst.daf.butler.DeferredDatasetHandle` (Gen3).
        isGen3 : `bool`, optional
            Specifies if this is a Gen3 list of datarefs.

        Returns
        -------
        visitSummary : `lsst.afw.table.ExposureCatalog`
            Exposure catalog with per-detector summary information.
        """
        schema = self._makeVisitSummarySchema()
        cat = afwTable.ExposureCatalog(schema)
        cat.resize(len(dataRefs))

        cat['visit'] = visit

        for i, dataRef in enumerate(dataRefs):
            if isGen3:
                visitInfo = dataRef.get(component='visitInfo')
                filterLabel = dataRef.get(component='filter')
                summaryStats = dataRef.get(component='summaryStats')
                detector = dataRef.get(component='detector')
                wcs = dataRef.get(component='wcs')
                photoCalib = dataRef.get(component='photoCalib')
                bbox = dataRef.get(component='bbox')
                validPolygon = dataRef.get(component='validPolygon')
            else:
                # Gen2: read only a tiny sub-region of the calexp to get at
                # the attached metadata without reading all the pixels.
                gen2_read_bbox = lsst.geom.Box2I(lsst.geom.Point2I(0, 0), lsst.geom.Extent2I(1, 1))
                exp = dataRef.get(datasetType='calexp_sub', bbox=gen2_read_bbox)
                visitInfo = exp.getInfo().getVisitInfo()
                filterLabel = dataRef.get("calexp_filter")
                summaryStats = exp.getInfo().getSummaryStats()
                wcs = exp.getWcs()
                photoCalib = exp.getPhotoCalib()
                detector = exp.getDetector()
                bbox = dataRef.get(datasetType='calexp_bbox')
                validPolygon = exp.getInfo().getValidPolygon()

            rec = cat[i]
            rec.setBBox(bbox)
            rec.setVisitInfo(visitInfo)
            rec.setWcs(wcs)
            rec.setPhotoCalib(photoCalib)
            rec.setValidPolygon(validPolygon)

            rec['physical_filter'] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else ""
            rec['band'] = filterLabel.bandLabel if filterLabel.hasBandLabel() else ""
            rec.setId(detector.getId())
            rec['psfSigma'] = summaryStats.psfSigma
            rec['psfIxx'] = summaryStats.psfIxx
            rec['psfIyy'] = summaryStats.psfIyy
            rec['psfIxy'] = summaryStats.psfIxy
            rec['psfArea'] = summaryStats.psfArea
            rec['raCorners'][:] = summaryStats.raCorners
            rec['decCorners'][:] = summaryStats.decCorners
            rec['ra'] = summaryStats.ra
            rec['decl'] = summaryStats.decl
            rec['zenithDistance'] = summaryStats.zenithDistance
            rec['zeroPoint'] = summaryStats.zeroPoint
            rec['skyBg'] = summaryStats.skyBg
            rec['skyNoise'] = summaryStats.skyNoise
            rec['meanVar'] = summaryStats.meanVar
            rec['astromOffsetMean'] = summaryStats.astromOffsetMean
            rec['astromOffsetStd'] = summaryStats.astromOffsetStd
            rec['nPsfStar'] = summaryStats.nPsfStar
            rec['psfStarDeltaE1Median'] = summaryStats.psfStarDeltaE1Median
            rec['psfStarDeltaE2Median'] = summaryStats.psfStarDeltaE2Median
            rec['psfStarDeltaE1Scatter'] = summaryStats.psfStarDeltaE1Scatter
            rec['psfStarDeltaE2Scatter'] = summaryStats.psfStarDeltaE2Scatter
            rec['psfStarDeltaSizeMedian'] = summaryStats.psfStarDeltaSizeMedian
            rec['psfStarDeltaSizeScatter'] = summaryStats.psfStarDeltaSizeScatter
            rec['psfStarScaledDeltaSizeScatter'] = summaryStats.psfStarScaledDeltaSizeScatter

        metadata = dafBase.PropertyList()
        metadata.add("COMMENT", "Catalog id is detector id, sorted.")
        metadata.add("COMMENT", "Only detectors with data have entries.")
        cat.setMetadata(metadata)

        cat.sort()
        return cat
    def _makeVisitSummarySchema(self):
        """Make the schema for the visitSummary catalog."""
        schema = afwTable.ExposureTable.makeMinimalSchema()
        schema.addField('visit', type='L', doc='Visit number')
        schema.addField('physical_filter', type='String', size=32, doc='Physical filter')
        schema.addField('band', type='String', size=32, doc='Name of band')
        schema.addField('psfSigma', type='F',
                        doc='PSF model second-moments determinant radius (center of chip) (pixel)')
        schema.addField('psfArea', type='F',
                        doc='PSF model effective area (center of chip) (pixel**2)')
        schema.addField('psfIxx', type='F',
                        doc='PSF model Ixx (center of chip) (pixel**2)')
        schema.addField('psfIyy', type='F',
                        doc='PSF model Iyy (center of chip) (pixel**2)')
        schema.addField('psfIxy', type='F',
                        doc='PSF model Ixy (center of chip) (pixel**2)')
        schema.addField('raCorners', type='ArrayD', size=4,
                        doc='Right Ascension of bounding box corners (degrees)')
        schema.addField('decCorners', type='ArrayD', size=4,
                        doc='Declination of bounding box corners (degrees)')
        schema.addField('ra', type='D',
                        doc='Right Ascension of bounding box center (degrees)')
        schema.addField('decl', type='D',
                        doc='Declination of bounding box center (degrees)')
        schema.addField('zenithDistance', type='F',
                        doc='Zenith distance of bounding box center (degrees)')
        schema.addField('zeroPoint', type='F',
                        doc='Mean zeropoint in detector (mag)')
        schema.addField('skyBg', type='F',
                        doc='Average sky background (ADU)')
        schema.addField('skyNoise', type='F',
                        doc='Average sky noise (ADU)')
        schema.addField('meanVar', type='F',
                        doc='Mean variance of the weight plane (ADU**2)')
        schema.addField('astromOffsetMean', type='F',
                        doc='Mean offset of astrometric calibration matches (arcsec)')
        schema.addField('astromOffsetStd', type='F',
                        doc='Standard deviation of offsets of astrometric calibration matches (arcsec)')
        schema.addField('nPsfStar', type='I', doc='Number of stars used for PSF model')
        schema.addField('psfStarDeltaE1Median', type='F',
                        doc='Median E1 residual (starE1 - psfE1) for psf stars')
        schema.addField('psfStarDeltaE2Median', type='F',
                        doc='Median E2 residual (starE2 - psfE2) for psf stars')
        schema.addField('psfStarDeltaE1Scatter', type='F',
                        doc='Scatter (via MAD) of E1 residual (starE1 - psfE1) for psf stars')
        schema.addField('psfStarDeltaE2Scatter', type='F',
                        doc='Scatter (via MAD) of E2 residual (starE2 - psfE2) for psf stars')
        schema.addField('psfStarDeltaSizeMedian', type='F',
                        doc='Median size residual (starSize - psfSize) for psf stars (pixel)')
        schema.addField('psfStarDeltaSizeScatter', type='F',
                        doc='Scatter (via MAD) of size residual (starSize - psfSize) for psf stars (pixel)')
        schema.addField('psfStarScaledDeltaSizeScatter', type='F',
                        doc='Scatter (via MAD) of size residual scaled by median size squared')

        return schema
class VisitDataIdContainer(DataIdContainer):
    """DataIdContainer that groups sensor-level ids by visit.
    """

    def makeDataRefList(self, namespace):
        """Make self.refList from self.idList.

        Generate a list of data references grouped by visit.

        Parameters
        ----------
        namespace : `argparse.Namespace`
            Namespace used by `lsst.pipe.base.CmdLineTask` to parse command
            line arguments.
        """
        # Group the refs by visit.
        visitRefs = defaultdict(list)
        for dataId in self.idList:
            if "visit" in dataId:
                visitId = dataId["visit"]
                # Find all the detectors in this visit matching the dataId.
                subset = namespace.butler.subset(self.datasetType, dataId=dataId)
                visitRefs[visitId].extend([dataRef for dataRef in subset])

        outputRefList = []
        for refList in visitRefs.values():
            existingRefs = [ref for ref in refList if ref.datasetExists()]
            if existingRefs:
                outputRefList.append(existingRefs)

        self.refList = outputRefList
class ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        defaultTemplates={"catalogType": ""},
                                        dimensions=("instrument", "visit")):
    inputCatalogs = connectionTypes.Input(
        doc="Input per-detector Source Tables",
        name="{catalogType}sourceTable",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
        multiple=True
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-visit concatenation of Source Table",
        name="{catalogType}sourceTable_visit",
        storageClass="DataFrame",
        dimensions=("instrument", "visit")
    )


class ConsolidateSourceTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=ConsolidateSourceTableConnections):
    pass
class ConsolidateSourceTableTask(CmdLineTask, pipeBase.PipelineTask):
    """Concatenate the `sourceTable` list into a per-visit `sourceTable_visit`.
    """
    _DefaultName = 'consolidateSourceTable'
    ConfigClass = ConsolidateSourceTableConfig

    inputDataset = 'sourceTable'
    outputDataset = 'sourceTable_visit'

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        from .makeCoaddTempExp import reorderRefs

        detectorOrder = [ref.dataId['detector'] for ref in inputRefs.inputCatalogs]
        detectorOrder.sort()
        inputRefs = reorderRefs(inputRefs, detectorOrder, dataIdKey='detector')
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-detector Source Tables",
                      len(inputs['inputCatalogs']))
        df = pd.concat(inputs['inputCatalogs'])
        butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)

    def runDataRef(self, dataRefList):
        self.log.info("Concatenating %s per-detector Source Tables", len(dataRefList))
        df = pd.concat([dataRef.get().toDataFrame() for dataRef in dataRefList])
        dataRefList[0].put(ParquetTable(dataFrame=df), self.outputDataset)

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)

        parser.add_id_argument("--id", cls.inputDataset,
                               help="data ID, e.g. --id visit=12345",
                               ContainerClass=VisitDataIdContainer)
        return parser

    def writeMetadata(self, dataRef):
        """No metadata to write.
        """
        pass

    def writeConfig(self, butler, clobber=False, doBackup=True):
        """No config to write.
        """
        pass
class MakeCcdVisitTableConnections(pipeBase.PipelineTaskConnections,
                                   dimensions=("instrument",),
                                   defaultTemplates={"calexpType": ""}):
    visitSummaryRefs = connectionTypes.Input(
        doc="Data references for per-visit consolidated exposure metadata from ConsolidateVisitSummaryTask",
        name="{calexpType}visitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="CCD and Visit metadata table",
        name="ccdVisitTable",
        storageClass="DataFrame",
        dimensions=("instrument",)
    )


class MakeCcdVisitTableConfig(pipeBase.PipelineTaskConfig,
                              pipelineConnections=MakeCcdVisitTableConnections):
    pass
class MakeCcdVisitTableTask(CmdLineTask, pipeBase.PipelineTask):
    """Produce a `ccdVisitTable` from the `visitSummary` exposure catalogs.
    """
    _DefaultName = 'makeCcdVisitTable'
    ConfigClass = MakeCcdVisitTableConfig

    def run(self, visitSummaryRefs):
        """Make a table of ccd information from the `visitSummary` catalogs.

        Parameters
        ----------
        visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            List of DeferredDatasetHandles pointing to exposure catalogs with
            per-detector summary information.

        Returns
        -------
        result : `lsst.pipe.Base.Struct`
            Results struct with attribute:

            ``outputCatalog``
                Catalog of ccd and visit information.
        """
        ccdEntries = []
        for visitSummaryRef in visitSummaryRefs:
            visitSummary = visitSummaryRef.get()
            visitInfo = visitSummary[0].getVisitInfo()

            summaryTable = visitSummary.asAstropy()
            selectColumns = ['id', 'visit', 'physical_filter', 'band', 'ra', 'decl', 'zenithDistance',
                             'zeroPoint', 'psfSigma', 'skyBg', 'skyNoise',
                             'astromOffsetMean', 'astromOffsetStd', 'nPsfStar',
                             'psfStarDeltaE1Median', 'psfStarDeltaE2Median',
                             'psfStarDeltaE1Scatter', 'psfStarDeltaE2Scatter',
                             'psfStarDeltaSizeMedian', 'psfStarDeltaSizeScatter',
                             'psfStarScaledDeltaSizeScatter']
            ccdEntry = summaryTable[selectColumns].to_pandas().set_index('id')
            # Rename 'visit' to 'visitId' to match the output data model.
            ccdEntry = ccdEntry.rename(columns={"visit": "visitId"})
            dataIds = [DataCoordinate.standardize(visitSummaryRef.dataId, detector=id) for id in
                       summaryTable['id']]
            packer = visitSummaryRef.dataId.universe.makePacker('visit_detector', visitSummaryRef.dataId)
            ccdVisitIds = [packer.pack(dataId) for dataId in dataIds]
            ccdEntry['ccdVisitId'] = ccdVisitIds
            ccdEntry['detector'] = summaryTable['id']
            pixToArcseconds = np.array([vR.getWcs().getPixelScale().asArcseconds() for vR in visitSummary])
            ccdEntry["seeing"] = visitSummary['psfSigma'] * np.sqrt(8 * np.log(2)) * pixToArcseconds

            ccdEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
            ccdEntry["expMidpt"] = visitInfo.getDate().toPython()
            ccdEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
            expTime = visitInfo.getExposureTime()
            ccdEntry['expTime'] = expTime
            ccdEntry["obsStart"] = ccdEntry["expMidpt"] - 0.5 * pd.Timedelta(seconds=expTime)
            expTime_days = expTime / (60*60*24)
            ccdEntry["obsStartMJD"] = ccdEntry["expMidptMJD"] - 0.5 * expTime_days
            ccdEntry['darkTime'] = visitInfo.getDarkTime()
            ccdEntry['xSize'] = summaryTable['bbox_max_x'] - summaryTable['bbox_min_x']
            ccdEntry['ySize'] = summaryTable['bbox_max_y'] - summaryTable['bbox_min_y']
            ccdEntry['llcra'] = summaryTable['raCorners'][:, 0]
            ccdEntry['llcdec'] = summaryTable['decCorners'][:, 0]
            ccdEntry['ulcra'] = summaryTable['raCorners'][:, 1]
            ccdEntry['ulcdec'] = summaryTable['decCorners'][:, 1]
            ccdEntry['urcra'] = summaryTable['raCorners'][:, 2]
            ccdEntry['urcdec'] = summaryTable['decCorners'][:, 2]
            ccdEntry['lrcra'] = summaryTable['raCorners'][:, 3]
            ccdEntry['lrcdec'] = summaryTable['decCorners'][:, 3]
            ccdEntries.append(ccdEntry)

        outputCatalog = pd.concat(ccdEntries)
        outputCatalog.set_index('ccdVisitId', inplace=True, verify_integrity=True)
        return pipeBase.Struct(outputCatalog=outputCatalog)
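
# Illustrative sketch (not part of the pipeline): the seeing column above is
# the PSF sigma converted to a FWHM in arcseconds,
# FWHM = sigma * sqrt(8 ln 2) * pixelScale. Made-up numbers:
def _demoSeeingFwhm(psfSigmaPix=1.7, pixelScaleArcsec=0.2):
    return psfSigmaPix * np.sqrt(8 * np.log(2)) * pixelScaleArcsec  # ~0.80 arcsec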
class MakeVisitTableConnections(pipeBase.PipelineTaskConnections,
                                dimensions=("instrument",),
                                defaultTemplates={"calexpType": ""}):
    visitSummaries = connectionTypes.Input(
        doc="Per-visit consolidated exposure metadata from ConsolidateVisitSummaryTask",
        name="{calexpType}visitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit",),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Visit metadata table",
        name="visitTable",
        storageClass="DataFrame",
        dimensions=("instrument",)
    )


class MakeVisitTableConfig(pipeBase.PipelineTaskConfig,
                           pipelineConnections=MakeVisitTableConnections):
    pass


class MakeVisitTableTask(CmdLineTask, pipeBase.PipelineTask):
    """Produce a `visitTable` from the `visitSummary` exposure catalogs.
    """
    _DefaultName = 'makeVisitTable'
    ConfigClass = MakeVisitTableConfig
    def run(self, visitSummaries):
        """Make a table of visit information from the `visitSummary` catalogs.

        Parameters
        ----------
        visitSummaries : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            List of exposure catalogs with per-detector summary information.

        Returns
        -------
        result : `lsst.pipe.Base.Struct`
            Results struct with attribute:

            ``outputCatalog``
                Catalog of visit information.
        """
        visitEntries = []
        for visitSummary in visitSummaries:
            visitSummary = visitSummary.get()
            visitRow = visitSummary[0]
            visitInfo = visitRow.getVisitInfo()

            visitEntry = {}
            visitEntry["visitId"] = visitRow['visit']
            visitEntry["visit"] = visitRow['visit']
            visitEntry["physical_filter"] = visitRow['physical_filter']
            visitEntry["band"] = visitRow['band']
            raDec = visitInfo.getBoresightRaDec()
            visitEntry["ra"] = raDec.getRa().asDegrees()
            visitEntry["decl"] = raDec.getDec().asDegrees()
            visitEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
            azAlt = visitInfo.getBoresightAzAlt()
            visitEntry["azimuth"] = azAlt.getLongitude().asDegrees()
            visitEntry["altitude"] = azAlt.getLatitude().asDegrees()
            visitEntry["zenithDistance"] = 90 - azAlt.getLatitude().asDegrees()
            visitEntry["airmass"] = visitInfo.getBoresightAirmass()
            expTime = visitInfo.getExposureTime()
            visitEntry["expTime"] = expTime
            visitEntry["expMidpt"] = visitInfo.getDate().toPython()
            visitEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
            visitEntry["obsStart"] = visitEntry["expMidpt"] - 0.5 * pd.Timedelta(seconds=expTime)
            expTime_days = expTime / (60*60*24)
            visitEntry["obsStartMJD"] = visitEntry["expMidptMJD"] - 0.5 * expTime_days
            visitEntries.append(visitEntry)

        outputCatalog = pd.DataFrame(data=visitEntries)
        outputCatalog.set_index('visitId', inplace=True, verify_integrity=True)
        return pipeBase.Struct(outputCatalog=outputCatalog)
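
# Illustrative sketch (not part of the pipeline): obsStart above is the
# exposure midpoint shifted back by half the exposure time; in MJD the shift
# is expressed in days. Made-up values:
def _demoObsStart(expMidptMJD=59580.5, expTimeSec=30.0):
    return expMidptMJD - 0.5 * expTimeSec / (60 * 60 * 24)  # start of exposure in MJD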
class WriteForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("instrument", "visit", "detector", "skymap", "tract")):
    inputCatalog = connectionTypes.Input(
        doc="Primary per-detector, single-epoch forced-photometry catalog. "
            "By default, it is the output of ForcedPhotCcdTask on calexps",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector", "skymap", "tract")
    )
    inputCatalogDiff = connectionTypes.Input(
        doc="Secondary multi-epoch, per-detector, forced photometry catalog. "
            "By default, it is the output of ForcedPhotCcdTask run on image differences.",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector", "skymap", "tract")
    )
    outputCatalog = connectionTypes.Output(
        doc="InputCatalogs horizontally joined on `objectId` in Parquet format",
        name="mergedForcedSource",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector", "skymap", "tract")
    )


class WriteForcedSourceTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=WriteForcedSourceTableConnections):
    key = pexConfig.Field(
        doc="Column on which to join the two input tables on and make the primary key of the output",
        dtype=str,
        default="objectId",
    )


class WriteForcedSourceTableTask(pipeBase.PipelineTask):
    """Merge and convert per-detector forced source catalogs to parquet.

    Because the predecessor ForcedPhotCcdTask operates per-detector and
    per-tract (i.e., it has tract in its dimensions), detectors on the
    tract boundary may have multiple forced source catalogs.

    The successor task TransformForcedSourceTable runs per-patch and
    temporally aggregates overlapping mergedForcedSource catalogs from
    all available epochs.
    """
    _DefaultName = "writeForcedSourceTable"
    ConfigClass = WriteForcedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        inputs['ccdVisitId'] = butlerQC.quantum.dataId.pack("visit_detector")
        inputs['band'] = butlerQC.quantum.dataId.full['band']
        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)

    def run(self, inputCatalog, inputCatalogDiff, ccdVisitId=None, band=None):
        dfs = []
        for table, dataset in zip((inputCatalog, inputCatalogDiff), ('calexp', 'diff')):
            df = table.asAstropy().to_pandas().set_index(self.config.key, drop=False)
            df = df.reindex(sorted(df.columns), axis=1)
            df['ccdVisitId'] = ccdVisitId if ccdVisitId else pd.NA
            df['band'] = band if band else pd.NA
            df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
                                                   names=('dataset', 'column'))
            dfs.append(df)

        outputCatalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return pipeBase.Struct(outputCatalog=outputCatalog)
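

# ``run`` above gives each input catalog a two-level column index
# (``dataset``, ``column``) and then joins the per-dataset frames horizontally
# on their shared index.  The following is a minimal pandas sketch of that
# pattern with toy data (illustrative only; names and values are assumptions,
# not part of this module).
def _exampleMultiIndexJoin():
    import functools
    import pandas as pd

    calexp = pd.DataFrame({"objectId": [1, 2], "flux": [10.0, 20.0]}).set_index("objectId", drop=False)
    diff = pd.DataFrame({"objectId": [1, 2], "flux": [1.0, -2.0]}).set_index("objectId", drop=False)

    dfs = []
    for df, dataset in zip((calexp, diff), ("calexp", "diff")):
        df = df.copy()
        # Prepend the dataset name as the first level of the column index.
        df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
                                               names=("dataset", "column"))
        dfs.append(df)

    # Horizontal join on the shared objectId index, as in run() above.
    return functools.reduce(lambda d1, d2: d1.join(d2), dfs)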


class TransformForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                            dimensions=("instrument", "skymap", "patch", "tract")):
    inputCatalogs = connectionTypes.Input(
        doc="Parquet table of merged ForcedSources produced by WriteForcedSourceTableTask",
        name="mergedForcedSource",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
    )
    referenceCatalog = connectionTypes.Input(
        doc="Reference catalog which was used to seed the forcedPhot. Columns "
            "objectId, detect_isPrimary, detect_isTractInner, detect_isPatchInner.",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
    )
    outputCatalog = connectionTypes.Output(
        doc="Narrower, temporally-aggregated, per-patch ForcedSource Table transformed and converted per a "
            "specified set of functors",
        name="forcedSourceTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap")
    )


class TransformForcedSourceTableConfig(TransformCatalogBaseConfig,
                                       pipelineConnections=TransformForcedSourceTableConnections):
    referenceColumns = pexConfig.ListField(
        dtype=str,
        default=["detect_isPrimary", "detect_isTractInner", "detect_isPatchInner"],
        doc="Columns to pull from reference catalog",
    )
    keyRef = pexConfig.Field(
        dtype=str,
        default="objectId",
        doc="Column on which to join the two input tables and make the primary key of the output",
    )
    key = pexConfig.Field(
        dtype=str,
        default="forcedSourceId",
        doc="Rename the output DataFrame index to this name",
    )

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join('$PIPE_TASKS_DIR', 'schemas', 'ForcedSource.yaml')
        self.columnsFromDataId = ['tract', 'patch']


class TransformForcedSourceTableTask(TransformCatalogBaseTask):
    """Transform/standardize a ForcedSource catalog.

    Transforms each wide, per-detector forcedSource parquet table per the
    specification file (per-camera defaults found in ForcedSource.yaml).
    All epochs that overlap the patch are aggregated into one per-patch,
    narrow parquet file.

    No de-duplication of rows is performed. Duplicate-resolution flags are
    pulled in from the referenceCatalog (`detect_isPrimary`,
    `detect_isTractInner`, `detect_isPatchInner`) so that the user may
    de-duplicate for analysis or compare duplicates for QA.

    The resulting table includes multiple bands. Epochs (MJDs) and other
    useful per-visit quantities can be retrieved by joining with the
    CcdVisitTable on ccdVisitId.
    """
    _DefaultName = "transformForcedSourceTable"
    ConfigClass = TransformForcedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        outputs = self.run(inputs['inputCatalogs'], inputs['referenceCatalog'], funcs=self.funcs,
                           dataId=outputRefs.outputCatalog.dataId.full)
        butlerQC.put(outputs, outputRefs)

    def run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=None):
        dfs = []
        ref = referenceCatalog.get(parameters={"columns": self.config.referenceColumns})
        self.log.info("Aggregating %s input catalogs", len(inputCatalogs))
        for handle in inputCatalogs:
            result = self.transform(None, handle, funcs, dataId)
            # Keep only rows that match an entry in the reference catalog.
            dfs.append(result.df.join(ref, how='inner'))

        outputCatalog = pd.concat(dfs)

        # The join above was done on config.keyRef; give the index that name
        # and move it back into a regular column.
        outputCatalog.index.rename(self.config.keyRef, inplace=True)
        outputCatalog.reset_index(inplace=True)
        # Set forcedSourceId (specified in ForcedSource.yaml) as the index and
        # rename it to config.key.
        outputCatalog.set_index("forcedSourceId", inplace=True, verify_integrity=True)
        outputCatalog.index.rename(self.config.key, inplace=True)

        self.log.info("Made a table of %d columns and %d rows",
                      len(outputCatalog.columns), len(outputCatalog))
        return pipeBase.Struct(outputCatalog=outputCatalog)
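

# As the class docstring notes, the forced-source rows are not de-duplicated;
# the ``detect_isPrimary``/``detect_isTractInner``/``detect_isPatchInner``
# flags joined in from the reference catalog let the user do so downstream.
# Below is a minimal pandas sketch of that downstream selection (illustrative
# only; names and values are assumptions, not part of this module).
def _exampleDeduplicateOnPrimary():
    import pandas as pd

    forcedSourceTable = pd.DataFrame({
        "objectId": [1, 1, 2, 2],
        "psfFlux": [10.0, 10.5, 5.0, 5.2],
        "detect_isPrimary": [True, True, False, False],
    })
    # Keep only rows flagged as primary detections for analysis.
    return forcedSourceTable[forcedSourceTable["detect_isPrimary"]]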


class ConsolidateTractConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"catalogType": ""},
                                  dimensions=("instrument", "tract")):
    inputCatalogs = connectionTypes.Input(
        doc="Input per-patch DataFrame Tables to be concatenated",
        name="{catalogType}ForcedSourceTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
    )
    outputCatalog = connectionTypes.Output(
        doc="Output per-tract concatenation of DataFrame Tables",
        name="{catalogType}ForcedSourceTable_tract",
        storageClass="DataFrame",
        dimensions=("tract", "skymap"),
    )


class ConsolidateTractConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=ConsolidateTractConnections):
    pass


class ConsolidateTractTask(CmdLineTask, pipeBase.PipelineTask):
    """Concatenate any per-patch DataFrame list into a single
    per-tract DataFrame.
    """
    _DefaultName = 'ConsolidateTract'
    ConfigClass = ConsolidateTractConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-patch %s Tables",
                      len(inputs['inputCatalogs']),
                      inputRefs.inputCatalogs[0].datasetType.name)
        df = pd.concat(inputs['inputCatalogs'])
        butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)
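

# ``runQuantum`` above simply concatenates the per-patch DataFrames into one
# per-tract table.  A minimal pandas sketch of that concatenation follows
# (illustrative only; names and values are assumptions, not part of this
# module).
def _exampleConcatPatches():
    import pandas as pd

    patch1 = pd.DataFrame({"objectId": [1, 2], "tract": [42, 42], "patch": [0, 0]})
    patch2 = pd.DataFrame({"objectId": [3, 4], "tract": [42, 42], "patch": [1, 1]})
    # Row-wise concatenation preserves each input's index, mirroring
    # pd.concat over inputs['inputCatalogs'] in ConsolidateTractTask.runQuantum.
    return pd.concat([patch1, patch2])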