postprocess.py
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22__all__ = ["WriteObjectTableConfig", "WriteObjectTableTask",
23 "WriteSourceTableConfig", "WriteSourceTableTask",
24 "WriteRecalibratedSourceTableConfig", "WriteRecalibratedSourceTableTask",
25 "PostprocessAnalysis",
26 "TransformCatalogBaseConfig", "TransformCatalogBaseTask",
27 "TransformObjectCatalogConfig", "TransformObjectCatalogTask",
28 "ConsolidateObjectTableConfig", "ConsolidateObjectTableTask",
29 "TransformSourceTableConfig", "TransformSourceTableTask",
30 "ConsolidateVisitSummaryConfig", "ConsolidateVisitSummaryTask",
31 "ConsolidateSourceTableConfig", "ConsolidateSourceTableTask",
32 "MakeCcdVisitTableConfig", "MakeCcdVisitTableTask",
33 "MakeVisitTableConfig", "MakeVisitTableTask",
34 "WriteForcedSourceTableConfig", "WriteForcedSourceTableTask",
35 "TransformForcedSourceTableConfig", "TransformForcedSourceTableTask",
36 "ConsolidateTractConfig", "ConsolidateTractTask"]
37
38import functools
39import pandas as pd
40import logging
41import numpy as np
42import numbers
43import os
44
45import lsst.geom
46import lsst.pex.config as pexConfig
47import lsst.pipe.base as pipeBase
48import lsst.daf.base as dafBase
49from lsst.obs.base import ExposureIdInfo
50from lsst.pipe.base import connectionTypes
51import lsst.afw.table as afwTable
52from lsst.meas.base import SingleFrameMeasurementTask
53from lsst.daf.butler import DeferredDatasetHandle, DataCoordinate
54from lsst.skymap import BaseSkyMap
55
56from .parquetTable import ParquetTable
57from .functors import CompositeFunctor, Column
58
59log = logging.getLogger(__name__)
60
61
62def flattenFilters(df, noDupCols=['coord_ra', 'coord_dec'], camelCase=False, inputBands=None):
63 """Flattens a dataframe with multilevel column index.
64 """
65 newDf = pd.DataFrame()
66 # band is the level 0 index
67 dfBands = df.columns.unique(level=0).values
68 for band in dfBands:
69 subdf = df[band]
70 columnFormat = '{0}{1}' if camelCase else '{0}_{1}'
71 newColumns = {c: columnFormat.format(band, c)
72 for c in subdf.columns if c not in noDupCols}
73 cols = list(newColumns.keys())
74 newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)
75
76 # Band must be present in the input and output or else column is all NaN:
77 presentBands = dfBands if inputBands is None else list(set(inputBands).intersection(dfBands))
78 # Get the unexploded columns from any present band's partition
79 noDupDf = df[presentBands[0]][noDupCols]
80 newDf = pd.concat([noDupDf, newDf], axis=1)
81 return newDf
82
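# Illustrative usage sketch (column names and values below are hypothetical):
# given a DataFrame whose columns form a two-level (band, column) MultiIndex,
# flattenFilters returns a flat table with band-prefixed column names.
#
#     import pandas as pd
#
#     columns = pd.MultiIndex.from_tuples(
#         [('g', 'coord_ra'), ('g', 'coord_dec'), ('g', 'PsFlux'),
#          ('r', 'coord_ra'), ('r', 'coord_dec'), ('r', 'PsFlux')],
#         names=('band', 'column'))
#     df = pd.DataFrame([[10.0, -5.0, 1.2, 10.0, -5.0, 2.3]], columns=columns)
#
#     flat = flattenFilters(df, camelCase=True)
#     # flat.columns -> ['coord_ra', 'coord_dec', 'gPsFlux', 'rPsFlux']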
83
84class WriteObjectTableConnections(pipeBase.PipelineTaskConnections,
85 defaultTemplates={"coaddName": "deep"},
86 dimensions=("tract", "patch", "skymap")):
87 inputCatalogMeas = connectionTypes.Input(
88 doc="Catalog of source measurements on the deepCoadd.",
89 dimensions=("tract", "patch", "band", "skymap"),
90 storageClass="SourceCatalog",
91 name="{coaddName}Coadd_meas",
92 multiple=True
93 )
94 inputCatalogForcedSrc = connectionTypes.Input(
95 doc="Catalog of forced measurements (shape and position parameters held fixed) on the deepCoadd.",
96 dimensions=("tract", "patch", "band", "skymap"),
97 storageClass="SourceCatalog",
98 name="{coaddName}Coadd_forced_src",
99 multiple=True
100 )
101 inputCatalogRef = connectionTypes.Input(
102 doc="Catalog marking the primary detection (which band provides a good shape and position)"
103 "for each detection in deepCoadd_mergeDet.",
104 dimensions=("tract", "patch", "skymap"),
105 storageClass="SourceCatalog",
106 name="{coaddName}Coadd_ref"
107 )
108 outputCatalog = connectionTypes.Output(
109 doc="A vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
110 "stored as a DataFrame with a multi-level column index per-patch.",
111 dimensions=("tract", "patch", "skymap"),
112 storageClass="DataFrame",
113 name="{coaddName}Coadd_obj"
114 )
115
116
117class WriteObjectTableConfig(pipeBase.PipelineTaskConfig,
118 pipelineConnections=WriteObjectTableConnections):
119 engine = pexConfig.Field(
120 dtype=str,
121 default="pyarrow",
122 doc="Parquet engine for writing (pyarrow or fastparquet)"
123 )
124 coaddName = pexConfig.Field(
125 dtype=str,
126 default="deep",
127 doc="Name of coadd"
128 )
129
130
131class WriteObjectTableTask(pipeBase.PipelineTask):
132 """Write filter-merged source tables to parquet
133 """
134 _DefaultName = "writeObjectTable"
135 ConfigClass = WriteObjectTableConfig
136
137 # Names of table datasets to be merged
138 inputDatasets = ('forced_src', 'meas', 'ref')
139
140 # Tag of output dataset written by `MergeSourcesTask.write`
141 outputDataset = 'obj'
142
143 def runQuantum(self, butlerQC, inputRefs, outputRefs):
144 inputs = butlerQC.get(inputRefs)
145
146 measDict = {ref.dataId['band']: {'meas': cat} for ref, cat in
147 zip(inputRefs.inputCatalogMeas, inputs['inputCatalogMeas'])}
148 forcedSourceDict = {ref.dataId['band']: {'forced_src': cat} for ref, cat in
149 zip(inputRefs.inputCatalogForcedSrc, inputs['inputCatalogForcedSrc'])}
150
151 catalogs = {}
152 for band in measDict.keys():
153 catalogs[band] = {'meas': measDict[band]['meas'],
154 'forced_src': forcedSourceDict[band]['forced_src'],
155 'ref': inputs['inputCatalogRef']}
156 dataId = butlerQC.quantum.dataId
157 df = self.run(catalogs=catalogs, tract=dataId['tract'], patch=dataId['patch'])
158 outputs = pipeBase.Struct(outputCatalog=df)
159 butlerQC.put(outputs, outputRefs)
160
161 def run(self, catalogs, tract, patch):
162 """Merge multiple catalogs.
163
164 Parameters
165 ----------
166 catalogs : `dict`
167 Mapping from filter names to dict of catalogs.
168 tract : `int`
169 tractId to use for the tractId column.
170 patch : `str`
171 patchId to use for the patchId column.
172
173 Returns
174 -------
175 catalog : `pandas.DataFrame`
176 Merged dataframe.
177 """
178
179 dfs = []
180 for filt, tableDict in catalogs.items():
181 for dataset, table in tableDict.items():
182 # Convert afwTable to pandas DataFrame
183 df = table.asAstropy().to_pandas().set_index('id', drop=True)
184
185 # Sort columns by name, to ensure matching schema among patches
186 df = df.reindex(sorted(df.columns), axis=1)
187 df['tractId'] = tract
188 df['patchId'] = patch
189
190 # Make columns a 3-level MultiIndex
191 df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
192 names=('dataset', 'band', 'column'))
193 dfs.append(df)
194
195 catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
196 return catalog
197
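# Illustrative note (hypothetical column name): the merged per-patch
# {coaddName}Coadd_obj DataFrame returned by WriteObjectTableTask.run has a
# three-level column MultiIndex named (dataset, band, column), e.g.
#
#     ('meas', 'g', 'base_PsfFlux_instFlux')
#
# so a single dataset/band slice can be pulled out with, e.g.,
#
#     gMeas = catalog['meas']['g']    # per-band measurement columns only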
198
199class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
200 defaultTemplates={"catalogType": ""},
201 dimensions=("instrument", "visit", "detector")):
202
203 catalog = connectionTypes.Input(
204 doc="Input full-depth catalog of sources produced by CalibrateTask",
205 name="{catalogType}src",
206 storageClass="SourceCatalog",
207 dimensions=("instrument", "visit", "detector")
208 )
209 outputCatalog = connectionTypes.Output(
210 doc="Catalog of sources, `src` in Parquet format. The 'id' column is "
211 "replaced with an index; all other columns are unchanged.",
212 name="{catalogType}source",
213 storageClass="DataFrame",
214 dimensions=("instrument", "visit", "detector")
215 )
216
217
218class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
219 pipelineConnections=WriteSourceTableConnections):
220 pass
221
222
223class WriteSourceTableTask(pipeBase.PipelineTask):
224 """Write source table to parquet.
225 """
226 _DefaultName = "writeSourceTable"
227 ConfigClass = WriteSourceTableConfig
228
229 def runQuantum(self, butlerQC, inputRefs, outputRefs):
230 inputs = butlerQC.get(inputRefs)
231 inputs['ccdVisitId'] = butlerQC.quantum.dataId.pack("visit_detector")
232 result = self.run(**inputs).table
233 outputs = pipeBase.Struct(outputCatalog=result.toDataFrame())
234 butlerQC.put(outputs, outputRefs)
235
236 def run(self, catalog, ccdVisitId=None, **kwargs):
237 """Convert `src` catalog to parquet
238
239 Parameters
240 ----------
241 catalog : `afwTable.SourceCatalog`
242 catalog to be converted
243 ccdVisitId : `int`
244 ccdVisitId to be added as a column
245
246 Returns
247 -------
248 result : `lsst.pipe.base.Struct`
249 ``table``
250 `ParquetTable` version of the input catalog
251 """
252 self.log.info("Generating parquet table from src catalog ccdVisitId=%s", ccdVisitId)
253 df = catalog.asAstropy().to_pandas().set_index('id', drop=True)
254 df['ccdVisitId'] = ccdVisitId
255 return pipeBase.Struct(table=ParquetTable(dataFrame=df))
256
257
258class WriteRecalibratedSourceTableConnections(WriteSourceTableConnections,
259 defaultTemplates={"catalogType": "",
260 "skyWcsName": "jointcal",
261 "photoCalibName": "fgcm"},
262 dimensions=("instrument", "visit", "detector", "skymap")):
263 skyMap = connectionTypes.Input(
264 doc="skyMap needed to choose which tract-level calibrations to use when multiple available",
265 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
266 storageClass="SkyMap",
267 dimensions=("skymap",),
268 )
269 exposure = connectionTypes.Input(
270 doc="Input exposure to perform photometry on.",
271 name="calexp",
272 storageClass="ExposureF",
273 dimensions=["instrument", "visit", "detector"],
274 )
275 externalSkyWcsTractCatalog = connectionTypes.Input(
276 doc=("Per-tract, per-visit wcs calibrations. These catalogs use the detector "
277 "id for the catalog id, sorted on id for fast lookup."),
278 name="{skyWcsName}SkyWcsCatalog",
279 storageClass="ExposureCatalog",
280 dimensions=["instrument", "visit", "tract"],
281 multiple=True
282 )
283 externalSkyWcsGlobalCatalog = connectionTypes.Input(
284 doc=("Per-visit wcs calibrations computed globally (with no tract information). "
285 "These catalogs use the detector id for the catalog id, sorted on id for "
286 "fast lookup."),
287 name="{skyWcsName}SkyWcsCatalog",
288 storageClass="ExposureCatalog",
289 dimensions=["instrument", "visit"],
290 )
291 externalPhotoCalibTractCatalog = connectionTypes.Input(
292 doc=("Per-tract, per-visit photometric calibrations. These catalogs use the "
293 "detector id for the catalog id, sorted on id for fast lookup."),
294 name="{photoCalibName}PhotoCalibCatalog",
295 storageClass="ExposureCatalog",
296 dimensions=["instrument", "visit", "tract"],
297 multiple=True
298 )
299 externalPhotoCalibGlobalCatalog = connectionTypes.Input(
300 doc=("Per-visit photometric calibrations computed globally (with no tract "
301 "information). These catalogs use the detector id for the catalog id, "
302 "sorted on id for fast lookup."),
303 name="{photoCalibName}PhotoCalibCatalog",
304 storageClass="ExposureCatalog",
305 dimensions=["instrument", "visit"],
306 )
307
308 def __init__(self, *, config=None):
309 super().__init__(config=config)
310 # Same connection boilerplate as all other applications of
311 # Global/Tract calibrations
312 if config.doApplyExternalSkyWcs and config.doReevaluateSkyWcs:
313 if config.useGlobalExternalSkyWcs:
314 self.inputs.remove("externalSkyWcsTractCatalog")
315 else:
316 self.inputs.remove("externalSkyWcsGlobalCatalog")
317 else:
318 self.inputs.remove("externalSkyWcsTractCatalog")
319 self.inputs.remove("externalSkyWcsGlobalCatalog")
320 if config.doApplyExternalPhotoCalib and config.doReevaluatePhotoCalib:
321 if config.useGlobalExternalPhotoCalib:
322 self.inputs.remove("externalPhotoCalibTractCatalog")
323 else:
324 self.inputs.remove("externalPhotoCalibGlobalCatalog")
325 else:
326 self.inputs.remove("externalPhotoCalibTractCatalog")
327 self.inputs.remove("externalPhotoCalibGlobalCatalog")
328
329
330class WriteRecalibratedSourceTableConfig(WriteSourceTableConfig,
331 pipelineConnections=WriteRecalibratedSourceTableConnections):
332
333 doReevaluatePhotoCalib = pexConfig.Field(
334 dtype=bool,
335 default=True,
336 doc=("Add or replace local photoCalib columns")
337 )
338 doReevaluateSkyWcs = pexConfig.Field(
339 dtype=bool,
340 default=True,
341 doc=("Add or replace local WCS columns and update the coord columns, coord_ra and coord_dec")
342 )
343 doApplyExternalPhotoCalib = pexConfig.Field(
344 dtype=bool,
345 default=True,
346 doc=("If and only if doReevaluatePhotoCalib, apply the photometric calibrations from an external ",
347 "algorithm such as FGCM or jointcal, else use the photoCalib already attached to the exposure."),
348 )
349 doApplyExternalSkyWcs = pexConfig.Field(
350 dtype=bool,
351 default=True,
352 doc=("if and only if doReevaluateSkyWcs, apply the WCS from an external algorithm such as jointcal, ",
353 "else use the wcs already attached to the exposure."),
354 )
355 useGlobalExternalPhotoCalib = pexConfig.Field(
356 dtype=bool,
357 default=True,
358 doc=("When using doApplyExternalPhotoCalib, use 'global' calibrations "
359 "that are not run per-tract. When False, use per-tract photometric "
360 "calibration files.")
361 )
362 useGlobalExternalSkyWcs = pexConfig.Field(
363 dtype=bool,
364 default=False,
365 doc=("When using doApplyExternalSkyWcs, use 'global' calibrations "
366 "that are not run per-tract. When False, use per-tract wcs "
367 "files.")
368 )
369
370 def validate(self):
371 super().validate()
372 if self.doApplyExternalSkyWcs and not self.doReevaluateSkyWcs:
373 log.warning("doApplyExternalSkyWcs=True but doReevaluateSkyWcs=False"
374 "External SkyWcs will not be read or evaluated.")
375 if self.doApplyExternalPhotoCalib and not self.doReevaluatePhotoCalib:
376 log.warning("doApplyExternalPhotoCalib=True but doReevaluatePhotoCalib=False."
377 "External PhotoCalib will not be read or evaluated.")
378
379
380class WriteRecalibratedSourceTableTask(WriteSourceTableTask):
381 """Write source table to parquet
382 """
383 _DefaultName = "writeRecalibratedSourceTable"
384 ConfigClass = WriteRecalibratedSourceTableConfig
385
386 def runQuantum(self, butlerQC, inputRefs, outputRefs):
387 inputs = butlerQC.get(inputRefs)
388 inputs['ccdVisitId'] = butlerQC.quantum.dataId.pack("visit_detector")
389 inputs['exposureIdInfo'] = ExposureIdInfo.fromDataId(butlerQC.quantum.dataId, "visit_detector")
390
391 if self.config.doReevaluatePhotoCalib or self.config.doReevaluateSkyWcs:
392 if self.config.doApplyExternalPhotoCalib or self.config.doApplyExternalSkyWcs:
393 inputs['exposure'] = self.attachCalibs(inputRefs, **inputs)
394
395 inputs['catalog'] = self.addCalibColumns(**inputs)
396
397 result = self.run(**inputs).table
398 outputs = pipeBase.Struct(outputCatalog=result.toDataFrame())
399 butlerQC.put(outputs, outputRefs)
400
401 def attachCalibs(self, inputRefs, skyMap, exposure, externalSkyWcsGlobalCatalog=None,
402 externalSkyWcsTractCatalog=None, externalPhotoCalibGlobalCatalog=None,
403 externalPhotoCalibTractCatalog=None, **kwargs):
404 """Apply external calibrations to exposure per configuration
405
406 When multiple tract-level calibrations overlap, select the one with the
407 center closest to the detector.
408
409 Parameters
410 ----------
411 inputRefs : `lsst.pipe.base.InputQuantizedConnection`, for dataIds of
412 tract-level calibs.
413 skyMap : `lsst.skymap.SkyMap`
414 exposure : `lsst.afw.image.exposure.Exposure`
415 Input exposure to adjust calibrations.
416 externalSkyWcsGlobalCatalog : `lsst.afw.table.ExposureCatalog`, optional
417 Exposure catalog with external skyWcs to be applied per config
418 externalSkyWcsTractCatalog : `lsst.afw.table.ExposureCatalog`, optional
419 Exposure catalog with external skyWcs to be applied per config
420 externalPhotoCalibGlobalCatalog : `lsst.afw.table.ExposureCatalog`, optional
421 Exposure catalog with external photoCalib to be applied per config
422 externalPhotoCalibTractCatalog : `lsst.afw.table.ExposureCatalog`, optional
423 Exposure catalog with external photoCalib to be applied per config
424
425 Returns
426 -------
427 exposure : `lsst.afw.image.exposure.Exposure`
428 Exposure with adjusted calibrations.
429 """
430 if not self.config.doApplyExternalSkyWcs:
431 # Do not modify the exposure's SkyWcs
432 externalSkyWcsCatalog = None
433 elif self.config.useGlobalExternalSkyWcs:
434 # Use the global external SkyWcs
435 externalSkyWcsCatalog = externalSkyWcsGlobalCatalog
436 self.log.info('Applying global SkyWcs')
437 else:
438 # use tract-level external SkyWcs from the closest overlapping tract
439 inputRef = getattr(inputRefs, 'externalSkyWcsTractCatalog')
440 tracts = [ref.dataId['tract'] for ref in inputRef]
441 if len(tracts) == 1:
442 ind = 0
443 self.log.info('Applying tract-level SkyWcs from tract %s', tracts[ind])
444 else:
445 if exposure.getWcs() is None: # TODO: could this look-up use the externalPhotoCalib?
446 raise ValueError("Trying to locate nearest tract, but exposure.wcs is None.")
447 ind = self.getClosestTract(tracts, skyMap,
448 exposure.getBBox(), exposure.getWcs())
449 self.log.info('Multiple overlapping externalSkyWcsTractCatalogs found (%s). '
450 'Applying closest to detector center: tract=%s', str(tracts), tracts[ind])
451
452 externalSkyWcsCatalog = externalSkyWcsTractCatalog[ind]
453
454 if not self.config.doApplyExternalPhotoCalib:
455 # Do not modify the exposure's PhotoCalib
456 externalPhotoCalibCatalog = None
457 elif self.config.useGlobalExternalPhotoCalib:
458 # Use the global external PhotoCalib
459 externalPhotoCalibCatalog = externalPhotoCalibGlobalCatalog
460 self.log.info('Applying global PhotoCalib')
461 else:
462 # use tract-level external PhotoCalib from the closest overlapping tract
463 inputRef = getattr(inputRefs, 'externalPhotoCalibTractCatalog')
464 tracts = [ref.dataId['tract'] for ref in inputRef]
465 if len(tracts) == 1:
466 ind = 0
467 self.log.info('Applying tract-level PhotoCalib from tract %s', tracts[ind])
468 else:
469 ind = self.getClosestTract(tracts, skyMap,
470 exposure.getBBox(), exposure.getWcs())
471 self.log.info('Multiple overlapping externalPhotoCalibTractCatalogs found (%s). '
472 'Applying closest to detector center: tract=%s', str(tracts), tracts[ind])
473
474 externalPhotoCalibCatalog = externalPhotoCalibTractCatalog[ind]
475
476 return self.prepareCalibratedExposure(exposure, externalSkyWcsCatalog, externalPhotoCalibCatalog)
477
478 def getClosestTract(self, tracts, skyMap, bbox, wcs):
479 """Find the index of the tract closest to detector from list of tractIds
480
481 Parameters
482 ----------
483 tracts: `list` [`int`]
484 Iterable of integer tractIds
485 skyMap : `lsst.skymap.SkyMap`
486 skyMap to lookup tract geometry and wcs
487 bbox : `lsst.geom.Box2I`
488 Detector bbox, center of which will be compared to tract centers
489 wcs : `lsst.afw.geom.SkyWcs`
490 Detector Wcs object to map the detector center to SkyCoord
491
492 Returns
493 -------
494 index : `int`
495 """
496 if len(tracts) == 1:
497 return 0
498
499 center = wcs.pixelToSky(bbox.getCenter())
500 sep = []
501 for tractId in tracts:
502 tract = skyMap[tractId]
503 tractCenter = tract.getWcs().pixelToSky(tract.getBBox().getCenter())
504 sep.append(center.separation(tractCenter))
505
506 return np.argmin(sep)
507
508 def prepareCalibratedExposure(self, exposure, externalSkyWcsCatalog=None, externalPhotoCalibCatalog=None):
509 """Prepare a calibrated exposure and apply external calibrations
510 if so configured.
511
512 Parameters
513 ----------
514 exposure : `lsst.afw.image.exposure.Exposure`
515 Input exposure to adjust calibrations.
516 externalSkyWcsCatalog : `lsst.afw.table.ExposureCatalog`, optional
517 Exposure catalog with external skyWcs to be applied
518 if config.doApplyExternalSkyWcs=True. Catalog uses the detector id
519 for the catalog id, sorted on id for fast lookup.
520 externalPhotoCalibCatalog : `lsst.afw.table.ExposureCatalog`, optional
521 Exposure catalog with external photoCalib to be applied
522 if config.doApplyExternalPhotoCalib=True. Catalog uses the detector
523 id for the catalog id, sorted on id for fast lookup.
524
525 Returns
526 -------
527 exposure : `lsst.afw.image.exposure.Exposure`
528 Exposure with adjusted calibrations.
529 """
530 detectorId = exposure.getInfo().getDetector().getId()
531
532 if externalPhotoCalibCatalog is not None:
533 row = externalPhotoCalibCatalog.find(detectorId)
534 if row is None:
535 self.log.warning("Detector id %s not found in externalPhotoCalibCatalog; "
536 "Using original photoCalib.", detectorId)
537 else:
538 photoCalib = row.getPhotoCalib()
539 if photoCalib is None:
540 self.log.warning("Detector id %s has None for photoCalib in externalPhotoCalibCatalog; "
541 "Using original photoCalib.", detectorId)
542 else:
543 exposure.setPhotoCalib(photoCalib)
544
545 if externalSkyWcsCatalog is not None:
546 row = externalSkyWcsCatalog.find(detectorId)
547 if row is None:
548 self.log.warning("Detector id %s not found in externalSkyWcsCatalog; "
549 "Using original skyWcs.", detectorId)
550 else:
551 skyWcs = row.getWcs()
552 if skyWcs is None:
553 self.log.warning("Detector id %s has None for skyWcs in externalSkyWcsCatalog; "
554 "Using original skyWcs.", detectorId)
555 else:
556 exposure.setWcs(skyWcs)
557
558 return exposure
559
560 def addCalibColumns(self, catalog, exposure, exposureIdInfo, **kwargs):
561 """Add replace columns with calibs evaluated at each centroid
562
563 Add or replace 'base_LocalWcs' `base_LocalPhotoCalib' columns in a
564 a source catalog, by rerunning the plugins.
565
566 Parameters
567 ----------
568 catalog : `lsst.afw.table.SourceCatalog`
569 catalog to which calib columns will be added
570 exposure : `lsst.afw.image.exposure.Exposure`
571 Exposure with attached PhotoCalibs and SkyWcs attributes to be
572 reevaluated at local centroids. Pixels are not required.
573 exposureIdInfo : `lsst.obs.base.ExposureIdInfo`
574
575 Returns
576 -------
577 newCat : `lsst.afw.table.SourceCatalog`
578 Source Catalog with requested local calib columns
579 """
580 measureConfig = SingleFrameMeasurementTask.ConfigClass()
581 measureConfig.doReplaceWithNoise = False
582
583 # Clear all slots, because we aren't running the relevant plugins.
584 for slot in measureConfig.slots:
585 setattr(measureConfig.slots, slot, None)
586
587 measureConfig.plugins.names = []
588 if self.config.doReevaluateSkyWcs:
589 measureConfig.plugins.names.add('base_LocalWcs')
590 self.log.info("Re-evaluating base_LocalWcs plugin")
591 if self.config.doReevaluatePhotoCalib:
592 measureConfig.plugins.names.add('base_LocalPhotoCalib')
593 self.log.info("Re-evaluating base_LocalPhotoCalib plugin")
594 pluginsNotToCopy = tuple(measureConfig.plugins.names)
595
596 # Create a new schema and catalog
597 # Copy all columns from original except for the ones to reevaluate
598 aliasMap = catalog.schema.getAliasMap()
599 mapper = afwTable.SchemaMapper(catalog.schema)
600 for item in catalog.schema:
601 if not item.field.getName().startswith(pluginsNotToCopy):
602 mapper.addMapping(item.key)
603
604 schema = mapper.getOutputSchema()
605 measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
606 schema.setAliasMap(aliasMap)
607 newCat = afwTable.SourceCatalog(schema)
608 newCat.extend(catalog, mapper=mapper)
609
610 # Fluxes in sourceCatalogs are in counts, so there are no fluxes to
611 # update here. LocalPhotoCalibs are applied during transform tasks.
612 # Update coord_ra/coord_dec, which are expected to be positions on the
613 # sky and are used as such in sdm tables without transform
614 if self.config.doReevaluateSkyWcs and exposure.wcs is not None:
615 afwTable.updateSourceCoords(exposure.wcs, newCat)
616
617 measurement.run(measCat=newCat, exposure=exposure, exposureId=exposureIdInfo.expId)
618
619 return newCat
620
621
623 """Calculate columns from ParquetTable.
624
625 This object manages and organizes an arbitrary set of computations
626 on a catalog. The catalog is defined by a
627 `~lsst.pipe.tasks.parquetTable.ParquetTable` object (or list thereof), such
628 as a ``deepCoadd_obj`` dataset, and the computations are defined by a
629 collection of `lsst.pipe.tasks.functors.Functor` objects (or, equivalently,
630 a ``CompositeFunctor``).
631
632 After the object is initialized, accessing the ``.df`` attribute (which
633 holds the `pandas.DataFrame` containing the results of the calculations)
634 triggers computation of said dataframe.
635
636 One of the conveniences of using this object is the ability to define a
637 desired common filter for all functors. This enables the same functor
638 collection to be passed to several different `PostprocessAnalysis` objects
639 without having to change the original functor collection, since the ``filt``
640 keyword argument of this object triggers an overwrite of the ``filt``
641 property for all functors in the collection.
642
643 This object also allows a list of refFlags to be passed, and defines a set
644 of default refFlags that are always included even if not requested.
645
646 If a list of `~lsst.pipe.tasks.ParquetTable` objects is passed, rather than a single one,
647 then the calculations will be mapped over all the input catalogs. In
648 principle, it should be straightforward to parallelize this activity, but
649 initial tests have failed (see TODO in code comments).
650
651 Parameters
652 ----------
653 parq : `~lsst.pipe.tasks.ParquetTable` (or list of such)
654 Source catalog(s) for computation.
655
656 functors : `list`, `dict`, or `~lsst.pipe.tasks.functors.CompositeFunctor`
657 Computations to do (functors that act on ``parq``).
658 If a dict, the output
659 DataFrame will have columns keyed accordingly.
660 If a list, the column keys will come from the
661 ``.shortname`` attribute of each functor.
662
663 filt : `str`, optional
664 Filter in which to calculate. If provided,
665 this will overwrite any existing ``.filt`` attribute
666 of the provided functors.
667
668 flags : `list`, optional
669 List of flags (per-band) to include in output table.
670 Taken from the ``meas`` dataset if applied to a multilevel Object Table.
671
672 refFlags : `list`, optional
673 List of refFlags (only reference band) to include in output table.
674
675 forcedFlags : `list`, optional
676 List of flags (per-band) to include in output table.
677 Taken from the ``forced_src`` dataset if applied to a
678 multilevel Object Table. Intended for flags from measurement plugins
679 only run during multi-band forced-photometry.
680 """
681 _defaultRefFlags = []
682 _defaultFuncs = ()
683
684 def __init__(self, parq, functors, filt=None, flags=None, refFlags=None, forcedFlags=None):
685 self.parq = parq
686 self.functors = functors
687
688 self.filt = filt
689 self.flags = list(flags) if flags is not None else []
690 self.forcedFlags = list(forcedFlags) if forcedFlags is not None else []
691 self.refFlags = list(self._defaultRefFlags)
692 if refFlags is not None:
693 self.refFlags += list(refFlags)
694
695 self._df = None
696
697 @property
698 def defaultFuncs(self):
699 funcs = dict(self._defaultFuncs)
700 return funcs
701
702 @property
703 def func(self):
704 additionalFuncs = self.defaultFuncs
705 additionalFuncs.update({flag: Column(flag, dataset='forced_src') for flag in self.forcedFlags})
706 additionalFuncs.update({flag: Column(flag, dataset='ref') for flag in self.refFlags})
707 additionalFuncs.update({flag: Column(flag, dataset='meas') for flag in self.flags})
708
709 if isinstance(self.functors, CompositeFunctor):
710 func = self.functors
711 else:
712 func = CompositeFunctor(self.functors)
713
714 func.funcDict.update(additionalFuncs)
715 func.filt = self.filt
716
717 return func
718
719 @property
720 def noDupCols(self):
721 return [name for name, func in self.func.funcDict.items() if func.noDup or func.dataset == 'ref']
722
723 @property
724 def df(self):
725 if self._df is None:
726 self.compute()
727 return self._df
728
729 def compute(self, dropna=False, pool=None):
730 # map over multiple parquet tables
731 if type(self.parq) in (list, tuple):
732 if pool is None:
733 dflist = [self.func(parq, dropna=dropna) for parq in self.parq]
734 else:
735 # TODO: Figure out why this doesn't work (pyarrow pickling
736 # issues?)
737 dflist = pool.map(functools.partial(self.func, dropna=dropna), self.parq)
738 self._df = pd.concat(dflist)
739 else:
740 self._df = self.func(self.parq, dropna=dropna)
741
742 return self._df
743
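# Illustrative usage sketch (functor names and column choices are
# hypothetical): PostprocessAnalysis computes its DataFrame lazily the first
# time the ``.df`` property is accessed.
#
#     from lsst.pipe.tasks.functors import Column, Mag
#
#     funcs = {'objectId': Column('id', dataset='ref'),
#              'psfMag': Mag('base_PsfFlux', dataset='meas')}
#     analysis = PostprocessAnalysis(parq, funcs, filt='g')
#     df = analysis.df    # triggers compute(); one column per functor key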
744
745class TransformCatalogBaseConnections(pipeBase.PipelineTaskConnections,
746 dimensions=()):
747 """Expected Connections for subclasses of TransformCatalogBaseTask.
748
749 Must be subclassed.
750 """
751 inputCatalog = connectionTypes.Input(
752 name="",
753 storageClass="DataFrame",
754 )
755 outputCatalog = connectionTypes.Output(
756 name="",
757 storageClass="DataFrame",
758 )
759
760
761class TransformCatalogBaseConfig(pipeBase.PipelineTaskConfig,
762 pipelineConnections=TransformCatalogBaseConnections):
763 functorFile = pexConfig.Field(
764 dtype=str,
765 doc="Path to YAML file specifying Science Data Model functors to use "
766 "when copying columns and computing calibrated values.",
767 default=None,
768 optional=True
769 )
770 primaryKey = pexConfig.Field(
771 dtype=str,
772 doc="Name of column to be set as the DataFrame index. If None, the index"
773 "will be named `id`",
774 default=None,
775 optional=True
776 )
777 columnsFromDataId = pexConfig.ListField(
778 dtype=str,
779 default=None,
780 optional=True,
781 doc="Columns to extract from the dataId",
782 )
783
784
785class TransformCatalogBaseTask(pipeBase.PipelineTask):
786 """Base class for transforming/standardizing a catalog
787
788 by applying functors that convert units and apply calibrations.
789 The purpose of this task is to perform a set of computations on
790 an input `ParquetTable` dataset (such as ``deepCoadd_obj``) and write the
791 results to a new dataset (which needs to be declared in an ``outputDataset``
792 attribute).
793
794 The calculations to be performed are defined in a YAML file that specifies
795 a set of functors to be computed, provided as
796 a ``--functorFile`` config parameter. An example of such a YAML file
797 is the following:
798
799 funcs:
800 psfMag:
801 functor: Mag
802 args:
803 - base_PsfFlux
804 filt: HSC-G
805 dataset: meas
806 cmodel_magDiff:
807 functor: MagDiff
808 args:
809 - modelfit_CModel
810 - base_PsfFlux
811 filt: HSC-G
812 gauss_magDiff:
813 functor: MagDiff
814 args:
815 - base_GaussianFlux
816 - base_PsfFlux
817 filt: HSC-G
818 count:
819 functor: Column
820 args:
821 - base_InputCount_value
822 filt: HSC-G
823 deconvolved_moments:
824 functor: DeconvolvedMoments
825 filt: HSC-G
826 dataset: forced_src
827 refFlags:
828 - calib_psfUsed
829 - merge_measurement_i
830 - merge_measurement_r
831 - merge_measurement_z
832 - merge_measurement_y
833 - merge_measurement_g
834 - base_PixelFlags_flag_inexact_psfCenter
835 - detect_isPrimary
836
837 The names for each entry under "funcs" will become the names of columns in
838 the output dataset. All the functors referenced are defined in
839 `lsst.pipe.tasks.functors`. Positional arguments to be passed to each
840 functor are in the `args` list, and any additional entries for each column
841 other than "functor" or "args" (e.g., ``'filt'``, ``'dataset'``) are treated as
842 keyword arguments to be passed to the functor initialization.
843
844 The "flags" entry is the default shortcut for `Column` functors.
845 All columns listed under "flags" will be copied to the output table
846 untransformed. They can be of any datatype.
847 In the special case of transforming a multi-level object table with
848 band and dataset indices (deepCoadd_obj), these will be taken from the
849 `meas` dataset and exploded out per band.
850
851 There are two special shortcuts that only apply when transforming
852 multi-level Object (deepCoadd_obj) tables:
853 - The "refFlags" entry is shortcut for `Column` functor
854 taken from the `'ref'` dataset if transforming an ObjectTable.
855 - The "forcedFlags" entry is shortcut for `Column` functors.
856 taken from the ``forced_src`` dataset if transforming an ObjectTable.
857 These are expanded out per band.
858
859
860 This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
861 to organize and execute the calculations.
862 """
863 @property
864 def _DefaultName(self):
865 raise NotImplementedError('Subclass must define "_DefaultName" attribute')
866
867 @property
868 def outputDataset(self):
869 raise NotImplementedError('Subclass must define "outputDataset" attribute')
870
871 @property
872 def inputDataset(self):
873 raise NotImplementedError('Subclass must define "inputDataset" attribute')
874
875 @property
876 def ConfigClass(self):
877 raise NotImplementedError('Subclass must define "ConfigClass" attribute')
878
879 def __init__(self, *args, **kwargs):
880 super().__init__(*args, **kwargs)
881 if self.config.functorFile:
882 self.log.info('Loading transform functor definitions from %s',
883 self.config.functorFile)
884 self.funcs = CompositeFunctor.from_file(self.config.functorFile)
885 self.funcs.update(dict(PostprocessAnalysis._defaultFuncs))
886 else:
887 self.funcs = None
888
889 def runQuantum(self, butlerQC, inputRefs, outputRefs):
890 inputs = butlerQC.get(inputRefs)
891 if self.funcs is None:
892 raise ValueError("config.functorFile is None. "
893 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
894 result = self.run(parq=inputs['inputCatalog'], funcs=self.funcs,
895 dataId=outputRefs.outputCatalog.dataId.full)
896 outputs = pipeBase.Struct(outputCatalog=result)
897 butlerQC.put(outputs, outputRefs)
898
899 def run(self, parq, funcs=None, dataId=None, band=None):
900 """Do postprocessing calculations
901
902 Takes a `ParquetTable` object and dataId,
903 returns a dataframe with results of postprocessing calculations.
904
905 Parameters
906 ----------
907 parq : `~lsst.pipe.tasks.ParquetTable`
908 ParquetTable from which calculations are done.
909 funcs : `lsst.pipe.tasks.functors.Functors`
910 Functors to apply to the table's columns
911 dataId : dict, optional
912 Used to add a `patchId` column to the output dataframe.
913 band : `str`, optional
914 Filter band that is being processed.
915
916 Returns
917 -------
918 df : `pandas.DataFrame`
919 """
920 self.log.info("Transforming/standardizing the source table dataId: %s", dataId)
921
922 df = self.transform(band, parq, funcs, dataId).df
923 self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
924 return df
925
926 def getFunctors(self):
927 return self.funcs
928
929 def getAnalysis(self, parq, funcs=None, band=None):
930 if funcs is None:
931 funcs = self.funcs
932 analysis = PostprocessAnalysis(parq, funcs, filt=band)
933 return analysis
934
935 def transform(self, band, parq, funcs, dataId):
936 analysis = self.getAnalysis(parq, funcs=funcs, band=band)
937 df = analysis.df
938 if dataId and self.config.columnsFromDataId:
939 for key in self.config.columnsFromDataId:
940 if key in dataId:
941 df[str(key)] = dataId[key]
942 else:
943 raise ValueError(f"'{key}' in config.columnsFromDataId not found in dataId: {dataId}")
944
945 if self.config.primaryKey:
946 if df.index.name != self.config.primaryKey and self.config.primaryKey in df:
947 df.reset_index(inplace=True, drop=True)
948 df.set_index(self.config.primaryKey, inplace=True)
949
950 return pipeBase.Struct(
951 df=df,
952 analysis=analysis
953 )
954
955
956class TransformObjectCatalogConnections(pipeBase.PipelineTaskConnections,
957 defaultTemplates={"coaddName": "deep"},
958 dimensions=("tract", "patch", "skymap")):
959 inputCatalog = connectionTypes.Input(
960 doc="The vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
961 "stored as a DataFrame with a multi-level column index per-patch.",
962 dimensions=("tract", "patch", "skymap"),
963 storageClass="DataFrame",
964 name="{coaddName}Coadd_obj",
965 deferLoad=True,
966 )
967 outputCatalog = connectionTypes.Output(
968 doc="Per-Patch Object Table of columns transformed from the deepCoadd_obj table per the standard "
969 "data model.",
970 dimensions=("tract", "patch", "skymap"),
971 storageClass="DataFrame",
972 name="objectTable"
973 )
974
975
976class TransformObjectCatalogConfig(TransformCatalogBaseConfig,
977 pipelineConnections=TransformObjectCatalogConnections):
978 coaddName = pexConfig.Field(
979 dtype=str,
980 default="deep",
981 doc="Name of coadd"
982 )
983 # TODO: remove in DM-27177
984 filterMap = pexConfig.DictField(
985 keytype=str,
986 itemtype=str,
987 default={},
988 doc=("Dictionary mapping full filter name to short one for column name munging."
989 "These filters determine the output columns no matter what filters the "
990 "input data actually contain."),
991 deprecated=("Coadds are now identified by the band, so this transform is unused."
992 "Will be removed after v22.")
993 )
994 outputBands = pexConfig.ListField(
995 dtype=str,
996 default=None,
997 optional=True,
998 doc=("These bands and only these bands will appear in the output,"
999 " NaN-filled if the input does not include them."
1000 " If None, then use all bands found in the input.")
1001 )
1002 camelCase = pexConfig.Field(
1003 dtype=bool,
1004 default=False,
1005 doc=("Write per-band columns names with camelCase, else underscore "
1006 "For example: gPsFlux instead of g_PsFlux.")
1007 )
1008 multilevelOutput = pexConfig.Field(
1009 dtype=bool,
1010 default=False,
1011 doc=("Whether results dataframe should have a multilevel column index (True) or be flat "
1012 "and name-munged (False).")
1013 )
1014 goodFlags = pexConfig.ListField(
1015 dtype=str,
1016 default=[],
1017 doc=("List of 'good' flags that should be set False when populating empty tables. "
1018 "All other flags are considered to be 'bad' flags and will be set to True.")
1019 )
1020 floatFillValue = pexConfig.Field(
1021 dtype=float,
1022 default=np.nan,
1023 doc="Fill value for float fields when populating empty tables."
1024 )
1025 integerFillValue = pexConfig.Field(
1026 dtype=int,
1027 default=-1,
1028 doc="Fill value for integer fields when populating empty tables."
1029 )
1030
1031 def setDefaults(self):
1032 super().setDefaults()
1033 self.functorFile = os.path.join('$PIPE_TASKS_DIR', 'schemas', 'Object.yaml')
1034 self.primaryKey = 'objectId'
1035 self.columnsFromDataId = ['tract', 'patch']
1036 self.goodFlags = ['calib_astrometry_used',
1037 'calib_photometry_reserved',
1038 'calib_photometry_used',
1039 'calib_psf_candidate',
1040 'calib_psf_reserved',
1041 'calib_psf_used']
1042
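# Illustrative config override sketch (band list and values are hypothetical):
# a pipeline configuration might restrict the flattened output to a few bands
# and request camelCase column names, e.g.
#
#     config.outputBands = ["g", "r", "i"]
#     config.camelCase = True
#     config.multilevelOutput = False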
1043
1044class TransformObjectCatalogTask(TransformCatalogBaseTask):
1045 """Produce a flattened Object Table to match the format specified in
1046 sdm_schemas.
1047
1048 Do the same set of postprocessing calculations on all bands.
1049
1050 This is identical to `TransformCatalogBaseTask`, except that it does
1051 the specified functor calculations for all filters present in the
1052 input `deepCoadd_obj` table. Any specific ``"filt"`` keywords specified
1053 by the YAML file will be superseded.
1054 """
1055 _DefaultName = "transformObjectCatalog"
1056 ConfigClass = TransformObjectCatalogConfig
1057
1058 def run(self, parq, funcs=None, dataId=None, band=None):
1059 # NOTE: band kwarg is ignored here.
1060 dfDict = {}
1061 analysisDict = {}
1062 templateDf = pd.DataFrame()
1063
1064 if isinstance(parq, DeferredDatasetHandle):
1065 columns = parq.get(component='columns')
1066 inputBands = columns.unique(level=1).values
1067 else:
1068 inputBands = parq.columnLevelNames['band']
1069
1070 outputBands = self.config.outputBands if self.config.outputBands else inputBands
1071
1072 # Perform transform for data of filters that exist in parq.
1073 for inputBand in inputBands:
1074 if inputBand not in outputBands:
1075 self.log.info("Ignoring %s band data in the input", inputBand)
1076 continue
1077 self.log.info("Transforming the catalog of band %s", inputBand)
1078 result = self.transform(inputBand, parq, funcs, dataId)
1079 dfDict[inputBand] = result.df
1080 analysisDict[inputBand] = result.analysis
1081 if templateDf.empty:
1082 templateDf = result.df
1083
1084 # Put filler values in columns of other wanted bands
1085 for filt in outputBands:
1086 if filt not in dfDict:
1087 self.log.info("Adding empty columns for band %s", filt)
1088 dfTemp = templateDf.copy()
1089 for col in dfTemp.columns:
1090 testValue = dfTemp[col].values[0]
1091 if isinstance(testValue, (np.bool_, pd.BooleanDtype)):
1092 # Boolean flag type, check if it is a "good" flag
1093 if col in self.config.goodFlags:
1094 fillValue = False
1095 else:
1096 fillValue = True
1097 elif isinstance(testValue, numbers.Integral):
1098 # Checking numbers.Integral catches all flavors
1099 # of python, numpy, pandas, etc. integers.
1100 # We must ensure this is not an unsigned integer.
1101 if isinstance(testValue, np.unsignedinteger):
1102 raise ValueError("Parquet tables may not have unsigned integer columns.")
1103 else:
1104 fillValue = self.config.integerFillValue
1105 else:
1106 fillValue = self.config.floatFillValue
1107 dfTemp[col].values[:] = fillValue
1108 dfDict[filt] = dfTemp
1109
1110 # This makes a multilevel column index, with band as first level
1111 df = pd.concat(dfDict, axis=1, names=['band', 'column'])
1112
1113 if not self.config.multilevelOutput:
1114 noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
1115 if self.config.primaryKey in noDupCols:
1116 noDupCols.remove(self.config.primaryKey)
1117 if dataId and self.config.columnsFromDataId:
1118 noDupCols += self.config.columnsFromDataId
1119 df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase,
1120 inputBands=inputBands)
1121
1122 self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
1123
1124 return df
1125
1126
1127class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections,
1128 dimensions=("tract", "skymap")):
1129 inputCatalogs = connectionTypes.Input(
1130 doc="Per-Patch objectTables conforming to the standard data model.",
1131 name="objectTable",
1132 storageClass="DataFrame",
1133 dimensions=("tract", "patch", "skymap"),
1134 multiple=True,
1135 )
1136 outputCatalog = connectionTypes.Output(
1137 doc="Pre-tract horizontal concatenation of the input objectTables",
1138 name="objectTable_tract",
1139 storageClass="DataFrame",
1140 dimensions=("tract", "skymap"),
1141 )
1142
1143
1144class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig,
1145 pipelineConnections=ConsolidateObjectTableConnections):
1146 coaddName = pexConfig.Field(
1147 dtype=str,
1148 default="deep",
1149 doc="Name of coadd"
1150 )
1151
1152
1153class ConsolidateObjectTableTask(pipeBase.PipelineTask):
1154 """Write patch-merged source tables to a tract-level parquet file.
1155
1156 Concatenates `objectTable` list into a per-tract `objectTable_tract`.
1157 """
1158 _DefaultName = "consolidateObjectTable"
1159 ConfigClass = ConsolidateObjectTableConfig
1160
1161 inputDataset = 'objectTable'
1162 outputDataset = 'objectTable_tract'
1163
1164 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1165 inputs = butlerQC.get(inputRefs)
1166 self.log.info("Concatenating %s per-patch Object Tables",
1167 len(inputs['inputCatalogs']))
1168 df = pd.concat(inputs['inputCatalogs'])
1169 butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)
1170
1171
1172class TransformSourceTableConnections(pipeBase.PipelineTaskConnections,
1173 defaultTemplates={"catalogType": ""},
1174 dimensions=("instrument", "visit", "detector")):
1175
1176 inputCatalog = connectionTypes.Input(
1177 doc="Wide input catalog of sources produced by WriteSourceTableTask",
1178 name="{catalogType}source",
1179 storageClass="DataFrame",
1180 dimensions=("instrument", "visit", "detector"),
1181 deferLoad=True
1182 )
1183 outputCatalog = connectionTypes.Output(
1184 doc="Narrower, per-detector Source Table transformed and converted per a "
1185 "specified set of functors",
1186 name="{catalogType}sourceTable",
1187 storageClass="DataFrame",
1188 dimensions=("instrument", "visit", "detector")
1189 )
1190
1191
1192class TransformSourceTableConfig(TransformCatalogBaseConfig,
1193 pipelineConnections=TransformSourceTableConnections):
1194
1195 def setDefaults(self):
1196 super().setDefaults()
1197 self.functorFile = os.path.join('$PIPE_TASKS_DIR', 'schemas', 'Source.yaml')
1198 self.primaryKey = 'sourceId'
1199 self.columnsFromDataId = ['visit', 'detector', 'band', 'physical_filter']
1200
1201
1202class TransformSourceTableTask(TransformCatalogBaseTask):
1203 """Transform/standardize a source catalog
1204 """
1205 _DefaultName = "transformSourceTable"
1206 ConfigClass = TransformSourceTableConfig
1207
1208
1209class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections,
1210 dimensions=("instrument", "visit",),
1211 defaultTemplates={"calexpType": ""}):
1212 calexp = connectionTypes.Input(
1213 doc="Processed exposures used for metadata",
1214 name="{calexpType}calexp",
1215 storageClass="ExposureF",
1216 dimensions=("instrument", "visit", "detector"),
1217 deferLoad=True,
1218 multiple=True,
1219 )
1220 visitSummary = connectionTypes.Output(
1221 doc=("Per-visit consolidated exposure metadata. These catalogs use "
1222 "detector id for the id and are sorted for fast lookups of a "
1223 "detector."),
1224 name="{calexpType}visitSummary",
1225 storageClass="ExposureCatalog",
1226 dimensions=("instrument", "visit"),
1227 )
1228
1229
1230class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig,
1231 pipelineConnections=ConsolidateVisitSummaryConnections):
1232 """Config for ConsolidateVisitSummaryTask"""
1233 pass
1234
1235
1236class ConsolidateVisitSummaryTask(pipeBase.PipelineTask):
1237 """Task to consolidate per-detector visit metadata.
1238
1239 This task aggregates the following metadata from all the detectors in a
1240 single visit into an exposure catalog:
1241 - The visitInfo.
1242 - The wcs.
1243 - The photoCalib.
1244 - The physical_filter and band (if available).
1245 - The psf size, shape, and effective area at the center of the detector.
1246 - The corners of the bounding box in right ascension/declination.
1247
1248 Other quantities such as Detector, Psf, ApCorrMap, and TransmissionCurve
1249 are not persisted here because of storage concerns, and because of their
1250 limited utility as summary statistics.
1251
1252 Tests for this task are performed in ci_hsc_gen3.
1253 """
1254 _DefaultName = "consolidateVisitSummary"
1255 ConfigClass = ConsolidateVisitSummaryConfig
1256
1257 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1258 dataRefs = butlerQC.get(inputRefs.calexp)
1259 visit = dataRefs[0].dataId.byName()['visit']
1260
1261 self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
1262 len(dataRefs), visit)
1263
1264 expCatalog = self._combineExposureMetadata(visit, dataRefs)
1265
1266 butlerQC.put(expCatalog, outputRefs.visitSummary)
1267
1268 def _combineExposureMetadata(self, visit, dataRefs):
1269 """Make a combined exposure catalog from a list of dataRefs.
1270 These dataRefs must point to exposures with wcs, summaryStats,
1271 and other visit metadata.
1272
1273 Parameters
1274 ----------
1275 visit : `int`
1276 Visit identification number.
1277 dataRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
1278 List of dataRefs in visit.
1279
1280 Returns
1281 -------
1282 visitSummary : `lsst.afw.table.ExposureCatalog`
1283 Exposure catalog with per-detector summary information.
1284 """
1285 schema = self._makeVisitSummarySchema()
1286 cat = afwTable.ExposureCatalog(schema)
1287 cat.resize(len(dataRefs))
1288
1289 cat['visit'] = visit
1290
1291 for i, dataRef in enumerate(dataRefs):
1292 visitInfo = dataRef.get(component='visitInfo')
1293 filterLabel = dataRef.get(component='filter')
1294 summaryStats = dataRef.get(component='summaryStats')
1295 detector = dataRef.get(component='detector')
1296 wcs = dataRef.get(component='wcs')
1297 photoCalib = dataRef.get(component='photoCalib')
1298 detector = dataRef.get(component='detector')
1299 bbox = dataRef.get(component='bbox')
1300 validPolygon = dataRef.get(component='validPolygon')
1301
1302 rec = cat[i]
1303 rec.setBBox(bbox)
1304 rec.setVisitInfo(visitInfo)
1305 rec.setWcs(wcs)
1306 rec.setPhotoCalib(photoCalib)
1307 rec.setValidPolygon(validPolygon)
1308
1309 rec['physical_filter'] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else ""
1310 rec['band'] = filterLabel.bandLabel if filterLabel.hasBandLabel() else ""
1311 rec.setId(detector.getId())
1312 rec['psfSigma'] = summaryStats.psfSigma
1313 rec['psfIxx'] = summaryStats.psfIxx
1314 rec['psfIyy'] = summaryStats.psfIyy
1315 rec['psfIxy'] = summaryStats.psfIxy
1316 rec['psfArea'] = summaryStats.psfArea
1317 rec['raCorners'][:] = summaryStats.raCorners
1318 rec['decCorners'][:] = summaryStats.decCorners
1319 rec['ra'] = summaryStats.ra
1320 rec['decl'] = summaryStats.decl
1321 rec['zenithDistance'] = summaryStats.zenithDistance
1322 rec['zeroPoint'] = summaryStats.zeroPoint
1323 rec['skyBg'] = summaryStats.skyBg
1324 rec['skyNoise'] = summaryStats.skyNoise
1325 rec['meanVar'] = summaryStats.meanVar
1326 rec['astromOffsetMean'] = summaryStats.astromOffsetMean
1327 rec['astromOffsetStd'] = summaryStats.astromOffsetStd
1328 rec['nPsfStar'] = summaryStats.nPsfStar
1329 rec['psfStarDeltaE1Median'] = summaryStats.psfStarDeltaE1Median
1330 rec['psfStarDeltaE2Median'] = summaryStats.psfStarDeltaE2Median
1331 rec['psfStarDeltaE1Scatter'] = summaryStats.psfStarDeltaE1Scatter
1332 rec['psfStarDeltaE2Scatter'] = summaryStats.psfStarDeltaE2Scatter
1333 rec['psfStarDeltaSizeMedian'] = summaryStats.psfStarDeltaSizeMedian
1334 rec['psfStarDeltaSizeScatter'] = summaryStats.psfStarDeltaSizeScatter
1335 rec['psfStarScaledDeltaSizeScatter'] = summaryStats.psfStarScaledDeltaSizeScatter
1336
1337 metadata = dafBase.PropertyList()
1338 metadata.add("COMMENT", "Catalog id is detector id, sorted.")
1339 # We are looping over existing datarefs, so the following is true
1340 metadata.add("COMMENT", "Only detectors with data have entries.")
1341 cat.setMetadata(metadata)
1342
1343 cat.sort()
1344 return cat
1345
1346 def _makeVisitSummarySchema(self):
1347 """Make the schema for the visitSummary catalog."""
1348 schema = afwTable.ExposureTable.makeMinimalSchema()
1349 schema.addField('visit', type='L', doc='Visit number')
1350 schema.addField('physical_filter', type='String', size=32, doc='Physical filter')
1351 schema.addField('band', type='String', size=32, doc='Name of band')
1352 schema.addField('psfSigma', type='F',
1353 doc='PSF model second-moments determinant radius (center of chip) (pixel)')
1354 schema.addField('psfArea', type='F',
1355 doc='PSF model effective area (center of chip) (pixel**2)')
1356 schema.addField('psfIxx', type='F',
1357 doc='PSF model Ixx (center of chip) (pixel**2)')
1358 schema.addField('psfIyy', type='F',
1359 doc='PSF model Iyy (center of chip) (pixel**2)')
1360 schema.addField('psfIxy', type='F',
1361 doc='PSF model Ixy (center of chip) (pixel**2)')
1362 schema.addField('raCorners', type='ArrayD', size=4,
1363 doc='Right Ascension of bounding box corners (degrees)')
1364 schema.addField('decCorners', type='ArrayD', size=4,
1365 doc='Declination of bounding box corners (degrees)')
1366 schema.addField('ra', type='D',
1367 doc='Right Ascension of bounding box center (degrees)')
1368 schema.addField('decl', type='D',
1369 doc='Declination of bounding box center (degrees)')
1370 schema.addField('zenithDistance', type='F',
1371 doc='Zenith distance of bounding box center (degrees)')
1372 schema.addField('zeroPoint', type='F',
1373 doc='Mean zeropoint in detector (mag)')
1374 schema.addField('skyBg', type='F',
1375 doc='Average sky background (ADU)')
1376 schema.addField('skyNoise', type='F',
1377 doc='Average sky noise (ADU)')
1378 schema.addField('meanVar', type='F',
1379 doc='Mean variance of the weight plane (ADU**2)')
1380 schema.addField('astromOffsetMean', type='F',
1381 doc='Mean offset of astrometric calibration matches (arcsec)')
1382 schema.addField('astromOffsetStd', type='F',
1383 doc='Standard deviation of offsets of astrometric calibration matches (arcsec)')
1384 schema.addField('nPsfStar', type='I', doc='Number of stars used for PSF model')
1385 schema.addField('psfStarDeltaE1Median', type='F',
1386 doc='Median E1 residual (starE1 - psfE1) for psf stars')
1387 schema.addField('psfStarDeltaE2Median', type='F',
1388 doc='Median E2 residual (starE2 - psfE2) for psf stars')
1389 schema.addField('psfStarDeltaE1Scatter', type='F',
1390 doc='Scatter (via MAD) of E1 residual (starE1 - psfE1) for psf stars')
1391 schema.addField('psfStarDeltaE2Scatter', type='F',
1392 doc='Scatter (via MAD) of E2 residual (starE2 - psfE2) for psf stars')
1393 schema.addField('psfStarDeltaSizeMedian', type='F',
1394 doc='Median size residual (starSize - psfSize) for psf stars (pixel)')
1395 schema.addField('psfStarDeltaSizeScatter', type='F',
1396 doc='Scatter (via MAD) of size residual (starSize - psfSize) for psf stars (pixel)')
1397 schema.addField('psfStarScaledDeltaSizeScatter', type='F',
1398 doc='Scatter (via MAD) of size residual scaled by median size squared')
1399
1400 return schema
1401
1402
1403class ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections,
1404 defaultTemplates={"catalogType": ""},
1405 dimensions=("instrument", "visit")):
1406 inputCatalogs = connectionTypes.Input(
1407 doc="Input per-detector Source Tables",
1408 name="{catalogType}sourceTable",
1409 storageClass="DataFrame",
1410 dimensions=("instrument", "visit", "detector"),
1411 multiple=True
1412 )
1413 outputCatalog = connectionTypes.Output(
1414 doc="Per-visit concatenation of Source Table",
1415 name="{catalogType}sourceTable_visit",
1416 storageClass="DataFrame",
1417 dimensions=("instrument", "visit")
1418 )
1419
1420
1421class ConsolidateSourceTableConfig(pipeBase.PipelineTaskConfig,
1422 pipelineConnections=ConsolidateSourceTableConnections):
1423 pass
1424
1425
1426class ConsolidateSourceTableTask(pipeBase.PipelineTask):
1427 """Concatenate `sourceTable` list into a per-visit `sourceTable_visit`
1428 """
1429 _DefaultName = 'consolidateSourceTable'
1430 ConfigClass = ConsolidateSourceTableConfig
1431
1432 inputDataset = 'sourceTable'
1433 outputDataset = 'sourceTable_visit'
1434
1435 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1436 from .makeWarp import reorderRefs
1437
1438 detectorOrder = [ref.dataId['detector'] for ref in inputRefs.inputCatalogs]
1439 detectorOrder.sort()
1440 inputRefs = reorderRefs(inputRefs, detectorOrder, dataIdKey='detector')
1441 inputs = butlerQC.get(inputRefs)
1442 self.log.info("Concatenating %s per-detector Source Tables",
1443 len(inputs['inputCatalogs']))
1444 df = pd.concat(inputs['inputCatalogs'])
1445 butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)
1446
1447
1448class MakeCcdVisitTableConnections(pipeBase.PipelineTaskConnections,
1449 dimensions=("instrument",),
1450 defaultTemplates={"calexpType": ""}):
1451 visitSummaryRefs = connectionTypes.Input(
1452 doc="Data references for per-visit consolidated exposure metadata from ConsolidateVisitSummaryTask",
1453 name="{calexpType}visitSummary",
1454 storageClass="ExposureCatalog",
1455 dimensions=("instrument", "visit"),
1456 multiple=True,
1457 deferLoad=True,
1458 )
1459 outputCatalog = connectionTypes.Output(
1460 doc="CCD and Visit metadata table",
1461 name="{calexpType}ccdVisitTable",
1462 storageClass="DataFrame",
1463 dimensions=("instrument",)
1464 )
1465
1466
1467class MakeCcdVisitTableConfig(pipeBase.PipelineTaskConfig,
1468 pipelineConnections=MakeCcdVisitTableConnections):
1469 pass
1470
1471
1472class MakeCcdVisitTableTask(pipeBase.PipelineTask):
1473 """Produce a `ccdVisitTable` from the `visitSummary` exposure catalogs.
1474 """
1475 _DefaultName = 'makeCcdVisitTable'
1476 ConfigClass = MakeCcdVisitTableConfig
1477
1478 def run(self, visitSummaryRefs):
1479 """Make a table of ccd information from the `visitSummary` catalogs.
1480
1481 Parameters
1482 ----------
1483 visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
1484 List of DeferredDatasetHandles pointing to exposure catalogs with
1485 per-detector summary information.
1486
1487 Returns
1488 -------
1489 result : `lsst.pipe.base.Struct`
1490 Results struct with attribute:
1491
1492 ``outputCatalog``
1493 Catalog of ccd and visit information.
1494 """
1495 ccdEntries = []
1496 for visitSummaryRef in visitSummaryRefs:
1497 visitSummary = visitSummaryRef.get()
1498 visitInfo = visitSummary[0].getVisitInfo()
1499
1500 ccdEntry = {}
1501 summaryTable = visitSummary.asAstropy()
1502 selectColumns = ['id', 'visit', 'physical_filter', 'band', 'ra', 'decl', 'zenithDistance',
1503 'zeroPoint', 'psfSigma', 'skyBg', 'skyNoise',
1504 'astromOffsetMean', 'astromOffsetStd', 'nPsfStar',
1505 'psfStarDeltaE1Median', 'psfStarDeltaE2Median',
1506 'psfStarDeltaE1Scatter', 'psfStarDeltaE2Scatter',
1507 'psfStarDeltaSizeMedian', 'psfStarDeltaSizeScatter',
1508 'psfStarScaledDeltaSizeScatter']
1509 ccdEntry = summaryTable[selectColumns].to_pandas().set_index('id')
1510 # 'visit' is the human readable visit number.
1511 # 'visitId' is the key to the visitId table. They are the same.
1512 # Technically you should join to get the visit from the visit
1513 # table.
1514 ccdEntry = ccdEntry.rename(columns={"visit": "visitId"})
1515 dataIds = [DataCoordinate.standardize(visitSummaryRef.dataId, detector=id) for id in
1516 summaryTable['id']]
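            # Pack each (visit, detector) pair into a single integer ccdVisitId via the
            # butler dimension packer; this becomes the unique index of the output table.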
1517 packer = visitSummaryRef.dataId.universe.makePacker('visit_detector', visitSummaryRef.dataId)
1518 ccdVisitIds = [packer.pack(dataId) for dataId in dataIds]
1519 ccdEntry['ccdVisitId'] = ccdVisitIds
1520 ccdEntry['detector'] = summaryTable['id']
1521 pixToArcseconds = np.array([vR.getWcs().getPixelScale().asArcseconds() if vR.getWcs()
1522 else np.nan for vR in visitSummary])
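            # Convert the PSF model sigma (pixels) to a FWHM in arcseconds:
            # for a Gaussian, FWHM = sqrt(8 ln 2) * sigma ~= 2.355 * sigma.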
1523 ccdEntry["seeing"] = visitSummary['psfSigma'] * np.sqrt(8 * np.log(2)) * pixToArcseconds
1524
1525 ccdEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
1526 ccdEntry["expMidpt"] = visitInfo.getDate().toPython()
1527 ccdEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
1528 expTime = visitInfo.getExposureTime()
1529 ccdEntry['expTime'] = expTime
1530 ccdEntry["obsStart"] = ccdEntry["expMidpt"] - 0.5 * pd.Timedelta(seconds=expTime)
1531 expTime_days = expTime / (60*60*24)
1532 ccdEntry["obsStartMJD"] = ccdEntry["expMidptMJD"] - 0.5 * expTime_days
1533 ccdEntry['darkTime'] = visitInfo.getDarkTime()
1534 ccdEntry['xSize'] = summaryTable['bbox_max_x'] - summaryTable['bbox_min_x']
1535 ccdEntry['ySize'] = summaryTable['bbox_max_y'] - summaryTable['bbox_min_y']
1536 ccdEntry['llcra'] = summaryTable['raCorners'][:, 0]
1537 ccdEntry['llcdec'] = summaryTable['decCorners'][:, 0]
1538 ccdEntry['ulcra'] = summaryTable['raCorners'][:, 1]
1539 ccdEntry['ulcdec'] = summaryTable['decCorners'][:, 1]
1540 ccdEntry['urcra'] = summaryTable['raCorners'][:, 2]
1541 ccdEntry['urcdec'] = summaryTable['decCorners'][:, 2]
1542 ccdEntry['lrcra'] = summaryTable['raCorners'][:, 3]
1543 ccdEntry['lrcdec'] = summaryTable['decCorners'][:, 3]
1544 # TODO: DM-30618, Add raftName, nExposures, ccdTemp, binX, binY,
1545 # and flags, and decide if WCS, and llcx, llcy, ulcx, ulcy, etc.
1546 # values are actually wanted.
1547 ccdEntries.append(ccdEntry)
1548
1549 outputCatalog = pd.concat(ccdEntries)
1550 outputCatalog.set_index('ccdVisitId', inplace=True, verify_integrity=True)
1551 return pipeBase.Struct(outputCatalog=outputCatalog)
1552
1553
1554class MakeVisitTableConnections(pipeBase.PipelineTaskConnections,
1555 dimensions=("instrument",),
1556 defaultTemplates={"calexpType": ""}):
1557 visitSummaries = connectionTypes.Input(
1558 doc="Per-visit consolidated exposure metadata from ConsolidateVisitSummaryTask",
1559 name="{calexpType}visitSummary",
1560 storageClass="ExposureCatalog",
1561 dimensions=("instrument", "visit",),
1562 multiple=True,
1563 deferLoad=True,
1564 )
1565 outputCatalog = connectionTypes.Output(
1566 doc="Visit metadata table",
1567 name="{calexpType}visitTable",
1568 storageClass="DataFrame",
1569 dimensions=("instrument",)
1570 )
1571
1572
1573class MakeVisitTableConfig(pipeBase.PipelineTaskConfig,
1574 pipelineConnections=MakeVisitTableConnections):
1575 pass
1576
1577
1578class MakeVisitTableTask(pipeBase.PipelineTask):
1579 """Produce a `visitTable` from the `visitSummary` exposure catalogs.
1580 """
1581 _DefaultName = 'makeVisitTable'
1582 ConfigClass = MakeVisitTableConfig
1583
1584 def run(self, visitSummaries):
1585 """Make a table of visit information from the `visitSummary` catalogs.
1586
1587 Parameters
1588 ----------
1589 visitSummaries : `list` of `lsst.daf.butler.DeferredDatasetHandle`
1590 List of handles pointing to exposure catalogs with per-detector summary information.

1591 Returns
1592 -------
1593 result : `lsst.pipe.base.Struct`
1594 Results struct with attribute:
1595
1596 ``outputCatalog``
1597 Catalog of visit information.
1598 """
1599 visitEntries = []
1600 for visitSummary in visitSummaries:
1601 visitSummary = visitSummary.get()
1602 visitRow = visitSummary[0]
1603 visitInfo = visitRow.getVisitInfo()
1604
1605 visitEntry = {}
1606 visitEntry["visitId"] = visitRow['visit']
1607 visitEntry["visit"] = visitRow['visit']
1608 visitEntry["physical_filter"] = visitRow['physical_filter']
1609 visitEntry["band"] = visitRow['band']
1610 raDec = visitInfo.getBoresightRaDec()
1611 visitEntry["ra"] = raDec.getRa().asDegrees()
1612 visitEntry["decl"] = raDec.getDec().asDegrees()
1613 visitEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
1614 azAlt = visitInfo.getBoresightAzAlt()
1615 visitEntry["azimuth"] = azAlt.getLongitude().asDegrees()
1616 visitEntry["altitude"] = azAlt.getLatitude().asDegrees()
1617 visitEntry["zenithDistance"] = 90 - azAlt.getLatitude().asDegrees()
1618 visitEntry["airmass"] = visitInfo.getBoresightAirmass()
1619 expTime = visitInfo.getExposureTime()
1620 visitEntry["expTime"] = expTime
1621 visitEntry["expMidpt"] = visitInfo.getDate().toPython()
1622 visitEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
1623 visitEntry["obsStart"] = visitEntry["expMidpt"] - 0.5 * pd.Timedelta(seconds=expTime)
1624 expTime_days = expTime / (60*60*24)
1625 visitEntry["obsStartMJD"] = visitEntry["expMidptMJD"] - 0.5 * expTime_days
1626 visitEntries.append(visitEntry)
1627
1628 # TODO: DM-30623, Add programId, exposureType, cameraTemp,
1629 # mirror1Temp, mirror2Temp, mirror3Temp, domeTemp, externalTemp,
1630 # dimmSeeing, pwvGPS, pwvMW, flags, nExposures.
1631
1632 outputCatalog = pd.DataFrame(data=visitEntries)
1633 outputCatalog.set_index('visitId', inplace=True, verify_integrity=True)
1634 return pipeBase.Struct(outputCatalog=outputCatalog)
1635
1636
1637class WriteForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
1638 dimensions=("instrument", "visit", "detector", "skymap", "tract")):
1639
1640 inputCatalog = connectionTypes.Input(
1641 doc="Primary per-detector, single-epoch forced-photometry catalog. "
1642 "By default, it is the output of ForcedPhotCcdTask on calexps",
1643 name="forced_src",
1644 storageClass="SourceCatalog",
1645 dimensions=("instrument", "visit", "detector", "skymap", "tract")
1646 )
1647 inputCatalogDiff = connectionTypes.Input(
1648 doc="Secondary multi-epoch, per-detector, forced photometry catalog. "
1649 "By default, it is the output of ForcedPhotCcdTask run on image differences.",
1650 name="forced_diff",
1651 storageClass="SourceCatalog",
1652 dimensions=("instrument", "visit", "detector", "skymap", "tract")
1653 )
1654 outputCatalog = connectionTypes.Output(
1655 doc="InputCatalogs horizonatally joined on `objectId` in Parquet format",
1656 name="mergedForcedSource",
1657 storageClass="DataFrame",
1658 dimensions=("instrument", "visit", "detector", "skymap", "tract")
1659 )
1660
1661
1662class WriteForcedSourceTableConfig(pipeBase.PipelineTaskConfig,
1663 pipelineConnections=WriteForcedSourceTableConnections):
1665 doc="Column on which to join the two input tables on and make the primary key of the output",
1666 dtype=str,
1667 default="objectId",
1668 )
1669
1670
1671class WriteForcedSourceTableTask(pipeBase.PipelineTask):
1672 """Merge and convert per-detector forced source catalogs to parquet.
1673
1674 Because the predecessor ForcedPhotCcdTask operates per-detector and
1675 per-tract (i.e., it has tract in its dimensions), detectors
1676 on the tract boundary may have multiple forced source catalogs.
1677
1678 The successor task TransformForcedSourceTable runs per-patch
1679 and temporally aggregates overlapping mergedForcedSource catalogs from all
1680 available epochs.
1681 """
1682 _DefaultName = "writeForcedSourceTable"
1683 ConfigClass = WriteForcedSourceTableConfig
1684
1685 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1686 inputs = butlerQC.get(inputRefs)
1687 # Add ccdVisitId to allow joining with CcdVisitTable
1688 inputs['ccdVisitId'] = butlerQC.quantum.dataId.pack("visit_detector")
1689 inputs['band'] = butlerQC.quantum.dataId.full['band']
1690 outputs = self.run(**inputs)
1691 butlerQC.put(outputs, outputRefs)
1692
1693 def run(self, inputCatalog, inputCatalogDiff, ccdVisitId=None, band=None):
1694 dfs = []
1695 for table, dataset, in zip((inputCatalog, inputCatalogDiff), ('calexp', 'diff')):
1696 df = table.asAstropy().to_pandas().set_index(self.config.key, drop=False)
1697 df = df.reindex(sorted(df.columns), axis=1)
1698 df['ccdVisitId'] = ccdVisitId if ccdVisitId else pd.NA
1699 df['band'] = band if band else pd.NA
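            # Tag every column with its dataset of origin ('calexp' or 'diff') via a
            # two-level column index, so the two measurement sets remain distinguishable
            # after the horizontal join below.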
1700 df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
1701 names=('dataset', 'column'))
1702
1703 dfs.append(df)
1704
1705 outputCatalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
1706 return pipeBase.Struct(outputCatalog=outputCatalog)
1707
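# A minimal access sketch (illustrative only, not part of the pipeline): because the merged
# table carries a two-level ('dataset', 'column') column index, measurements made on the
# calexp and on the image difference can be compared directly. Here `forcedSrc` and
# `forcedDiff` are assumed to be afw SourceCatalogs loaded elsewhere, and the flux column
# name is hypothetical:
#
#     struct = WriteForcedSourceTableTask().run(forcedSrc, forcedDiff, ccdVisitId=ccdVisitId)
#     merged = struct.outputCatalog
#     calexpFlux = merged[('calexp', 'base_PsfFlux_instFlux')]
#     diffFlux = merged[('diff', 'base_PsfFlux_instFlux')]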
1708
1709class TransformForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
1710 dimensions=("instrument", "skymap", "patch", "tract")):
1711
1712 inputCatalogs = connectionTypes.Input(
1713 doc="Parquet table of merged ForcedSources produced by WriteForcedSourceTableTask",
1714 name="mergedForcedSource",
1715 storageClass="DataFrame",
1716 dimensions=("instrument", "visit", "detector", "skymap", "tract"),
1717 multiple=True,
1718 deferLoad=True
1719 )
1720 referenceCatalog = connectionTypes.Input(
1721 doc="Reference catalog which was used to seed the forcedPhot. Columns "
1722 "objectId, detect_isPrimary, detect_isTractInner, detect_isPatchInner "
1723 "are expected.",
1724 name="objectTable",
1725 storageClass="DataFrame",
1726 dimensions=("tract", "patch", "skymap"),
1727 deferLoad=True
1728 )
1729 outputCatalog = connectionTypes.Output(
1730 doc="Narrower, temporally-aggregated, per-patch ForcedSource Table transformed and converted per a "
1731 "specified set of functors",
1732 name="forcedSourceTable",
1733 storageClass="DataFrame",
1734 dimensions=("tract", "patch", "skymap")
1735 )
1736
1737
1738class TransformForcedSourceTableConfig(TransformCatalogBaseConfig,
1739 pipelineConnections=TransformForcedSourceTableConnections):
1740 referenceColumns = pexConfig.ListField(
1741 dtype=str,
1742 default=["detect_isPrimary", "detect_isTractInner", "detect_isPatchInner"],
1743 optional=True,
1744 doc="Columns to pull from reference catalog",
1745 )
1746 keyRef = lsst.pex.config.Field(
1747 doc="Column on which to join the two input tables on and make the primary key of the output",
1748 dtype=str,
1749 default="objectId",
1750 )
1752 doc="Rename the output DataFrame index to this name",
1753 dtype=str,
1754 default="forcedSourceId",
1755 )
1756
1757 def setDefaults(self):
1758 super().setDefaults()
1759 self.functorFile = os.path.join('$PIPE_TASKS_DIR', 'schemas', 'ForcedSource.yaml')
1760 self.columnsFromDataId = ['tract', 'patch']
1761
1762
1763class TransformForcedSourceTableTask(TransformCatalogBaseTask):
1764 """Transform/standardize a ForcedSource catalog
1765
1766 Transforms each wide, per-detector forcedSource parquet table per the
1767 specification file (per-camera defaults found in ForcedSource.yaml).
1768 All epochs that overlap the patch are aggregated into one per-patch
1769 narrow parquet file.
1770
1771 No de-duplication of rows is performed. Duplicate-resolution flags are
1772 pulled in from the referenceCatalog (`detect_isPrimary`,
1773 `detect_isTractInner`, `detect_isPatchInner`) so that the user may de-duplicate
1774 for analysis or compare duplicates for QA.
1775
1776 The resulting table includes multiple bands. Epochs (MJDs) and other useful
1777 per-visit quantities can be retrieved by joining with the CcdVisitTable on
1778 ccdVisitId.
1779 """
1780 _DefaultName = "transformForcedSourceTable"
1781 ConfigClass = TransformForcedSourceTableConfig
1782
1783 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1784 inputs = butlerQC.get(inputRefs)
1785 if self.funcs is None:
1786 raise ValueError("config.functorFile is None. "
1787 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
1788 outputs = self.run(inputs['inputCatalogs'], inputs['referenceCatalog'], funcs=self.funcs,
1789 dataId=outputRefs.outputCatalog.dataId.full)
1790
1791 butlerQC.put(outputs, outputRefs)
1792
1793 def run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=None):
1794 dfs = []
1795 ref = referenceCatalog.get(parameters={"columns": self.config.referenceColumns})
1796 self.log.info("Aggregating %s input catalogs", len(inputCatalogs))
1797 for handle in inputCatalogs:
1798 result = self.transform(None, handle, funcs, dataId)
1799 # Filter for only rows that were detected on (overlap) the patch
1800 dfs.append(result.df.join(ref, how='inner'))
1801
1802 outputCatalog = pd.concat(dfs)
1803
1804 # Now that the join on config.keyRef is done, name the index after
1805 # config.keyRef ...
1806 outputCatalog.index.rename(self.config.keyRef, inplace=True)
1807 # ... and move config.keyRef back into the column list.
1808 outputCatalog.reset_index(inplace=True)
1809 # Make forcedSourceId (populated per the ForcedSource.yaml specification)
1810 # the index, verifying that it is unique.
1811 outputCatalog.set_index("forcedSourceId", inplace=True, verify_integrity=True)
1812 # Finally, rename the index to config.key.
1813 outputCatalog.index.rename(self.config.key, inplace=True)
1814
1815 self.log.info("Made a table of %d columns and %d rows",
1816 len(outputCatalog.columns), len(outputCatalog))
1817 return pipeBase.Struct(outputCatalog=outputCatalog)
1818
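# A minimal downstream sketch (illustrative only): assuming a per-patch forcedSourceTable
# DataFrame `forced` and a ccdVisitTable DataFrame `ccdVisit` (indexed by ccdVisitId) have
# been read back from the butler, the duplicate-resolution flags and the ccdVisitId key
# described above could be used to keep only primary rows and attach per-visit epochs:
#
#     primary = forced[forced['detect_isPrimary']]
#     withEpochs = primary.join(ccdVisit[['expMidptMJD', 'seeing']], on='ccdVisitId')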
1819
1820class ConsolidateTractConnections(pipeBase.PipelineTaskConnections,
1821 defaultTemplates={"catalogType": ""},
1822 dimensions=("instrument", "tract")):
1823 inputCatalogs = connectionTypes.Input(
1824 doc="Input per-patch DataFrame Tables to be concatenated",
1825 name="{catalogType}ForcedSourceTable",
1826 storageClass="DataFrame",
1827 dimensions=("tract", "patch", "skymap"),
1828 multiple=True,
1829 )
1830
1831 outputCatalog = connectionTypes.Output(
1832 doc="Output per-tract concatenation of DataFrame Tables",
1833 name="{catalogType}ForcedSourceTable_tract",
1834 storageClass="DataFrame",
1835 dimensions=("tract", "skymap"),
1836 )
1837
1838
1839class ConsolidateTractConfig(pipeBase.PipelineTaskConfig,
1840 pipelineConnections=ConsolidateTractConnections):
1841 pass
1842
1843
1844class ConsolidateTractTask(pipeBase.PipelineTask):
1845 """Concatenate any per-patch, dataframe list into a single
1846 per-tract DataFrame.
1847 """
1848 _DefaultName = 'ConsolidateTract'
1849 ConfigClass = ConsolidateTractConfig
1850
1851 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1852 inputs = butlerQC.get(inputRefs)
1853 # Not checking that at least one inputCatalog exists; if none did, the
1854 # quantum graph would be empty and this quantum would never run.
1855 self.log.info("Concatenating %s per-patch %s Tables",
1856 len(inputs['inputCatalogs']),
1857 inputRefs.inputCatalogs[0].datasetType.name)
1858 df = pd.concat(inputs['inputCatalogs'])
1859 butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)