LSST Applications  21.0.0-172-gfb10e10a+18fedfabac,22.0.0+297cba6710,22.0.0+80564b0ff1,22.0.0+8d77f4f51a,22.0.0+a28f4c53b1,22.0.0+dcf3732eb2,22.0.1-1-g7d6de66+2a20fdde0d,22.0.1-1-g8e32f31+297cba6710,22.0.1-1-geca5380+7fa3b7d9b6,22.0.1-12-g44dc1dc+2a20fdde0d,22.0.1-15-g6a90155+515f58c32b,22.0.1-16-g9282f48+790f5f2caa,22.0.1-2-g92698f7+dcf3732eb2,22.0.1-2-ga9b0f51+7fa3b7d9b6,22.0.1-2-gd1925c9+bf4f0e694f,22.0.1-24-g1ad7a390+a9625a72a8,22.0.1-25-g5bf6245+3ad8ecd50b,22.0.1-25-gb120d7b+8b5510f75f,22.0.1-27-g97737f7+2a20fdde0d,22.0.1-32-gf62ce7b1+aa4237961e,22.0.1-4-g0b3f228+2a20fdde0d,22.0.1-4-g243d05b+871c1b8305,22.0.1-4-g3a563be+32dcf1063f,22.0.1-4-g44f2e3d+9e4ab0f4fa,22.0.1-42-gca6935d93+ba5e5ca3eb,22.0.1-5-g15c806e+85460ae5f3,22.0.1-5-g58711c4+611d128589,22.0.1-5-g75bb458+99c117b92f,22.0.1-6-g1c63a23+7fa3b7d9b6,22.0.1-6-g50866e6+84ff5a128b,22.0.1-6-g8d3140d+720564cf76,22.0.1-6-gd805d02+cc5644f571,22.0.1-8-ge5750ce+85460ae5f3,master-g6e05de7fdc+babf819c66,master-g99da0e417a+8d77f4f51a,w.2021.48
LSST Data Management Base Package
drpAssociationPipe.py
Go to the documentation of this file.
1 # This file is part of pipe_tasks.
2 
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (https://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <https://www.gnu.org/licenses/>.
21 #
22 
23 """Pipeline for running DiaSource association in a DRP context.
24 """
25 
26 import numpy as np
27 import pandas as pd
28 
29 import lsst.geom as geom
30 import lsst.pex.config as pexConfig
31 import lsst.pipe.base as pipeBase
32 from lsst.skymap import BaseSkyMap
33 
34 from .coaddBase import makeSkyInfo
35 from .simpleAssociation import SimpleAssociationTask
36 
37 __all__ = ["DrpAssociationPipeTask",
38  "DrpAssociationPipeConfig",
39  "DrpAssociationPipeConnections"]
40 
41 
42 class DrpAssociationPipeConnections(pipeBase.PipelineTaskConnections,
43  dimensions=("tract", "patch", "skymap"),
44  defaultTemplates={"coaddName": "deep",
45  "warpTypeSuffix": "",
46  "fakesType": ""}):
47  diaSourceTables = pipeBase.connectionTypes.Input(
48  doc="Set of catalogs of calibrated DiaSources.",
49  name="{fakesType}{coaddName}Diff_diaSrcTable",
50  storageClass="DataFrame",
51  dimensions=("instrument", "visit", "detector"),
52  deferLoad=True,
53  multiple=True
54  )
55  skyMap = pipeBase.connectionTypes.Input(
56  doc="Input definition of geometry/bbox and projection/wcs for coadded "
57  "exposures",
58  name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
59  storageClass="SkyMap",
60  dimensions=("skymap", ),
61  )
62  assocDiaSourceTable = pipeBase.connectionTypes.Output(
63  doc="Catalog of DiaSources covering the patch and associated with a "
64  "DiaObject.",
65  name="{fakesType}{coaddName}Diff_assocDiaSrcTable",
66  storageClass="DataFrame",
67  dimensions=("tract", "patch"),
68  )
69  diaObjectTable = pipeBase.connectionTypes.Output(
70  doc="Catalog of DiaObjects created from spatially associating "
71  "DiaSources.",
72  name="{fakesType}{coaddName}Diff_diaObjTable",
73  storageClass="DataFrame",
74  dimensions=("tract", "patch"),
75  )
76 
77 
78 class DrpAssociationPipeConfig(
79  pipeBase.PipelineTaskConfig,
80  pipelineConnections=DrpAssociationPipeConnections):
81  associator = pexConfig.ConfigurableField(
82  target=SimpleAssociationTask,
83  doc="Task used to associate DiaSources with DiaObjects.",
84  )
85  doAddDiaObjectCoords = pexConfig.Field(
86  dtype=bool,
87  default=True,
88  doc="Do pull diaObject's average coordinate as coord_ra and coord_dec"
89  "Duplicates information, but needed for bulk ingest into qserv."
90  )
91  doWriteEmptyTables = pexConfig.Field(
92  dtype=bool,
93  default=False,
94  doc="If True, construct and write out empty diaSource and diaObject "
95  "tables. If False, raise NoWorkFound"
96  )
97 
98 
99 class DrpAssociationPipeTask(pipeBase.PipelineTask):
100  """Driver pipeline for loading DiaSource catalogs in a patch/tract
101  region and associating them.
102  """
103  ConfigClass = DrpAssociationPipeConfig
104  _DefaultName = "drpAssociation"
105 
106  def __init__(self, **kwargs):
107  super().__init__(**kwargs)
108  self.makeSubtask('associator')
109 
110  def runQuantum(self, butlerQC, inputRefs, outputRefs):
111  inputs = butlerQC.get(inputRefs)
112 
113  inputs["tractId"] = butlerQC.quantum.dataId["tract"]
114  inputs["patchId"] = butlerQC.quantum.dataId["patch"]
115  tractPatchId, skymapBits = butlerQC.quantum.dataId.pack(
116  "tract_patch",
117  returnMaxBits=True)
118  inputs["tractPatchId"] = tractPatchId
119  inputs["skymapBits"] = skymapBits
120 
121  outputs = self.run(**inputs)
122  butlerQC.put(outputs, outputRefs)
123 
124  def run(self,
125  diaSourceTables,
126  skyMap,
127  tractId,
128  patchId,
129  tractPatchId,
130  skymapBits):
131  """Trim DiaSources to the current Patch and run association.
132 
133  Takes in the set of DiaSource catalogs that covers the current patch,
134  trims them to the dimensions of the patch, and [TODO: eventually]
135  runs association on the concatenated DiaSource Catalog.
136 
137  Parameters
138  ----------
139  diaSourceTables : `list` of `lst.daf.butler.DeferredDatasetHandle`
140  Set of DiaSource catalogs potentially covering this patch/tract.
141  skyMap : `lsst.skymap.BaseSkyMap`
142  SkyMap defining the patch/tract
143  tractId : `int`
144  Id of current tract being processed.
145  patchId : `int`
146  Id of current patch being processed
147 
148  Returns
149  -------
150  output : `lsst.pipe.base.Struct`
151  Results struct with attributes:
152 
153  ``assocDiaSourceTable``
154  Table of DiaSources with updated value for diaObjectId.
155  (`pandas.DataFrame`)
156  ``diaObjectTable``
157  Table of DiaObjects from matching DiaSources
158  (`pandas.DataFrame`).
159  """
160  self.log.info("Running DPR Association on patch %i, tract %i...",
161  patchId, tractId)
162 
163  skyInfo = makeSkyInfo(skyMap, tractId, patchId)
164 
165  # Get the patch bounding box.
166  innerPatchBox = geom.Box2D(skyInfo.patchInfo.getInnerBBox())
167 
168  diaSourceHistory = []
169  for catRef in diaSourceTables:
170  cat = catRef.get(
171  datasetType=self.config.connections.diaSourceTables,
172  immediate=True)
173 
174  isInTractPatch = self._trimToPatch(cat,
175  innerPatchBox,
176  skyInfo.wcs)
177 
178  nDiaSrc = isInTractPatch.sum()
179  self.log.info(
180  "Read DiaSource catalog of length %i from visit %i, "
181  "detector %i. Found %i sources within the patch/tract "
182  "footprint.",
183  len(cat), catRef.dataId["visit"],
184  catRef.dataId["detector"], nDiaSrc)
185 
186  if nDiaSrc <= 0:
187  continue
188 
189  cutCat = cat[isInTractPatch]
190  diaSourceHistory.append(cutCat)
191 
192  if diaSourceHistory:
193  diaSourceHistoryCat = pd.concat(diaSourceHistory)
194  else:
195  # No rows to associate
196  if self.config.doWriteEmptyTables:
197  self.log.info("Constructing empty table")
198  # Construct empty table using last table and dropping all the rows
199  diaSourceHistoryCat = cat.drop(cat.index)
200  else:
201  raise pipeBase.NoWorkFound("Found no overlapping DIASources to associate.")
202 
203  self.log.info("Found %i DiaSources overlapping patch %i, tract %i",
204  len(diaSourceHistoryCat), patchId, tractId)
205 
206  assocResult = self.associator.run(diaSourceHistoryCat,
207  tractPatchId,
208  skymapBits)
209 
210  self.log.info("Associated DiaSources into %i DiaObjects",
211  len(assocResult.diaObjects))
212 
213  if self.config.doAddDiaObjectCoords:
214  assocResult.assocDiaSources = self._addDiaObjectCoords(assocResult.diaObjects,
215  assocResult.assocDiaSources)
216 
217  return pipeBase.Struct(
218  diaObjectTable=assocResult.diaObjects,
219  assocDiaSourceTable=assocResult.assocDiaSources)
220 
221  def _addDiaObjectCoords(self, objects, sources):
222  obj = objects[['ra', 'decl']].rename(columns={"ra": "coord_ra", "decl": "coord_dec"})
223  df = pd.merge(sources.reset_index(), obj, left_on='diaObjectId', right_index=True,
224  how='inner').set_index('diaSourceId')
225  return df
226 
227  def _trimToPatch(self, cat, innerPatchBox, wcs):
228  """Create generator testing if a set of DiaSources are in the
229  patch/tract.
230 
231  Parameters
232  ----------
233  cat : `pandas.DataFrame`
234  Catalog of DiaSources to test within patch/tract.
235  innerPatchBox : `lsst.geom.Box2D`
236  Bounding box of the patch.
237  wcs : `lsst.geom.SkyWcs`
238  Wcs of the tract.
239 
240  Returns
241  ------
242  isInPatch : `numpy.ndarray`, (N,)
243  Booleans representing if the DiaSources are contained within the
244  current patch and tract.
245  """
246  isInPatch = np.array([
247  innerPatchBox.contains(
248  wcs.skyToPixel(
249  geom.SpherePoint(row["ra"], row["decl"], geom.degrees)))
250  for idx, row in cat.iterrows()])
251  return isInPatch
A floating-point coordinate rectangle geometry.
Definition: Box.h:413
Point in an unspecified spherical coordinate system.
Definition: SpherePoint.h:57
def run(self, coaddExposures, bbox, wcs)
Definition: getTemplate.py:603
def makeSkyInfo(skyMap, tractId, patchId)
Definition: coaddBase.py:289