LSST Applications 26.0.0,g0265f82a02+6660c170cc,g07994bdeae+30b05a742e,g0a0026dc87+17526d298f,g0a60f58ba1+17526d298f,g0e4bf8285c+96dd2c2ea9,g0ecae5effc+c266a536c8,g1e7d6db67d+6f7cb1f4bb,g26482f50c6+6346c0633c,g2bbee38e9b+6660c170cc,g2cc88a2952+0a4e78cd49,g3273194fdb+f6908454ef,g337abbeb29+6660c170cc,g337c41fc51+9a8f8f0815,g37c6e7c3d5+7bbafe9d37,g44018dc512+6660c170cc,g4a941329ef+4f7594a38e,g4c90b7bd52+5145c320d2,g58be5f913a+bea990ba40,g635b316a6c+8d6b3a3e56,g67924a670a+bfead8c487,g6ae5381d9b+81bc2a20b4,g93c4d6e787+26b17396bd,g98cecbdb62+ed2cb6d659,g98ffbb4407+81bc2a20b4,g9ddcbc5298+7f7571301f,ga1e77700b3+99e9273977,gae46bcf261+6660c170cc,gb2715bf1a1+17526d298f,gc86a011abf+17526d298f,gcf0d15dbbd+96dd2c2ea9,gdaeeff99f8+0d8dbea60f,gdb4ec4c597+6660c170cc,ge23793e450+96dd2c2ea9,gf041782ebf+171108ac67
LSST Data Management Base Package
Loading...
Searching...
No Matches
drpAssociationPipe.py
Go to the documentation of this file.
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22"""Pipeline for running DiaSource association in a DRP context.
23"""
24
25__all__ = ["DrpAssociationPipeTask",
26 "DrpAssociationPipeConfig",
27 "DrpAssociationPipeConnections"]
28
29import numpy as np
30import pandas as pd
31
32import lsst.geom as geom
33import lsst.pex.config as pexConfig
34import lsst.pipe.base as pipeBase
35from lsst.meas.base import SkyMapIdGeneratorConfig
36from lsst.skymap import BaseSkyMap
37
38from .coaddBase import makeSkyInfo
39from .simpleAssociation import SimpleAssociationTask
40
41
42class DrpAssociationPipeConnections(pipeBase.PipelineTaskConnections,
43 dimensions=("tract", "patch", "skymap"),
44 defaultTemplates={"coaddName": "deep",
45 "warpTypeSuffix": "",
46 "fakesType": ""}):
47 diaSourceTables = pipeBase.connectionTypes.Input(
48 doc="Set of catalogs of calibrated DiaSources.",
49 name="{fakesType}{coaddName}Diff_diaSrcTable",
50 storageClass="DataFrame",
51 dimensions=("instrument", "visit", "detector"),
52 deferLoad=True,
53 multiple=True
54 )
55 skyMap = pipeBase.connectionTypes.Input(
56 doc="Input definition of geometry/bbox and projection/wcs for coadded "
57 "exposures",
58 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
59 storageClass="SkyMap",
60 dimensions=("skymap", ),
61 )
62 assocDiaSourceTable = pipeBase.connectionTypes.Output(
63 doc="Catalog of DiaSources covering the patch and associated with a "
64 "DiaObject.",
65 name="{fakesType}{coaddName}Diff_assocDiaSrcTable",
66 storageClass="DataFrame",
67 dimensions=("tract", "patch"),
68 )
69 diaObjectTable = pipeBase.connectionTypes.Output(
70 doc="Catalog of DiaObjects created from spatially associating "
71 "DiaSources.",
72 name="{fakesType}{coaddName}Diff_diaObjTable",
73 storageClass="DataFrame",
74 dimensions=("tract", "patch"),
75 )
76
77
78class DrpAssociationPipeConfig(
79 pipeBase.PipelineTaskConfig,
80 pipelineConnections=DrpAssociationPipeConnections):
81 associator = pexConfig.ConfigurableField(
82 target=SimpleAssociationTask,
83 doc="Task used to associate DiaSources with DiaObjects.",
84 )
85 doAddDiaObjectCoords = pexConfig.Field(
86 dtype=bool,
87 default=True,
88 doc="Do pull diaObject's average coordinate as coord_ra and coord_dec"
89 "Duplicates information, but needed for bulk ingest into qserv."
90 )
91 doWriteEmptyTables = pexConfig.Field(
92 dtype=bool,
93 default=False,
94 doc="If True, construct and write out empty diaSource and diaObject "
95 "tables. If False, raise NoWorkFound"
96 )
97 idGenerator = SkyMapIdGeneratorConfig.make_field()
98
99
100class DrpAssociationPipeTask(pipeBase.PipelineTask):
101 """Driver pipeline for loading DiaSource catalogs in a patch/tract
102 region and associating them.
103 """
104 ConfigClass = DrpAssociationPipeConfig
105 _DefaultName = "drpAssociation"
106
107 def __init__(self, **kwargs):
108 super().__init__(**kwargs)
109 self.makeSubtask('associator')
110
111 def runQuantum(self, butlerQC, inputRefs, outputRefs):
112 inputs = butlerQC.get(inputRefs)
113
114 inputs["tractId"] = butlerQC.quantum.dataId["tract"]
115 inputs["patchId"] = butlerQC.quantum.dataId["patch"]
116 inputs["idGenerator"] = self.config.idGenerator.apply(butlerQC.quantum.dataId)
117
118 outputs = self.run(**inputs)
119 butlerQC.put(outputs, outputRefs)
120
121 def run(self,
122 diaSourceTables,
123 skyMap,
124 tractId,
125 patchId,
126 tractPatchId=None,
127 skymapBits=None,
128 idGenerator=None):
129 """Trim DiaSources to the current Patch and run association.
130
131 Takes in the set of DiaSource catalogs that covers the current patch,
132 trims them to the dimensions of the patch, and [TODO: eventually]
133 runs association on the concatenated DiaSource Catalog.
134
135 Parameters
136 ----------
137 diaSourceTables : `list` of `lsst.daf.butler.DeferredDatasetHandle`
138 Set of DiaSource catalogs potentially covering this patch/tract.
139 skyMap : `lsst.skymap.BaseSkyMap`
140 SkyMap defining the patch/tract
141 tractId : `int`
142 Id of current tract being processed.
143 patchId : `int`
144 Id of current patch being processed.
145 tractPatchId : `int`, optional
146 Unique identifier for the tract patch. Deprecated in favor of
147 ``idGenerator`` along with `lsst.obs.base.ExposureIdInfo`.
148 skymapBits : `int`
149 Maximum number of bits used the ``tractPatchId`` integer
150 identifier. Deprecated in favor of ``idGenerator`` along with
151 `lsst.obs.base.ExposureIdInfo`.
152 idGenerator : `lsst.meas.base.IdGenerator`, optional
153 Object that generates Object IDs and random number generator seeds.
154
155 Returns
156 -------
157 output : `lsst.pipe.base.Struct`
158 Results struct with attributes:
159
160 ``assocDiaSourceTable``
161 Table of DiaSources with updated value for diaObjectId.
162 (`pandas.DataFrame`)
163 ``diaObjectTable``
164 Table of DiaObjects from matching DiaSources
165 (`pandas.DataFrame`).
166 """
167 self.log.info("Running DPR Association on patch %i, tract %i...",
168 patchId, tractId)
169
170 skyInfo = makeSkyInfo(skyMap, tractId, patchId)
171
172 # Get the patch bounding box.
173 innerPatchBox = geom.Box2D(skyInfo.patchInfo.getInnerBBox())
174
175 diaSourceHistory = []
176 for catRef in diaSourceTables:
177 cat = catRef.get()
178
179 isInTractPatch = self._trimToPatch(cat,
180 innerPatchBox,
181 skyInfo.wcs)
182
183 nDiaSrc = isInTractPatch.sum()
184 self.log.info(
185 "Read DiaSource catalog of length %i from visit %i, "
186 "detector %i. Found %i sources within the patch/tract "
187 "footprint.",
188 len(cat), catRef.dataId["visit"],
189 catRef.dataId["detector"], nDiaSrc)
190
191 if nDiaSrc <= 0:
192 continue
193
194 cutCat = cat[isInTractPatch]
195 diaSourceHistory.append(cutCat)
196
197 if diaSourceHistory:
198 diaSourceHistoryCat = pd.concat(diaSourceHistory)
199 else:
200 # No rows to associate
201 if self.config.doWriteEmptyTables:
202 self.log.info("Constructing empty table")
203 # Construct empty table using last table and dropping all the rows
204 diaSourceHistoryCat = cat.drop(cat.index)
205 else:
206 raise pipeBase.NoWorkFound("Found no overlapping DIASources to associate.")
207
208 self.log.info("Found %i DiaSources overlapping patch %i, tract %i",
209 len(diaSourceHistoryCat), patchId, tractId)
210
211 assocResult = self.associator.run(diaSourceHistoryCat,
212 tractPatchId=tractPatchId,
213 skymapBits=skymapBits,
214 idGenerator=idGenerator)
215
216 self.log.info("Associated DiaSources into %i DiaObjects",
217 len(assocResult.diaObjects))
218
219 if self.config.doAddDiaObjectCoords:
220 assocResult.assocDiaSources = self._addDiaObjectCoords(assocResult.diaObjects,
221 assocResult.assocDiaSources)
222
223 return pipeBase.Struct(
224 diaObjectTable=assocResult.diaObjects,
225 assocDiaSourceTable=assocResult.assocDiaSources)
226
227 def _addDiaObjectCoords(self, objects, sources):
228 obj = objects[['ra', 'dec']].rename(columns={"ra": "coord_ra", "dec": "coord_dec"})
229 df = pd.merge(sources.reset_index(), obj, left_on='diaObjectId', right_index=True,
230 how='inner').set_index('diaSourceId')
231 return df
232
233 def _trimToPatch(self, cat, innerPatchBox, wcs):
234 """Create generator testing if a set of DiaSources are in the
235 patch/tract.
236
237 Parameters
238 ----------
239 cat : `pandas.DataFrame`
240 Catalog of DiaSources to test within patch/tract.
241 innerPatchBox : `lsst.geom.Box2D`
242 Bounding box of the patch.
243 wcs : `lsst.geom.SkyWcs`
244 Wcs of the tract.
245
246 Returns
247 ------
248 isInPatch : `numpy.ndarray`, (N,)
249 Booleans representing if the DiaSources are contained within the
250 current patch and tract.
251 """
252 isInPatch = np.array([
253 innerPatchBox.contains(
254 wcs.skyToPixel(
255 geom.SpherePoint(row["ra"], row["dec"], geom.degrees)))
256 for idx, row in cat.iterrows()])
257 return isInPatch
A floating-point coordinate rectangle geometry.
Definition Box.h:413
Point in an unspecified spherical coordinate system.
Definition SpherePoint.h:57