LSST Applications  21.0.0-147-g0e635eb1+1acddb5be5,22.0.0+052faf71bd,22.0.0+1ea9a8b2b2,22.0.0+6312710a6c,22.0.0+729191ecac,22.0.0+7589c3a021,22.0.0+9f079a9461,22.0.1-1-g7d6de66+b8044ec9de,22.0.1-1-g87000a6+536b1ee016,22.0.1-1-g8e32f31+6312710a6c,22.0.1-10-gd060f87+016f7cdc03,22.0.1-12-g9c3108e+df145f6f68,22.0.1-16-g314fa6d+c825727ab8,22.0.1-19-g93a5c75+d23f2fb6d8,22.0.1-19-gb93eaa13+aab3ef7709,22.0.1-2-g8ef0a89+b8044ec9de,22.0.1-2-g92698f7+9f079a9461,22.0.1-2-ga9b0f51+052faf71bd,22.0.1-2-gac51dbf+052faf71bd,22.0.1-2-gb66926d+6312710a6c,22.0.1-2-gcb770ba+09e3807989,22.0.1-20-g32debb5+b8044ec9de,22.0.1-23-gc2439a9a+fb0756638e,22.0.1-3-g496fd5d+09117f784f,22.0.1-3-g59f966b+1e6ba2c031,22.0.1-3-g849a1b8+f8b568069f,22.0.1-3-gaaec9c0+c5c846a8b1,22.0.1-32-g5ddfab5d3+60ce4897b0,22.0.1-4-g037fbe1+64e601228d,22.0.1-4-g8623105+b8044ec9de,22.0.1-5-g096abc9+d18c45d440,22.0.1-5-g15c806e+57f5c03693,22.0.1-7-gba73697+57f5c03693,master-g6e05de7fdc+c1283a92b8,master-g72cdda8301+729191ecac,w.2021.39
LSST Data Management Base Package
parallel.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
import argparse
import contextlib
import math
import os
import os.path
import pipes  # noqa: F401
import re
import shlex
import stat
import sys
import tempfile
import traceback

from lsst.pipe.base import CmdLineTask, TaskRunner

from .pool import startPool, Pool, NODE, abortOnError, setBatchType
from . import log as dummyLog  # noqa
16 
# Public API of this module; includes every public class a pipeline task is expected to subclass or use.
__all__ = ["Batch", "PbsBatch", "SlurmBatch", "SmpBatch", "BATCH_TYPES", "BatchArgumentParser",
           "BatchCmdLineTask", "BatchPoolTask", "BatchTaskRunner", "BatchParallelTask", ]

UMASK = 0o002  # umask to set (applied in the batch script and in the Python job itself)
21 
# Functions to convert a list of arguments to a quoted shell command, provided by Dave Abrahams
# http://stackoverflow.com/questions/967443/python-module-to-shellquote-unshellquote
# Zero-width match in front of every character that is not shell-safe; newlines are excluded here
# because they are handled separately (wrapped in single quotes) by shQuote.
_quote_pos = re.compile('(?=[^-0-9a-zA-Z_./\n])')


def shQuote(arg):
    r"""Quote the argument for the shell.

    Every character outside ``[-0-9a-zA-Z_./]`` (other than newline) is prefixed with a
    backslash; each newline is wrapped in single quotes; an empty argument becomes ``''``.

    >>> shQuote('\t')
    '\\\t'
    >>> shQuote('foo bar')
    'foo\\ bar'
    >>> shQuote('')
    "''"
    """
    # This is the logic emacs uses
    if arg:
        return _quote_pos.sub('\\\\', arg).replace('\n', "'\n'")
    else:
        return "''"
40 
41 
def shCommandFromArgs(args):
    """Convert a list of shell arguments to a shell command-line

    Each argument is passed through shQuote and the quoted pieces are joined with single spaces.
    """
    return " ".join(map(shQuote, args))
45 
46 
def processStats():
    """Collect Linux-specific process statistics

    Reads /proc/self/status (N.B. Linux-specific!) and returns a dict that maps the text
    before the first ':' on each line to the whitespace-stripped text after it.
    """
    with open("/proc/self/status") as statusFile:
        return {name: text.strip()
                for name, _, text in (entry.partition(":") for entry in statusFile)}
59 
60 
def printProcessStats():
    """Print the process statistics to the log

    Writes one INFO message, tagged with this node's name, to the lsst.log default logger.
    """
    from lsst.log import Log
    Log.getDefaultLogger().info("Process stats for %s: %s" % (NODE, processStats()))
66 
67 
class Batch:
    """Base class for batch submission

    Subclasses provide preamble() (batch-system header directives) and submitCommand()
    (the shell command that submits a script); this class writes the script and submits it.
    """

    def __init__(self, outputDir=None, numNodes=0, numProcsPerNode=0, numCores=0, queue=None, jobName=None,
                 walltime=0.0, dryrun=False, doExec=False, mpiexec="", submit=None, options=None,
                 verbose=False):
        """!Constructor

        @param outputDir: output directory, or None
        @param numNodes: number of nodes
        @param numProcsPerNode: number of processors per node
        @param numCores: number of cores (Slurm, SMP only)
        @param queue: name of queue, or None
        @param jobName: name of job, or None
        @param walltime: maximum wall clock time for job
        @param dryrun: Dry run (only print actions that would be taken)?
        @param doExec: exec the script instead of submitting to batch system?
        @param mpiexec: options for mpiexec
        @param submit: command-line options for batch submission (e.g., for qsub, sbatch)
        @param options: options to append to script header (e.g., #PBS or #SBATCH)
        @param verbose: produce verbose output?
        @throws RuntimeError if neither (numNodes and numProcsPerNode) nor numCores is positive
        """
        if (numNodes <= 0 or numProcsPerNode <= 0) and numCores <= 0:
            raise RuntimeError("Must specify numNodes+numProcs or numCores")

        self.outputDir = outputDir
        self.numNodes = numNodes
        self.numProcsPerNode = numProcsPerNode
        self.numCores = numCores
        self.queue = queue
        self.jobName = jobName
        self.walltime = walltime
        self.dryrun = dryrun
        self.doExec = doExec
        self.mpiexec = mpiexec
        self.submit = submit
        self.options = options
        self.verbose = verbose

    def shebang(self):
        """Return the interpreter line that starts the generated script"""
        return "#!/bin/bash"

    def preamble(self, walltime=None):
        """Return preamble string for script to be submitted

        Most batch systems allow you to embed submission options as comments here.
        The signature matches createScript()'s call ``self.preamble(walltime)`` and all overrides.

        @param walltime: maximum wall clock time, or None to use the value given to the constructor
        """
        raise NotImplementedError("Not implemented for base class")

    def execution(self, command):
        """Return execution string for script to be submitted

        The script body re-creates the current environment (exportEnv), sets the umask, changes
        to the current working directory and runs the command under mpiexec; with ``verbose``,
        diagnostic commands are added before and after.

        @param command: command to run
        """
        script = [exportEnv(),
                  "umask %03o" % UMASK,
                  # pipes.quote is an alias of shlex.quote, and the pipes module is deprecated
                  "cd %s" % shlex.quote(os.getcwd()),
                  ]
        if self.verbose:
            script += ["echo \"mpiexec is at: $(which mpiexec)\"",
                       "ulimit -a",
                       "echo 'umask: ' $(umask)",
                       "eups list -s",
                       "export",
                       "date",
                       ]
        script += ["mpiexec %s %s" % (self.mpiexec, command)]
        if self.verbose:
            script += ["date",
                       "echo Done.",
                       ]
        return "\n".join(script)

    def createScript(self, command, walltime=None):
        """!Create script to be submitted

        The script is the shebang, preamble and execution sections, each followed by a newline,
        written to a new temporary file that is made readable/writable/executable by the owner only.

        @param command: command to run
        @param walltime: maximum wall clock time, overrides value to constructor
        @return name of script on filesystem
        """
        fd, scriptName = tempfile.mkstemp()
        with os.fdopen(fd, "w") as f:
            f.write(self.shebang())
            f.write('\n')
            f.write(self.preamble(walltime))
            f.write('\n')
            f.write(self.execution(command))
            f.write('\n')

        os.chmod(scriptName, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
        return scriptName

    def submitCommand(self, scriptName):
        """!Return command to submit script

        @param scriptName: name of script on filesystem
        """
        raise NotImplementedError("No implementation for base class")

    def run(self, command, walltime=None):
        """!Run the batch system

        Creates and submits the script to execute the provided command.  With ``dryrun`` the
        submission command is only printed; with ``doExec`` the current process is replaced by
        the script (os.execl); otherwise the submission command is run with os.system.
        NOTE(review): the exit status of the submission command is not checked.

        @param command: command to run
        @param walltime: maximum wall clock time, overrides value to constructor
        @return name of script on filesystem
        """
        scriptName = self.createScript(command, walltime=walltime)
        command = self.submitCommand(scriptName)
        if self.dryrun:
            print("Would run: %s" % command)
        elif self.doExec:
            os.execl(scriptName, scriptName)
        else:
            os.system(command)
        return scriptName
182 
183 
class PbsBatch(Batch):
    """Batch submission with PBS"""

    def preamble(self, walltime=None):
        """Return the #PBS header directives for the script

        Unset optional settings (options, outputDir, jobName, queue) produce empty lines.

        @param walltime: maximum wall clock time (sec), or None to use the constructor value
        @return newline-separated #PBS directives
        @throws RuntimeError if the walltime is not positive, nodes/procs are not set,
                or a number of cores was given (unsupported by PBS)
        """
        if walltime is None:
            walltime = self.walltime
        if walltime <= 0:
            raise RuntimeError("Non-positive walltime: %s (did you forget '--time'?)" % (walltime,))
        if self.numNodes <= 0 or self.numProcsPerNode <= 0:
            raise RuntimeError(
                "Number of nodes (--nodes=%d) or number of processors per node (--procs=%d) not set" %
                (self.numNodes, self.numProcsPerNode))
        if self.numCores > 0:
            raise RuntimeError("PBS does not support setting the number of cores")
        return "\n".join([
            "#PBS %s" % self.options if self.options is not None else "",
            "#PBS -l nodes=%d:ppn=%d" % (self.numNodes, self.numProcsPerNode),
            # Round up: '%d' alone would truncate a fractional request to less time than asked for
            # (and a sub-second request to zero).
            "#PBS -l walltime=%d" % math.ceil(walltime),
            "#PBS -o %s" % self.outputDir if self.outputDir is not None else "",
            "#PBS -N %s" % self.jobName if self.jobName is not None else "",
            "#PBS -q %s" % self.queue if self.queue is not None else "",
            "#PBS -j oe",
            "#PBS -W umask=%03o" % UMASK,
        ])

    def submitCommand(self, scriptName):
        """!Return the qsub command (with -V and any extra submit flags) that submits the script

        @param scriptName: name of script on filesystem
        """
        return "qsub %s -V %s" % (self.submit if self.submit is not None else "", scriptName)
211 
212 
class SlurmBatch(Batch):
    """Batch submission with Slurm"""

    @staticmethod
    def formatWalltime(walltime):
        """Format walltime (in seconds) as days-hours:minutes

        Any partial minute is rounded up.  The rounding is done on the total number of minutes
        before splitting, so the result is always normalized (hours < 24, minutes < 60); e.g.
        3599.5 sec gives "0-1:0" rather than "0-0:60".
        """
        secInMinute = 60
        minInHour = 60
        minInDay = 24*minInHour
        totalMinutes = int(math.ceil(walltime/float(secInMinute)))
        days, minutes = divmod(totalMinutes, minInDay)
        hours, minutes = divmod(minutes, minInHour)
        return "%d-%d:%d" % (days, hours, minutes)

    def preamble(self, walltime=None):
        """Return the #SBATCH header directives for the script

        Unset optional settings produce empty lines.  Output and error both go to
        ``<outputDir or cwd>/<jobName or "slurm">.o%j``.

        @param walltime: maximum wall clock time (sec), or None to use the constructor value
        @return newline-separated #SBATCH directives
        @throws RuntimeError if the walltime is not positive, or if neither nodes+procs nor
                cores (or both) are set
        """
        if walltime is None:
            walltime = self.walltime
        if walltime <= 0:
            raise RuntimeError("Non-positive walltime: %s (did you forget '--time'?)" % (walltime,))
        if (self.numNodes <= 0 or self.numProcsPerNode <= 0) and self.numCores <= 0:
            raise RuntimeError(
                "Number of nodes (--nodes=%d) and number of processors per node (--procs=%d) not set OR "
                "number of cores (--cores=%d) not set" % (self.numNodes, self.numProcsPerNode, self.numCores))
        if self.numCores > 0 and (self.numNodes > 0 or self.numProcsPerNode > 0):
            raise RuntimeError("Must set either --nodes,--procs or --cores: not both")

        outputDir = self.outputDir if self.outputDir is not None else os.getcwd()
        # '%j' is left for Slurm to expand (to the job ID) in the output file name
        filename = os.path.join(outputDir, (self.jobName if self.jobName is not None else "slurm") + ".o%j")
        return "\n".join([("#SBATCH --nodes=%d" % self.numNodes) if self.numNodes > 0 else "",
                          ("#SBATCH --ntasks-per-node=%d" % self.numProcsPerNode) if
                          self.numProcsPerNode > 0 else "",
                          ("#SBATCH --ntasks=%d" % self.numCores) if self.numCores > 0 else "",
                          "#SBATCH --time=%s" % self.formatWalltime(walltime),
                          "#SBATCH --job-name=%s" % self.jobName if self.jobName is not None else "",
                          "#SBATCH -p %s" % self.queue if self.queue is not None else "",
                          "#SBATCH --output=%s" % filename,
                          "#SBATCH --error=%s" % filename,
                          "#SBATCH %s" % self.options if self.options is not None else "",
                          ])

    def submitCommand(self, scriptName):
        """!Return the sbatch command (with any extra submit flags) that submits the script

        @param scriptName: name of script on filesystem
        """
        return "sbatch %s %s" % (self.submit if self.submit is not None else "", scriptName)
260 
261 
class SmpBatch(Batch):
    """Not-really-Batch submission with multiple cores on the current node

    The job is run immediately.
    """

    def __init__(self, *args, **kwargs):
        """Construct as for Batch, then normalize the core settings for a single node

        ``--nodes=0|1 --procs=NN`` without ``--cores`` is accepted as a synonym for ``--cores=NN``.
        With more than one core, ``-n <cores>`` is appended to the mpiexec options; otherwise
        the mpiexec options are cleared.

        @throws RuntimeError if nodes or procs are still set after that conversion
        """
        super(SmpBatch, self).__init__(*args, **kwargs)
        procsMeanCores = self.numNodes in (0, 1) and self.numProcsPerNode > 0 and self.numCores == 0
        if procsMeanCores:
            # --nodes=1 --procs=NN being used as a synonym for --cores=NN
            self.numNodes, self.numCores, self.numProcsPerNode = 0, self.numProcsPerNode, 0
        if self.numNodes > 0 or self.numProcsPerNode > 0:
            raise RuntimeError("SMP does not support the --nodes and --procs command-line options; "
                               "use --cores to specify the number of cores to use")
        if self.numCores > 1:
            baseOptions = "" if self.mpiexec is None else self.mpiexec
            self.mpiexec = "%s -n %d" % (baseOptions, self.numCores)
        else:
            self.mpiexec = ""

    def preamble(self, walltime=None):
        """Return an empty header: there are no batch-system directives when running locally"""
        return ""

    def submitCommand(self, scriptName):
        """!Return a shell command that execs the script directly

        @param scriptName: name of script on filesystem
        """
        return "exec %s" % scriptName
288 
289 
# Mapping batch type --> Batch class.  A value of None means "no batch system": the task is
# run directly.  Key order is the order of the --batch-type choices offered by BatchArgumentParser.
BATCH_TYPES = {
    'none': None,
    'None': None,
    'pbs': PbsBatch,
    'slurm': SlurmBatch,
    'smp': SmpBatch,
}
296 
297 
class BatchArgumentParser(argparse.ArgumentParser):
    """An argument parser to get relevant parameters for batch submission

    We want to be able to display the help for a 'parent' ArgumentParser
    along with the batch-specific options we introduce in this class, but
    we don't want to swallow the parent (i.e., ArgumentParser(parents=[parent]))
    because we want to save the list of arguments that this particular
    BatchArgumentParser doesn't parse, so they can be passed on to a different
    program (though we also want to parse them to check that they can be parsed).
    """

    def __init__(self, parent=None, *args, **kwargs):
        """Constructor

        @param parent: parser for the wrapped program's arguments, or None
        @param args, kwargs: passed to argparse.ArgumentParser
        """
        super(BatchArgumentParser, self).__init__(*args, **kwargs)
        self._parent = parent
        group = self.add_argument_group("Batch submission options")
        group.add_argument("--queue", help="Queue name")
        group.add_argument("--job", help="Job name")
        group.add_argument("--nodes", type=int, default=0, help="Number of nodes")
        group.add_argument("--procs", type=int, default=0, help="Number of processors per node")
        group.add_argument("--cores", type=int, default=0, help="Number of cores (Slurm/SMP only)")
        group.add_argument("--time", type=float, default=0,
                           help="Expected execution time per element (sec)")
        group.add_argument("--walltime", type=float, default=0,
                           help="Expected total execution walltime (sec); overrides cores & time")
        group.add_argument("--batch-type", dest="batchType", choices=list(BATCH_TYPES.keys()), default="smp",
                           help="Batch system to use")
        group.add_argument("--batch-verbose", dest="batchVerbose", action="store_true", default=False,
                           help=("Enable verbose output in batch script "
                                 "(including system environment information at batch start)?"))
        group.add_argument("--batch-output", dest="batchOutput", help="Output directory")
        group.add_argument("--batch-submit", dest="batchSubmit", help="Batch submission command-line flags")
        group.add_argument("--batch-options", dest="batchOptions", help="Header options for batch script")
        group.add_argument("--batch-profile", dest="batchProfile", action="store_true", default=False,
                           help="Enable profiling on batch job?")
        group.add_argument("--batch-stats", dest="batchStats", action="store_true", default=False,
                           help="Print process stats on completion (Linux only)?")
        group.add_argument("--dry-run", dest="dryrun", default=False, action="store_true",
                           help="Dry run?")
        group.add_argument("--do-exec", dest="doExec", default=False, action="store_true",
                           help="Exec script instead of submit to batch system?")
        group.add_argument("--mpiexec", default="", help="mpiexec options")

    def parse_args(self, config=None, args=None, namespace=None, **kwargs):
        """Parse the batch options, delegating all other arguments to the parent parser

        @param config: passed as the first argument to the parent's parse_args
        @param args: argument list (None: argparse uses sys.argv)
        @param namespace: namespace passed to argparse
        @param kwargs: passed to the parent's parse_args
        @return namespace with the batch options plus ``parent`` (the parent's parse result, or
                None without a parent), ``leftover`` (the arguments handed to the parent, or None
                without a parent) and ``batch`` (from makeBatch)
        """
        parsed, leftover = super(BatchArgumentParser, self).parse_known_args(args=args, namespace=namespace)
        parsed.parent = None
        parsed.leftover = None
        if leftover and self._parent is None:
            self.error("Unrecognised arguments: %s" % leftover)
        if self._parent is not None:
            # Delegate even when nothing is left over, so that the parent reports its own missing
            # required arguments instead of callers receiving parent=None/leftover=None.
            parsed.parent = self._parent.parse_args(config, args=leftover, **kwargs)
            parsed.leftover = leftover
        parsed.batch = self.makeBatch(parsed)
        return parsed

    def makeBatch(self, args):
        """Create a Batch object from the command-line arguments

        @return instance of the Batch class chosen by --batch-type, or None for no batch system
        """
        # argMapping is a dict that maps Batch init kwarg names to parsed arguments attribute *names*
        argMapping = {'outputDir': 'batchOutput',
                      'numNodes': 'nodes',
                      'numProcsPerNode': 'procs',
                      'numCores': 'cores',
                      'walltime': 'time',
                      'queue': 'queue',
                      'jobName': 'job',
                      'dryrun': 'dryrun',
                      'doExec': 'doExec',
                      'mpiexec': 'mpiexec',
                      'submit': 'batchSubmit',
                      'options': 'batchOptions',
                      'verbose': 'batchVerbose',
                      }

        batchClass = BATCH_TYPES[args.batchType]
        if batchClass is None:
            return None

        # kwargs is a dict that maps Batch init kwarg names to parsed arguments attribute *values*
        kwargs = {k: getattr(args, v) for k, v in argMapping.items()}
        return batchClass(**kwargs)

    def format_help(self):
        """Return help for the wrapper, followed by the parent's help if there is a parent"""
        text = """This is a script for queue submission of a wrapped script.

Use this program name and ignore that for the wrapped script (it will be
passed on to the batch system). Arguments for *both* this wrapper script or the
wrapped script are valid (if it is required for the wrapped script, it
is required for the wrapper as well).

*** Batch system submission wrapper:

"""
        text += super(BatchArgumentParser, self).format_help()
        if self._parent is not None:
            text += """

*** Wrapped script:

"""
            text += self._parent.format_help()
        return text

    def format_usage(self):
        """Return the parent's usage (shown under this program's name) if there is a parent"""
        if self._parent is None:
            return super(BatchArgumentParser, self).format_usage()
        prog = self._parent.prog
        self._parent.prog = self.prog
        try:
            return self._parent.format_usage()
        finally:
            # Always restore the parent's program name, even if formatting fails
            self._parent.prog = prog
407 
408 
def exportEnv():
    """Generate bash script to regenerate the current environment

    Each environment variable becomes ``export NAME='value'`` (embedded single quotes are
    closed, escaped and reopened); each exported bash function is redefined and re-exported
    with ``export -f``.  DISPLAY is deliberately not propagated, and variables whose names are
    not valid shell identifiers are skipped because bash would reject exporting them.

    @return bash script text, one statement per line
    """
    lines = []
    for key, val in os.environ.items():
        if key in ("DISPLAY",):
            continue
        if val.startswith("() {"):
            # This is a function.
            # "Two parentheses, a single space, and a brace"
            # is exactly the same criterion as bash uses.

            # Since the 2014 Shellshock fixes, exported functions appear under a decorated name:
            # 'BASH_FUNC_<name>%%' (upstream bash) or 'BASH_FUNC_<name>()' (some vendor patches).
            # Strip the decoration to recover the function name.
            if key.startswith("BASH_FUNC_"):
                for suffix in ("%%", "()"):
                    if key.endswith(suffix):
                        key = key[len("BASH_FUNC_"):-len(suffix)]
                        break
            lines.append("{key} {val}\nexport -f {key}\n".format(key=key, val=val))
        elif re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", key):
            # This is a variable.
            lines.append("export {key}='{val}'\n".format(key=key, val=val.replace("'", "'\"'\"'")))
        # Otherwise: not a valid shell identifier, so 'export' would fail; skip it.
    return "".join(lines)
430 
431 
class BatchCmdLineTask(CmdLineTask):
    """CmdLineTask that can either run directly or submit itself to a batch system"""

    @classmethod
    def parseAndSubmit(cls, args=None, **kwargs):
        """!Parse the command line, then run the task here or submit it to a batch system

        @param cls: Class
        @param args: command-line arguments (None: taken from sys.argv by argparse)
        @param kwargs: passed to BatchArgumentParser.parse_args
        @return result of parseAndRun() when no batch system is used; otherwise None
        """
        taskParser = cls._makeArgumentParser(doBatch=True, add_help=False)
        batchParser = BatchArgumentParser(parent=taskParser)
        batchArgs = batchParser.parse_args(config=cls.ConfigClass(), args=args, override=cls.applyOverrides,
                                           **kwargs)
        taskArgs = batchArgs.parent

        # precall writes the config and schema before anything is run or submitted
        runner = cls.RunnerClass(cls, taskArgs)
        if not runner.precall(taskArgs):
            taskParser.error("Error in task preparation")

        batch = batchArgs.batch
        setBatchType(batch)

        if batch is None:
            # No batch system: drop all batch arguments and run in this process
            sys.argv = [sys.argv[0]] + batchArgs.leftover
            return cls.parseAndRun()

        if batchArgs.walltime > 0:
            walltime = batchArgs.walltime
        else:
            if batchArgs.cores > 0:
                numCores = batchArgs.cores
            else:
                numCores = batchArgs.nodes*batchArgs.procs
            walltime = cls.batchWallTime(batchArgs.time, taskArgs, numCores)

        batch.run(cls.batchCommand(batchArgs), walltime=walltime)

    @classmethod
    def batchWallTime(cls, time, parsedCmd, numCores):
        """!Return walltime request for batch job

        Default: time per target multiplied by the number of targets, divided by the number of cores.
        Subclasses should override if the walltime should be calculated
        differently (e.g., addition of some serial time).

        @param cls: Class
        @param time: Requested time per iteration
        @param parsedCmd: Results of argument parsing
        @param numCores: Number of cores
        """
        targets = cls.RunnerClass.getTargetList(parsedCmd)
        return time*len(targets)/float(numCores)

    @classmethod
    def batchCommand(cls, args):
        """!Return command to run CmdLineTask

        The command is a ``python -c '...'`` one-liner that sets the umask, disables implicit
        threading, sets up the job log, optionally registers process statistics at exit and
        optionally wraps everything in cProfile, then calls parseAndRun() on this class with the
        leftover (non-batch) arguments plus ``--noExit``.

        @param cls: Class
        @param args: Parsed batch job arguments (from BatchArgumentParser)
        """
        jobName = "job" if args.job is None else args.job
        moduleName = cls.__module__

        statements = [
            "import os; os.umask(%#05o); " % (UMASK,),
            "import lsst.base; lsst.base.disableImplicitThreading(); ",
            "import lsst.ctrl.pool.log; lsst.ctrl.pool.log.jobLog(\"%s\"); " % (jobName,),
        ]
        if args.batchStats:
            statements.append("import lsst.ctrl.pool.parallel; import atexit; ")
            statements.append("atexit.register(lsst.ctrl.pool.parallel.printProcessStats); ")
        statements.append("import %s; %s.%s.parseAndRun();" % (moduleName, moduleName, cls.__name__))
        script = "".join(statements)

        if args.batchProfile:
            # Profile output goes to profile-<job>-<host>-<pid>.dat, evaluated inside the job
            script = ("import cProfile; import os; cProfile.run(\"\"\"" + script +
                      "\"\"\", filename=\"profile-" + jobName + "-%s-%d.dat\" % (os.uname()[1], os.getpid()))")

        return "python -c '" + script + "' " + shCommandFromArgs(args.leftover) + " --noExit"

    @contextlib.contextmanager
    def logOperation(self, operation, catch=False, trace=True):
        """!Provide a context manager for logging an operation

        Logs "Start" on entry and "Finished" on exit (even on error), prefixed with this node.

        @param operation: description of operation (string)
        @param catch: Catch all exceptions?
        @param trace: Log a traceback of caught exception?

        Note that if 'catch' is True, all exceptions are swallowed, but there may
        be other side-effects such as undefined variables.
        """
        self.log.info("%s: Start %s" % (NODE, operation))
        try:
            yield
        except Exception:
            if not catch:
                raise
            excType, excValue, _ = sys.exc_info()
            self.log.warn("%s: Caught %s while %s: %s" % (NODE, excType.__name__, operation, excValue))
            if trace:
                self.log.info("%s: Traceback:\n%s" % (NODE, traceback.format_exc()))
        finally:
            self.log.info("%s: Finished %s" % (NODE, operation))
525 
526 
class BatchPoolTask(BatchCmdLineTask):
    """Starts a BatchCmdLineTask with an MPI process pool

    Use this subclass of BatchCmdLineTask if you want to use the Pool directly.
    """
    @classmethod
    @abortOnError
    def parseAndRun(cls, *args, **kwargs):
        """Run with a MPI process pool

        Starts the pool, runs the parent class's parseAndRun, then shuts the pool down.

        @return the value returned by the parent class's parseAndRun (previously discarded;
                now passed through, as BatchParallelTask.parseAndRun does)
        """
        pool = startPool()
        results = super(BatchPoolTask, cls).parseAndRun(*args, **kwargs)
        pool.exit()
        return results
539 
540 
class BatchTaskRunner(TaskRunner):
    """Run a Task individually on a list of inputs using the MPI process pool"""

    def __init__(self, *args, **kwargs):
        """Constructor

        Warn if the user specified multiprocessing (-j): it is ignored and reset to 1,
        because the process pool provides the parallelism.
        """
        TaskRunner.__init__(self, *args, **kwargs)
        if self.numProcesses <= 1:
            return
        self.log.warn("Multiprocessing arguments (-j %d) ignored since using batch processing" %
                      self.numProcesses)
        self.numProcesses = 1

    def run(self, parsedCmd):
        """Run the task on all targets

        Sole input is the result of parsing the command-line with the ArgumentParser.

        Output is None if 'precall' failed; otherwise it is a list of calling ourself
        on each element of the target list from the 'getTargetList' method
        (an empty list when there are no targets).
        """
        self.prepareForMultiProcessing()
        pool = Pool()

        if not self.precall(parsedCmd):
            return None

        targetList = self.getTargetList(parsedCmd)
        if len(targetList) == 0:
            parsedCmd.log.warn("Not running the task because there is no data to process; "
                               "you may preview data using \"--show data\"")
            return []

        parsedCmd.log.info("Processing %d targets with a pool of %d processes..." %
                           (len(targetList), pool.size))
        # Each target is handed to self.__call__ via the pool
        return pool.map(self, targetList)

    @abortOnError
    def __call__(self, cache, args):
        """Run the Task on a single target

        The process pool supplies 'cache' first; it is not used and is dropped here.
        'args' are those arguments provided by the getTargetList method.

        Brings down the entire job if an exception is not caught (i.e., --doraise).
        """
        return TaskRunner.__call__(self, args)
593 
594 
class BatchParallelTask(BatchCmdLineTask):
    """Runs the BatchCmdLineTask in parallel

    Use this subclass of BatchCmdLineTask if you don't need to use the Pool
    directly, but just want to iterate over many objects (like a multi-node
    version of the '-j' command-line argument).
    """
    RunnerClass = BatchTaskRunner

    @classmethod
    def _makeArgumentParser(cls, *args, **kwargs):
        """Build an ArgumentParser

        Removes the batch-specific keyword arguments ('doBatch', 'add_help') and delegates to
        the class after BatchCmdLineTask in the method resolution order.
        """
        for batchOnlyKey in ("doBatch", "add_help"):
            kwargs.pop(batchOnlyKey, False)
        return super(BatchCmdLineTask, cls)._makeArgumentParser(*args, **kwargs)

    @classmethod
    def parseAndRun(cls, *args, **kwargs):
        """Parse an argument list and run the command

        This is the entry point when we run in earnest, so start the process pool
        so that the worker nodes don't go any further.

        @return the value returned by the parent class's parseAndRun
        """
        pool = startPool()
        results = super(BatchParallelTask, cls).parseAndRun(*args, **kwargs)
        pool.exit()
        return results
def parse_args(self, config=None, args=None, namespace=None, **kwargs)
Definition: parallel.py:340
def __init__(self, parent=None, *args, **kwargs)
Definition: parallel.py:309
def batchWallTime(cls, time, parsedCmd, numCores)
Return walltime request for batch job.
Definition: parallel.py:461
def logOperation(self, operation, catch=False, trace=True)
Provide a context manager for logging an operation.
Definition: parallel.py:502
def parseAndSubmit(cls, args=None, **kwargs)
Definition: parallel.py:435
def batchCommand(cls, args)
Return command to run CmdLineTask.
Definition: parallel.py:476
def execution(self, command)
Definition: parallel.py:117
def submitCommand(self, scriptName)
Return command to submit script.
Definition: parallel.py:157
def run(self, command, walltime=None)
Run the batch system.
Definition: parallel.py:164
def __init__(self, outputDir=None, numNodes=0, numProcsPerNode=0, numCores=0, queue=None, jobName=None, walltime=0.0, dryrun=False, doExec=False, mpiexec="", submit=None, options=None, verbose=False)
Constructor.
Definition: parallel.py:73
def preamble(self, command, walltime=None)
Definition: parallel.py:110
def createScript(self, command, walltime=None)
Create script to be submitted.
Definition: parallel.py:138
def parseAndRun(cls, *args, **kwargs)
Definition: parallel.py:615
def parseAndRun(cls, *args, **kwargs)
Definition: parallel.py:534
def __call__(self, cache, args)
Definition: parallel.py:583
def __init__(self, *args, **kwargs)
Definition: parallel.py:544
def preamble(self, walltime=None)
Definition: parallel.py:187
def submitCommand(self, scriptName)
Return command to submit script.
Definition: parallel.py:209
def submitCommand(self, scriptName)
Return command to submit script.
Definition: parallel.py:258
def preamble(self, walltime=None)
Definition: parallel.py:232
def __init__(self, *args, **kwargs)
Definition: parallel.py:268
def preamble(self, walltime=None)
Definition: parallel.py:283
def submitCommand(self, scriptName)
Return command to submit script.
Definition: parallel.py:286
daf::base::PropertyList * list
Definition: fits.cc:913
def shCommandFromArgs(args)
Definition: parallel.py:42
def startPool(comm=None, root=0, killSlaves=True)
Start a process pool.
Definition: pool.py:1216
def setBatchType(batchType)
Definition: pool.py:101
Definition: Log.h:717
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
Definition: history.py:174