LSSTApplications  17.0+124,17.0+14,17.0+73,18.0.0+37,18.0.0+80,18.0.0-4-g68ffd23+4,18.1.0-1-g0001055+12,18.1.0-1-g03d53ef+5,18.1.0-1-g1349e88+55,18.1.0-1-g2505f39+44,18.1.0-1-g5315e5e+4,18.1.0-1-g5e4b7ea+14,18.1.0-1-g7e8fceb+4,18.1.0-1-g85f8cd4+48,18.1.0-1-g8ff0b9f+4,18.1.0-1-ga2c679d+1,18.1.0-1-gd55f500+35,18.1.0-10-gb58edde+2,18.1.0-11-g0997b02+4,18.1.0-13-gfe4edf0b+12,18.1.0-14-g259bd21+21,18.1.0-19-gdb69f3f+2,18.1.0-2-g5f9922c+24,18.1.0-2-gd3b74e5+11,18.1.0-2-gfbf3545+32,18.1.0-26-g728bddb4+5,18.1.0-27-g6ff7ca9+2,18.1.0-3-g52aa583+25,18.1.0-3-g8ea57af+9,18.1.0-3-gb69f684+42,18.1.0-3-gfcaddf3+6,18.1.0-32-gd8786685a,18.1.0-4-gf3f9b77+6,18.1.0-5-g1dd662b+2,18.1.0-5-g6dbcb01+41,18.1.0-6-gae77429+3,18.1.0-7-g9d75d83+9,18.1.0-7-gae09a6d+30,18.1.0-9-gc381ef5+4,w.2019.45
LSSTDataManagementBasePackage
pool.py
Go to the documentation of this file.
1 # MPI process pool
2 # Copyright 2013 Paul A. Price
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/copyleft/gpl.html>
16 #
17 
18 import os
19 import sys
20 import time
21 import types
22 import copyreg
23 import threading
24 from functools import wraps, partial
25 from contextlib import contextmanager
26 
27 import mpi4py.MPI as mpi
28 
29 from lsst.pipe.base import Struct
30 from future.utils import with_metaclass
31 
32 
33 __all__ = ["Comm", "Pool", "startPool", "setBatchType", "getBatchType", "abortOnError", "NODE", ]
34 
35 NODE = "%s:%d" % (os.uname()[1], os.getpid()) # Name of node
36 
37 
38 def unpickleInstanceMethod(obj, name):
39  """Unpickle an instance method
40 
41  This has to be a named function rather than a lambda because
42  pickle needs to find it.
43  """
44  return getattr(obj, name)
45 
46 
48  """Pickle an instance method
49 
50  The instance method is divided into the object and the
51  method name.
52  """
53  obj = method.__self__
54  name = method.__name__
55  return unpickleInstanceMethod, (obj, name)
56 
57 
58 copyreg.pickle(types.MethodType, pickleInstanceMethod)
59 
60 
61 def unpickleFunction(moduleName, funcName):
62  """Unpickle a function
63 
64  This has to be a named function rather than a lambda because
65  pickle needs to find it.
66  """
67  import importlib
68  module = importlib.import_module(moduleName)
69  return getattr(module, funcName)
70 
71 
72 def pickleFunction(function):
73  """Pickle a function
74 
75  This assumes that we can recreate the function object by grabbing
76  it from the proper module. This may be violated if the function
77  is a lambda or in __main__. In that case, I recommend recasting
78  the function as an object with a __call__ method.
79 
80  Another problematic case may be a wrapped (e.g., decorated) method
81  in a class: the 'method' is then a function, and recreating it is
82  not as easy as we assume here.
83  """
84  moduleName = function.__module__
85  funcName = function.__name__
86  return unpickleFunction, (moduleName, funcName)
87 
88 
89 copyreg.pickle(types.FunctionType, pickleFunction)
90 
91 try:
92  _batchType
93 except NameError:
94  _batchType = "unknown"
95 
96 
98  """Return a string giving the type of batch system in use"""
99  return _batchType
100 
101 
102 def setBatchType(batchType):
103  """Return a string giving the type of batch system in use"""
104  global _batchType
105  _batchType = batchType
106 
107 
108 def abortOnError(func):
109  """Function decorator to throw an MPI abort on an unhandled exception"""
110  @wraps(func)
111  def wrapper(*args, **kwargs):
112  try:
113  return func(*args, **kwargs)
114  except Exception as e:
115  sys.stderr.write("%s on %s in %s: %s\n" % (type(e).__name__, NODE, func.__name__, e))
116  import traceback
117  traceback.print_exc(file=sys.stderr)
118  sys.stdout.flush()
119  sys.stderr.flush()
120  if getBatchType() is not None:
121  mpi.COMM_WORLD.Abort(1)
122  else:
123  raise
124  return wrapper
125 
126 
128  """Singleton to hold what's about to be pickled.
129 
130  We hold onto the object in case there's trouble pickling,
131  so we can figure out what class in particular is causing
132  the trouble.
133 
134  The held object is in the 'obj' attribute.
135 
136  Here we use the __new__-style singleton pattern, because
137  we specifically want __init__ to be called each time.
138  """
139 
140  _instance = None
141 
142  def __new__(cls, hold=None):
143  if cls._instance is None:
144  cls._instance = super(PickleHolder, cls).__new__(cls)
145  cls._instance.__init__(hold)
146  cls._instance.obj = None
147  return cls._instance
148 
149  def __init__(self, hold=None):
150  """Hold onto new object"""
151  if hold is not None:
152  self.obj = hold
153 
154  def __enter__(self):
155  pass
156 
157  def __exit__(self, excType, excVal, tb):
158  """Drop held object if there were no problems"""
159  if excType is None:
160  self.obj = None
161 
162 
164  """Try to guess what's not pickling after an exception
165 
166  This tends to work if the problem is coming from the
167  regular pickle module. If it's coming from the bowels
168  of mpi4py, there's not much that can be done.
169  """
170  import sys
171  excType, excValue, tb = sys.exc_info()
172  # Build a stack of traceback elements
173  stack = []
174  while tb:
175  stack.append(tb)
176  tb = tb.tb_next
177 
178  try:
179  # This is the code version of a my way to find what's not pickling in pdb.
180  # This should work if it's coming from the regular pickle module, and they
181  # haven't changed the variable names since python 2.7.3.
182  return stack[-2].tb_frame.f_locals["obj"]
183  except Exception:
184  return None
185 
186 
187 @contextmanager
188 def pickleSniffer(abort=False):
189  """Context manager to sniff out pickle problems
190 
191  If there's a pickle error, you're normally told what the problem
192  class is. However, all SWIG objects are reported as "SwigPyObject".
193  In order to figure out which actual SWIG-ed class is causing
194  problems, we need to go digging.
195 
196  Use like this:
197 
198  with pickleSniffer():
199  someOperationInvolvingPickle()
200 
201  If 'abort' is True, will call MPI abort in the event of problems.
202  """
203  try:
204  yield
205  except Exception as e:
206  if "SwigPyObject" not in str(e) or "pickle" not in str(e):
207  raise
208  import sys
209  import traceback
210 
211  sys.stderr.write("Pickling error detected: %s\n" % e)
212  traceback.print_exc(file=sys.stderr)
213  obj = guessPickleObj()
214  heldObj = PickleHolder().obj
215  if obj is None and heldObj is not None:
216  # Try to reproduce using what was being pickled using the regular pickle module,
217  # so we've got a chance of figuring out what the problem is.
218  import pickle
219  try:
220  pickle.dumps(heldObj)
221  sys.stderr.write("Hmmm, that's strange: no problem with pickling held object?!?!\n")
222  except Exception:
223  obj = guessPickleObj()
224  if obj is None:
225  sys.stderr.write("Unable to determine class causing pickle problems.\n")
226  else:
227  sys.stderr.write("Object that could not be pickled: %s\n" % obj)
228  if abort:
229  if getBatchType() is not None:
230  mpi.COMM_WORLD.Abort(1)
231  else:
232  sys.exit(1)
233 
234 
236  """Function decorator to catch errors in pickling and print something useful"""
237  @wraps(func)
238  def wrapper(*args, **kwargs):
239  with pickleSniffer(True):
240  return func(*args, **kwargs)
241  return wrapper
242 
243 
244 class Comm(mpi.Intracomm):
245  """Wrapper to mpi4py's MPI.Intracomm class to avoid busy-waiting.
246 
247  As suggested by Lisandro Dalcin at:
248  * http://code.google.com/p/mpi4py/issues/detail?id=4 and
249  * https://groups.google.com/forum/?fromgroups=#!topic/mpi4py/nArVuMXyyZI
250  """
251 
252  def __new__(cls, comm=mpi.COMM_WORLD, recvSleep=0.1, barrierSleep=0.1):
253  """!Construct an MPI.Comm wrapper
254 
255  @param cls Class
256  @param comm MPI.Intracomm to wrap a duplicate of
257  @param recvSleep Sleep time (seconds) for recv()
258  @param barrierSleep Sleep time (seconds) for Barrier()
259  """
260  self = super(Comm, cls).__new__(cls, comm.Dup())
261  self._barrierComm = None # Duplicate communicator used for Barrier point-to-point checking
262  self._recvSleep = recvSleep
263  self._barrierSleep = barrierSleep
264  return self
265 
266  def recv(self, obj=None, source=0, tag=0, status=None):
267  """Version of comm.recv() that doesn't busy-wait"""
268  sts = mpi.Status()
269  while not self.Iprobe(source=source, tag=tag, status=sts):
270  time.sleep(self._recvSleep)
271  return super(Comm, self).recv(buf=obj, source=sts.source, tag=sts.tag, status=status)
272 
273  def send(self, obj=None, *args, **kwargs):
274  with PickleHolder(obj):
275  return super(Comm, self).send(obj, *args, **kwargs)
276 
277  def _checkBarrierComm(self):
278  """Ensure the duplicate communicator is available"""
279  if self._barrierComm is None:
280  self._barrierComm = self.Dup()
281 
282  def Barrier(self, tag=0):
283  """Version of comm.Barrier() that doesn't busy-wait
284 
285  A duplicate communicator is used so as not to interfere with the user's own communications.
286  """
287  self._checkBarrierComm()
288  size = self._barrierComm.Get_size()
289  if size == 1:
290  return
291  rank = self._barrierComm.Get_rank()
292  mask = 1
293  while mask < size:
294  dst = (rank + mask) % size
295  src = (rank - mask + size) % size
296  req = self._barrierComm.isend(None, dst, tag)
297  while not self._barrierComm.Iprobe(src, tag):
298  time.sleep(self._barrierSleep)
299  self._barrierComm.recv(None, src, tag)
300  req.Wait()
301  mask <<= 1
302 
303  def broadcast(self, value, root=0):
304  with PickleHolder(value):
305  return super(Comm, self).bcast(value, root=root)
306 
307  def scatter(self, dataList, root=0, tag=0):
308  """Scatter data across the nodes
309 
310  The default version apparently pickles the entire 'dataList',
311  which can cause errors if the pickle size grows over 2^31 bytes
312  due to fundamental problems with pickle in python 2. Instead,
313  we send the data to each slave node in turn; this reduces the
314  pickle size.
315 
316  @param dataList List of data to distribute; one per node
317  (including root)
318  @param root Index of root node
319  @param tag Message tag (integer)
320  @return Data for this node
321  """
322  if self.Get_rank() == root:
323  for rank, data in enumerate(dataList):
324  if rank == root:
325  continue
326  self.send(data, rank, tag=tag)
327  return dataList[root]
328  else:
329  return self.recv(source=root, tag=tag)
330 
331  def Free(self):
332  if self._barrierComm is not None:
333  self._barrierComm.Free()
334  super(Comm, self).Free()
335 
336 
337 class NoOp(object):
338  """Object to signal no operation"""
339  pass
340 
341 
342 class Tags(object):
343  """Provides tag numbers by symbolic name in attributes"""
344 
345  def __init__(self, *nameList):
346  self._nameList = nameList
347  for i, name in enumerate(nameList, 1):
348  setattr(self, name, i)
349 
350  def __repr__(self):
351  return self.__class__.__name__ + repr(self._nameList)
352 
353  def __reduce__(self):
354  return self.__class__, tuple(self._nameList)
355 
356 
357 class Cache(Struct):
358  """An object to hold stuff between different scatter calls
359 
360  Includes a communicator by default, to allow intercommunication
361  between nodes.
362  """
363 
364  def __init__(self, comm):
365  super(Cache, self).__init__(comm=comm)
366 
367 
369  """!Metaclass to produce a singleton
370 
371  Doing a singleton mixin without a metaclass (via __new__) is
372  annoying because the user has to name his __init__ something else
373  (otherwise it's called every time, which undoes any changes).
374  Using this metaclass, the class's __init__ is called exactly once.
375 
376  Because this is a metaclass, note that:
377  * "self" here is the class
378  * "__init__" is making the class (it's like the body of the
379  class definition).
380  * "__call__" is making an instance of the class (it's like
381  "__new__" in the class).
382  """
383 
384  def __init__(cls, name, bases, dict_):
385  super(SingletonMeta, cls).__init__(name, bases, dict_)
386  cls._instance = None
387 
388  def __call__(cls, *args, **kwargs):
389  if cls._instance is None:
390  cls._instance = super(SingletonMeta, cls).__call__(*args, **kwargs)
391  return cls._instance
392 
393 
394 class Debugger(with_metaclass(SingletonMeta, object)):
395  """Debug logger singleton
396 
397  Disabled by default; to enable, do: 'Debugger().enabled = True'
398  You can also redirect the output by changing the 'out' attribute.
399  """
400 
401  def __init__(self):
402  self.enabled = False
403  self.out = sys.stderr
404 
405  def log(self, source, msg, *args):
406  """!Log message
407 
408  The 'args' are only stringified if we're enabled.
409 
410  @param source: name of source
411  @param msg: message to write
412  @param args: additional outputs to append to message
413  """
414  if self.enabled:
415  self.out.write("%s: %s" % (source, msg))
416  for arg in args:
417  self.out.write(" %s" % arg)
418  self.out.write("\n")
419 
420 
421 class ReductionThread(threading.Thread):
422  """Thread to do reduction of results
423 
424  "A thread?", you say. "What about the python GIL?"
425  Well, because we 'sleep' when there's no immediate response from the
426  slaves, that gives the thread a chance to fire; and threads are easier
427  to manage (e.g., shared memory) than a process.
428  """
429  def __init__(self, reducer, initial=None, sleep=0.1):
430  """!Constructor
431 
432  The 'reducer' should take two values and return a single
433  (reduced) value.
434 
435  @param reducer Function that does the reducing
436  @param initial Initial value for reduction, or None
437  @param sleep Time to sleep when there's nothing to do (sec)
438  """
439  threading.Thread.__init__(self, name="reducer")
440  self._queue = [] # Queue of stuff to be reduced
441  self._lock = threading.Lock() # Lock for the queue
442  self._reducer = reducer
443  self._sleep = sleep
444  self._result = initial # Final result
445  self._done = threading.Event() # Signal that everything is done
446 
447  def _doReduce(self):
448  """Do the actual work
449 
450  We pull the data out of the queue and release the lock before
451  operating on it. This stops us from blocking the addition of
452  new data to the queue.
453  """
454  with self._lock:
455  queue = self._queue
456  self._queue = []
457  for data in queue:
458  self._result = self._reducer(self._result, data) if self._result is not None else data
459 
460  def run(self):
461  """Do the work
462 
463  Thread entry point, called by Thread.start
464  """
465  while True:
466  self._doReduce()
467  if self._done.wait(self._sleep):
468  self._doReduce()
469  return
470 
471  def add(self, data):
472  """Add data to the queue to be reduced"""
473  with self._lock:
474  self._queue.append(data)
475 
476  def join(self):
477  """Complete the thread
478 
479  Unlike Thread.join (which always returns 'None'), we return the result
480  we calculated.
481  """
482  self._done.set()
483  threading.Thread.join(self)
484  return self._result
485 
486 
487 class PoolNode(with_metaclass(SingletonMeta, object)):
488  """Node in MPI process pool
489 
490  WARNING: You should not let a pool instance hang around at program
491  termination, as the garbage collection behaves differently, and may
492  cause a segmentation fault (signal 11).
493  """
494 
495  def __init__(self, comm=None, root=0):
496  if comm is None:
497  comm = Comm()
498  self.comm = comm
499  self.rank = self.comm.rank
500  self.root = root
501  self.size = self.comm.size
502  self._cache = {}
503  self._store = {}
505  self.node = NODE
506 
507  def _getCache(self, context, index):
508  """Retrieve cache for particular data
509 
510  The cache is updated with the contents of the store.
511  """
512  if context not in self._cache:
513  self._cache[context] = {}
514  if context not in self._store:
515  self._store[context] = {}
516  cache = self._cache[context]
517  store = self._store[context]
518  if index not in cache:
519  cache[index] = Cache(self.comm)
520  cache[index].__dict__.update(store)
521  return cache[index]
522 
523  def log(self, msg, *args):
524  """Log a debugging message"""
525  self.debugger.log("Node %d" % self.rank, msg, *args)
526 
527  def isMaster(self):
528  return self.rank == self.root
529 
530  def _processQueue(self, context, func, queue, *args, **kwargs):
531  """!Process a queue of data
532 
533  The queue consists of a list of (index, data) tuples,
534  where the index maps to the cache, and the data is
535  passed to the 'func'.
536 
537  The 'func' signature should be func(cache, data, *args, **kwargs)
538  if 'context' is non-None; otherwise func(data, *args, **kwargs).
539 
540  @param context: Namespace for cache; None to not use cache
541  @param func: function for slaves to run
542  @param queue: List of (index,data) tuples to process
543  @param args: Constant arguments
544  @param kwargs: Keyword arguments
545  @return list of results from applying 'func' to dataList
546  """
547  return self._reduceQueue(context, None, func, queue, *args, **kwargs)
548 
549  def _reduceQueue(self, context, reducer, func, queue, *args, **kwargs):
550  """!Reduce a queue of data
551 
552  The queue consists of a list of (index, data) tuples,
553  where the index maps to the cache, and the data is
554  passed to the 'func', the output of which is reduced
555  using the 'reducer' (if non-None).
556 
557  The 'func' signature should be func(cache, data, *args, **kwargs)
558  if 'context' is non-None; otherwise func(data, *args, **kwargs).
559 
560  The 'reducer' signature should be reducer(old, new). If the 'reducer'
561  is None, then we will return the full list of results
562 
563  @param context: Namespace for cache; None to not use cache
564  @param reducer: function for master to run to reduce slave results; or None
565  @param func: function for slaves to run
566  @param queue: List of (index,data) tuples to process
567  @param args: Constant arguments
568  @param kwargs: Keyword arguments
569  @return reduced result (if reducer is non-None) or list of results
570  from applying 'func' to dataList
571  """
572  if context is not None:
573  resultList = [func(self._getCache(context, i), data, *args, **kwargs) for i, data in queue]
574  else:
575  resultList = [func(data, *args, **kwargs) for i, data in queue]
576  if reducer is None:
577  return resultList
578  if len(resultList) == 0:
579  return None
580  output = resultList.pop(0)
581  for result in resultList:
582  output = reducer(output, result)
583  return output
584 
585  def storeSet(self, context, **kwargs):
586  """Set values in store for a particular context"""
587  self.log("storing", context, kwargs)
588  if context not in self._store:
589  self._store[context] = {}
590  for name, value in kwargs.items():
591  self._store[context][name] = value
592 
593  def storeDel(self, context, *nameList):
594  """Delete value in store for a particular context"""
595  self.log("deleting from store", context, nameList)
596  if context not in self._store:
597  raise KeyError("No such context: %s" % context)
598  for name in nameList:
599  del self._store[context][name]
600 
601  def storeClear(self, context):
602  """Clear stored data for a particular context"""
603  self.log("clearing store", context)
604  if context not in self._store:
605  raise KeyError("No such context: %s" % context)
606  self._store[context] = {}
607 
608  def cacheClear(self, context):
609  """Reset cache for a particular context"""
610  self.log("clearing cache", context)
611  if context not in self._cache:
612  return
613  self._cache[context] = {}
614 
615  def cacheList(self, context):
616  """List contents of cache"""
617  cache = self._cache[context] if context in self._cache else {}
618  sys.stderr.write("Cache on %s (%s): %s\n" % (self.node, context, cache))
619 
620  def storeList(self, context):
621  """List contents of store for a particular context"""
622  if context not in self._store:
623  raise KeyError("No such context: %s" % context)
624  sys.stderr.write("Store on %s (%s): %s\n" % (self.node, context, self._store[context]))
625 
626 
628  """Master node instance of MPI process pool
629 
630  Only the master node should instantiate this.
631 
632  WARNING: You should not let a pool instance hang around at program
633  termination, as the garbage collection behaves differently, and may
634  cause a segmentation fault (signal 11).
635  """
636 
637  def __init__(self, *args, **kwargs):
638  super(PoolMaster, self).__init__(*args, **kwargs)
639  assert self.root == self.rank, "This is the master node"
640 
641  def __del__(self):
642  """Ensure slaves exit when we're done"""
643  self.exit()
644 
645  def log(self, msg, *args):
646  """Log a debugging message"""
647  self.debugger.log("Master", msg, *args)
648 
649  def command(self, cmd):
650  """Send command to slaves
651 
652  A command is the name of the PoolSlave method they should run.
653  """
654  self.log("command", cmd)
655  self.comm.broadcast(cmd, root=self.root)
656 
657  def map(self, context, func, dataList, *args, **kwargs):
658  """!Scatter work to slaves and gather the results
659 
660  Work is distributed dynamically, so that slaves that finish
661  quickly will receive more work.
662 
663  Each slave applies the function to the data they're provided.
664  The slaves may optionally be passed a cache instance, which
665  they can use to store data for subsequent executions (to ensure
666  subsequent data is distributed in the same pattern as before,
667  use the 'mapToPrevious' method). The cache also contains
668  data that has been stored on the slaves.
669 
670  The 'func' signature should be func(cache, data, *args, **kwargs)
671  if 'context' is non-None; otherwise func(data, *args, **kwargs).
672 
673  @param context: Namespace for cache
674  @param func: function for slaves to run; must be picklable
675  @param dataList: List of data to distribute to slaves; must be picklable
676  @param args: List of constant arguments
677  @param kwargs: Dict of constant arguments
678  @return list of results from applying 'func' to dataList
679  """
680  return self.reduce(context, None, func, dataList, *args, **kwargs)
681 
682  @abortOnError
683  @catchPicklingError
684  def reduce(self, context, reducer, func, dataList, *args, **kwargs):
685  """!Scatter work to slaves and reduce the results
686 
687  Work is distributed dynamically, so that slaves that finish
688  quickly will receive more work.
689 
690  Each slave applies the function to the data they're provided.
691  The slaves may optionally be passed a cache instance, which
692  they can use to store data for subsequent executions (to ensure
693  subsequent data is distributed in the same pattern as before,
694  use the 'mapToPrevious' method). The cache also contains
695  data that has been stored on the slaves.
696 
697  The 'func' signature should be func(cache, data, *args, **kwargs)
698  if 'context' is non-None; otherwise func(data, *args, **kwargs).
699 
700  The 'reducer' signature should be reducer(old, new). If the 'reducer'
701  is None, then we will return the full list of results
702 
703  @param context: Namespace for cache
704  @param reducer: function for master to run to reduce slave results; or None
705  @param func: function for slaves to run; must be picklable
706  @param dataList: List of data to distribute to slaves; must be picklable
707  @param args: List of constant arguments
708  @param kwargs: Dict of constant arguments
709  @return reduced result (if reducer is non-None) or list of results
710  from applying 'func' to dataList
711  """
712  tags = Tags("request", "work")
713  num = len(dataList)
714  if self.size == 1 or num <= 1:
715  return self._reduceQueue(context, reducer, func, list(zip(list(range(num)), dataList)),
716  *args, **kwargs)
717  if self.size == num:
718  # We're shooting ourselves in the foot using dynamic distribution
719  return self.reduceNoBalance(context, reducer, func, dataList, *args, **kwargs)
720 
721  self.command("reduce")
722 
723  # Send function
724  self.log("instruct")
725  self.comm.broadcast((tags, func, reducer, args, kwargs, context), root=self.root)
726 
727  # Parcel out first set of data
728  queue = list(zip(range(num), dataList)) # index, data
729  output = [None]*num if reducer is None else None
730  initial = [None if i == self.rank else queue.pop(0) if queue else NoOp() for
731  i in range(self.size)]
732  pending = min(num, self.size - 1)
733  self.log("scatter initial jobs")
734  self.comm.scatter(initial, root=self.rank)
735 
736  while queue or pending > 0:
737  status = mpi.Status()
738  report = self.comm.recv(status=status, tag=tags.request, source=mpi.ANY_SOURCE)
739  source = status.source
740  self.log("gather from slave", source)
741  if reducer is None:
742  index, result = report
743  output[index] = result
744 
745  if queue:
746  job = queue.pop(0)
747  self.log("send job to slave", job[0], source)
748  else:
749  job = NoOp()
750  pending -= 1
751  self.comm.send(job, source, tag=tags.work)
752 
753  if reducer is not None:
754  results = self.comm.gather(None, root=self.root)
755  output = None
756  for rank in range(self.size):
757  if rank == self.root:
758  continue
759  output = reducer(output, results[rank]) if output is not None else results[rank]
760 
761  self.log("done")
762  return output
763 
764  def mapNoBalance(self, context, func, dataList, *args, **kwargs):
765  """!Scatter work to slaves and gather the results
766 
767  Work is distributed statically, so there is no load balancing.
768 
769  Each slave applies the function to the data they're provided.
770  The slaves may optionally be passed a cache instance, which
771  they can store data in for subsequent executions (to ensure
772  subsequent data is distributed in the same pattern as before,
773  use the 'mapToPrevious' method). The cache also contains
774  data that has been stored on the slaves.
775 
776  The 'func' signature should be func(cache, data, *args, **kwargs)
777  if 'context' is true; otherwise func(data, *args, **kwargs).
778 
779  @param context: Namespace for cache
780  @param func: function for slaves to run; must be picklable
781  @param dataList: List of data to distribute to slaves; must be picklable
782  @param args: List of constant arguments
783  @param kwargs: Dict of constant arguments
784  @return list of results from applying 'func' to dataList
785  """
786  return self.reduceNoBalance(context, None, func, dataList, *args, **kwargs)
787 
788  @abortOnError
789  @catchPicklingError
790  def reduceNoBalance(self, context, reducer, func, dataList, *args, **kwargs):
791  """!Scatter work to slaves and reduce the results
792 
793  Work is distributed statically, so there is no load balancing.
794 
795  Each slave applies the function to the data they're provided.
796  The slaves may optionally be passed a cache instance, which
797  they can store data in for subsequent executions (to ensure
798  subsequent data is distributed in the same pattern as before,
799  use the 'mapToPrevious' method). The cache also contains
800  data that has been stored on the slaves.
801 
802  The 'func' signature should be func(cache, data, *args, **kwargs)
803  if 'context' is true; otherwise func(data, *args, **kwargs).
804 
805  The 'reducer' signature should be reducer(old, new). If the 'reducer'
806  is None, then we will return the full list of results
807 
808  @param context: Namespace for cache
809  @param reducer: function for master to run to reduce slave results; or None
810  @param func: function for slaves to run; must be picklable
811  @param dataList: List of data to distribute to slaves; must be picklable
812  @param args: List of constant arguments
813  @param kwargs: Dict of constant arguments
814  @return reduced result (if reducer is non-None) or list of results
815  from applying 'func' to dataList
816  """
817  tags = Tags("result", "work")
818  num = len(dataList)
819  if self.size == 1 or num <= 1:
820  return self._reduceQueue(context, reducer, func, list(zip(range(num), dataList)), *args, **kwargs)
821 
822  self.command("mapNoBalance")
823 
824  # Send function
825  self.log("instruct")
826  self.comm.broadcast((tags, func, args, kwargs, context), root=self.root)
827 
828  # Divide up the jobs
829  # Try to give root the least to do, so it also has time to manage
830  queue = list(zip(range(num), dataList)) # index, data
831  if num < self.size:
832  distribution = [[queue[i]] for i in range(num)]
833  distribution.insert(self.rank, [])
834  for i in range(num, self.size - 1):
835  distribution.append([])
836  elif num % self.size == 0:
837  numEach = num//self.size
838  distribution = [queue[i*numEach:(i+1)*numEach] for i in range(self.size)]
839  else:
840  numEach = num//self.size
841  distribution = [queue[i*numEach:(i+1)*numEach] for i in range(self.size)]
842  for i in range(numEach*self.size, num):
843  distribution[(self.rank + 1) % self.size].append
844  distribution = list([] for i in range(self.size))
845  for i, job in enumerate(queue, self.rank + 1):
846  distribution[i % self.size].append(job)
847 
848  # Distribute jobs
849  for source in range(self.size):
850  if source == self.rank:
851  continue
852  self.log("send jobs to ", source)
853  self.comm.send(distribution[source], source, tag=tags.work)
854 
855  # Execute our own jobs
856  output = [None]*num if reducer is None else None
857 
858  def ingestResults(output, nodeResults, distList):
859  if reducer is None:
860  for i, result in enumerate(nodeResults):
861  index = distList[i][0]
862  output[index] = result
863  return output
864  if output is None:
865  output = nodeResults.pop(0)
866  for result in nodeResults:
867  output = reducer(output, result)
868  return output
869 
870  ourResults = self._processQueue(context, func, distribution[self.rank], *args, **kwargs)
871  output = ingestResults(output, ourResults, distribution[self.rank])
872 
873  # Collect results
874  pending = self.size - 1
875  while pending > 0:
876  status = mpi.Status()
877  slaveResults = self.comm.recv(status=status, tag=tags.result, source=mpi.ANY_SOURCE)
878  source = status.source
879  self.log("gather from slave", source)
880  output = ingestResults(output, slaveResults, distribution[source])
881  pending -= 1
882 
883  self.log("done")
884  return output
885 
886  def mapToPrevious(self, context, func, dataList, *args, **kwargs):
887  """!Scatter work to the same target as before
888 
889  Work is distributed so that each slave handles the same
890  indices in the dataList as when 'map' was called.
891  This allows the right data to go to the right cache.
892 
893  It is assumed that the dataList is the same length as when it was
894  passed to 'map'.
895 
896  The 'func' signature should be func(cache, data, *args, **kwargs).
897 
898  @param context: Namespace for cache
899  @param func: function for slaves to run; must be picklable
900  @param dataList: List of data to distribute to slaves; must be picklable
901  @param args: List of constant arguments
902  @param kwargs: Dict of constant arguments
903  @return list of results from applying 'func' to dataList
904  """
905  return self.reduceToPrevious(context, None, func, dataList, *args, **kwargs)
906 
907  @abortOnError
908  @catchPicklingError
909  def reduceToPrevious(self, context, reducer, func, dataList, *args, **kwargs):
910  """!Reduction where work goes to the same target as before
911 
912  Work is distributed so that each slave handles the same
913  indices in the dataList as when 'map' was called.
914  This allows the right data to go to the right cache.
915 
916  It is assumed that the dataList is the same length as when it was
917  passed to 'map'.
918 
919  The 'func' signature should be func(cache, data, *args, **kwargs).
920 
921  The 'reducer' signature should be reducer(old, new). If the 'reducer'
922  is None, then we will return the full list of results
923 
924  @param context: Namespace for cache
925  @param reducer: function for master to run to reduce slave results; or None
926  @param func: function for slaves to run; must be picklable
927  @param dataList: List of data to distribute to slaves; must be picklable
928  @param args: List of constant arguments
929  @param kwargs: Dict of constant arguments
930  @return reduced result (if reducer is non-None) or list of results
931  from applying 'func' to dataList
932  """
933  if context is None:
934  raise ValueError("context must be set to map to same nodes as previous context")
935  tags = Tags("result", "work")
936  num = len(dataList)
937  if self.size == 1 or num <= 1:
938  # Can do everything here
939  return self._reduceQueue(context, reducer, func, list(zip(range(num), dataList)), *args, **kwargs)
940  if self.size == num:
941  # We're shooting ourselves in the foot using dynamic distribution
942  return self.reduceNoBalance(context, reducer, func, dataList, *args, **kwargs)
943 
944  self.command("mapToPrevious")
945 
946  # Send function
947  self.log("instruct")
948  self.comm.broadcast((tags, func, args, kwargs, context), root=self.root)
949 
950  requestList = self.comm.gather(None, root=self.root)
951  self.log("listen", requestList)
952  initial = [dataList[index] if (index is not None and index >= 0) else None for index in requestList]
953  self.log("scatter jobs", initial)
954  self.comm.scatter(initial, root=self.root)
955  pending = min(num, self.size - 1)
956 
957  if reducer is None:
958  output = [None]*num
959  else:
960  thread = ReductionThread(reducer)
961  thread.start()
962 
963  while pending > 0:
964  status = mpi.Status()
965  index, result, nextIndex = self.comm.recv(status=status, tag=tags.result, source=mpi.ANY_SOURCE)
966  source = status.source
967  self.log("gather from slave", source)
968  if reducer is None:
969  output[index] = result
970  else:
971  thread.add(result)
972 
973  if nextIndex >= 0:
974  job = dataList[nextIndex]
975  self.log("send job to slave", source)
976  self.comm.send(job, source, tag=tags.work)
977  else:
978  pending -= 1
979 
980  self.log("waiting on", pending)
981 
982  if reducer is not None:
983  output = thread.join()
984 
985  self.log("done")
986  return output
987 
988  @abortOnError
989  @catchPicklingError
990  def storeSet(self, context, **kwargs):
991  """!Store data on slave for a particular context
992 
993  The data is made available to functions through the cache. The
994  stored data differs from the cache in that it is identical for
995  all operations, whereas the cache is specific to the data being
996  operated upon.
997 
998  @param context: namespace for store
999  @param kwargs: dict of name=value pairs
1000  """
1001  super(PoolMaster, self).storeSet(context, **kwargs)
1002  self.command("storeSet")
1003  self.log("give data")
1004  self.comm.broadcast((context, kwargs), root=self.root)
1005  self.log("done")
1006 
1007  @abortOnError
1008  def storeDel(self, context, *nameList):
1009  """Delete stored data on slave for a particular context"""
1010  super(PoolMaster, self).storeDel(context, *nameList)
1011  self.command("storeDel")
1012  self.log("tell names")
1013  self.comm.broadcast((context, nameList), root=self.root)
1014  self.log("done")
1015 
1016  @abortOnError
1017  def storeClear(self, context):
1018  """Reset data store for a particular context on master and slaves"""
1019  super(PoolMaster, self).storeClear(context)
1020  self.command("storeClear")
1021  self.comm.broadcast(context, root=self.root)
1022 
1023  @abortOnError
1024  def cacheClear(self, context):
1025  """Reset cache for a particular context on master and slaves"""
1026  super(PoolMaster, self).cacheClear(context)
1027  self.command("cacheClear")
1028  self.comm.broadcast(context, root=self.root)
1029 
1030  @abortOnError
1031  def cacheList(self, context):
1032  """List cache contents for a particular context on master and slaves"""
1033  super(PoolMaster, self).cacheList(context)
1034  self.command("cacheList")
1035  self.comm.broadcast(context, root=self.root)
1036 
1037  @abortOnError
1038  def storeList(self, context):
1039  """List store contents for a particular context on master and slaves"""
1040  super(PoolMaster, self).storeList(context)
1041  self.command("storeList")
1042  self.comm.broadcast(context, root=self.root)
1043 
1044  def exit(self):
1045  """Command slaves to exit"""
1046  self.command("exit")
1047 
1048 
1050  """Slave node instance of MPI process pool"""
1051 
1052  def log(self, msg, *args):
1053  """Log a debugging message"""
1054  assert self.rank != self.root, "This is not the master node."
1055  self.debugger.log("Slave %d" % self.rank, msg, *args)
1056 
1057  @abortOnError
1058  def run(self):
1059  """Serve commands of master node
1060 
1061  Slave accepts commands, which are the names of methods to execute.
1062  This exits when a command returns a true value.
1063  """
1064  menu = dict((cmd, getattr(self, cmd)) for cmd in ("reduce", "mapNoBalance", "mapToPrevious",
1065  "storeSet", "storeDel", "storeClear", "storeList",
1066  "cacheList", "cacheClear", "exit",))
1067  self.log("waiting for command from", self.root)
1068  command = self.comm.broadcast(None, root=self.root)
1069  self.log("command", command)
1070  while not menu[command]():
1071  self.log("waiting for command from", self.root)
1072  command = self.comm.broadcast(None, root=self.root)
1073  self.log("command", command)
1074  self.log("exiting")
1075 
1076  @catchPicklingError
1077  def reduce(self):
1078  """Reduce scattered data and return results"""
1079  self.log("waiting for instruction")
1080  tags, func, reducer, args, kwargs, context = self.comm.broadcast(None, root=self.root)
1081  self.log("waiting for job")
1082  job = self.comm.scatter(None, root=self.root)
1083 
1084  out = None # Reduction result
1085  while not isinstance(job, NoOp):
1086  index, data = job
1087  self.log("running job")
1088  result = self._processQueue(context, func, [(index, data)], *args, **kwargs)[0]
1089  if reducer is None:
1090  report = (index, result)
1091  else:
1092  report = None
1093  out = reducer(out, result) if out is not None else result
1094  self.comm.send(report, self.root, tag=tags.request)
1095  self.log("waiting for job")
1096  job = self.comm.recv(tag=tags.work, source=self.root)
1097 
1098  if reducer is not None:
1099  self.comm.gather(out, root=self.root)
1100  self.log("done")
1101 
1102  @catchPicklingError
1103  def mapNoBalance(self):
1104  """Process bulk scattered data and return results"""
1105  self.log("waiting for instruction")
1106  tags, func, args, kwargs, context = self.comm.broadcast(None, root=self.root)
1107  self.log("waiting for job")
1108  queue = self.comm.recv(tag=tags.work, source=self.root)
1109 
1110  resultList = []
1111  for index, data in queue:
1112  self.log("running job", index)
1113  result = self._processQueue(context, func, [(index, data)], *args, **kwargs)[0]
1114  resultList.append(result)
1115 
1116  self.comm.send(resultList, self.root, tag=tags.result)
1117  self.log("done")
1118 
1119  @catchPicklingError
1120  def mapToPrevious(self):
1121  """Process the same scattered data processed previously"""
1122  self.log("waiting for instruction")
1123  tags, func, args, kwargs, context = self.comm.broadcast(None, root=self.root)
1124  queue = list(self._cache[context].keys()) if context in self._cache else None
1125  index = queue.pop(0) if queue else -1
1126  self.log("request job", index)
1127  self.comm.gather(index, root=self.root)
1128  self.log("waiting for job")
1129  data = self.comm.scatter(None, root=self.root)
1130 
1131  while index >= 0:
1132  self.log("running job")
1133  result = func(self._getCache(context, index), data, *args, **kwargs)
1134  self.log("pending", queue)
1135  nextIndex = queue.pop(0) if queue else -1
1136  self.comm.send((index, result, nextIndex), self.root, tag=tags.result)
1137  index = nextIndex
1138  if index >= 0:
1139  data = self.comm.recv(tag=tags.work, source=self.root)
1140 
1141  self.log("done")
1142 
1143  def storeSet(self):
1144  """Set value in store"""
1145  context, kwargs = self.comm.broadcast(None, root=self.root)
1146  super(PoolSlave, self).storeSet(context, **kwargs)
1147 
1148  def storeDel(self):
1149  """Delete value in store"""
1150  context, nameList = self.comm.broadcast(None, root=self.root)
1151  super(PoolSlave, self).storeDel(context, *nameList)
1152 
1153  def storeClear(self):
1154  """Reset data store"""
1155  context = self.comm.broadcast(None, root=self.root)
1156  super(PoolSlave, self).storeClear(context)
1157 
1158  def cacheClear(self):
1159  """Reset cache"""
1160  context = self.comm.broadcast(None, root=self.root)
1161  super(PoolSlave, self).cacheClear(context)
1162 
1163  def cacheList(self):
1164  """List cache contents"""
1165  context = self.comm.broadcast(None, root=self.root)
1166  super(PoolSlave, self).cacheList(context)
1167 
1168  def storeList(self):
1169  """List store contents"""
1170  context = self.comm.broadcast(None, root=self.root)
1171  super(PoolSlave, self).storeList(context)
1172 
1173  def exit(self):
1174  """Allow exit from loop in 'run'"""
1175  return True
1176 
1177 
1179  """Metaclass for PoolWrapper to add methods pointing to PoolMaster
1180 
1181  The 'context' is automatically supplied to these methods as the first argument.
1182  """
1183 
1184  def __call__(cls, context="default"):
1185  instance = super(PoolWrapperMeta, cls).__call__(context)
1186  pool = PoolMaster()
1187  for name in ("map", "mapNoBalance", "mapToPrevious",
1188  "reduce", "reduceNoBalance", "reduceToPrevious",
1189  "storeSet", "storeDel", "storeClear", "storeList",
1190  "cacheList", "cacheClear",):
1191  setattr(instance, name, partial(getattr(pool, name), context))
1192  return instance
1193 
1194 
1195 class PoolWrapper(with_metaclass(PoolWrapperMeta, object)):
1196  """Wrap PoolMaster to automatically provide context"""
1197 
1198  def __init__(self, context="default"):
1199  self._pool = PoolMaster._instance
1200  self._context = context
1201 
1202  def __getattr__(self, name):
1203  return getattr(self._pool, name)
1204 
1205 
1206 class Pool(PoolWrapper): # Just gives PoolWrapper a nicer name for the user
1207  """Process Pool
1208 
1209  Use this class to automatically provide 'context' to
1210  the PoolMaster class. If you want to call functions
1211  that don't take a 'cache' object, use the PoolMaster
1212  class directly, and specify context=None.
1213  """
1214  pass
1215 
1216 
1217 def startPool(comm=None, root=0, killSlaves=True):
1218  """!Start a process pool.
1219 
1220  Returns a PoolMaster object for the master node.
1221  Slave nodes are run and then optionally killed.
1222 
1223  If you elect not to kill the slaves, note that they
1224  will emerge at the point this function was called,
1225  which is likely very different from the point the
1226  master is at, so it will likely be necessary to put
1227  in some rank dependent code (e.g., look at the 'rank'
1228  attribute of the returned pools).
1229 
1230  Note that the pool objects should be deleted (either
1231  by going out of scope or explicit 'del') before program
1232  termination to avoid a segmentation fault.
1233 
1234  @param comm: MPI communicator
1235  @param root: Rank of root/master node
1236  @param killSlaves: Kill slaves on completion?
1237  """
1238  if comm is None:
1239  comm = Comm()
1240  if comm.rank == root:
1241  return PoolMaster(comm, root=root)
1242  slave = PoolSlave(comm, root=root)
1243  slave.run()
1244  if killSlaves:
1245  del slave # Required to prevent segmentation fault on exit
1246  sys.exit()
1247  return slave
def write(self, patchRef, catalog)
Write the output.
def _reduceQueue(self, context, reducer, func, queue, args, kwargs)
Reduce a queue of data.
Definition: pool.py:549
def __new__(cls, hold=None)
Definition: pool.py:142
def reduceToPrevious(self, context, reducer, func, dataList, args, kwargs)
Reduction where work goes to the same target as before.
Definition: pool.py:909
def log(self, source, msg, args)
Log message.
Definition: pool.py:405
def pickleInstanceMethod(method)
Definition: pool.py:47
def send(self, obj=None, args, kwargs)
Definition: pool.py:273
def storeSet(self, context, kwargs)
Definition: pool.py:585
def __init__(self, nameList)
Definition: pool.py:345
def __call__(cls, context="default")
Definition: pool.py:1184
def unpickleFunction(moduleName, funcName)
Definition: pool.py:61
def guessPickleObj()
Definition: pool.py:163
def command(self, cmd)
Definition: pool.py:649
def storeDel(self, context, nameList)
Definition: pool.py:1008
def __init__(self, args, kwargs)
Definition: pool.py:637
def __init__(self, reducer, initial=None, sleep=0.1)
Constructor.
Definition: pool.py:429
def _processQueue(self, context, func, queue, args, kwargs)
Process a queue of data.
Definition: pool.py:530
def cacheList(self, context)
Definition: pool.py:1031
def __repr__(self)
Definition: pool.py:350
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
Definition: functional.cc:33
def __init__(self, hold=None)
Definition: pool.py:149
def reduce(self, context, reducer, func, dataList, args, kwargs)
Scatter work to slaves and reduce the results.
Definition: pool.py:684
daf::base::PropertySet * set
Definition: fits.cc:902
def log(self, msg, args)
Definition: pool.py:645
def broadcast(self, value, root=0)
Definition: pool.py:303
def mapToPrevious(self, context, func, dataList, args, kwargs)
Scatter work to the same target as before.
Definition: pool.py:886
int min
def abortOnError(func)
Definition: pool.py:108
def startPool(comm=None, root=0, killSlaves=True)
Start a process pool.
Definition: pool.py:1217
def __call__(cls, args, kwargs)
Definition: pool.py:388
def unpickleInstanceMethod(obj, name)
Definition: pool.py:38
def map(self, context, func, dataList, args, kwargs)
Scatter work to slaves and gather the results.
Definition: pool.py:657
Metaclass to produce a singleton.
Definition: pool.py:368
def storeClear(self, context)
Definition: pool.py:601
def __new__(cls, comm=mpi.COMM_WORLD, recvSleep=0.1, barrierSleep=0.1)
Construct an MPI.Comm wrapper.
Definition: pool.py:252
def __init__(self, comm=None, root=0)
Definition: pool.py:495
def storeClear(self, context)
Definition: pool.py:1017
def storeDel(self, context, nameList)
Definition: pool.py:593
def recv(self, obj=None, source=0, tag=0, status=None)
Definition: pool.py:266
table::Key< int > type
Definition: Detector.cc:163
def _checkBarrierComm(self)
Definition: pool.py:277
def pickleFunction(function)
Definition: pool.py:72
def __exit__(self, excType, excVal, tb)
Definition: pool.py:157
def cacheList(self, context)
Definition: pool.py:615
def __reduce__(self)
Definition: pool.py:353
def storeList(self, context)
Definition: pool.py:620
def storeList(self, context)
Definition: pool.py:1038
def cacheClear(self, context)
Definition: pool.py:1024
def scatter(self, dataList, root=0, tag=0)
Definition: pool.py:307
def storeSet(self, context, kwargs)
Store data on slave for a particular context.
Definition: pool.py:990
def log(self, msg, args)
Definition: pool.py:1052
def setBatchType(batchType)
Definition: pool.py:102
def getBatchType()
Definition: pool.py:97
def pickleSniffer(abort=False)
Definition: pool.py:188
def Barrier(self, tag=0)
Definition: pool.py:282
def __init__(cls, name, bases, dict_)
Definition: pool.py:384
def __init__(self, context="default")
Definition: pool.py:1198
def mapNoBalance(self, context, func, dataList, args, kwargs)
Scatter work to slaves and gather the results.
Definition: pool.py:764
def catchPicklingError(func)
Definition: pool.py:235
def __getattr__(self, name)
Definition: pool.py:1202
daf::base::PropertyList * list
Definition: fits.cc:903
def log(self, msg, args)
Definition: pool.py:523
def __init__(self, comm)
Definition: pool.py:364
def reduceNoBalance(self, context, reducer, func, dataList, args, kwargs)
Scatter work to slaves and reduce the results.
Definition: pool.py:790
def _getCache(self, context, index)
Definition: pool.py:507
def cacheClear(self, context)
Definition: pool.py:608