25from .multiBandUtils
import (MergeSourcesRunner, _makeGetSchemaCatalogs, makeMergeArgumentParser,
26 getInputSchema, readCatalog)
33from lsst.pipe.base import PipelineTaskConnections, PipelineTaskConfig
34import lsst.pipe.base.connectionTypes
as cT
38 dimensions=(
"skymap",
"tract",
"patch"),
39 defaultTemplates={
"inputCoaddName":
"deep",
40 "outputCoaddName":
"deep"}):
41 inputSchema = cT.InitInput(
42 doc=
"Schema for the output merged measurement catalog.",
43 name=
"{inputCoaddName}Coadd_meas_schema",
44 storageClass=
"SourceCatalog",
46 outputSchema = cT.InitOutput(
47 doc=
"Schema for the output merged measurement catalog.",
48 name=
"{outputCoaddName}Coadd_ref_schema",
49 storageClass=
"SourceCatalog",
52 doc=
"Input catalogs to merge.",
53 name=
"{inputCoaddName}Coadd_meas",
55 storageClass=
"SourceCatalog",
56 dimensions=[
"band",
"skymap",
"tract",
"patch"],
58 mergedCatalog = cT.Output(
59 doc=
"Output merged catalog.",
60 name=
"{outputCoaddName}Coadd_ref",
61 storageClass=
"SourceCatalog",
62 dimensions=[
"skymap",
"tract",
"patch"],
66class MergeMeasurementsConfig(PipelineTaskConfig, pipelineConnections=MergeMeasurementsConnections):
68 @anchor MergeMeasurementsConfig_
70 @brief Configuration parameters
for the MergeMeasurementsTask
72 pseudoFilterList = pexConfig.ListField(
75 doc=
"Names of filters which may have no associated detection\n"
76 "(N.b. should include MergeDetectionsConfig.skyFilterName)"
78 snName = pexConfig.Field(
80 default=
"base_PsfFlux",
81 doc=
"Name of flux measurement for calculating the S/N when choosing the reference band."
83 minSN = pexConfig.Field(
86 doc=
"If the S/N from the priority band is below this value (and the S/N "
87 "is larger than minSNDiff compared to the priority band), use the band with "
88 "the largest S/N as the reference band."
90 minSNDiff = pexConfig.Field(
93 doc=
"If the difference in S/N between another band and the priority band is larger "
94 "than this value (and the S/N in the priority band is less than minSN) "
95 "use the band with the largest S/N as the reference band"
97 flags = pexConfig.ListField(
99 doc=
"Require that these flags, if available, are not set",
100 default=[
"base_PixelFlags_flag_interpolatedCenter",
"base_PsfFlux_flag",
101 "ext_photometryKron_KronFlux_flag",
"modelfit_CModel_flag", ]
103 priorityList = pexConfig.ListField(
106 doc=
"Priority-ordered list of filter bands for the merge."
108 coaddName = pexConfig.Field(
116 if len(self.priorityList) == 0:
117 raise RuntimeError(
"No priority list provided")
128class MergeMeasurementsTask(pipeBase.PipelineTask, pipeBase.CmdLineTask):
130 @anchor MergeMeasurementsTask_
132 @brief Merge measurements
from multiple bands
134 @section pipe_tasks_multiBand_Contents Contents
136 -
@ref pipe_tasks_multiBand_MergeMeasurementsTask_Purpose
137 -
@ref pipe_tasks_multiBand_MergeMeasurementsTask_Initialize
138 -
@ref pipe_tasks_multiBand_MergeMeasurementsTask_Run
139 -
@ref pipe_tasks_multiBand_MergeMeasurementsTask_Config
140 -
@ref pipe_tasks_multiBand_MergeMeasurementsTask_Debug
141 -
@ref pipe_tasks_multiband_MergeMeasurementsTask_Example
143 @section pipe_tasks_multiBand_MergeMeasurementsTask_Purpose Description
145 Command-line task that merges measurements
from multiple bands.
147 Combines consistent (i.e.
with the same peaks
and footprints) catalogs of sources
from multiple filter
148 bands to construct a unified catalog that
is suitable
for driving forced photometry. Every source
is
149 required to have centroid, shape
and flux measurements
in each band.
152 deepCoadd_meas{tract,patch,filter}: SourceCatalog
154 deepCoadd_ref{tract,patch}: SourceCatalog
158 MergeMeasurementsTask subclasses
@ref CmdLineTask_
"CmdLineTask".
160 @section pipe_tasks_multiBand_MergeMeasurementsTask_Initialize Task initialization
162 @copydoc \_\_init\_\_
164 @section pipe_tasks_multiBand_MergeMeasurementsTask_Run Invoking the Task
168 @section pipe_tasks_multiBand_MergeMeasurementsTask_Config Configuration parameters
170 See
@ref MergeMeasurementsConfig_
172 @section pipe_tasks_multiBand_MergeMeasurementsTask_Debug Debug variables
174 The command line task interface supports a
175 flag
@c -d to
import @b debug.py
from your
@c PYTHONPATH; see
@ref baseDebug
for more about
@b debug.py
178 MergeMeasurementsTask has no debug variables.
180 @section pipe_tasks_multiband_MergeMeasurementsTask_Example A complete example
181 of using MergeMeasurementsTask
183 MergeMeasurementsTask
is meant to be run after deblending & measuring sources
in every band.
184 The purpose of the task
is to generate a catalog of sources suitable
for driving forced photometry
in
185 coadds
and individual exposures.
186 Command-line usage of MergeMeasurementsTask expects a data reference to the coadds to be processed. A list
187 of the available optional arguments can be obtained by calling mergeCoaddMeasurements.py
with the `--help`
188 command line argument:
190 mergeCoaddMeasurements.py --help
193 To demonstrate usage of the DetectCoaddSourcesTask
in the larger context of multi-band processing, we
194 will process HSC data
in the [ci_hsc](https://github.com/lsst/ci_hsc) package. Assuming one has finished
195 step 7 at
@ref pipeTasks_multiBand, one may merge the catalogs generated after deblending
and measuring
198 mergeCoaddMeasurements.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-I^HSC-R
200 This will merge the HSC-I & HSC-R band catalogs. The results are written
in
201 `$CI_HSC_DIR/DATA/deepCoadd-results/`.
203 _DefaultName = "mergeCoaddMeasurements"
204 ConfigClass = MergeMeasurementsConfig
205 RunnerClass = MergeSourcesRunner
206 inputDataset =
"meas"
207 outputDataset =
"ref"
208 getSchemaCatalogs = _makeGetSchemaCatalogs(
"ref")
211 def _makeArgumentParser(cls):
217 def runQuantum(self, butlerQC, inputRefs, outputRefs):
218 inputs = butlerQC.get(inputRefs)
219 dataIds = (ref.dataId
for ref
in inputRefs.catalogs)
220 catalogDict = {dataId[
'band']: cat
for dataId, cat
in zip(dataIds, inputs[
'catalogs'])}
221 inputs[
'catalogs'] = catalogDict
222 outputs = self.run(**inputs)
223 butlerQC.put(outputs, outputRefs)
225 def __init__(self, butler=None, schema=None, initInputs=None, **kwargs):
229 @param[
in] schema: the schema of the detection catalogs used
as input to this one
230 @param[
in] butler: a butler used to read the input schema
from disk,
if schema
is None
232 The task will set its own self.schema attribute to the schema of the output merged catalog.
234 super().__init__(**kwargs)
236 if initInputs
is not None:
237 inputSchema = initInputs[
'inputSchema'].schema
239 inputSchema = self.getInputSchema(butler=butler, schema=schema)
241 self.schemaMapper.addMinimalSchema(inputSchema,
True)
242 self.instFluxKey = inputSchema.find(self.config.snName +
"_instFlux").getKey()
243 self.instFluxErrKey = inputSchema.find(self.config.snName +
"_instFluxErr").getKey()
244 self.fluxFlagKey = inputSchema.find(self.config.snName +
"_flag").getKey()
247 for band
in self.config.priorityList:
248 outputKey = self.schemaMapper.editOutputSchema().addField(
249 "merge_measurement_%s" % band,
251 doc=
"Flag field set if the measurements here are from the %s filter" % band
253 peakKey = inputSchema.find(
"merge_peak_%s" % band).key
254 footprintKey = inputSchema.find(
"merge_footprint_%s" % band).key
255 self.flagKeys[band] = pipeBase.Struct(peak=peakKey, footprint=footprintKey, output=outputKey)
256 self.schema = self.schemaMapper.getOutputSchema()
258 self.pseudoFilterKeys = []
259 for filt
in self.config.pseudoFilterList:
261 self.pseudoFilterKeys.
append(self.schema.find(
"merge_peak_%s" % filt).getKey())
262 except Exception
as e:
263 self.log.
warning(
"merge_peak is not set for pseudo-filter %s: %s", filt, e)
266 for flag
in self.config.flags:
268 self.badFlags[flag] = self.schema.find(flag).getKey()
269 except KeyError
as exc:
270 self.log.
warning(
"Can't find flag %s in schema: %s", flag, exc)
273 def runDataRef(self, patchRefList):
275 @brief Merge coadd sources
from multiple bands. Calls
@ref `run`.
276 @param[
in] patchRefList list of data references
for each filter
278 catalogs = dict(readCatalog(self, patchRef) for patchRef
in patchRefList)
279 mergedCatalog = self.run(catalogs).mergedCatalog
280 self.write(patchRefList[0], mergedCatalog)
282 def run(self, catalogs):
284 Merge measurement catalogs to create a single reference catalog for forced photometry
286 @param[
in] catalogs: the catalogs to be merged
288 For parent sources, we choose the first band
in config.priorityList
for which the
289 merge_footprint flag
for that band
is is True.
291 For child sources, the logic
is the same,
except that we use the merge_peak flags.
294 orderedCatalogs = [catalogs[band]
for band
in self.config.priorityList
if band
in catalogs.keys()]
295 orderedKeys = [self.flagKeys[band]
for band
in self.config.priorityList
if band
in catalogs.keys()]
298 mergedCatalog.reserve(len(orderedCatalogs[0]))
300 idKey = orderedCatalogs[0].table.getIdKey()
301 for catalog
in orderedCatalogs[1:]:
302 if numpy.any(orderedCatalogs[0].get(idKey) != catalog.get(idKey)):
303 raise ValueError(
"Error in inputs to MergeCoaddMeasurements: source IDs do not match")
307 for orderedRecords
in zip(*orderedCatalogs):
312 priorityRecord =
None
313 priorityFlagKeys =
None
315 hasPseudoFilter =
False
319 for inputRecord, flagKeys
in zip(orderedRecords, orderedKeys):
320 parent = (inputRecord.getParent() == 0
and inputRecord.get(flagKeys.footprint))
321 child = (inputRecord.getParent() != 0
and inputRecord.get(flagKeys.peak))
323 if not (parent
or child):
324 for pseudoFilterKey
in self.pseudoFilterKeys:
325 if inputRecord.get(pseudoFilterKey):
326 hasPseudoFilter =
True
327 priorityRecord = inputRecord
328 priorityFlagKeys = flagKeys
333 isBad =
any(inputRecord.get(flag)
for flag
in self.badFlags)
334 if isBad
or inputRecord.get(self.fluxFlagKey)
or inputRecord.get(self.instFluxErrKey) == 0:
337 sn = inputRecord.get(self.instFluxKey)/inputRecord.get(self.instFluxErrKey)
338 if numpy.isnan(sn)
or sn < 0.:
340 if (parent
or child)
and priorityRecord
is None:
341 priorityRecord = inputRecord
342 priorityFlagKeys = flagKeys
345 maxSNRecord = inputRecord
346 maxSNFlagKeys = flagKeys
359 bestRecord = priorityRecord
360 bestFlagKeys = priorityFlagKeys
361 elif (prioritySN < self.config.minSN
and (maxSN - prioritySN) > self.config.minSNDiff
362 and maxSNRecord
is not None):
363 bestRecord = maxSNRecord
364 bestFlagKeys = maxSNFlagKeys
365 elif priorityRecord
is not None:
366 bestRecord = priorityRecord
367 bestFlagKeys = priorityFlagKeys
369 if bestRecord
is not None and bestFlagKeys
is not None:
370 outputRecord = mergedCatalog.addNew()
371 outputRecord.assign(bestRecord, self.schemaMapper)
372 outputRecord.set(bestFlagKeys.output,
True)
374 raise ValueError(
"Error in inputs to MergeCoaddMeasurements: no valid reference for %s" %
378 for inputCatalog
in orderedCatalogs:
379 if len(mergedCatalog) != len(inputCatalog):
380 raise ValueError(
"Mismatch between catalog sizes: %s != %s" %
381 (len(mergedCatalog), len(orderedCatalogs)))
383 return pipeBase.Struct(
384 mergedCatalog=mergedCatalog
387 def write(self, patchRef, catalog):
389 @brief Write the output.
391 @param[
in] patchRef data reference
for patch
392 @param[
in] catalog catalog
394 We write
as the dataset provided by the
'outputDataset'
397 patchRef.put(catalog, self.config.coaddName + "Coadd_" + self.outputDataset)
400 mergeDataId = patchRef.dataId.copy()
401 del mergeDataId[
"filter"]
402 self.log.
info(
"Wrote merged catalog: %s", mergeDataId)
406 @brief No metadata to write,
and not sure how to write it
for a list of dataRefs.
A mapping between the keys of two Schemas, used to copy data between them.
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
bool any(CoordinateExpr< N > const &expr) noexcept
Return true if any elements are true.
def run(self, coaddExposures, bbox, wcs, dataIds, **kwargs)
def writeMetadata(self, dataRefList)
No metadata to write, and not sure how to write it for a list of dataRefs.
def write(self, patchRef, catalog)
Write the output.
def makeMergeArgumentParser(name, dataset)
Create a suitable ArgumentParser.
def getInputSchema(task, butler=None, schema=None)
Obtain the input schema either directly or froma butler reference.
def readCatalog(task, patchRef)
Read input catalog.