LSSTApplications  8.0.0.0+107,8.0.0.1+13,9.1+18,9.2,master-g084aeec0a4,master-g0aced2eed8+6,master-g15627eb03c,master-g28afc54ef9,master-g3391ba5ea0,master-g3d0fb8ae5f,master-g4432ae2e89+36,master-g5c3c32f3ec+17,master-g60f1e072bb+1,master-g6a3ac32d1b,master-g76a88a4307+1,master-g7bce1f4e06+57,master-g8ff4092549+31,master-g98e65bf68e,master-ga6b77976b1+53,master-gae20e2b580+3,master-gb584cd3397+53,master-gc5448b162b+1,master-gc54cf9771d,master-gc69578ece6+1,master-gcbf758c456+22,master-gcec1da163f+63,master-gcf15f11bcc,master-gd167108223,master-gf44c96c709
LSSTDataManagementBasePackage
Pipeline.py
Go to the documentation of this file.
1 #! /usr/bin/env python
2 
3 #
4 # LSST Data Management System
5 # Copyright 2008, 2009, 2010 LSST Corporation.
6 #
7 # This product includes software developed by the
8 # LSST Project (http://www.lsst.org/).
9 #
10 # This program is free software: you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License as published by
12 # the Free Software Foundation, either version 3 of the License, or
13 # (at your option) any later version.
14 #
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 # GNU General Public License for more details.
19 #
20 # You should have received a copy of the LSST License Statement and
21 # the GNU General Public License along with this program. If not,
22 # see <http://www.lsstcorp.org/LegalNotices/>.
23 #
24 
25 
26 from lsst.pex.harness.Queue import Queue
27 from lsst.pex.harness.stage import StageProcessing
28 from lsst.pex.harness.stage import NoOpSerialProcessing
29 from lsst.pex.harness.Clipboard import Clipboard
30 from lsst.pex.harness.Directories import Directories
31 from lsst.pex.logging import Log, LogRec, cout, Prop
32 from lsst.pex.logging import BlockTimingLog
33 from lsst.pex.harness import harnessLib as logutils
34 
35 from lsst.pex.harness.SliceThread import SliceThread
36 from lsst.pex.harness.ShutdownThread import ShutdownThread
37 
38 import threading
39 
40 from threading import Thread
41 from threading import Event as PyEvent
42 
43 import lsst.pex.policy as policy
44 
46 from lsst.pex.exceptions import *
47 
48 import lsst.daf.base as dafBase
49 from lsst.daf.base import *
50 import lsst.daf.persistence as dafPersist
51 from lsst.daf.persistence import *
52 
53 
54 import lsst.ctrl.events as events
55 
56 import os, sys, re, traceback, time
57 
# Module overview. This string follows the import statements, so Python
# treats it as an ordinary expression statement, not as the module __doc__.
"""
Pipeline class manages the operation of a multi-stage parallel pipeline.
The Pipeline is configured by reading a Policy file.
Pipeline has a __main__ portion as it serves as the main executable program
('glue layer') for running a Pipeline. The Pipeline spawns Slice workers
for parallel computations.
"""
65 
66 
67 class Pipeline(object):
68  '''Python Pipeline class implementation. Contains main pipeline workflow'''
69 
70  def __init__(self, runId='-1', pipelinePolicyName=None, name="unnamed", workerId=None):
71  """
72  Initialize the Pipeline: create empty Queue and Stage lists;
73  import the C++ Pipeline instance; initialize the MPI environment
74  """
75  # log message levels
76  self.TRACE = BlockTimingLog.INSTRUM+2
77  self.VERB1 = self.TRACE
78  self.VERB2 = self.VERB1 - 1
79  self.VERB3 = self.VERB2 - 1
80  self.log = None
81  self.logthresh = None
82  self._logdir = ""
83 
84  self._pipelineName = name
85 
86  self.queueList = []
87  self.stageList = []
88  self.stageClassList = []
89  self.stagePolicyList = []
90  self.eventTopicList = []
91  self.shareDataList = []
92  self.clipboardList = []
93  self.executionMode = 0
94  self._runId = runId
95  self.pipelinePolicyName = pipelinePolicyName
96  if workerId is not None:
97  self.workerId = workerId
98  else:
99  self.workerId = "-1"
100 
101  self.cppLogUtils = logutils.LogUtils()
102  self._stop = PyEvent()
103 
104  def setStop (self):
105  self._stop.set()
106 
107  def exit (self):
108 
109  if self.log is not None:
110  self.log.log(self.VERB1, 'Killing Pipeline process immediately: shutdown level 1')
111 
112  thisPid = os.getpid()
113  os.popen("kill -9 "+str(thisPid))
114 
115  sys.exit()
116 
117  def __del__(self):
118  """
119  Delete the Pipeline object: clean up
120  """
121  if self.log is not None:
122  self.log.log(self.VERB1, 'Python Pipeline being deleted')
123 
124  def initializeLogger(self):
125  """
126  Initialize the Logger after opening policy file
127  """
128  if(self.pipelinePolicyName == None):
129  self.pipelinePolicyName = "pipeline_policy.paf"
130  dictName = "pipeline_dict.paf"
131  topPolicy = policy.Policy.createPolicy(self.pipelinePolicyName)
132 
133  if (topPolicy.exists('execute')):
134  self.executePolicy = topPolicy.get('execute')
135  else:
136  self.executePolicy = policy.Policy.createPolicy(self.pipelinePolicyName)
137 
138  # Check for eventBrokerHost
139  if (self.executePolicy.exists('eventBrokerHost')):
140  self.eventBrokerHost = self.executePolicy.getString('eventBrokerHost')
141  else:
142  self.eventBrokerHost = "lsst8.ncsa.uiuc.edu" # default value
143  self.cppLogUtils.setEventBrokerHost(self.eventBrokerHost);
144 
145  doLogFile = self.executePolicy.getBool('localLogMode')
146  self.cppLogUtils.initializeLogger(doLogFile, self._pipelineName,
147  self._runId, self._logdir,
148  self.workerId,
149  BlockTimingLog.LINUXUDATA)
150 
151  # The log for use in the Python Pipeline
152  self.log = self.cppLogUtils.getLogger()
153 
154  if (self.executePolicy.exists('logThreshold')):
155  self.logthresh = self.executePolicy.get('logThreshold')
156  else:
157  if(self.logthresh == None):
158  self.logthresh = self.TRACE
159  self.setLogThreshold(self.logthresh)
160 
161  # self.log.addDestination(cout, Log.DEBUG);
162 
163  def configurePipeline(self):
164  """
165  Configure the Pipeline by reading a Policy File
166  """
167 
168  if (self.executePolicy.exists('nSlices')):
169  self.nSlices = self.executePolicy.getInt('nSlices')
170  else:
171  self.nSlices = 0 # default value
172  self.universeSize = self.nSlices + 1;
173 
174  if (self.executePolicy.exists('barrierDelay')):
175  self.barrierDelay = self.executePolicy.getDouble('barrierDelay')
176  else:
177  self.barrierDelay = 0.000001 # default value
178 
179  # do some juggling to capture the actual stage policy names. We'll
180  # use these to assign some logical names to the stages for logging
181  # purposes. Note, however, that it is only convention that the
182  # stage policies will be specified as separate files; thus, we need
183  # a fallback.
184  stgcfg = self.executePolicy.getArray("appStage")
185  self.stageNames = []
186  for subpol in stgcfg:
187  stageName = subpol.get("name")
188  self.stageNames.append(stageName)
189 
190  self.executePolicy.loadPolicyFiles()
191 
192 
193  # Obtain the working directory space locators
194  psLookup = lsst.daf.base.PropertySet()
195  if (self.executePolicy.exists('dir')):
196  dirPolicy = self.executePolicy.get('dir')
197  shortName = None
198  if (dirPolicy.exists('shortName')):
199  shortName = dirPolicy.get('shortName')
200  if shortName == None:
201  shortName = self.pipelinePolicyName.split('.')[0]
202  dirs = Directories(dirPolicy, shortName, self._runId)
203  psLookup = dirs.getDirs()
204  if (self.executePolicy.exists('database.url')):
205  psLookup.set('dbUrl', self.executePolicy.get('database.url'))
206 
207 
208  log = Log(self.log, "configurePipeline")
209  log.log(Log.INFO,
210  "Logging messages using threshold=%i" % log.getThreshold())
211  LogRec(log, self.VERB1) << "Configuring pipeline" \
212  << Prop("universeSize", self.universeSize) \
213  << Prop("runID", self._runId) \
214  << Prop("rank", -1) \
215  << Prop("workerId", self.workerId) \
216  << LogRec.endr;
217 
218 
219  # Configure persistence logical location map with values for directory
220  # work space locators
221  dafPersist.LogicalLocation.setLocationMap(psLookup)
222 
223  log.log(self.VERB2, "eventBrokerHost %s " % self.eventBrokerHost)
224  log.log(self.VERB2, "barrierDelay %s " % self.barrierDelay)
225 
226  # Check for eventTimeout
227  if (self.executePolicy.exists('eventTimeout')):
228  self.eventTimeout = self.executePolicy.getInt('eventTimeout')
229  else:
230  self.eventTimeout = 10000000 # default value is 10 000 000
231 
232  # Process Application Stages
233  fullStageList = self.executePolicy.getArray("appStage")
234  self.nStages = len(fullStageList)
235  log.log(self.VERB2, "Found %d stages" % len(fullStageList))
236 
237  fullStageNameList = [ ]
238  self.stagePolicyList = [ ]
239  for stagei in xrange(self.nStages):
240  fullStagePolicy = fullStageList[stagei]
241  if (fullStagePolicy.exists('serialClass')):
242  serialName = fullStagePolicy.getString('serialClass')
243  stagePolicy = fullStagePolicy.get('stagePolicy')
244  else:
245  serialName = "lsst.pex.harness.stage.NoOpSerialProcessing"
246  stagePolicy = None
247 
248  fullStageNameList.append(serialName)
249  self.stagePolicyList.append(stagePolicy)
250 
251  if self.stageNames[stagei] is None:
252  self.stageNames[stagei] = fullStageNameList[-1].split('.')[-1]
253  log.log(self.VERB3,
254  "Stage %d: %s: %s" % (stagei+1, self.stageNames[stagei],
255  fullStageNameList[-1]))
256  for astage in fullStageNameList:
257  fullStage = astage.strip()
258  tokenList = astage.split('.')
259  classString = tokenList.pop()
260  classString = classString.strip()
261 
262  package = ".".join(tokenList)
263 
264  # For example package -> lsst.pex.harness.App1Stage classString -> App1Stage
265  AppStage = __import__(package, globals(), locals(), [classString], -1)
266  StageClass = getattr(AppStage, classString)
267  self.stageClassList.append(StageClass)
268 
269  #
270  # Configure the Failure Stage
271  # - Read the policy information
272  # - Import failure stage Class and make failure stage instance Object
273  #
274  self.failureStageName = None
275  self.failSerialName = None
276  if (self.executePolicy.exists('failureStage')):
277  failstg = self.executePolicy.get("failureStage")
278  self.failureStageName = failstg.get("name")
279 
280  if (failstg.exists('serialClass')):
281  self.failSerialName = failstg.getString('serialClass')
282  failStagePolicy = failstg.get('stagePolicy')
283  else:
284  self.failSerialName = "lsst.pex.harness.stage.NoOpSerialProcessing"
285  failStagePolicy = None
286 
287  astage = self.failSerialName
288  tokenList = astage.split('.')
289  failClassString = tokenList.pop()
290  failClassString = failClassString.strip()
291 
292  package = ".".join(tokenList)
293 
294  # For example package -> lsst.pex.harness.App1Stage classString -> App1Stage
295  FailAppStage = __import__(package, globals(), locals(), [failClassString], -1)
296  FailStageClass = getattr(FailAppStage, failClassString)
297 
298  sysdata = {}
299  sysdata["name"] = self.failureStageName
300  sysdata["rank"] = -1
301  sysdata["stageId"] = -1
302  sysdata["universeSize"] = self.universeSize
303  sysdata["runId"] = self._runId
304  if (failStagePolicy != None):
305  self.failStageObject = FailStageClass(failStagePolicy, self.log, self.eventBrokerHost, sysdata)
306  else:
307  self.failStageObject = FailStageClass(None, self.log, self.eventBrokerHost, sysdata)
308 
309  log.log(self.VERB2, "failureStage %s " % self.failureStageName)
310  log.log(self.VERB2, "failSerialName %s " % self.failSerialName)
311 
312  # Process Event Topics
313  self.eventTopicList = []
314  for item in fullStageList:
315  self.eventTopicList.append(item.getString("eventTopic"))
316 
317  # Process Share Data Schedule
318  self.shareDataList = []
319  for item in fullStageList:
320  shareDataStage = False
321  if (item.exists('shareData')):
322  shareDataStage = item.getBool('shareData')
323  self.shareDataList.append(shareDataStage)
324 
325  log.log(self.VERB3, "Loading in %d trigger topics" % \
326  len(filter(lambda x: x != "None", self.eventTopicList)))
327 
328  log.log(self.VERB3, "Loading in %d shareData flags" % \
329  len(filter(lambda x: x != "None", self.shareDataList)))
330 
331  for iStage in xrange(len(self.eventTopicList)):
332  item = self.eventTopicList[iStage]
333  if self.eventTopicList[iStage] != "None":
334  log.log(self.VERB3, "eventTopic%d: %s" % (iStage+1, item))
335  else:
336  log.log(Log.DEBUG, "eventTopic%d: %s" % (iStage+1, item))
337 
338 
339  eventsSystem = events.EventSystem.getDefaultEventSystem()
340 
341  for topic in self.eventTopicList:
342  if (topic == "None"):
343  pass
344  else:
345  log.log(self.VERB2, "Creating receiver for topic %s" % (topic))
346  eventsSystem.createReceiver(self.eventBrokerHost, topic)
347 
348  # Check for executionMode of oneloop
349  if (self.executePolicy.exists('executionMode') and (self.executePolicy.getString('executionMode') == "oneloop")):
350  self.executionMode = 1
351 
352  # Check for shutdownTopic
353  if (self.executePolicy.exists('shutdownTopic')):
354  self.shutdownTopic = self.executePolicy.getString('shutdownTopic')
355  else:
356  self.shutdownTopic = "triggerShutdownEvent"
357 
358  # Check for exitTopic
359  if (self.executePolicy.exists('exitTopic')):
360  self.exitTopic = self.executePolicy.getString('exitTopic')
361  else:
362  self.exitTopic = None
363 
364  log.log(self.VERB1, "Pipeline configuration complete");
365 
366  def initializeQueues(self):
367  """
368  Initialize the Queue List for the Pipeline
369  """
370  for iQueue in range(1, self.nStages+1+1):
371  queue = Queue()
372  self.queueList.append(queue)
373 
374  def initializeStages(self):
375  """
376  Initialize the Stage List for the Pipeline
377  """
378  log = self.log.timeBlock("initializeStages", self.TRACE-2)
379 
380  for iStage in range(1, self.nStages+1):
381  # Make a Policy object for the Stage Policy file
382  stagePolicy = self.stagePolicyList[iStage-1]
383  # Make an instance of the specifies Application Stage
384  # Use a constructor with the Policy as an argument
385  StageClass = self.stageClassList[iStage-1]
386  sysdata = {}
387  sysdata["name"] = self.stageNames[iStage-1]
388  sysdata["rank"] = -1
389  sysdata["stageId"] = iStage
390  sysdata["universeSize"] = self.universeSize
391  sysdata["runId"] = self._runId
392  if (stagePolicy != "None"):
393  stageObject = StageClass(stagePolicy, self.log, self.eventBrokerHost, sysdata)
394  # (self, policy=None, log=None, eventBroker=None, sysdata=None, callSetup=True):
395  else:
396  stageObject = StageClass(None, self.log, self.eventBrokerHost, sysdata)
397  inputQueue = self.queueList[iStage-1]
398  outputQueue = self.queueList[iStage]
399 
400  stageObject.initialize(outputQueue, inputQueue)
401  self.stageList.append(stageObject)
402 
403  log.done()
404 
406 
408  self.oneShutdownThread.setDaemon(True)
409  self.oneShutdownThread.start()
410 
411  def startSlices(self):
412  """
413  Initialize the Queue by defining an initial dataset list
414  """
415  log = self.log.timeBlock("startSlices", self.TRACE-2)
416 
417  log.log(self.VERB3, "Number of slices " + str(self.nSlices));
418 
419  self.loopEventList = []
420  for i in range(self.nSlices):
421  loopEventA = PyEvent()
422  self.loopEventList.append(loopEventA)
423  loopEventB = PyEvent()
424  self.loopEventList.append(loopEventB)
425 
426  self.sliceThreadList = []
427 
428  for i in range(self.nSlices):
429  k = 2*i
430  loopEventA = self.loopEventList[k]
431  loopEventB = self.loopEventList[k+1]
432  oneSliceThread = SliceThread(i, self._pipelineName, self.pipelinePolicyName, \
433  self._runId, self.logthresh, self.universeSize, loopEventA, loopEventB, self._logdir, self.workerId)
434  self.sliceThreadList.append(oneSliceThread)
435 
436  for slicei in self.sliceThreadList:
437  log.log(self.VERB3, "Starting slice");
438  slicei.setDaemon(True)
439  slicei.start()
440  if slicei.isAlive():
441  log.log(self.VERB3, "slicei is Alive");
442  else:
443  log.log(self.VERB3, "slicei is not Alive");
444 
445  log.done()
446 
447 
448  def startInitQueue(self):
449  """
450  Place an empty Clipboard in the first Queue
451  """
452  clipboard = Clipboard()
453 
454  #print "Python Pipeline Clipboard check \n"
455  #acount=0
456  #for clip in self.clipboardList:
457  # acount+=1
458  # print acount
459  # print str(clip)
460 
461  queue1 = self.queueList[0]
462  queue1.addDataset(clipboard)
463 
464  def startStagesLoop(self):
465  """
466  Method to execute loop over Stages
467  """
468  startStagesLoopLog = self.log.timeBlock("startStagesLoop", self.TRACE)
469  looplog = BlockTimingLog(self.log, "visit", self.TRACE)
470  stagelog = BlockTimingLog(looplog, "stage", self.TRACE-1)
471  proclog = BlockTimingLog(stagelog, "process", self.TRACE)
472 
473  visitcount = 0
474 
475  self.threadBarrier(0)
476 
477  while True:
478 
479  if (((self.executionMode == 1) and (visitcount == 1))):
480  LogRec(looplog, Log.INFO) << "terminating pipeline after one loop/visit "
481  #
482  # Need to shutdown Threads here
483  #
484  break
485  else:
486  visitcount += 1
487  looplog.setPreamblePropertyInt("LOOPNUM", visitcount)
488  looplog.start()
489  stagelog.setPreamblePropertyInt("LOOPNUM", visitcount)
490  proclog.setPreamblePropertyInt("LOOPNUM", visitcount)
491 
492  # self.cppPipeline.invokeContinue()
493 
494  self.startInitQueue() # place an empty clipboard in the first Queue
495 
496  self.errorFlagged = 0
497  for iStage in range(1, self.nStages+1):
498  stagelog.setPreamblePropertyInt("STAGEID", iStage)
499  stagelog.start(self.stageNames[iStage-1] + " loop")
500  proclog.setPreamblePropertyInt("STAGEID", iStage)
501 
502  stage = self.stageList[iStage-1]
503 
504  self.handleEvents(iStage, stagelog)
505 
506  # synchronize before preprocess
507  self.threadBarrier(iStage)
508 
509  self.tryPreProcess(iStage, stage, stagelog)
510 
511  # synchronize after preprocess, before process
512  self.threadBarrier(iStage)
513 
514  # synchronize after process, before postprocess
515  self.threadBarrier(iStage)
516 
517  self.tryPostProcess(iStage, stage, stagelog)
518 
519  # synchronize after postprocess
520  self.threadBarrier(iStage)
521 
522  stagelog.done()
523 
524  self.checkExitByStage()
525 
526  else:
527  looplog.log(self.VERB2, "Completed Stage Loop")
528 
529  self.checkExitByVisit()
530 
531  # Uncomment to print a list of Citizens after each visit
532  # print datap.Citizen_census(0,0), "Objects:"
533  # print datap.Citizen_census(datap.cout,0)
534 
535  looplog.log(Log.DEBUG, 'Retrieving finalClipboard for deletion')
536  finalQueue = self.queueList[self.nStages]
537  finalClipboard = finalQueue.getNextDataset()
538  looplog.log(Log.DEBUG, "deleting final clipboard")
539  looplog.done()
540  # delete entries on the clipboard
541  finalClipboard.close()
542  del finalClipboard
543 
544  startStagesLoopLog.log(Log.INFO, "Shutting down pipeline");
545  self.shutdown()
546  startStagesLoopLog.done()
547 
548 
550  return self.sliceThreadList
551 
552  def setExitLevel(self, level):
553  self.exitLevel = level
554 
556  log = Log(self.log, "checkExitBySyncPoint")
557 
558  if((self._stop.isSet()) and (self.exitLevel == 2)):
559  log.log(Log.INFO, "Pipeline stop is set at exitLevel of 2")
560  log.log(Log.INFO, "Exit here at a Synchronization point")
561  sys.exit()
562 
563  def checkExitByStage(self):
564  log = Log(self.log, "checkExitByStage")
565 
566  if((self._stop.isSet()) and (self.exitLevel == 3)):
567  log.log(Log.INFO, "Pipeline stop is set at exitLevel of 3")
568  log.log(Log.INFO, "Exit here at the end of the Stage")
569  sys.exit()
570 
571  def checkExitByVisit(self):
572  log = Log(self.log, "checkExitByVisit")
573 
574  if((self._stop.isSet()) and (self.exitLevel == 4)):
575  log.log(Log.INFO, "Pipeline stop is set at exitLevel of 4")
576  log.log(Log.INFO, "Exit here at the end of the Visit")
577  sys.exit()
578 
579  def threadBarrier(self, iStage):
580  """
581  Create an approximate barrier where all Slices intercommunicate with the Pipeline
582  """
583 
584  log = Log(self.log, "threadBarrier")
585 
586  self.checkExitBySyncPoint()
587 
588  # if((self._stop.isSet()) and (self.exitLevel == 2)):
589 
590  # log.log(Log.INFO, "Pipeline stop is set at exitLevel of 2; exit here at a synchronization point")
591  # print "Pipeline stop is set at exitLevel of 2; exit here at a synchronization point"
592  # os._exit()
593  # sys.exit()
594  # log.log(Log.INFO, "Pipeline Ever reach here ?? ")
595 
596  entryTime = time.time()
597  log.log(Log.DEBUG, "Entry time %f" % (entryTime))
598 
599 
600  for i in range(self.nSlices):
601  k = 2*i
602  loopEventA = self.loopEventList[k]
603  loopEventB = self.loopEventList[k+1]
604 
605  signalTime1 = time.time()
606  log.log(Log.DEBUG, "Signal to Slice %d %f" % (i, signalTime1))
607 
608  loopEventA.set()
609 
610  log.log(Log.DEBUG, "Wait for signal from Slice %d" % (i))
611 
612  # Wait for the B event to be set by the Slice
613  # Excute time sleep in between checks to free the GIL periodically
614  useDelay = self.barrierDelay
615 
616  if(iStage == 1):
617  useDelay = 0.1
618  if(iStage == 290):
619  useDelay = 0.1
620 
621  while( not (loopEventB.isSet())):
622  time.sleep(useDelay)
623 
624  signalTime2 = time.time()
625  log.log(Log.DEBUG, "Done waiting for signal from Slice %d %f" % (i, signalTime2))
626 
627  if(loopEventB.isSet()):
628  loopEventB.clear()
629 
630  self.checkExitBySyncPoint()
631 
632  def forceShutdown(self):
633  """
634  Shutdown the Pipeline execution: delete the MPI environment
635  Send the Exit Event if required
636  """
637 
638  self.log.log(self.VERB2, 'Pipeline forceShutdown : Stopping Slices ')
639 
640  for i in range(self.nSlices):
641  slice = self.sliceThreadList[i]
642  slice.stop()
643  self.log.log(self.VERB2, 'Slice ' + str(i) + ' stopped.')
644 
645  for i in range(self.nSlices):
646  slice = self.sliceThreadList[i]
647  slice.join()
648  self.log.log(self.VERB2, 'Slice ' + str(i) + ' exited.')
649 
650  # Also have to tell the shutdown Thread to stop
651  # self.log.log(self.VERB2, 'Telling Shutdown thread to exit')
652  # self.oneShutdownThread.exit()
653  # self.log.log(self.VERB2, 'Shutdown thread has exited')
654  # self.log.log(self.VERB2, 'Main thread exiting ')
655  # sys.exit()
656 
657  def shutdown(self):
658  """
659  Shutdown the Pipeline execution: delete the MPI environment
660  Send the Exit Event if required
661  """
662  if self.exitTopic == None:
663  pass
664  else:
665  oneEventTransmitter = events.EventTransmitter(self.eventBrokerHost, self.exitTopic)
666  psPtr = dafBase.PropertySet()
667  psPtr.setString("message", str("exiting_") + self._runId )
668 
669  oneEventTransmitter.publish(psPtr)
670 
671  for i in range(self.nSlices):
672  slice = self.sliceThreadList[i]
673  slice.join()
674  self.log.log(self.VERB2, 'Slice ' + str(i) + ' ended.')
675 
676  # Also have to tell the shutdown Thread to stop
677  self.oneShutdownThread.setStop()
678  self.oneShutdownThread.join()
679  self.log.log(self.VERB2, 'Shutdown thread ended ')
680 
681  sys.exit()
682 
683 
684  def invokeSyncSlices(self, iStage, stagelog):
685  """
686  THIS GOES AWAY in non MPI harness
687  If needed, calls the C++ Pipeline invokeSyncSlices
688  """
689  invlog = stagelog.timeBlock("invokeSyncSlices", self.TRACE-1)
690  if(self.shareDataList[iStage-1]):
691  invlog.log(self.VERB3, "Calling invokeSyncSlices")
692  # self.cppPipeline.invokeSyncSlices()
693  # invokeSyncSlices
694  invlog.done()
695 
    def tryPreProcess(self, iStage, stage, stagelog):
        """
        Executes the try/except construct for Stage preprocess() call

        When no error has been flagged in this visit, runs
        stage.applyPreprocess() and stores its result in self.interQueue.
        Otherwise the clipboard is moved unchanged from the stage's input
        queue to its output queue.

        Any exception is handled as follows:
          - it is logged with its traceback and self.errorFlagged is set to 1;
          - if a failure stage other than NoOpSerialProcessing is configured,
            the failure details are put on the input clipboard and the
            failure stage's applyPreprocess() result replaces
            self.interQueue;
          - an empty clipboard is then posted to the stage's output queue.

        @param iStage    1-based stage number
        @param stage     the stage object being run
        @param stagelog  log used to open timing blocks
        """
        prelog = stagelog.timeBlock("tryPreProcess", self.TRACE-2);

        # Important try - except construct around stage preprocess()
        try:
            # If no error/exception has been flagged, run preprocess()
            # otherwise, simply pass along the Clipboard
            if (self.errorFlagged == 0):
                processlog = stagelog.timeBlock("preprocess", self.TRACE)
                self.interQueue = stage.applyPreprocess()
                processlog.done()
            else:
                prelog.log(self.TRACE, "Skipping process due to error")
                self.transferClipboard(iStage)

        # Bare except: every failure raised while running the stage is caught,
        # recorded and turned into the error flag below.
        except:
            trace = "".join(traceback.format_exception(
                sys.exc_info()[0], sys.exc_info()[1], sys.exc_info()[2]))
            prelog.log(Log.FATAL, trace)

            # Flag that an exception occurred to guide the framework to skip processing
            prelog.log(self.VERB2, "Flagging error in tryPreProcess, tryPostProcess to be skipped")
            self.errorFlagged = 1

            if(self.failureStageName != None):
                if(self.failSerialName != "lsst.pex.harness.stage.NoOpSerialProcessing"):

                    LogRec(prelog, self.VERB2) << "failureStageName exists " \
                                               << self.failureStageName \
                                               << "and failSerialName exists " \
                                               << self.failSerialName \
                                               << LogRec.endr;

                    inputQueue = self.queueList[iStage-1]
                    outputQueue = self.queueList[iStage]

                    # Record where and how the stage failed on the clipboard
                    # obtained from the input queue's element() accessor.
                    clipboard = inputQueue.element()
                    clipboard.put("failedInStage", stage.getName())
                    clipboard.put("failedInStageN", iStage)
                    clipboard.put("failureType", str(sys.exc_info()[0]))
                    clipboard.put("failureMessage", str(sys.exc_info()[1]))
                    clipboard.put("failureTraceback", trace)

                    # Run the failure stage on the same queues as the failed stage.
                    self.failStageObject.initialize(outputQueue, inputQueue)

                    self.interQueue = self.failStageObject.applyPreprocess()

                else:
                    prelog.log(self.VERB2, "No SerialProcessing to do for failure stage")

            # Post a clipboard to the output queue in place of the one the
            # failed Stage did not transfer
            self.postOutputClipboard(iStage)

        prelog.done()
        # Done try - except around stage preprocess
754 
    def tryPostProcess(self, iStage, stage, stagelog):
        """
        Executes the try/except construct for Stage postprocess() call

        When no error has been flagged in this visit, runs
        stage.applyPostprocess() on self.interQueue, the value stored by
        tryPreProcess(). Otherwise the clipboard is moved unchanged from the
        stage's input queue to its output queue.

        Any exception is handled as follows:
          - it is logged with its traceback and self.errorFlagged is set to 1;
          - if a failure stage other than NoOpSerialProcessing is configured,
            its applyPostprocess() is run on self.interQueue;
          - an empty clipboard is then posted to the stage's output queue.

        @param iStage    1-based stage number
        @param stage     the stage object being run
        @param stagelog  log used to open timing blocks
        """
        postlog = stagelog.timeBlock("tryPostProcess",self.TRACE-2);

        # Important try - except construct around stage postprocess()
        try:
            # If no error/exception has been flagged, run postprocess()
            # otherwise, simply pass along the Clipboard
            if (self.errorFlagged == 0):
                processlog = stagelog.timeBlock("postprocess", self.TRACE)
                stage.applyPostprocess(self.interQueue)
                processlog.done()
            else:
                postlog.log(self.TRACE, "Skipping applyPostprocess due to flagged error")
                self.transferClipboard(iStage)

        # Bare except: every failure raised while running the stage is caught,
        # recorded and turned into the error flag below.
        except:
            trace = "".join(traceback.format_exception(
                sys.exc_info()[0], sys.exc_info()[1], sys.exc_info()[2]))
            postlog.log(Log.FATAL, trace)

            # Flag that an exception occurred to guide the framework to skip processing
            self.errorFlagged = 1

            if(self.failureStageName != None):
                if(self.failSerialName != "lsst.pex.harness.stage.NoOpSerialProcessing"):

                    LogRec(postlog, self.VERB2) << "failureStageName exists " \
                                                << self.failureStageName \
                                                << "and failSerialName exists " \
                                                << self.failSerialName \
                                                << LogRec.endr;

                    inputQueue = self.queueList[iStage-1]
                    outputQueue = self.queueList[iStage]

                    # Run the failure stage on the same queues as the failed stage.
                    self.failStageObject.initialize(outputQueue, inputQueue)

                    self.failStageObject.applyPostprocess(self.interQueue)

                else:
                    postlog.log(self.VERB2, "No SerialProcessing to do for failure stage")

            # Post a clipboard to the output queue in place of the one the
            # failed Stage did not transfer
            self.postOutputClipboard(iStage)

        # Done try - except around stage postprocess
        postlog.done()
805 
806  def waitForEvent(self, thisTopic):
807  """
808  wait for a single event of a designated topic
809  """
810 
811  eventsSystem = events.EventSystem.getDefaultEventSystem()
812 
813  sleepTimeout = 0.1
814  transTimeout = 900
815 
816  inputParamPropertySetPtr = None
817  waitLoopCount = 0
818  while((inputParamPropertySetPtr == None) and (waitLoopCount < self.eventTimeout)):
819  if(self.logthresh == self.VERB3):
820  print "Pipeline waitForEvent Looping : checking for event ... \n"
821 
822  inputParamPropertySetPtr = eventsSystem.receive(thisTopic, transTimeout)
823 
824  time.sleep(sleepTimeout)
825 
826  if((self._stop.isSet())):
827  break
828 
829  waitLoopCount = waitLoopCount+1
830 
831  return inputParamPropertySetPtr
832 
833 
834  def handleEvents(self, iStage, stagelog):
835  """
836  Handles Events: transmit or receive events as specified by Policy
837  """
838  log = stagelog.timeBlock("handleEvents", self.TRACE-2)
839  eventsSystem = events.EventSystem.getDefaultEventSystem()
840 
841  thisTopic = self.eventTopicList[iStage-1]
842  thisTopic = thisTopic.strip()
843 
844 
845  if (thisTopic != "None"):
846  log.log(self.VERB3, "Processing topic: " + thisTopic)
847  sliceTopic = "%s_%s" % (thisTopic, self._pipelineName)
848 
849  waitlog = log.timeBlock("eventwait " + thisTopic, self.TRACE,
850  "wait for event...")
851 
852  inputParamPropertySetPtr = self.waitForEvent(thisTopic)
853 
854  # inputParamPropertySetPtr = eventsSystem.receive(thisTopic, self.eventTimeout)
855 
856  waitlog.done()
857 
858  if (inputParamPropertySetPtr != None):
859  LogRec(log, self.TRACE) << "received event; contents: " \
860  << inputParamPropertySetPtr \
861  << LogRec.endr;
862 
863  log.log(self.VERB2, "received event; sending it to Slices " + sliceTopic)
864 
865  # Pipeline does not disassemble the payload of the event.
866  # It places the payload on the clipboard with key of the eventTopic
867  self.populateClipboard(inputParamPropertySetPtr, iStage, thisTopic)
868 
869  eventsSystem.createTransmitter(self.eventBrokerHost, sliceTopic)
870  eventsSystem.publish(sliceTopic, inputParamPropertySetPtr)
871 
872  log.log(self.VERB2, "event sent to Slices")
873  else:
874  if((self._stop.isSet())):
875  # Shutdown event received while waiting for data event
876  log.log(self.VERB2, "Pipeline Shutting down : Shutdown event received.")
877  else:
878  # event was not received after a long timeout
879  # log.log(self.VERB2, "Pipeline Shutting Down: Event timeout self.: No event arrived")
880 
881  LogRec(log, self.VERB2) << "Pipeline Shutting Down: Event timeout " \
882  << str(self.eventTimeout) \
883  << "reached: No or next event did not arrive " \
884  << LogRec.endr;
885 
886 
887  sys.exit()
888 
889  else: # This stage has no event dependence
890  log.log(Log.DEBUG, 'No event to handle')
891 
892  log.done()
893 
894  def populateClipboard(self, inputParamPropertySetPtr, iStage, eventTopic):
895  """
896  Place the event payload onto the Clipboard
897  """
898  log = self.log.timeBlock("populateClipboard", self.TRACE-2);
899 
900  queue = self.queueList[iStage-1]
901  clipboard = queue.element()
902 
903  # Pipeline does not disassemble the payload of the event.
904  # It knows nothing of the contents.
905  # It simply places the payload on the clipboard with key of the eventTopic
906  clipboard.put(eventTopic, inputParamPropertySetPtr)
907 
908  log.done()
909 
910  def postOutputClipboard(self, iStage):
911  """
912  Place an empty Clipboard in the output queue for designated stage
913  """
914  clipboard = Clipboard()
915  queue2 = self.queueList[iStage]
916  queue2.addDataset(clipboard)
917 
918  def transferClipboard(self, iStage):
919  """
920  Move the Clipboard from the input queue to output queue for the designated stage
921  """
922  # clipboard = Clipboard()
923  queue1 = self.queueList[iStage-1]
924  queue2 = self.queueList[iStage]
925  clipboard = queue1.getNextDataset()
926  if (clipboard != None):
927  queue2.addDataset(clipboard)
928 
929  def getRun(self):
930  """
931  get method for the runId
932  """
933  return self._runId
934 
935  def setRun(self, run):
936  """
937  set method for the runId
938  """
939  self._runId = run
940 
941  def getShutdownTopic(self):
942  """
943  get method for the shutdown event topic
944  """
945  return self.shutdownTopic
946 
948  """
949  get method for the event broker host
950  """
951  return self.eventBrokerHost
952 
953 
954  def getLogThreshold(self):
955  """
956  return the default message importance threshold being used for
957  recording messages. The returned value reflects the threshold
958  associated with the default root (system-wide) logger (or what it will
959  be after logging is initialized). Some underlying components may
960  override this threshold.
961  """
962  if self.log is None:
963  return self.logthresh
964  else:
965  return Log.getDefaultLog().getThreshold()
966 
967  def setLogThreshold(self, level):
968  """
969  set the default message importance threshold to be used for
970  recording messages. This will value be applied to the default
971  root (system-wide) logger (or what it will be after logging is
972  initialized) so that all software components are affected.
973  """
974  if self.log is not None:
975  Log.getDefaultLog().setThreshold(level)
976  self.log.log(Log.INFO,
977  "Upating Root Log Message Threshold to %i" % level)
978  self.logthresh = level
979 
980  def setLogDir(self, logdir):
981  """
982  set the default directory into which the pipeline should write log files
983  @param logdir the directory in which log files should be written
984  """
985  if (logdir == "None" or logdir == None):
986  self._logdir = ""
987  else:
988  self._logdir = logdir
989 
990  def setEventTimeout(self, timeout):
991  """
992  set the default message importance threshold to be used for
993  recording messages. This will value be applied to the default
994  root (system-wide) logger (or what it will be after logging is
995  initialized) so that all software components are affected.
996  """
997  if self.log is not None:
998  self.log.log(Log.INFO,
999  "Updating event timeout to %i" % timeout)
1000  self.eventTimeout = timeout
1001 
1002  def makeStageName(self, appStagePolicy):
1003  if appStagePolicy.getValueType("stagePolicy") == appStagePolicy.FILE:
1004  pfile = os.path.splitext(os.path.basename(
1005  appStagePolicy.getFile("stagePolicy").getPath()))[0]
1006  return trailingpolicy.sub('', pfile)
1007  else:
1008  return None
1009 
# Matches a trailing "policy" or "dict" (any case), together with any
# underscores just before it; used by Pipeline.makeStageName().
trailingpolicy = re.compile(r'_*(policy|dict)$', flags=re.I)
1011 
1012 
a place to record messages and descriptions of the state of processing.
Definition: Log.h:154
A LogRecord attached to a particular Log that supports stream semantics.
Definition: Log.h:729
a determination of the various directory roots that can be used by a pipeline.
Definition: Directories.py:73
Class for storing generic metadata.
Definition: PropertySet.h:82