import os
import sys
import time
import types
import copyreg
import threading
import traceback
import importlib
import pickle
from functools import wraps, partial
from contextlib import contextmanager

import mpi4py.MPI as mpi

from lsst.pipe.base import Struct
__all__ = ["Comm", "Pool", "startPool", "setBatchType", "getBatchType", "abortOnError", "NODE", ]

NODE = "%s:%d" % (os.uname()[1], os.getpid())  # Name of this node
38 """Unpickle an instance method 40 This has to be a named function rather than a lambda because 41 pickle needs to find it. 43 return getattr(obj, name)
47 """Pickle an instance method 49 The instance method is divided into the object and the 53 name = method.__name__
54 return unpickleInstanceMethod, (obj, name)
copyreg.pickle(types.MethodType, pickleInstanceMethod)
61 """Unpickle a function 63 This has to be a named function rather than a lambda because 64 pickle needs to find it. 67 module = importlib.import_module(moduleName)
68 return getattr(module, funcName)
def pickleFunction(function):
    """Pickle a function

    This assumes that we can recreate the function object by grabbing
    it from the proper module. This may be violated if the function
    is a lambda or in __main__. In that case, I recommend recasting
    the function as an object with a __call__ method.

    Another problematic case may be a wrapped (e.g., decorated) method
    in a class: the 'method' is then a function, and recreating it is
    not as easy as we assume here.
    """
    moduleName = function.__module__
    funcName = function.__name__
    return unpickleFunction, (moduleName, funcName)
copyreg.pickle(types.FunctionType, pickleFunction)
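# The two copyreg hooks above make functions and bound methods pickle "by
# reference" (module + name, or object + method name), which is what lets
# work functions be shipped to the slaves. A minimal sketch of the effect
# ('increment' and 'Adder' are illustrative names, not part of this module):
#
#     import pickle
#
#     def increment(x):       # module-level, so unpickleFunction can find it
#         return x + 1
#
#     class Adder:
#         def __init__(self, n):
#             self.n = n
#         def add(self, x):
#             return x + self.n
#
#     assert pickle.loads(pickle.dumps(increment))(1) == 2      # pickleFunction
#     assert pickle.loads(pickle.dumps(Adder(2).add))(1) == 3   # pickleInstanceMethod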
_batchType = "unknown"


def getBatchType():
    """Return a string giving the type of batch system in use"""
    return _batchType


def setBatchType(batchType):
    """Set the type of batch system in use"""
    global _batchType
    _batchType = batchType
108 """Function decorator to throw an MPI abort on an unhandled exception""" 110 def wrapper(*args, **kwargs):
112 return func(*args, **kwargs)
113 except Exception
as e:
114 sys.stderr.write(
"%s on %s in %s: %s\n" % (
type(e).__name__, NODE, func.__name__, e))
116 traceback.print_exc(file=sys.stderr)
120 mpi.COMM_WORLD.Abort(1)
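# A minimal usage sketch (the function name 'main' is illustrative): any
# unhandled exception in a decorated entry point is reported to stderr and
# converted into an MPI abort, rather than leaving the other ranks
# deadlocked waiting on a dead peer.
#
#     @abortOnError
#     def main(dataList):
#         ...  # anything raising here aborts the whole MPI job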
127 """Singleton to hold what's about to be pickled. 129 We hold onto the object in case there's trouble pickling, 130 so we can figure out what class in particular is causing 133 The held object is in the 'obj' attribute. 135 Here we use the __new__-style singleton pattern, because 136 we specifically want __init__ to be called each time. 149 """Hold onto new object""" 157 """Drop held object if there were no problems""" 163 """Try to guess what's not pickling after an exception 165 This tends to work if the problem is coming from the 166 regular pickle module. If it's coming from the bowels 167 of mpi4py, there's not much that can be done. 170 excType, excValue, tb = sys.exc_info()
181 return stack[-2].tb_frame.f_locals[
"obj"]
188 """Context manager to sniff out pickle problems 190 If there's a pickle error, you're normally told what the problem 191 class is. However, all SWIG objects are reported as "SwigPyObject". 192 In order to figure out which actual SWIG-ed class is causing 193 problems, we need to go digging. 197 with pickleSniffer(): 198 someOperationInvolvingPickle() 200 If 'abort' is True, will call MPI abort in the event of problems. 204 except Exception
as e:
205 if "SwigPyObject" not in str(e)
or "pickle" not in str(e):
210 sys.stderr.write(
"Pickling error detected: %s\n" % e)
211 traceback.print_exc(file=sys.stderr)
214 if obj
is None and heldObj
is not None:
219 pickle.dumps(heldObj)
220 sys.stderr.write(
"Hmmm, that's strange: no problem with pickling held object?!?!\n")
224 sys.stderr.write(
"Unable to determine class causing pickle problems.\n")
226 sys.stderr.write(
"Object that could not be pickled: %s\n" % obj)
229 mpi.COMM_WORLD.Abort(1)
235 """Function decorator to catch errors in pickling and print something useful""" 237 def wrapper(*args, **kwargs):
239 return func(*args, **kwargs)
244 """Wrapper to mpi4py's MPI.Intracomm class to avoid busy-waiting. 246 As suggested by Lisandro Dalcin at: 247 * http://code.google.com/p/mpi4py/issues/detail?id=4 and 248 * https://groups.google.com/forum/?fromgroups=#!topic/mpi4py/nArVuMXyyZI 251 def __new__(cls, comm=mpi.COMM_WORLD, recvSleep=0.1, barrierSleep=0.1):
252 """!Construct an MPI.Comm wrapper 255 @param comm MPI.Intracomm to wrap a duplicate of 256 @param recvSleep Sleep time (seconds) for recv() 257 @param barrierSleep Sleep time (seconds) for Barrier() 259 self = super(Comm, cls).
__new__(cls, comm.Dup())
    def recv(self, obj=None, source=0, tag=0, status=None):
        """Version of comm.recv() that doesn't busy-wait"""
        sts = mpi.Status()
        while not self.Iprobe(source=source, tag=tag, status=sts):
            time.sleep(self._recvSleep)
        return super(Comm, self).recv(buf=obj, source=sts.source, tag=sts.tag, status=status)
    def send(self, obj=None, *args, **kwargs):
        with PickleHolder(obj):
            return super(Comm, self).send(obj, *args, **kwargs)
    def _checkBarrierComm(self):
        """Ensure the duplicate communicator is available"""
        if self._barrierComm is None:
            self._barrierComm = self.Dup()

    def Barrier(self, tag=0):
        """Version of comm.Barrier() that doesn't busy-wait

        A duplicate communicator is used so as not to interfere with the user's own communications.
        """
        self._checkBarrierComm()
        size = self._barrierComm.Get_size()
        if size == 1:
            return
        rank = self._barrierComm.Get_rank()
        mask = 1
        while mask < size:
            # Dissemination barrier: send to the right, receive from the left
            dst = (rank + mask) % size
            src = (rank - mask + size) % size
            req = self._barrierComm.isend(None, dst, tag)
            while not self._barrierComm.Iprobe(src, tag):
                time.sleep(self._barrierSleep)
            self._barrierComm.recv(None, src, tag)
            req.Wait()
            mask <<= 1

    def broadcast(self, value, root=0):
        with PickleHolder(value):
            return super(Comm, self).bcast(value, root=root)
307 """Scatter data across the nodes 309 The default version apparently pickles the entire 'dataList', 310 which can cause errors if the pickle size grows over 2^31 bytes 311 due to fundamental problems with pickle in python 2. Instead, 312 we send the data to each slave node in turn; this reduces the 315 @param dataList List of data to distribute; one per node 317 @param root Index of root node 318 @param tag Message tag (integer) 319 @return Data for this node 321 if self.Get_rank() == root:
322 for rank, data
in enumerate(dataList):
325 self.
send(data, rank, tag=tag)
326 return dataList[root]
328 return self.
recv(source=root, tag=tag)
    def Free(self):
        if self._barrierComm is not None:
            self._barrierComm.Free()
        super(Comm, self).Free()
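# A minimal sketch of using Comm directly (assumes an MPI launch, e.g.
# 'mpiexec -n 4 python script.py'); recv() and Barrier() sleep-poll via
# Iprobe instead of spinning a core at 100%:
#
#     comm = Comm()  # wraps a duplicate of MPI.COMM_WORLD
#     if comm.rank == 0:
#         data = comm.scatter([i**2 for i in range(comm.size)], root=0)
#     else:
#         data = comm.scatter(None, root=0)   # non-root ranks just receive
#     comm.Barrier()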
337 """Object to signal no operation""" 342 """Provides tag numbers by symbolic name in attributes""" 346 for i, name
in enumerate(nameList, 1):
347 setattr(self, name, i)
350 return self.__class__.__name__ + repr(self.
_nameList)
353 return self.__class__, tuple(self.
_nameList)
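# Example: tag numbers are allocated in order, starting at 1.
#
#     tags = Tags("request", "work")
#     assert tags.request == 1 and tags.work == 2
#     assert repr(tags) == "Tags('request', 'work')"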
357 """An object to hold stuff between different scatter calls 359 Includes a communicator by default, to allow intercommunication 364 super(Cache, self).
__init__(comm=comm)
368 """!Metaclass to produce a singleton 370 Doing a singleton mixin without a metaclass (via __new__) is 371 annoying because the user has to name his __init__ something else 372 (otherwise it's called every time, which undoes any changes). 373 Using this metaclass, the class's __init__ is called exactly once. 375 Because this is a metaclass, note that: 376 * "self" here is the class 377 * "__init__" is making the class (it's like the body of the 379 * "__call__" is making an instance of the class (it's like 380 "__new__" in the class). 384 super(SingletonMeta, cls).
__init__(name, bases, dict_)
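# Example: a class using SingletonMeta is constructed exactly once; later
# "instantiations" return the same object and skip __init__ ('Config' is an
# illustrative name):
#
#     class Config(metaclass=SingletonMeta):
#         def __init__(self):
#             self.count = 0
#
#     Config().count += 1
#     assert Config().count == 1 and Config() is Config()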
394 """Debug logger singleton 396 Disabled by default; to enable, do: 'Debugger().enabled = True' 397 You can also redirect the output by changing the 'out' attribute. 404 def log(self, source, msg, *args):
407 The 'args' are only stringified if we're enabled. 409 @param source: name of source 410 @param msg: message to write 411 @param args: additional outputs to append to message 414 self.
out.
write(
"%s: %s" % (source, msg))
421 """Thread to do reduction of results 423 "A thread?", you say. "What about the python GIL?" 424 Well, because we 'sleep' when there's no immediate response from the 425 slaves, that gives the thread a chance to fire; and threads are easier 426 to manage (e.g., shared memory) than a process. 428 def __init__(self, reducer, initial=None, sleep=0.1):
431 The 'reducer' should take two values and return a single 434 @param reducer Function that does the reducing 435 @param initial Initial value for reduction, or None 436 @param sleep Time to sleep when there's nothing to do (sec) 438 threading.Thread.__init__(self, name=
"reducer")
440 self.
_lock = threading.Lock()
444 self.
_done = threading.Event()
447 """Do the actual work 449 We pull the data out of the queue and release the lock before 450 operating on it. This stops us from blocking the addition of 451 new data to the queue. 462 Thread entry point, called by Thread.start 471 """Add data to the queue to be reduced""" 476 """Complete the thread 478 Unlike Thread.join (which always returns 'None'), we return the result 482 threading.Thread.join(self)
487 """Node in MPI process pool 489 WARNING: You should not let a pool instance hang around at program 490 termination, as the garbage collection behaves differently, and may 491 cause a segmentation fault (signal 11). 506 def _getCache(self, context, index):
507 """Retrieve cache for particular data 509 The cache is updated with the contents of the store. 511 if context
not in self.
_cache:
513 if context
not in self.
_store:
515 cache = self.
_cache[context]
516 store = self.
_store[context]
517 if index
not in cache:
519 cache[index].__dict__.update(store)
    def log(self, msg, *args):
        """Log a debugging message"""
        self.debugger.log("Node %d" % self.rank, msg, *args)

    def _processQueue(self, context, func, queue, *args, **kwargs):
        """!Process a queue of data

        The queue consists of a list of (index, data) tuples,
        where the index maps to the cache, and the data is
        passed to the 'func'.

        The 'func' signature should be func(cache, data, *args, **kwargs)
        if 'context' is non-None; otherwise func(data, *args, **kwargs).

        @param context: Namespace for cache; None to not use cache
        @param func: function for slaves to run
        @param queue: List of (index,data) tuples to process
        @param args: Constant arguments
        @param kwargs: Keyword arguments
        @return list of results from applying 'func' to dataList
        """
        return self._reduceQueue(context, None, func, queue, *args, **kwargs)
    def _reduceQueue(self, context, reducer, func, queue, *args, **kwargs):
        """!Reduce a queue of data

        The queue consists of a list of (index, data) tuples,
        where the index maps to the cache, and the data is
        passed to the 'func', the output of which is reduced
        using the 'reducer' (if non-None).

        The 'func' signature should be func(cache, data, *args, **kwargs)
        if 'context' is non-None; otherwise func(data, *args, **kwargs).

        The 'reducer' signature should be reducer(old, new). If the 'reducer'
        is None, then we will return the full list of results.

        @param context: Namespace for cache; None to not use cache
        @param reducer: function for master to run to reduce slave results; or None
        @param func: function for slaves to run
        @param queue: List of (index,data) tuples to process
        @param args: Constant arguments
        @param kwargs: Keyword arguments
        @return reduced result (if reducer is non-None) or list of results
            from applying 'func' to dataList
        """
        if context is not None:
            resultList = [func(self._getCache(context, i), data, *args, **kwargs) for i, data in queue]
        else:
            resultList = [func(data, *args, **kwargs) for i, data in queue]
        if reducer is None:
            return resultList
        if len(resultList) == 0:
            return None
        output = resultList.pop(0)
        for result in resultList:
            output = reducer(output, result)
        return output
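    # Example of the two 'func' signatures and the pairwise 'reducer'
    # handled above (hypothetical callables, for illustration only):
    #
    #     def funcWithCache(cache, data):   # used when context is non-None
    #         return data * 2
    #
    #     def funcPlain(data):              # used when context is None
    #         return data * 2
    #
    #     def reducer(old, new):            # folds results pairwise
    #         return old + new
    #
    #     # node._reduceQueue(None, reducer, funcPlain, [(0, 1), (1, 2)]) == 6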
585 """Set values in store for a particular context""" 586 self.
log(
"storing", context, kwargs)
587 if context
not in self.
_store:
589 for name, value
in kwargs.items():
590 self.
_store[context][name] = value
593 """Delete value in store for a particular context""" 594 self.
log(
"deleting from store", context, nameList)
595 if context
not in self.
_store:
596 raise KeyError(
"No such context: %s" % context)
597 for name
in nameList:
598 del self.
_store[context][name]
601 """Clear stored data for a particular context""" 602 self.
log(
"clearing store", context)
603 if context
not in self.
_store:
604 raise KeyError(
"No such context: %s" % context)
608 """Reset cache for a particular context""" 609 self.
log(
"clearing cache", context)
610 if context
not in self.
_cache:
615 """List contents of cache""" 616 cache = self.
_cache[context]
if context
in self.
_cache else {}
617 sys.stderr.write(
"Cache on %s (%s): %s\n" % (self.
node, context, cache))
620 """List contents of store for a particular context""" 621 if context
not in self.
_store:
622 raise KeyError(
"No such context: %s" % context)
623 sys.stderr.write(
"Store on %s (%s): %s\n" % (self.
node, context, self.
_store[context]))
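# Example of store semantics (illustrative; 'node' is a PoolNode instance
# and 'threshold' is a hypothetical name): values set with storeSet are
# copied into every Cache handed to 'func', identical for all data items,
# whereas cache contents are specific to each data index.
#
#     node.storeSet("ctx", threshold=5.0)
#     cache = node._getCache("ctx", 0)   # cache.threshold is now 5.0
#     node.storeDel("ctx", "threshold")
#     node.storeClear("ctx")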
627 """Master node instance of MPI process pool 629 Only the master node should instantiate this. 631 WARNING: You should not let a pool instance hang around at program 632 termination, as the garbage collection behaves differently, and may 633 cause a segmentation fault (signal 11). 637 super(PoolMaster, self).
__init__(*args, **kwargs)
638 assert self.
root == self.
rank,
"This is the master node" 641 """Ensure slaves exit when we're done""" 644 def log(self, msg, *args):
645 """Log a debugging message""" 649 """Send command to slaves 651 A command is the name of the PoolSlave method they should run. 653 self.
log(
"command", cmd)
654 self.
comm.broadcast(cmd, root=self.
root)
    @abortOnError
    @catchPicklingError
    def map(self, context, func, dataList, *args, **kwargs):
        """!Scatter work to slaves and gather the results

        Work is distributed dynamically, so that slaves that finish
        quickly will receive more work.

        Each slave applies the function to the data they're provided.
        The slaves may optionally be passed a cache instance, which
        they can use to store data for subsequent executions (to ensure
        subsequent data is distributed in the same pattern as before,
        use the 'mapToPrevious' method). The cache also contains
        data that has been stored on the slaves.

        The 'func' signature should be func(cache, data, *args, **kwargs)
        if 'context' is non-None; otherwise func(data, *args, **kwargs).

        @param context: Namespace for cache
        @param func: function for slaves to run; must be picklable
        @param dataList: List of data to distribute to slaves; must be picklable
        @param args: List of constant arguments
        @param kwargs: Dict of constant arguments
        @return list of results from applying 'func' to dataList
        """
        return self.reduce(context, None, func, dataList, *args, **kwargs)
    @abortOnError
    @catchPicklingError
    def reduce(self, context, reducer, func, dataList, *args, **kwargs):
        """!Scatter work to slaves and reduce the results

        Work is distributed dynamically, so that slaves that finish
        quickly will receive more work.

        Each slave applies the function to the data they're provided.
        The slaves may optionally be passed a cache instance, which
        they can use to store data for subsequent executions (to ensure
        subsequent data is distributed in the same pattern as before,
        use the 'mapToPrevious' method). The cache also contains
        data that has been stored on the slaves.

        The 'func' signature should be func(cache, data, *args, **kwargs)
        if 'context' is non-None; otherwise func(data, *args, **kwargs).

        The 'reducer' signature should be reducer(old, new). If the 'reducer'
        is None, then we will return the full list of results.

        @param context: Namespace for cache
        @param reducer: function for master to run to reduce slave results; or None
        @param func: function for slaves to run; must be picklable
        @param dataList: List of data to distribute to slaves; must be picklable
        @param args: List of constant arguments
        @param kwargs: Dict of constant arguments
        @return reduced result (if reducer is non-None) or list of results
            from applying 'func' to dataList
        """
        tags = Tags("request", "work")
        num = len(dataList)
        if self.size == 1 or num <= 1:
            # Everything can be done here
            return self._reduceQueue(context, reducer, func, list(zip(range(num), dataList)),
                                     *args, **kwargs)
        if self.size == num:
            # We're shooting ourselves in the foot using dynamic distribution
            return self.reduceNoBalance(context, reducer, func, dataList, *args, **kwargs)

        self.command("reduce")

        # Send function
        self.log("instruct")
        self.comm.broadcast((tags, func, reducer, args, kwargs, context), root=self.root)

        # Parcel out first set of data
        queue = list(zip(range(num), dataList))  # index, data
        output = [None]*num if reducer is None else None
        initial = [None if i == self.rank else queue.pop(0) if queue else NoOp() for
                   i in range(self.size)]
        pending = min(num, self.size - 1)
        self.log("scatter initial jobs")
        self.comm.scatter(initial, root=self.rank)

        while queue or pending > 0:
            status = mpi.Status()
            report = self.comm.recv(status=status, tag=tags.request, source=mpi.ANY_SOURCE)
            source = status.source
            self.log("gather from slave", source)
            if reducer is None:
                index, result = report
                output[index] = result

            if queue:
                job = queue.pop(0)
                self.log("send job to slave", job[0], source)
            else:
                job = NoOp()
                pending -= 1
            self.comm.send(job, source, tag=tags.work)

        if reducer is not None:
            results = self.comm.gather(None, root=self.root)
            output = None
            for rank in range(self.size):
                if rank == self.root:
                    continue
                output = reducer(output, results[rank]) if output is not None else results[rank]

        self.log("done")
        return output
764 """!Scatter work to slaves and gather the results 766 Work is distributed statically, so there is no load balancing. 768 Each slave applies the function to the data they're provided. 769 The slaves may optionally be passed a cache instance, which 770 they can store data in for subsequent executions (to ensure 771 subsequent data is distributed in the same pattern as before, 772 use the 'mapToPrevious' method). The cache also contains 773 data that has been stored on the slaves. 775 The 'func' signature should be func(cache, data, *args, **kwargs) 776 if 'context' is true; otherwise func(data, *args, **kwargs). 778 @param context: Namespace for cache 779 @param func: function for slaves to run; must be picklable 780 @param dataList: List of data to distribute to slaves; must be picklable 781 @param args: List of constant arguments 782 @param kwargs: Dict of constant arguments 783 @return list of results from applying 'func' to dataList 785 return self.
reduceNoBalance(context,
None, func, dataList, *args, **kwargs)
790 """!Scatter work to slaves and reduce the results 792 Work is distributed statically, so there is no load balancing. 794 Each slave applies the function to the data they're provided. 795 The slaves may optionally be passed a cache instance, which 796 they can store data in for subsequent executions (to ensure 797 subsequent data is distributed in the same pattern as before, 798 use the 'mapToPrevious' method). The cache also contains 799 data that has been stored on the slaves. 801 The 'func' signature should be func(cache, data, *args, **kwargs) 802 if 'context' is true; otherwise func(data, *args, **kwargs). 804 The 'reducer' signature should be reducer(old, new). If the 'reducer' 805 is None, then we will return the full list of results 807 @param context: Namespace for cache 808 @param reducer: function for master to run to reduce slave results; or None 809 @param func: function for slaves to run; must be picklable 810 @param dataList: List of data to distribute to slaves; must be picklable 811 @param args: List of constant arguments 812 @param kwargs: Dict of constant arguments 813 @return reduced result (if reducer is non-None) or list of results 814 from applying 'func' to dataList 816 tags =
Tags(
"result",
"work")
818 if self.
size == 1
or num <= 1:
819 return self.
_reduceQueue(context, reducer, func,
list(zip(range(num), dataList)), *args, **kwargs)
825 self.
comm.broadcast((tags, func, args, kwargs, context), root=self.
root)
829 queue =
list(zip(range(num), dataList))
831 distribution = [[queue[i]]
for i
in range(num)]
832 distribution.insert(self.
rank, [])
833 for i
in range(num, self.
size - 1):
834 distribution.append([])
835 elif num % self.
size == 0:
836 numEach = num//self.
size 837 distribution = [queue[i*numEach:(i+1)*numEach]
for i
in range(self.
size)]
839 numEach = num//self.
size 840 distribution = [queue[i*numEach:(i+1)*numEach]
for i
in range(self.
size)]
841 for i
in range(numEach*self.
size, num):
842 distribution[(self.
rank + 1) % self.
size].append
843 distribution =
list([]
for i
in range(self.
size))
844 for i, job
in enumerate(queue, self.
rank + 1):
848 for source
in range(self.
size):
849 if source == self.
rank:
851 self.
log(
"send jobs to ", source)
852 self.
comm.send(distribution[source], source, tag=tags.work)
855 output = [
None]*num
if reducer
is None else None 857 def ingestResults(output, nodeResults, distList):
859 for i, result
in enumerate(nodeResults):
860 index = distList[i][0]
861 output[index] = result
864 output = nodeResults.pop(0)
865 for result
in nodeResults:
866 output = reducer(output, result)
869 ourResults = self.
_processQueue(context, func, distribution[self.
rank], *args, **kwargs)
870 output = ingestResults(output, ourResults, distribution[self.
rank])
873 pending = self.
size - 1
875 status = mpi.Status()
876 slaveResults = self.
comm.recv(status=status, tag=tags.result, source=mpi.ANY_SOURCE)
877 source = status.source
878 self.
log(
"gather from slave", source)
879 output = ingestResults(output, slaveResults, distribution[source])
886 """!Scatter work to the same target as before 888 Work is distributed so that each slave handles the same 889 indices in the dataList as when 'map' was called. 890 This allows the right data to go to the right cache. 892 It is assumed that the dataList is the same length as when it was 895 The 'func' signature should be func(cache, data, *args, **kwargs). 897 @param context: Namespace for cache 898 @param func: function for slaves to run; must be picklable 899 @param dataList: List of data to distribute to slaves; must be picklable 900 @param args: List of constant arguments 901 @param kwargs: Dict of constant arguments 902 @return list of results from applying 'func' to dataList 904 return self.
reduceToPrevious(context,
None, func, dataList, *args, **kwargs)
909 """!Reduction where work goes to the same target as before 911 Work is distributed so that each slave handles the same 912 indices in the dataList as when 'map' was called. 913 This allows the right data to go to the right cache. 915 It is assumed that the dataList is the same length as when it was 918 The 'func' signature should be func(cache, data, *args, **kwargs). 920 The 'reducer' signature should be reducer(old, new). If the 'reducer' 921 is None, then we will return the full list of results 923 @param context: Namespace for cache 924 @param reducer: function for master to run to reduce slave results; or None 925 @param func: function for slaves to run; must be picklable 926 @param dataList: List of data to distribute to slaves; must be picklable 927 @param args: List of constant arguments 928 @param kwargs: Dict of constant arguments 929 @return reduced result (if reducer is non-None) or list of results 930 from applying 'func' to dataList 933 raise ValueError(
"context must be set to map to same nodes as previous context")
934 tags =
Tags(
"result",
"work")
936 if self.
size == 1
or num <= 1:
938 return self.
_reduceQueue(context, reducer, func,
list(zip(range(num), dataList)), *args, **kwargs)
941 return self.
reduceNoBalance(context, reducer, func, dataList, *args, **kwargs)
947 self.
comm.broadcast((tags, func, args, kwargs, context), root=self.
root)
949 requestList = self.
comm.gather(
None, root=self.
root)
950 self.
log(
"listen", requestList)
951 initial = [dataList[index]
if (index
is not None and index >= 0)
else None for index
in requestList]
952 self.
log(
"scatter jobs", initial)
953 self.
comm.scatter(initial, root=self.
root)
954 pending =
min(num, self.
size - 1)
963 status = mpi.Status()
964 index, result, nextIndex = self.
comm.recv(status=status, tag=tags.result, source=mpi.ANY_SOURCE)
965 source = status.source
966 self.
log(
"gather from slave", source)
968 output[index] = result
973 job = dataList[nextIndex]
974 self.
log(
"send job to slave", source)
975 self.
comm.send(job, source, tag=tags.work)
979 self.
log(
"waiting on", pending)
981 if reducer
is not None:
982 output = thread.join()
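    # Hedged example of cache affinity: a first 'map' call populates
    # per-index caches on the slaves; 'mapToPrevious' then routes index i
    # back to whichever slave processed it before, so 'func' can reuse the
    # cache contents ('first', 'second', 'expensiveSetup' are illustrative):
    #
    #     def first(cache, data):
    #         cache.model = expensiveSetup(data)
    #         return None
    #
    #     def second(cache, data):
    #         return cache.model(data)   # reuses the cached setup
    #
    #     pool.map("ctx", first, dataList)
    #     results = pool.mapToPrevious("ctx", second, dataList)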
990 """!Store data on slave for a particular context 992 The data is made available to functions through the cache. The 993 stored data differs from the cache in that it is identical for 994 all operations, whereas the cache is specific to the data being 997 @param context: namespace for store 998 @param kwargs: dict of name=value pairs 1000 super(PoolMaster, self).
storeSet(context, **kwargs)
1002 self.
log(
"give data")
1003 self.
comm.broadcast((context, kwargs), root=self.
root)
1008 """Delete stored data on slave for a particular context""" 1009 super(PoolMaster, self).
storeDel(context, *nameList)
1011 self.
log(
"tell names")
1012 self.
comm.broadcast((context, nameList), root=self.
root)
1017 """Reset data store for a particular context on master and slaves""" 1020 self.
comm.broadcast(context, root=self.
root)
1024 """Reset cache for a particular context on master and slaves""" 1027 self.
comm.broadcast(context, root=self.
root)
1031 """List cache contents for a particular context on master and slaves""" 1032 super(PoolMaster, self).
cacheList(context)
1034 self.
comm.broadcast(context, root=self.
root)
1038 """List store contents for a particular context on master and slaves""" 1039 super(PoolMaster, self).
storeList(context)
1041 self.
comm.broadcast(context, root=self.
root)
1044 """Command slaves to exit""" 1049 """Slave node instance of MPI process pool""" 1052 """Log a debugging message""" 1053 assert self.
rank != self.
root,
"This is not the master node." 1058 """Serve commands of master node 1060 Slave accepts commands, which are the names of methods to execute. 1061 This exits when a command returns a true value. 1063 menu = dict((cmd, getattr(self, cmd))
for cmd
in (
"reduce",
"mapNoBalance",
"mapToPrevious",
1064 "storeSet",
"storeDel",
"storeClear",
"storeList",
1065 "cacheList",
"cacheClear",
"exit",))
1066 self.
log(
"waiting for command from", self.
root)
1067 command = self.
comm.broadcast(
None, root=self.
root)
1068 self.
log(
"command", command)
1069 while not menu[command]():
1070 self.
log(
"waiting for command from", self.
root)
1071 command = self.
comm.broadcast(
None, root=self.
root)
1072 self.
log(
"command", command)
1077 """Reduce scattered data and return results""" 1078 self.
log(
"waiting for instruction")
1079 tags, func, reducer, args, kwargs, context = self.
comm.broadcast(
None, root=self.
root)
1080 self.
log(
"waiting for job")
1081 job = self.
comm.scatter(
None, root=self.
root)
1084 while not isinstance(job, NoOp):
1086 self.
log(
"running job")
1087 result = self.
_processQueue(context, func, [(index, data)], *args, **kwargs)[0]
1089 report = (index, result)
1092 out = reducer(out, result)
if out
is not None else result
1093 self.
comm.send(report, self.
root, tag=tags.request)
1094 self.
log(
"waiting for job")
1095 job = self.
comm.recv(tag=tags.work, source=self.
root)
1097 if reducer
is not None:
1098 self.
comm.gather(out, root=self.
root)
1103 """Process bulk scattered data and return results""" 1104 self.
log(
"waiting for instruction")
1105 tags, func, args, kwargs, context = self.
comm.broadcast(
None, root=self.
root)
1106 self.
log(
"waiting for job")
1107 queue = self.
comm.recv(tag=tags.work, source=self.
root)
1110 for index, data
in queue:
1111 self.
log(
"running job", index)
1112 result = self.
_processQueue(context, func, [(index, data)], *args, **kwargs)[0]
1113 resultList.append(result)
1115 self.
comm.send(resultList, self.
root, tag=tags.result)
1120 """Process the same scattered data processed previously""" 1121 self.
log(
"waiting for instruction")
1122 tags, func, args, kwargs, context = self.
comm.broadcast(
None, root=self.
root)
1124 index = queue.pop(0)
if queue
else -1
1125 self.
log(
"request job", index)
1126 self.
comm.gather(index, root=self.
root)
1127 self.
log(
"waiting for job")
1128 data = self.
comm.scatter(
None, root=self.
root)
1131 self.
log(
"running job")
1132 result = func(self.
_getCache(context, index), data, *args, **kwargs)
1133 self.
log(
"pending", queue)
1134 nextIndex = queue.pop(0)
if queue
else -1
1135 self.
comm.send((index, result, nextIndex), self.
root, tag=tags.result)
1138 data = self.
comm.recv(tag=tags.work, source=self.
root)
1143 """Set value in store""" 1144 context, kwargs = self.
comm.broadcast(
None, root=self.
root)
1145 super(PoolSlave, self).
storeSet(context, **kwargs)
1148 """Delete value in store""" 1149 context, nameList = self.
comm.broadcast(
None, root=self.
root)
1150 super(PoolSlave, self).
storeDel(context, *nameList)
1153 """Reset data store""" 1154 context = self.
comm.broadcast(
None, root=self.
root)
1159 context = self.
comm.broadcast(
None, root=self.
root)
1163 """List cache contents""" 1164 context = self.
comm.broadcast(
None, root=self.
root)
1165 super(PoolSlave, self).
cacheList(context)
1168 """List store contents""" 1169 context = self.
comm.broadcast(
None, root=self.
root)
1170 super(PoolSlave, self).
storeList(context)
1173 """Allow exit from loop in 'run'""" 1178 """Metaclass for PoolWrapper to add methods pointing to PoolMaster 1180 The 'context' is automatically supplied to these methods as the first argument. 1184 instance = super(PoolWrapperMeta, cls).
__call__(context)
1186 for name
in (
"map",
"mapNoBalance",
"mapToPrevious",
1187 "reduce",
"reduceNoBalance",
"reduceToPrevious",
1188 "storeSet",
"storeDel",
"storeClear",
"storeList",
1189 "cacheList",
"cacheClear",):
1190 setattr(instance, name, partial(getattr(pool, name), context))
1195 """Wrap PoolMaster to automatically provide context""" 1198 self.
_pool = PoolMaster._instance
1202 return getattr(self.
_pool, name)
1208 Use this class to automatically provide 'context' to 1209 the PoolMaster class. If you want to call functions 1210 that don't take a 'cache' object, use the PoolMaster 1211 class directly, and specify context=None. 1217 """!Start a process pool. 1219 Returns a PoolMaster object for the master node. 1220 Slave nodes are run and then optionally killed. 1222 If you elect not to kill the slaves, note that they 1223 will emerge at the point this function was called, 1224 which is likely very different from the point the 1225 master is at, so it will likely be necessary to put 1226 in some rank dependent code (e.g., look at the 'rank' 1227 attribute of the returned pools). 1229 Note that the pool objects should be deleted (either 1230 by going out of scope or explicit 'del') before program 1231 termination to avoid a segmentation fault. 1233 @param comm: MPI communicator 1234 @param root: Rank of root/master node 1235 @param killSlaves: Kill slaves on completion? 1239 if comm.rank == root:
1240 return PoolMaster(comm, root=root)
1241 slave = PoolSlave(comm, root=root)
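# End-to-end usage sketch (run under MPI, e.g. 'mpiexec -n 4 python
# script.py'; 'processData' is an illustrative name). startPool() returns
# only on the master; slaves sit in PoolSlave.run() serving commands and,
# by default, exit when the master's pool is deleted:
#
#     def processData(cache, data):
#         return data ** 2
#
#     pool = startPool()                      # master continues; slaves serve
#     results = pool.map("ctx", processData, [1, 2, 3, 4])  # [1, 4, 9, 16]
#     del pool                                # clean shutdown before exit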