LSST Applications  21.0.0-172-gfb10e10a+18fedfabac,22.0.0+297cba6710,22.0.0+80564b0ff1,22.0.0+8d77f4f51a,22.0.0+a28f4c53b1,22.0.0+dcf3732eb2,22.0.1-1-g7d6de66+2a20fdde0d,22.0.1-1-g8e32f31+297cba6710,22.0.1-1-geca5380+7fa3b7d9b6,22.0.1-12-g44dc1dc+2a20fdde0d,22.0.1-15-g6a90155+515f58c32b,22.0.1-16-g9282f48+790f5f2caa,22.0.1-2-g92698f7+dcf3732eb2,22.0.1-2-ga9b0f51+7fa3b7d9b6,22.0.1-2-gd1925c9+bf4f0e694f,22.0.1-24-g1ad7a390+a9625a72a8,22.0.1-25-g5bf6245+3ad8ecd50b,22.0.1-25-gb120d7b+8b5510f75f,22.0.1-27-g97737f7+2a20fdde0d,22.0.1-32-gf62ce7b1+aa4237961e,22.0.1-4-g0b3f228+2a20fdde0d,22.0.1-4-g243d05b+871c1b8305,22.0.1-4-g3a563be+32dcf1063f,22.0.1-4-g44f2e3d+9e4ab0f4fa,22.0.1-42-gca6935d93+ba5e5ca3eb,22.0.1-5-g15c806e+85460ae5f3,22.0.1-5-g58711c4+611d128589,22.0.1-5-g75bb458+99c117b92f,22.0.1-6-g1c63a23+7fa3b7d9b6,22.0.1-6-g50866e6+84ff5a128b,22.0.1-6-g8d3140d+720564cf76,22.0.1-6-gd805d02+cc5644f571,22.0.1-8-ge5750ce+85460ae5f3,master-g6e05de7fdc+babf819c66,master-g99da0e417a+8d77f4f51a,w.2021.48
LSST Data Management Base Package
Public Member Functions | Public Attributes | List of all members
lsst.ctrl.pool.pool.PoolSlave Class Reference
Inheritance diagram for lsst.ctrl.pool.pool.PoolSlave:
lsst.ctrl.pool.pool.PoolNode lsst.ctrl.pool.pool.SingletonMeta

Public Member Functions

def log (self, msg, *args)
 
def run (self)
 
def reduce (self)
 
def mapNoBalance (self)
 
def mapToPrevious (self)
 
def storeSet (self)
 
def storeDel (self)
 
def storeClear (self)
 
def cacheClear (self)
 
def cacheList (self)
 
def storeList (self)
 
def exit (self)
 
def isMaster (self)
 
def storeSet (self, context, **kwargs)
 
def storeDel (self, context, *nameList)
 
def storeClear (self, context)
 
def cacheClear (self, context)
 
def cacheList (self, context)
 
def storeList (self, context)
 
def __call__ (cls, *args, **kwargs)
 

Public Attributes

 comm
 
 rank
 
 root
 
 size
 
 debugger
 
 node
 

Detailed Description

Slave node instance of MPI process pool

Definition at line 1048 of file pool.py.

Member Function Documentation

◆ __call__()

def lsst.ctrl.pool.pool.SingletonMeta.__call__ (   cls,
args,
**  kwargs 
)
inherited

Definition at line 387 of file pool.py.

387  def __call__(cls, *args, **kwargs):
388  if cls._instance is None:
389  cls._instance = super(SingletonMeta, cls).__call__(*args, **kwargs)
390  return cls._instance
391 
392 

◆ cacheClear() [1/2]

def lsst.ctrl.pool.pool.PoolSlave.cacheClear (   self)
Reset cache

Definition at line 1157 of file pool.py.

1157  def cacheClear(self):
1158  """Reset cache"""
1159  context = self.comm.broadcast(None, root=self.root)
1160  super(PoolSlave, self).cacheClear(context)
1161 

◆ cacheClear() [2/2]

def lsst.ctrl.pool.pool.PoolNode.cacheClear (   self,
  context 
)
inherited
Reset cache for a particular context

Reimplemented in lsst.ctrl.pool.pool.PoolMaster.

Definition at line 607 of file pool.py.

607  def cacheClear(self, context):
608  """Reset cache for a particular context"""
609  self.log("clearing cache", context)
610  if context not in self._cache:
611  return
612  self._cache[context] = {}
613 

◆ cacheList() [1/2]

def lsst.ctrl.pool.pool.PoolSlave.cacheList (   self)
List cache contents

Definition at line 1162 of file pool.py.

1162  def cacheList(self):
1163  """List cache contents"""
1164  context = self.comm.broadcast(None, root=self.root)
1165  super(PoolSlave, self).cacheList(context)
1166 

◆ cacheList() [2/2]

def lsst.ctrl.pool.pool.PoolNode.cacheList (   self,
  context 
)
inherited
List contents of cache

Reimplemented in lsst.ctrl.pool.pool.PoolMaster.

Definition at line 614 of file pool.py.

614  def cacheList(self, context):
615  """List contents of cache"""
616  cache = self._cache[context] if context in self._cache else {}
617  sys.stderr.write("Cache on %s (%s): %s\n" % (self.node, context, cache))
618 

◆ exit()

def lsst.ctrl.pool.pool.PoolSlave.exit (   self)
Allow exit from loop in 'run'

Definition at line 1172 of file pool.py.

1172  def exit(self):
1173  """Allow exit from loop in 'run'"""
1174  return True
1175 
1176 

◆ isMaster()

def lsst.ctrl.pool.pool.PoolNode.isMaster (   self)
inherited

Definition at line 526 of file pool.py.

526  def isMaster(self):
527  return self.rank == self.root
528 

◆ log()

def lsst.ctrl.pool.pool.PoolSlave.log (   self,
  msg,
args 
)
Log a debugging message

Reimplemented from lsst.ctrl.pool.pool.PoolNode.

Definition at line 1051 of file pool.py.

1051  def log(self, msg, *args):
1052  """Log a debugging message"""
1053  assert self.rank != self.root, "This is not the master node."
1054  self.debugger.log("Slave %d" % self.rank, msg, *args)
1055 

◆ mapNoBalance()

def lsst.ctrl.pool.pool.PoolSlave.mapNoBalance (   self)
Process bulk scattered data and return results

Definition at line 1102 of file pool.py.

1102  def mapNoBalance(self):
1103  """Process bulk scattered data and return results"""
1104  self.log("waiting for instruction")
1105  tags, func, args, kwargs, context = self.comm.broadcast(None, root=self.root)
1106  self.log("waiting for job")
1107  queue = self.comm.recv(tag=tags.work, source=self.root)
1108 
1109  resultList = []
1110  for index, data in queue:
1111  self.log("running job", index)
1112  result = self._processQueue(context, func, [(index, data)], *args, **kwargs)[0]
1113  resultList.append(result)
1114 
1115  self.comm.send(resultList, self.root, tag=tags.result)
1116  self.log("done")
1117 

◆ mapToPrevious()

def lsst.ctrl.pool.pool.PoolSlave.mapToPrevious (   self)
Process the same scattered data processed previously

Definition at line 1119 of file pool.py.

1119  def mapToPrevious(self):
1120  """Process the same scattered data processed previously"""
1121  self.log("waiting for instruction")
1122  tags, func, args, kwargs, context = self.comm.broadcast(None, root=self.root)
1123  queue = list(self._cache[context].keys()) if context in self._cache else None
1124  index = queue.pop(0) if queue else -1
1125  self.log("request job", index)
1126  self.comm.gather(index, root=self.root)
1127  self.log("waiting for job")
1128  data = self.comm.scatter(None, root=self.root)
1129 
1130  while index >= 0:
1131  self.log("running job")
1132  result = func(self._getCache(context, index), data, *args, **kwargs)
1133  self.log("pending", queue)
1134  nextIndex = queue.pop(0) if queue else -1
1135  self.comm.send((index, result, nextIndex), self.root, tag=tags.result)
1136  index = nextIndex
1137  if index >= 0:
1138  data = self.comm.recv(tag=tags.work, source=self.root)
1139 
1140  self.log("done")
1141 
daf::base::PropertyList * list
Definition: fits.cc:913

◆ reduce()

def lsst.ctrl.pool.pool.PoolSlave.reduce (   self)
Reduce scattered data and return results

Definition at line 1076 of file pool.py.

1076  def reduce(self):
1077  """Reduce scattered data and return results"""
1078  self.log("waiting for instruction")
1079  tags, func, reducer, args, kwargs, context = self.comm.broadcast(None, root=self.root)
1080  self.log("waiting for job")
1081  job = self.comm.scatter(None, root=self.root)
1082 
1083  out = None # Reduction result
1084  while not isinstance(job, NoOp):
1085  index, data = job
1086  self.log("running job")
1087  result = self._processQueue(context, func, [(index, data)], *args, **kwargs)[0]
1088  if reducer is None:
1089  report = (index, result)
1090  else:
1091  report = None
1092  out = reducer(out, result) if out is not None else result
1093  self.comm.send(report, self.root, tag=tags.request)
1094  self.log("waiting for job")
1095  job = self.comm.recv(tag=tags.work, source=self.root)
1096 
1097  if reducer is not None:
1098  self.comm.gather(out, root=self.root)
1099  self.log("done")
1100 

◆ run()

def lsst.ctrl.pool.pool.PoolSlave.run (   self)
Serve commands of master node

Slave accepts commands, which are the names of methods to execute.
This exits when a command returns a true value.

Definition at line 1057 of file pool.py.

1057  def run(self):
1058  """Serve commands of master node
1059 
1060  Slave accepts commands, which are the names of methods to execute.
1061  This exits when a command returns a true value.
1062  """
1063  menu = dict((cmd, getattr(self, cmd)) for cmd in ("reduce", "mapNoBalance", "mapToPrevious",
1064  "storeSet", "storeDel", "storeClear", "storeList",
1065  "cacheList", "cacheClear", "exit",))
1066  self.log("waiting for command from", self.root)
1067  command = self.comm.broadcast(None, root=self.root)
1068  self.log("command", command)
1069  while not menu[command]():
1070  self.log("waiting for command from", self.root)
1071  command = self.comm.broadcast(None, root=self.root)
1072  self.log("command", command)
1073  self.log("exiting")
1074 
def run(self, coaddExposures, bbox, wcs)
Definition: getTemplate.py:603

◆ storeClear() [1/2]

def lsst.ctrl.pool.pool.PoolSlave.storeClear (   self)
Reset data store

Definition at line 1152 of file pool.py.

1152  def storeClear(self):
1153  """Reset data store"""
1154  context = self.comm.broadcast(None, root=self.root)
1155  super(PoolSlave, self).storeClear(context)
1156 

◆ storeClear() [2/2]

def lsst.ctrl.pool.pool.PoolNode.storeClear (   self,
  context 
)
inherited
Clear stored data for a particular context

Reimplemented in lsst.ctrl.pool.pool.PoolMaster.

Definition at line 600 of file pool.py.

600  def storeClear(self, context):
601  """Clear stored data for a particular context"""
602  self.log("clearing store", context)
603  if context not in self._store:
604  raise KeyError("No such context: %s" % context)
605  self._store[context] = {}
606 

◆ storeDel() [1/2]

def lsst.ctrl.pool.pool.PoolSlave.storeDel (   self)
Delete value in store

Definition at line 1147 of file pool.py.

1147  def storeDel(self):
1148  """Delete value in store"""
1149  context, nameList = self.comm.broadcast(None, root=self.root)
1150  super(PoolSlave, self).storeDel(context, *nameList)
1151 

◆ storeDel() [2/2]

def lsst.ctrl.pool.pool.PoolNode.storeDel (   self,
  context,
nameList 
)
inherited
Delete value in store for a particular context

Reimplemented in lsst.ctrl.pool.pool.PoolMaster.

Definition at line 592 of file pool.py.

592  def storeDel(self, context, *nameList):
593  """Delete value in store for a particular context"""
594  self.log("deleting from store", context, nameList)
595  if context not in self._store:
596  raise KeyError("No such context: %s" % context)
597  for name in nameList:
598  del self._store[context][name]
599 

◆ storeList() [1/2]

def lsst.ctrl.pool.pool.PoolSlave.storeList (   self)
List store contents

Definition at line 1167 of file pool.py.

1167  def storeList(self):
1168  """List store contents"""
1169  context = self.comm.broadcast(None, root=self.root)
1170  super(PoolSlave, self).storeList(context)
1171 

◆ storeList() [2/2]

def lsst.ctrl.pool.pool.PoolNode.storeList (   self,
  context 
)
inherited
List contents of store for a particular context

Reimplemented in lsst.ctrl.pool.pool.PoolMaster.

Definition at line 619 of file pool.py.

619  def storeList(self, context):
620  """List contents of store for a particular context"""
621  if context not in self._store:
622  raise KeyError("No such context: %s" % context)
623  sys.stderr.write("Store on %s (%s): %s\n" % (self.node, context, self._store[context]))
624 
625 

◆ storeSet() [1/2]

def lsst.ctrl.pool.pool.PoolSlave.storeSet (   self)
Set value in store

Definition at line 1142 of file pool.py.

1142  def storeSet(self):
1143  """Set value in store"""
1144  context, kwargs = self.comm.broadcast(None, root=self.root)
1145  super(PoolSlave, self).storeSet(context, **kwargs)
1146 

◆ storeSet() [2/2]

def lsst.ctrl.pool.pool.PoolNode.storeSet (   self,
  context,
**  kwargs 
)
inherited
Set values in store for a particular context

Reimplemented in lsst.ctrl.pool.pool.PoolMaster.

Definition at line 584 of file pool.py.

584  def storeSet(self, context, **kwargs):
585  """Set values in store for a particular context"""
586  self.log("storing", context, kwargs)
587  if context not in self._store:
588  self._store[context] = {}
589  for name, value in kwargs.items():
590  self._store[context][name] = value
591 

Member Data Documentation

◆ comm

lsst.ctrl.pool.pool.PoolNode.comm
inherited

Definition at line 497 of file pool.py.

◆ debugger

lsst.ctrl.pool.pool.PoolNode.debugger
inherited

Definition at line 503 of file pool.py.

◆ node

lsst.ctrl.pool.pool.PoolNode.node
inherited

Definition at line 504 of file pool.py.

◆ rank

lsst.ctrl.pool.pool.PoolNode.rank
inherited

Definition at line 498 of file pool.py.

◆ root

lsst.ctrl.pool.pool.PoolNode.root
inherited

Definition at line 499 of file pool.py.

◆ size

lsst.ctrl.pool.pool.PoolNode.size
inherited

Definition at line 500 of file pool.py.


The documentation for this class was generated from the following file: