from .multiBandUtils import (CullPeaksConfig, MergeSourcesRunner, _makeMakeIdFactory, makeMergeArgumentParser,
                             getInputSchema, getShortFilterName, readCatalog)

import lsst.afw.detection as afwDetect
import lsst.afw.image as afwImage
import lsst.afw.table as afwTable

from lsst.meas.algorithms import SkyObjectsTask
from lsst.pex.config import Config, Field, ListField, ConfigurableField, ConfigField
from lsst.pipe.base import (CmdLineTask, PipelineTask, PipelineTaskConfig, Struct,
                            PipelineTaskConnections)
import lsst.pipe.base.connectionTypes as cT
from .coaddBase import getSkyInfo

class MergeDetectionsConnections(PipelineTaskConnections,
                                 dimensions=("tract", "patch", "skymap"),
                                 defaultTemplates={"inputCoaddName": "deep", "outputCoaddName": "deep"}):
    schema = cT.InitInput(
        doc="Schema of the input detection catalog",
        name="{inputCoaddName}Coadd_det_schema",
        storageClass="SourceCatalog"
    )

    outputSchema = cT.InitOutput(
        doc="Schema of the merged detection catalog",
        name="{outputCoaddName}Coadd_mergeDet_schema",
        storageClass="SourceCatalog"
    )

    outputPeakSchema = cT.InitOutput(
        doc="Output schema of the Footprint peak catalog",
        name="{outputCoaddName}Coadd_peak_schema",
        storageClass="PeakCatalog"
    )

    catalogs = cT.Input(
        doc="Detection Catalogs to be merged",
        name="{inputCoaddName}Coadd_det",
        storageClass="SourceCatalog",
        dimensions=("tract", "patch", "skymap", "abstract_filter"),
        multiple=True
    )

    skyMap = cT.Input(
        doc="SkyMap to be used in merging",
        name="{inputCoaddName}Coadd_skyMap",
        storageClass="SkyMap",
        dimensions=("skymap",),
    )

    outputCatalog = cT.Output(
        doc="Merged Detection catalog",
        name="{outputCoaddName}Coadd_mergeDet",
        storageClass="SourceCatalog",
        dimensions=("tract", "patch", "skymap"),
    )

 
class MergeDetectionsConfig(PipelineTaskConfig, pipelineConnections=MergeDetectionsConnections):
    """!
    @anchor MergeDetectionsConfig_

    @brief Configuration parameters for the MergeDetectionsTask.
    """
    minNewPeak = Field(dtype=float, default=1,
                       doc="Minimum distance from closest peak to create a new one (in arcsec).")

    maxSamePeak = Field(dtype=float, default=0.3,
                        doc="When adding new catalogs to the merge, all peaks less than this distance "
                            "(in arcsec) to an existing peak will be flagged as detected in that catalog.")
    cullPeaks = ConfigField(dtype=CullPeaksConfig, doc="Configuration for how to cull peaks.")

    skyFilterName = Field(dtype=str, default="sky",
                          doc="Name of `filter' used to label sky objects (e.g. flag merge_peak_sky is set)\n"
                              "(N.b. should be in MergeMeasurementsConfig.pseudoFilterList)")
    skyObjects = ConfigurableField(target=SkyObjectsTask, doc="Generate sky objects")
    priorityList = ListField(dtype=str, default=[],
                             doc="Priority-ordered list of bands for the merge.")
    coaddName = Field(dtype=str, default="deep", doc="Name of coadd")
 
    def setDefaults(self):
        Config.setDefaults(self)
        self.skyObjects.avoidMask = ["DETECTED"]

    def validate(self):
        super().validate()
        if len(self.priorityList) == 0:
            raise RuntimeError("No priority list provided")
 
class MergeDetectionsTask(PipelineTask, CmdLineTask):
    r"""!
    @anchor MergeDetectionsTask_

    @brief Merge coadd detections from multiple bands.

    @section pipe_tasks_multiBand_Contents Contents

      - @ref pipe_tasks_multiBand_MergeDetectionsTask_Purpose
      - @ref pipe_tasks_multiBand_MergeDetectionsTask_Init
      - @ref pipe_tasks_multiBand_MergeDetectionsTask_Run
      - @ref pipe_tasks_multiBand_MergeDetectionsTask_Config
      - @ref pipe_tasks_multiBand_MergeDetectionsTask_Debug
      - @ref pipe_tasks_multiband_MergeDetectionsTask_Example

    @section pipe_tasks_multiBand_MergeDetectionsTask_Purpose   Description

    Command-line task that merges sources detected in coadds of exposures obtained with different filters.

    To perform photometry consistently across coadds in multiple filter bands, we create a master catalog of
    sources from all bands by merging the sources (peaks & footprints) detected in each coadd, while keeping
    track of which band each source originates in.

    The catalog merge is performed by @ref getMergedSourceCatalog. Spurious peaks detected around bright
    objects are culled as described in @ref CullPeaksConfig_.

    @par Inputs:
        deepCoadd_det{tract,patch,filter}: SourceCatalog (only parent Footprints)
    @par Outputs:
        deepCoadd_mergeDet{tract,patch}: SourceCatalog (only parent Footprints)

    @section pipe_tasks_multiBand_MergeDetectionsTask_Init       Task initialisation

    @copydoc \_\_init\_\_

    @section pipe_tasks_multiBand_MergeDetectionsTask_Run       Invoking the Task

    @copydoc run

    @section pipe_tasks_multiBand_MergeDetectionsTask_Config       Configuration parameters

    See @ref MergeDetectionsConfig_

    @section pipe_tasks_multiBand_MergeDetectionsTask_Debug             Debug variables

    The @link lsst.pipe.base.cmdLineTask.CmdLineTask command line task@endlink interface supports a flag @c -d
    to import @b debug.py from your @c PYTHONPATH; see @ref baseDebug for more about @b debug.py files.

    MergeDetectionsTask has no debug variables.

    @section pipe_tasks_multiband_MergeDetectionsTask_Example   A complete example of using MergeDetectionsTask

    MergeDetectionsTask is meant to be run after detecting sources in coadds generated for the chosen subset
    of the available bands.
    The purpose of the task is to merge sources (peaks & footprints) detected in the coadds generated from the
    chosen subset of filters.
    Subsequent tasks in the multi-band processing procedure will deblend the generated master list of sources
    and, eventually, perform forced photometry.
    Command-line usage of MergeDetectionsTask expects data references for all the coadds to be processed.
    A list of the available optional arguments can be obtained by calling mergeCoaddDetections.py with the
    `--help` command line argument:
    @code
    mergeCoaddDetections.py --help
    @endcode

    To demonstrate usage of the MergeDetectionsTask in the larger context of multi-band processing, we
    will process HSC data in the [ci_hsc](https://github.com/lsst/ci_hsc) package. Assuming one has finished
    step 5 at @ref pipeTasks_multiBand, one may merge the catalogs of sources from each coadd as follows:
    @code
    mergeCoaddDetections.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-I^HSC-R
    @endcode
    This will merge the HSC-I & HSC-R band parent source catalogs and write the results to
    `$CI_HSC_DIR/DATA/deepCoadd-results/merged/0/5,4/mergeDet-0-5,4.fits`.

    The next step in the multi-band processing procedure is
    @ref MeasureMergedCoaddSourcesTask_ "MeasureMergedCoaddSourcesTask"
    """
    ConfigClass = MergeDetectionsConfig
    RunnerClass = MergeSourcesRunner
    _DefaultName = "mergeCoaddDetections"
    inputDataset = "det"
    outputDataset = "mergeDet"
    makeIdFactory = _makeMakeIdFactory("MergedCoaddId")

    @classmethod
    def _makeArgumentParser(cls):
        return makeMergeArgumentParser(cls._DefaultName, cls.inputDataset)

    def getInputSchema(self, butler=None, schema=None):
        # Delegate to the shared multi-band helper; used by __init__ below.
        return getInputSchema(self, butler, schema)

    def __init__(self, butler=None, schema=None, initInputs=None, **kwargs):
        """!
        @brief Initialize the merge detections task.

        A @ref FootprintMergeList_ "FootprintMergeList" will be used to
        merge the source catalogs.

        @param[in] schema     the schema of the detection catalogs used as input to this one
        @param[in] butler     a butler used to read the input schema from disk, if schema is None
        @param[in] initInputs This is a PipelineTask-only argument that holds all inputs passed in
                              through the PipelineTask middleware
        @param[in] **kwargs   keyword arguments to be passed to CmdLineTask.__init__

        The task will set its own self.schema attribute to the schema of the output merged catalog.
        """
        super().__init__(**kwargs)
        if initInputs is not None:
            schema = initInputs['schema'].schema

        self.makeSubtask("skyObjects")
        self.schema = self.getInputSchema(butler=butler, schema=schema)

        filterNames = [getShortFilterName(name) for name in self.config.priorityList]
        filterNames += [self.config.skyFilterName]
        self.merged = afwDetect.FootprintMergeList(self.schema, filterNames)
        # Empty catalogs exposed as init-outputs so the middleware can persist the schemas.
        self.outputSchema = afwTable.SourceCatalog(self.schema)
        self.outputPeakSchema = afwDetect.PeakCatalog(self.merged.getPeakSchema())
 
    def runDataRef(self, patchRefList):
        catalogs = dict(readCatalog(self, patchRef) for patchRef in patchRefList)
        skyInfo = getSkyInfo(coaddName=self.config.coaddName, patchRef=patchRefList[0])
        idFactory = self.makeIdFactory(patchRefList[0])
        skySeed = patchRefList[0].get(self.config.coaddName + "MergedCoaddId")
        mergeCatalogStruct = self.run(catalogs, skyInfo, idFactory, skySeed)
        self.write(patchRefList[0], mergeCatalogStruct.outputCatalog)
 
    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        # Pack the tract+patch data ID into an integer; the remaining bits of a 64-bit ID
        # are left for the individual source IDs produced for this patch.
        packedId, maxBits = butlerQC.quantum.dataId.pack("tract_patch", returnMaxBits=True)
        inputs["skySeed"] = packedId
        inputs["idFactory"] = afwTable.IdFactory.makeSource(packedId, 64 - maxBits)
        catalogDict = {ref.dataId['abstract_filter']: cat for ref, cat in zip(inputRefs.catalogs,
                                                                              inputs['catalogs'])}
        inputs['catalogs'] = catalogDict
        skyMap = inputs.pop('skyMap')
        # Construct skyInfo from the skyMap; any catalog data ID identifies the tract and patch.
        tractNumber = inputRefs.catalogs[0].dataId['tract']
        tractInfo = skyMap[tractNumber]
        patchInfo = tractInfo.getPatchInfo(inputRefs.catalogs[0].dataId['patch'])
        skyInfo = Struct(
            skyMap=skyMap,
            tractInfo=tractInfo,
            patchInfo=patchInfo,
            wcs=tractInfo.getWcs(),
            bbox=patchInfo.getOuterBBox()
        )
        inputs['skyInfo'] = skyInfo
 
        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)
 
    def run(self, catalogs, skyInfo, idFactory, skySeed):
        r"""!
        @brief Merge multiple catalogs.

        After ordering the catalogs and filters in priority order,
        @ref getMergedSourceCatalog of the @ref FootprintMergeList_ "FootprintMergeList" created by
        @ref \_\_init\_\_ is used to perform the actual merging. Finally, @ref cullPeaks is used to remove
        garbage peaks detected around bright objects.

        @param[in]  catalogs   dict mapping band name to detection catalog
        @param[in]  skyInfo    description of the patch geometry (wcs, bbox, ...)
        @param[in]  idFactory  factory used to assign IDs to the merged sources
        @param[in]  skySeed    seed for the sky-object random number generator
        @param[out] mergedList
        """
        # Convert the peak-matching distances from arcsec to pixels in the tract WCS.
        tractWcs = skyInfo.wcs
        peakDistance = self.config.minNewPeak / tractWcs.getPixelScale().asArcseconds()
        samePeakDistance = self.config.maxSamePeak / tractWcs.getPixelScale().asArcseconds()
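
        # Worked example (assumed numbers, not values enforced here): for a coadd pixel scale
        # of 0.168 arcsec/pixel, minNewPeak=1.0 arcsec gives peakDistance ~ 5.95 pixels and
        # maxSamePeak=0.3 arcsec gives samePeakDistance ~ 1.79 pixels.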
 
        # Put catalogs and filters in priority order.
        orderedCatalogs = [catalogs[band] for band in self.config.priorityList if band in catalogs.keys()]
        orderedBands = [getShortFilterName(band) for band in self.config.priorityList
                        if band in catalogs.keys()]

        mergedList = self.merged.getMergedSourceCatalog(orderedCatalogs, orderedBands, peakDistance,
                                                        self.schema, idFactory,
                                                        samePeakDistance)

        # Add extra sources that correspond to blank sky.
        skySourceFootprints = self.getSkySourceFootprints(mergedList, skyInfo, skySeed)
        if skySourceFootprints:
            key = mergedList.schema.find("merge_footprint_%s" % self.config.skyFilterName).key
            for foot in skySourceFootprints:
                s = mergedList.addNew()
                s.setFootprint(foot)
                s.set(key, True)

        # Sort peaks from brightest to faintest.
        for record in mergedList:
            record.getFootprint().sortPeaks()
        self.log.info("Merged to %d sources" % len(mergedList))
        # Attempt to remove garbage peaks.
        self.cullPeaks(mergedList)
        return Struct(outputCatalog=mergedList)
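
    # A minimal direct-call sketch (illustrative; runQuantum or runDataRef normally assemble
    # these arguments, and the band keys below are placeholders):
    #
    #     result = task.run(catalogs={"HSC-I": detCatI, "HSC-R": detCatR},
    #                       skyInfo=skyInfo, idFactory=idFactory, skySeed=skySeed)
    #     mergedCatalog = result.outputCatalog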
 
    def cullPeaks(self, catalog):
        """!
        @brief Attempt to remove garbage peaks (mostly on the outskirts of large blends).

        @param[in] catalog Source catalog
        """
        keys = [item.key for item in self.merged.getPeakSchema().extract("merge_peak_*").values()]
        assert len(keys) > 0, "Error finding flags that associate peaks with their detection bands."
        totalPeaks = 0
        culledPeaks = 0
        for parentSource in catalog:
            # Make a list copy of the peaks, clear the footprint's peak list, and refill it with
            # only the peaks we keep (we can't delete elements while iterating).
            keptPeaks = parentSource.getFootprint().getPeaks()
            oldPeaks = list(keptPeaks)
            keptPeaks.clear()
            familySize = len(oldPeaks)
            totalPeaks += familySize
            for rank, peak in enumerate(oldPeaks):
                if ((rank < self.config.cullPeaks.rankSufficient)
                    or (sum([peak.get(k) for k in keys]) >= self.config.cullPeaks.nBandsSufficient)
                    or (rank < self.config.cullPeaks.rankConsidered
                        and rank < self.config.cullPeaks.rankNormalizedConsidered * familySize)):
                    keptPeaks.append(peak)
                else:
                    culledPeaks += 1
        self.log.info("Culled %d of %d peaks" % (culledPeaks, totalPeaks))
 
    def getSchemaCatalogs(self):
        """!
        Return a dict of empty catalogs for each catalog dataset produced by this task.

        @param[out] dictionary of empty catalogs
        """
        mergeDet = afwTable.SourceCatalog(self.schema)
        peak = afwDetect.PeakCatalog(self.merged.getPeakSchema())
        return {self.config.coaddName + "Coadd_mergeDet": mergeDet,
                self.config.coaddName + "Coadd_peak": peak}
 
    def getSkySourceFootprints(self, mergedList, skyInfo, seed):
        """!
        @brief Return a list of Footprints of sky objects which don't overlap with anything in mergedList

        @param mergedList  The merged Footprints from all the input bands
        @param skyInfo     A description of the patch
        @param seed        Seed for the random number generator
        """
        # Mark every detected span in the patch so sky objects can avoid them.
        mask = afwImage.Mask(skyInfo.patchInfo.getOuterBBox())
        detected = mask.getPlaneBitMask("DETECTED")
        for s in mergedList:
            s.getFootprint().spans.setMask(mask, detected)

        footprints = self.skyObjects.run(mask, seed)

        # Convert the peaks to the merged peak schema so the "merge_peak_<skyFilterName>" flag can be set.
        schema = self.merged.getPeakSchema()
        mergeKey = schema.find("merge_peak_%s" % self.config.skyFilterName).key
        converted = []
        for oldFoot in footprints:
            assert len(oldFoot.getPeaks()) == 1, "Should be a single peak only"
            peak = oldFoot.getPeaks()[0]
            newFoot = afwDetect.Footprint(oldFoot.spans, schema)
            newFoot.addPeak(peak.getFx(), peak.getFy(), peak.getPeakValue())
            newFoot.getPeaks()[0].set(mergeKey, True)
            converted.append(newFoot)

        return converted
 
    def write(self, patchRef, catalog):
        """!
        @brief Write the output.

        @param[in]  patchRef   data reference for patch
        @param[in]  catalog    catalog

        We write as the dataset provided by the 'outputDataset'
        class variable.
        """
        patchRef.put(catalog, self.config.coaddName + "Coadd_" + self.outputDataset)
        # The filter is not part of the output data ID, so drop it from the log message to
        # avoid confusion (the butler would simply ignore it anyway).
        mergeDataId = patchRef.dataId.copy()
        del mergeDataId["filter"]
        self.log.info("Wrote merged catalog: %s" % (mergeDataId,))

    def writeMetadata(self, dataRefList):
        """!
        @brief No metadata to write, and not sure how to write it for a list of dataRefs.
        """
        pass