LSST Applications g070148d5b3+33e5256705,g0d53e28543+25c8b88941,g0da5cf3356+2dd1178308,g1081da9e2a+62d12e78cb,g17e5ecfddb+7e422d6136,g1c76d35bf8+ede3a706f7,g295839609d+225697d880,g2e2c1a68ba+cc1f6f037e,g2ffcdf413f+853cd4dcde,g38293774b4+62d12e78cb,g3b44f30a73+d953f1ac34,g48ccf36440+885b902d19,g4b2f1765b6+7dedbde6d2,g5320a0a9f6+0c5d6105b6,g56b687f8c9+ede3a706f7,g5c4744a4d9+ef6ac23297,g5ffd174ac0+0c5d6105b6,g6075d09f38+66af417445,g667d525e37+2ced63db88,g670421136f+2ced63db88,g71f27ac40c+2ced63db88,g774830318a+463cbe8d1f,g7876bc68e5+1d137996f1,g7985c39107+62d12e78cb,g7fdac2220c+0fd8241c05,g96f01af41f+368e6903a7,g9ca82378b8+2ced63db88,g9d27549199+ef6ac23297,gabe93b2c52+e3573e3735,gb065e2a02a+3dfbe639da,gbc3249ced9+0c5d6105b6,gbec6a3398f+0c5d6105b6,gc9534b9d65+35b9f25267,gd01420fc67+0c5d6105b6,geee7ff78d7+a14128c129,gf63283c776+ede3a706f7,gfed783d017+0c5d6105b6,w.2022.47
LSST Data Management Base Package
Loading...
Searching...
No Matches
mergeMeasurements.py
Go to the documentation of this file.
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22__all__ = ["MergeMeasurementsConfig", "MergeMeasurementsTask"]
23
24import numpy
25import warnings
26
27import lsst.afw.table as afwTable
28import lsst.pex.config as pexConfig
29import lsst.pipe.base as pipeBase
30
31from lsst.pipe.base import PipelineTaskConnections, PipelineTaskConfig
32import lsst.pipe.base.connectionTypes as cT
33
34
35class MergeMeasurementsConnections(PipelineTaskConnections,
36 dimensions=("skymap", "tract", "patch"),
37 defaultTemplates={"inputCoaddName": "deep",
38 "outputCoaddName": "deep"}):
39 inputSchema = cT.InitInput(
40 doc="Schema for the output merged measurement catalog.",
41 name="{inputCoaddName}Coadd_meas_schema",
42 storageClass="SourceCatalog",
43 )
44 outputSchema = cT.InitOutput(
45 doc="Schema for the output merged measurement catalog.",
46 name="{outputCoaddName}Coadd_ref_schema",
47 storageClass="SourceCatalog",
48 )
49 catalogs = cT.Input(
50 doc="Input catalogs to merge.",
51 name="{inputCoaddName}Coadd_meas",
52 multiple=True,
53 storageClass="SourceCatalog",
54 dimensions=["band", "skymap", "tract", "patch"],
55 )
56 mergedCatalog = cT.Output(
57 doc="Output merged catalog.",
58 name="{outputCoaddName}Coadd_ref",
59 storageClass="SourceCatalog",
60 dimensions=["skymap", "tract", "patch"],
61 )
62
63
64class MergeMeasurementsConfig(PipelineTaskConfig, pipelineConnections=MergeMeasurementsConnections):
65 """Configuration parameters for the MergeMeasurementsTask.
66 """
67 pseudoFilterList = pexConfig.ListField(
68 dtype=str,
69 default=["sky"],
70 doc="Names of filters which may have no associated detection\n"
71 "(N.b. should include MergeDetectionsConfig.skyFilterName)"
72 )
73 snName = pexConfig.Field(
74 dtype=str,
75 default="base_PsfFlux",
76 doc="Name of flux measurement for calculating the S/N when choosing the reference band."
77 )
78 minSN = pexConfig.Field(
79 dtype=float,
80 default=10.,
81 doc="If the S/N from the priority band is below this value (and the S/N "
82 "is larger than minSNDiff compared to the priority band), use the band with "
83 "the largest S/N as the reference band."
84 )
85 minSNDiff = pexConfig.Field(
86 dtype=float,
87 default=3.,
88 doc="If the difference in S/N between another band and the priority band is larger "
89 "than this value (and the S/N in the priority band is less than minSN) "
90 "use the band with the largest S/N as the reference band"
91 )
92 flags = pexConfig.ListField(
93 dtype=str,
94 doc="Require that these flags, if available, are not set",
95 default=["base_PixelFlags_flag_interpolatedCenter", "base_PsfFlux_flag",
96 "ext_photometryKron_KronFlux_flag", "modelfit_CModel_flag", ]
97 )
98 priorityList = pexConfig.ListField(
99 dtype=str,
100 default=[],
101 doc="Priority-ordered list of filter bands for the merge."
102 )
103 coaddName = pexConfig.Field(
104 dtype=str,
105 default="deep",
106 doc="Name of coadd"
107 )
108
109 def validate(self):
110 super().validate()
111 if len(self.priorityList) == 0:
112 raise RuntimeError("No priority list provided")
113
114
115class MergeMeasurementsTask(pipeBase.PipelineTask):
116 """Merge measurements from multiple bands.
117
118 Combines consistent (i.e. with the same peaks and footprints) catalogs of
119 sources from multiple filter bands to construct a unified catalog that is
120 suitable for driving forced photometry. Every source is required to have
121 centroid, shape and flux measurements in each band.
122
123 MergeMeasurementsTask is meant to be run after deblending & measuring
124 sources in every band. The purpose of the task is to generate a catalog of
125 sources suitable for driving forced photometry in coadds and individual
126 exposures.
127
128 Parameters
129 ----------
130 butler : `None`, optional
131 Compatibility parameter. Should always be `None`.
132 schema : `lsst.afw.table.Schema`, optional
133 The schema of the detection catalogs used as input to this task.
134 initInputs : `dict`, optional
135 Dictionary that can contain a key ``inputSchema`` containing the
136 input schema. If present will override the value of ``schema``.
137 **kwargs
138 Additional keyword arguments.
139 """
140
141 _DefaultName = "mergeCoaddMeasurements"
142 ConfigClass = MergeMeasurementsConfig
143
144 inputDataset = "meas"
145 outputDataset = "ref"
146
147 def __init__(self, butler=None, schema=None, initInputs=None, **kwargs):
148 super().__init__(**kwargs)
149
150 if butler is not None:
151 warnings.warn("The 'butler' parameter is no longer used and can be safely removed.",
152 category=FutureWarning, stacklevel=2)
153 butler = None
154
155 if initInputs is not None:
156 schema = initInputs['inputSchema'].schema
157
158 if schema is None:
159 raise ValueError("No input schema or initInputs['inputSchema'] provided.")
160
161 inputSchema = schema
162
163 self.schemaMapper = afwTable.SchemaMapper(inputSchema, True)
164 self.schemaMapper.addMinimalSchema(inputSchema, True)
165 self.instFluxKey = inputSchema.find(self.config.snName + "_instFlux").getKey()
166 self.instFluxErrKey = inputSchema.find(self.config.snName + "_instFluxErr").getKey()
167 self.fluxFlagKey = inputSchema.find(self.config.snName + "_flag").getKey()
168
169 self.flagKeys = {}
170 for band in self.config.priorityList:
171 outputKey = self.schemaMapper.editOutputSchema().addField(
172 "merge_measurement_%s" % band,
173 type="Flag",
174 doc="Flag field set if the measurements here are from the %s filter" % band
175 )
176 peakKey = inputSchema.find("merge_peak_%s" % band).key
177 footprintKey = inputSchema.find("merge_footprint_%s" % band).key
178 self.flagKeys[band] = pipeBase.Struct(peak=peakKey, footprint=footprintKey, output=outputKey)
179 self.schema = self.schemaMapper.getOutputSchema()
180
181 self.pseudoFilterKeys = []
182 for filt in self.config.pseudoFilterList:
183 try:
184 self.pseudoFilterKeys.append(self.schema.find("merge_peak_%s" % filt).getKey())
185 except Exception as e:
186 self.log.warning("merge_peak is not set for pseudo-filter %s: %s", filt, e)
187
188 self.badFlags = {}
189 for flag in self.config.flags:
190 try:
191 self.badFlags[flag] = self.schema.find(flag).getKey()
192 except KeyError as exc:
193 self.log.warning("Can't find flag %s in schema: %s", flag, exc)
194 self.outputSchema = afwTable.SourceCatalog(self.schema)
195
196 def runQuantum(self, butlerQC, inputRefs, outputRefs):
197 inputs = butlerQC.get(inputRefs)
198 dataIds = (ref.dataId for ref in inputRefs.catalogs)
199 catalogDict = {dataId['band']: cat for dataId, cat in zip(dataIds, inputs['catalogs'])}
200 inputs['catalogs'] = catalogDict
201 outputs = self.run(**inputs)
202 butlerQC.put(outputs, outputRefs)
203
204 def run(self, catalogs):
205 """Merge measurement catalogs to create a single reference catalog for forced photometry.
206
207 Parameters
208 ----------
210 Catalogs to be merged.
211
212 Raises
213 ------
214 ValueError
215 Raised if no catalog records were found;
216 if there is no valid reference for the input record ID;
217 or if there is a mismatch between catalog sizes.
218
219 Notes
220 -----
221 For parent sources, we choose the first band in config.priorityList for which the
222 merge_footprint flag for that band is is True.
223
224 For child sources, the logic is the same, except that we use the merge_peak flags.
225 """
226 # Put catalogs, filters in priority order
227 orderedCatalogs = [catalogs[band] for band in self.config.priorityList if band in catalogs.keys()]
228 orderedKeys = [self.flagKeys[band] for band in self.config.priorityList if band in catalogs.keys()]
229
230 mergedCatalog = afwTable.SourceCatalog(self.schema)
231 mergedCatalog.reserve(len(orderedCatalogs[0]))
232
233 idKey = orderedCatalogs[0].table.getIdKey()
234 for catalog in orderedCatalogs[1:]:
235 if numpy.any(orderedCatalogs[0].get(idKey) != catalog.get(idKey)):
236 raise ValueError("Error in inputs to MergeCoaddMeasurements: source IDs do not match")
237
238 # This first zip iterates over all the catalogs simultaneously, yielding a sequence of one
239 # record for each band, in priority order.
240 for orderedRecords in zip(*orderedCatalogs):
241
242 maxSNRecord = None
243 maxSNFlagKeys = None
244 maxSN = 0.
245 priorityRecord = None
246 priorityFlagKeys = None
247 prioritySN = 0.
248 hasPseudoFilter = False
249
250 # Now we iterate over those record-band pairs, keeping track of the priority and the
251 # largest S/N band.
252 for inputRecord, flagKeys in zip(orderedRecords, orderedKeys):
253 parent = (inputRecord.getParent() == 0 and inputRecord.get(flagKeys.footprint))
254 child = (inputRecord.getParent() != 0 and inputRecord.get(flagKeys.peak))
255
256 if not (parent or child):
257 for pseudoFilterKey in self.pseudoFilterKeys:
258 if inputRecord.get(pseudoFilterKey):
259 hasPseudoFilter = True
260 priorityRecord = inputRecord
261 priorityFlagKeys = flagKeys
262 break
263 if hasPseudoFilter:
264 break
265
266 isBad = any(inputRecord.get(flag) for flag in self.badFlags)
267 if isBad or inputRecord.get(self.fluxFlagKey) or inputRecord.get(self.instFluxErrKey) == 0:
268 sn = 0.
269 else:
270 sn = inputRecord.get(self.instFluxKey)/inputRecord.get(self.instFluxErrKey)
271 if numpy.isnan(sn) or sn < 0.:
272 sn = 0.
273 if (parent or child) and priorityRecord is None:
274 priorityRecord = inputRecord
275 priorityFlagKeys = flagKeys
276 prioritySN = sn
277 if sn > maxSN:
278 maxSNRecord = inputRecord
279 maxSNFlagKeys = flagKeys
280 maxSN = sn
281
282 # If the priority band has a low S/N we would like to choose the band with the highest S/N as
283 # the reference band instead. However, we only want to choose the highest S/N band if it is
284 # significantly better than the priority band. Therefore, to choose a band other than the
285 # priority, we require that the priority S/N is below the minimum threshold and that the
286 # difference between the priority and highest S/N is larger than the difference threshold.
287 #
288 # For pseudo code objects we always choose the first band in the priority list.
289 bestRecord = None
290 bestFlagKeys = None
291 if hasPseudoFilter:
292 bestRecord = priorityRecord
293 bestFlagKeys = priorityFlagKeys
294 elif (prioritySN < self.config.minSN and (maxSN - prioritySN) > self.config.minSNDiff
295 and maxSNRecord is not None):
296 bestRecord = maxSNRecord
297 bestFlagKeys = maxSNFlagKeys
298 elif priorityRecord is not None:
299 bestRecord = priorityRecord
300 bestFlagKeys = priorityFlagKeys
301
302 if bestRecord is not None and bestFlagKeys is not None:
303 outputRecord = mergedCatalog.addNew()
304 outputRecord.assign(bestRecord, self.schemaMapper)
305 outputRecord.set(bestFlagKeys.output, True)
306 else: # if we didn't find any records
307 raise ValueError("Error in inputs to MergeCoaddMeasurements: no valid reference for %s" %
308 inputRecord.getId())
309
310 # more checking for sane inputs, since zip silently iterates over the smallest sequence
311 for inputCatalog in orderedCatalogs:
312 if len(mergedCatalog) != len(inputCatalog):
313 raise ValueError("Mismatch between catalog sizes: %s != %s" %
314 (len(mergedCatalog), len(orderedCatalogs)))
315
316 return pipeBase.Struct(
317 mergedCatalog=mergedCatalog
318 )
Defines the fields and offsets for a table.
Definition: Schema.h:51
A mapping between the keys of two Schemas, used to copy data between them.
Definition: SchemaMapper.h:21