24 from functools 
import wraps, partial
 
   25 from contextlib 
import contextmanager
 
   27 import mpi4py.MPI 
as mpi
 
   32 __all__ = [
"Comm", 
"Pool", 
"startPool", 
"setBatchType", 
"getBatchType", 
"abortOnError", 
"NODE", ]
 
   34 NODE = 
"%s:%d" % (os.uname()[1], os.getpid())  
 
   38     """Unpickle an instance method 
   40     This has to be a named function rather than a lambda because 
   41     pickle needs to find it. 
   43     return getattr(obj, name)
 
   47     """Pickle an instance method 
   49     The instance method is divided into the object and the 
   53     name = method.__name__
 
   54     return unpickleInstanceMethod, (obj, name)
 
   57 copyreg.pickle(types.MethodType, pickleInstanceMethod)
 
   61     """Unpickle a function 
   63     This has to be a named function rather than a lambda because 
   64     pickle needs to find it. 
   67     module = importlib.import_module(moduleName)
 
   68     return getattr(module, funcName)
 
   74     This assumes that we can recreate the function object by grabbing 
   75     it from the proper module.  This may be violated if the function 
   76     is a lambda or in __main__.  In that case, I recommend recasting 
   77     the function as an object with a __call__ method. 
   79     Another problematic case may be a wrapped (e.g., decorated) method 
   80     in a class: the 'method' is then a function, and recreating it is 
   81     not as easy as we assume here. 
   83     moduleName = function.__module__
 
   84     funcName = function.__name__
 
   85     return unpickleFunction, (moduleName, funcName)
 
   88 copyreg.pickle(types.FunctionType, pickleFunction)
 
   93     _batchType = 
"unknown" 
   97     """Return a string giving the type of batch system in use""" 
  102     """Return a string giving the type of batch system in use""" 
  104     _batchType = batchType
 
  108     """Function decorator to throw an MPI abort on an unhandled exception""" 
  110     def wrapper(*args, **kwargs):
 
  112             return func(*args, **kwargs)
 
  113         except Exception 
as e:
 
  114             sys.stderr.write(
"%s on %s in %s: %s\n" % (
type(e).__name__, NODE, func.__name__, e))
 
  116             traceback.print_exc(file=sys.stderr)
 
  120                 mpi.COMM_WORLD.Abort(1)
 
  127     """Singleton to hold what's about to be pickled. 
  129     We hold onto the object in case there's trouble pickling, 
  130     so we can figure out what class in particular is causing 
  133     The held object is in the 'obj' attribute. 
  135     Here we use the __new__-style singleton pattern, because 
  136     we specifically want __init__ to be called each time. 
  149         """Hold onto new object""" 
  157         """Drop held object if there were no problems""" 
  163     """Try to guess what's not pickling after an exception 
  165     This tends to work if the problem is coming from the 
  166     regular pickle module.  If it's coming from the bowels 
  167     of mpi4py, there's not much that can be done. 
  170     excType, excValue, tb = sys.exc_info()
 
  181         return stack[-2].tb_frame.f_locals[
"obj"]
 
  188     """Context manager to sniff out pickle problems 
  190     If there's a pickle error, you're normally told what the problem 
  191     class is.  However, all SWIG objects are reported as "SwigPyObject". 
  192     In order to figure out which actual SWIG-ed class is causing 
  193     problems, we need to go digging. 
  197         with pickleSniffer(): 
  198             someOperationInvolvingPickle() 
  200     If 'abort' is True, will call MPI abort in the event of problems. 
  204     except Exception 
as e:
 
  205         if "SwigPyObject" not in str(e) 
or "pickle" not in str(e):
 
  210         sys.stderr.write(
"Pickling error detected: %s\n" % e)
 
  211         traceback.print_exc(file=sys.stderr)
 
  214         if obj 
is None and heldObj 
is not None:
 
  219                 pickle.dumps(heldObj)
 
  220                 sys.stderr.write(
"Hmmm, that's strange: no problem with pickling held object?!?!\n")
 
  224             sys.stderr.write(
"Unable to determine class causing pickle problems.\n")
 
  226             sys.stderr.write(
"Object that could not be pickled: %s\n" % obj)
 
  229                 mpi.COMM_WORLD.Abort(1)
 
  235     """Function decorator to catch errors in pickling and print something useful""" 
  237     def wrapper(*args, **kwargs):
 
  239             return func(*args, **kwargs)
 
  244     """Wrapper to mpi4py's MPI.Intracomm class to avoid busy-waiting. 
  246     As suggested by Lisandro Dalcin at: 
  247     * http://code.google.com/p/mpi4py/issues/detail?id=4 and 
  248     * https://groups.google.com/forum/?fromgroups=#!topic/mpi4py/nArVuMXyyZI 
  251     def __new__(cls, comm=mpi.COMM_WORLD, recvSleep=0.1, barrierSleep=0.1):
 
  252         """!Construct an MPI.Comm wrapper 
  255         @param comm            MPI.Intracomm to wrap a duplicate of 
  256         @param recvSleep       Sleep time (seconds) for recv() 
  257         @param barrierSleep    Sleep time (seconds) for Barrier() 
  259         self = super(Comm, cls).
__new__(cls, comm.Dup())
 
  265     def recv(self, obj=None, source=0, tag=0, status=None):
 
  266         """Version of comm.recv() that doesn't busy-wait""" 
  268         while not self.Iprobe(source=source, tag=tag, status=sts):
 
  270         return super(Comm, self).
recv(buf=obj, source=sts.source, tag=sts.tag, status=status)
 
  272     def send(self, obj=None, *args, **kwargs):
 
  274             return super(Comm, self).
send(obj, *args, **kwargs)
 
  276     def _checkBarrierComm(self):
 
  277         """Ensure the duplicate communicator is available""" 
  282         """Version of comm.Barrier() that doesn't busy-wait 
  284         A duplicate communicator is used so as not to interfere with the user's own communications. 
  293             dst = (rank + mask) % size
 
  294             src = (rank - mask + size) % size
 
  304             return super(Comm, self).bcast(value, root=root)
 
  307         """Scatter data across the nodes 
  309         The default version apparently pickles the entire 'dataList', 
  310         which can cause errors if the pickle size grows over 2^31 bytes 
  311         due to fundamental problems with pickle in python 2. Instead, 
  312         we send the data to each slave node in turn; this reduces the 
  315         @param dataList  List of data to distribute; one per node 
  317         @param root  Index of root node 
  318         @param tag  Message tag (integer) 
  319         @return  Data for this node 
  321         if self.Get_rank() == root:
 
  322             for rank, data 
in enumerate(dataList):
 
  325                 self.
send(data, rank, tag=tag)
 
  326             return dataList[root]
 
  328             return self.
recv(source=root, tag=tag)
 
  333         super(Comm, self).
Free()
 
  337     """Object to signal no operation""" 
  342     """Provides tag numbers by symbolic name in attributes""" 
  346         for i, name 
in enumerate(nameList, 1):
 
  347             setattr(self, name, i)
 
  350         return self.__class__.__name__ + repr(self.
_nameList)
 
  353         return self.__class__, tuple(self.
_nameList)
 
  357     """An object to hold stuff between different scatter calls 
  359     Includes a communicator by default, to allow intercommunication 
  364         super(Cache, self).
__init__(comm=comm)
 
  368     """!Metaclass to produce a singleton 
  370     Doing a singleton mixin without a metaclass (via __new__) is 
  371     annoying because the user has to name his __init__ something else 
  372     (otherwise it's called every time, which undoes any changes). 
  373     Using this metaclass, the class's __init__ is called exactly once. 
  375     Because this is a metaclass, note that: 
  376     * "self" here is the class 
  377     * "__init__" is making the class (it's like the body of the 
  379     * "__call__" is making an instance of the class (it's like 
  380       "__new__" in the class). 
  384         super(SingletonMeta, cls).
__init__(name, bases, dict_)
 
  394     """Debug logger singleton 
  396     Disabled by default; to enable, do: 'Debugger().enabled = True' 
  397     You can also redirect the output by changing the 'out' attribute. 
  404     def log(self, source, msg, *args):
 
  407         The 'args' are only stringified if we're enabled. 
  409         @param source: name of source 
  410         @param msg: message to write 
  411         @param args: additional outputs to append to message 
  414             self.
out.
write(
"%s: %s" % (source, msg))
 
  421     """Thread to do reduction of results 
  423     "A thread?", you say. "What about the python GIL?" 
  424     Well, because we 'sleep' when there's no immediate response from the 
  425     slaves, that gives the thread a chance to fire; and threads are easier 
  426     to manage (e.g., shared memory) than a process. 
  428     def __init__(self, reducer, initial=None, sleep=0.1):
 
  431         The 'reducer' should take two values and return a single 
  434         @param reducer  Function that does the reducing 
  435         @param initial  Initial value for reduction, or None 
  436         @param sleep  Time to sleep when there's nothing to do (sec) 
  438         threading.Thread.__init__(self, name=
"reducer")
 
  440         self.
_lock = threading.Lock()  
 
  444         self.
_done = threading.Event()  
 
  447         """Do the actual work 
  449         We pull the data out of the queue and release the lock before 
  450         operating on it. This stops us from blocking the addition of 
  451         new data to the queue. 
  462         Thread entry point, called by Thread.start 
  471         """Add data to the queue to be reduced""" 
  476         """Complete the thread 
  478         Unlike Thread.join (which always returns 'None'), we return the result 
  482         threading.Thread.join(self)
 
  487     """Node in MPI process pool 
  489     WARNING: You should not let a pool instance hang around at program 
  490     termination, as the garbage collection behaves differently, and may 
  491     cause a segmentation fault (signal 11). 
  506     def _getCache(self, context, index):
 
  507         """Retrieve cache for particular data 
  509         The cache is updated with the contents of the store. 
  511         if context 
not in self.
_cache:
 
  513         if context 
not in self.
_store:
 
  515         cache = self.
_cache[context]
 
  516         store = self.
_store[context]
 
  517         if index 
not in cache:
 
  519         cache[index].__dict__.update(store)
 
  522     def log(self, msg, *args):
 
  523         """Log a debugging message""" 
  529     def _processQueue(self, context, func, queue, *args, **kwargs):
 
  530         """!Process a queue of data 
  532         The queue consists of a list of (index, data) tuples, 
  533         where the index maps to the cache, and the data is 
  534         passed to the 'func'. 
  536         The 'func' signature should be func(cache, data, *args, **kwargs) 
  537         if 'context' is non-None; otherwise func(data, *args, **kwargs). 
  539         @param context: Namespace for cache; None to not use cache 
  540         @param func: function for slaves to run 
  541         @param queue: List of (index,data) tuples to process 
  542         @param args: Constant arguments 
  543         @param kwargs: Keyword arguments 
  544         @return list of results from applying 'func' to dataList 
  546         return self.
_reduceQueue(context, 
None, func, queue, *args, **kwargs)
 
  548     def _reduceQueue(self, context, reducer, func, queue, *args, **kwargs):
 
  549         """!Reduce a queue of data 
  551         The queue consists of a list of (index, data) tuples, 
  552         where the index maps to the cache, and the data is 
  553         passed to the 'func', the output of which is reduced 
  554         using the 'reducer' (if non-None). 
  556         The 'func' signature should be func(cache, data, *args, **kwargs) 
  557         if 'context' is non-None; otherwise func(data, *args, **kwargs). 
  559         The 'reducer' signature should be reducer(old, new). If the 'reducer' 
  560         is None, then we will return the full list of results 
  562         @param context: Namespace for cache; None to not use cache 
  563         @param reducer: function for master to run to reduce slave results; or None 
  564         @param func: function for slaves to run 
  565         @param queue: List of (index,data) tuples to process 
  566         @param args: Constant arguments 
  567         @param kwargs: Keyword arguments 
  568         @return reduced result (if reducer is non-None) or list of results 
  569             from applying 'func' to dataList 
  571         if context 
is not None:
 
  572             resultList = [func(self.
_getCache(context, i), data, *args, **kwargs) 
for i, data 
in queue]
 
  574             resultList = [func(data, *args, **kwargs) 
for i, data 
in queue]
 
  577         if len(resultList) == 0:
 
  579         output = resultList.pop(0)
 
  580         for result 
in resultList:
 
  581             output = reducer(output, result)
 
  585         """Set values in store for a particular context""" 
  586         self.
log(
"storing", context, kwargs)
 
  587         if context 
not in self.
_store:
 
  589         for name, value 
in kwargs.items():
 
  590             self.
_store[context][name] = value
 
  593         """Delete value in store for a particular context""" 
  594         self.
log(
"deleting from store", context, nameList)
 
  595         if context 
not in self.
_store:
 
  596             raise KeyError(
"No such context: %s" % context)
 
  597         for name 
in nameList:
 
  598             del self.
_store[context][name]
 
  601         """Clear stored data for a particular context""" 
  602         self.
log(
"clearing store", context)
 
  603         if context 
not in self.
_store:
 
  604             raise KeyError(
"No such context: %s" % context)
 
  608         """Reset cache for a particular context""" 
  609         self.
log(
"clearing cache", context)
 
  610         if context 
not in self.
_cache:
 
  615         """List contents of cache""" 
  616         cache = self.
_cache[context] 
if context 
in self.
_cache else {}
 
  617         sys.stderr.write(
"Cache on %s (%s): %s\n" % (self.
node, context, cache))
 
  620         """List contents of store for a particular context""" 
  621         if context 
not in self.
_store:
 
  622             raise KeyError(
"No such context: %s" % context)
 
  623         sys.stderr.write(
"Store on %s (%s): %s\n" % (self.
node, context, self.
_store[context]))
 
  627     """Master node instance of MPI process pool 
  629     Only the master node should instantiate this. 
  631     WARNING: You should not let a pool instance hang around at program 
  632     termination, as the garbage collection behaves differently, and may 
  633     cause a segmentation fault (signal 11). 
  637         super(PoolMaster, self).
__init__(*args, **kwargs)
 
  638         assert self.
root == self.
rank, 
"This is the master node" 
  641         """Ensure slaves exit when we're done""" 
  644     def log(self, msg, *args):
 
  645         """Log a debugging message""" 
  649         """Send command to slaves 
  651         A command is the name of the PoolSlave method they should run. 
  653         self.
log(
"command", cmd)
 
  654         self.
comm.broadcast(cmd, root=self.
root)
 
  656     def map(self, context, func, dataList, *args, **kwargs):
 
  657         """!Scatter work to slaves and gather the results 
  659         Work is distributed dynamically, so that slaves that finish 
  660         quickly will receive more work. 
  662         Each slave applies the function to the data they're provided. 
  663         The slaves may optionally be passed a cache instance, which 
  664         they can use to store data for subsequent executions (to ensure 
  665         subsequent data is distributed in the same pattern as before, 
  666         use the 'mapToPrevious' method).  The cache also contains 
  667         data that has been stored on the slaves. 
  669         The 'func' signature should be func(cache, data, *args, **kwargs) 
  670         if 'context' is non-None; otherwise func(data, *args, **kwargs). 
  672         @param context: Namespace for cache 
  673         @param func: function for slaves to run; must be picklable 
  674         @param dataList: List of data to distribute to slaves; must be picklable 
  675         @param args: List of constant arguments 
  676         @param kwargs: Dict of constant arguments 
  677         @return list of results from applying 'func' to dataList 
  679         return self.
reduce(context, 
None, func, dataList, *args, **kwargs)
 
  683     def reduce(self, context, reducer, func, dataList, *args, **kwargs):
 
  684         """!Scatter work to slaves and reduce the results 
  686         Work is distributed dynamically, so that slaves that finish 
  687         quickly will receive more work. 
  689         Each slave applies the function to the data they're provided. 
  690         The slaves may optionally be passed a cache instance, which 
  691         they can use to store data for subsequent executions (to ensure 
  692         subsequent data is distributed in the same pattern as before, 
  693         use the 'mapToPrevious' method).  The cache also contains 
  694         data that has been stored on the slaves. 
  696         The 'func' signature should be func(cache, data, *args, **kwargs) 
  697         if 'context' is non-None; otherwise func(data, *args, **kwargs). 
  699         The 'reducer' signature should be reducer(old, new). If the 'reducer' 
  700         is None, then we will return the full list of results 
  702         @param context: Namespace for cache 
  703         @param reducer: function for master to run to reduce slave results; or None 
  704         @param func: function for slaves to run; must be picklable 
  705         @param dataList: List of data to distribute to slaves; must be picklable 
  706         @param args: List of constant arguments 
  707         @param kwargs: Dict of constant arguments 
  708         @return reduced result (if reducer is non-None) or list of results 
  709             from applying 'func' to dataList 
  711         tags = 
Tags(
"request", 
"work")
 
  718             return self.
reduceNoBalance(context, reducer, func, dataList, *args, **kwargs)
 
  724         self.
comm.broadcast((tags, func, reducer, args, kwargs, context), root=self.
root)
 
  727         queue = 
list(zip(range(num), dataList))  
 
  728         output = [
None]*num 
if reducer 
is None else None 
  729         initial = [
None if i == self.
rank else queue.pop(0) 
if queue 
else NoOp() 
for 
  730                    i 
in range(self.
size)]
 
  731         pending = 
min(num, self.
size - 1)
 
  732         self.
log(
"scatter initial jobs")
 
  733         self.
comm.scatter(initial, root=self.
rank)
 
  735         while queue 
or pending > 0:
 
  736             status = mpi.Status()
 
  737             report = self.
comm.recv(status=status, tag=tags.request, source=mpi.ANY_SOURCE)
 
  738             source = status.source
 
  739             self.
log(
"gather from slave", source)
 
  741                 index, result = report
 
  742                 output[index] = result
 
  746                 self.
log(
"send job to slave", job[0], source)
 
  750             self.
comm.send(job, source, tag=tags.work)
 
  752         if reducer 
is not None:
 
  753             results = self.
comm.gather(
None, root=self.
root)
 
  755             for rank 
in range(self.
size):
 
  756                 if rank == self.
root:
 
  758                 output = reducer(output, results[rank]) 
if output 
is not None else results[rank]
 
  764         """!Scatter work to slaves and gather the results 
  766         Work is distributed statically, so there is no load balancing. 
  768         Each slave applies the function to the data they're provided. 
  769         The slaves may optionally be passed a cache instance, which 
  770         they can store data in for subsequent executions (to ensure 
  771         subsequent data is distributed in the same pattern as before, 
  772         use the 'mapToPrevious' method).  The cache also contains 
  773         data that has been stored on the slaves. 
  775         The 'func' signature should be func(cache, data, *args, **kwargs) 
  776         if 'context' is true; otherwise func(data, *args, **kwargs). 
  778         @param context: Namespace for cache 
  779         @param func: function for slaves to run; must be picklable 
  780         @param dataList: List of data to distribute to slaves; must be picklable 
  781         @param args: List of constant arguments 
  782         @param kwargs: Dict of constant arguments 
  783         @return list of results from applying 'func' to dataList 
  785         return self.
reduceNoBalance(context, 
None, func, dataList, *args, **kwargs)
 
  790         """!Scatter work to slaves and reduce the results 
  792         Work is distributed statically, so there is no load balancing. 
  794         Each slave applies the function to the data they're provided. 
  795         The slaves may optionally be passed a cache instance, which 
  796         they can store data in for subsequent executions (to ensure 
  797         subsequent data is distributed in the same pattern as before, 
  798         use the 'mapToPrevious' method).  The cache also contains 
  799         data that has been stored on the slaves. 
  801         The 'func' signature should be func(cache, data, *args, **kwargs) 
  802         if 'context' is true; otherwise func(data, *args, **kwargs). 
  804         The 'reducer' signature should be reducer(old, new). If the 'reducer' 
  805         is None, then we will return the full list of results 
  807         @param context: Namespace for cache 
  808         @param reducer: function for master to run to reduce slave results; or None 
  809         @param func: function for slaves to run; must be picklable 
  810         @param dataList: List of data to distribute to slaves; must be picklable 
  811         @param args: List of constant arguments 
  812         @param kwargs: Dict of constant arguments 
  813         @return reduced result (if reducer is non-None) or list of results 
  814             from applying 'func' to dataList 
  816         tags = 
Tags(
"result", 
"work")
 
  818         if self.
size == 1 
or num <= 1:
 
  819             return self.
_reduceQueue(context, reducer, func, 
list(zip(range(num), dataList)), *args, **kwargs)
 
  825         self.
comm.broadcast((tags, func, args, kwargs, context), root=self.
root)
 
  829         queue = 
list(zip(range(num), dataList))  
 
  831             distribution = [[queue[i]] 
for i 
in range(num)]
 
  832             distribution.insert(self.
rank, [])
 
  833             for i 
in range(num, self.
size - 1):
 
  834                 distribution.append([])
 
  835         elif num % self.
size == 0:
 
  836             numEach = num//self.
size 
  837             distribution = [queue[i*numEach:(i+1)*numEach] 
for i 
in range(self.
size)]
 
  839             numEach = num//self.
size 
  840             distribution = [queue[i*numEach:(i+1)*numEach] 
for i 
in range(self.
size)]
 
  841             for i 
in range(numEach*self.
size, num):
 
  842                 distribution[(self.
rank + 1) % self.
size].append
 
  843             distribution = 
list([] 
for i 
in range(self.
size))
 
  844             for i, job 
in enumerate(queue, self.
rank + 1):
 
  848         for source 
in range(self.
size):
 
  849             if source == self.
rank:
 
  851             self.
log(
"send jobs to ", source)
 
  852             self.
comm.send(distribution[source], source, tag=tags.work)
 
  855         output = [
None]*num 
if reducer 
is None else None 
  857         def ingestResults(output, nodeResults, distList):
 
  859                 for i, result 
in enumerate(nodeResults):
 
  860                     index = distList[i][0]
 
  861                     output[index] = result
 
  864                 output = nodeResults.pop(0)
 
  865             for result 
in nodeResults:
 
  866                 output = reducer(output, result)
 
  869         ourResults = self.
_processQueue(context, func, distribution[self.
rank], *args, **kwargs)
 
  870         output = ingestResults(output, ourResults, distribution[self.
rank])
 
  873         pending = self.
size - 1
 
  875             status = mpi.Status()
 
  876             slaveResults = self.
comm.recv(status=status, tag=tags.result, source=mpi.ANY_SOURCE)
 
  877             source = status.source
 
  878             self.
log(
"gather from slave", source)
 
  879             output = ingestResults(output, slaveResults, distribution[source])
 
  886         """!Scatter work to the same target as before 
  888         Work is distributed so that each slave handles the same 
  889         indices in the dataList as when 'map' was called. 
  890         This allows the right data to go to the right cache. 
  892         It is assumed that the dataList is the same length as when it was 
  895         The 'func' signature should be func(cache, data, *args, **kwargs). 
  897         @param context: Namespace for cache 
  898         @param func: function for slaves to run; must be picklable 
  899         @param dataList: List of data to distribute to slaves; must be picklable 
  900         @param args: List of constant arguments 
  901         @param kwargs: Dict of constant arguments 
  902         @return list of results from applying 'func' to dataList 
  904         return self.
reduceToPrevious(context, 
None, func, dataList, *args, **kwargs)
 
  909         """!Reduction where work goes to the same target as before 
  911         Work is distributed so that each slave handles the same 
  912         indices in the dataList as when 'map' was called. 
  913         This allows the right data to go to the right cache. 
  915         It is assumed that the dataList is the same length as when it was 
  918         The 'func' signature should be func(cache, data, *args, **kwargs). 
  920         The 'reducer' signature should be reducer(old, new). If the 'reducer' 
  921         is None, then we will return the full list of results 
  923         @param context: Namespace for cache 
  924         @param reducer: function for master to run to reduce slave results; or None 
  925         @param func: function for slaves to run; must be picklable 
  926         @param dataList: List of data to distribute to slaves; must be picklable 
  927         @param args: List of constant arguments 
  928         @param kwargs: Dict of constant arguments 
  929         @return reduced result (if reducer is non-None) or list of results 
  930             from applying 'func' to dataList 
  933             raise ValueError(
"context must be set to map to same nodes as previous context")
 
  934         tags = 
Tags(
"result", 
"work")
 
  936         if self.
size == 1 
or num <= 1:
 
  938             return self.
_reduceQueue(context, reducer, func, 
list(zip(range(num), dataList)), *args, **kwargs)
 
  941             return self.
reduceNoBalance(context, reducer, func, dataList, *args, **kwargs)
 
  947         self.
comm.broadcast((tags, func, args, kwargs, context), root=self.
root)
 
  949         requestList = self.
comm.gather(
None, root=self.
root)
 
  950         self.
log(
"listen", requestList)
 
  951         initial = [dataList[index] 
if (index 
is not None and index >= 0) 
else None for index 
in requestList]
 
  952         self.
log(
"scatter jobs", initial)
 
  953         self.
comm.scatter(initial, root=self.
root)
 
  954         pending = 
min(num, self.
size - 1)
 
  963             status = mpi.Status()
 
  964             index, result, nextIndex = self.
comm.recv(status=status, tag=tags.result, source=mpi.ANY_SOURCE)
 
  965             source = status.source
 
  966             self.
log(
"gather from slave", source)
 
  968                 output[index] = result
 
  973                 job = dataList[nextIndex]
 
  974                 self.
log(
"send job to slave", source)
 
  975                 self.
comm.send(job, source, tag=tags.work)
 
  979             self.
log(
"waiting on", pending)
 
  981         if reducer 
is not None:
 
  982             output = thread.join()
 
  990         """!Store data on slave for a particular context 
  992         The data is made available to functions through the cache. The 
  993         stored data differs from the cache in that it is identical for 
  994         all operations, whereas the cache is specific to the data being 
  997         @param context: namespace for store 
  998         @param kwargs: dict of name=value pairs 
 1000         super(PoolMaster, self).
storeSet(context, **kwargs)
 
 1002         self.
log(
"give data")
 
 1003         self.
comm.broadcast((context, kwargs), root=self.
root)
 
 1008         """Delete stored data on slave for a particular context""" 
 1009         super(PoolMaster, self).
storeDel(context, *nameList)
 
 1011         self.
log(
"tell names")
 
 1012         self.
comm.broadcast((context, nameList), root=self.
root)
 
 1017         """Reset data store for a particular context on master and slaves""" 
 1020         self.
comm.broadcast(context, root=self.
root)
 
 1024         """Reset cache for a particular context on master and slaves""" 
 1027         self.
comm.broadcast(context, root=self.
root)
 
 1031         """List cache contents for a particular context on master and slaves""" 
 1032         super(PoolMaster, self).
cacheList(context)
 
 1034         self.
comm.broadcast(context, root=self.
root)
 
 1038         """List store contents for a particular context on master and slaves""" 
 1039         super(PoolMaster, self).
storeList(context)
 
 1041         self.
comm.broadcast(context, root=self.
root)
 
 1044         """Command slaves to exit""" 
 1049     """Slave node instance of MPI process pool""" 
 1052         """Log a debugging message""" 
 1053         assert self.
rank != self.
root, 
"This is not the master node." 
 1058         """Serve commands of master node 
 1060         Slave accepts commands, which are the names of methods to execute. 
 1061         This exits when a command returns a true value. 
 1063         menu = dict((cmd, getattr(self, cmd)) 
for cmd 
in (
"reduce", 
"mapNoBalance", 
"mapToPrevious",
 
 1064                                                           "storeSet", 
"storeDel", 
"storeClear", 
"storeList",
 
 1065                                                           "cacheList", 
"cacheClear", 
"exit",))
 
 1066         self.
log(
"waiting for command from", self.
root)
 
 1067         command = self.
comm.broadcast(
None, root=self.
root)
 
 1068         self.
log(
"command", command)
 
 1069         while not menu[command]():
 
 1070             self.
log(
"waiting for command from", self.
root)
 
 1071             command = self.
comm.broadcast(
None, root=self.
root)
 
 1072             self.
log(
"command", command)
 
 1077         """Reduce scattered data and return results""" 
 1078         self.
log(
"waiting for instruction")
 
 1079         tags, func, reducer, args, kwargs, context = self.
comm.broadcast(
None, root=self.
root)
 
 1080         self.
log(
"waiting for job")
 
 1081         job = self.
comm.scatter(
None, root=self.
root)
 
 1084         while not isinstance(job, NoOp):
 
 1086             self.
log(
"running job")
 
 1087             result = self.
_processQueue(context, func, [(index, data)], *args, **kwargs)[0]
 
 1089                 report = (index, result)
 
 1092                 out = reducer(out, result) 
if out 
is not None else result
 
 1093             self.
comm.send(report, self.
root, tag=tags.request)
 
 1094             self.
log(
"waiting for job")
 
 1095             job = self.
comm.recv(tag=tags.work, source=self.
root)
 
 1097         if reducer 
is not None:
 
 1098             self.
comm.gather(out, root=self.
root)
 
 1103         """Process bulk scattered data and return results""" 
 1104         self.
log(
"waiting for instruction")
 
 1105         tags, func, args, kwargs, context = self.
comm.broadcast(
None, root=self.
root)
 
 1106         self.
log(
"waiting for job")
 
 1107         queue = self.
comm.recv(tag=tags.work, source=self.
root)
 
 1110         for index, data 
in queue:
 
 1111             self.
log(
"running job", index)
 
 1112             result = self.
_processQueue(context, func, [(index, data)], *args, **kwargs)[0]
 
 1113             resultList.append(result)
 
 1115         self.
comm.send(resultList, self.
root, tag=tags.result)
 
 1120         """Process the same scattered data processed previously""" 
 1121         self.
log(
"waiting for instruction")
 
 1122         tags, func, args, kwargs, context = self.
comm.broadcast(
None, root=self.
root)
 
 1124         index = queue.pop(0) 
if queue 
else -1
 
 1125         self.
log(
"request job", index)
 
 1126         self.
comm.gather(index, root=self.
root)
 
 1127         self.
log(
"waiting for job")
 
 1128         data = self.
comm.scatter(
None, root=self.
root)
 
 1131             self.
log(
"running job")
 
 1132             result = func(self.
_getCache(context, index), data, *args, **kwargs)
 
 1133             self.
log(
"pending", queue)
 
 1134             nextIndex = queue.pop(0) 
if queue 
else -1
 
 1135             self.
comm.send((index, result, nextIndex), self.
root, tag=tags.result)
 
 1138                 data = self.
comm.recv(tag=tags.work, source=self.
root)
 
 1143         """Set value in store""" 
 1144         context, kwargs = self.
comm.broadcast(
None, root=self.
root)
 
 1145         super(PoolSlave, self).
storeSet(context, **kwargs)
 
 1148         """Delete value in store""" 
 1149         context, nameList = self.
comm.broadcast(
None, root=self.
root)
 
 1150         super(PoolSlave, self).
storeDel(context, *nameList)
 
 1153         """Reset data store""" 
 1154         context = self.
comm.broadcast(
None, root=self.
root)
 
 1159         context = self.
comm.broadcast(
None, root=self.
root)
 
 1163         """List cache contents""" 
 1164         context = self.
comm.broadcast(
None, root=self.
root)
 
 1165         super(PoolSlave, self).
cacheList(context)
 
 1168         """List store contents""" 
 1169         context = self.
comm.broadcast(
None, root=self.
root)
 
 1170         super(PoolSlave, self).
storeList(context)
 
 1173         """Allow exit from loop in 'run'""" 
 1178     """Metaclass for PoolWrapper to add methods pointing to PoolMaster 
 1180     The 'context' is automatically supplied to these methods as the first argument. 
 1184         instance = super(PoolWrapperMeta, cls).
__call__(context)
 
 1186         for name 
in (
"map", 
"mapNoBalance", 
"mapToPrevious",
 
 1187                      "reduce", 
"reduceNoBalance", 
"reduceToPrevious",
 
 1188                      "storeSet", 
"storeDel", 
"storeClear", 
"storeList",
 
 1189                      "cacheList", 
"cacheClear",):
 
 1190             setattr(instance, name, partial(getattr(pool, name), context))
 
 1195     """Wrap PoolMaster to automatically provide context""" 
 1198         self.
_pool = PoolMaster._instance
 
 1202         return getattr(self.
_pool, name)
 
 1208     Use this class to automatically provide 'context' to 
 1209     the PoolMaster class.  If you want to call functions 
 1210     that don't take a 'cache' object, use the PoolMaster 
 1211     class directly, and specify context=None. 
 1217     """!Start a process pool. 
 1219     Returns a PoolMaster object for the master node. 
 1220     Slave nodes are run and then optionally killed. 
 1222     If you elect not to kill the slaves, note that they 
 1223     will emerge at the point this function was called, 
 1224     which is likely very different from the point the 
 1225     master is at, so it will likely be necessary to put 
 1226     in some rank dependent code (e.g., look at the 'rank' 
 1227     attribute of the returned pools). 
 1229     Note that the pool objects should be deleted (either 
 1230     by going out of scope or explicit 'del') before program 
 1231     termination to avoid a segmentation fault. 
 1233     @param comm: MPI communicator 
 1234     @param root: Rank of root/master node 
 1235     @param killSlaves: Kill slaves on completion? 
 1239     if comm.rank == root:
 
 1240         return PoolMaster(comm, root=root)
 
 1241     slave = PoolSlave(comm, root=root)