pipeline.py
#
# LSST Data Management System
# Copyright 2008, 2009, 2010 LSST Corporation.
#
# This product includes software developed by the
# LSST Project (http://www.lsst.org/).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the LSST License Statement and
# the GNU General Public License along with this program.  If not,
# see <http://www.lsstcorp.org/LegalNotices/>.
#

23 """
24 Classes corresponding to the 4 Stages that make up the LSST association pipeline:
25 
26  - load pre-existing object data for the visit FOV from chunk files
27  - match difference sources for the visit (from the detection pipeline) against objects,
28  outputting match results to the database
29  - match moving object predictions for the visit (from MOPS) against difference sources,
30  outputting match results and new objects (created from difference sources with no matches)
31  to the database
32  - store new objects into chunk delta files
33 """
from __future__ import with_statement

from datetime import datetime
import os, os.path
import pdb
import re
import time

import lsst.daf.base as base
import lsst.daf.persistence as persistence
import lsst.pex.logging as logging
import lsst.pex.harness as harness
import lsst.afw.image as image
import lsst.ap as ap


# ----------------------------------------------------------------

class LoadStage(harness.Stage.Stage):
    """
    Stage that loads basic object data (id, position, variability probabilities)
    for a visit FOV.
    """

    def _massagePolicy(self):
        loc = persistence.LogicalLocation(self._policy.get('filterTableLocation'))
        self._policy.set('filterTableLocation', loc.locString())

    def __init__(self, stageId, policy):
        if policy is None:
            raise RuntimeError("Cannot create a lsst.ap.LoadStage without a policy")
        harness.Stage.Stage.__init__(self, stageId, policy)
        self._firstVisit = True

    def makeVpContext(self):
        """
        Takes a clipboard from the input queue, creates a visit processing context
        (a holder for all inter-stage association pipeline state), adds it to the
        clipboard, and finally places the clipboard onto the output queue. Expected
        on the input clipboard is an event (named 'triggerAssociationEvent')
        containing the following information (an illustrative event is sketched
        after this class):
          - the position of the visit center, given by keys 'ra' and 'decl'
            (both with double precision values in units of degrees).
          - a visit identifier, given by the 'visitId' key (with an integer value).
          - the name of the filter (a string) for the visit, given by the
            'filter' key.
          - the time at which the visit will occur, given by the 'dateObs' key
            (with a double precision value in MJD TAI).
          - [optionally] a match radius, given by the 'matchRadius' key (with a
            double precision value in units of arc-seconds).
        """
        clipboard = self.inputQueue.getNextDataset()
        event = clipboard.get('triggerAssociationEvent')
        self.vpContext = ap.VisitProcessingContext(
            self._policy, event, self.getRun(), self.getRank(), self.getUniverseSize() - 1
        )
        clipboard.put('vpContext', self.vpContext)
        self.outputQueue.addDataset(clipboard)

    def preprocess(self):
        """
        Registers the incoming visit with the shared memory chunk manager.
        """
        assert self.inputQueue.size() == 1
        assert self.outputQueue.size() == 0
        if self._firstVisit:
            self._massagePolicy()
            ap.initialize(str(self.getRun()))
            self._firstVisit = False
        self.makeVpContext()
        ap.registerVisit(self.vpContext)

    def process(self):
        """
        Loads the object chunk and chunk delta files assigned to the slice.
        """
        assert self.inputQueue.size() == 1
        assert self.outputQueue.size() == 0
        if self._firstVisit:
            self._massagePolicy()
            ap.initialize(str(self.getRun()))
            self._firstVisit = False
        self.makeVpContext()
        ap.loadSliceObjects(self.vpContext)

    def postprocess(self):
        """
        Checks that all worker slices successfully loaded their share of the
        objects for the visit FOV and, if so, builds an object index.
        """
        ap.buildObjectIndex(self.vpContext)

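
# A minimal sketch, not part of the original pipeline, of the trigger event
# LoadStage.makeVpContext expects; the key names and units come from its
# docstring above, while the values here are illustrative placeholders only.
def _exampleTriggerAssociationEvent():
    event = base.PropertySet()
    event.setDouble('ra', 273.48)        # visit center RA (degrees)
    event.setDouble('decl', -27.33)      # visit center Dec (degrees)
    event.setInt('visitId', 708125)      # placeholder visit identifier
    event.setString('filter', 'r')       # filter name for the visit
    event.setDouble('dateObs', 53371.0)  # visit time, MJD (TAI)
    event.setDouble('matchRadius', 0.5)  # optional match radius (arc-seconds)
    return event
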
# --------------------------------------------------------------------------------

class MatchDiaSourcesStage(harness.Stage.Stage):
    """
    Matches difference sources from the detection pipeline against objects
    within the visit's FOV. Difference sources are expected to be found on the
    clipboard under the key 'diaSources'; match results, consisting of
    (difference source id, object id, match distance) tuples, are placed onto
    the clipboard under the key 'diaSourceToObjectMatches' (a sketch of reading
    them back appears after this class).
    """

    def __init__(self, stageId=-1, policy=None):
        harness.Stage.Stage.__init__(self, stageId, policy)

    def preprocess(self):
        assert self.inputQueue.size() == 1
        assert self.outputQueue.size() == 0
        clipboard = self.inputQueue.getNextDataset()
        vpContext = clipboard.get('vpContext')
        sources = clipboard.get('diaSources')
        matches = ap.MatchPairVec()

        vpContext.setDiaSources(sources)
        ap.matchDiaSources(matches, vpContext)

        clipboard.put('diaSourceToObjectMatches', ap.PersistableMatchPairVector(matches))
        self.outputQueue.addDataset(clipboard)

    def process(self):
        assert self.inputQueue.size() == 1
        assert self.outputQueue.size() == 0
        self.outputQueue.addDataset(self.inputQueue.getNextDataset())

    def postprocess(self):
        pass

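
# A minimal sketch, not part of the original pipeline, of how a downstream
# consumer might read back the match results the stage above places on the
# clipboard. The accessor names getMatchPairs, getFirst, getSecond, and
# getDistance are assumptions about the lsst.ap bindings, not confirmed here.
def _exampleLogDiaSourceMatches(clipboard):
    persistable = clipboard.get('diaSourceToObjectMatches')
    for match in persistable.getMatchPairs():  # assumed accessor
        logging.Trace('lsst.ap', 3, 'diaSource %d -> object %d (%g)' %
                      (match.getFirst(), match.getSecond(), match.getDistance()))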

# --------------------------------------------------------------------------------

class MatchMopsPredsStage(harness.Stage.Stage):
    """
    Matches moving object predictions for a visit against difference sources.
    The previous stage is assumed to have read the difference sources and built
    an index for them (stored in 'vpContext' on the clipboard). Moving object
    predictions are expected to be found on the clipboard under the key
    'mopsPreds'; match results, consisting of (moving object id, difference
    source id, match distance) tuples, are placed onto the clipboard under the
    key 'predToDiaSourceMatches'. Finally, difference sources which didn't match
    anything are used to create new objects - identifiers for these are placed
    onto the clipboard under the key 'diaSourceToNewObject'.
    """

    def __init__(self, stageId=-1, policy=None):
        harness.Stage.Stage.__init__(self, stageId, policy)

    def preprocess(self):
        assert self.inputQueue.size() == 1
        assert self.outputQueue.size() == 0
        clipboard = self.inputQueue.getNextDataset()
        vpContext = clipboard.get('vpContext')

        try:
            ev1 = clipboard.get('triggerAssociationEvent')
            ev2 = clipboard.get('triggerMatchMopsPredsEvent')
            if ev1.getInt('visitId') != ev2.getInt('visitId'):
                raise RuntimeError('triggerAssociationEvent.visitId != '
                                   'triggerMatchMopsPredsEvent.visitId')
            preds = clipboard.get('mopsPreds')
            matches = ap.MatchPairVec()
            idPairs = ap.IdPairVec()
        except:
            ap.endVisit(vpContext, True)
            raise

        ap.matchMops(matches, idPairs, vpContext, preds.getPredictions())

        clipboard.put('predToDiaSourceMatches', ap.PersistableMatchPairVector(matches))
        clipboard.put('diaSourceToNewObject', ap.PersistableIdPairVector(idPairs))
        self.outputQueue.addDataset(clipboard)

    def process(self):
        assert self.inputQueue.size() == 1
        assert self.outputQueue.size() == 0
        self.outputQueue.addDataset(self.inputQueue.getNextDataset())

    def postprocess(self):
        pass


# ----------------------------------------------------------------

class StoreStage(harness.Stage.Stage):
    """
    Stores new objects (created from unmatched difference sources under certain
    conditions) obtained during the visit into chunk delta files.
    """

    def _runSql(self, sqlStatements, scriptFileName):
        db = persistence.DbStorage()
        db.setPersistLocation(self.database)
        with open(os.path.join(self.scriptDir, scriptFileName), 'w') as scriptFile:
            for stmt in sqlStatements:
                stmt = stmt % self.templateDict
                startTime = time.clock()
                scriptFile.write(stmt)
                scriptFile.write(';\n\n')
                scriptFile.flush()
                try:
                    db.executeSql(stmt)
                except Exception, e:
                    try:
                        for line in str(e).splitlines(True):
                            scriptFile.write('-- ' + line)
                        scriptFile.flush()
                    except:
                        pass
                    raise
                scriptFile.write('-- statement executed in %f seconds\n\n' %
                                 (time.clock() - startTime))

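    # A short illustration (the statement text is hypothetical; only the
    # '%(key)s' substitution mechanism is taken from _runSql above) of how a
    # statement template expands against self.templateDict before execution:
    #
    #   'INSERT INTO %(varObjectTable)s SELECT * FROM _tmp_v%(visitId)d' \
    #       % {'varObjectTable': 'VarObject', 'visitId': 708125}
    #   # -> 'INSERT INTO VarObject SELECT * FROM _tmp_v708125'
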
    def _copyObject(self):
        """
        Appends the contents of the policy-specified object table to the
        Object table in the per-run database.
        """
        if self.objectTable and len(self.objectTable) > 0:
            db = persistence.DbStorage()
            db.setPersistLocation(self.database)
            db.executeSql("INSERT INTO %s SELECT * FROM %s" %
                          (self.templateDict['nonVarObjectTable'], self.objectTable))

    def __init__(self, stageId, policy):
        if policy is None:
            raise RuntimeError("Cannot create a lsst.ap.StoreStage without a policy")
        harness.Stage.Stage.__init__(self, stageId, policy)
        self.filterChars = ('u', 'g', 'r', 'i', 'z', 'y')
        self.templateDict = {}
        self.additionalData = base.PropertySet()
        self.database = persistence.LogicalLocation(policy.getString('database'))
        self.objectTable = policy.getString('objectTable') if policy.exists('objectTable') else None
        self.storeOutputs = policy.getBool('storeOutputs')
        self.appendTables = policy.getBool('appendTables')
        self.dropTables = policy.getBool('dropTables')
        self.scriptDir = persistence.LogicalLocation(policy.getString('scriptDirectory')).locString()
        if not os.path.exists(self.scriptDir):
            os.makedirs(self.scriptDir)
        self.templateDict['diaSourceTable'] = policy.getString('diaSourceTable')
        self.templateDict['varObjectTable'] = policy.getString('varObjectTable')
        self.templateDict['nonVarObjectTable'] = policy.getString('nonVarObjectTable')

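    # A hedged sketch (paf-style; all values are illustrative assumptions) of
    # the policy keys the constructor above reads:
    #
    #   database: "mysql://localhost:3306/ap_run"
    #   scriptDirectory: "/tmp/ap/scripts"
    #   storeOutputs: true
    #   appendTables: true
    #   dropTables: false
    #   objectTable: "SimObject"        # optional
    #   diaSourceTable: "DIASource"
    #   varObjectTable: "VarObject"
    #   nonVarObjectTable: "NonVarObject"
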
    def initialize(self, outQueue, inQueue):
        """
        The master slice is in charge of database prep-work.
        """
        harness.Stage.Stage.initialize(self, outQueue, inQueue)
        if self.getRank() == -1:
            self._copyObject()

    def preprocess(self):
        pass

    def process(self):
        """
        Stores the chunk deltas assigned to the worker slice.
        """
        assert self.inputQueue.size() == 1
        assert self.outputQueue.size() == 0
        clipboard = self.inputQueue.getNextDataset()
        self.outputQueue.addDataset(clipboard)
        ap.storeSliceObjects(clipboard.get('vpContext'))

    def postprocess(self):
        """
        Checks that all worker slices successfully stored their share of the
        new objects for the visit, and removes the visit from the list of
        in-flight visits tracked by the shared memory chunk manager. Optionally:
          - stores pipeline outputs (new objects, object updates, new difference sources)
          - appends per-visit tables to global accumulator tables
          - drops per-visit tables created by LSST pipelines
        """
        assert self.inputQueue.size() == 1
        assert self.outputQueue.size() == 0
        clipboard = self.inputQueue.getNextDataset()
        self.outputQueue.addDataset(clipboard)

        vpContext = clipboard.get('vpContext')
        event = clipboard.get('triggerAssociationEvent')
        do = base.DateTime(event.getDouble('dateObs'))
        doUtc = datetime.utcfromtimestamp(do.nsecs(base.DateTime.UTC)/1000000000).isoformat(' ')
        dbType = re.match(r'(\w+)://', self.database.locString()).group(1)
        visitId = event.getInt('visitId')

        self.templateDict['runId'] = self.getRun()
        self.templateDict['visitId'] = visitId
        self.templateDict['dateObs'] = doUtc
        self.templateDict['filter'] = self.filterChars[vpContext.getFilterId()]

        if self.storeOutputs:
            try:
                self._runSql(ap.SqlStoreOutputs.sqlStatements[dbType], 'StoreOutputs_%d.sql' % visitId)
            except:
                ap.endVisit(vpContext, True)
                raise
        if not ap.endVisit(vpContext, False):
            raise RuntimeError('Association pipeline failed: visit not committed')
        if self.appendTables:
            self._runSql(ap.SqlAppendTables.sqlStatements[dbType], 'AppendTables_%d.sql' % visitId)
        if self.dropTables:
            self._runSql(ap.SqlDropTables.sqlStatements[dbType], 'DropTables_%d.sql' % visitId)


# ----------------------------------------------------------------

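# A rough sketch, not part of the original file: the real harness constructs
# and wires stages from a pipeline policy, but for each visit the four stages
# above are intended to run in this order, passing the clipboard (and its
# shared 'vpContext' entry) from one stage to the next. The stage ids and
# policy arguments are placeholders.
def _exampleStageOrder(loadPolicy, storePolicy):
    return [LoadStage(1, loadPolicy),
            MatchDiaSourcesStage(2),
            MatchMopsPredsStage(3),
            StoreStage(4, storePolicy)]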