25 from .multiBandUtils
import (MergeSourcesRunner, _makeGetSchemaCatalogs, makeMergeArgumentParser,
26 getInputSchema, getShortFilterName, readCatalog)
35 @anchor MergeMeasurementsConfig_ 37 @brief Configuration parameters for the MergeMeasurementsTask 40 inputSchema = pipeBase.InitInputDatasetField(
41 doc=
"Schema for the input measurement catalogs.",
42 nameTemplate=
"{inputCoaddName}Coadd_meas_schema",
43 storageClass=
"SourceCatalog",
45 outputSchema = pipeBase.InitOutputDatasetField(
46 doc=
"Schema for the output merged measurement catalog.",
47 nameTemplate=
"{outputCoaddName}Coadd_ref_schema",
48 storageClass=
"SourceCatalog",
50 catalogs = pipeBase.InputDatasetField(
51 doc=
"Input catalogs to merge.",
52 nameTemplate=
"{inputCoaddName}Coadd_meas",
54 storageClass=
"SourceCatalog",
55 dimensions=[
"AbstractFilter",
"SkyMap",
"Tract",
"Patch"],
57 mergedCatalog = pipeBase.OutputDatasetField(
58 doc=
"Output merged catalog.",
59 nameTemplate=
"{outputCoaddName}Coadd_ref",
61 storageClass=
"SourceCatalog",
62 dimensions=[
"SkyMap",
"Tract",
"Patch"],
65 pseudoFilterList = pexConfig.ListField(
68 doc=
"Names of filters which may have no associated detection\n" 69 "(N.b. should include MergeDetectionsConfig.skyFilterName)" 71 snName = pexConfig.Field(
73 default=
"base_PsfFlux",
74 doc=
"Name of flux measurement for calculating the S/N when choosing the reference band." 76 minSN = pexConfig.Field(
79 doc=
"If the S/N from the priority band is below this value (and the S/N " 80 "is larger than minSNDiff compared to the priority band), use the band with " 81 "the largest S/N as the reference band." 83 minSNDiff = pexConfig.Field(
86 doc=
"If the difference in S/N between another band and the priority band is larger " 87 "than this value (and the S/N in the priority band is less than minSN) " 88 "use the band with the largest S/N as the reference band" 90 flags = pexConfig.ListField(
92 doc=
"Require that these flags, if available, are not set",
93 default=[
"base_PixelFlags_flag_interpolatedCenter",
"base_PsfFlux_flag",
94 "ext_photometryKron_KronFlux_flag",
"modelfit_CModel_flag", ]
96 priorityList = pexConfig.ListField(
99 doc=
"Priority-ordered list of bands for the merge." 101 coaddName = pexConfig.Field(
110 raise RuntimeError(
"No priority list provided")
114 self.formatTemplateNames({
"inputCoaddName":
"deep",
115 "outputCoaddName":
"deep"})
116 self.quantum.dimensions = (
"SkyMap",
"Tract",
"Patch")
128 @anchor MergeMeasurementsTask_ 130 @brief Merge measurements from multiple bands 132 @section pipe_tasks_multiBand_Contents Contents 134 - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Purpose 135 - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Initialize 136 - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Run 137 - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Config 138 - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Debug 139 - @ref pipe_tasks_multiband_MergeMeasurementsTask_Example 141 @section pipe_tasks_multiBand_MergeMeasurementsTask_Purpose Description 143 Command-line task that merges measurements from multiple bands. 145 Combines consistent (i.e. with the same peaks and footprints) catalogs of sources from multiple filter 146 bands to construct a unified catalog that is suitable for driving forced photometry. Every source is 147 required to have centroid, shape and flux measurements in each band. 150 deepCoadd_meas{tract,patch,filter}: SourceCatalog 152 deepCoadd_ref{tract,patch}: SourceCatalog 156 MergeMeasurementsTask subclasses @ref CmdLineTask_ "CmdLineTask". 158 @section pipe_tasks_multiBand_MergeMeasurementsTask_Initialize Task initialization 160 @copydoc \_\_init\_\_ 162 @section pipe_tasks_multiBand_MergeMeasurementsTask_Run Invoking the Task 166 @section pipe_tasks_multiBand_MergeMeasurementsTask_Config Configuration parameters 168 See @ref MergeMeasurementsConfig_ 170 @section pipe_tasks_multiBand_MergeMeasurementsTask_Debug Debug variables 172 The @link lsst.pipe.base.cmdLineTask.CmdLineTask command line task@endlink interface supports a 173 flag @c -d to import @b debug.py from your @c PYTHONPATH; see @ref baseDebug for more about @b debug.py 176 MergeMeasurementsTask has no debug variables. 178 @section pipe_tasks_multiband_MergeMeasurementsTask_Example A complete example 179 of using MergeMeasurementsTask 181 MergeMeasurementsTask is meant to be run after deblending & measuring sources in every band. 
182 The purpose of the task is to generate a catalog of sources suitable for driving forced photometry in 183 coadds and individual exposures. 184 Command-line usage of MergeMeasurementsTask expects a data reference to the coadds to be processed. A list 185 of the available optional arguments can be obtained by calling mergeCoaddMeasurements.py with the `--help` 186 command line argument: 188 mergeCoaddMeasurements.py --help 191 To demonstrate usage of the MergeMeasurementsTask in the larger context of multi-band processing, we 192 will process HSC data in the [ci_hsc](https://github.com/lsst/ci_hsc) package. Assuming one has finished 193 step 7 at @ref pipeTasks_multiBand, one may merge the catalogs generated after deblending and measuring as follows: 196 mergeCoaddMeasurements.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-I^HSC-R 198 This will merge the HSC-I & HSC-R band catalogs. The results are written in 199 `$CI_HSC_DIR/DATA/deepCoadd-results/`. 201 _DefaultName =
"mergeCoaddMeasurements" 202 ConfigClass = MergeMeasurementsConfig
203 RunnerClass = MergeSourcesRunner
204 inputDataset =
"meas" 205 outputDataset =
"ref" 206 getSchemaCatalogs = _makeGetSchemaCatalogs(
"ref")
209 def _makeArgumentParser(cls):
219 catalogDict = {dataId[
'abstract_filter']: cat
for dataId, cat
in zip(inputDataIds[
'catalogs'],
220 inputData[
'catalogs'])}
221 inputData[
'catalogs'] = catalogDict
223 return super().
adaptArgsAndRun(inputData, inputDataIds, outputDataIds, butler)
225 def __init__(self, butler=None, schema=None, initInputs=None, **kwargs):
229 @param[in] schema: the schema of the detection catalogs used as input to this one 230 @param[in] butler: a butler used to read the input schema from disk, if schema is None 232 The task will set its own self.schema attribute to the schema of the output merged catalog. 236 if initInputs
is not None:
237 inputSchema = initInputs[
'inputSchema'].schema
242 self.
instFluxKey = inputSchema.find(self.config.snName +
"_instFlux").getKey()
243 self.
instFluxErrKey = inputSchema.find(self.config.snName +
"_instFluxErr").getKey()
244 self.
fluxFlagKey = inputSchema.find(self.config.snName +
"_flag").getKey()
247 for band
in self.config.priorityList:
249 outputKey = self.
schemaMapper.editOutputSchema().addField(
250 "merge_measurement_%s" % short,
252 doc=
"Flag field set if the measurements here are from the %s filter" % band
254 peakKey = inputSchema.find(
"merge_peak_%s" % short).key
255 footprintKey = inputSchema.find(
"merge_footprint_%s" % short).key
256 self.
flagKeys[band] = pipeBase.Struct(peak=peakKey, footprint=footprintKey, output=outputKey)
260 for filt
in self.config.pseudoFilterList:
263 except Exception
as e:
264 self.log.
warn(
"merge_peak is not set for pseudo-filter %s: %s" % (filt, e))
267 for flag
in self.config.flags:
270 except KeyError
as exc:
271 self.log.
warn(
"Can't find flag %s in schema: %s" % (flag, exc,))
275 @brief Merge coadd sources from multiple bands. Calls @ref `run`. 276 @param[in] patchRefList list of data references for each filter 278 catalogs = dict(
readCatalog(self, patchRef)
for patchRef
in patchRefList)
279 mergedCatalog = self.
run(catalogs).mergedCatalog
280 self.
write(patchRefList[0], mergedCatalog)
284 Merge measurement catalogs to create a single reference catalog for forced photometry 286 @param[in] catalogs: the catalogs to be merged 288 For parent sources, we choose the first band in config.priorityList for which the 289 merge_footprint flag for that band is True. 291 For child sources, the logic is the same, except that we use the merge_peak flags. 294 orderedCatalogs = [catalogs[band]
for band
in self.config.priorityList
if band
in catalogs.keys()]
295 orderedKeys = [self.
flagKeys[band]
for band
in self.config.priorityList
if band
in catalogs.keys()]
298 mergedCatalog.reserve(len(orderedCatalogs[0]))
300 idKey = orderedCatalogs[0].table.getIdKey()
301 for catalog
in orderedCatalogs[1:]:
302 if numpy.any(orderedCatalogs[0].get(idKey) != catalog.get(idKey)):
303 raise ValueError(
"Error in inputs to MergeCoaddMeasurements: source IDs do not match")
307 for orderedRecords
in zip(*orderedCatalogs):
312 priorityRecord =
None 313 priorityFlagKeys =
None 315 hasPseudoFilter =
False 319 for inputRecord, flagKeys
in zip(orderedRecords, orderedKeys):
320 parent = (inputRecord.getParent() == 0
and inputRecord.get(flagKeys.footprint))
321 child = (inputRecord.getParent() != 0
and inputRecord.get(flagKeys.peak))
323 if not (parent
or child):
325 if inputRecord.get(pseudoFilterKey):
326 hasPseudoFilter =
True 327 priorityRecord = inputRecord
328 priorityFlagKeys = flagKeys
333 isBad =
any(inputRecord.get(flag)
for flag
in self.
badFlags)
338 if numpy.isnan(sn)
or sn < 0.:
340 if (parent
or child)
and priorityRecord
is None:
341 priorityRecord = inputRecord
342 priorityFlagKeys = flagKeys
345 maxSNRecord = inputRecord
346 maxSNFlagKeys = flagKeys
359 bestRecord = priorityRecord
360 bestFlagKeys = priorityFlagKeys
361 elif (prioritySN < self.config.minSN
and (maxSN - prioritySN) > self.config.minSNDiff
and 362 maxSNRecord
is not None):
363 bestRecord = maxSNRecord
364 bestFlagKeys = maxSNFlagKeys
365 elif priorityRecord
is not None:
366 bestRecord = priorityRecord
367 bestFlagKeys = priorityFlagKeys
369 if bestRecord
is not None and bestFlagKeys
is not None:
370 outputRecord = mergedCatalog.addNew()
372 outputRecord.set(bestFlagKeys.output,
True)
374 raise ValueError(
"Error in inputs to MergeCoaddMeasurements: no valid reference for %s" %
378 for inputCatalog
in orderedCatalogs:
379 if len(mergedCatalog) != len(inputCatalog):
380 raise ValueError(
"Mismatch between catalog sizes: %s != %s" %
381 (len(mergedCatalog), len(orderedCatalogs)))
383 return pipeBase.Struct(
384 mergedCatalog=mergedCatalog
389 @brief Write the output. 391 @param[in] patchRef data reference for patch 392 @param[in] catalog catalog 394 We write as the dataset provided by the 'outputDataset' class variable. 397 patchRef.put(catalog, self.config.coaddName +
"Coadd_" + self.
outputDataset)
400 mergeDataId = patchRef.dataId.copy()
401 del mergeDataId[
"filter"]
402 self.log.
info(
"Wrote merged catalog: %s" % (mergeDataId,))
406 @brief No metadata to write, and not sure how to write it for a list of dataRefs.
def runDataRef(self, patchRefList)
Merge coadd sources from multiple bands.
def makeMergeArgumentParser(name, dataset)
Create a suitable ArgumentParser.
def readCatalog(task, patchRef)
Read input catalog.
A mapping between the keys of two Schemas, used to copy data between them.
Merge measurements from multiple bands.
def run(self, catalogs)
Merge measurement catalogs to create a single reference catalog for forced photometry.
def getInitOutputDatasets(self)
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
bool any(CoordinateExpr< N > const &expr) noexcept
Return true if any elements are true.
Configuration parameters for the MergeMeasurementsTask.
def getInputSchema(self, butler=None, schema=None)
def write(self, patchRef, catalog)
Write the output.
def adaptArgsAndRun(self, inputData, inputDataIds, outputDataIds, butler)
def __init__(self, butler=None, schema=None, initInputs=None, kwargs)
Initialize the task.
def getShortFilterName(name)
def writeMetadata(self, dataRefList)
No metadata to write, and not sure how to write it for a list of dataRefs.