23 """Pipeline for running DiaSource association in a DRP context.
34 from .coaddBase
import makeSkyInfo
35 from .simpleAssociation
import SimpleAssociationTask
# Public names exported by this module.
__all__ = [
    "DrpAssociationPipeTask",
    "DrpAssociationPipeConfig",
    "DrpAssociationPipeConnections",
]
43 dimensions=(
"tract",
"patch",
"skymap"),
44 defaultTemplates={
"coaddName":
"deep",
47 diaSourceTables = pipeBase.connectionTypes.Input(
48 doc=
"Set of catalogs of calibrated DiaSources.",
49 name=
"{fakesType}{coaddName}Diff_diaSrcTable",
50 storageClass=
"DataFrame",
51 dimensions=(
"instrument",
"visit",
"detector"),
55 skyMap = pipeBase.connectionTypes.Input(
56 doc=
"Input definition of geometry/bbox and projection/wcs for coadded "
58 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
59 storageClass=
"SkyMap",
60 dimensions=(
"skymap", ),
62 assocDiaSourceTable = pipeBase.connectionTypes.Output(
63 doc=
"Catalog of DiaSources covering the patch and associated with a "
65 name=
"{fakesType}{coaddName}Diff_assocDiaSrcTable",
66 storageClass=
"DataFrame",
67 dimensions=(
"tract",
"patch"),
69 diaObjectTable = pipeBase.connectionTypes.Output(
70 doc=
"Catalog of DiaObjects created from spatially associating "
72 name=
"{fakesType}{coaddName}Diff_diaObjTable",
73 storageClass=
"DataFrame",
74 dimensions=(
"tract",
"patch"),
78 class DrpAssociationPipeConfig(
79 pipeBase.PipelineTaskConfig,
80 pipelineConnections=DrpAssociationPipeConnections):
81 associator = pexConfig.ConfigurableField(
82 target=SimpleAssociationTask,
83 doc=
"Task used to associate DiaSources with DiaObjects.",
85 doAddDiaObjectCoords = pexConfig.Field(
88 doc=
"Do pull diaObject's average coordinate as coord_ra and coord_dec"
89 "Duplicates information, but needed for bulk ingest into qserv."
91 doWriteEmptyTables = pexConfig.Field(
94 doc=
"If True, construct and write out empty diaSource and diaObject "
95 "tables. If False, raise NoWorkFound"
99 class DrpAssociationPipeTask(pipeBase.PipelineTask):
100 """Driver pipeline for loading DiaSource catalogs in a patch/tract
101 region and associating them.
103 ConfigClass = DrpAssociationPipeConfig
104 _DefaultName =
"drpAssociation"
def __init__(self, **kwargs):
    """Construct the task and create its ``associator`` subtask.

    Parameters
    ----------
    **kwargs
        Forwarded unchanged to the parent class constructor.
    """
    super().__init__(**kwargs)
    # The subtask name matches the ``associator`` field declared on the
    # config class.
    self.makeSubtask("associator")
110 def runQuantum(self, butlerQC, inputRefs, outputRefs):
111 inputs = butlerQC.get(inputRefs)
113 inputs[
"tractId"] = butlerQC.quantum.dataId[
"tract"]
114 inputs[
"patchId"] = butlerQC.quantum.dataId[
"patch"]
115 tractPatchId, skymapBits = butlerQC.quantum.dataId.pack(
118 inputs[
"tractPatchId"] = tractPatchId
119 inputs[
"skymapBits"] = skymapBits
121 outputs = self.run(**inputs)
122 butlerQC.put(outputs, outputRefs)
131 """Trim DiaSources to the current Patch and run association.
133 Takes in the set of DiaSource catalogs that covers the current patch,
134 trims them to the dimensions of the patch, and [TODO: eventually]
135 runs association on the concatenated DiaSource Catalog.
139 diaSourceTables : `list` of `lsst.daf.butler.DeferredDatasetHandle`
140 Set of DiaSource catalogs potentially covering this patch/tract.
141 skyMap : `lsst.skymap.BaseSkyMap`
142 SkyMap defining the patch/tract
144 Id of current tract being processed.
146 Id of current patch being processed
150 output : `lsst.pipe.base.Struct`
151 Results struct with attributes:
153 ``assocDiaSourceTable``
154 Table of DiaSources with updated value for diaObjectId.
157 Table of DiaObjects from matching DiaSources
158 (`pandas.DataFrame`).
160 self.log.
info(
"Running DPR Association on patch %i, tract %i...",
166 innerPatchBox =
geom.Box2D(skyInfo.patchInfo.getInnerBBox())
168 diaSourceHistory = []
169 for catRef
in diaSourceTables:
171 datasetType=self.config.connections.diaSourceTables,
174 isInTractPatch = self._trimToPatch(cat,
178 nDiaSrc = isInTractPatch.sum()
180 "Read DiaSource catalog of length %i from visit %i, "
181 "detector %i. Found %i sources within the patch/tract "
183 len(cat), catRef.dataId[
"visit"],
184 catRef.dataId[
"detector"], nDiaSrc)
189 cutCat = cat[isInTractPatch]
190 diaSourceHistory.append(cutCat)
193 diaSourceHistoryCat = pd.concat(diaSourceHistory)
196 if self.config.doWriteEmptyTables:
197 self.log.
info(
"Constructing empty table")
199 diaSourceHistoryCat = cat.drop(cat.index)
201 raise pipeBase.NoWorkFound(
"Found no overlapping DIASources to associate.")
203 self.log.
info(
"Found %i DiaSources overlapping patch %i, tract %i",
204 len(diaSourceHistoryCat), patchId, tractId)
206 assocResult = self.associator.
run(diaSourceHistoryCat,
210 self.log.
info(
"Associated DiaSources into %i DiaObjects",
211 len(assocResult.diaObjects))
213 if self.config.doAddDiaObjectCoords:
214 assocResult.assocDiaSources = self._addDiaObjectCoords(assocResult.diaObjects,
215 assocResult.assocDiaSources)
217 return pipeBase.Struct(
218 diaObjectTable=assocResult.diaObjects,
219 assocDiaSourceTable=assocResult.assocDiaSources)
221 def _addDiaObjectCoords(self, objects, sources):
222 obj = objects[[
'ra',
'decl']].rename(columns={
"ra":
"coord_ra",
"decl":
"coord_dec"})
223 df = pd.merge(sources.reset_index(), obj, left_on=
'diaObjectId', right_index=
True,
224 how=
'inner').set_index(
'diaSourceId')
227 def _trimToPatch(self, cat, innerPatchBox, wcs):
228 """Create a boolean array testing if a set of DiaSources are in the
233 cat : `pandas.DataFrame`
234 Catalog of DiaSources to test within patch/tract.
235 innerPatchBox : `lsst.geom.Box2D`
236 Bounding box of the patch.
237 wcs : `lsst.afw.geom.SkyWcs`
242 isInPatch : `numpy.ndarray`, (N,)
243 Booleans representing if the DiaSources are contained within the
244 current patch and tract.
246 isInPatch = np.array([
247 innerPatchBox.contains(
250 for idx, row
in cat.iterrows()])
A floating-point coordinate rectangle geometry.
Point in an unspecified spherical coordinate system.
def run(self, coaddExposures, bbox, wcs)
def makeSkyInfo(skyMap, tractId, patchId)