25 from numpy.lib.recfunctions
import rec_join
27 from .multiBandUtils
import (CullPeaksConfig, MergeSourcesRunner, _makeMakeIdFactory, makeMergeArgumentParser,
28 getInputSchema, readCatalog)
37 from lsst.pex.config import Config, Field, ListField, ConfigurableField, ConfigField
38 from lsst.pipe.base import (CmdLineTask, PipelineTask, PipelineTaskConfig, Struct,
39 PipelineTaskConnections)
40 import lsst.pipe.base.connectionTypes
as cT
46 """Match two catalogs derived from the same mergeDet catalog
48 When testing downstream features, like deblending methods/parameters
49 and measurement algorithms/parameters, it is useful to to compare
50 the same sources in two catalogs. In most cases this must be done
51 by matching on either RA/DEC or XY positions, which occassionally
52 will mismatch one source with another.
54 For a more robust solution, as long as the downstream catalog is
55 derived from the same mergeDet catalog, exact source matching
56 can be done via the unique ``(parent, deblend_peakID)``
57 combination. So this function performs this exact matching for
58 all sources both catalogs.
62 catalog1, catalog2 : `lsst.afw.table.SourceCatalog`
63 The two catalogs to merge
65 patch1, patch2 : array of int
66 Patch for each row, converted into an integer.
67 In the gen3 butler this is done already, in gen2
68 it is recommended to use `patch2Int`, assuming that
69 the patches are the same structure as HSC, that range
74 result: list of `lsst.afw.table.SourceMatch`
75 List of matches for each source (using an inner join).
79 sidx1 = catalog1[
"parent"] != 0
80 sidx2 = catalog2[
"parent"] != 0
83 parents1 = np.array(catalog1[
"parent"][sidx1])
84 peaks1 = np.array(catalog1[
"deblend_peakId"][sidx1])
85 index1 = np.arange(len(catalog1))[sidx1]
86 parents2 = np.array(catalog2[
"parent"][sidx2])
87 peaks2 = np.array(catalog2[
"deblend_peakId"][sidx2])
88 index2 = np.arange(len(catalog2))[sidx2]
90 if patch1
is not None:
92 msg = (
"If the catalogs are from different patches then patch1 and patch2 must be specified"
93 ", got {} and {}").
format(patch1, patch2)
95 patch1 = patch1[sidx1]
96 patch2 = patch2[sidx2]
98 key1 = np.rec.array((parents1, peaks1, patch1, index1),
99 dtype=[(
'parent', np.int64), (
'peakId', np.int32),
100 (
"patch", patch1.dtype), (
"index", np.int32)])
101 key2 = np.rec.array((parents2, peaks2, patch2, index2),
102 dtype=[(
'parent', np.int64), (
'peakId', np.int32),
103 (
"patch", patch2.dtype), (
"index", np.int32)])
104 matchColumns = (
"parent",
"peakId",
"patch")
106 key1 = np.rec.array((parents1, peaks1, index1),
107 dtype=[(
'parent', np.int64), (
'peakId', np.int32), (
"index", np.int32)])
108 key2 = np.rec.array((parents2, peaks2, index2),
109 dtype=[(
'parent', np.int64), (
'peakId', np.int32), (
"index", np.int32)])
110 matchColumns = (
"parent",
"peakId")
115 matched = rec_join(matchColumns, key1, key2, jointype=
"inner")
118 indices1 = matched[
"index1"]
119 indices2 = matched[
"index2"]
124 for i1, i2
in zip(indices1, indices2)
131 dimensions=(
"tract",
"patch",
"skymap"),
132 defaultTemplates={
"inputCoaddName":
'deep',
"outputCoaddName":
"deep"}):
133 schema = cT.InitInput(
134 doc=
"Schema of the input detection catalog",
135 name=
"{inputCoaddName}Coadd_det_schema",
136 storageClass=
"SourceCatalog"
139 outputSchema = cT.InitOutput(
140 doc=
"Schema of the merged detection catalog",
141 name=
"{outputCoaddName}Coadd_mergeDet_schema",
142 storageClass=
"SourceCatalog"
145 outputPeakSchema = cT.InitOutput(
146 doc=
"Output schema of the Footprint peak catalog",
147 name=
"{outputCoaddName}Coadd_peak_schema",
148 storageClass=
"PeakCatalog"
152 doc=
"Detection Catalogs to be merged",
153 name=
"{inputCoaddName}Coadd_det",
154 storageClass=
"SourceCatalog",
155 dimensions=(
"tract",
"patch",
"skymap",
"band"),
160 doc=
"SkyMap to be used in merging",
161 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
162 storageClass=
"SkyMap",
163 dimensions=(
"skymap",),
166 outputCatalog = cT.Output(
167 doc=
"Merged Detection catalog",
168 name=
"{outputCoaddName}Coadd_mergeDet",
169 storageClass=
"SourceCatalog",
170 dimensions=(
"tract",
"patch",
"skymap"),
174 class MergeDetectionsConfig(PipelineTaskConfig, pipelineConnections=MergeDetectionsConnections):
176 @anchor MergeDetectionsConfig_
178 @brief Configuration parameters for the MergeDetectionsTask.
180 minNewPeak =
Field(dtype=float, default=1,
181 doc=
"Minimum distance from closest peak to create a new one (in arcsec).")
183 maxSamePeak =
Field(dtype=float, default=0.3,
184 doc=
"When adding new catalogs to the merge, all peaks less than this distance "
185 " (in arcsec) to an existing peak will be flagged as detected in that catalog.")
186 cullPeaks =
ConfigField(dtype=CullPeaksConfig, doc=
"Configuration for how to cull peaks.")
188 skyFilterName =
Field(dtype=str, default=
"sky",
189 doc=
"Name of `filter' used to label sky objects (e.g. flag merge_peak_sky is set)\n"
190 "(N.b. should be in MergeMeasurementsConfig.pseudoFilterList)")
191 skyObjects =
ConfigurableField(target=SkyObjectsTask, doc=
"Generate sky objects")
192 priorityList =
ListField(dtype=str, default=[],
193 doc=
"Priority-ordered list of filter bands for the merge.")
194 coaddName =
Field(dtype=str, default=
"deep", doc=
"Name of coadd")
197 Config.setDefaults(self)
198 self.skyObjects.avoidMask = [
"DETECTED"]
202 if len(self.priorityList) == 0:
203 raise RuntimeError(
"No priority list provided")
206 class MergeDetectionsTask(PipelineTask, CmdLineTask):
208 @anchor MergeDetectionsTask_
210 @brief Merge coadd detections from multiple bands.
212 @section pipe_tasks_multiBand_Contents Contents
214 - @ref pipe_tasks_multiBand_MergeDetectionsTask_Purpose
215 - @ref pipe_tasks_multiBand_MergeDetectionsTask_Init
216 - @ref pipe_tasks_multiBand_MergeDetectionsTask_Run
217 - @ref pipe_tasks_multiBand_MergeDetectionsTask_Config
218 - @ref pipe_tasks_multiBand_MergeDetectionsTask_Debug
219 - @ref pipe_tasks_multiband_MergeDetectionsTask_Example
221 @section pipe_tasks_multiBand_MergeDetectionsTask_Purpose Description
223 Command-line task that merges sources detected in coadds of exposures obtained with different filters.
225 To perform photometry consistently across coadds in multiple filter bands, we create a master catalog of
226 sources from all bands by merging the sources (peaks & footprints) detected in each coadd, while keeping
227 track of which band each source originates in.
229 The catalog merge is performed by @ref getMergedSourceCatalog. Spurious peaks detected around bright
230 objects are culled as described in @ref CullPeaksConfig_.
233 deepCoadd_det{tract,patch,filter}: SourceCatalog (only parent Footprints)
235 deepCoadd_mergeDet{tract,patch}: SourceCatalog (only parent Footprints)
239 @section pipe_tasks_multiBand_MergeDetectionsTask_Init Task initialisation
241 @copydoc \_\_init\_\_
243 @section pipe_tasks_multiBand_MergeDetectionsTask_Run Invoking the Task
247 @section pipe_tasks_multiBand_MergeDetectionsTask_Config Configuration parameters
249 See @ref MergeDetectionsConfig_
251 @section pipe_tasks_multiBand_MergeDetectionsTask_Debug Debug variables
253 The @link lsst.pipe.base.cmdLineTask.CmdLineTask command line task@endlink interface supports a flag @c -d
254 to import @b debug.py from your @c PYTHONPATH; see @ref baseDebug for more about @b debug.py files.
256 MergeDetectionsTask has no debug variables.
258 @section pipe_tasks_multiband_MergeDetectionsTask_Example A complete example of using MergeDetectionsTask
260 MergeDetectionsTask is meant to be run after detecting sources in coadds generated for the chosen subset
261 of the available bands.
262 The purpose of the task is to merge sources (peaks & footprints) detected in the coadds generated from the
263 chosen subset of filters.
264 Subsequent tasks in the multi-band processing procedure will deblend the generated master list of sources
265 and, eventually, perform forced photometry.
266 Command-line usage of MergeDetectionsTask expects data references for all the coadds to be processed.
267 A list of the available optional arguments can be obtained by calling mergeCoaddDetections.py with the
268 `--help` command line argument:
270 mergeCoaddDetections.py --help
273 To demonstrate usage of the DetectCoaddSourcesTask in the larger context of multi-band processing, we
274 will process HSC data in the [ci_hsc](https://github.com/lsst/ci_hsc) package. Assuming one has finished
275 step 5 at @ref pipeTasks_multiBand, one may merge the catalogs of sources from each coadd as follows:
277 mergeCoaddDetections.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-I^HSC-R
279 This will merge the HSC-I & -R band parent source catalogs and write the results to
280 `$CI_HSC_DIR/DATA/deepCoadd-results/merged/0/5,4/mergeDet-0-5,4.fits`.
282 The next step in the multi-band processing procedure is
283 @ref MeasureMergedCoaddSourcesTask_ "MeasureMergedCoaddSourcesTask"
285 ConfigClass = MergeDetectionsConfig
286 RunnerClass = MergeSourcesRunner
287 _DefaultName =
"mergeCoaddDetections"
289 outputDataset =
"mergeDet"
290 makeIdFactory = _makeMakeIdFactory(
"MergedCoaddId")
293 def _makeArgumentParser(cls):
299 def __init__(self, butler=None, schema=None, initInputs=None, **kwargs):
302 @brief Initialize the merge detections task.
304 A @ref FootprintMergeList_ "FootprintMergeList" will be used to
305 merge the source catalogs.
307 @param[in] schema the schema of the detection catalogs used as input to this one
308 @param[in] butler a butler used to read the input schema from disk, if schema is None
309 @param[in] initInputs This a PipelineTask-only argument that holds all inputs passed in
310 through the PipelineTask middleware
311 @param[in] **kwargs keyword arguments to be passed to CmdLineTask.__init__
313 The task will set its own self.schema attribute to the schema of the output merged catalog.
315 super().__init__(**kwargs)
316 if initInputs
is not None:
317 schema = initInputs[
'schema'].schema
319 self.makeSubtask(
"skyObjects")
320 self.schema = self.getInputSchema(butler=butler, schema=schema)
322 filterNames =
list(self.config.priorityList)
323 filterNames.append(self.config.skyFilterName)
328 def runDataRef(self, patchRefList):
329 catalogs = dict(
readCatalog(self, patchRef)
for patchRef
in patchRefList)
330 skyInfo =
getSkyInfo(coaddName=self.config.coaddName, patchRef=patchRefList[0])
331 idFactory = self.makeIdFactory(patchRefList[0])
332 skySeed = patchRefList[0].get(self.config.coaddName +
"MergedCoaddId")
333 mergeCatalogStruct = self.run(catalogs, skyInfo, idFactory, skySeed)
334 self.write(patchRefList[0], mergeCatalogStruct.outputCatalog)
336 def runQuantum(self, butlerQC, inputRefs, outputRefs):
337 inputs = butlerQC.get(inputRefs)
338 exposureIdInfo = ExposureIdInfo.fromDataId(butlerQC.quantum.dataId,
"tract_patch")
339 inputs[
"skySeed"] = exposureIdInfo.expId
340 inputs[
"idFactory"] = exposureIdInfo.makeSourceIdFactory()
341 catalogDict = {ref.dataId[
'band']: cat
for ref, cat
in zip(inputRefs.catalogs,
343 inputs[
'catalogs'] = catalogDict
344 skyMap = inputs.pop(
'skyMap')
346 tractNumber = inputRefs.catalogs[0].dataId[
'tract']
347 tractInfo = skyMap[tractNumber]
348 patchInfo = tractInfo.getPatchInfo(inputRefs.catalogs[0].dataId[
'patch'])
353 wcs=tractInfo.getWcs(),
354 bbox=patchInfo.getOuterBBox()
356 inputs[
'skyInfo'] = skyInfo
358 outputs = self.run(**inputs)
359 butlerQC.put(outputs, outputRefs)
361 def run(self, catalogs, skyInfo, idFactory, skySeed):
363 @brief Merge multiple catalogs.
365 After ordering the catalogs and filters in priority order,
366 @ref getMergedSourceCatalog of the @ref FootprintMergeList_ "FootprintMergeList" created by
367 @ref \_\_init\_\_ is used to perform the actual merging. Finally, @ref cullPeaks is used to remove
368 garbage peaks detected around bright objects.
372 @param[out] mergedList
376 tractWcs = skyInfo.wcs
377 peakDistance = self.config.minNewPeak / tractWcs.getPixelScale().asArcseconds()
378 samePeakDistance = self.config.maxSamePeak / tractWcs.getPixelScale().asArcseconds()
381 orderedCatalogs = [catalogs[band]
for band
in self.config.priorityList
if band
in catalogs.keys()]
382 orderedBands = [band
for band
in self.config.priorityList
if band
in catalogs.keys()]
384 mergedList = self.merged.getMergedSourceCatalog(orderedCatalogs, orderedBands, peakDistance,
385 self.schema, idFactory,
391 skySourceFootprints = self.getSkySourceFootprints(mergedList, skyInfo, skySeed)
392 if skySourceFootprints:
393 key = mergedList.schema.find(
"merge_footprint_%s" % self.config.skyFilterName).key
394 for foot
in skySourceFootprints:
395 s = mergedList.addNew()
400 for record
in mergedList:
401 record.getFootprint().sortPeaks()
402 self.log.
info(
"Merged to %d sources", len(mergedList))
404 self.cullPeaks(mergedList)
405 return Struct(outputCatalog=mergedList)
407 def cullPeaks(self, catalog):
409 @brief Attempt to remove garbage peaks (mostly on the outskirts of large blends).
411 @param[in] catalog Source catalog
413 keys = [item.key
for item
in self.merged.getPeakSchema().extract(
"merge_peak_*").values()]
414 assert len(keys) > 0,
"Error finding flags that associate peaks with their detection bands."
417 for parentSource
in catalog:
420 keptPeaks = parentSource.getFootprint().getPeaks()
421 oldPeaks =
list(keptPeaks)
423 familySize = len(oldPeaks)
424 totalPeaks += familySize
425 for rank, peak
in enumerate(oldPeaks):
426 if ((rank < self.config.cullPeaks.rankSufficient)
427 or (sum([peak.get(k)
for k
in keys]) >= self.config.cullPeaks.nBandsSufficient)
428 or (rank < self.config.cullPeaks.rankConsidered
429 and rank < self.config.cullPeaks.rankNormalizedConsidered * familySize)):
430 keptPeaks.append(peak)
433 self.log.
info(
"Culled %d of %d peaks", culledPeaks, totalPeaks)
437 Return a dict of empty catalogs for each catalog dataset produced by this task.
439 @param[out] dictionary of empty catalogs
443 return {self.config.coaddName +
"Coadd_mergeDet": mergeDet,
444 self.config.coaddName +
"Coadd_peak": peak}
446 def getSkySourceFootprints(self, mergedList, skyInfo, seed):
448 @brief Return a list of Footprints of sky objects which don't overlap with anything in mergedList
450 @param mergedList The merged Footprints from all the input bands
451 @param skyInfo A description of the patch
452 @param seed Seed for the random number generator
455 detected = mask.getPlaneBitMask(
"DETECTED")
457 s.getFootprint().spans.setMask(mask, detected)
459 footprints = self.skyObjects.
run(mask, seed)
464 schema = self.merged.getPeakSchema()
465 mergeKey = schema.find(
"merge_peak_%s" % self.config.skyFilterName).key
467 for oldFoot
in footprints:
468 assert len(oldFoot.getPeaks()) == 1,
"Should be a single peak only"
469 peak = oldFoot.getPeaks()[0]
471 newFoot.addPeak(peak.getFx(), peak.getFy(), peak.getPeakValue())
472 newFoot.getPeaks()[0].
set(mergeKey,
True)
473 converted.append(newFoot)
479 @brief Write the output.
481 @param[in] patchRef data reference for patch
482 @param[in] catalog catalog
484 We write as the dataset provided by the 'outputDataset'
487 patchRef.put(catalog, self.config.coaddName +
"Coadd_" + self.outputDataset)
490 mergeDataId = patchRef.dataId.copy()
491 del mergeDataId[
"filter"]
492 self.log.
info(
"Wrote merged catalog: %s", mergeDataId)
496 @brief No metadata to write, and not sure how to write it for a list of dataRefs.
Represent a 2-dimensional array of bitmask pixels.
daf::base::PropertyList * list
daf::base::PropertySet * set
Backwards-compatibility support for depersisting the old Calib (FluxMag0/FluxMag0Err) objects.
def run(self, coaddExposures, bbox, wcs)
Fit spatial kernel using approximate fluxes for candidates, and solving a linear system of equations.
def getSchemaCatalogs(self)
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
def getSkyInfo(coaddName, patchRef)
Return the SkyMap, tract and patch information, wcs, and outer bbox of the patch to be coadded.
def writeMetadata(self, dataRefList)
No metadata to write, and not sure how to write it for a list of dataRefs.
def matchCatalogsExact(catalog1, catalog2, patch1=None, patch2=None)
def write(self, patchRef, catalog)
Write the output.
def makeMergeArgumentParser(name, dataset)
Create a suitable ArgumentParser.
def getInputSchema(task, butler=None, schema=None)
Obtain the input schema either directly or froma butler reference.
def readCatalog(task, patchRef)
Read input catalog.
Lightweight representation of a geometric match between two records.