22__all__ = [
"MergeMeasurementsConfig",
"MergeMeasurementsTask"]
30from lsst.pipe.base import PipelineTaskConnections, PipelineTaskConfig
31import lsst.pipe.base.connectionTypes
as cT
35 dimensions=(
"skymap",
"tract",
"patch"),
36 defaultTemplates={
"inputCoaddName":
"deep",
37 "outputCoaddName":
"deep"}):
38 inputSchema = cT.InitInput(
39 doc=
"Schema for the output merged measurement catalog.",
40 name=
"{inputCoaddName}Coadd_meas_schema",
41 storageClass=
"SourceCatalog",
43 outputSchema = cT.InitOutput(
44 doc=
"Schema for the output merged measurement catalog.",
45 name=
"{outputCoaddName}Coadd_ref_schema",
46 storageClass=
"SourceCatalog",
49 doc=
"Input catalogs to merge.",
50 name=
"{inputCoaddName}Coadd_meas",
52 storageClass=
"SourceCatalog",
53 dimensions=[
"band",
"skymap",
"tract",
"patch"],
55 mergedCatalog = cT.Output(
56 doc=
"Output merged catalog.",
57 name=
"{outputCoaddName}Coadd_ref",
58 storageClass=
"SourceCatalog",
59 dimensions=[
"skymap",
"tract",
"patch"],
63class MergeMeasurementsConfig(PipelineTaskConfig, pipelineConnections=MergeMeasurementsConnections):
64 """Configuration parameters for the MergeMeasurementsTask.
66 pseudoFilterList = pexConfig.ListField(
69 doc=
"Names of filters which may have no associated detection\n"
70 "(N.b. should include MergeDetectionsConfig.skyFilterName)"
72 snName = pexConfig.Field(
74 default=
"base_PsfFlux",
75 doc=
"Name of flux measurement for calculating the S/N when choosing the reference band."
77 minSN = pexConfig.Field(
80 doc=
"If the S/N from the priority band is below this value (and the S/N "
81 "is larger than minSNDiff compared to the priority band), use the band with "
82 "the largest S/N as the reference band."
84 minSNDiff = pexConfig.Field(
87 doc=
"If the difference in S/N between another band and the priority band is larger "
88 "than this value (and the S/N in the priority band is less than minSN) "
89 "use the band with the largest S/N as the reference band"
91 flags = pexConfig.ListField(
93 doc=
"Require that these flags, if available, are not set",
94 default=[
"base_PixelFlags_flag_interpolatedCenter",
"base_PsfFlux_flag",
95 "ext_photometryKron_KronFlux_flag",
"modelfit_CModel_flag", ]
97 priorityList = pexConfig.ListField(
100 doc=
"Priority-ordered list of filter bands for the merge."
102 coaddName = pexConfig.Field(
110 if len(self.priorityList) == 0:
111 raise RuntimeError(
"No priority list provided")
114class MergeMeasurementsTask(pipeBase.PipelineTask):
115 """Merge measurements from multiple bands.
117 Combines consistent (i.e. with the same peaks and footprints) catalogs of
118 sources from multiple filter bands to construct a unified catalog that is
119 suitable for driving forced photometry. Every source is required to have
120 centroid, shape and flux measurements in each band.
122 MergeMeasurementsTask is meant to be run after deblending & measuring
123 sources in every band. The purpose of the task is to generate a catalog of
124 sources suitable for driving forced photometry in coadds and individual
129 schema : `lsst.afw.table.Schema`, optional
130 The schema of the detection catalogs used as input to this task.
131 initInputs : `dict`, optional
132 Dictionary that can contain a key ``inputSchema`` containing the
133 input schema. If present will override the value of ``schema``.
135 Additional keyword arguments.
138 _DefaultName =
"mergeCoaddMeasurements"
139 ConfigClass = MergeMeasurementsConfig
141 inputDataset =
"meas"
142 outputDataset =
"ref"
144 def __init__(self, schema=None, initInputs=None, **kwargs):
145 super().__init__(**kwargs)
147 if initInputs
is not None:
148 schema = initInputs[
'inputSchema'].schema
151 raise ValueError(
"No input schema or initInputs['inputSchema'] provided.")
156 self.schemaMapper.addMinimalSchema(inputSchema,
True)
157 self.instFluxKey = inputSchema.find(self.config.snName +
"_instFlux").getKey()
158 self.instFluxErrKey = inputSchema.find(self.config.snName +
"_instFluxErr").getKey()
159 self.fluxFlagKey = inputSchema.find(self.config.snName +
"_flag").getKey()
162 for band
in self.config.priorityList:
163 outputKey = self.schemaMapper.editOutputSchema().addField(
164 "merge_measurement_%s" % band,
166 doc=
"Flag field set if the measurements here are from the %s filter" % band
168 peakKey = inputSchema.find(
"merge_peak_%s" % band).key
169 footprintKey = inputSchema.find(
"merge_footprint_%s" % band).key
170 self.flagKeys[band] = pipeBase.Struct(peak=peakKey, footprint=footprintKey, output=outputKey)
171 self.schema = self.schemaMapper.getOutputSchema()
173 self.pseudoFilterKeys = []
174 for filt
in self.config.pseudoFilterList:
176 self.pseudoFilterKeys.append(self.schema.find(
"merge_peak_%s" % filt).getKey())
177 except Exception
as e:
178 self.log.warning(
"merge_peak is not set for pseudo-filter %s: %s", filt, e)
181 for flag
in self.config.flags:
183 self.badFlags[flag] = self.schema.find(flag).getKey()
184 except KeyError
as exc:
185 self.log.warning(
"Can't find flag %s in schema: %s", flag, exc)
188 def runQuantum(self, butlerQC, inputRefs, outputRefs):
189 inputs = butlerQC.get(inputRefs)
190 dataIds = (ref.dataId
for ref
in inputRefs.catalogs)
191 catalogDict = {dataId[
'band']: cat
for dataId, cat
in zip(dataIds, inputs[
'catalogs'])}
192 inputs[
'catalogs'] = catalogDict
193 outputs = self.run(**inputs)
194 butlerQC.put(outputs, outputRefs)
196 def run(self, catalogs):
197 """Merge measurement catalogs to create a single reference catalog for forced photometry.
201 catalogs : `lsst.afw.table.SourceCatalog`
202 Catalogs to be merged.
207 Raised if no catalog records were found;
208 if there is no valid reference for the input record ID;
209 or if there is a mismatch between catalog sizes.
213 For parent sources, we choose the first band in config.priorityList for which the
214 merge_footprint flag for that band is is True.
216 For child sources, the logic is the same, except that we use the merge_peak flags.
219 orderedCatalogs = [catalogs[band]
for band
in self.config.priorityList
if band
in catalogs.keys()]
220 orderedKeys = [self.flagKeys[band]
for band
in self.config.priorityList
if band
in catalogs.keys()]
223 mergedCatalog.reserve(len(orderedCatalogs[0]))
225 idKey = orderedCatalogs[0].table.getIdKey()
226 for catalog
in orderedCatalogs[1:]:
227 if numpy.any(orderedCatalogs[0].get(idKey) != catalog.get(idKey)):
228 raise ValueError(
"Error in inputs to MergeCoaddMeasurements: source IDs do not match")
232 for orderedRecords
in zip(*orderedCatalogs):
237 priorityRecord =
None
238 priorityFlagKeys =
None
240 hasPseudoFilter =
False
244 for inputRecord, flagKeys
in zip(orderedRecords, orderedKeys):
245 parent = (inputRecord.getParent() == 0
and inputRecord.get(flagKeys.footprint))
246 child = (inputRecord.getParent() != 0
and inputRecord.get(flagKeys.peak))
248 if not (parent
or child):
249 for pseudoFilterKey
in self.pseudoFilterKeys:
250 if inputRecord.get(pseudoFilterKey):
251 hasPseudoFilter =
True
252 priorityRecord = inputRecord
253 priorityFlagKeys = flagKeys
259 any(inputRecord.get(flag)
for flag
in self.badFlags)
260 or inputRecord[
"deblend_dataCoverage"] == 0
261 or inputRecord.get(self.fluxFlagKey)
262 or inputRecord.get(self.instFluxErrKey) == 0
267 sn = inputRecord.get(self.instFluxKey)/inputRecord.get(self.instFluxErrKey)
268 if numpy.isnan(sn)
or sn < 0.:
270 if (parent
or child)
and priorityRecord
is None:
271 priorityRecord = inputRecord
272 priorityFlagKeys = flagKeys
275 maxSNRecord = inputRecord
276 maxSNFlagKeys = flagKeys
289 bestRecord = priorityRecord
290 bestFlagKeys = priorityFlagKeys
291 elif (prioritySN < self.config.minSN
and (maxSN - prioritySN) > self.config.minSNDiff
292 and maxSNRecord
is not None):
293 bestRecord = maxSNRecord
294 bestFlagKeys = maxSNFlagKeys
295 elif priorityRecord
is not None:
296 bestRecord = priorityRecord
297 bestFlagKeys = priorityFlagKeys
299 if bestRecord
is not None and bestFlagKeys
is not None:
300 outputRecord = mergedCatalog.addNew()
301 outputRecord.assign(bestRecord, self.schemaMapper)
302 outputRecord.set(bestFlagKeys.output,
True)
304 raise ValueError(
"Error in inputs to MergeCoaddMeasurements: no valid reference for %s" %
308 for inputCatalog
in orderedCatalogs:
309 if len(mergedCatalog) != len(inputCatalog):
310 raise ValueError(
"Mismatch between catalog sizes: %s != %s" %
311 (len(mergedCatalog), len(orderedCatalogs)))
313 return pipeBase.Struct(
314 mergedCatalog=mergedCatalog
A mapping between the keys of two Schemas, used to copy data between them.