import numpy as np
from numpy.lib.recfunctions import rec_join

from .multiBandUtils import (CullPeaksConfig, MergeSourcesRunner, _makeMakeIdFactory, makeMergeArgumentParser,
                             getInputSchema, readCatalog)

# The imports below are required by code further down in this module; the
# specific module paths are restored from contemporaneous pipe_tasks and
# should be treated as assumptions.
import lsst.afw.detection as afwDetect
import lsst.afw.image as afwImage
import lsst.afw.table as afwTable

from lsst.meas.algorithms import SkyObjectsTask
from lsst.skymap import BaseSkyMap
from lsst.obs.base import ExposureIdInfo
from lsst.pex.config import Config, Field, ListField, ConfigurableField, ConfigField
from lsst.pipe.base import (CmdLineTask, PipelineTask, PipelineTaskConfig, Struct,
                            PipelineTaskConnections)
import lsst.pipe.base.connectionTypes as cT
from .coaddBase import getSkyInfo


def matchCatalogsExact(catalog1, catalog2, patch1=None, patch2=None):
    """Match two catalogs derived from the same mergeDet catalog

    When testing downstream features, like deblending methods/parameters
    and measurement algorithms/parameters, it is useful to compare
    the same sources in two catalogs. In most cases this must be done
    by matching on either RA/DEC or XY positions, which occasionally
    will mismatch one source with another.

    For a more robust solution, as long as the downstream catalog is
    derived from the same mergeDet catalog, exact source matching
    can be done via the unique ``(parent, deblend_peakId)``
    combination. So this function performs this exact matching for
    all sources in both catalogs.

    Parameters
    ----------
    catalog1, catalog2 : `lsst.afw.table.SourceCatalog`
        The two catalogs to merge.
    patch1, patch2 : array of `int`
        Patch for each row, converted into an integer.
        In the gen3 butler this is done already; in gen2
        it is recommended to use `patch2Int`, assuming that
        the patches have the same structure as HSC, ranging
        from '0,0' to '9,9'.

    Returns
    -------
    result : list of `lsst.afw.table.SourceMatch`
        List of matches for each source (using an inner join).
    """
    # Only match the deblended children, whose (parent, peakId) key is unique
    sidx1 = catalog1["parent"] != 0
    sidx2 = catalog2["parent"] != 0

    # Create the keys used to merge the catalogs
    parents1 = np.array(catalog1["parent"][sidx1])
    peaks1 = np.array(catalog1["deblend_peakId"][sidx1])
    index1 = np.arange(len(catalog1))[sidx1]
    parents2 = np.array(catalog2["parent"][sidx2])
    peaks2 = np.array(catalog2["deblend_peakId"][sidx2])
    index2 = np.arange(len(catalog2))[sidx2]
 
    if patch1 is not None:
        if patch2 is None:
            msg = ("If the catalogs are from different patches then patch1 and patch2 must be specified"
                   ", got {} and {}").format(patch1, patch2)
            raise ValueError(msg)
        patch1 = patch1[sidx1]
        patch2 = patch2[sidx2]

        key1 = np.rec.array((parents1, peaks1, patch1, index1),
                            dtype=[("parent", np.int64), ("peakId", np.int32),
                                   ("patch", patch1.dtype), ("index", np.int32)])
        key2 = np.rec.array((parents2, peaks2, patch2, index2),
                            dtype=[("parent", np.int64), ("peakId", np.int32),
                                   ("patch", patch2.dtype), ("index", np.int32)])
        matchColumns = ("parent", "peakId", "patch")
 
    else:
        key1 = np.rec.array((parents1, peaks1, index1),
                            dtype=[("parent", np.int64), ("peakId", np.int32), ("index", np.int32)])
        key2 = np.rec.array((parents2, peaks2, index2),
                            dtype=[("parent", np.int64), ("peakId", np.int32), ("index", np.int32)])
        matchColumns = ("parent", "peakId")
 
    # Match the two catalogs exactly on the key columns using an inner join
    matched = rec_join(matchColumns, key1, key2, jointype="inner")

    # Extract the full row index into each original catalog
    indices1 = matched["index1"]
    indices2 = matched["index2"]

    # Re-index the resulting catalogs
    matches = [
        afwTable.SourceMatch(catalog1[int(i1)], catalog2[int(i2)], 0.0)
        for i1, i2 in zip(indices1, indices2)
    ]

    return matches
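

# A minimal, self-contained sketch of the (parent, peakId) inner join performed
# above, using synthetic numpy arrays in place of real catalogs (illustrative
# only; `_demoExactMatchJoin` is not part of the original module):
def _demoExactMatchJoin():
    key1 = np.rec.array((np.array([10, 10, 11], dtype=np.int64),
                         np.array([1, 2, 1], dtype=np.int32),
                         np.arange(3, dtype=np.int32)),
                        dtype=[("parent", np.int64), ("peakId", np.int32), ("index", np.int32)])
    key2 = np.rec.array((np.array([11, 10], dtype=np.int64),
                         np.array([1, 2], dtype=np.int32),
                         np.arange(2, dtype=np.int32)),
                        dtype=[("parent", np.int64), ("peakId", np.int32), ("index", np.int32)])
    # Non-key fields sharing a name are disambiguated with "1"/"2" postfixes,
    # so the per-catalog row indices come back as "index1" and "index2".
    matched = rec_join(("parent", "peakId"), key1, key2, jointype="inner")
    return matched["index1"], matched["index2"]  # -> [1, 2], [1, 0]
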
 
class MergeDetectionsConnections(PipelineTaskConnections,
                                 dimensions=("tract", "patch", "skymap"),
                                 defaultTemplates={"inputCoaddName": "deep", "outputCoaddName": "deep"}):
 
    schema = cT.InitInput(
        doc="Schema of the input detection catalog",
        name="{inputCoaddName}Coadd_det_schema",
        storageClass="SourceCatalog"
    )

    outputSchema = cT.InitOutput(
        doc="Schema of the merged detection catalog",
        name="{outputCoaddName}Coadd_mergeDet_schema",
        storageClass="SourceCatalog"
    )

    outputPeakSchema = cT.InitOutput(
        doc="Output schema of the Footprint peak catalog",
        name="{outputCoaddName}Coadd_peak_schema",
        storageClass="PeakCatalog"
    )

    catalogs = cT.Input(
        doc="Detection Catalogs to be merged",
        name="{inputCoaddName}Coadd_det",
        storageClass="SourceCatalog",
        dimensions=("tract", "patch", "skymap", "band"),
        multiple=True
    )

    skyMap = cT.Input(
        doc="SkyMap to be used in merging",
        name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
        storageClass="SkyMap",
        dimensions=("skymap",),
    )
 
    outputCatalog = cT.Output(
        doc="Merged Detection catalog",
        name="{outputCoaddName}Coadd_mergeDet",
        storageClass="SourceCatalog",
        dimensions=("tract", "patch", "skymap"),
    )
 
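
# A small sketch of how the connection name templates above expand
# (illustrative only; `_demoTemplateName` is not part of the original module):
# with the default {"inputCoaddName": "deep"}, the input catalog dataset type
# becomes "deepCoadd_det".
def _demoTemplateName(template="{inputCoaddName}Coadd_det", inputCoaddName="deep"):
    return template.format(inputCoaddName=inputCoaddName)  # -> "deepCoadd_det"

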
class MergeDetectionsConfig(PipelineTaskConfig, pipelineConnections=MergeDetectionsConnections):
    """!
    @anchor MergeDetectionsConfig_

    @brief Configuration parameters for the MergeDetectionsTask.
    """
    minNewPeak = Field(dtype=float, default=1,
                       doc="Minimum distance from closest peak to create a new one (in arcsec).")

    maxSamePeak = Field(dtype=float, default=0.3,
                        doc="When adding new catalogs to the merge, all peaks less than this distance "
                            "(in arcsec) to an existing peak will be flagged as detected in that catalog.")
    cullPeaks = ConfigField(dtype=CullPeaksConfig, doc="Configuration for how to cull peaks.")
 
    skyFilterName = Field(dtype=str, default="sky",
                          doc="Name of the 'filter' used to label sky objects (e.g. flag merge_peak_sky is set)\n"
                              "(N.b. should be in MergeMeasurementsConfig.pseudoFilterList)")
    skyObjects = ConfigurableField(target=SkyObjectsTask, doc="Generate sky objects")
    priorityList = ListField(dtype=str, default=[],
                             doc="Priority-ordered list of filter bands for the merge.")
    coaddName = Field(dtype=str, default="deep", doc="Name of coadd")
 
    def setDefaults(self):
        Config.setDefaults(self)
        self.skyObjects.avoidMask = ["DETECTED"]  # sky objects must avoid pixels already detected

    def validate(self):
        super().validate()
        if len(self.priorityList) == 0:
            raise RuntimeError("No priority list provided")
 
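
# A minimal configuration sketch (illustrative only; `_demoConfig` and the band
# names are hypothetical, not part of the original module): `validate` raises
# unless at least one band is supplied in priorityList.
def _demoConfig():
    config = MergeDetectionsConfig()
    config.priorityList = ["i", "r", "g"]  # merge detections in this priority order
    config.minNewPeak = 1.0   # arcsec; peaks farther than this from any existing peak become new peaks
    config.maxSamePeak = 0.3  # arcsec; peaks closer than this are flagged as detected in that catalog
    config.validate()
    return config

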
class MergeDetectionsTask(PipelineTask, CmdLineTask):
    r"""!
    @anchor MergeDetectionsTask_

    @brief Merge coadd detections from multiple bands.

    @section pipe_tasks_multiBand_Contents Contents

      - @ref pipe_tasks_multiBand_MergeDetectionsTask_Purpose
      - @ref pipe_tasks_multiBand_MergeDetectionsTask_Init
      - @ref pipe_tasks_multiBand_MergeDetectionsTask_Run
      - @ref pipe_tasks_multiBand_MergeDetectionsTask_Config
      - @ref pipe_tasks_multiBand_MergeDetectionsTask_Debug
      - @ref pipe_tasks_multiband_MergeDetectionsTask_Example

    @section pipe_tasks_multiBand_MergeDetectionsTask_Purpose   Description

    Command-line task that merges sources detected in coadds of exposures obtained with different filters.

    To perform photometry consistently across coadds in multiple filter bands, we create a master catalog of
    sources from all bands by merging the sources (peaks & footprints) detected in each coadd, while keeping
    track of which band each source originates in.

    The catalog merge is performed by @ref getMergedSourceCatalog. Spurious peaks detected around bright
    objects are culled as described in @ref CullPeaksConfig_.

    @par Inputs:
        deepCoadd_det{tract,patch,filter}: SourceCatalog (only parent Footprints)
    @par Outputs:
        deepCoadd_mergeDet{tract,patch}: SourceCatalog (only parent Footprints)

    @section pipe_tasks_multiBand_MergeDetectionsTask_Init       Task initialisation

    @copydoc \_\_init\_\_

    @section pipe_tasks_multiBand_MergeDetectionsTask_Run       Invoking the Task

    @copydoc run

    @section pipe_tasks_multiBand_MergeDetectionsTask_Config       Configuration parameters

    See @ref MergeDetectionsConfig_

    @section pipe_tasks_multiBand_MergeDetectionsTask_Debug             Debug variables

    The @link lsst.pipe.base.cmdLineTask.CmdLineTask command line task@endlink interface supports a flag @c -d
    to import @b debug.py from your @c PYTHONPATH; see @ref baseDebug for more about @b debug.py files.

    MergeDetectionsTask has no debug variables.

    @section pipe_tasks_multiband_MergeDetectionsTask_Example   A complete example of using MergeDetectionsTask

    MergeDetectionsTask is meant to be run after detecting sources in coadds generated for the chosen subset
    of the available bands.
    The purpose of the task is to merge sources (peaks & footprints) detected in the coadds generated from the
    chosen subset of filters.
    Subsequent tasks in the multi-band processing procedure will deblend the generated master list of sources
    and, eventually, perform forced photometry.
    Command-line usage of MergeDetectionsTask expects data references for all the coadds to be processed.
    A list of the available optional arguments can be obtained by calling mergeCoaddDetections.py with the
    `--help` command line argument:
    @code
    mergeCoaddDetections.py --help
    @endcode

    To demonstrate usage of the MergeDetectionsTask in the larger context of multi-band processing, we
    will process HSC data in the [ci_hsc](https://github.com/lsst/ci_hsc) package. Assuming one has finished
    step 5 at @ref pipeTasks_multiBand, one may merge the catalogs of sources from each coadd as follows:
    @code
    mergeCoaddDetections.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-I^HSC-R
    @endcode
    This will merge the HSC-I & HSC-R band parent source catalogs and write the results to
    `$CI_HSC_DIR/DATA/deepCoadd-results/merged/0/5,4/mergeDet-0-5,4.fits`.

    The next step in the multi-band processing procedure is
    @ref MeasureMergedCoaddSourcesTask_ "MeasureMergedCoaddSourcesTask"
    """
    ConfigClass = MergeDetectionsConfig
    RunnerClass = MergeSourcesRunner
    _DefaultName = "mergeCoaddDetections"
    inputDataset = "det"
    outputDataset = "mergeDet"
    makeIdFactory = _makeMakeIdFactory("MergedCoaddId")
 
    @classmethod
    def _makeArgumentParser(cls):
        """!
        @brief Create a suitable ArgumentParser.
        """
        return makeMergeArgumentParser(cls._DefaultName, cls.inputDataset)
 
    def __init__(self, butler=None, schema=None, initInputs=None, **kwargs):
        """!
        @brief Initialize the merge detections task.

        A @ref FootprintMergeList_ "FootprintMergeList" will be used to
        merge the source catalogs.

        @param[in] schema     the schema of the detection catalogs used as input to this one
        @param[in] butler     a butler used to read the input schema from disk, if schema is None
        @param[in] initInputs This is a PipelineTask-only argument that holds all inputs passed in
                              through the PipelineTask middleware
        @param[in] **kwargs   keyword arguments to be passed to CmdLineTask.__init__

        The task will set its own self.schema attribute to the schema of the output merged catalog.
        """
        super().__init__(**kwargs)
        if initInputs is not None:
            schema = initInputs["schema"].schema

        self.makeSubtask("skyObjects")
        self.schema = self.getInputSchema(butler=butler, schema=schema)

        filterNames = list(self.config.priorityList)
        filterNames.append(self.config.skyFilterName)
        self.merged = afwDetect.FootprintMergeList(self.schema, filterNames)
        self.outputSchema = afwTable.SourceCatalog(self.schema)
        self.outputPeakSchema = afwDetect.PeakCatalog(self.merged.getPeakSchema())
 
    def runDataRef(self, patchRefList):
        catalogs = dict(readCatalog(self, patchRef) for patchRef in patchRefList)
        skyInfo = getSkyInfo(coaddName=self.config.coaddName, patchRef=patchRefList[0])
        idFactory = self.makeIdFactory(patchRefList[0])
        skySeed = patchRefList[0].get(self.config.coaddName + "MergedCoaddId")
        mergeCatalogStruct = self.run(catalogs, skyInfo, idFactory, skySeed)
        self.write(patchRefList[0], mergeCatalogStruct.outputCatalog)
 
    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        exposureIdInfo = ExposureIdInfo.fromDataId(butlerQC.quantum.dataId, "tract_patch")
        inputs["skySeed"] = exposureIdInfo.expId
        inputs["idFactory"] = exposureIdInfo.makeSourceIdFactory()
        catalogDict = {ref.dataId["band"]: cat for ref, cat in zip(inputRefs.catalogs,
                                                                   inputs["catalogs"])}
        inputs["catalogs"] = catalogDict
        skyMap = inputs.pop("skyMap")
        # Can use the first dataId to find the tract and patch being worked on
        tractNumber = inputRefs.catalogs[0].dataId["tract"]
        tractInfo = skyMap[tractNumber]
        patchInfo = tractInfo.getPatchInfo(inputRefs.catalogs[0].dataId["patch"])
        skyInfo = Struct(
            skyMap=skyMap,
            tractInfo=tractInfo,
            patchInfo=patchInfo,
            wcs=tractInfo.getWcs(),
            bbox=patchInfo.getOuterBBox()
        )
        inputs["skyInfo"] = skyInfo

        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)
 
    def run(self, catalogs, skyInfo, idFactory, skySeed):
        r"""!
        @brief Merge multiple catalogs.

        After ordering the catalogs and filters in priority order,
        @ref getMergedSourceCatalog of the @ref FootprintMergeList_ "FootprintMergeList" created by
        @ref \_\_init\_\_ is used to perform the actual merging. Finally, @ref cullPeaks is used to remove
        garbage peaks detected around bright objects.

        @param[in] catalogs   detection catalogs to be merged, keyed by band
        @param[in] skyInfo    description of the patch geometry
        @param[in] idFactory  factory used to generate ids for the merged sources
        @param[in] skySeed    seed for the random number generator used to place sky objects
        @param[out] mergedList
        """
        # Convert distance to tract coordinate
        tractWcs = skyInfo.wcs
        peakDistance = self.config.minNewPeak / tractWcs.getPixelScale().asArcseconds()
        samePeakDistance = self.config.maxSamePeak / tractWcs.getPixelScale().asArcseconds()

        # Put catalogs, filters in priority order
        orderedCatalogs = [catalogs[band] for band in self.config.priorityList if band in catalogs.keys()]
        orderedBands = [band for band in self.config.priorityList if band in catalogs.keys()]

        mergedList = self.merged.getMergedSourceCatalog(orderedCatalogs, orderedBands, peakDistance,
                                                        self.schema, idFactory,
                                                        samePeakDistance)

        # Add extra sources that correspond to blank sky
        skySourceFootprints = self.getSkySourceFootprints(mergedList, skyInfo, skySeed)
        if skySourceFootprints:
            key = mergedList.schema.find("merge_footprint_%s" % self.config.skyFilterName).key
            for foot in skySourceFootprints:
                s = mergedList.addNew()
                s.setFootprint(foot)
                s.set(key, True)

        # Sort Peaks from brightest to faintest
        for record in mergedList:
            record.getFootprint().sortPeaks()
        self.log.info("Merged to %d sources", len(mergedList))
        # Attempt to remove garbage peaks
        self.cullPeaks(mergedList)
        return Struct(outputCatalog=mergedList)
 
    def cullPeaks(self, catalog):
        """!
        @brief Attempt to remove garbage peaks (mostly on the outskirts of large blends).

        @param[in] catalog Source catalog
        """
        keys = [item.key for item in self.merged.getPeakSchema().extract("merge_peak_*").values()]
        assert len(keys) > 0, "Error finding flags that associate peaks with their detection bands."
        totalPeaks = 0
        culledPeaks = 0
        for parentSource in catalog:
            # Make a list copy of the parent's peaks, then clear the peak catalog
            # in place so that the peaks we want to keep can be appended back to it.
            keptPeaks = parentSource.getFootprint().getPeaks()
            oldPeaks = list(keptPeaks)
            keptPeaks.clear()
            familySize = len(oldPeaks)
            totalPeaks += familySize
            for rank, peak in enumerate(oldPeaks):
                if ((rank < self.config.cullPeaks.rankSufficient)
                    or (sum([peak.get(k) for k in keys]) >= self.config.cullPeaks.nBandsSufficient)
                    or (rank < self.config.cullPeaks.rankConsidered
                        and rank < self.config.cullPeaks.rankNormalizedConsidered * familySize)):
                    keptPeaks.append(peak)
                else:
                    culledPeaks += 1
        self.log.info("Culled %d of %d peaks", culledPeaks, totalPeaks)
 
    def getSchemaCatalogs(self):
        """!
        Return a dict of empty catalogs for each catalog dataset produced by this task.

        @param[out] dictionary of empty catalogs
        """
        mergeDet = afwTable.SourceCatalog(self.schema)
        peak = afwDetect.PeakCatalog(self.merged.getPeakSchema())
        return {self.config.coaddName + "Coadd_mergeDet": mergeDet,
                self.config.coaddName + "Coadd_peak": peak}
 
    def getSkySourceFootprints(self, mergedList, skyInfo, seed):
        """!
        @brief Return a list of Footprints of sky objects which don't overlap with anything in mergedList

        @param mergedList  The merged Footprints from all the input bands
        @param skyInfo     A description of the patch
        @param seed        Seed for the random number generator
        """
        # Mark all detected pixels so that sky objects avoid them
        mask = afwImage.Mask(skyInfo.patchInfo.getOuterBBox())
        detected = mask.getPlaneBitMask("DETECTED")
        for s in mergedList:
            s.getFootprint().spans.setMask(mask, detected)

        footprints = self.skyObjects.run(mask, seed)
        if not footprints:
            return footprints

        # Need to convert the peak catalog's schema so we can set "merge_peak_<skyFilterName>" fields
        schema = self.merged.getPeakSchema()
        mergeKey = schema.find("merge_peak_%s" % self.config.skyFilterName).key
        converted = []
        for oldFoot in footprints:
            assert len(oldFoot.getPeaks()) == 1, "Should be a single peak only"
            peak = oldFoot.getPeaks()[0]
            newFoot = afwDetect.Footprint(oldFoot.spans, schema)
            newFoot.addPeak(peak.getFx(), peak.getFy(), peak.getPeakValue())
            newFoot.getPeaks()[0].set(mergeKey, True)
            converted.append(newFoot)

        return converted
 
    def write(self, patchRef, catalog):
        """!
        @brief Write the output.

        @param[in]  patchRef   data reference for patch
        @param[in]  catalog    catalog

        We write as the dataset provided by the 'outputDataset'
        class variable.
        """
        patchRef.put(catalog, self.config.coaddName + "Coadd_" + self.outputDataset)
        # The filter isn't actually part of the data ID for the dataset we are saving,
        # so it would be confusing to see it in the log message, even if the butler
        # simply ignores it.
        mergeDataId = patchRef.dataId.copy()
        del mergeDataId["filter"]
        self.log.info("Wrote merged catalog: %s", mergeDataId)
 
    def writeMetadata(self, dataRefList):
        """!
        @brief No metadata to write, and not sure how to write it for a list of dataRefs.
        """
        pass
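

# A pure-Python sketch of the keep/cull rule applied per peak in
# MergeDetectionsTask.cullPeaks above (illustrative only; `_demoCullRule` is not
# part of the original module, and the default thresholds shown mirror
# CullPeaksConfig's documented defaults but should be treated as assumptions):
def _demoCullRule(rank, nBandsDetected, familySize,
                  rankSufficient=20, nBandsSufficient=2,
                  rankConsidered=30, rankNormalizedConsidered=0.7):
    """Return True if a peak of the given brightness rank survives culling."""
    return (rank < rankSufficient
            or nBandsDetected >= nBandsSufficient
            or (rank < rankConsidered
                and rank < rankNormalizedConsidered * familySize))
# e.g. _demoCullRule(rank=25, nBandsDetected=1, familySize=40) -> True
# (25 < 30 and 25 < 0.7 * 40 = 28), even though the peak is faint and single-band.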