40 from threading
import Thread
41 from threading
import Event
as PyEvent
56 import os, sys, re, traceback, time
59 Pipeline class manages the operation of a multi-stage parallel pipeline.
60 The Pipeline is configured by reading a Policy file.
61 Pipeline has a __main__ portion as it serves as the main executable program
62 ('glue layer') for running a Pipeline. The Pipeline spawns Slice workers
63 for parallel computations.
68 '''Python Pipeline class implementation. Contains main pipeline workflow'''
70 def __init__(self, runId='-1', pipelinePolicyName=None, name="unnamed", workerId=None):
72 Initialize the Pipeline: create empty Queue and Stage lists;
73 import the C++ Pipeline instance; initialize the MPI environment
76 self.
TRACE = BlockTimingLog.INSTRUM+2
96 if workerId
is not None:
109 if self.
log is not None:
110 self.log.log(self.
VERB1,
'Killing Pipeline process immediately: shutdown level 1')
112 thisPid = os.getpid()
113 os.popen(
"kill -9 "+str(thisPid))
119 Delete the Pipeline object: clean up
121 if self.
log is not None:
122 self.log.log(self.
VERB1,
'Python Pipeline being deleted')
126 Initialize the Logger after opening policy file
130 dictName =
"pipeline_dict.paf"
133 if (topPolicy.exists(
'execute')):
139 if (self.executePolicy.exists(
'eventBrokerHost')):
145 doLogFile = self.executePolicy.getBool(
'localLogMode')
146 self.cppLogUtils.initializeLogger(doLogFile, self.
_pipelineName,
149 BlockTimingLog.LINUXUDATA)
152 self.
log = self.cppLogUtils.getLogger()
154 if (self.executePolicy.exists(
'logThreshold')):
155 self.
logthresh = self.executePolicy.get(
'logThreshold')
165 Configure the Pipeline by reading a Policy File
168 if (self.executePolicy.exists(
'nSlices')):
169 self.
nSlices = self.executePolicy.getInt(
'nSlices')
174 if (self.executePolicy.exists(
'barrierDelay')):
184 stgcfg = self.executePolicy.getArray(
"appStage")
186 for subpol
in stgcfg:
187 stageName = subpol.get(
"name")
188 self.stageNames.append(stageName)
190 self.executePolicy.loadPolicyFiles()
195 if (self.executePolicy.exists(
'dir')):
196 dirPolicy = self.executePolicy.get(
'dir')
198 if (dirPolicy.exists(
'shortName')):
199 shortName = dirPolicy.get(
'shortName')
200 if shortName ==
None:
201 shortName = self.pipelinePolicyName.split(
'.')[0]
203 psLookup = dirs.getDirs()
204 if (self.executePolicy.exists(
'database.url')):
205 psLookup.set(
'dbUrl', self.executePolicy.get(
'database.url'))
208 log =
Log(self.
log,
"configurePipeline")
210 "Logging messages using threshold=%i" % log.getThreshold())
211 LogRec(log, self.
VERB1) <<
"Configuring pipeline" \
214 <<
Prop(
"rank", -1) \
221 dafPersist.LogicalLocation.setLocationMap(psLookup)
227 if (self.executePolicy.exists(
'eventTimeout')):
233 fullStageList = self.executePolicy.getArray(
"appStage")
235 log.log(self.
VERB2,
"Found %d stages" % len(fullStageList))
237 fullStageNameList = [ ]
239 for stagei
in xrange(self.
nStages):
240 fullStagePolicy = fullStageList[stagei]
241 if (fullStagePolicy.exists(
'serialClass')):
242 serialName = fullStagePolicy.getString(
'serialClass')
243 stagePolicy = fullStagePolicy.get(
'stagePolicy')
245 serialName =
"lsst.pex.harness.stage.NoOpSerialProcessing"
248 fullStageNameList.append(serialName)
249 self.stagePolicyList.append(stagePolicy)
252 self.
stageNames[stagei] = fullStageNameList[-1].split(
'.')[-1]
254 "Stage %d: %s: %s" % (stagei+1, self.
stageNames[stagei],
255 fullStageNameList[-1]))
256 for astage
in fullStageNameList:
257 fullStage = astage.strip()
258 tokenList = astage.split(
'.')
259 classString = tokenList.pop()
260 classString = classString.strip()
262 package =
".".join(tokenList)
265 AppStage = __import__(package, globals(), locals(), [classString], -1)
266 StageClass = getattr(AppStage, classString)
267 self.stageClassList.append(StageClass)
276 if (self.executePolicy.exists(
'failureStage')):
277 failstg = self.executePolicy.get(
"failureStage")
280 if (failstg.exists(
'serialClass')):
282 failStagePolicy = failstg.get(
'stagePolicy')
284 self.
failSerialName =
"lsst.pex.harness.stage.NoOpSerialProcessing"
285 failStagePolicy =
None
288 tokenList = astage.split(
'.')
289 failClassString = tokenList.pop()
290 failClassString = failClassString.strip()
292 package =
".".join(tokenList)
295 FailAppStage = __import__(package, globals(), locals(), [failClassString], -1)
296 FailStageClass = getattr(FailAppStage, failClassString)
301 sysdata[
"stageId"] = -1
303 sysdata[
"runId"] = self.
_runId
304 if (failStagePolicy !=
None):
314 for item
in fullStageList:
315 self.eventTopicList.append(item.getString(
"eventTopic"))
319 for item
in fullStageList:
320 shareDataStage =
False
321 if (item.exists(
'shareData')):
322 shareDataStage = item.getBool(
'shareData')
323 self.shareDataList.append(shareDataStage)
325 log.log(self.
VERB3,
"Loading in %d trigger topics" % \
328 log.log(self.
VERB3,
"Loading in %d shareData flags" % \
334 log.log(self.
VERB3,
"eventTopic%d: %s" % (iStage+1, item))
336 log.log(Log.DEBUG,
"eventTopic%d: %s" % (iStage+1, item))
339 eventsSystem = events.EventSystem.getDefaultEventSystem()
342 if (topic ==
"None"):
345 log.log(self.
VERB2,
"Creating receiver for topic %s" % (topic))
349 if (self.executePolicy.exists(
'executionMode')
and (self.executePolicy.getString(
'executionMode') ==
"oneloop")):
353 if (self.executePolicy.exists(
'shutdownTopic')):
359 if (self.executePolicy.exists(
'exitTopic')):
360 self.
exitTopic = self.executePolicy.getString(
'exitTopic')
364 log.log(self.
VERB1,
"Pipeline configuration complete");
368 Initialize the Queue List for the Pipeline
370 for iQueue
in range(1, self.
nStages+1+1):
372 self.queueList.append(queue)
376 Initialize the Stage List for the Pipeline
378 log = self.log.timeBlock(
"initializeStages", self.
TRACE-2)
380 for iStage
in range(1, self.
nStages+1):
389 sysdata[
"stageId"] = iStage
391 sysdata[
"runId"] = self.
_runId
392 if (stagePolicy !=
"None"):
400 stageObject.initialize(outputQueue, inputQueue)
401 self.stageList.append(stageObject)
408 self.oneShutdownThread.setDaemon(
True)
409 self.oneShutdownThread.start()
413 Initialize the Queue by defining an initial dataset list
415 log = self.log.timeBlock(
"startSlices", self.
TRACE-2)
417 log.log(self.
VERB3,
"Number of slices " + str(self.
nSlices));
421 loopEventA = PyEvent()
422 self.loopEventList.append(loopEventA)
423 loopEventB = PyEvent()
424 self.loopEventList.append(loopEventB)
434 self.sliceThreadList.append(oneSliceThread)
437 log.log(self.
VERB3,
"Starting slice");
438 slicei.setDaemon(
True)
441 log.log(self.
VERB3,
"slicei is Alive");
443 log.log(self.
VERB3,
"slicei is not Alive");
450 Place an empty Clipboard in the first Queue
462 queue1.addDataset(clipboard)
466 Method to execute loop over Stages
468 startStagesLoopLog = self.log.timeBlock(
"startStagesLoop", self.
TRACE)
480 LogRec(looplog, Log.INFO) <<
"terminating pipeline after one loop/visit "
487 looplog.setPreamblePropertyInt(
"LOOPNUM", visitcount)
489 stagelog.setPreamblePropertyInt(
"LOOPNUM", visitcount)
490 proclog.setPreamblePropertyInt(
"LOOPNUM", visitcount)
497 for iStage
in range(1, self.
nStages+1):
498 stagelog.setPreamblePropertyInt(
"STAGEID", iStage)
499 stagelog.start(self.
stageNames[iStage-1] +
" loop")
500 proclog.setPreamblePropertyInt(
"STAGEID", iStage)
527 looplog.log(self.
VERB2,
"Completed Stage Loop")
535 looplog.log(Log.DEBUG,
'Retrieving finalClipboard for deletion')
537 finalClipboard = finalQueue.getNextDataset()
538 looplog.log(Log.DEBUG,
"deleting final clipboard")
541 finalClipboard.close()
544 startStagesLoopLog.log(Log.INFO,
"Shutting down pipeline");
546 startStagesLoopLog.done()
556 log =
Log(self.
log,
"checkExitBySyncPoint")
558 if((self._stop.isSet())
and (self.
exitLevel == 2)):
559 log.log(Log.INFO,
"Pipeline stop is set at exitLevel of 2")
560 log.log(Log.INFO,
"Exit here at a Synchronization point")
564 log =
Log(self.
log,
"checkExitByStage")
566 if((self._stop.isSet())
and (self.
exitLevel == 3)):
567 log.log(Log.INFO,
"Pipeline stop is set at exitLevel of 3")
568 log.log(Log.INFO,
"Exit here at the end of the Stage")
572 log =
Log(self.
log,
"checkExitByVisit")
574 if((self._stop.isSet())
and (self.
exitLevel == 4)):
575 log.log(Log.INFO,
"Pipeline stop is set at exitLevel of 4")
576 log.log(Log.INFO,
"Exit here at the end of the Visit")
581 Create an approximate barrier where all Slices intercommunicate with the Pipeline
584 log =
Log(self.
log,
"threadBarrier")
596 entryTime = time.time()
597 log.log(Log.DEBUG,
"Entry time %f" % (entryTime))
605 signalTime1 = time.time()
606 log.log(Log.DEBUG,
"Signal to Slice %d %f" % (i, signalTime1))
610 log.log(Log.DEBUG,
"Wait for signal from Slice %d" % (i))
621 while(
not (loopEventB.isSet())):
624 signalTime2 = time.time()
625 log.log(Log.DEBUG,
"Done waiting for signal from Slice %d %f" % (i, signalTime2))
627 if(loopEventB.isSet()):
634 Shutdown the Pipeline execution: delete the MPI environment
635 Send the Exit Event if required
638 self.log.log(self.
VERB2,
'Pipeline forceShutdown : Stopping Slices ')
643 self.log.log(self.
VERB2,
'Slice ' + str(i) +
' stopped.')
648 self.log.log(self.
VERB2,
'Slice ' + str(i) +
' exited.')
659 Shutdown the Pipeline execution: delete the MPI environment
660 Send the Exit Event if required
667 psPtr.setString(
"message", str(
"exiting_") + self.
_runId )
669 oneEventTransmitter.publish(psPtr)
674 self.log.log(self.
VERB2,
'Slice ' + str(i) +
' ended.')
677 self.oneShutdownThread.setStop()
678 self.oneShutdownThread.join()
679 self.log.log(self.
VERB2,
'Shutdown thread ended ')
686 THIS GOES AWAY in non MPI harness
687 If needed, calls the C++ Pipeline invokeSyncSlices
689 invlog = stagelog.timeBlock(
"invokeSyncSlices", self.
TRACE-1)
691 invlog.log(self.
VERB3,
"Calling invokeSyncSlices")
698 Executes the try/except construct for Stage preprocess() call
700 prelog = stagelog.timeBlock(
"tryPreProcess", self.
TRACE-2);
707 processlog = stagelog.timeBlock(
"preprocess", self.
TRACE)
711 prelog.log(self.
TRACE,
"Skipping process due to error")
715 trace =
"".join(traceback.format_exception(
716 sys.exc_info()[0], sys.exc_info()[1], sys.exc_info()[2]))
717 prelog.log(Log.FATAL, trace)
720 prelog.log(self.
VERB2,
"Flagging error in tryPreProcess, tryPostProcess to be skipped")
724 if(self.
failSerialName !=
"lsst.pex.harness.stage.NoOpSerialProcessing"):
726 LogRec(prelog, self.
VERB2) <<
"failureStageName exists " \
728 <<
"and failSerialName exists " \
735 clipboard = inputQueue.element()
736 clipboard.put(
"failedInStage", stage.getName())
737 clipboard.put(
"failedInStageN", iStage)
738 clipboard.put(
"failureType", str(sys.exc_info()[0]))
739 clipboard.put(
"failureMessage", str(sys.exc_info()[1]))
740 clipboard.put(
"failureTraceback", trace)
742 self.failStageObject.initialize(outputQueue, inputQueue)
744 self.
interQueue = self.failStageObject.applyPreprocess()
747 prelog.log(self.
VERB2,
"No SerialProcessing to do for failure stage")
757 Executes the try/except construct for Stage postprocess() call
759 postlog = stagelog.timeBlock(
"tryPostProcess",self.
TRACE-2);
766 processlog = stagelog.timeBlock(
"postprocess", self.
TRACE)
770 postlog.log(self.
TRACE,
"Skipping applyPostprocess due to flagged error")
774 trace =
"".join(traceback.format_exception(
775 sys.exc_info()[0], sys.exc_info()[1], sys.exc_info()[2]))
776 postlog.log(Log.FATAL, trace)
782 if(self.
failSerialName !=
"lsst.pex.harness.stage.NoOpSerialProcessing"):
784 LogRec(postlog, self.
VERB2) <<
"failureStageName exists " \
786 <<
"and failSerialName exists " \
793 self.failStageObject.initialize(outputQueue, inputQueue)
795 self.failStageObject.applyPostprocess(self.
interQueue)
798 postlog.log(self.
VERB2,
"No SerialProcessing to do for failure stage")
808 wait for a single event of a designated topic
811 eventsSystem = events.EventSystem.getDefaultEventSystem()
816 inputParamPropertySetPtr =
None
818 while((inputParamPropertySetPtr ==
None)
and (waitLoopCount < self.
eventTimeout)):
820 print "Pipeline waitForEvent Looping : checking for event ... \n"
822 inputParamPropertySetPtr = eventsSystem.receive(thisTopic, transTimeout)
824 time.sleep(sleepTimeout)
826 if((self._stop.isSet())):
829 waitLoopCount = waitLoopCount+1
831 return inputParamPropertySetPtr
836 Handles Events: transmit or receive events as specified by Policy
838 log = stagelog.timeBlock(
"handleEvents", self.
TRACE-2)
839 eventsSystem = events.EventSystem.getDefaultEventSystem()
842 thisTopic = thisTopic.strip()
845 if (thisTopic !=
"None"):
846 log.log(self.
VERB3,
"Processing topic: " + thisTopic)
849 waitlog = log.timeBlock(
"eventwait " + thisTopic, self.
TRACE,
858 if (inputParamPropertySetPtr !=
None):
859 LogRec(log, self.
TRACE) <<
"received event; contents: " \
860 << inputParamPropertySetPtr \
863 log.log(self.
VERB2,
"received event; sending it to Slices " + sliceTopic)
870 eventsSystem.publish(sliceTopic, inputParamPropertySetPtr)
872 log.log(self.
VERB2,
"event sent to Slices")
874 if((self._stop.isSet())):
876 log.log(self.
VERB2,
"Pipeline Shutting down : Shutdown event received.")
881 LogRec(log, self.
VERB2) <<
"Pipeline Shutting Down: Event timeout " \
883 <<
"reached: No or next event did not arrive " \
890 log.log(Log.DEBUG,
'No event to handle')
896 Place the event payload onto the Clipboard
898 log = self.log.timeBlock(
"populateClipboard", self.
TRACE-2);
901 clipboard = queue.element()
906 clipboard.put(eventTopic, inputParamPropertySetPtr)
912 Place an empty Clipboard in the output queue for designated stage
916 queue2.addDataset(clipboard)
920 Move the Clipboard from the input queue to output queue for the designated stage
925 clipboard = queue1.getNextDataset()
926 if (clipboard !=
None):
927 queue2.addDataset(clipboard)
931 get method for the runId
937 set method for the runId
943 get method for the shutdown event topic
949 get method for the event broker host
956 return the default message importance threshold being used for
957 recording messages. The returned value reflects the threshold
958 associated with the default root (system-wide) logger (or what it will
959 be after logging is initialized). Some underlying components may
960 override this threshold.
965 return Log.getDefaultLog().getThreshold()
969 set the default message importance threshold to be used for
970 recording messages. This value will be applied to the default
971 root (system-wide) logger (or what it will be after logging is
972 initialized) so that all software components are affected.
974 if self.
log is not None:
975 Log.getDefaultLog().setThreshold(level)
976 self.log.log(Log.INFO,
977 "Upating Root Log Message Threshold to %i" % level)
982 set the default directory into which the pipeline should write log files
983 @param logdir the directory in which log files should be written
985 if (logdir ==
"None" or logdir ==
None):
992 set the default message importance threshold to be used for
993 recording messages. This value will be applied to the default
994 root (system-wide) logger (or what it will be after logging is
995 initialized) so that all software components are affected.
997 if self.
log is not None:
998 self.log.log(Log.INFO,
999 "Updating event timeout to %i" % timeout)
1003 if appStagePolicy.getValueType(
"stagePolicy") == appStagePolicy.FILE:
1004 pfile = os.path.splitext(os.path.basename(
1005 appStagePolicy.getFile(
"stagePolicy").getPath()))[0]
1006 return trailingpolicy.sub(
'', pfile)
1010 trailingpolicy = re.compile(
r'_*(policy|dict)$', re.IGNORECASE)
a place to record messages and descriptions of the state of processing.
A LogRecord attached to a particular Log that supports stream semantics.
a determination of the various directory roots that can be used by a pipeline.
Class for storing generic metadata.