24 from .multiBandUtils
import (CullPeaksConfig, MergeSourcesRunner, _makeMakeIdFactory, makeMergeArgumentParser,
25 getInputSchema, getShortFilterName, readCatalog)
33 from lsst.pex.config import Config, Field, ListField, ConfigurableField, ConfigField
34 from lsst.pipe.base import (CmdLineTask, PipelineTask, PipelineTaskConfig, InitOutputDatasetField,
35 InputDatasetField, InitInputDatasetField, OutputDatasetField, Struct)
41 @anchor MergeDetectionsConfig_ 43 @brief Configuration parameters for the MergeDetectionsTask. 45 minNewPeak =
Field(dtype=float, default=1,
46 doc=
"Minimum distance from closest peak to create a new one (in arcsec).")
48 maxSamePeak =
Field(dtype=float, default=0.3,
49 doc=
"When adding new catalogs to the merge, all peaks less than this distance " 50 " (in arcsec) to an existing peak will be flagged as detected in that catalog.")
51 cullPeaks =
ConfigField(dtype=CullPeaksConfig, doc=
"Configuration for how to cull peaks.")
53 skyFilterName =
Field(dtype=str, default=
"sky",
54 doc=
"Name of `filter' used to label sky objects (e.g. flag merge_peak_sky is set)\n" 55 "(N.b. should be in MergeMeasurementsConfig.pseudoFilterList)")
58 doc=
"Priority-ordered list of bands for the merge.")
59 coaddName =
Field(dtype=str, default=
"deep", doc=
"Name of coadd")
62 doc=
"Schema of the input detection catalog",
63 nameTemplate=
"{inputCoaddName}Coadd_det_schema",
64 storageClass=
"SourceCatalog" 68 doc=
"Schema of the merged detection catalog",
69 nameTemplate=
"{outputCoaddName}Coadd_mergeDet_schema",
70 storageClass=
"SourceCatalog" 74 doc=
"Output schema of the Footprint peak catalog",
75 nameTemplate=
"{outputCoaddName}Coadd_peak_schema",
76 storageClass=
"PeakCatalog" 80 doc=
"Detection Catalogs to be merged",
81 nameTemplate=
"{inputCoaddName}Coadd_det",
82 storageClass=
"SourceCatalog",
83 dimensions=(
"tract",
"patch",
"skymap",
"abstract_filter")
87 doc=
"SkyMap to be used in merging",
88 nameTemplate=
"{inputCoaddName}Coadd_skyMap",
89 storageClass=
"SkyMap",
90 dimensions=(
"skymap",),
95 doc=
"Merged Detection catalog",
96 nameTemplate=
"{outputCoaddName}Coadd_mergeDet",
97 storageClass=
"SourceCatalog",
98 dimensions=(
"tract",
"patch",
"skymap"),
103 Config.setDefaults(self)
104 self.formatTemplateNames({
"inputCoaddName":
'deep',
"outputCoaddName":
"deep"})
106 self.quantum.dimensions = (
"tract",
"patch",
"skymap")
111 raise RuntimeError(
"No priority list provided")
116 @anchor MergeDetectionsTask_ 118 @brief Merge coadd detections from multiple bands. 120 @section pipe_tasks_multiBand_Contents Contents 122 - @ref pipe_tasks_multiBand_MergeDetectionsTask_Purpose 123 - @ref pipe_tasks_multiBand_MergeDetectionsTask_Init 124 - @ref pipe_tasks_multiBand_MergeDetectionsTask_Run 125 - @ref pipe_tasks_multiBand_MergeDetectionsTask_Config 126 - @ref pipe_tasks_multiBand_MergeDetectionsTask_Debug 127 - @ref pipe_tasks_multiband_MergeDetectionsTask_Example 129 @section pipe_tasks_multiBand_MergeDetectionsTask_Purpose Description 131 Command-line task that merges sources detected in coadds of exposures obtained with different filters. 133 To perform photometry consistently across coadds in multiple filter bands, we create a master catalog of 134 sources from all bands by merging the sources (peaks & footprints) detected in each coadd, while keeping 135 track of which band each source originates in. 137 The catalog merge is performed by @ref getMergedSourceCatalog. Spurious peaks detected around bright 138 objects are culled as described in @ref CullPeaksConfig_. 141 deepCoadd_det{tract,patch,filter}: SourceCatalog (only parent Footprints) 143 deepCoadd_mergeDet{tract,patch}: SourceCatalog (only parent Footprints) 147 @section pipe_tasks_multiBand_MergeDetectionsTask_Init Task initialisation 149 @copydoc \_\_init\_\_ 151 @section pipe_tasks_multiBand_MergeDetectionsTask_Run Invoking the Task 155 @section pipe_tasks_multiBand_MergeDetectionsTask_Config Configuration parameters 157 See @ref MergeDetectionsConfig_ 159 @section pipe_tasks_multiBand_MergeDetectionsTask_Debug Debug variables 161 The @link lsst.pipe.base.cmdLineTask.CmdLineTask command line task@endlink interface supports a flag @c -d 162 to import @b debug.py from your @c PYTHONPATH; see @ref baseDebug for more about @b debug.py files. 164 MergeDetectionsTask has no debug variables. 166 @section pipe_tasks_multiband_MergeDetectionsTask_Example A complete example of using MergeDetectionsTask 168 MergeDetectionsTask is meant to be run after detecting sources in coadds generated for the chosen subset 169 of the available bands. 170 The purpose of the task is to merge sources (peaks & footprints) detected in the coadds generated from the 171 chosen subset of filters. 172 Subsequent tasks in the multi-band processing procedure will deblend the generated master list of sources 173 and, eventually, perform forced photometry. 174 Command-line usage of MergeDetectionsTask expects data references for all the coadds to be processed. 175 A list of the available optional arguments can be obtained by calling mergeCoaddDetections.py with the 176 `--help` command line argument: 178 mergeCoaddDetections.py --help 181 To demonstrate usage of the DetectCoaddSourcesTask in the larger context of multi-band processing, we 182 will process HSC data in the [ci_hsc](https://github.com/lsst/ci_hsc) package. Assuming one has finished 183 step 5 at @ref pipeTasks_multiBand, one may merge the catalogs of sources from each coadd as follows: 185 mergeCoaddDetections.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-I^HSC-R 187 This will merge the HSC-I & -R band parent source catalogs and write the results to 188 `$CI_HSC_DIR/DATA/deepCoadd-results/merged/0/5,4/mergeDet-0-5,4.fits`. 190 The next step in the multi-band processing procedure is 191 @ref MeasureMergedCoaddSourcesTask_ "MeasureMergedCoaddSourcesTask" 193 ConfigClass = MergeDetectionsConfig
194 RunnerClass = MergeSourcesRunner
195 _DefaultName =
"mergeCoaddDetections" 197 outputDataset =
"mergeDet" 198 makeIdFactory = _makeMakeIdFactory(
"MergedCoaddId")
201 def _makeArgumentParser(cls):
207 def __init__(self, butler=None, schema=None, initInputs=None, **kwargs):
210 @brief Initialize the merge detections task. 212 A @ref FootprintMergeList_ "FootprintMergeList" will be used to 213 merge the source catalogs. 215 @param[in] schema the schema of the detection catalogs used as input to this one 216 @param[in] butler a butler used to read the input schema from disk, if schema is None 217 @param[in] initInputs This a PipelineTask-only argument that holds all inputs passed in 218 through the PipelineTask middleware 219 @param[in] **kwargs keyword arguments to be passed to CmdLineTask.__init__ 221 The task will set its own self.schema attribute to the schema of the output merged catalog. 224 if initInputs
is not None:
225 schema = initInputs[
'schema'].schema
227 self.makeSubtask(
"skyObjects")
231 filterNames += [self.config.skyFilterName]
239 catalogs = dict(
readCatalog(self, patchRef)
for patchRef
in patchRefList)
240 skyInfo =
getSkyInfo(coaddName=self.config.coaddName, patchRef=patchRefList[0])
242 skySeed = patchRefList[0].get(self.config.coaddName +
"MergedCoaddId")
243 mergeCatalogStruct = self.
run(catalogs, skyInfo, idFactory, skySeed)
244 self.
write(patchRefList[0], mergeCatalogStruct.outputCatalog)
247 packedId, maxBits = butler.registry.packDataId(
"tract_patch", outputDataIds[
'outputCatalog'],
249 inputData[
"skySeed"] = packedId
250 inputData[
"idFactory"] = afwTable.IdFactory.makeSource(packedId, 64 - maxBits)
251 catalogDict = {dataId[
'abstract_filter']: cat
for dataId, cat
in zip(inputDataIds[
'catalogs'],
252 inputData[
'catalogs'])}
253 inputData[
'catalogs'] = catalogDict
254 skyMap = inputData.pop(
'skyMap')
256 tractNumber = inputDataIds[
'catalogs'][0][
'tract']
257 tractInfo = skyMap[tractNumber]
258 patchInfo = tractInfo.getPatchInfo(inputDataIds[
'catalogs'][0][
'patch'])
263 wcs=tractInfo.getWcs(),
264 bbox=patchInfo.getOuterBBox()
266 inputData[
'skyInfo'] = skyInfo
267 return self.
run(**inputData)
269 def run(self, catalogs, skyInfo, idFactory, skySeed):
271 @brief Merge multiple catalogs. 273 After ordering the catalogs and filters in priority order, 274 @ref getMergedSourceCatalog of the @ref FootprintMergeList_ "FootprintMergeList" created by 275 @ref \_\_init\_\_ is used to perform the actual merging. Finally, @ref cullPeaks is used to remove 276 garbage peaks detected around bright objects. 280 @param[out] mergedList 284 tractWcs = skyInfo.wcs
285 peakDistance = self.config.minNewPeak / tractWcs.getPixelScale().asArcseconds()
286 samePeakDistance = self.config.maxSamePeak / tractWcs.getPixelScale().asArcseconds()
289 orderedCatalogs = [catalogs[band]
for band
in self.config.priorityList
if band
in catalogs.keys()]
291 if band
in catalogs.keys()]
293 mergedList = self.
merged.getMergedSourceCatalog(orderedCatalogs, orderedBands, peakDistance,
301 if skySourceFootprints:
302 key = mergedList.schema.find(
"merge_footprint_%s" % self.config.skyFilterName).key
303 for foot
in skySourceFootprints:
304 s = mergedList.addNew()
309 for record
in mergedList:
310 record.getFootprint().sortPeaks()
311 self.log.
info(
"Merged to %d sources" % len(mergedList))
314 return Struct(outputCatalog=mergedList)
318 @brief Attempt to remove garbage peaks (mostly on the outskirts of large blends). 320 @param[in] catalog Source catalog 322 keys = [item.key
for item
in self.
merged.getPeakSchema().extract(
"merge_peak_*").values()]
323 assert len(keys) > 0,
"Error finding flags that associate peaks with their detection bands." 326 for parentSource
in catalog:
329 keptPeaks = parentSource.getFootprint().getPeaks()
330 oldPeaks =
list(keptPeaks)
332 familySize = len(oldPeaks)
333 totalPeaks += familySize
334 for rank, peak
in enumerate(oldPeaks):
335 if ((rank < self.config.cullPeaks.rankSufficient)
or 336 (sum([peak.get(k)
for k
in keys]) >= self.config.cullPeaks.nBandsSufficient)
or 337 (rank < self.config.cullPeaks.rankConsidered
and 338 rank < self.config.cullPeaks.rankNormalizedConsidered * familySize)):
339 keptPeaks.append(peak)
342 self.log.
info(
"Culled %d of %d peaks" % (culledPeaks, totalPeaks))
346 Return a dict of empty catalogs for each catalog dataset produced by this task. 348 @param[out] dictionary of empty catalogs 352 return {self.config.coaddName +
"Coadd_mergeDet": mergeDet,
353 self.config.coaddName +
"Coadd_peak": peak}
357 @brief Return a list of Footprints of sky objects which don't overlap with anything in mergedList 359 @param mergedList The merged Footprints from all the input bands 360 @param skyInfo A description of the patch 361 @param seed Seed for the random number generator 364 detected = mask.getPlaneBitMask(
"DETECTED")
366 s.getFootprint().spans.setMask(mask, detected)
368 footprints = self.skyObjects.
run(mask, seed)
373 schema = self.
merged.getPeakSchema()
374 mergeKey = schema.find(
"merge_peak_%s" % self.config.skyFilterName).key
376 for oldFoot
in footprints:
377 assert len(oldFoot.getPeaks()) == 1,
"Should be a single peak only" 378 peak = oldFoot.getPeaks()[0]
380 newFoot.addPeak(peak.getFx(), peak.getFy(), peak.getPeakValue())
381 newFoot.getPeaks()[0].
set(mergeKey,
True)
382 converted.append(newFoot)
388 @brief Write the output. 390 @param[in] patchRef data reference for patch 391 @param[in] catalog catalog 393 We write as the dataset provided by the 'outputDataset' 396 patchRef.put(catalog, self.config.coaddName +
"Coadd_" + self.
outputDataset)
399 mergeDataId = patchRef.dataId.copy()
400 del mergeDataId[
"filter"]
401 self.log.
info(
"Wrote merged catalog: %s" % (mergeDataId,))
405 @brief No metadata to write, and not sure how to write it for a list of dataRefs. def InitOutputDatasetField
def getSchemaCatalogs(self)
Return a dict of empty catalogs for each catalog dataset produced by this task.
def makeMergeArgumentParser(name, dataset)
Create a suitable ArgumentParser.
Merge coadd detections from multiple bands.
def readCatalog(task, patchRef)
Read input catalog.
Fit spatial kernel using approximate fluxes for candidates, and solving a linear system of equations...
daf::base::PropertySet * set
def adaptArgsAndRun(self, inputData, inputDataIds, outputDataIds, butler)
def __init__(self, butler=None, schema=None, initInputs=None, kwargs)
Initialize the merge detections task.
Represent a 2-dimensional array of bitmask pixels.
def getInitOutputDatasets(self)
def run(self, catalogs, skyInfo, idFactory, skySeed)
Merge multiple catalogs.
Configuration parameters for the MergeDetectionsTask.
def getShortFilterName(name)
def InitInputDatasetField
def getSkyInfo(coaddName, patchRef)
Return the SkyMap, tract and patch information, wcs, and outer bbox of the patch to be coadded...
def cullPeaks(self, catalog)
Attempt to remove garbage peaks (mostly on the outskirts of large blends).
def write(self, patchRef, catalog)
Write the output.
def getInputSchema(self, butler=None, schema=None)
Backwards-compatibility support for depersisting the old Calib (FluxMag0/FluxMag0Err) objects...
def runDataRef(self, patchRefList)
def writeMetadata(self, dataRefList)
No metadata to write, and not sure how to write it for a list of dataRefs.
daf::base::PropertyList * list
def getSkySourceFootprints(self, mergedList, skyInfo, seed)
Return a list of Footprints of sky objects which don't overlap with anything in mergedList.