23 """Pipeline for running DiaSource association in a DRP context.
34 from .coaddBase
import makeSkyInfo
35 from .simpleAssociation
import SimpleAssociationTask
37 __all__ = [
"DrpAssociationPipeTask",
38 "DrpAssociationPipeConfig",
39 "DrpAssociationPipeConnections"]
43 dimensions=(
"tract",
"patch",
"skymap"),
44 defaultTemplates={
"coaddName":
"deep",
47 diaSourceTables = pipeBase.connectionTypes.Input(
48 doc=
"Set of catalogs of calibrated DiaSources.",
49 name=
"{fakesType}{coaddName}Diff_diaSrcTable",
50 storageClass=
"DataFrame",
51 dimensions=(
"instrument",
"visit",
"detector"),
55 skyMap = pipeBase.connectionTypes.Input(
56 doc=
"Input definition of geometry/bbox and projection/wcs for coadded "
58 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
59 storageClass=
"SkyMap",
60 dimensions=(
"skymap", ),
62 assocDiaSourceTable = pipeBase.connectionTypes.Output(
63 doc=
"Catalog of DiaSources covering the patch and associated with a "
65 name=
"{fakesType}{coaddName}Diff_assocDiaSrcTable",
66 storageClass=
"DataFrame",
67 dimensions=(
"tract",
"patch"),
69 diaObjectTable = pipeBase.connectionTypes.Output(
70 doc=
"Catalog of DiaObjects created from spatially associating "
72 name=
"{fakesType}{coaddName}Diff_diaObjTable",
73 storageClass=
"DataFrame",
74 dimensions=(
"tract",
"patch"),
78 class DrpAssociationPipeConfig(
79 pipeBase.PipelineTaskConfig,
80 pipelineConnections=DrpAssociationPipeConnections):
81 associator = pexConfig.ConfigurableField(
82 target=SimpleAssociationTask,
83 doc=
"Task used to associate DiaSources with DiaObjects.",
85 doAddDiaObjectCoords = pexConfig.Field(
88 doc=
"Do pull diaObject's average coordinate as coord_ra and coord_dec"
89 "Duplicates information, but needed for bulk ingest into qserv."
93 class DrpAssociationPipeTask(pipeBase.PipelineTask):
94 """Driver pipeline for loading DiaSource catalogs in a patch/tract
95 region and associating them.
97 ConfigClass = DrpAssociationPipeConfig
98 _DefaultName =
"drpAssociation"
100 def __init__(self, **kwargs):
101 super().__init__(**kwargs)
102 self.makeSubtask(
'associator')
104 def runQuantum(self, butlerQC, inputRefs, outputRefs):
105 inputs = butlerQC.get(inputRefs)
107 inputs[
"tractId"] = butlerQC.quantum.dataId[
"tract"]
108 inputs[
"patchId"] = butlerQC.quantum.dataId[
"patch"]
109 tractPatchId, skymapBits = butlerQC.quantum.dataId.pack(
112 inputs[
"tractPatchId"] = tractPatchId
113 inputs[
"skymapBits"] = skymapBits
115 outputs = self.run(**inputs)
116 butlerQC.put(outputs, outputRefs)
125 """Trim DiaSources to the current Patch and run association.
127 Takes in the set of DiaSource catalogs that covers the current patch,
128 trims them to the dimensions of the patch, and [TODO: eventually]
129 runs association on the concatenated DiaSource Catalog.
133 diaSourceTables : `list` of `lst.daf.butler.DeferredDatasetHandle`
134 Set of DiaSource catalogs potentially covering this patch/tract.
135 skyMap : `lsst.skymap.BaseSkyMap`
136 SkyMap defining the patch/tract
138 Id of current tract being processed.
140 Id of current patch being processed
144 output : `lsst.pipe.base.Struct`
145 Results struct with attributes:
147 ``assocDiaSourceTable``
148 Table of DiaSources with updated value for diaObjectId.
151 Table of DiaObjects from matching DiaSources
152 (`pandas.DataFrame`).
154 self.log.
info(
"Running DPR Association on patch %i, tract %i...",
160 innerPatchBox =
geom.Box2D(skyInfo.patchInfo.getInnerBBox())
162 diaSourceHistory = []
163 for catRef
in diaSourceTables:
165 datasetType=self.config.connections.diaSourceTables,
168 isInTractPatch = self._trimToPatch(cat,
172 nDiaSrc = isInTractPatch.sum()
174 "Read DiaSource catalog of length %i from visit %i, "
175 "detector %i. Found %i sources within the patch/tract "
177 len(cat), catRef.dataId[
"visit"],
178 catRef.dataId[
"detector"], nDiaSrc)
181 diaSourceHistory.append(pd.DataFrame(columns=cat.columns))
184 cutCat = cat[isInTractPatch]
185 diaSourceHistory.append(cutCat)
187 diaSourceHistoryCat = pd.concat(diaSourceHistory)
188 self.log.
info(
"Found %i DiaSources overlapping patch %i, tract %i",
189 len(diaSourceHistoryCat), patchId, tractId)
191 assocResult = self.associator.
run(diaSourceHistoryCat,
195 self.log.
info(
"Associated DiaSources into %i DiaObjects",
196 len(assocResult.diaObjects))
198 if self.config.doAddDiaObjectCoords
and not assocResult.diaObjects.empty:
199 assocResult.assocDiaSources = self._addDiaObjectCoords(assocResult.diaObjects,
200 assocResult.assocDiaSources)
202 return pipeBase.Struct(
203 diaObjectTable=assocResult.diaObjects,
204 assocDiaSourceTable=assocResult.assocDiaSources)
206 def _addDiaObjectCoords(self, objects, sources):
207 obj = objects[[
'ra',
'decl']].rename(columns={
"ra":
"coord_ra",
"decl":
"coord_dec"})
208 df = pd.merge(sources, obj, left_on=
'diaObjectId', right_index=
True, how=
'inner')
211 def _trimToPatch(self, cat, innerPatchBox, wcs):
212 """Create generator testing if a set of DiaSources are in the
217 cat : `pandas.DataFrame`
218 Catalog of DiaSources to test within patch/tract.
219 innerPatchBox : `lsst.geom.Box2D`
220 Bounding box of the patch.
221 wcs : `lsst.geom.SkyWcs`
226 isInPatch : `numpy.ndarray`, (N,)
227 Booleans representing if the DiaSources are contained within the
228 current patch and tract.
230 isInPatch = np.array([
231 innerPatchBox.contains(
234 for idx, row
in cat.iterrows()])
A floating-point coordinate rectangle geometry.
Point in an unspecified spherical coordinate system.
def run(self, coaddExposures, bbox, wcs)
def makeSkyInfo(skyMap, tractId, patchId)