LSSTApplications  17.0+11,17.0+34,17.0+56,17.0+57,17.0+59,17.0+7,17.0-1-g377950a+33,17.0.1-1-g114240f+2,17.0.1-1-g4d4fbc4+28,17.0.1-1-g55520dc+49,17.0.1-1-g5f4ed7e+52,17.0.1-1-g6dd7d69+17,17.0.1-1-g8de6c91+11,17.0.1-1-gb9095d2+7,17.0.1-1-ge9fec5e+5,17.0.1-1-gf4e0155+55,17.0.1-1-gfc65f5f+50,17.0.1-1-gfc6fb1f+20,17.0.1-10-g87f9f3f+1,17.0.1-11-ge9de802+16,17.0.1-16-ga14f7d5c+4,17.0.1-17-gc79d625+1,17.0.1-17-gdae4c4a+8,17.0.1-2-g26618f5+29,17.0.1-2-g54f2ebc+9,17.0.1-2-gf403422+1,17.0.1-20-g2ca2f74+6,17.0.1-23-gf3eadeb7+1,17.0.1-3-g7e86b59+39,17.0.1-3-gb5ca14a,17.0.1-3-gd08d533+40,17.0.1-30-g596af8797,17.0.1-4-g59d126d+4,17.0.1-4-gc69c472+5,17.0.1-6-g5afd9b9+4,17.0.1-7-g35889ee+1,17.0.1-7-gc7c8782+18,17.0.1-9-gc4bbfb2+3,w.2019.22
LSSTDataManagementBasePackage
parallel.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import re
4 import os
5 import os.path
6 import stat
7 import sys
8 import pipes
9 import tempfile
10 import argparse
11 import traceback
12 import contextlib
13 from lsst.pipe.base import CmdLineTask, TaskRunner
14 from .pool import startPool, Pool, NODE, abortOnError, setBatchType
15 from . import log as dummyLog # noqa
16 
17 __all__ = ["Batch", "PbsBatch", "SlurmBatch", "SmpBatch", "BATCH_TYPES", "BatchArgumentParser",
18  "BatchCmdLineTask", "BatchPoolTask", ]
19 
20 UMASK = 0o002 # umask to set
21 
# Functions to convert a list of arguments to a quoted shell command, provided by Dave Abrahams
# http://stackoverflow.com/questions/967443/python-module-to-shellquote-unshellquote
# Zero-width match in front of every character outside [-0-9a-zA-Z_./]; newlines are
# excluded from the match because shQuote handles them separately.
_quote_pos = re.compile('(?=[^-0-9a-zA-Z_./\n])')


def shQuote(arg):
    r"""!Quote the argument for the shell.

    Every character outside ``[-0-9a-zA-Z_./]`` gets a backslash in front of it,
    each newline is wrapped in single quotes, and an empty argument becomes ``''``.

    >>> shQuote('\t')
    '\\\t'
    >>> shQuote('foo bar')
    'foo\\ bar'
    >>> shQuote('')
    "''"

    @param arg: string to quote
    @return the quoted string
    """
    # This is the logic emacs uses
    if not arg:
        return "''"
    return _quote_pos.sub('\\\\', arg).replace('\n', "'\n'")
40 
41 
def shCommandFromArgs(args):
    """!Convert a list of shell arguments to a shell command-line

    @param args: iterable of argument strings
    @return single string with every argument quoted by shQuote, separated by spaces
    """
    return ' '.join(shQuote(a) for a in args)
45 
46 
def processStats():
    """!Collect Linux-specific process statistics

    Parses the /proc/self/status file (N.B. Linux-specific!) into a dict
    which is returned.

    @return dict mapping the text before the first ':' on each line to the
            whitespace-stripped text after it
    """
    with open("/proc/self/status") as f:
        return {key: value.strip() for key, _, value in (line.partition(":") for line in f)}
59 
60 
def printProcessStats():
    """Print the process statistics to the log

    The statistics come from processStats() and are written at INFO level
    to the default lsst.log logger, tagged with this process' NODE.
    """
    # Imported here rather than at module scope, as in the original code
    from lsst.log import Log
    log = Log.getDefaultLogger()
    log.info("Process stats for %s: %s" % (NODE, processStats()))
66 
67 
class Batch(object):
    """Base class for batch submission

    Subclasses provide the scheduler-specific pieces by overriding
    ``preamble`` and ``submitCommand``; this class assembles the script,
    writes it to disk and submits (or executes) it.
    """

    def __init__(self, outputDir=None, numNodes=0, numProcsPerNode=0, numCores=0, queue=None, jobName=None,
                 walltime=0.0, dryrun=False, doExec=False, mpiexec="", submit=None, options=None,
                 verbose=False):
        """!Constructor

        @param outputDir: output directory, or None
        @param numNodes: number of nodes
        @param numProcsPerNode: number of processors per node
        @param numCores: number of cores (Slurm, SMP only)
        @param queue: name of queue, or None
        @param jobName: name of job, or None
        @param walltime: maximum wall clock time for job
        @param dryrun: Dry run (only print actions that would be taken)?
        @param doExec: exec the script instead of submitting to batch system?
        @param mpiexec: options for mpiexec
        @param submit: command-line options for batch submission (e.g., for qsub, sbatch)
        @param options: options to append to script header (e.g., #PBS or #SBATCH)
        @param verbose: produce verbose output?
        @throw RuntimeError if neither numNodes+numProcsPerNode nor numCores is positive
        """
        if (numNodes <= 0 or numProcsPerNode <= 0) and numCores <= 0:
            raise RuntimeError("Must specify numNodes+numProcs or numCores")

        self.outputDir = outputDir
        self.numNodes = numNodes
        self.numProcsPerNode = numProcsPerNode
        self.numCores = numCores
        self.queue = queue
        self.jobName = jobName
        self.walltime = walltime
        self.dryrun = dryrun
        self.doExec = doExec
        self.mpiexec = mpiexec
        self.submit = submit
        self.options = options
        self.verbose = verbose

    def shebang(self):
        """Return the interpreter line that starts the script"""
        return "#!/bin/bash"

    def preamble(self, command, walltime=None):
        """Return preamble string for script to be submitted

        Most batch systems allow you to embed submission options as comments here.

        @throw NotImplementedError always; subclasses must override
        """
        # NOTE(review): createScript calls self.preamble(walltime) with a single positional
        # argument, which lands in 'command' here. The signature is kept unchanged for
        # backward compatibility.
        raise NotImplementedError("Not implemented for base class")

    def execution(self, command):
        """!Return execution string for script to be submitted

        @param command: command to run under mpiexec
        @return bash commands, one per line
        """
        # pipes.quote is the same function as shlex.quote, but the 'pipes' module
        # was removed in Python 3.13; use shlex directly.
        import shlex

        script = [exportEnv(),
                  "umask %03o" % UMASK,
                  "cd %s" % shlex.quote(os.getcwd()),
                  ]
        if self.verbose:
            # Record the environment the job starts in
            script += ["echo \"mpiexec is at: $(which mpiexec)\"",
                       "ulimit -a",
                       "echo 'umask: ' $(umask)",
                       "eups list -s",
                       "export",
                       "date",
                       ]
        script += ["mpiexec %s %s" % (self.mpiexec, command)]
        if self.verbose:
            script += ["date",
                       "echo Done.",
                       ]
        return "\n".join(script)

    def createScript(self, command, walltime=None):
        """!Create script to be submitted

        @param command: command to run
        @param walltime: maximum wall clock time, overrides value to constructor
        @return name of script on filesystem
        """
        # Build the complete text before touching the filesystem, so that an error raised by
        # preamble() or execution() (e.g., a non-positive walltime) does not leave a stray
        # temporary file behind.
        contents = "".join(part + "\n" for part in (self.shebang(),
                                                    self.preamble(walltime),
                                                    self.execution(command)))
        fd, scriptName = tempfile.mkstemp()
        with os.fdopen(fd, "w") as f:
            f.write(contents)

        # Readable, writable and executable by the owner only
        os.chmod(scriptName, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
        return scriptName

    def submitCommand(self, scriptName):
        """!Return command to submit script

        @param scriptName: name of script on filesystem
        @throw NotImplementedError always; subclasses must override
        """
        raise NotImplementedError("No implementation for base class")

    def run(self, command, walltime=None):
        """!Run the batch system

        Creates and submits the script to execute the provided command

        @param command: command to run
        @param walltime: maximum wall clock time, overrides value to constructor
        @return name of script on filesystem
        """
        scriptName = self.createScript(command, walltime=walltime)
        command = self.submitCommand(scriptName)
        if self.dryrun:
            print("Would run: %s" % command)
        elif self.doExec:
            # Replaces the current process with the script; does not return
            os.execl(scriptName, scriptName)
        else:
            os.system(command)
        return scriptName
182 
183 
185  """Batch submission with PBS"""
186 
187  def preamble(self, walltime=None):
188  if walltime is None:
189  walltime = self.walltime
190  if walltime <= 0:
191  raise RuntimeError("Non-positive walltime: %s (did you forget '--time'?)" % (walltime,))
192  if self.numNodes <= 0 or self.numProcsPerNode <= 0:
193  raise RuntimeError(
194  "Number of nodes (--nodes=%d) or number of processors per node (--procs=%d) not set" %
195  (self.numNodes, self.numProcsPerNode))
196  if self.numCores > 0:
197  raise RuntimeError("PBS does not support setting the number of cores")
198  return "\n".join([
199  "#PBS %s" % self.options if self.options is not None else "",
200  "#PBS -l nodes=%d:ppn=%d" % (self.numNodes, self.numProcsPerNode),
201  "#PBS -l walltime=%d" % walltime if walltime is not None else "",
202  "#PBS -o %s" % self.outputDir if self.outputDir is not None else "",
203  "#PBS -N %s" % self.jobName if self.jobName is not None else "",
204  "#PBS -q %s" % self.queue if self.queue is not None else "",
205  "#PBS -j oe",
206  "#PBS -W umask=%03o" % UMASK,
207  ])
208 
209  def submitCommand(self, scriptName):
210  return "qsub %s -V %s" % (self.submit if self.submit is not None else "", scriptName)
211 
212 
214  """Batch submission with Slurm"""
215 
216  @staticmethod
217  def formatWalltime(walltime):
218  """Format walltime (in seconds) as days-hours:minutes"""
219  secInDay = 3600*24
220  secInHour = 3600
221  secInMinute = 60
222  days = walltime//secInDay
223  walltime -= days*secInDay
224  hours = walltime//secInHour
225  walltime -= hours*secInHour
226  minutes = walltime//secInMinute
227  walltime -= minutes*secInMinute
228  if walltime > 0:
229  minutes += 1
230  return "%d-%d:%d" % (days, hours, minutes)
231 
232  def preamble(self, walltime=None):
233  if walltime is None:
234  walltime = self.walltime
235  if walltime <= 0:
236  raise RuntimeError("Non-positive walltime: %s (did you forget '--time'?)" % (walltime,))
237  if (self.numNodes <= 0 or self.numProcsPerNode <= 0) and self.numCores <= 0:
238  raise RuntimeError(
239  "Number of nodes (--nodes=%d) and number of processors per node (--procs=%d) not set OR "
240  "number of cores (--cores=%d) not set" % (self.numNodes, self.numProcsPerNode, self.numCores))
241  if self.numCores > 0 and (self.numNodes > 0 or self.numProcsPerNode > 0):
242  raise RuntimeError("Must set either --nodes,--procs or --cores: not both")
243 
244  outputDir = self.outputDir if self.outputDir is not None else os.getcwd()
245  filename = os.path.join(outputDir, (self.jobName if self.jobName is not None else "slurm") + ".o%j")
246  return "\n".join([("#SBATCH --nodes=%d" % self.numNodes) if self.numNodes > 0 else "",
247  ("#SBATCH --ntasks-per-node=%d" % self.numProcsPerNode) if
248  self.numProcsPerNode > 0 else "",
249  ("#SBATCH --ntasks=%d" % self.numCores) if self.numCores > 0 else "",
250  "#SBATCH --time=%s" % self.formatWalltime(walltime),
251  "#SBATCH --job-name=%s" % self.jobName if self.jobName is not None else "",
252  "#SBATCH -p %s" % self.queue if self.queue is not None else "",
253  "#SBATCH --output=%s" % filename,
254  "#SBATCH --error=%s" % filename,
255  "#SBATCH %s" % self.options if self.options is not None else "",
256  ])
257 
258  def submitCommand(self, scriptName):
259  return "sbatch %s %s" % (self.submit if self.submit is not None else "", scriptName)
260 
261 
263  """Not-really-Batch submission with multiple cores on the current node
264 
265  The job is run immediately.
266  """
267 
268  def __init__(self, *args, **kwargs):
269  super(SmpBatch, self).__init__(*args, **kwargs)
270  if self.numNodes in (0, 1) and self.numProcsPerNode > 0 and self.numCores == 0:
271  # --nodes=1 --procs=NN being used as a synonym for --cores=NN
272  self.numNodes = 0
273  self.numCores = self.numProcsPerNode
275  if self.numNodes > 0 or self.numProcsPerNode > 0:
276  raise RuntimeError("SMP does not support the --nodes and --procs command-line options; "
277  "use --cores to specify the number of cores to use")
278  if self.numCores > 1:
279  self.mpiexec = "%s -n %d" % (self.mpiexec if self.mpiexec is not None else "", self.numCores)
280  else:
281  self.mpiexec = ""
282 
283  def preamble(self, walltime=None):
284  return ""
285 
286  def submitCommand(self, scriptName):
287  return "exec %s" % scriptName
288 
289 
# Mapping batch type --> Batch class; the two spellings of "none" map to None
BATCH_TYPES = {
    "none": None,
    "None": None,
    "pbs": PbsBatch,
    "slurm": SlurmBatch,
    "smp": SmpBatch,
}
296 
297 
class BatchArgumentParser(argparse.ArgumentParser):
    """An argument parser to get relevant parameters for batch submission

    We want to be able to display the help for a 'parent' ArgumentParser
    along with the batch-specific options we introduce in this class, but
    we don't want to swallow the parent (i.e., ArgumentParser(parents=[parent]))
    because we want to save the list of arguments that this particular
    BatchArgumentParser doesn't parse, so they can be passed on to a different
    program (though we also want to parse them to check that they can be parsed).
    """

    def __init__(self, parent=None, *args, **kwargs):
        """!Constructor

        @param parent: parser for the wrapped script, or None
        @param args: positional arguments for argparse.ArgumentParser
        @param kwargs: keyword arguments for argparse.ArgumentParser
        """
        super(BatchArgumentParser, self).__init__(*args, **kwargs)
        self._parent = parent
        group = self.add_argument_group("Batch submission options")
        group.add_argument("--queue", help="Queue name")
        group.add_argument("--job", help="Job name")
        group.add_argument("--nodes", type=int, default=0, help="Number of nodes")
        group.add_argument("--procs", type=int, default=0, help="Number of processors per node")
        group.add_argument("--cores", type=int, default=0, help="Number of cores (Slurm/SMP only)")
        group.add_argument("--time", type=float, default=0,
                           help="Expected execution time per element (sec)")
        group.add_argument("--batch-type", dest="batchType", choices=list(BATCH_TYPES.keys()), default="smp",
                           help="Batch system to use")
        group.add_argument("--batch-verbose", dest="batchVerbose", action="store_true", default=False,
                           help=("Enable verbose output in batch script "
                                 "(including system environment information at batch start)?"))
        group.add_argument("--batch-output", dest="batchOutput", help="Output directory")
        group.add_argument("--batch-submit", dest="batchSubmit", help="Batch submission command-line flags")
        group.add_argument("--batch-options", dest="batchOptions", help="Header options for batch script")
        group.add_argument("--batch-profile", dest="batchProfile", action="store_true", default=False,
                           help="Enable profiling on batch job?")
        group.add_argument("--batch-stats", dest="batchStats", action="store_true", default=False,
                           help="Print process stats on completion (Linux only)?")
        group.add_argument("--dry-run", dest="dryrun", default=False, action="store_true",
                           help="Dry run?")
        group.add_argument("--do-exec", dest="doExec", default=False, action="store_true",
                           help="Exec script instead of submit to batch system?")
        group.add_argument("--mpiexec", default="", help="mpiexec options")

    def parse_args(self, config=None, args=None, namespace=None, **kwargs):
        """!Parse the batch options and hand everything else to the parent parser

        @param config: passed as the first argument to the parent's parse_args
        @param args: argument list to parse (None lets argparse use its default)
        @param namespace: namespace to populate, as for argparse
        @param kwargs: additional keyword arguments for the parent's parse_args
        @return namespace with the batch options plus: 'parent' (result of the parent
                parser, or None), 'leftover' (arguments not recognised here, or None)
                and 'batch' (result of makeBatch)
        """
        args, leftover = super(BatchArgumentParser, self).parse_known_args(args=args, namespace=namespace)
        args.parent = None
        args.leftover = None
        if len(leftover) > 0:
            # Save any leftovers for the parent
            if self._parent is None:
                self.error("Unrecognised arguments: %s" % leftover)
            args.parent = self._parent.parse_args(config, args=leftover, **kwargs)
            args.leftover = leftover
        args.batch = self.makeBatch(args)
        return args

    def makeBatch(self, args):
        """!Create a Batch object from the command-line arguments

        @param args: namespace produced by parsing the batch options
        @return instance of the class selected by --batch-type, or None if that type maps to None
        """
        # argMapping is a dict that maps Batch init kwarg names to parsed arguments attribute *names*
        argMapping = {'outputDir': 'batchOutput',
                      'numNodes': 'nodes',
                      'numProcsPerNode': 'procs',
                      'numCores': 'cores',
                      'walltime': 'time',
                      'queue': 'queue',
                      'jobName': 'job',
                      'dryrun': 'dryrun',
                      'doExec': 'doExec',
                      'mpiexec': 'mpiexec',
                      'submit': 'batchSubmit',
                      'options': 'batchOptions',
                      'verbose': 'batchVerbose',
                      }

        if BATCH_TYPES[args.batchType] is None:
            return None

        # kwargs is a dict that maps Batch init kwarg names to parsed arguments attribute *values*
        kwargs = {k: getattr(args, v) for k, v in argMapping.items()}
        return BATCH_TYPES[args.batchType](**kwargs)

    def format_help(self):
        """Return the help text for this wrapper followed by that of the wrapped script (if any)"""
        text = """This is a script for queue submission of a wrapped script.

Use this program name and ignore that for the wrapped script (it will be
passed on to the batch system). Arguments for *both* this wrapper script or the
wrapped script are valid (if it is required for the wrapped script, it
is required for the wrapper as well).

*** Batch system submission wrapper:

"""
        text += super(BatchArgumentParser, self).format_help()
        if self._parent is not None:
            text += """

*** Wrapped script:

"""
            text += self._parent.format_help()
        return text

    def format_usage(self):
        """Return the usage string, using the parent's usage under this program's name if there is a parent"""
        if self._parent is None:
            return super(BatchArgumentParser, self).format_usage()
        prog = self._parent.prog
        self._parent.prog = self.prog
        try:
            return self._parent.format_usage()
        finally:
            # Always put the parent's own program name back, even if formatting fails
            self._parent.prog = prog
405 
406 
def exportEnv():
    """!Generate bash script to regenerate the current environment

    Exported shell functions are re-declared and re-exported with "export -f";
    every other variable becomes a single-quoted "export" statement. DISPLAY
    is skipped.

    @return bash source text, one statement per line
    """
    statements = []
    for key, val in os.environ.items():
        if key in ("DISPLAY",):
            continue
        if val.startswith("() {"):
            # This is a function.
            # "Two parentheses, a single space, and a brace"
            # is exactly the same criterion as bash uses.

            # From 2014-09-25, the function name is prefixed by 'BASH_FUNC_'
            # and suffixed by '()'; subsequent bash patches use the suffix '%%'
            # instead. Either decoration has to be removed to get the name back.
            if key.startswith("BASH_FUNC_") and key.endswith(("()", "%%")):
                key = key[len("BASH_FUNC_"):-2]

            statements.append("{key} {val}\nexport -f {key}\n".format(key=key, val=val))
        else:
            # This is a variable.
            # A single quote inside the value closes the quoting, adds a
            # double-quoted single quote, and re-opens the quoting.
            statements.append("export {key}='{val}'\n".format(key=key, val=val.replace("'", "'\"'\"'")))
    return "".join(statements)
428 
429 
431 
432  @classmethod
433  def parseAndSubmit(cls, args=None, **kwargs):
434  taskParser = cls._makeArgumentParser(doBatch=True, add_help=False)
435  batchParser = BatchArgumentParser(parent=taskParser)
436  batchArgs = batchParser.parse_args(config=cls.ConfigClass(), args=args, override=cls.applyOverrides,
437  **kwargs)
438 
439  if not cls.RunnerClass(cls, batchArgs.parent).precall(batchArgs.parent): # Write config, schema
440  taskParser.error("Error in task preparation")
441 
442  setBatchType(batchArgs.batch)
443 
444  if batchArgs.batch is None: # don't use a batch system
445  sys.argv = [sys.argv[0]] + batchArgs.leftover # Remove all batch arguments
446 
447  return cls.parseAndRun()
448  else:
449  numCores = batchArgs.cores if batchArgs.cores > 0 else batchArgs.nodes*batchArgs.procs
450  walltime = cls.batchWallTime(batchArgs.time, batchArgs.parent, numCores)
451 
452  command = cls.batchCommand(batchArgs)
453  batchArgs.batch.run(command, walltime=walltime)
454 
455  @classmethod
456  def batchWallTime(cls, time, parsedCmd, numCores):
457  """!Return walltime request for batch job
458 
459  Subclasses should override if the walltime should be calculated
460  differently (e.g., addition of some serial time).
461 
462  @param cls: Class
463  @param time: Requested time per iteration
464  @param parsedCmd: Results of argument parsing
465  @param numCores: Number of cores
466  """
467  numTargets = len(cls.RunnerClass.getTargetList(parsedCmd))
468  return time*numTargets/float(numCores)
469 
470  @classmethod
471  def batchCommand(cls, args):
472  """!Return command to run CmdLineTask
473 
474  @param cls: Class
475  @param args: Parsed batch job arguments (from BatchArgumentParser)
476  """
477  job = args.job if args.job is not None else "job"
478  module = cls.__module__
479  script = ("import os; os.umask(%#05o); " +
480  "import lsst.base; lsst.base.disableImplicitThreading(); " +
481  "import lsst.ctrl.pool.log; lsst.ctrl.pool.log.jobLog(\"%s\"); ") % (UMASK, job)
482 
483  if args.batchStats:
484  script += ("import lsst.ctrl.pool.parallel; import atexit; " +
485  "atexit.register(lsst.ctrl.pool.parallel.printProcessStats); ")
486 
487  script += "import %s; %s.%s.parseAndRun();" % (module, module, cls.__name__)
488 
489  profilePre = "import cProfile; import os; cProfile.run(\"\"\""
490  profilePost = "\"\"\", filename=\"profile-" + job + "-%s-%d.dat\" % (os.uname()[1], os.getpid()))"
491 
492  return ("python -c '" + (profilePre if args.batchProfile else "") + script +
493  (profilePost if args.batchProfile else "") + "' " + shCommandFromArgs(args.leftover) +
494  " --noExit")
495 
496  @contextlib.contextmanager
497  def logOperation(self, operation, catch=False, trace=True):
498  """!Provide a context manager for logging an operation
499 
500  @param operation: description of operation (string)
501  @param catch: Catch all exceptions?
502  @param trace: Log a traceback of caught exception?
503 
504  Note that if 'catch' is True, all exceptions are swallowed, but there may
505  be other side-effects such as undefined variables.
506  """
507  self.log.info("%s: Start %s" % (NODE, operation))
508  try:
509  yield
510  except Exception:
511  if catch:
512  cls, e, _ = sys.exc_info()
513  self.log.warn("%s: Caught %s while %s: %s" % (NODE, cls.__name__, operation, e))
514  if trace:
515  self.log.info("%s: Traceback:\n%s" % (NODE, traceback.format_exc()))
516  return
517  raise
518  finally:
519  self.log.info("%s: Finished %s" % (NODE, operation))
520 
521 
523  """Starts a BatchCmdLineTask with an MPI process pool
524 
525  Use this subclass of BatchCmdLineTask if you want to use the Pool directly.
526  """
527  @classmethod
528  @abortOnError
529  def parseAndRun(cls, *args, **kwargs):
530  """Run with a MPI process pool"""
531  pool = startPool()
532  super(BatchPoolTask, cls).parseAndRun(*args, **kwargs)
533  pool.exit()
534 
535 
537  """Run a Task individually on a list of inputs using the MPI process pool"""
538 
539  def __init__(self, *args, **kwargs):
540  """Constructor
541 
542  Warn if the user specified multiprocessing.
543  """
544  TaskRunner.__init__(self, *args, **kwargs)
545  if self.numProcesses > 1:
546  self.log.warn("Multiprocessing arguments (-j %d) ignored since using batch processing" %
547  self.numProcesses)
548  self.numProcesses = 1
549 
550  def run(self, parsedCmd):
551  """Run the task on all targets
552 
553  Sole input is the result of parsing the command-line with the ArgumentParser.
554 
555  Output is None if 'precall' failed; otherwise it is a list of calling ourself
556  on each element of the target list from the 'getTargetList' method.
557  """
558  resultList = None
559 
561  pool = Pool()
562 
563  if self.precall(parsedCmd):
564  targetList = self.getTargetList(parsedCmd)
565  if len(targetList) > 0:
566  parsedCmd.log.info("Processing %d targets with a pool of %d processes..." %
567  (len(targetList), pool.size))
568  # Run the task using self.__call__
569  resultList = pool.map(self, targetList)
570  else:
571  parsedCmd.log.warn("Not running the task because there is no data to process; "
572  "you may preview data using \"--show data\"")
573  resultList = []
574 
575  return resultList
576 
577  @abortOnError
578  def __call__(self, cache, args):
579  """Run the Task on a single target
580 
581  Strips out the process pool 'cache' argument.
582 
583  'args' are those arguments provided by the getTargetList method.
584 
585  Brings down the entire job if an exception is not caught (i.e., --doraise).
586  """
587  return TaskRunner.__call__(self, args)
588 
589 
591  """Runs the BatchCmdLineTask in parallel
592 
593  Use this subclass of BatchCmdLineTask if you don't need to use the Pool
594  directly, but just want to iterate over many objects (like a multi-node
595  version of the '-j' command-line argument).
596  """
597  RunnerClass = BatchTaskRunner
598 
599  @classmethod
600  def _makeArgumentParser(cls, *args, **kwargs):
601  """Build an ArgumentParser
602 
603  Removes the batch-specific parts in order to delegate to the parent classes.
604  """
605  kwargs.pop("doBatch", False)
606  kwargs.pop("add_help", False)
607  return super(BatchCmdLineTask, cls)._makeArgumentParser(*args, **kwargs)
608 
609  @classmethod
610  def parseAndRun(cls, *args, **kwargs):
611  """Parse an argument list and run the command
612 
613  This is the entry point when we run in earnest, so start the process pool
614  so that the worker nodes don't go any further.
615  """
616  pool = startPool()
617  results = super(BatchParallelTask, cls).parseAndRun(*args, **kwargs)
618  pool.exit()
619  return results
def parseAndRun(cls, args, kwargs)
Definition: parallel.py:529
def __call__(self, cache, args)
Definition: parallel.py:578
def preamble(self, walltime=None)
Definition: parallel.py:283
def parseAndRun(cls, args, kwargs)
Definition: parallel.py:610
def parseAndRun(cls, args=None, config=None, log=None, doReturnResults=False)
Definition: cmdLineTask.py:549
def batchCommand(cls, args)
Return command to run CmdLineTask.
Definition: parallel.py:471
def __init__(self, outputDir=None, numNodes=0, numProcsPerNode=0, numCores=0, queue=None, jobName=None, walltime=0.0, dryrun=False, doExec=False, mpiexec="", submit=None, options=None, verbose=False)
Constructor.
Definition: parallel.py:73
def __init__(self, parent=None, args, kwargs)
Definition: parallel.py:309
def preamble(self, walltime=None)
Definition: parallel.py:232
def submitCommand(self, scriptName)
Definition: parallel.py:286
def startPool(comm=None, root=0, killSlaves=True)
Start a process pool.
Definition: pool.py:1217
Definition: Log.h:691
def parseAndSubmit(cls, args=None, kwargs)
Definition: parallel.py:433
def createScript(self, command, walltime=None)
Create script to be submitted.
Definition: parallel.py:138
def __init__(self, args, kwargs)
Definition: parallel.py:268
def preamble(self, walltime=None)
Definition: parallel.py:187
def submitCommand(self, scriptName)
Definition: parallel.py:258
def parse_args(self, config=None, args=None, namespace=None, kwargs)
Definition: parallel.py:338
def submitCommand(self, scriptName)
Return command to submit script.
Definition: parallel.py:157
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
Definition: history.py:168
def batchWallTime(cls, time, parsedCmd, numCores)
Return walltime request for batch job.
Definition: parallel.py:456
def preamble(self, command, walltime=None)
Definition: parallel.py:110
def submitCommand(self, scriptName)
Definition: parallel.py:209
def setBatchType(batchType)
Definition: pool.py:102
def logOperation(self, operation, catch=False, trace=True)
Provide a context manager for logging an operation.
Definition: parallel.py:497
def __init__(self, args, kwargs)
Definition: parallel.py:539
def getTargetList(parsedCmd, kwargs)
Definition: cmdLineTask.py:233
daf::base::PropertyList * list
Definition: fits.cc:885
def run(self, command, walltime=None)
Run the batch system.
Definition: parallel.py:164
def execution(self, command)
Definition: parallel.py:117
def shCommandFromArgs(args)
Definition: parallel.py:42