24from functools
import wraps, partial
25from contextlib
import contextmanager
27import mpi4py.MPI
as mpi
# Names exported by ``from <module> import *``.
__all__ = [
    "Comm",
    "Pool",
    "startPool",
    "setBatchType",
    "getBatchType",
    "abortOnError",
    "NODE",
]
# Identifier of this process, formatted as "<node name>:<pid>".
NODE = "%s:%d" % (os.uname()[1], os.getpid())
38 """Unpickle an instance method
40 This has to be a named function rather than a lambda because
41 pickle needs to find it.
43 return getattr(obj, name)
47 """Pickle an instance method
49 The instance method is divided into the object
and the
53 name = method.__name__
54 return unpickleInstanceMethod, (obj, name)
# Register pickleInstanceMethod as the pickle reduction function for bound methods.
copyreg.pickle(types.MethodType, pickleInstanceMethod)
61 """Unpickle a function
63 This has to be a named function rather than a lambda because
64 pickle needs to find it.
67 module = importlib.import_module(moduleName)
68 return getattr(module, funcName)
74 This assumes that we can recreate the function object by grabbing
75 it from the proper module. This may be violated if the function
76 is a lambda or in __main__. In that case, I recommend recasting
77 the function as an object with a __call__ method.
79 Another problematic case may be a wrapped (e.g., decorated) method
80 in a class: the 'method' is then a function, and recreating it is
81 not as easy as we assume here.
83 moduleName = function.__module__
84 funcName = function.__name__
85 return unpickleFunction, (moduleName, funcName)
# Register pickleFunction as the pickle reduction function for plain functions.
copyreg.pickle(types.FunctionType, pickleFunction)
93 _batchType =
"unknown"
97 """Return a string giving the type of batch system in use"""
102 """Set the string giving the type of batch system in use"""
104 _batchType = batchType
108 """Function decorator to throw an MPI abort on an unhandled exception"""
110 def wrapper(*args, **kwargs):
112 return func(*args, **kwargs)
113 except Exception
as e:
114 sys.stderr.write(
"%s on %s in %s: %s\n" % (
type(e).__name__, NODE, func.__name__, e))
116 traceback.print_exc(file=sys.stderr)
120 mpi.COMM_WORLD.Abort(1)
127 """Singleton to hold what's about to be pickled.
129 We hold onto the object in case there's trouble pickling,
130 so we can figure out what class in particular is causing the trouble.
133 The held object is in the 'obj' attribute.
135 Here we use the __new__-style singleton pattern, because
136 we specifically want __init__ to be called each time.
149 """Hold onto new object"""
157 """Drop held object if there were no problems"""
163 """Try to guess what's not pickling after an exception
165 This tends to work if the problem is coming from the
166 regular pickle module. If it's coming from the bowels
167 of mpi4py, there's not much that can be done.
170 excType, excValue, tb = sys.exc_info()
181 return stack[-2].tb_frame.f_locals[
"obj"]
188 """Context manager to sniff out pickle problems
190 If there's a pickle error, you're normally told what the problem
191 class is. However, all SWIG objects are reported as "SwigPyObject".
192 In order to figure out which actual SWIG-ed class is causing
193 problems, we need to go digging.
198 someOperationInvolvingPickle()
200 If 'abort' is True, will call MPI abort in the event of problems.
204 except Exception
as e:
205 if "SwigPyObject" not in str(e)
or "pickle" not in str(e):
210 sys.stderr.write(
"Pickling error detected: %s\n" % e)
211 traceback.print_exc(file=sys.stderr)
214 if obj
is None and heldObj
is not None:
219 pickle.dumps(heldObj)
220 sys.stderr.write(
"Hmmm, that's strange: no problem with pickling held object?!?!\n")
224 sys.stderr.write(
"Unable to determine class causing pickle problems.\n")
226 sys.stderr.write(
"Object that could not be pickled: %s\n" % obj)
229 mpi.COMM_WORLD.Abort(1)
235 """Function decorator to catch errors in pickling and print something useful"""
237 def wrapper(*args, **kwargs):
239 return func(*args, **kwargs)
244 """Wrapper to mpi4py's MPI.Intracomm class to avoid busy-waiting.
246 As suggested by Lisandro Dalcin at:
247 * http://code.google.com/p/mpi4py/issues/detail?id=4 and
248 * https://groups.google.com/forum/?fromgroups=
251 def __new__(cls, comm=mpi.COMM_WORLD, recvSleep=0.1, barrierSleep=0.1):
252 """!Construct an MPI.Comm wrapper
255 @param comm MPI.Intracomm to wrap a duplicate of
256 @param recvSleep Sleep time (seconds)
for recv()
257 @param barrierSleep Sleep time (seconds)
for Barrier()
259 self = super(Comm, cls).__new__(cls, comm.Dup())
265 def recv(self, obj=None, source=0, tag=0, status=None):
266 """Version of comm.recv() that doesn't busy-wait"""
268 while not self.Iprobe(source=source, tag=tag, status=sts):
270 return super(Comm, self).
recv(buf=obj, source=sts.source, tag=sts.tag, status=status)
272 def send(self, obj=None, *args, **kwargs):
274 return super(Comm, self).
send(obj, *args, **kwargs)
276 def _checkBarrierComm(self):
277 """Ensure the duplicate communicator is available"""
282 """Version of comm.Barrier() that doesn't busy-wait
284 A duplicate communicator is used so
as not to interfere
with the user
's own communications.
293 dst = (rank + mask) % size
294 src = (rank - mask + size) % size
295 req = self.
_barrierComm_barrierComm.isend(
None, dst, tag)
296 while not self.
_barrierComm_barrierComm.Iprobe(src, tag):
304 return super(Comm, self).bcast(value, root=root)
307 """Scatter data across the nodes
309 The default version apparently pickles the entire 'dataList',
310 which can cause errors if the pickle size grows over 2^31 bytes
311 due to fundamental problems with pickle in python 2. Instead,
312 we send the data to each slave node in turn; this reduces the pickle size.
315 @param dataList List of data to distribute; one per node
317 @param root Index of root node
318 @param tag Message tag (integer)
319 @return Data for this node
321 if self.Get_rank() == root:
322 for rank, data
in enumerate(dataList):
325 self.
sendsend(data, rank, tag=tag)
326 return dataList[root]
328 return self.
recvrecv(source=root, tag=tag)
333 super(Comm, self).
Free()
337 """Object to signal no operation"""
342 """Provides tag numbers by symbolic name in attributes"""
346 for i, name
in enumerate(nameList, 1):
347 setattr(self, name, i)
350 return self.__class__.__name__ + repr(self.
_nameList_nameList)
353 return self.__class__, tuple(self.
_nameList_nameList)
357 """An object to hold stuff between different scatter calls
359 Includes a communicator by default, to allow intercommunication
364 super(Cache, self).
__init__(comm=comm)
368 """!Metaclass to produce a singleton
370 Doing a singleton mixin without a metaclass (via __new__) is
371 annoying because the user has to name his __init__ something else
372 (otherwise it's called every time, which undoes any changes).
373 Using this metaclass, the class's __init__ is called exactly once.
375 Because this is a metaclass, note that:
376 * "self" here is the class
377 * "__init__" is making the class (it's like the body of the class definition)
379 * "__call__" is making an instance of the class (it's like
380 "__new__" in the class).
384 super(SingletonMeta, cls).
__init__(name, bases, dict_)
394 """Debug logger singleton
396 Disabled by default; to enable, do: 'Debugger().enabled = True'
397 You can also redirect the output by changing the
'out' attribute.
404 def log(self, source, msg, *args):
407 The 'args' are only stringified
if we
're enabled.
409 @param source: name of source
410 @param msg: message to write
411 @param args: additional outputs to append to message
414 self.
outout.
write(
"%s: %s" % (source, msg))
421 """Thread to do reduction of results
423 "A thread?", you say. "What about the python GIL?"
424 Well, because we 'sleep' when there's no immediate response from the
425 slaves, that gives the thread a chance to fire; and threads are easier
426 to manage (e.g., shared memory) than a process.
428 def __init__(self, reducer, initial=None, sleep=0.1):
431 The 'reducer' should take two values
and return a single
434 @param reducer Function that does the reducing
435 @param initial Initial value
for reduction,
or None
436 @param sleep Time to sleep when there
's nothing to do (sec)
438 threading.Thread.__init__(self, name="reducer")
440 self.
_lock_lock = threading.Lock()
444 self.
_done_done = threading.Event()
447 """Do the actual work
449 We pull the data out of the queue and release the lock before
450 operating on it. This stops us
from blocking the addition of
451 new data to the queue.
453 with self.
_lock_lock:
462 Thread entry point, called by Thread.start
471 """Add data to the queue to be reduced"""
472 with self.
_lock_lock:
476 """Complete the thread
478 Unlike Thread.join (which always returns 'None'), we
return the result
482 threading.Thread.join(self)
487 """Node in MPI process pool
489 WARNING: You should not let a pool instance hang around at program
490 termination, as the garbage collection behaves differently, and may
491 cause a segmentation fault (signal 11).
506 def _getCache(self, context, index):
507 """Retrieve cache for particular data
509 The cache is updated
with the contents of the store.
511 if context
not in self.
_cache_cache:
512 self.
_cache_cache[context] = {}
513 if context
not in self.
_store_store:
514 self.
_store_store[context] = {}
515 cache = self.
_cache_cache[context]
516 store = self.
_store_store[context]
517 if index
not in cache:
519 cache[index].__dict__.update(store)
522 def log(self, msg, *args):
523 """Log a debugging message"""
527 return self.
rankrank == self.
rootroot
529 def _processQueue(self, context, func, queue, *args, **kwargs):
530 """!Process a queue of data
532 The queue consists of a list of (index, data) tuples,
533 where the index maps to the cache, and the data
is
534 passed to the
'func'.
536 The
'func' signature should be func(cache, data, *args, **kwargs)
537 if 'context' is non-
None; otherwise func(data, *args, **kwargs).
539 @param context: Namespace
for cache;
None to
not use cache
540 @param func: function
for slaves to run
541 @param queue: List of (index,data) tuples to process
542 @param args: Constant arguments
543 @param kwargs: Keyword arguments
544 @return list of results
from applying
'func' to dataList
546 return self.
_reduceQueue_reduceQueue(context,
None, func, queue, *args, **kwargs)
548 def _reduceQueue(self, context, reducer, func, queue, *args, **kwargs):
549 """!Reduce a queue of data
551 The queue consists of a list of (index, data) tuples,
552 where the index maps to the cache, and the data
is
553 passed to the
'func', the output of which
is reduced
554 using the
'reducer' (
if non-
None).
556 The
'func' signature should be func(cache, data, *args, **kwargs)
557 if 'context' is non-
None; otherwise func(data, *args, **kwargs).
559 The
'reducer' signature should be reducer(old, new). If the
'reducer'
560 is None, then we will
return the full list of results
562 @param context: Namespace
for cache;
None to
not use cache
563 @param reducer: function
for master to run to reduce slave results;
or None
564 @param func: function
for slaves to run
565 @param queue: List of (index,data) tuples to process
566 @param args: Constant arguments
567 @param kwargs: Keyword arguments
568 @return reduced result (
if reducer
is non-
None)
or list of results
569 from applying
'func' to dataList
571 if context
is not None:
572 resultList = [func(self.
_getCache_getCache(context, i), data, *args, **kwargs)
for i, data
in queue]
574 resultList = [func(data, *args, **kwargs)
for i, data
in queue]
577 if len(resultList) == 0:
579 output = resultList.pop(0)
580 for result
in resultList:
581 output = reducer(output, result)
585 """Set values in store for a particular context"""
586 self.
loglog(
"storing", context, kwargs)
587 if context
not in self.
_store_store:
588 self.
_store_store[context] = {}
589 for name, value
in kwargs.items():
590 self.
_store_store[context][name] = value
593 """Delete value in store for a particular context"""
594 self.
loglog(
"deleting from store", context, nameList)
595 if context
not in self.
_store_store:
596 raise KeyError(
"No such context: %s" % context)
597 for name
in nameList:
598 del self.
_store_store[context][name]
601 """Clear stored data for a particular context"""
602 self.
loglog(
"clearing store", context)
603 if context
not in self.
_store_store:
604 raise KeyError(
"No such context: %s" % context)
605 self.
_store_store[context] = {}
608 """Reset cache for a particular context"""
609 self.
loglog(
"clearing cache", context)
610 if context
not in self.
_cache_cache:
612 self.
_cache_cache[context] = {}
615 """List contents of cache"""
616 cache = self.
_cache_cache[context]
if context
in self.
_cache_cache
else {}
617 sys.stderr.write(
"Cache on %s (%s): %s\n" % (self.
nodenode, context, cache))
620 """List contents of store for a particular context"""
621 if context
not in self.
_store_store:
622 raise KeyError(
"No such context: %s" % context)
623 sys.stderr.write(
"Store on %s (%s): %s\n" % (self.
nodenode, context, self.
_store_store[context]))
627 """Master node instance of MPI process pool
629 Only the master node should instantiate this.
631 WARNING: You should not let a pool instance hang around at program
632 termination, as the garbage collection behaves differently, and may
633 cause a segmentation fault (signal 11).
637 super(PoolMaster, self).
__init__(*args, **kwargs)
641 """Ensure slaves exit when we're done"""
644 def log(self, msg, *args):
645 """Log a debugging message"""
649 """Send command to slaves
651 A command is the name of the PoolSlave method they should run.
653 self.logloglog("command", cmd)
def map(self, context, func, dataList, *args, **kwargs):
    """!Scatter work to slaves and gather the results

    Work is distributed dynamically, so that slaves that finish
    quickly will receive more work.

    Each slave applies the function to the data they're provided.
    The slaves may optionally be passed a cache instance, which
    they can use to store data for subsequent executions (to ensure
    subsequent data is distributed in the same pattern as before,
    use the 'mapToPrevious' method). The cache also contains
    data that has been stored on the slaves.

    The 'func' signature should be func(cache, data, *args, **kwargs)
    if 'context' is non-None; otherwise func(data, *args, **kwargs).

    @param context: Namespace for cache
    @param func: function for slaves to run; must be picklable
    @param dataList: List of data to distribute to slaves; must be picklable
    @param args: List of constant arguments
    @param kwargs: Dict of constant arguments
    @return list of results from applying 'func' to dataList
    """
    # Delegate to reduce() with no reducer (None), forwarding everything else unchanged.
    return self.reduce(context, None, func, dataList, *args, **kwargs)
683 def reduce(self, context, reducer, func, dataList, *args, **kwargs):
684 """!Scatter work to slaves and reduce the results
686 Work is distributed dynamically, so that slaves that finish
687 quickly will receive more work.
689 Each slave applies the function to the data they
're provided.
690 The slaves may optionally be passed a cache instance, which
691 they can use to store data for subsequent executions (to ensure
692 subsequent data
is distributed
in the same pattern
as before,
693 use the
'mapToPrevious' method). The cache also contains
694 data that has been stored on the slaves.
696 The
'func' signature should be func(cache, data, *args, **kwargs)
697 if 'context' is non-
None; otherwise func(data, *args, **kwargs).
699 The
'reducer' signature should be reducer(old, new). If the
'reducer'
700 is None, then we will
return the full list of results
702 @param context: Namespace
for cache
703 @param reducer: function
for master to run to reduce slave results;
or None
704 @param func: function
for slaves to run; must be picklable
705 @param dataList: List of data to distribute to slaves; must be picklable
706 @param args: List of constant arguments
707 @param kwargs: Dict of constant arguments
708 @return reduced result (
if reducer
is non-
None)
or list of results
709 from applying
'func' to dataList
711 tags = Tags("request",
"work")
714 return self.
_reduceQueue_reduceQueue(context, reducer, func,
list(zip(
list(range(num)), dataList)),
718 return self.
reduceNoBalancereduceNoBalance(context, reducer, func, dataList, *args, **kwargs)
723 self.
logloglog(
"instruct")
724 self.
commcomm.broadcast((tags, func, reducer, args, kwargs, context), root=self.
rootrootroot)
727 queue =
list(zip(range(num), dataList))
728 output = [
None]*num
if reducer
is None else None
729 initial = [
None if i == self.
rankrank
else queue.pop(0)
if queue
else NoOp()
for
732 self.
logloglog(
"scatter initial jobs")
733 self.
commcomm.scatter(initial, root=self.
rankrank)
735 while queue
or pending > 0:
736 status = mpi.Status()
737 report = self.
commcomm.recv(status=status, tag=tags.request, source=mpi.ANY_SOURCE)
738 source = status.source
739 self.
logloglog(
"gather from slave", source)
741 index, result = report
742 output[index] = result
746 self.
logloglog(
"send job to slave", job[0], source)
750 self.
commcomm.send(job, source, tag=tags.work)
752 if reducer
is not None:
753 results = self.
commcomm.gather(
None, root=self.
rootrootroot)
755 for rank
in range(self.
sizesizesize):
758 output = reducer(output, results[rank])
if output
is not None else results[rank]
764 """!Scatter work to slaves and gather the results
766 Work is distributed statically, so there
is no load balancing.
768 Each slave applies the function to the data they
're provided.
769 The slaves may optionally be passed a cache instance, which
770 they can store data in for subsequent executions (to ensure
771 subsequent data
is distributed
in the same pattern
as before,
772 use the
'mapToPrevious' method). The cache also contains
773 data that has been stored on the slaves.
775 The
'func' signature should be func(cache, data, *args, **kwargs)
776 if 'context' is true; otherwise func(data, *args, **kwargs).
778 @param context: Namespace
for cache
779 @param func: function
for slaves to run; must be picklable
780 @param dataList: List of data to distribute to slaves; must be picklable
781 @param args: List of constant arguments
782 @param kwargs: Dict of constant arguments
783 @return list of results
from applying
'func' to dataList
785 return self.
reduceNoBalancereduceNoBalance(context,
None, func, dataList, *args, **kwargs)
790 """!Scatter work to slaves and reduce the results
792 Work is distributed statically, so there
is no load balancing.
794 Each slave applies the function to the data they
're provided.
795 The slaves may optionally be passed a cache instance, which
796 they can store data in for subsequent executions (to ensure
797 subsequent data
is distributed
in the same pattern
as before,
798 use the
'mapToPrevious' method). The cache also contains
799 data that has been stored on the slaves.
801 The
'func' signature should be func(cache, data, *args, **kwargs)
802 if 'context' is true; otherwise func(data, *args, **kwargs).
804 The
'reducer' signature should be reducer(old, new). If the
'reducer'
805 is None, then we will
return the full list of results
807 @param context: Namespace
for cache
808 @param reducer: function
for master to run to reduce slave results;
or None
809 @param func: function
for slaves to run; must be picklable
810 @param dataList: List of data to distribute to slaves; must be picklable
811 @param args: List of constant arguments
812 @param kwargs: Dict of constant arguments
813 @return reduced result (
if reducer
is non-
None)
or list of results
814 from applying
'func' to dataList
816 tags = Tags("result",
"work")
818 if self.
sizesizesize == 1
or num <= 1:
819 return self.
_reduceQueue_reduceQueue(context, reducer, func,
list(zip(range(num), dataList)), *args, **kwargs)
821 self.
commandcommand(
"mapNoBalance")
824 self.
logloglog(
"instruct")
825 self.
commcomm.broadcast((tags, func, args, kwargs, context), root=self.
rootrootroot)
829 queue =
list(zip(range(num), dataList))
831 distribution = [[queue[i]]
for i
in range(num)]
832 distribution.insert(self.
rankrank, [])
833 for i
in range(num, self.
sizesizesize - 1):
834 distribution.append([])
837 distribution = [queue[i*numEach:(i+1)*numEach]
for i
in range(self.
sizesizesize)]
840 distribution = [queue[i*numEach:(i+1)*numEach]
for i
in range(self.
sizesizesize)]
841 for i
in range(numEach*self.
sizesizesize, num):
842 distribution[(self.
rankrank + 1) % self.
sizesizesize].append
843 distribution =
list([]
for i
in range(self.
sizesizesize))
844 for i, job
in enumerate(queue, self.
rankrank + 1):
848 for source
in range(self.
sizesizesize):
849 if source == self.
rankrank:
851 self.
logloglog(
"send jobs to ", source)
852 self.
commcomm.send(distribution[source], source, tag=tags.work)
855 output = [
None]*num
if reducer
is None else None
857 def ingestResults(output, nodeResults, distList):
859 for i, result
in enumerate(nodeResults):
860 index = distList[i][0]
861 output[index] = result
864 output = nodeResults.pop(0)
865 for result
in nodeResults:
866 output = reducer(output, result)
869 ourResults = self.
_processQueue_processQueue(context, func, distribution[self.
rankrank], *args, **kwargs)
870 output = ingestResults(output, ourResults, distribution[self.
rankrank])
875 status = mpi.Status()
876 slaveResults = self.
commcomm.recv(status=status, tag=tags.result, source=mpi.ANY_SOURCE)
877 source = status.source
878 self.
logloglog(
"gather from slave", source)
879 output = ingestResults(output, slaveResults, distribution[source])
886 """!Scatter work to the same target as before
888 Work is distributed so that each slave handles the same
889 indices
in the dataList
as when
'map' was called.
890 This allows the right data to go to the right cache.
892 It
is assumed that the dataList
is the same length
as when it was
895 The
'func' signature should be func(cache, data, *args, **kwargs).
897 @param context: Namespace
for cache
898 @param func: function
for slaves to run; must be picklable
899 @param dataList: List of data to distribute to slaves; must be picklable
900 @param args: List of constant arguments
901 @param kwargs: Dict of constant arguments
902 @return list of results
from applying
'func' to dataList
904 return self.
reduceToPreviousreduceToPrevious(context,
None, func, dataList, *args, **kwargs)
909 """!Reduction where work goes to the same target as before
911 Work is distributed so that each slave handles the same
912 indices
in the dataList
as when
'map' was called.
913 This allows the right data to go to the right cache.
915 It
is assumed that the dataList
is the same length
as when it was
918 The
'func' signature should be func(cache, data, *args, **kwargs).
920 The
'reducer' signature should be reducer(old, new). If the
'reducer'
921 is None, then we will
return the full list of results
923 @param context: Namespace
for cache
924 @param reducer: function
for master to run to reduce slave results;
or None
925 @param func: function
for slaves to run; must be picklable
926 @param dataList: List of data to distribute to slaves; must be picklable
927 @param args: List of constant arguments
928 @param kwargs: Dict of constant arguments
929 @return reduced result (
if reducer
is non-
None)
or list of results
930 from applying
'func' to dataList
933 raise ValueError(
"context must be set to map to same nodes as previous context")
934 tags =
Tags(
"result",
"work")
936 if self.
sizesizesize == 1
or num <= 1:
938 return self.
_reduceQueue_reduceQueue(context, reducer, func,
list(zip(range(num), dataList)), *args, **kwargs)
941 return self.
reduceNoBalancereduceNoBalance(context, reducer, func, dataList, *args, **kwargs)
943 self.
commandcommand(
"mapToPrevious")
946 self.
logloglog(
"instruct")
947 self.
commcomm.broadcast((tags, func, args, kwargs, context), root=self.
rootrootroot)
949 requestList = self.
commcomm.gather(
None, root=self.
rootrootroot)
950 self.
logloglog(
"listen", requestList)
951 initial = [dataList[index]
if (index
is not None and index >= 0)
else None for index
in requestList]
952 self.
logloglog(
"scatter jobs", initial)
963 status = mpi.Status()
964 index, result, nextIndex = self.
commcomm.recv(status=status, tag=tags.result, source=mpi.ANY_SOURCE)
965 source = status.source
966 self.
logloglog(
"gather from slave", source)
968 output[index] = result
973 job = dataList[nextIndex]
974 self.
logloglog(
"send job to slave", source)
975 self.
commcomm.send(job, source, tag=tags.work)
979 self.
logloglog(
"waiting on", pending)
981 if reducer
is not None:
982 output = thread.join()
990 """!Store data on slave for a particular context
992 The data is made available to functions through the cache. The
993 stored data differs
from the cache
in that it
is identical
for
994 all operations, whereas the cache
is specific to the data being
997 @param context: namespace
for store
998 @param kwargs: dict of name=value pairs
1000 super(PoolMaster, self).storeSet(context, **kwargs)
1001 self.commandcommand("storeSet")
1002 self.
logloglog(
"give data")
1003 self.
commcomm.broadcast((context, kwargs), root=self.
rootrootroot)
1008 """Delete stored data on slave for a particular context"""
1009 super(PoolMaster, self).
storeDel(context, *nameList)
1010 self.
commandcommand(
"storeDel")
1011 self.
logloglog(
"tell names")
1012 self.
commcomm.broadcast((context, nameList), root=self.
rootrootroot)
1017 """Reset data store for a particular context on master and slaves"""
1019 self.
commandcommand(
"storeClear")
1020 self.
commcomm.broadcast(context, root=self.
rootrootroot)
1024 """Reset cache for a particular context on master and slaves"""
1026 self.
commandcommand(
"cacheClear")
1027 self.
commcomm.broadcast(context, root=self.
rootrootroot)
1031 """List cache contents for a particular context on master and slaves"""
1032 super(PoolMaster, self).
cacheList(context)
1033 self.
commandcommand(
"cacheList")
1034 self.
commcomm.broadcast(context, root=self.
rootrootroot)
1038 """List store contents for a particular context on master and slaves"""
1039 super(PoolMaster, self).
storeList(context)
1040 self.
commandcommand(
"storeList")
1041 self.
commcomm.broadcast(context, root=self.
rootrootroot)
1044 """Command slaves to exit"""
1049 """Slave node instance of MPI process pool"""
1052 """Log a debugging message"""
1053 assert self.
rankrank != self.
rootroot,
"This is not the master node."
1058 """Serve commands of master node
1060 Slave accepts commands, which are the names of methods to execute.
1061 This exits when a command returns a true value.
1063 menu = dict((cmd, getattr(self, cmd)) for cmd
in (
"reduce",
"mapNoBalance",
"mapToPrevious",
1064 "storeSet",
"storeDel",
"storeClear",
"storeList",
1065 "cacheList",
"cacheClear",
"exit",))
1066 self.
logloglog(
"waiting for command from", self.
rootroot)
1067 command = self.
commcomm.broadcast(
None, root=self.
rootroot)
1068 self.
logloglog(
"command", command)
1069 while not menu[command]():
1070 self.
logloglog(
"waiting for command from", self.
rootroot)
1071 command = self.
commcomm.broadcast(
None, root=self.
rootroot)
1072 self.
logloglog(
"command", command)
1073 self.
logloglog(
"exiting")
1077 """Reduce scattered data and return results"""
1078 self.
logloglog(
"waiting for instruction")
1079 tags, func, reducer, args, kwargs, context = self.
commcomm.broadcast(
None, root=self.
rootroot)
1080 self.
logloglog(
"waiting for job")
1081 job = self.
commcomm.scatter(
None, root=self.
rootroot)
1084 while not isinstance(job, NoOp):
1086 self.
logloglog(
"running job")
1087 result = self.
_processQueue_processQueue(context, func, [(index, data)], *args, **kwargs)[0]
1089 report = (index, result)
1092 out = reducer(out, result)
if out
is not None else result
1093 self.
commcomm.send(report, self.
rootroot, tag=tags.request)
1094 self.
logloglog(
"waiting for job")
1095 job = self.
commcomm.recv(tag=tags.work, source=self.
rootroot)
1097 if reducer
is not None:
1098 self.
commcomm.gather(out, root=self.
rootroot)
1103 """Process bulk scattered data and return results"""
1104 self.
logloglog(
"waiting for instruction")
1105 tags, func, args, kwargs, context = self.
commcomm.broadcast(
None, root=self.
rootroot)
1106 self.
logloglog(
"waiting for job")
1107 queue = self.
commcomm.recv(tag=tags.work, source=self.
rootroot)
1110 for index, data
in queue:
1111 self.
logloglog(
"running job", index)
1112 result = self.
_processQueue_processQueue(context, func, [(index, data)], *args, **kwargs)[0]
1113 resultList.append(result)
1115 self.
commcomm.send(resultList, self.
rootroot, tag=tags.result)
1120 """Process the same scattered data processed previously"""
1121 self.
logloglog(
"waiting for instruction")
1122 tags, func, args, kwargs, context = self.
commcomm.broadcast(
None, root=self.
rootroot)
1124 index = queue.pop(0)
if queue
else -1
1125 self.
logloglog(
"request job", index)
1126 self.
commcomm.gather(index, root=self.
rootroot)
1127 self.
logloglog(
"waiting for job")
1128 data = self.
commcomm.scatter(
None, root=self.
rootroot)
1131 self.
logloglog(
"running job")
1132 result = func(self.
_getCache_getCache(context, index), data, *args, **kwargs)
1133 self.
logloglog(
"pending", queue)
1134 nextIndex = queue.pop(0)
if queue
else -1
1135 self.
commcomm.send((index, result, nextIndex), self.
rootroot, tag=tags.result)
1138 data = self.
commcomm.recv(tag=tags.work, source=self.
rootroot)
1143 """Set value in store"""
1144 context, kwargs = self.
commcomm.broadcast(
None, root=self.
rootroot)
1145 super(PoolSlave, self).
storeSet(context, **kwargs)
1148 """Delete value in store"""
1149 context, nameList = self.
commcomm.broadcast(
None, root=self.
rootroot)
1150 super(PoolSlave, self).
storeDel(context, *nameList)
1153 """Reset data store"""
1154 context = self.
commcomm.broadcast(
None, root=self.
rootroot)
1159 context = self.
commcomm.broadcast(
None, root=self.
rootroot)
1163 """List cache contents"""
1164 context = self.
commcomm.broadcast(
None, root=self.
rootroot)
1165 super(PoolSlave, self).
cacheList(context)
1168 """List store contents"""
1169 context = self.
commcomm.broadcast(
None, root=self.
rootroot)
1170 super(PoolSlave, self).
storeList(context)
1173 """Allow exit from loop in 'run'"""
1178 """Metaclass for PoolWrapper to add methods pointing to PoolMaster
1180 The 'context' is automatically supplied to these methods
as the first argument.
1184 instance = super(PoolWrapperMeta, cls).
__call__(context)
1186 for name
in (
"map",
"mapNoBalance",
"mapToPrevious",
1187 "reduce",
"reduceNoBalance",
"reduceToPrevious",
1188 "storeSet",
"storeDel",
"storeClear",
"storeList",
1189 "cacheList",
"cacheClear",):
1190 setattr(instance, name, partial(getattr(pool, name), context))
1195 """Wrap PoolMaster to automatically provide context"""
1198 self.
_pool_pool = PoolMaster._instance
1202 return getattr(self.
_pool_pool, name)
1208 Use this class to automatically provide 'context' to
1209 the PoolMaster class. If you want to call functions
1210 that don't take a 'cache' object, use the PoolMaster
1211 class directly, and specify context=None.
1217 """!Start a process pool.
1219 Returns a PoolMaster object for the master node.
1220 Slave nodes are run and then optionally killed.
1222 If you elect not to kill the slaves, note that they
1223 will emerge at the point this function was called,
1224 which is likely very different from the point the
1225 master is at, so it will likely be necessary to put
1226 in some rank dependent code (e.g., look at the 'rank'
1227 attribute of the returned pools).
1229 Note that the pool objects should be deleted (either
1230 by going out of scope or explicit 'del') before program
1231 termination to avoid a segmentation fault.
1233 @param comm: MPI communicator
1234 @param root: Rank of root/master node
1235 @param killSlaves: Kill slaves on completion?
1239 if comm.rank == root:
def send(self, obj=None, *args, **kwargs)
def recv(self, obj=None, source=0, tag=0, status=None)
def scatter(self, dataList, root=0, tag=0)
def _checkBarrierComm(self)
def broadcast(self, value, root=0)
def __new__(cls, comm=mpi.COMM_WORLD, recvSleep=0.1, barrierSleep=0.1)
Construct an MPI.Comm wrapper.
def log(self, source, msg, *args)
Log message.
def __exit__(self, excType, excVal, tb)
def __new__(cls, hold=None)
def __init__(self, hold=None)
def cacheClear(self, context)
def mapNoBalance(self, context, func, dataList, *args, **kwargs)
Scatter work to slaves and gather the results.
def mapToPrevious(self, context, func, dataList, *args, **kwargs)
Scatter work to the same target as before.
def storeDel(self, context, *nameList)
def storeClear(self, context)
def map(self, context, func, dataList, *args, **kwargs)
Scatter work to slaves and gather the results.
def reduce(self, context, reducer, func, dataList, *args, **kwargs)
Scatter work to slaves and reduce the results.
def __init__(self, *args, **kwargs)
def storeSet(self, context, **kwargs)
Store data on slave for a particular context.
def cacheList(self, context)
def reduceNoBalance(self, context, reducer, func, dataList, *args, **kwargs)
Scatter work to slaves and reduce the results.
def log(self, msg, *args)
def storeList(self, context)
def reduceToPrevious(self, context, reducer, func, dataList, *args, **kwargs)
Reduction where work goes to the same target as before.
def _getCache(self, context, index)
def storeSet(self, context, **kwargs)
def _processQueue(self, context, func, queue, *args, **kwargs)
def storeDel(self, context, *nameList)
def __init__(self, comm=None, root=0)
def cacheClear(self, context)
def _reduceQueue(self, context, reducer, func, queue, *args, **kwargs)
def storeList(self, context)
def cacheList(self, context)
def log(self, msg, *args)
def storeClear(self, context)
def log(self, msg, *args)
def __init__(self, context="default")
def __getattr__(self, name)
def __init__(self, reducer, initial=None, sleep=0.1)
Constructor.
daf::base::PropertyList * list
daf::base::PropertySet * set
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
def catchPicklingError(func)
def startPool(comm=None, root=0, killSlaves=True)
Start a process pool.
def setBatchType(batchType)
def unpickleFunction(moduleName, funcName)
def pickleInstanceMethod(method)
def pickleSniffer(abort=False)
def unpickleInstanceMethod(obj, name)
def pickleFunction(function)
def write(self, patchRef, catalog)
Write the output.