LSSTApplications  16.0-10-g1758552+4,16.0-10-g4f78f78+4,16.0-10-gc1446dd+30,16.0-11-g39ac3c7+2,16.0-13-g066a532+3,16.0-14-g6c7ed55+4,16.0-14-gd373004+3,16.0-15-g072d20e+4,16.0-15-gb461e1a+2,16.0-16-g48c959a+3,16.0-16-g89065d4+2,16.0-16-gd8e3590+4,16.0-19-gb830ed4e+4,16.0-2-g0febb12+20,16.0-2-g9d5294e+53,16.0-2-ga8830df+3,16.0-20-g17d57d5+2,16.0-22-gf7a7fdf+3,16.0-27-g78173a71+3,16.0-3-g324faa9+3,16.0-3-gcfd6c53+51,16.0-3-ge00e371+9,16.0-4-g03cf288+42,16.0-4-g5f3a788+19,16.0-4-ga3eb747+9,16.0-4-gabf74b7+4,16.0-4-gb13d127+3,16.0-5-g6a53317+9,16.0-5-gb3f8a4b+62,16.0-5-gef99c9f+4,16.0-57-g90e7ba260+2,16.0-6-g0838257+3,16.0-6-g9321be7+3,16.0-6-gcbc7b31+3,16.0-6-gf49912c+4,16.0-7-gd2eeba5+12,16.0-8-g21fd5fe+4,16.0-8-g3a9f023+4,16.0-9-g85d1a16+4,master-g7b902255af+4,w.2018.43
LSSTDataManagementBasePackage
multiBand.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # LSST Data Management System
4 # Copyright 2008-2015 AURA/LSST.
5 #
6 # This product includes software developed by the
7 # LSST Project (http://www.lsst.org/).
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the LSST License Statement and
20 # the GNU General Public License along with this program. If not,
21 # see <https://www.lsstcorp.org/LegalNotices/>.
22 #
23 import numpy
24 
25 from lsst.coadd.utils.coaddDataIdContainer import ExistingCoaddDataIdContainer
26 from lsst.pipe.base import (CmdLineTask, Struct, TaskRunner, ArgumentParser, ButlerInitializedTaskRunner,
27  PipelineTask, InitOutputDatasetField, InputDatasetField, OutputDatasetField,
28  QuantumConfig)
29 from lsst.pex.config import Config, Field, ListField, ConfigurableField, RangeField, ConfigField
30 from lsst.meas.algorithms import DynamicDetectionTask, SkyObjectsTask
31 from lsst.meas.base import SingleFrameMeasurementTask, ApplyApCorrTask, CatalogCalculationTask
32 from lsst.meas.deblender import SourceDeblendTask, MultibandDeblendTask
33 from lsst.pipe.tasks.coaddBase import getSkyInfo
34 from lsst.pipe.tasks.scaleVariance import ScaleVarianceTask
35 from lsst.meas.astrom import DirectMatchTask, denormalizeMatches
36 from lsst.pipe.tasks.fakes import BaseFakeSourcesTask
37 from lsst.pipe.tasks.setPrimaryFlags import SetPrimaryFlagsTask
38 from lsst.pipe.tasks.propagateVisitFlags import PropagateVisitFlagsTask
39 import lsst.afw.image as afwImage
40 import lsst.afw.table as afwTable
41 import lsst.afw.math as afwMath
42 import lsst.afw.detection as afwDetect
43 from lsst.daf.base import PropertyList
44 
45 """
46 New dataset types:
47 * deepCoadd_det: detections from what used to be processCoadd (tract, patch, filter)
48 * deepCoadd_mergeDet: merged detections (tract, patch)
49 * deepCoadd_meas: measurements of merged detections (tract, patch, filter)
50 * deepCoadd_ref: reference sources (tract, patch)
51 All of these have associated *_schema catalogs that require no data ID and hold no records.
52 
53 In addition, we have a schema-only dataset, which saves the schema for the PeakRecords in
54 the mergeDet, meas, and ref dataset Footprints:
55 * deepCoadd_peak_schema
56 """
57 
58 
59 def _makeGetSchemaCatalogs(datasetSuffix):
60  """Construct a getSchemaCatalogs instance method
61 
62  These are identical for most of the classes here, so we'll consolidate
63  the code.
64 
65  datasetSuffix: Suffix of dataset name, e.g., "src" for "deepCoadd_src"
66  """
67 
68  def getSchemaCatalogs(self):
69  """Return a dict of empty catalogs for each catalog dataset produced by this task."""
70  src = afwTable.SourceCatalog(self.schema)
71  if hasattr(self, "algMetadata"):
72  src.getTable().setMetadata(self.algMetadata)
73  return {self.config.coaddName + "Coadd_" + datasetSuffix: src}
74  return getSchemaCatalogs
75 
76 
77 def _makeMakeIdFactory(datasetName):
78  """Construct a makeIdFactory instance method
79 
80  These are identical for all the classes here, so this consolidates
81  the code.
82 
83  datasetName: Dataset name without the coadd name prefix, e.g., "CoaddId" for "deepCoaddId"
84  """
85 
86  def makeIdFactory(self, dataRef):
87  """Return an IdFactory for setting the detection identifiers
88 
89  The actual parameters used in the IdFactory are provided by
90  the butler (through the provided data reference.
91  """
92  expBits = dataRef.get(self.config.coaddName + datasetName + "_bits")
93  expId = int(dataRef.get(self.config.coaddName + datasetName))
94  return afwTable.IdFactory.makeSource(expId, 64 - expBits)
95  return makeIdFactory
96 
97 
99  """Given a longer, camera-specific filter name (e.g. "HSC-I") return its shorthand name ("i").
100  """
101  # I'm not sure if this is the way this is supposed to be implemented, but it seems to work,
102  # and its the only way I could get it to work.
103  return afwImage.Filter(name).getFilterProperty().getName()
104 
105 
106 
107 
109  """!
110  @anchor DetectCoaddSourcesConfig_
111 
112  @brief Configuration parameters for the DetectCoaddSourcesTask
113  """
114  doScaleVariance = Field(dtype=bool, default=True, doc="Scale variance plane using empirical noise?")
115  scaleVariance = ConfigurableField(target=ScaleVarianceTask, doc="Variance rescaling")
116  detection = ConfigurableField(target=DynamicDetectionTask, doc="Source detection")
117  coaddName = Field(dtype=str, default="deep", doc="Name of coadd")
118  doInsertFakes = Field(dtype=bool, default=False,
119  doc="Run fake sources injection task")
120  insertFakes = ConfigurableField(target=BaseFakeSourcesTask,
121  doc="Injection of fake sources for testing "
122  "purposes (must be retargeted)")
123  detectionSchema = InitOutputDatasetField(
124  doc="Schema of the detection catalog",
125  name="{}Coadd_det_schema",
126  storageClass="SourceCatalog",
127  )
128  exposure = InputDatasetField(
129  doc="Exposure on which detections are to be performed",
130  name="deepCoadd",
131  scalar=True,
132  storageClass="Exposure",
133  units=("Tract", "Patch", "AbstractFilter", "SkyMap")
134  )
135  outputBackgrounds = OutputDatasetField(
136  doc="Output Backgrounds used in detection",
137  name="{}Coadd_calexp_background",
138  scalar=True,
139  storageClass="Background",
140  units=("Tract", "Patch", "AbstractFilter", "SkyMap")
141  )
142  outputSources = OutputDatasetField(
143  doc="Detected sources catalog",
144  name="{}Coadd_det",
145  scalar=True,
146  storageClass="SourceCatalog",
147  units=("Tract", "Patch", "AbstractFilter", "SkyMap")
148  )
149  outputExposure = OutputDatasetField(
150  doc="Exposure post detection",
151  name="{}Coadd_calexp",
152  scalar=True,
153  storageClass="Exposure",
154  units=("Tract", "Patch", "AbstractFilter", "SkyMap")
155  )
156  quantum = QuantumConfig(
157  units=("Tract", "Patch", "AbstractFilter", "SkyMap")
158  )
159 
    def setDefaults(self):
        """Set detection defaults appropriate for coadded images."""
        Config.setDefaults(self)
        # Detection threshold is expressed in units of the per-pixel standard
        # deviation, which matches the DynamicDetection target of `detection`.
        self.detection.thresholdType = "pixel_stdev"
        self.detection.isotropicGrow = True
        # Coadds are made from background-subtracted CCDs, so any background subtraction should be very basic
        self.detection.reEstimateBackground = False
        self.detection.background.useApprox = False
        self.detection.background.binSize = 4096
        self.detection.background.undersampleStyle = 'REDUCE_INTERP_ORDER'
        self.detection.doTempWideBackground = True  # Suppress large footprints that overwhelm the deblender
170 
171 
177 
178 
class DetectCoaddSourcesTask(PipelineTask, CmdLineTask):
    r"""!
    @anchor DetectCoaddSourcesTask_

    @brief Detect sources on a coadd

    @section pipe_tasks_multiBand_Contents Contents

      - @ref pipe_tasks_multiBand_DetectCoaddSourcesTask_Purpose
      - @ref pipe_tasks_multiBand_DetectCoaddSourcesTask_Initialize
      - @ref pipe_tasks_multiBand_DetectCoaddSourcesTask_Run
      - @ref pipe_tasks_multiBand_DetectCoaddSourcesTask_Config
      - @ref pipe_tasks_multiBand_DetectCoaddSourcesTask_Debug
      - @ref pipe_tasks_multiband_DetectCoaddSourcesTask_Example

    @section pipe_tasks_multiBand_DetectCoaddSourcesTask_Purpose Description

    Command-line task that detects sources on a coadd of exposures obtained with a single filter.

    Coadding individual visits requires each exposure to be warped. This introduces covariance in the noise
    properties across pixels. Before detection, we correct the coadd variance by scaling the variance plane
    in the coadd to match the observed variance. This is an approximate approach -- strictly, we should
    propagate the full covariance matrix -- but it is simple and works well in practice.

    After scaling the variance plane, we detect sources and generate footprints by delegating to the @ref
    SourceDetectionTask_ "detection" subtask.

      @par Inputs:
        deepCoadd{tract,patch,filter}: ExposureF
      @par Outputs:
        deepCoadd_det{tract,patch,filter}: SourceCatalog (only parent Footprints)
        @n deepCoadd_calexp{tract,patch,filter}: Variance scaled, background-subtracted input
           exposure (ExposureF)
        @n deepCoadd_calexp_background{tract,patch,filter}: BackgroundList
      @par Data Unit:
        tract, patch, filter

    DetectCoaddSourcesTask delegates most of its work to the @ref SourceDetectionTask_ "detection" subtask.
    You can retarget this subtask if you wish.

    @section pipe_tasks_multiBand_DetectCoaddSourcesTask_Initialize Task initialization

    @copydoc \_\_init\_\_

    @section pipe_tasks_multiBand_DetectCoaddSourcesTask_Run Invoking the Task

    @copydoc run

    @section pipe_tasks_multiBand_DetectCoaddSourcesTask_Config Configuration parameters

    See @ref DetectCoaddSourcesConfig_ "DetectSourcesConfig"

    @section pipe_tasks_multiBand_DetectCoaddSourcesTask_Debug Debug variables

    The @link lsst.pipe.base.cmdLineTask.CmdLineTask command line task@endlink interface supports a
    flag @c -d to import @b debug.py from your @c PYTHONPATH; see @ref baseDebug for more about @b debug.py
    files.

    DetectCoaddSourcesTask has no debug variables of its own because it relegates all the work to
    @ref SourceDetectionTask_ "SourceDetectionTask"; see the documentation for
    @ref SourceDetectionTask_ "SourceDetectionTask" for further information.

    @section pipe_tasks_multiband_DetectCoaddSourcesTask_Example A complete example
    of using DetectCoaddSourcesTask

    DetectCoaddSourcesTask is meant to be run after assembling a coadded image in a given band. The purpose of
    the task is to update the background, detect all sources in a single band and generate a set of parent
    footprints. Subsequent tasks in the multi-band processing procedure will merge sources across bands and,
    eventually, perform forced photometry. Command-line usage of DetectCoaddSourcesTask expects a data
    reference to the coadd to be processed. A list of the available optional arguments can be obtained by
    calling detectCoaddSources.py with the `--help` command line argument:
    @code
    detectCoaddSources.py --help
    @endcode

    To demonstrate usage of the DetectCoaddSourcesTask in the larger context of multi-band processing, we
    will process HSC data in the [ci_hsc](https://github.com/lsst/ci_hsc) package. Assuming one has followed
    steps 1 - 4 at @ref pipeTasks_multiBand, one may detect all the sources in each coadd as follows:
    @code
    detectCoaddSources.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-I
    @endcode
    that will process the HSC-I band data. The results are written to
    `$CI_HSC_DIR/DATA/deepCoadd-results/HSC-I`.

    It is also necessary to run:
    @code
    detectCoaddSources.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-R
    @endcode
    to generate the sources catalogs for the HSC-R band required by the next step in the multi-band
    processing procedure: @ref MergeDetectionsTask_ "MergeDetectionsTask".
    """
    _DefaultName = "detectCoaddSources"
    ConfigClass = DetectCoaddSourcesConfig
    getSchemaCatalogs = _makeGetSchemaCatalogs("det")
    makeIdFactory = _makeMakeIdFactory("CoaddId")

    @classmethod
    def _makeArgumentParser(cls):
        """Build a command-line parser that accepts existing-coadd data IDs."""
        parser = ArgumentParser(name=cls._DefaultName)
        parser.add_id_argument("--id", "deepCoadd", help="data ID, e.g. --id tract=12345 patch=1,2 filter=r",
                               ContainerClass=ExistingCoaddDataIdContainer)
        return parser

    @classmethod
    def getOutputDatasetTypes(cls, config):
        """Return the output dataset types, substituting the configured coadd
        name into each "{}Coadd_*" dataset-name template."""
        coaddName = config.coaddName
        for name in ("outputBackgrounds", "outputSources", "outputExposure"):
            attr = getattr(config, name)
            attr.name = attr.name.format(coaddName)
        return super().getOutputDatasetTypes(config)

    @classmethod
    def getInitOutputDatasetTypes(cls, config):
        """Return the init-output dataset types, substituting the configured
        coadd name into the "{}Coadd_det_schema" template."""
        attr = config.detectionSchema
        attr.name = attr.name.format(config.coaddName)
        # Leftover debugging print(output) removed here; use task logging if tracing is needed.
        return super().getInitOutputDatasetTypes(config)

    def __init__(self, schema=None, **kwargs):
        """!
        @brief Initialize the task. Create the @ref SourceDetectionTask_ "detection" subtask.

        Keyword arguments (in addition to those forwarded to CmdLineTask.__init__):

        @param[in] schema:   initial schema for the output catalog, modified-in place to include all
                             fields set by this task.  If None, the source minimal schema will be used.
        @param[in] **kwargs: keyword arguments to be passed to lsst.pipe.base.task.Task.__init__
        """
        # N.B. Super is used here to handle the multiple inheritance of PipelineTasks, the init tree
        # call structure has been reviewed carefully to be sure super will work as intended.
        super().__init__(**kwargs)
        if schema is None:
            schema = afwTable.SourceTable.makeMinimalSchema()
        if self.config.doInsertFakes:
            self.makeSubtask("insertFakes")
        self.schema = schema
        self.makeSubtask("detection", schema=self.schema)
        if self.config.doScaleVariance:
            self.makeSubtask("scaleVariance")

    def getInitOutputDatasets(self):
        # NOTE(review): the `def` line of this method was lost in extraction; reconstructed as the
        # standard PipelineTask init-outputs hook matching `getInitOutputDatasetTypes` above — confirm.
        """Return the init-output datasets: an empty catalog carrying the detection schema."""
        return {"detectionSchema": afwTable.SourceCatalog(self.schema)}

    def runDataRef(self, patchRef):
        """!
        @brief Run detection on a coadd.

        Invokes @ref run and then uses @ref write to output the
        results.

        @param[in] patchRef: data reference for patch
        @return the Struct returned by @ref run
        """
        exposure = patchRef.get(self.config.coaddName + "Coadd", immediate=True)
        expId = int(patchRef.get(self.config.coaddName + "CoaddId"))
        results = self.run(exposure, self.makeIdFactory(patchRef), expId=expId)
        self.write(results, patchRef)
        return results

    def adaptArgsAndRun(self, inputData, inputDataIds, outputDataIds):
        """Adapt Gen3 quantum inputs and delegate to @ref run."""
        # FINDME: DM-15843 needs to come back and address these next two lines with a final solution
        inputData["idFactory"] = afwTable.IdFactory.makeSimple()
        inputData["expId"] = 0
        return self.run(**inputData)

    def run(self, exposure, idFactory, expId):
        """!
        @brief Run detection on an exposure.

        First scale the variance plane to match the observed variance
        using @ref ScaleVarianceTask. Then invoke the @ref SourceDetectionTask_ "detection" subtask to
        detect sources.

        @param[in,out] exposure: Exposure on which to detect (may be background-subtracted and scaled,
                                 depending on configuration).
        @param[in] idFactory: IdFactory to set source identifiers
        @param[in] expId: Exposure identifier (integer) for RNG seed

        @return a pipe.base.Struct with fields
        - outputSources: catalog of detections
        - outputBackgrounds: list of backgrounds
        - outputExposure: the (possibly modified) input exposure
        """
        if self.config.doScaleVariance:
            varScale = self.scaleVariance.run(exposure.maskedImage)
            # Record the applied scaling for provenance.
            exposure.getMetadata().add("variance_scale", varScale)
        backgrounds = afwMath.BackgroundList()
        if self.config.doInsertFakes:
            self.insertFakes.run(exposure, background=backgrounds)
        table = afwTable.SourceTable.make(self.schema, idFactory)
        detections = self.detection.makeSourceCatalog(table, exposure, expId=expId)
        sources = detections.sources
        fpSets = detections.fpSets
        # Accumulate any backgrounds the detection subtask estimated.
        if hasattr(fpSets, "background") and fpSets.background:
            for bg in fpSets.background:
                backgrounds.append(bg)
        return Struct(outputSources=sources, outputBackgrounds=backgrounds, outputExposure=exposure)

    def write(self, results, patchRef):
        """!
        @brief Write out results from @ref run.

        @param[in] results:  Struct returned from @ref run
        @param[in] patchRef: data reference for patch
        """
        coaddName = self.config.coaddName + "Coadd"
        patchRef.put(results.outputBackgrounds, coaddName + "_calexp_background")
        patchRef.put(results.outputSources, coaddName + "_det")
        patchRef.put(results.outputExposure, coaddName + "_calexp")
390 
391 
392 
393 
class MergeSourcesRunner(TaskRunner):
    """Task runner for the `MergeSourcesTask`

    Required because the run method requires a list of
    dataRefs rather than a single dataRef.
    """

    def makeTask(self, parsedCmd=None, args=None):
        """Provide a butler to the Task constructor.

        Parameters
        ----------
        parsedCmd:
            The parsed command
        args: tuple
            Tuple of a list of data references and kwargs (un-used)

        Raises
        ------
        RuntimeError
            Thrown if both `parsedCmd` & `args` are `None`
        """
        if parsedCmd is not None:
            butler = parsedCmd.butler
        elif args is not None:
            # All data references share a butler; take it from the first one.
            dataRefList, kwargs = args
            butler = dataRefList[0].getButler()
        else:
            raise RuntimeError("Neither parsedCmd or args specified")
        return self.TaskClass(config=self.config, log=self.log, butler=butler)

    @staticmethod
    def buildRefDict(parsedCmd):
        """Build a hierarchical dictionary of patch references

        Parameters
        ----------
        parsedCmd:
            The parsed command

        Returns
        -------
        refDict: dict
            A reference dictionary of the form {tract: {patch: {filter: dataRef}}}
            (keyed by tract first, then patch, then filter, matching the
            refDict[tract][patch][filter] indexing below).

        Raises
        ------
        RuntimeError
            Thrown when multiple references are provided for the same
            combination of tract, patch and filter
        """
        refDict = {}  # Will index this as refDict[tract][patch][filter] = ref
        for ref in parsedCmd.id.refList:
            tract = ref.dataId["tract"]
            patch = ref.dataId["patch"]
            filter = ref.dataId["filter"]
            if tract not in refDict:
                refDict[tract] = {}
            if patch not in refDict[tract]:
                refDict[tract][patch] = {}
            if filter in refDict[tract][patch]:
                raise RuntimeError("Multiple versions of %s" % (ref.dataId,))
            refDict[tract][patch][filter] = ref
        return refDict

    @staticmethod
    def getTargetList(parsedCmd, **kwargs):
        """Provide a list of patch references for each patch, tract, filter combo.

        Parameters
        ----------
        parsedCmd:
            The parsed command
        kwargs:
            Keyword arguments passed to the task

        Returns
        -------
        targetList: list
            List of tuples, where each tuple is a (dataRef, kwargs) pair;
            each dataRef entry is the list of per-filter references for one
            (tract, patch) combination.
        """
        refDict = MergeSourcesRunner.buildRefDict(parsedCmd)
        return [(list(p.values()), kwargs) for t in refDict.values() for p in t.values()]
476 
477 
479  """!
480  @anchor MergeSourcesConfig_
481 
482  @brief Configuration for merging sources.
483  """
484  priorityList = ListField(dtype=str, default=[],
485  doc="Priority-ordered list of bands for the merge.")
486  coaddName = Field(dtype=str, default="deep", doc="Name of coadd")
487 
    def validate(self):
        """Validate the configuration.

        Raises
        ------
        RuntimeError
            If ``priorityList`` is empty; a merge needs at least one band.
        """
        Config.validate(self)
        if len(self.priorityList) == 0:
            raise RuntimeError("No priority list provided")
492 
493 
class MergeSourcesTask(CmdLineTask):
    """!
    @anchor MergeSourcesTask_

    @brief A base class for merging source catalogs.

    Merging detections (MergeDetectionsTask) and merging measurements (MergeMeasurementsTask) are
    so similar that it makes sense to re-use the code, in the form of this abstract base class.

    NB: Do not use this class directly. Instead use one of the child classes that inherit from
    MergeSourcesTask such as @ref MergeDetectionsTask_ "MergeDetectionsTask" or @ref MergeMeasurementsTask_
    "MergeMeasurementsTask"

    Sub-classes should set the following class variables:
    * `_DefaultName`: name of Task
    * `inputDataset`: name of dataset to read
    * `outputDataset`: name of dataset to write
    * `getSchemaCatalogs` to the result of `_makeGetSchemaCatalogs(outputDataset)`

    In addition, sub-classes must implement the run method.
    """
    _DefaultName = None
    ConfigClass = MergeSourcesConfig
    RunnerClass = MergeSourcesRunner
    inputDataset = None
    outputDataset = None
    getSchemaCatalogs = None

    @classmethod
    def _makeArgumentParser(cls):
        """!
        @brief Create a suitable ArgumentParser.

        We will use the ArgumentParser to provide a list of data
        references for patches; the RunnerClass will sort them into lists
        of data references for the same patch.
        """
        parser = ArgumentParser(name=cls._DefaultName)
        parser.add_id_argument("--id", "deepCoadd_" + cls.inputDataset,
                               ContainerClass=ExistingCoaddDataIdContainer,
                               help="data ID, e.g. --id tract=12345 patch=1,2 filter=g^r^i")
        return parser

    def getInputSchema(self, butler=None, schema=None):
        """!
        @brief Obtain the input schema either directly or from a butler reference.

        @param[in] butler   butler reference to obtain the input schema from
        @param[in] schema   the input schema; if not None it is returned unchanged
        """
        if schema is None:
            assert butler is not None, "Neither butler nor schema specified"
            schema = butler.get(self.config.coaddName + "Coadd_" + self.inputDataset + "_schema",
                                immediate=True).schema
        return schema

    def __init__(self, butler=None, schema=None, **kwargs):
        """!
        @brief Initialize the task.

        Keyword arguments (in addition to those forwarded to CmdLineTask.__init__):
        @param[in] schema   the schema of the detection catalogs used as input to this one
        @param[in] butler   a butler used to read the input schema from disk, if schema is None

        Derived classes should use the getInputSchema() method to handle the additional
        arguments and retrieve the actual input schema.
        """
        CmdLineTask.__init__(self, **kwargs)

    def runDataRef(self, patchRefList):
        """!
        @brief Merge coadd sources from multiple bands. Calls @ref `run` which must be defined in
        subclasses that inherit from MergeSourcesTask.

        @param[in] patchRefList list of data references for each filter
        """
        # Build a {filterName: catalog} mapping, merge, then write under the first reference.
        catalogs = dict(self.readCatalog(patchRef) for patchRef in patchRefList)
        mergedCatalog = self.run(catalogs, patchRefList[0])
        self.write(patchRefList[0], mergedCatalog)

    def readCatalog(self, patchRef):
        """!
        @brief Read input catalog.

        We read the input dataset provided by the 'inputDataset'
        class variable.

        @param[in] patchRef   data reference for patch
        @return tuple consisting of the filter name and the catalog
        """
        filterName = patchRef.dataId["filter"]
        catalog = patchRef.get(self.config.coaddName + "Coadd_" + self.inputDataset, immediate=True)
        self.log.info("Read %d sources for filter %s: %s" % (len(catalog), filterName, patchRef.dataId))
        return filterName, catalog

    def run(self, catalogs, patchRef):
        """!
        @brief Merge multiple catalogs. This function must be defined in all subclasses that inherit from
        MergeSourcesTask.

        @param[in] catalogs   dict mapping filter name to source catalog
        @param[in] patchRef   data reference for patch

        @return merged catalog
        """
        raise NotImplementedError()

    def write(self, patchRef, catalog):
        """!
        @brief Write the output.

        @param[in] patchRef   data reference for patch
        @param[in] catalog    catalog

        We write as the dataset provided by the 'outputDataset'
        class variable.
        """
        patchRef.put(catalog, self.config.coaddName + "Coadd_" + self.outputDataset)
        # since the filter isn't actually part of the data ID for the dataset we're saving,
        # it's confusing to see it in the log message, even if the butler simply ignores it.
        mergeDataId = patchRef.dataId.copy()
        del mergeDataId["filter"]
        self.log.info("Wrote merged catalog: %s" % (mergeDataId,))

    def writeMetadata(self, dataRefList):
        """!
        @brief No metadata to write, and not sure how to write it for a list of dataRefs.
        """
        pass
622 
623 
class CullPeaksConfig(Config):
    """!
    @anchor CullPeaksConfig_

    @brief Configuration for culling garbage peaks after merging footprints.

    Peaks may also be culled after detection or during deblending; this configuration object
    only deals with culling after merging Footprints.

    These cuts are based on three quantities:
     - nBands: the number of bands in which the peak was detected
     - peakRank: the position of the peak within its family, sorted from brightest to faintest.
     - peakRankNormalized: the peak rank divided by the total number of peaks in the family.

    The formula that identifies peaks to cull is:

      nBands < nBandsSufficient
      AND (rank >= rankSufficient)
      AND (rank >= rankConsidered OR rank >= rankNormalizedConsidered)

    NOTE(review): the OR above does not obviously agree with the field doc strings below, which
    describe rankConsidered and rankNormalizedConsidered as a jointly-required ("also match")
    pair of keep conditions -- confirm against the culling implementation.

    To disable peak culling, simply set nBandsSufficient=1.
    """

    nBandsSufficient = RangeField(dtype=int, default=2, min=1,
                                  doc="Always keep peaks detected in this many bands")
    rankSufficient = RangeField(dtype=int, default=20, min=1,
                                doc="Always keep this many peaks in each family")
    rankConsidered = RangeField(dtype=int, default=30, min=1,
                                doc=("Keep peaks with less than this rank that also match the "
                                     "rankNormalizedConsidered condition."))
    rankNormalizedConsidered = RangeField(dtype=float, default=0.7, min=0.0,
                                          doc=("Keep peaks with less than this normalized rank that"
                                               " also match the rankConsidered condition."))
657 
658 
660  """!
661  @anchor MergeDetectionsConfig_
662 
663  @brief Configuration parameters for the MergeDetectionsTask.
664  """
665  minNewPeak = Field(dtype=float, default=1,
666  doc="Minimum distance from closest peak to create a new one (in arcsec).")
667 
668  maxSamePeak = Field(dtype=float, default=0.3,
669  doc="When adding new catalogs to the merge, all peaks less than this distance "
670  " (in arcsec) to an existing peak will be flagged as detected in that catalog.")
671  cullPeaks = ConfigField(dtype=CullPeaksConfig, doc="Configuration for how to cull peaks.")
672 
673  skyFilterName = Field(dtype=str, default="sky",
674  doc="Name of `filter' used to label sky objects (e.g. flag merge_peak_sky is set)\n"
675  "(N.b. should be in MergeMeasurementsConfig.pseudoFilterList)")
676  skyObjects = ConfigurableField(target=SkyObjectsTask, doc="Generate sky objects")
677 
    def setDefaults(self):
        """Set merge defaults: only avoid DETECTED pixels when placing sky objects."""
        MergeSourcesConfig.setDefaults(self)
        self.skyObjects.avoidMask = ["DETECTED"]  # Nothing else is available in our custom mask
681 
682 
683 
689 
690 
692  r"""!
693  @anchor MergeDetectionsTask_
694 
695  @brief Merge coadd detections from multiple bands.
696 
697  @section pipe_tasks_multiBand_Contents Contents
698 
699  - @ref pipe_tasks_multiBand_MergeDetectionsTask_Purpose
700  - @ref pipe_tasks_multiBand_MergeDetectionsTask_Init
701  - @ref pipe_tasks_multiBand_MergeDetectionsTask_Run
702  - @ref pipe_tasks_multiBand_MergeDetectionsTask_Config
703  - @ref pipe_tasks_multiBand_MergeDetectionsTask_Debug
704  - @ref pipe_tasks_multiband_MergeDetectionsTask_Example
705 
706  @section pipe_tasks_multiBand_MergeDetectionsTask_Purpose Description
707 
708  Command-line task that merges sources detected in coadds of exposures obtained with different filters.
709 
710  To perform photometry consistently across coadds in multiple filter bands, we create a master catalog of
711  sources from all bands by merging the sources (peaks & footprints) detected in each coadd, while keeping
712  track of which band each source originates in.
713 
714  The catalog merge is performed by @ref getMergedSourceCatalog. Spurious peaks detected around bright
715  objects are culled as described in @ref CullPeaksConfig_.
716 
717  @par Inputs:
718  deepCoadd_det{tract,patch,filter}: SourceCatalog (only parent Footprints)
719  @par Outputs:
720  deepCoadd_mergeDet{tract,patch}: SourceCatalog (only parent Footprints)
721  @par Data Unit:
722  tract, patch
723 
724  MergeDetectionsTask subclasses @ref MergeSourcesTask_ "MergeSourcesTask".
725 
726  @section pipe_tasks_multiBand_MergeDetectionsTask_Init Task initialisation
727 
728  @copydoc \_\_init\_\_
729 
730  @section pipe_tasks_multiBand_MergeDetectionsTask_Run Invoking the Task
731 
732  @copydoc run
733 
734  @section pipe_tasks_multiBand_MergeDetectionsTask_Config Configuration parameters
735 
736  See @ref MergeDetectionsConfig_
737 
738  @section pipe_tasks_multiBand_MergeDetectionsTask_Debug Debug variables
739 
740  The @link lsst.pipe.base.cmdLineTask.CmdLineTask command line task@endlink interface supports a flag @c -d
741  to import @b debug.py from your @c PYTHONPATH; see @ref baseDebug for more about @b debug.py files.
742 
743  MergeDetectionsTask has no debug variables.
744 
745  @section pipe_tasks_multiband_MergeDetectionsTask_Example A complete example of using MergeDetectionsTask
746 
747  MergeDetectionsTask is meant to be run after detecting sources in coadds generated for the chosen subset
748  of the available bands.
749  The purpose of the task is to merge sources (peaks & footprints) detected in the coadds generated from the
750  chosen subset of filters.
751  Subsequent tasks in the multi-band processing procedure will deblend the generated master list of sources
752  and, eventually, perform forced photometry.
753  Command-line usage of MergeDetectionsTask expects data references for all the coadds to be processed.
754  A list of the available optional arguments can be obtained by calling mergeCoaddDetections.py with the
755  `--help` command line argument:
756  @code
757  mergeCoaddDetections.py --help
758  @endcode
759 
760  To demonstrate usage of the DetectCoaddSourcesTask in the larger context of multi-band processing, we
761  will process HSC data in the [ci_hsc](https://github.com/lsst/ci_hsc) package. Assuming one has finished
762  step 5 at @ref pipeTasks_multiBand, one may merge the catalogs of sources from each coadd as follows:
763  @code
764  mergeCoaddDetections.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-I^HSC-R
765  @endcode
766  This will merge the HSC-I & -R band parent source catalogs and write the results to
767  `$CI_HSC_DIR/DATA/deepCoadd-results/merged/0/5,4/mergeDet-0-5,4.fits`.
768 
769  The next step in the multi-band processing procedure is
770  @ref MeasureMergedCoaddSourcesTask_ "MeasureMergedCoaddSourcesTask"
771  """
772  ConfigClass = MergeDetectionsConfig
773  _DefaultName = "mergeCoaddDetections"
774  inputDataset = "det"
775  outputDataset = "mergeDet"
776  makeIdFactory = _makeMakeIdFactory("MergedCoaddId")
777 
    def __init__(self, butler=None, schema=None, **kwargs):
        """!
        @brief Initialize the merge detections task.

        A @ref FootprintMergeList_ "FootprintMergeList" will be used to
        merge the source catalogs.

        Additional keyword arguments (forwarded to MergeSourcesTask.__init__):
        @param[in] schema     the schema of the detection catalogs used as input to this one
        @param[in] butler     a butler used to read the input schema from disk, if schema is None
        @param[in] **kwargs   keyword arguments to be passed to MergeSourcesTask.__init__

        The task will set its own self.schema attribute to the schema of the output merged catalog.
        """
        MergeSourcesTask.__init__(self, butler=butler, schema=schema, **kwargs)
        self.makeSubtask("skyObjects")
        # Resolve the input schema either from the caller or from the butler.
        self.schema = self.getInputSchema(butler=butler, schema=schema)

        # Merge in the configured band priority order, using short filter names;
        # the synthetic skyFilterName labels sky objects in the merged output.
        filterNames = [getShortFilterName(name) for name in self.config.priorityList]
        filterNames += [self.config.skyFilterName]
        self.merged = afwDetect.FootprintMergeList(self.schema, filterNames)
799 
    def run(self, catalogs, patchRef):
        r"""!
        @brief Merge multiple catalogs.

        After ordering the catalogs and filters in priority order,
        @ref getMergedSourceCatalog of the @ref FootprintMergeList_ "FootprintMergeList" created by
        @ref \_\_init\_\_ is used to perform the actual merging. Finally, @ref cullPeaks is used to remove
        garbage peaks detected around bright objects.

        @param[in] catalogs dict of per-band detection catalogs, keyed by filter name
        @param[in] patchRef data reference for the patch being processed
        @param[out] mergedList the merged source catalog, which is also returned
        """

        # Convert distance to tract coordinate: the config distances are in
        # arcseconds, but the merge operates in tract pixels.
        skyInfo = getSkyInfo(coaddName=self.config.coaddName, patchRef=patchRef)
        tractWcs = skyInfo.wcs
        peakDistance = self.config.minNewPeak / tractWcs.getPixelScale().asArcseconds()
        samePeakDistance = self.config.maxSamePeak / tractWcs.getPixelScale().asArcseconds()

        # Put catalogs, filters in priority order; bands absent from the input
        # dict are simply skipped.
        orderedCatalogs = [catalogs[band] for band in self.config.priorityList if band in catalogs.keys()]
        orderedBands = [getShortFilterName(band) for band in self.config.priorityList
                        if band in catalogs.keys()]

        mergedList = self.merged.getMergedSourceCatalog(orderedCatalogs, orderedBands, peakDistance,
                                                        self.schema, self.makeIdFactory(patchRef),
                                                        samePeakDistance)

        #
        # Add extra sources that correspond to blank sky, seeded deterministically
        # by the patch's merged-coadd ID so results are reproducible.
        #
        skySeed = patchRef.get(self.config.coaddName + "MergedCoaddId")
        skySourceFootprints = self.getSkySourceFootprints(mergedList, skyInfo, skySeed)
        if skySourceFootprints:
            key = mergedList.schema.find("merge_footprint_%s" % self.config.skyFilterName).key
            for foot in skySourceFootprints:
                s = mergedList.addNew()
                s.setFootprint(foot)
                s.set(key, True)

        # Sort Peaks from brightest to faintest
        for record in mergedList:
            record.getFootprint().sortPeaks()
        self.log.info("Merged to %d sources" % len(mergedList))
        # Attempt to remove garbage peaks
        self.cullPeaks(mergedList)
        return mergedList
848 
849  def cullPeaks(self, catalog):
850  """!
851  @brief Attempt to remove garbage peaks (mostly on the outskirts of large blends).
852 
853  @param[in] catalog Source catalog
854  """
855  keys = [item.key for item in self.merged.getPeakSchema().extract("merge_peak_*").values()]
856  assert len(keys) > 0, "Error finding flags that associate peaks with their detection bands."
857  totalPeaks = 0
858  culledPeaks = 0
859  for parentSource in catalog:
860  # Make a list copy so we can clear the attached PeakCatalog and append the ones we're keeping
861  # to it (which is easier than deleting as we iterate).
862  keptPeaks = parentSource.getFootprint().getPeaks()
863  oldPeaks = list(keptPeaks)
864  keptPeaks.clear()
865  familySize = len(oldPeaks)
866  totalPeaks += familySize
867  for rank, peak in enumerate(oldPeaks):
868  if ((rank < self.config.cullPeaks.rankSufficient) or
869  (sum([peak.get(k) for k in keys]) >= self.config.cullPeaks.nBandsSufficient) or
870  (rank < self.config.cullPeaks.rankConsidered and
871  rank < self.config.cullPeaks.rankNormalizedConsidered * familySize)):
872  keptPeaks.append(peak)
873  else:
874  culledPeaks += 1
875  self.log.info("Culled %d of %d peaks" % (culledPeaks, totalPeaks))
876 
877  def getSchemaCatalogs(self):
878  """!
879  Return a dict of empty catalogs for each catalog dataset produced by this task.
880 
881  @param[out] dictionary of empty catalogs
882  """
883  mergeDet = afwTable.SourceCatalog(self.schema)
884  peak = afwDetect.PeakCatalog(self.merged.getPeakSchema())
885  return {self.config.coaddName + "Coadd_mergeDet": mergeDet,
886  self.config.coaddName + "Coadd_peak": peak}
887 
888  def getSkySourceFootprints(self, mergedList, skyInfo, seed):
889  """!
890  @brief Return a list of Footprints of sky objects which don't overlap with anything in mergedList
891 
892  @param mergedList The merged Footprints from all the input bands
893  @param skyInfo A description of the patch
894  @param seed Seed for the random number generator
895  """
896  mask = afwImage.Mask(skyInfo.patchInfo.getOuterBBox())
897  detected = mask.getPlaneBitMask("DETECTED")
898  for s in mergedList:
899  s.getFootprint().spans.setMask(mask, detected)
900 
901  footprints = self.skyObjects.run(mask, seed)
902  if not footprints:
903  return footprints
904 
905  # Need to convert the peak catalog's schema so we can set the "merge_peak_<skyFilterName>" flags
906  schema = self.merged.getPeakSchema()
907  mergeKey = schema.find("merge_peak_%s" % self.config.skyFilterName).key
908  converted = []
909  for oldFoot in footprints:
910  assert len(oldFoot.getPeaks()) == 1, "Should be a single peak only"
911  peak = oldFoot.getPeaks()[0]
912  newFoot = afwDetect.Footprint(oldFoot.spans, schema)
913  newFoot.addPeak(peak.getFx(), peak.getFy(), peak.getPeakValue())
914  newFoot.getPeaks()[0].set(mergeKey, True)
915  converted.append(newFoot)
916 
917  return converted
918 
919 
    """DeblendCoaddSourcesConfig

    Configuration parameters for the `DeblendCoaddSourcesTask`.
    """
    singleBandDeblend = ConfigurableField(target=SourceDeblendTask,
                                          doc="Deblend sources separately in each band")
    multiBandDeblend = ConfigurableField(target=MultibandDeblendTask,
                                         doc="Deblend sources simultaneously across bands")
    # When True the multi-band deblender is used; otherwise each band is
    # deblended independently with the single-band deblender.
    simultaneous = Field(dtype=bool, default=False, doc="Simultaneously deblend all bands?")
    coaddName = Field(dtype=str, default="deep", doc="Name of coadd")

    def setDefaults(self):
        """Keep all peaks when deblending band-by-band so the per-band
        catalogs stay row-parallel with the merged detection catalog."""
        Config.setDefaults(self)
        self.singleBandDeblend.propagateAllPeaks = True
935 
936 
938  """Task runner for the `MergeSourcesTask`
939 
940  Required because the run method requires a list of
941  dataRefs rather than a single dataRef.
942  """
943  @staticmethod
944  def getTargetList(parsedCmd, **kwargs):
945  """Provide a list of patch references for each patch, tract, filter combo.
946 
947  Parameters
948  ----------
949  parsedCmd:
950  The parsed command
951  kwargs:
952  Keyword arguments passed to the task
953 
954  Returns
955  -------
956  targetList: list
957  List of tuples, where each tuple is a (dataRef, kwargs) pair.
958  """
959  refDict = MergeSourcesRunner.buildRefDict(parsedCmd)
960  kwargs["psfCache"] = parsedCmd.psfCache
961  return [(list(p.values()), kwargs) for t in refDict.values() for p in t.values()]
962 
963 
964 class DeblendCoaddSourcesTask(CmdLineTask):
965  """Deblend the sources in a merged catalog
966 
967  Deblend sources from master catalog in each coadd.
968  This can either be done separately in each band using the HSC-SDSS deblender
969  (`DeblendCoaddSourcesTask.config.simultaneous==False`)
970  or use SCARLET to simultaneously fit the blend in all bands
971  (`DeblendCoaddSourcesTask.config.simultaneous==True`).
 The task will set its own `self.schema` attribute to the `Schema` of the
973  output deblended catalog.
974  This will include all fields from the input `Schema`, as well as additional fields
975  from the deblender.
976 
977  `pipe.tasks.multiband.DeblendCoaddSourcesTask Description
978  ---------------------------------------------------------
979  `
980 
981  Parameters
982  ----------
983  butler: `Butler`
 Butler used to read the input schemas from disk, if `schema` or
 `peakSchema` is `None`.
986  schema: `Schema`
987  The schema of the merged detection catalog as an input to this task.
988  peakSchema: `Schema`
989  The schema of the `PeakRecord`s in the `Footprint`s in the merged detection catalog
990  """
    # Task wiring: configuration class, runner, command-line name, and the
    # ID factory drawing source IDs from the merged-coadd ID space.
    ConfigClass = DeblendCoaddSourcesConfig
    RunnerClass = DeblendCoaddSourcesRunner
    _DefaultName = "deblendCoaddSources"
    makeIdFactory = _makeMakeIdFactory("MergedCoaddId")
995 
    @classmethod
    def _makeArgumentParser(cls):
        """Build the ArgumentParser for this command-line task.

        Adds a coadd data-ID argument (validated against existing
        deepCoadd_calexp datasets) and a --psfCache option controlling the
        size of the CoaddPsf cache.
        """
        parser = ArgumentParser(name=cls._DefaultName)
        parser.add_id_argument("--id", "deepCoadd_calexp",
                               help="data ID, e.g. --id tract=12345 patch=1,2 filter=g^r^i",
                               ContainerClass=ExistingCoaddDataIdContainer)
        parser.add_argument("--psfCache", type=int, default=100, help="Size of CoaddPsf cache")
        return parser
1004 
1005  def __init__(self, butler=None, schema=None, peakSchema=None, **kwargs):
1006  CmdLineTask.__init__(self, **kwargs)
1007  if schema is None:
1008  assert butler is not None, "Neither butler nor schema is defined"
1009  schema = butler.get(self.config.coaddName + "Coadd_mergeDet_schema", immediate=True).schema
1011  self.schemaMapper.addMinimalSchema(schema)
1012  self.schema = self.schemaMapper.getOutputSchema()
1013  if peakSchema is None:
1014  assert butler is not None, "Neither butler nor peakSchema is defined"
1015  peakSchema = butler.get(self.config.coaddName + "Coadd_peak_schema", immediate=True).schema
1016 
1017  if self.config.simultaneous:
1018  self.makeSubtask("multiBandDeblend", schema=self.schema, peakSchema=peakSchema)
1019  else:
1020  self.makeSubtask("singleBandDeblend", schema=self.schema, peakSchema=peakSchema)
1021 
1023  """Return a dict of empty catalogs for each catalog dataset produced by this task.
1024 
1025  Returns
1026  -------
1027  result: dict
1028  Dictionary of empty catalogs, with catalog names as keys.
1029  """
1030  catalog = afwTable.SourceCatalog(self.schema)
1031  return {self.config.coaddName + "Coadd_deblendedFlux": catalog,
1032  self.config.coaddName + "Coadd_deblendedModel": catalog}
1033 
    def runDataRef(self, patchRefList, psfCache=100):
        """Deblend the patch

        Deblend each source simultaneously or separately
        (depending on `DeblendCoaddSourcesTask.config.simultaneous`).
        Set `is-primary` and related flags.
        Propagate flags from individual visits.
        Write the deblended sources out.

        Parameters
        ----------
        patchRefList: list
            List of data references for each filter
        psfCache: int
            Capacity given to each exposure's CoaddPsf cache
            (single-band branch only).
        """
        if self.config.simultaneous:
            # Use SCARLET to simultaneously deblend across filters
            # NOTE(review): psfCache is not applied in this branch — confirm intended.
            filters = []
            exposures = []
            for patchRef in patchRefList:
                exposure = patchRef.get(self.config.coaddName + "Coadd_calexp", immediate=True)
                filters.append(patchRef.dataId["filter"])
                exposures.append(exposure)
            # The input sources are the same for all bands, since it is a merged catalog
            # (reading via the last patchRef is therefore sufficient).
            sources = self.readSources(patchRef)
            exposure = afwImage.MultibandExposure.fromExposures(filters, exposures)
            fluxCatalogs, templateCatalogs = self.multiBandDeblend.run(exposure, sources)
            for n in range(len(patchRefList)):
                self.write(patchRefList[n], fluxCatalogs[filters[n]], templateCatalogs[filters[n]])
        else:
            # Use the single-band deblender to deblend each band separately
            for patchRef in patchRefList:
                exposure = patchRef.get(self.config.coaddName + "Coadd_calexp", immediate=True)
                exposure.getPsf().setCacheCapacity(psfCache)
                sources = self.readSources(patchRef)
                self.singleBandDeblend.run(exposure, sources)
                self.write(patchRef, sources)
1070 
1071  def readSources(self, dataRef):
1072  """Read merged catalog
1073 
1074  Read the catalog of merged detections and create a catalog
1075  in a single band.
1076 
1077  Parameters
1078  ----------
1079  dataRef: data reference
1080  Data reference for catalog of merged detections
1081 
1082  Returns
1083  -------
1084  sources: `SourceCatalog`
1085  List of sources in merged catalog
1086 
1087  We also need to add columns to hold the measurements we're about to make
1088  so we can measure in-place.
1089  """
1090  merged = dataRef.get(self.config.coaddName + "Coadd_mergeDet", immediate=True)
1091  self.log.info("Read %d detections: %s" % (len(merged), dataRef.dataId))
1092  idFactory = self.makeIdFactory(dataRef)
1093  for s in merged:
1094  idFactory.notify(s.getId())
1095  table = afwTable.SourceTable.make(self.schema, idFactory)
1096  sources = afwTable.SourceCatalog(table)
1097  sources.extend(merged, self.schemaMapper)
1098  return sources
1099 
1100  def write(self, dataRef, flux_sources, template_sources=None):
1101  """Write the source catalog(s)
1102 
1103  Parameters
1104  ----------
1105  dataRef: Data Reference
1106  Reference to the output catalog.
1107  flux_sources: `SourceCatalog`
1108  Flux conserved sources to write to file.
1109  If using the single band deblender, this is the catalog
1110  generated.
1111  template_sources: `SourceCatalog`
1112  Source catalog using the multiband template models
1113  as footprints.
1114  """
1115  # The multiband deblender does not have to conserve flux,
1116  # so only write the flux conserved catalog if it exists
1117  if flux_sources is not None:
1118  assert not self.config.simultaneous or self.config.multiBandDeblend.conserveFlux
1119  dataRef.put(flux_sources, self.config.coaddName + "Coadd_deblendedFlux")
1120  # Only the multiband deblender has the option to output the
1121  # template model catalog, which can optionally be used
1122  # in MeasureMergedCoaddSources
1123  if template_sources is not None:
1124  assert self.config.multiBandDeblend.saveTemplates
1125  dataRef.put(template_sources, self.config.coaddName + "Coadd_deblendedModel")
1126  self.log.info("Wrote %d sources: %s" % (len(flux_sources), dataRef.dataId))
1127 
1128  def writeMetadata(self, dataRefList):
1129  """Write the metadata produced from processing the data.
1130  Parameters
1131  ----------
1132  dataRefList
1133  List of Butler data references used to write the metadata.
1134  The metadata is written to dataset type `CmdLineTask._getMetadataName`.
1135  """
1136  for dataRef in dataRefList:
1137  try:
1138  metadataName = self._getMetadataName()
1139  if metadataName is not None:
1140  dataRef.put(self.getFullMetadata(), metadataName)
1141  except Exception as e:
1142  self.log.warn("Could not persist metadata for dataId=%s: %s", dataRef.dataId, e)
1143 
    def getExposureId(self, dataRef):
        """Get the ExposureId from a data reference

        Returns the integer ID stored in the `<coaddName>CoaddId` dataset.
        """
        return int(dataRef.get(self.config.coaddName + "CoaddId"))
1148 
1149 
    """!
    @anchor MeasureMergedCoaddSourcesConfig_

    @brief Configuration parameters for the MeasureMergedCoaddSourcesTask
    """
    # NOTE(review): the doc text below drops the closing quote around
    # 'deblendedFlux and 'deblendedModel — cosmetic only, left as-is.
    inputCatalog = Field(dtype=str, default="deblendedFlux",
                         doc=("Name of the input catalog to use."
                              "If the single band deblender was used this should be 'deblendedFlux."
                              "If the multi-band deblender was used this should be 'deblendedModel."
                              "If no deblending was performed this should be 'mergeDet'"))
    measurement = ConfigurableField(target=SingleFrameMeasurementTask, doc="Source measurement")
    setPrimaryFlags = ConfigurableField(target=SetPrimaryFlagsTask, doc="Set flags for primary tract/patch")
    doPropagateFlags = Field(
        dtype=bool, default=True,
        doc="Whether to match sources to CCD catalogs to propagate flags (to e.g. identify PSF stars)"
    )
    propagateFlags = ConfigurableField(target=PropagateVisitFlagsTask, doc="Propagate visit flags to coadd")
    doMatchSources = Field(dtype=bool, default=True, doc="Match sources to reference catalog?")
    match = ConfigurableField(target=DirectMatchTask, doc="Matching to reference catalog")
    doWriteMatchesDenormalized = Field(
        dtype=bool,
        default=False,
        doc=("Write reference matches in denormalized format? "
             "This format uses more disk space, but is more convenient to read."),
    )
    coaddName = Field(dtype=str, default="deep", doc="Name of coadd")
    checkUnitsParseStrict = Field(
        doc="Strictness of Astropy unit compatibility check, can be 'raise', 'warn' or 'silent'",
        dtype=str,
        default="raise",
    )
    doApCorr = Field(
        dtype=bool,
        default=True,
        doc="Apply aperture corrections"
    )
    applyApCorr = ConfigurableField(
        target=ApplyApCorrTask,
        doc="Subtask to apply aperture corrections"
    )
    doRunCatalogCalculation = Field(
        dtype=bool,
        default=True,
        doc='Run catalogCalculation task'
    )
    catalogCalculation = ConfigurableField(
        target=CatalogCalculationTask,
        doc="Subtask to run catalogCalculation plugins on catalog"
    )

    def setDefaults(self):
        """Enable input-count and variance measurements by default, and flag
        footprints touching coadd-specific mask planes."""
        Config.setDefaults(self)
        self.measurement.plugins.names |= ['base_InputCount', 'base_Variance']
        self.measurement.plugins['base_PixelFlags'].masksFpAnywhere = ['CLIPPED', 'SENSOR_EDGE',
                                                                       'INEXACT_PSF']
        self.measurement.plugins['base_PixelFlags'].masksFpCenter = ['CLIPPED', 'SENSOR_EDGE',
                                                                     'INEXACT_PSF']
1208 
1209 
1215 
1216 
class MeasureMergedCoaddSourcesRunner(ButlerInitializedTaskRunner):
    """Get the psfCache setting into MeasureMergedCoaddSourcesTask"""
    @staticmethod
    def getTargetList(parsedCmd, **kwargs):
        # Forward the command-line --psfCache value to each task invocation.
        # NOTE(review): incoming **kwargs are discarded here — confirm intended.
        return ButlerInitializedTaskRunner.getTargetList(parsedCmd, psfCache=parsedCmd.psfCache)
1222 
1223 
1225  r"""!
1226  @anchor MeasureMergedCoaddSourcesTask_
1227 
 @brief Deblend sources from master catalog in each coadd separately and measure.
1229 
1230  @section pipe_tasks_multiBand_Contents Contents
1231 
1232  - @ref pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Purpose
1233  - @ref pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Initialize
1234  - @ref pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Run
1235  - @ref pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Config
1236  - @ref pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Debug
1237  - @ref pipe_tasks_multiband_MeasureMergedCoaddSourcesTask_Example
1238 
1239  @section pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Purpose Description
1240 
1241  Command-line task that uses peaks and footprints from a master catalog to perform deblending and
1242  measurement in each coadd.
1243 
1244  Given a master input catalog of sources (peaks and footprints) or deblender outputs
1245  (including a HeavyFootprint in each band), measure each source on the
1246  coadd. Repeating this procedure with the same master catalog across multiple coadds will generate a
1247  consistent set of child sources.
1248 
1249  The deblender retains all peaks and deblends any missing peaks (dropouts in that band) as PSFs. Source
1250  properties are measured and the @c is-primary flag (indicating sources with no children) is set. Visit
1251  flags are propagated to the coadd sources.
1252 
1253  Optionally, we can match the coadd sources to an external reference catalog.
1254 
1255  @par Inputs:
1256  deepCoadd_mergeDet{tract,patch} or deepCoadd_deblend{tract,patch}: SourceCatalog
1257  @n deepCoadd_calexp{tract,patch,filter}: ExposureF
1258  @par Outputs:
1259  deepCoadd_meas{tract,patch,filter}: SourceCatalog
1260  @par Data Unit:
1261  tract, patch, filter
1262 
1263  MeasureMergedCoaddSourcesTask delegates most of its work to a set of sub-tasks:
1264 
1265  <DL>
1266  <DT> @ref SingleFrameMeasurementTask_ "measurement"
1267  <DD> Measure source properties of deblended sources.</DD>
1268  <DT> @ref SetPrimaryFlagsTask_ "setPrimaryFlags"
1269  <DD> Set flag 'is-primary' as well as related flags on sources. 'is-primary' is set for sources that are
1270  not at the edge of the field and that have either not been deblended or are the children of deblended
1271  sources</DD>
1272  <DT> @ref PropagateVisitFlagsTask_ "propagateFlags"
1273  <DD> Propagate flags set in individual visits to the coadd.</DD>
1274  <DT> @ref DirectMatchTask_ "match"
1275  <DD> Match input sources to a reference catalog (optional).
1276  </DD>
1277  </DL>
1278  These subtasks may be retargeted as required.
1279 
1280  @section pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Initialize Task initialization
1281 
1282  @copydoc \_\_init\_\_
1283 
1284  @section pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Run Invoking the Task
1285 
1286  @copydoc run
1287 
1288  @section pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Config Configuration parameters
1289 
1290  See @ref MeasureMergedCoaddSourcesConfig_
1291 
1292  @section pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Debug Debug variables
1293 
1294  The @link lsst.pipe.base.cmdLineTask.CmdLineTask command line task@endlink interface supports a
1295  flag @c -d to import @b debug.py from your @c PYTHONPATH; see @ref baseDebug for more about @b debug.py
1296  files.
1297 
1298  MeasureMergedCoaddSourcesTask has no debug variables of its own because it delegates all the work to
 the various sub-tasks. See the documentation for individual sub-tasks for more information.
1300 
1301  @section pipe_tasks_multiband_MeasureMergedCoaddSourcesTask_Example A complete example of using
1302  MeasureMergedCoaddSourcesTask
1303 
1304  After MeasureMergedCoaddSourcesTask has been run on multiple coadds, we have a set of per-band catalogs.
1305  The next stage in the multi-band processing procedure will merge these measurements into a suitable
1306  catalog for driving forced photometry.
1307 
1308  Command-line usage of MeasureMergedCoaddSourcesTask expects a data reference to the coadds
1309  to be processed.
1310  A list of the available optional arguments can be obtained by calling measureCoaddSources.py with the
1311  `--help` command line argument:
1312  @code
1313  measureCoaddSources.py --help
1314  @endcode
1315 
 To demonstrate usage of MeasureMergedCoaddSourcesTask in the larger context of multi-band processing, we
1317  will process HSC data in the [ci_hsc](https://github.com/lsst/ci_hsc) package. Assuming one has finished
1318  step 6 at @ref pipeTasks_multiBand, one may perform deblending and measure sources in the HSC-I band
1319  coadd as follows:
1320  @code
1321  measureCoaddSources.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-I
1322  @endcode
1323  This will process the HSC-I band data. The results are written in
 `$CI_HSC_DIR/DATA/deepCoadd-results/HSC-I/0/5,4/meas-HSC-I-0-5,4.fits`.
1325 
1326  It is also necessary to run
1327  @code
1328  measureCoaddSources.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-R
1329  @endcode
1330  to generate the sources catalogs for the HSC-R band required by the next step in the multi-band
1331  procedure: @ref MergeMeasurementsTask_ "MergeMeasurementsTask".
1332  """
    # Task wiring: command-line name, config, runner, and schema/ID factories.
    _DefaultName = "measureCoaddSources"
    ConfigClass = MeasureMergedCoaddSourcesConfig
    RunnerClass = MeasureMergedCoaddSourcesRunner
    getSchemaCatalogs = _makeGetSchemaCatalogs("meas")
    makeIdFactory = _makeMakeIdFactory("MergedCoaddId")  # The IDs we already have are of this type
1338 
    @classmethod
    def _makeArgumentParser(cls):
        """Build the ArgumentParser for this command-line task.

        Adds a coadd data-ID argument (validated against existing
        deepCoadd_calexp datasets) and a --psfCache option controlling the
        size of the CoaddPsf cache.
        """
        parser = ArgumentParser(name=cls._DefaultName)
        parser.add_id_argument("--id", "deepCoadd_calexp",
                               help="data ID, e.g. --id tract=12345 patch=1,2 filter=r",
                               ContainerClass=ExistingCoaddDataIdContainer)
        parser.add_argument("--psfCache", type=int, default=100, help="Size of CoaddPsf cache")
        return parser
1347 
1348  def __init__(self, butler=None, schema=None, peakSchema=None, refObjLoader=None, **kwargs):
1349  """!
1350  @brief Initialize the task.
1351 
1352  Keyword arguments (in addition to those forwarded to CmdLineTask.__init__):
1353  @param[in] schema: the schema of the merged detection catalog used as input to this one
1354  @param[in] peakSchema: the schema of the PeakRecords in the Footprints in the merged detection catalog
1355  @param[in] refObjLoader: an instance of LoadReferenceObjectsTasks that supplies an external reference
1356  catalog. May be None if the loader can be constructed from the butler argument or all steps
1357  requiring a reference catalog are disabled.
1358  @param[in] butler: a butler used to read the input schemas from disk or construct the reference
1359  catalog loader, if schema or peakSchema or refObjLoader is None
1360 
1361  The task will set its own self.schema attribute to the schema of the output measurement catalog.
1362  This will include all fields from the input schema, as well as additional fields for all the
1363  measurements.
1364  """
1365  CmdLineTask.__init__(self, **kwargs)
1366  self.deblended = self.config.inputCatalog.startswith("deblended")
1367  self.inputCatalog = "Coadd_" + self.config.inputCatalog
1368  if schema is None:
1369  assert butler is not None, "Neither butler nor schema is defined"
1370  schema = butler.get(self.config.coaddName + self.inputCatalog + "_schema", immediate=True).schema
1372  self.schemaMapper.addMinimalSchema(schema)
1373  self.schema = self.schemaMapper.getOutputSchema()
1375  self.makeSubtask("measurement", schema=self.schema, algMetadata=self.algMetadata)
1376  self.makeSubtask("setPrimaryFlags", schema=self.schema)
1377  if self.config.doMatchSources:
1378  if refObjLoader is None:
1379  assert butler is not None, "Neither butler nor refObjLoader is defined"
1380  self.makeSubtask("match", butler=butler, refObjLoader=refObjLoader)
1381  if self.config.doPropagateFlags:
1382  self.makeSubtask("propagateFlags", schema=self.schema)
1383  self.schema.checkUnits(parse_strict=self.config.checkUnitsParseStrict)
1384  if self.config.doApCorr:
1385  self.makeSubtask("applyApCorr", schema=self.schema)
1386  if self.config.doRunCatalogCalculation:
1387  self.makeSubtask("catalogCalculation", schema=self.schema)
1388 
    def runDataRef(self, patchRef, psfCache=100):
        """!
        @brief Deblend and measure.

        @param[in] patchRef: Patch reference.
        @param[in] psfCache: Capacity given to the exposure's CoaddPsf cache.

        Set 'is-primary' and related flags. Propagate flags
        from individual visits. Optionally match the sources to a reference catalog and write the matches.
        Finally, write the deblended sources and measurements out.
        """
        exposure = patchRef.get(self.config.coaddName + "Coadd_calexp", immediate=True)
        exposure.getPsf().setCacheCapacity(psfCache)
        sources = self.readSources(patchRef)
        table = sources.getTable()
        table.setMetadata(self.algMetadata)  # Capture algorithm metadata to write out to the source catalog.

        self.measurement.run(sources, exposure, exposureId=self.getExposureId(patchRef))

        if self.config.doApCorr:
            self.applyApCorr.run(
                catalog=sources,
                apCorrMap=exposure.getInfo().getApCorrMap()
            )

        # TODO DM-11568: this contiguous check-and-copy could go away if we
        # reserve enough space during SourceDetection and/or SourceDeblend.
        # NOTE: sourceSelectors require contiguous catalogs, so ensure
        # contiguity now, so views are preserved from here on.
        if not sources.isContiguous():
            sources = sources.copy(deep=True)

        if self.config.doRunCatalogCalculation:
            self.catalogCalculation.run(sources)

        # Flag primary sources, then propagate visit-level flags, then match
        # to the reference catalog — order matters: matching and writing see
        # the fully flagged catalog.
        skyInfo = getSkyInfo(coaddName=self.config.coaddName, patchRef=patchRef)
        self.setPrimaryFlags.run(sources, skyInfo.skyMap, skyInfo.tractInfo, skyInfo.patchInfo,
                                 includeDeblend=self.deblended)
        if self.config.doPropagateFlags:
            self.propagateFlags.run(patchRef.getButler(), sources, self.propagateFlags.getCcdInputs(exposure),
                                    exposure.getWcs())
        if self.config.doMatchSources:
            self.writeMatches(patchRef, exposure, sources)
        self.write(patchRef, sources)
1432 
1433  def readSources(self, dataRef):
1434  """!
1435  @brief Read input sources.
1436 
1437  @param[in] dataRef: Data reference for catalog of merged detections
1438  @return List of sources in merged catalog
1439 
1440  We also need to add columns to hold the measurements we're about to make
1441  so we can measure in-place.
1442  """
1443  merged = dataRef.get(self.config.coaddName + self.inputCatalog, immediate=True)
1444  self.log.info("Read %d detections: %s" % (len(merged), dataRef.dataId))
1445  idFactory = self.makeIdFactory(dataRef)
1446  for s in merged:
1447  idFactory.notify(s.getId())
1448  table = afwTable.SourceTable.make(self.schema, idFactory)
1449  sources = afwTable.SourceCatalog(table)
1450  sources.extend(merged, self.schemaMapper)
1451  return sources
1452 
1453  def writeMatches(self, dataRef, exposure, sources):
1454  """!
1455  @brief Write matches of the sources to the astrometric reference catalog.
1456 
1457  We use the Wcs in the exposure to match sources.
1458 
1459  @param[in] dataRef: data reference
1460  @param[in] exposure: exposure with Wcs
1461  @param[in] sources: source catalog
1462  """
1463  result = self.match.run(sources, exposure.getInfo().getFilter().getName())
1464  if result.matches:
1465  matches = afwTable.packMatches(result.matches)
1466  matches.table.setMetadata(result.matchMeta)
1467  dataRef.put(matches, self.config.coaddName + "Coadd_measMatch")
1468  if self.config.doWriteMatchesDenormalized:
1469  denormMatches = denormalizeMatches(result.matches, result.matchMeta)
1470  dataRef.put(denormMatches, self.config.coaddName + "Coadd_measMatchFull")
1471 
1472  def write(self, dataRef, sources):
1473  """!
1474  @brief Write the source catalog.
1475 
1476  @param[in] dataRef: data reference
1477  @param[in] sources: source catalog
1478  """
1479  dataRef.put(sources, self.config.coaddName + "Coadd_meas")
1480  self.log.info("Wrote %d sources: %s" % (len(sources), dataRef.dataId))
1481 
    def getExposureId(self, dataRef):
        # Integer exposure ID from the `<coaddName>CoaddId` dataset.
        return int(dataRef.get(self.config.coaddName + "CoaddId"))
1484 
1485 
    """!
    @anchor MergeMeasurementsConfig_

    @brief Configuration parameters for the MergeMeasurementsTask
    """
    pseudoFilterList = ListField(dtype=str, default=["sky"],
                                 doc="Names of filters which may have no associated detection\n"
                                     "(N.b. should include MergeDetectionsConfig.skyFilterName)")
    snName = Field(dtype=str, default="base_PsfFlux",
                   doc="Name of flux measurement for calculating the S/N when choosing the reference band.")
    minSN = Field(dtype=float, default=10.,
                  doc="If the S/N from the priority band is below this value (and the S/N "
                      "is larger than minSNDiff compared to the priority band), use the band with "
                      "the largest S/N as the reference band.")
    minSNDiff = Field(dtype=float, default=3.,
                      doc="If the difference in S/N between another band and the priority band is larger "
                          "than this value (and the S/N in the priority band is less than minSN) "
                          "use the band with the largest S/N as the reference band")
    # Per-band quality flags: a band with any of these set (where present in
    # the schema) is not eligible to supply the reference measurement.
    flags = ListField(dtype=str, doc="Require that these flags, if available, are not set",
                      default=["base_PixelFlags_flag_interpolatedCenter", "base_PsfFlux_flag",
                               "ext_photometryKron_KronFlux_flag", "modelfit_CModel_flag", ])
1508 
1509 
1515 
1516 
1518  r"""!
1519  @anchor MergeMeasurementsTask_
1520 
1521  @brief Merge measurements from multiple bands
1522 
1523  @section pipe_tasks_multiBand_Contents Contents
1524 
1525  - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Purpose
1526  - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Initialize
1527  - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Run
1528  - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Config
1529  - @ref pipe_tasks_multiBand_MergeMeasurementsTask_Debug
1530  - @ref pipe_tasks_multiband_MergeMeasurementsTask_Example
1531 
1532  @section pipe_tasks_multiBand_MergeMeasurementsTask_Purpose Description
1533 
1534  Command-line task that merges measurements from multiple bands.
1535 
1536  Combines consistent (i.e. with the same peaks and footprints) catalogs of sources from multiple filter
1537  bands to construct a unified catalog that is suitable for driving forced photometry. Every source is
1538  required to have centroid, shape and flux measurements in each band.
1539 
1540  @par Inputs:
1541  deepCoadd_meas{tract,patch,filter}: SourceCatalog
1542  @par Outputs:
1543  deepCoadd_ref{tract,patch}: SourceCatalog
1544  @par Data Unit:
1545  tract, patch
1546 
1547  MergeMeasurementsTask subclasses @ref MergeSourcesTask_ "MergeSourcesTask".
1548 
1549  @section pipe_tasks_multiBand_MergeMeasurementsTask_Initialize Task initialization
1550 
1551  @copydoc \_\_init\_\_
1552 
1553  @section pipe_tasks_multiBand_MergeMeasurementsTask_Run Invoking the Task
1554 
1555  @copydoc run
1556 
1557  @section pipe_tasks_multiBand_MergeMeasurementsTask_Config Configuration parameters
1558 
1559  See @ref MergeMeasurementsConfig_
1560 
1561  @section pipe_tasks_multiBand_MergeMeasurementsTask_Debug Debug variables
1562 
1563  The @link lsst.pipe.base.cmdLineTask.CmdLineTask command line task@endlink interface supports a
1564  flag @c -d to import @b debug.py from your @c PYTHONPATH; see @ref baseDebug for more about @b debug.py
1565  files.
1566 
1567  MergeMeasurementsTask has no debug variables.
1568 
1569  @section pipe_tasks_multiband_MergeMeasurementsTask_Example A complete example
1570  of using MergeMeasurementsTask
1571 
1572  MergeMeasurementsTask is meant to be run after deblending & measuring sources in every band.
1573  The purpose of the task is to generate a catalog of sources suitable for driving forced photometry in
1574  coadds and individual exposures.
1575  Command-line usage of MergeMeasurementsTask expects a data reference to the coadds to be processed. A list
1576  of the available optional arguments can be obtained by calling mergeCoaddMeasurements.py with the `--help`
1577  command line argument:
1578  @code
1579  mergeCoaddMeasurements.py --help
1580  @endcode
1581 
 1582  To demonstrate usage of the MergeMeasurementsTask in the larger context of multi-band processing, we
1583  will process HSC data in the [ci_hsc](https://github.com/lsst/ci_hsc) package. Assuming one has finished
1584  step 7 at @ref pipeTasks_multiBand, one may merge the catalogs generated after deblending and measuring
1585  as follows:
1586  @code
1587  mergeCoaddMeasurements.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-I^HSC-R
1588  @endcode
1589  This will merge the HSC-I & HSC-R band catalogs. The results are written in
1590  `$CI_HSC_DIR/DATA/deepCoadd-results/`.
1591  """
    _DefaultName = "mergeCoaddMeasurements"  # default task name on the command line
    ConfigClass = MergeMeasurementsConfig
    # Dataset-name suffixes; the coadd name prefix (e.g. "deepCoadd_") is added elsewhere.
    inputDataset = "meas"
    outputDataset = "ref"
    # Factory-built method returning empty catalogs that describe the output schema.
    getSchemaCatalogs = _makeGetSchemaCatalogs("ref")
1597 
1598  def __init__(self, butler=None, schema=None, **kwargs):
1599  """!
1600  Initialize the task.
1601 
1602  Additional keyword arguments (forwarded to MergeSourcesTask.__init__):
1603  @param[in] schema: the schema of the detection catalogs used as input to this one
1604  @param[in] butler: a butler used to read the input schema from disk, if schema is None
1605 
1606  The task will set its own self.schema attribute to the schema of the output merged catalog.
1607  """
1608  MergeSourcesTask.__init__(self, butler=butler, schema=schema, **kwargs)
1609  inputSchema = self.getInputSchema(butler=butler, schema=schema)
1610  self.schemaMapper = afwTable.SchemaMapper(inputSchema, True)
1611  self.schemaMapper.addMinimalSchema(inputSchema, True)
1612  self.instFluxKey = inputSchema.find(self.config.snName + "_instFlux").getKey()
1613  self.instFluxErrKey = inputSchema.find(self.config.snName + "_instFluxErr").getKey()
1614  self.fluxFlagKey = inputSchema.find(self.config.snName + "_flag").getKey()
1615 
1616  self.flagKeys = {}
1617  for band in self.config.priorityList:
1618  short = getShortFilterName(band)
1619  outputKey = self.schemaMapper.editOutputSchema().addField(
1620  "merge_measurement_%s" % short,
1621  type="Flag",
1622  doc="Flag field set if the measurements here are from the %s filter" % band
1623  )
1624  peakKey = inputSchema.find("merge_peak_%s" % short).key
1625  footprintKey = inputSchema.find("merge_footprint_%s" % short).key
1626  self.flagKeys[band] = Struct(peak=peakKey, footprint=footprintKey, output=outputKey)
1627  self.schema = self.schemaMapper.getOutputSchema()
1628 
1630  for filt in self.config.pseudoFilterList:
1631  try:
1632  self.pseudoFilterKeys.append(self.schema.find("merge_peak_%s" % filt).getKey())
1633  except Exception as e:
1634  self.log.warn("merge_peak is not set for pseudo-filter %s: %s" % (filt, e))
1635 
1636  self.badFlags = {}
1637  for flag in self.config.flags:
1638  try:
1639  self.badFlags[flag] = self.schema.find(flag).getKey()
1640  except KeyError as exc:
1641  self.log.warn("Can't find flag %s in schema: %s" % (flag, exc,))
1642 
1643  def run(self, catalogs, patchRef):
1644  """!
1645  Merge measurement catalogs to create a single reference catalog for forced photometry
1646 
1647  @param[in] catalogs: the catalogs to be merged
1648  @param[in] patchRef: patch reference for data
1649 
1650  For parent sources, we choose the first band in config.priorityList for which the
1651  merge_footprint flag for that band is is True.
1652 
1653  For child sources, the logic is the same, except that we use the merge_peak flags.
1654  """
1655  # Put catalogs, filters in priority order
1656  orderedCatalogs = [catalogs[band] for band in self.config.priorityList if band in catalogs.keys()]
1657  orderedKeys = [self.flagKeys[band] for band in self.config.priorityList if band in catalogs.keys()]
1658 
1659  mergedCatalog = afwTable.SourceCatalog(self.schema)
1660  mergedCatalog.reserve(len(orderedCatalogs[0]))
1661 
1662  idKey = orderedCatalogs[0].table.getIdKey()
1663  for catalog in orderedCatalogs[1:]:
1664  if numpy.any(orderedCatalogs[0].get(idKey) != catalog.get(idKey)):
1665  raise ValueError("Error in inputs to MergeCoaddMeasurements: source IDs do not match")
1666 
1667  # This first zip iterates over all the catalogs simultaneously, yielding a sequence of one
1668  # record for each band, in priority order.
1669  for orderedRecords in zip(*orderedCatalogs):
1670 
1671  maxSNRecord = None
1672  maxSNFlagKeys = None
1673  maxSN = 0.
1674  priorityRecord = None
1675  priorityFlagKeys = None
1676  prioritySN = 0.
1677  hasPseudoFilter = False
1678 
1679  # Now we iterate over those record-band pairs, keeping track of the priority and the
1680  # largest S/N band.
1681  for inputRecord, flagKeys in zip(orderedRecords, orderedKeys):
1682  parent = (inputRecord.getParent() == 0 and inputRecord.get(flagKeys.footprint))
1683  child = (inputRecord.getParent() != 0 and inputRecord.get(flagKeys.peak))
1684 
1685  if not (parent or child):
1686  for pseudoFilterKey in self.pseudoFilterKeys:
1687  if inputRecord.get(pseudoFilterKey):
1688  hasPseudoFilter = True
1689  priorityRecord = inputRecord
1690  priorityFlagKeys = flagKeys
1691  break
1692  if hasPseudoFilter:
1693  break
1694 
1695  isBad = any(inputRecord.get(flag) for flag in self.badFlags)
1696  if isBad or inputRecord.get(self.fluxFlagKey) or inputRecord.get(self.instFluxErrKey) == 0:
1697  sn = 0.
1698  else:
1699  sn = inputRecord.get(self.instFluxKey)/inputRecord.get(self.instFluxErrKey)
1700  if numpy.isnan(sn) or sn < 0.:
1701  sn = 0.
1702  if (parent or child) and priorityRecord is None:
1703  priorityRecord = inputRecord
1704  priorityFlagKeys = flagKeys
1705  prioritySN = sn
1706  if sn > maxSN:
1707  maxSNRecord = inputRecord
1708  maxSNFlagKeys = flagKeys
1709  maxSN = sn
1710 
1711  # If the priority band has a low S/N we would like to choose the band with the highest S/N as
1712  # the reference band instead. However, we only want to choose the highest S/N band if it is
1713  # significantly better than the priority band. Therefore, to choose a band other than the
1714  # priority, we require that the priority S/N is below the minimum threshold and that the
1715  # difference between the priority and highest S/N is larger than the difference threshold.
1716  #
1717  # For pseudo code objects we always choose the first band in the priority list.
1718  bestRecord = None
1719  bestFlagKeys = None
1720  if hasPseudoFilter:
1721  bestRecord = priorityRecord
1722  bestFlagKeys = priorityFlagKeys
1723  elif (prioritySN < self.config.minSN and (maxSN - prioritySN) > self.config.minSNDiff and
1724  maxSNRecord is not None):
1725  bestRecord = maxSNRecord
1726  bestFlagKeys = maxSNFlagKeys
1727  elif priorityRecord is not None:
1728  bestRecord = priorityRecord
1729  bestFlagKeys = priorityFlagKeys
1730 
1731  if bestRecord is not None and bestFlagKeys is not None:
1732  outputRecord = mergedCatalog.addNew()
1733  outputRecord.assign(bestRecord, self.schemaMapper)
1734  outputRecord.set(bestFlagKeys.output, True)
1735  else: # if we didn't find any records
1736  raise ValueError("Error in inputs to MergeCoaddMeasurements: no valid reference for %s" %
1737  inputRecord.getId())
1738 
1739  # more checking for sane inputs, since zip silently iterates over the smallest sequence
1740  for inputCatalog in orderedCatalogs:
1741  if len(mergedCatalog) != len(inputCatalog):
1742  raise ValueError("Mismatch between catalog sizes: %s != %s" %
1743  (len(mergedCatalog), len(orderedCatalogs)))
1744 
1745  return mergedCatalog
def getSkySourceFootprints(self, mergedList, skyInfo, seed)
Return a list of Footprints of sky objects which don&#39;t overlap with anything in mergedList.
Definition: multiBand.py:888
Merge coadd detections from multiple bands.
Definition: multiBand.py:691
def makeTask(self, parsedCmd=None, args=None)
Definition: multiBand.py:400
def getInputSchema(self, butler=None, schema=None)
Obtain the input schema either directly or froma butler reference.
Definition: multiBand.py:537
def getSchemaCatalogs(self)
Return a dict of empty catalogs for each catalog dataset produced by this task.
Definition: multiBand.py:877
def runDataRef(self, patchRefList)
Merge coadd sources from multiple bands.
Definition: multiBand.py:563
def runDataRef(self, patchRef)
Run detection on a coadd.
Definition: multiBand.py:325
def cullPeaks(self, catalog)
Attempt to remove garbage peaks (mostly on the outskirts of large blends).
Definition: multiBand.py:849
def __init__(self, butler=None, schema=None, kwargs)
Initialize the task.
Definition: multiBand.py:550
Class for storing ordered metadata with comments.
Definition: PropertyList.h:68
def __init__(self, butler=None, schema=None, kwargs)
Initialize the task.
Definition: multiBand.py:1598
A mapping between the keys of two Schemas, used to copy data between them.
Definition: SchemaMapper.h:21
Configuration parameters for the DetectCoaddSourcesTask.
Definition: multiBand.py:108
def denormalizeMatches(matches, matchMeta=None)
def adaptArgsAndRun(self, inputData, inputDataIds, outputDataIds)
Definition: multiBand.py:340
def runDataRef(self, patchRefList, psfCache=100)
Definition: multiBand.py:1034
def __init__(self, schema=None, kwargs)
Initialize the task.
Definition: multiBand.py:300
Merge measurements from multiple bands.
Definition: multiBand.py:1517
Deblend sources from master catalog in each coadd seperately and measure.
Definition: multiBand.py:1224
def writeMatches(self, dataRef, exposure, sources)
Write matches of the sources to the astrometric reference catalog.
Definition: multiBand.py:1453
def getShortFilterName(name)
Definition: multiBand.py:98
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
Definition: functional.cc:33
Fit spatial kernel using approximate fluxes for candidates, and solving a linear system of equations...
daf::base::PropertySet * set
Definition: fits.cc:832
def __init__(self, butler=None, schema=None, peakSchema=None, kwargs)
Definition: multiBand.py:1005
bool any(CoordinateExpr< N > const &expr) noexcept
Return true if any elements are true.
Represent a 2-dimensional array of bitmask pixels.
Definition: Mask.h:78
Configuration parameters for the MergeMeasurementsTask.
Definition: multiBand.py:1486
def readSources(self, dataRef)
Read input sources.
Definition: multiBand.py:1433
def write(self, results, patchRef)
Write out results from runDetection.
Definition: multiBand.py:378
template BaseCatalog packMatches(SourceMatchVector const &)
Configuration parameters for the MergeDetectionsTask.
Definition: multiBand.py:659
Configuration for merging sources.
Definition: multiBand.py:478
Class to describe the properties of a detected object from an image.
Definition: Footprint.h:62
Holds an integer identifier for an LSST filter.
Definition: Filter.h:139
def run(self, catalogs, patchRef)
Merge measurement catalogs to create a single reference catalog for forced photometry.
Definition: multiBand.py:1643
def run(self, catalogs, patchRef)
Merge multiple catalogs.
Definition: multiBand.py:589
def write(self, patchRef, catalog)
Write the output.
Definition: multiBand.py:600
def write(self, dataRef, flux_sources, template_sources=None)
Definition: multiBand.py:1100
A base class for merging source catalogs.
Definition: multiBand.py:494
def write(self, dataRef, sources)
Write the source catalog.
Definition: multiBand.py:1472
def readCatalog(self, patchRef)
Read input catalog.
Definition: multiBand.py:574
Configuration parameters for the MeasureMergedCoaddSourcesTask.
Definition: multiBand.py:1150
def getSkyInfo(coaddName, patchRef)
Return the SkyMap, tract and patch information, wcs, and outer bbox of the patch to be coadded...
Definition: coaddBase.py:253
def writeMetadata(self, dataRefList)
No metadata to write, and not sure how to write it for a list of dataRefs.
Definition: multiBand.py:617
def __init__(self, butler=None, schema=None, peakSchema=None, refObjLoader=None, kwargs)
Initialize the task.
Definition: multiBand.py:1348
def run(self, exposure, idFactory, expId)
Run detection on an exposure.
Definition: multiBand.py:346
daf::base::PropertyList * list
Definition: fits.cc:833
def runDataRef(self, patchRef, psfCache=100)
Deblend and measure.
Definition: multiBand.py:1389
def __init__(self, butler=None, schema=None, kwargs)
Initialize the merge detections task.
Definition: multiBand.py:778
def run(self, catalogs, patchRef)
Merge multiple catalogs.
Definition: multiBand.py:800