__all__ = ["WriteObjectTableConfig", "WriteObjectTableTask",
           "WriteSourceTableConfig", "WriteSourceTableTask",
           "WriteRecalibratedSourceTableConfig", "WriteRecalibratedSourceTableTask",
           "PostprocessAnalysis",
           "TransformCatalogBaseConfig", "TransformCatalogBaseTask",
           "TransformObjectCatalogConfig", "TransformObjectCatalogTask",
           "ConsolidateObjectTableConfig", "ConsolidateObjectTableTask",
           "TransformSourceTableConfig", "TransformSourceTableTask",
           "ConsolidateVisitSummaryConfig", "ConsolidateVisitSummaryTask",
           "ConsolidateSourceTableConfig", "ConsolidateSourceTableTask",
           "MakeCcdVisitTableConfig", "MakeCcdVisitTableTask",
           "MakeVisitTableConfig", "MakeVisitTableTask",
           "WriteForcedSourceTableConfig", "WriteForcedSourceTableTask",
           "TransformForcedSourceTableConfig", "TransformForcedSourceTableTask",
           "ConsolidateTractConfig", "ConsolidateTractTask"]

import functools
import logging
import numbers
import os

import numpy as np
import pandas as pd

import lsst.afw.table as afwTable
import lsst.daf.base as dafBase
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
from lsst.pipe.base import connectionTypes
from lsst.meas.base import SingleFrameMeasurementTask
from lsst.skymap import BaseSkyMap
from lsst.obs.base import ExposureIdInfo
from lsst.daf.butler import DeferredDatasetHandle, DataCoordinate

from .parquetTable import ParquetTable
from .functors import CompositeFunctor, Column

log = logging.getLogger(__name__)


def flattenFilters(df, noDupCols=['coord_ra', 'coord_dec'], camelCase=False, inputBands=None):
    """Flatten a dataframe with a multilevel column index."""
    newDf = pd.DataFrame()
    # band is the level-0 column index
    dfBands = df.columns.unique(level=0).values
    for band in dfBands:
        subdf = df[band]
        columnFormat = '{0}{1}' if camelCase else '{0}_{1}'
        newColumns = {c: columnFormat.format(band, c)
                      for c in subdf.columns if c not in noDupCols}
        cols = list(newColumns.keys())
        newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)

    # Band must be present in the input and output or else the column is all NaN
    presentBands = dfBands if inputBands is None else list(set(inputBands).intersection(dfBands))
    # Get the unexploded columns from any present band's partition
    noDupDf = df[presentBands[0]][noDupCols]
    newDf = pd.concat([noDupDf, newDf], axis=1)
    return newDf
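

# Illustrative sketch (not part of the pipeline API; the column names are made up):
# ``flattenFilters`` turns a two-level (band, column) DataFrame into flat,
# band-prefixed columns, keeping ``noDupCols`` such as coord_ra/coord_dec only once.
def _exampleFlattenFilters():
    columns = pd.MultiIndex.from_product(
        [["g", "r"], ["coord_ra", "coord_dec", "flux"]], names=("band", "column"))
    df = pd.DataFrame(np.zeros((2, 6)), columns=columns)
    flat = flattenFilters(df)
    # flat.columns -> ['coord_ra', 'coord_dec', 'g_flux', 'r_flux']
    return flat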


class WriteObjectTableConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"coaddName": "deep"},
                                  dimensions=("tract", "patch", "skymap")):
    inputCatalogMeas = connectionTypes.Input(
        doc="Catalog of source measurements on the deepCoadd.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_meas",
        multiple=True
    )
    inputCatalogForcedSrc = connectionTypes.Input(
        doc="Catalog of forced measurements (shape and position parameters held fixed) on the deepCoadd.",
        dimensions=("tract", "patch", "band", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_forced_src",
        multiple=True
    )
    inputCatalogRef = connectionTypes.Input(
        doc="Catalog marking the primary detection (which band provides a good shape and position) "
            "for each detection in deepCoadd_mergeDet.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="SourceCatalog",
        name="{coaddName}Coadd_ref"
    )
    outputCatalog = connectionTypes.Output(
        doc="A vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
            "stored as a DataFrame with a multi-level column index per-patch.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="{coaddName}Coadd_obj"
    )


class WriteObjectTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteObjectTableConnections):
    engine = pexConfig.Field(
        dtype=str,
        default="pyarrow",
        doc="Parquet engine for writing (pyarrow or fastparquet)",
    )
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd",
    )


class WriteObjectTableTask(pipeBase.PipelineTask):
    """Write filter-merged source tables to parquet.
    """
    _DefaultName = "writeObjectTable"
    ConfigClass = WriteObjectTableConfig

    # Names of table datasets to be merged
    inputDatasets = ('forced_src', 'meas', 'ref')

    # Name of table dataset written out
    outputDataset = 'obj'

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        measDict = {ref.dataId['band']: {'meas': cat} for ref, cat in
                    zip(inputRefs.inputCatalogMeas, inputs['inputCatalogMeas'])}
        forcedSourceDict = {ref.dataId['band']: {'forced_src': cat} for ref, cat in
                            zip(inputRefs.inputCatalogForcedSrc, inputs['inputCatalogForcedSrc'])}

        catalogs = {}
        for band in measDict.keys():
            catalogs[band] = {'meas': measDict[band]['meas'],
                              'forced_src': forcedSourceDict[band]['forced_src'],
                              'ref': inputs['inputCatalogRef']}
        dataId = butlerQC.quantum.dataId
        df = self.run(catalogs=catalogs, tract=dataId['tract'], patch=dataId['patch'])
        outputs = pipeBase.Struct(outputCatalog=df)
        butlerQC.put(outputs, outputRefs)

    def run(self, catalogs, tract, patch):
        """Merge multiple catalogs.

        Parameters
        ----------
        catalogs : `dict`
            Mapping from filter names to dict of catalogs.
        tract : `int`
            tractId to use for the tractId column.
        patch : `str`
            patchId to use for the patchId column.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Merged dataframe.
        """
        dfs = []
        for filt, tableDict in catalogs.items():
            for dataset, table in tableDict.items():
                # Convert afwTable to pandas DataFrame, indexed on the object id
                df = table.asAstropy().to_pandas().set_index('id', drop=True)

                # Sort columns by name, to ensure matching schema among patches
                df = df.reindex(sorted(df.columns), axis=1)
                df['tractId'] = tract
                df['patchId'] = patch

                # Make columns a 3-level MultiIndex
                df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
                                                       names=('dataset', 'band', 'column'))
                dfs.append(df)

        catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return catalog
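

# Illustrative sketch (pure pandas, made-up column names): the per-(dataset, band)
# frames built in ``WriteObjectTableTask.run`` share the object ``id`` index, get a
# three-level (dataset, band, column) column index, and are then joined horizontally.
def _exampleMultilevelJoin():
    meas = pd.DataFrame({"flux": [1.0, 2.0]}, index=pd.Index([10, 11], name="id"))
    ref = pd.DataFrame({"flag": [True, False]}, index=pd.Index([10, 11], name="id"))
    meas.columns = pd.MultiIndex.from_tuples(
        [("meas", "g", c) for c in meas.columns], names=("dataset", "band", "column"))
    ref.columns = pd.MultiIndex.from_tuples(
        [("ref", "g", c) for c in ref.columns], names=("dataset", "band", "column"))
    return functools.reduce(lambda d1, d2: d1.join(d2), [meas, ref])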


class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"catalogType": ""},
                                  dimensions=("instrument", "visit", "detector")):

    catalog = connectionTypes.Input(
        doc="Input full-depth catalog of sources produced by CalibrateTask",
        name="{catalogType}src",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector")
    )
    outputCatalog = connectionTypes.Output(
        doc="Catalog of sources, `src` in Parquet format. The 'id' column is "
            "replaced with an index; all other columns are unchanged.",
        name="{catalogType}source",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector")
    )


class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteSourceTableConnections):
    pass


class WriteSourceTableTask(pipeBase.PipelineTask):
    """Write source table to parquet.
    """
    _DefaultName = "writeSourceTable"
    ConfigClass = WriteSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        inputs['ccdVisitId'] = butlerQC.quantum.dataId.pack("visit_detector")
        result = self.run(**inputs).table
        outputs = pipeBase.Struct(outputCatalog=result.toDataFrame())
        butlerQC.put(outputs, outputRefs)

    def run(self, catalog, ccdVisitId=None, **kwargs):
        """Convert `src` catalog to parquet.

        Parameters
        ----------
        catalog : `lsst.afw.table.SourceCatalog`
            catalog to be converted
        ccdVisitId : `int`
            ccdVisitId to be added as a column

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            ``table``
                `ParquetTable` version of the input catalog
        """
        self.log.info("Generating parquet table from src catalog ccdVisitId=%s", ccdVisitId)
        df = catalog.asAstropy().to_pandas().set_index('id', drop=True)
        df['ccdVisitId'] = ccdVisitId
        return pipeBase.Struct(table=ParquetTable(dataFrame=df))


class WriteRecalibratedSourceTableConnections(WriteSourceTableConnections,
                                              defaultTemplates={"catalogType": "",
                                                                "skyWcsName": "jointcal",
                                                                "photoCalibName": "fgcm"},
                                              dimensions=("instrument", "visit", "detector", "skymap")):
    skyMap = connectionTypes.Input(
        doc="skyMap needed to choose which tract-level calibrations to use when multiple available",
        name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
        storageClass="SkyMap",
        dimensions=("skymap",),
    )
    exposure = connectionTypes.Input(
        doc="Input exposure to perform photometry on.",
        storageClass="ExposureF",
        dimensions=["instrument", "visit", "detector"],
    )
    externalSkyWcsTractCatalog = connectionTypes.Input(
        doc=("Per-tract, per-visit wcs calibrations. These catalogs use the detector "
             "id for the catalog id, sorted on id for fast lookup."),
        name="{skyWcsName}SkyWcsCatalog",
        storageClass="ExposureCatalog",
        dimensions=["instrument", "visit", "tract"],
    )
    externalSkyWcsGlobalCatalog = connectionTypes.Input(
        doc=("Per-visit wcs calibrations computed globally (with no tract information). "
             "These catalogs use the detector id for the catalog id, sorted on id for "
             "fast lookup."),
        name="{skyWcsName}SkyWcsCatalog",
        storageClass="ExposureCatalog",
        dimensions=["instrument", "visit"],
    )
    externalPhotoCalibTractCatalog = connectionTypes.Input(
        doc=("Per-tract, per-visit photometric calibrations. These catalogs use the "
             "detector id for the catalog id, sorted on id for fast lookup."),
        name="{photoCalibName}PhotoCalibCatalog",
        storageClass="ExposureCatalog",
        dimensions=["instrument", "visit", "tract"],
    )
    externalPhotoCalibGlobalCatalog = connectionTypes.Input(
        doc=("Per-visit photometric calibrations computed globally (with no tract "
             "information). These catalogs use the detector id for the catalog id, "
             "sorted on id for fast lookup."),
        name="{photoCalibName}PhotoCalibCatalog",
        storageClass="ExposureCatalog",
        dimensions=["instrument", "visit"],
    )

    def __init__(self, *, config=None):
        super().__init__(config=config)
        # Keep only the external calibration inputs that the config will use.
        if config.doApplyExternalSkyWcs and config.doReevaluateSkyWcs:
            if config.useGlobalExternalSkyWcs:
                self.inputs.remove("externalSkyWcsTractCatalog")
            else:
                self.inputs.remove("externalSkyWcsGlobalCatalog")
        else:
            self.inputs.remove("externalSkyWcsTractCatalog")
            self.inputs.remove("externalSkyWcsGlobalCatalog")
        if config.doApplyExternalPhotoCalib and config.doReevaluatePhotoCalib:
            if config.useGlobalExternalPhotoCalib:
                self.inputs.remove("externalPhotoCalibTractCatalog")
            else:
                self.inputs.remove("externalPhotoCalibGlobalCatalog")
        else:
            self.inputs.remove("externalPhotoCalibTractCatalog")
            self.inputs.remove("externalPhotoCalibGlobalCatalog")


class WriteRecalibratedSourceTableConfig(WriteSourceTableConfig,
                                         pipelineConnections=WriteRecalibratedSourceTableConnections):

    doReevaluatePhotoCalib = pexConfig.Field(
        dtype=bool,
        doc=("Add or replace local photoCalib columns"),
    )
    doReevaluateSkyWcs = pexConfig.Field(
        dtype=bool,
        doc=("Add or replace local WCS columns and update the coord columns, coord_ra and coord_dec"),
    )
    doApplyExternalPhotoCalib = pexConfig.Field(
        dtype=bool,
        doc=("If and only if doReevaluatePhotoCalib, apply the photometric calibrations from an external "
             "algorithm such as FGCM or jointcal, else use the photoCalib already attached to the exposure."),
    )
    doApplyExternalSkyWcs = pexConfig.Field(
        dtype=bool,
        doc=("If and only if doReevaluateSkyWcs, apply the WCS from an external algorithm such as jointcal, "
             "else use the wcs already attached to the exposure."),
    )
    useGlobalExternalPhotoCalib = pexConfig.Field(
        dtype=bool,
        doc=("When using doApplyExternalPhotoCalib, use 'global' calibrations "
             "that are not run per-tract. When False, use per-tract photometric "
             "calibration files.")
    )
    useGlobalExternalSkyWcs = pexConfig.Field(
        dtype=bool,
        doc=("When using doApplyExternalSkyWcs, use 'global' calibrations "
             "that are not run per-tract. When False, use per-tract wcs "
             "files.")
    )

    def validate(self):
        super().validate()
        if self.doApplyExternalSkyWcs and not self.doReevaluateSkyWcs:
            log.warning("doApplyExternalSkyWcs=True but doReevaluateSkyWcs=False. "
                        "External SkyWcs will not be read or evaluated.")
        if self.doApplyExternalPhotoCalib and not self.doReevaluatePhotoCalib:
            log.warning("doApplyExternalPhotoCalib=True but doReevaluatePhotoCalib=False. "
                        "External PhotoCalib will not be read or evaluated.")


class WriteRecalibratedSourceTableTask(WriteSourceTableTask):
    """Write source table to parquet.
    """
    _DefaultName = "writeRecalibratedSourceTable"
    ConfigClass = WriteRecalibratedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        inputs['ccdVisitId'] = butlerQC.quantum.dataId.pack("visit_detector")
        inputs['exposureIdInfo'] = ExposureIdInfo.fromDataId(butlerQC.quantum.dataId, "visit_detector")

        if self.config.doReevaluatePhotoCalib or self.config.doReevaluateSkyWcs:
            if self.config.doApplyExternalPhotoCalib or self.config.doApplyExternalSkyWcs:
                inputs['exposure'] = self.attachCalibs(inputRefs, **inputs)
            inputs['catalog'] = self.addCalibColumns(**inputs)

        result = self.run(**inputs).table
        outputs = pipeBase.Struct(outputCatalog=result.toDataFrame())
        butlerQC.put(outputs, outputRefs)

    def attachCalibs(self, inputRefs, skyMap, exposure, externalSkyWcsGlobalCatalog=None,
                     externalSkyWcsTractCatalog=None, externalPhotoCalibGlobalCatalog=None,
                     externalPhotoCalibTractCatalog=None, **kwargs):
        """Apply external calibrations to exposure per configuration.

        When multiple tract-level calibrations overlap, select the one with the
        center closest to detector.

        Parameters
        ----------
        inputRefs : `lsst.pipe.base.InputQuantizedConnection`, for dataIds of
            tract-level calibs.
        skyMap : `lsst.skymap.SkyMap`
            skyMap to lookup tract geometry and WCS.
        exposure : `lsst.afw.image.exposure.Exposure`
            Input exposure to adjust calibrations.
        externalSkyWcsGlobalCatalog : `lsst.afw.table.ExposureCatalog`, optional
            Exposure catalog with external skyWcs to be applied per config.
        externalSkyWcsTractCatalog : `lsst.afw.table.ExposureCatalog`, optional
            Exposure catalog with external skyWcs to be applied per config.
        externalPhotoCalibGlobalCatalog : `lsst.afw.table.ExposureCatalog`, optional
            Exposure catalog with external photoCalib to be applied per config.
        externalPhotoCalibTractCatalog : `lsst.afw.table.ExposureCatalog`, optional
            Exposure catalog with external photoCalib to be applied per config.

        Returns
        -------
        exposure : `lsst.afw.image.exposure.Exposure`
            Exposure with adjusted calibrations.
        """
        if not self.config.doApplyExternalSkyWcs:
            # Do not modify the exposure's SkyWcs
            externalSkyWcsCatalog = None
        elif self.config.useGlobalExternalSkyWcs:
            # Use the global external SkyWcs
            externalSkyWcsCatalog = externalSkyWcsGlobalCatalog
            self.log.info('Applying global SkyWcs')
        else:
            # Use a tract-level external SkyWcs from the closest overlapping tract
            inputRef = getattr(inputRefs, 'externalSkyWcsTractCatalog')
            tracts = [ref.dataId['tract'] for ref in inputRef]
            if len(tracts) == 1:
                ind = 0
                self.log.info('Applying tract-level SkyWcs from tract %s', tracts[ind])
            else:
                if exposure.getWcs() is None:
                    raise ValueError("Trying to locate nearest tract, but exposure.wcs is None.")
                ind = self.getClosestTract(tracts, skyMap,
                                           exposure.getBBox(), exposure.getWcs())
                self.log.info('Multiple overlapping externalSkyWcsTractCatalogs found (%s). '
                              'Applying closest to detector center: tract=%s', str(tracts), tracts[ind])
            externalSkyWcsCatalog = externalSkyWcsTractCatalog[ind]

        if not self.config.doApplyExternalPhotoCalib:
            # Do not modify the exposure's PhotoCalib
            externalPhotoCalibCatalog = None
        elif self.config.useGlobalExternalPhotoCalib:
            # Use the global external PhotoCalib
            externalPhotoCalibCatalog = externalPhotoCalibGlobalCatalog
            self.log.info('Applying global PhotoCalib')
        else:
            # Use a tract-level external PhotoCalib from the closest overlapping tract
            inputRef = getattr(inputRefs, 'externalPhotoCalibTractCatalog')
            tracts = [ref.dataId['tract'] for ref in inputRef]
            if len(tracts) == 1:
                ind = 0
                self.log.info('Applying tract-level PhotoCalib from tract %s', tracts[ind])
            else:
                ind = self.getClosestTract(tracts, skyMap,
                                           exposure.getBBox(), exposure.getWcs())
                self.log.info('Multiple overlapping externalPhotoCalibTractCatalogs found (%s). '
                              'Applying closest to detector center: tract=%s', str(tracts), tracts[ind])
            externalPhotoCalibCatalog = externalPhotoCalibTractCatalog[ind]

        return self.prepareCalibratedExposure(exposure, externalSkyWcsCatalog, externalPhotoCalibCatalog)

    def getClosestTract(self, tracts, skyMap, bbox, wcs):
        """Find the index of the tract closest to detector from list of tractIds.

        Parameters
        ----------
        tracts : `list` [`int`]
            Iterable of integer tractIds.
        skyMap : `lsst.skymap.SkyMap`
            skyMap to lookup tract geometry and wcs.
        bbox : `lsst.geom.Box2I`
            Detector bbox, center of which will be compared to tract centers.
        wcs : `lsst.afw.geom.SkyWcs`
            Detector Wcs object to map the detector center to SkyCoord.

        Returns
        -------
        index : `int`
            Index of the closest tract in ``tracts``.
        """
        center = wcs.pixelToSky(bbox.getCenter())
        sep = []
        for tractId in tracts:
            tract = skyMap[tractId]
            tractCenter = tract.getWcs().pixelToSky(tract.getBBox().getCenter())
            sep.append(center.separation(tractCenter))

        return np.argmin(sep)

    def prepareCalibratedExposure(self, exposure, externalSkyWcsCatalog=None, externalPhotoCalibCatalog=None):
        """Prepare a calibrated exposure and apply external calibrations.

        Parameters
        ----------
        exposure : `lsst.afw.image.exposure.Exposure`
            Input exposure to adjust calibrations.
        externalSkyWcsCatalog : `lsst.afw.table.ExposureCatalog`, optional
            Exposure catalog with external skyWcs to be applied
            if config.doApplyExternalSkyWcs=True. Catalog uses the detector id
            for the catalog id, sorted on id for fast lookup.
        externalPhotoCalibCatalog : `lsst.afw.table.ExposureCatalog`, optional
            Exposure catalog with external photoCalib to be applied
            if config.doApplyExternalPhotoCalib=True. Catalog uses the detector
            id for the catalog id, sorted on id for fast lookup.

        Returns
        -------
        exposure : `lsst.afw.image.exposure.Exposure`
            Exposure with adjusted calibrations.
        """
        detectorId = exposure.getInfo().getDetector().getId()

        if externalPhotoCalibCatalog is not None:
            row = externalPhotoCalibCatalog.find(detectorId)
            if row is None:
                self.log.warning("Detector id %s not found in externalPhotoCalibCatalog; "
                                 "Using original photoCalib.", detectorId)
            else:
                photoCalib = row.getPhotoCalib()
                if photoCalib is None:
                    self.log.warning("Detector id %s has None for photoCalib in externalPhotoCalibCatalog; "
                                     "Using original photoCalib.", detectorId)
                else:
                    exposure.setPhotoCalib(photoCalib)

        if externalSkyWcsCatalog is not None:
            row = externalSkyWcsCatalog.find(detectorId)
            if row is None:
                self.log.warning("Detector id %s not found in externalSkyWcsCatalog; "
                                 "Using original skyWcs.", detectorId)
            else:
                skyWcs = row.getWcs()
                if skyWcs is None:
                    self.log.warning("Detector id %s has None for skyWcs in externalSkyWcsCatalog; "
                                     "Using original skyWcs.", detectorId)
                else:
                    exposure.setWcs(skyWcs)

        return exposure

    def addCalibColumns(self, catalog, exposure, exposureIdInfo, **kwargs):
        """Add or replace columns with calibs evaluated at each centroid.

        Add or replace 'base_LocalWcs' and 'base_LocalPhotoCalib' columns in
        a source catalog, by rerunning the plugins.

        Parameters
        ----------
        catalog : `lsst.afw.table.SourceCatalog`
            catalog to which calib columns will be added.
        exposure : `lsst.afw.image.exposure.Exposure`
            Exposure with attached PhotoCalib and SkyWcs attributes to be
            reevaluated at local centroids. Pixels are not required.
        exposureIdInfo : `lsst.obs.base.ExposureIdInfo`

        Returns
        -------
        newCat : `lsst.afw.table.SourceCatalog`
            Source Catalog with requested local calib columns.
        """
        measureConfig = SingleFrameMeasurementTask.ConfigClass()
        measureConfig.doReplaceWithNoise = False

        # Clear all slots; the relevant plugins are not being re-run.
        for slot in measureConfig.slots:
            setattr(measureConfig.slots, slot, None)

        measureConfig.plugins.names = []
        if self.config.doReevaluateSkyWcs:
            measureConfig.plugins.names.add('base_LocalWcs')
            self.log.info("Re-evaluating base_LocalWcs plugin")
        if self.config.doReevaluatePhotoCalib:
            measureConfig.plugins.names.add('base_LocalPhotoCalib')
            self.log.info("Re-evaluating base_LocalPhotoCalib plugin")
        pluginsNotToCopy = tuple(measureConfig.plugins.names)

        # Copy all columns from the original schema except the ones to reevaluate.
        aliasMap = catalog.schema.getAliasMap()
        mapper = afwTable.SchemaMapper(catalog.schema)
        for item in catalog.schema:
            if not item.field.getName().startswith(pluginsNotToCopy):
                mapper.addMapping(item.key)
        schema = mapper.getOutputSchema()

        measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
        schema.setAliasMap(aliasMap)
        newCat = afwTable.SourceCatalog(schema)
        newCat.extend(catalog, mapper=mapper)

        if self.config.doReevaluateSkyWcs and exposure.wcs is not None:
            afwTable.updateSourceCoords(exposure.wcs, newCat)

        measurement.run(measCat=newCat, exposure=exposure, exposureId=exposureIdInfo.expId)
        return newCat
623 """Calculate columns from ParquetTable.
625 This object manages and organizes an arbitrary set of computations
626 on a catalog. The catalog
is defined by a
628 as a ``deepCoadd_obj`` dataset,
and the computations are defined by a
629 collection of `lsst.pipe.tasks.functor.Functor` objects (
or, equivalently,
630 a ``CompositeFunctor``).
632 After the object
is initialized, accessing the ``.df`` attribute (which
633 holds the `pandas.DataFrame` containing the results of the calculations)
634 triggers computation of said dataframe.
636 One of the conveniences of using this object
is the ability to define a
637 desired common filter
for all functors. This enables the same functor
638 collection to be passed to several different `PostprocessAnalysis` objects
639 without having to change the original functor collection, since the ``filt``
640 keyword argument of this object triggers an overwrite of the ``filt``
641 property
for all functors
in the collection.
643 This object also allows a list of refFlags to be passed,
and defines a set
644 of default refFlags that are always included even
if not requested.
646 If a list of `~lsst.pipe.tasks.ParquetTable` object
is passed, rather than a single one,
647 then the calculations will be mapped over all the input catalogs. In
648 principle, it should be straightforward to parallelize this activity, but
649 initial tests have failed (see TODO
in code comments).
653 parq : `~lsst.pipe.tasks.ParquetTable` (
or list of such)
654 Source
catalog(s)
for computation.
657 Computations to do (functors that act on ``parq``).
658 If a dict, the output
659 DataFrame will have columns keyed accordingly.
660 If a list, the column keys will come
from the
661 ``.shortname`` attribute of each functor.
663 filt : `str`, optional
664 Filter
in which to calculate. If provided,
665 this will overwrite any existing ``.filt`` attribute
666 of the provided functors.
668 flags : `list`, optional
669 List of flags (per-band) to include
in output table.
670 Taken
from the ``meas`` dataset
if applied to a multilevel Object Table.
672 refFlags : `list`, optional
673 List of refFlags (only reference band) to include
in output table.
675 forcedFlags : `list`, optional
676 List of flags (per-band) to include
in output table.
677 Taken
from the ``forced_src`` dataset
if applied to a
678 multilevel Object Table. Intended
for flags
from measurement plugins
679 only run during multi-band forced-photometry.
681 _defaultRefFlags = []
684 def __init__(self, parq, functors, filt=None, flags=None, refFlags=None, forcedFlags=None):
692 if refFlags
is not None:
705 additionalFuncs.update({flag:
Column(flag, dataset=
'forced_src')
for flag
in self.
forcedFlags})
706 additionalFuncs.update({flag:
Column(flag, dataset=
'ref')
for flag
in self.
refFlags})
707 additionalFuncs.update({flag:
Column(flag, dataset=
'meas')
for flag
in self.
flags})
709 if isinstance(self.
functors, CompositeFunctor):
714 func.funcDict.update(additionalFuncs)
715 func.filt = self.
filt
721 return [name
for name, func
in self.
func.funcDict.items()
if func.noDup
or func.dataset ==
'ref']
731 if type(self.
parq)
in (list, tuple):
733 dflist = [self.
func(parq, dropna=dropna)
for parq
in self.
parq]
737 dflist = pool.map(functools.partial(self.
func, dropna=dropna), self.
parq)
738 self.
_df = pd.concat(dflist)
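

# Illustrative usage sketch (hypothetical column names; assumes ``parq`` is a
# multilevel ``deepCoadd_obj``-style ParquetTable already in hand): build an
# analysis from a dict of functors plus extra flag columns, then trigger the
# computation by reading ``.df``.
def _examplePostprocessAnalysis(parq):
    funcs = {'ra': Column('coord_ra', dataset='ref'),
             'dec': Column('coord_dec', dataset='ref')}
    analysis = PostprocessAnalysis(parq, funcs, filt='i',
                                   flags=['base_PixelFlags_flag'],
                                   refFlags=['detect_isPrimary'])
    return analysis.df  # accessing .df runs the functors and returns a pandas DataFrame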
747 """Expected Connections for subclasses of TransformCatalogBaseTask.
751 inputCatalog = connectionTypes.Input(
753 storageClass=
"DataFrame",
755 outputCatalog = connectionTypes.Output(
757 storageClass=
"DataFrame",
762 pipelineConnections=TransformCatalogBaseConnections):
763 functorFile = pexConfig.Field(
765 doc=
"Path to YAML file specifying Science Data Model functors to use "
766 "when copying columns and computing calibrated values.",
770 primaryKey = pexConfig.Field(
772 doc=
"Name of column to be set as the DataFrame index. If None, the index"
773 "will be named `id`",
777 columnsFromDataId = pexConfig.ListField(
781 doc=
"Columns to extract from the dataId",
786 """Base class for transforming/standardizing a catalog
788 by applying functors that convert units and apply calibrations.
789 The purpose of this task
is to perform a set of computations on
790 an input `ParquetTable` dataset (such
as ``deepCoadd_obj``)
and write the
791 results to a new dataset (which needs to be declared
in an ``outputDataset``
794 The calculations to be performed are defined
in a YAML file that specifies
795 a set of functors to be computed, provided
as
796 a ``--functorFile`` config parameter. An example of such a YAML file
821 - base_InputCount_value
824 functor: DeconvolvedMoments
829 - merge_measurement_i
830 - merge_measurement_r
831 - merge_measurement_z
832 - merge_measurement_y
833 - merge_measurement_g
834 - base_PixelFlags_flag_inexact_psfCenter
837 The names
for each entry under
"func" will become the names of columns
in
838 the output dataset. All the functors referenced are defined
in
840 functor are
in the `args` list,
and any additional entries
for each column
841 other than
"functor" or "args" (e.g., ``
'filt'``, ``
'dataset'``) are treated
as
842 keyword arguments to be passed to the functor initialization.
844 The
"flags" entry
is the default shortcut
for `Column` functors.
845 All columns listed under
"flags" will be copied to the output table
846 untransformed. They can be of any datatype.
847 In the special case of transforming a multi-level oject table
with
848 band
and dataset indices (deepCoadd_obj), these will be taked
from the
849 `meas` dataset
and exploded out per band.
851 There are two special shortcuts that only apply when transforming
852 multi-level Object (deepCoadd_obj) tables:
853 - The
"refFlags" entry
is shortcut
for `Column` functor
854 taken
from the `
'ref'` dataset
if transforming an ObjectTable.
855 - The
"forcedFlags" entry
is shortcut
for `Column` functors.
856 taken
from the ``forced_src`` dataset
if transforming an ObjectTable.
857 These are expanded out per band.
861 to organize
and excecute the calculations.
864 def _DefaultName(self):
865 raise NotImplementedError(
'Subclass must define "_DefaultName" attribute')
869 raise NotImplementedError(
'Subclass must define "outputDataset" attribute')
873 raise NotImplementedError(
'Subclass must define "inputDataset" attribute')
877 raise NotImplementedError(
'Subclass must define "ConfigClass" attribute')

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if self.config.functorFile:
            self.log.info('Loading transform functor definitions from %s',
                          self.config.functorFile)
            self.funcs = CompositeFunctor.from_file(self.config.functorFile)
            self.funcs.update(dict(PostprocessAnalysis._defaultFuncs))
        else:
            self.funcs = None

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        result = self.run(parq=inputs['inputCatalog'], funcs=self.funcs,
                          dataId=outputRefs.outputCatalog.dataId.full)
        outputs = pipeBase.Struct(outputCatalog=result)
        butlerQC.put(outputs, outputRefs)

    def run(self, parq, funcs=None, dataId=None, band=None):
        """Do postprocessing calculations.

        Takes a `ParquetTable` object and dataId,
        returns a dataframe with results of postprocessing calculations.

        Parameters
        ----------
        parq : `lsst.pipe.tasks.parquetTable.ParquetTable`
            ParquetTable from which calculations are done.
        funcs : `lsst.pipe.tasks.functors.Functors`
            Functors to apply to the table's columns.
        dataId : dict, optional
            Used to add a `patchId` column to the output dataframe.
        band : `str`, optional
            Filter band that is being processed.

        Returns
        -------
        df : `pandas.DataFrame`
        """
        self.log.info("Transforming/standardizing the source table dataId: %s", dataId)

        df = self.transform(band, parq, funcs, dataId).df
        self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
        return df

    def getAnalysis(self, parq, funcs=None, band=None):
        if funcs is None:
            funcs = self.funcs
        analysis = PostprocessAnalysis(parq, funcs, filt=band)
        return analysis

    def transform(self, band, parq, funcs, dataId):
        analysis = self.getAnalysis(parq, funcs=funcs, band=band)
        df = analysis.df
        if dataId and self.config.columnsFromDataId:
            for key in self.config.columnsFromDataId:
                if key in dataId:
                    df[str(key)] = dataId[key]
                else:
                    raise ValueError(f"'{key}' in config.columnsFromDataId not found in dataId: {dataId}")

        if self.config.primaryKey:
            if df.index.name != self.config.primaryKey and self.config.primaryKey in df:
                df.reset_index(inplace=True, drop=True)
                df.set_index(self.config.primaryKey, inplace=True)

        return pipeBase.Struct(
            df=df,
            analysis=analysis
        )


class TransformObjectCatalogConnections(pipeBase.PipelineTaskConnections,
                                        defaultTemplates={"coaddName": "deep"},
                                        dimensions=("tract", "patch", "skymap")):
    inputCatalog = connectionTypes.Input(
        doc="The vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
            "stored as a DataFrame with a multi-level column index per-patch.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="{coaddName}Coadd_obj",
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-Patch Object Table of columns transformed from the deepCoadd_obj table per the standard "
            "Data Model.",
        dimensions=("tract", "patch", "skymap"),
        storageClass="DataFrame",
        name="objectTable",
    )


class TransformObjectCatalogConfig(TransformCatalogBaseConfig,
                                   pipelineConnections=TransformObjectCatalogConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )
    filterMap = pexConfig.DictField(
        keytype=str,
        itemtype=str,
        default={},
        doc=("Dictionary mapping full filter name to short one for column name munging. "
             "These filters determine the output columns no matter what filters the "
             "input data actually contain."),
        deprecated=("Coadds are now identified by the band, so this transform is unused. "
                    "Will be removed after v22.")
    )
    outputBands = pexConfig.ListField(
        dtype=str,
        default=None,
        optional=True,
        doc=("These bands and only these bands will appear in the output,"
             " NaN-filled if the input does not include them."
             " If None, then use all bands found in the input.")
    )
    camelCase = pexConfig.Field(
        dtype=bool,
        doc=("Write per-band column names with camelCase, else underscore. "
             "For example: gPsFlux instead of g_PsFlux.")
    )
    multilevelOutput = pexConfig.Field(
        dtype=bool,
        doc=("Whether results dataframe should have a multilevel column index (True) or be flat "
             "and name-munged (False).")
    )
    goodFlags = pexConfig.ListField(
        dtype=str,
        doc=("List of 'good' flags that should be set False when populating empty tables. "
             "All other flags are considered to be 'bad' flags and will be set to True.")
    )
    floatFillValue = pexConfig.Field(
        dtype=float,
        doc="Fill value for float fields when populating empty tables."
    )
    integerFillValue = pexConfig.Field(
        dtype=int,
        doc="Fill value for integer fields when populating empty tables."
    )

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join('$PIPE_TASKS_DIR', 'schemas', 'Object.yaml')
        self.primaryKey = 'objectId'
        self.columnsFromDataId = ['tract', 'patch']
        self.goodFlags = ['calib_astrometry_used',
                          'calib_photometry_reserved',
                          'calib_photometry_used',
                          'calib_psf_candidate',
                          'calib_psf_reserved']
1045 """Produce a flattened Object Table to match the format specified in
1048 Do the same set of postprocessing calculations on all bands.
1050 This is identical to `TransformCatalogBaseTask`,
except for that it does
1051 the specified functor calculations
for all filters present
in the
1052 input `deepCoadd_obj` table. Any specific ``
"filt"`` keywords specified
1053 by the YAML file will be superceded.
1055 _DefaultName = "transformObjectCatalog"
1056 ConfigClass = TransformObjectCatalogConfig

    def run(self, parq, funcs=None, dataId=None, band=None):
        # NOTE: the ``band`` kwarg is ignored here.
        dfDict = {}
        analysisDict = {}
        templateDf = pd.DataFrame()

        if isinstance(parq, DeferredDatasetHandle):
            columns = parq.get(component='columns')
            inputBands = columns.unique(level=1).values
        else:
            inputBands = parq.columnLevelNames['band']

        outputBands = self.config.outputBands if self.config.outputBands else inputBands

        # Perform transform for data of filters that exist in parq.
        for inputBand in inputBands:
            if inputBand not in outputBands:
                self.log.info("Ignoring %s band data in the input", inputBand)
                continue
            self.log.info("Transforming the catalog of band %s", inputBand)
            result = self.transform(inputBand, parq, funcs, dataId)
            dfDict[inputBand] = result.df
            analysisDict[inputBand] = result.analysis
            if templateDf.empty:
                templateDf = result.df

        # Put filler values in columns of other wanted bands
        for filt in outputBands:
            if filt not in dfDict:
                self.log.info("Adding empty columns for band %s", filt)
                dfTemp = templateDf.copy()
                for col in dfTemp.columns:
                    testValue = dfTemp[col].values[0]
                    if isinstance(testValue, (np.bool_, pd.BooleanDtype)):
                        # Boolean flag type: check if it is a "good" flag
                        if col in self.config.goodFlags:
                            fillValue = False
                        else:
                            fillValue = True
                    elif isinstance(testValue, numbers.Integral):
                        # numbers.Integral catches python, numpy, and pandas
                        # integers; unsigned integers are not supported.
                        if isinstance(testValue, np.unsignedinteger):
                            raise ValueError("Parquet tables may not have unsigned integer columns.")
                        else:
                            fillValue = self.config.integerFillValue
                    else:
                        fillValue = self.config.floatFillValue
                    dfTemp[col].values[:] = fillValue
                dfDict[filt] = dfTemp

        # This makes a multilevel column index, with band as first level
        df = pd.concat(dfDict, axis=1, names=['band', 'column'])

        if not self.config.multilevelOutput:
            noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
            if self.config.primaryKey in noDupCols:
                noDupCols.remove(self.config.primaryKey)
            if dataId and self.config.columnsFromDataId:
                noDupCols += self.config.columnsFromDataId
            df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase,
                                inputBands=inputBands)

        self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
        return df


class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("tract", "skymap")):
    inputCatalogs = connectionTypes.Input(
        doc="Per-Patch objectTables conforming to the standard data model.",
        name="objectTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        multiple=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-tract horizontal concatenation of the input objectTables",
        name="objectTable_tract",
        storageClass="DataFrame",
        dimensions=("tract", "skymap"),
    )


class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=ConsolidateObjectTableConnections):
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )


class ConsolidateObjectTableTask(pipeBase.PipelineTask):
    """Write patch-merged source tables to a tract-level parquet file.

    Concatenates the `objectTable` list into a per-tract `objectTable_tract`.
    """
    _DefaultName = "consolidateObjectTable"
    ConfigClass = ConsolidateObjectTableConfig

    inputDataset = 'objectTable'
    outputDataset = 'objectTable_tract'

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-patch Object Tables",
                      len(inputs['inputCatalogs']))
        df = pd.concat(inputs['inputCatalogs'])
        butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)


class TransformSourceTableConnections(pipeBase.PipelineTaskConnections,
                                      defaultTemplates={"catalogType": ""},
                                      dimensions=("instrument", "visit", "detector")):

    inputCatalog = connectionTypes.Input(
        doc="Wide input catalog of sources produced by WriteSourceTableTask",
        name="{catalogType}source",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
    )
    outputCatalog = connectionTypes.Output(
        doc="Narrower, per-detector Source Table transformed and converted per a "
            "specified set of functors",
        name="{catalogType}sourceTable",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector")
    )


class TransformSourceTableConfig(TransformCatalogBaseConfig,
                                 pipelineConnections=TransformSourceTableConnections):

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join('$PIPE_TASKS_DIR', 'schemas', 'Source.yaml')
        self.primaryKey = 'sourceId'
        self.columnsFromDataId = ['visit', 'detector', 'band', 'physical_filter']


class TransformSourceTableTask(TransformCatalogBaseTask):
    """Transform/standardize a source catalog.
    """
    _DefaultName = "transformSourceTable"
    ConfigClass = TransformSourceTableConfig


class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections,
                                         dimensions=("instrument", "visit",),
                                         defaultTemplates={"calexpType": ""}):
    calexp = connectionTypes.Input(
        doc="Processed exposures used for metadata",
        name="{calexpType}calexp",
        storageClass="ExposureF",
        dimensions=("instrument", "visit", "detector"),
        deferLoad=True,
        multiple=True,
    )
    visitSummary = connectionTypes.Output(
        doc=("Per-visit consolidated exposure metadata. These catalogs use "
             "detector id for the id and are sorted for fast lookups of a "
             "detector."),
        name="{calexpType}visitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
    )


class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig,
                                    pipelineConnections=ConsolidateVisitSummaryConnections):
    """Config for ConsolidateVisitSummaryTask"""
    pass


class ConsolidateVisitSummaryTask(pipeBase.PipelineTask):
    """Task to consolidate per-detector visit metadata.

    This task aggregates the following metadata from all the detectors in a
    single visit into an exposure catalog:
    - The visitInfo.
    - The wcs.
    - The photoCalib.
    - The physical_filter and band (if available).
    - The psf size, shape, and effective area at the center of the detector.
    - The corners of the bounding box in right ascension/declination.

    Other quantities such as Detector, Psf, ApCorrMap, and TransmissionCurve
    are not persisted here because of storage concerns, and because of their
    limited utility as summary statistics.

    Tests for this task are performed in ci_hsc_gen3.
    """
    _DefaultName = "consolidateVisitSummary"
    ConfigClass = ConsolidateVisitSummaryConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        dataRefs = butlerQC.get(inputRefs.calexp)
        visit = dataRefs[0].dataId.byName()['visit']

        self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
                       len(dataRefs), visit)

        expCatalog = self._combineExposureMetadata(visit, dataRefs)

        butlerQC.put(expCatalog, outputRefs.visitSummary)

    def _combineExposureMetadata(self, visit, dataRefs):
        """Make a combined exposure catalog from a list of dataRefs.
        These dataRefs must point to exposures with wcs, summaryStats,
        and other visit metadata.

        Parameters
        ----------
        visit : `int`
            Visit identification number.
        dataRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            List of dataRefs in visit.

        Returns
        -------
        visitSummary : `lsst.afw.table.ExposureCatalog`
            Exposure catalog with per-detector summary information.
        """
        schema = self._makeVisitSummarySchema()
        cat = afwTable.ExposureCatalog(schema)
        cat.resize(len(dataRefs))

        cat['visit'] = visit

        for i, dataRef in enumerate(dataRefs):
            visitInfo = dataRef.get(component='visitInfo')
            filterLabel = dataRef.get(component='filter')
            summaryStats = dataRef.get(component='summaryStats')
            detector = dataRef.get(component='detector')
            wcs = dataRef.get(component='wcs')
            photoCalib = dataRef.get(component='photoCalib')
            bbox = dataRef.get(component='bbox')
            validPolygon = dataRef.get(component='validPolygon')

            rec = cat[i]
            rec.setBBox(bbox)
            rec.setVisitInfo(visitInfo)
            rec.setWcs(wcs)
            rec.setPhotoCalib(photoCalib)
            rec.setValidPolygon(validPolygon)

            rec['physical_filter'] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else ""
            rec['band'] = filterLabel.bandLabel if filterLabel.hasBandLabel() else ""
            rec.setId(detector.getId())
            rec['psfSigma'] = summaryStats.psfSigma
            rec['psfIxx'] = summaryStats.psfIxx
            rec['psfIyy'] = summaryStats.psfIyy
            rec['psfIxy'] = summaryStats.psfIxy
            rec['psfArea'] = summaryStats.psfArea
            rec['raCorners'][:] = summaryStats.raCorners
            rec['decCorners'][:] = summaryStats.decCorners
            rec['ra'] = summaryStats.ra
            rec['decl'] = summaryStats.decl
            rec['zenithDistance'] = summaryStats.zenithDistance
            rec['zeroPoint'] = summaryStats.zeroPoint
            rec['skyBg'] = summaryStats.skyBg
            rec['skyNoise'] = summaryStats.skyNoise
            rec['meanVar'] = summaryStats.meanVar
            rec['astromOffsetMean'] = summaryStats.astromOffsetMean
            rec['astromOffsetStd'] = summaryStats.astromOffsetStd
            rec['nPsfStar'] = summaryStats.nPsfStar
            rec['psfStarDeltaE1Median'] = summaryStats.psfStarDeltaE1Median
            rec['psfStarDeltaE2Median'] = summaryStats.psfStarDeltaE2Median
            rec['psfStarDeltaE1Scatter'] = summaryStats.psfStarDeltaE1Scatter
            rec['psfStarDeltaE2Scatter'] = summaryStats.psfStarDeltaE2Scatter
            rec['psfStarDeltaSizeMedian'] = summaryStats.psfStarDeltaSizeMedian
            rec['psfStarDeltaSizeScatter'] = summaryStats.psfStarDeltaSizeScatter
            rec['psfStarScaledDeltaSizeScatter'] = summaryStats.psfStarScaledDeltaSizeScatter

        metadata = dafBase.PropertyList()
        metadata.add("COMMENT", "Catalog id is detector id, sorted.")
        metadata.add("COMMENT", "Only detectors with data have entries.")
        cat.setMetadata(metadata)

        cat.sort()
        return cat

    def _makeVisitSummarySchema(self):
        """Make the schema for the visitSummary catalog."""
        schema = afwTable.ExposureTable.makeMinimalSchema()
        schema.addField('visit', type='L', doc='Visit number')
        schema.addField('physical_filter', type='String', size=32, doc='Physical filter')
        schema.addField('band', type='String', size=32, doc='Name of band')
        schema.addField('psfSigma', type='F',
                        doc='PSF model second-moments determinant radius (center of chip) (pixel)')
        schema.addField('psfArea', type='F',
                        doc='PSF model effective area (center of chip) (pixel**2)')
        schema.addField('psfIxx', type='F',
                        doc='PSF model Ixx (center of chip) (pixel**2)')
        schema.addField('psfIyy', type='F',
                        doc='PSF model Iyy (center of chip) (pixel**2)')
        schema.addField('psfIxy', type='F',
                        doc='PSF model Ixy (center of chip) (pixel**2)')
        schema.addField('raCorners', type='ArrayD', size=4,
                        doc='Right Ascension of bounding box corners (degrees)')
        schema.addField('decCorners', type='ArrayD', size=4,
                        doc='Declination of bounding box corners (degrees)')
        schema.addField('ra', type='D',
                        doc='Right Ascension of bounding box center (degrees)')
        schema.addField('decl', type='D',
                        doc='Declination of bounding box center (degrees)')
        schema.addField('zenithDistance', type='F',
                        doc='Zenith distance of bounding box center (degrees)')
        schema.addField('zeroPoint', type='F',
                        doc='Mean zeropoint in detector (mag)')
        schema.addField('skyBg', type='F',
                        doc='Average sky background (ADU)')
        schema.addField('skyNoise', type='F',
                        doc='Average sky noise (ADU)')
        schema.addField('meanVar', type='F',
                        doc='Mean variance of the weight plane (ADU**2)')
        schema.addField('astromOffsetMean', type='F',
                        doc='Mean offset of astrometric calibration matches (arcsec)')
        schema.addField('astromOffsetStd', type='F',
                        doc='Standard deviation of offsets of astrometric calibration matches (arcsec)')
        schema.addField('nPsfStar', type='I', doc='Number of stars used for PSF model')
        schema.addField('psfStarDeltaE1Median', type='F',
                        doc='Median E1 residual (starE1 - psfE1) for psf stars')
        schema.addField('psfStarDeltaE2Median', type='F',
                        doc='Median E2 residual (starE2 - psfE2) for psf stars')
        schema.addField('psfStarDeltaE1Scatter', type='F',
                        doc='Scatter (via MAD) of E1 residual (starE1 - psfE1) for psf stars')
        schema.addField('psfStarDeltaE2Scatter', type='F',
                        doc='Scatter (via MAD) of E2 residual (starE2 - psfE2) for psf stars')
        schema.addField('psfStarDeltaSizeMedian', type='F',
                        doc='Median size residual (starSize - psfSize) for psf stars (pixel)')
        schema.addField('psfStarDeltaSizeScatter', type='F',
                        doc='Scatter (via MAD) of size residual (starSize - psfSize) for psf stars (pixel)')
        schema.addField('psfStarScaledDeltaSizeScatter', type='F',
                        doc='Scatter (via MAD) of size residual scaled by median size squared')

        return schema
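

# Illustrative read-back sketch (assumes a Butler repository containing the
# ``visitSummary`` dataset; the instrument/visit/detector values are placeholders):
# the catalog id is the detector id and the catalog is sorted, so per-detector
# rows can be looked up with ``find``.
def _exampleReadVisitSummary(butler, instrument="MyCam", visit=1234, detector=42):
    visitSummary = butler.get("visitSummary", instrument=instrument, visit=visit)
    row = visitSummary.find(detector)      # record for one detector, or None if absent
    return row["psfSigma"], row["zeroPoint"]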


class ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        defaultTemplates={"catalogType": ""},
                                        dimensions=("instrument", "visit")):
    inputCatalogs = connectionTypes.Input(
        doc="Input per-detector Source Tables",
        name="{catalogType}sourceTable",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
        multiple=True
    )
    outputCatalog = connectionTypes.Output(
        doc="Per-visit concatenation of Source Table",
        name="{catalogType}sourceTable_visit",
        storageClass="DataFrame",
        dimensions=("instrument", "visit")
    )


class ConsolidateSourceTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=ConsolidateSourceTableConnections):
    pass


class ConsolidateSourceTableTask(pipeBase.PipelineTask):
    """Concatenate the `sourceTable` list into a per-visit `sourceTable_visit`.
    """
    _DefaultName = 'consolidateSourceTable'
    ConfigClass = ConsolidateSourceTableConfig

    inputDataset = 'sourceTable'
    outputDataset = 'sourceTable_visit'

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        from .makeWarp import reorderRefs

        detectorOrder = [ref.dataId['detector'] for ref in inputRefs.inputCatalogs]
        detectorOrder.sort()
        inputRefs = reorderRefs(inputRefs, detectorOrder, dataIdKey='detector')
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-detector Source Tables",
                      len(inputs['inputCatalogs']))
        df = pd.concat(inputs['inputCatalogs'])
        butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)


class MakeCcdVisitTableConnections(pipeBase.PipelineTaskConnections,
                                   dimensions=("instrument",),
                                   defaultTemplates={"calexpType": ""}):
    visitSummaryRefs = connectionTypes.Input(
        doc="Data references for per-visit consolidated exposure metadata from ConsolidateVisitSummaryTask",
        name="{calexpType}visitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="CCD and Visit metadata table",
        name="{calexpType}ccdVisitTable",
        storageClass="DataFrame",
        dimensions=("instrument",)
    )


class MakeCcdVisitTableConfig(pipeBase.PipelineTaskConfig,
                              pipelineConnections=MakeCcdVisitTableConnections):
    pass


class MakeCcdVisitTableTask(pipeBase.PipelineTask):
    """Produce a `ccdVisitTable` from the `visitSummary` exposure catalogs.
    """
    _DefaultName = 'makeCcdVisitTable'
    ConfigClass = MakeCcdVisitTableConfig

    def run(self, visitSummaryRefs):
        """Make a table of ccd information from the `visitSummary` catalogs.

        Parameters
        ----------
        visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            List of DeferredDatasetHandles pointing to exposure catalogs with
            per-detector summary information.

        Returns
        -------
        result : `lsst.pipe.Base.Struct`
            Results struct with attribute:

            ``outputCatalog``
                Catalog of ccd and visit information.
        """
        ccdEntries = []
        for visitSummaryRef in visitSummaryRefs:
            visitSummary = visitSummaryRef.get()
            visitInfo = visitSummary[0].getVisitInfo()

            summaryTable = visitSummary.asAstropy()
            selectColumns = ['id', 'visit', 'physical_filter', 'band', 'ra', 'decl', 'zenithDistance',
                             'zeroPoint', 'psfSigma', 'skyBg', 'skyNoise',
                             'astromOffsetMean', 'astromOffsetStd', 'nPsfStar',
                             'psfStarDeltaE1Median', 'psfStarDeltaE2Median',
                             'psfStarDeltaE1Scatter', 'psfStarDeltaE2Scatter',
                             'psfStarDeltaSizeMedian', 'psfStarDeltaSizeScatter',
                             'psfStarScaledDeltaSizeScatter']
            ccdEntry = summaryTable[selectColumns].to_pandas().set_index('id')
            ccdEntry = ccdEntry.rename(columns={"visit": "visitId"})
            dataIds = [DataCoordinate.standardize(visitSummaryRef.dataId, detector=id)
                       for id in summaryTable['id']]
            packer = visitSummaryRef.dataId.universe.makePacker('visit_detector', visitSummaryRef.dataId)
            ccdVisitIds = [packer.pack(dataId) for dataId in dataIds]
            ccdEntry['ccdVisitId'] = ccdVisitIds
            ccdEntry['detector'] = summaryTable['id']
            pixToArcseconds = np.array([vR.getWcs().getPixelScale().asArcseconds() if vR.getWcs()
                                        else np.nan for vR in visitSummary])
            ccdEntry["seeing"] = visitSummary['psfSigma'] * np.sqrt(8 * np.log(2)) * pixToArcseconds

            ccdEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
            ccdEntry["expMidpt"] = visitInfo.getDate().toPython()
            ccdEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
            expTime = visitInfo.getExposureTime()
            ccdEntry['expTime'] = expTime
            ccdEntry["obsStart"] = ccdEntry["expMidpt"] - 0.5 * pd.Timedelta(seconds=expTime)
            expTime_days = expTime / (60*60*24)
            ccdEntry["obsStartMJD"] = ccdEntry["expMidptMJD"] - 0.5 * expTime_days
            ccdEntry['darkTime'] = visitInfo.getDarkTime()
            ccdEntry['xSize'] = summaryTable['bbox_max_x'] - summaryTable['bbox_min_x']
            ccdEntry['ySize'] = summaryTable['bbox_max_y'] - summaryTable['bbox_min_y']
            ccdEntry['llcra'] = summaryTable['raCorners'][:, 0]
            ccdEntry['llcdec'] = summaryTable['decCorners'][:, 0]
            ccdEntry['ulcra'] = summaryTable['raCorners'][:, 1]
            ccdEntry['ulcdec'] = summaryTable['decCorners'][:, 1]
            ccdEntry['urcra'] = summaryTable['raCorners'][:, 2]
            ccdEntry['urcdec'] = summaryTable['decCorners'][:, 2]
            ccdEntry['lrcra'] = summaryTable['raCorners'][:, 3]
            ccdEntry['lrcdec'] = summaryTable['decCorners'][:, 3]

            ccdEntries.append(ccdEntry)

        outputCatalog = pd.concat(ccdEntries)
        outputCatalog.set_index('ccdVisitId', inplace=True, verify_integrity=True)
        return pipeBase.Struct(outputCatalog=outputCatalog)
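

# Sketch of the seeing conversion used in ``MakeCcdVisitTableTask.run`` above:
# psfSigma (the PSF model determinant radius in pixels) becomes a FWHM in
# arcseconds via the Gaussian relation FWHM = sigma * sqrt(8 ln 2), scaled by
# the detector pixel scale (argument names here are illustrative).
def _exampleSeeingFwhm(psfSigmaPixels, pixelScaleArcsec):
    return psfSigmaPixels * np.sqrt(8 * np.log(2)) * pixelScaleArcsec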


class MakeVisitTableConnections(pipeBase.PipelineTaskConnections,
                                dimensions=("instrument",),
                                defaultTemplates={"calexpType": ""}):
    visitSummaries = connectionTypes.Input(
        doc="Per-visit consolidated exposure metadata from ConsolidateVisitSummaryTask",
        name="{calexpType}visitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit",),
        multiple=True,
        deferLoad=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Visit metadata table",
        name="{calexpType}visitTable",
        storageClass="DataFrame",
        dimensions=("instrument",)
    )


class MakeVisitTableConfig(pipeBase.PipelineTaskConfig,
                           pipelineConnections=MakeVisitTableConnections):
    pass


class MakeVisitTableTask(pipeBase.PipelineTask):
    """Produce a `visitTable` from the `visitSummary` exposure catalogs.
    """
    _DefaultName = 'makeVisitTable'
    ConfigClass = MakeVisitTableConfig

    def run(self, visitSummaries):
        """Make a table of visit information from the `visitSummary` catalogs.

        Parameters
        ----------
        visitSummaries : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            List of exposure catalogs with per-detector summary information.

        Returns
        -------
        result : `lsst.pipe.Base.Struct`
            Results struct with attribute:

            ``outputCatalog``
                Catalog of visit information.
        """
        visitEntries = []
        for visitSummary in visitSummaries:
            visitSummary = visitSummary.get()
            visitRow = visitSummary[0]
            visitInfo = visitRow.getVisitInfo()

            visitEntry = {}
            visitEntry["visitId"] = visitRow['visit']
            visitEntry["visit"] = visitRow['visit']
            visitEntry["physical_filter"] = visitRow['physical_filter']
            visitEntry["band"] = visitRow['band']
            raDec = visitInfo.getBoresightRaDec()
            visitEntry["ra"] = raDec.getRa().asDegrees()
            visitEntry["decl"] = raDec.getDec().asDegrees()
            visitEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
            azAlt = visitInfo.getBoresightAzAlt()
            visitEntry["azimuth"] = azAlt.getLongitude().asDegrees()
            visitEntry["altitude"] = azAlt.getLatitude().asDegrees()
            visitEntry["zenithDistance"] = 90 - azAlt.getLatitude().asDegrees()
            visitEntry["airmass"] = visitInfo.getBoresightAirmass()
            expTime = visitInfo.getExposureTime()
            visitEntry["expTime"] = expTime
            visitEntry["expMidpt"] = visitInfo.getDate().toPython()
            visitEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
            visitEntry["obsStart"] = visitEntry["expMidpt"] - 0.5 * pd.Timedelta(seconds=expTime)
            expTime_days = expTime / (60*60*24)
            visitEntry["obsStartMJD"] = visitEntry["expMidptMJD"] - 0.5 * expTime_days
            visitEntries.append(visitEntry)

        outputCatalog = pd.DataFrame(data=visitEntries)
        outputCatalog.set_index('visitId', inplace=True, verify_integrity=True)
        return pipeBase.Struct(outputCatalog=outputCatalog)


class WriteForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("instrument", "visit", "detector", "skymap", "tract")):

    inputCatalog = connectionTypes.Input(
        doc="Primary per-detector, single-epoch forced-photometry catalog. "
            "By default, it is the output of ForcedPhotCcdTask on calexps",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector", "skymap", "tract")
    )
    inputCatalogDiff = connectionTypes.Input(
        doc="Secondary multi-epoch, per-detector, forced photometry catalog. "
            "By default, it is the output of ForcedPhotCcdTask run on image differences.",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector", "skymap", "tract")
    )
    outputCatalog = connectionTypes.Output(
        doc="InputCatalogs horizontally joined on `objectId` in Parquet format",
        name="mergedForcedSource",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector", "skymap", "tract")
    )


class WriteForcedSourceTableConfig(pipeBase.PipelineTaskConfig,
                                   pipelineConnections=WriteForcedSourceTableConnections):
    key = pexConfig.Field(
        doc="Column on which to join the two input tables and make the primary key of the output",
        dtype=str,
        default="objectId",
    )


class WriteForcedSourceTableTask(pipeBase.PipelineTask):
    """Merge and convert per-detector forced source catalogs to parquet.

    Because the predecessor ForcedPhotCcdTask operates per-detector,
    per-tract, (i.e., it has tract in its dimensions), detectors
    on the tract boundary may have multiple forced source catalogs.

    The successor task TransformForcedSourceTable runs per-patch
    and temporally aggregates overlapping mergedForcedSource catalogs from all
    available epochs.
    """
    _DefaultName = "writeForcedSourceTable"
    ConfigClass = WriteForcedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        inputs['ccdVisitId'] = butlerQC.quantum.dataId.pack("visit_detector")
        inputs['band'] = butlerQC.quantum.dataId.full['band']
        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)

    def run(self, inputCatalog, inputCatalogDiff, ccdVisitId=None, band=None):
        dfs = []
        for table, dataset, in zip((inputCatalog, inputCatalogDiff), ('calexp', 'diff')):
            df = table.asAstropy().to_pandas().set_index(self.config.key, drop=False)
            df = df.reindex(sorted(df.columns), axis=1)
            df['ccdVisitId'] = ccdVisitId if ccdVisitId else pd.NA
            df['band'] = band if band else pd.NA
            df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
                                                   names=('dataset', 'column'))
            dfs.append(df)

        outputCatalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return pipeBase.Struct(outputCatalog=outputCatalog)


class TransformForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                            dimensions=("instrument", "skymap", "patch", "tract")):

    inputCatalogs = connectionTypes.Input(
        doc="Parquet table of merged ForcedSources produced by WriteForcedSourceTableTask",
        name="mergedForcedSource",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector", "skymap", "tract"),
        multiple=True,
        deferLoad=True
    )
    referenceCatalog = connectionTypes.Input(
        doc="Reference catalog which was used to seed the forcedPhot. Columns "
            "objectId, detect_isPrimary, detect_isTractInner, detect_isPatchInner "
            "are expected.",
        name="objectTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        deferLoad=True
    )
    outputCatalog = connectionTypes.Output(
        doc="Narrower, temporally-aggregated, per-patch ForcedSource Table transformed and converted per a "
            "specified set of functors",
        name="forcedSourceTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap")
    )


class TransformForcedSourceTableConfig(TransformCatalogBaseConfig,
                                       pipelineConnections=TransformForcedSourceTableConnections):
    referenceColumns = pexConfig.ListField(
        dtype=str,
        default=["detect_isPrimary", "detect_isTractInner", "detect_isPatchInner"],
        doc="Columns to pull from reference catalog",
    )
    key = pexConfig.Field(
        dtype=str,
        default="objectId",
        doc="Column on which to join the two input tables and make the primary key of the output",
    )
    keyRef = pexConfig.Field(
        dtype=str,
        default="forcedSourceId",
        doc="Rename the output DataFrame index to this name",
    )

    def setDefaults(self):
        super().setDefaults()
        self.functorFile = os.path.join('$PIPE_TASKS_DIR', 'schemas', 'ForcedSource.yaml')
        self.columnsFromDataId = ['tract', 'patch']
1764 """Transform/standardize a ForcedSource catalog
1766 Transforms each wide, per-detector forcedSource parquet table per the
1767 specification file (per-camera defaults found in ForcedSource.yaml).
1768 All epochs that overlap the patch are aggregated into one per-patch
1769 narrow-parquet file.
1771 No de-duplication of rows
is performed. Duplicate resolutions flags are
1772 pulled
in from the referenceCatalog: `detect_isPrimary`,
1773 `detect_isTractInner`,`detect_isPatchInner`, so that user may de-duplicate
1774 for analysis
or compare duplicates
for QA.
1776 The resulting table includes multiple bands. Epochs (MJDs)
and other useful
1777 per-visit rows can be retreived by joining
with the CcdVisitTable on
1780 _DefaultName = "transformForcedSourceTable"
1781 ConfigClass = TransformForcedSourceTableConfig
1783 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1784 inputs = butlerQC.get(inputRefs)
1785 if self.funcs
is None:
1786 raise ValueError(
"config.functorFile is None. "
1787 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
1788 outputs = self.run(inputs[
'inputCatalogs'], inputs[
'referenceCatalog'], funcs=self.funcs,
1789 dataId=outputRefs.outputCatalog.dataId.full)
1791 butlerQC.put(outputs, outputRefs)
1793 def run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=None):
1795 ref = referenceCatalog.get(parameters={
"columns": self.config.referenceColumns})
1796 self.log.info(
"Aggregating %s input catalogs" % (len(inputCatalogs)))
1797 for handle
in inputCatalogs:
1798 result = self.transform(
None, handle, funcs, dataId)
1800 dfs.append(result.df.join(ref, how=
'inner'))
1802 outputCatalog = pd.concat(dfs)
1806 outputCatalog.index.rename(self.config.keyRef, inplace=
True)
1808 outputCatalog.reset_index(inplace=
True)
1811 outputCatalog.set_index(
"forcedSourceId", inplace=
True, verify_integrity=
True)
1813 outputCatalog.index.rename(self.config.key, inplace=
True)
1815 self.log.info(
"Made a table of %d columns and %d rows",
1816 len(outputCatalog.columns), len(outputCatalog))
1817 return pipeBase.Struct(outputCatalog=outputCatalog)
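

# Illustrative sketch (pure pandas; how the tables are loaded is out of scope here):
# per-visit quantities such as the exposure midpoint MJD or seeing can be attached
# to a (forced) source table by joining with the ccdVisitTable on ``ccdVisitId``.
def _exampleJoinWithCcdVisitTable(sourceTable, ccdVisitTable):
    # sourceTable: DataFrame with a ``ccdVisitId`` column
    # ccdVisitTable: DataFrame indexed by ``ccdVisitId`` (see MakeCcdVisitTableTask)
    return sourceTable.join(ccdVisitTable[["expMidptMJD", "seeing"]], on="ccdVisitId")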


class ConsolidateTractConnections(pipeBase.PipelineTaskConnections,
                                  defaultTemplates={"catalogType": ""},
                                  dimensions=("instrument", "tract")):
    inputCatalogs = connectionTypes.Input(
        doc="Input per-patch DataFrame Tables to be concatenated",
        name="{catalogType}ForcedSourceTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        multiple=True,
    )
    outputCatalog = connectionTypes.Output(
        doc="Output per-tract concatenation of DataFrame Tables",
        name="{catalogType}ForcedSourceTable_tract",
        storageClass="DataFrame",
        dimensions=("tract", "skymap"),
    )


class ConsolidateTractConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=ConsolidateTractConnections):
    pass


class ConsolidateTractTask(pipeBase.PipelineTask):
    """Concatenate any per-patch, dataframe list into a single
    per-tract DataFrame.
    """
    _DefaultName = 'ConsolidateTract'
    ConfigClass = ConsolidateTractConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %s per-patch %s Tables",
                      len(inputs['inputCatalogs']),
                      inputRefs.inputCatalogs[0].datasetType.name)
        df = pd.concat(inputs['inputCatalogs'])
        butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)