LSST Applications g0b6bd0c080+a72a5dd7e6,g1182afd7b4+2a019aa3bb,g17e5ecfddb+2b8207f7de,g1d67935e3f+06cf436103,g38293774b4+ac198e9f13,g396055baef+6a2097e274,g3b44f30a73+6611e0205b,g480783c3b1+98f8679e14,g48ccf36440+89c08d0516,g4b93dc025c+98f8679e14,g5c4744a4d9+a302e8c7f0,g613e996a0d+e1c447f2e0,g6c8d09e9e7+25247a063c,g7271f0639c+98f8679e14,g7a9cd813b8+124095ede6,g9d27549199+a302e8c7f0,ga1cf026fa3+ac198e9f13,ga32aa97882+7403ac30ac,ga786bb30fb+7a139211af,gaa63f70f4e+9994eb9896,gabf319e997+ade567573c,gba47b54d5d+94dc90c3ea,gbec6a3398f+06cf436103,gc6308e37c7+07dd123edb,gc655b1545f+ade567573c,gcc9029db3c+ab229f5caf,gd01420fc67+06cf436103,gd877ba84e5+06cf436103,gdb4cecd868+6f279b5b48,ge2d134c3d5+cc4dbb2e3f,ge448b5faa6+86d1ceac1d,gecc7e12556+98f8679e14,gf3ee170dca+25247a063c,gf4ac96e456+ade567573c,gf9f5ea5b4d+ac198e9f13,gff490e6085+8c2580be5c,w.2022.27
LSST Data Management Base Package
parallel.py
Go to the documentation of this file.
#!/usr/bin/env python

import re
import os
import os.path
import stat
import sys
import pipes  # noqa: F401 -- legacy import kept for compatibility; prefer shlex.quote
import shlex
import tempfile
import argparse
import traceback
import contextlib
from lsst.pipe.base import CmdLineTask, TaskRunner
from .pool import startPool, Pool, NODE, abortOnError, setBatchType
from . import log as dummyLog  # noqa
16
# Names exported by "from <this module> import *"
__all__ = ["Batch", "PbsBatch", "SlurmBatch", "SmpBatch", "BATCH_TYPES", "BatchArgumentParser",
           "BatchCmdLineTask", "BatchPoolTask", ]
19
# umask applied in the generated batch script ("umask ..."), in the PBS header
# ("#PBS -W umask=...") and in the python command built by batchCommand.
UMASK = 0o002
21
# Functions to convert a list of arguments to a quoted shell command, provided by Dave Abrahams
# http://stackoverflow.com/questions/967443/python-module-to-shellquote-unshellquote
# Zero-width match in front of every character that is not alphanumeric, '-', '_', '.', '/'
# or a newline; a backslash is inserted at each such position.
_quote_pos = re.compile('(?=[^-0-9a-zA-Z_./\n])')


def shQuote(arg):
    r"""Quote the argument for the shell.

    Every character outside ``[-0-9a-zA-Z_./]`` is backslash-escaped, except
    newlines, which are wrapped in single quotes instead.  An empty (or
    otherwise false) argument becomes an explicit pair of single quotes.

    @param arg: string to quote
    @return quoted string

    >>> shQuote('\t')
    '\\\t'
    >>> shQuote('foo bar')
    'foo\\ bar'
    """
    # This is the logic emacs uses
    if not arg:
        return "''"
    escaped = _quote_pos.sub('\\\\', arg)
    return escaped.replace('\n', "'\n'")
40
41
def shCommandFromArgs(args):
    """Convert a list of shell arguments to a shell command-line

    @param args: iterable of argument strings
    @return single string: each argument passed through shQuote, joined by spaces
    """
    return ' '.join(shQuote(a) for a in args)
45
46
def processStats():
    """Collect Linux-specific process statistics

    Parses the /proc/self/status file (N.B. Linux-specific!) into a dict
    which is returned.

    @return dict mapping the text before the first ':' on each line to the
            whitespace-stripped text after it
    """
    result = {}
    with open("/proc/self/status") as f:
        for line in f:
            # Only the first ':' splits; any later ones remain part of the value
            key, _, value = line.partition(":")
            result[key] = value.strip()
    return result
59
60
def printProcessStats():
    """Print the process statistics to the log

    Logs the dict returned by processStats() at INFO level on the default
    lsst.log logger, tagged with NODE.
    """
    # Imported here rather than at module level, as in the original code
    from lsst.log import Log
    log = Log.getDefaultLogger()
    log.info("Process stats for %s: %s" % (NODE, processStats()))
66
67
class Batch:
    """Base class for batch submission

    Holds the submission parameters and knows how to write a bash script and
    hand it to a submission command.  Subclasses supply the batch-system
    specific pieces: preamble() and submitCommand().
    """

    def __init__(self, outputDir=None, numNodes=0, numProcsPerNode=0, numCores=0, queue=None, jobName=None,
                 walltime=0.0, dryrun=False, doExec=False, mpiexec="", submit=None, options=None,
                 verbose=False):
        """!Constructor

        @param outputDir: output directory, or None
        @param numNodes: number of nodes
        @param numProcsPerNode: number of processors per node
        @param numCores: number of cores (Slurm, SMP only)
        @param queue: name of queue, or None
        @param jobName: name of job, or None
        @param walltime: maximum wall clock time for job
        @param dryrun: Dry run (only print actions that would be taken)?
        @param doExec: exec the script instead of submitting to batch system?
        @param mpiexec: options for mpiexec
        @param submit: command-line options for batch submission (e.g., for qsub, sbatch)
        @param options: options to append to script header (e.g., #PBS or #SBATCH)
        @param verbose: produce verbose output?
        @throw RuntimeError if neither (numNodes and numProcsPerNode) nor numCores is positive
        """
        if (numNodes <= 0 or numProcsPerNode <= 0) and numCores <= 0:
            raise RuntimeError("Must specify numNodes+numProcs or numCores")

        self.outputDir = outputDir
        self.numNodes = numNodes
        self.numProcsPerNode = numProcsPerNode
        self.numCores = numCores
        self.queue = queue
        self.jobName = jobName
        self.walltime = walltime
        self.dryrun = dryrun
        self.doExec = doExec
        self.mpiexec = mpiexec
        self.submit = submit
        self.options = options
        self.verbose = verbose

    def shebang(self):
        """Return the interpreter line written at the top of the script"""
        return "#!/bin/bash"

    def preamble(self, command, walltime=None):
        """Return preamble string for script to be submitted

        Most batch systems allow you to embed submission options as comments here.

        NOTE(review): createScript() calls this as ``self.preamble(walltime)``,
        i.e. with a single positional argument; subclasses are therefore
        expected to override with the signature ``preamble(self, walltime=None)``.
        The base-class signature is kept unchanged here for compatibility.
        """
        raise NotImplementedError("Not implemented for base class")

    def execution(self, command):
        """!Return execution string for script to be submitted

        The script fragment re-creates the current environment, sets the
        umask, changes to the current working directory and runs the command
        under mpiexec.  With verbose set, diagnostic commands are emitted
        before and after the mpiexec line.

        @param command: command to run under mpiexec
        @return newline-joined script fragment
        """
        script = [exportEnv(),
                  "umask %03o" % UMASK,
                  # shlex.quote is the documented spelling of pipes.quote
                  "cd %s" % shlex.quote(os.getcwd()),
                  ]
        if self.verbose:
            script += ["echo \"mpiexec is at: $(which mpiexec)\"",
                       "ulimit -a",
                       "echo 'umask: ' $(umask)",
                       "eups list -s",
                       "export",
                       "date",
                       ]
        script += ["mpiexec %s %s" % (self.mpiexec, command)]
        if self.verbose:
            script += ["date",
                       "echo Done.",
                       ]
        return "\n".join(script)

    def createScript(self, command, walltime=None):
        """!Create script to be submitted

        Writes shebang, preamble and execution sections (one per line group)
        to a new temporary file and makes it readable, writable and
        executable by the owner only.

        @param command: command to run
        @param walltime: maximum wall clock time, overrides value to constructor
        @return name of script on filesystem
        """
        fd, scriptName = tempfile.mkstemp()
        with os.fdopen(fd, "w") as f:
            f.write(self.shebang())
            f.write('\n')
            f.write(self.preamble(walltime))
            f.write('\n')
            f.write(self.execution(command))
            f.write('\n')

        os.chmod(scriptName, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
        return scriptName

    def submitCommand(self, scriptName):
        """!Return command to submit script

        @param scriptName: name of script on filesystem
        """
        raise NotImplementedError("No implementation for base class")

    def run(self, command, walltime=None):
        """!Run the batch system

        Creates and submits the script to execute the provided command

        @param command: command to run
        @param walltime: maximum wall clock time, overrides value to constructor
        @return name of script on filesystem
        """
        scriptName = self.createScript(command, walltime=walltime)
        command = self.submitCommand(scriptName)
        if self.dryrun:
            print("Would run: %s" % command)
        elif self.doExec:
            # Replaces the current process with the script; does not return
            os.execl(scriptName, scriptName)
        else:
            # The exit status of the submission command is not inspected
            os.system(command)
        return scriptName
182
183
class PbsBatch(Batch):
    """Batch submission with PBS"""

    def preamble(self, walltime=None):
        """!Return the "#PBS" header lines for the script

        @param walltime: maximum wall clock time; falls back to the constructor value if None
        @return newline-joined header; options that are unset yield empty lines
        @throw RuntimeError if the walltime is not positive, if nodes/procs are
               not both positive, or if a number of cores was given
        """
        if walltime is None:
            walltime = self.walltime
        if walltime <= 0:
            raise RuntimeError("Non-positive walltime: %s (did you forget '--time'?)" % (walltime,))
        if self.numNodes <= 0 or self.numProcsPerNode <= 0:
            raise RuntimeError(
                "Number of nodes (--nodes=%d) or number of processors per node (--procs=%d) not set" %
                (self.numNodes, self.numProcsPerNode))
        if self.numCores > 0:
            raise RuntimeError("PBS does not support setting the number of cores")
        return "\n".join([
            "#PBS %s" % self.options if self.options is not None else "",
            "#PBS -l nodes=%d:ppn=%d" % (self.numNodes, self.numProcsPerNode),
            "#PBS -l walltime=%d" % walltime if walltime is not None else "",
            "#PBS -o %s" % self.outputDir if self.outputDir is not None else "",
            "#PBS -N %s" % self.jobName if self.jobName is not None else "",
            "#PBS -q %s" % self.queue if self.queue is not None else "",
            "#PBS -j oe",
            "#PBS -W umask=%03o" % UMASK,
        ])

    def submitCommand(self, scriptName):
        """!Return the qsub command line used to submit the script

        @param scriptName: name of script on filesystem
        """
        return "qsub %s -V %s" % (self.submit if self.submit is not None else "", scriptName)
211
212
class SlurmBatch(Batch):
    """Batch submission with Slurm"""

    @staticmethod
    def formatWalltime(walltime):
        """!Format walltime (in seconds) as days-hours:minutes

        Any partial minute is rounded up.  The rounding is applied to the
        total before splitting into fields, so the minutes field never
        reaches 60 (e.g. 3599.5 s gives "0-1:0" rather than "0-0:60").

        @param walltime: wall clock time in seconds (int or float)
        @return string of the form "D-H:M"
        """
        minInHour = 60
        minInDay = 24*minInHour
        # Ceiling division: -(-a // b) rounds up for both ints and floats
        totalMinutes = int(-(-walltime//60))
        days, remainder = divmod(totalMinutes, minInDay)
        hours, minutes = divmod(remainder, minInHour)
        return "%d-%d:%d" % (days, hours, minutes)

    def preamble(self, walltime=None):
        """!Return the "#SBATCH" header lines for the script

        @param walltime: maximum wall clock time; falls back to the constructor value if None
        @return newline-joined header; options that are unset yield empty lines
        @throw RuntimeError if the walltime is not positive, or if the
               nodes/procs and cores settings are missing or both given
        """
        if walltime is None:
            walltime = self.walltime
        if walltime <= 0:
            raise RuntimeError("Non-positive walltime: %s (did you forget '--time'?)" % (walltime,))
        if (self.numNodes <= 0 or self.numProcsPerNode <= 0) and self.numCores <= 0:
            raise RuntimeError(
                "Number of nodes (--nodes=%d) and number of processors per node (--procs=%d) not set OR "
                "number of cores (--cores=%d) not set" % (self.numNodes, self.numProcsPerNode, self.numCores))
        if self.numCores > 0 and (self.numNodes > 0 or self.numProcsPerNode > 0):
            raise RuntimeError("Must set either --nodes,--procs or --cores: not both")

        # stdout and stderr both go to <outputDir>/<jobName>.o%j (default: ./slurm.o%j)
        outputDir = self.outputDir if self.outputDir is not None else os.getcwd()
        filename = os.path.join(outputDir, (self.jobName if self.jobName is not None else "slurm") + ".o%j")
        return "\n".join([("#SBATCH --nodes=%d" % self.numNodes) if self.numNodes > 0 else "",
                          ("#SBATCH --ntasks-per-node=%d" % self.numProcsPerNode) if
                          self.numProcsPerNode > 0 else "",
                          ("#SBATCH --ntasks=%d" % self.numCores) if self.numCores > 0 else "",
                          "#SBATCH --time=%s" % self.formatWalltime(walltime),
                          "#SBATCH --job-name=%s" % self.jobName if self.jobName is not None else "",
                          "#SBATCH -p %s" % self.queue if self.queue is not None else "",
                          "#SBATCH --output=%s" % filename,
                          "#SBATCH --error=%s" % filename,
                          "#SBATCH %s" % self.options if self.options is not None else "",
                          ])

    def submitCommand(self, scriptName):
        """!Return the sbatch command line used to submit the script

        @param scriptName: name of script on filesystem
        """
        return "sbatch %s %s" % (self.submit if self.submit is not None else "", scriptName)
260
261
class SmpBatch(Batch):
    """Not-really-Batch submission with multiple cores on the current node

    The job is run immediately.
    """

    def __init__(self, *args, **kwargs):
        """!Constructor

        Accepts the same arguments as Batch.  "--nodes=0/1 --procs=N" with no
        cores given is treated as "--cores=N"; any other use of nodes/procs
        is rejected.  The mpiexec options get "-n <cores>" appended when more
        than one core is requested, and are cleared otherwise.

        @throw RuntimeError if nodes/procs are set in a way that cannot be mapped to cores
        """
        super(SmpBatch, self).__init__(*args, **kwargs)
        if self.numNodes in (0, 1) and self.numProcsPerNode > 0 and self.numCores == 0:
            # --nodes=1 --procs=NN being used as a synonym for --cores=NN
            self.numNodes = 0
            self.numCores = self.numProcsPerNode
            self.numProcsPerNode = 0
        if self.numNodes > 0 or self.numProcsPerNode > 0:
            raise RuntimeError("SMP does not support the --nodes and --procs command-line options; "
                               "use --cores to specify the number of cores to use")
        if self.numCores > 1:
            self.mpiexec = "%s -n %d" % (self.mpiexec if self.mpiexec is not None else "", self.numCores)
        else:
            self.mpiexec = ""

    def preamble(self, walltime=None):
        """Return an empty preamble: there are no batch-system header options"""
        return ""

    def submitCommand(self, scriptName):
        """!Return command that executes the script directly

        @param scriptName: name of script on filesystem
        """
        return "exec %s" % scriptName
288
289
# Mapping batch type --> Batch class; both spellings of "none" mean "no batch system"
BATCH_TYPES = {'none': None,
               'None': None,
               'pbs': PbsBatch,
               'slurm': SlurmBatch,
               'smp': SmpBatch,
               }
296
297
class BatchArgumentParser(argparse.ArgumentParser):
    """An argument parser to get relevant parameters for batch submission

    We want to be able to display the help for a 'parent' ArgumentParser
    along with the batch-specific options we introduce in this class, but
    we don't want to swallow the parent (i.e., ArgumentParser(parents=[parent]))
    because we want to save the list of arguments that this particular
    BatchArgumentParser doesn't parse, so they can be passed on to a different
    program (though we also want to parse them to check that they can be parsed).
    """

    def __init__(self, parent=None, *args, **kwargs):
        """!Constructor

        @param parent: parser for the wrapped script, or None
        @param args, kwargs: forwarded to argparse.ArgumentParser
        """
        super(BatchArgumentParser, self).__init__(*args, **kwargs)
        self._parent = parent
        group = self.add_argument_group("Batch submission options")
        group.add_argument("--queue", help="Queue name")
        group.add_argument("--job", help="Job name")
        group.add_argument("--nodes", type=int, default=0, help="Number of nodes")
        group.add_argument("--procs", type=int, default=0, help="Number of processors per node")
        group.add_argument("--cores", type=int, default=0, help="Number of cores (Slurm/SMP only)")
        group.add_argument("--time", type=float, default=0,
                           help="Expected execution time per element (sec)")
        group.add_argument("--walltime", type=float, default=0,
                           help="Expected total execution walltime (sec); overrides cores & time")
        group.add_argument("--batch-type", dest="batchType", choices=list(BATCH_TYPES.keys()), default="smp",
                           help="Batch system to use")
        group.add_argument("--batch-verbose", dest="batchVerbose", action="store_true", default=False,
                           help=("Enable verbose output in batch script "
                                 "(including system environment information at batch start)?"))
        group.add_argument("--batch-output", dest="batchOutput", help="Output directory")
        group.add_argument("--batch-submit", dest="batchSubmit", help="Batch submission command-line flags")
        group.add_argument("--batch-options", dest="batchOptions", help="Header options for batch script")
        group.add_argument("--batch-profile", dest="batchProfile", action="store_true", default=False,
                           help="Enable profiling on batch job?")
        group.add_argument("--batch-stats", dest="batchStats", action="store_true", default=False,
                           help="Print process stats on completion (Linux only)?")
        group.add_argument("--dry-run", dest="dryrun", default=False, action="store_true",
                           help="Dry run?")
        group.add_argument("--do-exec", dest="doExec", default=False, action="store_true",
                           help="Exec script instead of submit to batch system?")
        group.add_argument("--mpiexec", default="", help="mpiexec options")

    def parse_args(self, config=None, args=None, namespace=None, **kwargs):
        """!Parse the batch arguments and pass the remainder to the parent parser

        The returned namespace carries three extra attributes:
        - parent: result of the parent parser on the leftover arguments (None if no leftovers)
        - leftover: list of arguments not recognised here (None if there were none)
        - batch: object built by makeBatch (None for batch type 'none'/'None')

        @param config: passed as first positional argument to the parent's parse_args
        @param args: argument list to parse
        @param namespace: forwarded to parse_known_args
        @param kwargs: forwarded to the parent's parse_args
        @return parsed arguments
        """
        args, leftover = super(BatchArgumentParser, self).parse_known_args(args=args, namespace=namespace)
        args.parent = None
        args.leftover = None
        if len(leftover) > 0:
            # Save any leftovers for the parent
            if self._parent is None:
                self.error("Unrecognised arguments: %s" % leftover)
            args.parent = self._parent.parse_args(config, args=leftover, **kwargs)
            args.leftover = leftover
        args.batch = self.makeBatch(args)
        return args

    def makeBatch(self, args):
        """Create a Batch object from the command-line arguments"""
        # argMapping is a dict that maps Batch init kwarg names to parsed arguments attribute *names*
        argMapping = {'outputDir': 'batchOutput',
                      'numNodes': 'nodes',
                      'numProcsPerNode': 'procs',
                      'numCores': 'cores',
                      'walltime': 'time',
                      'queue': 'queue',
                      'jobName': 'job',
                      'dryrun': 'dryrun',
                      'doExec': 'doExec',
                      'mpiexec': 'mpiexec',
                      'submit': 'batchSubmit',
                      'options': 'batchOptions',
                      'verbose': 'batchVerbose',
                      }

        batchClass = BATCH_TYPES[args.batchType]
        if batchClass is None:
            return None

        # kwargs is a dict that maps Batch init kwarg names to parsed arguments attribute *values*
        kwargs = {k: getattr(args, v) for k, v in argMapping.items()}
        return batchClass(**kwargs)

    def format_help(self):
        """Return help text for this parser followed by the parent's help (if any)"""
        text = """This is a script for queue submission of a wrapped script.

Use this program name and ignore that for the wrapped script (it will be
passed on to the batch system). Arguments for *both* this wrapper script or the
wrapped script are valid (if it is required for the wrapped script, it
is required for the wrapper as well).

*** Batch system submission wrapper:

"""
        text += super(BatchArgumentParser, self).format_help()
        if self._parent is not None:
            text += """

*** Wrapped script:

"""
            text += self._parent.format_help()
        return text

    def format_usage(self):
        """Return the parent's usage string under this program's name, or our own if no parent"""
        if self._parent is not None:
            # Temporarily lend our program name to the parent, restoring it afterwards
            prog = self._parent.prog
            self._parent.prog = self.prog
            usage = self._parent.format_usage()
            self._parent.prog = prog
            return usage
        return super(BatchArgumentParser, self).format_usage()
407
408
def exportEnv():
    """Generate bash script to regenerate the current environment

    Every environment variable except DISPLAY is emitted, either as an
    "export NAME='value'" line or, for exported bash functions, as a function
    definition followed by "export -f NAME".

    @return bash script fragment (string)
    """
    lines = []
    for key, val in os.environ.items():
        if key in ("DISPLAY",):
            continue
        if val.startswith("() {"):
            # This is a function.
            # "Two parentheses, a single space, and a brace"
            # is exactly the same criterion as bash uses.

            # From 2014-09-25, the function name is prefixed by 'BASH_FUNC_'
            # and suffixed by '()' (or, in later bash releases, by '%%'),
            # which we have to remove.
            if key.startswith("BASH_FUNC_") and key.endswith(("()", "%%")):
                key = key[10:-2]

            lines.append("{key} {val}\nexport -f {key}\n".format(key=key, val=val))
        else:
            # This is a variable.
            # Single quotes in the value are closed, emitted double-quoted, and re-opened.
            lines.append("export {key}='{val}'\n".format(key=key, val=val.replace("'", "'\"'\"'")))
    return "".join(lines)
430
431
class BatchCmdLineTask(CmdLineTask):
    """CmdLineTask that can submit itself to a batch system

    parseAndSubmit() parses the batch and task arguments, and either runs the
    task directly (batch type 'none') or writes and submits a script that
    re-invokes the task's parseAndRun() with the non-batch arguments.
    """

    @classmethod
    def parseAndSubmit(cls, args=None, **kwargs):
        """!Parse the command line and submit (or directly run) the task

        @param cls: Class
        @param args: argument list to parse
        @param kwargs: forwarded to the argument parser's parse_args
        """
        taskParser = cls._makeArgumentParser(doBatch=True, add_help=False)
        batchParser = BatchArgumentParser(parent=taskParser)
        batchArgs = batchParser.parse_args(config=cls.ConfigClass(), args=args, override=cls.applyOverrides,
                                           **kwargs)

        if not cls.RunnerClass(cls, batchArgs.parent).precall(batchArgs.parent):  # Write config, schema
            taskParser.error("Error in task preparation")

        setBatchType(batchArgs.batch)

        if batchArgs.batch is None:     # don't use a batch system
            sys.argv = [sys.argv[0]] + batchArgs.leftover  # Remove all batch arguments

            return cls.parseAndRun()
        else:
            if batchArgs.walltime > 0:
                # An explicit --walltime overrides the per-element estimate
                walltime = batchArgs.walltime
            else:
                numCores = batchArgs.cores if batchArgs.cores > 0 else batchArgs.nodes*batchArgs.procs
                walltime = cls.batchWallTime(batchArgs.time, batchArgs.parent, numCores)

            command = cls.batchCommand(batchArgs)
            batchArgs.batch.run(command, walltime=walltime)

    @classmethod
    def batchWallTime(cls, time, parsedCmd, numCores):
        """!Return walltime request for batch job

        Subclasses should override if the walltime should be calculated
        differently (e.g., addition of some serial time).

        @param cls: Class
        @param time: Requested time per iteration
        @param parsedCmd: Results of argument parsing
        @param numCores: Number of cores
        @return time multiplied by the number of targets, divided by numCores
        """
        numTargets = len(cls.RunnerClass.getTargetList(parsedCmd))
        return time*numTargets/float(numCores)

    @classmethod
    def batchCommand(cls, args):
        """!Return command to run CmdLineTask

        The command is a "python -c '...'" one-liner that sets the umask,
        disables implicit threading, sets up the job log and calls this
        class's parseAndRun(), followed by the quoted leftover arguments and
        "--noExit".  Profiling and process-stats reporting are added on request.

        @param cls: Class
        @param args: Parsed batch job arguments (from BatchArgumentParser)
        """
        job = args.job if args.job is not None else "job"
        module = cls.__module__
        script = ("import os; os.umask(%#05o); " +
                  "import lsst.base; lsst.base.disableImplicitThreading(); " +
                  "import lsst.ctrl.pool.log; lsst.ctrl.pool.log.jobLog(\"%s\"); ") % (UMASK, job)

        if args.batchStats:
            script += ("import lsst.ctrl.pool.parallel; import atexit; " +
                       "atexit.register(lsst.ctrl.pool.parallel.printProcessStats); ")

        script += "import %s; %s.%s.parseAndRun();" % (module, module, cls.__name__)

        # When profiling, the whole script is wrapped in cProfile.run("""...""", filename=...)
        profilePre = "import cProfile; import os; cProfile.run(\"\"\""
        profilePost = "\"\"\", filename=\"profile-" + job + "-%s-%d.dat\" % (os.uname()[1], os.getpid()))"

        return ("python -c '" + (profilePre if args.batchProfile else "") + script +
                (profilePost if args.batchProfile else "") + "' " + shCommandFromArgs(args.leftover) +
                " --noExit")

    @contextlib.contextmanager
    def logOperation(self, operation, catch=False, trace=True):
        """!Provide a context manager for logging an operation

        @param operation: description of operation (string)
        @param catch: Catch all exceptions?
        @param trace: Log a traceback of caught exception?

        Note that if 'catch' is True, all exceptions are swallowed, but there may
        be other side-effects such as undefined variables.
        """
        self.log.info("%s: Start %s" % (NODE, operation))
        try:
            yield
        except Exception:
            if catch:
                cls, e, _ = sys.exc_info()
                self.log.warn("%s: Caught %s while %s: %s" % (NODE, cls.__name__, operation, e))
                if trace:
                    self.log.info("%s: Traceback:\n%s" % (NODE, traceback.format_exc()))
                return
            raise
        finally:
            # Logged on success, on a swallowed exception and on a re-raised one
            self.log.info("%s: Finished %s" % (NODE, operation))
525
526
class BatchPoolTask(BatchCmdLineTask):
    """Starts a BatchCmdLineTask with an MPI process pool

    Use this subclass of BatchCmdLineTask if you want to use the Pool directly.
    """
    @classmethod
    @abortOnError
    def parseAndRun(cls, *args, **kwargs):
        """!Run with a MPI process pool

        @param args, kwargs: forwarded to the parent class's parseAndRun
        @return whatever the parent class's parseAndRun returns
        """
        pool = startPool()
        # Hand the result back to the caller, as BatchParallelTask.parseAndRun does
        results = super(BatchPoolTask, cls).parseAndRun(*args, **kwargs)
        pool.exit()
        return results
539
540
class BatchTaskRunner(TaskRunner):
    """Run a Task individually on a list of inputs using the MPI process pool"""

    def __init__(self, *args, **kwargs):
        """Constructor

        Warn if the user specified multiprocessing.
        """
        TaskRunner.__init__(self, *args, **kwargs)
        if self.numProcesses > 1:
            self.log.warn("Multiprocessing arguments (-j %d) ignored since using batch processing" %
                          self.numProcesses)
            self.numProcesses = 1

    def run(self, parsedCmd):
        """Run the task on all targets

        Sole input is the result of parsing the command-line with the ArgumentParser.

        Output is None if 'precall' failed; otherwise it is a list of calling ourself
        on each element of the target list from the 'getTargetList' method.
        """
        resultList = None

        self.prepareForMultiProcessing()
        pool = Pool()

        if self.precall(parsedCmd):
            targetList = self.getTargetList(parsedCmd)
            if len(targetList) > 0:
                parsedCmd.log.info("Processing %d targets with a pool of %d processes..." %
                                   (len(targetList), pool.size))
                # Run the task using self.__call__
                resultList = pool.map(self, targetList)
            else:
                parsedCmd.log.warn("Not running the task because there is no data to process; "
                                   "you may preview data using \"--show data\"")
                resultList = []

        return resultList

    @abortOnError
    def __call__(self, cache, args):
        """Run the Task on a single target

        Strips out the process pool 'cache' argument.

        'args' are those arguments provided by the getTargetList method.

        Brings down the entire job if an exception is not caught (i.e., --doraise).
        """
        return TaskRunner.__call__(self, args)
593
594
class BatchParallelTask(BatchCmdLineTask):
    """Runs the BatchCmdLineTask in parallel

    Use this subclass of BatchCmdLineTask if you don't need to use the Pool
    directly, but just want to iterate over many objects (like a multi-node
    version of the '-j' command-line argument).
    """
    RunnerClass = BatchTaskRunner

    @classmethod
    def _makeArgumentParser(cls, *args, **kwargs):
        """Build an ArgumentParser

        Removes the batch-specific parts in order to delegate to the parent classes.
        """
        # Drop the keywords that parseAndSubmit adds before delegating
        kwargs.pop("doBatch", False)
        kwargs.pop("add_help", False)
        # Lookup starts after BatchCmdLineTask in the MRO (as in the original code)
        return super(BatchCmdLineTask, cls)._makeArgumentParser(*args, **kwargs)

    @classmethod
    def parseAndRun(cls, *args, **kwargs):
        """Parse an argument list and run the command

        This is the entry point when we run in earnest, so start the process pool
        so that the worker nodes don't go any further.
        """
        pool = startPool()
        results = super(BatchParallelTask, cls).parseAndRun(*args, **kwargs)
        pool.exit()
        return results
def parse_args(self, config=None, args=None, namespace=None, **kwargs)
Definition: parallel.py:340
def __init__(self, parent=None, *args, **kwargs)
Definition: parallel.py:309
def batchWallTime(cls, time, parsedCmd, numCores)
Return walltime request for batch job.
Definition: parallel.py:461
def logOperation(self, operation, catch=False, trace=True)
Provide a context manager for logging an operation.
Definition: parallel.py:502
def parseAndSubmit(cls, args=None, **kwargs)
Definition: parallel.py:435
def batchCommand(cls, args)
Return command to run CmdLineTask.
Definition: parallel.py:476
def execution(self, command)
Definition: parallel.py:117
def submitCommand(self, scriptName)
Return command to submit script.
Definition: parallel.py:157
def run(self, command, walltime=None)
Run the batch system.
Definition: parallel.py:164
def __init__(self, outputDir=None, numNodes=0, numProcsPerNode=0, numCores=0, queue=None, jobName=None, walltime=0.0, dryrun=False, doExec=False, mpiexec="", submit=None, options=None, verbose=False)
Constructor.
Definition: parallel.py:73
def preamble(self, command, walltime=None)
Definition: parallel.py:110
def createScript(self, command, walltime=None)
Create script to be submitted.
Definition: parallel.py:138
def parseAndRun(cls, *args, **kwargs)
Definition: parallel.py:615
def parseAndRun(cls, *args, **kwargs)
Definition: parallel.py:534
def __call__(self, cache, args)
Definition: parallel.py:583
def __init__(self, *args, **kwargs)
Definition: parallel.py:544
def preamble(self, walltime=None)
Definition: parallel.py:187
def submitCommand(self, scriptName)
Return command to submit script.
Definition: parallel.py:209
def submitCommand(self, scriptName)
Return command to submit script.
Definition: parallel.py:258
def preamble(self, walltime=None)
Definition: parallel.py:232
def __init__(self, *args, **kwargs)
Constructor.
Definition: parallel.py:268
def preamble(self, walltime=None)
Definition: parallel.py:283
def submitCommand(self, scriptName)
Return command to submit script.
Definition: parallel.py:286
daf::base::PropertyList * list
Definition: fits.cc:913
def shCommandFromArgs(args)
Definition: parallel.py:42
def startPool(comm=None, root=0, killSlaves=True)
Start a process pool.
Definition: pool.py:1216
def setBatchType(batchType)
Definition: pool.py:101
Definition: Log.h:717
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
Definition: history.py:174