LSST Applications 27.0.0,g0265f82a02+469cd937ee,g02d81e74bb+21ad69e7e1,g1470d8bcf6+cbe83ee85a,g2079a07aa2+e67c6346a6,g212a7c68fe+04a9158687,g2305ad1205+94392ce272,g295015adf3+81dd352a9d,g2bbee38e9b+469cd937ee,g337abbeb29+469cd937ee,g3939d97d7f+72a9f7b576,g487adcacf7+71499e7cba,g50ff169b8f+5929b3527e,g52b1c1532d+a6fc98d2e7,g591dd9f2cf+df404f777f,g5a732f18d5+be83d3ecdb,g64a986408d+21ad69e7e1,g858d7b2824+21ad69e7e1,g8a8a8dda67+a6fc98d2e7,g99cad8db69+f62e5b0af5,g9ddcbc5298+d4bad12328,ga1e77700b3+9c366c4306,ga8c6da7877+71e4819109,gb0e22166c9+25ba2f69a1,gb6a65358fc+469cd937ee,gbb8dafda3b+69d3c0e320,gc07e1c2157+a98bf949bb,gc120e1dc64+615ec43309,gc28159a63d+469cd937ee,gcf0d15dbbd+72a9f7b576,gdaeeff99f8+a38ce5ea23,ge6526c86ff+3a7c1ac5f1,ge79ae78c31+469cd937ee,gee10cc3b42+a6fc98d2e7,gf1cff7945b+21ad69e7e1,gfbcc870c63+9a11dc8c8f
LSST Data Management Base Package
Loading...
Searching...
No Matches
drpAssociationPipe.py
Go to the documentation of this file.
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22"""Pipeline for running DiaSource association in a DRP context.
23"""
24
# Public names exported by this module.
__all__ = [
    "DrpAssociationPipeTask",
    "DrpAssociationPipeConfig",
    "DrpAssociationPipeConnections",
]
28
29import numpy as np
30import pandas as pd
31
32import lsst.geom as geom
33import lsst.pex.config as pexConfig
34import lsst.pipe.base as pipeBase
35from lsst.meas.base import SkyMapIdGeneratorConfig
36from lsst.skymap import BaseSkyMap
37
38from .coaddBase import makeSkyInfo
39from .simpleAssociation import SimpleAssociationTask
40
41
class DrpAssociationPipeConnections(
        pipeBase.PipelineTaskConnections,
        dimensions=("tract", "patch", "skymap"),
        defaultTemplates={"coaddName": "deep", "warpTypeSuffix": "", "fakesType": ""}):
    """Butler connections for `DrpAssociationPipeTask`.

    The task quantum is defined per (tract, patch, skymap). It consumes
    per-(instrument, visit, detector) DiaSource tables plus the skymap, and
    produces per-(tract, patch) associated DiaSource and DiaObject tables.
    """

    # Inputs -----------------------------------------------------------------
    # Loaded lazily (deferLoad) so that each catalog is only read when the
    # task asks for it; one handle per visit/detector (multiple).
    diaSourceTables = pipeBase.connectionTypes.Input(
        name="{fakesType}{coaddName}Diff_diaSrcTable",
        doc="Set of catalogs of calibrated DiaSources.",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
        multiple=True,
        deferLoad=True,
    )
    skyMap = pipeBase.connectionTypes.Input(
        name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
        doc="Input definition of geometry/bbox and projection/wcs for coadded exposures",
        storageClass="SkyMap",
        dimensions=("skymap", ),
    )

    # Outputs ----------------------------------------------------------------
    assocDiaSourceTable = pipeBase.connectionTypes.Output(
        name="{fakesType}{coaddName}Diff_assocDiaSrcTable",
        doc="Catalog of DiaSources covering the patch and associated with a DiaObject.",
        storageClass="DataFrame",
        dimensions=("tract", "patch"),
    )
    diaObjectTable = pipeBase.connectionTypes.Output(
        name="{fakesType}{coaddName}Diff_diaObjTable",
        doc="Catalog of DiaObjects created from spatially associating DiaSources.",
        storageClass="DataFrame",
        dimensions=("tract", "patch"),
    )
76
77
class DrpAssociationPipeConfig(
        pipeBase.PipelineTaskConfig,
        pipelineConnections=DrpAssociationPipeConnections):
    """Configuration for `DrpAssociationPipeTask`."""

    # Subtask that performs the actual DiaSource -> DiaObject association.
    associator = pexConfig.ConfigurableField(
        target=SimpleAssociationTask,
        doc="Task used to associate DiaSources with DiaObjects.",
    )
    # NOTE: the adjacent string literals below are concatenated verbatim, so
    # each fragment must carry its own trailing punctuation and space.
    doAddDiaObjectCoords = pexConfig.Field(
        dtype=bool,
        default=True,
        doc="Do pull diaObject's average coordinate as coord_ra and coord_dec. "
            "Duplicates information, but needed for bulk ingest into qserv."
    )
    doWriteEmptyTables = pexConfig.Field(
        dtype=bool,
        default=False,
        doc="If True, construct and write out empty diaSource and diaObject "
            "tables. If False, raise NoWorkFound."
    )
    # Configuration used to build the ID generator from the quantum data ID.
    idGenerator = SkyMapIdGeneratorConfig.make_field()
98
99
class DrpAssociationPipeTask(pipeBase.PipelineTask):
    """Driver pipeline for loading DiaSource catalogs in a patch/tract
    region and associating them.
    """
    ConfigClass = DrpAssociationPipeConfig
    _DefaultName = "drpAssociation"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.makeSubtask('associator')

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        # The tract/patch ids and the id generator are derived from the
        # quantum data ID rather than being read as datasets.
        inputs["tractId"] = butlerQC.quantum.dataId["tract"]
        inputs["patchId"] = butlerQC.quantum.dataId["patch"]
        inputs["idGenerator"] = self.config.idGenerator.apply(butlerQC.quantum.dataId)

        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)

    def run(self,
            diaSourceTables,
            skyMap,
            tractId,
            patchId,
            idGenerator=None):
        """Trim DiaSources to the current Patch and run association.

        Takes in the set of DiaSource catalogs that covers the current patch,
        trims them to the inner bounding box of the patch, and runs
        association on the concatenated DiaSource catalog.

        Parameters
        ----------
        diaSourceTables : `list` of `lsst.daf.butler.DeferredDatasetHandle`
            Set of DiaSource catalogs potentially covering this patch/tract.
        skyMap : `lsst.skymap.BaseSkyMap`
            SkyMap defining the patch/tract
        tractId : `int`
            Id of current tract being processed.
        patchId : `int`
            Id of current patch being processed.
        idGenerator : `lsst.meas.base.IdGenerator`, optional
            Object that generates Object IDs and random number generator seeds.

        Returns
        -------
        output : `lsst.pipe.base.Struct`
            Results struct with attributes:

            ``assocDiaSourceTable``
                Table of DiaSources with updated value for diaObjectId.
                (`pandas.DataFrame`)
            ``diaObjectTable``
                Table of DiaObjects from matching DiaSources
                (`pandas.DataFrame`).

        Raises
        ------
        lsst.pipe.base.NoWorkFound
            Raised if no DiaSource falls inside the patch and either
            ``config.doWriteEmptyTables`` is `False` or no input catalog was
            supplied at all (so there is no schema to build an empty table
            from).
        """
        self.log.info("Running DRP Association on patch %i, tract %i...",
                      patchId, tractId)

        skyInfo = makeSkyInfo(skyMap, tractId, patchId)

        # Get the patch bounding box.
        innerPatchBox = geom.Box2D(skyInfo.patchInfo.getInnerBBox())

        diaSourceHistory = []
        # Last catalog read; only used as a column template when an empty
        # output table has to be constructed. Stays None if there are no
        # input catalogs.
        templateCat = None
        for catRef in diaSourceTables:
            cat = catRef.get()
            templateCat = cat

            isInTractPatch = self._trimToPatch(cat,
                                               innerPatchBox,
                                               skyInfo.wcs)

            nDiaSrc = isInTractPatch.sum()
            self.log.info(
                "Read DiaSource catalog of length %i from visit %i, "
                "detector %i. Found %i sources within the patch/tract "
                "footprint.",
                len(cat), catRef.dataId["visit"],
                catRef.dataId["detector"], nDiaSrc)

            if nDiaSrc <= 0:
                continue

            diaSourceHistory.append(cat[isInTractPatch])

        if diaSourceHistory:
            diaSourceHistoryCat = pd.concat(diaSourceHistory)
        elif self.config.doWriteEmptyTables and templateCat is not None:
            # No rows to associate: keep the columns of the last table read
            # but drop all of its rows.
            self.log.info("Constructing empty table")
            diaSourceHistoryCat = templateCat.drop(templateCat.index)
        else:
            # Either empty outputs were not requested, or there was no input
            # catalog from which to take the output columns.
            raise pipeBase.NoWorkFound("Found no overlapping DIASources to associate.")

        self.log.info("Found %i DiaSources overlapping patch %i, tract %i",
                      len(diaSourceHistoryCat), patchId, tractId)

        assocResult = self.associator.run(diaSourceHistoryCat, idGenerator=idGenerator)

        self.log.info("Associated DiaSources into %i DiaObjects",
                      len(assocResult.diaObjects))

        if self.config.doAddDiaObjectCoords:
            assocResult.assocDiaSources = self._addDiaObjectCoords(assocResult.diaObjects,
                                                                   assocResult.assocDiaSources)

        return pipeBase.Struct(
            diaObjectTable=assocResult.diaObjects,
            assocDiaSourceTable=assocResult.assocDiaSources)

    def _addDiaObjectCoords(self, objects, sources):
        """Attach each DiaObject's coordinates to its DiaSources.

        Parameters
        ----------
        objects : `pandas.DataFrame`
            DiaObject table with ``ra`` and ``dec`` columns; its index is
            matched against ``diaObjectId`` of the sources.
        sources : `pandas.DataFrame`
            Associated DiaSource table. Must expose ``diaObjectId`` and
            ``diaSourceId`` as columns once its index has been reset
            (presumably ``diaSourceId`` is the index; verify against the
            associator output).

        Returns
        -------
        sources : `pandas.DataFrame`
            New table, indexed by ``diaSourceId``, with added ``coord_ra``
            and ``coord_dec`` columns. Because an inner join is used, sources
            whose ``diaObjectId`` is not in the index of ``objects`` are
            dropped.
        """
        obj = objects[['ra', 'dec']].rename(columns={"ra": "coord_ra", "dec": "coord_dec"})
        df = pd.merge(sources.reset_index(), obj, left_on='diaObjectId', right_index=True,
                      how='inner').set_index('diaSourceId')
        return df

    def _trimToPatch(self, cat, innerPatchBox, wcs):
        """Test which DiaSources of a catalog lie inside the patch.

        Parameters
        ----------
        cat : `pandas.DataFrame`
            Catalog of DiaSources to test within patch/tract. Must have
            ``ra`` and ``dec`` columns, interpreted as degrees.
        innerPatchBox : `lsst.geom.Box2D`
            Bounding box of the patch.
        wcs : `lsst.afw.geom.SkyWcs`
            Wcs of the tract.

        Returns
        -------
        isInPatch : `numpy.ndarray`, (N,)
            Booleans representing if the DiaSources are contained within the
            current patch and tract. Always of ``bool`` dtype, including for
            an empty catalog.
        """
        # Iterate over the two coordinate columns only; building a full row
        # Series per source (``iterrows``) is needlessly expensive.
        isInPatch = np.array(
            [innerPatchBox.contains(
                wcs.skyToPixel(geom.SpherePoint(ra, dec, geom.degrees)))
             for ra, dec in zip(cat["ra"], cat["dec"])],
            dtype=bool)
        return isInPatch
A floating-point coordinate rectangle geometry.
Definition Box.h:413
Point in an unspecified spherical coordinate system.
Definition SpherePoint.h:57