LSST Applications  21.0.0-147-g0e635eb1+1acddb5be5,22.0.0+052faf71bd,22.0.0+1ea9a8b2b2,22.0.0+6312710a6c,22.0.0+729191ecac,22.0.0+7589c3a021,22.0.0+9f079a9461,22.0.1-1-g7d6de66+b8044ec9de,22.0.1-1-g87000a6+536b1ee016,22.0.1-1-g8e32f31+6312710a6c,22.0.1-10-gd060f87+016f7cdc03,22.0.1-12-g9c3108e+df145f6f68,22.0.1-16-g314fa6d+c825727ab8,22.0.1-19-g93a5c75+d23f2fb6d8,22.0.1-19-gb93eaa13+aab3ef7709,22.0.1-2-g8ef0a89+b8044ec9de,22.0.1-2-g92698f7+9f079a9461,22.0.1-2-ga9b0f51+052faf71bd,22.0.1-2-gac51dbf+052faf71bd,22.0.1-2-gb66926d+6312710a6c,22.0.1-2-gcb770ba+09e3807989,22.0.1-20-g32debb5+b8044ec9de,22.0.1-23-gc2439a9a+fb0756638e,22.0.1-3-g496fd5d+09117f784f,22.0.1-3-g59f966b+1e6ba2c031,22.0.1-3-g849a1b8+f8b568069f,22.0.1-3-gaaec9c0+c5c846a8b1,22.0.1-32-g5ddfab5d3+60ce4897b0,22.0.1-4-g037fbe1+64e601228d,22.0.1-4-g8623105+b8044ec9de,22.0.1-5-g096abc9+d18c45d440,22.0.1-5-g15c806e+57f5c03693,22.0.1-7-gba73697+57f5c03693,master-g6e05de7fdc+c1283a92b8,master-g72cdda8301+729191ecac,w.2021.39
LSST Data Management Base Package
mergeMeasurements.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # LSST Data Management System
4 # Copyright 2008-2015 AURA/LSST.
5 #
6 # This product includes software developed by the
7 # LSST Project (http://www.lsst.org/).
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the LSST License Statement and
20 # the GNU General Public License along with this program. If not,
21 # see <https://www.lsstcorp.org/LegalNotices/>.
22 #
23 import numpy
24 
25 from .multiBandUtils import (MergeSourcesRunner, _makeGetSchemaCatalogs, makeMergeArgumentParser,
26  getInputSchema, readCatalog)
27 
28 
29 import lsst.afw.table as afwTable
30 import lsst.pex.config as pexConfig
31 import lsst.pipe.base as pipeBase
32 
33 from lsst.pipe.base import PipelineTaskConnections, PipelineTaskConfig
34 import lsst.pipe.base.connectionTypes as cT
35 
36 
37 class MergeMeasurementsConnections(PipelineTaskConnections,
38  dimensions=("skymap", "tract", "patch"),
39  defaultTemplates={"inputCoaddName": "deep",
40  "outputCoaddName": "deep"}):
41  inputSchema = cT.InitInput(
42  doc="Schema for the output merged measurement catalog.",
43  name="{inputCoaddName}Coadd_meas_schema",
44  storageClass="SourceCatalog",
45  )
46  outputSchema = cT.InitOutput(
47  doc="Schema for the output merged measurement catalog.",
48  name="{outputCoaddName}Coadd_ref_schema",
49  storageClass="SourceCatalog",
50  )
51  catalogs = cT.Input(
52  doc="Input catalogs to merge.",
53  name="{inputCoaddName}Coadd_meas",
54  multiple=True,
55  storageClass="SourceCatalog",
56  dimensions=["band", "skymap", "tract", "patch"],
57  )
58  mergedCatalog = cT.Output(
59  doc="Output merged catalog.",
60  name="{outputCoaddName}Coadd_ref",
61  storageClass="SourceCatalog",
62  dimensions=["skymap", "tract", "patch"],
63  )
64 
65 
66 class MergeMeasurementsConfig(PipelineTaskConfig, pipelineConnections=MergeMeasurementsConnections):
67  """!
68  @anchor MergeMeasurementsConfig_
69 
70  @brief Configuration parameters for the MergeMeasurementsTask
71  """
72  pseudoFilterList = pexConfig.ListField(
73  dtype=str,
74  default=["sky"],
75  doc="Names of filters which may have no associated detection\n"
76  "(N.b. should include MergeDetectionsConfig.skyFilterName)"
77  )
78  snName = pexConfig.Field(
79  dtype=str,
80  default="base_PsfFlux",
81  doc="Name of flux measurement for calculating the S/N when choosing the reference band."
82  )
83  minSN = pexConfig.Field(
84  dtype=float,
85  default=10.,
86  doc="If the S/N from the priority band is below this value (and the S/N "
87  "is larger than minSNDiff compared to the priority band), use the band with "
88  "the largest S/N as the reference band."
89  )
90  minSNDiff = pexConfig.Field(
91  dtype=float,
92  default=3.,
93  doc="If the difference in S/N between another band and the priority band is larger "
94  "than this value (and the S/N in the priority band is less than minSN) "
95  "use the band with the largest S/N as the reference band"
96  )
97  flags = pexConfig.ListField(
98  dtype=str,
99  doc="Require that these flags, if available, are not set",
100  default=["base_PixelFlags_flag_interpolatedCenter", "base_PsfFlux_flag",
101  "ext_photometryKron_KronFlux_flag", "modelfit_CModel_flag", ]
102  )
103  priorityList = pexConfig.ListField(
104  dtype=str,
105  default=[],
106  doc="Priority-ordered list of filter bands for the merge."
107  )
108  coaddName = pexConfig.Field(
109  dtype=str,
110  default="deep",
111  doc="Name of coadd"
112  )
113 
114  def validate(self):
115  super().validate()
116  if len(self.priorityList) == 0:
117  raise RuntimeError("No priority list provided")
118 
119 
120 
126 
127 
128 class MergeMeasurementsTask(pipeBase.PipelineTask, pipeBase.CmdLineTask):
129  r"""!
130  @anchor MergeMeasurementsTask_
131 
132  @brief Merge measurements from multiple bands
133 
134  @section pipe_tasks_multiBand_Contents Contents
135 
136  - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Purpose
137  - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Initialize
138  - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Run
139  - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Config
140  - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Debug
141  - @ref pipe_tasks_multiband_MergeMeasurementsTask_Example
142 
143  @section pipe_tasks_multiBand_MergeMeasurementsTask_Purpose Description
144 
145  Command-line task that merges measurements from multiple bands.
146 
147  Combines consistent (i.e. with the same peaks and footprints) catalogs of sources from multiple filter
148  bands to construct a unified catalog that is suitable for driving forced photometry. Every source is
149  required to have centroid, shape and flux measurements in each band.
150 
151  @par Inputs:
152  deepCoadd_meas{tract,patch,filter}: SourceCatalog
153  @par Outputs:
154  deepCoadd_ref{tract,patch}: SourceCatalog
155  @par Data Unit:
156  tract, patch
157 
158  MergeMeasurementsTask subclasses @ref CmdLineTask_ "CmdLineTask".
159 
160  @section pipe_tasks_multiBand_MergeMeasurementsTask_Initialize Task initialization
161 
162  @copydoc \_\_init\_\_
163 
164  @section pipe_tasks_multiBand_MergeMeasurementsTask_Run Invoking the Task
165 
166  @copydoc run
167 
168  @section pipe_tasks_multiBand_MergeMeasurementsTask_Config Configuration parameters
169 
170  See @ref MergeMeasurementsConfig_
171 
172  @section pipe_tasks_multiBand_MergeMeasurementsTask_Debug Debug variables
173 
174  The @link lsst.pipe.base.cmdLineTask.CmdLineTask command line task@endlink interface supports a
175  flag @c -d to import @b debug.py from your @c PYTHONPATH; see @ref baseDebug for more about @b debug.py
176  files.
177 
178  MergeMeasurementsTask has no debug variables.
179 
180  @section pipe_tasks_multiband_MergeMeasurementsTask_Example A complete example
181  of using MergeMeasurementsTask
182 
183  MergeMeasurementsTask is meant to be run after deblending & measuring sources in every band.
184  The purpose of the task is to generate a catalog of sources suitable for driving forced photometry in
185  coadds and individual exposures.
186  Command-line usage of MergeMeasurementsTask expects a data reference to the coadds to be processed. A list
187  of the available optional arguments can be obtained by calling mergeCoaddMeasurements.py with the `--help`
188  command line argument:
189  @code
190  mergeCoaddMeasurements.py --help
191  @endcode
192 
193  To demonstrate usage of the DetectCoaddSourcesTask in the larger context of multi-band processing, we
194  will process HSC data in the [ci_hsc](https://github.com/lsst/ci_hsc) package. Assuming one has finished
195  step 7 at @ref pipeTasks_multiBand, one may merge the catalogs generated after deblending and measuring
196  as follows:
197  @code
198  mergeCoaddMeasurements.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-I^HSC-R
199  @endcode
200  This will merge the HSC-I & HSC-R band catalogs. The results are written in
201  `$CI_HSC_DIR/DATA/deepCoadd-results/`.
202  """
203  _DefaultName = "mergeCoaddMeasurements"
204  ConfigClass = MergeMeasurementsConfig
205  RunnerClass = MergeSourcesRunner
206  inputDataset = "meas"
207  outputDataset = "ref"
208  getSchemaCatalogs = _makeGetSchemaCatalogs("ref")
209 
210  @classmethod
211  def _makeArgumentParser(cls):
212  return makeMergeArgumentParser(cls._DefaultName, cls.inputDataset)
213 
214  def getInputSchema(self, butler=None, schema=None):
215  return getInputSchema(self, butler, schema)
216 
217  def runQuantum(self, butlerQC, inputRefs, outputRefs):
218  inputs = butlerQC.get(inputRefs)
219  dataIds = (ref.dataId for ref in inputRefs.catalogs)
220  catalogDict = {dataId['band']: cat for dataId, cat in zip(dataIds, inputs['catalogs'])}
221  inputs['catalogs'] = catalogDict
222  outputs = self.run(**inputs)
223  butlerQC.put(outputs, outputRefs)
224 
225  def __init__(self, butler=None, schema=None, initInputs=None, **kwargs):
226  """!
227  Initialize the task.
228 
229  @param[in] schema: the schema of the detection catalogs used as input to this one
230  @param[in] butler: a butler used to read the input schema from disk, if schema is None
231 
232  The task will set its own self.schema attribute to the schema of the output merged catalog.
233  """
234  super().__init__(**kwargs)
235 
236  if initInputs is not None:
237  inputSchema = initInputs['inputSchema'].schema
238  else:
239  inputSchema = self.getInputSchema(butler=butler, schema=schema)
240  self.schemaMapper = afwTable.SchemaMapper(inputSchema, True)
241  self.schemaMapper.addMinimalSchema(inputSchema, True)
242  self.instFluxKey = inputSchema.find(self.config.snName + "_instFlux").getKey()
243  self.instFluxErrKey = inputSchema.find(self.config.snName + "_instFluxErr").getKey()
244  self.fluxFlagKey = inputSchema.find(self.config.snName + "_flag").getKey()
245 
246  self.flagKeys = {}
247  for band in self.config.priorityList:
248  outputKey = self.schemaMapper.editOutputSchema().addField(
249  "merge_measurement_%s" % band,
250  type="Flag",
251  doc="Flag field set if the measurements here are from the %s filter" % band
252  )
253  peakKey = inputSchema.find("merge_peak_%s" % band).key
254  footprintKey = inputSchema.find("merge_footprint_%s" % band).key
255  self.flagKeys[band] = pipeBase.Struct(peak=peakKey, footprint=footprintKey, output=outputKey)
256  self.schema = self.schemaMapper.getOutputSchema()
257 
258  self.pseudoFilterKeys = []
259  for filt in self.config.pseudoFilterList:
260  try:
261  self.pseudoFilterKeys.append(self.schema.find("merge_peak_%s" % filt).getKey())
262  except Exception as e:
263  self.log.warning("merge_peak is not set for pseudo-filter %s: %s", filt, e)
264 
265  self.badFlags = {}
266  for flag in self.config.flags:
267  try:
268  self.badFlags[flag] = self.schema.find(flag).getKey()
269  except KeyError as exc:
270  self.log.warning("Can't find flag %s in schema: %s", flag, exc)
271  self.outputSchema = afwTable.SourceCatalog(self.schema)
272 
273  def runDataRef(self, patchRefList):
274  """!
275  @brief Merge coadd sources from multiple bands. Calls @ref `run`.
276  @param[in] patchRefList list of data references for each filter
277  """
278  catalogs = dict(readCatalog(self, patchRef) for patchRef in patchRefList)
279  mergedCatalog = self.run(catalogs).mergedCatalog
280  self.write(patchRefList[0], mergedCatalog)
281 
282  def run(self, catalogs):
283  """!
284  Merge measurement catalogs to create a single reference catalog for forced photometry
285 
286  @param[in] catalogs: the catalogs to be merged
287 
288  For parent sources, we choose the first band in config.priorityList for which the
289  merge_footprint flag for that band is is True.
290 
291  For child sources, the logic is the same, except that we use the merge_peak flags.
292  """
293  # Put catalogs, filters in priority order
294  orderedCatalogs = [catalogs[band] for band in self.config.priorityList if band in catalogs.keys()]
295  orderedKeys = [self.flagKeys[band] for band in self.config.priorityList if band in catalogs.keys()]
296 
297  mergedCatalog = afwTable.SourceCatalog(self.schema)
298  mergedCatalog.reserve(len(orderedCatalogs[0]))
299 
300  idKey = orderedCatalogs[0].table.getIdKey()
301  for catalog in orderedCatalogs[1:]:
302  if numpy.any(orderedCatalogs[0].get(idKey) != catalog.get(idKey)):
303  raise ValueError("Error in inputs to MergeCoaddMeasurements: source IDs do not match")
304 
305  # This first zip iterates over all the catalogs simultaneously, yielding a sequence of one
306  # record for each band, in priority order.
307  for orderedRecords in zip(*orderedCatalogs):
308 
309  maxSNRecord = None
310  maxSNFlagKeys = None
311  maxSN = 0.
312  priorityRecord = None
313  priorityFlagKeys = None
314  prioritySN = 0.
315  hasPseudoFilter = False
316 
317  # Now we iterate over those record-band pairs, keeping track of the priority and the
318  # largest S/N band.
319  for inputRecord, flagKeys in zip(orderedRecords, orderedKeys):
320  parent = (inputRecord.getParent() == 0 and inputRecord.get(flagKeys.footprint))
321  child = (inputRecord.getParent() != 0 and inputRecord.get(flagKeys.peak))
322 
323  if not (parent or child):
324  for pseudoFilterKey in self.pseudoFilterKeys:
325  if inputRecord.get(pseudoFilterKey):
326  hasPseudoFilter = True
327  priorityRecord = inputRecord
328  priorityFlagKeys = flagKeys
329  break
330  if hasPseudoFilter:
331  break
332 
333  isBad = any(inputRecord.get(flag) for flag in self.badFlags)
334  if isBad or inputRecord.get(self.fluxFlagKey) or inputRecord.get(self.instFluxErrKey) == 0:
335  sn = 0.
336  else:
337  sn = inputRecord.get(self.instFluxKey)/inputRecord.get(self.instFluxErrKey)
338  if numpy.isnan(sn) or sn < 0.:
339  sn = 0.
340  if (parent or child) and priorityRecord is None:
341  priorityRecord = inputRecord
342  priorityFlagKeys = flagKeys
343  prioritySN = sn
344  if sn > maxSN:
345  maxSNRecord = inputRecord
346  maxSNFlagKeys = flagKeys
347  maxSN = sn
348 
349  # If the priority band has a low S/N we would like to choose the band with the highest S/N as
350  # the reference band instead. However, we only want to choose the highest S/N band if it is
351  # significantly better than the priority band. Therefore, to choose a band other than the
352  # priority, we require that the priority S/N is below the minimum threshold and that the
353  # difference between the priority and highest S/N is larger than the difference threshold.
354  #
355  # For pseudo code objects we always choose the first band in the priority list.
356  bestRecord = None
357  bestFlagKeys = None
358  if hasPseudoFilter:
359  bestRecord = priorityRecord
360  bestFlagKeys = priorityFlagKeys
361  elif (prioritySN < self.config.minSN and (maxSN - prioritySN) > self.config.minSNDiff
362  and maxSNRecord is not None):
363  bestRecord = maxSNRecord
364  bestFlagKeys = maxSNFlagKeys
365  elif priorityRecord is not None:
366  bestRecord = priorityRecord
367  bestFlagKeys = priorityFlagKeys
368 
369  if bestRecord is not None and bestFlagKeys is not None:
370  outputRecord = mergedCatalog.addNew()
371  outputRecord.assign(bestRecord, self.schemaMapper)
372  outputRecord.set(bestFlagKeys.output, True)
373  else: # if we didn't find any records
374  raise ValueError("Error in inputs to MergeCoaddMeasurements: no valid reference for %s" %
375  inputRecord.getId())
376 
377  # more checking for sane inputs, since zip silently iterates over the smallest sequence
378  for inputCatalog in orderedCatalogs:
379  if len(mergedCatalog) != len(inputCatalog):
380  raise ValueError("Mismatch between catalog sizes: %s != %s" %
381  (len(mergedCatalog), len(orderedCatalogs)))
382 
383  return pipeBase.Struct(
384  mergedCatalog=mergedCatalog
385  )
386 
387  def write(self, patchRef, catalog):
388  """!
389  @brief Write the output.
390 
391  @param[in] patchRef data reference for patch
392  @param[in] catalog catalog
393 
394  We write as the dataset provided by the 'outputDataset'
395  class variable.
396  """
397  patchRef.put(catalog, self.config.coaddName + "Coadd_" + self.outputDataset)
398  # since the filter isn't actually part of the data ID for the dataset we're saving,
399  # it's confusing to see it in the log message, even if the butler simply ignores it.
400  mergeDataId = patchRef.dataId.copy()
401  del mergeDataId["filter"]
402  self.log.info("Wrote merged catalog: %s", mergeDataId)
403 
404  def writeMetadata(self, dataRefList):
405  """!
406  @brief No metadata to write, and not sure how to write it for a list of dataRefs.
407  """
408  pass
A mapping between the keys of two Schemas, used to copy data between them.
Definition: SchemaMapper.h:21
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
Definition: functional.cc:33
void write(OutputArchiveHandle &handle) const override
bool any(CoordinateExpr< N > const &expr) noexcept
Return true if any elements are true.
def run(self, coaddExposures, bbox, wcs)
Definition: getTemplate.py:603
def writeMetadata(self, dataRefList)
No metadata to write, and not sure how to write it for a list of dataRefs.
def makeMergeArgumentParser(name, dataset)
Create a suitable ArgumentParser.
def getInputSchema(task, butler=None, schema=None)
Obtain the input schema either directly or froma butler reference.
def readCatalog(task, patchRef)
Read input catalog.