23 """Pipeline for running DiaSource association in a DRP context. 
   34 from .coaddBase 
import makeSkyInfo
 
   35 from .simpleAssociation 
import SimpleAssociationTask
 
   37 __all__ = [
"DrpAssociationPipeTask",
 
   38            "DrpAssociationPipeConfig",
 
   39            "DrpAssociationPipeConnections"]
 
   43                                     dimensions=(
"tract", 
"patch", 
"skymap"),
 
   44                                     defaultTemplates={
"coaddName": 
"deep",
 
   47     diaSourceTables = pipeBase.connectionTypes.Input(
 
   48         doc=
"Set of catalogs of calibrated DiaSources.",
 
   49         name=
"{fakesType}{coaddName}Diff_diaSrcTable",
 
   50         storageClass=
"DataFrame",
 
   51         dimensions=(
"instrument", 
"visit", 
"detector"),
 
   55     skyMap = pipeBase.connectionTypes.Input(
 
   56         doc=
"Input definition of geometry/bbox and projection/wcs for coadded " 
   58         name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
 
   59         storageClass=
"SkyMap",
 
   60         dimensions=(
"skymap", ),
 
   62     assocDiaSourceTable = pipeBase.connectionTypes.Output(
 
   63         doc=
"Catalog of DiaSources covering the patch and associated with a " 
   65         name=
"{fakesType}{coaddName}Diff_assocDiaSrcTable",
 
   66         storageClass=
"DataFrame",
 
   67         dimensions=(
"tract", 
"patch"),
 
   69     diaObjectTable = pipeBase.connectionTypes.Output(
 
   70         doc=
"Catalog of DiaObjects created from spatially associating " 
   72         name=
"{fakesType}{coaddName}Diff_diaObjTable",
 
   73         storageClass=
"DataFrame",
 
   74         dimensions=(
"tract", 
"patch"),
 
   78 class DrpAssociationPipeConfig(
 
   79         pipeBase.PipelineTaskConfig,
 
   80         pipelineConnections=DrpAssociationPipeConnections):
 
   81     associator = pexConfig.ConfigurableField(
 
   82         target=SimpleAssociationTask,
 
   83         doc=
"Task used to associate DiaSources with DiaObjects.",
 
   85     doAddDiaObjectCoords = pexConfig.Field(
 
   88         doc=
"Do pull diaObject's average coordinate as coord_ra and coord_dec" 
   89             "Duplicates information, but needed for bulk ingest into qserv." 
   91     doWriteEmptyTables = pexConfig.Field(
 
   94         doc=
"If True, construct and write out empty diaSource and diaObject " 
   95             "tables. If False, raise NoWorkFound" 
   99 class DrpAssociationPipeTask(pipeBase.PipelineTask):
 
  100     """Driver pipeline for loading DiaSource catalogs in a patch/tract 
  101     region and associating them. 
  103     ConfigClass = DrpAssociationPipeConfig
 
  104     _DefaultName = 
"drpAssociation" 
  106     def __init__(self, **kwargs):
 
  107         super().__init__(**kwargs)
 
  108         self.makeSubtask(
'associator')
 
  110     def runQuantum(self, butlerQC, inputRefs, outputRefs):
 
  111         inputs = butlerQC.get(inputRefs)
 
  113         inputs[
"tractId"] = butlerQC.quantum.dataId[
"tract"]
 
  114         inputs[
"patchId"] = butlerQC.quantum.dataId[
"patch"]
 
  115         tractPatchId, skymapBits = butlerQC.quantum.dataId.pack(
 
  118         inputs[
"tractPatchId"] = tractPatchId
 
  119         inputs[
"skymapBits"] = skymapBits
 
  121         outputs = self.run(**inputs)
 
  122         butlerQC.put(outputs, outputRefs)
 
  131         """Trim DiaSources to the current Patch and run association. 
  133         Takes in the set of DiaSource catalogs that covers the current patch, 
  134         trims them to the dimensions of the patch, and [TODO: eventually] 
  135         runs association on the concatenated DiaSource Catalog. 
  139         diaSourceTables : `list` of `lst.daf.butler.DeferredDatasetHandle` 
  140             Set of DiaSource catalogs potentially covering this patch/tract. 
  141         skyMap : `lsst.skymap.BaseSkyMap` 
  142             SkyMap defining the patch/tract 
  144             Id of current tract being processed. 
  146             Id of current patch being processed 
  150         output : `lsst.pipe.base.Struct` 
  151             Results struct with attributes: 
  153             ``assocDiaSourceTable`` 
  154                 Table of DiaSources with updated value for diaObjectId. 
  157                 Table of DiaObjects from matching DiaSources 
  158                 (`pandas.DataFrame`). 
  160         self.log.
info(
"Running DPR Association on patch %i, tract %i...",
 
  166         innerPatchBox = 
geom.Box2D(skyInfo.patchInfo.getInnerBBox())
 
  168         diaSourceHistory = []
 
  169         for catRef 
in diaSourceTables:
 
  171                 datasetType=self.config.connections.diaSourceTables,
 
  174             isInTractPatch = self._trimToPatch(cat,
 
  178             nDiaSrc = isInTractPatch.sum()
 
  180                 "Read DiaSource catalog of length %i from visit %i, " 
  181                 "detector %i. Found %i sources within the patch/tract " 
  183                 len(cat), catRef.dataId[
"visit"],
 
  184                 catRef.dataId[
"detector"], nDiaSrc)
 
  189             cutCat = cat[isInTractPatch]
 
  190             diaSourceHistory.append(cutCat)
 
  193             diaSourceHistoryCat = pd.concat(diaSourceHistory)
 
  196             if self.config.doWriteEmptyTables:
 
  197                 self.log.
info(
"Constructing empty table")
 
  199                 diaSourceHistoryCat = cat.drop(cat.index)
 
  201                 raise pipeBase.NoWorkFound(
"Found no overlapping DIASources to associate.")
 
  203         self.log.
info(
"Found %i DiaSources overlapping patch %i, tract %i",
 
  204                       len(diaSourceHistoryCat), patchId, tractId)
 
  206         assocResult = self.associator.
run(diaSourceHistoryCat,
 
  210         self.log.
info(
"Associated DiaSources into %i DiaObjects",
 
  211                       len(assocResult.diaObjects))
 
  213         if self.config.doAddDiaObjectCoords:
 
  214             assocResult.assocDiaSources = self._addDiaObjectCoords(assocResult.diaObjects,
 
  215                                                                    assocResult.assocDiaSources)
 
  217         return pipeBase.Struct(
 
  218             diaObjectTable=assocResult.diaObjects,
 
  219             assocDiaSourceTable=assocResult.assocDiaSources)
 
  221     def _addDiaObjectCoords(self, objects, sources):
 
  222         obj = objects[[
'ra', 
'decl']].rename(columns={
"ra": 
"coord_ra", 
"decl": 
"coord_dec"})
 
  223         df = pd.merge(sources.reset_index(), obj, left_on=
'diaObjectId', right_index=
True,
 
  224                       how=
'inner').set_index(
'diaSourceId')
 
  227     def _trimToPatch(self, cat, innerPatchBox, wcs):
 
  228         """Create generator testing if a set of DiaSources are in the 
  233         cat : `pandas.DataFrame` 
  234             Catalog of DiaSources to test within patch/tract. 
  235         innerPatchBox : `lsst.geom.Box2D` 
  236             Bounding box of the patch. 
  237         wcs : `lsst.geom.SkyWcs` 
  242         isInPatch : `numpy.ndarray`, (N,) 
  243             Booleans representing if the DiaSources are contained within the 
  244             current patch and tract. 
  246         isInPatch = np.array([
 
  247             innerPatchBox.contains(
 
  250             for idx, row 
in cat.iterrows()])
 
A floating-point coordinate rectangle geometry.
Point in an unspecified spherical coordinate system.
def run(self, coaddExposures, bbox, wcs)
def makeSkyInfo(skyMap, tractId, patchId)