LSST Applications w.2021.48
LSST Data Management Base Package
lsst.pipe.drivers.processCcdWithFakesDriver.ProcessCcdWithFakesDriverTask Class Reference
Inheritance diagram for lsst.pipe.drivers.processCcdWithFakesDriver.ProcessCcdWithFakesDriverTask:
ProcessCcdWithFakesDriverTask inherits lsst.ctrl.pool.parallel.BatchParallelTask, which inherits lsst.ctrl.pool.parallel.BatchCmdLineTask.

Public Member Functions

def __init__ (self, *args, **kwargs)
 
def runDataRef (self, sensorRef)
 
def parseAndRun (cls, *args, **kwargs)
 
def parseAndSubmit (cls, args=None, **kwargs)
 
def batchWallTime (cls, time, parsedCmd, numCores)
 Return walltime request for batch job.
 
def batchCommand (cls, args)
 Return command to run CmdLineTask.
 
def logOperation (self, operation, catch=False, trace=True)
 Provide a context manager for logging an operation.
 

Public Attributes

 ignoreCcds
 

Static Public Attributes

 ConfigClass = ProcessCcdWithFakesDriverConfig
 
 RunnerClass = ProcessCcdWithFakesTaskRunner
 

Detailed Description

Process CCDs in parallel for processCcdWithFakes

Definition at line 32 of file processCcdWithFakesDriver.py.

Constructor & Destructor Documentation

◆ __init__()

def lsst.pipe.drivers.processCcdWithFakesDriver.ProcessCcdWithFakesDriverTask.__init__ (self, *args, **kwargs)
Parameters
----------
kwargs :  other keyword arguments for lsst.ctrl.pool.BatchParallelTask

Definition at line 39 of file processCcdWithFakesDriver.py.

39  def __init__(self, *args, **kwargs):
40      """
41      Parameters
42      ----------
43      kwargs : other keyword arguments for lsst.ctrl.pool.BatchParallelTask
44      """
45      BatchParallelTask.__init__(self, *args, **kwargs)
46      self.ignoreCcds = set(self.config.ignoreCcdList)
47      self.makeSubtask("processCcdWithFakes")
48      if self.config.doMakeSourceTable:
49          self.makeSubtask("writeSourceTable")
50          self.makeSubtask("transformSourceTable")
51

Member Function Documentation

◆ batchCommand()

def lsst.ctrl.pool.parallel.BatchCmdLineTask.batchCommand (cls, args)
inherited

Return command to run CmdLineTask.

    @param cls: Class
    @param args: Parsed batch job arguments (from BatchArgumentParser)

Definition at line 476 of file parallel.py.

476  def batchCommand(cls, args):
477      """!Return command to run CmdLineTask
478
479      @param cls: Class
480      @param args: Parsed batch job arguments (from BatchArgumentParser)
481      """
482      job = args.job if args.job is not None else "job"
483      module = cls.__module__
484      script = ("import os; os.umask(%#05o); " +
485                "import lsst.base; lsst.base.disableImplicitThreading(); " +
486                "import lsst.ctrl.pool.log; lsst.ctrl.pool.log.jobLog(\"%s\"); ") % (UMASK, job)
487
488      if args.batchStats:
489          script += ("import lsst.ctrl.pool.parallel; import atexit; " +
490                     "atexit.register(lsst.ctrl.pool.parallel.printProcessStats); ")
491
492      script += "import %s; %s.%s.parseAndRun();" % (module, module, cls.__name__)
493
494      profilePre = "import cProfile; import os; cProfile.run(\"\"\""
495      profilePost = "\"\"\", filename=\"profile-" + job + "-%s-%d.dat\" % (os.uname()[1], os.getpid()))"
496
497      return ("python -c '" + (profilePre if args.batchProfile else "") + script +
498              (profilePost if args.batchProfile else "") + "' " + shCommandFromArgs(args.leftover) +
499              " --noExit")
500
def shCommandFromArgs(args)
Definition: parallel.py:42
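
The way batchCommand assembles its shell command can be sketched in isolation. The following is a simplified illustration, not the ctrl_pool implementation: `build_command` is a hypothetical helper, `shlex.quote` stands in for `shCommandFromArgs`, the umask, threading, and job-log setup are omitted, and the repository path and data ID are made up.

```python
import shlex


def build_command(module, class_name, leftover, job="job", profile=False):
    """Build a 'python -c' command line in the style of batchCommand (sketch)."""
    # The one-liner executed by the batch job: import the task's module and
    # call the task's parseAndRun() class method.
    script = "import %s; %s.%s.parseAndRun();" % (module, module, class_name)
    if profile:
        # Optionally wrap the one-liner in cProfile.run(); the output file name
        # is resolved per host and process when the job actually runs.
        script = ('import cProfile; import os; cProfile.run("""' + script +
                  '""", filename="profile-' + job +
                  '-%s-%d.dat" % (os.uname()[1], os.getpid()))')
    # Assumption: quoting each leftover argument approximates shCommandFromArgs.
    quoted = " ".join(shlex.quote(arg) for arg in leftover)
    return "python -c '" + script + "' " + quoted + " --noExit"


# Hypothetical leftover arguments, for illustration only.
print(build_command("lsst.pipe.drivers.processCcdWithFakesDriver",
                    "ProcessCcdWithFakesDriverTask",
                    ["/path/to/repo", "--rerun", "fakes", "--id", "visit=1234"]))
```

The trailing `--noExit` is appended after the leftover arguments, as in the listing above.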

◆ batchWallTime()

def lsst.ctrl.pool.parallel.BatchCmdLineTask.batchWallTime (cls, time, parsedCmd, numCores)
inherited

Return walltime request for batch job.

    Subclasses should override if the walltime should be calculated
    differently (e.g., addition of some serial time).

    @param cls: Class
    @param time: Requested time per iteration
    @param parsedCmd: Results of argument parsing
    @param numCores: Number of cores

Reimplemented in lsst.pipe.drivers.multiBandDriver.MultiBandDriverTask, lsst.pipe.drivers.visualizeVisit.VisualizeVisitTask, lsst.pipe.drivers.skyCorrection.SkyCorrectionTask, lsst.pipe.drivers.ingestDriver.PoolIngestTask, lsst.pipe.drivers.constructCalibs.CalibTask, lsst.pipe.drivers.coaddDriver.CoaddDriverTask, and lsst.ctrl.pool.test.demoTask.DemoTask.

Definition at line 461 of file parallel.py.

461  def batchWallTime(cls, time, parsedCmd, numCores):
462      """!Return walltime request for batch job
463
464      Subclasses should override if the walltime should be calculated
465      differently (e.g., addition of some serial time).
466
467      @param cls: Class
468      @param time: Requested time per iteration
469      @param parsedCmd: Results of argument parsing
470      @param numCores: Number of cores
471      """
472      numTargets = len(cls.RunnerClass.getTargetList(parsedCmd))
473      return time*numTargets/float(numCores)
474
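
As a worked example of the formula above, the walltime is the per-target time multiplied by the number of targets and divided by the number of cores. The function name and the numbers below are illustrative assumptions, not values taken from the LSST stack.

```python
def batch_wall_time(time_per_target, num_targets, num_cores):
    """Same arithmetic as batchWallTime: time * numTargets / numCores."""
    return time_per_target * num_targets / float(num_cores)


# Hypothetical request: 104 CCDs at 600 s each, spread over 26 cores.
print(batch_wall_time(600, 104, 26))  # 2400.0
```

The formula assumes the targets divide evenly across cores and adds no serial overhead, which is why the docstring suggests that subclasses override it when that does not hold.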

◆ logOperation()

def lsst.ctrl.pool.parallel.BatchCmdLineTask.logOperation (self, operation, catch=False, trace=True)
inherited

Provide a context manager for logging an operation.

    @param operation: description of operation (string)
    @param catch: Catch all exceptions?
    @param trace: Log a traceback of caught exception?

    Note that if 'catch' is True, all exceptions are swallowed, but there may
    be other side-effects such as undefined variables.

Definition at line 502 of file parallel.py.

502  def logOperation(self, operation, catch=False, trace=True):
503      """!Provide a context manager for logging an operation
504
505      @param operation: description of operation (string)
506      @param catch: Catch all exceptions?
507      @param trace: Log a traceback of caught exception?
508
509      Note that if 'catch' is True, all exceptions are swallowed, but there may
510      be other side-effects such as undefined variables.
511      """
512      self.log.info("%s: Start %s" % (NODE, operation))
513      try:
514          yield
515      except Exception:
516          if catch:
517              cls, e, _ = sys.exc_info()
518              self.log.warn("%s: Caught %s while %s: %s" % (NODE, cls.__name__, operation, e))
519              if trace:
520                  self.log.info("%s: Traceback:\n%s" % (NODE, traceback.format_exc()))
521              return
522          raise
523      finally:
524          self.log.info("%s: Finished %s" % (NODE, operation))
525
526
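
The behaviour described in the note about 'catch' can be illustrated with a standalone sketch. This is an approximation built on the standard library's `contextlib.contextmanager` and `logging`, not the ctrl_pool code: `log_operation` is a hypothetical free function, and the `NODE` prefix is omitted.

```python
import logging
import traceback
from contextlib import contextmanager

log = logging.getLogger("logOperationSketch")


@contextmanager
def log_operation(operation, catch=False, trace=True):
    """Log the start and end of an operation, optionally swallowing errors."""
    log.info("Start %s", operation)
    try:
        yield
    except Exception as exc:
        if catch:
            log.warning("Caught %s while %s: %s",
                        type(exc).__name__, operation, exc)
            if trace:
                log.info("Traceback:\n%s", traceback.format_exc())
            return  # returning here suppresses the exception
        raise
    finally:
        log.info("Finished %s", operation)


result = None
with log_operation("dividing by zero", catch=True):
    result = 1 / 0  # raises ZeroDivisionError, so the assignment never happens

# The exception was swallowed and `result` keeps its earlier value.
print(result)  # None
```

Had `result` not been initialised before the `with` block, the final `print` would raise a NameError, which is the "undefined variables" side effect the docstring warns about.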

◆ parseAndRun()

def lsst.ctrl.pool.parallel.BatchParallelTask.parseAndRun (cls, *args, **kwargs)
inherited
Parse an argument list and run the command

This is the entry point when we run in earnest, so start the process pool
so that the worker nodes don't go any further.

Definition at line 615 of file parallel.py.

615  def parseAndRun(cls, *args, **kwargs):
616      """Parse an argument list and run the command
617
618      This is the entry point when we run in earnest, so start the process pool
619      so that the worker nodes don't go any further.
620      """
621      pool = startPool()
622      results = super(BatchParallelTask, cls).parseAndRun(*args, **kwargs)
623      pool.exit()
624      return results
def startPool(comm=None, root=0, killSlaves=True)
Start a process pool.
Definition: pool.py:1216

◆ parseAndSubmit()

def lsst.ctrl.pool.parallel.BatchCmdLineTask.parseAndSubmit (cls, args=None, **kwargs)
inherited

Definition at line 435 of file parallel.py.

435  def parseAndSubmit(cls, args=None, **kwargs):
436      taskParser = cls._makeArgumentParser(doBatch=True, add_help=False)
437      batchParser = BatchArgumentParser(parent=taskParser)
438      batchArgs = batchParser.parse_args(config=cls.ConfigClass(), args=args, override=cls.applyOverrides,
439                                         **kwargs)
440
441      if not cls.RunnerClass(cls, batchArgs.parent).precall(batchArgs.parent):  # Write config, schema
442          taskParser.error("Error in task preparation")
443
444      setBatchType(batchArgs.batch)
445
446      if batchArgs.batch is None:  # don't use a batch system
447          sys.argv = [sys.argv[0]] + batchArgs.leftover  # Remove all batch arguments
448
449          return cls.parseAndRun()
450      else:
451          if batchArgs.walltime > 0:
452              walltime = batchArgs.walltime
453          else:
454              numCores = batchArgs.cores if batchArgs.cores > 0 else batchArgs.nodes*batchArgs.procs
455              walltime = cls.batchWallTime(batchArgs.time, batchArgs.parent, numCores)
456
457          command = cls.batchCommand(batchArgs)
458          batchArgs.batch.run(command, walltime=walltime)
459
def setBatchType(batchType)
Definition: pool.py:101
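
Judging from the listing, parseAndSubmit runs the task directly when no batch system is requested and otherwise submits a command with a walltime. The walltime choice in the batch branch can be sketched as follows. `choose_walltime` is a hypothetical helper whose arguments mirror the batchArgs fields read by the listing, and the final line inlines the base-class batchWallTime formula with the target count passed in directly.

```python
def choose_walltime(walltime, cores, nodes, procs, time_per_target, num_targets):
    """Pick the walltime request the way the batch branch appears to (sketch)."""
    if walltime > 0:
        # An explicit, positive walltime is used unchanged.
        return walltime
    # Otherwise derive the core count: an explicit core count if positive,
    # else nodes multiplied by processes per node.
    num_cores = cores if cores > 0 else nodes * procs
    # Base-class formula from batchWallTime: time * numTargets / numCores.
    return time_per_target * num_targets / float(num_cores)


# Hypothetical requests, for illustration only.
print(choose_walltime(3600, 0, 2, 12, 600, 48))  # 3600
print(choose_walltime(0, 0, 2, 12, 600, 48))     # 1200.0
print(choose_walltime(0, 16, 2, 12, 600, 48))    # 1800.0
```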

◆ runDataRef()

def lsst.pipe.drivers.processCcdWithFakesDriver.ProcessCcdWithFakesDriverTask.runDataRef (self, sensorRef)
Process a single CCD, with scatter-gather-scatter using MPI.

Definition at line 61 of file processCcdWithFakesDriver.py.

61  def runDataRef(self, sensorRef):
62      """Process a single CCD, with scatter-gather-scatter using MPI.
63      """
64      if sensorRef.dataId[self.config.ccdKey] in self.ignoreCcds:
65          self.log.warn("Ignoring %s: CCD in ignoreCcdList" %
66                        (sensorRef.dataId))
67          return None
68
69      with self.logOperation("processing %s" % (sensorRef.dataId,)):
70          result = self.processCcdWithFakes.runDataRef(sensorRef)
71          if self.config.doMakeSourceTable:
72              parquet = self.writeSourceTable.run(result.outputCat,
73                                                  ccdVisitId=sensorRef.get('ccdExposureId'))
74              if self.config.doSaveWideSourceTable:
75                  sensorRef.put(parquet.table, 'fakes_source')
76
77              df = self.transformSourceTable.run(parquet.table,
78                                                 funcs=self.transformSourceTable.getFunctors(),
79                                                 dataId=sensorRef.dataId)
80              self.transformSourceTable.write(df, sensorRef)
81
82          return result
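
The CCD filter at the top of runDataRef amounts to a set-membership test on one key of the data ID. A minimal sketch, assuming plain dictionaries in place of data references and made-up values for `ignoreCcdList` and `ccdKey`:

```python
# Assumed configuration values, for illustration only.
ignore_ccds = set([9, 33])  # stands in for set(self.config.ignoreCcdList)
ccd_key = "ccd"             # stands in for self.config.ccdKey


def should_process(data_id):
    """Return True unless the CCD named in the data ID is on the ignore list."""
    return data_id[ccd_key] not in ignore_ccds


# Hypothetical data IDs for three CCDs of one visit.
data_ids = [{"visit": 1228, "ccd": ccd} for ccd in (8, 9, 10)]
kept = [d["ccd"] for d in data_ids if should_process(d)]
print(kept)  # [8, 10]
```

In the listing, a skipped CCD returns None after logging a warning, so callers that collect results should expect None entries.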

Member Data Documentation

◆ ConfigClass

lsst.pipe.drivers.processCcdWithFakesDriver.ProcessCcdWithFakesDriverTask.ConfigClass = ProcessCcdWithFakesDriverConfig
static

Definition at line 35 of file processCcdWithFakesDriver.py.

◆ ignoreCcds

lsst.pipe.drivers.processCcdWithFakesDriver.ProcessCcdWithFakesDriverTask.ignoreCcds

Definition at line 46 of file processCcdWithFakesDriver.py.

◆ RunnerClass

lsst.pipe.drivers.processCcdWithFakesDriver.ProcessCcdWithFakesDriverTask.RunnerClass = ProcessCcdWithFakesTaskRunner
static

Definition at line 37 of file processCcdWithFakesDriver.py.


The documentation for this class was generated from the following file: