import os
import sys
import time
import types
import copyreg
import pickle
import importlib
import threading
import traceback
from functools import wraps, partial
from contextlib import contextmanager

import mpi4py.MPI as mpi

from future.utils import with_metaclass
__all__ = ["Comm", "Pool", "startPool", "setBatchType", "getBatchType", "abortOnError", "NODE", ]

NODE = "%s:%d" % (os.uname()[1], os.getpid())
39 """Unpickle an instance method 41 This has to be a named function rather than a lambda because 42 pickle needs to find it. 44 return getattr(obj, name)
48 """Pickle an instance method 50 The instance method is divided into the object and the 54 name = method.__name__
55 return unpickleInstanceMethod, (obj, name)
58 copyreg.pickle(types.MethodType, pickleInstanceMethod)
62 """Unpickle a function 64 This has to be a named function rather than a lambda because 65 pickle needs to find it. 68 module = importlib.import_module(moduleName)
69 return getattr(module, funcName)
75 This assumes that we can recreate the function object by grabbing 76 it from the proper module. This may be violated if the function 77 is a lambda or in __main__. In that case, I recommend recasting 78 the function as an object with a __call__ method. 80 Another problematic case may be a wrapped (e.g., decorated) method 81 in a class: the 'method' is then a function, and recreating it is 82 not as easy as we assume here. 84 moduleName = function.__module__
85 funcName = function.__name__
86 return unpickleFunction, (moduleName, funcName)
89 copyreg.pickle(types.FunctionType, pickleFunction)
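# Example (a hedged sketch, not part of the original module): with the
# reducers above registered via copyreg, module-level functions pickle by
# reference to their module and name, so they survive the round-trip needed
# to ship work to slave nodes.
#
#     >>> import pickle
#     >>> pickle.loads(pickle.dumps(os.path.join)) is os.path.join
#     True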
_batchType = "unknown"


def getBatchType():
    """Return a string giving the type of batch system in use"""
    return _batchType


def setBatchType(batchType):
    """Set the type of batch system in use"""
    global _batchType
    _batchType = batchType
109 """Function decorator to throw an MPI abort on an unhandled exception""" 111 def wrapper(*args, **kwargs):
113 return func(*args, **kwargs)
114 except Exception
as e:
115 sys.stderr.write(
"%s on %s in %s: %s\n" % (
type(e).__name__, NODE, func.__name__, e))
117 traceback.print_exc(file=sys.stderr)
121 mpi.COMM_WORLD.Abort(1)
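# Usage sketch (hypothetical entry point, not from the original source):
# decorating the per-rank entry point means an unhandled exception on any
# node aborts the whole MPI job instead of leaving the other ranks hanging.
#
#     >>> @abortOnError
#     ... def main():
#     ...     riskyOperation()   # hypothetical; any exception triggers Abort(1)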
128 """Singleton to hold what's about to be pickled. 130 We hold onto the object in case there's trouble pickling, 131 so we can figure out what class in particular is causing 134 The held object is in the 'obj' attribute. 136 Here we use the __new__-style singleton pattern, because 137 we specifically want __init__ to be called each time. 150 """Hold onto new object""" 158 """Drop held object if there were no problems""" 164 """Try to guess what's not pickling after an exception 166 This tends to work if the problem is coming from the 167 regular pickle module. If it's coming from the bowels 168 of mpi4py, there's not much that can be done. 171 excType, excValue, tb = sys.exc_info()
182 return stack[-2].tb_frame.f_locals[
"obj"]
189 """Context manager to sniff out pickle problems 191 If there's a pickle error, you're normally told what the problem 192 class is. However, all SWIG objects are reported as "SwigPyObject". 193 In order to figure out which actual SWIG-ed class is causing 194 problems, we need to go digging. 198 with pickleSniffer(): 199 someOperationInvolvingPickle() 201 If 'abort' is True, will call MPI abort in the event of problems. 205 except Exception
as e:
206 if "SwigPyObject" not in str(e)
or "pickle" not in str(e):
211 sys.stderr.write(
"Pickling error detected: %s\n" % e)
212 traceback.print_exc(file=sys.stderr)
215 if obj
is None and heldObj
is not None:
220 pickle.dumps(heldObj)
221 sys.stderr.write(
"Hmmm, that's strange: no problem with pickling held object?!?!\n")
225 sys.stderr.write(
"Unable to determine class causing pickle problems.\n")
227 sys.stderr.write(
"Object that could not be pickled: %s\n" % obj)
230 mpi.COMM_WORLD.Abort(1)
236 """Function decorator to catch errors in pickling and print something useful""" 238 def wrapper(*args, **kwargs):
240 return func(*args, **kwargs)
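# Usage sketch (assumed, not from the original source): wrap operations whose
# arguments get pickled so a SWIG-related failure names the offending class
# rather than the opaque "SwigPyObject".
#
#     >>> @catchPicklingError
#     ... def broadcastConfig(comm, config):
#     ...     comm.broadcast(config, root=0)     # 'config' is a stand-in payload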
245 """Wrapper to mpi4py's MPI.Intracomm class to avoid busy-waiting. 247 As suggested by Lisandro Dalcin at: 248 * http://code.google.com/p/mpi4py/issues/detail?id=4 and 249 * https://groups.google.com/forum/?fromgroups=#!topic/mpi4py/nArVuMXyyZI 252 def __new__(cls, comm=mpi.COMM_WORLD, recvSleep=0.1, barrierSleep=0.1):
253 """!Construct an MPI.Comm wrapper 256 @param comm MPI.Intracomm to wrap a duplicate of 257 @param recvSleep Sleep time (seconds) for recv() 258 @param barrierSleep Sleep time (seconds) for Barrier() 260 self = super(Comm, cls).
__new__(cls, comm.Dup())
266 def recv(self, obj=None, source=0, tag=0, status=None):
267 """Version of comm.recv() that doesn't busy-wait""" 269 while not self.Iprobe(source=source, tag=tag, status=sts):
271 return super(Comm, self).
recv(buf=obj, source=sts.source, tag=sts.tag, status=status)
273 def send(self, obj=None, *args, **kwargs):
275 return super(Comm, self).
send(obj, *args, **kwargs)
277 def _checkBarrierComm(self):
278 """Ensure the duplicate communicator is available""" 283 """Version of comm.Barrier() that doesn't busy-wait 285 A duplicate communicator is used so as not to interfere with the user's own communications. 294 dst = (rank + mask) % size
295 src = (rank - mask + size) % size
305 return super(Comm, self).bcast(value, root=root)
308 """Scatter data across the nodes 310 The default version apparently pickles the entire 'dataList', 311 which can cause errors if the pickle size grows over 2^31 bytes 312 due to fundamental problems with pickle in python 2. Instead, 313 we send the data to each slave node in turn; this reduces the 316 @param dataList List of data to distribute; one per node 318 @param root Index of root node 319 @param tag Message tag (integer) 320 @return Data for this node 322 if self.Get_rank() == root:
323 for rank, data
in enumerate(dataList):
326 self.
send(data, rank, tag=tag)
327 return dataList[root]
329 return self.
recv(source=root, tag=tag)
334 super(Comm, self).
Free()
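# Usage sketch (assumes the script is launched under MPI, e.g. "mpiexec -n 4";
# not from the original source): the wrapper duplicates COMM_WORLD, and
# scatter() sends each element point-to-point instead of pickling the whole list.
#
#     >>> comm = Comm()
#     >>> data = ["job-%d" % i for i in range(comm.size)] if comm.rank == 0 else None
#     >>> mine = comm.scatter(data, root=0)
#     >>> comm.Barrier()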
338 """Object to signal no operation""" 343 """Provides tag numbers by symbolic name in attributes""" 347 for i, name
in enumerate(nameList, 1):
348 setattr(self, name, i)
351 return self.__class__.__name__ + repr(self.
_nameList)
354 return self.__class__, tuple(self.
_nameList)
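# Example (sketch): tags are numbered from 1 in the order the names are given,
# so they can be used directly as MPI message tags.
#
#     >>> tags = Tags("request", "work")
#     >>> tags.request, tags.work
#     (1, 2)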
358 """An object to hold stuff between different scatter calls 360 Includes a communicator by default, to allow intercommunication 365 super(Cache, self).
__init__(comm=comm)
369 """!Metaclass to produce a singleton 371 Doing a singleton mixin without a metaclass (via __new__) is 372 annoying because the user has to name his __init__ something else 373 (otherwise it's called every time, which undoes any changes). 374 Using this metaclass, the class's __init__ is called exactly once. 376 Because this is a metaclass, note that: 377 * "self" here is the class 378 * "__init__" is making the class (it's like the body of the 380 * "__call__" is making an instance of the class (it's like 381 "__new__" in the class). 385 super(SingletonMeta, cls).
__init__(name, bases, dict_)
395 """Debug logger singleton 397 Disabled by default; to enable, do: 'Debugger().enabled = True' 398 You can also redirect the output by changing the 'out' attribute. 405 def log(self, source, msg, *args):
408 The 'args' are only stringified if we're enabled. 410 @param source: name of source 411 @param msg: message to write 412 @param args: additional outputs to append to message 415 self.
out.write(
"%s: %s" % (source, msg))
417 self.
out.write(
" %s" % arg)
422 """Thread to do reduction of results 424 "A thread?", you say. "What about the python GIL?" 425 Well, because we 'sleep' when there's no immediate response from the 426 slaves, that gives the thread a chance to fire; and threads are easier 427 to manage (e.g., shared memory) than a process. 429 def __init__(self, reducer, initial=None, sleep=0.1):
432 The 'reducer' should take two values and return a single 435 @param reducer Function that does the reducing 436 @param initial Initial value for reduction, or None 437 @param sleep Time to sleep when there's nothing to do (sec) 439 threading.Thread.__init__(self, name=
"reducer")
441 self.
_lock = threading.Lock()
445 self.
_done = threading.Event()
448 """Do the actual work 450 We pull the data out of the queue and release the lock before 451 operating on it. This stops us from blocking the addition of 452 new data to the queue. 463 Thread entry point, called by Thread.start 472 """Add data to the queue to be reduced""" 477 """Complete the thread 479 Unlike Thread.join (which always returns 'None'), we return the result 483 threading.Thread.join(self)
488 """Node in MPI process pool 490 WARNING: You should not let a pool instance hang around at program 491 termination, as the garbage collection behaves differently, and may 492 cause a segmentation fault (signal 11). 507 def _getCache(self, context, index):
508 """Retrieve cache for particular data 510 The cache is updated with the contents of the store. 512 if context
not in self.
_cache:
514 if context
not in self.
_store:
516 cache = self.
_cache[context]
517 store = self.
_store[context]
518 if index
not in cache:
520 cache[index].__dict__.update(store)
523 def log(self, msg, *args):
524 """Log a debugging message""" 530 def _processQueue(self, context, func, queue, *args, **kwargs):
531 """!Process a queue of data 533 The queue consists of a list of (index, data) tuples, 534 where the index maps to the cache, and the data is 535 passed to the 'func'. 537 The 'func' signature should be func(cache, data, *args, **kwargs) 538 if 'context' is non-None; otherwise func(data, *args, **kwargs). 540 @param context: Namespace for cache; None to not use cache 541 @param func: function for slaves to run 542 @param queue: List of (index,data) tuples to process 543 @param args: Constant arguments 544 @param kwargs: Keyword arguments 545 @return list of results from applying 'func' to dataList 547 return self.
_reduceQueue(context,
None, func, queue, *args, **kwargs)
549 def _reduceQueue(self, context, reducer, func, queue, *args, **kwargs):
550 """!Reduce a queue of data 552 The queue consists of a list of (index, data) tuples, 553 where the index maps to the cache, and the data is 554 passed to the 'func', the output of which is reduced 555 using the 'reducer' (if non-None). 557 The 'func' signature should be func(cache, data, *args, **kwargs) 558 if 'context' is non-None; otherwise func(data, *args, **kwargs). 560 The 'reducer' signature should be reducer(old, new). If the 'reducer' 561 is None, then we will return the full list of results 563 @param context: Namespace for cache; None to not use cache 564 @param reducer: function for master to run to reduce slave results; or None 565 @param func: function for slaves to run 566 @param queue: List of (index,data) tuples to process 567 @param args: Constant arguments 568 @param kwargs: Keyword arguments 569 @return reduced result (if reducer is non-None) or list of results 570 from applying 'func' to dataList 572 if context
is not None:
573 resultList = [func(self.
_getCache(context, i), data, *args, **kwargs)
for i, data
in queue]
575 resultList = [func(data, *args, **kwargs)
for i, data
in queue]
578 if len(resultList) == 0:
580 output = resultList.pop(0)
581 for result
in resultList:
582 output = reducer(output, result)
586 """Set values in store for a particular context""" 587 self.
log(
"storing", context, kwargs)
588 if context
not in self.
_store:
590 for name, value
in kwargs.items():
591 self.
_store[context][name] = value
594 """Delete value in store for a particular context""" 595 self.
log(
"deleting from store", context, nameList)
596 if context
not in self.
_store:
597 raise KeyError(
"No such context: %s" % context)
598 for name
in nameList:
599 del self.
_store[context][name]
602 """Clear stored data for a particular context""" 603 self.
log(
"clearing store", context)
604 if context
not in self.
_store:
605 raise KeyError(
"No such context: %s" % context)
609 """Reset cache for a particular context""" 610 self.
log(
"clearing cache", context)
611 if context
not in self.
_cache:
616 """List contents of cache""" 617 cache = self.
_cache[context]
if context
in self.
_cache else {}
618 sys.stderr.write(
"Cache on %s (%s): %s\n" % (self.
node, context, cache))
621 """List contents of store for a particular context""" 622 if context
not in self.
_store:
623 raise KeyError(
"No such context: %s" % context)
624 sys.stderr.write(
"Store on %s (%s): %s\n" % (self.
node, context, self.
_store[context]))
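# Sketch of the calling convention used throughout the pool (hypothetical
# function names): when a 'context' is supplied, the mapped function receives
# the per-datum cache (which also exposes anything placed in the store) as its
# first argument; without a context it receives the data alone.
#
#     >>> def process(cache, data, scale=1.0):        # context is not None
#     ...     return scale * (data + getattr(cache, "offset", 0.0))
#     >>> def processNoCache(data, scale=1.0):        # context is None
#     ...     return scale * data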
628 """Master node instance of MPI process pool 630 Only the master node should instantiate this. 632 WARNING: You should not let a pool instance hang around at program 633 termination, as the garbage collection behaves differently, and may 634 cause a segmentation fault (signal 11). 638 super(PoolMaster, self).
__init__(*args, **kwargs)
639 assert self.
root == self.
rank,
"This is the master node" 642 """Ensure slaves exit when we're done""" 645 def log(self, msg, *args):
646 """Log a debugging message""" 650 """Send command to slaves 652 A command is the name of the PoolSlave method they should run. 654 self.
log(
"command", cmd)
655 self.
comm.broadcast(cmd, root=self.
root)
    def map(self, context, func, dataList, *args, **kwargs):
        """!Scatter work to slaves and gather the results

        Work is distributed dynamically, so that slaves that finish
        quickly will receive more work.

        Each slave applies the function to the data they're provided.
        The slaves may optionally be passed a cache instance, which
        they can use to store data for subsequent executions (to ensure
        subsequent data is distributed in the same pattern as before,
        use the 'mapToPrevious' method).  The cache also contains
        data that has been stored on the slaves.

        The 'func' signature should be func(cache, data, *args, **kwargs)
        if 'context' is non-None; otherwise func(data, *args, **kwargs).

        @param context: Namespace for cache
        @param func: function for slaves to run; must be picklable
        @param dataList: List of data to distribute to slaves; must be picklable
        @param args: List of constant arguments
        @param kwargs: Dict of constant arguments
        @return list of results from applying 'func' to dataList
        """
        return self.reduce(context, None, func, dataList, *args, **kwargs)
    def reduce(self, context, reducer, func, dataList, *args, **kwargs):
        """!Scatter work to slaves and reduce the results

        Work is distributed dynamically, so that slaves that finish
        quickly will receive more work.

        Each slave applies the function to the data they're provided.
        The slaves may optionally be passed a cache instance, which
        they can use to store data for subsequent executions (to ensure
        subsequent data is distributed in the same pattern as before,
        use the 'mapToPrevious' method).  The cache also contains
        data that has been stored on the slaves.

        The 'func' signature should be func(cache, data, *args, **kwargs)
        if 'context' is non-None; otherwise func(data, *args, **kwargs).

        The 'reducer' signature should be reducer(old, new).  If the 'reducer'
        is None, then we will return the full list of results.

        @param context: Namespace for cache
        @param reducer: function for master to run to reduce slave results; or None
        @param func: function for slaves to run; must be picklable
        @param dataList: List of data to distribute to slaves; must be picklable
        @param args: List of constant arguments
        @param kwargs: Dict of constant arguments
        @return reduced result (if reducer is non-None) or list of results
            from applying 'func' to dataList
        """
        tags = Tags("request", "work")
        num = len(dataList)
        if self.size == 1 or num <= 1 or num == self.size:
            # Too little data or exactly one job per node: dynamic
            # distribution buys nothing, so use the static version
            return self.reduceNoBalance(context, reducer, func, dataList, *args, **kwargs)

        self.command("reduce")

        # Send function
        self.comm.broadcast((tags, func, reducer, args, kwargs, context), root=self.root)

        # Parcel out first set of data
        queue = list(zip(range(num), dataList))  # index, data
        output = [None]*num if reducer is None else None
        initial = [None if i == self.rank else queue.pop(0) if queue else NoOp() for
                   i in range(self.size)]
        pending = min(num, self.size - 1)
        self.log("scatter initial jobs")
        self.comm.scatter(initial, root=self.rank)

        while queue or pending > 0:
            status = mpi.Status()
            report = self.comm.recv(status=status, tag=tags.request, source=mpi.ANY_SOURCE)
            source = status.source
            self.log("gather from slave", source)
            if reducer is None:
                index, result = report
                output[index] = result

            if queue:
                job = queue.pop(0)
                self.log("send job to slave", job[0], source)
            else:
                job = NoOp()
                pending -= 1
            self.comm.send(job, source, tag=tags.work)

        if reducer is not None:
            results = self.comm.gather(None, root=self.root)
            output = None
            for rank in range(self.size):
                if rank == self.root:
                    continue
                output = reducer(output, results[rank]) if output is not None else results[rank]

        self.log("done")
        return output
765 """!Scatter work to slaves and gather the results 767 Work is distributed statically, so there is no load balancing. 769 Each slave applies the function to the data they're provided. 770 The slaves may optionally be passed a cache instance, which 771 they can store data in for subsequent executions (to ensure 772 subsequent data is distributed in the same pattern as before, 773 use the 'mapToPrevious' method). The cache also contains 774 data that has been stored on the slaves. 776 The 'func' signature should be func(cache, data, *args, **kwargs) 777 if 'context' is true; otherwise func(data, *args, **kwargs). 779 @param context: Namespace for cache 780 @param func: function for slaves to run; must be picklable 781 @param dataList: List of data to distribute to slaves; must be picklable 782 @param args: List of constant arguments 783 @param kwargs: Dict of constant arguments 784 @return list of results from applying 'func' to dataList 786 return self.
reduceNoBalance(context,
None, func, dataList, *args, **kwargs)
791 """!Scatter work to slaves and reduce the results 793 Work is distributed statically, so there is no load balancing. 795 Each slave applies the function to the data they're provided. 796 The slaves may optionally be passed a cache instance, which 797 they can store data in for subsequent executions (to ensure 798 subsequent data is distributed in the same pattern as before, 799 use the 'mapToPrevious' method). The cache also contains 800 data that has been stored on the slaves. 802 The 'func' signature should be func(cache, data, *args, **kwargs) 803 if 'context' is true; otherwise func(data, *args, **kwargs). 805 The 'reducer' signature should be reducer(old, new). If the 'reducer' 806 is None, then we will return the full list of results 808 @param context: Namespace for cache 809 @param reducer: function for master to run to reduce slave results; or None 810 @param func: function for slaves to run; must be picklable 811 @param dataList: List of data to distribute to slaves; must be picklable 812 @param args: List of constant arguments 813 @param kwargs: Dict of constant arguments 814 @return reduced result (if reducer is non-None) or list of results 815 from applying 'func' to dataList 817 tags =
Tags(
"result",
"work")
819 if self.
size == 1
or num <= 1:
820 return self.
_reduceQueue(context, reducer, func,
list(zip(range(num), dataList)), *args, **kwargs)
826 self.
comm.broadcast((tags, func, args, kwargs, context), root=self.
root)
830 queue =
list(zip(range(num), dataList))
832 distribution = [[queue[i]]
for i
in range(num)]
833 distribution.insert(self.
rank, [])
834 for i
in range(num, self.
size - 1):
835 distribution.append([])
836 elif num % self.
size == 0:
837 numEach = num//self.
size 838 distribution = [queue[i*numEach:(i+1)*numEach]
for i
in range(self.
size)]
840 numEach = num//self.
size 841 distribution = [queue[i*numEach:(i+1)*numEach]
for i
in range(self.
size)]
842 for i
in range(numEach*self.
size, num):
843 distribution[(self.
rank + 1) % self.
size].append
844 distribution =
list([]
for i
in range(self.
size))
845 for i, job
in enumerate(queue, self.
rank + 1):
849 for source
in range(self.
size):
850 if source == self.
rank:
852 self.
log(
"send jobs to ", source)
853 self.
comm.send(distribution[source], source, tag=tags.work)
856 output = [
None]*num
if reducer
is None else None 858 def ingestResults(output, nodeResults, distList):
860 for i, result
in enumerate(nodeResults):
861 index = distList[i][0]
862 output[index] = result
865 output = nodeResults.pop(0)
866 for result
in nodeResults:
867 output = reducer(output, result)
870 ourResults = self.
_processQueue(context, func, distribution[self.
rank], *args, **kwargs)
871 output = ingestResults(output, ourResults, distribution[self.
rank])
874 pending = self.
size - 1
876 status = mpi.Status()
877 slaveResults = self.
comm.recv(status=status, tag=tags.result, source=mpi.ANY_SOURCE)
878 source = status.source
879 self.
log(
"gather from slave", source)
880 output = ingestResults(output, slaveResults, distribution[source])
887 """!Scatter work to the same target as before 889 Work is distributed so that each slave handles the same 890 indices in the dataList as when 'map' was called. 891 This allows the right data to go to the right cache. 893 It is assumed that the dataList is the same length as when it was 896 The 'func' signature should be func(cache, data, *args, **kwargs). 898 @param context: Namespace for cache 899 @param func: function for slaves to run; must be picklable 900 @param dataList: List of data to distribute to slaves; must be picklable 901 @param args: List of constant arguments 902 @param kwargs: Dict of constant arguments 903 @return list of results from applying 'func' to dataList 905 return self.
reduceToPrevious(context,
None, func, dataList, *args, **kwargs)
910 """!Reduction where work goes to the same target as before 912 Work is distributed so that each slave handles the same 913 indices in the dataList as when 'map' was called. 914 This allows the right data to go to the right cache. 916 It is assumed that the dataList is the same length as when it was 919 The 'func' signature should be func(cache, data, *args, **kwargs). 921 The 'reducer' signature should be reducer(old, new). If the 'reducer' 922 is None, then we will return the full list of results 924 @param context: Namespace for cache 925 @param reducer: function for master to run to reduce slave results; or None 926 @param func: function for slaves to run; must be picklable 927 @param dataList: List of data to distribute to slaves; must be picklable 928 @param args: List of constant arguments 929 @param kwargs: Dict of constant arguments 930 @return reduced result (if reducer is non-None) or list of results 931 from applying 'func' to dataList 934 raise ValueError(
"context must be set to map to same nodes as previous context")
935 tags =
Tags(
"result",
"work")
937 if self.
size == 1
or num <= 1:
939 return self.
_reduceQueue(context, reducer, func,
list(zip(range(num), dataList)), *args, **kwargs)
942 return self.
reduceNoBalance(context, reducer, func, dataList, *args, **kwargs)
948 self.
comm.broadcast((tags, func, args, kwargs, context), root=self.
root)
950 requestList = self.
comm.gather(
None, root=self.
root)
951 self.
log(
"listen", requestList)
952 initial = [dataList[index]
if (index
is not None and index >= 0)
else None for index
in requestList]
953 self.
log(
"scatter jobs", initial)
954 self.
comm.scatter(initial, root=self.
root)
955 pending =
min(num, self.
size - 1)
964 status = mpi.Status()
965 index, result, nextIndex = self.
comm.recv(status=status, tag=tags.result, source=mpi.ANY_SOURCE)
966 source = status.source
967 self.
log(
"gather from slave", source)
969 output[index] = result
974 job = dataList[nextIndex]
975 self.
log(
"send job to slave", source)
976 self.
comm.send(job, source, tag=tags.work)
980 self.
log(
"waiting on", pending)
982 if reducer
is not None:
983 output = thread.join()
991 """!Store data on slave for a particular context 993 The data is made available to functions through the cache. The 994 stored data differs from the cache in that it is identical for 995 all operations, whereas the cache is specific to the data being 998 @param context: namespace for store 999 @param kwargs: dict of name=value pairs 1001 super(PoolMaster, self).
storeSet(context, **kwargs)
1003 self.
log(
"give data")
1004 self.
comm.broadcast((context, kwargs), root=self.
root)
1009 """Delete stored data on slave for a particular context""" 1010 super(PoolMaster, self).
storeDel(context, *nameList)
1012 self.
log(
"tell names")
1013 self.
comm.broadcast((context, nameList), root=self.
root)
1018 """Reset data store for a particular context on master and slaves""" 1021 self.
comm.broadcast(context, root=self.
root)
1025 """Reset cache for a particular context on master and slaves""" 1028 self.
comm.broadcast(context, root=self.
root)
1032 """List cache contents for a particular context on master and slaves""" 1033 super(PoolMaster, self).
cacheList(context)
1035 self.
comm.broadcast(context, root=self.
root)
1039 """List store contents for a particular context on master and slaves""" 1040 super(PoolMaster, self).
storeList(context)
1042 self.
comm.broadcast(context, root=self.
root)
1045 """Command slaves to exit""" 1050 """Slave node instance of MPI process pool""" 1053 """Log a debugging message""" 1054 assert self.
rank != self.
root,
"This is not the master node." 1059 """Serve commands of master node 1061 Slave accepts commands, which are the names of methods to execute. 1062 This exits when a command returns a true value. 1064 menu = dict((cmd, getattr(self, cmd))
for cmd
in (
"reduce",
"mapNoBalance",
"mapToPrevious",
1065 "storeSet",
"storeDel",
"storeClear",
"storeList",
1066 "cacheList",
"cacheClear",
"exit",))
1067 self.
log(
"waiting for command from", self.
root)
1068 command = self.
comm.broadcast(
None, root=self.
root)
1069 self.
log(
"command", command)
1070 while not menu[command]():
1071 self.
log(
"waiting for command from", self.
root)
1072 command = self.
comm.broadcast(
None, root=self.
root)
1073 self.
log(
"command", command)
1078 """Reduce scattered data and return results""" 1079 self.
log(
"waiting for instruction")
1080 tags, func, reducer, args, kwargs, context = self.
comm.broadcast(
None, root=self.
root)
1081 self.
log(
"waiting for job")
1082 job = self.
comm.scatter(
None, root=self.
root)
1085 while not isinstance(job, NoOp):
1087 self.
log(
"running job")
1088 result = self.
_processQueue(context, func, [(index, data)], *args, **kwargs)[0]
1090 report = (index, result)
1093 out = reducer(out, result)
if out
is not None else result
1094 self.
comm.send(report, self.
root, tag=tags.request)
1095 self.
log(
"waiting for job")
1096 job = self.
comm.recv(tag=tags.work, source=self.
root)
1098 if reducer
is not None:
1099 self.
comm.gather(out, root=self.
root)
1104 """Process bulk scattered data and return results""" 1105 self.
log(
"waiting for instruction")
1106 tags, func, args, kwargs, context = self.
comm.broadcast(
None, root=self.
root)
1107 self.
log(
"waiting for job")
1108 queue = self.
comm.recv(tag=tags.work, source=self.
root)
1111 for index, data
in queue:
1112 self.
log(
"running job", index)
1113 result = self.
_processQueue(context, func, [(index, data)], *args, **kwargs)[0]
1114 resultList.append(result)
1116 self.
comm.send(resultList, self.
root, tag=tags.result)
1121 """Process the same scattered data processed previously""" 1122 self.
log(
"waiting for instruction")
1123 tags, func, args, kwargs, context = self.
comm.broadcast(
None, root=self.
root)
1125 index = queue.pop(0)
if queue
else -1
1126 self.
log(
"request job", index)
1127 self.
comm.gather(index, root=self.
root)
1128 self.
log(
"waiting for job")
1129 data = self.
comm.scatter(
None, root=self.
root)
1132 self.
log(
"running job")
1133 result = func(self.
_getCache(context, index), data, *args, **kwargs)
1134 self.
log(
"pending", queue)
1135 nextIndex = queue.pop(0)
if queue
else -1
1136 self.
comm.send((index, result, nextIndex), self.
root, tag=tags.result)
1139 data = self.
comm.recv(tag=tags.work, source=self.
root)
1144 """Set value in store""" 1145 context, kwargs = self.
comm.broadcast(
None, root=self.
root)
1146 super(PoolSlave, self).
storeSet(context, **kwargs)
1149 """Delete value in store""" 1150 context, nameList = self.
comm.broadcast(
None, root=self.
root)
1151 super(PoolSlave, self).
storeDel(context, *nameList)
1154 """Reset data store""" 1155 context = self.
comm.broadcast(
None, root=self.
root)
1160 context = self.
comm.broadcast(
None, root=self.
root)
1164 """List cache contents""" 1165 context = self.
comm.broadcast(
None, root=self.
root)
1166 super(PoolSlave, self).
cacheList(context)
1169 """List store contents""" 1170 context = self.
comm.broadcast(
None, root=self.
root)
1171 super(PoolSlave, self).
storeList(context)
1174 """Allow exit from loop in 'run'""" 1179 """Metaclass for PoolWrapper to add methods pointing to PoolMaster 1181 The 'context' is automatically supplied to these methods as the first argument. 1185 instance = super(PoolWrapperMeta, cls).
__call__(context)
1187 for name
in (
"map",
"mapNoBalance",
"mapToPrevious",
1188 "reduce",
"reduceNoBalance",
"reduceToPrevious",
1189 "storeSet",
"storeDel",
"storeClear",
"storeList",
1190 "cacheList",
"cacheClear",):
1191 setattr(instance, name, partial(getattr(pool, name), context))
1196 """Wrap PoolMaster to automatically provide context""" 1199 self.
_pool = PoolMaster._instance
1203 return getattr(self.
_pool, name)
class Pool(PoolWrapper):
    """Process pool

    Use this class to automatically provide 'context' to
    the PoolMaster class.  If you want to call functions
    that don't take a 'cache' object, use the PoolMaster
    class directly, and specify context=None.
    """
    pass


def startPool(comm=None, root=0, killSlaves=True):
    """!Start a process pool.

    Returns a PoolMaster object for the master node.
    Slave nodes are run and then optionally killed.

    If you elect not to kill the slaves, note that they
    will emerge at the point this function was called,
    which is likely very different from the point the
    master is at, so it will likely be necessary to put
    in some rank-dependent code (e.g., look at the 'rank'
    attribute of the returned pools).

    Note that the pool objects should be deleted (either
    by going out of scope or explicit 'del') before program
    termination to avoid a segmentation fault.

    @param comm: MPI communicator
    @param root: Rank of root/master node
    @param killSlaves: Kill slaves on completion?
    """
    if comm is None:
        comm = Comm()
    if comm.rank == root:
        return PoolMaster(comm, root=root)
    slave = PoolSlave(comm, root=root)
    slave.run()
    if killSlaves:
        del slave  # Required to prevent segmentation fault on exit
        sys.exit()
    return slave
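# End-to-end usage sketch (hypothetical 'processOne' function and data; the
# pattern follows the docstrings above and assumes an MPI launch such as
# "mpiexec -n 4"): startPool() returns only on the master, the Pool wrapper
# supplies the context, and the pools should be deleted before program exit
# to avoid the segmentation fault noted above.
#
#     >>> import operator
#     >>> def processOne(cache, data):
#     ...     return data * cache.factor          # 'factor' comes from the store
#     >>> pool = startPool()                      # slaves serve jobs and never return here
#     >>> calc = Pool("example")                  # context supplied automatically
#     >>> calc.storeSet(factor=2)                 # pushed to every node's store
#     >>> calc.map(processOne, range(5))
#     [0, 2, 4, 6, 8]
#     >>> calc.reduce(operator.add, processOne, range(5))
#     20
#     >>> del pool, calc                          # delete pools before program exit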