22"""Pipeline for running DiaSource association in a DRP context.
25__all__ = [
"DrpAssociationPipeTask",
26 "DrpAssociationPipeConfig",
27 "DrpAssociationPipeConnections"]
38from .coaddBase
import makeSkyInfo
39from .simpleAssociation
import SimpleAssociationTask
43 dimensions=(
"tract",
"patch",
"skymap"),
44 defaultTemplates={
"coaddName":
"deep",
47 diaSourceTables = pipeBase.connectionTypes.Input(
48 doc=
"Set of catalogs of calibrated DiaSources.",
49 name=
"{fakesType}{coaddName}Diff_diaSrcTable",
50 storageClass=
"DataFrame",
51 dimensions=(
"instrument",
"visit",
"detector"),
55 skyMap = pipeBase.connectionTypes.Input(
56 doc=
"Input definition of geometry/bbox and projection/wcs for coadded "
58 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
59 storageClass=
"SkyMap",
60 dimensions=(
"skymap", ),
62 assocDiaSourceTable = pipeBase.connectionTypes.Output(
63 doc=
"Catalog of DiaSources covering the patch and associated with a "
65 name=
"{fakesType}{coaddName}Diff_assocDiaSrcTable",
66 storageClass=
"DataFrame",
67 dimensions=(
"tract",
"patch"),
69 diaObjectTable = pipeBase.connectionTypes.Output(
70 doc=
"Catalog of DiaObjects created from spatially associating "
72 name=
"{fakesType}{coaddName}Diff_diaObjTable",
73 storageClass=
"DataFrame",
74 dimensions=(
"tract",
"patch"),
# Configuration for the DRP association pipeline task, bound to
# DrpAssociationPipeConnections through ``pipelineConnections``.
#
# Fields declared below:
#   associator            - configurable subtask; its target is
#                           SimpleAssociationTask.
#   doAddDiaObjectCoords  - switch for adding the DiaObject coordinates as
#                           ``coord_ra`` / ``coord_dec`` (see its ``doc``).
#   doWriteEmptyTables    - switch choosing between writing empty tables and
#                           raising NoWorkFound (see its ``doc``).
#   idGenerator           - field produced by
#                           SkyMapIdGeneratorConfig.make_field().
#
# NOTE(review): the two adjacent string literals in the ``doc`` of
# doAddDiaObjectCoords are concatenated with no separator, so the rendered
# help text reads "...coord_decDuplicates information..."; presumably a
# ". " is wanted between them - confirm before changing the string.
78class DrpAssociationPipeConfig(
79 pipeBase.PipelineTaskConfig,
80 pipelineConnections=DrpAssociationPipeConnections):
81 associator = pexConfig.ConfigurableField(
82 target=SimpleAssociationTask,
83 doc=
"Task used to associate DiaSources with DiaObjects.",
85 doAddDiaObjectCoords = pexConfig.Field(
88 doc=
"Do pull diaObject's average coordinate as coord_ra and coord_dec"
89 "Duplicates information, but needed for bulk ingest into qserv."
91 doWriteEmptyTables = pexConfig.Field(
94 doc=
"If True, construct and write out empty diaSource and diaObject "
95 "tables. If False, raise NoWorkFound"
97 idGenerator = SkyMapIdGeneratorConfig.make_field()
100class DrpAssociationPipeTask(pipeBase.PipelineTask):
101 """Driver pipeline for loading DiaSource catalogs in a patch/tract
102 region and associating them.
104 ConfigClass = DrpAssociationPipeConfig
105 _DefaultName =
"drpAssociation"
def __init__(self, **kwargs):
    """Construct the task and instantiate its ``associator`` subtask.

    Parameters
    ----------
    **kwargs
        Keyword arguments forwarded unchanged to the parent constructor.
    """
    super().__init__(**kwargs)
    # The subtask name must match the ``associator`` field of the config.
    self.makeSubtask("associator")
def runQuantum(self, butlerQC, inputRefs, outputRefs):
    """Load the quantum's inputs, run the task, and store its outputs.

    Parameters
    ----------
    butlerQC
        Quantum context; used here for ``get``, ``put`` and
        ``quantum.dataId``.
    inputRefs
        References handed to ``butlerQC.get`` to load the inputs.
    outputRefs
        References handed to ``butlerQC.put`` when storing the results.
    """
    runArgs = butlerQC.get(inputRefs)

    # The tract/patch identifiers and the ID generator are not loaded
    # through ``get``; they are derived from the quantum's data ID.
    quantumDataId = butlerQC.quantum.dataId
    runArgs["tractId"] = quantumDataId["tract"]
    runArgs["patchId"] = quantumDataId["patch"]
    runArgs["idGenerator"] = self.config.idGenerator.apply(quantumDataId)

    butlerQC.put(self.run(**runArgs), outputRefs)
127 """Trim DiaSources to the current Patch and run association.
129 Takes in the set of DiaSource catalogs that covers the current patch,
130 trims them to the dimensions of the patch, and [TODO: eventually]
131 runs association on the concatenated DiaSource Catalog.
135 diaSourceTables : `list` of `lsst.daf.butler.DeferredDatasetHandle`
136 Set of DiaSource catalogs potentially covering this patch/tract.
137 skyMap : `lsst.skymap.BaseSkyMap`
138 SkyMap defining the patch/tract
140 Id of current tract being processed.
142 Id of current patch being processed.
143 idGenerator : `lsst.meas.base.IdGenerator`, optional
144 Object that generates Object IDs and random number generator seeds.
148 output : `lsst.pipe.base.Struct`
149 Results struct with attributes:
151 ``assocDiaSourceTable``
152 Table of DiaSources with updated value for diaObjectId.
155 Table of DiaObjects from matching DiaSources
156 (`pandas.DataFrame`).
158 self.log.info(
"Running DPR Association on patch %i, tract %i...",
161 skyInfo = makeSkyInfo(skyMap, tractId, patchId)
164 innerPatchBox =
geom.Box2D(skyInfo.patchInfo.getInnerBBox())
166 diaSourceHistory = []
167 for catRef
in diaSourceTables:
170 isInTractPatch = self._trimToPatch(cat,
174 nDiaSrc = isInTractPatch.sum()
176 "Read DiaSource catalog of length %i from visit %i, "
177 "detector %i. Found %i sources within the patch/tract "
179 len(cat), catRef.dataId[
"visit"],
180 catRef.dataId[
"detector"], nDiaSrc)
185 cutCat = cat[isInTractPatch]
186 diaSourceHistory.append(cutCat)
189 diaSourceHistoryCat = pd.concat(diaSourceHistory)
192 if self.config.doWriteEmptyTables:
193 self.log.info(
"Constructing empty table")
195 diaSourceHistoryCat = cat.drop(cat.index)
197 raise pipeBase.NoWorkFound(
"Found no overlapping DIASources to associate.")
199 self.log.info(
"Found %i DiaSources overlapping patch %i, tract %i",
200 len(diaSourceHistoryCat), patchId, tractId)
202 assocResult = self.associator.run(diaSourceHistoryCat, idGenerator=idGenerator)
204 self.log.info(
"Associated DiaSources into %i DiaObjects",
205 len(assocResult.diaObjects))
207 if self.config.doAddDiaObjectCoords:
208 assocResult.assocDiaSources = self._addDiaObjectCoords(assocResult.diaObjects,
209 assocResult.assocDiaSources)
211 return pipeBase.Struct(
212 diaObjectTable=assocResult.diaObjects,
213 assocDiaSourceTable=assocResult.assocDiaSources)
215 def _addDiaObjectCoords(self, objects, sources):
216 obj = objects[[
'ra',
'dec']].rename(columns={
"ra":
"coord_ra",
"dec":
"coord_dec"})
217 df = pd.merge(sources.reset_index(), obj, left_on=
'diaObjectId', right_index=
True,
218 how=
'inner').set_index(
'diaSourceId')
# Builds a boolean mask over the rows of ``cat``: the visible code evaluates
# ``innerPatchBox.contains(...)`` once per row yielded by ``cat.iterrows()``
# and collects the results into a NumPy array named ``isInPatch``.
# NOTE(review): the summary line below says "generator", but the value built
# here is an ``np.array`` (as the Returns section states).
# NOTE(review): presumably each row's sky position is projected to pixel
# coordinates through ``wcs`` before the containment test, and ``isInPatch``
# is what gets returned - confirm against the complete method body.
221 def _trimToPatch(self, cat, innerPatchBox, wcs):
222 """Create generator testing if a set of DiaSources are in the
227 cat : `pandas.DataFrame`
228 Catalog of DiaSources to test within patch/tract.
229 innerPatchBox : `lsst.geom.Box2D`
230 Bounding box of the patch.
231 wcs : `lsst.geom.SkyWcs`
236 isInPatch : `numpy.ndarray`, (N,)
237 Booleans representing if the DiaSources are contained within the
238 current patch and tract.
240 isInPatch = np.array([
241 innerPatchBox.contains(
244 for idx, row
in cat.iterrows()])
A floating-point coordinate rectangle geometry.
Point in an unspecified spherical coordinate system.