LSST Data Management Base Package
postprocess.py
1# This file is part of pipe_tasks
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22import functools
23import pandas as pd
24from collections import defaultdict
25import logging
26import numpy as np
27import numbers
28import os
29
30import lsst.geom
31import lsst.pex.config as pexConfig
32import lsst.pipe.base as pipeBase
33import lsst.daf.base as dafBase
34from lsst.obs.base import ExposureIdInfo
35from lsst.pipe.base import connectionTypes
36import lsst.afw.table as afwTable
37from lsst.meas.base import SingleFrameMeasurementTask
38from lsst.pipe.base import CmdLineTask, ArgumentParser, DataIdContainer
39from lsst.coadd.utils.coaddDataIdContainer import CoaddDataIdContainer
40from lsst.daf.butler import DeferredDatasetHandle, DataCoordinate
41from lsst.skymap import BaseSkyMap
42
43from .parquetTable import ParquetTable
44from .multiBandUtils import makeMergeArgumentParser, MergeSourcesRunner
45from .functors import CompositeFunctor, Column
46
47log = logging.getLogger(__name__)
48
49
50def flattenFilters(df, noDupCols=['coord_ra', 'coord_dec'], camelCase=False, inputBands=None):
51 """Flattens a dataframe with multilevel column index.
52 """
53 newDf = pd.DataFrame()
54 # band is the level 0 index
55 dfBands = df.columns.unique(level=0).values
56 for band in dfBands:
57 subdf = df[band]
58 columnFormat = '{0}{1}' if camelCase else '{0}_{1}'
59 newColumns = {c: columnFormat.format(band, c)
60 for c in subdf.columns if c not in noDupCols}
61 cols = list(newColumns.keys())
62 newDf = pd.concat([newDf, subdf[cols].rename(columns=newColumns)], axis=1)
63
64 # Band must be present in the input and output or else column is all NaN:
65 presentBands = dfBands if inputBands is None else list(set(inputBands).intersection(dfBands))
66 # Get the unexploded columns from any present band's partition
67 noDupDf = df[presentBands[0]][noDupCols]
68 newDf = pd.concat([noDupDf, newDf], axis=1)
69 return newDf
70
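# Editor's note: the following helper is an illustrative sketch added for documentation
# purposes; it is not part of the original pipe_tasks source. It shows what
# `flattenFilters` produces for a hypothetical two-band input whose columns carry a
# two-level ('band', 'column') index.
def _exampleFlattenFilters():
    """Return a flattened example DataFrame (editor's sketch with made-up columns)."""
    columns = pd.MultiIndex.from_tuples(
        [('g', 'coord_ra'), ('g', 'psfFlux'), ('r', 'coord_ra'), ('r', 'psfFlux')],
        names=('band', 'column'))
    df = pd.DataFrame([[1.0, 10.0, 1.0, 12.0]], columns=columns)
    # Columns become ['coord_ra', 'g_psfFlux', 'r_psfFlux']: per-band columns are
    # prefixed with the band name, while noDupCols are kept once, unprefixed.
    return flattenFilters(df, noDupCols=['coord_ra'])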
71
72class WriteObjectTableConnections(pipeBase.PipelineTaskConnections,
73 defaultTemplates={"coaddName": "deep"},
74 dimensions=("tract", "patch", "skymap")):
75 inputCatalogMeas = connectionTypes.Input(
76 doc="Catalog of source measurements on the deepCoadd.",
77 dimensions=("tract", "patch", "band", "skymap"),
78 storageClass="SourceCatalog",
79 name="{coaddName}Coadd_meas",
80 multiple=True
81 )
82 inputCatalogForcedSrc = connectionTypes.Input(
83 doc="Catalog of forced measurements (shape and position parameters held fixed) on the deepCoadd.",
84 dimensions=("tract", "patch", "band", "skymap"),
85 storageClass="SourceCatalog",
86 name="{coaddName}Coadd_forced_src",
87 multiple=True
88 )
89 inputCatalogRef = connectionTypes.Input(
90 doc="Catalog marking the primary detection (which band provides a good shape and position)"
91 "for each detection in deepCoadd_mergeDet.",
92 dimensions=("tract", "patch", "skymap"),
93 storageClass="SourceCatalog",
94 name="{coaddName}Coadd_ref"
95 )
96 outputCatalog = connectionTypes.Output(
97 doc="A vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
98 "stored as a DataFrame with a multi-level column index per-patch.",
99 dimensions=("tract", "patch", "skymap"),
100 storageClass="DataFrame",
101 name="{coaddName}Coadd_obj"
102 )
103
104
105class WriteObjectTableConfig(pipeBase.PipelineTaskConfig,
106 pipelineConnections=WriteObjectTableConnections):
107 engine = pexConfig.Field(
108 dtype=str,
109 default="pyarrow",
110 doc="Parquet engine for writing (pyarrow or fastparquet)"
111 )
112 coaddName = pexConfig.Field(
113 dtype=str,
114 default="deep",
115 doc="Name of coadd"
116 )
117
118
119class WriteObjectTableTask(CmdLineTask, pipeBase.PipelineTask):
120 """Write filter-merged source tables to parquet
121 """
122 _DefaultName = "writeObjectTable"
123 ConfigClass = WriteObjectTableConfig
124 RunnerClass = MergeSourcesRunner
125
126 # Names of table datasets to be merged
127 inputDatasets = ('forced_src', 'meas', 'ref')
128
129 # Tag of output dataset written by `MergeSourcesTask.write`
130 outputDataset = 'obj'
131
132 def __init__(self, butler=None, schema=None, **kwargs):
133 # It is a shame that this class can't use the default init for
134 # CmdLineTask, but to do so would require its own special task
135 # runner, which is many more lines of specialization, so this is
136 # how it is for now.
137 super().__init__(**kwargs)
138
139 def runDataRef(self, patchRefList):
140 """!
141 @brief Merge coadd sources from multiple bands. Calls @ref `run` which
142 must be defined in subclasses that inherit from MergeSourcesTask.
143 @param[in] patchRefList list of data references for each filter
144 """
145 catalogs = dict(self.readCatalog(patchRef) for patchRef in patchRefList)
146 dataId = patchRefList[0].dataId
147 mergedCatalog = self.run(catalogs, tract=dataId['tract'], patch=dataId['patch'])
148 self.write(patchRefList[0], ParquetTable(dataFrame=mergedCatalog))
149
150 def runQuantum(self, butlerQC, inputRefs, outputRefs):
151 inputs = butlerQC.get(inputRefs)
152
153 measDict = {ref.dataId['band']: {'meas': cat} for ref, cat in
154 zip(inputRefs.inputCatalogMeas, inputs['inputCatalogMeas'])}
155 forcedSourceDict = {ref.dataId['band']: {'forced_src': cat} for ref, cat in
156 zip(inputRefs.inputCatalogForcedSrc, inputs['inputCatalogForcedSrc'])}
157
158 catalogs = {}
159 for band in measDict.keys():
160 catalogs[band] = {'meas': measDict[band]['meas'],
161 'forced_src': forcedSourceDict[band]['forced_src'],
162 'ref': inputs['inputCatalogRef']}
163 dataId = butlerQC.quantum.dataId
164 df = self.run(catalogs=catalogs, tract=dataId['tract'], patch=dataId['patch'])
165 outputs = pipeBase.Struct(outputCatalog=df)
166 butlerQC.put(outputs, outputRefs)
167
168 @classmethod
169 def _makeArgumentParser(cls):
170 """Create a suitable ArgumentParser.
171
172 We will use the ArgumentParser to get a list of data
173 references for patches; the RunnerClass will sort them into lists
174 of data references for the same patch.
175
176 References the first of self.inputDatasets, rather than
177 self.inputDataset.
178 """
179 return makeMergeArgumentParser(cls._DefaultName, cls.inputDatasets[0])
180
181 def readCatalog(self, patchRef):
182 """Read input catalogs
183
184 Read all the input datasets given by the 'inputDatasets'
185 attribute.
186
187 Parameters
188 ----------
189 patchRef : `lsst.daf.persistence.ButlerDataRef`
190 Data reference for patch.
191
192 Returns
193 -------
194 Tuple consisting of band name and a dict of catalogs, keyed by
195 dataset name.
196 """
197 band = patchRef.get(self.config.coaddName + "Coadd_filter", immediate=True).bandLabel
198 catalogDict = {}
199 for dataset in self.inputDatasets:
200 catalog = patchRef.get(self.config.coaddName + "Coadd_" + dataset, immediate=True)
201 self.log.info("Read %d sources from %s for band %s: %s",
202 len(catalog), dataset, band, patchRef.dataId)
203 catalogDict[dataset] = catalog
204 return band, catalogDict
205
206 def run(self, catalogs, tract, patch):
207 """Merge multiple catalogs.
208
209 Parameters
210 ----------
211 catalogs : `dict`
212 Mapping from filter names to dict of catalogs.
213 tract : int
214 tractId to use for the tractId column.
215 patch : str
216 patchId to use for the patchId column.
217
218 Returns
219 -------
220 catalog : `pandas.DataFrame`
221 Merged dataframe.
222 """
223
224 dfs = []
225 for filt, tableDict in catalogs.items():
226 for dataset, table in tableDict.items():
227 # Convert afwTable to pandas DataFrame
228 df = table.asAstropy().to_pandas().set_index('id', drop=True)
229
230 # Sort columns by name, to ensure matching schema among patches
231 df = df.reindex(sorted(df.columns), axis=1)
232 df['tractId'] = tract
233 df['patchId'] = patch
234
235 # Make columns a 3-level MultiIndex
236 df.columns = pd.MultiIndex.from_tuples([(dataset, filt, c) for c in df.columns],
237 names=('dataset', 'band', 'column'))
238 dfs.append(df)
239
240 catalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
241 return catalog
242
243 def write(self, patchRef, catalog):
244 """Write the output.
245
246 Parameters
247 ----------
248 catalog : `ParquetTable`
249 Catalog to write.
250 patchRef : `lsst.daf.persistence.ButlerDataRef`
251 Data reference for patch.
252 """
253 patchRef.put(catalog, self.config.coaddName + "Coadd_" + self.outputDataset)
254 # since the filter isn't actually part of the data ID for the dataset
255 # we're saving, it's confusing to see it in the log message, even if
256 # the butler simply ignores it.
257 mergeDataId = patchRef.dataId.copy()
258 del mergeDataId["filter"]
259 self.log.info("Wrote merged catalog: %s", mergeDataId)
260
261 def writeMetadata(self, dataRefList):
262 """No metadata to write, and not sure how to write it for a list of
263 dataRefs.
264 """
265 pass
266
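# Editor's note: illustrative sketch, not part of the original source. The deepCoadd_obj
# DataFrame written by WriteObjectTableTask above carries a three-level
# ('dataset', 'band', 'column') column index, so consumers select sub-tables by tuple;
# the dataset/band/column names used here are examples.
def _exampleSelectFromObjectTable(objDf):
    """Return the per-band 'meas' partition and a single column (editor's sketch)."""
    measG = objDf['meas']['g']  # all 'meas' columns for band 'g'
    psfFluxG = objDf[('meas', 'g', 'base_PsfFlux_instFlux')]  # one fully-qualified column
    return measG, psfFluxG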
267
268class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
269 defaultTemplates={"catalogType": ""},
270 dimensions=("instrument", "visit", "detector")):
271
272 catalog = connectionTypes.Input(
273 doc="Input full-depth catalog of sources produced by CalibrateTask",
274 name="{catalogType}src",
275 storageClass="SourceCatalog",
276 dimensions=("instrument", "visit", "detector")
277 )
278 outputCatalog = connectionTypes.Output(
279 doc="Catalog of sources, `src` in Parquet format. The 'id' column is "
280 "replaced with an index; all other columns are unchanged.",
281 name="{catalogType}source",
282 storageClass="DataFrame",
283 dimensions=("instrument", "visit", "detector")
284 )
285
286
287class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
288 pipelineConnections=WriteSourceTableConnections):
289 pass
290
291
292class WriteSourceTableTask(CmdLineTask, pipeBase.PipelineTask):
293 """Write source table to parquet.
294 """
295 _DefaultName = "writeSourceTable"
296 ConfigClass = WriteSourceTableConfig
297
298 def runQuantum(self, butlerQC, inputRefs, outputRefs):
299 inputs = butlerQC.get(inputRefs)
300 inputs['ccdVisitId'] = butlerQC.quantum.dataId.pack("visit_detector")
301 result = self.run(**inputs).table
302 outputs = pipeBase.Struct(outputCatalog=result.toDataFrame())
303 butlerQC.put(outputs, outputRefs)
304
305 def run(self, catalog, ccdVisitId=None, **kwargs):
306 """Convert `src` catalog to parquet
307
308 Parameters
309 ----------
310 catalog: `afwTable.SourceCatalog`
311 catalog to be converted
312 ccdVisitId: `int`
313 ccdVisitId to be added as a column
314
315 Returns
316 -------
317 result : `lsst.pipe.base.Struct`
318 ``table``
319 `ParquetTable` version of the input catalog
320 """
321 self.log.info("Generating parquet table from src catalog ccdVisitId=%s", ccdVisitId)
322 df = catalog.asAstropy().to_pandas().set_index('id', drop=True)
323 df['ccdVisitId'] = ccdVisitId
324 return pipeBase.Struct(table=ParquetTable(dataFrame=df))
325
326
327class WriteRecalibratedSourceTableConnections(WriteSourceTableConnections,
328 defaultTemplates={"catalogType": "",
329 "skyWcsName": "jointcal",
330 "photoCalibName": "fgcm"},
331 dimensions=("instrument", "visit", "detector", "skymap")):
332 skyMap = connectionTypes.Input(
333 doc="skyMap needed to choose which tract-level calibrations to use when multiple available",
334 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
335 storageClass="SkyMap",
336 dimensions=("skymap",),
337 )
338 exposure = connectionTypes.Input(
339 doc="Input exposure to perform photometry on.",
340 name="calexp",
341 storageClass="ExposureF",
342 dimensions=["instrument", "visit", "detector"],
343 )
344 externalSkyWcsTractCatalog = connectionTypes.Input(
345 doc=("Per-tract, per-visit wcs calibrations. These catalogs use the detector "
346 "id for the catalog id, sorted on id for fast lookup."),
347 name="{skyWcsName}SkyWcsCatalog",
348 storageClass="ExposureCatalog",
349 dimensions=["instrument", "visit", "tract"],
350 multiple=True
351 )
352 externalSkyWcsGlobalCatalog = connectionTypes.Input(
353 doc=("Per-visit wcs calibrations computed globally (with no tract information). "
354 "These catalogs use the detector id for the catalog id, sorted on id for "
355 "fast lookup."),
356 name="{skyWcsName}SkyWcsCatalog",
357 storageClass="ExposureCatalog",
358 dimensions=["instrument", "visit"],
359 )
360 externalPhotoCalibTractCatalog = connectionTypes.Input(
361 doc=("Per-tract, per-visit photometric calibrations. These catalogs use the "
362 "detector id for the catalog id, sorted on id for fast lookup."),
363 name="{photoCalibName}PhotoCalibCatalog",
364 storageClass="ExposureCatalog",
365 dimensions=["instrument", "visit", "tract"],
366 multiple=True
367 )
368 externalPhotoCalibGlobalCatalog = connectionTypes.Input(
369 doc=("Per-visit photometric calibrations computed globally (with no tract "
370 "information). These catalogs use the detector id for the catalog id, "
371 "sorted on id for fast lookup."),
372 name="{photoCalibName}PhotoCalibCatalog",
373 storageClass="ExposureCatalog",
374 dimensions=["instrument", "visit"],
375 )
376
377 def __init__(self, *, config=None):
378 super().__init__(config=config)
379 # Same connection boilerplate as all other applications of
380 # Global/Tract calibrations
381 if config.doApplyExternalSkyWcs and config.doReevaluateSkyWcs:
382 if config.useGlobalExternalSkyWcs:
383 self.inputs.remove("externalSkyWcsTractCatalog")
384 else:
385 self.inputs.remove("externalSkyWcsGlobalCatalog")
386 else:
387 self.inputs.remove("externalSkyWcsTractCatalog")
388 self.inputs.remove("externalSkyWcsGlobalCatalog")
389 if config.doApplyExternalPhotoCalib and config.doReevaluatePhotoCalib:
390 if config.useGlobalExternalPhotoCalib:
391 self.inputs.remove("externalPhotoCalibTractCatalog")
392 else:
393 self.inputs.remove("externalPhotoCalibGlobalCatalog")
394 else:
395 self.inputs.remove("externalPhotoCalibTractCatalog")
396 self.inputs.remove("externalPhotoCalibGlobalCatalog")
397
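# Editor's note: illustrative sketch, not part of the original source. It restates the
# connection-pruning rule implemented in __init__ above: an external SkyWcs input is kept
# only when both doApplyExternalSkyWcs and doReevaluateSkyWcs are set, and the
# useGlobalExternalSkyWcs flag then picks the global or the per-tract catalog. The same
# pattern applies to the PhotoCalib connections.
def _exampleSurvivingSkyWcsConnection(doApply, doReevaluate, useGlobal):
    """Return the name of the external SkyWcs input that remains, or None (editor's sketch)."""
    if doApply and doReevaluate:
        return "externalSkyWcsGlobalCatalog" if useGlobal else "externalSkyWcsTractCatalog"
    return None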
398
399class WriteRecalibratedSourceTableConfig(WriteSourceTableConfig,
400 pipelineConnections=WriteRecalibratedSourceTableConnections):
401
402 doReevaluatePhotoCalib = pexConfig.Field(
403 dtype=bool,
404 default=True,
405 doc=("Add or replace local photoCalib columns from either the calexp.photoCalib or jointcal/FGCM")
406 )
407 doReevaluateSkyWcs = pexConfig.Field(
408 dtype=bool,
409 default=True,
410 doc=("Add or replace local WCS columns from either the calexp.wcs or or jointcal")
411 )
412 doApplyExternalPhotoCalib = pexConfig.Field(
413 dtype=bool,
414 default=True,
415 doc=("Whether to apply external photometric calibration via an "
416 "`lsst.afw.image.PhotoCalib` object. Uses the "
417 "``externalPhotoCalibName`` field to determine which calibration "
418 "to load."),
419 )
420 doApplyExternalSkyWcs = pexConfig.Field(
421 dtype=bool,
422 default=True,
423 doc=("Whether to apply external astrometric calibration via an "
424 "`lsst.afw.geom.SkyWcs` object. Uses ``externalSkyWcsName`` "
425 "field to determine which calibration to load."),
426 )
427 useGlobalExternalPhotoCalib = pexConfig.Field(
428 dtype=bool,
429 default=True,
430 doc=("When using doApplyExternalPhotoCalib, use 'global' calibrations "
431 "that are not run per-tract. When False, use per-tract photometric "
432 "calibration files.")
433 )
434 useGlobalExternalSkyWcs = pexConfig.Field(
435 dtype=bool,
436 default=False,
437 doc=("When using doApplyExternalSkyWcs, use 'global' calibrations "
438 "that are not run per-tract. When False, use per-tract wcs "
439 "files.")
440 )
441
442 def validate(self):
443 super().validate()
444 if self.doApplyExternalSkyWcs and not self.doReevaluateSkyWcs:
445 log.warning("doApplyExternalSkyWcs=True but doReevaluateSkyWcs=False"
446 "External SkyWcs will not be read or evaluated.")
447 if self.doApplyExternalPhotoCalib and not self.doReevaluatePhotoCalib:
448 log.warning("doApplyExternalPhotoCalib=True but doReevaluatePhotoCalib=False."
449 "External PhotoCalib will not be read or evaluated.")
450
451
452class WriteRecalibratedSourceTableTask(WriteSourceTableTask):
453 """Write source table to parquet
454 """
455 _DefaultName = "writeRecalibratedSourceTable"
456 ConfigClass = WriteRecalibratedSourceTableConfig
457
458 def runQuantum(self, butlerQC, inputRefs, outputRefs):
459 inputs = butlerQC.get(inputRefs)
460 inputs['ccdVisitId'] = butlerQC.quantum.dataId.pack("visit_detector")
461 inputs['exposureIdInfo'] = ExposureIdInfo.fromDataId(butlerQC.quantum.dataId, "visit_detector")
462
463 if self.config.doReevaluatePhotoCalib or self.config.doReevaluateSkyWcs:
464 if self.config.doApplyExternalPhotoCalib or self.config.doApplyExternalSkyWcs:
465 inputs['exposure'] = self.attachCalibs(inputRefs, **inputs)
466
467 inputs['catalog'] = self.addCalibColumns(**inputs)
468
469 result = self.run(**inputs).table
470 outputs = pipeBase.Struct(outputCatalog=result.toDataFrame())
471 butlerQC.put(outputs, outputRefs)
472
473 def attachCalibs(self, inputRefs, skyMap, exposure, externalSkyWcsGlobalCatalog=None,
474 externalSkyWcsTractCatalog=None, externalPhotoCalibGlobalCatalog=None,
475 externalPhotoCalibTractCatalog=None, **kwargs):
476 """Apply external calibrations to exposure per configuration
477
478 When multiple tract-level calibrations overlap, select the one whose
479 center is closest to the detector.
480
481 Parameters
482 ----------
483 inputRefs : `lsst.pipe.base.InputQuantizedConnection`, for dataIds of
484 tract-level calibs.
485 skyMap : `lsst.skymap.SkyMap`
486 exposure : `lsst.afw.image.exposure.Exposure`
487 Input exposure to adjust calibrations.
488 externalSkyWcsGlobalCatalog : `lsst.afw.table.ExposureCatalog`, optional
489 Exposure catalog with external skyWcs to be applied per config
490 externalSkyWcsTractCatalog : `lsst.afw.table.ExposureCatalog`, optional
491 Exposure catalog with external skyWcs to be applied per config
492 externalPhotoCalibGlobalCatalog : `lsst.afw.table.ExposureCatalog`, optional
493 Exposure catalog with external photoCalib to be applied per config
494 externalPhotoCalibTractCatalog : `lsst.afw.table.ExposureCatalog`, optional
495 Exposure catalog with external photoCalib to be applied per config
496
497 Returns
498 -------
499 exposure : `lsst.afw.image.exposure.Exposure`
500 Exposure with adjusted calibrations.
501 """
502 if not self.config.doApplyExternalSkyWcs:
503 # Do not modify the exposure's SkyWcs
504 externalSkyWcsCatalog = None
505 elif self.config.useGlobalExternalSkyWcs:
506 # Use the global external SkyWcs
507 externalSkyWcsCatalog = externalSkyWcsGlobalCatalog
508 self.log.info('Applying global SkyWcs')
509 else:
510 # use tract-level external SkyWcs from the closest overlapping tract
511 inputRef = getattr(inputRefs, 'externalSkyWcsTractCatalog')
512 tracts = [ref.dataId['tract'] for ref in inputRef]
513 if len(tracts) == 1:
514 ind = 0
515 self.log.info('Applying tract-level SkyWcs from tract %s', tracts[ind])
516 else:
517 ind = self.getClosestTract(tracts, skyMap,
518 exposure.getBBox(), exposure.getWcs())
519 self.log.info('Multiple overlapping externalSkyWcsTractCatalogs found (%s). '
520 'Applying closest to detector center: tract=%s', str(tracts), tracts[ind])
521
522 externalSkyWcsCatalog = externalSkyWcsTractCatalog[ind]
523
524 if not self.config.doApplyExternalPhotoCalib:
525 # Do not modify the exposure's PhotoCalib
526 externalPhotoCalibCatalog = None
527 elif self.config.useGlobalExternalPhotoCalib:
528 # Use the global external PhotoCalib
529 externalPhotoCalibCatalog = externalPhotoCalibGlobalCatalog
530 self.log.info('Applying global PhotoCalib')
531 else:
532 # use tract-level external PhotoCalib from the closest overlapping tract
533 inputRef = getattr(inputRefs, 'externalPhotoCalibTractCatalog')
534 tracts = [ref.dataId['tract'] for ref in inputRef]
535 if len(tracts) == 1:
536 ind = 0
537 self.log.info('Applying tract-level PhotoCalib from tract %s', tracts[ind])
538 else:
539 ind = self.getClosestTract(tracts, skyMap,
540 exposure.getBBox(), exposure.getWcs())
541 self.log.info('Multiple overlapping externalPhotoCalibTractCatalogs found (%s). '
542 'Applying closest to detector center: tract=%s', str(tracts), tracts[ind])
543
544 externalPhotoCalibCatalog = externalPhotoCalibTractCatalog[ind]
545
546 return self.prepareCalibratedExposure(exposure, externalSkyWcsCatalog, externalPhotoCalibCatalog)
547
548 def getClosestTract(self, tracts, skyMap, bbox, wcs):
549 """Find the index of the tract closest to detector from list of tractIds
550
551 Parameters
552 ----------
553 tracts: `list` [`int`]
554 Iterable of integer tractIds
555 skyMap : `lsst.skymap.SkyMap`
556 skyMap to lookup tract geometry and wcs
557 bbox : `lsst.geom.Box2I`
558 Detector bbox, the center of which will be compared to tract centers
559 wcs : `lsst.afw.geom.SkyWcs`
560 Detector Wcs object to map the detector center to SkyCoord
561
562 Returns
563 -------
564 index : `int`
565 """
566 if len(tracts) == 1:
567 return 0
568
569 center = wcs.pixelToSky(bbox.getCenter())
570 sep = []
571 for tractId in tracts:
572 tract = skyMap[tractId]
573 tractCenter = tract.getWcs().pixelToSky(tract.getBBox().getCenter())
574 sep.append(center.separation(tractCenter))
575
576 return np.argmin(sep)
577
578 def prepareCalibratedExposure(self, exposure, externalSkyWcsCatalog=None, externalPhotoCalibCatalog=None):
579 """Prepare a calibrated exposure and apply external calibrations
580 if so configured.
581
582 Parameters
583 ----------
584 exposure : `lsst.afw.image.exposure.Exposure`
585 Input exposure to adjust calibrations.
586 externalSkyWcsCatalog : `lsst.afw.table.ExposureCatalog`, optional
587 Exposure catalog with external skyWcs to be applied
588 if config.doApplyExternalSkyWcs=True. Catalog uses the detector id
589 for the catalog id, sorted on id for fast lookup.
590 externalPhotoCalibCatalog : `lsst.afw.table.ExposureCatalog`, optional
591 Exposure catalog with external photoCalib to be applied
592 if config.doApplyExternalPhotoCalib=True. Catalog uses the detector
593 id for the catalog id, sorted on id for fast lookup.
594
595 Returns
596 -------
597 exposure : `lsst.afw.image.exposure.Exposure`
598 Exposure with adjusted calibrations.
599 """
600 detectorId = exposure.getInfo().getDetector().getId()
601
602 if externalPhotoCalibCatalog is not None:
603 row = externalPhotoCalibCatalog.find(detectorId)
604 if row is None:
605 self.log.warning("Detector id %s not found in externalPhotoCalibCatalog; "
606 "Using original photoCalib.", detectorId)
607 else:
608 photoCalib = row.getPhotoCalib()
609 if photoCalib is None:
610 self.log.warning("Detector id %s has None for photoCalib in externalPhotoCalibCatalog; "
611 "Using original photoCalib.", detectorId)
612 else:
613 exposure.setPhotoCalib(photoCalib)
614
615 if externalSkyWcsCatalog is not None:
616 row = externalSkyWcsCatalog.find(detectorId)
617 if row is None:
618 self.log.warning("Detector id %s not found in externalSkyWcsCatalog; "
619 "Using original skyWcs.", detectorId)
620 else:
621 skyWcs = row.getWcs()
622 if skyWcs is None:
623 self.log.warning("Detector id %s has None for skyWcs in externalSkyWcsCatalog; "
624 "Using original skyWcs.", detectorId)
625 else:
626 exposure.setWcs(skyWcs)
627
628 return exposure
629
630 def addCalibColumns(self, catalog, exposure, exposureIdInfo, **kwargs):
631 """Add replace columns with calibs evaluated at each centroid
632
633 Add or replace 'base_LocalWcs' and 'base_LocalPhotoCalib' columns in
634 a source catalog, by rerunning the plugins.
635
636 Parameters
637 ----------
638 catalog : `lsst.afw.table.SourceCatalog`
639 catalog to which calib columns will be added
640 exposure : `lsst.afw.image.exposure.Exposure`
641 Exposure with attached PhotoCalibs and SkyWcs attributes to be
642 reevaluated at local centroids. Pixels are not required.
643 exposureIdInfo : `lsst.obs.base.ExposureIdInfo`
644
645 Returns
646 -------
647 newCat : `lsst.afw.table.SourceCatalog`
648 Source Catalog with requested local calib columns
649 """
650 measureConfig = SingleFrameMeasurementTask.ConfigClass()
651 measureConfig.doReplaceWithNoise = False
652
653 measureConfig.plugins.names = []
654 if self.config.doReevaluateSkyWcs:
655 measureConfig.plugins.names.add('base_LocalWcs')
656 self.log.info("Re-evaluating base_LocalWcs plugin")
657 if self.config.doReevaluatePhotoCalib:
658 measureConfig.plugins.names.add('base_LocalPhotoCalib')
659 self.log.info("Re-evaluating base_LocalPhotoCalib plugin")
660 pluginsNotToCopy = tuple(measureConfig.plugins.names)
661
662 # Create a new schema and catalog
663 # Copy all columns from original except for the ones to reevaluate
664 aliasMap = catalog.schema.getAliasMap()
665 mapper = afwTable.SchemaMapper(catalog.schema)
666 for item in catalog.schema:
667 if not item.field.getName().startswith(pluginsNotToCopy):
668 mapper.addMapping(item.key)
669
670 schema = mapper.getOutputSchema()
671 measurement = SingleFrameMeasurementTask(config=measureConfig, schema=schema)
672 schema.setAliasMap(aliasMap)
673 newCat = afwTable.SourceCatalog(schema)
674 newCat.extend(catalog, mapper=mapper)
675
676 measurement.run(measCat=newCat, exposure=exposure, exposureId=exposureIdInfo.expId)
677
678 return newCat
679
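# Editor's note: illustrative sketch, not part of the original source. It demonstrates the
# closest-tract selection used by getClosestTract above, with plain lsst.geom.SpherePoint
# centers standing in for the detector and tract centers; the coordinates are made up.
def _exampleClosestCenter():
    """Return the index of the tract center nearest a detector center (editor's sketch)."""
    detCenter = lsst.geom.SpherePoint(150.0, 2.0, lsst.geom.degrees)
    tractCenters = [lsst.geom.SpherePoint(149.0, 2.0, lsst.geom.degrees),
                    lsst.geom.SpherePoint(150.2, 2.1, lsst.geom.degrees)]
    # Angular separation to each candidate center; the smallest wins, as in getClosestTract.
    sep = [detCenter.separation(center) for center in tractCenters]
    return np.argmin(sep)  # -> 1, the second center is closer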
680
681class PostprocessAnalysis(object):
682 """Calculate columns from ParquetTable.
683
684 This object manages and organizes an arbitrary set of computations
685 on a catalog. The catalog is defined by a
686 `lsst.pipe.tasks.parquetTable.ParquetTable` object (or list thereof), such
687 as a `deepCoadd_obj` dataset, and the computations are defined by a
688 collection of `lsst.pipe.tasks.functor.Functor` objects (or, equivalently,
689 a `CompositeFunctor`).
690
691 After the object is initialized, accessing the `.df` attribute (which
692 holds the `pandas.DataFrame` containing the results of the calculations)
693 triggers computation of said dataframe.
694
695 One of the conveniences of using this object is the ability to define a
696 desired common filter for all functors. This enables the same functor
697 collection to be passed to several different `PostprocessAnalysis` objects
698 without having to change the original functor collection, since the `filt`
699 keyword argument of this object triggers an overwrite of the `filt`
700 property for all functors in the collection.
701
702 This object also allows a list of refFlags to be passed, and defines a set
703 of default refFlags that are always included even if not requested.
704
705 If a list of `ParquetTable` objects is passed, rather than a single one,
706 then the calculations will be mapped over all the input catalogs. In
707 principle, it should be straightforward to parallelize this activity, but
708 initial tests have failed (see TODO in code comments).
709
710 Parameters
711 ----------
712 parq : `lsst.pipe.tasks.ParquetTable` (or list of such)
713 Source catalog(s) for computation.
714
715 functors : `list`, `dict`, or `lsst.pipe.tasks.functors.CompositeFunctor`
716 Computations to do (functors that act on `parq`).
717 If a dict, the output
718 DataFrame will have columns keyed accordingly.
719 If a list, the column keys will come from the
720 `.shortname` attribute of each functor.
721
722 filt : `str`, optional
723 Filter in which to calculate. If provided,
724 this will overwrite any existing `.filt` attribute
725 of the provided functors.
726
727 flags : `list`, optional
728 List of flags (per-band) to include in output table.
729 Taken from the `meas` dataset if applied to a multilevel Object Table.
730
731 refFlags : `list`, optional
732 List of refFlags (only reference band) to include in output table.
733
734 forcedFlags : `list`, optional
735 List of flags (per-band) to include in output table.
736 Taken from the ``forced_src`` dataset if applied to a
737 multilevel Object Table. Intended for flags from measurement plugins
738 only run during multi-band forced-photometry.
739 """
740 _defaultRefFlags = []
741 _defaultFuncs = ()
742
743 def __init__(self, parq, functors, filt=None, flags=None, refFlags=None, forcedFlags=None):
744 self.parq = parq
745 self.functors = functors
746
747 self.filt = filt
748 self.flags = list(flags) if flags is not None else []
749 self.forcedFlags = list(forcedFlags) if forcedFlags is not None else []
750 self.refFlags = list(self._defaultRefFlags)
751 if refFlags is not None:
752 self.refFlags += list(refFlags)
753
754 self._df = None
755
756 @property
757 def defaultFuncs(self):
758 funcs = dict(self._defaultFuncs)
759 return funcs
760
761 @property
762 def func(self):
763 additionalFuncs = self.defaultFuncs
764 additionalFuncs.update({flag: Column(flag, dataset='forced_src') for flag in self.forcedFlags})
765 additionalFuncs.update({flag: Column(flag, dataset='ref') for flag in self.refFlags})
766 additionalFuncs.update({flag: Column(flag, dataset='meas') for flag in self.flags})
767
768 if isinstance(self.functors, CompositeFunctor):
769 func = self.functors
770 else:
771 func = CompositeFunctor(self.functors)
772
773 func.funcDict.update(additionalFuncs)
774 func.filt = self.filt
775
776 return func
777
778 @property
779 def noDupCols(self):
780 return [name for name, func in self.func.funcDict.items() if func.noDup or func.dataset == 'ref']
781
782 @property
783 def df(self):
784 if self._df is None:
785 self.compute()
786 return self._df
787
788 def compute(self, dropna=False, pool=None):
789 # map over multiple parquet tables
790 if type(self.parq) in (list, tuple):
791 if pool is None:
792 dflist = [self.func(parq, dropna=dropna) for parq in self.parq]
793 else:
794 # TODO: Figure out why this doesn't work (pyarrow pickling
795 # issues?)
796 dflist = pool.map(functools.partial(self.func, dropna=dropna), self.parq)
797 self._df = pd.concat(dflist)
798 else:
799 self._df = self.func(self.parq, dropna=dropna)
800
801 return self._df
802
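# Editor's note: illustrative sketch, not part of the original source. It shows the
# intended usage pattern of PostprocessAnalysis: wrap a ParquetTable (`parq`) and a
# functor collection, then access `.df` to trigger the computation. The functor keys and
# flag name below are hypothetical.
def _examplePostprocessAnalysis(parq):
    """Return the computed DataFrame for two simple Column functors (editor's sketch)."""
    funcs = {'ra': Column('coord_ra', dataset='ref'),
             'dec': Column('coord_dec', dataset='ref')}
    analysis = PostprocessAnalysis(parq, funcs, filt='g', refFlags=['detect_isPrimary'])
    return analysis.df  # first access runs compute() and caches the result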
803
804class TransformCatalogBaseConnections(pipeBase.PipelineTaskConnections,
805 dimensions=()):
806 """Expected Connections for subclasses of TransformCatalogBaseTask.
807
808 Must be subclassed.
809 """
810 inputCatalog = connectionTypes.Input(
811 name="",
812 storageClass="DataFrame",
813 )
814 outputCatalog = connectionTypes.Output(
815 name="",
816 storageClass="DataFrame",
817 )
818
819
820class TransformCatalogBaseConfig(pipeBase.PipelineTaskConfig,
821 pipelineConnections=TransformCatalogBaseConnections):
822 functorFile = pexConfig.Field(
823 dtype=str,
824 doc="Path to YAML file specifying Science Data Model functors to use "
825 "when copying columns and computing calibrated values.",
826 default=None,
827 optional=True
828 )
829 primaryKey = pexConfig.Field(
830 dtype=str,
831 doc="Name of column to be set as the DataFrame index. If None, the index"
832 "will be named `id`",
833 default=None,
834 optional=True
835 )
836 columnsFromDataId = pexConfig.ListField(
837 dtype=str,
838 default=None,
839 optional=True,
840 doc="Columns to extract from the dataId",
841 )
842
843
844class TransformCatalogBaseTask(pipeBase.PipelineTask):
845 """Base class for transforming/standardizing a catalog
846
847 by applying functors that convert units and apply calibrations.
848 The purpose of this task is to perform a set of computations on
849 an input `ParquetTable` dataset (such as `deepCoadd_obj`) and write the
850 results to a new dataset (which needs to be declared in an `outputDataset`
851 attribute).
852
853 The calculations to be performed are defined in a YAML file that specifies
854 a set of functors to be computed, provided as
855 a `--functorFile` config parameter. An example of such a YAML file
856 is the following:
857
858 funcs:
859 psfMag:
860 functor: Mag
861 args:
862 - base_PsfFlux
863 filt: HSC-G
864 dataset: meas
865 cmodel_magDiff:
866 functor: MagDiff
867 args:
868 - modelfit_CModel
869 - base_PsfFlux
870 filt: HSC-G
871 gauss_magDiff:
872 functor: MagDiff
873 args:
874 - base_GaussianFlux
875 - base_PsfFlux
876 filt: HSC-G
877 count:
878 functor: Column
879 args:
880 - base_InputCount_value
881 filt: HSC-G
882 deconvolved_moments:
883 functor: DeconvolvedMoments
884 filt: HSC-G
885 dataset: forced_src
886 refFlags:
887 - calib_psfUsed
888 - merge_measurement_i
889 - merge_measurement_r
890 - merge_measurement_z
891 - merge_measurement_y
892 - merge_measurement_g
893 - base_PixelFlags_flag_inexact_psfCenter
894 - detect_isPrimary
895
896 The names for each entry under "funcs" will become the names of columns in
897 the output dataset. All the functors referenced are defined in
898 `lsst.pipe.tasks.functors`. Positional arguments to be passed to each
899 functor are in the `args` list, and any additional entries for each column
900 other than "functor" or "args" (e.g., `'filt'`, `'dataset'`) are treated as
901 keyword arguments to be passed to the functor initialization.
902
903 The "flags" entry is the default shortcut for `Column` functors.
904 All columns listed under "flags" will be copied to the output table
905 untransformed. They can be of any datatype.
906 In the special case of transforming a multi-level object table with
907 band and dataset indices (deepCoadd_obj), these will be taken from the
908 `meas` dataset and exploded out per band.
909
910 There are two special shortcuts that only apply when transforming
911 multi-level Object (deepCoadd_obj) tables:
912 - The "refFlags" entry is shortcut for `Column` functor
913 taken from the `'ref'` dataset if transforming an ObjectTable.
914 - The "forcedFlags" entry is shortcut for `Column` functors.
915 taken from the ``forced_src`` dataset if transforming an ObjectTable.
916 These are expanded out per band.
917
918
919 This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
920 to organize and execute the calculations.
921 """
922 @property
923 def _DefaultName(self):
924 raise NotImplementedError('Subclass must define "_DefaultName" attribute')
925
926 @property
927 def outputDataset(self):
928 raise NotImplementedError('Subclass must define "outputDataset" attribute')
929
930 @property
931 def inputDataset(self):
932 raise NotImplementedError('Subclass must define "inputDataset" attribute')
933
934 @property
935 def ConfigClass(self):
936 raise NotImplementedError('Subclass must define "ConfigClass" attribute')
937
938 def __init__(self, *args, **kwargs):
939 super().__init__(*args, **kwargs)
940 if self.config.functorFile:
941 self.log.info('Loading transform functor definitions from %s',
942 self.config.functorFile)
943 self.funcs = CompositeFunctor.from_file(self.config.functorFile)
944 self.funcs.update(dict(PostprocessAnalysis._defaultFuncs))
945 else:
946 self.funcs = None
947
948 def runQuantum(self, butlerQC, inputRefs, outputRefs):
949 inputs = butlerQC.get(inputRefs)
950 if self.funcs is None:
951 raise ValueError("config.functorFile is None. "
952 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
953 result = self.run(parq=inputs['inputCatalog'], funcs=self.funcs,
954 dataId=outputRefs.outputCatalog.dataId.full)
955 outputs = pipeBase.Struct(outputCatalog=result)
956 butlerQC.put(outputs, outputRefs)
957
958 def run(self, parq, funcs=None, dataId=None, band=None):
959 """Do postprocessing calculations
960
961 Takes a `ParquetTable` object and dataId,
962 returns a dataframe with results of postprocessing calculations.
963
964 Parameters
965 ----------
966 parq : `lsst.pipe.tasks.ParquetTable`
967 ParquetTable from which calculations are done.
968 funcs : `lsst.pipe.tasks.functors.Functors`
969 Functors to apply to the table's columns
970 dataId : dict, optional
971 Used to add a `patchId` column to the output dataframe.
972 band : `str`, optional
973 Filter band that is being processed.
974
975 Returns
976 -------
977 df : `pandas.DataFrame`
978 """
979 self.log.info("Transforming/standardizing the source table dataId: %s", dataId)
980
981 df = self.transform(band, parq, funcs, dataId).df
982 self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
983 return df
984
985 def getFunctors(self):
986 return self.funcs
987
988 def getAnalysis(self, parq, funcs=None, band=None):
989 if funcs is None:
990 funcs = self.funcs
991 analysis = PostprocessAnalysis(parq, funcs, filt=band)
992 return analysis
993
994 def transform(self, band, parq, funcs, dataId):
995 analysis = self.getAnalysis(parq, funcs=funcs, band=band)
996 df = analysis.df
997 if dataId and self.config.columnsFromDataId:
998 for key in self.config.columnsFromDataId:
999 if key in dataId:
1000 df[str(key)] = dataId[key]
1001 else:
1002 raise ValueError(f"'{key}' in config.columnsFromDataId not found in dataId: {dataId}")
1003
1004 if self.config.primaryKey:
1005 if df.index.name != self.config.primaryKey and self.config.primaryKey in df:
1006 df.reset_index(inplace=True, drop=True)
1007 df.set_index(self.config.primaryKey, inplace=True)
1008
1009 return pipeBase.Struct(
1010 df=df,
1011 analysis=analysis
1012 )
1013
1014 def write(self, df, parqRef):
1015 parqRef.put(ParquetTable(dataFrame=df), self.outputDataset)
1016
1017 def writeMetadata(self, dataRef):
1018 """No metadata to write.
1019 """
1020 pass
1021
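# Editor's note: illustrative sketch, not part of the original source. It mirrors how
# TransformCatalogBaseTask loads its functor definitions in __init__: the YAML file named
# by config.functorFile is parsed into a CompositeFunctor whose keys become output
# columns, then the default functors are merged in. The path below is a placeholder.
def _exampleLoadFunctors(functorFile='/path/to/Object.yaml'):
    """Return a CompositeFunctor built from a functor YAML file (editor's sketch)."""
    funcs = CompositeFunctor.from_file(functorFile)
    funcs.update(dict(PostprocessAnalysis._defaultFuncs))
    return funcs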
1022
1023class TransformObjectCatalogConnections(pipeBase.PipelineTaskConnections,
1024 defaultTemplates={"coaddName": "deep"},
1025 dimensions=("tract", "patch", "skymap")):
1026 inputCatalog = connectionTypes.Input(
1027 doc="The vertical concatenation of the deepCoadd_{ref|meas|forced_src} catalogs, "
1028 "stored as a DataFrame with a multi-level column index per-patch.",
1029 dimensions=("tract", "patch", "skymap"),
1030 storageClass="DataFrame",
1031 name="{coaddName}Coadd_obj",
1032 deferLoad=True,
1033 )
1034 outputCatalog = connectionTypes.Output(
1035 doc="Per-Patch Object Table of columns transformed from the deepCoadd_obj table per the standard "
1036 "data model.",
1037 dimensions=("tract", "patch", "skymap"),
1038 storageClass="DataFrame",
1039 name="objectTable"
1040 )
1041
1042
1043class TransformObjectCatalogConfig(TransformCatalogBaseConfig,
1044 pipelineConnections=TransformObjectCatalogConnections):
1045 coaddName = pexConfig.Field(
1046 dtype=str,
1047 default="deep",
1048 doc="Name of coadd"
1049 )
1050 # TODO: remove in DM-27177
1051 filterMap = pexConfig.DictField(
1052 keytype=str,
1053 itemtype=str,
1054 default={},
1055 doc=("Dictionary mapping full filter name to short one for column name munging."
1056 "These filters determine the output columns no matter what filters the "
1057 "input data actually contain."),
1058 deprecated=("Coadds are now identified by the band, so this transform is unused."
1059 "Will be removed after v22.")
1060 )
1061 outputBands = pexConfig.ListField(
1062 dtype=str,
1063 default=None,
1064 optional=True,
1065 doc=("These bands and only these bands will appear in the output,"
1066 " NaN-filled if the input does not include them."
1067 " If None, then use all bands found in the input.")
1068 )
1069 camelCase = pexConfig.Field(
1070 dtype=bool,
1071 default=False,
1072 doc=("Write per-band columns names with camelCase, else underscore "
1073 "For example: gPsFlux instead of g_PsFlux.")
1074 )
1075 multilevelOutput = pexConfig.Field(
1076 dtype=bool,
1077 default=False,
1078 doc=("Whether results dataframe should have a multilevel column index (True) or be flat "
1079 "and name-munged (False).")
1080 )
1081 goodFlags = pexConfig.ListField(
1082 dtype=str,
1083 default=[],
1084 doc=("List of 'good' flags that should be set False when populating empty tables. "
1085 "All other flags are considered to be 'bad' flags and will be set to True.")
1086 )
1087 floatFillValue = pexConfig.Field(
1088 dtype=float,
1089 default=np.nan,
1090 doc="Fill value for float fields when populating empty tables."
1091 )
1092 integerFillValue = pexConfig.Field(
1093 dtype=int,
1094 default=-1,
1095 doc="Fill value for integer fields when populating empty tables."
1096 )
1097
1098 def setDefaults(self):
1099 super().setDefaults()
1100 self.functorFile = os.path.join('$PIPE_TASKS_DIR', 'schemas', 'Object.yaml')
1101 self.primaryKey = 'objectId'
1102 self.columnsFromDataId = ['tract', 'patch']
1103 self.goodFlags = ['calib_astrometry_used',
1104 'calib_photometry_reserved',
1105 'calib_photometry_used',
1106 'calib_psf_candidate',
1107 'calib_psf_reserved',
1108 'calib_psf_used']
1109
1110
1111class TransformObjectCatalogTask(TransformCatalogBaseTask):
1112 """Produce a flattened Object Table to match the format specified in
1113 sdm_schemas.
1114
1115 Do the same set of postprocessing calculations on all bands.
1116
1117 This is identical to `TransformCatalogBaseTask`, except that it does
1118 the specified functor calculations for all filters present in the
1119 input `deepCoadd_obj` table. Any specific `"filt"` keywords specified
1120 by the YAML file will be superseded.
1121 """
1122 _DefaultName = "transformObjectCatalog"
1123 ConfigClass = TransformObjectCatalogConfig
1124
1125 def run(self, parq, funcs=None, dataId=None, band=None):
1126 # NOTE: band kwarg is ignored here.
1127 dfDict = {}
1128 analysisDict = {}
1129 templateDf = pd.DataFrame()
1130
1131 if isinstance(parq, DeferredDatasetHandle):
1132 columns = parq.get(component='columns')
1133 inputBands = columns.unique(level=1).values
1134 else:
1135 inputBands = parq.columnLevelNames['band']
1136
1137 outputBands = self.config.outputBands if self.config.outputBands else inputBands
1138
1139 # Perform transform for data of filters that exist in parq.
1140 for inputBand in inputBands:
1141 if inputBand not in outputBands:
1142 self.log.info("Ignoring %s band data in the input", inputBand)
1143 continue
1144 self.log.info("Transforming the catalog of band %s", inputBand)
1145 result = self.transform(inputBand, parq, funcs, dataId)
1146 dfDict[inputBand] = result.df
1147 analysisDict[inputBand] = result.analysis
1148 if templateDf.empty:
1149 templateDf = result.df
1150
1151 # Put filler values in columns of other wanted bands
1152 for filt in outputBands:
1153 if filt not in dfDict:
1154 self.log.info("Adding empty columns for band %s", filt)
1155 dfTemp = templateDf.copy()
1156 for col in dfTemp.columns:
1157 testValue = dfTemp[col].values[0]
1158 if isinstance(testValue, (np.bool_, pd.BooleanDtype)):
1159 # Boolean flag type, check if it is a "good" flag
1160 if col in self.config.goodFlags:
1161 fillValue = False
1162 else:
1163 fillValue = True
1164 elif isinstance(testValue, numbers.Integral):
1165 # Checking numbers.Integral catches all flavors
1166 # of python, numpy, pandas, etc. integers.
1167 # We must ensure this is not an unsigned integer.
1168 if isinstance(testValue, np.unsignedinteger):
1169 raise ValueError("Parquet tables may not have unsigned integer columns.")
1170 else:
1171 fillValue = self.config.integerFillValue
1172 else:
1173 fillValue = self.config.floatFillValue
1174 dfTemp[col].values[:] = fillValue
1175 dfDict[filt] = dfTemp
1176
1177 # This makes a multilevel column index, with band as first level
1178 df = pd.concat(dfDict, axis=1, names=['band', 'column'])
1179
1180 if not self.config.multilevelOutput:
1181 noDupCols = list(set.union(*[set(v.noDupCols) for v in analysisDict.values()]))
1182 if self.config.primaryKey in noDupCols:
1183 noDupCols.remove(self.config.primaryKey)
1184 if dataId and self.config.columnsFromDataId:
1185 noDupCols += self.config.columnsFromDataId
1186 df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase,
1187 inputBands=inputBands)
1188
1189 self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
1190
1191 return df
1192
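# Editor's note: illustrative sketch, not part of the original source. It summarizes the
# fill-value rules that TransformObjectCatalogTask.run applies when padding a band that is
# listed in outputBands but missing from the input (unsigned-integer columns, which run()
# rejects outright, are ignored here).
def _exampleMissingBandFillValue(testValue, col, goodFlags, integerFill=-1, floatFill=np.nan):
    """Return the fill value run() would use for one missing-band column (editor's sketch)."""
    if isinstance(testValue, np.bool_):
        # 'Good' flags are cleared; every other flag is set, i.e. marked bad.
        return False if col in goodFlags else True
    if isinstance(testValue, numbers.Integral):
        return integerFill
    return floatFill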
1193
1194class TractObjectDataIdContainer(CoaddDataIdContainer):
1195
1196 def makeDataRefList(self, namespace):
1197 """Make self.refList from self.idList
1198
1199 Generate a list of data references given tract and/or patch.
1200 This was adapted from `TractQADataIdContainer`, which was
1201 `TractDataIdContainer` modified to not require "filter".
1202 Only existing dataRefs are returned.
1203 """
1204 def getPatchRefList(tract):
1205 return [namespace.butler.dataRef(datasetType=self.datasetType,
1206 tract=tract.getId(),
1207 patch="%d,%d" % patch.getIndex()) for patch in tract]
1208
1209 tractRefs = defaultdict(list) # Data references for each tract
1210 for dataId in self.idList:
1211 skymap = self.getSkymap(namespace)
1212
1213 if "tract" in dataId:
1214 tractId = dataId["tract"]
1215 if "patch" in dataId:
1216 tractRefs[tractId].append(namespace.butler.dataRef(datasetType=self.datasetType,
1217 tract=tractId,
1218 patch=dataId['patch']))
1219 else:
1220 tractRefs[tractId] += getPatchRefList(skymap[tractId])
1221 else:
1222 tractRefs = dict((tract.getId(), tractRefs.get(tract.getId(), []) + getPatchRefList(tract))
1223 for tract in skymap)
1224 outputRefList = []
1225 for tractRefList in tractRefs.values():
1226 existingRefs = [ref for ref in tractRefList if ref.datasetExists()]
1227 outputRefList.append(existingRefs)
1228
1229 self.refList = outputRefList
1230
1231
1232class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections,
1233 dimensions=("tract", "skymap")):
1234 inputCatalogs = connectionTypes.Input(
1235 doc="Per-Patch objectTables conforming to the standard data model.",
1236 name="objectTable",
1237 storageClass="DataFrame",
1238 dimensions=("tract", "patch", "skymap"),
1239 multiple=True,
1240 )
1241 outputCatalog = connectionTypes.Output(
1242 doc="Pre-tract horizontal concatenation of the input objectTables",
1243 name="objectTable_tract",
1244 storageClass="DataFrame",
1245 dimensions=("tract", "skymap"),
1246 )
1247
1248
1249class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig,
1250 pipelineConnections=ConsolidateObjectTableConnections):
1251 coaddName = pexConfig.Field(
1252 dtype=str,
1253 default="deep",
1254 doc="Name of coadd"
1255 )
1256
1257
1258class ConsolidateObjectTableTask(CmdLineTask, pipeBase.PipelineTask):
1259 """Write patch-merged source tables to a tract-level parquet file.
1260
1261 Concatenates the per-patch `objectTable` list into a per-tract `objectTable_tract`.
1262 """
1263 _DefaultName = "consolidateObjectTable"
1264 ConfigClass = ConsolidateObjectTableConfig
1265
1266 inputDataset = 'objectTable'
1267 outputDataset = 'objectTable_tract'
1268
1269 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1270 inputs = butlerQC.get(inputRefs)
1271 self.log.info("Concatenating %s per-patch Object Tables",
1272 len(inputs['inputCatalogs']))
1273 df = pd.concat(inputs['inputCatalogs'])
1274 butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)
1275
1276 @classmethod
1277 def _makeArgumentParser(cls):
1278 parser = ArgumentParser(name=cls._DefaultName)
1279
1280 parser.add_id_argument("--id", cls.inputDataset,
1281 help="data ID, e.g. --id tract=12345",
1282 ContainerClass=TractObjectDataIdContainer)
1283 return parser
1284
1285 def runDataRef(self, patchRefList):
1286 df = pd.concat([patchRef.get().toDataFrame() for patchRef in patchRefList])
1287 patchRefList[0].put(ParquetTable(dataFrame=df), self.outputDataset)
1288
1289 def writeMetadata(self, dataRef):
1290 """No metadata to write.
1291 """
1292 pass
1293
1294
1295class TransformSourceTableConnections(pipeBase.PipelineTaskConnections,
1296 defaultTemplates={"catalogType": ""},
1297 dimensions=("instrument", "visit", "detector")):
1298
1299 inputCatalog = connectionTypes.Input(
1300 doc="Wide input catalog of sources produced by WriteSourceTableTask",
1301 name="{catalogType}source",
1302 storageClass="DataFrame",
1303 dimensions=("instrument", "visit", "detector"),
1304 deferLoad=True
1305 )
1306 outputCatalog = connectionTypes.Output(
1307 doc="Narrower, per-detector Source Table transformed and converted per a "
1308 "specified set of functors",
1309 name="{catalogType}sourceTable",
1310 storageClass="DataFrame",
1311 dimensions=("instrument", "visit", "detector")
1312 )
1313
1314
1315class TransformSourceTableConfig(TransformCatalogBaseConfig,
1316 pipelineConnections=TransformSourceTableConnections):
1317
1318 def setDefaults(self):
1319 super().setDefaults()
1320 self.functorFile = os.path.join('$PIPE_TASKS_DIR', 'schemas', 'Source.yaml')
1321 self.primaryKey = 'sourceId'
1322 self.columnsFromDataId = ['visit', 'detector', 'band', 'physical_filter']
1323
1324
1325class TransformSourceTableTask(TransformCatalogBaseTask):
1326 """Transform/standardize a source catalog
1327 """
1328 _DefaultName = "transformSourceTable"
1329 ConfigClass = TransformSourceTableConfig
1330
1331
1332class ConsolidateVisitSummaryConnections(pipeBase.PipelineTaskConnections,
1333 dimensions=("instrument", "visit",),
1334 defaultTemplates={"calexpType": ""}):
1335 calexp = connectionTypes.Input(
1336 doc="Processed exposures used for metadata",
1337 name="{calexpType}calexp",
1338 storageClass="ExposureF",
1339 dimensions=("instrument", "visit", "detector"),
1340 deferLoad=True,
1341 multiple=True,
1342 )
1343 visitSummary = connectionTypes.Output(
1344 doc=("Per-visit consolidated exposure metadata. These catalogs use "
1345 "detector id for the id and are sorted for fast lookups of a "
1346 "detector."),
1347 name="{calexpType}visitSummary",
1348 storageClass="ExposureCatalog",
1349 dimensions=("instrument", "visit"),
1350 )
1351
1352
1353class ConsolidateVisitSummaryConfig(pipeBase.PipelineTaskConfig,
1354 pipelineConnections=ConsolidateVisitSummaryConnections):
1355 """Config for ConsolidateVisitSummaryTask"""
1356 pass
1357
1358
1359class ConsolidateVisitSummaryTask(pipeBase.PipelineTask, pipeBase.CmdLineTask):
1360 """Task to consolidate per-detector visit metadata.
1361
1362 This task aggregates the following metadata from all the detectors in a
1363 single visit into an exposure catalog:
1364 - The visitInfo.
1365 - The wcs.
1366 - The photoCalib.
1367 - The physical_filter and band (if available).
1368 - The psf size, shape, and effective area at the center of the detector.
1369 - The corners of the bounding box in right ascension/declination.
1370
1371 Other quantities such as Detector, Psf, ApCorrMap, and TransmissionCurve
1372 are not persisted here because of storage concerns, and because of their
1373 limited utility as summary statistics.
1374
1375 Tests for this task are performed in ci_hsc_gen3.
1376 """
1377 _DefaultName = "consolidateVisitSummary"
1378 ConfigClass = ConsolidateVisitSummaryConfig
1379
1380 @classmethod
1381 def _makeArgumentParser(cls):
1382 parser = ArgumentParser(name=cls._DefaultName)
1383
1384 parser.add_id_argument("--id", "calexp",
1385 help="data ID, e.g. --id visit=12345",
1386 ContainerClass=VisitDataIdContainer)
1387 return parser
1388
1389 def writeMetadata(self, dataRef):
1390 """No metadata to persist, so override to remove metadata persistance.
1391 """
1392 pass
1393
1394 def writeConfig(self, butler, clobber=False, doBackup=True):
1395 """No config to persist, so override to remove config persistance.
1396 """
1397 pass
1398
1399 def runDataRef(self, dataRefList):
1400 visit = dataRefList[0].dataId['visit']
1401
1402 self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
1403 len(dataRefList), visit)
1404
1405 expCatalog = self._combineExposureMetadata(visit, dataRefList, isGen3=False)
1406
1407 dataRefList[0].put(expCatalog, 'visitSummary', visit=visit)
1408
1409 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1410 dataRefs = butlerQC.get(inputRefs.calexp)
1411 visit = dataRefs[0].dataId.byName()['visit']
1412
1413 self.log.debug("Concatenating metadata from %d per-detector calexps (visit %d)",
1414 len(dataRefs), visit)
1415
1416 expCatalog = self._combineExposureMetadata(visit, dataRefs)
1417
1418 butlerQC.put(expCatalog, outputRefs.visitSummary)
1419
1420 def _combineExposureMetadata(self, visit, dataRefs, isGen3=True):
1421 """Make a combined exposure catalog from a list of dataRefs.
1422 These dataRefs must point to exposures with wcs, summaryStats,
1423 and other visit metadata.
1424
1425 Parameters
1426 ----------
1427 visit : `int`
1428 Visit identification number.
1429 dataRefs : `list`
1430 List of dataRefs in visit. May be list of
1431 `lsst.daf.persistence.ButlerDataRef` (Gen2) or
1432 `lsst.daf.butler.DeferredDatasetHandle` (Gen3).
1433 isGen3 : `bool`, optional
1434 Specifies if this is a Gen3 list of datarefs.
1435
1436 Returns
1437 -------
1438 visitSummary : `lsst.afw.table.ExposureCatalog`
1439 Exposure catalog with per-detector summary information.
1440 """
1441 schema = self._makeVisitSummarySchema()
1442 cat = afwTable.ExposureCatalog(schema)
1443 cat.resize(len(dataRefs))
1444
1445 cat['visit'] = visit
1446
1447 for i, dataRef in enumerate(dataRefs):
1448 if isGen3:
1449 visitInfo = dataRef.get(component='visitInfo')
1450 filterLabel = dataRef.get(component='filter')
1451 summaryStats = dataRef.get(component='summaryStats')
1452 detector = dataRef.get(component='detector')
1453 wcs = dataRef.get(component='wcs')
1454 photoCalib = dataRef.get(component='photoCalib')
1455 detector = dataRef.get(component='detector')
1456 bbox = dataRef.get(component='bbox')
1457 validPolygon = dataRef.get(component='validPolygon')
1458 else:
1459 # Note that we need to read the calexp because there is
1460 # no magic access to the psf except through the exposure.
1461 gen2_read_bbox = lsst.geom.BoxI(lsst.geom.PointI(0, 0), lsst.geom.PointI(1, 1))
1462 exp = dataRef.get(datasetType='calexp_sub', bbox=gen2_read_bbox)
1463 visitInfo = exp.getInfo().getVisitInfo()
1464 filterLabel = dataRef.get("calexp_filter")
1465 summaryStats = exp.getInfo().getSummaryStats()
1466 wcs = exp.getWcs()
1467 photoCalib = exp.getPhotoCalib()
1468 detector = exp.getDetector()
1469 bbox = dataRef.get(datasetType='calexp_bbox')
1470 validPolygon = exp.getInfo().getValidPolygon()
1471
1472 rec = cat[i]
1473 rec.setBBox(bbox)
1474 rec.setVisitInfo(visitInfo)
1475 rec.setWcs(wcs)
1476 rec.setPhotoCalib(photoCalib)
1477 rec.setValidPolygon(validPolygon)
1478
1479 rec['physical_filter'] = filterLabel.physicalLabel if filterLabel.hasPhysicalLabel() else ""
1480 rec['band'] = filterLabel.bandLabel if filterLabel.hasBandLabel() else ""
1481 rec.setId(detector.getId())
1482 rec['psfSigma'] = summaryStats.psfSigma
1483 rec['psfIxx'] = summaryStats.psfIxx
1484 rec['psfIyy'] = summaryStats.psfIyy
1485 rec['psfIxy'] = summaryStats.psfIxy
1486 rec['psfArea'] = summaryStats.psfArea
1487 rec['raCorners'][:] = summaryStats.raCorners
1488 rec['decCorners'][:] = summaryStats.decCorners
1489 rec['ra'] = summaryStats.ra
1490 rec['decl'] = summaryStats.decl
1491 rec['zenithDistance'] = summaryStats.zenithDistance
1492 rec['zeroPoint'] = summaryStats.zeroPoint
1493 rec['skyBg'] = summaryStats.skyBg
1494 rec['skyNoise'] = summaryStats.skyNoise
1495 rec['meanVar'] = summaryStats.meanVar
1496 rec['astromOffsetMean'] = summaryStats.astromOffsetMean
1497 rec['astromOffsetStd'] = summaryStats.astromOffsetStd
1498 rec['nPsfStar'] = summaryStats.nPsfStar
1499 rec['psfStarDeltaE1Median'] = summaryStats.psfStarDeltaE1Median
1500 rec['psfStarDeltaE2Median'] = summaryStats.psfStarDeltaE2Median
1501 rec['psfStarDeltaE1Scatter'] = summaryStats.psfStarDeltaE1Scatter
1502 rec['psfStarDeltaE2Scatter'] = summaryStats.psfStarDeltaE2Scatter
1503 rec['psfStarDeltaSizeMedian'] = summaryStats.psfStarDeltaSizeMedian
1504 rec['psfStarDeltaSizeScatter'] = summaryStats.psfStarDeltaSizeScatter
1505 rec['psfStarScaledDeltaSizeScatter'] = summaryStats.psfStarScaledDeltaSizeScatter
1506
1507 metadata = dafBase.PropertyList()
1508 metadata.add("COMMENT", "Catalog id is detector id, sorted.")
1509 # We are looping over existing datarefs, so the following is true
1510 metadata.add("COMMENT", "Only detectors with data have entries.")
1511 cat.setMetadata(metadata)
1512
1513 cat.sort()
1514 return cat
1515
1516 def _makeVisitSummarySchema(self):
1517 """Make the schema for the visitSummary catalog."""
1518 schema = afwTable.ExposureTable.makeMinimalSchema()
1519 schema.addField('visit', type='L', doc='Visit number')
1520 schema.addField('physical_filter', type='String', size=32, doc='Physical filter')
1521 schema.addField('band', type='String', size=32, doc='Name of band')
1522 schema.addField('psfSigma', type='F',
1523 doc='PSF model second-moments determinant radius (center of chip) (pixel)')
1524 schema.addField('psfArea', type='F',
1525 doc='PSF model effective area (center of chip) (pixel**2)')
1526 schema.addField('psfIxx', type='F',
1527 doc='PSF model Ixx (center of chip) (pixel**2)')
1528 schema.addField('psfIyy', type='F',
1529 doc='PSF model Iyy (center of chip) (pixel**2)')
1530 schema.addField('psfIxy', type='F',
1531 doc='PSF model Ixy (center of chip) (pixel**2)')
1532 schema.addField('raCorners', type='ArrayD', size=4,
1533 doc='Right Ascension of bounding box corners (degrees)')
1534 schema.addField('decCorners', type='ArrayD', size=4,
1535 doc='Declination of bounding box corners (degrees)')
1536 schema.addField('ra', type='D',
1537 doc='Right Ascension of bounding box center (degrees)')
1538 schema.addField('decl', type='D',
1539 doc='Declination of bounding box center (degrees)')
1540 schema.addField('zenithDistance', type='F',
1541 doc='Zenith distance of bounding box center (degrees)')
1542 schema.addField('zeroPoint', type='F',
1543 doc='Mean zeropoint in detector (mag)')
1544 schema.addField('skyBg', type='F',
1545 doc='Average sky background (ADU)')
1546 schema.addField('skyNoise', type='F',
1547 doc='Average sky noise (ADU)')
1548 schema.addField('meanVar', type='F',
1549 doc='Mean variance of the weight plane (ADU**2)')
1550 schema.addField('astromOffsetMean', type='F',
1551 doc='Mean offset of astrometric calibration matches (arcsec)')
1552 schema.addField('astromOffsetStd', type='F',
1553 doc='Standard deviation of offsets of astrometric calibration matches (arcsec)')
1554 schema.addField('nPsfStar', type='I', doc='Number of stars used for PSF model')
1555 schema.addField('psfStarDeltaE1Median', type='F',
1556 doc='Median E1 residual (starE1 - psfE1) for psf stars')
1557 schema.addField('psfStarDeltaE2Median', type='F',
1558 doc='Median E2 residual (starE2 - psfE2) for psf stars')
1559 schema.addField('psfStarDeltaE1Scatter', type='F',
1560 doc='Scatter (via MAD) of E1 residual (starE1 - psfE1) for psf stars')
1561 schema.addField('psfStarDeltaE2Scatter', type='F',
1562 doc='Scatter (via MAD) of E2 residual (starE2 - psfE2) for psf stars')
1563 schema.addField('psfStarDeltaSizeMedian', type='F',
1564 doc='Median size residual (starSize - psfSize) for psf stars (pixel)')
1565 schema.addField('psfStarDeltaSizeScatter', type='F',
1566 doc='Scatter (via MAD) of size residual (starSize - psfSize) for psf stars (pixel)')
1567 schema.addField('psfStarScaledDeltaSizeScatter', type='F',
1568 doc='Scatter (via MAD) of size residual scaled by median size squared')
1569
1570 return schema
1571
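# Illustrative sketch, not part of postprocess.py: how a downstream user might
# read back the per-visit summary catalog built above.  The repo path,
# collection, and data ID values are placeholder assumptions.
from lsst.daf.butler import Butler

butler = Butler("/path/to/repo", collections=["u/someone/run"])
visitSummary = butler.get("visitSummary", instrument="HSC", visit=12345)
for rec in visitSummary:
    # One ExposureRecord per detector, sorted by detector id (cat.sort() above).
    print(rec.getId(), rec["band"], rec["psfSigma"], rec["zeroPoint"])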
1572
1573class VisitDataIdContainer(DataIdContainer):
1574 """DataIdContainer that groups sensor-level ids by visit.
1575 """
1576
1577 def makeDataRefList(self, namespace):
1578 """Make self.refList from self.idList
1579
1580 Generate a list of data references grouped by visit.
1581
1582 Parameters
1583 ----------
1584 namespace : `argparse.Namespace`
1585 Namespace used by `lsst.pipe.base.CmdLineTask` to parse command
1586 line arguments.
1587 """
1588 # Group by visits
1589 visitRefs = defaultdict(list)
1590 for dataId in self.idList:
1591 if "visit" in dataId:
1592 visitId = dataId["visit"]
1593 # Append every data reference in this visit's subset
1594 subset = namespace.butler.subset(self.datasetType, dataId=dataId)
1595 visitRefs[visitId].extend([dataRef for dataRef in subset])
1596
1597 outputRefList = []
1598 for refList in visitRefs.values():
1599 existingRefs = [ref for ref in refList if ref.datasetExists()]
1600 if existingRefs:
1601 outputRefList.append(existingRefs)
1602
1603 self.refList = outputRefList
1604
1605
1606class ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections,
1607 defaultTemplates={"catalogType": ""},
1608 dimensions=("instrument", "visit")):
1609 inputCatalogs = connectionTypes.Input(
1610 doc="Input per-detector Source Tables",
1611 name="{catalogType}sourceTable",
1612 storageClass="DataFrame",
1613 dimensions=("instrument", "visit", "detector"),
1614 multiple=True
1615 )
1616 outputCatalog = connectionTypes.Output(
1617 doc="Per-visit concatenation of Source Table",
1618 name="{catalogType}sourceTable_visit",
1619 storageClass="DataFrame",
1620 dimensions=("instrument", "visit")
1621 )
1622
1623
1624class ConsolidateSourceTableConfig(pipeBase.PipelineTaskConfig,
1625 pipelineConnections=ConsolidateSourceTableConnections):
1626 pass
1627
1628
1629class ConsolidateSourceTableTask(CmdLineTask, pipeBase.PipelineTask):
1630 """Concatenate `sourceTable` list into a per-visit `sourceTable_visit`
1631 """
1632 _DefaultName = 'consolidateSourceTable'
1633 ConfigClass = ConsolidateSourceTableConfig
1634
1635 inputDataset = 'sourceTable'
1636 outputDataset = 'sourceTable_visit'
1637
1638 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1639 from .makeCoaddTempExp import reorderRefs
1640
1641 detectorOrder = [ref.dataId['detector'] for ref in inputRefs.inputCatalogs]
1642 detectorOrder.sort()
1643 inputRefs = reorderRefs(inputRefs, detectorOrder, dataIdKey='detector')
1644 inputs = butlerQC.get(inputRefs)
1645 self.log.info("Concatenating %s per-detector Source Tables",
1646 len(inputs['inputCatalogs']))
1647 df = pd.concat(inputs['inputCatalogs'])
1648 butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)
1649
1650 def runDataRef(self, dataRefList):
1651 self.log.info("Concatenating %s per-detector Source Tables", len(dataRefList))
1652 df = pd.concat([dataRef.get().toDataFrame() for dataRef in dataRefList])
1653 dataRefList[0].put(ParquetTable(dataFrame=df), self.outputDataset)
1654
1655 @classmethod
1656 def _makeArgumentParser(cls):
1657 parser = ArgumentParser(name=cls._DefaultName)
1658
1659 parser.add_id_argument("--id", cls.inputDataset,
1660 help="data ID, e.g. --id visit=12345",
1661 ContainerClass=VisitDataIdContainer)
1662 return parser
1663
1664 def writeMetadata(self, dataRef):
1665 """No metadata to write.
1666 """
1667 pass
1668
1669 def writeConfig(self, butler, clobber=False, doBackup=True):
1670 """No config to write.
1671 """
1672 pass
1673
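# Illustrative sketch, not part of postprocess.py: the consolidation above is
# essentially a row-wise pandas concatenation of per-detector DataFrames.  The
# toy frames below stand in for real sourceTable contents.
import pandas as pd

perDetector = {
    31: pd.DataFrame({"sourceId": [1, 2], "psfFlux": [10.0, 12.0]}),
    32: pd.DataFrame({"sourceId": [3], "psfFlux": [9.5]}),
}
# Sorting by detector id mirrors the reorderRefs call in runQuantum and makes
# the row order of the per-visit table deterministic.
sourceTable_visit = pd.concat([perDetector[det] for det in sorted(perDetector)])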
1674
1675class MakeCcdVisitTableConnections(pipeBase.PipelineTaskConnections,
1676 dimensions=("instrument",),
1677 defaultTemplates={"calexpType": ""}):
1678 visitSummaryRefs = connectionTypes.Input(
1679 doc="Data references for per-visit consolidated exposure metadata from ConsolidateVisitSummaryTask",
1680 name="{calexpType}visitSummary",
1681 storageClass="ExposureCatalog",
1682 dimensions=("instrument", "visit"),
1683 multiple=True,
1684 deferLoad=True,
1685 )
1686 outputCatalog = connectionTypes.Output(
1687 doc="CCD and Visit metadata table",
1688 name="ccdVisitTable",
1689 storageClass="DataFrame",
1690 dimensions=("instrument",)
1691 )
1692
1693
1694class MakeCcdVisitTableConfig(pipeBase.PipelineTaskConfig,
1695 pipelineConnections=MakeCcdVisitTableConnections):
1696 pass
1697
1698
1699class MakeCcdVisitTableTask(CmdLineTask, pipeBase.PipelineTask):
1700 """Produce a `ccdVisitTable` from the `visitSummary` exposure catalogs.
1701 """
1702 _DefaultName = 'makeCcdVisitTable'
1703 ConfigClass = MakeCcdVisitTableConfig
1704
1705 def run(self, visitSummaryRefs):
1706 """Make a table of ccd information from the `visitSummary` catalogs.
1707
1708 Parameters
1709 ----------
1710 visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
1711 List of DeferredDatasetHandles pointing to exposure catalogs with
1712 per-detector summary information.
1713
1714 Returns
1715 -------
1716 result : `lsst.pipe.base.Struct`
1717 Results struct with attribute:
1718
1719 ``outputCatalog``
1720 Catalog of ccd and visit information.
1721 """
1722 ccdEntries = []
1723 for visitSummaryRef in visitSummaryRefs:
1724 visitSummary = visitSummaryRef.get()
1725 visitInfo = visitSummary[0].getVisitInfo()
1726
1727 ccdEntry = {}
1728 summaryTable = visitSummary.asAstropy()
1729 selectColumns = ['id', 'visit', 'physical_filter', 'band', 'ra', 'decl', 'zenithDistance',
1730 'zeroPoint', 'psfSigma', 'skyBg', 'skyNoise',
1731 'astromOffsetMean', 'astromOffsetStd', 'nPsfStar',
1732 'psfStarDeltaE1Median', 'psfStarDeltaE2Median',
1733 'psfStarDeltaE1Scatter', 'psfStarDeltaE2Scatter',
1734 'psfStarDeltaSizeMedian', 'psfStarDeltaSizeScatter',
1735 'psfStarScaledDeltaSizeScatter']
1736 ccdEntry = summaryTable[selectColumns].to_pandas().set_index('id')
1737 # 'visit' is the human-readable visit number.
1738 # 'visitId' is the key into the visit table; currently they hold the
1739 # same value. Strictly, one should join with the visit table to
1740 # recover the visit number.
1741 ccdEntry = ccdEntry.rename(columns={"visit": "visitId"})
1742 dataIds = [DataCoordinate.standardize(visitSummaryRef.dataId, detector=id) for id in
1743 summaryTable['id']]
1744 packer = visitSummaryRef.dataId.universe.makePacker('visit_detector', visitSummaryRef.dataId)
1745 ccdVisitIds = [packer.pack(dataId) for dataId in dataIds]
1746 ccdEntry['ccdVisitId'] = ccdVisitIds
1747 ccdEntry['detector'] = summaryTable['id']
1748 pixToArcseconds = np.array([vR.getWcs().getPixelScale().asArcseconds() for vR in visitSummary])
1749 ccdEntry["seeing"] = visitSummary['psfSigma'] * np.sqrt(8 * np.log(2)) * pixToArcseconds
1750
1751 ccdEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
1752 ccdEntry["expMidpt"] = visitInfo.getDate().toPython()
1753 ccdEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
1754 expTime = visitInfo.getExposureTime()
1755 ccdEntry['expTime'] = expTime
1756 ccdEntry["obsStart"] = ccdEntry["expMidpt"] - 0.5 * pd.Timedelta(seconds=expTime)
1757 expTime_days = expTime / (60*60*24)
1758 ccdEntry["obsStartMJD"] = ccdEntry["expMidptMJD"] - 0.5 * expTime_days
1759 ccdEntry['darkTime'] = visitInfo.getDarkTime()
1760 ccdEntry['xSize'] = summaryTable['bbox_max_x'] - summaryTable['bbox_min_x']
1761 ccdEntry['ySize'] = summaryTable['bbox_max_y'] - summaryTable['bbox_min_y']
1762 ccdEntry['llcra'] = summaryTable['raCorners'][:, 0]
1763 ccdEntry['llcdec'] = summaryTable['decCorners'][:, 0]
1764 ccdEntry['ulcra'] = summaryTable['raCorners'][:, 1]
1765 ccdEntry['ulcdec'] = summaryTable['decCorners'][:, 1]
1766 ccdEntry['urcra'] = summaryTable['raCorners'][:, 2]
1767 ccdEntry['urcdec'] = summaryTable['decCorners'][:, 2]
1768 ccdEntry['lrcra'] = summaryTable['raCorners'][:, 3]
1769 ccdEntry['lrcdec'] = summaryTable['decCorners'][:, 3]
1770 # TODO: DM-30618, Add raftName, nExposures, ccdTemp, binX, binY,
1771 # and flags, and decide if WCS, and llcx, llcy, ulcx, ulcy, etc.
1772 # values are actually wanted.
1773 ccdEntries.append(ccdEntry)
1774
1775 outputCatalog = pd.concat(ccdEntries)
1776 outputCatalog.set_index('ccdVisitId', inplace=True, verify_integrity=True)
1777 return pipeBase.Struct(outputCatalog=outputCatalog)
1778
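# Illustrative sketch, not part of postprocess.py: the seeing and obsStart
# arithmetic used above, with toy numbers.  psfSigma is the PSF Gaussian sigma
# in pixels; 2*sqrt(2*ln 2) ~ 2.355 converts sigma to a FWHM, and the pixel
# scale converts pixels to arcseconds.
import numpy as np
import pandas as pd

psfSigma = 2.0      # pixels (toy value)
pixelScale = 0.2    # arcsec per pixel (toy value)
seeing = psfSigma * np.sqrt(8 * np.log(2)) * pixelScale    # ~0.94 arcsec

expTime = 30.0      # seconds (toy value)
expMidpt = pd.Timestamp("2022-01-01T00:00:15")
obsStart = expMidpt - 0.5 * pd.Timedelta(seconds=expTime)  # 2022-01-01T00:00:00
expMidptMJD = 59580.000174                                 # MJD of expMidpt (toy value)
obsStartMJD = expMidptMJD - 0.5 * expTime / 86400.0        # half the exposure earlier, in days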
1779
1780class MakeVisitTableConnections(pipeBase.PipelineTaskConnections,
1781 dimensions=("instrument",),
1782 defaultTemplates={"calexpType": ""}):
1783 visitSummaries = connectionTypes.Input(
1784 doc="Per-visit consolidated exposure metadata from ConsolidateVisitSummaryTask",
1785 name="{calexpType}visitSummary",
1786 storageClass="ExposureCatalog",
1787 dimensions=("instrument", "visit",),
1788 multiple=True,
1789 deferLoad=True,
1790 )
1791 outputCatalog = connectionTypes.Output(
1792 doc="Visit metadata table",
1793 name="visitTable",
1794 storageClass="DataFrame",
1795 dimensions=("instrument",)
1796 )
1797
1798
1799class MakeVisitTableConfig(pipeBase.PipelineTaskConfig,
1800 pipelineConnections=MakeVisitTableConnections):
1801 pass
1802
1803
1804class MakeVisitTableTask(CmdLineTask, pipeBase.PipelineTask):
1805 """Produce a `visitTable` from the `visitSummary` exposure catalogs.
1806 """
1807 _DefaultName = 'makeVisitTable'
1808 ConfigClass = MakeVisitTableConfig
1809
1810 def run(self, visitSummaries):
1811 """Make a table of visit information from the `visitSummary` catalogs.
1812
1813 Parameters
1814 ----------
1815 visitSummaries : `list` of `lsst.afw.table.ExposureCatalog`
1816 List of exposure catalogs with per-detector summary information.

1817 Returns
1818 -------
1819 result : `lsst.pipe.base.Struct`
1820 Results struct with attribute:
1821
1822 ``outputCatalog``
1823 Catalog of visit information.
1824 """
1825 visitEntries = []
1826 for visitSummary in visitSummaries:
1827 visitSummary = visitSummary.get()
1828 visitRow = visitSummary[0]
1829 visitInfo = visitRow.getVisitInfo()
1830
1831 visitEntry = {}
1832 visitEntry["visitId"] = visitRow['visit']
1833 visitEntry["visit"] = visitRow['visit']
1834 visitEntry["physical_filter"] = visitRow['physical_filter']
1835 visitEntry["band"] = visitRow['band']
1836 raDec = visitInfo.getBoresightRaDec()
1837 visitEntry["ra"] = raDec.getRa().asDegrees()
1838 visitEntry["decl"] = raDec.getDec().asDegrees()
1839 visitEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees()
1840 azAlt = visitInfo.getBoresightAzAlt()
1841 visitEntry["azimuth"] = azAlt.getLongitude().asDegrees()
1842 visitEntry["altitude"] = azAlt.getLatitude().asDegrees()
1843 visitEntry["zenithDistance"] = 90 - azAlt.getLatitude().asDegrees()
1844 visitEntry["airmass"] = visitInfo.getBoresightAirmass()
1845 expTime = visitInfo.getExposureTime()
1846 visitEntry["expTime"] = expTime
1847 visitEntry["expMidpt"] = visitInfo.getDate().toPython()
1848 visitEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD)
1849 visitEntry["obsStart"] = visitEntry["expMidpt"] - 0.5 * pd.Timedelta(seconds=expTime)
1850 expTime_days = expTime / (60*60*24)
1851 visitEntry["obsStartMJD"] = visitEntry["expMidptMJD"] - 0.5 * expTime_days
1852 visitEntries.append(visitEntry)
1853
1854 # TODO: DM-30623, Add programId, exposureType, cameraTemp,
1855 # mirror1Temp, mirror2Temp, mirror3Temp, domeTemp, externalTemp,
1856 # dimmSeeing, pwvGPS, pwvMW, flags, nExposures.
1857
1858 outputCatalog = pd.DataFrame(data=visitEntries)
1859 outputCatalog.set_index('visitId', inplace=True, verify_integrity=True)
1860 return pipeBase.Struct(outputCatalog=outputCatalog)
1861
1862
1863class WriteForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
1864 dimensions=("instrument", "visit", "detector", "skymap", "tract")):
1865
1866 inputCatalog = connectionTypes.Input(
1867 doc="Primary per-detector, single-epoch forced-photometry catalog. "
1868 "By default, it is the output of ForcedPhotCcdTask on calexps",
1869 name="forced_src",
1870 storageClass="SourceCatalog",
1871 dimensions=("instrument", "visit", "detector", "skymap", "tract")
1872 )
1873 inputCatalogDiff = connectionTypes.Input(
1874 doc="Secondary multi-epoch, per-detector, forced photometry catalog. "
1875 "By default, it is the output of ForcedPhotCcdTask run on image differences.",
1876 name="forced_diff",
1877 storageClass="SourceCatalog",
1878 dimensions=("instrument", "visit", "detector", "skymap", "tract")
1879 )
1880 outputCatalog = connectionTypes.Output(
1881 doc="InputCatalogs horizonatally joined on `objectId` in Parquet format",
1882 name="mergedForcedSource",
1883 storageClass="DataFrame",
1884 dimensions=("instrument", "visit", "detector", "skymap", "tract")
1885 )
1886
1887
1888class WriteForcedSourceTableConfig(pipeBase.PipelineTaskConfig,
1889 pipelineConnections=WriteForcedSourceTableConnections):
1891 doc="Column on which to join the two input tables on and make the primary key of the output",
1892 dtype=str,
1893 default="objectId",
1894 )
1895
1896
1897class WriteForcedSourceTableTask(pipeBase.PipelineTask):
1898 """Merge and convert per-detector forced source catalogs to parquet.
1899
1900 Because the predecessor ForcedPhotCcdTask operates per-detector and
1901 per-tract (i.e., it has tract in its dimensions), a detector on a
1902 tract boundary may have multiple forced source catalogs.
1903
1904 The successor task TransformForcedSourceTable runs per-patch and
1905 temporally aggregates the overlapping mergedForcedSource catalogs
1906 from all available epochs.
1907 """
1908 _DefaultName = "writeForcedSourceTable"
1909 ConfigClass = WriteForcedSourceTableConfig
1910
1911 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1912 inputs = butlerQC.get(inputRefs)
1913 # Add ccdVisitId to allow joining with CcdVisitTable
1914 inputs['ccdVisitId'] = butlerQC.quantum.dataId.pack("visit_detector")
1915 inputs['band'] = butlerQC.quantum.dataId.full['band']
1916 outputs = self.run(**inputs)
1917 butlerQC.put(outputs, outputRefs)
1918
1919 def run(self, inputCatalog, inputCatalogDiff, ccdVisitId=None, band=None):
1920 dfs = []
1921 for table, dataset in zip((inputCatalog, inputCatalogDiff), ('calexp', 'diff')):
1922 df = table.asAstropy().to_pandas().set_index(self.config.key, drop=False)
1923 df = df.reindex(sorted(df.columns), axis=1)
1924 df['ccdVisitId'] = ccdVisitId if ccdVisitId else pd.NA
1925 df['band'] = band if band else pd.NA
1926 df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
1927 names=('dataset', 'column'))
1928
1929 dfs.append(df)
1930
1931 outputCatalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
1932 return pipeBase.Struct(outputCatalog=outputCatalog)
1933
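# Illustrative sketch, not part of postprocess.py: the ('dataset', 'column')
# MultiIndex built above places the calexp and difference-image measurements
# of the same objectId side by side after the join.  Column values are toys.
import pandas as pd

calexp = pd.DataFrame({"objectId": [1, 2], "psfFlux": [10.0, 11.0]}).set_index("objectId", drop=False)
diff = pd.DataFrame({"objectId": [1, 2], "psfFlux": [0.1, -0.2]}).set_index("objectId", drop=False)
for name, df in (("calexp", calexp), ("diff", diff)):
    df.columns = pd.MultiIndex.from_tuples([(name, c) for c in df.columns],
                                           names=("dataset", "column"))
merged = calexp.join(diff)   # columns ('calexp', ...) and ('diff', ...), indexed by objectId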
1934
1935class TransformForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
1936 dimensions=("instrument", "skymap", "patch", "tract")):
1937
1938 inputCatalogs = connectionTypes.Input(
1939 doc="Parquet table of merged ForcedSources produced by WriteForcedSourceTableTask",
1940 name="mergedForcedSource",
1941 storageClass="DataFrame",
1942 dimensions=("instrument", "visit", "detector", "skymap", "tract"),
1943 multiple=True,
1944 deferLoad=True
1945 )
1946 referenceCatalog = connectionTypes.Input(
1947 doc="Reference catalog which was used to seed the forcedPhot. Columns "
1948 "objectId, detect_isPrimary, detect_isTractInner, detect_isPatchInner "
1949 "are expected.",
1950 name="objectTable",
1951 storageClass="DataFrame",
1952 dimensions=("tract", "patch", "skymap"),
1953 deferLoad=True
1954 )
1955 outputCatalog = connectionTypes.Output(
1956 doc="Narrower, temporally-aggregated, per-patch ForcedSource Table transformed and converted per a "
1957 "specified set of functors",
1958 name="forcedSourceTable",
1959 storageClass="DataFrame",
1960 dimensions=("tract", "patch", "skymap")
1961 )
1962
1963
1964class TransformForcedSourceTableConfig(TransformCatalogBaseConfig,
1965 pipelineConnections=TransformForcedSourceTableConnections):
1966 referenceColumns = pexConfig.ListField(
1967 dtype=str,
1968 default=["detect_isPrimary", "detect_isTractInner", "detect_isPatchInner"],
1969 optional=True,
1970 doc="Columns to pull from reference catalog",
1971 )
1972 keyRef = lsst.pex.config.Field(
1973 doc="Column on which to join the two input tables on and make the primary key of the output",
1974 dtype=str,
1975 default="objectId",
1976 )
1978 doc="Rename the output DataFrame index to this name",
1979 dtype=str,
1980 default="forcedSourceId",
1981 )
1982
1983 def setDefaults(self):
1984 super().setDefaults()
1985 self.functorFile = os.path.join('$PIPE_TASKS_DIR', 'schemas', 'ForcedSource.yaml')
1986 self.columnsFromDataId = ['tract', 'patch']
1987
1988
1989class TransformForcedSourceTableTask(TransformCatalogBaseTask):
1990 """Transform/standardize a ForcedSource catalog
1991
1992 Transforms each wide, per-detector mergedForcedSource parquet table per
1993 the specification file (per-camera defaults found in ForcedSource.yaml).
1994 All epochs that overlap the patch are aggregated into one narrow,
1995 per-patch parquet table.
1996
1997 No de-duplication of rows is performed. Duplicate-resolution flags are
1998 pulled in from the referenceCatalog (`detect_isPrimary`,
1999 `detect_isTractInner`, `detect_isPatchInner`) so that the user may
2000 de-duplicate for analysis or compare duplicates for QA.
2001
2002 The resulting table includes multiple bands. Epochs (MJDs) and other
2003 useful per-visit quantities can be retrieved by joining with the
2004 CcdVisitTable on ccdVisitId.
2005 """
2006 _DefaultName = "transformForcedSourceTable"
2007 ConfigClass = TransformForcedSourceTableConfig
2008
2009 def runQuantum(self, butlerQC, inputRefs, outputRefs):
2010 inputs = butlerQC.get(inputRefs)
2011 if self.funcs is None:
2012 raise ValueError("config.functorFile is None. "
2013 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
2014 outputs = self.run(inputs['inputCatalogs'], inputs['referenceCatalog'], funcs=self.funcs,
2015 dataId=outputRefs.outputCatalog.dataId.full)
2016
2017 butlerQC.put(outputs, outputRefs)
2018
2019 def run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=None):
2020 dfs = []
2021 ref = referenceCatalog.get(parameters={"columns": self.config.referenceColumns})
2022 self.log.info("Aggregating %s input catalogs" % (len(inputCatalogs)))
2023 for handle in inputCatalogs:
2024 result = self.transform(None, handle, funcs, dataId)
2025 # Filter for only rows that were detected on (overlap) the patch
2026 dfs.append(result.df.join(ref, how='inner'))
2027
2028 outputCatalog = pd.concat(dfs)
2029
2030 # Now that we are done joining on config.keyRef, name the index
2031 # config.keyRef so that reset_index() below restores it as a column.
2032 outputCatalog.index.rename(self.config.keyRef, inplace=True)
2033 # Add config.keyRef to the column list
2034 outputCatalog.reset_index(inplace=True)
2035 # Set the forcedSourceId to the index. This is specified in the
2036 # ForcedSource.yaml
2037 outputCatalog.set_index("forcedSourceId", inplace=True, verify_integrity=True)
2038 # Rename it to the config.key
2039 outputCatalog.index.rename(self.config.key, inplace=True)
2040
2041 self.log.info("Made a table of %d columns and %d rows",
2042 len(outputCatalog.columns), len(outputCatalog))
2043 return pipeBase.Struct(outputCatalog=outputCatalog)
2044
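# Illustrative sketch, not part of postprocess.py: retrieving per-visit epochs
# for forced sources by joining on ccdVisitId, as described in the docstring
# above.  The dataset names come from this module; the repo path, collection,
# and data ID values are placeholder assumptions.
from lsst.daf.butler import Butler

butler = Butler("/path/to/repo", collections=["u/someone/run"])
forced = butler.get("forcedSourceTable", skymap="hsc_rings_v1", tract=9813, patch=40)
ccdVisits = butler.get("ccdVisitTable", instrument="HSC")   # indexed by ccdVisitId
withEpochs = forced.join(ccdVisits[["expMidptMJD", "seeing"]], on="ccdVisitId")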
2045
2046class ConsolidateTractConnections(pipeBase.PipelineTaskConnections,
2047 defaultTemplates={"catalogType": ""},
2048 dimensions=("instrument", "tract")):
2049 inputCatalogs = connectionTypes.Input(
2050 doc="Input per-patch DataFrame Tables to be concatenated",
2051 name="{catalogType}ForcedSourceTable",
2052 storageClass="DataFrame",
2053 dimensions=("tract", "patch", "skymap"),
2054 multiple=True,
2055 )
2056
2057 outputCatalog = connectionTypes.Output(
2058 doc="Output per-tract concatenation of DataFrame Tables",
2059 name="{catalogType}ForcedSourceTable_tract",
2060 storageClass="DataFrame",
2061 dimensions=("tract", "skymap"),
2062 )
2063
2064
2065class ConsolidateTractConfig(pipeBase.PipelineTaskConfig,
2066 pipelineConnections=ConsolidateTractConnections):
2067 pass
2068
2069
2070class ConsolidateTractTask(CmdLineTask, pipeBase.PipelineTask):
2071 """Concatenate any per-patch, dataframe list into a single
2072 per-tract DataFrame.
2073 """
2074 _DefaultName = 'ConsolidateTract'
2075 ConfigClass = ConsolidateTractConfig
2076
2077 def runQuantum(self, butlerQC, inputRefs, outputRefs):
2078 inputs = butlerQC.get(inputRefs)
2079 # Not checking that at least one inputCatalog exists because that
2080 # would be an empty quantum graph (QG).
2081 self.log.info("Concatenating %s per-patch %s Tables",
2082 len(inputs['inputCatalogs']),
2083 inputRefs.inputCatalogs[0].datasetType.name)
2084 df = pd.concat(inputs['inputCatalogs'])
2085 butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)