22"""Pipeline for running DiaSource association in a DRP context.
# Public names exported by this module.
__all__ = [
    "DrpAssociationPipeTask",
    "DrpAssociationPipeConfig",
    "DrpAssociationPipeConnections",
]
37from .coaddBase
import makeSkyInfo
38from .simpleAssociation
import SimpleAssociationTask
42 dimensions=(
"tract",
"patch",
"skymap"),
43 defaultTemplates={
"coaddName":
"deep",
46 diaSourceTables = pipeBase.connectionTypes.Input(
47 doc=
"Set of catalogs of calibrated DiaSources.",
48 name=
"{fakesType}{coaddName}Diff_diaSrcTable",
49 storageClass=
"DataFrame",
50 dimensions=(
"instrument",
"visit",
"detector"),
54 skyMap = pipeBase.connectionTypes.Input(
55 doc=
"Input definition of geometry/bbox and projection/wcs for coadded "
57 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
58 storageClass=
"SkyMap",
59 dimensions=(
"skymap", ),
61 assocDiaSourceTable = pipeBase.connectionTypes.Output(
62 doc=
"Catalog of DiaSources covering the patch and associated with a "
64 name=
"{fakesType}{coaddName}Diff_assocDiaSrcTable",
65 storageClass=
"DataFrame",
66 dimensions=(
"tract",
"patch"),
68 diaObjectTable = pipeBase.connectionTypes.Output(
69 doc=
"Catalog of DiaObjects created from spatially associating "
71 name=
"{fakesType}{coaddName}Diff_diaObjTable",
72 storageClass=
"DataFrame",
73 dimensions=(
"tract",
"patch"),
77class DrpAssociationPipeConfig(
78 pipeBase.PipelineTaskConfig,
79 pipelineConnections=DrpAssociationPipeConnections):
80 associator = pexConfig.ConfigurableField(
81 target=SimpleAssociationTask,
82 doc=
"Task used to associate DiaSources with DiaObjects.",
84 doAddDiaObjectCoords = pexConfig.Field(
87 doc=
"Do pull diaObject's average coordinate as coord_ra and coord_dec"
88 "Duplicates information, but needed for bulk ingest into qserv."
90 doWriteEmptyTables = pexConfig.Field(
93 doc=
"If True, construct and write out empty diaSource and diaObject "
94 "tables. If False, raise NoWorkFound"
98class DrpAssociationPipeTask(pipeBase.PipelineTask):
99 """Driver pipeline for loading DiaSource catalogs in a patch/tract
100 region and associating them.
102 ConfigClass = DrpAssociationPipeConfig
103 _DefaultName = "drpAssociation"
def __init__(self, **kwargs):
    """Construct the task and create its ``associator`` subtask.

    Parameters
    ----------
    **kwargs
        Keyword arguments passed through unchanged to the parent class
        constructor.
    """
    super().__init__(**kwargs)
    # The subtask is configured through the ``associator`` config field
    # and is used later as ``self.associator``.
    self.makeSubtask("associator")
109 def runQuantum(self, butlerQC, inputRefs, outputRefs):
110 inputs = butlerQC.get(inputRefs)
112 inputs[
"tractId"] = butlerQC.quantum.dataId[
"tract"]
113 inputs[
"patchId"] = butlerQC.quantum.dataId[
"patch"]
114 tractPatchId, skymapBits = butlerQC.quantum.dataId.pack(
117 inputs[
"tractPatchId"] = tractPatchId
118 inputs[
"skymapBits"] = skymapBits
120 outputs = self.run(**inputs)
121 butlerQC.put(outputs, outputRefs)
130 """Trim DiaSources to the current Patch and run association.
132 Takes in the set of DiaSource catalogs that covers the current patch,
133 trims them to the dimensions of the patch,
and [TODO: eventually]
134 runs association on the concatenated DiaSource Catalog.
138 diaSourceTables : `list` of `lsst.daf.butler.DeferredDatasetHandle`
139 Set of DiaSource catalogs potentially covering this patch/tract.
141 SkyMap defining the patch/tract
143 Id of current tract being processed.
145 Id of current patch being processed
149 output : `lsst.pipe.base.Struct`
150 Results struct
with attributes:
152 ``assocDiaSourceTable``
153 Table of DiaSources
with updated value
for diaObjectId.
156 Table of DiaObjects
from matching DiaSources
157 (`pandas.DataFrame`).
159 self.log.info("Running DPR Association on patch %i, tract %i...",
162 skyInfo = makeSkyInfo(skyMap, tractId, patchId)
165 innerPatchBox =
geom.Box2D(skyInfo.patchInfo.getInnerBBox())
167 diaSourceHistory = []
168 for catRef
in diaSourceTables:
170 datasetType=self.config.connections.diaSourceTables,
173 isInTractPatch = self._trimToPatch(cat,
177 nDiaSrc = isInTractPatch.sum()
179 "Read DiaSource catalog of length %i from visit %i, "
180 "detector %i. Found %i sources within the patch/tract "
182 len(cat), catRef.dataId[
"visit"],
183 catRef.dataId[
"detector"], nDiaSrc)
188 cutCat = cat[isInTractPatch]
189 diaSourceHistory.append(cutCat)
192 diaSourceHistoryCat = pd.concat(diaSourceHistory)
195 if self.config.doWriteEmptyTables:
196 self.log.info(
"Constructing empty table")
198 diaSourceHistoryCat = cat.drop(cat.index)
200 raise pipeBase.NoWorkFound(
"Found no overlapping DIASources to associate.")
202 self.log.info(
"Found %i DiaSources overlapping patch %i, tract %i",
203 len(diaSourceHistoryCat), patchId, tractId)
205 assocResult = self.associator.run(diaSourceHistoryCat,
209 self.log.info(
"Associated DiaSources into %i DiaObjects",
210 len(assocResult.diaObjects))
212 if self.config.doAddDiaObjectCoords:
213 assocResult.assocDiaSources = self._addDiaObjectCoords(assocResult.diaObjects,
214 assocResult.assocDiaSources)
216 return pipeBase.Struct(
217 diaObjectTable=assocResult.diaObjects,
218 assocDiaSourceTable=assocResult.assocDiaSources)
220 def _addDiaObjectCoords(self, objects, sources):
221 obj = objects[[
'ra',
'decl']].rename(columns={
"ra":
"coord_ra",
"decl":
"coord_dec"})
222 df = pd.merge(sources.reset_index(), obj, left_on=
'diaObjectId', right_index=
True,
223 how=
'inner').set_index(
'diaSourceId')
226 def _trimToPatch(self, cat, innerPatchBox, wcs):
227 """Create a boolean array testing if a set of DiaSources are in the
232 cat : `pandas.DataFrame`
233 Catalog of DiaSources to test within patch/tract.
235 Bounding box of the patch.
236 wcs : `lsst.geom.SkyWcs`
241 isInPatch : `numpy.ndarray`, (N,)
242 Booleans representing if the DiaSources are contained within the
243 current patch
and tract.
245 isInPatch = np.array([
246 innerPatchBox.contains(
249 for idx, row
in cat.iterrows()])
A floating-point coordinate rectangle geometry.
Point in an unspecified spherical coordinate system.