import os
import sys
import time
import types
import copyreg
import pickle
import importlib
import threading
import traceback
from functools import wraps, partial
from contextlib import contextmanager

import mpi4py.MPI as mpi

from lsst.pipe.base import Struct
from future.utils import with_metaclass

__all__ = ["Comm", "Pool", "startPool", "setBatchType", "getBatchType", "abortOnError", "NODE"]

NODE = "%s:%d" % (os.uname()[1], os.getpid())  # Name of node on which we are running


def unpickleInstanceMethod(obj, name):
    """Unpickle an instance method

    This has to be a named function rather than a lambda because
    pickle needs to find it.
    """
    return getattr(obj, name)


def pickleInstanceMethod(method):
    """Pickle an instance method

    The instance method is divided into the object and the
    method name.
    """
    obj = method.__self__
    name = method.__name__
    return unpickleInstanceMethod, (obj, name)


copyreg.pickle(types.MethodType, pickleInstanceMethod)


def unpickleFunction(moduleName, funcName):
    """Unpickle a function

    This has to be a named function rather than a lambda because
    pickle needs to find it.
    """
    module = importlib.import_module(moduleName)
    return getattr(module, funcName)


def pickleFunction(function):
    """Pickle a function

    This assumes that we can recreate the function object by grabbing
    it from the proper module.  This may be violated if the function
    is a lambda or in __main__.  In that case, I recommend recasting
    the function as an object with a __call__ method.

    Another problematic case may be a wrapped (e.g., decorated) method
    in a class: the 'method' is then a function, and recreating it is
    not as easy as we assume here.
    """
    moduleName = function.__module__
    funcName = function.__name__
    return unpickleFunction, (moduleName, funcName)


copyreg.pickle(types.FunctionType, pickleFunction)

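# Example (illustrative only): 'Scaler' is a hypothetical class sketching the
# workaround recommended above for lambdas or functions in __main__, which
# pickleFunction cannot reconstruct.  An object with a __call__ method pickles
# by class reference rather than by code:
#
#     class Scaler(object):
#         def __init__(self, factor):
#             self.factor = factor
#
#         def __call__(self, value):
#             return self.factor*value
#
#     # use Scaler(2.0) where you would have written lambda x: 2.0*x
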
_batchType = "unknown"  # Type of batch system in use


def getBatchType():
    """Return a string giving the type of batch system in use"""
    return _batchType


def setBatchType(batchType):
    """Set the type of batch system in use"""
    global _batchType
    _batchType = batchType


def abortOnError(func):
    """Function decorator to throw an MPI abort on an unhandled exception"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            sys.stderr.write("%s on %s in %s: %s\n" % (type(e).__name__, NODE, func.__name__, e))
            traceback.print_exc(file=sys.stderr)
            sys.stdout.flush()
            sys.stderr.flush()
            if getBatchType() is not None:
                mpi.COMM_WORLD.Abort(1)
            else:
                raise
    return wrapper

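# Example (illustrative): decorating an entry point so that an uncaught
# exception on any one node brings the whole MPI job down rather than
# leaving the other nodes deadlocked.  'main' is a hypothetical user
# function, not part of this module.
#
#     @abortOnError
#     def main():
#         ...  # anything that might raise on a single node
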
class PickleHolder(object):
    """Singleton to hold what's about to be pickled.

    We hold onto the object in case there's trouble pickling,
    so we can figure out what class in particular is causing
    the trouble.

    The held object is in the 'obj' attribute.

    Here we use the __new__-style singleton pattern, because
    we specifically want __init__ to be called each time.
    """

    _instance = None

    def __new__(cls, hold=None):
        if cls._instance is None:
            cls._instance = super(PickleHolder, cls).__new__(cls)
            cls._instance.obj = None
        return cls._instance

    def __init__(self, hold=None):
        """Hold onto new object"""
        if hold is not None:
            self.obj = hold

    def __enter__(self):
        pass

    def __exit__(self, excType, excVal, tb):
        """Drop held object if there were no problems"""
        if excType is None:
            self.obj = None


def guessPickleObj():
    """Try to guess what's not pickling after an exception

    This tends to work if the problem is coming from the
    regular pickle module.  If it's coming from the bowels
    of mpi4py, there's not much that can be done.
    """
    excType, excValue, tb = sys.exc_info()
    # Build a stack of traceback elements
    stack = []
    while tb:
        stack.append(tb)
        tb = tb.tb_next
    try:
        # The pickle module leaves the offending object in the frame
        # locals of the second-to-last traceback entry.
        return stack[-2].tb_frame.f_locals["obj"]
    except Exception:
        return None


@contextmanager
def pickleSniffer(abort=False):
    """Context manager to sniff out pickle problems

    If there's a pickle error, you're normally told what the problem
    class is.  However, all SWIG objects are reported as "SwigPyObject".
    In order to figure out which actual SWIG-ed class is causing
    problems, we need to go digging.

    Use like this:

        with pickleSniffer():
            someOperationInvolvingPickle()

    If 'abort' is True, will call MPI abort in the event of problems.
    """
    try:
        yield
    except Exception as e:
        if "SwigPyObject" not in str(e) or "pickle" not in str(e):
            raise
        sys.stderr.write("Pickling error detected: %s\n" % e)
        traceback.print_exc(file=sys.stderr)
        obj = guessPickleObj()
        heldObj = PickleHolder().obj
        if obj is None and heldObj is not None:
            # Try to reproduce with the regular pickle module, so we've
            # got a chance of figuring out what the problem is.
            try:
                pickle.dumps(heldObj)
                sys.stderr.write("Hmmm, that's strange: no problem with pickling held object?!?!\n")
            except Exception:
                obj = guessPickleObj()
        if obj is None:
            sys.stderr.write("Unable to determine class causing pickle problems.\n")
        else:
            sys.stderr.write("Object that could not be pickled: %s\n" % obj)
        if abort:
            if getBatchType() is not None:
                mpi.COMM_WORLD.Abort(1)
            else:
                sys.exit(1)


def catchPicklingError(func):
    """Function decorator to catch errors in pickling and print something useful"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        with pickleSniffer(True):
            return func(*args, **kwargs)
    return wrapper


class Comm(mpi.Intracomm):
    """Wrapper to mpi4py's MPI.Intracomm class to avoid busy-waiting.

    As suggested by Lisandro Dalcin at:
    * http://code.google.com/p/mpi4py/issues/detail?id=4 and
    * https://groups.google.com/forum/?fromgroups=#!topic/mpi4py/nArVuMXyyZI
    """

    def __new__(cls, comm=mpi.COMM_WORLD, recvSleep=0.1, barrierSleep=0.1):
        """!Construct an MPI.Comm wrapper

        @param comm            MPI.Intracomm to wrap a duplicate of
        @param recvSleep       Sleep time (seconds) for recv()
        @param barrierSleep    Sleep time (seconds) for Barrier()
        """
        self = super(Comm, cls).__new__(cls, comm.Dup())
        self._barrierComm = None  # Duplicate communicator used for Barrier
        self._recvSleep = recvSleep
        self._barrierSleep = barrierSleep
        return self

    def recv(self, obj=None, source=0, tag=0, status=None):
        """Version of comm.recv() that doesn't busy-wait"""
        sts = mpi.Status()
        while not self.Iprobe(source=source, tag=tag, status=sts):
            time.sleep(self._recvSleep)
        return super(Comm, self).recv(buf=obj, source=sts.source, tag=sts.tag, status=status)

    def send(self, obj=None, *args, **kwargs):
        with PickleHolder(obj):
            return super(Comm, self).send(obj, *args, **kwargs)

    def _checkBarrierComm(self):
        """Ensure the duplicate communicator is available"""
        if self._barrierComm is None:
            self._barrierComm = self.Dup()

    def Barrier(self, tag=0):
        """Version of comm.Barrier() that doesn't busy-wait

        A duplicate communicator is used so as not to interfere with the user's own communications.
        """
        self._checkBarrierComm()
        size = self._barrierComm.Get_size()
        if size == 1:
            return
        rank = self._barrierComm.Get_rank()
        mask = 1
        while mask < size:
            dst = (rank + mask) % size
            src = (rank - mask + size) % size
            req = self._barrierComm.isend(None, dst, tag)
            while not self._barrierComm.Iprobe(src, tag):
                time.sleep(self._barrierSleep)
            self._barrierComm.recv(None, src, tag)
            req.Wait()
            mask <<= 1

    def broadcast(self, value, root=0):
        with PickleHolder(value):
            return super(Comm, self).bcast(value, root=root)

    def scatter(self, dataList, root=0, tag=0):
        """Scatter data across the nodes

        The default version apparently pickles the entire 'dataList',
        which can cause errors if the pickle size grows over 2^31 bytes
        due to fundamental problems with pickle in python 2. Instead,
        we send the data to each slave node in turn; this reduces the
        pickle size.

        @param dataList  List of data to distribute; one per node
        @param root  Index of root node
        @param tag  Message tag (integer)
        @return  Data for this node
        """
        if self.Get_rank() == root:
            for rank, data in enumerate(dataList):
                if rank == root:
                    continue
                self.send(data, rank, tag=tag)
            return dataList[root]
        else:
            return self.recv(source=root, tag=tag)

    def Free(self):
        if self._barrierComm is not None:
            self._barrierComm.Free()
        super(Comm, self).Free()

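# Example (illustrative): point-to-point traffic through the wrapper, run
# under mpiexec with at least two ranks.  recv() and Barrier() sleep between
# probes rather than spinning on the CPU.
#
#     comm = Comm()  # wraps a duplicate of MPI.COMM_WORLD
#     if comm.rank == 0:
#         comm.send("hello", 1, tag=1)
#     elif comm.rank == 1:
#         msg = comm.recv(source=0, tag=1)
#     comm.Barrier()
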
class NoOp(object):
    """Object to signal no operation"""
    pass


class Tags(object):
    """Provides tag numbers by symbolic name in attributes"""

    def __init__(self, *nameList):
        self._nameList = nameList
        for i, name in enumerate(nameList, 1):
            setattr(self, name, i)

    def __repr__(self):
        return self.__class__.__name__ + repr(self._nameList)

    def __reduce__(self):
        return self.__class__, tuple(self._nameList)

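# Example: tag numbers are assigned in order, starting from 1.
#
#     tags = Tags("request", "work")
#     assert tags.request == 1 and tags.work == 2
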
class Cache(Struct):
    """An object to hold stuff between different scatter calls

    Includes a communicator by default, to allow intercommunication
    between nodes.
    """

    def __init__(self, comm):
        super(Cache, self).__init__(comm=comm)


class SingletonMeta(type):
    """!Metaclass to produce a singleton

    Doing a singleton mixin without a metaclass (via __new__) is
    annoying because the user has to name his __init__ something else
    (otherwise it's called every time, which undoes any changes).
    Using this metaclass, the class's __init__ is called exactly once.

    Because this is a metaclass, note that:
    * "self" here is the class
    * "__init__" is making the class (it's like the body of the
      class definition).
    * "__call__" is making an instance of the class (it's like
      "__new__" in the class).
    """

    def __init__(cls, name, bases, dict_):
        super(SingletonMeta, cls).__init__(name, bases, dict_)
        cls._instance = None

    def __call__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super(SingletonMeta, cls).__call__(*args, **kwargs)
        return cls._instance

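# Example: any class built with this metaclass yields a single shared
# instance, and __init__ runs only on the first construction.  'Registry'
# is a hypothetical illustration, not part of this module.
#
#     class Registry(with_metaclass(SingletonMeta, object)):
#         def __init__(self):
#             self.items = []
#
#     assert Registry() is Registry()
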
class Debugger(with_metaclass(SingletonMeta, object)):
    """Debug logger singleton

    Disabled by default; to enable, do: 'Debugger().enabled = True'
    You can also redirect the output by changing the 'out' attribute.
    """

    def __init__(self):
        self.enabled = False
        self.out = sys.stderr

    def log(self, source, msg, *args):
        """!Log message

        The 'args' are only stringified if we're enabled.

        @param source: name of source
        @param msg: message to write
        @param args: additional outputs to append to message
        """
        if self.enabled:
            self.out.write("%s: %s" % (source, msg))
            for arg in args:
                self.out.write(" %s" % arg)
            self.out.write("\n")

class ReductionThread(threading.Thread):
    """Thread to do reduction of results

    "A thread?", you say. "What about the python GIL?"
    Well, because we 'sleep' when there's no immediate response from the
    slaves, that gives the thread a chance to fire; and threads are easier
    to manage (e.g., shared memory) than a process.
    """

    def __init__(self, reducer, initial=None, sleep=0.1):
        """!Constructor

        The 'reducer' should take two values and return a single
        (reduced) value.

        @param reducer  Function that does the reducing
        @param initial  Initial value for reduction, or None
        @param sleep  Time to sleep when there's nothing to do (sec)
        """
        threading.Thread.__init__(self, name="reducer")
        self._queue = []  # Queue of stuff to be reduced
        self._lock = threading.Lock()  # Lock for the queue
        self._reducer = reducer
        self._sleep = sleep
        self._result = initial  # Result of reduction
        self._done = threading.Event()  # Signal that everything is done

    def _doReduce(self):
        """Do the actual work

        We pull the data out of the queue and release the lock before
        operating on it. This stops us from blocking the addition of
        new data to the queue.
        """
        with self._lock:
            queue = self._queue
            self._queue = []
        for data in queue:
            self._result = self._reducer(self._result, data) if self._result is not None else data

    def run(self):
        """Thread entry point, called by Thread.start"""
        while not self._done.is_set():
            self._doReduce()
            time.sleep(self._sleep)
        self._doReduce()

    def add(self, data):
        """Add data to the queue to be reduced"""
        with self._lock:
            self._queue.append(data)

    def join(self):
        """Complete the thread

        Unlike Thread.join (which always returns 'None'), we return the result
        we calculated.
        """
        self._done.set()
        threading.Thread.join(self)
        return self._result

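# Example (illustrative): the reduction runs concurrently with the caller's
# receive loop, and join() returns the reduced value rather than None.
#
#     import operator
#     thread = ReductionThread(operator.add, initial=0)
#     thread.start()
#     for value in range(5):
#         thread.add(value)
#     total = thread.join()  # 10
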
class PoolNode(with_metaclass(SingletonMeta, object)):
    """Node in MPI process pool

    WARNING: You should not let a pool instance hang around at program
    termination, as the garbage collection behaves differently, and may
    cause a segmentation fault (signal 11).
    """

    def __init__(self, comm=None, root=0):
        if comm is None:
            comm = Comm()
        self.comm = comm
        self.rank = self.comm.rank
        self.root = root
        self.size = self.comm.size
        self._cache = {}
        self._store = {}
        self.debugger = Debugger()
        self.node = NODE

    def _getCache(self, context, index):
        """Retrieve cache for particular data

        The cache is updated with the contents of the store.
        """
        if context not in self._cache:
            self._cache[context] = {}
        if context not in self._store:
            self._store[context] = {}
        cache = self._cache[context]
        store = self._store[context]
        if index not in cache:
            cache[index] = Cache(self.comm)
        cache[index].__dict__.update(store)
        return cache[index]

    def log(self, msg, *args):
        """Log a debugging message"""
        self.debugger.log("Node %d" % self.rank, msg, *args)

    def _processQueue(self, context, func, queue, *args, **kwargs):
        """!Process a queue of data

        The queue consists of a list of (index, data) tuples,
        where the index maps to the cache, and the data is
        passed to the 'func'.

        The 'func' signature should be func(cache, data, *args, **kwargs)
        if 'context' is non-None; otherwise func(data, *args, **kwargs).

        @param context: Namespace for cache; None to not use cache
        @param func: function for slaves to run
        @param queue: List of (index,data) tuples to process
        @param args: Constant arguments
        @param kwargs: Keyword arguments
        @return list of results from applying 'func' to dataList
        """
        return self._reduceQueue(context, None, func, queue, *args, **kwargs)

    def _reduceQueue(self, context, reducer, func, queue, *args, **kwargs):
        """!Reduce a queue of data

        The queue consists of a list of (index, data) tuples,
        where the index maps to the cache, and the data is
        passed to the 'func', the output of which is reduced
        using the 'reducer' (if non-None).

        The 'func' signature should be func(cache, data, *args, **kwargs)
        if 'context' is non-None; otherwise func(data, *args, **kwargs).

        The 'reducer' signature should be reducer(old, new). If the 'reducer'
        is None, then we will return the full list of results.

        @param context: Namespace for cache; None to not use cache
        @param reducer: function for master to run to reduce slave results; or None
        @param func: function for slaves to run
        @param queue: List of (index,data) tuples to process
        @param args: Constant arguments
        @param kwargs: Keyword arguments
        @return reduced result (if reducer is non-None) or list of results
            from applying 'func' to dataList
        """
        if context is not None:
            resultList = [func(self._getCache(context, i), data, *args, **kwargs) for i, data in queue]
        else:
            resultList = [func(data, *args, **kwargs) for i, data in queue]
        if reducer is None:
            return resultList
        if len(resultList) == 0:
            return None
        output = resultList.pop(0)
        for result in resultList:
            output = reducer(output, result)
        return output

    def storeSet(self, context, **kwargs):
        """Set values in store for a particular context"""
        self.log("storing", context, kwargs)
        if context not in self._store:
            self._store[context] = {}
        for name, value in kwargs.items():
            self._store[context][name] = value

    def storeDel(self, context, *nameList):
        """Delete value in store for a particular context"""
        self.log("deleting from store", context, nameList)
        if context not in self._store:
            raise KeyError("No such context: %s" % context)
        for name in nameList:
            del self._store[context][name]

    def storeClear(self, context):
        """Clear stored data for a particular context"""
        self.log("clearing store", context)
        if context not in self._store:
            raise KeyError("No such context: %s" % context)
        self._store[context] = {}

    def cacheClear(self, context):
        """Reset cache for a particular context"""
        self.log("clearing cache", context)
        if context not in self._cache:
            return
        self._cache[context] = {}

    def cacheList(self, context):
        """List contents of cache"""
        cache = self._cache[context] if context in self._cache else {}
        sys.stderr.write("Cache on %s (%s): %s\n" % (self.node, context, cache))

    def storeList(self, context):
        """List contents of store for a particular context"""
        if context not in self._store:
            raise KeyError("No such context: %s" % context)
        sys.stderr.write("Store on %s (%s): %s\n" % (self.node, context, self._store[context]))

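# Example (illustrative): what a slave-side function sees when a context is
# used.  'calib' is a hypothetical name previously distributed via storeSet;
# the per-index cache persists between 'map' and 'mapToPrevious' calls.
#
#     def process(cache, data):
#         result = data - cache.calib   # from storeSet(context, calib=...)
#         cache.lastResult = result     # retrievable again via mapToPrevious
#         return result
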
class PoolMaster(PoolNode):
    """Master node instance of MPI process pool

    Only the master node should instantiate this.

    WARNING: You should not let a pool instance hang around at program
    termination, as the garbage collection behaves differently, and may
    cause a segmentation fault (signal 11).
    """

    def __init__(self, *args, **kwargs):
        super(PoolMaster, self).__init__(*args, **kwargs)
        assert self.root == self.rank, "This is the master node"

    def __del__(self):
        """Ensure slaves exit when we're done"""
        self.exit()

    def log(self, msg, *args):
        """Log a debugging message"""
        self.debugger.log("Master", msg, *args)

    def command(self, cmd):
        """Send command to slaves

        A command is the name of the PoolSlave method they should run.
        """
        self.log("command", cmd)
        self.comm.broadcast(cmd, root=self.root)

    def map(self, context, func, dataList, *args, **kwargs):
        """!Scatter work to slaves and gather the results

        Work is distributed dynamically, so that slaves that finish
        quickly will receive more work.

        Each slave applies the function to the data they're provided.
        The slaves may optionally be passed a cache instance, which
        they can use to store data for subsequent executions (to ensure
        subsequent data is distributed in the same pattern as before,
        use the 'mapToPrevious' method).  The cache also contains
        data that has been stored on the slaves.

        The 'func' signature should be func(cache, data, *args, **kwargs)
        if 'context' is non-None; otherwise func(data, *args, **kwargs).

        @param context: Namespace for cache
        @param func: function for slaves to run; must be picklable
        @param dataList: List of data to distribute to slaves; must be picklable
        @param args: List of constant arguments
        @param kwargs: Dict of constant arguments
        @return list of results from applying 'func' to dataList
        """
        return self.reduce(context, None, func, dataList, *args, **kwargs)

    def reduce(self, context, reducer, func, dataList, *args, **kwargs):
        """!Scatter work to slaves and reduce the results

        Work is distributed dynamically, so that slaves that finish
        quickly will receive more work.

        Each slave applies the function to the data they're provided.
        The slaves may optionally be passed a cache instance, which
        they can use to store data for subsequent executions (to ensure
        subsequent data is distributed in the same pattern as before,
        use the 'mapToPrevious' method).  The cache also contains
        data that has been stored on the slaves.

        The 'func' signature should be func(cache, data, *args, **kwargs)
        if 'context' is non-None; otherwise func(data, *args, **kwargs).

        The 'reducer' signature should be reducer(old, new). If the 'reducer'
        is None, then we will return the full list of results.

        @param context: Namespace for cache
        @param reducer: function for master to run to reduce slave results; or None
        @param func: function for slaves to run; must be picklable
        @param dataList: List of data to distribute to slaves; must be picklable
        @param args: List of constant arguments
        @param kwargs: Dict of constant arguments
        @return reduced result (if reducer is non-None) or list of results
            from applying 'func' to dataList
        """
        tags = Tags("request", "work")
        num = len(dataList)
        if self.size == 1 or num <= 1:
            return self._reduceQueue(context, reducer, func, list(zip(range(num), dataList)),
                                     *args, **kwargs)
        if self.size == num:
            # Each node gets exactly one job anyway, so skip the overhead
            # of dynamic distribution.
            return self.reduceNoBalance(context, reducer, func, dataList, *args, **kwargs)

        self.command("reduce")

        # Send function
        self.log("instruct")
        self.comm.broadcast((tags, func, reducer, args, kwargs, context), root=self.root)

        # Parcel out first set of data
        queue = list(zip(range(num), dataList))  # index, data
        output = [None]*num if reducer is None else None
        initial = [None if i == self.rank else queue.pop(0) if queue else NoOp() for
                   i in range(self.size)]
        pending = min(num, self.size - 1)
        self.log("scatter initial jobs")
        self.comm.scatter(initial, root=self.rank)

        while queue or pending > 0:
            status = mpi.Status()
            report = self.comm.recv(status=status, tag=tags.request, source=mpi.ANY_SOURCE)
            source = status.source
            self.log("gather from slave", source)
            if reducer is None:
                index, result = report
                output[index] = result

            if queue:
                job = queue.pop(0)
                self.log("send job to slave", job[0], source)
            else:
                job = NoOp()
                pending -= 1
            self.comm.send(job, source, tag=tags.work)

        if reducer is not None:
            results = self.comm.gather(None, root=self.root)
            output = None
            for rank in range(self.size):
                if rank == self.root:
                    continue
                output = reducer(output, results[rank]) if output is not None else results[rank]

        self.log("done")
        return output

    def mapNoBalance(self, context, func, dataList, *args, **kwargs):
        """!Scatter work to slaves and gather the results

        Work is distributed statically, so there is no load balancing.

        Each slave applies the function to the data they're provided.
        The slaves may optionally be passed a cache instance, which
        they can store data in for subsequent executions (to ensure
        subsequent data is distributed in the same pattern as before,
        use the 'mapToPrevious' method).  The cache also contains
        data that has been stored on the slaves.

        The 'func' signature should be func(cache, data, *args, **kwargs)
        if 'context' is true; otherwise func(data, *args, **kwargs).

        @param context: Namespace for cache
        @param func: function for slaves to run; must be picklable
        @param dataList: List of data to distribute to slaves; must be picklable
        @param args: List of constant arguments
        @param kwargs: Dict of constant arguments
        @return list of results from applying 'func' to dataList
        """
        return self.reduceNoBalance(context, None, func, dataList, *args, **kwargs)

    def reduceNoBalance(self, context, reducer, func, dataList, *args, **kwargs):
        """!Scatter work to slaves and reduce the results

        Work is distributed statically, so there is no load balancing.

        Each slave applies the function to the data they're provided.
        The slaves may optionally be passed a cache instance, which
        they can store data in for subsequent executions (to ensure
        subsequent data is distributed in the same pattern as before,
        use the 'mapToPrevious' method).  The cache also contains
        data that has been stored on the slaves.

        The 'func' signature should be func(cache, data, *args, **kwargs)
        if 'context' is true; otherwise func(data, *args, **kwargs).

        The 'reducer' signature should be reducer(old, new). If the 'reducer'
        is None, then we will return the full list of results.

        @param context: Namespace for cache
        @param reducer: function for master to run to reduce slave results; or None
        @param func: function for slaves to run; must be picklable
        @param dataList: List of data to distribute to slaves; must be picklable
        @param args: List of constant arguments
        @param kwargs: Dict of constant arguments
        @return reduced result (if reducer is non-None) or list of results
            from applying 'func' to dataList
        """
        tags = Tags("result", "work")
        num = len(dataList)
        if self.size == 1 or num <= 1:
            return self._reduceQueue(context, reducer, func, list(zip(range(num), dataList)),
                                     *args, **kwargs)

        self.command("mapNoBalance")

        # Send function
        self.log("instruct")
        self.comm.broadcast((tags, func, args, kwargs, context), root=self.root)

        # Divide up the jobs; try to give root the least to do,
        # so it also has time to manage
        queue = list(zip(range(num), dataList))  # index, data
        if num < self.size:
            distribution = [[queue[i]] for i in range(num)]
            distribution.insert(self.rank, [])
            for i in range(num, self.size - 1):
                distribution.append([])
        elif num % self.size == 0:
            numEach = num//self.size
            distribution = [queue[i*numEach:(i+1)*numEach] for i in range(self.size)]
        else:
            # Round-robin assignment, offset so the root is served last
            distribution = list([] for i in range(self.size))
            for i, job in enumerate(queue, self.rank + 1):
                distribution[i % self.size].append(job)

        # Send jobs
        for source in range(self.size):
            if source == self.rank:
                continue
            self.log("send jobs to ", source)
            self.comm.send(distribution[source], source, tag=tags.work)

        # Execute our own jobs
        output = [None]*num if reducer is None else None

        def ingestResults(output, nodeResults, distList):
            if reducer is None:
                for i, result in enumerate(nodeResults):
                    index = distList[i][0]
                    output[index] = result
                return output
            if output is None:
                output = nodeResults.pop(0)
            for result in nodeResults:
                output = reducer(output, result)
            return output

        ourResults = self._processQueue(context, func, distribution[self.rank], *args, **kwargs)
        output = ingestResults(output, ourResults, distribution[self.rank])

        # Collect results
        pending = self.size - 1
        while pending > 0:
            status = mpi.Status()
            slaveResults = self.comm.recv(status=status, tag=tags.result, source=mpi.ANY_SOURCE)
            source = status.source
            self.log("gather from slave", source)
            output = ingestResults(output, slaveResults, distribution[source])
            pending -= 1

        self.log("done")
        return output

    def mapToPrevious(self, context, func, dataList, *args, **kwargs):
        """!Scatter work to the same target as before

        Work is distributed so that each slave handles the same
        indices in the dataList as when 'map' was called.
        This allows the right data to go to the right cache.

        It is assumed that the dataList is the same length as when it was
        passed to 'map'.

        The 'func' signature should be func(cache, data, *args, **kwargs).

        @param context: Namespace for cache
        @param func: function for slaves to run; must be picklable
        @param dataList: List of data to distribute to slaves; must be picklable
        @param args: List of constant arguments
        @param kwargs: Dict of constant arguments
        @return list of results from applying 'func' to dataList
        """
        return self.reduceToPrevious(context, None, func, dataList, *args, **kwargs)

    def reduceToPrevious(self, context, reducer, func, dataList, *args, **kwargs):
        """!Reduction where work goes to the same target as before

        Work is distributed so that each slave handles the same
        indices in the dataList as when 'map' was called.
        This allows the right data to go to the right cache.

        It is assumed that the dataList is the same length as when it was
        passed to 'map'.

        The 'func' signature should be func(cache, data, *args, **kwargs).

        The 'reducer' signature should be reducer(old, new). If the 'reducer'
        is None, then we will return the full list of results.

        @param context: Namespace for cache
        @param reducer: function for master to run to reduce slave results; or None
        @param func: function for slaves to run; must be picklable
        @param dataList: List of data to distribute to slaves; must be picklable
        @param args: List of constant arguments
        @param kwargs: Dict of constant arguments
        @return reduced result (if reducer is non-None) or list of results
            from applying 'func' to dataList
        """
        if context is None:
            raise ValueError("context must be set to map to same nodes as previous context")
        tags = Tags("result", "work")
        num = len(dataList)
        if self.size == 1 or num <= 1:
            # Can do everything here
            return self._reduceQueue(context, reducer, func, list(zip(range(num), dataList)),
                                     *args, **kwargs)
        if self.size == num:
            # Static distribution is just as good, and cheaper
            return self.reduceNoBalance(context, reducer, func, dataList, *args, **kwargs)

        self.command("mapToPrevious")

        # Send function
        self.log("instruct")
        self.comm.broadcast((tags, func, args, kwargs, context), root=self.root)

        requestList = self.comm.gather(None, root=self.root)
        self.log("listen", requestList)
        initial = [dataList[index] if (index is not None and index >= 0) else None
                   for index in requestList]
        self.log("scatter jobs", initial)
        self.comm.scatter(initial, root=self.root)
        pending = min(num, self.size - 1)

        if reducer is None:
            output = [None]*num
        else:
            thread = ReductionThread(reducer)
            thread.start()

        while pending > 0:
            status = mpi.Status()
            index, result, nextIndex = self.comm.recv(status=status, tag=tags.result,
                                                      source=mpi.ANY_SOURCE)
            source = status.source
            self.log("gather from slave", source)
            if reducer is None:
                output[index] = result
            else:
                thread.add(result)

            if nextIndex >= 0:
                job = dataList[nextIndex]
                self.log("send job to slave", source)
                self.comm.send(job, source, tag=tags.work)
            else:
                pending -= 1

            self.log("waiting on", pending)

        if reducer is not None:
            output = thread.join()

        self.log("done")
        return output

    def storeSet(self, context, **kwargs):
        """!Store data on slave for a particular context

        The data is made available to functions through the cache. The
        stored data differs from the cache in that it is identical for
        all operations, whereas the cache is specific to the data being
        operated upon.

        @param context: namespace for store
        @param kwargs: dict of name=value pairs
        """
        super(PoolMaster, self).storeSet(context, **kwargs)
        self.command("storeSet")
        self.log("give data")
        self.comm.broadcast((context, kwargs), root=self.root)
        self.log("done")

    def storeDel(self, context, *nameList):
        """Delete stored data on slave for a particular context"""
        super(PoolMaster, self).storeDel(context, *nameList)
        self.command("storeDel")
        self.log("tell names")
        self.comm.broadcast((context, nameList), root=self.root)
        self.log("done")

    def storeClear(self, context):
        """Reset data store for a particular context on master and slaves"""
        super(PoolMaster, self).storeClear(context)
        self.command("storeClear")
        self.comm.broadcast(context, root=self.root)

    def cacheClear(self, context):
        """Reset cache for a particular context on master and slaves"""
        super(PoolMaster, self).cacheClear(context)
        self.command("cacheClear")
        self.comm.broadcast(context, root=self.root)

    def cacheList(self, context):
        """List cache contents for a particular context on master and slaves"""
        super(PoolMaster, self).cacheList(context)
        self.command("cacheList")
        self.comm.broadcast(context, root=self.root)

    def storeList(self, context):
        """List store contents for a particular context on master and slaves"""
        super(PoolMaster, self).storeList(context)
        self.command("storeList")
        self.comm.broadcast(context, root=self.root)

    def exit(self):
        """Command slaves to exit"""
        self.command("exit")


class PoolSlave(PoolNode):
    """Slave node instance of MPI process pool"""

    def log(self, msg, *args):
        """Log a debugging message"""
        assert self.rank != self.root, "This is not the master node."
        self.debugger.log("Slave %d" % self.rank, msg, *args)

    def run(self):
        """Serve commands of master node

        Slave accepts commands, which are the names of methods to execute.
        This exits when a command returns a true value.
        """
        menu = dict((cmd, getattr(self, cmd)) for cmd in ("reduce", "mapNoBalance", "mapToPrevious",
                                                          "storeSet", "storeDel", "storeClear", "storeList",
                                                          "cacheList", "cacheClear", "exit",))
        self.log("waiting for command from", self.root)
        command = self.comm.broadcast(None, root=self.root)
        self.log("command", command)
        while not menu[command]():
            self.log("waiting for command from", self.root)
            command = self.comm.broadcast(None, root=self.root)
            self.log("command", command)
        self.log("exiting")

    def reduce(self):
        """Reduce scattered data and return results"""
        self.log("waiting for instruction")
        tags, func, reducer, args, kwargs, context = self.comm.broadcast(None, root=self.root)
        self.log("waiting for job")
        job = self.comm.scatter(None, root=self.root)

        out = None  # Reduction result
        while not isinstance(job, NoOp):
            index, data = job
            self.log("running job")
            result = self._processQueue(context, func, [(index, data)], *args, **kwargs)[0]
            if reducer is None:
                report = (index, result)
            else:
                report = None
                out = reducer(out, result) if out is not None else result
            self.comm.send(report, self.root, tag=tags.request)
            self.log("waiting for job")
            job = self.comm.recv(tag=tags.work, source=self.root)

        if reducer is not None:
            self.comm.gather(out, root=self.root)
        self.log("done")

    def mapNoBalance(self):
        """Process bulk scattered data and return results"""
        self.log("waiting for instruction")
        tags, func, args, kwargs, context = self.comm.broadcast(None, root=self.root)
        self.log("waiting for job")
        queue = self.comm.recv(tag=tags.work, source=self.root)

        resultList = []
        for index, data in queue:
            self.log("running job", index)
            result = self._processQueue(context, func, [(index, data)], *args, **kwargs)[0]
            resultList.append(result)

        self.comm.send(resultList, self.root, tag=tags.result)
        self.log("done")

    def mapToPrevious(self):
        """Process the same scattered data processed previously"""
        self.log("waiting for instruction")
        tags, func, args, kwargs, context = self.comm.broadcast(None, root=self.root)
        queue = list(self._cache[context].keys()) if context in self._cache else None
        index = queue.pop(0) if queue else -1
        self.log("request job", index)
        self.comm.gather(index, root=self.root)
        self.log("waiting for job")
        data = self.comm.scatter(None, root=self.root)

        while index >= 0:
            self.log("running job")
            result = func(self._getCache(context, index), data, *args, **kwargs)
            self.log("pending", queue)
            nextIndex = queue.pop(0) if queue else -1
            self.comm.send((index, result, nextIndex), self.root, tag=tags.result)
            index = nextIndex
            if index >= 0:
                data = self.comm.recv(tag=tags.work, source=self.root)

        self.log("done")

    def storeSet(self):
        """Set value in store"""
        context, kwargs = self.comm.broadcast(None, root=self.root)
        super(PoolSlave, self).storeSet(context, **kwargs)

    def storeDel(self):
        """Delete value in store"""
        context, nameList = self.comm.broadcast(None, root=self.root)
        super(PoolSlave, self).storeDel(context, *nameList)

    def storeClear(self):
        """Reset data store"""
        context = self.comm.broadcast(None, root=self.root)
        super(PoolSlave, self).storeClear(context)

    def cacheClear(self):
        """Reset cache"""
        context = self.comm.broadcast(None, root=self.root)
        super(PoolSlave, self).cacheClear(context)

    def cacheList(self):
        """List cache contents"""
        context = self.comm.broadcast(None, root=self.root)
        super(PoolSlave, self).cacheList(context)

    def storeList(self):
        """List store contents"""
        context = self.comm.broadcast(None, root=self.root)
        super(PoolSlave, self).storeList(context)

    def exit(self):
        """Allow exit from loop in 'run'"""
        return True


class PoolWrapperMeta(type):
    """Metaclass for PoolWrapper to add methods pointing to PoolMaster

    The 'context' is automatically supplied to these methods as the first argument.
    """

    def __call__(cls, context="default"):
        instance = super(PoolWrapperMeta, cls).__call__(context)
        pool = PoolMaster._instance
        for name in ("map", "mapNoBalance", "mapToPrevious",
                     "reduce", "reduceNoBalance", "reduceToPrevious",
                     "storeSet", "storeDel", "storeClear", "storeList",
                     "cacheList", "cacheClear",):
            setattr(instance, name, partial(getattr(pool, name), context))
        return instance

class PoolWrapper(with_metaclass(PoolWrapperMeta, object)):
    """Wrap PoolMaster to automatically provide context"""

    def __init__(self, context="default"):
        self._pool = PoolMaster._instance
        self._context = context

    def __getattr__(self, name):
        return getattr(self._pool, name)

class Pool(PoolWrapper):
    """Process Pool

    Use this class to automatically provide 'context' to
    the PoolMaster class.  If you want to call functions
    that don't take a 'cache' object, use the PoolMaster
    class directly, and specify context=None.
    """
    pass


def startPool(comm=None, root=0, killSlaves=True):
    """!Start a process pool.

    Returns a PoolMaster object for the master node.
    Slave nodes are run and then optionally killed.

    If you elect not to kill the slaves, note that they
    will emerge at the point this function was called,
    which is likely very different from the point the
    master is at, so it will likely be necessary to put
    in some rank dependent code (e.g., look at the 'rank'
    attribute of the returned pools).

    Note that the pool objects should be deleted (either
    by going out of scope or explicit 'del') before program
    termination to avoid a segmentation fault.

    @param comm: MPI communicator
    @param root: Rank of root/master node
    @param killSlaves: Kill slaves on completion?
    """
    if comm is None:
        comm = Comm()
    if comm.rank == root:
        return PoolMaster(comm, root=root)
    slave = PoolSlave(comm, root=root)
    slave.run()
    if killSlaves:
        del slave  # Required to prevent segmentation fault on exit
        sys.exit()
    return slave

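# Example (illustrative): a typical driver script, run under MPI, e.g.
# "mpiexec -n 4 python script.py".  'square' is a hypothetical payload
# function, not part of this module.
#
#     def square(cache, data):
#         return data**2
#
#     pool = startPool()   # slaves enter their serve loop here; master returns
#     results = pool.map("demo", square, [1, 2, 3, 4])  # [1, 4, 9, 16]
#     del pool             # delete before program exit to avoid a segfault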
 