24 from Queue
import Queue
29 A container for processing that handles one step in a pipeline. This
30 container will be realized either via the subclass, SerialProcessing
31 (when held by a Pipeline) or via the subclass, ParallelProcessing (when
34 This abstract class is implemented (via one of the subclasses,
35 SerialProcessing or ParallelProcessing) to apply a specific
36 science-oriented algorithm to data drawn from the queue.
39 def __init__(self, policy=None, log=None, eventBroker=None, sysdata=None,
42 initialize this stage. This will call setup() after all internal
43 attributes are initialized unless callSetup is False.
45 The sysdata is a dictionary intended to include data describing
46 the execution context to assume for this instance. This constructor
47 will recognize the following named items:
48 name a string name for display purposes that identifies
49 what this stage does for the pipeline. If not provided,
50 a default name will be set.
51 stageId the integer identifier for this stage within the
52 sequence of stages that makes up a pipeline.
53 A value of -1 means that the stage is not part of
55 rank the integer identifier for the parallel thread that
56 hosts this instance. A value of -1 indicates the
57 master thread for the pipeline.
58 runId the string identifier for the production run that
59 this stage is a part of. If None, a default will
61 universeSize the total number of parallel threads
63 The dictionary may contain other arbitrary data. In general,
64 constructors for specific stage subclasses that provide application
65 algorithms will not be able to make use of this information without
66 special knowledge of the execution context.
68 @param policy the policy for configuring this stage. This may
69 be None if this stage does not require a policy
71 @param log the log object the stage instance
72 should use. If not provided, a default will be
74 @param eventBroker the name of the host where the event broker is
75 running. If not provided, an eventBroker will
76 not be available to the stage.
77 @param sysdata a dictionary of data describing the execution
78 context. The stage uses this information to
79 set some of its internal data.
80 @param callSetup if true, the setup() function will be called as the
81 last step in this constructor, after all data
82 is initialized. Default: True.
96 if sysdata.has_key(
"stageId"):
97 self.
stageId = sysdata[
"stageId"]
102 if sysdata.has_key(
"rank"):
103 self.
rank = sysdata[
"rank"]
108 if sysdata.has_key(
"runId"):
109 self.
runId = sysdata[
"runId"]
114 if sysdata.has_key(
"name"):
115 self.
name = sysdata[
"name"]
120 if sysdata.has_key(
"universeSize"):
141 setup the initial, internal state for this stage. This default
142 implementation does nothing; however, subclasses may override it.
143 It will be called during construction after this parent class's
144 attributes are initialized (unless the constructor is called with
151 called once when the stage is plugged into a pipeline by the manager
152 of the pipeline (either a Pipeline or Slice instance), this method
153 initializes the stage so that it is ready to process datasets. It
154 passes in the data queues it will use. Subclasses may override this
155 but should call the parent's implementation.
157 self.outputQueue = outQueue
158 self.inputQueue = inQueue
162 get the hostname where the event broker currently in use is located
180 set the MPI rank of the process running this stage
186 get the MPI rank of the process running this stage
192 set the MPI universe size
198 get the MPI universe size, an integer
204 return the name assigned to this stage.
210 discontinue processing and clean up and release resources.
215 class SerialProcessing(StageProcessing):
217 The container for the serial part of the processing that happens before
218 and after the parallel part. This processing will happen in the context
222 def __init__(self, policy=None, log=None, eventBroker=None,
223 sysdata=
None, callSetup=
True):
225 initialize this stage. This will call setup() after all internal
226 attributes are initialized unless callSetup is False.
228 @param policy the policy for configuring this stage. This may
229 be None if this stage does not require a policy
231 @param log the log object the stage instance
232 should use. If not provided, a default will be
234 @param eventBroker the name of the host where the event broker is
235 running. If not provided, an eventBroker will
236 not be available to the stage.
237 @param sysdata a dictionary of data describing the execution
238 context. The stage uses this information to
239 set some of its internal data. See
240 StageProcessing documentation for details.
241 @param callSetup if true, the setup() function will be called as the
242 last step in this constructor, after all data
243 is initialized. Default: True.
245 StageProcessing.__init__(self, policy, log,eventBroker, sysdata,
False)
252 Apply the preprocess() function to data on the input queue.
253 Returned is an InputQueue that contains the clipboards processed
254 by this function and which should be passed onto applyPostprocess().
256 This implementation retrieves a single clipboard from the input
257 queue and preprocesses it. While most subclasses will inherit this
258 implementation, some may override it to take more control over the
259 processing of data from the input queue.
263 clipboard = self.inputQueue.element()
266 dummyClipboard = self.inputQueue.getNextDataset()
268 out.addDataset(clipboard)
273 execute the serial processing that should occur before the parallel
274 processing part of the stage on the data on the given clipboard.
276 @param clipboard the data to process, packaged as a Clipboard
278 raise RuntimeError(
"Not Implemented: preprocess()")
282 apply the postprocess() function to an ordered set of clipboards.
283 This implementation loops over the clipboards on the given InputQueue,
284 calls postprocess() on each one, and posts it to the stage outputQueue.
286 @param queue the InputQueue instance returned by applyPreprocess()
288 while queue.size() > 0:
291 clipboard = queue.element()
294 dummyClipboard = queue.getNextDataset()
295 self.outputQueue.addDataset(clipboard)
300 execute the serial processing that should happen after the parallel
301 processing part of the stage.
303 @param clipboard the data to process, packaged as a Clipboard
305 raise RuntimeError(
"Not Implemented: postprocess()")
310 a container class for the parallel processing part of a pipeline stage.
311 This processing will happen in the context of a Slice.
314 def __init__(self, policy=None, log=None, eventBroker=None,
315 sysdata=
None, callSetup=
True):
317 initialize this stage. This will call setup() after all internal
318 attributes are initialized unless callSetup is False.
320 @param policy the policy for configuring this stage. This may
321 be None if this stage does not require a policy
323 @param log the log object the stage instance
324 should use. If not provided, a default will be
326 @param eventBroker the name of the host where the event broker is
327 running. If not provided, an eventBroker will
328 not be available to the stage.
329 @param sysdata a dictionary of data describing the execution
330 context. The stage uses this information to
331 set some of its internal data. See
332 StageProcessing documentation for details.
333 @param callSetup if true, the setup() function will be called as the
334 last step in this constructor, after all data
335 is initialized. Default: True.
337 StageProcessing.__init__(self, policy, log, eventBroker, sysdata,
False)
344 apply the process() function to data from the input queue. This
345 implementation will pull one clipboard from the input queue, call
346 process() on it, and post it to the output queue. While most
347 subclasses will inherit this default implementation, some may
348 override it to take more control over how much data to process.
352 clipboard = self.inputQueue.element()
355 dummyClipboard = self.inputQueue.getNextDataset()
356 self.outputQueue.addDataset(clipboard)
360 execute the parallel processing part of the stage within one thread
361 (Slice) of the pipeline.
363 raise RuntimeError(
"Not Implemented: process()")
368 A SerialProcessing subclass that provides no-op implementations
369 of preprocess() and postprocess().
371 The default SerialProcessing implementations normally throw Runtime
377 execute the serial processing that should occur before the parallel
378 processing part of the stage on the data on the given clipboard.
380 @param clipboard the data to process, packaged as a Clipboard
386 execute the serial processing that should happen after the parallel
387 processing part of the stage.
389 @param clipboard the data to process, packaged as a Clipboard
396 class NoOpParallelProcessing(ParallelProcessing):
398 A ParallelProcessing subclass that provides a no-op implementation
401 The default ParallelProcessing implementation normally throws a Runtime
407 execute the parallel processing part of the stage within one thread
408 (Slice) of the pipeline.
410 @param clipboard the data to process, packaged as a Clipboard
416 a class that will create and initialize the StageProcessing classes that
419 There are three intended ways to create a Stage instance. First is by
420 the makeStageFromPolicy() module function. Intended for use inside the
421 Pipeline-constructing classes (like Pipeline and Slice), this method
422 takes as input a policy that provides the names of the SerialProcessing
423 and ParallelProcessing classes that define the stage. Alternatively,
424 one can pass these names explicitly via the makeStage() module function.
425 The third way is by directly constructing a subclass of Stage. The
426 simplest way to create this subclass is as follows:
428 class MyStage(Stage):
429 serialClass = MySerialProcessing
430 parallelClass = MyParallelProcessing
432 If the stage does not have a parallel component, then the 'parallelClass'
433 variable does not need to be updated; likewise, for serial component.
436 serialClass = NoOpSerialProcessing
437 parallelClass = NoOpParallelProcessing
439 def __init__(self, policy, log=None, stageId=-1, eventBroker=None,
442 initialize this stage with the policy that defines the stage and
443 some contextual system data. Applications normally do not directly
444 call this constructor. Instead they either construct a Stage subclass
445 or create a Stage instance using makeStage() or makeStageFromPolicy().
447 @param policy the policy that will configure the SerialProcessing
448 and ParallelProcessing
449 @param log the log object the stage instance
450 should use. If not provided, a default will be
452 @param eventBroker the name of the host where the event broker is
453 running. If not provided, an eventBroker will
454 not be available to the stage.
455 @param sysdata a dictionary of data describing the execution
456 context. The stage uses this information to
457 set some of its internal data. See
458 StageProcessing documentation for details.
459 The name provided in the policy will override
460 the name in this dictionary
468 log =
Log(Log.getDefaultLog(),
"stage")
473 set the logger that should be assigned to the stage components when
480 set the instance of the event broker that the stage components should
487 override the current execution context data that will be passed to
488 the stage components when they are created with the given data.
489 Currently set data with names that are not included in the given
490 dictionary will not be affected.
494 for name
in sysdata.keys():
495 self.
sysdata[name] = sysdata[name]
499 return the currently set value for the execution context property or
500 None if it is not set.
501 @param name the name of the property
503 return self.sysdata.get(name,
None)
507 create a new SerialProcessing instance.
509 All of the parameters are optional. If not provided, they will be
510 set to the values provided by the constructor.
511 @param stageId the integer identifier for this stage within the
512 sequence of stages that makes up a pipeline.
514 @param log the log object the stage instance
515 should use. If not provided, a default will be
517 @param extraSysdata a dictionary of execution context data whose items
518 will override those set via the Stage constructor
526 create a new ParallelProcessing instance.
528 All of the parameters are optional. If not provided, they will be
529 set to the values provided by the constructor (except for rank).
530 @param rank the integer identifier for the parallel thread
531 that hosts this instance. Default=-1.
532 indicating the master thread for the pipeline.
533 @param log the log object the stage instance
534 should use. If not provided, a default will be
536 @param sysdata a dictionary of arbitrary contextual system data that
537 the stage might need.
538 @param runId the string identifier for the production run that
539 this stage is a part of. If None, a default will
546 def _create(self, cls, rank, log, extraSysdata):
548 sysdata = self.sysdata.copy()
549 for k
in extraSysdata.keys():
550 sysdata[k] = extraSysdata[k]
551 sysdata[
"rank"] = rank
559 (modn, cln) = name.rsplit(
'.', 1)
560 mod = __import__(modn, globals(), locals(), [cln], -1)
561 stageClass = getattr(mod, cln)
567 create a Stage instance from a "stage definition policy". This policy
568 corresponds to the "appStage" parameter in a pipeline policy file that
569 defines a pipeline (described below). The resulting instance is used
570 as the factory for creating instances of SerialProcessing and
573 The stage definition policy (the "appStage" parameter) has the following
576 name a short name identifying the stage (e.g. in logs)
577 serialClass a string containing the fully qualified
578 SerialProcessing class name. If the stage does not
579 have a serial component, this parameter is not
581 parallelClass a string containing the fully qualified
582 ParallelProcessing class name. If the stage does not
583 have a parallel component, this parameter is not
585 eventTopic the name of an event topic that the stage expects to
586 be received and placed on the clipboard. If no
587 event is expected, this parameter is not provided.
588 stagePolicy the policy that configures the stage and thus should
589 be passed to the SerialProcessing and
590 ParallelProcessing constructors.
592 @param stageDefPolicy the policy for defining this stage (see above).
593 @param log the log object the stage instance
594 should use. If not provided, a default will be
595 used unless one is provided via setLog() on the
596 returned Stage instance.
597 @param eventBroker the name of the host where the event broker is
598 running. If not provided, an eventBroker will
599 not be available to the stage unless setEventBroker()
600 is called on the returned Stage instance.
601 @param sysdata a dictionary of data describing the execution
602 context. The stage uses this information to
603 set some of its internal data.
609 if not mysysdata.has_key(
"name"):
610 mysysdata = sysdata.copy()
611 if stageDefPolicy.exists(
"name"):
612 mysysdata[
"name"] = stageDefPolicy.get(
"name")
614 mysysdata[
"name"] =
"unknown stage"
616 stageConfigPolicy = stageDefPolicy.get(
"stagePolicy")
620 if stageDefPolicy.exists(
"serialClass"):
621 serialClass = stageDefPolicy.get(
"serialClass")
622 if stageDefPolicy.exists(
"parallelClass"):
623 parallelClass = stageDefPolicy.get(
"parallelClass")
625 return makeStage(stageConfigPolicy, serialClass, parallelClass,
626 log, eventBroker, mysysdata)
629 def makeStage(policy=None, serClsName=None, paraClsName=None, log=None,
630 eventBroker=
None, sysdata=
None):
632 create a Stage instance that is defined by a SerialProcessing class
633 and/or a ParallelProcessing class.
634 @param policy the stage configuration policy. This policy will be
635 passed to the SerialProcessing and ParallelProcessing
637 @param log the log object the stage instance
638 should use. If not provided, a default will be
639 used unless one is provided via setLog() on the
640 returned Stage instance.
641 @param eventBroker the name of the host where the event broker is
642 running. If not provided, an eventBroker will
643 not be available to the stage unless setEventBroker()
644 is called on the returned Stage instance.
645 @param sysdata a dictionary of data describing the execution
646 context. The stage uses this information to
647 set some of its internal data.
649 out =
Stage(policy, log, eventBroker, sysdata)
653 if not issubclass(out.serialClass, SerialProcessing):
654 raise ValueError(
"Not a SerialProcessing subclass: " + serClsName)
657 if not issubclass(out.parallelClass, ParallelProcessing):
658 raise ValueError(
"Not a ParallelProcessing subclass: "+paraClsName)
def createSerialProcessing
a place to record messages and descriptions of the state of processing.
def createParallelProcessing