mergeDetections.py
1 #!/usr/bin/env python
2 #
3 # LSST Data Management System
4 # Copyright 2008-2015 AURA/LSST.
5 #
6 # This product includes software developed by the
7 # LSST Project (http://www.lsst.org/).
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the LSST License Statement and
20 # the GNU General Public License along with this program. If not,
21 # see <https://www.lsstcorp.org/LegalNotices/>.
22 #
23 
24 from .multiBandUtils import (CullPeaksConfig, MergeSourcesRunner, _makeMakeIdFactory, makeMergeArgumentParser,
25  getInputSchema, getShortFilterName, readCatalog)
26 
27 
28 import lsst.afw.detection as afwDetect
29 import lsst.afw.image as afwImage
30 import lsst.afw.table as afwTable
31 
32 from lsst.meas.algorithms import SkyObjectsTask
33 from lsst.pex.config import Config, Field, ListField, ConfigurableField, ConfigField
34 from lsst.pipe.base import (CmdLineTask, PipelineTask, PipelineTaskConfig, Struct,
35  PipelineTaskConnections)
36 import lsst.pipe.base.connectionTypes as cT
37 from lsst.pipe.tasks.coaddBase import getSkyInfo
38 
39 
40 class MergeDetectionsConnections(PipelineTaskConnections,
41  dimensions=("tract", "patch", "skymap"),
42  defaultTemplates={"inputCoaddName": 'deep', "outputCoaddName": "deep"}):
43  schema = cT.InitInput(
44  doc="Schema of the input detection catalog",
45  name="{inputCoaddName}Coadd_det_schema",
46  storageClass="SourceCatalog"
47  )
48 
49  outputSchema = cT.InitOutput(
50  doc="Schema of the merged detection catalog",
51  name="{outputCoaddName}Coadd_mergeDet_schema",
52  storageClass="SourceCatalog"
53  )
54 
55  outputPeakSchema = cT.InitOutput(
56  doc="Output schema of the Footprint peak catalog",
57  name="{outputCoaddName}Coadd_peak_schema",
58  storageClass="PeakCatalog"
59  )
60 
61  catalogs = cT.Input(
62  doc="Detection Catalogs to be merged",
63  name="{inputCoaddName}Coadd_det",
64  storageClass="SourceCatalog",
65  dimensions=("tract", "patch", "skymap", "abstract_filter"),
66  multiple=True
67  )
68 
69  skyMap = cT.Input(
70  doc="SkyMap to be used in merging",
71  name="{inputCoaddName}Coadd_skyMap",
72  storageClass="SkyMap",
73  dimensions=("skymap",),
74  )
75 
76  outputCatalog = cT.Output(
77  doc="Merged Detection catalog",
78  name="{outputCoaddName}Coadd_mergeDet",
79  storageClass="SourceCatalog",
80  dimensions=("tract", "patch", "skymap"),
81  )
82 
83 
84 class MergeDetectionsConfig(PipelineTaskConfig, pipelineConnections=MergeDetectionsConnections):
85  """!
86  @anchor MergeDetectionsConfig_
87 
88  @brief Configuration parameters for the MergeDetectionsTask.
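
 The only field that must be set explicitly is priorityList, which has no default and is
 rejected by validate() when empty. A minimal override sketch (e.g. for a config override
 file), using hypothetical HSC band names purely for illustration:
 @code
 config.priorityList = ["HSC-I", "HSC-R"]  # bands merged in this priority order
 config.minNewPeak = 1.0                   # arcsec; minimum separation before a new peak is created
 config.maxSamePeak = 0.3                  # arcsec; closer peaks are flagged as detected in that band
 @endcode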
89  """
90  minNewPeak = Field(dtype=float, default=1,
91  doc="Minimum distance from closest peak to create a new one (in arcsec).")
92 
93  maxSamePeak = Field(dtype=float, default=0.3,
 94  doc="When adding new catalogs to the merge, all peaks closer than this distance "
 95  "(in arcsec) to an existing peak will be flagged as detected in that catalog.")
96  cullPeaks = ConfigField(dtype=CullPeaksConfig, doc="Configuration for how to cull peaks.")
97 
98  skyFilterName = Field(dtype=str, default="sky",
99  doc="Name of `filter' used to label sky objects (e.g. flag merge_peak_sky is set)\n"
100  "(N.b. should be in MergeMeasurementsConfig.pseudoFilterList)")
101  skyObjects = ConfigurableField(target=SkyObjectsTask, doc="Generate sky objects")
102  priorityList = ListField(dtype=str, default=[],
103  doc="Priority-ordered list of bands for the merge.")
104  coaddName = Field(dtype=str, default="deep", doc="Name of coadd")
105 
106  def setDefaults(self):
107  Config.setDefaults(self)
108  self.skyObjects.avoidMask = ["DETECTED"] # Nothing else is available in our custom mask
109 
110  def validate(self):
111  super().validate()
112  if len(self.priorityList) == 0:
113  raise RuntimeError("No priority list provided")
114 
115 
116 class MergeDetectionsTask(PipelineTask, CmdLineTask):
117  r"""!
118  @anchor MergeDetectionsTask_
119 
120  @brief Merge coadd detections from multiple bands.
121 
122  @section pipe_tasks_multiBand_Contents Contents
123 
124  - @ref pipe_tasks_multiBand_MergeDetectionsTask_Purpose
125  - @ref pipe_tasks_multiBand_MergeDetectionsTask_Init
126  - @ref pipe_tasks_multiBand_MergeDetectionsTask_Run
127  - @ref pipe_tasks_multiBand_MergeDetectionsTask_Config
128  - @ref pipe_tasks_multiBand_MergeDetectionsTask_Debug
129  - @ref pipe_tasks_multiband_MergeDetectionsTask_Example
130 
131  @section pipe_tasks_multiBand_MergeDetectionsTask_Purpose Description
132 
133  Command-line task that merges sources detected in coadds of exposures obtained with different filters.
134 
135  To perform photometry consistently across coadds in multiple filter bands, we create a master catalog of
136  sources from all bands by merging the sources (peaks & footprints) detected in each coadd, while keeping
137  track of which band each source originates in.
138 
139  The catalog merge is performed by @ref getMergedSourceCatalog. Spurious peaks detected around bright
140  objects are culled as described in @ref CullPeaksConfig_.
141 
142  @par Inputs:
143  deepCoadd_det{tract,patch,filter}: SourceCatalog (only parent Footprints)
144  @par Outputs:
145  deepCoadd_mergeDet{tract,patch}: SourceCatalog (only parent Footprints)
146  @par Data Unit:
147  tract, patch
148 
149  @section pipe_tasks_multiBand_MergeDetectionsTask_Init Task initialisation
150 
151  @copydoc \_\_init\_\_
152 
153  @section pipe_tasks_multiBand_MergeDetectionsTask_Run Invoking the Task
154 
155  @copydoc run
156 
157  @section pipe_tasks_multiBand_MergeDetectionsTask_Config Configuration parameters
158 
159  See @ref MergeDetectionsConfig_
160 
161  @section pipe_tasks_multiBand_MergeDetectionsTask_Debug Debug variables
162 
163  The @link lsst.pipe.base.cmdLineTask.CmdLineTask command line task@endlink interface supports a flag @c -d
164  to import @b debug.py from your @c PYTHONPATH; see @ref baseDebug for more about @b debug.py files.
165 
166  MergeDetectionsTask has no debug variables.
167 
168  @section pipe_tasks_multiband_MergeDetectionsTask_Example A complete example of using MergeDetectionsTask
169 
170  MergeDetectionsTask is meant to be run after detecting sources in coadds generated for the chosen subset
171  of the available bands.
172  The purpose of the task is to merge sources (peaks & footprints) detected in the coadds generated from the
173  chosen subset of filters.
174  Subsequent tasks in the multi-band processing procedure will deblend the generated master list of sources
175  and, eventually, perform forced photometry.
176  Command-line usage of MergeDetectionsTask expects data references for all the coadds to be processed.
177  A list of the available optional arguments can be obtained by calling mergeCoaddDetections.py with the
178  `--help` command line argument:
179  @code
180  mergeCoaddDetections.py --help
181  @endcode
182 
 183  To demonstrate usage of MergeDetectionsTask in the larger context of multi-band processing, we
184  will process HSC data in the [ci_hsc](https://github.com/lsst/ci_hsc) package. Assuming one has finished
185  step 5 at @ref pipeTasks_multiBand, one may merge the catalogs of sources from each coadd as follows:
186  @code
187  mergeCoaddDetections.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-I^HSC-R
188  @endcode
189  This will merge the HSC-I & -R band parent source catalogs and write the results to
190  `$CI_HSC_DIR/DATA/deepCoadd-results/merged/0/5,4/mergeDet-0-5,4.fits`.
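
 Each record in the merged catalog carries per-band provenance flags following the
 merge_footprint_<band> / merge_peak_<band> naming used in run() and cullPeaks() below. As a
 hedged sketch of inspecting them (the Gen2 butler calls and the short band name "i" are
 illustrative assumptions):
 @code
 import os
 import lsst.daf.persistence as dafPersist
 butler = dafPersist.Butler(os.path.expandvars("$CI_HSC_DIR/DATA"))
 merged = butler.get("deepCoadd_mergeDet", tract=0, patch="5,4")
 key = merged.schema.find("merge_footprint_i").key
 print(sum(src.get(key) for src in merged), "sources have an HSC-I footprint")
 @endcode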
191 
192  The next step in the multi-band processing procedure is
193  @ref MeasureMergedCoaddSourcesTask_ "MeasureMergedCoaddSourcesTask"
194  """
195  ConfigClass = MergeDetectionsConfig
196  RunnerClass = MergeSourcesRunner
197  _DefaultName = "mergeCoaddDetections"
198  inputDataset = "det"
199  outputDataset = "mergeDet"
200  makeIdFactory = _makeMakeIdFactory("MergedCoaddId")
201 
202  @classmethod
203  def _makeArgumentParser(cls):
204  return makeMergeArgumentParser(cls._DefaultName, cls.inputDataset)
205 
206  def getInputSchema(self, butler=None, schema=None):
207  return getInputSchema(self, butler, schema)
208 
209  def __init__(self, butler=None, schema=None, initInputs=None, **kwargs):
 210  # TODO: make the PipelineTask-only wording below less transitional once CmdLineTask support is removed
211  """!
212  @brief Initialize the merge detections task.
213 
214  A @ref FootprintMergeList_ "FootprintMergeList" will be used to
215  merge the source catalogs.
216 
217  @param[in] schema the schema of the detection catalogs used as input to this one
218  @param[in] butler a butler used to read the input schema from disk, if schema is None
 219  @param[in] initInputs a PipelineTask-only argument that holds all inputs passed in
220  through the PipelineTask middleware
221  @param[in] **kwargs keyword arguments to be passed to CmdLineTask.__init__
222 
223  The task will set its own self.schema attribute to the schema of the output merged catalog.
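
 A hedged construction sketch (the config values and the pre-existing Gen2 `butler` are
 assumptions made for illustration; a schema or initInputs may be supplied instead):
 @code
 config = MergeDetectionsConfig()
 config.priorityList = ["HSC-I", "HSC-R"]  # hypothetical band names
 task = MergeDetectionsTask(butler=butler, config=config)
 @endcode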
224  """
225  super().__init__(**kwargs)
226  if initInputs is not None:
227  schema = initInputs['schema'].schema
228 
229  self.makeSubtask("skyObjects")
230  self.schema = self.getInputSchema(butler=butler, schema=schema)
231 
232  filterNames = [getShortFilterName(name) for name in self.config.priorityList]
233  filterNames += [self.config.skyFilterName]
234  self.merged = afwDetect.FootprintMergeList(self.schema, filterNames)
235  self.outputSchema = afwTable.SourceCatalog(self.schema)
236  self.outputPeakSchema = afwDetect.PeakCatalog(self.merged.getPeakSchema())
237 
238  def runDataRef(self, patchRefList):
239  catalogs = dict(readCatalog(self, patchRef) for patchRef in patchRefList)
240  skyInfo = getSkyInfo(coaddName=self.config.coaddName, patchRef=patchRefList[0])
241  idFactory = self.makeIdFactory(patchRefList[0])
242  skySeed = patchRefList[0].get(self.config.coaddName + "MergedCoaddId")
243  mergeCatalogStruct = self.run(catalogs, skyInfo, idFactory, skySeed)
244  self.write(patchRefList[0], mergeCatalogStruct.outputCatalog)
245 
246  def runQuantum(self, butlerQC, inputRefs, outputRefs):
247  inputs = butlerQC.get(inputRefs)
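 # The packed (tract, patch) data ID below is reused both as the sky-object seed ("skySeed")
 # and, through the IdFactory, as the upper bits of the merged source ids.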
248  packedId, maxBits = butlerQC.quantum.dataId.pack("tract_patch", returnMaxBits=True)
249  inputs["skySeed"] = packedId
250  inputs["idFactory"] = afwTable.IdFactory.makeSource(packedId, 64 - maxBits)
251  catalogDict = {ref.dataId['abstract_filter']: cat for ref, cat in zip(inputRefs.catalogs,
252  inputs['catalogs'])}
253  inputs['catalogs'] = catalogDict
254  skyMap = inputs.pop('skyMap')
255  # Can use the first dataId to find the tract and patch being worked on
256  tractNumber = inputRefs.catalogs[0].dataId['tract']
257  tractInfo = skyMap[tractNumber]
258  patchInfo = tractInfo.getPatchInfo(inputRefs.catalogs[0].dataId['patch'])
259  skyInfo = Struct(
260  skyMap=skyMap,
261  tractInfo=tractInfo,
262  patchInfo=patchInfo,
263  wcs=tractInfo.getWcs(),
264  bbox=patchInfo.getOuterBBox()
265  )
266  inputs['skyInfo'] = skyInfo
267 
268  outputs = self.run(**inputs)
269  butlerQC.put(outputs, outputRefs)
270 
271  def run(self, catalogs, skyInfo, idFactory, skySeed):
272  r"""!
273  @brief Merge multiple catalogs.
274 
275  After ordering the catalogs and filters in priority order,
276  @ref getMergedSourceCatalog of the @ref FootprintMergeList_ "FootprintMergeList" created by
277  @ref \_\_init\_\_ is used to perform the actual merging. Finally, @ref cullPeaks is used to remove
278  garbage peaks detected around bright objects.
279 
 280  @param[in] catalogs dict mapping filter name to that band's detection catalog
 281  @param[in] skyInfo patch geometry (wcs, bbox); idFactory: id factory for merged sources; skySeed: sky-object seed
 282  @return lsst.pipe.base.Struct with the merged SourceCatalog in its `outputCatalog` field
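
 A hedged call sketch (catalog variables, band names, and the seed value are placeholders;
 the dict keys must match entries in config.priorityList):
 @code
 result = task.run(catalogs={"HSC-I": catI, "HSC-R": catR},
                   skyInfo=skyInfo, idFactory=idFactory, skySeed=0)
 mergedList = result.outputCatalog
 @endcode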
283  """
284 
 285  # Convert the configured match distances from arcseconds to tract pixels
286  tractWcs = skyInfo.wcs
287  peakDistance = self.config.minNewPeak / tractWcs.getPixelScale().asArcseconds()
288  samePeakDistance = self.config.maxSamePeak / tractWcs.getPixelScale().asArcseconds()
289 
290  # Put catalogs, filters in priority order
291  orderedCatalogs = [catalogs[band] for band in self.config.priorityList if band in catalogs.keys()]
292  orderedBands = [getShortFilterName(band) for band in self.config.priorityList
293  if band in catalogs.keys()]
294 
295  mergedList = self.merged.getMergedSourceCatalog(orderedCatalogs, orderedBands, peakDistance,
296  self.schema, idFactory,
297  samePeakDistance)
298 
299  #
300  # Add extra sources that correspond to blank sky
301  #
302  skySourceFootprints = self.getSkySourceFootprints(mergedList, skyInfo, skySeed)
303  if skySourceFootprints:
304  key = mergedList.schema.find("merge_footprint_%s" % self.config.skyFilterName).key
305  for foot in skySourceFootprints:
306  s = mergedList.addNew()
307  s.setFootprint(foot)
308  s.set(key, True)
309 
310  # Sort Peaks from brightest to faintest
311  for record in mergedList:
312  record.getFootprint().sortPeaks()
313  self.log.info("Merged to %d sources" % len(mergedList))
314  # Attempt to remove garbage peaks
315  self.cullPeaks(mergedList)
316  return Struct(outputCatalog=mergedList)
317 
318  def cullPeaks(self, catalog):
319  """!
320  @brief Attempt to remove garbage peaks (mostly on the outskirts of large blends).
321 
322  @param[in] catalog Source catalog
323  """
324  keys = [item.key for item in self.merged.getPeakSchema().extract("merge_peak_*").values()]
325  assert len(keys) > 0, "Error finding flags that associate peaks with their detection bands."
326  totalPeaks = 0
327  culledPeaks = 0
328  for parentSource in catalog:
329  # Make a list copy so we can clear the attached PeakCatalog and append the ones we're keeping
330  # to it (which is easier than deleting as we iterate).
331  keptPeaks = parentSource.getFootprint().getPeaks()
332  oldPeaks = list(keptPeaks)
333  keptPeaks.clear()
334  familySize = len(oldPeaks)
335  totalPeaks += familySize
336  for rank, peak in enumerate(oldPeaks):
337  if ((rank < self.config.cullPeaks.rankSufficient)
338  or (sum([peak.get(k) for k in keys]) >= self.config.cullPeaks.nBandsSufficient)
339  or (rank < self.config.cullPeaks.rankConsidered
340  and rank < self.config.cullPeaks.rankNormalizedConsidered * familySize)):
341  keptPeaks.append(peak)
342  else:
343  culledPeaks += 1
344  self.log.info("Culled %d of %d peaks" % (culledPeaks, totalPeaks))
345 
346  def getSchemaCatalogs(self):
347  """!
348  Return a dict of empty catalogs for each catalog dataset produced by this task.
349 
 350  @return dict of empty catalogs, keyed by dataset type name
351  """
352  mergeDet = afwTable.SourceCatalog(self.schema)
353  peak = afwDetect.PeakCatalog(self.merged.getPeakSchema())
354  return {self.config.coaddName + "Coadd_mergeDet": mergeDet,
355  self.config.coaddName + "Coadd_peak": peak}
356 
357  def getSkySourceFootprints(self, mergedList, skyInfo, seed):
358  """!
359  @brief Return a list of Footprints of sky objects which don't overlap with anything in mergedList
360 
361  @param mergedList The merged Footprints from all the input bands
362  @param skyInfo A description of the patch
363  @param seed Seed for the random number generator
364  """
365  mask = afwImage.Mask(skyInfo.patchInfo.getOuterBBox())
366  detected = mask.getPlaneBitMask("DETECTED")
367  for s in mergedList:
368  s.getFootprint().spans.setMask(mask, detected)
369 
370  footprints = self.skyObjects.run(mask, seed)
371  if not footprints:
372  return footprints
373 
374  # Need to convert the peak catalog's schema so we can set the "merge_peak_<skyFilterName>" flags
375  schema = self.merged.getPeakSchema()
376  mergeKey = schema.find("merge_peak_%s" % self.config.skyFilterName).key
377  converted = []
378  for oldFoot in footprints:
379  assert len(oldFoot.getPeaks()) == 1, "Should be a single peak only"
380  peak = oldFoot.getPeaks()[0]
381  newFoot = afwDetect.Footprint(oldFoot.spans, schema)
382  newFoot.addPeak(peak.getFx(), peak.getFy(), peak.getPeakValue())
383  newFoot.getPeaks()[0].set(mergeKey, True)
384  converted.append(newFoot)
385 
386  return converted
387 
388  def write(self, patchRef, catalog):
389  """!
390  @brief Write the output.
391 
392  @param[in] patchRef data reference for patch
393  @param[in] catalog catalog
394 
 395  The catalog is written to the dataset type formed from the coadd name and the
 396  'outputDataset' class variable (e.g. deepCoadd_mergeDet).
397  """
398  patchRef.put(catalog, self.config.coaddName + "Coadd_" + self.outputDataset)
399  # since the filter isn't actually part of the data ID for the dataset we're saving,
400  # it's confusing to see it in the log message, even if the butler simply ignores it.
401  mergeDataId = patchRef.dataId.copy()
402  del mergeDataId["filter"]
403  self.log.info("Wrote merged catalog: %s" % (mergeDataId,))
404 
405  def writeMetadata(self, dataRefList):
406  """!
407  @brief No metadata to write, and not sure how to write it for a list of dataRefs.
408  """
409  pass