25 from __future__
import with_statement
49 import os, sys, signal, re, traceback, time, datetime
51 from threading
import Event
as PyEvent
55 Slice represents a single parallel worker program.
56 Slice executes the loop of Stages for processing a portion of an Image (e.g.,
57 single ccd or amplifier). The processing is synchronized with serial processing
59 A Slice obtains its configuration by reading a policy file.
63 '''Slice: Python Slice class implementation. '''
66 def __init__(self, runId="TEST", pipelinePolicyName=None, name="unnamed", rank=-1, workerId=None):
68 Initialize the Slice: create an empty Queue List and Stage List;
69 Import the C++ Slice and initialize the MPI environment
72 self.
TRACE = BlockTimingLog.INSTRUM+2
97 if workerId
is not None:
106 Delete the Slice object: cleanup
108 if self.
log is not None:
109 self.log.log(self.
VERB1,
'Python Slice being deleted')
113 Initialize the Logger after opening policy file
117 dictName =
"pipeline_dict.paf"
120 if (topPolicy.exists(
'execute')):
126 if (self.executePolicy.exists(
'eventBrokerHost')):
132 doLogFile = self.executePolicy.getBool(
'localLogMode')
133 self.cppLogUtils.initializeSliceLogger(doLogFile, self.
_pipelineName,
136 BlockTimingLog.LINUXUDATA)
139 self.
log = self.cppLogUtils.getLogger()
141 if (self.executePolicy.exists(
'logThreshold')):
142 self.
logthresh = self.executePolicy.get(
'logThreshold')
153 Configure the slice via reading a Policy file
156 log =
Log(self.
log,
"configureSlice")
161 stgcfg = self.executePolicy.getArray(
"appStage")
164 for subpol
in stgcfg:
165 stageName = subpol.get(
"name")
166 self.stageNames.append(stageName)
168 self.executePolicy.loadPolicyFiles()
172 if (self.executePolicy.exists(
'dir')):
173 dirPolicy = self.executePolicy.get(
'dir')
175 if (dirPolicy.exists(
'shortName')):
176 shortName = dirPolicy.get(
'shortName')
177 if shortName ==
None:
178 shortName = self.pipelinePolicyName.split(
'.')[0]
180 psLookup = dirs.getDirs()
182 if (self.executePolicy.exists(
'database.url')):
183 psLookup.set(
'dbUrl', self.executePolicy.get(
'database.url'))
193 dafPersist.LogicalLocation.setLocationMap(psLookup)
196 if (self.executePolicy.exists(
'eventTimeout')):
202 fullStageList = self.executePolicy.getArray(
"appStage")
204 log.log(self.
VERB2,
"Found %d stages" % len(fullStageList))
207 fullStageNameList = [ ]
209 for stagei
in xrange(self.
nStages):
210 fullStagePolicy = fullStageList[stagei]
211 if (fullStagePolicy.exists(
'parallelClass')):
212 parallelName = fullStagePolicy.getString(
'parallelClass')
213 stagePolicy = fullStagePolicy.get(
'stagePolicy')
215 parallelName =
"lsst.pex.harness.stage.NoOpParallelProcessing"
218 fullStageNameList.append(parallelName)
219 self.stagePolicyList.append(stagePolicy)
222 self.
stageNames[stagei] = fullStageNameList[-1].split(
'.')[-1]
224 "Stage %d: %s: %s" % (stagei+1, self.
stageNames[stagei],
225 fullStageNameList[-1]))
227 for astage
in fullStageNameList:
228 fullStage = astage.strip()
229 tokenList = astage.split(
'.')
230 classString = tokenList.pop()
231 classString = classString.strip()
233 package =
".".join(tokenList)
236 AppStage = __import__(package, globals(), locals(), [classString], -1)
237 StageClass = getattr(AppStage, classString)
238 self.stageClassList.append(StageClass)
240 log.log(self.
VERB2,
"Imported Stage Classes")
249 if (self.executePolicy.exists(
'failureStage')):
250 failstg = self.executePolicy.get(
"failureStage")
253 if (failstg.exists(
'parallelClass')):
255 failStagePolicy = failstg.get(
'stagePolicy')
258 failStagePolicy =
None
261 tokenList = astage.split(
'.')
262 failClassString = tokenList.pop()
263 failClassString = failClassString.strip()
265 package =
".".join(tokenList)
268 FailAppStage = __import__(package, globals(), locals(), [failClassString], -1)
269 FailStageClass = getattr(FailAppStage, failClassString)
275 sysdata[
"rank"] = self.
_rank
276 sysdata[
"stageId"] = -1
278 sysdata[
"runId"] = self.
_runId
280 if (failStagePolicy !=
None):
293 for item
in fullStageList:
294 self.eventTopicList.append(item.getString(
"eventTopic"))
295 self.sliceEventTopicList.append(item.getString(
"eventTopic"))
298 if (self.executePolicy.exists(
'executionMode')
and (self.executePolicy.getString(
'executionMode') ==
"oneloop")):
303 for item
in fullStageList:
304 shareDataStage =
False
305 if (item.exists(
'shareData')):
306 shareDataStage = item.getBool(
'shareData')
307 self.shareDataList.append(shareDataStage)
309 log.log(self.
VERB3,
"Loading in %d trigger topics" % \
314 log.log(self.
VERB3,
"eventTopic%d: %s" % (iStage+1, item))
316 log.log(Log.DEBUG,
"eventTopic%d: %s" % (iStage+1, item))
324 eventsSystem = events.EventSystem.getDefaultEventSystem()
330 log.log(self.
VERB3,
"Creating receiver %s" % (topic))
335 log.log(self.
VERB1,
"Slice configuration complete");
339 Initialize the Queue List
341 log =
Log(self.
log,
"initializeQueues")
345 for iQueue
in range(1, self.
nStages+1+1):
347 self.queueList.append(queue)
353 Initialize the Stage List
355 log =
Log(self.
log,
"initializeStages")
360 for iStage
in range(1, self.
nStages+1):
369 sysdata[
"rank"] = self.
_rank
370 sysdata[
"stageId"] = iStage
372 sysdata[
"runId"] = self.
_runId
374 if (stagePolicy !=
"None"):
384 stageObject.initialize(outputQueue, inputQueue)
385 self.stageList.append(stageObject)
391 Place an empty Clipboard in the first Queue
395 queue1.addDataset(clipboard)
399 Place an empty Clipboard in the output queue for designated stage
403 queue2.addDataset(clipboard)
407 Move the Clipboard from the input queue to output queue for the designated stage
412 clipboard = queue1.getNextDataset()
413 queue2.addDataset(clipboard)
417 Execute the Stage loop. The loop progresses in step with
418 the analogous stage loop in the central Pipeline by means of
419 MPI Bcast and Barrier calls.
421 startStagesLoopLog = self.log.timeBlock(
"startStagesLoop", self.
TRACE)
425 self.log.log(Log.INFO,
"Begin startStagesLoopLog")
431 self.log.log(Log.INFO,
"visitcount %d %s " % (visitcount, datetime.datetime.now()))
434 LogRec(looplog, Log.INFO) <<
"terminating Slice Stage Loop "
439 looplog.setPreamblePropertyInt(
"LOOPNUM", visitcount)
441 stagelog.setPreamblePropertyInt(
"LOOPNUM", visitcount)
443 timesVisitStart = os.times()
447 looplog.setPreamblePropertyDouble(
"usertime", timesVisitStart[0])
448 looplog.setPreamblePropertyDouble(
"systemtime", timesVisitStart[1])
454 for iStage
in range(1, self.
nStages+1):
455 stagelog.setPreamblePropertyInt(
"STAGEID", iStage)
456 stagelog.setPreamblePropertyString(
"stagename", self.
stageNames[iStage-1])
457 stagelog.start(self.
stageNames[iStage-1] +
" loop")
458 stagelog.log(Log.INFO,
"Begin stage loop iteration iStage %d " % iStage)
469 self.
tryProcess(iStage, stageObject, stagelog)
477 stagelog.log(self.
TRACE,
"End stage loop iteration iStage %d " % iStage)
478 stagelog.log(Log.INFO,
"End stage loop iteration : ErrorCheck \
483 looplog.log(self.
VERB2,
"Completed Stage Loop")
487 looplog.log(Log.DEBUG,
488 "Retrieving final Clipboard for deletion")
490 finalClipboard = finalQueue.getNextDataset()
491 finalClipboard.close()
493 looplog.log(Log.DEBUG,
"Deleted final Clipboard")
495 looplog.log(self.
VERB3,
"Error flagged on this visit")
497 timesVisitDone = os.times()
498 utime = timesVisitDone[0] - timesVisitStart[0]
499 stime = timesVisitDone[1] - timesVisitStart[1]
500 wtime = timesVisitDone[4] - timesVisitStart[4]
501 totalTime = utime + stime
502 looplog.log(Log.INFO,
"visittimes : utime %.4f stime %.4f total %.4f wtime %.4f" % (utime, stime, totalTime, wtime) )
506 looplog.setPreamblePropertyDouble(
"usertime", timesVisitDone[0])
507 looplog.setPreamblePropertyDouble(
"systemtime", timesVisitDone[1])
512 with open(
"/proc/%d/status" % os.getpid(),
"r") as f:
514 m = re.match(
r'Vm(Size|RSS|Peak|HWM):\s+(\d+ \wB)', l)
516 memmsg +=
" %s=%s" % m.groups()
517 looplog.log(Log.INFO, memmsg)
525 startStagesLoopLog.done()
529 Create an approximate barrier where all Slices intercommunicate with the Pipeline
532 log =
Log(self.
log,
"threadBarrier")
534 entryTime = time.time()
535 log.log(Log.DEBUG,
"Slice %d waiting for signal from Pipeline %f" % (self.
_rank, entryTime))
537 self.loopEventA.wait()
539 signalTime1 = time.time()
540 log.log(Log.DEBUG,
"Slice %d done waiting; signaling back %f" % (self.
_rank, signalTime1))
542 if(self.loopEventA.isSet()):
543 self.loopEventA.clear()
545 self.loopEventB.set()
547 signalTime2 = time.time()
548 log.log(Log.DEBUG,
"Slice %d sent signal back. Exit threadBarrier %f" % (self.
_rank, signalTime2))
552 Shutdown the Slice execution
554 shutlog =
Log(self.
log,
"shutdown", Log.INFO);
556 shutlog.log(Log.INFO,
"Shutting down Slice: pid " + str(pid))
557 os.kill(pid, signal.SIGKILL)
561 Executes the try/except construct for Stage process() call
564 proclog = stagelog.timeBlock(
"tryProcess", self.
TRACE-2);
567 proclog.log(self.
VERB3,
"Getting process signal from Pipeline")
574 processlog = stagelog.timeBlock(
"process", self.
TRACE)
575 stageObject.applyProcess()
578 clipboard = outputQueue.element()
579 proclog.log(Log.INFO,
"Checking_For_Shotdown")
581 if clipboard.has_key(
"noMoreDatasets"):
582 proclog.log(Log.INFO,
"Ready_For_Shutdown")
587 proclog.log(self.
TRACE,
"Skipping process due to error")
591 trace =
"".join(traceback.format_exception(
592 sys.exc_info()[0], sys.exc_info()[1], sys.exc_info()[2]))
593 proclog.log(Log.FATAL, trace)
600 if(self.
failParallelName !=
"lsst.pex.harness.stage.NoOpParallelProcessing"):
602 LogRec(proclog, self.
VERB2) <<
"failureStageName exists " \
604 <<
"and failParallelName exists " \
611 clipboard = inputQueue.element()
612 clipboard.put(
"failedInStage", stage.getName())
613 clipboard.put(
"failedInStageN", iStage)
614 clipboard.put(
"failureType", str(sys.exc_info()[0]))
615 clipboard.put(
"failureMessage", str(sys.exc_info()[1]))
616 clipboard.put(
"failureTraceback", trace)
618 self.failStageObject.initialize(outputQueue, inputQueue)
620 self.failStageObject.applyProcess()
622 proclog.log(self.
TRACE,
"Popping off failure stage Clipboard")
623 clipboard = outputQueue.getNextDataset()
626 proclog.log(self.
TRACE,
"Erasing and deleting failure stage Clipboard")
629 proclog.log(self.
VERB2,
"No ParallelProcessing to do for failure stage")
633 proclog.log(self.
VERB3,
"Getting end of process signal from Pipeline")
638 Handles Events: transmit or receive events as specified by Policy
640 log = stagelog.timeBlock(
"handleEvents", self.
TRACE-2)
641 eventsSystem = events.EventSystem.getDefaultEventSystem()
645 if (thisTopic !=
"None"):
646 log.log(self.
VERB3,
"Processing topic: " + thisTopic)
649 waitlog = log.timeBlock(
"eventwait " + sliceTopic, self.
TRACE,
659 inputParamPropertySetPtr = eventsSystem.receive(sliceTopic, transTimeout)
660 while(inputParamPropertySetPtr ==
None):
661 time.sleep(sleepTimeout)
662 inputParamPropertySetPtr = eventsSystem.receive(sliceTopic, transTimeout)
666 LogRec(log, self.
TRACE) <<
"received event; contents: " \
667 << inputParamPropertySetPtr \
672 log.log(self.
VERB3,
'Received event; added payload to clipboard')
674 log.log(Log.DEBUG,
'No event to handle')
680 Place the event payload onto the Clipboard
682 log =
Log(self.
log,
"populateClipboard");
683 log.log(Log.DEBUG,
'Python Pipeline populateClipboard');
686 clipboard = queue.element()
691 clipboard.put(eventTopic, inputParamPropertySetPtr)
696 get method for the runId
703 set method for the runId
709 return the default message importance threshold being used for
710 recording messages. The returned value reflects the threshold
711 associated with the default root (system-wide) logger (or what it will
712 be after logging is initialized). Some underlying components may
713 override this threshold.
714 @return int the threshold value as would be returned by
720 return Log.getDefaultLog().getThreshold()
724 set the default message importance threshold to be used for
725 recording messages. This value will be applied to the default
726 root (system-wide) logger (or what it will be after logging is
727 initialized) so that all software components are affected.
728 @param level the threshold level as expected by Log.setThreshold().
730 if self.
log is not None:
731 Log.getDefaultLog().setThreshold(level)
732 self.log.log(Log.INFO,
733 "Upating Root Log Message Threshold to %i" % level)
738 set the default directory into which the slice should write log files
739 @param logdir the directory in which log files should be written
741 if (logdir ==
"None" or logdir ==
None):
747 if appStagePolicy.getValueType(
"stagePolicy") == appStagePolicy.FILE:
748 pfile = os.path.splitext(os.path.basename(
749 appStagePolicy.getFile(
"stagePolicy").getPath()))[0]
750 return trailingpolicy.sub(
'', pfile)
763 trailingpolicy = re.compile(
r'_*(policy|dict)$', re.IGNORECASE)
a place to record messages and descriptions of the state of processing.
A LogRecord attached to a particular Log that supports stream semantics.
a determination of the various directory roots that can be used by a pipeline.
Class for storing generic metadata.