LSST Applications  21.0.0-172-gfb10e10a+18fedfabac,22.0.0+297cba6710,22.0.0+80564b0ff1,22.0.0+8d77f4f51a,22.0.0+a28f4c53b1,22.0.0+dcf3732eb2,22.0.1-1-g7d6de66+2a20fdde0d,22.0.1-1-g8e32f31+297cba6710,22.0.1-1-geca5380+7fa3b7d9b6,22.0.1-12-g44dc1dc+2a20fdde0d,22.0.1-15-g6a90155+515f58c32b,22.0.1-16-g9282f48+790f5f2caa,22.0.1-2-g92698f7+dcf3732eb2,22.0.1-2-ga9b0f51+7fa3b7d9b6,22.0.1-2-gd1925c9+bf4f0e694f,22.0.1-24-g1ad7a390+a9625a72a8,22.0.1-25-g5bf6245+3ad8ecd50b,22.0.1-25-gb120d7b+8b5510f75f,22.0.1-27-g97737f7+2a20fdde0d,22.0.1-32-gf62ce7b1+aa4237961e,22.0.1-4-g0b3f228+2a20fdde0d,22.0.1-4-g243d05b+871c1b8305,22.0.1-4-g3a563be+32dcf1063f,22.0.1-4-g44f2e3d+9e4ab0f4fa,22.0.1-42-gca6935d93+ba5e5ca3eb,22.0.1-5-g15c806e+85460ae5f3,22.0.1-5-g58711c4+611d128589,22.0.1-5-g75bb458+99c117b92f,22.0.1-6-g1c63a23+7fa3b7d9b6,22.0.1-6-g50866e6+84ff5a128b,22.0.1-6-g8d3140d+720564cf76,22.0.1-6-gd805d02+cc5644f571,22.0.1-8-ge5750ce+85460ae5f3,master-g6e05de7fdc+babf819c66,master-g99da0e417a+8d77f4f51a,w.2021.48
LSST Data Management Base Package
Public Member Functions | Static Public Attributes | List of all members
lsst.ctrl.pool.test.demoTask.DemoTask Class Reference
Inheritance diagram for lsst.ctrl.pool.test.demoTask.DemoTask:
lsst.ctrl.pool.parallel.BatchPoolTask lsst.ctrl.pool.parallel.BatchCmdLineTask

Public Member Functions

def batchWallTime (cls, time, parsedCmd, numCores)
 
def runDataRef (self, visitRef)
 
def run (self, cache, dataId, butler=None)
 
def parseAndRun (cls, *args, **kwargs)
 
def parseAndSubmit (cls, args=None, **kwargs)
 
def batchCommand (cls, args)
 Return command to run CmdLineTask. More...
 
def logOperation (self, operation, catch=False, trace=True)
 Provide a context manager for logging an operation. More...
 

Static Public Attributes

 ConfigClass = Config
 

Detailed Description

Task for demonstrating the BatchPoolTask functionality

Definition at line 12 of file demoTask.py.

Member Function Documentation

◆ batchCommand()

def lsst.ctrl.pool.parallel.BatchCmdLineTask.batchCommand (   cls,
  args 
)
inherited

Return command to run CmdLineTask.

    @param cls: Class
    @param args: Parsed batch job arguments (from BatchArgumentParser)

Definition at line 476 of file parallel.py.

476  def batchCommand(cls, args):
477  """!Return command to run CmdLineTask
478 
479  @param cls: Class
480  @param args: Parsed batch job arguments (from BatchArgumentParser)
481  """
482  job = args.job if args.job is not None else "job"
483  module = cls.__module__
484  script = ("import os; os.umask(%#05o); " +
485  "import lsst.base; lsst.base.disableImplicitThreading(); " +
486  "import lsst.ctrl.pool.log; lsst.ctrl.pool.log.jobLog(\"%s\"); ") % (UMASK, job)
487 
488  if args.batchStats:
489  script += ("import lsst.ctrl.pool.parallel; import atexit; " +
490  "atexit.register(lsst.ctrl.pool.parallel.printProcessStats); ")
491 
492  script += "import %s; %s.%s.parseAndRun();" % (module, module, cls.__name__)
493 
494  profilePre = "import cProfile; import os; cProfile.run(\"\"\""
495  profilePost = "\"\"\", filename=\"profile-" + job + "-%s-%d.dat\" % (os.uname()[1], os.getpid()))"
496 
497  return ("python -c '" + (profilePre if args.batchProfile else "") + script +
498  (profilePost if args.batchProfile else "") + "' " + shCommandFromArgs(args.leftover) +
499  " --noExit")
500 
def shCommandFromArgs(args)
Definition: parallel.py:42

◆ batchWallTime()

def lsst.ctrl.pool.test.demoTask.DemoTask.batchWallTime (   cls,
  time,
  parsedCmd,
  numCores 
)
Return walltime request for batch job

Subclasses should override if the walltime should be calculated
differently (e.g., addition of some serial time).

@param time: Requested time per iteration
@param parsedCmd: Results of argument parsing
@param numCores: Number of cores

Reimplemented from lsst.ctrl.pool.parallel.BatchCmdLineTask.

Definition at line 26 of file demoTask.py.

26  def batchWallTime(cls, time, parsedCmd, numCores):
27  """Return walltime request for batch job
28 
29  Subclasses should override if the walltime should be calculated
30  differently (e.g., addition of some serial time).
31 
32  @param time: Requested time per iteration
33  @param parsedCmd: Results of argument parsing
34  @param numCores: Number of cores
35  """
36  numTargets = [sum(1 for ccdRef in visitRef.subItems("ccd") if ccdRef.datasetExists("raw")) for
37  visitRef in parsedCmd.id.refList]
38  return time*sum(math.ceil(tt/numCores) for tt in numTargets)
39 

◆ logOperation()

def lsst.ctrl.pool.parallel.BatchCmdLineTask.logOperation (   self,
  operation,
  catch = False,
  trace = True 
)
inherited

Provide a context manager for logging an operation.

    @param operation: description of operation (string)
    @param catch: Catch all exceptions?
    @param trace: Log a traceback of caught exception?

    Note that if 'catch' is True, all exceptions are swallowed, but there may
    be other side-effects such as undefined variables.

Definition at line 502 of file parallel.py.

502  def logOperation(self, operation, catch=False, trace=True):
503  """!Provide a context manager for logging an operation
504 
505  @param operation: description of operation (string)
506  @param catch: Catch all exceptions?
507  @param trace: Log a traceback of caught exception?
508 
509  Note that if 'catch' is True, all exceptions are swallowed, but there may
510  be other side-effects such as undefined variables.
511  """
512  self.log.info("%s: Start %s" % (NODE, operation))
513  try:
514  yield
515  except Exception:
516  if catch:
517  cls, e, _ = sys.exc_info()
518  self.log.warn("%s: Caught %s while %s: %s" % (NODE, cls.__name__, operation, e))
519  if trace:
520  self.log.info("%s: Traceback:\n%s" % (NODE, traceback.format_exc()))
521  return
522  raise
523  finally:
524  self.log.info("%s: Finished %s" % (NODE, operation))
525 
526 

◆ parseAndRun()

def lsst.ctrl.pool.parallel.BatchPoolTask.parseAndRun (   cls,
*  args,
**  kwargs 
)
inherited
Run with an MPI process pool

Definition at line 534 of file parallel.py.

534  def parseAndRun(cls, *args, **kwargs):
535  """Run with a MPI process pool"""
536  pool = startPool()
537  super(BatchPoolTask, cls).parseAndRun(*args, **kwargs)
538  pool.exit()
539 
540 
def startPool(comm=None, root=0, killSlaves=True)
Start a process pool.
Definition: pool.py:1216

◆ parseAndSubmit()

def lsst.ctrl.pool.parallel.BatchCmdLineTask.parseAndSubmit (   cls,
  args = None,
**  kwargs 
)
inherited

Definition at line 435 of file parallel.py.

435  def parseAndSubmit(cls, args=None, **kwargs):
436  taskParser = cls._makeArgumentParser(doBatch=True, add_help=False)
437  batchParser = BatchArgumentParser(parent=taskParser)
438  batchArgs = batchParser.parse_args(config=cls.ConfigClass(), args=args, override=cls.applyOverrides,
439  **kwargs)
440 
441  if not cls.RunnerClass(cls, batchArgs.parent).precall(batchArgs.parent): # Write config, schema
442  taskParser.error("Error in task preparation")
443 
444  setBatchType(batchArgs.batch)
445 
446  if batchArgs.batch is None: # don't use a batch system
447  sys.argv = [sys.argv[0]] + batchArgs.leftover # Remove all batch arguments
448 
449  return cls.parseAndRun()
450  else:
451  if batchArgs.walltime > 0:
452  walltime = batchArgs.walltime
453  else:
454  numCores = batchArgs.cores if batchArgs.cores > 0 else batchArgs.nodes*batchArgs.procs
455  walltime = cls.batchWallTime(batchArgs.time, batchArgs.parent, numCores)
456 
457  command = cls.batchCommand(batchArgs)
458  batchArgs.batch.run(command, walltime=walltime)
459 
def setBatchType(batchType)
Definition: pool.py:101

◆ run()

def lsst.ctrl.pool.test.demoTask.DemoTask.run (   self,
  cache,
  dataId,
  butler = None 
)
Read image and return number of pixels

Only the slave nodes run this method.

Definition at line 58 of file demoTask.py.

58  def run(self, cache, dataId, butler=None):
59  """Read image and return number of pixels
60 
61  Only the slave nodes run this method.
62  """
63  assert butler is not None
64  with self.logOperation("read %s" % (dataId,)):
65  raw = butler.get("raw", dataId, immediate=True)
66  dims = raw.getDimensions()
67  num = dims.getX()*dims.getY()
68  self.log.info("Read %d pixels for %s" % (num, dataId,))
69  return num
70 
def run(self, coaddExposures, bbox, wcs)
Definition: getTemplate.py:603

◆ runDataRef()

def lsst.ctrl.pool.test.demoTask.DemoTask.runDataRef (   self,
  visitRef 
)
Main entry-point

Only the master node runs this method.  It will dispatch jobs to the
slave nodes.

Definition at line 40 of file demoTask.py.

40  def runDataRef(self, visitRef):
41  """Main entry-point
42 
43  Only the master node runs this method. It will dispatch jobs to the
44  slave nodes.
45  """
46  pool = Pool("test")
47 
48  # Less overhead to transfer the butler once rather than in each dataRef
49  dataIdList = dict([(ccdRef.get("ccdExposureId"), ccdRef.dataId)
50  for ccdRef in visitRef.subItems("ccd") if ccdRef.datasetExists("raw")])
51  dataIdList = collections.OrderedDict(sorted(dataIdList.items()))
52 
53  with self.logOperation("master"):
54  total = pool.reduce(operator.add, self.run, list(dataIdList.values()),
55  butler=visitRef.getButler())
56  self.log.info("Total number of pixels read: %d" % (total,))
57 
daf::base::PropertyList * list
Definition: fits.cc:913

Member Data Documentation

◆ ConfigClass

lsst.ctrl.pool.test.demoTask.DemoTask.ConfigClass = Config
static

Definition at line 14 of file demoTask.py.


The documentation for this class was generated from the following file: