LSST Applications g0b6bd0c080+a72a5dd7e6,g1182afd7b4+2a019aa3bb,g17e5ecfddb+2b8207f7de,g1d67935e3f+06cf436103,g38293774b4+ac198e9f13,g396055baef+6a2097e274,g3b44f30a73+6611e0205b,g480783c3b1+98f8679e14,g48ccf36440+89c08d0516,g4b93dc025c+98f8679e14,g5c4744a4d9+a302e8c7f0,g613e996a0d+e1c447f2e0,g6c8d09e9e7+25247a063c,g7271f0639c+98f8679e14,g7a9cd813b8+124095ede6,g9d27549199+a302e8c7f0,ga1cf026fa3+ac198e9f13,ga32aa97882+7403ac30ac,ga786bb30fb+7a139211af,gaa63f70f4e+9994eb9896,gabf319e997+ade567573c,gba47b54d5d+94dc90c3ea,gbec6a3398f+06cf436103,gc6308e37c7+07dd123edb,gc655b1545f+ade567573c,gcc9029db3c+ab229f5caf,gd01420fc67+06cf436103,gd877ba84e5+06cf436103,gdb4cecd868+6f279b5b48,ge2d134c3d5+cc4dbb2e3f,ge448b5faa6+86d1ceac1d,gecc7e12556+98f8679e14,gf3ee170dca+25247a063c,gf4ac96e456+ade567573c,gf9f5ea5b4d+ac198e9f13,gff490e6085+8c2580be5c,w.2022.27
LSST Data Management Base Package
mergeMeasurements.py
Go to the documentation of this file.
1#!/usr/bin/env python
2#
3# LSST Data Management System
4# Copyright 2008-2015 AURA/LSST.
5#
6# This product includes software developed by the
7# LSST Project (http://www.lsst.org/).
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the LSST License Statement and
20# the GNU General Public License along with this program. If not,
21# see <https://www.lsstcorp.org/LegalNotices/>.
22#
23import numpy
24
25from .multiBandUtils import (MergeSourcesRunner, _makeGetSchemaCatalogs, makeMergeArgumentParser,
26 getInputSchema, readCatalog)
27
28
29import lsst.afw.table as afwTable
30import lsst.pex.config as pexConfig
31import lsst.pipe.base as pipeBase
32
33from lsst.pipe.base import PipelineTaskConnections, PipelineTaskConfig
34import lsst.pipe.base.connectionTypes as cT
35
36
class MergeMeasurementsConnections(PipelineTaskConnections,
                                   dimensions=("skymap", "tract", "patch"),
                                   defaultTemplates={"inputCoaddName": "deep",
                                                     "outputCoaddName": "deep"}):
    """Connections for MergeMeasurementsTask.

    Per-band measurement catalogs (and their schema) come in; a single
    merged reference catalog (and its schema) goes out, keyed by
    skymap/tract/patch.
    """
    inputSchema = cT.InitInput(
        # Fixed: this connection carries the schema of the per-band *input*
        # measurement catalogs ({inputCoaddName}Coadd_meas_schema); the old
        # doc was a copy/paste of outputSchema's doc below.
        doc="Schema for the input measurement catalogs.",
        name="{inputCoaddName}Coadd_meas_schema",
        storageClass="SourceCatalog",
    )
    outputSchema = cT.InitOutput(
        doc="Schema for the output merged measurement catalog.",
        name="{outputCoaddName}Coadd_ref_schema",
        storageClass="SourceCatalog",
    )
    catalogs = cT.Input(
        doc="Input catalogs to merge.",
        name="{inputCoaddName}Coadd_meas",
        multiple=True,
        storageClass="SourceCatalog",
        dimensions=["band", "skymap", "tract", "patch"],
    )
    mergedCatalog = cT.Output(
        doc="Output merged catalog.",
        name="{outputCoaddName}Coadd_ref",
        storageClass="SourceCatalog",
        dimensions=["skymap", "tract", "patch"],
    )
64
65
class MergeMeasurementsConfig(PipelineTaskConfig, pipelineConnections=MergeMeasurementsConnections):
    """Configuration parameters for MergeMeasurementsTask.

    The reference band for each source is normally the first entry of
    ``priorityList`` in which the source was detected; ``minSN`` and
    ``minSNDiff`` control when a higher-S/N band is substituted instead.
    """
    # Bands such as "sky" produce records with no real detection; they are
    # handled specially by the merge logic.
    pseudoFilterList = pexConfig.ListField(
        dtype=str,
        default=["sky"],
        doc="Names of filters which may have no associated detection\n"
            "(N.b. should include MergeDetectionsConfig.skyFilterName)"
    )
    # Flux field used to compute S/N for reference-band selection.
    snName = pexConfig.Field(
        dtype=str,
        default="base_PsfFlux",
        doc="Name of flux measurement for calculating the S/N when choosing the reference band."
    )
    minSN = pexConfig.Field(
        dtype=float,
        default=10.,
        doc="If the S/N from the priority band is below this value (and the S/N "
            "is larger than minSNDiff compared to the priority band), use the band with "
            "the largest S/N as the reference band."
    )
    minSNDiff = pexConfig.Field(
        dtype=float,
        default=3.,
        doc="If the difference in S/N between another band and the priority band is larger "
            "than this value (and the S/N in the priority band is less than minSN) "
            "use the band with the largest S/N as the reference band"
    )
    # Records with any of these flags set get S/N treated as zero.
    flags = pexConfig.ListField(
        dtype=str,
        doc="Require that these flags, if available, are not set",
        default=["base_PixelFlags_flag_interpolatedCenter", "base_PsfFlux_flag",
                 "ext_photometryKron_KronFlux_flag", "modelfit_CModel_flag", ]
    )
    priorityList = pexConfig.ListField(
        dtype=str,
        default=[],
        doc="Priority-ordered list of filter bands for the merge."
    )
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )

    def validate(self):
        """Reject configurations with an empty ``priorityList``."""
        super().validate()
        if not self.priorityList:
            raise RuntimeError("No priority list provided")
118
119
120
126
127
class MergeMeasurementsTask(pipeBase.PipelineTask, pipeBase.CmdLineTask):
    """Merge measurements from multiple bands.

    Combines consistent (i.e. with the same peaks and footprints) catalogs of
    sources from multiple filter bands to construct a unified catalog that is
    suitable for driving forced photometry.  Every source is required to have
    centroid, shape and flux measurements in each band.

    For each source, the record from the first band in ``config.priorityList``
    in which the source was detected (``merge_footprint_*`` for parents,
    ``merge_peak_*`` for children) is normally chosen as the reference.  A
    different band is substituted only when the priority band's S/N is below
    ``config.minSN`` and another band's S/N exceeds it by more than
    ``config.minSNDiff``.  Sources detected only in a pseudo filter (e.g. sky
    objects) always take the first priority-list band whose pseudo-filter
    flag is set.

    Gen2 entry point: `runDataRef`.  Gen3 entry point: `runQuantum`/`run`.
    """
    _DefaultName = "mergeCoaddMeasurements"
    ConfigClass = MergeMeasurementsConfig
    RunnerClass = MergeSourcesRunner
    inputDataset = "meas"
    outputDataset = "ref"
    getSchemaCatalogs = _makeGetSchemaCatalogs("ref")

    @classmethod
    def _makeArgumentParser(cls):
        """Create a command-line argument parser for this task."""
        return makeMergeArgumentParser(cls._DefaultName, cls.inputDataset)

    def getInputSchema(self, butler=None, schema=None):
        """Obtain the input schema, from ``schema`` directly or via ``butler``."""
        return getInputSchema(self, butler, schema)

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        """Gen3 entry point: fetch inputs, key catalogs by band, run, persist."""
        inputs = butlerQC.get(inputRefs)
        # run() expects a dict mapping band name -> catalog so it can apply
        # the configured priority order.
        dataIds = (ref.dataId for ref in inputRefs.catalogs)
        catalogDict = {dataId['band']: cat for dataId, cat in zip(dataIds, inputs['catalogs'])}
        inputs['catalogs'] = catalogDict
        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)

    def __init__(self, butler=None, schema=None, initInputs=None, **kwargs):
        """Initialize the task.

        Parameters
        ----------
        butler : optional
            A butler used to read the input schema from disk, if ``schema``
            is None.
        schema : `lsst.afw.table.Schema`, optional
            The schema of the detection catalogs used as input to this one.
        initInputs : `dict`, optional
            If not None, its ``inputSchema`` entry supplies the input schema
            and takes precedence over ``butler``/``schema``.

        Notes
        -----
        The task sets its own ``self.schema`` attribute to the schema of the
        output merged catalog.
        """
        super().__init__(**kwargs)

        if initInputs is not None:
            inputSchema = initInputs['inputSchema'].schema
        else:
            inputSchema = self.getInputSchema(butler=butler, schema=schema)
        self.schemaMapper = afwTable.SchemaMapper(inputSchema, True)
        self.schemaMapper.addMinimalSchema(inputSchema, True)
        # Keys for computing S/N from the configured flux measurement.
        self.instFluxKey = inputSchema.find(self.config.snName + "_instFlux").getKey()
        self.instFluxErrKey = inputSchema.find(self.config.snName + "_instFluxErr").getKey()
        self.fluxFlagKey = inputSchema.find(self.config.snName + "_flag").getKey()

        # Per-band Structs of (peak, footprint, output) keys; the output key
        # is a new flag recording which band supplied each merged record.
        self.flagKeys = {}
        for band in self.config.priorityList:
            outputKey = self.schemaMapper.editOutputSchema().addField(
                "merge_measurement_%s" % band,
                type="Flag",
                doc="Flag field set if the measurements here are from the %s filter" % band
            )
            peakKey = inputSchema.find("merge_peak_%s" % band).key
            footprintKey = inputSchema.find("merge_footprint_%s" % band).key
            self.flagKeys[band] = pipeBase.Struct(peak=peakKey, footprint=footprintKey, output=outputKey)
        self.schema = self.schemaMapper.getOutputSchema()

        self.pseudoFilterKeys = []
        for filt in self.config.pseudoFilterList:
            try:
                self.pseudoFilterKeys.append(self.schema.find("merge_peak_%s" % filt).getKey())
            except Exception as e:
                # A missing pseudo-filter flag is tolerated: that band's
                # pseudo objects simply can't be selected.
                self.log.warning("merge_peak is not set for pseudo-filter %s: %s", filt, e)

        self.badFlags = {}
        for flag in self.config.flags:
            try:
                self.badFlags[flag] = self.schema.find(flag).getKey()
            except KeyError as exc:
                # Flags are optional per config doc ("if available").
                self.log.warning("Can't find flag %s in schema: %s", flag, exc)
        self.outputSchema = afwTable.SourceCatalog(self.schema)

    def runDataRef(self, patchRefList):
        """Merge coadd sources from multiple bands (Gen2 entry point).

        Parameters
        ----------
        patchRefList : `list`
            Data references for the same patch, one per filter.
        """
        catalogs = dict(readCatalog(self, patchRef) for patchRef in patchRefList)
        mergedCatalog = self.run(catalogs).mergedCatalog
        self.write(patchRefList[0], mergedCatalog)

    def run(self, catalogs):
        """Merge measurement catalogs into a single reference catalog.

        Parameters
        ----------
        catalogs : `dict`
            Map of band name to `lsst.afw.table.SourceCatalog`.  All catalogs
            must be row-aligned (same sources, same IDs, same order).

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            Struct with a ``mergedCatalog`` attribute.

        Raises
        ------
        ValueError
            If the input catalogs' source IDs or lengths do not match, or a
            source has no valid reference record in any band.

        Notes
        -----
        For parent sources, we choose the first band in ``config.priorityList``
        for which the merge_footprint flag for that band is True.  For child
        sources, the logic is the same, except that we use the merge_peak
        flags.
        """
        # Put catalogs, filters in priority order.
        orderedCatalogs = [catalogs[band] for band in self.config.priorityList if band in catalogs.keys()]
        orderedKeys = [self.flagKeys[band] for band in self.config.priorityList if band in catalogs.keys()]

        mergedCatalog = afwTable.SourceCatalog(self.schema)
        mergedCatalog.reserve(len(orderedCatalogs[0]))

        # Inputs must be row-aligned; verify by comparing ID columns.
        idKey = orderedCatalogs[0].table.getIdKey()
        for catalog in orderedCatalogs[1:]:
            if numpy.any(orderedCatalogs[0].get(idKey) != catalog.get(idKey)):
                raise ValueError("Error in inputs to MergeCoaddMeasurements: source IDs do not match")

        # This first zip iterates over all the catalogs simultaneously, yielding a sequence of one
        # record for each band, in priority order.
        for orderedRecords in zip(*orderedCatalogs):

            maxSNRecord = None
            maxSNFlagKeys = None
            maxSN = 0.
            priorityRecord = None
            priorityFlagKeys = None
            prioritySN = 0.
            hasPseudoFilter = False

            # Now we iterate over those record-band pairs, keeping track of the priority and the
            # largest S/N band.
            for inputRecord, flagKeys in zip(orderedRecords, orderedKeys):
                parent = (inputRecord.getParent() == 0 and inputRecord.get(flagKeys.footprint))
                child = (inputRecord.getParent() != 0 and inputRecord.get(flagKeys.peak))

                if not (parent or child):
                    # Not detected in this band; check for a pseudo object
                    # (e.g. sky source), which takes the first band whose
                    # pseudo-filter flag is set.
                    for pseudoFilterKey in self.pseudoFilterKeys:
                        if inputRecord.get(pseudoFilterKey):
                            hasPseudoFilter = True
                            priorityRecord = inputRecord
                            priorityFlagKeys = flagKeys
                            break
                    if hasPseudoFilter:
                        break

                isBad = any(inputRecord.get(flag) for flag in self.badFlags)
                if isBad or inputRecord.get(self.fluxFlagKey) or inputRecord.get(self.instFluxErrKey) == 0:
                    sn = 0.
                else:
                    sn = inputRecord.get(self.instFluxKey)/inputRecord.get(self.instFluxErrKey)
                if numpy.isnan(sn) or sn < 0.:
                    sn = 0.
                if (parent or child) and priorityRecord is None:
                    priorityRecord = inputRecord
                    priorityFlagKeys = flagKeys
                    prioritySN = sn
                if sn > maxSN:
                    maxSNRecord = inputRecord
                    maxSNFlagKeys = flagKeys
                    maxSN = sn

            # If the priority band has a low S/N we would like to choose the band with the highest
            # S/N as the reference band instead. However, we only want to choose the highest S/N
            # band if it is significantly better than the priority band. Therefore, to choose a
            # band other than the priority, we require that the priority S/N is below the minimum
            # threshold and that the difference between the priority and highest S/N is larger
            # than the difference threshold.
            #
            # For pseudo objects we always choose the first band in the priority list.
            bestRecord = None
            bestFlagKeys = None
            if hasPseudoFilter:
                bestRecord = priorityRecord
                bestFlagKeys = priorityFlagKeys
            elif (prioritySN < self.config.minSN and (maxSN - prioritySN) > self.config.minSNDiff
                    and maxSNRecord is not None):
                bestRecord = maxSNRecord
                bestFlagKeys = maxSNFlagKeys
            elif priorityRecord is not None:
                bestRecord = priorityRecord
                bestFlagKeys = priorityFlagKeys

            if bestRecord is not None and bestFlagKeys is not None:
                outputRecord = mergedCatalog.addNew()
                outputRecord.assign(bestRecord, self.schemaMapper)
                outputRecord.set(bestFlagKeys.output, True)
            else:  # if we didn't find any records
                raise ValueError("Error in inputs to MergeCoaddMeasurements: no valid reference for %s" %
                                 inputRecord.getId())

        # More checking for sane inputs, since zip silently iterates over the smallest sequence.
        for inputCatalog in orderedCatalogs:
            if len(mergedCatalog) != len(inputCatalog):
                # Bug fix: report the mismatching catalog's length rather
                # than the number of input catalogs (len(orderedCatalogs)).
                raise ValueError("Mismatch between catalog sizes: %s != %s" %
                                 (len(mergedCatalog), len(inputCatalog)))

        return pipeBase.Struct(
            mergedCatalog=mergedCatalog
        )

    def write(self, patchRef, catalog):
        """Write the output catalog.

        Parameters
        ----------
        patchRef
            Data reference for the patch.
        catalog
            Catalog to persist, as the dataset named by the
            ``outputDataset`` class variable.
        """
        patchRef.put(catalog, self.config.coaddName + "Coadd_" + self.outputDataset)
        # since the filter isn't actually part of the data ID for the dataset we're saving,
        # it's confusing to see it in the log message, even if the butler simply ignores it.
        mergeDataId = patchRef.dataId.copy()
        del mergeDataId["filter"]
        self.log.info("Wrote merged catalog: %s", mergeDataId)

    def writeMetadata(self, dataRefList):
        """No metadata to write, and not sure how to write it for a list of dataRefs."""
        pass
A mapping between the keys of two Schemas, used to copy data between them.
Definition: SchemaMapper.h:21
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
Definition: functional.cc:33
bool any(CoordinateExpr< N > const &expr) noexcept
Return true if any elements are true.
def run(self, coaddExposures, bbox, wcs, dataIds, **kwargs)
Definition: getTemplate.py:596
def writeMetadata(self, dataRefList)
No metadata to write, and not sure how to write it for a list of dataRefs.
def write(self, patchRef, catalog)
Write the output.
def makeMergeArgumentParser(name, dataset)
Create a suitable ArgumentParser.
def getInputSchema(task, butler=None, schema=None)
Obtain the input schema either directly or froma butler reference.
def readCatalog(task, patchRef)
Read input catalog.