LSST Data Management Base Package
lsst.ctrl.pool.pool.PoolMaster Class Reference
Inheritance diagram for lsst.ctrl.pool.pool.PoolMaster:
lsst.ctrl.pool.pool.PoolMaster derives from lsst.ctrl.pool.pool.PoolNode; its metaclass is lsst.ctrl.pool.pool.SingletonMeta.

Public Member Functions

def __init__ (self, *args, **kwargs)
 
def __del__ (self)
 
def log (self, msg, *args)
 
def command (self, cmd)
 
def map (self, context, func, dataList, *args, **kwargs)
 Scatter work to slaves and gather the results.
 
def reduce (self, context, reducer, func, dataList, *args, **kwargs)
 Scatter work to slaves and reduce the results.
 
def mapNoBalance (self, context, func, dataList, *args, **kwargs)
 Scatter work to slaves and gather the results.
 
def reduceNoBalance (self, context, reducer, func, dataList, *args, **kwargs)
 Scatter work to slaves and reduce the results.
 
def mapToPrevious (self, context, func, dataList, *args, **kwargs)
 Scatter work to the same target as before.
 
def reduceToPrevious (self, context, reducer, func, dataList, *args, **kwargs)
 Reduction where work goes to the same target as before.
 
def storeSet (self, context, **kwargs)
 Store data on slave for a particular context.
 
def storeDel (self, context, *nameList)
 
def storeClear (self, context)
 
def cacheClear (self, context)
 
def cacheList (self, context)
 
def storeList (self, context)
 
def exit (self)
 
def isMaster (self)
 
def __call__ (cls, *args, **kwargs)
 

Public Attributes

 root
 
 size
 
 comm
 
 rank
 
 debugger
 
 node
 

Detailed Description

Master node instance of an MPI process pool.

Only the master node should instantiate this.

WARNING: Do not let a pool instance linger until program termination; garbage
collection behaves differently at that point and may cause a segmentation
fault (signal 11).

Definition at line 626 of file pool.py.
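
For orientation, the following is a minimal, hypothetical driver sketch. It assumes the slave ranks are already serving commands (for example via the companion PoolSlave command loop), that PoolMaster can be default-constructed on the master rank, and that the worker function is picklable; only the documented map() and exit() methods are exercised.

    from lsst.ctrl.pool.pool import PoolMaster

    def measure(cache, dataId, threshold=5.0):
        # With a non-None context, slaves call func(cache, data, *args, **kwargs);
        # constant arguments such as 'threshold' are broadcast once with the function.
        return {"dataId": dataId, "threshold": threshold}

    pool = PoolMaster()    # assumption: default construction on the master rank
    results = pool.map("ccd", measure, list(range(8)), threshold=10.0)
    pool.exit()            # tell the slaves to stop before the program terminates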

Constructor & Destructor Documentation

◆ __init__()

def lsst.ctrl.pool.pool.PoolMaster.__init__(self, *args, **kwargs)

Definition at line 636 of file pool.py.

636  def __init__(self, *args, **kwargs):
637  super(PoolMaster, self).__init__(*args, **kwargs)
638  assert self.root == self.rank, "This is the master node"
639 

◆ __del__()

def lsst.ctrl.pool.pool.PoolMaster.__del__(self)
Ensure slaves exit when we're done

Definition at line 640 of file pool.py.

640  def __del__(self):
641  """Ensure slaves exit when we're done"""
642  self.exit()
643 

Member Function Documentation

◆ __call__()

def lsst.ctrl.pool.pool.SingletonMeta.__call__(cls, *args, **kwargs)
inherited

Definition at line 387 of file pool.py.

387  def __call__(cls, *args, **kwargs):
388  if cls._instance is None:
389  cls._instance = super(SingletonMeta, cls).__call__(*args, **kwargs)
390  return cls._instance
391 
392 
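
Because of this inherited __call__, PoolMaster behaves as a process-wide singleton: the first construction creates the instance and every later call returns the cached one. A small sketch (assuming construction succeeds on the master rank):

    pool1 = PoolMaster()
    pool2 = PoolMaster()   # __call__ returns the cached instance; no new object is created
    assert pool1 is pool2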

◆ cacheClear()

def lsst.ctrl.pool.pool.PoolMaster.cacheClear(self, context)
Reset cache for a particular context on master and slaves

Reimplemented from lsst.ctrl.pool.pool.PoolNode.

Definition at line 1023 of file pool.py.

1023  def cacheClear(self, context):
1024  """Reset cache for a particular context on master and slaves"""
1025  super(PoolMaster, self).cacheClear(context)
1026  self.command("cacheClear")
1027  self.comm.broadcast(context, root=self.root)
1028 

◆ cacheList()

def lsst.ctrl.pool.pool.PoolMaster.cacheList(self, context)
List cache contents for a particular context on master and slaves

Reimplemented from lsst.ctrl.pool.pool.PoolNode.

Definition at line 1030 of file pool.py.

1030  def cacheList(self, context):
1031  """List cache contents for a particular context on master and slaves"""
1032  super(PoolMaster, self).cacheList(context)
1033  self.command("cacheList")
1034  self.comm.broadcast(context, root=self.root)
1035 

◆ command()

def lsst.ctrl.pool.pool.PoolMaster.command(self, cmd)
Send command to slaves

A command is the name of the PoolSlave method they should run.

Definition at line 648 of file pool.py.

648  def command(self, cmd):
649  """Send command to slaves
650 
651  A command is the name of the PoolSlave method they should run.
652  """
653  self.log("command", cmd)
654  self.comm.broadcast(cmd, root=self.root)
655 

◆ exit()

def lsst.ctrl.pool.pool.PoolMaster.exit(self)
Command slaves to exit

Definition at line 1043 of file pool.py.

1043  def exit(self):
1044  """Command slaves to exit"""
1045  self.command("exit")
1046 
1047 

◆ isMaster()

def lsst.ctrl.pool.pool.PoolNode.isMaster(self)
inherited

Definition at line 526 of file pool.py.

526  def isMaster(self):
527  return self.rank == self.root
528 

◆ log()

def lsst.ctrl.pool.pool.PoolMaster.log(self, msg, *args)
Log a debugging message

Reimplemented from lsst.ctrl.pool.pool.PoolNode.

Definition at line 644 of file pool.py.

644  def log(self, msg, *args):
645  """Log a debugging message"""
646  self.debugger.log("Master", msg, *args)
647 

◆ map()

def lsst.ctrl.pool.pool.PoolMaster.map(self, context, func, dataList, *args, **kwargs)

Scatter work to slaves and gather the results.

    Work is distributed dynamically, so that slaves that finish
    quickly will receive more work.

    Each slave applies the function to the data they're provided.
    The slaves may optionally be passed a cache instance, which
    they can use to store data for subsequent executions (to ensure
    subsequent data is distributed in the same pattern as before,
    use the 'mapToPrevious' method).  The cache also contains
    data that has been stored on the slaves.

    The 'func' signature should be func(cache, data, *args, **kwargs)
    if 'context' is non-None; otherwise func(data, *args, **kwargs).

    @param context: Namespace for cache
    @param func: function for slaves to run; must be picklable
    @param dataList: List of data to distribute to slaves; must be picklable
    @param args: List of constant arguments
    @param kwargs: Dict of constant arguments
    @return list of results from applying 'func' to dataList

Definition at line 656 of file pool.py.

656  def map(self, context, func, dataList, *args, **kwargs):
657  """!Scatter work to slaves and gather the results
658 
659  Work is distributed dynamically, so that slaves that finish
660  quickly will receive more work.
661 
662  Each slave applies the function to the data they're provided.
663  The slaves may optionally be passed a cache instance, which
664  they can use to store data for subsequent executions (to ensure
665  subsequent data is distributed in the same pattern as before,
666  use the 'mapToPrevious' method). The cache also contains
667  data that has been stored on the slaves.
668 
669  The 'func' signature should be func(cache, data, *args, **kwargs)
670  if 'context' is non-None; otherwise func(data, *args, **kwargs).
671 
672  @param context: Namespace for cache
673  @param func: function for slaves to run; must be picklable
674  @param dataList: List of data to distribute to slaves; must be picklable
675  @param args: List of constant arguments
676  @param kwargs: Dict of constant arguments
677  @return list of results from applying 'func' to dataList
678  """
679  return self.reduce(context, None, func, dataList, *args, **kwargs)
680 
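
As the listing shows, map() delegates to reduce() with reducer=None, so it always returns the per-item results in dataList order. A hypothetical sketch of the two supported function signatures, continuing the 'pool' object from the earlier example:

    def withCache(cache, data, scale):
        # Called when a context name is given: a per-slave cache is passed first.
        return data * scale

    def withoutCache(data, scale):
        # Called when context is None: no cache argument is passed.
        return data * scale

    assert pool.map("demo", withCache, [1, 2, 3], 2) == [2, 4, 6]
    assert pool.map(None, withoutCache, [1, 2, 3], 2) == [2, 4, 6]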

◆ mapNoBalance()

def lsst.ctrl.pool.pool.PoolMaster.mapNoBalance(self, context, func, dataList, *args, **kwargs)

Scatter work to slaves and gather the results.

    Work is distributed statically, so there is no load balancing.

    Each slave applies the function to the data they're provided.
    The slaves may optionally be passed a cache instance, which
    they can store data in for subsequent executions (to ensure
    subsequent data is distributed in the same pattern as before,
    use the 'mapToPrevious' method).  The cache also contains
    data that has been stored on the slaves.

    The 'func' signature should be func(cache, data, *args, **kwargs)
    if 'context' is true; otherwise func(data, *args, **kwargs).

    @param context: Namespace for cache
    @param func: function for slaves to run; must be picklable
    @param dataList: List of data to distribute to slaves; must be picklable
    @param args: List of constant arguments
    @param kwargs: Dict of constant arguments
    @return list of results from applying 'func' to dataList

Definition at line 763 of file pool.py.

763  def mapNoBalance(self, context, func, dataList, *args, **kwargs):
764  """!Scatter work to slaves and gather the results
765 
766  Work is distributed statically, so there is no load balancing.
767 
768  Each slave applies the function to the data they're provided.
769  The slaves may optionally be passed a cache instance, which
770  they can store data in for subsequent executions (to ensure
771  subsequent data is distributed in the same pattern as before,
772  use the 'mapToPrevious' method). The cache also contains
773  data that has been stored on the slaves.
774 
775  The 'func' signature should be func(cache, data, *args, **kwargs)
776  if 'context' is true; otherwise func(data, *args, **kwargs).
777 
778  @param context: Namespace for cache
779  @param func: function for slaves to run; must be picklable
780  @param dataList: List of data to distribute to slaves; must be picklable
781  @param args: List of constant arguments
782  @param kwargs: Dict of constant arguments
783  @return list of results from applying 'func' to dataList
784  """
785  return self.reduceNoBalance(context, None, func, dataList, *args, **kwargs)
786 

◆ mapToPrevious()

def lsst.ctrl.pool.pool.PoolMaster.mapToPrevious(self, context, func, dataList, *args, **kwargs)

Scatter work to the same target as before.

    Work is distributed so that each slave handles the same
    indices in the dataList as when 'map' was called.
    This allows the right data to go to the right cache.

    It is assumed that the dataList is the same length as when it was
    passed to 'map'.

    The 'func' signature should be func(cache, data, *args, **kwargs).

    @param context: Namespace for cache
    @param func: function for slaves to run; must be picklable
    @param dataList: List of data to distribute to slaves; must be picklable
    @param args: List of constant arguments
    @param kwargs: Dict of constant arguments
    @return list of results from applying 'func' to dataList

Definition at line 885 of file pool.py.

885  def mapToPrevious(self, context, func, dataList, *args, **kwargs):
886  """!Scatter work to the same target as before
887 
888  Work is distributed so that each slave handles the same
889  indices in the dataList as when 'map' was called.
890  This allows the right data to go to the right cache.
891 
892  It is assumed that the dataList is the same length as when it was
893  passed to 'map'.
894 
895  The 'func' signature should be func(cache, data, *args, **kwargs).
896 
897  @param context: Namespace for cache
898  @param func: function for slaves to run; must be picklable
899  @param dataList: List of data to distribute to slaves; must be picklable
900  @param args: List of constant arguments
901  @param kwargs: Dict of constant arguments
902  @return list of results from applying 'func' to dataList
903  """
904  return self.reduceToPrevious(context, None, func, dataList, *args, **kwargs)
905 
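
A typical hypothetical pattern for mapToPrevious(): an initial map() populates each slave's cache for a context, and a later pass revisits the same indices so the cached state matches the data. estimateBackground and loadExposures are placeholder helpers, and the per-slave cache is assumed to accept attribute assignment.

    def firstPass(cache, exposure):
        cache.background = estimateBackground(exposure)   # placeholder helper
        return cache.background

    def secondPass(cache, exposure):
        # The same index goes to the same slave, so cache.background is
        # the value computed for this exposure in the first pass.
        return exposure - cache.background

    exposures = loadExposures()   # placeholder; must be the same length in both passes
    pool.map("bg", firstPass, exposures)
    corrected = pool.mapToPrevious("bg", secondPass, exposures)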

◆ reduce()

def lsst.ctrl.pool.pool.PoolMaster.reduce(self, context, reducer, func, dataList, *args, **kwargs)

Scatter work to slaves and reduce the results.

    Work is distributed dynamically, so that slaves that finish
    quickly will receive more work.

    Each slave applies the function to the data they're provided.
    The slaves may optionally be passed a cache instance, which
    they can use to store data for subsequent executions (to ensure
    subsequent data is distributed in the same pattern as before,
    use the 'mapToPrevious' method).  The cache also contains
    data that has been stored on the slaves.

    The 'func' signature should be func(cache, data, *args, **kwargs)
    if 'context' is non-None; otherwise func(data, *args, **kwargs).

    The 'reducer' signature should be reducer(old, new). If the 'reducer'
    is None, then we will return the full list of results

    @param context: Namespace for cache
    @param reducer: function for master to run to reduce slave results; or None
    @param func: function for slaves to run; must be picklable
    @param dataList: List of data to distribute to slaves; must be picklable
    @param args: List of constant arguments
    @param kwargs: Dict of constant arguments
    @return reduced result (if reducer is non-None) or list of results
        from applying 'func' to dataList

Definition at line 683 of file pool.py.

683  def reduce(self, context, reducer, func, dataList, *args, **kwargs):
684  """!Scatter work to slaves and reduce the results
685 
686  Work is distributed dynamically, so that slaves that finish
687  quickly will receive more work.
688 
689  Each slave applies the function to the data they're provided.
690  The slaves may optionally be passed a cache instance, which
691  they can use to store data for subsequent executions (to ensure
692  subsequent data is distributed in the same pattern as before,
693  use the 'mapToPrevious' method). The cache also contains
694  data that has been stored on the slaves.
695 
696  The 'func' signature should be func(cache, data, *args, **kwargs)
697  if 'context' is non-None; otherwise func(data, *args, **kwargs).
698 
699  The 'reducer' signature should be reducer(old, new). If the 'reducer'
700  is None, then we will return the full list of results
701 
702  @param context: Namespace for cache
703  @param reducer: function for master to run to reduce slave results; or None
704  @param func: function for slaves to run; must be picklable
705  @param dataList: List of data to distribute to slaves; must be picklable
706  @param args: List of constant arguments
707  @param kwargs: Dict of constant arguments
708  @return reduced result (if reducer is non-None) or list of results
709  from applying 'func' to dataList
710  """
711  tags = Tags("request", "work")
712  num = len(dataList)
713  if self.size == 1 or num <= 1:
714  return self._reduceQueue(context, reducer, func, list(zip(list(range(num)), dataList)),
715  *args, **kwargs)
716  if self.size == num:
717  # We're shooting ourselves in the foot using dynamic distribution
718  return self.reduceNoBalance(context, reducer, func, dataList, *args, **kwargs)
719 
720  self.command("reduce")
721 
722  # Send function
723  self.log("instruct")
724  self.comm.broadcast((tags, func, reducer, args, kwargs, context), root=self.root)
725 
726  # Parcel out first set of data
727  queue = list(zip(range(num), dataList)) # index, data
728  output = [None]*num if reducer is None else None
729  initial = [None if i == self.rank else queue.pop(0) if queue else NoOp() for
730  i in range(self.size)]
731  pending = min(num, self.size - 1)
732  self.log("scatter initial jobs")
733  self.comm.scatter(initial, root=self.rank)
734 
735  while queue or pending > 0:
736  status = mpi.Status()
737  report = self.comm.recv(status=status, tag=tags.request, source=mpi.ANY_SOURCE)
738  source = status.source
739  self.log("gather from slave", source)
740  if reducer is None:
741  index, result = report
742  output[index] = result
743 
744  if queue:
745  job = queue.pop(0)
746  self.log("send job to slave", job[0], source)
747  else:
748  job = NoOp()
749  pending -= 1
750  self.comm.send(job, source, tag=tags.work)
751 
752  if reducer is not None:
753  results = self.comm.gather(None, root=self.root)
754  output = None
755  for rank in range(self.size):
756  if rank == self.root:
757  continue
758  output = reducer(output, results[rank]) if output is not None else results[rank]
759 
760  self.log("done")
761  return output
762 
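
A hypothetical reduction sketch: the reducer runs on the master and combines results pairwise as reducer(old, new), so any associative two-argument function works, for example summing per-item counts. loadCatalogs is a placeholder for whatever produces the input list.

    import operator

    def countSources(cache, catalog):
        # Per-item work executed on a slave; must be picklable.
        return len(catalog)

    catalogs = loadCatalogs()   # placeholder input list
    total = pool.reduce("counts", operator.add, countSources, catalogs)
    # With reducer=None the call would instead return the per-catalog list of counts.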

◆ reduceNoBalance()

def lsst.ctrl.pool.pool.PoolMaster.reduceNoBalance(self, context, reducer, func, dataList, *args, **kwargs)

Scatter work to slaves and reduce the results.

    Work is distributed statically, so there is no load balancing.

    Each slave applies the function to the data they're provided.
    The slaves may optionally be passed a cache instance, which
    they can store data in for subsequent executions (to ensure
    subsequent data is distributed in the same pattern as before,
    use the 'mapToPrevious' method).  The cache also contains
    data that has been stored on the slaves.

    The 'func' signature should be func(cache, data, *args, **kwargs)
    if 'context' is true; otherwise func(data, *args, **kwargs).

    The 'reducer' signature should be reducer(old, new). If the 'reducer'
    is None, then we will return the full list of results

    @param context: Namespace for cache
    @param reducer: function for master to run to reduce slave results; or None
    @param func: function for slaves to run; must be picklable
    @param dataList: List of data to distribute to slaves; must be picklable
    @param args: List of constant arguments
    @param kwargs: Dict of constant arguments
    @return reduced result (if reducer is non-None) or list of results
        from applying 'func' to dataList

Definition at line 789 of file pool.py.

789  def reduceNoBalance(self, context, reducer, func, dataList, *args, **kwargs):
790  """!Scatter work to slaves and reduce the results
791 
792  Work is distributed statically, so there is no load balancing.
793 
794  Each slave applies the function to the data they're provided.
795  The slaves may optionally be passed a cache instance, which
796  they can store data in for subsequent executions (to ensure
797  subsequent data is distributed in the same pattern as before,
798  use the 'mapToPrevious' method). The cache also contains
799  data that has been stored on the slaves.
800 
801  The 'func' signature should be func(cache, data, *args, **kwargs)
802  if 'context' is true; otherwise func(data, *args, **kwargs).
803 
804  The 'reducer' signature should be reducer(old, new). If the 'reducer'
805  is None, then we will return the full list of results
806 
807  @param context: Namespace for cache
808  @param reducer: function for master to run to reduce slave results; or None
809  @param func: function for slaves to run; must be picklable
810  @param dataList: List of data to distribute to slaves; must be picklable
811  @param args: List of constant arguments
812  @param kwargs: Dict of constant arguments
813  @return reduced result (if reducer is non-None) or list of results
814  from applying 'func' to dataList
815  """
816  tags = Tags("result", "work")
817  num = len(dataList)
818  if self.size == 1 or num <= 1:
819  return self._reduceQueue(context, reducer, func, list(zip(range(num), dataList)), *args, **kwargs)
820 
821  self.command("mapNoBalance")
822 
823  # Send function
824  self.log("instruct")
825  self.comm.broadcast((tags, func, args, kwargs, context), root=self.root)
826 
827  # Divide up the jobs
828  # Try to give root the least to do, so it also has time to manage
829  queue = list(zip(range(num), dataList)) # index, data
830  if num < self.size:
831  distribution = [[queue[i]] for i in range(num)]
832  distribution.insert(self.rank, [])
833  for i in range(num, self.size - 1):
834  distribution.append([])
835  elif num % self.size == 0:
836  numEach = num//self.size
837  distribution = [queue[i*numEach:(i+1)*numEach] for i in range(self.size)]
838  else:
839  numEach = num//self.size
840  distribution = [queue[i*numEach:(i+1)*numEach] for i in range(self.size)]
841  for i in range(numEach*self.size, num):
842  distribution[(self.rank + 1) % self.size].append
843  distribution = list([] for i in range(self.size))
844  for i, job in enumerate(queue, self.rank + 1):
845  distribution[i % self.size].append(job)
846 
847  # Distribute jobs
848  for source in range(self.size):
849  if source == self.rank:
850  continue
851  self.log("send jobs to ", source)
852  self.comm.send(distribution[source], source, tag=tags.work)
853 
854  # Execute our own jobs
855  output = [None]*num if reducer is None else None
856 
857  def ingestResults(output, nodeResults, distList):
858  if reducer is None:
859  for i, result in enumerate(nodeResults):
860  index = distList[i][0]
861  output[index] = result
862  return output
863  if output is None:
864  output = nodeResults.pop(0)
865  for result in nodeResults:
866  output = reducer(output, result)
867  return output
868 
869  ourResults = self._processQueue(context, func, distribution[self.rank], *args, **kwargs)
870  output = ingestResults(output, ourResults, distribution[self.rank])
871 
872  # Collect results
873  pending = self.size - 1
874  while pending > 0:
875  status = mpi.Status()
876  slaveResults = self.comm.recv(status=status, tag=tags.result, source=mpi.ANY_SOURCE)
877  source = status.source
878  self.log("gather from slave", source)
879  output = ingestResults(output, slaveResults, distribution[source])
880  pending -= 1
881 
882  self.log("done")
883  return output
884 
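
When every item costs roughly the same, the static distribution used here avoids the request/response traffic of the load-balanced path. A hedged sketch, reusing the hypothetical 'pool' from above:

    import operator

    def sumChunk(cache, values):
        # Uniform-cost work, so static chunking loses nothing to imbalance.
        return sum(values)

    chunks = [list(range(i, i + 100)) for i in range(0, 1000, 100)]
    grandTotal = pool.reduceNoBalance("sums", operator.add, sumChunk, chunks)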

◆ reduceToPrevious()

def lsst.ctrl.pool.pool.PoolMaster.reduceToPrevious(self, context, reducer, func, dataList, *args, **kwargs)

Reduction where work goes to the same target as before.

    Work is distributed so that each slave handles the same
    indices in the dataList as when 'map' was called.
    This allows the right data to go to the right cache.

    It is assumed that the dataList is the same length as when it was
    passed to 'map'.

    The 'func' signature should be func(cache, data, *args, **kwargs).

    The 'reducer' signature should be reducer(old, new). If the 'reducer'
    is None, then we will return the full list of results

    @param context: Namespace for cache
    @param reducer: function for master to run to reduce slave results; or None
    @param func: function for slaves to run; must be picklable
    @param dataList: List of data to distribute to slaves; must be picklable
    @param args: List of constant arguments
    @param kwargs: Dict of constant arguments
    @return reduced result (if reducer is non-None) or list of results
        from applying 'func' to dataList

Definition at line 908 of file pool.py.

908  def reduceToPrevious(self, context, reducer, func, dataList, *args, **kwargs):
909  """!Reduction where work goes to the same target as before
910 
911  Work is distributed so that each slave handles the same
912  indices in the dataList as when 'map' was called.
913  This allows the right data to go to the right cache.
914 
915  It is assumed that the dataList is the same length as when it was
916  passed to 'map'.
917 
918  The 'func' signature should be func(cache, data, *args, **kwargs).
919 
920  The 'reducer' signature should be reducer(old, new). If the 'reducer'
921  is None, then we will return the full list of results
922 
923  @param context: Namespace for cache
924  @param reducer: function for master to run to reduce slave results; or None
925  @param func: function for slaves to run; must be picklable
926  @param dataList: List of data to distribute to slaves; must be picklable
927  @param args: List of constant arguments
928  @param kwargs: Dict of constant arguments
929  @return reduced result (if reducer is non-None) or list of results
930  from applying 'func' to dataList
931  """
932  if context is None:
933  raise ValueError("context must be set to map to same nodes as previous context")
934  tags = Tags("result", "work")
935  num = len(dataList)
936  if self.size == 1 or num <= 1:
937  # Can do everything here
938  return self._reduceQueue(context, reducer, func, list(zip(range(num), dataList)), *args, **kwargs)
939  if self.size == num:
940  # We're shooting ourselves in the foot using dynamic distribution
941  return self.reduceNoBalance(context, reducer, func, dataList, *args, **kwargs)
942 
943  self.command("mapToPrevious")
944 
945  # Send function
946  self.log("instruct")
947  self.comm.broadcast((tags, func, args, kwargs, context), root=self.root)
948 
949  requestList = self.comm.gather(None, root=self.root)
950  self.log("listen", requestList)
951  initial = [dataList[index] if (index is not None and index >= 0) else None for index in requestList]
952  self.log("scatter jobs", initial)
953  self.comm.scatter(initial, root=self.root)
954  pending = min(num, self.size - 1)
955 
956  if reducer is None:
957  output = [None]*num
958  else:
959  thread = ReductionThread(reducer)
960  thread.start()
961 
962  while pending > 0:
963  status = mpi.Status()
964  index, result, nextIndex = self.comm.recv(status=status, tag=tags.result, source=mpi.ANY_SOURCE)
965  source = status.source
966  self.log("gather from slave", source)
967  if reducer is None:
968  output[index] = result
969  else:
970  thread.add(result)
971 
972  if nextIndex >= 0:
973  job = dataList[nextIndex]
974  self.log("send job to slave", source)
975  self.comm.send(job, source, tag=tags.work)
976  else:
977  pending -= 1
978 
979  self.log("waiting on", pending)
980 
981  if reducer is not None:
982  output = thread.join()
983 
984  self.log("done")
985  return output
986 

◆ storeClear()

def lsst.ctrl.pool.pool.PoolMaster.storeClear(self, context)
Reset data store for a particular context on master and slaves

Reimplemented from lsst.ctrl.pool.pool.PoolNode.

Definition at line 1016 of file pool.py.

1016  def storeClear(self, context):
1017  """Reset data store for a particular context on master and slaves"""
1018  super(PoolMaster, self).storeClear(context)
1019  self.command("storeClear")
1020  self.comm.broadcast(context, root=self.root)
1021 

◆ storeDel()

def lsst.ctrl.pool.pool.PoolMaster.storeDel(self, context, *nameList)
Delete stored data on slave for a particular context

Reimplemented from lsst.ctrl.pool.pool.PoolNode.

Definition at line 1007 of file pool.py.

1007  def storeDel(self, context, *nameList):
1008  """Delete stored data on slave for a particular context"""
1009  super(PoolMaster, self).storeDel(context, *nameList)
1010  self.command("storeDel")
1011  self.log("tell names")
1012  self.comm.broadcast((context, nameList), root=self.root)
1013  self.log("done")
1014 

◆ storeList()

def lsst.ctrl.pool.pool.PoolMaster.storeList(self, context)
List store contents for a particular context on master and slaves

Reimplemented from lsst.ctrl.pool.pool.PoolNode.

Definition at line 1037 of file pool.py.

1037  def storeList(self, context):
1038  """List store contents for a particular context on master and slaves"""
1039  super(PoolMaster, self).storeList(context)
1040  self.command("storeList")
1041  self.comm.broadcast(context, root=self.root)
1042 

◆ storeSet()

def lsst.ctrl.pool.pool.PoolMaster.storeSet(self, context, **kwargs)

Store data on slave for a particular context.

    The data is made available to functions through the cache. The
    stored data differs from the cache in that it is identical for
    all operations, whereas the cache is specific to the data being
    operated upon.

    @param context: namespace for store
    @param kwargs: dict of name=value pairs

Reimplemented from lsst.ctrl.pool.pool.PoolNode.

Definition at line 989 of file pool.py.

989  def storeSet(self, context, **kwargs):
990  """!Store data on slave for a particular context
991 
992  The data is made available to functions through the cache. The
993  stored data differs from the cache in that it is identical for
994  all operations, whereas the cache is specific to the data being
995  operated upon.
996 
997  @param context: namespace for store
998  @param kwargs: dict of name=value pairs
999  """
1000  super(PoolMaster, self).storeSet(context, **kwargs)
1001  self.command("storeSet")
1002  self.log("give data")
1003  self.comm.broadcast((context, kwargs), root=self.root)
1004  self.log("done")
1005 
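
A hypothetical use of the store: values pushed once with storeSet() become visible to every subsequent call in that context through the cache (assumed here to surface as attributes on the cache object), and can be removed again with storeDel() or storeClear().

    pool.storeSet("calib", gain=1.7, saturation=65535)

    def applyGain(cache, pixelValue):
        # 'gain' was stored for the "calib" context; every slave sees the same value.
        return pixelValue * cache.gain

    scaled = pool.map("calib", applyGain, [100, 200, 300])
    pool.storeClear("calib")   # drop the stored values on master and slaves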

Member Data Documentation

◆ comm

lsst.ctrl.pool.pool.PoolNode.comm
inherited

Definition at line 497 of file pool.py.

◆ debugger

lsst.ctrl.pool.pool.PoolNode.debugger
inherited

Definition at line 503 of file pool.py.

◆ node

lsst.ctrl.pool.pool.PoolNode.node
inherited

Definition at line 504 of file pool.py.

◆ rank

lsst.ctrl.pool.pool.PoolNode.rank
inherited

Definition at line 498 of file pool.py.

◆ root

lsst.ctrl.pool.pool.PoolMaster.root

Definition at line 638 of file pool.py.

◆ size

lsst.ctrl.pool.pool.PoolMaster.size

Definition at line 713 of file pool.py.


The documentation for this class was generated from the following file: pool.py