LSSTApplications  18.0.0+106,18.0.0+50,19.0.0,19.0.0+1,19.0.0+10,19.0.0+11,19.0.0+13,19.0.0+17,19.0.0+2,19.0.0-1-g20d9b18+6,19.0.0-1-g425ff20,19.0.0-1-g5549ca4,19.0.0-1-g580fafe+6,19.0.0-1-g6fe20d0+1,19.0.0-1-g7011481+9,19.0.0-1-g8c57eb9+6,19.0.0-1-gb5175dc+11,19.0.0-1-gdc0e4a7+9,19.0.0-1-ge272bc4+6,19.0.0-1-ge3aa853,19.0.0-10-g448f008b,19.0.0-12-g6990b2c,19.0.0-2-g0d9f9cd+11,19.0.0-2-g3d9e4fb2+11,19.0.0-2-g5037de4,19.0.0-2-gb96a1c4+3,19.0.0-2-gd955cfd+15,19.0.0-3-g2d13df8,19.0.0-3-g6f3c7dc,19.0.0-4-g725f80e+11,19.0.0-4-ga671dab3b+1,19.0.0-4-gad373c5+3,19.0.0-5-ga2acb9c+2,19.0.0-5-gfe96e6c+2,w.2020.01
LSSTDataManagementBasePackage
mergeMeasurements.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # LSST Data Management System
4 # Copyright 2008-2015 AURA/LSST.
5 #
6 # This product includes software developed by the
7 # LSST Project (http://www.lsst.org/).
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the LSST License Statement and
20 # the GNU General Public License along with this program. If not,
21 # see <https://www.lsstcorp.org/LegalNotices/>.
22 #
import numpy

import lsst.afw.table as afwTable
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
from lsst.pipe.base import PipelineTaskConnections, PipelineTaskConfig
from lsst.pipe.base import connectionTypes as cT

from .multiBandUtils import (MergeSourcesRunner, _makeGetSchemaCatalogs, makeMergeArgumentParser,
                             getInputSchema, getShortFilterName, readCatalog)
35 
36 
class MergeMeasurementsConnections(PipelineTaskConnections,
                                   dimensions=("skymap", "tract", "patch"),
                                   defaultTemplates={"inputCoaddName": "deep",
                                                     "outputCoaddName": "deep"}):
    # NOTE(review): the class header line was missing from the extracted source;
    # it is reconstructed here from the visible base-class arguments and the
    # `pipelineConnections=MergeMeasurementsConnections` reference below — confirm
    # against the original file.

    # Schema of the per-band measurement catalogs read at task construction.
    # NOTE(review): the doc string below looks copy-pasted from `outputSchema`
    # ("output merged"), but it is preserved byte-for-byte since it is runtime data.
    inputSchema = cT.InitInput(
        doc="Schema for the output merged measurement catalog.",
        name="{inputCoaddName}Coadd_meas_schema",
        storageClass="SourceCatalog",
    )
    # Schema of the merged reference catalog this task produces.
    outputSchema = cT.InitOutput(
        doc="Schema for the output merged measurement catalog.",
        name="{outputCoaddName}Coadd_ref_schema",
        storageClass="SourceCatalog",
    )
    # One measurement catalog per band (hence the extra abstract_filter dimension
    # and multiple=True); these are the catalogs merged by run().
    catalogs = cT.Input(
        doc="Input catalogs to merge.",
        name="{inputCoaddName}Coadd_meas",
        multiple=True,
        storageClass="SourceCatalog",
        dimensions=["abstract_filter", "skymap", "tract", "patch"],
    )
    # The merged reference catalog, keyed by patch only (bands are folded in).
    mergedCatalog = cT.Output(
        doc="Output merged catalog.",
        name="{outputCoaddName}Coadd_ref",
        storageClass="SourceCatalog",
        dimensions=["skymap", "tract", "patch"],
    )
64 
65 
class MergeMeasurementsConfig(PipelineTaskConfig, pipelineConnections=MergeMeasurementsConnections):
    """!
    @anchor MergeMeasurementsConfig_

    @brief Configuration parameters for the MergeMeasurementsTask
    """
    # Filters that legitimately carry no detection of their own (e.g. the
    # synthetic "sky" filter produced by MergeDetectionsTask); sources whose only
    # peak is in one of these bands are handled specially in run().
    pseudoFilterList = pexConfig.ListField(
        dtype=str,
        default=["sky"],
        doc="Names of filters which may have no associated detection\n"
            "(N.b. should include MergeDetectionsConfig.skyFilterName)"
    )
    # Prefix of the flux field (e.g. "base_PsfFlux" -> base_PsfFlux_instFlux /
    # _instFluxErr / _flag) used to compute the per-band S/N in run().
    snName = pexConfig.Field(
        dtype=str,
        default="base_PsfFlux",
        doc="Name of flux measurement for calculating the S/N when choosing the reference band."
    )
    # S/N threshold below which the priority band may be overridden; see also
    # minSNDiff — both conditions must hold to switch reference bands.
    minSN = pexConfig.Field(
        dtype=float,
        default=10.,
        doc="If the S/N from the priority band is below this value (and the S/N "
            "is larger than minSNDiff compared to the priority band), use the band with "
            "the largest S/N as the reference band."
    )
    # Minimum S/N margin another band must have over the priority band before it
    # is chosen as the reference band.
    minSNDiff = pexConfig.Field(
        dtype=float,
        default=3.,
        doc="If the difference in S/N between another band and the priority band is larger "
            "than this value (and the S/N in the priority band is less than minSN) "
            "use the band with the largest S/N as the reference band"
    )
    # Any record with one of these flags set gets S/N treated as 0 when choosing
    # the reference band (flags missing from the schema are skipped with a warning).
    flags = pexConfig.ListField(
        dtype=str,
        doc="Require that these flags, if available, are not set",
        default=["base_PixelFlags_flag_interpolatedCenter", "base_PsfFlux_flag",
                 "ext_photometryKron_KronFlux_flag", "modelfit_CModel_flag", ]
    )
    # Bands to merge, highest priority first; validate() requires it be non-empty.
    priorityList = pexConfig.ListField(
        dtype=str,
        default=[],
        doc="Priority-ordered list of bands for the merge."
    )
    # Coadd dataset name prefix (Gen2 I/O), e.g. "deep" -> deepCoadd_ref.
    coaddName = pexConfig.Field(
        dtype=str,
        default="deep",
        doc="Name of coadd"
    )

    def validate(self):
        """Validate the config, requiring a non-empty priorityList."""
        super().validate()
        if len(self.priorityList) == 0:
            raise RuntimeError("No priority list provided")
119 
120 
126 
127 
class MergeMeasurementsTask(pipeBase.PipelineTask, pipeBase.CmdLineTask):
    r"""!
    @anchor MergeMeasurementsTask_

    @brief Merge measurements from multiple bands

    @section pipe_tasks_multiBand_Contents Contents

    - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Purpose
    - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Initialize
    - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Run
    - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Config
    - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Debug
    - @ref pipe_tasks_multiband_MergeMeasurementsTask_Example

    @section pipe_tasks_multiBand_MergeMeasurementsTask_Purpose Description

    Command-line task that merges measurements from multiple bands.

    Combines consistent (i.e. with the same peaks and footprints) catalogs of sources from multiple filter
    bands to construct a unified catalog that is suitable for driving forced photometry. Every source is
    required to have centroid, shape and flux measurements in each band.

    @par Inputs:
        deepCoadd_meas{tract,patch,filter}: SourceCatalog
    @par Outputs:
        deepCoadd_ref{tract,patch}: SourceCatalog
    @par Data Unit:
        tract, patch

    MergeMeasurementsTask subclasses @ref CmdLineTask_ "CmdLineTask".

    @section pipe_tasks_multiBand_MergeMeasurementsTask_Initialize Task initialization

    @copydoc \_\_init\_\_

    @section pipe_tasks_multiBand_MergeMeasurementsTask_Run Invoking the Task

    @copydoc run

    @section pipe_tasks_multiBand_MergeMeasurementsTask_Config Configuration parameters

    See @ref MergeMeasurementsConfig_

    @section pipe_tasks_multiBand_MergeMeasurementsTask_Debug Debug variables

    The @link lsst.pipe.base.cmdLineTask.CmdLineTask command line task@endlink interface supports a
    flag @c -d to import @b debug.py from your @c PYTHONPATH; see @ref baseDebug for more about @b debug.py
    files.

    MergeMeasurementsTask has no debug variables.

    @section pipe_tasks_multiband_MergeMeasurementsTask_Example A complete example
    of using MergeMeasurementsTask

    MergeMeasurementsTask is meant to be run after deblending & measuring sources in every band.
    The purpose of the task is to generate a catalog of sources suitable for driving forced photometry in
    coadds and individual exposures.
    Command-line usage of MergeMeasurementsTask expects a data reference to the coadds to be processed. A list
    of the available optional arguments can be obtained by calling mergeCoaddMeasurements.py with the `--help`
    command line argument:
    @code
    mergeCoaddMeasurements.py --help
    @endcode

    To demonstrate usage of the MergeMeasurementsTask in the larger context of multi-band processing, we
    will process HSC data in the [ci_hsc](https://github.com/lsst/ci_hsc) package. Assuming one has finished
    step 7 at @ref pipeTasks_multiBand, one may merge the catalogs generated after deblending and measuring
    as follows:
    @code
    mergeCoaddMeasurements.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-I^HSC-R
    @endcode
    This will merge the HSC-I & HSC-R band catalogs. The results are written in
    `$CI_HSC_DIR/DATA/deepCoadd-results/`.
    """
    _DefaultName = "mergeCoaddMeasurements"
    ConfigClass = MergeMeasurementsConfig
    RunnerClass = MergeSourcesRunner
    inputDataset = "meas"
    outputDataset = "ref"
    getSchemaCatalogs = _makeGetSchemaCatalogs("ref")

    @classmethod
    def _makeArgumentParser(cls):
        """Build the Gen2 command-line argument parser for this task."""
        return makeMergeArgumentParser(cls._DefaultName, cls.inputDataset)

    def getInputSchema(self, butler=None, schema=None):
        """Obtain the input schema, either directly or from a butler reference."""
        return getInputSchema(self, butler, schema)

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        """Gen3 entry point: fetch the per-band catalogs, key them by filter,
        run the merge, and write the output.
        """
        inputs = butlerQC.get(inputRefs)
        dataIds = (ref.dataId for ref in inputRefs.catalogs)
        # run() expects a {band: catalog} mapping, so pair each catalog with the
        # abstract_filter of the reference it was loaded from.
        catalogDict = {dataId['abstract_filter']: cat for dataId, cat in zip(dataIds,
                       inputs['catalogs'])}
        inputs['catalogs'] = catalogDict
        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)

    def __init__(self, butler=None, schema=None, initInputs=None, **kwargs):
        """!
        Initialize the task.

        @param[in] schema: the schema of the detection catalogs used as input to this one
        @param[in] butler: a butler used to read the input schema from disk, if schema is None
        @param[in] initInputs: Gen3 init inputs; if provided, the input schema is taken
                   from initInputs['inputSchema'].schema instead of butler/schema

        The task will set its own self.schema attribute to the schema of the output merged catalog.
        """
        super().__init__(**kwargs)

        if initInputs is not None:
            inputSchema = initInputs['inputSchema'].schema
        else:
            inputSchema = self.getInputSchema(butler=butler, schema=schema)
        self.schemaMapper = afwTable.SchemaMapper(inputSchema, True)
        self.schemaMapper.addMinimalSchema(inputSchema, True)
        # Cache the keys used to compute the S/N for reference-band selection.
        self.instFluxKey = inputSchema.find(self.config.snName + "_instFlux").getKey()
        self.instFluxErrKey = inputSchema.find(self.config.snName + "_instFluxErr").getKey()
        self.fluxFlagKey = inputSchema.find(self.config.snName + "_flag").getKey()

        # For each priority band: the merge_peak/merge_footprint input keys and a
        # new output flag marking which band supplied the reference measurement.
        self.flagKeys = {}
        for band in self.config.priorityList:
            short = getShortFilterName(band)
            outputKey = self.schemaMapper.editOutputSchema().addField(
                "merge_measurement_%s" % short,
                type="Flag",
                doc="Flag field set if the measurements here are from the %s filter" % band
            )
            peakKey = inputSchema.find("merge_peak_%s" % short).key
            footprintKey = inputSchema.find("merge_footprint_%s" % short).key
            self.flagKeys[band] = pipeBase.Struct(peak=peakKey, footprint=footprintKey, output=outputKey)
        self.schema = self.schemaMapper.getOutputSchema()

        # Pseudo-filters (e.g. "sky") may be absent from the schema; warn rather
        # than fail so the task still runs without them.
        self.pseudoFilterKeys = []
        for filt in self.config.pseudoFilterList:
            try:
                self.pseudoFilterKeys.append(self.schema.find("merge_peak_%s" % filt).getKey())
            except Exception as e:
                self.log.warn("merge_peak is not set for pseudo-filter %s: %s" % (filt, e))

        # Map flag name -> schema Key for the configured bad flags, skipping any
        # not present in this schema.
        self.badFlags = {}
        for flag in self.config.flags:
            try:
                self.badFlags[flag] = self.schema.find(flag).getKey()
            except KeyError as exc:
                self.log.warn("Can't find flag %s in schema: %s" % (flag, exc,))
        # Empty catalog carrying the output schema, exposed as a Gen3 init output.
        self.outputSchema = afwTable.SourceCatalog(self.schema)

    def runDataRef(self, patchRefList):
        """!
        @brief Merge coadd sources from multiple bands. Calls @ref `run`.
        @param[in] patchRefList list of data references for each filter
        """
        catalogs = dict(readCatalog(self, patchRef) for patchRef in patchRefList)
        mergedCatalog = self.run(catalogs).mergedCatalog
        self.write(patchRefList[0], mergedCatalog)

    def run(self, catalogs):
        """!
        Merge measurement catalogs to create a single reference catalog for forced photometry

        @param[in] catalogs: the catalogs to be merged, keyed by band name
        @return a pipeBase.Struct with a `mergedCatalog` attribute
        @throw ValueError if the input catalogs' source IDs or lengths do not match,
               or if no valid reference band is found for a source

        For parent sources, we choose the first band in config.priorityList for which the
        merge_footprint flag for that band is True.

        For child sources, the logic is the same, except that we use the merge_peak flags.
        """
        # Put catalogs, filters in priority order
        orderedCatalogs = [catalogs[band] for band in self.config.priorityList if band in catalogs.keys()]
        orderedKeys = [self.flagKeys[band] for band in self.config.priorityList if band in catalogs.keys()]

        mergedCatalog = afwTable.SourceCatalog(self.schema)
        mergedCatalog.reserve(len(orderedCatalogs[0]))

        # The input catalogs must be row-aligned: same sources, same order.
        idKey = orderedCatalogs[0].table.getIdKey()
        for catalog in orderedCatalogs[1:]:
            if numpy.any(orderedCatalogs[0].get(idKey) != catalog.get(idKey)):
                raise ValueError("Error in inputs to MergeCoaddMeasurements: source IDs do not match")

        # This first zip iterates over all the catalogs simultaneously, yielding a sequence of one
        # record for each band, in priority order.
        for orderedRecords in zip(*orderedCatalogs):

            maxSNRecord = None
            maxSNFlagKeys = None
            maxSN = 0.
            priorityRecord = None
            priorityFlagKeys = None
            prioritySN = 0.
            hasPseudoFilter = False

            # Now we iterate over those record-band pairs, keeping track of the priority and the
            # largest S/N band.
            for inputRecord, flagKeys in zip(orderedRecords, orderedKeys):
                parent = (inputRecord.getParent() == 0 and inputRecord.get(flagKeys.footprint))
                child = (inputRecord.getParent() != 0 and inputRecord.get(flagKeys.peak))

                if not (parent or child):
                    # Not detected in this band: check whether it is a pseudo-filter
                    # (e.g. sky) object, which always uses the first priority band.
                    for pseudoFilterKey in self.pseudoFilterKeys:
                        if inputRecord.get(pseudoFilterKey):
                            hasPseudoFilter = True
                            priorityRecord = inputRecord
                            priorityFlagKeys = flagKeys
                            break
                    if hasPseudoFilter:
                        break

                # Use the cached schema Keys built in __init__ (iterating the dict
                # itself would yield the flag-name strings and redo schema lookups).
                isBad = any(inputRecord.get(flag) for flag in self.badFlags.values())
                if isBad or inputRecord.get(self.fluxFlagKey) or inputRecord.get(self.instFluxErrKey) == 0:
                    sn = 0.
                else:
                    sn = inputRecord.get(self.instFluxKey)/inputRecord.get(self.instFluxErrKey)
                if numpy.isnan(sn) or sn < 0.:
                    sn = 0.
                if (parent or child) and priorityRecord is None:
                    priorityRecord = inputRecord
                    priorityFlagKeys = flagKeys
                    prioritySN = sn
                if sn > maxSN:
                    maxSNRecord = inputRecord
                    maxSNFlagKeys = flagKeys
                    maxSN = sn

            # If the priority band has a low S/N we would like to choose the band with the highest S/N as
            # the reference band instead. However, we only want to choose the highest S/N band if it is
            # significantly better than the priority band. Therefore, to choose a band other than the
            # priority, we require that the priority S/N is below the minimum threshold and that the
            # difference between the priority and highest S/N is larger than the difference threshold.
            #
            # For pseudo-filter objects we always choose the first band in the priority list.
            bestRecord = None
            bestFlagKeys = None
            if hasPseudoFilter:
                bestRecord = priorityRecord
                bestFlagKeys = priorityFlagKeys
            elif (prioritySN < self.config.minSN and (maxSN - prioritySN) > self.config.minSNDiff and
                  maxSNRecord is not None):
                bestRecord = maxSNRecord
                bestFlagKeys = maxSNFlagKeys
            elif priorityRecord is not None:
                bestRecord = priorityRecord
                bestFlagKeys = priorityFlagKeys

            if bestRecord is not None and bestFlagKeys is not None:
                outputRecord = mergedCatalog.addNew()
                outputRecord.assign(bestRecord, self.schemaMapper)
                outputRecord.set(bestFlagKeys.output, True)
            else:  # if we didn't find any records
                raise ValueError("Error in inputs to MergeCoaddMeasurements: no valid reference for %s" %
                                 inputRecord.getId())

        # more checking for sane inputs, since zip silently iterates over the smallest sequence
        for inputCatalog in orderedCatalogs:
            if len(mergedCatalog) != len(inputCatalog):
                # Bug fix: report the mismatching catalog's length, not the number
                # of input catalogs.
                raise ValueError("Mismatch between catalog sizes: %s != %s" %
                                 (len(mergedCatalog), len(inputCatalog)))

        return pipeBase.Struct(
            mergedCatalog=mergedCatalog
        )

    def write(self, patchRef, catalog):
        """!
        @brief Write the output.

        @param[in] patchRef data reference for patch
        @param[in] catalog catalog

        We write as the dataset provided by the 'outputDataset'
        class variable.
        """
        patchRef.put(catalog, self.config.coaddName + "Coadd_" + self.outputDataset)
        # since the filter isn't actually part of the data ID for the dataset we're saving,
        # it's confusing to see it in the log message, even if the butler simply ignores it.
        mergeDataId = patchRef.dataId.copy()
        del mergeDataId["filter"]
        self.log.info("Wrote merged catalog: %s" % (mergeDataId,))

    def writeMetadata(self, dataRefList):
        """!
        @brief No metadata to write, and not sure how to write it for a list of dataRefs.
        """
        pass
def write(self, patchRef, catalog)
Write the output.
def makeMergeArgumentParser(name, dataset)
Create a suitable ArgumentParser.
def readCatalog(task, patchRef)
Read input catalog.
def getInputSchema(task, butler=None, schema=None)
Obtain the input schema either directly or from a butler reference.
A mapping between the keys of two Schemas, used to copy data between them.
Definition: SchemaMapper.h:21
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
Definition: functional.cc:33
bool any(CoordinateExpr< N > const &expr) noexcept
Return true if any elements are true.
def run(self, skyInfo, tempExpRefList, imageScalerList, weightList, altMaskList=None, mask=None, supplementaryData=None)
def writeMetadata(self, dataRefList)
No metadata to write, and not sure how to write it for a list of dataRefs.