"""Pipeline for running DiaSource association in a DRP context.
"""

# Public names exported by this module.
__all__ = [
    "DrpAssociationPipeTask",
    "DrpAssociationPipeConfig",
    "DrpAssociationPipeConnections",
]
38from .coaddBase
import makeSkyInfo
39from .simpleAssociation
import SimpleAssociationTask
43 dimensions=(
"tract",
"patch",
"skymap"),
44 defaultTemplates={
"coaddName":
"deep",
47 diaSourceTables = pipeBase.connectionTypes.Input(
48 doc=
"Set of catalogs of calibrated DiaSources.",
49 name=
"{fakesType}{coaddName}Diff_diaSrcTable",
50 storageClass=
"DataFrame",
51 dimensions=(
"instrument",
"visit",
"detector"),
55 skyMap = pipeBase.connectionTypes.Input(
56 doc=
"Input definition of geometry/bbox and projection/wcs for coadded "
58 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
59 storageClass=
"SkyMap",
60 dimensions=(
"skymap", ),
62 assocDiaSourceTable = pipeBase.connectionTypes.Output(
63 doc=
"Catalog of DiaSources covering the patch and associated with a "
65 name=
"{fakesType}{coaddName}Diff_assocDiaSrcTable",
66 storageClass=
"DataFrame",
67 dimensions=(
"tract",
"patch"),
69 diaObjectTable = pipeBase.connectionTypes.Output(
70 doc=
"Catalog of DiaObjects created from spatially associating "
72 name=
"{fakesType}{coaddName}Diff_diaObjTable",
73 storageClass=
"DataFrame",
74 dimensions=(
"tract",
"patch"),
78class DrpAssociationPipeConfig(
79 pipeBase.PipelineTaskConfig,
80 pipelineConnections=DrpAssociationPipeConnections):
81 associator = pexConfig.ConfigurableField(
82 target=SimpleAssociationTask,
83 doc=
"Task used to associate DiaSources with DiaObjects.",
85 doAddDiaObjectCoords = pexConfig.Field(
88 doc=
"Do pull diaObject's average coordinate as coord_ra and coord_dec"
89 "Duplicates information, but needed for bulk ingest into qserv."
91 doWriteEmptyTables = pexConfig.Field(
94 doc=
"If True, construct and write out empty diaSource and diaObject "
95 "tables. If False, raise NoWorkFound"
97 idGenerator = SkyMapIdGeneratorConfig.make_field()
100class DrpAssociationPipeTask(pipeBase.PipelineTask):
101 """Driver pipeline for loading DiaSource catalogs in a patch/tract
102 region and associating them.
104 ConfigClass = DrpAssociationPipeConfig
105 _DefaultName = "drpAssociation"
def __init__(self, **kwargs):
    """Initialise the task and create its ``associator`` subtask.

    Parameters
    ----------
    **kwargs
        Forwarded unchanged to the parent class constructor.
    """
    super().__init__(**kwargs)
    # Build the subtask registered under the ``associator`` config field.
    self.makeSubtask('associator')
def runQuantum(self, butlerQC, inputRefs, outputRefs):
    """Load the quantum's inputs, call ``run``, and store its outputs.

    The inputs fetched through ``butlerQC`` are extended with three extra
    keyword arguments before being passed to ``run``: ``tractId`` and
    ``patchId`` (read from the quantum's data ID) and ``idGenerator``
    (built by applying the configured ``idGenerator`` to that data ID).

    Parameters
    ----------
    butlerQC
        Quantum context; must provide ``get``, ``put`` and
        ``quantum.dataId``.
    inputRefs
        References to the input datasets, passed to ``butlerQC.get``.
    outputRefs
        References to the output datasets, passed to ``butlerQC.put``.
    """
    inputs = butlerQC.get(inputRefs)

    dataId = butlerQC.quantum.dataId
    inputs["tractId"] = dataId["tract"]
    inputs["patchId"] = dataId["patch"]
    inputs["idGenerator"] = self.config.idGenerator.apply(dataId)

    outputs = self.run(**inputs)
    butlerQC.put(outputs, outputRefs)
129 """Trim DiaSources to the current Patch and run association.
131 Takes in the set of DiaSource catalogs that covers the current patch,
132 trims them to the dimensions of the patch,
and [TODO: eventually]
133 runs association on the concatenated DiaSource Catalog.
137 diaSourceTables : `list` of `lsst.daf.butler.DeferredDatasetHandle`
138 Set of DiaSource catalogs potentially covering this patch/tract.
140 SkyMap defining the patch/tract
142 Id of current tract being processed.
144 Id of current patch being processed.
145 tractPatchId : `int`, optional
146 Unique identifier
for the tract patch. Deprecated
in favor of
147 ``idGenerator`` along
with `lsst.obs.base.ExposureIdInfo`.
149 Maximum number of bits used the ``tractPatchId`` integer
150 identifier. Deprecated
in favor of ``idGenerator`` along
with
151 `lsst.obs.base.ExposureIdInfo`.
153 Object that generates Object IDs
and random number generator seeds.
157 output : `lsst.pipe.base.Struct`
158 Results struct
with attributes:
160 ``assocDiaSourceTable``
161 Table of DiaSources
with updated value
for diaObjectId.
164 Table of DiaObjects
from matching DiaSources
165 (`pandas.DataFrame`).
167 self.log.info("Running DPR Association on patch %i, tract %i...",
170 skyInfo = makeSkyInfo(skyMap, tractId, patchId)
173 innerPatchBox =
geom.Box2D(skyInfo.patchInfo.getInnerBBox())
175 diaSourceHistory = []
176 for catRef
in diaSourceTables:
179 isInTractPatch = self._trimToPatch(cat,
183 nDiaSrc = isInTractPatch.sum()
185 "Read DiaSource catalog of length %i from visit %i, "
186 "detector %i. Found %i sources within the patch/tract "
188 len(cat), catRef.dataId[
"visit"],
189 catRef.dataId[
"detector"], nDiaSrc)
194 cutCat = cat[isInTractPatch]
195 diaSourceHistory.append(cutCat)
198 diaSourceHistoryCat = pd.concat(diaSourceHistory)
201 if self.config.doWriteEmptyTables:
202 self.log.info(
"Constructing empty table")
204 diaSourceHistoryCat = cat.drop(cat.index)
206 raise pipeBase.NoWorkFound(
"Found no overlapping DIASources to associate.")
208 self.log.info(
"Found %i DiaSources overlapping patch %i, tract %i",
209 len(diaSourceHistoryCat), patchId, tractId)
211 assocResult = self.associator.run(diaSourceHistoryCat,
212 tractPatchId=tractPatchId,
213 skymapBits=skymapBits,
214 idGenerator=idGenerator)
216 self.log.info(
"Associated DiaSources into %i DiaObjects",
217 len(assocResult.diaObjects))
219 if self.config.doAddDiaObjectCoords:
220 assocResult.assocDiaSources = self._addDiaObjectCoords(assocResult.diaObjects,
221 assocResult.assocDiaSources)
223 return pipeBase.Struct(
224 diaObjectTable=assocResult.diaObjects,
225 assocDiaSourceTable=assocResult.assocDiaSources)
227 def _addDiaObjectCoords(self, objects, sources):
228 obj = objects[[
'ra',
'dec']].rename(columns={
"ra":
"coord_ra",
"dec":
"coord_dec"})
229 df = pd.merge(sources.reset_index(), obj, left_on=
'diaObjectId', right_index=
True,
230 how=
'inner').set_index(
'diaSourceId')
233 def _trimToPatch(self, cat, innerPatchBox, wcs):
234 """Create generator testing if a set of DiaSources are in the
239 cat : `pandas.DataFrame`
240 Catalog of DiaSources to test within patch/tract.
242 Bounding box of the patch.
243 wcs : `lsst.geom.SkyWcs`
248 isInPatch : `numpy.ndarray`, (N,)
249 Booleans representing if the DiaSources are contained within the
250 current patch
and tract.
252 isInPatch = np.array([
253 innerPatchBox.contains(
256 for idx, row
in cat.iterrows()])
A floating-point coordinate rectangle geometry.
Point in an unspecified spherical coordinate system.