LSSTApplications  18.1.0
LSSTDataManagementBasePackage
mergeMeasurements.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # LSST Data Management System
4 # Copyright 2008-2015 AURA/LSST.
5 #
6 # This product includes software developed by the
7 # LSST Project (http://www.lsst.org/).
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the LSST License Statement and
20 # the GNU General Public License along with this program. If not,
21 # see <https://www.lsstcorp.org/LegalNotices/>.
22 #
23 import numpy
24 
25 from .multiBandUtils import (MergeSourcesRunner, _makeGetSchemaCatalogs, makeMergeArgumentParser,
26  getInputSchema, getShortFilterName, readCatalog)
27 
28 import lsst.afw.table as afwTable
29 import lsst.pex.config as pexConfig
30 import lsst.pipe.base as pipeBase
31 
32 
class MergeMeasurementsConfig(pipeBase.PipelineTaskConfig):
    """!
    @anchor MergeMeasurementsConfig_

    @brief Configuration parameters for the MergeMeasurementsTask
    """
    # Gen 3 options: dataset definitions for the PipelineTask middleware.
    # The {inputCoaddName}/{outputCoaddName} placeholders are filled in by
    # formatTemplateNames() in setDefaults() below.
    inputSchema = pipeBase.InitInputDatasetField(
        doc="Schema for the input measurement catalogs.",
        nameTemplate="{inputCoaddName}Coadd_meas_schema",
        storageClass="SourceCatalog",
    )
    outputSchema = pipeBase.InitOutputDatasetField(
        doc="Schema for the output merged measurement catalog.",
        nameTemplate="{outputCoaddName}Coadd_ref_schema",
        storageClass="SourceCatalog",
    )
    # scalar=False: one quantum receives a list of catalogs, one per band.
    catalogs = pipeBase.InputDatasetField(
        doc="Input catalogs to merge.",
        nameTemplate="{inputCoaddName}Coadd_meas",
        scalar=False,
        storageClass="SourceCatalog",
        dimensions=["abstract_filter", "skymap", "tract", "patch"],
    )
    # The merged output has no filter dimension: it is a single reference
    # catalog per tract/patch.
    mergedCatalog = pipeBase.OutputDatasetField(
        doc="Output merged catalog.",
        nameTemplate="{outputCoaddName}Coadd_ref",
        scalar=True,
        storageClass="SourceCatalog",
        dimensions=["skymap", "tract", "patch"],
    )
    # Task configuration options
    pseudoFilterList = pexConfig.ListField(
        dtype=str,
        default=["sky"],
        doc="Names of filters which may have no associated detection\n"
        "(N.b. should include MergeDetectionsConfig.skyFilterName)"
    )
    snName = pexConfig.Field(
        dtype=str,
        default="base_PsfFlux",
        doc="Name of flux measurement for calculating the S/N when choosing the reference band."
    )
    minSN = pexConfig.Field(
        dtype=float,
        default=10.,
        doc="If the S/N from the priority band is below this value (and the S/N "
        "is larger than minSNDiff compared to the priority band), use the band with "
        "the largest S/N as the reference band."
    )
    minSNDiff = pexConfig.Field(
        dtype=float,
        default=3.,
        doc="If the difference in S/N between another band and the priority band is larger "
        "than this value (and the S/N in the priority band is less than minSN) "
        "use the band with the largest S/N as the reference band"
    )
    # Records with any of these flags set get S/N forced to 0 in run(), so
    # they are never chosen over the priority band on S/N grounds.
    flags = pexConfig.ListField(
        dtype=str,
        doc="Require that these flags, if available, are not set",
        default=["base_PixelFlags_flag_interpolatedCenter", "base_PsfFlux_flag",
                 "ext_photometryKron_KronFlux_flag", "modelfit_CModel_flag", ]
    )
    # Must be non-empty; validate() enforces this.
    priorityList = pexConfig.ListField(
        dtype=str,
        default=[],
        doc="Priority-ordered list of bands for the merge."
    )
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )

    def validate(self):
        # A priority order is required to pick a reference band; fail early.
        super().validate()
        if len(self.priorityList) == 0:
            raise RuntimeError("No priority list provided")

    def setDefaults(self):
        super().setDefaults()
        # Resolve the dataset name templates declared above.
        self.formatTemplateNames({"inputCoaddName": "deep",
                                  "outputCoaddName": "deep"})
        self.quantum.dimensions = ("skymap", "tract", "patch")
117 
118 
124 
125 
126 class MergeMeasurementsTask(pipeBase.PipelineTask, pipeBase.CmdLineTask):
127  r"""!
128  @anchor MergeMeasurementsTask_
129 
130  @brief Merge measurements from multiple bands
131 
132  @section pipe_tasks_multiBand_Contents Contents
133 
134  - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Purpose
135  - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Initialize
136  - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Run
137  - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Config
138  - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Debug
139  - @ref pipe_tasks_multiband_MergeMeasurementsTask_Example
140 
141  @section pipe_tasks_multiBand_MergeMeasurementsTask_Purpose Description
142 
143  Command-line task that merges measurements from multiple bands.
144 
145  Combines consistent (i.e. with the same peaks and footprints) catalogs of sources from multiple filter
146  bands to construct a unified catalog that is suitable for driving forced photometry. Every source is
147  required to have centroid, shape and flux measurements in each band.
148 
149  @par Inputs:
150  deepCoadd_meas{tract,patch,filter}: SourceCatalog
151  @par Outputs:
152  deepCoadd_ref{tract,patch}: SourceCatalog
153  @par Data Unit:
154  tract, patch
155 
156  MergeMeasurementsTask subclasses @ref CmdLineTask_ "CmdLineTask".
157 
158  @section pipe_tasks_multiBand_MergeMeasurementsTask_Initialize Task initialization
159 
160  @copydoc \_\_init\_\_
161 
162  @section pipe_tasks_multiBand_MergeMeasurementsTask_Run Invoking the Task
163 
164  @copydoc run
165 
166  @section pipe_tasks_multiBand_MergeMeasurementsTask_Config Configuration parameters
167 
168  See @ref MergeMeasurementsConfig_
169 
170  @section pipe_tasks_multiBand_MergeMeasurementsTask_Debug Debug variables
171 
172  The @link lsst.pipe.base.cmdLineTask.CmdLineTask command line task@endlink interface supports a
173  flag @c -d to import @b debug.py from your @c PYTHONPATH; see @ref baseDebug for more about @b debug.py
174  files.
175 
176  MergeMeasurementsTask has no debug variables.
177 
178  @section pipe_tasks_multiband_MergeMeasurementsTask_Example A complete example
179  of using MergeMeasurementsTask
180 
181  MergeMeasurementsTask is meant to be run after deblending & measuring sources in every band.
182  The purpose of the task is to generate a catalog of sources suitable for driving forced photometry in
183  coadds and individual exposures.
184  Command-line usage of MergeMeasurementsTask expects a data reference to the coadds to be processed. A list
185  of the available optional arguments can be obtained by calling mergeCoaddMeasurements.py with the `--help`
186  command line argument:
187  @code
188  mergeCoaddMeasurements.py --help
189  @endcode
190 
191  To demonstrate usage of the DetectCoaddSourcesTask in the larger context of multi-band processing, we
192  will process HSC data in the [ci_hsc](https://github.com/lsst/ci_hsc) package. Assuming one has finished
193  step 7 at @ref pipeTasks_multiBand, one may merge the catalogs generated after deblending and measuring
194  as follows:
195  @code
196  mergeCoaddMeasurements.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-I^HSC-R
197  @endcode
198  This will merge the HSC-I & HSC-R band catalogs. The results are written in
199  `$CI_HSC_DIR/DATA/deepCoadd-results/`.
200  """
201  _DefaultName = "mergeCoaddMeasurements"
202  ConfigClass = MergeMeasurementsConfig
203  RunnerClass = MergeSourcesRunner
204  inputDataset = "meas"
205  outputDataset = "ref"
206  getSchemaCatalogs = _makeGetSchemaCatalogs("ref")
207 
208  @classmethod
209  def _makeArgumentParser(cls):
211 
    def getInputSchema(self, butler=None, schema=None):
        # Delegate to the same-named free function from multiBandUtils, which
        # resolves the input schema from `schema` or (if None) via `butler`.
        return getInputSchema(self, butler, schema)
214 
216  return {"outputSchema": afwTable.SourceCatalog(self.schema), }
217 
218  def adaptArgsAndRun(self, inputData, inputDataIds, outputDataIds, butler):
219  catalogDict = {dataId['abstract_filter']: cat for dataId, cat in zip(inputDataIds['catalogs'],
220  inputData['catalogs'])}
221  inputData['catalogs'] = catalogDict
222 
223  return super().adaptArgsAndRun(inputData, inputDataIds, outputDataIds, butler)
224 
225  def __init__(self, butler=None, schema=None, initInputs=None, **kwargs):
226  """!
227  Initialize the task.
228 
229  @param[in] schema: the schema of the detection catalogs used as input to this one
230  @param[in] butler: a butler used to read the input schema from disk, if schema is None
231 
232  The task will set its own self.schema attribute to the schema of the output merged catalog.
233  """
234  super().__init__(**kwargs)
235 
236  if initInputs is not None:
237  inputSchema = initInputs['inputSchema'].schema
238  else:
239  inputSchema = self.getInputSchema(butler=butler, schema=schema)
240  self.schemaMapper = afwTable.SchemaMapper(inputSchema, True)
241  self.schemaMapper.addMinimalSchema(inputSchema, True)
242  self.instFluxKey = inputSchema.find(self.config.snName + "_instFlux").getKey()
243  self.instFluxErrKey = inputSchema.find(self.config.snName + "_instFluxErr").getKey()
244  self.fluxFlagKey = inputSchema.find(self.config.snName + "_flag").getKey()
245 
246  self.flagKeys = {}
247  for band in self.config.priorityList:
248  short = getShortFilterName(band)
249  outputKey = self.schemaMapper.editOutputSchema().addField(
250  "merge_measurement_%s" % short,
251  type="Flag",
252  doc="Flag field set if the measurements here are from the %s filter" % band
253  )
254  peakKey = inputSchema.find("merge_peak_%s" % short).key
255  footprintKey = inputSchema.find("merge_footprint_%s" % short).key
256  self.flagKeys[band] = pipeBase.Struct(peak=peakKey, footprint=footprintKey, output=outputKey)
257  self.schema = self.schemaMapper.getOutputSchema()
258 
260  for filt in self.config.pseudoFilterList:
261  try:
262  self.pseudoFilterKeys.append(self.schema.find("merge_peak_%s" % filt).getKey())
263  except Exception as e:
264  self.log.warn("merge_peak is not set for pseudo-filter %s: %s" % (filt, e))
265 
266  self.badFlags = {}
267  for flag in self.config.flags:
268  try:
269  self.badFlags[flag] = self.schema.find(flag).getKey()
270  except KeyError as exc:
271  self.log.warn("Can't find flag %s in schema: %s" % (flag, exc,))
272 
273  def runDataRef(self, patchRefList):
274  """!
275  @brief Merge coadd sources from multiple bands. Calls @ref `run`.
276  @param[in] patchRefList list of data references for each filter
277  """
278  catalogs = dict(readCatalog(self, patchRef) for patchRef in patchRefList)
279  mergedCatalog = self.run(catalogs).mergedCatalog
280  self.write(patchRefList[0], mergedCatalog)
281 
282  def run(self, catalogs):
283  """!
284  Merge measurement catalogs to create a single reference catalog for forced photometry
285 
286  @param[in] catalogs: the catalogs to be merged
287 
288  For parent sources, we choose the first band in config.priorityList for which the
289  merge_footprint flag for that band is is True.
290 
291  For child sources, the logic is the same, except that we use the merge_peak flags.
292  """
293  # Put catalogs, filters in priority order
294  orderedCatalogs = [catalogs[band] for band in self.config.priorityList if band in catalogs.keys()]
295  orderedKeys = [self.flagKeys[band] for band in self.config.priorityList if band in catalogs.keys()]
296 
297  mergedCatalog = afwTable.SourceCatalog(self.schema)
298  mergedCatalog.reserve(len(orderedCatalogs[0]))
299 
300  idKey = orderedCatalogs[0].table.getIdKey()
301  for catalog in orderedCatalogs[1:]:
302  if numpy.any(orderedCatalogs[0].get(idKey) != catalog.get(idKey)):
303  raise ValueError("Error in inputs to MergeCoaddMeasurements: source IDs do not match")
304 
305  # This first zip iterates over all the catalogs simultaneously, yielding a sequence of one
306  # record for each band, in priority order.
307  for orderedRecords in zip(*orderedCatalogs):
308 
309  maxSNRecord = None
310  maxSNFlagKeys = None
311  maxSN = 0.
312  priorityRecord = None
313  priorityFlagKeys = None
314  prioritySN = 0.
315  hasPseudoFilter = False
316 
317  # Now we iterate over those record-band pairs, keeping track of the priority and the
318  # largest S/N band.
319  for inputRecord, flagKeys in zip(orderedRecords, orderedKeys):
320  parent = (inputRecord.getParent() == 0 and inputRecord.get(flagKeys.footprint))
321  child = (inputRecord.getParent() != 0 and inputRecord.get(flagKeys.peak))
322 
323  if not (parent or child):
324  for pseudoFilterKey in self.pseudoFilterKeys:
325  if inputRecord.get(pseudoFilterKey):
326  hasPseudoFilter = True
327  priorityRecord = inputRecord
328  priorityFlagKeys = flagKeys
329  break
330  if hasPseudoFilter:
331  break
332 
333  isBad = any(inputRecord.get(flag) for flag in self.badFlags)
334  if isBad or inputRecord.get(self.fluxFlagKey) or inputRecord.get(self.instFluxErrKey) == 0:
335  sn = 0.
336  else:
337  sn = inputRecord.get(self.instFluxKey)/inputRecord.get(self.instFluxErrKey)
338  if numpy.isnan(sn) or sn < 0.:
339  sn = 0.
340  if (parent or child) and priorityRecord is None:
341  priorityRecord = inputRecord
342  priorityFlagKeys = flagKeys
343  prioritySN = sn
344  if sn > maxSN:
345  maxSNRecord = inputRecord
346  maxSNFlagKeys = flagKeys
347  maxSN = sn
348 
349  # If the priority band has a low S/N we would like to choose the band with the highest S/N as
350  # the reference band instead. However, we only want to choose the highest S/N band if it is
351  # significantly better than the priority band. Therefore, to choose a band other than the
352  # priority, we require that the priority S/N is below the minimum threshold and that the
353  # difference between the priority and highest S/N is larger than the difference threshold.
354  #
355  # For pseudo code objects we always choose the first band in the priority list.
356  bestRecord = None
357  bestFlagKeys = None
358  if hasPseudoFilter:
359  bestRecord = priorityRecord
360  bestFlagKeys = priorityFlagKeys
361  elif (prioritySN < self.config.minSN and (maxSN - prioritySN) > self.config.minSNDiff and
362  maxSNRecord is not None):
363  bestRecord = maxSNRecord
364  bestFlagKeys = maxSNFlagKeys
365  elif priorityRecord is not None:
366  bestRecord = priorityRecord
367  bestFlagKeys = priorityFlagKeys
368 
369  if bestRecord is not None and bestFlagKeys is not None:
370  outputRecord = mergedCatalog.addNew()
371  outputRecord.assign(bestRecord, self.schemaMapper)
372  outputRecord.set(bestFlagKeys.output, True)
373  else: # if we didn't find any records
374  raise ValueError("Error in inputs to MergeCoaddMeasurements: no valid reference for %s" %
375  inputRecord.getId())
376 
377  # more checking for sane inputs, since zip silently iterates over the smallest sequence
378  for inputCatalog in orderedCatalogs:
379  if len(mergedCatalog) != len(inputCatalog):
380  raise ValueError("Mismatch between catalog sizes: %s != %s" %
381  (len(mergedCatalog), len(orderedCatalogs)))
382 
383  return pipeBase.Struct(
384  mergedCatalog=mergedCatalog
385  )
386 
387  def write(self, patchRef, catalog):
388  """!
389  @brief Write the output.
390 
391  @param[in] patchRef data reference for patch
392  @param[in] catalog catalog
393 
394  We write as the dataset provided by the 'outputDataset'
395  class variable.
396  """
397  patchRef.put(catalog, self.config.coaddName + "Coadd_" + self.outputDataset)
398  # since the filter isn't actually part of the data ID for the dataset we're saving,
399  # it's confusing to see it in the log message, even if the butler simply ignores it.
400  mergeDataId = patchRef.dataId.copy()
401  del mergeDataId["filter"]
402  self.log.info("Wrote merged catalog: %s" % (mergeDataId,))
403 
404  def writeMetadata(self, dataRefList):
405  """!
406  @brief No metadata to write, and not sure how to write it for a list of dataRefs.
407  """
408  pass
def runDataRef(self, patchRefList)
Merge coadd sources from multiple bands.
def makeMergeArgumentParser(name, dataset)
Create a suitable ArgumentParser.
def readCatalog(task, patchRef)
Read input catalog.
A mapping between the keys of two Schemas, used to copy data between them.
Definition: SchemaMapper.h:21
def run(self, catalogs)
Merge measurement catalogs to create a single reference catalog for forced photometry.
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
Definition: functional.cc:33
bool any(CoordinateExpr< N > const &expr) noexcept
Return true if any elements are true.
Configuration parameters for the MergeMeasurementsTask.
def write(self, patchRef, catalog)
Write the output.
def adaptArgsAndRun(self, inputData, inputDataIds, outputDataIds, butler)
def __init__(self, butler=None, schema=None, initInputs=None, **kwargs)
Initialize the task.
def writeMetadata(self, dataRefList)
No metadata to write, and not sure how to write it for a list of dataRefs.