__all__ = ["MergeDetectionsConfig", "MergeDetectionsTask"]
import numpy as np
from numpy.lib.recfunctions import rec_join

import lsst.afw.detection as afwDetect
import lsst.afw.image as afwImage
import lsst.afw.table as afwTable
import lsst.pipe.base.connectionTypes as cT
from lsst.meas.base import SkyMapIdGeneratorConfig
from lsst.pex.config import Config, ConfigField, ConfigurableField, Field, ListField
from lsst.pipe.base import (PipelineTask, PipelineTaskConfig, Struct,
                            PipelineTaskConnections)
from lsst.skymap import BaseSkyMap

from .multiBandUtils import CullPeaksConfig
from .skyObjects import SkyObjectsTask
def matchCatalogsExact(catalog1, catalog2, patch1=None, patch2=None):
    """Match two catalogs derived from the same mergeDet catalog.

    When testing downstream features, like deblending methods/parameters
    and measurement algorithms/parameters, it is useful to compare
    the same sources in two catalogs. In most cases this must be done
    by matching on either RA/DEC or XY positions, which occasionally
    will mismatch one source with another.

    For a more robust solution, as long as the downstream catalog is
    derived from the same mergeDet catalog, exact source matching
    can be done via the unique ``(parent, deblend_peakID)``
    combination. So this function performs this exact matching for
    all sources in both catalogs.

    Parameters
    ----------
    catalog1, catalog2 : `lsst.afw.table.SourceCatalog`
        The two catalogs to merge.
    patch1, patch2 : `array` of `int`, optional
        Patch for each row, converted into an integer.
        Both must be given when the catalogs span multiple patches;
        otherwise neither should be given.

    Returns
    -------
    result : `list` of `lsst.afw.table.SourceMatch`
        List of matches for each source (using an inner join).

    Raises
    ------
    ValueError
        Raised if only one of ``patch1``/``patch2`` is supplied.
    """
    # Only deblended children (parent != 0) carry a unique
    # (parent, deblend_peakId) key, so restrict matching to them.
    sidx1 = catalog1["parent"] != 0
    sidx2 = catalog2["parent"] != 0

    # Build the join-key columns, plus each row's original index so
    # the matches can be mapped back to catalog rows after the join.
    parents1 = np.array(catalog1["parent"][sidx1])
    peaks1 = np.array(catalog1["deblend_peakId"][sidx1])
    index1 = np.arange(len(catalog1))[sidx1]
    parents2 = np.array(catalog2["parent"][sidx2])
    peaks2 = np.array(catalog2["deblend_peakId"][sidx2])
    index2 = np.arange(len(catalog2))[sidx2]

    if patch1 is not None:
        if patch2 is None:
            msg = ("If the catalogs are from different patches then patch1 and patch2 must be specified"
                   ", got {} and {}").format(patch1, patch2)
            raise ValueError(msg)
        patch1 = patch1[sidx1]
        patch2 = patch2[sidx2]

        # Include the patch in the key so identical (parent, peakId) pairs
        # from different patches are not matched to one another.
        key1 = np.rec.array((parents1, peaks1, patch1, index1),
                            dtype=[('parent', np.int64), ('peakId', np.int32),
                                   ("patch", patch1.dtype), ("index", np.int32)])
        key2 = np.rec.array((parents2, peaks2, patch2, index2),
                            dtype=[('parent', np.int64), ('peakId', np.int32),
                                   ("patch", patch2.dtype), ("index", np.int32)])
        matchColumns = ("parent", "peakId", "patch")
    else:
        key1 = np.rec.array((parents1, peaks1, index1),
                            dtype=[('parent', np.int64), ('peakId', np.int32), ("index", np.int32)])
        key2 = np.rec.array((parents2, peaks2, index2),
                            dtype=[('parent', np.int64), ('peakId', np.int32), ("index", np.int32)])
        matchColumns = ("parent", "peakId")

    # Inner join on the key columns; the per-catalog "index" columns come
    # through in the joined array as "index1"/"index2".
    matched = rec_join(matchColumns, key1, key2, jointype="inner")

    # Row indices (into the full catalogs) of the matched sources.
    indices1 = matched["index1"]
    indices2 = matched["index2"]

    # Wrap each matched pair as a SourceMatch (distance is exact, hence 0).
    matches = [
        afwTable.SourceMatch(catalog1[int(i1)], catalog2[int(i2)], 0.0)
        for i1, i2 in zip(indices1, indices2)
    ]

    return matches
class MergeDetectionsConnections(PipelineTaskConnections,
                                 dimensions=("tract", "patch", "skymap"),
                                 defaultTemplates={"inputCoaddName": 'deep',
                                                   "outputCoaddName": "deep"}):
    """Butler connections for `MergeDetectionsTask`.

    One merged detection catalog is produced per (tract, patch) from the
    per-band detection catalogs, using the configured coadd-name templates
    to form the dataset type names.
    """

    schema = cT.InitInput(
        doc="Schema of the input detection catalog",
        name="{inputCoaddName}Coadd_det_schema",
        storageClass="SourceCatalog"
    )

    outputSchema = cT.InitOutput(
        doc="Schema of the merged detection catalog",
        name="{outputCoaddName}Coadd_mergeDet_schema",
        storageClass="SourceCatalog"
    )

    outputPeakSchema = cT.InitOutput(
        doc="Output schema of the Footprint peak catalog",
        name="{outputCoaddName}Coadd_peak_schema",
        storageClass="PeakCatalog"
    )

    # One input catalog per band (the "band" dimension); runQuantum zips
    # inputRefs.catalogs with the loaded catalogs, so multiple=True.
    catalogs = cT.Input(
        doc="Detection Catalogs to be merged",
        name="{inputCoaddName}Coadd_det",
        storageClass="SourceCatalog",
        dimensions=("tract", "patch", "skymap", "band"),
        multiple=True
    )

    skyMap = cT.Input(
        doc="SkyMap to be used in merging",
        name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
        storageClass="SkyMap",
        dimensions=("skymap",),
    )

    outputCatalog = cT.Output(
        doc="Merged Detection catalog",
        name="{outputCoaddName}Coadd_mergeDet",
        storageClass="SourceCatalog",
        dimensions=("tract", "patch", "skymap"),
    )
class MergeDetectionsConfig(PipelineTaskConfig, pipelineConnections=MergeDetectionsConnections):
    """Configuration parameters for the MergeDetectionsTask.
    """
    minNewPeak = Field(dtype=float, default=1,
                       doc="Minimum distance from closest peak to create a new one (in arcsec).")
    maxSamePeak = Field(dtype=float, default=0.3,
                        doc="When adding new catalogs to the merge, all peaks less than this distance "
                            " (in arcsec) to an existing peak will be flagged as detected in that catalog.")
    cullPeaks = ConfigField(dtype=CullPeaksConfig, doc="Configuration for how to cull peaks.")
    skyFilterName = Field(dtype=str, default="sky",
                          doc="Name of `filter' used to label sky objects (e.g. flag merge_peak_sky is set)\n"
                              "(N.b. should be in MergeMeasurementsConfig.pseudoFilterList)")
    skyObjects = ConfigurableField(target=SkyObjectsTask, doc="Generate sky objects")
    priorityList = ListField(dtype=str, default=[],
                             doc="Priority-ordered list of filter bands for the merge.")
    coaddName = Field(dtype=str, default="deep", doc="Name of coadd")
    idGenerator = SkyMapIdGeneratorConfig.make_field()

    def setDefaults(self):
        # Sky objects must avoid already-detected pixels.
        Config.setDefaults(self)
        self.skyObjects.avoidMask = ["DETECTED"]

    def validate(self):
        # An empty priority list would make the merge a silent no-op, so
        # reject it up front.
        super().validate()
        if len(self.priorityList) == 0:
            raise RuntimeError("No priority list provided")
class MergeDetectionsTask(PipelineTask):
    """Merge sources detected in coadds of exposures obtained with different filters.

    Merge sources detected in coadds of exposures obtained with different
    filters. To perform photometry consistently across coadds in multiple
    filter bands, we create a master catalog of sources from all bands by
    merging the sources (peaks & footprints) detected in each coadd, while
    keeping track of which band each source originates in. The catalog merge
    is performed by
    `~lsst.afw.detection.FootprintMergeList.getMergedSourceCatalog`. Spurious
    peaks detected around bright objects are culled as described in
    `~lsst.pipe.tasks.multiBandUtils.CullPeaksConfig`.

    MergeDetectionsTask is meant to be run after detecting sources in coadds
    generated for the chosen subset of the available bands. The purpose of the
    task is to merge sources (peaks & footprints) detected in the coadds
    generated from the chosen subset of filters. Subsequent tasks in the
    multi-band processing procedure will deblend the generated master list of
    sources and, eventually, perform forced photometry.

    Parameters
    ----------
    schema : `lsst.afw.table.Schema`, optional
        The schema of the detection catalogs used as input to this task.
    initInputs : `dict`, optional
        Dictionary that can contain a key ``schema`` containing the
        input schema. If present will override the value of ``schema``.
    **kwargs
        Additional keyword arguments.
    """

    ConfigClass = MergeDetectionsConfig
    _DefaultName = "mergeCoaddDetections"

    def __init__(self, schema=None, initInputs=None, **kwargs):
        super().__init__(**kwargs)
        # A schema passed through initInputs (butler execution) overrides
        # any schema given directly.
        if initInputs is not None:
            schema = initInputs['schema'].schema
        if schema is None:
            raise ValueError("No input schema or initInputs['schema'] provided.")
        self.schema = schema
        self.makeSubtask("skyObjects")

        # The merger needs every band it may see, including the pseudo-band
        # used to label sky objects.
        filterNames = list(self.config.priorityList)
        filterNames.append(self.config.skyFilterName)
        self.merged = afwDetect.FootprintMergeList(self.schema, filterNames)
        # NOTE(review): outputSchema InitOutput reconstructed from the
        # connections class — confirm against upstream.
        self.outputSchema = afwTable.SourceCatalog(self.schema)
        self.outputPeakSchema = afwDetect.PeakCatalog(self.merged.getPeakSchema())

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        idGenerator = self.config.idGenerator.apply(butlerQC.quantum.dataId)
        inputs["skySeed"] = idGenerator.catalog_id
        inputs["idFactory"] = idGenerator.make_table_id_factory()
        # Re-key the per-band input catalogs by band name for run().
        catalogDict = {ref.dataId['band']: cat for ref, cat in zip(inputRefs.catalogs,
                                                                   inputs['catalogs'])}
        inputs['catalogs'] = catalogDict
        skyMap = inputs.pop('skyMap')
        # All inputs share one (tract, patch), so the first dataId suffices.
        tractNumber = inputRefs.catalogs[0].dataId['tract']
        tractInfo = skyMap[tractNumber]
        patchInfo = tractInfo.getPatchInfo(inputRefs.catalogs[0].dataId['patch'])
        skyInfo = Struct(
            skyMap=skyMap,
            tractInfo=tractInfo,
            patchInfo=patchInfo,
            wcs=tractInfo.getWcs(),
            bbox=patchInfo.getOuterBBox()
        )
        inputs['skyInfo'] = skyInfo

        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)

    def run(self, catalogs, skyInfo, idFactory, skySeed):
        """Merge multiple catalogs.

        After ordering the catalogs and filters in priority order,
        ``getMergedSourceCatalog`` of the
        `~lsst.afw.detection.FootprintMergeList` created by ``__init__`` is
        used to perform the actual merging. Finally, `cullPeaks` is used to
        remove garbage peaks detected around bright objects.

        Parameters
        ----------
        catalogs : `dict` of `lsst.afw.table.SourceCatalog`, keyed by band
            Catalogs to be merged.
        skyInfo : `lsst.pipe.base.Struct`
            Patch geometry information (wcs, patchInfo, ...).
        idFactory : `lsst.afw.table.IdFactory`
            Factory used to assign ids to the merged sources.
        skySeed : `int`
            Seed for the sky-object random number generator.

        Returns
        -------
        result : `lsst.pipe.base.Struct`
            Results as a struct with attributes:

            ``outputCatalog``
                Merged catalogs (`lsst.afw.table.SourceCatalog`).
        """
        # Convert the configured distances (arcsec) to pixels.
        tractWcs = skyInfo.wcs
        peakDistance = self.config.minNewPeak / tractWcs.getPixelScale().asArcseconds()
        samePeakDistance = self.config.maxSamePeak / tractWcs.getPixelScale().asArcseconds()

        # Put catalogs and their bands in the configured priority order.
        orderedCatalogs = [catalogs[band] for band in self.config.priorityList if band in catalogs.keys()]
        orderedBands = [band for band in self.config.priorityList if band in catalogs.keys()]

        mergedList = self.merged.getMergedSourceCatalog(orderedCatalogs, orderedBands, peakDistance,
                                                        self.schema, idFactory,
                                                        samePeakDistance)

        # Add extra sources that correspond to blank sky.
        skySourceFootprints = self.getSkySourceFootprints(mergedList, skyInfo, skySeed)
        if skySourceFootprints:
            key = mergedList.schema.find("merge_footprint_%s" % self.config.skyFilterName).key
            for foot in skySourceFootprints:
                s = mergedList.addNew()
                s.setFootprint(foot)
                s.set(key, True)

        # Sort peaks from brightest to faintest.
        for record in mergedList:
            record.getFootprint().sortPeaks()
        self.log.info("Merged to %d sources", len(mergedList))
        # Attempt to remove garbage peaks.
        self.cullPeaks(mergedList)
        return Struct(outputCatalog=mergedList)

    def cullPeaks(self, catalog):
        """Attempt to remove garbage peaks (mostly on the outskirts of large blends).

        The catalog's footprints are modified in place.

        Parameters
        ----------
        catalog : `lsst.afw.table.SourceCatalog`
            Source catalog to cull in place.
        """
        keys = [item.key for item in self.merged.getPeakSchema().extract("merge_peak_*").values()]
        assert len(keys) > 0, "Error finding flags that associate peaks with their detection bands."
        totalPeaks = 0
        culledPeaks = 0
        for parentSource in catalog:
            # Make a list copy, then clear the attached PeakCatalog and
            # re-append only the peaks we keep (easier than deleting while
            # iterating).
            keptPeaks = parentSource.getFootprint().getPeaks()
            oldPeaks = list(keptPeaks)
            keptPeaks.clear()
            familySize = len(oldPeaks)
            totalPeaks += familySize
            for rank, peak in enumerate(oldPeaks):
                # Keep a peak if it ranks high enough, was detected in enough
                # bands, or is within the (absolute and family-normalized)
                # rank limits still under consideration.
                if ((rank < self.config.cullPeaks.rankSufficient)
                        or (sum([peak.get(k) for k in keys]) >= self.config.cullPeaks.nBandsSufficient)
                        or (rank < self.config.cullPeaks.rankConsidered
                            and rank < self.config.cullPeaks.rankNormalizedConsidered * familySize)):
                    keptPeaks.append(peak)
                else:
                    culledPeaks += 1
        self.log.info("Culled %d of %d peaks", culledPeaks, totalPeaks)

    def getSkySourceFootprints(self, mergedList, skyInfo, seed):
        """Return a list of Footprints of sky objects which don't overlap with anything in mergedList.

        Parameters
        ----------
        mergedList : `lsst.afw.table.SourceCatalog`
            The merged Footprints from all the input bands.
        skyInfo : `lsst.pipe.base.Struct`
            A description of the patch.
        seed : `int`
            Seed for the random number generator.

        Returns
        -------
        converted : `list` of `lsst.afw.detection.Footprint`
            Sky-object footprints carrying the merged peak schema.
        """
        # Mark every already-detected pixel so sky objects avoid them.
        # NOTE(review): mask construction reconstructed from usage below —
        # confirm the bbox choice against upstream.
        mask = afwImage.Mask(skyInfo.patchInfo.getOuterBBox())
        detected = mask.getPlaneBitMask("DETECTED")
        for s in mergedList:
            s.getFootprint().spans.setMask(mask, detected)

        footprints = self.skyObjects.run(mask, seed)
        if not footprints:
            return footprints

        # Convert the footprints to carry the merged peak schema so the
        # "merge_peak_<skyFilterName>" flag can be set on their peaks.
        schema = self.merged.getPeakSchema()
        mergeKey = schema.find("merge_peak_%s" % self.config.skyFilterName).key
        converted = []
        for oldFoot in footprints:
            assert len(oldFoot.getPeaks()) == 1, "Should be a single peak only"
            peak = oldFoot.getPeaks()[0]
            newFoot = afwDetect.Footprint(oldFoot.spans, schema)
            newFoot.addPeak(peak.getFx(), peak.getFy(), peak.getPeakValue())
            newFoot.getPeaks()[0].set(mergeKey, True)
            converted.append(newFoot)

        return converted
Represent a 2-dimensional array of bitmask pixels.
Tag types used to declare specialized field types.
daf::base::PropertySet * set
matchCatalogsExact(catalog1, catalog2, patch1=None, patch2=None)