22__all__ = [
"MergeDetectionsConfig", 
"MergeDetectionsTask"]
 
   25from numpy.lib.recfunctions 
import rec_join
 
   28from .multiBandUtils 
import CullPeaksConfig
 
   36from lsst.pex.config import Config, Field, ListField, ConfigurableField, ConfigField
 
   37from lsst.pipe.base import (PipelineTask, PipelineTaskConfig, Struct,
 
   38                            PipelineTaskConnections)
 
   39import lsst.pipe.base.connectionTypes 
as cT
 
   40from lsst.obs.base 
import ExposureIdInfo
 
   44    """Match two catalogs derived from the same mergeDet catalog. 
   46    When testing downstream features, like deblending methods/parameters 
   47    and measurement algorithms/parameters, it 
is useful to to compare
 
   48    the same sources 
in two catalogs. In most cases this must be done
 
   49    by matching on either RA/DEC 
or XY positions, which occassionally
 
   50    will mismatch one source 
with another.
 
   52    For a more robust solution, 
as long 
as the downstream catalog 
is 
   53    derived 
from the same mergeDet catalog, exact source matching
 
   54    can be done via the unique ``(parent, deblend_peakID)``
 
   55    combination. So this function performs this exact matching 
for 
   56    all sources both catalogs.
 
   61        The two catalogs to merge
 
   62    patch1, patch2 : `array` of `int`
 
   63        Patch 
for each row, converted into an integer.
 
   68        List of matches 
for each source (using an inner join).
 
   72    sidx1 = catalog1[
"parent"] != 0
 
   73    sidx2 = catalog2[
"parent"] != 0
 
   76    parents1 = np.array(catalog1[
"parent"][sidx1])
 
   77    peaks1 = np.array(catalog1[
"deblend_peakId"][sidx1])
 
   78    index1 = np.arange(len(catalog1))[sidx1]
 
   79    parents2 = np.array(catalog2[
"parent"][sidx2])
 
   80    peaks2 = np.array(catalog2[
"deblend_peakId"][sidx2])
 
   81    index2 = np.arange(len(catalog2))[sidx2]
 
   83    if patch1 
is not None:
 
   85            msg = (
"If the catalogs are from different patches then patch1 and patch2 must be specified" 
   86                   ", got {} and {}").format(patch1, patch2)
 
   88        patch1 = patch1[sidx1]
 
   89        patch2 = patch2[sidx2]
 
   91        key1 = np.rec.array((parents1, peaks1, patch1, index1),
 
   92                            dtype=[(
'parent', np.int64), (
'peakId', np.int32),
 
   93                                   (
"patch", patch1.dtype), (
"index", np.int32)])
 
   94        key2 = np.rec.array((parents2, peaks2, patch2, index2),
 
   95                            dtype=[(
'parent', np.int64), (
'peakId', np.int32),
 
   96                                   (
"patch", patch2.dtype), (
"index", np.int32)])
 
   97        matchColumns = (
"parent", 
"peakId", 
"patch")
 
   99        key1 = np.rec.array((parents1, peaks1, index1),
 
  100                            dtype=[(
'parent', np.int64), (
'peakId', np.int32), (
"index", np.int32)])
 
  101        key2 = np.rec.array((parents2, peaks2, index2),
 
  102                            dtype=[(
'parent', np.int64), (
'peakId', np.int32), (
"index", np.int32)])
 
  103        matchColumns = (
"parent", 
"peakId")
 
  108    matched = rec_join(matchColumns, key1, key2, jointype=
"inner")
 
  111    indices1 = matched[
"index1"]
 
  112    indices2 = matched[
"index2"]
 
  117        for i1, i2 
in zip(indices1, indices2)
 
  124                                 dimensions=(
"tract", 
"patch", 
"skymap"),
 
  125                                 defaultTemplates={
"inputCoaddName": 
'deep', 
"outputCoaddName": 
"deep"}):
 
  126    schema = cT.InitInput(
 
  127        doc=
"Schema of the input detection catalog",
 
  128        name=
"{inputCoaddName}Coadd_det_schema",
 
  129        storageClass=
"SourceCatalog" 
  132    outputSchema = cT.InitOutput(
 
  133        doc=
"Schema of the merged detection catalog",
 
  134        name=
"{outputCoaddName}Coadd_mergeDet_schema",
 
  135        storageClass=
"SourceCatalog" 
  138    outputPeakSchema = cT.InitOutput(
 
  139        doc=
"Output schema of the Footprint peak catalog",
 
  140        name=
"{outputCoaddName}Coadd_peak_schema",
 
  141        storageClass=
"PeakCatalog" 
  145        doc=
"Detection Catalogs to be merged",
 
  146        name=
"{inputCoaddName}Coadd_det",
 
  147        storageClass=
"SourceCatalog",
 
  148        dimensions=(
"tract", 
"patch", 
"skymap", 
"band"),
 
  153        doc=
"SkyMap to be used in merging",
 
  154        name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
 
  155        storageClass=
"SkyMap",
 
  156        dimensions=(
"skymap",),
 
  159    outputCatalog = cT.Output(
 
  160        doc=
"Merged Detection catalog",
 
  161        name=
"{outputCoaddName}Coadd_mergeDet",
 
  162        storageClass=
"SourceCatalog",
 
  163        dimensions=(
"tract", 
"patch", 
"skymap"),
 
  167class MergeDetectionsConfig(PipelineTaskConfig, pipelineConnections=MergeDetectionsConnections):
 
  168    """Configuration parameters for the MergeDetectionsTask. 
  170    minNewPeak = Field(dtype=float, default=1, 
  171                       doc="Minimum distance from closest peak to create a new one (in arcsec).")
 
  173    maxSamePeak = 
Field(dtype=float, default=0.3,
 
  174                        doc=
"When adding new catalogs to the merge, all peaks less than this distance " 
  175                        " (in arcsec) to an existing peak will be flagged as detected in that catalog.")
 
  176    cullPeaks = 
ConfigField(dtype=CullPeaksConfig, doc=
"Configuration for how to cull peaks.")
 
  178    skyFilterName = 
Field(dtype=str, default=
"sky",
 
  179                          doc=
"Name of `filter' used to label sky objects (e.g. flag merge_peak_sky is set)\n" 
  180                          "(N.b. should be in MergeMeasurementsConfig.pseudoFilterList)")
 
  181    skyObjects = 
ConfigurableField(target=SkyObjectsTask, doc=
"Generate sky objects")
 
  182    priorityList = 
ListField(dtype=str, default=[],
 
  183                             doc=
"Priority-ordered list of filter bands for the merge.")
 
  184    coaddName = 
Field(dtype=str, default=
"deep", doc=
"Name of coadd")
 
  186    def setDefaults(self):
 
  187        Config.setDefaults(self)
 
  188        self.skyObjects.avoidMask = [
"DETECTED"]  
 
  192        if len(self.priorityList) == 0:
 
  193            raise RuntimeError(
"No priority list provided")
 
  196class MergeDetectionsTask(PipelineTask):
 
  197    """Merge sources detected in coadds of exposures obtained with different filters. 
  199    Merge sources detected in coadds of exposures obtained 
with different
 
  200    filters. To perform photometry consistently across coadds 
in multiple
 
  201    filter bands, we create a master catalog of sources 
from all bands by
 
  202    merging the sources (peaks & footprints) detected 
in each coadd, 
while 
  203    keeping track of which band each source originates 
in. The catalog merge
 
  205    `~lsst.afw.detection.FootprintMergeList.getMergedSourceCatalog`. Spurious
 
  206    peaks detected around bright objects are culled 
as described 
in 
  209    MergeDetectionsTask 
is meant to be run after detecting sources 
in coadds
 
  210    generated 
for the chosen subset of the available bands. The purpose of the
 
  211    task 
is to merge sources (peaks & footprints) detected 
in the coadds
 
  212    generated 
from the chosen subset of filters. Subsequent tasks 
in the
 
  213    multi-band processing procedure will deblend the generated master list of
 
  214    sources 
and, eventually, perform forced photometry.
 
  218    butler : `
None`, optional
 
  219        Compatibility parameter. Should always be `
None`.
 
  221        The schema of the detection catalogs used 
as input to this task.
 
  222    initInputs : `dict`, optional
 
  223        Dictionary that can contain a key ``schema`` containing the
 
  224        input schema. If present will override the value of ``schema``.
 
  226        Additional keyword arguments.
 
  228    ConfigClass = MergeDetectionsConfig 
  229    _DefaultName = "mergeCoaddDetections" 
  231    def __init__(self, butler=None, schema=None, initInputs=None, **kwargs):
 
  232        super().__init__(**kwargs)
 
  234        if butler 
is not None:
 
  235            warnings.warn(
"The 'butler' parameter is no longer used and can be safely removed.",
 
  236                          category=FutureWarning, stacklevel=2)
 
  239        if initInputs 
is not None:
 
  240            schema = initInputs[
'schema'].schema
 
  243            raise ValueError(
"No input schema or initInputs['schema'] provided.")
 
  247        self.makeSubtask(
"skyObjects")
 
  249        filterNames = 
list(self.config.priorityList)
 
  250        filterNames.append(self.config.skyFilterName)
 
  255    def runQuantum(self, butlerQC, inputRefs, outputRefs):
 
  256        inputs = butlerQC.get(inputRefs)
 
  257        exposureIdInfo = ExposureIdInfo.fromDataId(butlerQC.quantum.dataId, 
"tract_patch")
 
  258        inputs[
"skySeed"] = exposureIdInfo.expId
 
  259        inputs[
"idFactory"] = exposureIdInfo.makeSourceIdFactory()
 
  260        catalogDict = {ref.dataId[
'band']: cat 
for ref, cat 
in zip(inputRefs.catalogs,
 
  262        inputs[
'catalogs'] = catalogDict
 
  263        skyMap = inputs.pop(
'skyMap')
 
  265        tractNumber = inputRefs.catalogs[0].dataId[
'tract']
 
  266        tractInfo = skyMap[tractNumber]
 
  267        patchInfo = tractInfo.getPatchInfo(inputRefs.catalogs[0].dataId[
'patch'])
 
  272            wcs=tractInfo.getWcs(),
 
  273            bbox=patchInfo.getOuterBBox()
 
  275        inputs[
'skyInfo'] = skyInfo
 
  277        outputs = self.run(**inputs)
 
  278        butlerQC.put(outputs, outputRefs)
 
  280    def run(self, catalogs, skyInfo, idFactory, skySeed):
 
  281        """Merge multiple catalogs. 
  283        After ordering the catalogs and filters 
in priority order,
 
  284        ``getMergedSourceCatalog`` of the
 
  286        used to perform the actual merging. Finally, `cullPeaks` 
is used to
 
  287        remove garbage peaks detected around bright objects.
 
  292            Catalogs to be merged.
 
  298        result : `lsst.pipe.base.Struct`
 
  299            Results 
as a struct 
with attributes:
 
  305        tractWcs = skyInfo.wcs
 
  306        peakDistance = self.config.minNewPeak / tractWcs.getPixelScale().asArcseconds()
 
  307        samePeakDistance = self.config.maxSamePeak / tractWcs.getPixelScale().asArcseconds()
 
  310        orderedCatalogs = [catalogs[band] 
for band 
in self.config.priorityList 
if band 
in catalogs.keys()]
 
  311        orderedBands = [band 
for band 
in self.config.priorityList 
if band 
in catalogs.keys()]
 
  313        mergedList = self.merged.getMergedSourceCatalog(orderedCatalogs, orderedBands, peakDistance,
 
  314                                                        self.schema, idFactory,
 
  320        skySourceFootprints = self.getSkySourceFootprints(mergedList, skyInfo, skySeed)
 
  321        if skySourceFootprints:
 
  322            key = mergedList.schema.find(
"merge_footprint_%s" % self.config.skyFilterName).key
 
  323            for foot 
in skySourceFootprints:
 
  324                s = mergedList.addNew()
 
  329        for record 
in mergedList:
 
  330            record.getFootprint().sortPeaks()
 
  331        self.log.info(
"Merged to %d sources", len(mergedList))
 
  333        self.cullPeaks(mergedList)
 
  334        return Struct(outputCatalog=mergedList)
 
  336    def cullPeaks(self, catalog):
 
  337        """Attempt to remove garbage peaks (mostly on the outskirts of large blends). 
  344        keys = [item.key for item 
in self.merged.getPeakSchema().extract(
"merge_peak_*").values()]
 
  345        assert len(keys) > 0, 
"Error finding flags that associate peaks with their detection bands." 
  348        for parentSource 
in catalog:
 
  351            keptPeaks = parentSource.getFootprint().getPeaks()
 
  352            oldPeaks = 
list(keptPeaks)
 
  354            familySize = len(oldPeaks)
 
  355            totalPeaks += familySize
 
  356            for rank, peak 
in enumerate(oldPeaks):
 
  357                if ((rank < self.config.cullPeaks.rankSufficient)
 
  358                    or (sum([peak.get(k) 
for k 
in keys]) >= self.config.cullPeaks.nBandsSufficient)
 
  359                    or (rank < self.config.cullPeaks.rankConsidered
 
  360                        and rank < self.config.cullPeaks.rankNormalizedConsidered * familySize)):
 
  361                    keptPeaks.append(peak)
 
  364        self.log.info(
"Culled %d of %d peaks", culledPeaks, totalPeaks)
 
  366    def getSkySourceFootprints(self, mergedList, skyInfo, seed):
 
  367        """Return a list of Footprints of sky objects which don't overlap with anything in mergedList. 
  372            The merged Footprints from all the input bands.
 
  373        skyInfo : `lsst.pipe.base.Struct`
 
  374            A description of the patch.
 
  376            Seed 
for the random number generator.
 
  379        detected = mask.getPlaneBitMask("DETECTED")
 
  381            s.getFootprint().spans.setMask(mask, detected)
 
  383        footprints = self.skyObjects.run(mask, seed)
 
  388        schema = self.merged.getPeakSchema()
 
  389        mergeKey = schema.find(
"merge_peak_%s" % self.config.skyFilterName).key
 
  391        for oldFoot 
in footprints:
 
  392            assert len(oldFoot.getPeaks()) == 1, 
"Should be a single peak only" 
  393            peak = oldFoot.getPeaks()[0]
 
  395            newFoot.addPeak(peak.getFx(), peak.getFy(), peak.getPeakValue())
 
  396            newFoot.getPeaks()[0].
set(mergeKey, 
True)
 
  397            converted.append(newFoot)
 
Represent a 2-dimensional array of bitmask pixels.
Defines the fields and offsets for a table.
daf::base::PropertyList * list
daf::base::PropertySet * set
def matchCatalogsExact(catalog1, catalog2, patch1=None, patch2=None)
Lightweight representation of a geometric match between two records.