# Names exported by a star-import of this module.
__all__ = ["MergeMeasurementsConfig", "MergeMeasurementsTask"]
31from lsst.pipe.base import PipelineTaskConnections, PipelineTaskConfig
32import lsst.pipe.base.connectionTypes
as cT
36 dimensions=(
"skymap",
"tract",
"patch"),
37 defaultTemplates={
"inputCoaddName":
"deep",
38 "outputCoaddName":
"deep"}):
# Init-input connection carrying the schema of the per-band measurement
# catalogs; its dataset name is built from the ``inputCoaddName`` template.
# The description previously duplicated the *output* schema's text.
inputSchema = cT.InitInput(
    doc="Schema for the input measurement catalogs.",
    name="{inputCoaddName}Coadd_meas_schema",
    storageClass="SourceCatalog",
)
44 outputSchema = cT.InitOutput(
45 doc=
"Schema for the output merged measurement catalog.",
46 name=
"{outputCoaddName}Coadd_ref_schema",
47 storageClass=
"SourceCatalog",
50 doc=
"Input catalogs to merge.",
51 name=
"{inputCoaddName}Coadd_meas",
53 storageClass=
"SourceCatalog",
54 dimensions=[
"band",
"skymap",
"tract",
"patch"],
56 mergedCatalog = cT.Output(
57 doc=
"Output merged catalog.",
58 name=
"{outputCoaddName}Coadd_ref",
59 storageClass=
"SourceCatalog",
60 dimensions=[
"skymap",
"tract",
"patch"],
64class MergeMeasurementsConfig(PipelineTaskConfig, pipelineConnections=MergeMeasurementsConnections):
65 """Configuration parameters for the MergeMeasurementsTask.
67 pseudoFilterList = pexConfig.ListField(
70 doc=
"Names of filters which may have no associated detection\n"
71 "(N.b. should include MergeDetectionsConfig.skyFilterName)"
73 snName = pexConfig.Field(
75 default=
"base_PsfFlux",
76 doc=
"Name of flux measurement for calculating the S/N when choosing the reference band."
78 minSN = pexConfig.Field(
81 doc=
"If the S/N from the priority band is below this value (and the S/N "
82 "is larger than minSNDiff compared to the priority band), use the band with "
83 "the largest S/N as the reference band."
85 minSNDiff = pexConfig.Field(
88 doc=
"If the difference in S/N between another band and the priority band is larger "
89 "than this value (and the S/N in the priority band is less than minSN) "
90 "use the band with the largest S/N as the reference band"
92 flags = pexConfig.ListField(
94 doc=
"Require that these flags, if available, are not set",
95 default=[
"base_PixelFlags_flag_interpolatedCenter",
"base_PsfFlux_flag",
96 "ext_photometryKron_KronFlux_flag",
"modelfit_CModel_flag", ]
98 priorityList = pexConfig.ListField(
101 doc=
"Priority-ordered list of filter bands for the merge."
103 coaddName = pexConfig.Field(
111 if len(self.priorityList) == 0:
112 raise RuntimeError(
"No priority list provided")
115class MergeMeasurementsTask(pipeBase.PipelineTask):
116 """Merge measurements from multiple bands.
Combines consistent (i.e. with the same peaks and footprints) catalogs of
sources from multiple filter bands to construct a unified catalog that is
suitable for driving forced photometry. Every source is required to have
centroid, shape and flux measurements in each band.
123 MergeMeasurementsTask
is meant to be run after deblending & measuring
124 sources
in every band. The purpose of the task
is to generate a catalog of
125 sources suitable
for driving forced photometry
in coadds
and individual
130 butler : `
None`, optional
131 Compatibility parameter. Should always be `
None`.
133 The schema of the detection catalogs used
as input to this task.
134 initInputs : `dict`, optional
135 Dictionary that can contain a key ``inputSchema`` containing the
136 input schema. If present will override the value of ``schema``.
138 Additional keyword arguments.
141 _DefaultName = "mergeCoaddMeasurements"
142 ConfigClass = MergeMeasurementsConfig
144 inputDataset =
"meas"
145 outputDataset =
"ref"
147 def __init__(self, butler=None, schema=None, initInputs=None, **kwargs):
148 super().__init__(**kwargs)
150 if butler
is not None:
151 warnings.warn(
"The 'butler' parameter is no longer used and can be safely removed.",
152 category=FutureWarning, stacklevel=2)
155 if initInputs
is not None:
156 schema = initInputs[
'inputSchema'].schema
159 raise ValueError(
"No input schema or initInputs['inputSchema'] provided.")
164 self.schemaMapper.addMinimalSchema(inputSchema,
True)
165 self.instFluxKey = inputSchema.find(self.config.snName +
"_instFlux").getKey()
166 self.instFluxErrKey = inputSchema.find(self.config.snName +
"_instFluxErr").getKey()
167 self.fluxFlagKey = inputSchema.find(self.config.snName +
"_flag").getKey()
170 for band
in self.config.priorityList:
171 outputKey = self.schemaMapper.editOutputSchema().addField(
172 "merge_measurement_%s" % band,
174 doc=
"Flag field set if the measurements here are from the %s filter" % band
176 peakKey = inputSchema.find(
"merge_peak_%s" % band).key
177 footprintKey = inputSchema.find(
"merge_footprint_%s" % band).key
178 self.flagKeys[band] = pipeBase.Struct(peak=peakKey, footprint=footprintKey, output=outputKey)
179 self.schema = self.schemaMapper.getOutputSchema()
181 self.pseudoFilterKeys = []
182 for filt
in self.config.pseudoFilterList:
184 self.pseudoFilterKeys.append(self.schema.find(
"merge_peak_%s" % filt).getKey())
185 except Exception
as e:
186 self.log.warning(
"merge_peak is not set for pseudo-filter %s: %s", filt, e)
189 for flag
in self.config.flags:
191 self.badFlags[flag] = self.schema.find(flag).getKey()
192 except KeyError
as exc:
193 self.log.warning(
"Can't find flag %s in schema: %s", flag, exc)
def runQuantum(self, butlerQC, inputRefs, outputRefs):
    """Load the quantum's inputs, key the catalogs by band, run, and store the result.

    Parameters
    ----------
    butlerQC
        Quantum context; only its ``get`` and ``put`` methods are used here.
    inputRefs
        Input references. ``inputRefs.catalogs`` is iterated and the
        ``"band"`` entry of each reference's ``dataId`` is read.
    outputRefs
        Output references, handed unchanged to ``butlerQC.put``.
    """
    inputs = butlerQC.get(inputRefs)

    # ``run`` expects a mapping from band to catalog, so pair every retrieved
    # catalog with the band of the reference at the same position.
    catalogsByBand = {}
    for ref, cat in zip(inputRefs.catalogs, inputs['catalogs']):
        catalogsByBand[ref.dataId['band']] = cat
    inputs['catalogs'] = catalogsByBand

    butlerQC.put(self.run(**inputs), outputRefs)
204 def run(self, catalogs):
205 """Merge measurement catalogs to create a single reference catalog for forced photometry.
210 Catalogs to be merged.
215 Raised if no catalog records were found;
216 if there
is no valid reference
for the input record ID;
217 or if there
is a mismatch between catalog sizes.
For parent sources, we choose the first band in config.priorityList for which the
merge_footprint flag for that band is True.
224 For child sources, the logic
is the same,
except that we use the merge_peak flags.
227 orderedCatalogs = [catalogs[band]
for band
in self.config.priorityList
if band
in catalogs.keys()]
228 orderedKeys = [self.flagKeys[band]
for band
in self.config.priorityList
if band
in catalogs.keys()]
231 mergedCatalog.reserve(len(orderedCatalogs[0]))
233 idKey = orderedCatalogs[0].table.getIdKey()
234 for catalog
in orderedCatalogs[1:]:
235 if numpy.any(orderedCatalogs[0].get(idKey) != catalog.get(idKey)):
236 raise ValueError(
"Error in inputs to MergeCoaddMeasurements: source IDs do not match")
240 for orderedRecords
in zip(*orderedCatalogs):
245 priorityRecord =
None
246 priorityFlagKeys =
None
248 hasPseudoFilter =
False
252 for inputRecord, flagKeys
in zip(orderedRecords, orderedKeys):
253 parent = (inputRecord.getParent() == 0
and inputRecord.get(flagKeys.footprint))
254 child = (inputRecord.getParent() != 0
and inputRecord.get(flagKeys.peak))
256 if not (parent
or child):
257 for pseudoFilterKey
in self.pseudoFilterKeys:
258 if inputRecord.get(pseudoFilterKey):
259 hasPseudoFilter =
True
260 priorityRecord = inputRecord
261 priorityFlagKeys = flagKeys
266 isBad = any(inputRecord.get(flag)
for flag
in self.badFlags)
267 if isBad
or inputRecord.get(self.fluxFlagKey)
or inputRecord.get(self.instFluxErrKey) == 0:
270 sn = inputRecord.get(self.instFluxKey)/inputRecord.get(self.instFluxErrKey)
271 if numpy.isnan(sn)
or sn < 0.:
273 if (parent
or child)
and priorityRecord
is None:
274 priorityRecord = inputRecord
275 priorityFlagKeys = flagKeys
278 maxSNRecord = inputRecord
279 maxSNFlagKeys = flagKeys
292 bestRecord = priorityRecord
293 bestFlagKeys = priorityFlagKeys
294 elif (prioritySN < self.config.minSN
and (maxSN - prioritySN) > self.config.minSNDiff
295 and maxSNRecord
is not None):
296 bestRecord = maxSNRecord
297 bestFlagKeys = maxSNFlagKeys
298 elif priorityRecord
is not None:
299 bestRecord = priorityRecord
300 bestFlagKeys = priorityFlagKeys
302 if bestRecord
is not None and bestFlagKeys
is not None:
303 outputRecord = mergedCatalog.addNew()
304 outputRecord.assign(bestRecord, self.schemaMapper)
305 outputRecord.set(bestFlagKeys.output,
True)
307 raise ValueError(
"Error in inputs to MergeCoaddMeasurements: no valid reference for %s" %
# Sanity check on the inputs: the records were gathered with ``zip``, which
# stops at the shortest catalog, so a size mismatch would otherwise pass
# unnoticed.
for inputCatalog in orderedCatalogs:
    if len(mergedCatalog) != len(inputCatalog):
        # Report the size of the offending catalog (the value actually
        # compared above), not the number of input catalogs.
        raise ValueError("Mismatch between catalog sizes: %s != %s" %
                         (len(mergedCatalog), len(inputCatalog)))
316 return pipeBase.Struct(
317 mergedCatalog=mergedCatalog
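# NOTE(review): the two lines below look like documentation-tooltip text captured
# with this listing rather than module code; kept as comments -- confirm and remove.
# Defines the fields and offsets for a table.
# A mapping between the keys of two Schemas, used to copy data between them.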