LSSTApplications  19.0.0-14-gb0260a2+72efe9b372,20.0.0+7927753e06,20.0.0+8829bf0056,20.0.0+995114c5d2,20.0.0+b6f4b2abd1,20.0.0+bddc4f4cbe,20.0.0-1-g253301a+8829bf0056,20.0.0-1-g2b7511a+0d71a2d77f,20.0.0-1-g5b95a8c+7461dd0434,20.0.0-12-g321c96ea+23efe4bbff,20.0.0-16-gfab17e72e+fdf35455f6,20.0.0-2-g0070d88+ba3ffc8f0b,20.0.0-2-g4dae9ad+ee58a624b3,20.0.0-2-g61b8584+5d3db074ba,20.0.0-2-gb780d76+d529cf1a41,20.0.0-2-ged6426c+226a441f5f,20.0.0-2-gf072044+8829bf0056,20.0.0-2-gf1f7952+ee58a624b3,20.0.0-20-geae50cf+e37fec0aee,20.0.0-25-g3dcad98+544a109665,20.0.0-25-g5eafb0f+ee58a624b3,20.0.0-27-g64178ef+f1f297b00a,20.0.0-3-g4cc78c6+e0676b0dc8,20.0.0-3-g8f21e14+4fd2c12c9a,20.0.0-3-gbd60e8c+187b78b4b8,20.0.0-3-gbecbe05+48431fa087,20.0.0-38-ge4adf513+a12e1f8e37,20.0.0-4-g97dc21a+544a109665,20.0.0-4-gb4befbc+087873070b,20.0.0-4-gf910f65+5d3db074ba,20.0.0-5-gdfe0fee+199202a608,20.0.0-5-gfbfe500+d529cf1a41,20.0.0-6-g64f541c+d529cf1a41,20.0.0-6-g9a5b7a1+a1cd37312e,20.0.0-68-ga3f3dda+5fca18c6a4,20.0.0-9-g4aef684+e18322736b,w.2020.45
LSSTDataManagementBasePackage
mergeMeasurements.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # LSST Data Management System
4 # Copyright 2008-2015 AURA/LSST.
5 #
6 # This product includes software developed by the
7 # LSST Project (http://www.lsst.org/).
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the LSST License Statement and
20 # the GNU General Public License along with this program. If not,
21 # see <https://www.lsstcorp.org/LegalNotices/>.
22 #
23 import numpy
24 
25 from .multiBandUtils import (MergeSourcesRunner, _makeGetSchemaCatalogs, makeMergeArgumentParser,
26  getInputSchema, getShortFilterName, readCatalog)
27 
28 
29 import lsst.afw.table as afwTable
30 import lsst.pex.config as pexConfig
31 import lsst.pipe.base as pipeBase
32 
33 from lsst.pipe.base import PipelineTaskConnections, PipelineTaskConfig
35 
36 
38  dimensions=("skymap", "tract", "patch"),
39  defaultTemplates={"inputCoaddName": "deep",
40  "outputCoaddName": "deep"}):
41  inputSchema = cT.InitInput(
42  doc="Schema for the output merged measurement catalog.",
43  name="{inputCoaddName}Coadd_meas_schema",
44  storageClass="SourceCatalog",
45  )
46  outputSchema = cT.InitOutput(
47  doc="Schema for the output merged measurement catalog.",
48  name="{outputCoaddName}Coadd_ref_schema",
49  storageClass="SourceCatalog",
50  )
51  catalogs = cT.Input(
52  doc="Input catalogs to merge.",
53  name="{inputCoaddName}Coadd_meas",
54  multiple=True,
55  storageClass="SourceCatalog",
56  dimensions=["band", "skymap", "tract", "patch"],
57  )
58  mergedCatalog = cT.Output(
59  doc="Output merged catalog.",
60  name="{outputCoaddName}Coadd_ref",
61  storageClass="SourceCatalog",
62  dimensions=["skymap", "tract", "patch"],
63  )
64 
65 
66 class MergeMeasurementsConfig(PipelineTaskConfig, pipelineConnections=MergeMeasurementsConnections):
67  """!
68  @anchor MergeMeasurementsConfig_
69 
70  @brief Configuration parameters for the MergeMeasurementsTask
71  """
72  pseudoFilterList = pexConfig.ListField(
73  dtype=str,
74  default=["sky"],
75  doc="Names of filters which may have no associated detection\n"
76  "(N.b. should include MergeDetectionsConfig.skyFilterName)"
77  )
78  snName = pexConfig.Field(
79  dtype=str,
80  default="base_PsfFlux",
81  doc="Name of flux measurement for calculating the S/N when choosing the reference band."
82  )
83  minSN = pexConfig.Field(
84  dtype=float,
85  default=10.,
86  doc="If the S/N from the priority band is below this value (and the S/N "
87  "is larger than minSNDiff compared to the priority band), use the band with "
88  "the largest S/N as the reference band."
89  )
90  minSNDiff = pexConfig.Field(
91  dtype=float,
92  default=3.,
93  doc="If the difference in S/N between another band and the priority band is larger "
94  "than this value (and the S/N in the priority band is less than minSN) "
95  "use the band with the largest S/N as the reference band"
96  )
97  flags = pexConfig.ListField(
98  dtype=str,
99  doc="Require that these flags, if available, are not set",
100  default=["base_PixelFlags_flag_interpolatedCenter", "base_PsfFlux_flag",
101  "ext_photometryKron_KronFlux_flag", "modelfit_CModel_flag", ]
102  )
103  priorityList = pexConfig.ListField(
104  dtype=str,
105  default=[],
106  doc="Priority-ordered list of bands for the merge."
107  )
108  coaddName = pexConfig.Field(
109  dtype=str,
110  default="deep",
111  doc="Name of coadd"
112  )
113 
114  def validate(self):
115  super().validate()
116  if len(self.priorityList) == 0:
117  raise RuntimeError("No priority list provided")
118 
119 
120 
126 
127 
128 class MergeMeasurementsTask(pipeBase.PipelineTask, pipeBase.CmdLineTask):
129  r"""!
130  @anchor MergeMeasurementsTask_
131 
132  @brief Merge measurements from multiple bands
133 
134  @section pipe_tasks_multiBand_Contents Contents
135 
136  - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Purpose
137  - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Initialize
138  - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Run
139  - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Config
140  - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Debug
141  - @ref pipe_tasks_multiband_MergeMeasurementsTask_Example
142 
143  @section pipe_tasks_multiBand_MergeMeasurementsTask_Purpose Description
144 
145  Command-line task that merges measurements from multiple bands.
146 
147  Combines consistent (i.e. with the same peaks and footprints) catalogs of sources from multiple filter
148  bands to construct a unified catalog that is suitable for driving forced photometry. Every source is
149  required to have centroid, shape and flux measurements in each band.
150 
151  @par Inputs:
152  deepCoadd_meas{tract,patch,filter}: SourceCatalog
153  @par Outputs:
154  deepCoadd_ref{tract,patch}: SourceCatalog
155  @par Data Unit:
156  tract, patch
157 
158  MergeMeasurementsTask subclasses @ref CmdLineTask_ "CmdLineTask".
159 
160  @section pipe_tasks_multiBand_MergeMeasurementsTask_Initialize Task initialization
161 
162  @copydoc \_\_init\_\_
163 
164  @section pipe_tasks_multiBand_MergeMeasurementsTask_Run Invoking the Task
165 
166  @copydoc run
167 
168  @section pipe_tasks_multiBand_MergeMeasurementsTask_Config Configuration parameters
169 
170  See @ref MergeMeasurementsConfig_
171 
172  @section pipe_tasks_multiBand_MergeMeasurementsTask_Debug Debug variables
173 
174  The @link lsst.pipe.base.cmdLineTask.CmdLineTask command line task@endlink interface supports a
175  flag @c -d to import @b debug.py from your @c PYTHONPATH; see @ref baseDebug for more about @b debug.py
176  files.
177 
178  MergeMeasurementsTask has no debug variables.
179 
180  @section pipe_tasks_multiband_MergeMeasurementsTask_Example A complete example
181  of using MergeMeasurementsTask
182 
183  MergeMeasurementsTask is meant to be run after deblending & measuring sources in every band.
184  The purpose of the task is to generate a catalog of sources suitable for driving forced photometry in
185  coadds and individual exposures.
186  Command-line usage of MergeMeasurementsTask expects a data reference to the coadds to be processed. A list
187  of the available optional arguments can be obtained by calling mergeCoaddMeasurements.py with the `--help`
188  command line argument:
189  @code
190  mergeCoaddMeasurements.py --help
191  @endcode
192 
193  To demonstrate usage of the DetectCoaddSourcesTask in the larger context of multi-band processing, we
194  will process HSC data in the [ci_hsc](https://github.com/lsst/ci_hsc) package. Assuming one has finished
195  step 7 at @ref pipeTasks_multiBand, one may merge the catalogs generated after deblending and measuring
196  as follows:
197  @code
198  mergeCoaddMeasurements.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-I^HSC-R
199  @endcode
200  This will merge the HSC-I & HSC-R band catalogs. The results are written in
201  `$CI_HSC_DIR/DATA/deepCoadd-results/`.
202  """
203  _DefaultName = "mergeCoaddMeasurements"
204  ConfigClass = MergeMeasurementsConfig
205  RunnerClass = MergeSourcesRunner
206  inputDataset = "meas"
207  outputDataset = "ref"
208  getSchemaCatalogs = _makeGetSchemaCatalogs("ref")
209 
210  @classmethod
211  def _makeArgumentParser(cls):
212  return makeMergeArgumentParser(cls._DefaultName, cls.inputDataset)
213 
214  def getInputSchema(self, butler=None, schema=None):
215  return getInputSchema(self, butler, schema)
216 
217  def runQuantum(self, butlerQC, inputRefs, outputRefs):
218  inputs = butlerQC.get(inputRefs)
219  dataIds = (ref.dataId for ref in inputRefs.catalogs)
220  catalogDict = {dataId['band']: cat for dataId, cat in zip(dataIds, inputs['catalogs'])}
221  inputs['catalogs'] = catalogDict
222  outputs = self.run(**inputs)
223  butlerQC.put(outputs, outputRefs)
224 
225  def __init__(self, butler=None, schema=None, initInputs=None, **kwargs):
226  """!
227  Initialize the task.
228 
229  @param[in] schema: the schema of the detection catalogs used as input to this one
230  @param[in] butler: a butler used to read the input schema from disk, if schema is None
231 
232  The task will set its own self.schema attribute to the schema of the output merged catalog.
233  """
234  super().__init__(**kwargs)
235 
236  if initInputs is not None:
237  inputSchema = initInputs['inputSchema'].schema
238  else:
239  inputSchema = self.getInputSchema(butler=butler, schema=schema)
240  self.schemaMapper = afwTable.SchemaMapper(inputSchema, True)
241  self.schemaMapper.addMinimalSchema(inputSchema, True)
242  self.instFluxKey = inputSchema.find(self.config.snName + "_instFlux").getKey()
243  self.instFluxErrKey = inputSchema.find(self.config.snName + "_instFluxErr").getKey()
244  self.fluxFlagKey = inputSchema.find(self.config.snName + "_flag").getKey()
245 
246  self.flagKeys = {}
247  for band in self.config.priorityList:
248  short = getShortFilterName(band)
249  outputKey = self.schemaMapper.editOutputSchema().addField(
250  "merge_measurement_%s" % short,
251  type="Flag",
252  doc="Flag field set if the measurements here are from the %s filter" % band
253  )
254  peakKey = inputSchema.find("merge_peak_%s" % short).key
255  footprintKey = inputSchema.find("merge_footprint_%s" % short).key
256  self.flagKeys[band] = pipeBase.Struct(peak=peakKey, footprint=footprintKey, output=outputKey)
257  self.schema = self.schemaMapper.getOutputSchema()
258 
259  self.pseudoFilterKeys = []
260  for filt in self.config.pseudoFilterList:
261  try:
262  self.pseudoFilterKeys.append(self.schema.find("merge_peak_%s" % filt).getKey())
263  except Exception as e:
264  self.log.warn("merge_peak is not set for pseudo-filter %s: %s" % (filt, e))
265 
266  self.badFlags = {}
267  for flag in self.config.flags:
268  try:
269  self.badFlags[flag] = self.schema.find(flag).getKey()
270  except KeyError as exc:
271  self.log.warn("Can't find flag %s in schema: %s" % (flag, exc,))
272  self.outputSchema = afwTable.SourceCatalog(self.schema)
273 
274  def runDataRef(self, patchRefList):
275  """!
276  @brief Merge coadd sources from multiple bands. Calls @ref `run`.
277  @param[in] patchRefList list of data references for each filter
278  """
279  catalogs = dict(readCatalog(self, patchRef) for patchRef in patchRefList)
280  mergedCatalog = self.run(catalogs).mergedCatalog
281  self.write(patchRefList[0], mergedCatalog)
282 
283  def run(self, catalogs):
284  """!
285  Merge measurement catalogs to create a single reference catalog for forced photometry
286 
287  @param[in] catalogs: the catalogs to be merged
288 
289  For parent sources, we choose the first band in config.priorityList for which the
290  merge_footprint flag for that band is is True.
291 
292  For child sources, the logic is the same, except that we use the merge_peak flags.
293  """
294  # Put catalogs, filters in priority order
295  orderedCatalogs = [catalogs[band] for band in self.config.priorityList if band in catalogs.keys()]
296  orderedKeys = [self.flagKeys[band] for band in self.config.priorityList if band in catalogs.keys()]
297 
298  mergedCatalog = afwTable.SourceCatalog(self.schema)
299  mergedCatalog.reserve(len(orderedCatalogs[0]))
300 
301  idKey = orderedCatalogs[0].table.getIdKey()
302  for catalog in orderedCatalogs[1:]:
303  if numpy.any(orderedCatalogs[0].get(idKey) != catalog.get(idKey)):
304  raise ValueError("Error in inputs to MergeCoaddMeasurements: source IDs do not match")
305 
306  # This first zip iterates over all the catalogs simultaneously, yielding a sequence of one
307  # record for each band, in priority order.
308  for orderedRecords in zip(*orderedCatalogs):
309 
310  maxSNRecord = None
311  maxSNFlagKeys = None
312  maxSN = 0.
313  priorityRecord = None
314  priorityFlagKeys = None
315  prioritySN = 0.
316  hasPseudoFilter = False
317 
318  # Now we iterate over those record-band pairs, keeping track of the priority and the
319  # largest S/N band.
320  for inputRecord, flagKeys in zip(orderedRecords, orderedKeys):
321  parent = (inputRecord.getParent() == 0 and inputRecord.get(flagKeys.footprint))
322  child = (inputRecord.getParent() != 0 and inputRecord.get(flagKeys.peak))
323 
324  if not (parent or child):
325  for pseudoFilterKey in self.pseudoFilterKeys:
326  if inputRecord.get(pseudoFilterKey):
327  hasPseudoFilter = True
328  priorityRecord = inputRecord
329  priorityFlagKeys = flagKeys
330  break
331  if hasPseudoFilter:
332  break
333 
334  isBad = any(inputRecord.get(flag) for flag in self.badFlags)
335  if isBad or inputRecord.get(self.fluxFlagKey) or inputRecord.get(self.instFluxErrKey) == 0:
336  sn = 0.
337  else:
338  sn = inputRecord.get(self.instFluxKey)/inputRecord.get(self.instFluxErrKey)
339  if numpy.isnan(sn) or sn < 0.:
340  sn = 0.
341  if (parent or child) and priorityRecord is None:
342  priorityRecord = inputRecord
343  priorityFlagKeys = flagKeys
344  prioritySN = sn
345  if sn > maxSN:
346  maxSNRecord = inputRecord
347  maxSNFlagKeys = flagKeys
348  maxSN = sn
349 
350  # If the priority band has a low S/N we would like to choose the band with the highest S/N as
351  # the reference band instead. However, we only want to choose the highest S/N band if it is
352  # significantly better than the priority band. Therefore, to choose a band other than the
353  # priority, we require that the priority S/N is below the minimum threshold and that the
354  # difference between the priority and highest S/N is larger than the difference threshold.
355  #
356  # For pseudo code objects we always choose the first band in the priority list.
357  bestRecord = None
358  bestFlagKeys = None
359  if hasPseudoFilter:
360  bestRecord = priorityRecord
361  bestFlagKeys = priorityFlagKeys
362  elif (prioritySN < self.config.minSN and (maxSN - prioritySN) > self.config.minSNDiff
363  and maxSNRecord is not None):
364  bestRecord = maxSNRecord
365  bestFlagKeys = maxSNFlagKeys
366  elif priorityRecord is not None:
367  bestRecord = priorityRecord
368  bestFlagKeys = priorityFlagKeys
369 
370  if bestRecord is not None and bestFlagKeys is not None:
371  outputRecord = mergedCatalog.addNew()
372  outputRecord.assign(bestRecord, self.schemaMapper)
373  outputRecord.set(bestFlagKeys.output, True)
374  else: # if we didn't find any records
375  raise ValueError("Error in inputs to MergeCoaddMeasurements: no valid reference for %s" %
376  inputRecord.getId())
377 
378  # more checking for sane inputs, since zip silently iterates over the smallest sequence
379  for inputCatalog in orderedCatalogs:
380  if len(mergedCatalog) != len(inputCatalog):
381  raise ValueError("Mismatch between catalog sizes: %s != %s" %
382  (len(mergedCatalog), len(orderedCatalogs)))
383 
384  return pipeBase.Struct(
385  mergedCatalog=mergedCatalog
386  )
387 
388  def write(self, patchRef, catalog):
389  """!
390  @brief Write the output.
391 
392  @param[in] patchRef data reference for patch
393  @param[in] catalog catalog
394 
395  We write as the dataset provided by the 'outputDataset'
396  class variable.
397  """
398  patchRef.put(catalog, self.config.coaddName + "Coadd_" + self.outputDataset)
399  # since the filter isn't actually part of the data ID for the dataset we're saving,
400  # it's confusing to see it in the log message, even if the butler simply ignores it.
401  mergeDataId = patchRef.dataId.copy()
402  del mergeDataId["filter"]
403  self.log.info("Wrote merged catalog: %s" % (mergeDataId,))
404 
405  def writeMetadata(self, dataRefList):
406  """!
407  @brief No metadata to write, and not sure how to write it for a list of dataRefs.
408  """
409  pass
lsst::log.log.logContinued.warn
def warn(fmt, *args)
Definition: logContinued.py:205
lsst::log.log.logContinued.info
def info(fmt, *args)
Definition: logContinued.py:201
ast::append
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
Definition: functional.cc:33
lsst.pipe.tasks.mergeMeasurements.MergeMeasurementsConnections
Definition: mergeMeasurements.py:39
lsst.pipe.tasks.multiBandUtils.makeMergeArgumentParser
def makeMergeArgumentParser(name, dataset)
Create a suitable ArgumentParser.
Definition: multiBandUtils.py:112
lsst.pipe.tasks.mergeDetections.write
def write(self, patchRef, catalog)
Write the output.
Definition: mergeDetections.py:388
lsst.pipe.tasks.multiBandUtils.getInputSchema
def getInputSchema(task, butler=None, schema=None)
Obtain the input schema either directly or froma butler reference.
Definition: multiBandUtils.py:127
lsst.pipe.tasks.assembleCoadd.run
def run(self, skyInfo, tempExpRefList, imageScalerList, weightList, altMaskList=None, mask=None, supplementaryData=None)
Definition: assembleCoadd.py:720
lsst.pipe.tasks.multiBandUtils.getShortFilterName
def getShortFilterName(name)
Definition: multiBandUtils.py:141
lsst.pex.config
Definition: __init__.py:1
lsst::afw::table._source.SourceCatalog
Definition: _source.py:32
lsst::afw::table::SchemaMapper
A mapping between the keys of two Schemas, used to copy data between them.
Definition: SchemaMapper.h:21
lsst::geom::any
bool any(CoordinateExpr< N > const &expr) noexcept
Return true if any elements are true.
Definition: CoordinateExpr.h:89
lsst.pipe.base.connections.PipelineTaskConnections
Definition: connections.py:260
lsst::afw::table
Definition: table.dox:3
lsst.pipe.tasks.multiBandUtils.readCatalog
def readCatalog(task, patchRef)
Read input catalog.
Definition: multiBandUtils.py:153
lsst.pipe.base.config.PipelineTaskConfig
Definition: config.py:115
lsst.pipe.base
Definition: __init__.py:1
lsst.pex.config.wrap.validate
validate
Definition: wrap.py:295
lsst.pipe.base.connectionTypes
Definition: connectionTypes.py:1
lsst.pipe.tasks.mergeDetections.writeMetadata
def writeMetadata(self, dataRefList)
No metadata to write, and not sure how to write it for a list of dataRefs.
Definition: mergeDetections.py:405