LSST Applications  21.0.0+04719a4bac,21.0.0-1-ga51b5d4+f5e6047307,21.0.0-11-g2b59f77+a9c1acf22d,21.0.0-11-ga42c5b2+86977b0b17,21.0.0-12-gf4ce030+76814010d2,21.0.0-13-g1721dae+760e7a6536,21.0.0-13-g3a573fe+768d78a30a,21.0.0-15-g5a7caf0+f21cbc5713,21.0.0-16-g0fb55c1+b60e2d390c,21.0.0-19-g4cded4ca+71a93a33c0,21.0.0-2-g103fe59+bb20972958,21.0.0-2-g45278ab+04719a4bac,21.0.0-2-g5242d73+3ad5d60fb1,21.0.0-2-g7f82c8f+8babb168e8,21.0.0-2-g8f08a60+06509c8b61,21.0.0-2-g8faa9b5+616205b9df,21.0.0-2-ga326454+8babb168e8,21.0.0-2-gde069b7+5e4aea9c2f,21.0.0-2-gecfae73+1d3a86e577,21.0.0-2-gfc62afb+3ad5d60fb1,21.0.0-25-g1d57be3cd+e73869a214,21.0.0-3-g357aad2+ed88757d29,21.0.0-3-g4a4ce7f+3ad5d60fb1,21.0.0-3-g4be5c26+3ad5d60fb1,21.0.0-3-g65f322c+e0b24896a3,21.0.0-3-g7d9da8d+616205b9df,21.0.0-3-ge02ed75+a9c1acf22d,21.0.0-4-g591bb35+a9c1acf22d,21.0.0-4-g65b4814+b60e2d390c,21.0.0-4-gccdca77+0de219a2bc,21.0.0-4-ge8a399c+6c55c39e83,21.0.0-5-gd00fb1e+05fce91b99,21.0.0-6-gc675373+3ad5d60fb1,21.0.0-64-g1122c245+4fb2b8f86e,21.0.0-7-g04766d7+cd19d05db2,21.0.0-7-gdf92d54+04719a4bac,21.0.0-8-g5674e7b+d1bd76f71f,master-gac4afde19b+a9c1acf22d,w.2021.13
LSST Data Management Base Package
mergeDetections.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # LSST Data Management System
4 # Copyright 2008-2015 AURA/LSST.
5 #
6 # This product includes software developed by the
7 # LSST Project (http://www.lsst.org/).
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the LSST License Statement and
20 # the GNU General Public License along with this program. If not,
21 # see <https://www.lsstcorp.org/LegalNotices/>.
22 #
23 
24 from .multiBandUtils import (CullPeaksConfig, MergeSourcesRunner, _makeMakeIdFactory, makeMergeArgumentParser,
25  getInputSchema, readCatalog)
26 
27 
28 import lsst.afw.detection as afwDetect
29 import lsst.afw.image as afwImage
30 import lsst.afw.table as afwTable
31 
32 from lsst.meas.algorithms import SkyObjectsTask
33 from lsst.skymap import BaseSkyMap
34 from lsst.pex.config import Config, Field, ListField, ConfigurableField, ConfigField
35 from lsst.pipe.base import (CmdLineTask, PipelineTask, PipelineTaskConfig, Struct,
36  PipelineTaskConnections)
38 from lsst.pipe.tasks.coaddBase import getSkyInfo
39 from lsst.obs.base import ExposureIdInfo
40 
41 
class MergeDetectionsConnections(PipelineTaskConnections,
                                 dimensions=("tract", "patch", "skymap"),
                                 defaultTemplates={"inputCoaddName": "deep", "outputCoaddName": "deep"}):
    """Butler connections for MergeDetectionsTask.

    One quantum covers a single (tract, patch): the per-band detection
    catalogs for that patch come in, and a single merged catalog goes out.
    """

    # Schema of one per-band input detection catalog (init-input: needed
    # before any data is read, to build the merged output schema).
    schema = cT.InitInput(
        doc="Schema of the input detection catalog",
        name="{inputCoaddName}Coadd_det_schema",
        storageClass="SourceCatalog",
    )

    # Schema of the merged source catalog this task produces.
    outputSchema = cT.InitOutput(
        doc="Schema of the merged detection catalog",
        name="{outputCoaddName}Coadd_mergeDet_schema",
        storageClass="SourceCatalog",
    )

    # Schema of the peak records attached to the merged Footprints.
    outputPeakSchema = cT.InitOutput(
        doc="Output schema of the Footprint peak catalog",
        name="{outputCoaddName}Coadd_peak_schema",
        storageClass="PeakCatalog",
    )

    # One detection catalog per band for this tract/patch (multiple=True).
    catalogs = cT.Input(
        doc="Detection Catalogs to be merged",
        name="{inputCoaddName}Coadd_det",
        storageClass="SourceCatalog",
        dimensions=("tract", "patch", "skymap", "band"),
        multiple=True,
    )

    # Sky map providing tract/patch geometry (WCS, bounding boxes).
    skyMap = cT.Input(
        doc="SkyMap to be used in merging",
        name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
        storageClass="SkyMap",
        dimensions=("skymap",),
    )

    # The merged per-patch catalog of parent Footprints.
    outputCatalog = cT.Output(
        doc="Merged Detection catalog",
        name="{outputCoaddName}Coadd_mergeDet",
        storageClass="SourceCatalog",
        dimensions=("tract", "patch", "skymap"),
    )
85 
class MergeDetectionsConfig(PipelineTaskConfig, pipelineConnections=MergeDetectionsConnections):
    """!
    @anchor MergeDetectionsConfig_

    @brief Configuration parameters for the MergeDetectionsTask.
    """
    # Peak-matching distances are configured in arcsec and converted to
    # pixels at run time using the tract WCS (see MergeDetectionsTask.run).
    minNewPeak = Field(dtype=float, default=1,
                       doc="Minimum distance from closest peak to create a new one (in arcsec).")

    maxSamePeak = Field(dtype=float, default=0.3,
                        doc="When adding new catalogs to the merge, all peaks less than this distance "
                        " (in arcsec) to an existing peak will be flagged as detected in that catalog.")
    cullPeaks = ConfigField(dtype=CullPeaksConfig, doc="Configuration for how to cull peaks.")

    skyFilterName = Field(dtype=str, default="sky",
                          doc="Name of `filter' used to label sky objects (e.g. flag merge_peak_sky is set)\n"
                          "(N.b. should be in MergeMeasurementsConfig.pseudoFilterList)")
    skyObjects = ConfigurableField(target=SkyObjectsTask, doc="Generate sky objects")
    priorityList = ListField(dtype=str, default=[],
                             doc="Priority-ordered list of filter bands for the merge.")
    coaddName = Field(dtype=str, default="deep", doc="Name of coadd")

    def setDefaults(self):
        # Use super() rather than a hard-coded Config.setDefaults(self) so the
        # full cooperative setDefaults chain (including PipelineTaskConfig)
        # runs, consistent with validate() below.
        super().setDefaults()
        self.skyObjects.avoidMask = ["DETECTED"]  # Nothing else is available in our custom mask

    def validate(self):
        """Raise RuntimeError if no priority list of bands was configured."""
        super().validate()
        if len(self.priorityList) == 0:
            raise RuntimeError("No priority list provided")
117 
class MergeDetectionsTask(PipelineTask, CmdLineTask):
    r"""!
    @anchor MergeDetectionsTask_

    @brief Merge coadd detections from multiple bands.

    @section pipe_tasks_multiBand_Contents Contents

      - @ref pipe_tasks_multiBand_MergeDetectionsTask_Purpose
      - @ref pipe_tasks_multiBand_MergeDetectionsTask_Init
      - @ref pipe_tasks_multiBand_MergeDetectionsTask_Run
      - @ref pipe_tasks_multiBand_MergeDetectionsTask_Config
      - @ref pipe_tasks_multiBand_MergeDetectionsTask_Debug
      - @ref pipe_tasks_multiband_MergeDetectionsTask_Example

    @section pipe_tasks_multiBand_MergeDetectionsTask_Purpose Description

    Command-line task that merges sources detected in coadds of exposures obtained with different filters.

    To perform photometry consistently across coadds in multiple filter bands, we create a master catalog of
    sources from all bands by merging the sources (peaks & footprints) detected in each coadd, while keeping
    track of which band each source originates in.

    The catalog merge is performed by @ref getMergedSourceCatalog. Spurious peaks detected around bright
    objects are culled as described in @ref CullPeaksConfig_.

    @par Inputs:
        deepCoadd_det{tract,patch,filter}: SourceCatalog (only parent Footprints)
    @par Outputs:
        deepCoadd_mergeDet{tract,patch}: SourceCatalog (only parent Footprints)
    @par Data Unit:
        tract, patch

    @section pipe_tasks_multiBand_MergeDetectionsTask_Init Task initialisation

    @copydoc \_\_init\_\_

    @section pipe_tasks_multiBand_MergeDetectionsTask_Run Invoking the Task

    @copydoc run

    @section pipe_tasks_multiBand_MergeDetectionsTask_Config Configuration parameters

    See @ref MergeDetectionsConfig_

    @section pipe_tasks_multiBand_MergeDetectionsTask_Debug Debug variables

    The @link lsst.pipe.base.cmdLineTask.CmdLineTask command line task@endlink interface supports a flag @c -d
    to import @b debug.py from your @c PYTHONPATH; see @ref baseDebug for more about @b debug.py files.

    MergeDetectionsTask has no debug variables.

    @section pipe_tasks_multiband_MergeDetectionsTask_Example A complete example of using MergeDetectionsTask

    MergeDetectionsTask is meant to be run after detecting sources in coadds generated for the chosen subset
    of the available bands.
    The purpose of the task is to merge sources (peaks & footprints) detected in the coadds generated from the
    chosen subset of filters.
    Subsequent tasks in the multi-band processing procedure will deblend the generated master list of sources
    and, eventually, perform forced photometry.
    Command-line usage of MergeDetectionsTask expects data references for all the coadds to be processed.
    A list of the available optional arguments can be obtained by calling mergeCoaddDetections.py with the
    `--help` command line argument:
    @code
    mergeCoaddDetections.py --help
    @endcode

    To demonstrate usage of the MergeDetectionsTask in the larger context of multi-band processing, we
    will process HSC data in the [ci_hsc](https://github.com/lsst/ci_hsc) package. Assuming one has finished
    step 5 at @ref pipeTasks_multiBand, one may merge the catalogs of sources from each coadd as follows:
    @code
    mergeCoaddDetections.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-I^HSC-R
    @endcode
    This will merge the HSC-I & -R band parent source catalogs and write the results to
    `$CI_HSC_DIR/DATA/deepCoadd-results/merged/0/5,4/mergeDet-0-5,4.fits`.

    The next step in the multi-band processing procedure is
    @ref MeasureMergedCoaddSourcesTask_ "MeasureMergedCoaddSourcesTask"
    """
    ConfigClass = MergeDetectionsConfig
    # Gen2 (CmdLineTask) runner imported from multiBandUtils.
    RunnerClass = MergeSourcesRunner
    _DefaultName = "mergeCoaddDetections"
    # Dataset-name suffixes: inputs are "<coaddName>Coadd_det", the merged
    # output is "<coaddName>Coadd_<outputDataset>" (see write()).
    inputDataset = "det"
    outputDataset = "mergeDet"
    # Gen2 factory for source IDs, seeded from the "MergedCoaddId" dataset.
    makeIdFactory = _makeMakeIdFactory("MergedCoaddId")

    @classmethod
    def _makeArgumentParser(cls):
        # Gen2 command-line parser; delegates to the shared multiBandUtils helper.
        return makeMergeArgumentParser(cls._DefaultName, cls.inputDataset)

    def getInputSchema(self, butler=None, schema=None):
        # Delegate to the module-level helper, which reads the schema from the
        # butler when an explicit schema is not supplied.
        return getInputSchema(self, butler, schema)

    def __init__(self, butler=None, schema=None, initInputs=None, **kwargs):
        # Make PipelineTask-only wording less transitional after cmdlineTask is removed
        """!
        @brief Initialize the merge detections task.

        A @ref FootprintMergeList_ "FootprintMergeList" will be used to
        merge the source catalogs.

        @param[in] schema the schema of the detection catalogs used as input to this one
        @param[in] butler a butler used to read the input schema from disk, if schema is None
        @param[in] initInputs This a PipelineTask-only argument that holds all inputs passed in
            through the PipelineTask middleware
        @param[in] **kwargs keyword arguments to be passed to CmdLineTask.__init__

        The task will set its own self.schema attribute to the schema of the output merged catalog.
        """
        super().__init__(**kwargs)
        # In Gen3, initInputs carries the input schema catalog; its .schema
        # takes precedence over both the butler and any explicit argument.
        if initInputs is not None:
            schema = initInputs['schema'].schema

        self.makeSubtask("skyObjects")
        self.schema = self.getInputSchema(butler=butler, schema=schema)

        # The merge is done per band, with the sky-object pseudo-band appended
        # last so real detections always take priority.
        filterNames = list(self.config.priorityList)
        filterNames.append(self.config.skyFilterName)
        self.merged = afwDetect.FootprintMergeList(self.schema, filterNames)
        # Empty catalogs exposing the output schemas, published as init-outputs
        # (see MergeDetectionsConnections.outputSchema / outputPeakSchema).
        self.outputSchema = afwTable.SourceCatalog(self.schema)
        self.outputPeakSchema = afwDetect.PeakCatalog(self.merged.getPeakSchema())

    def runDataRef(self, patchRefList):
        # Gen2 entry point: read one catalog per band, merge, and write.
        catalogs = dict(readCatalog(self, patchRef) for patchRef in patchRefList)
        skyInfo = getSkyInfo(coaddName=self.config.coaddName, patchRef=patchRefList[0])
        idFactory = self.makeIdFactory(patchRefList[0])
        skySeed = patchRefList[0].get(self.config.coaddName + "MergedCoaddId")
        mergeCatalogStruct = self.run(catalogs, skyInfo, idFactory, skySeed)
        self.write(patchRefList[0], mergeCatalogStruct.outputCatalog)

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        # Gen3 entry point: adapt butler inputs into the keyword arguments
        # expected by run(), then store the outputs.
        inputs = butlerQC.get(inputRefs)
        # Derive a deterministic seed / ID factory from the tract+patch data ID.
        exposureIdInfo = ExposureIdInfo.fromDataId(butlerQC.quantum.dataId, "tract_patch")
        inputs["skySeed"] = exposureIdInfo.expId
        inputs["idFactory"] = exposureIdInfo.makeSourceIdFactory()
        # Re-key the parallel lists of refs/catalogs into a band -> catalog dict.
        catalogDict = {ref.dataId['band']: cat for ref, cat in zip(inputRefs.catalogs,
                       inputs['catalogs'])}
        inputs['catalogs'] = catalogDict
        skyMap = inputs.pop('skyMap')
        # Can use the first dataId to find the tract and patch being worked on
        tractNumber = inputRefs.catalogs[0].dataId['tract']
        tractInfo = skyMap[tractNumber]
        patchInfo = tractInfo.getPatchInfo(inputRefs.catalogs[0].dataId['patch'])
        # Assemble the same skyInfo Struct shape that getSkyInfo() provides in Gen2.
        skyInfo = Struct(
            skyMap=skyMap,
            tractInfo=tractInfo,
            patchInfo=patchInfo,
            wcs=tractInfo.getWcs(),
            bbox=patchInfo.getOuterBBox()
        )
        inputs['skyInfo'] = skyInfo

        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)

    def run(self, catalogs, skyInfo, idFactory, skySeed):
        r"""!
        @brief Merge multiple catalogs.

        After ordering the catalogs and filters in priority order,
        @ref getMergedSourceCatalog of the @ref FootprintMergeList_ "FootprintMergeList" created by
        @ref \_\_init\_\_ is used to perform the actual merging. Finally, @ref cullPeaks is used to remove
        garbage peaks detected around bright objects.

        @param[in] catalogs dict mapping band name to that band's detection SourceCatalog
        @param[in] skyInfo patch information Struct; at least the wcs and patchInfo fields are used
        @param[in] idFactory factory used to assign IDs to records in the merged catalog
        @param[in] skySeed seed for the sky-object random number generator
        @return Struct with the merged SourceCatalog in its outputCatalog field
        """

        # Convert distance to tract coordinate
        tractWcs = skyInfo.wcs
        peakDistance = self.config.minNewPeak / tractWcs.getPixelScale().asArcseconds()
        samePeakDistance = self.config.maxSamePeak / tractWcs.getPixelScale().asArcseconds()

        # Put catalogs, filters in priority order
        orderedCatalogs = [catalogs[band] for band in self.config.priorityList if band in catalogs.keys()]
        orderedBands = [band for band in self.config.priorityList if band in catalogs.keys()]

        mergedList = self.merged.getMergedSourceCatalog(orderedCatalogs, orderedBands, peakDistance,
                                                        self.schema, idFactory,
                                                        samePeakDistance)

        #
        # Add extra sources that correspond to blank sky
        #
        skySourceFootprints = self.getSkySourceFootprints(mergedList, skyInfo, skySeed)
        if skySourceFootprints:
            # Flag each appended sky record as "detected" in the sky pseudo-band.
            key = mergedList.schema.find("merge_footprint_%s" % self.config.skyFilterName).key
            for foot in skySourceFootprints:
                s = mergedList.addNew()
                s.setFootprint(foot)
                s.set(key, True)

        # Sort Peaks from brightest to faintest
        for record in mergedList:
            record.getFootprint().sortPeaks()
        self.log.info("Merged to %d sources" % len(mergedList))
        # Attempt to remove garbage peaks
        self.cullPeaks(mergedList)
        return Struct(outputCatalog=mergedList)

    def cullPeaks(self, catalog):
        """!
        @brief Attempt to remove garbage peaks (mostly on the outskirts of large blends).

        Peaks are culled in place: the catalog's Footprints are modified.

        @param[in] catalog Source catalog
        """
        # One flag key per contributing band ("merge_peak_<band>"); a peak's
        # flag count tells us in how many bands it was detected.
        keys = [item.key for item in self.merged.getPeakSchema().extract("merge_peak_*").values()]
        assert len(keys) > 0, "Error finding flags that associate peaks with their detection bands."
        totalPeaks = 0
        culledPeaks = 0
        for parentSource in catalog:
            # Make a list copy so we can clear the attached PeakCatalog and append the ones we're keeping
            # to it (which is easier than deleting as we iterate).
            keptPeaks = parentSource.getFootprint().getPeaks()
            oldPeaks = list(keptPeaks)
            keptPeaks.clear()
            familySize = len(oldPeaks)
            totalPeaks += familySize
            for rank, peak in enumerate(oldPeaks):
                # Keep a peak if it ranks high enough (peaks were sorted
                # brightest-first in run()), was seen in enough bands, or is
                # within both the absolute and family-size-normalized rank cuts.
                if ((rank < self.config.cullPeaks.rankSufficient)
                        or (sum([peak.get(k) for k in keys]) >= self.config.cullPeaks.nBandsSufficient)
                        or (rank < self.config.cullPeaks.rankConsidered
                            and rank < self.config.cullPeaks.rankNormalizedConsidered * familySize)):
                    keptPeaks.append(peak)
                else:
                    culledPeaks += 1
        self.log.info("Culled %d of %d peaks" % (culledPeaks, totalPeaks))

    def getSchemaCatalogs(self):
        """!
        Return a dict of empty catalogs for each catalog dataset produced by this task.

        @param[out] dictionary of empty catalogs
        """
        mergeDet = afwTable.SourceCatalog(self.schema)
        peak = afwDetect.PeakCatalog(self.merged.getPeakSchema())
        return {self.config.coaddName + "Coadd_mergeDet": mergeDet,
                self.config.coaddName + "Coadd_peak": peak}

    def getSkySourceFootprints(self, mergedList, skyInfo, seed):
        """!
        @brief Return a list of Footprints of sky objects which don't overlap with anything in mergedList

        @param mergedList The merged Footprints from all the input bands
        @param skyInfo A description of the patch
        @param seed Seed for the random number generator
        @return list of Footprints (with the "merge_peak_<skyFilterName>" flag set on their single peak),
            or the empty result from the skyObjects subtask if no sky objects were placed
        """
        # Paint all merged detections into a scratch mask so the skyObjects
        # subtask can avoid them (its avoidMask is ["DETECTED"], see config).
        mask = afwImage.Mask(skyInfo.patchInfo.getOuterBBox())
        detected = mask.getPlaneBitMask("DETECTED")
        for s in mergedList:
            s.getFootprint().spans.setMask(mask, detected)

        footprints = self.skyObjects.run(mask, seed)
        if not footprints:
            return footprints

        # Need to convert the peak catalog's schema so we can set the "merge_peak_<skyFilterName>" flags
        schema = self.merged.getPeakSchema()
        mergeKey = schema.find("merge_peak_%s" % self.config.skyFilterName).key
        converted = []
        for oldFoot in footprints:
            assert len(oldFoot.getPeaks()) == 1, "Should be a single peak only"
            peak = oldFoot.getPeaks()[0]
            # Rebuild each footprint with the merge peak schema, copying over
            # its single peak and flagging it as a sky-band detection.
            newFoot = afwDetect.Footprint(oldFoot.spans, schema)
            newFoot.addPeak(peak.getFx(), peak.getFy(), peak.getPeakValue())
            newFoot.getPeaks()[0].set(mergeKey, True)
            converted.append(newFoot)

        return converted

    def write(self, patchRef, catalog):
        """!
        @brief Write the output.

        @param[in]  patchRef   data reference for patch
        @param[in]  catalog    catalog

        We write as the dataset provided by the 'outputDataset'
        class variable.
        """
        patchRef.put(catalog, self.config.coaddName + "Coadd_" + self.outputDataset)
        # since the filter isn't actually part of the data ID for the dataset we're saving,
        # it's confusing to see it in the log message, even if the butler simply ignores it.
        mergeDataId = patchRef.dataId.copy()
        del mergeDataId["filter"]
        self.log.info("Wrote merged catalog: %s" % (mergeDataId,))

    def writeMetadata(self, dataRefList):
        """!
        @brief No metadata to write, and not sure how to write it for a list of dataRefs.
        """
        pass
Class to describe the properties of a detected object from an image.
Definition: Footprint.h:63
Represent a 2-dimensional array of bitmask pixels.
Definition: Mask.h:77
daf::base::PropertyList * list
Definition: fits.cc:913
daf::base::PropertySet * set
Definition: fits.cc:912
Backwards-compatibility support for depersisting the old Calib (FluxMag0/FluxMag0Err) objects.
Fit spatial kernel using approximate fluxes for candidates, and solving a linear system of equations.
def run(self, skyInfo, tempExpRefList, imageScalerList, weightList, altMaskList=None, mask=None, supplementaryData=None)
def getSkyInfo(coaddName, patchRef)
Return the SkyMap, tract and patch information, wcs, and outer bbox of the patch to be coadded.
Definition: coaddBase.py:271
def writeMetadata(self, dataRefList)
No metadata to write, and not sure how to write it for a list of dataRefs.
def write(self, patchRef, catalog)
Write the output.
def makeMergeArgumentParser(name, dataset)
Create a suitable ArgumentParser.
def getInputSchema(task, butler=None, schema=None)
Obtain the input schema either directly or from a butler reference.
def readCatalog(task, patchRef)
Read input catalog.