Loading [MathJax]/extensions/tex2jax.js
LSST Applications g0fba68d861+05816baf74,g1ec0fe41b4+f536777771,g1fd858c14a+a9301854fb,g35bb328faa+fcb1d3bbc8,g4af146b050+a5c07d5b1d,g4d2262a081+6e5fcc2a4e,g53246c7159+fcb1d3bbc8,g56a49b3a55+9c12191793,g5a012ec0e7+3632fc3ff3,g60b5630c4e+ded28b650d,g67b6fd64d1+ed4b5058f4,g78460c75b0+2f9a1b4bcd,g786e29fd12+cf7ec2a62a,g8352419a5c+fcb1d3bbc8,g87b7deb4dc+7b42cf88bf,g8852436030+e5453db6e6,g89139ef638+ed4b5058f4,g8e3bb8577d+d38d73bdbd,g9125e01d80+fcb1d3bbc8,g94187f82dc+ded28b650d,g989de1cb63+ed4b5058f4,g9d31334357+ded28b650d,g9f33ca652e+50a8019d8c,gabe3b4be73+1e0a283bba,gabf8522325+fa80ff7197,gb1101e3267+d9fb1f8026,gb58c049af0+f03b321e39,gb665e3612d+2a0c9e9e84,gb89ab40317+ed4b5058f4,gcf25f946ba+e5453db6e6,gd6cbbdb0b4+bb83cc51f8,gdd1046aedd+ded28b650d,gde0f65d7ad+941d412827,ge278dab8ac+d65b3c2b70,ge410e46f29+ed4b5058f4,gf23fb2af72+b7cae620c0,gf5e32f922b+fcb1d3bbc8,gf67bdafdda+ed4b5058f4,w.2025.16
LSST Data Management Base Package
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
drpAssociationPipe.py
Go to the documentation of this file.
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22"""Pipeline for running DiaSource association in a DRP context.
23"""
24
25__all__ = ["DrpAssociationPipeTask",
26 "DrpAssociationPipeConfig",
27 "DrpAssociationPipeConnections"]
28
29import astropy.table as tb
30import numpy as np
31import pandas as pd
32
33
34from lsst.pipe.tasks.ssoAssociation import SolarSystemAssociationTask
35import lsst.geom as geom
36import lsst.pex.config as pexConfig
37import lsst.pipe.base as pipeBase
38from lsst.meas.base import SkyMapIdGeneratorConfig
39from lsst.skymap import BaseSkyMap
40
41from .coaddBase import makeSkyInfo
42from .simpleAssociation import SimpleAssociationTask
43
44
45class DrpAssociationPipeConnections(pipeBase.PipelineTaskConnections,
46 dimensions=("tract", "patch", "skymap"),
47 defaultTemplates={"coaddName": "deep",
48 "warpTypeSuffix": "",
49 "fakesType": ""}):
50 diaSourceTables = pipeBase.connectionTypes.Input(
51 doc="Set of catalogs of calibrated DiaSources.",
52 name="{fakesType}{coaddName}Diff_diaSrcTable",
53 storageClass="DataFrame",
54 dimensions=("instrument", "visit", "detector"),
55 deferLoad=True,
56 multiple=True
57 )
58 ssObjectTableRefs = pipeBase.connectionTypes.Input(
59 doc="Reference to catalogs of SolarSolarSystem objects expected to be "
60 "observable in each (visit, detector).",
61 name="preloaded_DRP_SsObjects",
62 storageClass="ArrowAstropy",
63 dimensions=("instrument", "visit", "detector"),
64 minimum=0,
65 deferLoad=True,
66 multiple=True
67 )
68 skyMap = pipeBase.connectionTypes.Input(
69 doc="Input definition of geometry/bbox and projection/wcs for coadded "
70 "exposures",
71 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
72 storageClass="SkyMap",
73 dimensions=("skymap", ),
74 )
75 finalVisitSummaryRefs = pipeBase.connectionTypes.Input(
76 doc="Reference to finalVisitSummary of each exposure, containing "
77 "visitInfo, bbox, and wcs.",
78 name="finalVisitSummary",
79 storageClass="ExposureCatalog",
80 dimensions=("instrument", "visit"),
81 deferLoad=True,
82 multiple=True
83 )
84 assocDiaSourceTable = pipeBase.connectionTypes.Output(
85 doc="Catalog of DiaSources covering the patch and associated with a "
86 "DiaObject.",
87 name="{fakesType}{coaddName}Diff_assocDiaSrcTable",
88 storageClass="DataFrame",
89 dimensions=("tract", "patch"),
90 )
91 associatedSsSources = pipeBase.connectionTypes.Output(
92 doc="Optional output storing ssSource data computed during association.",
93 name="{fakesType}{coaddName}Diff_assocSsSrcTable",
94 storageClass="ArrowAstropy",
95 dimensions=("tract", "patch"),
96 )
97 unassociatedSsObjects = pipeBase.connectionTypes.Output(
98 doc="Expected locations of ssObjects with no associated source.",
99 name="{fakesType}{coaddName}Diff_unassocSsObjTable",
100 storageClass="ArrowAstropy",
101 dimensions=("tract", "patch"),
102 )
103 diaObjectTable = pipeBase.connectionTypes.Output(
104 doc="Catalog of DiaObjects created from spatially associating "
105 "DiaSources.",
106 name="{fakesType}{coaddName}Diff_diaObjTable",
107 storageClass="DataFrame",
108 dimensions=("tract", "patch"),
109 )
110
111 def __init__(self, *, config=None):
112 super().__init__(config=config)
113
114 if not config.doSolarSystemAssociation:
115 del self.ssObjectTableRefs
116 del self.associatedSsSources
117 del self.unassociatedSsObjects
118 del self.finalVisitSummaryRefs
119
120
121class DrpAssociationPipeConfig(
122 pipeBase.PipelineTaskConfig,
123 pipelineConnections=DrpAssociationPipeConnections):
124 associator = pexConfig.ConfigurableField(
125 target=SimpleAssociationTask,
126 doc="Task used to associate DiaSources with DiaObjects.",
127 )
128 solarSystemAssociator = pexConfig.ConfigurableField(
129 target=SolarSystemAssociationTask,
130 doc="Task used to associate DiaSources with SolarSystemObjects.",
131 )
132 doAddDiaObjectCoords = pexConfig.Field(
133 dtype=bool,
134 default=True,
135 doc="Do pull diaObject's average coordinate as coord_ra and coord_dec"
136 "Duplicates information, but needed for bulk ingest into qserv."
137 )
138 doWriteEmptyTables = pexConfig.Field(
139 dtype=bool,
140 default=False,
141 doc="If True, construct and write out empty diaSource and diaObject "
142 "tables. If False, raise NoWorkFound"
143 )
144 doSolarSystemAssociation = pexConfig.Field(
145 dtype=bool,
146 default=True,
147 doc="Process SolarSystem objects through the pipeline.",
148 )
149 idGenerator = SkyMapIdGeneratorConfig.make_field()
150
151
152class DrpAssociationPipeTask(pipeBase.PipelineTask):
153 """Driver pipeline for loading DiaSource catalogs in a patch/tract
154 region and associating them.
155 """
156 ConfigClass = DrpAssociationPipeConfig
157 _DefaultName = "drpAssociation"
158
159 def __init__(self, **kwargs):
160 super().__init__(**kwargs)
161 self.makeSubtask('associator')
162 if self.config.doSolarSystemAssociation:
163 self.makeSubtask("solarSystemAssociator")
164
165 def runQuantum(self, butlerQC, inputRefs, outputRefs):
166 inputs = butlerQC.get(inputRefs)
167
168 inputs["tractId"] = butlerQC.quantum.dataId["tract"]
169 inputs["patchId"] = butlerQC.quantum.dataId["patch"]
170 inputs["idGenerator"] = self.config.idGenerator.apply(butlerQC.quantum.dataId)
171 if not self.config.doSolarSystemAssociation:
172 inputs["ssObjectTableRefs"] = []
173 inputs["finalVisitSummaryRefs"] = []
174 outputs = self.run(**inputs)
175 butlerQC.put(outputs, outputRefs)
176
177 def run(self,
178 diaSourceTables,
179 ssObjectTableRefs,
180 skyMap,
181 finalVisitSummaryRefs,
182 tractId,
183 patchId,
184 idGenerator=None):
185 """Trim DiaSources to the current Patch and run association.
186
187 Takes in the set of DiaSource catalogs that covers the current patch,
188 trims them to the dimensions of the patch, and [TODO: eventually]
189 runs association on the concatenated DiaSource Catalog.
190
191 Parameters
192 ----------
193 diaSourceTables : `list` of `lsst.daf.butler.DeferredDatasetHandle`
194 Set of DiaSource catalogs potentially covering this patch/tract.
195 ssObjectTableRefs: `list` of `lsst.daf.butler.DeferredDatasetHandle`
196 Set of known SSO ephemerides potentially covering this patch/tract.
197 skyMap : `lsst.skymap.BaseSkyMap`
198 SkyMap defining the patch/tract
199 visitInfoRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
200 Reference to finalVisitSummary of each exposure potentially
201 covering this patch/tract, which contain visitInfo, bbox, and wcs
202 tractId : `int`
203 Id of current tract being processed.
204 patchId : `int`
205 Id of current patch being processed.
206 idGenerator : `lsst.meas.base.IdGenerator`, optional
207 Object that generates Object IDs and random number generator seeds.
208
209 Returns
210 -------
211 output : `lsst.pipe.base.Struct`
212 Results struct with attributes:
213
214 ``assocDiaSourceTable``
215 Table of DiaSources with updated value for diaObjectId.
216 (`pandas.DataFrame`)
217 ``diaObjectTable``
218 Table of DiaObjects from matching DiaSources
219 (`pandas.DataFrame`).
220 """
221 self.log.info("Running DPR Association on patch %i, tract %i...",
222 patchId, tractId)
223
224 skyInfo = makeSkyInfo(skyMap, tractId, patchId)
225
226 # Get the patch bounding box.
227 innerPatchBox = geom.Box2D(skyInfo.patchInfo.getInnerBBox())
228 innerTractSkyRegion = skyInfo.tractInfo.getInnerSkyRegion()
229
230 def visitDetectorPair(dataRef):
231 return (dataRef.dataId["visit"], dataRef.dataId["detector"])
232
233 # Keep track of our diaCats, ssObject cats, and finalVisitSummaries by their (visit, detector) IDs
234 diaIdDict, ssObjectIdDict, finalVisitSummaryIdDict = {}, {}, {}
235 for diaCatRef in diaSourceTables:
236 diaIdDict[visitDetectorPair(diaCatRef)] = diaCatRef
237 if self.config.doSolarSystemAssociation:
238 for ssCatRef in ssObjectTableRefs:
239 ssObjectIdDict[visitDetectorPair(ssCatRef)] = ssCatRef
240 for finalVisitSummaryRef in finalVisitSummaryRefs:
241 finalVisitSummaryIdDict[finalVisitSummaryRef.dataId["visit"]] = finalVisitSummaryRef
242
243 diaSourceHistory, ssSourceHistory, unassociatedSsObjectHistory = [], [], []
244 for visit, detector in diaIdDict:
245 diaCatRef = diaIdDict[(visit, detector)]
246 diaCat = diaCatRef.get()
247 associatedSsSources, unassociatedSsObjects = None, None
248 nSsSrc, nSsObj = 0, 0
249 # Always false if ! self.config.doSolarSystemAssociation
250 if all([(visit, detector) in ssObjectIdDict and visit in finalVisitSummaryIdDict]):
251 # Get the ssCat and finalVisitSummary
252 ssCat = ssObjectIdDict[(visit, detector)].get()
253 finalVisitSummary = finalVisitSummaryIdDict[visit].get()
254 # Get the exposure metadata from the detector's row in the finalVisitSummary table.
255 visitInfo = finalVisitSummary.find(detector).visitInfo
256 bbox = finalVisitSummary.find(detector).getBBox()
257 wcs = finalVisitSummary.find(detector).wcs
258 ssoAssocResult = self.solarSystemAssociator.run(
259 tb.Table.from_pandas(diaCat),
260 ssCat,
261 visitInfo,
262 bbox,
263 wcs,
264 )
265
266 associatedSsSources = ssoAssocResult.associatedSsSources
267 associatedSsDiaSources = ssoAssocResult.ssoAssocDiaSources
268 ssInTractPatch = self._trimToPatch(associatedSsSources.to_pandas(),
269 innerPatchBox,
270 innerTractSkyRegion,
271 skyInfo.wcs)
272 associatedSsSources = associatedSsSources[ssInTractPatch]
273 assocDiaSrcIds = set(associatedSsSources['diaSourceId'])
274 diaSrcMask = [diaId in assocDiaSrcIds for diaId in associatedSsDiaSources['diaSourceId']]
275 associatedSsDiaSources = associatedSsDiaSources[np.array(diaSrcMask)]
276
277 unassociatedSsObjects = ssoAssocResult.unassociatedSsObjects
278 ssObjInTractPatch = self._trimToPatch(unassociatedSsObjects.to_pandas(),
279 innerPatchBox,
280 innerTractSkyRegion,
281 skyInfo.wcs)
282 unassociatedSsObjects = unassociatedSsObjects[ssObjInTractPatch]
283 nSsSrc = ssInTractPatch.sum()
284 nSsObj = ssObjInTractPatch.sum()
285 if len(ssoAssocResult.unAssocDiaSources) > 0:
286 diaCat = ssoAssocResult.unAssocDiaSources.to_pandas()
287 else:
288 diaCat = pd.DataFrame(columns=ssoAssocResult.unAssocDiaSources.columns)
289
290 diaInTractPatch = self._trimToPatch(diaCat,
291 innerPatchBox,
292 innerTractSkyRegion,
293 skyInfo.wcs)
294 diaCat = diaCat[diaInTractPatch]
295
296 nDiaSrc = diaInTractPatch.sum()
297
298 self.log.info(
299 "Read DiaSource catalog of length %i from visit %i, "
300 "detector %i. Found %i sources within the patch/tract "
301 "footprint, including %i associated with SSOs.",
302 len(diaCat), diaCatRef.dataId["visit"],
303 diaCatRef.dataId["detector"], nDiaSrc + nSsSrc, nSsSrc)
304
305 if nDiaSrc > 0:
306 diaSourceHistory.append(diaCat)
307 if nSsSrc > 0:
308 ssSourceHistory.append(associatedSsSources)
309 diaSourceHistory.append(associatedSsDiaSources.to_pandas())
310 if nSsObj > 0:
311 unassociatedSsObjects['visit'] = visit
312 unassociatedSsObjects['detector'] = detector
313 unassociatedSsObjectHistory.append(unassociatedSsObjects)
314
315 if diaSourceHistory:
316 diaSourceHistoryCat = pd.concat(diaSourceHistory)
317 else:
318 diaSourceHistoryCat = diaCat.drop(diaCat.index)
319 if self.config.doSolarSystemAssociation:
320 nSsSrc, nSsObj = 0, 0
321 if ssSourceHistory:
322 ssSourceHistoryCat = tb.vstack(ssSourceHistory)
323 ssSourceHistoryCat.remove_columns(['ra', 'dec'])
324 nSsSrc = len(ssSourceHistoryCat)
325 else:
326 ssSourceHistoryCat = associatedSsSources # Empty table?
327 if unassociatedSsObjectHistory:
328 unassociatedSsObjectHistoryCat = tb.vstack(unassociatedSsObjectHistory)
329 nSsObj = len(unassociatedSsObjectHistoryCat)
330 else:
331 unassociatedSsObjectHistoryCat = unassociatedSsObjects # Empty table?
332 self.log.info("Found %i ssSources and %i missing ssObjects in patch %i, tract %i",
333 nSsSrc, nSsObj, patchId, tractId)
334 else:
335 ssSourceHistoryCat = None
336 unassociatedSsObjectHistoryCat = None
337
338 if (not diaSourceHistory) and not (self.config.doSolarSystemAssociation and ssSourceHistory):
339 if not self.config.doWriteEmptyTables:
340 raise pipeBase.NoWorkFound("Found no overlapping DIASources to associate.")
341
342 self.log.info("Found %i DiaSources overlapping patch %i, tract %i",
343 len(diaSourceHistoryCat), patchId, tractId)
344
345 assocResult = self.associator.run(diaSourceHistoryCat, idGenerator=idGenerator)
346
347 self.log.info("Associated DiaSources into %i DiaObjects",
348 len(assocResult.diaObjects))
349
350 if self.config.doAddDiaObjectCoords:
351 assocResult.assocDiaSources = self._addDiaObjectCoords(assocResult.diaObjects,
352 assocResult.assocDiaSources)
353
354 return pipeBase.Struct(
355 diaObjectTable=assocResult.diaObjects,
356 assocDiaSourceTable=assocResult.assocDiaSources,
357 associatedSsSources=ssSourceHistoryCat,
358 unassociatedSsObjects=unassociatedSsObjectHistoryCat,
359 )
360
361 def _addDiaObjectCoords(self, objects, sources):
362 obj = objects[['ra', 'dec']].rename(columns={"ra": "coord_ra", "dec": "coord_dec"})
363 df = pd.merge(sources.reset_index(), obj, left_on='diaObjectId', right_index=True,
364 how='inner').set_index('diaSourceId')
365 return df
366
367 def _trimToPatch(self, cat, innerPatchBox, innerTractSkyRegion, wcs):
368 """Create generator testing if a set of DiaSources are in the
369 patch/tract.
370
371 Parameters
372 ----------
373 cat : `pandas.DataFrame`
374 Catalog of DiaSources to test within patch/tract.
375 innerPatchBox : `lsst.geom.Box2D`
376 Bounding box of the patch.
377 wcs : `lsst.geom.SkyWcs`
378 Wcs of the tract.
379
380 Returns
381 ------
382 isInPatch : `numpy.ndarray`, (N,)
383 Booleans representing if the DiaSources are contained within the
384 current patch and tract.
385 """
386 isInPatch = np.zeros(len(cat), dtype=bool)
387
388 for idx, row in cat.iterrows():
389 spPoint = geom.SpherePoint(row["ra"], row["dec"], geom.degrees)
390 pxCoord = wcs.skyToPixel(spPoint)
391 ra_rad = np.deg2rad(row["ra"])
392 dec_rad = np.deg2rad(row["dec"])
393
394 isInPatch[idx] = innerPatchBox.contains(pxCoord) and innerTractSkyRegion.contains(ra_rad, dec_rad)
395
396 return isInPatch
A floating-point coordinate rectangle geometry.
Definition Box.h:413
Point in an unspecified spherical coordinate system.
Definition SpherePoint.h:57
_addDiaObjectCoords(self, objects, sources)
_trimToPatch(self, cat, innerPatchBox, innerTractSkyRegion, wcs)