LSSTApplications  8.0.0.0+107,8.0.0.1+13,9.1+18,9.2,master-g084aeec0a4,master-g0aced2eed8+6,master-g15627eb03c,master-g28afc54ef9,master-g3391ba5ea0,master-g3d0fb8ae5f,master-g4432ae2e89+36,master-g5c3c32f3ec+17,master-g60f1e072bb+1,master-g6a3ac32d1b,master-g76a88a4307+1,master-g7bce1f4e06+57,master-g8ff4092549+31,master-g98e65bf68e,master-ga6b77976b1+53,master-gae20e2b580+3,master-gb584cd3397+53,master-gc5448b162b+1,master-gc54cf9771d,master-gc69578ece6+1,master-gcbf758c456+22,master-gcec1da163f+63,master-gcf15f11bcc,master-gd167108223,master-gf44c96c709
LSSTDataManagementBasePackage
Slice.py
Go to the documentation of this file.
1 #! /usr/bin/env python
2 
3 #
4 # LSST Data Management System
5 # Copyright 2008, 2009, 2010 LSST Corporation.
6 #
7 # This product includes software developed by the
8 # LSST Project (http://www.lsst.org/).
9 #
10 # This program is free software: you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License as published by
12 # the Free Software Foundation, either version 3 of the License, or
13 # (at your option) any later version.
14 #
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 # GNU General Public License for more details.
19 #
20 # You should have received a copy of the LSST License Statement and
21 # the GNU General Public License along with this program. If not,
22 # see <http://www.lsstcorp.org/LegalNotices/>.
23 #
24 
25 from __future__ import with_statement
26 
27 from lsst.pex.harness.Queue import Queue
28 from lsst.pex.harness.stage import StageProcessing
29 from lsst.pex.harness.stage import NoOpParallelProcessing
30 from lsst.pex.harness.Clipboard import Clipboard
31 from lsst.pex.harness.Directories import Directories
32 from lsst.pex.logging import Log, LogRec, Prop
33 from lsst.pex.logging import BlockTimingLog
34 from lsst.pex.harness import harnessLib as logutils
35 
36 import lsst.pex.policy as policy
37 import lsst.pex.exceptions as ex
38 
39 import lsst.daf.base as dafBase
40 from lsst.daf.base import *
41 import lsst.daf.persistence as dafPersist
42 from lsst.daf.persistence import *
43 
44 
45 import lsst.ctrl.events as events
47 from lsst.pex.exceptions import *
48 
49 import os, sys, signal, re, traceback, time, datetime
50 import threading
51 from threading import Event as PyEvent
52 
53 
54 """
55 Slice represents a single parallel worker program.
56 Slice executes the loop of Stages for processing a portion of an Image (e.g.,
57 single ccd or amplifier). The processing is synchronized with serial processing
58 in the main Pipeline.
59 A Slice obtains its configuration by reading a policy file.
60 """
61 
class Slice(object):
    """
    Slice: Python Slice class implementation.

    A Slice is configured from a pipeline policy file (initializeLogger,
    configureSlice), owns one Queue more than the number of configured
    stages (initializeQueues), instantiates the parallel-processing class
    of every stage (initializeStages) and then, in startStagesLoop, pushes
    a Clipboard through the stages once per visit.

    setUniverseSize(), setLoopEventA() and setLoopEventB() only store
    their argument; the attributes they set (universeSize, loopEventA,
    loopEventB) are read by configureSlice(), initializeStages() and
    threadBarrier(), so the setters must be called first.
    """

    # Fully qualified name of the stand-in class used when a stage policy
    # does not name a 'parallelClass'.
    _NOOP_PARALLEL = "lsst.pex.harness.stage.NoOpParallelProcessing"

    #------------------------------------------------------------------------
    def __init__(self, runId="TEST", pipelinePolicyName=None, name="unnamed", rank=-1, workerId=None):
        """
        Initialize the Slice: record its identity and create the empty
        queue, stage and topic lists and the C++ log utilities helper.

        @param runId               identifier of the run; returned by getRun()
        @param pipelinePolicyName  name of the pipeline policy file; when None,
                                   initializeLogger() falls back to
                                   "pipeline_policy.paf"
        @param name                pipeline name, used for logging and as the
                                   suffix of the per-slice event topics
        @param rank                rank of this slice (converted with int())
        @param workerId            worker identifier; stored as -1 when None
        """
        # log message levels
        self.TRACE = BlockTimingLog.INSTRUM+2
        self.VERB1 = self.TRACE
        self.VERB2 = self.VERB1 - 1
        self.VERB3 = self.VERB2 - 1
        self.log = None
        self.logthresh = None
        self._logdir = ""

        self._pipelineName = name

        self.queueList = []
        self.stageList = []
        self.stageClassList = []
        self.stagePolicyList = []
        self.eventTopicList = []
        self.shareDataList = []
        self.shutdownTopic = "triggerShutdownEvent_slice"
        self.executionMode = 0
        self._runId = runId
        self.pipelinePolicyName = pipelinePolicyName

        self.cppLogUtils = logutils.LogUtils()
        self._rank = int(rank)

        if workerId is None:
            self.workerId = -1
        else:
            self.workerId = workerId

    def __del__(self):
        """
        Delete the Slice object: log the deletion if a logger exists.
        """
        # __del__ also runs on instances whose __init__ raised before
        # self.log was assigned, so the attribute may be missing.
        log = getattr(self, "log", None)
        if log is not None:
            log.log(self.VERB1, 'Python Slice being deleted')

    def initializeLogger(self):
        """
        Open the pipeline policy file and initialize the logger from it.

        Sets self.executePolicy (the 'execute' sub-policy when present,
        otherwise a policy created from the whole file),
        self.eventBrokerHost, self.log and the log threshold.
        """
        if self.pipelinePolicyName is None:
            self.pipelinePolicyName = "pipeline_policy.paf"
        topPolicy = policy.Policy.createPolicy(self.pipelinePolicyName)

        if topPolicy.exists('execute'):
            self.executePolicy = topPolicy.get('execute')
        else:
            self.executePolicy = policy.Policy.createPolicy(self.pipelinePolicyName)

        # Check for eventBrokerHost
        if self.executePolicy.exists('eventBrokerHost'):
            self.eventBrokerHost = self.executePolicy.getString('eventBrokerHost')
        else:
            self.eventBrokerHost = "lsst8.ncsa.uiuc.edu"   # default value
        self.cppLogUtils.setEventBrokerHost(self.eventBrokerHost)

        doLogFile = self.executePolicy.getBool('localLogMode')
        self.cppLogUtils.initializeSliceLogger(doLogFile, self._pipelineName,
                                               self._runId, self._logdir,
                                               self._rank, self.workerId,
                                               BlockTimingLog.LINUXUDATA)

        # The log for use in the Python Slice
        self.log = self.cppLogUtils.getLogger()

        # The policy value wins; otherwise keep a threshold set earlier via
        # setLogThreshold(), and only then fall back to TRACE.
        if self.executePolicy.exists('logThreshold'):
            self.logthresh = self.executePolicy.get('logThreshold')
        elif self.logthresh is None:
            self.logthresh = self.TRACE
        self.setLogThreshold(self.logthresh)

    def _importStageClass(self, qualifiedName):
        """
        Import and return the class named by a dotted string.

        @param qualifiedName  e.g. "lsst.pex.harness.App1Stage.App1Stage";
                              the last component is the class name, the rest
                              is the module to import it from
        @return the class object
        """
        tokenList = qualifiedName.strip().split('.')
        classString = tokenList.pop().strip()
        package = ".".join(tokenList)

        # level=-1 is the Python 2 "implicit relative, then absolute" import
        # mode that this module has always used.
        module = __import__(package, globals(), locals(), [classString], -1)
        return getattr(module, classString)

    def _makeSysdata(self, name, stageId):
        """
        Build the 'sysdata' dictionary handed to a stage constructor.

        @param name     stage name to record under "name"
        @param stageId  value to record under "stageId"
        """
        sysdata = {}
        sysdata["name"] = name
        sysdata["rank"] = self._rank
        sysdata["stageId"] = stageId
        sysdata["universeSize"] = self.universeSize
        sysdata["runId"] = self._runId
        return sysdata

    def configureSlice(self):
        """
        Configure the slice from self.executePolicy.

        Reads the stage list, directory/database locators, event timeout,
        the optional failure stage, event topics, execution mode and the
        share-data schedule, imports the stage classes, and creates an
        event receiver for every stage that has a real event topic.
        Requires initializeLogger() and setUniverseSize() to have run.
        """
        log = Log(self.log, "configureSlice")

        conflog = BlockTimingLog(self.log, "configureSlice", self.TRACE)
        conflog.start()

        # Stage names are collected before loadPolicyFiles() is called,
        # exactly as in the original ordering.
        self.stageNames = [subpol.get("name")
                           for subpol in self.executePolicy.getArray("appStage")]

        self.executePolicy.loadPolicyFiles()

        # Obtain the working directory space locators
        psLookup = dafBase.PropertySet()
        if self.executePolicy.exists('dir'):
            dirPolicy = self.executePolicy.get('dir')
            shortName = None
            if dirPolicy.exists('shortName'):
                shortName = dirPolicy.get('shortName')
            if shortName is None:
                shortName = self.pipelinePolicyName.split('.')[0]
            dirs = Directories(dirPolicy, shortName, self._runId)
            psLookup = dirs.getDirs()

        if self.executePolicy.exists('database.url'):
            psLookup.set('dbUrl', self.executePolicy.get('database.url'))

        (LogRec(log, self.VERB1) << "Configuring Slice"
            << Prop("universeSize", self.universeSize)
            << Prop("runID", self._runId)
            << Prop("rank", self._rank)
            << LogRec.endr)

        # Configure persistence logical location map with values for directory
        # work space locators
        dafPersist.LogicalLocation.setLocationMap(psLookup)

        # Check for eventTimeout
        if self.executePolicy.exists('eventTimeout'):
            self.eventTimeout = self.executePolicy.getInt('eventTimeout')
        else:
            self.eventTimeout = 10000000   # default value

        # Process Application Stages
        fullStageList = self.executePolicy.getArray("appStage")
        self.nStages = len(fullStageList)
        log.log(self.VERB2, "Found %d stages" % len(fullStageList))

        # extract the stage class name and associated policy file.
        fullStageNameList = []
        self.stagePolicyList = []
        for stagei in range(self.nStages):
            fullStagePolicy = fullStageList[stagei]
            if fullStagePolicy.exists('parallelClass'):
                parallelName = fullStagePolicy.getString('parallelClass')
                stagePolicy = fullStagePolicy.get('stagePolicy')
            else:
                parallelName = self._NOOP_PARALLEL
                stagePolicy = None

            fullStageNameList.append(parallelName)
            self.stagePolicyList.append(stagePolicy)

            # An unnamed stage is named after its class.
            if self.stageNames[stagei] is None:
                self.stageNames[stagei] = parallelName.split('.')[-1]
            log.log(self.VERB3,
                    "Stage %d: %s: %s" % (stagei+1, self.stageNames[stagei],
                                          parallelName))

        for astage in fullStageNameList:
            self.stageClassList.append(self._importStageClass(astage))

        log.log(self.VERB2, "Imported Stage Classes")

        #
        # Configure the Failure Stage
        #   - Read the policy information
        #   - Import failure stage Class and make failure stage instance Object
        #
        self.failureStageName = None
        self.failParallelName = None
        if self.executePolicy.exists('failureStage'):
            failstg = self.executePolicy.get("failureStage")
            self.failureStageName = failstg.get("name")

            if failstg.exists('parallelClass'):
                self.failParallelName = failstg.getString('parallelClass')
                failStagePolicy = failstg.get('stagePolicy')
            else:
                self.failParallelName = self._NOOP_PARALLEL
                failStagePolicy = None

            FailStageClass = self._importStageClass(self.failParallelName)

            # Constructor signature:
            # (self, policy=None, log=None, eventBroker=None, sysdata=None, callSetup=True)
            sysdata = self._makeSysdata(self.failureStageName, -1)
            self.failStageObject = FailStageClass(failStagePolicy, self.log,
                                                  self.eventBrokerHost, sysdata)

            log.log(self.VERB2, "failureStage %s " % self.failureStageName)
            log.log(self.VERB2, "failParallelName %s " % self.failParallelName)

        # Process Event Topics; the per-slice topic is the stage topic
        # suffixed with the pipeline name.
        self.eventTopicList = [item.getString("eventTopic")
                               for item in fullStageList]
        self.sliceEventTopicList = ["%s_%s" % (topic, self._pipelineName)
                                    for topic in self.eventTopicList]

        # Check for executionMode of oneloop
        if (self.executePolicy.exists('executionMode')
                and self.executePolicy.getString('executionMode') == "oneloop"):
            self.executionMode = 1

        # Process Share Data Schedule
        self.shareDataList = []
        for item in fullStageList:
            shareDataStage = False
            if item.exists('shareData'):
                shareDataStage = item.getBool('shareData')
            self.shareDataList.append(shareDataStage)

        # The literal string "None" marks a stage without a trigger topic.
        realTopics = [topic for topic in self.eventTopicList if topic != "None"]
        log.log(self.VERB3, "Loading in %d trigger topics" % len(realTopics))
        for iStage, item in enumerate(self.eventTopicList):
            if item != "None":
                log.log(self.VERB3, "eventTopic%d: %s" % (iStage+1, item))
            else:
                log.log(Log.DEBUG, "eventTopic%d: %s" % (iStage+1, item))

        eventsSystem = events.EventSystem.getDefaultEventSystem()
        noTopic = "None_" + self._pipelineName
        for topic in self.sliceEventTopicList:
            if topic != noTopic:
                eventsSystem.createReceiver(self.eventBrokerHost, topic)
                log.log(self.VERB3, "Creating receiver %s" % (topic))

        conflog.done()

        log.log(self.VERB1, "Slice configuration complete")

    def initializeQueues(self):
        """
        Initialize the Queue List: append nStages+1 new Queues, so that
        every stage has an input queue and an output queue.
        """
        queuelog = BlockTimingLog(self.log, "initializeQueues", self.TRACE)
        queuelog.start()

        for _ in range(self.nStages + 1):
            self.queueList.append(Queue())

        queuelog.done()

    def initializeStages(self):
        """
        Initialize the Stage List: instantiate every imported stage class
        with its policy and connect it to its input and output queues.
        """
        istageslog = BlockTimingLog(self.log, "initializeStages", self.TRACE)
        istageslog.start()

        for iStage in range(1, self.nStages+1):
            stagePolicy = self.stagePolicyList[iStage-1]
            StageClass = self.stageClassList[iStage-1]
            sysdata = self._makeSysdata(self.stageNames[iStage-1], iStage)

            # stagePolicy is either a policy object or None (see
            # configureSlice); it is passed through unchanged.
            # Constructor signature:
            # (self, policy=None, log=None, eventBroker=None, sysdata=None, callSetup=True)
            stageObject = StageClass(stagePolicy, self.log, self.eventBrokerHost, sysdata)

            inputQueue = self.queueList[iStage-1]
            outputQueue = self.queueList[iStage]

            stageObject.initialize(outputQueue, inputQueue)
            self.stageList.append(stageObject)

        istageslog.done()

    def startInitQueue(self):
        """
        Place an empty Clipboard in the first Queue
        """
        self.queueList[0].addDataset(Clipboard())

    def postOutputClipboard(self, iStage):
        """
        Place an empty Clipboard in the output queue for designated stage

        @param iStage  1-based stage number
        """
        self.queueList[iStage].addDataset(Clipboard())

    def transferClipboard(self, iStage):
        """
        Move the Clipboard from the input queue to output queue for the
        designated stage

        @param iStage  1-based stage number
        """
        clipboard = self.queueList[iStage-1].getNextDataset()
        self.queueList[iStage].addDataset(clipboard)

    def startStagesLoop(self):
        """
        Execute the Stage loop.

        Each visit places an empty Clipboard on the first queue and then,
        for every stage, handles the stage's event, runs tryProcess() and
        calls threadBarrier() four times (before/after the Pipeline's
        pre- and post-processing steps).  The loop ends after one visit
        when executionMode is 1; otherwise it runs until shutdown() is
        triggered from tryProcess().
        """
        startStagesLoopLog = self.log.timeBlock("startStagesLoop", self.TRACE)
        looplog = BlockTimingLog(self.log, "visit", self.TRACE)
        stagelog = BlockTimingLog(looplog, "stage", self.TRACE)

        self.log.log(Log.INFO, "Begin startStagesLoopLog")

        self.threadBarrier()

        visitcount = 0
        while True:
            self.log.log(Log.INFO, "visitcount %d %s " % (visitcount, datetime.datetime.now()))

            if self.executionMode == 1 and visitcount == 1:
                LogRec(looplog, Log.INFO) << "terminating Slice Stage Loop "
                break

            visitcount += 1
            looplog.setPreamblePropertyInt("LOOPNUM", visitcount)

            stagelog.setPreamblePropertyInt("LOOPNUM", visitcount)
            timesVisitStart = os.times()

            looplog.setPreamblePropertyDouble("usertime", timesVisitStart[0])
            looplog.setPreamblePropertyDouble("systemtime", timesVisitStart[1])
            looplog.start()

            self.startInitQueue()    # place an empty clipboard in the first Queue

            self.errorFlagged = 0
            for iStage in range(1, self.nStages+1):
                stageName = self.stageNames[iStage-1]
                stagelog.setPreamblePropertyInt("STAGEID", iStage)
                stagelog.setPreamblePropertyString("stagename", stageName)
                stagelog.start(stageName + " loop")
                stagelog.log(Log.INFO, "Begin stage loop iteration iStage %d " % iStage)

                stageObject = self.stageList[iStage-1]
                self.handleEvents(iStage, stagelog)

                # synchronize before preprocess
                self.threadBarrier()

                # synchronize after preprocess, before process
                self.threadBarrier()

                self.tryProcess(iStage, stageObject, stagelog)

                # synchronize after process, before postprocess
                self.threadBarrier()

                # synchronize after postprocess
                self.threadBarrier()

                stagelog.log(self.TRACE, "End stage loop iteration iStage %d " % iStage)
                # Adjacent literals are used instead of a backslash inside
                # the string so that source indentation does not leak into
                # the message.
                stagelog.log(Log.INFO,
                             "End stage loop iteration : ErrorCheck "
                             "iStage %d stageName %s errorFlagged_%d "
                             % (iStage, stageName, self.errorFlagged))

                stagelog.done()

            looplog.log(self.VERB2, "Completed Stage Loop")

            # If no error/exception was flagged, then clear the final Clipboard in the final Queue
            if self.errorFlagged == 0:
                looplog.log(Log.DEBUG,
                            "Retrieving final Clipboard for deletion")
                finalQueue = self.queueList[self.nStages]
                finalClipboard = finalQueue.getNextDataset()
                finalClipboard.close()
                del finalClipboard
                looplog.log(Log.DEBUG, "Deleted final Clipboard")
            else:
                looplog.log(self.VERB3, "Error flagged on this visit")

            timesVisitDone = os.times()
            utime = timesVisitDone[0] - timesVisitStart[0]
            stime = timesVisitDone[1] - timesVisitStart[1]
            wtime = timesVisitDone[4] - timesVisitStart[4]
            totalTime = utime + stime
            looplog.log(Log.INFO, "visittimes : utime %.4f stime %.4f total %.4f wtime %.4f" % (utime, stime, totalTime, wtime) )

            looplog.setPreamblePropertyDouble("usertime", timesVisitDone[0])
            looplog.setPreamblePropertyDouble("systemtime", timesVisitDone[1])
            looplog.done()

            # Best-effort memory report read from /proc; any failure here
            # (e.g. no /proc on this platform) must not stop the loop, but
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            try:
                memmsg = "mem:"
                with open("/proc/%d/status" % os.getpid(), "r") as f:
                    for line in f:
                        m = re.match(r'Vm(Size|RSS|Peak|HWM):\s+(\d+ \wB)', line)
                        if m:
                            memmsg += " %s=%s" % m.groups()
                looplog.log(Log.INFO, memmsg)
            except Exception:
                pass

        startStagesLoopLog.done()

    def threadBarrier(self):
        """
        Create an approximate barrier where all Slices intercommunicate with the Pipeline

        Waits on loopEventA, clears it, then sets loopEventB.
        """
        log = Log(self.log, "threadBarrier")

        entryTime = time.time()
        log.log(Log.DEBUG, "Slice %d waiting for signal from Pipeline %f" % (self._rank, entryTime))

        self.loopEventA.wait()

        signalTime1 = time.time()
        log.log(Log.DEBUG, "Slice %d done waiting; signaling back %f" % (self._rank, signalTime1))

        if self.loopEventA.isSet():
            self.loopEventA.clear()

        self.loopEventB.set()

        signalTime2 = time.time()
        log.log(Log.DEBUG, "Slice %d sent signal back. Exit threadBarrier %f" % (self._rank, signalTime2))

    def shutdown(self):
        """
        Shutdown the Slice execution by sending SIGKILL to this process
        """
        shutlog = Log(self.log, "shutdown", Log.INFO)
        pid = os.getpid()
        shutlog.log(Log.INFO, "Shutting down Slice: pid " + str(pid))
        os.kill(pid, signal.SIGKILL)

    def tryProcess(self, iStage, stage, stagelog):
        """
        Executes the try/except construct for Stage process() call

        When no error has been flagged on this visit the stage's
        applyProcess() is run; if the resulting clipboard carries the key
        "noMoreDatasets" the Slice shuts down.  When an error was flagged
        earlier the Clipboard is only moved to the output queue.  Any
        exception sets self.errorFlagged, optionally runs the failure
        stage, and posts an empty Clipboard to the output queue.

        @param iStage    1-based stage number
        @param stage     stage object; used for its name in failure records
        @param stagelog  log to derive the timing logs from
        """
        proclog = stagelog.timeBlock("tryProcess", self.TRACE-2)

        stageObject = self.stageList[iStage-1]
        proclog.log(self.VERB3, "Getting process signal from Pipeline")

        # Important try - except construct around stage process().
        # The bare except is deliberate: this is the framework boundary
        # around arbitrary stage code and every failure must be recorded.
        try:
            # If no error/exception has been flagged, run process()
            # otherwise, simply pass along the Clipboard
            if self.errorFlagged == 0:
                processlog = stagelog.timeBlock("process", self.TRACE)
                stageObject.applyProcess()

                outputQueue = self.queueList[iStage]
                clipboard = outputQueue.element()
                proclog.log(Log.INFO, "Checking_For_Shotdown")

                if clipboard.has_key("noMoreDatasets"):
                    proclog.log(Log.INFO, "Ready_For_Shutdown")
                    self.shutdown()

                processlog.done()
            else:
                proclog.log(self.TRACE, "Skipping process due to error")
                self.transferClipboard(iStage)

        except:
            excType, excValue = sys.exc_info()[:2]
            trace = traceback.format_exc()
            proclog.log(Log.FATAL, trace)

            # Flag that an exception occurred to guide the framework to skip processing
            self.errorFlagged = 1

            if self.failureStageName is not None:
                if self.failParallelName != self._NOOP_PARALLEL:

                    (LogRec(proclog, self.VERB2) << "failureStageName exists "
                        << self.failureStageName
                        << "and failParallelName exists "
                        << self.failParallelName
                        << LogRec.endr)

                    inputQueue = self.queueList[iStage-1]
                    outputQueue = self.queueList[iStage]

                    # Record what failed on the input clipboard so the
                    # failure stage can see it.
                    clipboard = inputQueue.element()
                    clipboard.put("failedInStage", stage.getName())
                    clipboard.put("failedInStageN", iStage)
                    clipboard.put("failureType", str(excType))
                    clipboard.put("failureMessage", str(excValue))
                    clipboard.put("failureTraceback", trace)

                    self.failStageObject.initialize(outputQueue, inputQueue)

                    self.failStageObject.applyProcess()

                    proclog.log(self.TRACE, "Popping off failure stage Clipboard")
                    clipboard = outputQueue.getNextDataset()
                    clipboard.close()
                    del clipboard
                    proclog.log(self.TRACE, "Erasing and deleting failure stage Clipboard")

                else:
                    proclog.log(self.VERB2, "No ParallelProcessing to do for failure stage")

            # Post the clipboard that the Stage failed to transfer to the output queue
            self.postOutputClipboard(iStage)

        proclog.log(self.VERB3, "Getting end of process signal from Pipeline")
        proclog.done()

    def handleEvents(self, iStage, stagelog):
        """
        Handles Events: when the stage has an event topic, block until an
        event arrives on the per-slice topic and put its payload on the
        stage's input Clipboard.

        @param iStage    1-based stage number
        @param stagelog  log to derive the timing logs from
        """
        log = stagelog.timeBlock("handleEvents", self.TRACE-2)
        eventsSystem = events.EventSystem.getDefaultEventSystem()

        thisTopic = self.eventTopicList[iStage-1]

        if thisTopic == "None":
            log.log(Log.DEBUG, 'No event to handle')
        else:
            log.log(self.VERB3, "Processing topic: " + thisTopic)
            sliceTopic = self.sliceEventTopicList[iStage-1]

            waitlog = log.timeBlock("eventwait " + sliceTopic, self.TRACE,
                                    "wait for event...")

            # Receive the event from the Pipeline
            # Call with a timeout , followed by a call to time sleep to free the GIL
            # periodically
            sleepTimeout = 0.1
            transTimeout = 900

            inputParamPropertySetPtr = eventsSystem.receive(sliceTopic, transTimeout)
            while inputParamPropertySetPtr is None:
                time.sleep(sleepTimeout)
                inputParamPropertySetPtr = eventsSystem.receive(sliceTopic, transTimeout)

            waitlog.done()
            (LogRec(log, self.TRACE) << "received event; contents: "
                << inputParamPropertySetPtr
                << LogRec.endr)

            self.populateClipboard(inputParamPropertySetPtr, iStage, thisTopic)
            log.log(self.VERB3, 'Received event; added payload to clipboard')

        log.done()

    def populateClipboard(self, inputParamPropertySetPtr, iStage, eventTopic):
        """
        Place the event payload onto the Clipboard

        @param inputParamPropertySetPtr  the payload, stored unmodified
        @param iStage                    1-based stage number
        @param eventTopic                key under which the payload is stored
        """
        log = Log(self.log, "populateClipboard")
        log.log(Log.DEBUG, 'Python Pipeline populateClipboard')

        queue = self.queueList[iStage-1]
        clipboard = queue.element()

        # Slice does not disassemble the payload of the event.
        # It knows nothing of the contents.
        # It simply places the payload on the clipboard with key of the eventTopic
        clipboard.put(eventTopic, inputParamPropertySetPtr)

    #------------------------------------------------------------------------
    def getRun(self):
        """
        get method for the runId
        """
        return self._runId

    #------------------------------------------------------------------------
    def setRun(self, run):
        """
        set method for the runId
        """
        self._runId = run

    def getLogThreshold(self):
        """
        return the default message importance threshold being used for
        recording messages.  The returned value reflects the threshold
        associated with the default root (system-wide) logger (or what it will
        be after logging is initialized).  Some underlying components may
        override this threshold.
        @return int   the threshold value as would be returned by
                      Log.getThreshold()
        """
        if self.log is None:
            return self.logthresh
        return Log.getDefaultLog().getThreshold()

    def setLogThreshold(self, level):
        """
        set the default message importance threshold to be used for
        recording messages.  This value will be applied to the default
        root (system-wide) logger (or what it will be after logging is
        initialized) so that all software components are affected.
        @param level   the threshold level as expected by Log.setThreshold().
        """
        if self.log is not None:
            Log.getDefaultLog().setThreshold(level)
            self.log.log(Log.INFO,
                         "Upating Root Log Message Threshold to %i" % level)
        self.logthresh = level

    def setLogDir(self, logdir):
        """
        set the default directory into which the slice should write log files
        @param logdir   the directory in which log files should be written;
                        None or the string "None" selects the empty string
        """
        if logdir is None or logdir == "None":
            self._logdir = ""
        else:
            self._logdir = logdir

    def makeStageName(self, appStagePolicy):
        """
        Derive a stage name from the file name of a stage's policy file.

        @param appStagePolicy  policy of one application stage
        @return the policy file's base name without extension and without a
                trailing "policy"/"dict" suffix, or None when the
                "stagePolicy" entry is not a file reference
        """
        if appStagePolicy.getValueType("stagePolicy") != appStagePolicy.FILE:
            return None
        policyPath = appStagePolicy.getFile("stagePolicy").getPath()
        pfile = os.path.splitext(os.path.basename(policyPath))[0]
        return trailingpolicy.sub('', pfile)

    def setLoopEventA(self, event):
        """
        store the event that threadBarrier() waits on and clears
        """
        self.loopEventA = event

    def setLoopEventB(self, event):
        """
        store the event that threadBarrier() sets to signal back
        """
        self.loopEventB = event

    def setUniverseSize(self, usize):
        """
        store the universe size reported in logs and in stage sysdata
        """
        self.universeSize = usize
762 
# Case-insensitive pattern for a trailing "policy" or "dict" suffix,
# including any underscores directly in front of it; Slice.makeStageName
# uses it to strip that suffix from a policy file's base name.
trailingpolicy = re.compile(
    r'_*(policy|dict)$',
    re.IGNORECASE,
)
764 
765 
a place to record messages and descriptions of the state of processing.
Definition: Log.h:154
A LogRecord attached to a particular Log that supports stream semantics.
Definition: Log.h:729
a determination of the various directory roots that can be used by a pipeline.
Definition: Directories.py:73
Class for storing generic metadata.
Definition: PropertySet.h:82