LSSTApplications  18.1.0
LSSTDataManagementBasePackage
mergeDetections.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # LSST Data Management System
4 # Copyright 2008-2015 AURA/LSST.
5 #
6 # This product includes software developed by the
7 # LSST Project (http://www.lsst.org/).
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the LSST License Statement and
20 # the GNU General Public License along with this program. If not,
21 # see <https://www.lsstcorp.org/LegalNotices/>.
22 #
23 
24 from .multiBandUtils import (CullPeaksConfig, MergeSourcesRunner, _makeMakeIdFactory, makeMergeArgumentParser,
25  getInputSchema, getShortFilterName, readCatalog)
26 
27 
28 import lsst.afw.detection as afwDetect
29 import lsst.afw.image as afwImage
30 import lsst.afw.table as afwTable
31 
32 from lsst.meas.algorithms import SkyObjectsTask
33 from lsst.pex.config import Config, Field, ListField, ConfigurableField, ConfigField
34 from lsst.pipe.base import (CmdLineTask, PipelineTask, PipelineTaskConfig, InitOutputDatasetField,
35  InputDatasetField, InitInputDatasetField, OutputDatasetField, Struct)
36 from lsst.pipe.tasks.coaddBase import getSkyInfo
37 
38 
39 class MergeDetectionsConfig(PipelineTaskConfig):
40  """!
41  @anchor MergeDetectionsConfig_
42 
43  @brief Configuration parameters for the MergeDetectionsTask.
44  """
45  minNewPeak = Field(dtype=float, default=1,
46  doc="Minimum distance from closest peak to create a new one (in arcsec).")
47 
48  maxSamePeak = Field(dtype=float, default=0.3,
49  doc="When adding new catalogs to the merge, all peaks less than this distance "
50  " (in arcsec) to an existing peak will be flagged as detected in that catalog.")
51  cullPeaks = ConfigField(dtype=CullPeaksConfig, doc="Configuration for how to cull peaks.")
52 
53  skyFilterName = Field(dtype=str, default="sky",
54  doc="Name of `filter' used to label sky objects (e.g. flag merge_peak_sky is set)\n"
55  "(N.b. should be in MergeMeasurementsConfig.pseudoFilterList)")
56  skyObjects = ConfigurableField(target=SkyObjectsTask, doc="Generate sky objects")
57  priorityList = ListField(dtype=str, default=[],
58  doc="Priority-ordered list of bands for the merge.")
59  coaddName = Field(dtype=str, default="deep", doc="Name of coadd")
60 
62  doc="Schema of the input detection catalog",
63  nameTemplate="{inputCoaddName}Coadd_det_schema",
64  storageClass="SourceCatalog"
65  )
66 
67  outputSchema = InitOutputDatasetField(
68  doc="Schema of the merged detection catalog",
69  nameTemplate="{outputCoaddName}Coadd_mergeDet_schema",
70  storageClass="SourceCatalog"
71  )
72 
73  outputPeakSchema = InitOutputDatasetField(
74  doc="Output schema of the Footprint peak catalog",
75  nameTemplate="{outputCoaddName}Coadd_peak_schema",
76  storageClass="PeakCatalog"
77  )
78 
79  catalogs = InputDatasetField(
80  doc="Detection Catalogs to be merged",
81  nameTemplate="{inputCoaddName}Coadd_det",
82  storageClass="SourceCatalog",
83  dimensions=("tract", "patch", "skymap", "abstract_filter")
84  )
85 
87  doc="SkyMap to be used in merging",
88  nameTemplate="{inputCoaddName}Coadd_skyMap",
89  storageClass="SkyMap",
90  dimensions=("skymap",),
91  scalar=True
92  )
93 
94  outputCatalog = OutputDatasetField(
95  doc="Merged Detection catalog",
96  nameTemplate="{outputCoaddName}Coadd_mergeDet",
97  storageClass="SourceCatalog",
98  dimensions=("tract", "patch", "skymap"),
99  scalar=True
100  )
101 
102  def setDefaults(self):
103  Config.setDefaults(self)
104  self.formatTemplateNames({"inputCoaddName": 'deep', "outputCoaddName": "deep"})
105  self.skyObjects.avoidMask = ["DETECTED"] # Nothing else is available in our custom mask
106  self.quantum.dimensions = ("tract", "patch", "skymap")
107 
108  def validate(self):
109  super().validate()
110  if len(self.priorityList) == 0:
111  raise RuntimeError("No priority list provided")
112 
113 
114 class MergeDetectionsTask(PipelineTask, CmdLineTask):
115  r"""!
116  @anchor MergeDetectionsTask_
117 
118  @brief Merge coadd detections from multiple bands.
119 
120  @section pipe_tasks_multiBand_Contents Contents
121 
122  - @ref pipe_tasks_multiBand_MergeDetectionsTask_Purpose
123  - @ref pipe_tasks_multiBand_MergeDetectionsTask_Init
124  - @ref pipe_tasks_multiBand_MergeDetectionsTask_Run
125  - @ref pipe_tasks_multiBand_MergeDetectionsTask_Config
126  - @ref pipe_tasks_multiBand_MergeDetectionsTask_Debug
127  - @ref pipe_tasks_multiband_MergeDetectionsTask_Example
128 
129  @section pipe_tasks_multiBand_MergeDetectionsTask_Purpose Description
130 
131  Command-line task that merges sources detected in coadds of exposures obtained with different filters.
132 
133  To perform photometry consistently across coadds in multiple filter bands, we create a master catalog of
134  sources from all bands by merging the sources (peaks & footprints) detected in each coadd, while keeping
135  track of which band each source originates in.
136 
137  The catalog merge is performed by @ref getMergedSourceCatalog. Spurious peaks detected around bright
138  objects are culled as described in @ref CullPeaksConfig_.
139 
140  @par Inputs:
141  deepCoadd_det{tract,patch,filter}: SourceCatalog (only parent Footprints)
142  @par Outputs:
143  deepCoadd_mergeDet{tract,patch}: SourceCatalog (only parent Footprints)
144  @par Data Unit:
145  tract, patch
146 
147  @section pipe_tasks_multiBand_MergeDetectionsTask_Init Task initialisation
148 
149  @copydoc \_\_init\_\_
150 
151  @section pipe_tasks_multiBand_MergeDetectionsTask_Run Invoking the Task
152 
153  @copydoc run
154 
155  @section pipe_tasks_multiBand_MergeDetectionsTask_Config Configuration parameters
156 
157  See @ref MergeDetectionsConfig_
158 
159  @section pipe_tasks_multiBand_MergeDetectionsTask_Debug Debug variables
160 
161  The @link lsst.pipe.base.cmdLineTask.CmdLineTask command line task@endlink interface supports a flag @c -d
162  to import @b debug.py from your @c PYTHONPATH; see @ref baseDebug for more about @b debug.py files.
163 
164  MergeDetectionsTask has no debug variables.
165 
166  @section pipe_tasks_multiband_MergeDetectionsTask_Example A complete example of using MergeDetectionsTask
167 
168  MergeDetectionsTask is meant to be run after detecting sources in coadds generated for the chosen subset
169  of the available bands.
170  The purpose of the task is to merge sources (peaks & footprints) detected in the coadds generated from the
171  chosen subset of filters.
172  Subsequent tasks in the multi-band processing procedure will deblend the generated master list of sources
173  and, eventually, perform forced photometry.
174  Command-line usage of MergeDetectionsTask expects data references for all the coadds to be processed.
175  A list of the available optional arguments can be obtained by calling mergeCoaddDetections.py with the
176  `--help` command line argument:
177  @code
178  mergeCoaddDetections.py --help
179  @endcode
180 
181  To demonstrate usage of the DetectCoaddSourcesTask in the larger context of multi-band processing, we
182  will process HSC data in the [ci_hsc](https://github.com/lsst/ci_hsc) package. Assuming one has finished
183  step 5 at @ref pipeTasks_multiBand, one may merge the catalogs of sources from each coadd as follows:
184  @code
185  mergeCoaddDetections.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-I^HSC-R
186  @endcode
187  This will merge the HSC-I & -R band parent source catalogs and write the results to
188  `$CI_HSC_DIR/DATA/deepCoadd-results/merged/0/5,4/mergeDet-0-5,4.fits`.
189 
190  The next step in the multi-band processing procedure is
191  @ref MeasureMergedCoaddSourcesTask_ "MeasureMergedCoaddSourcesTask"
192  """
193  ConfigClass = MergeDetectionsConfig
194  RunnerClass = MergeSourcesRunner
195  _DefaultName = "mergeCoaddDetections"
196  inputDataset = "det"
197  outputDataset = "mergeDet"
198  makeIdFactory = _makeMakeIdFactory("MergedCoaddId")
199 
200  @classmethod
201  def _makeArgumentParser(cls):
203 
204  def getInputSchema(self, butler=None, schema=None):
205  return getInputSchema(self, butler, schema)
206 
207  def __init__(self, butler=None, schema=None, initInputs=None, **kwargs):
208  # Make PipelineTask-only wording less transitional after cmdlineTask is removed
209  """!
210  @brief Initialize the merge detections task.
211 
212  A @ref FootprintMergeList_ "FootprintMergeList" will be used to
213  merge the source catalogs.
214 
215  @param[in] schema the schema of the detection catalogs used as input to this one
216  @param[in] butler a butler used to read the input schema from disk, if schema is None
217  @param[in] initInputs This a PipelineTask-only argument that holds all inputs passed in
218  through the PipelineTask middleware
219  @param[in] **kwargs keyword arguments to be passed to CmdLineTask.__init__
220 
221  The task will set its own self.schema attribute to the schema of the output merged catalog.
222  """
223  super().__init__(**kwargs)
224  if initInputs is not None:
225  schema = initInputs['schema'].schema
226 
227  self.makeSubtask("skyObjects")
228  self.schema = self.getInputSchema(butler=butler, schema=schema)
229 
230  filterNames = [getShortFilterName(name) for name in self.config.priorityList]
231  filterNames += [self.config.skyFilterName]
232  self.merged = afwDetect.FootprintMergeList(self.schema, filterNames)
233 
235  return {"outputSchema": afwTable.SourceCatalog(self.schema),
236  "outputPeakSchema": afwDetect.PeakCatalog(self.merged.getPeakSchema())}
237 
238  def runDataRef(self, patchRefList):
239  catalogs = dict(readCatalog(self, patchRef) for patchRef in patchRefList)
240  skyInfo = getSkyInfo(coaddName=self.config.coaddName, patchRef=patchRefList[0])
241  idFactory = self.makeIdFactory(patchRefList[0])
242  skySeed = patchRefList[0].get(self.config.coaddName + "MergedCoaddId")
243  mergeCatalogStruct = self.run(catalogs, skyInfo, idFactory, skySeed)
244  self.write(patchRefList[0], mergeCatalogStruct.outputCatalog)
245 
246  def adaptArgsAndRun(self, inputData, inputDataIds, outputDataIds, butler):
247  packedId, maxBits = butler.registry.packDataId("tract_patch", outputDataIds['outputCatalog'],
248  returnMaxBits=True)
249  inputData["skySeed"] = packedId
250  inputData["idFactory"] = afwTable.IdFactory.makeSource(packedId, 64 - maxBits)
251  catalogDict = {dataId['abstract_filter']: cat for dataId, cat in zip(inputDataIds['catalogs'],
252  inputData['catalogs'])}
253  inputData['catalogs'] = catalogDict
254  skyMap = inputData.pop('skyMap')
255  # Can use the first dataId to find the tract and patch being worked on
256  tractNumber = inputDataIds['catalogs'][0]['tract']
257  tractInfo = skyMap[tractNumber]
258  patchInfo = tractInfo.getPatchInfo(inputDataIds['catalogs'][0]['patch'])
259  skyInfo = Struct(
260  skyMap=skyMap,
261  tractInfo=tractInfo,
262  patchInfo=patchInfo,
263  wcs=tractInfo.getWcs(),
264  bbox=patchInfo.getOuterBBox()
265  )
266  inputData['skyInfo'] = skyInfo
267  return self.run(**inputData)
268 
269  def run(self, catalogs, skyInfo, idFactory, skySeed):
270  r"""!
271  @brief Merge multiple catalogs.
272 
273  After ordering the catalogs and filters in priority order,
274  @ref getMergedSourceCatalog of the @ref FootprintMergeList_ "FootprintMergeList" created by
275  @ref \_\_init\_\_ is used to perform the actual merging. Finally, @ref cullPeaks is used to remove
276  garbage peaks detected around bright objects.
277 
278  @param[in] catalogs
279  @param[in] patchRef
280  @param[out] mergedList
281  """
282 
283  # Convert distance to tract coordinate
284  tractWcs = skyInfo.wcs
285  peakDistance = self.config.minNewPeak / tractWcs.getPixelScale().asArcseconds()
286  samePeakDistance = self.config.maxSamePeak / tractWcs.getPixelScale().asArcseconds()
287 
288  # Put catalogs, filters in priority order
289  orderedCatalogs = [catalogs[band] for band in self.config.priorityList if band in catalogs.keys()]
290  orderedBands = [getShortFilterName(band) for band in self.config.priorityList
291  if band in catalogs.keys()]
292 
293  mergedList = self.merged.getMergedSourceCatalog(orderedCatalogs, orderedBands, peakDistance,
294  self.schema, idFactory,
295  samePeakDistance)
296 
297  #
298  # Add extra sources that correspond to blank sky
299  #
300  skySourceFootprints = self.getSkySourceFootprints(mergedList, skyInfo, skySeed)
301  if skySourceFootprints:
302  key = mergedList.schema.find("merge_footprint_%s" % self.config.skyFilterName).key
303  for foot in skySourceFootprints:
304  s = mergedList.addNew()
305  s.setFootprint(foot)
306  s.set(key, True)
307 
308  # Sort Peaks from brightest to faintest
309  for record in mergedList:
310  record.getFootprint().sortPeaks()
311  self.log.info("Merged to %d sources" % len(mergedList))
312  # Attempt to remove garbage peaks
313  self.cullPeaks(mergedList)
314  return Struct(outputCatalog=mergedList)
315 
316  def cullPeaks(self, catalog):
317  """!
318  @brief Attempt to remove garbage peaks (mostly on the outskirts of large blends).
319 
320  @param[in] catalog Source catalog
321  """
322  keys = [item.key for item in self.merged.getPeakSchema().extract("merge_peak_*").values()]
323  assert len(keys) > 0, "Error finding flags that associate peaks with their detection bands."
324  totalPeaks = 0
325  culledPeaks = 0
326  for parentSource in catalog:
327  # Make a list copy so we can clear the attached PeakCatalog and append the ones we're keeping
328  # to it (which is easier than deleting as we iterate).
329  keptPeaks = parentSource.getFootprint().getPeaks()
330  oldPeaks = list(keptPeaks)
331  keptPeaks.clear()
332  familySize = len(oldPeaks)
333  totalPeaks += familySize
334  for rank, peak in enumerate(oldPeaks):
335  if ((rank < self.config.cullPeaks.rankSufficient) or
336  (sum([peak.get(k) for k in keys]) >= self.config.cullPeaks.nBandsSufficient) or
337  (rank < self.config.cullPeaks.rankConsidered and
338  rank < self.config.cullPeaks.rankNormalizedConsidered * familySize)):
339  keptPeaks.append(peak)
340  else:
341  culledPeaks += 1
342  self.log.info("Culled %d of %d peaks" % (culledPeaks, totalPeaks))
343 
344  def getSchemaCatalogs(self):
345  """!
346  Return a dict of empty catalogs for each catalog dataset produced by this task.
347 
348  @param[out] dictionary of empty catalogs
349  """
350  mergeDet = afwTable.SourceCatalog(self.schema)
351  peak = afwDetect.PeakCatalog(self.merged.getPeakSchema())
352  return {self.config.coaddName + "Coadd_mergeDet": mergeDet,
353  self.config.coaddName + "Coadd_peak": peak}
354 
355  def getSkySourceFootprints(self, mergedList, skyInfo, seed):
356  """!
357  @brief Return a list of Footprints of sky objects which don't overlap with anything in mergedList
358 
359  @param mergedList The merged Footprints from all the input bands
360  @param skyInfo A description of the patch
361  @param seed Seed for the random number generator
362  """
363  mask = afwImage.Mask(skyInfo.patchInfo.getOuterBBox())
364  detected = mask.getPlaneBitMask("DETECTED")
365  for s in mergedList:
366  s.getFootprint().spans.setMask(mask, detected)
367 
368  footprints = self.skyObjects.run(mask, seed)
369  if not footprints:
370  return footprints
371 
372  # Need to convert the peak catalog's schema so we can set the "merge_peak_<skyFilterName>" flags
373  schema = self.merged.getPeakSchema()
374  mergeKey = schema.find("merge_peak_%s" % self.config.skyFilterName).key
375  converted = []
376  for oldFoot in footprints:
377  assert len(oldFoot.getPeaks()) == 1, "Should be a single peak only"
378  peak = oldFoot.getPeaks()[0]
379  newFoot = afwDetect.Footprint(oldFoot.spans, schema)
380  newFoot.addPeak(peak.getFx(), peak.getFy(), peak.getPeakValue())
381  newFoot.getPeaks()[0].set(mergeKey, True)
382  converted.append(newFoot)
383 
384  return converted
385 
386  def write(self, patchRef, catalog):
387  """!
388  @brief Write the output.
389 
390  @param[in] patchRef data reference for patch
391  @param[in] catalog catalog
392 
393  We write as the dataset provided by the 'outputDataset'
394  class variable.
395  """
396  patchRef.put(catalog, self.config.coaddName + "Coadd_" + self.outputDataset)
397  # since the filter isn't actually part of the data ID for the dataset we're saving,
398  # it's confusing to see it in the log message, even if the butler simply ignores it.
399  mergeDataId = patchRef.dataId.copy()
400  del mergeDataId["filter"]
401  self.log.info("Wrote merged catalog: %s" % (mergeDataId,))
402 
403  def writeMetadata(self, dataRefList):
404  """!
405  @brief No metadata to write, and not sure how to write it for a list of dataRefs.
406  """
407  pass
def InitOutputDatasetField
Definition: config.py:339
def getSchemaCatalogs(self)
Return a dict of empty catalogs for each catalog dataset produced by this task.
def makeMergeArgumentParser(name, dataset)
Create a suitable ArgumentParser.
Merge coadd detections from multiple bands.
def readCatalog(task, patchRef)
Read input catalog.
Fit spatial kernel using approximate fluxes for candidates, and solving a linear system of equations...
daf::base::PropertySet * set
Definition: fits.cc:884
def adaptArgsAndRun(self, inputData, inputDataIds, outputDataIds, butler)
def __init__(self, butler=None, schema=None, initInputs=None, kwargs)
Initialize the merge detections task.
Represent a 2-dimensional array of bitmask pixels.
Definition: Mask.h:78
Class to describe the properties of a detected object from an image.
Definition: Footprint.h:62
def run(self, catalogs, skyInfo, idFactory, skySeed)
Merge multiple catalogs.
Configuration parameters for the MergeDetectionsTask.
def getSkyInfo(coaddName, patchRef)
Return the SkyMap, tract and patch information, wcs, and outer bbox of the patch to be coadded...
Definition: coaddBase.py:231
def cullPeaks(self, catalog)
Attempt to remove garbage peaks (mostly on the outskirts of large blends).
def write(self, patchRef, catalog)
Write the output.
def getInputSchema(self, butler=None, schema=None)
Backwards-compatibility support for depersisting the old Calib (FluxMag0/FluxMag0Err) objects...
def writeMetadata(self, dataRefList)
No metadata to write, and not sure how to write it for a list of dataRefs.
daf::base::PropertyList * list
Definition: fits.cc:885
def getSkySourceFootprints(self, mergedList, skyInfo, seed)
Return a list of Footprints of sky objects which don&#39;t overlap with anything in mergedList.