import numpy as np
from numpy.lib.recfunctions import rec_join

import lsst.afw.detection as afwDetect
import lsst.afw.image as afwImage
import lsst.afw.table as afwTable
import lsst.pipe.base.connectionTypes as cT
from lsst.meas.algorithms import SkyObjectsTask
from lsst.obs.base import ExposureIdInfo
from lsst.pex.config import Config, ConfigField, ConfigurableField, Field, ListField
from lsst.pipe.base import (CmdLineTask, PipelineTask, PipelineTaskConfig, PipelineTaskConnections,
                            Struct)
from lsst.skymap import BaseSkyMap

from .coaddBase import getGen3CoaddExposureId, getSkyInfo
from .multiBandUtils import (CullPeaksConfig, MergeSourcesRunner, _makeMakeIdFactory,
                             getInputSchema, makeMergeArgumentParser, readCatalog)
def matchCatalogsExact(catalog1, catalog2, patch1=None, patch2=None):
    """Match two catalogs derived from the same mergeDet catalog.

    When testing downstream features, like deblending methods/parameters
    and measurement algorithms/parameters, it is useful to compare
    the same sources in two catalogs. In most cases this must be done
    by matching on either RA/DEC or XY positions, which occasionally
    will mismatch one source with another.

    For a more robust solution, as long as the downstream catalog is
    derived from the same mergeDet catalog, exact source matching
    can be done via the unique ``(parent, deblend_peakId)``
    combination. So this function performs this exact matching for
    all sources in both catalogs.

    Parameters
    ----------
    catalog1, catalog2 : `lsst.afw.table.SourceCatalog`
        The two catalogs to merge.
    patch1, patch2 : array of int, optional
        Patch for each row, converted into an integer.
        In the gen3 butler this is done already; in gen2
        it is recommended to use `patch2Int`, assuming that
        the patches are the same structure as HSC.

    Returns
    -------
    matches : `list` of `lsst.afw.table.SourceMatch`
        List of matches for each source (using an inner join).
    """
    # Only match the individual sources; the parents will be matched by
    # their children.
    sidx1 = catalog1["parent"] != 0
    sidx2 = catalog2["parent"] != 0

    # Create the keys used to merge the catalogs.
    parents1 = np.array(catalog1["parent"][sidx1])
    peaks1 = np.array(catalog1["deblend_peakId"][sidx1])
    index1 = np.arange(len(catalog1))[sidx1]
    parents2 = np.array(catalog2["parent"][sidx2])
    peaks2 = np.array(catalog2["deblend_peakId"][sidx2])
    index2 = np.arange(len(catalog2))[sidx2]

    if patch1 is not None:
        if patch2 is None:
            msg = ("If the catalogs are from different patches then patch1 and patch2 must be specified"
                   ", got {} and {}").format(patch1, patch2)
            raise ValueError(msg)
        patch1 = patch1[sidx1]
        patch2 = patch2[sidx2]

        key1 = np.rec.array((parents1, peaks1, patch1, index1),
                            dtype=[('parent', np.int64), ('peakId', np.int32),
                                   ("patch", patch1.dtype), ("index", np.int32)])
        key2 = np.rec.array((parents2, peaks2, patch2, index2),
                            dtype=[('parent', np.int64), ('peakId', np.int32),
                                   ("patch", patch2.dtype), ("index", np.int32)])
        matchColumns = ("parent", "peakId", "patch")
    else:
        key1 = np.rec.array((parents1, peaks1, index1),
                            dtype=[('parent', np.int64), ('peakId', np.int32), ("index", np.int32)])
        key2 = np.rec.array((parents2, peaks2, index2),
                            dtype=[('parent', np.int64), ('peakId', np.int32), ("index", np.int32)])
        matchColumns = ("parent", "peakId")

    # Match the two catalogs using a numpy inner join on the key columns.
    matched = rec_join(matchColumns, key1, key2, jointype="inner")

    # Extract the full indices back into each catalog.
    indices1 = matched["index1"]
    indices2 = matched["index2"]

    # Build the match records.
    # NOTE(review): the body of this comprehension was lost in the mangled
    # source; restored as a zero-distance SourceMatch pair — confirm against
    # the canonical implementation.
    matches = [
        afwTable.SourceMatch(catalog1[int(i1)], catalog2[int(i2)], 0.0)
        for i1, i2 in zip(indices1, indices2)
    ]
    return matches
class MergeDetectionsConnections(PipelineTaskConnections,
                                 dimensions=("tract", "patch", "skymap"),
                                 defaultTemplates={"inputCoaddName": 'deep', "outputCoaddName": "deep"}):
    # NOTE(review): the ``class`` header line was lost in the mangled source;
    # the name is recovered from the ``pipelineConnections=`` reference below
    # in MergeDetectionsConfig — confirm.

    # Schema of the per-band detection catalogs consumed at init time.
    schema = cT.InitInput(
        doc="Schema of the input detection catalog",
        name="{inputCoaddName}Coadd_det_schema",
        storageClass="SourceCatalog"
    )

    # Schema of the merged detection catalog produced by this task.
    outputSchema = cT.InitOutput(
        doc="Schema of the merged detection catalog",
        name="{outputCoaddName}Coadd_mergeDet_schema",
        storageClass="SourceCatalog"
    )

    # Schema of the merged peak catalog.
    outputPeakSchema = cT.InitOutput(
        doc="Output schema of the Footprint peak catalog",
        name="{outputCoaddName}Coadd_peak_schema",
        storageClass="PeakCatalog"
    )

    # One detection catalog per band; merged together by run().
    catalogs = cT.Input(
        doc="Detection Catalogs to be merged",
        name="{inputCoaddName}Coadd_det",
        storageClass="SourceCatalog",
        dimensions=("tract", "patch", "skymap", "band"),
        multiple=True
    )

    skyMap = cT.Input(
        doc="SkyMap to be used in merging",
        name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
        storageClass="SkyMap",
        dimensions=("skymap",),
    )

    outputCatalog = cT.Output(
        doc="Merged Detection catalog",
        name="{outputCoaddName}Coadd_mergeDet",
        storageClass="SourceCatalog",
        dimensions=("tract", "patch", "skymap"),
    )
class MergeDetectionsConfig(PipelineTaskConfig, pipelineConnections=MergeDetectionsConnections):
    """!
    @anchor MergeDetectionsConfig_

    @brief Configuration parameters for the MergeDetectionsTask.
    """
    minNewPeak = Field(dtype=float, default=1,
                       doc="Minimum distance from closest peak to create a new one (in arcsec).")

    maxSamePeak = Field(dtype=float, default=0.3,
                        doc="When adding new catalogs to the merge, all peaks less than this distance "
                        " (in arcsec) to an existing peak will be flagged as detected in that catalog.")
    cullPeaks = ConfigField(dtype=CullPeaksConfig, doc="Configuration for how to cull peaks.")

    skyFilterName = Field(dtype=str, default="sky",
                          doc="Name of `filter' used to label sky objects (e.g. flag merge_peak_sky is set)\n"
                              "(N.b. should be in MergeMeasurementsConfig.pseudoFilterList)")
    skyObjects = ConfigurableField(target=SkyObjectsTask, doc="Generate sky objects")
    priorityList = ListField(dtype=str, default=[],
                             doc="Priority-ordered list of filter bands for the merge.")
    coaddName = Field(dtype=str, default="deep", doc="Name of coadd")

    def setDefaults(self):
        # Sky objects must avoid already-detected pixels.
        Config.setDefaults(self)
        self.skyObjects.avoidMask = ["DETECTED"]

    def validate(self):
        # An empty priority list would make the merge order undefined.
        super().validate()
        if len(self.priorityList) == 0:
            raise RuntimeError("No priority list provided")
class MergeDetectionsTask(PipelineTask, CmdLineTask):
    r"""!
    @anchor MergeDetectionsTask_

    @brief Merge coadd detections from multiple bands.

    @section pipe_tasks_multiBand_Contents Contents

      - @ref pipe_tasks_multiBand_MergeDetectionsTask_Purpose
      - @ref pipe_tasks_multiBand_MergeDetectionsTask_Init
      - @ref pipe_tasks_multiBand_MergeDetectionsTask_Run
      - @ref pipe_tasks_multiBand_MergeDetectionsTask_Config
      - @ref pipe_tasks_multiBand_MergeDetectionsTask_Debug
      - @ref pipe_tasks_multiband_MergeDetectionsTask_Example

    @section pipe_tasks_multiBand_MergeDetectionsTask_Purpose Description

    Command-line task that merges sources detected in coadds of exposures obtained with different filters.

    To perform photometry consistently across coadds in multiple filter bands, we create a master catalog of
    sources from all bands by merging the sources (peaks & footprints) detected in each coadd, while keeping
    track of which band each source originates in.

    The catalog merge is performed by @ref getMergedSourceCatalog. Spurious peaks detected around bright
    objects are culled as described in @ref CullPeaksConfig_.

      @par Inputs:
        deepCoadd_det{tract,patch,filter}: SourceCatalog (only parent Footprints)
      @par Outputs:
        deepCoadd_mergeDet{tract,patch}: SourceCatalog (only parent Footprints)
      @par Data Unit:
        tract, patch

    @section pipe_tasks_multiBand_MergeDetectionsTask_Init Task initialisation

    @copydoc \_\_init\_\_

    @section pipe_tasks_multiBand_MergeDetectionsTask_Run Invoking the Task

    @copydoc run

    @section pipe_tasks_multiBand_MergeDetectionsTask_Config Configuration parameters

    See @ref MergeDetectionsConfig_

    @section pipe_tasks_multiBand_MergeDetectionsTask_Debug Debug variables

    The command line task interface supports a flag @c -d to import @b debug.py from your
    @c PYTHONPATH; see @ref baseDebug for more about @b debug.py files.

    MergeDetectionsTask has no debug variables.

    @section pipe_tasks_multiband_MergeDetectionsTask_Example A complete example of using
    MergeDetectionsTask

    MergeDetectionsTask is meant to be run after detecting sources in coadds generated for the chosen subset
    of the available bands, and merges the sources (peaks & footprints) detected in the coadds generated from
    the chosen subset of filters. Subsequent tasks in the multi-band processing procedure will deblend the
    generated master list of sources and, eventually, perform forced photometry.

    Command-line usage expects data references for all the coadds to be processed, e.g.:

        mergeCoaddDetections.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-I^HSC-R

    This will merge the HSC-I & -R band parent source catalogs and write the results to
    `$CI_HSC_DIR/DATA/deepCoadd-results/merged/0/5,4/mergeDet-0-5,4.fits`.

    The next step in the multi-band processing procedure is
    @ref MeasureMergedCoaddSourcesTask_ "MeasureMergedCoaddSourcesTask".
    """
    ConfigClass = MergeDetectionsConfig
    RunnerClass = MergeSourcesRunner
    _DefaultName = "mergeCoaddDetections"
    inputDataset = "det"
    outputDataset = "mergeDet"
    # Source IDs are shared across bands, so the ID factory must not encode one.
    makeIdFactory = _makeMakeIdFactory("MergedCoaddId", includeBand=False)

    @classmethod
    def _makeArgumentParser(cls):
        # NOTE(review): body lost in the mangled source; restored as the
        # standard delegation to the shared merge argument-parser helper.
        return makeMergeArgumentParser(cls._DefaultName, cls.inputDataset)

    def getInputSchema(self, butler=None, schema=None):
        # Delegate to the module-level helper shared by the merge tasks.
        return getInputSchema(self, butler, schema)

    def __init__(self, butler=None, schema=None, initInputs=None, **kwargs):
        """!
        @brief Initialize the merge detections task.

        A @ref FootprintMergeList_ "FootprintMergeList" will be used to
        merge the source catalogs.

        @param[in] schema the schema of the detection catalogs used as input to this one
        @param[in] butler a butler used to read the input schema from disk, if schema is None
        @param[in] initInputs This is a PipelineTask-only argument that holds all inputs passed in
                   through the PipelineTask middleware
        @param[in] **kwargs keyword arguments to be passed to CmdLineTask.__init__

        The task will set its own self.schema attribute to the schema of the output merged catalog.
        """
        super().__init__(**kwargs)
        if initInputs is not None:
            # Under the PipelineTask middleware the schema arrives via initInputs.
            schema = initInputs['schema'].schema

        self.makeSubtask("skyObjects")
        self.schema = self.getInputSchema(butler=butler, schema=schema)

        filterNames = list(self.config.priorityList)
        filterNames.append(self.config.skyFilterName)
        # NOTE(review): the merge-list/output-schema construction was lost in
        # the mangled source; restored from the standard implementation.
        self.merged = afwDetect.FootprintMergeList(self.schema, filterNames)
        self.outputSchema = afwTable.SourceCatalog(self.schema)
        self.outputPeakSchema = afwDetect.PeakCatalog(self.merged.getPeakSchema())

    def runDataRef(self, patchRefList):
        # Gen2 entry point: read the per-band catalogs, merge, and write.
        catalogs = dict(readCatalog(self, patchRef) for patchRef in patchRefList)
        skyInfo = getSkyInfo(coaddName=self.config.coaddName, patchRef=patchRefList[0])
        idFactory = self.makeIdFactory(patchRefList[0])
        # NOTE(review): skySeed derivation was lost in the mangled source;
        # restored as the gen3-compatible exposure ID — confirm.
        skySeed = getGen3CoaddExposureId(patchRefList[0], coaddName=self.config.coaddName,
                                         includeBand=False, log=self.log)
        mergeCatalogStruct = self.run(catalogs, skyInfo, idFactory, skySeed)
        self.write(patchRefList[0], mergeCatalogStruct.outputCatalog)

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        # Gen3 entry point: assemble run() arguments from the butler quantum.
        inputs = butlerQC.get(inputRefs)
        exposureIdInfo = ExposureIdInfo.fromDataId(butlerQC.quantum.dataId, "tract_patch")
        inputs["skySeed"] = exposureIdInfo.expId
        inputs["idFactory"] = exposureIdInfo.makeSourceIdFactory()
        # Re-key the catalog list by band so the merge can be priority-ordered.
        catalogDict = {ref.dataId['band']: cat for ref, cat in zip(inputRefs.catalogs,
                                                                   inputs['catalogs'])}
        inputs['catalogs'] = catalogDict
        skyMap = inputs.pop('skyMap')
        # Extract the tract/patch info for this quantum.
        tractNumber = inputRefs.catalogs[0].dataId['tract']
        tractInfo = skyMap[tractNumber]
        patchInfo = tractInfo.getPatchInfo(inputRefs.catalogs[0].dataId['patch'])
        skyInfo = Struct(
            skyMap=skyMap,
            tractInfo=tractInfo,
            patchInfo=patchInfo,
            wcs=tractInfo.getWcs(),
            bbox=patchInfo.getOuterBBox()
        )
        inputs['skyInfo'] = skyInfo

        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)

    def run(self, catalogs, skyInfo, idFactory, skySeed):
        """!
        @brief Merge multiple catalogs.

        After ordering the catalogs and filters in priority order,
        @ref getMergedSourceCatalog of the @ref FootprintMergeList_ "FootprintMergeList" created by
        @ref \_\_init\_\_ is used to perform the actual merging. Finally, @ref cullPeaks is used to remove
        garbage peaks detected around bright objects.

        @param[in] catalogs dict of band: SourceCatalog to be merged
        @param[in] skyInfo description of the patch (wcs, bbox, ...)
        @param[in] idFactory factory used to generate source IDs
        @param[in] skySeed seed for the sky-object random number generator
        @param[out] mergedList
        """
        # Convert the configured arcsecond distances to pixels for this patch.
        tractWcs = skyInfo.wcs
        peakDistance = self.config.minNewPeak / tractWcs.getPixelScale().asArcseconds()
        samePeakDistance = self.config.maxSamePeak / tractWcs.getPixelScale().asArcseconds()

        # Put catalogs, filters in priority order.
        orderedCatalogs = [catalogs[band] for band in self.config.priorityList if band in catalogs.keys()]
        orderedBands = [band for band in self.config.priorityList if band in catalogs.keys()]

        mergedList = self.merged.getMergedSourceCatalog(orderedCatalogs, orderedBands, peakDistance,
                                                        self.schema, idFactory,
                                                        samePeakDistance)

        # Add extra sources that correspond to blank sky.
        skySourceFootprints = self.getSkySourceFootprints(mergedList, skyInfo, skySeed)
        if skySourceFootprints:
            key = mergedList.schema.find("merge_footprint_%s" % self.config.skyFilterName).key
            for foot in skySourceFootprints:
                s = mergedList.addNew()
                # NOTE(review): these two statements were lost in the mangled
                # source; restored from the standard implementation.
                s.setFootprint(foot)
                s.set(key, True)

        # Sort Peaks from brightest to faintest.
        for record in mergedList:
            record.getFootprint().sortPeaks()
        self.log.info("Merged to %d sources", len(mergedList))
        # Attempt to remove garbage peaks.
        self.cullPeaks(mergedList)
        return Struct(outputCatalog=mergedList)

    def cullPeaks(self, catalog):
        """!
        @brief Attempt to remove garbage peaks (mostly on the outskirts of large blends).

        @param[in] catalog Source catalog
        """
        keys = [item.key for item in self.merged.getPeakSchema().extract("merge_peak_*").values()]
        assert len(keys) > 0, "Error finding flags that associate peaks with their detection bands."
        totalPeaks = 0
        culledPeaks = 0
        for parentSource in catalog:
            # Make a list copy so we can clear the attached PeakCatalog and
            # append only the peaks we are keeping.
            keptPeaks = parentSource.getFootprint().getPeaks()
            oldPeaks = list(keptPeaks)
            keptPeaks.clear()
            familySize = len(oldPeaks)
            totalPeaks += familySize
            for rank, peak in enumerate(oldPeaks):
                if ((rank < self.config.cullPeaks.rankSufficient)
                    or (sum([peak.get(k) for k in keys]) >= self.config.cullPeaks.nBandsSufficient)
                    or (rank < self.config.cullPeaks.rankConsidered
                        and rank < self.config.cullPeaks.rankNormalizedConsidered * familySize)):
                    keptPeaks.append(peak)
                else:
                    culledPeaks += 1
        self.log.info("Culled %d of %d peaks", culledPeaks, totalPeaks)

    def getSchemaCatalogs(self):
        """!
        Return a dict of empty catalogs for each catalog dataset produced by this task.

        @param[out] dictionary of empty catalogs
        """
        mergeDet = afwTable.SourceCatalog(self.schema)
        peak = afwDetect.PeakCatalog(self.merged.getPeakSchema())
        return {self.config.coaddName + "Coadd_mergeDet": mergeDet,
                self.config.coaddName + "Coadd_peak": peak}

    def getSkySourceFootprints(self, mergedList, skyInfo, seed):
        """!
        @brief Return a list of Footprints of sky objects which don't overlap with anything in mergedList

        @param mergedList The merged Footprints from all the input bands
        @param skyInfo A description of the patch
        @param seed Seed for the random number generator
        """
        # Paint the footprints of all merged sources into a DETECTED mask so
        # sky objects can avoid them.
        mask = afwImage.Mask(skyInfo.patchInfo.getOuterBBox())
        detected = mask.getPlaneBitMask("DETECTED")
        for s in mergedList:
            s.getFootprint().spans.setMask(mask, detected)

        footprints = self.skyObjects.run(mask, seed)
        if not footprints:
            return footprints

        # Need to convert the peak catalog's schema so we can set
        # "merge_peak_<skyFilterName>" flags.
        schema = self.merged.getPeakSchema()
        mergeKey = schema.find("merge_peak_%s" % self.config.skyFilterName).key
        converted = []
        for oldFoot in footprints:
            assert len(oldFoot.getPeaks()) == 1, "Should be a single peak only"
            peak = oldFoot.getPeaks()[0]
            newFoot = afwDetect.Footprint(oldFoot.spans, schema)
            newFoot.addPeak(peak.getFx(), peak.getFy(), peak.getPeakValue())
            newFoot.getPeaks()[0].set(mergeKey, True)
            converted.append(newFoot)

        return converted

    def write(self, patchRef, catalog):
        """!
        @brief Write the output.

        @param[in] patchRef data reference for patch
        @param[in] catalog catalog

        We write as the dataset provided by the 'outputDataset' class variable.
        """
        patchRef.put(catalog, self.config.coaddName + "Coadd_" + self.outputDataset)
        # The filter is not part of the data ID for the dataset we are saving,
        # so it is confusing to see it in the log message.
        mergeDataId = patchRef.dataId.copy()
        del mergeDataId["filter"]
        self.log.info("Wrote merged catalog: %s", mergeDataId)

    def writeMetadata(self, dataRefList):
        """!
        @brief No metadata to write, and not sure how to write it for a list of dataRefs.
        """
        pass
Represent a 2-dimensional array of bitmask pixels.
daf::base::PropertyList * list
daf::base::PropertySet * set
Backwards-compatibility support for depersisting the old Calib (FluxMag0/FluxMag0Err) objects.
def getGen3CoaddExposureId(dataRef, coaddName="deep", includeBand=True, log=None)
def run(self, coaddExposures, bbox, wcs, dataIds, **kwargs)
Fit spatial kernel using approximate fluxes for candidates, and solving a linear system of equations.
def getSchemaCatalogs(self)
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
def getSkyInfo(coaddName, patchRef)
Return the SkyMap, tract and patch information, wcs, and outer bbox of the patch to be coadded.
def writeMetadata(self, dataRefList)
No metadata to write, and not sure how to write it for a list of dataRefs.
def matchCatalogsExact(catalog1, catalog2, patch1=None, patch2=None)
def write(self, patchRef, catalog)
Write the output.
def makeMergeArgumentParser(name, dataset)
Create a suitable ArgumentParser.
def getInputSchema(task, butler=None, schema=None)
Obtain the input schema either directly or froma butler reference.
def readCatalog(task, patchRef)
Read input catalog.
Lightweight representation of a geometric match between two records.