24 Classes corresponding to the 4 Stages that make up the LSST association pipeline:
26 - load pre-existing object data for the visit FOV from chunk files
27 - match difference sources for the visit (from the detection pipeline) against objects,
28 outputting match results to the database
29 - match moving object predictions for the visit (from MOPS) against difference sources,
30 outputting match results and new objects (created from difference sources with no matches)
32 - store new objects into chunk delta files
34 from __future__
import with_statement
36 from datetime
import datetime
45 import lsst.pex.harness
as harness
# NOTE(review): this file is a garbled extraction — the leading integers on
# each line are the original file's line numbers, statements are wrapped
# mid-expression, and several original lines are absent. Code bytes are
# preserved unchanged below; only review comments are added.
#
# Fragment: class docstring plus what appears to be LoadStage.__init__
# (original lines 54-65; the `def __init__` header and lines 55-58 and
# 61-63 are missing from this view — TODO recover from VCS).
54 Stage that loads basic object data (id, position, variability probabilities)
# Resolves the 'filterTableLocation' policy entry to a concrete location
# string and writes it back into the policy (original lines 59-60).
59 loc = persistence.LogicalLocation(self._policy.get(
'filterTableLocation'))
60 self._policy.set(
'filterTableLocation', loc.locString())
# Python 2 raise syntax (`raise Exc, "msg"`) — this file predates Python 3.
# Presumably guarded by a missing `if policy is None:` check (original
# lines 61-63) — TODO confirm against VCS.
64 raise RuntimeError,
"Cannot create a lsst.ap.LoadStage without a policy"
65 harness.Stage.Stage.__init__(self, stageId, policy)
# Fragment: appears to be LoadStage.preprocess (original lines 70-90; the
# `def` header and lines 83, 86 and 88 are missing from this view).
70 Takes a clipboard from the input queue, creates a visit processing context
71 (a holder for all inter-stage association pipeline state), adds it to the
72 clipboard, and finally places the clipboard onto the output queue. Expected
73 on the input clipboard is an event (named 'triggerAssociationEvent')
74 containing the following information:
75 - the position of the visit center, given by keys 'ra' and 'decl' (both
76 with double precision values in units of degrees).
77 - a visit identifier given by the 'visitId' key (with an integer value).
78 - the name of the filter (a string) for the visit given by the 'filter' key.
79 - the time at which the visit will occur, given by key 'dateObs' with a double
80 precision value in MJD (TAI)
81 - [optionally] a match radius given by the 'matchRadius' key (with a double
82 precision value in units of arc-seconds).
84 clipboard = self.inputQueue.getNextDataset()
85 event = clipboard.get(
'triggerAssociationEvent')
# Original line 87 is a bare argument list; the enclosing call on missing
# line 86 presumably constructs the visit processing context assigned to
# self.vpContext (it is read two lines below) — TODO confirm against VCS.
87 self._policy, event, self.getRun(), self.getRank(), self.getUniverseSize() - 1
89 clipboard.put(
'vpContext', self.
vpContext)
90 self.outputQueue.addDataset(clipboard)
# Fragments: master-slice and worker-slice process bodies plus the start of
# a postprocess docstring (original lines 94-121; the `def` headers and
# lines 95, 98-99, 101-106, 108, 111-119 are missing from this view).
94 Registers the incoming visit with the shared memory chunk manager.
96 assert self.inputQueue.size() == 1
97 assert self.outputQueue.size() == 0
# Initializes the lsst.ap shared-memory layer keyed by the run id.
100 ap.initialize(str(self.getRun()))
107 Loads object chunk and chunk delta files assigned to the slice.
109 assert self.inputQueue.size() == 1
110 assert self.outputQueue.size() == 0
113 ap.initialize(str(self.getRun()))
# Docstring fragment of a third method (its body is not visible here).
120 Checks to make sure all worker slices successfully loaded their share of the
121 objects for the visit FOV and builds an object index if so.
# Fragment: MatchDiaSourcesStage class docstring and the tail of its
# __init__ (original lines 130-138; the class header, `def __init__`, and
# any policy-validation lines are missing from this view).
130 Matches difference sources from the detection pipeline against objects
131 within the visits FOV. Difference sources are expected to be found on the clipboard
132 under the key 'diaSources' and match results, consisting of (difference source id,
133 object id, match distance) tuples are placed onto the clipboard under the key
134 'diaSourceToObjectMatches'.
138 harness.Stage.Stage.__init__(self, stageId, policy)
# Fragment: MatchDiaSourcesStage.process plus a pass-through postprocess
# (original lines 141-157; the `def` headers and lines 147, 150-151,
# 153-154 are missing from this view).
141 assert self.inputQueue.size() == 1
142 assert self.outputQueue.size() == 0
143 clipboard = self.inputQueue.getNextDataset()
144 vpContext = clipboard.get(
'vpContext')
145 sources = clipboard.get(
'diaSources')
146 matches = ap.MatchPairVec()
148 vpContext.setDiaSources(sources)
149 ap.matchDiaSources(matches, vpContext)
# Per the class docstring, match results go onto the clipboard under
# 'diaSourceToObjectMatches' — presumably on missing lines 150-151; TODO
# confirm against VCS.
152 self.outputQueue.addDataset(clipboard)
# Pass-through: forwards the clipboard unchanged to the next stage.
155 assert self.inputQueue.size() == 1
156 assert self.outputQueue.size() == 0
157 self.outputQueue.addDataset(self.inputQueue.getNextDataset())
# MatchMopsPredsStage (original lines 165-206). The `def` headers and lines
# 175-176, 178-179, 184-185, 193, 195-196, 198-200 are missing from this
# view; code bytes below are preserved unchanged.
165 class MatchMopsPredsStage(harness.Stage.Stage):
167 Matches moving object predictions for a visit against difference sources. The previous
168 stage is assumed to have read the difference sources and produced an index for them (stored
169 in 'vpContext' on the clipboard). Moving object predictions are expected to be found on the
170 clipboard under the key 'mopsPreds' and match results, consisting of (moving object id,
171 difference source id, match distance) tuples are placed onto the clipboard under the key
172 'predToDiaSourceMatches'. Finally, difference sources which didn't match anything are
173 used to create new objects - identifiers for these are placed onto the clipboard under the
174 key 'diaSourceToNewObject'.
177 harness.Stage.Stage.__init__(self, stageId, policy)
# Fragment: process body.
180 assert self.inputQueue.size() == 1
181 assert self.outputQueue.size() == 0
182 clipboard = self.inputQueue.getNextDataset()
183 vpContext = clipboard.get(
'vpContext')
# Guard: both trigger events must refer to the same visit.
186 ev1 = clipboard.get(
'triggerAssociationEvent')
187 ev2 = clipboard.get(
'triggerMatchMopsPredsEvent')
188 if ev1.getInt(
'visitId') != ev2.getInt(
'visitId'):
189 raise RuntimeError(
'triggerAssociationEvent.visitId != triggerMatchMopsPredsEvent.visitId')
190 preds = clipboard.get(
'mopsPreds')
191 matches = ap.MatchPairVec()
192 idPairs = ap.IdPairVec()
# ap.endVisit(vpContext, True) before matchMops — the semantics of the
# boolean flag are not visible from this fragment; TODO confirm against
# the lsst.ap API.
194 ap.endVisit(vpContext,
True)
197 ap.matchMops(matches, idPairs, vpContext, preds.getPredictions())
# Per the class docstring, results are placed onto the clipboard under
# 'predToDiaSourceMatches' and 'diaSourceToNewObject' — presumably on
# missing lines 198-200; TODO confirm against VCS.
201 self.outputQueue.addDataset(clipboard)
# Pass-through postprocess: forwards the clipboard unchanged.
204 assert self.inputQueue.size() == 1
205 assert self.outputQueue.size() == 0
206 self.outputQueue.addDataset(self.inputQueue.getNextDataset())
# StoreStage (original lines 214-328). Many original lines — including all
# remaining `def` headers, the try/except inside _runSql, and parts of
# __init__/initialize/postprocess — are missing from this view; code bytes
# below are preserved unchanged.
214 class StoreStage(harness.Stage.Stage):
216 Store new objects (created from unmatched difference sources under certain
217 conditions) obtained during the visit into chunk delta files.
# Executes each SQL statement against self.database and mirrors everything
# executed (plus errors and timings, written as '-- ' SQL comments) into a
# script file under self.scriptDir. The try/except wrapping (original
# lines 224, 228-232, 235-238) is missing from this view; `e` on original
# line 233 is presumably the caught database exception — TODO confirm.
219 def _runSql(self, sqlStatements, scriptFileName):
220 db = persistence.DbStorage()
221 db.setPersistLocation(self.
database)
222 with open(os.path.join(self.
scriptDir, scriptFileName),
'w')
as scriptFile:
223 for stmt
in sqlStatements:
# NOTE(review): time.clock() was removed in Python 3.8 — fine for this
# Python 2 era code, but a porting hazard.
225 startTime = time.clock()
226 scriptFile.write(stmt)
227 scriptFile.write(
';\n\n')
233 for line
in str(e).splitlines(
True):
234 scriptFile.write(
'-- ' + line)
239 scriptFile.write(
'-- statement executed in %f seconds\n\n' % (time.clock() - startTime))
# Fragment: helper that appends the policy-specified per-visit object
# table into the run's Object table (original lines 243-248; the format
# arguments on missing lines 249-252 are not visible).
243 Append contents of policy-specified object table to Object table in per-run database
246 db = persistence.DbStorage()
247 db.setPersistLocation(self.
database)
# SQL is built with %-formatting from policy-supplied table names —
# acceptable only because the values come from trusted pipeline policy,
# not user input.
248 db.executeSql(
"INSERT INTO %s SELECT * FROM %s" %\
# Fragment: __init__ (Python 2 raise syntax; presumably guarded by a
# missing `if policy is None:` check — TODO confirm against VCS).
253 raise RuntimeError,
"Cannot create a lsst.ap.StoreStage without a policy"
254 harness.Stage.Stage.__init__(self, stageId, policy)
258 self.
database = persistence.LogicalLocation(policy.getString(
'database'))
# objectTable is optional in the policy; defaults to None when absent.
259 self.
objectTable = policy.getString(
'objectTable')
if policy.exists(
'objectTable')
else None
263 self.
scriptDir = persistence.LogicalLocation(policy.getString(
'scriptDirectory')).locString()
# templateDict is populated below, but its creation (original lines
# 260-265) is missing from this view.
266 self.
templateDict[
'diaSourceTable'] = policy.getString(
'diaSourceTable')
267 self.
templateDict[
'varObjectTable'] = policy.getString(
'varObjectTable')
268 self.
templateDict[
'nonVarObjectTable'] = policy.getString(
'nonVarObjectTable')
# Fragment: initialize — rank -1 denotes the master slice, which does the
# database prep-work (the body of the `if` is missing from this view).
272 The master slice is in charge of database prep-work
274 harness.Stage.Stage.initialize(self, outQueue, inQueue)
275 if self.getRank() == -1:
# Fragment: worker-slice process.
283 Store chunk deltas assigned to the worker slice.
285 assert self.inputQueue.size() == 1
286 assert self.outputQueue.size() == 0
287 clipboard = self.inputQueue.getNextDataset()
288 self.outputQueue.addDataset(clipboard)
289 ap.storeSliceObjects(clipboard.get(
'vpContext'))
# Fragment: master-slice postprocess.
293 Check to make sure all worker slices successfully stored their share of the
294 new objects for the visit and removes the visit from the list of in-flight
295 visits being tracked by the shared memory chunk manager. Optionally:
296 - store pipeline outputs (new objects, object updates, new difference sources)
297 - append per-visit tables to global accumulator tables
298 - drop per-visit tables created by LSST pipelines
300 assert self.inputQueue.size() == 1
301 assert self.outputQueue.size() == 0
302 clipboard = self.inputQueue.getNextDataset()
303 self.outputQueue.addDataset(clipboard)
305 vpContext = clipboard.get(
'vpContext')
306 event = clipboard.get(
'triggerAssociationEvent')
# Converts the observation time (nanoseconds, UTC) to an ISO string.
# NOTE(review): datetime.utcfromtimestamp returns a naive datetime and is
# deprecated in Python 3.12 — another porting hazard.
307 do = base.DateTime(event.getDouble(
'dateObs'))
308 doUtc = datetime.utcfromtimestamp(do.nsecs(base.DateTime.UTC)/1000000000).isoformat(
' ')
# Extracts the database type from a URL-style location string, e.g.
# 'mysql' from 'mysql://host:port/db'.
309 dbType = re.match(
r'(\w+)://', self.database.locString()).group(1)
310 visitId = event.getInt(
'visitId')
# Runs the per-database-type SQL scripts. The surrounding control flow
# (original lines 311-318, 320, 322, 325, 327) is missing from this view,
# so the branching/ordering between these calls cannot be determined here.
319 self.
_runSql(ap.SqlStoreOutputs.sqlStatements[dbType],
'StoreOutputs_%d.sql' % visitId)
321 ap.endVisit(vpContext,
True)
323 if not ap.endVisit(vpContext,
False):
324 raise RuntimeError(
'Association pipeline failed: visit not committed')
326 self.
_runSql(ap.SqlAppendTables.sqlStatements[dbType],
'AppendTables_%d.sql' % visitId)
328 self.
_runSql(ap.SqlDropTables.sqlStatements[dbType],
'DropTables_%d.sql' % visitId)
Container for inter-stage association pipeline state.
A persistable wrapper for a MatchPairVector.
A persistable wrapper for an IdPairVector.