25 from .multiBandUtils
import (MergeSourcesRunner, _makeGetSchemaCatalogs, makeMergeArgumentParser,
26 getInputSchema, getShortFilterName, readCatalog)
30 import lsst.pex.config
as pexConfig
33 from lsst.pipe.base import PipelineTaskConnections, PipelineTaskConfig
38 dimensions=(
"skymap",
"tract",
"patch"),
39 defaultTemplates={
"inputCoaddName":
"deep",
40 "outputCoaddName":
"deep"}):
41 inputSchema = cT.InitInput(
42 doc=
"Schema for the output merged measurement catalog.",
43 name=
"{inputCoaddName}Coadd_meas_schema",
44 storageClass=
"SourceCatalog",
46 outputSchema = cT.InitOutput(
47 doc=
"Schema for the output merged measurement catalog.",
48 name=
"{outputCoaddName}Coadd_ref_schema",
49 storageClass=
"SourceCatalog",
52 doc=
"Input catalogs to merge.",
53 name=
"{inputCoaddName}Coadd_meas",
55 storageClass=
"SourceCatalog",
56 dimensions=[
"abstract_filter",
"skymap",
"tract",
"patch"],
58 mergedCatalog = cT.Output(
59 doc=
"Output merged catalog.",
60 name=
"{outputCoaddName}Coadd_ref",
61 storageClass=
"SourceCatalog",
62 dimensions=[
"skymap",
"tract",
"patch"],
66 class MergeMeasurementsConfig(
PipelineTaskConfig, pipelineConnections=MergeMeasurementsConnections):
68 @anchor MergeMeasurementsConfig_ 70 @brief Configuration parameters for the MergeMeasurementsTask 72 pseudoFilterList = pexConfig.ListField(
75 doc=
"Names of filters which may have no associated detection\n" 76 "(N.b. should include MergeDetectionsConfig.skyFilterName)" 78 snName = pexConfig.Field(
80 default=
"base_PsfFlux",
81 doc=
"Name of flux measurement for calculating the S/N when choosing the reference band." 83 minSN = pexConfig.Field(
86 doc=
"If the S/N from the priority band is below this value (and the S/N " 87 "is larger than minSNDiff compared to the priority band), use the band with " 88 "the largest S/N as the reference band." 90 minSNDiff = pexConfig.Field(
93 doc=
"If the difference in S/N between another band and the priority band is larger " 94 "than this value (and the S/N in the priority band is less than minSN) " 95 "use the band with the largest S/N as the reference band" 97 flags = pexConfig.ListField(
99 doc=
"Require that these flags, if available, are not set",
100 default=[
"base_PixelFlags_flag_interpolatedCenter",
"base_PsfFlux_flag",
101 "ext_photometryKron_KronFlux_flag",
"modelfit_CModel_flag", ]
103 priorityList = pexConfig.ListField(
106 doc=
"Priority-ordered list of bands for the merge." 108 coaddName = pexConfig.Field(
116 if len(self.priorityList) == 0:
117 raise RuntimeError(
"No priority list provided")
128 class MergeMeasurementsTask(pipeBase.PipelineTask, pipeBase.CmdLineTask):
130 @anchor MergeMeasurementsTask_ 132 @brief Merge measurements from multiple bands 134 @section pipe_tasks_multiBand_Contents Contents 136 - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Purpose 137 - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Initialize 138 - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Run 139 - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Config 140 - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Debug 141 - @ref pipe_tasks_multiband_MergeMeasurementsTask_Example 143 @section pipe_tasks_multiBand_MergeMeasurementsTask_Purpose Description 145 Command-line task that merges measurements from multiple bands. 147 Combines consistent (i.e. with the same peaks and footprints) catalogs of sources from multiple filter 148 bands to construct a unified catalog that is suitable for driving forced photometry. Every source is 149 required to have centroid, shape and flux measurements in each band. 152 deepCoadd_meas{tract,patch,filter}: SourceCatalog 154 deepCoadd_ref{tract,patch}: SourceCatalog 158 MergeMeasurementsTask subclasses @ref CmdLineTask_ "CmdLineTask". 160 @section pipe_tasks_multiBand_MergeMeasurementsTask_Initialize Task initialization 162 @copydoc \_\_init\_\_ 164 @section pipe_tasks_multiBand_MergeMeasurementsTask_Run Invoking the Task 168 @section pipe_tasks_multiBand_MergeMeasurementsTask_Config Configuration parameters 170 See @ref MergeMeasurementsConfig_ 172 @section pipe_tasks_multiBand_MergeMeasurementsTask_Debug Debug variables 174 The @link lsst.pipe.base.cmdLineTask.CmdLineTask command line task@endlink interface supports a 175 flag @c -d to import @b debug.py from your @c PYTHONPATH; see @ref baseDebug for more about @b debug.py 178 MergeMeasurementsTask has no debug variables. 180 @section pipe_tasks_multiband_MergeMeasurementsTask_Example A complete example 181 of using MergeMeasurementsTask 183 MergeMeasurementsTask is meant to be run after deblending & measuring sources in every band. 184 The purpose of the task is to generate a catalog of sources suitable for driving forced photometry in 185 coadds and individual exposures. 186 Command-line usage of MergeMeasurementsTask expects a data reference to the coadds to be processed. A list 187 of the available optional arguments can be obtained by calling mergeCoaddMeasurements.py with the `--help` 188 command line argument: 190 mergeCoaddMeasurements.py --help 193 To demonstrate usage of the DetectCoaddSourcesTask in the larger context of multi-band processing, we 194 will process HSC data in the [ci_hsc](https://github.com/lsst/ci_hsc) package. Assuming one has finished 195 step 7 at @ref pipeTasks_multiBand, one may merge the catalogs generated after deblending and measuring 198 mergeCoaddMeasurements.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-I^HSC-R 200 This will merge the HSC-I & HSC-R band catalogs. The results are written in 201 `$CI_HSC_DIR/DATA/deepCoadd-results/`. 203 _DefaultName =
"mergeCoaddMeasurements" 204 ConfigClass = MergeMeasurementsConfig
205 RunnerClass = MergeSourcesRunner
206 inputDataset =
"meas" 207 outputDataset =
"ref" 208 getSchemaCatalogs = _makeGetSchemaCatalogs(
"ref")
211 def _makeArgumentParser(cls):
217 def runQuantum(self, butlerQC, inputRefs, outputRefs):
218 inputs = butlerQC.get(inputRefs)
219 dataIds = (ref.dataId
for ref
in inputRefs.catalogs)
220 catalogDict = {dataId[
'abstract_filter']: cat
for dataId, cat
in zip(dataIds,
222 inputs[
'catalogs'] = catalogDict
223 outputs = self.run(**inputs)
224 butlerQC.put(outputs, outputRefs)
226 def __init__(self, butler=None, schema=None, initInputs=None, **kwargs):
230 @param[in] schema: the schema of the detection catalogs used as input to this one 231 @param[in] butler: a butler used to read the input schema from disk, if schema is None 233 The task will set its own self.schema attribute to the schema of the output merged catalog. 235 super().__init__(**kwargs)
237 if initInputs
is not None:
238 inputSchema = initInputs[
'inputSchema'].schema
240 inputSchema = self.getInputSchema(butler=butler, schema=schema)
242 self.schemaMapper.addMinimalSchema(inputSchema,
True)
243 self.instFluxKey = inputSchema.find(self.config.snName +
"_instFlux").getKey()
244 self.instFluxErrKey = inputSchema.find(self.config.snName +
"_instFluxErr").getKey()
245 self.fluxFlagKey = inputSchema.find(self.config.snName +
"_flag").getKey()
248 for band
in self.config.priorityList:
250 outputKey = self.schemaMapper.editOutputSchema().addField(
251 "merge_measurement_%s" % short,
253 doc=
"Flag field set if the measurements here are from the %s filter" % band
255 peakKey = inputSchema.find(
"merge_peak_%s" % short).key
256 footprintKey = inputSchema.find(
"merge_footprint_%s" % short).key
257 self.flagKeys[band] = pipeBase.Struct(peak=peakKey, footprint=footprintKey, output=outputKey)
258 self.schema = self.schemaMapper.getOutputSchema()
260 self.pseudoFilterKeys = []
261 for filt
in self.config.pseudoFilterList:
263 self.pseudoFilterKeys.
append(self.schema.find(
"merge_peak_%s" % filt).getKey())
264 except Exception
as e:
265 self.log.
warn(
"merge_peak is not set for pseudo-filter %s: %s" % (filt, e))
268 for flag
in self.config.flags:
270 self.badFlags[flag] = self.schema.find(flag).getKey()
271 except KeyError
as exc:
272 self.log.
warn(
"Can't find flag %s in schema: %s" % (flag, exc,))
275 def runDataRef(self, patchRefList):
277 @brief Merge coadd sources from multiple bands. Calls @ref `run`. 278 @param[in] patchRefList list of data references for each filter 280 catalogs = dict(
readCatalog(self, patchRef)
for patchRef
in patchRefList)
281 mergedCatalog = self.run(catalogs).mergedCatalog
282 self.write(patchRefList[0], mergedCatalog)
284 def run(self, catalogs):
286 Merge measurement catalogs to create a single reference catalog for forced photometry 288 @param[in] catalogs: the catalogs to be merged 290 For parent sources, we choose the first band in config.priorityList for which the 291 merge_footprint flag for that band is is True. 293 For child sources, the logic is the same, except that we use the merge_peak flags. 296 orderedCatalogs = [catalogs[band]
for band
in self.config.priorityList
if band
in catalogs.keys()]
297 orderedKeys = [self.flagKeys[band]
for band
in self.config.priorityList
if band
in catalogs.keys()]
300 mergedCatalog.reserve(len(orderedCatalogs[0]))
302 idKey = orderedCatalogs[0].table.getIdKey()
303 for catalog
in orderedCatalogs[1:]:
304 if numpy.any(orderedCatalogs[0].get(idKey) != catalog.get(idKey)):
305 raise ValueError(
"Error in inputs to MergeCoaddMeasurements: source IDs do not match")
309 for orderedRecords
in zip(*orderedCatalogs):
314 priorityRecord =
None 315 priorityFlagKeys =
None 317 hasPseudoFilter =
False 321 for inputRecord, flagKeys
in zip(orderedRecords, orderedKeys):
322 parent = (inputRecord.getParent() == 0
and inputRecord.get(flagKeys.footprint))
323 child = (inputRecord.getParent() != 0
and inputRecord.get(flagKeys.peak))
325 if not (parent
or child):
326 for pseudoFilterKey
in self.pseudoFilterKeys:
327 if inputRecord.get(pseudoFilterKey):
328 hasPseudoFilter =
True 329 priorityRecord = inputRecord
330 priorityFlagKeys = flagKeys
335 isBad =
any(inputRecord.get(flag)
for flag
in self.badFlags)
336 if isBad
or inputRecord.get(self.fluxFlagKey)
or inputRecord.get(self.instFluxErrKey) == 0:
339 sn = inputRecord.get(self.instFluxKey)/inputRecord.get(self.instFluxErrKey)
340 if numpy.isnan(sn)
or sn < 0.:
342 if (parent
or child)
and priorityRecord
is None:
343 priorityRecord = inputRecord
344 priorityFlagKeys = flagKeys
347 maxSNRecord = inputRecord
348 maxSNFlagKeys = flagKeys
361 bestRecord = priorityRecord
362 bestFlagKeys = priorityFlagKeys
363 elif (prioritySN < self.config.minSN
and (maxSN - prioritySN) > self.config.minSNDiff
and 364 maxSNRecord
is not None):
365 bestRecord = maxSNRecord
366 bestFlagKeys = maxSNFlagKeys
367 elif priorityRecord
is not None:
368 bestRecord = priorityRecord
369 bestFlagKeys = priorityFlagKeys
371 if bestRecord
is not None and bestFlagKeys
is not None:
372 outputRecord = mergedCatalog.addNew()
373 outputRecord.assign(bestRecord, self.schemaMapper)
374 outputRecord.set(bestFlagKeys.output,
True)
376 raise ValueError(
"Error in inputs to MergeCoaddMeasurements: no valid reference for %s" %
380 for inputCatalog
in orderedCatalogs:
381 if len(mergedCatalog) != len(inputCatalog):
382 raise ValueError(
"Mismatch between catalog sizes: %s != %s" %
383 (len(mergedCatalog), len(orderedCatalogs)))
385 return pipeBase.Struct(
386 mergedCatalog=mergedCatalog
389 def write(self, patchRef, catalog):
391 @brief Write the output. 393 @param[in] patchRef data reference for patch 394 @param[in] catalog catalog 396 We write as the dataset provided by the 'outputDataset' 399 patchRef.put(catalog, self.config.coaddName +
"Coadd_" + self.outputDataset)
402 mergeDataId = patchRef.dataId.copy()
403 del mergeDataId[
"filter"]
404 self.log.
info(
"Wrote merged catalog: %s" % (mergeDataId,))
408 @brief No metadata to write, and not sure how to write it for a list of dataRefs.
def write(self, patchRef, catalog)
Write the output.
def makeMergeArgumentParser(name, dataset)
Create a suitable ArgumentParser.
def readCatalog(task, patchRef)
Read input catalog.
def getInputSchema(task, butler=None, schema=None)
Obtain the input schema either directly or froma butler reference.
A mapping between the keys of two Schemas, used to copy data between them.
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
bool any(CoordinateExpr< N > const &expr) noexcept
Return true if any elements are true.
def run(self, skyInfo, tempExpRefList, imageScalerList, weightList, altMaskList=None, mask=None, supplementaryData=None)
def getShortFilterName(name)
def writeMetadata(self, dataRefList)
No metadata to write, and not sure how to write it for a list of dataRefs.