LSST Applications g0b6bd0c080+a72a5dd7e6,g1182afd7b4+2a019aa3bb,g17e5ecfddb+2b8207f7de,g1d67935e3f+06cf436103,g38293774b4+ac198e9f13,g396055baef+6a2097e274,g3b44f30a73+6611e0205b,g480783c3b1+98f8679e14,g48ccf36440+89c08d0516,g4b93dc025c+98f8679e14,g5c4744a4d9+a302e8c7f0,g613e996a0d+e1c447f2e0,g6c8d09e9e7+25247a063c,g7271f0639c+98f8679e14,g7a9cd813b8+124095ede6,g9d27549199+a302e8c7f0,ga1cf026fa3+ac198e9f13,ga32aa97882+7403ac30ac,ga786bb30fb+7a139211af,gaa63f70f4e+9994eb9896,gabf319e997+ade567573c,gba47b54d5d+94dc90c3ea,gbec6a3398f+06cf436103,gc6308e37c7+07dd123edb,gc655b1545f+ade567573c,gcc9029db3c+ab229f5caf,gd01420fc67+06cf436103,gd877ba84e5+06cf436103,gdb4cecd868+6f279b5b48,ge2d134c3d5+cc4dbb2e3f,ge448b5faa6+86d1ceac1d,gecc7e12556+98f8679e14,gf3ee170dca+25247a063c,gf4ac96e456+ade567573c,gf9f5ea5b4d+ac198e9f13,gff490e6085+8c2580be5c,w.2022.27
LSST Data Management Base Package
drpAssociationPipe.py
Go to the documentation of this file.
1# This file is part of pipe_tasks.
2
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21#
22
23"""Pipeline for running DiaSource association in a DRP context.
24"""
25
26import numpy as np
27import pandas as pd
28
29import lsst.geom as geom
30import lsst.pex.config as pexConfig
31import lsst.pipe.base as pipeBase
32from lsst.skymap import BaseSkyMap
33
34from .coaddBase import makeSkyInfo
35from .simpleAssociation import SimpleAssociationTask
36
37__all__ = ["DrpAssociationPipeTask",
38 "DrpAssociationPipeConfig",
39 "DrpAssociationPipeConnections"]
40
41
42class DrpAssociationPipeConnections(pipeBase.PipelineTaskConnections,
43 dimensions=("tract", "patch", "skymap"),
44 defaultTemplates={"coaddName": "deep",
45 "warpTypeSuffix": "",
46 "fakesType": ""}):
47 diaSourceTables = pipeBase.connectionTypes.Input(
48 doc="Set of catalogs of calibrated DiaSources.",
49 name="{fakesType}{coaddName}Diff_diaSrcTable",
50 storageClass="DataFrame",
51 dimensions=("instrument", "visit", "detector"),
52 deferLoad=True,
53 multiple=True
54 )
55 skyMap = pipeBase.connectionTypes.Input(
56 doc="Input definition of geometry/bbox and projection/wcs for coadded "
57 "exposures",
58 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
59 storageClass="SkyMap",
60 dimensions=("skymap", ),
61 )
62 assocDiaSourceTable = pipeBase.connectionTypes.Output(
63 doc="Catalog of DiaSources covering the patch and associated with a "
64 "DiaObject.",
65 name="{fakesType}{coaddName}Diff_assocDiaSrcTable",
66 storageClass="DataFrame",
67 dimensions=("tract", "patch"),
68 )
69 diaObjectTable = pipeBase.connectionTypes.Output(
70 doc="Catalog of DiaObjects created from spatially associating "
71 "DiaSources.",
72 name="{fakesType}{coaddName}Diff_diaObjTable",
73 storageClass="DataFrame",
74 dimensions=("tract", "patch"),
75 )
76
77
78class DrpAssociationPipeConfig(
79 pipeBase.PipelineTaskConfig,
80 pipelineConnections=DrpAssociationPipeConnections):
81 associator = pexConfig.ConfigurableField(
82 target=SimpleAssociationTask,
83 doc="Task used to associate DiaSources with DiaObjects.",
84 )
85 doAddDiaObjectCoords = pexConfig.Field(
86 dtype=bool,
87 default=True,
88 doc="Do pull diaObject's average coordinate as coord_ra and coord_dec"
89 "Duplicates information, but needed for bulk ingest into qserv."
90 )
91 doWriteEmptyTables = pexConfig.Field(
92 dtype=bool,
93 default=False,
94 doc="If True, construct and write out empty diaSource and diaObject "
95 "tables. If False, raise NoWorkFound"
96 )
97
98
99class DrpAssociationPipeTask(pipeBase.PipelineTask):
100 """Driver pipeline for loading DiaSource catalogs in a patch/tract
101 region and associating them.
102 """
103 ConfigClass = DrpAssociationPipeConfig
104 _DefaultName = "drpAssociation"
105
106 def __init__(self, **kwargs):
107 super().__init__(**kwargs)
108 self.makeSubtask('associator')
109
110 def runQuantum(self, butlerQC, inputRefs, outputRefs):
111 inputs = butlerQC.get(inputRefs)
112
113 inputs["tractId"] = butlerQC.quantum.dataId["tract"]
114 inputs["patchId"] = butlerQC.quantum.dataId["patch"]
115 tractPatchId, skymapBits = butlerQC.quantum.dataId.pack(
116 "tract_patch",
117 returnMaxBits=True)
118 inputs["tractPatchId"] = tractPatchId
119 inputs["skymapBits"] = skymapBits
120
121 outputs = self.run(**inputs)
122 butlerQC.put(outputs, outputRefs)
123
124 def run(self,
125 diaSourceTables,
126 skyMap,
127 tractId,
128 patchId,
129 tractPatchId,
130 skymapBits):
131 """Trim DiaSources to the current Patch and run association.
132
133 Takes in the set of DiaSource catalogs that covers the current patch,
134 trims them to the dimensions of the patch, and [TODO: eventually]
135 runs association on the concatenated DiaSource Catalog.
136
137 Parameters
138 ----------
139 diaSourceTables : `list` of `lst.daf.butler.DeferredDatasetHandle`
140 Set of DiaSource catalogs potentially covering this patch/tract.
141 skyMap : `lsst.skymap.BaseSkyMap`
142 SkyMap defining the patch/tract
143 tractId : `int`
144 Id of current tract being processed.
145 patchId : `int`
146 Id of current patch being processed
147
148 Returns
149 -------
150 output : `lsst.pipe.base.Struct`
151 Results struct with attributes:
152
153 ``assocDiaSourceTable``
154 Table of DiaSources with updated value for diaObjectId.
155 (`pandas.DataFrame`)
156 ``diaObjectTable``
157 Table of DiaObjects from matching DiaSources
158 (`pandas.DataFrame`).
159 """
160 self.log.info("Running DPR Association on patch %i, tract %i...",
161 patchId, tractId)
162
163 skyInfo = makeSkyInfo(skyMap, tractId, patchId)
164
165 # Get the patch bounding box.
166 innerPatchBox = geom.Box2D(skyInfo.patchInfo.getInnerBBox())
167
168 diaSourceHistory = []
169 for catRef in diaSourceTables:
170 cat = catRef.get(
171 datasetType=self.config.connections.diaSourceTables,
172 immediate=True)
173
174 isInTractPatch = self._trimToPatch(cat,
175 innerPatchBox,
176 skyInfo.wcs)
177
178 nDiaSrc = isInTractPatch.sum()
179 self.log.info(
180 "Read DiaSource catalog of length %i from visit %i, "
181 "detector %i. Found %i sources within the patch/tract "
182 "footprint.",
183 len(cat), catRef.dataId["visit"],
184 catRef.dataId["detector"], nDiaSrc)
185
186 if nDiaSrc <= 0:
187 continue
188
189 cutCat = cat[isInTractPatch]
190 diaSourceHistory.append(cutCat)
191
192 if diaSourceHistory:
193 diaSourceHistoryCat = pd.concat(diaSourceHistory)
194 else:
195 # No rows to associate
196 if self.config.doWriteEmptyTables:
197 self.log.info("Constructing empty table")
198 # Construct empty table using last table and dropping all the rows
199 diaSourceHistoryCat = cat.drop(cat.index)
200 else:
201 raise pipeBase.NoWorkFound("Found no overlapping DIASources to associate.")
202
203 self.log.info("Found %i DiaSources overlapping patch %i, tract %i",
204 len(diaSourceHistoryCat), patchId, tractId)
205
206 assocResult = self.associator.run(diaSourceHistoryCat,
207 tractPatchId,
208 skymapBits)
209
210 self.log.info("Associated DiaSources into %i DiaObjects",
211 len(assocResult.diaObjects))
212
213 if self.config.doAddDiaObjectCoords:
214 assocResult.assocDiaSources = self._addDiaObjectCoords(assocResult.diaObjects,
215 assocResult.assocDiaSources)
216
217 return pipeBase.Struct(
218 diaObjectTable=assocResult.diaObjects,
219 assocDiaSourceTable=assocResult.assocDiaSources)
220
221 def _addDiaObjectCoords(self, objects, sources):
222 obj = objects[['ra', 'decl']].rename(columns={"ra": "coord_ra", "decl": "coord_dec"})
223 df = pd.merge(sources.reset_index(), obj, left_on='diaObjectId', right_index=True,
224 how='inner').set_index('diaSourceId')
225 return df
226
227 def _trimToPatch(self, cat, innerPatchBox, wcs):
228 """Create generator testing if a set of DiaSources are in the
229 patch/tract.
230
231 Parameters
232 ----------
233 cat : `pandas.DataFrame`
234 Catalog of DiaSources to test within patch/tract.
235 innerPatchBox : `lsst.geom.Box2D`
236 Bounding box of the patch.
237 wcs : `lsst.geom.SkyWcs`
238 Wcs of the tract.
239
240 Returns
241 ------
242 isInPatch : `numpy.ndarray`, (N,)
243 Booleans representing if the DiaSources are contained within the
244 current patch and tract.
245 """
246 isInPatch = np.array([
247 innerPatchBox.contains(
248 wcs.skyToPixel(
249 geom.SpherePoint(row["ra"], row["decl"], geom.degrees)))
250 for idx, row in cat.iterrows()])
251 return isInPatch
A floating-point coordinate rectangle geometry.
Definition: Box.h:413
Point in an unspecified spherical coordinate system.
Definition: SpherePoint.h:57
def run(self, coaddExposures, bbox, wcs, dataIds, **kwargs)
Definition: getTemplate.py:596
def makeSkyInfo(skyMap, tractId, patchId)
Definition: coaddBase.py:293