14from .pool
import startPool, Pool, NODE, abortOnError, setBatchType
15from .
import log
as dummyLog
17__all__ = [
"Batch",
"PbsBatch",
"SlurmBatch",
"SmpBatch",
"BATCH_TYPES",
"BatchArgumentParser",
18 "BatchCmdLineTask",
"BatchPoolTask", ]
24_quote_pos = re.compile(
'(?=[^-0-9a-zA-Z_./\n])')
28 r"""Quote the argument for the shell.
37 return _quote_pos.sub(
'\\\\', arg).replace(
'\n',
"'\n'")
43 """Convert a list of shell arguments to a shell command-line"""
44 return ' '.join([
shQuote(a)
for a
in args])
48 """Collect Linux-specific process statistics
50 Parses the /proc/self/status file (N.B. Linux-specific!) into a dict
54 with open(
"/proc/self/status")
as f:
56 key, _, value = line.partition(
":")
57 result[key] = value.strip()
62 """Print the process statistics to the log"""
64 log = Log.getDefaultLogger()
65 log.info(
"Process stats for %s: %s" % (NODE,
processStats()))
69 """Base class for batch submission"""
71 def __init__(self, outputDir=None, numNodes=0, numProcsPerNode=0, numCores=0, queue=None, jobName=None,
72 walltime=0.0, dryrun=False, doExec=False, mpiexec="", submit=None, options=None,
76 @param outputDir: output directory,
or None
77 @param numNodes: number of nodes
78 @param numProcsPerNode: number of processors per node
79 @param numCores: number of cores (Slurm, SMP only)
80 @param queue: name of queue,
or None
81 @param jobName: name of job,
or None
82 @param walltime: maximum wall clock time
for job
83 @param dryrun: Dry run (only
print actions that would be taken)?
84 @param doExec: exec the script instead of submitting to batch system?
85 @param mpiexec: options
for mpiexec
86 @param submit: command-line options
for batch submission (e.g.,
for qsub, sbatch)
87 @param options: options to append to script header (e.g.,
88 @param verbose: produce verbose output?
90 if (numNodes <= 0
or numProcsPerNode <= 0)
and numCores <= 0:
91 raise RuntimeError(
"Must specify numNodes+numProcs or numCores")
111 """Return preamble string for script to be submitted
113 Most batch systems allow you to embed submission options as comments here.
115 raise NotImplementedError(
"Not implemented for base class")
118 """Return execution string for script to be submitted"""
120 "umask %03o" % UMASK,
121 "cd %s" % pipes.quote(os.getcwd()),
124 script += [
"echo \"mpiexec is at: $(which mpiexec)\"",
126 "echo 'umask: ' $(umask)",
131 script += [
"mpiexec %s %s" % (self.
mpiexecmpiexec, command)]
136 return "\n".join(script)
139 """!Create script to be submitted
141 @param command: command to run
142 @param walltime: maximum wall clock time, overrides value to constructor
143 @return name of script on filesystem
145 fd, scriptName = tempfile.mkstemp()
146 with os.fdopen(fd,
"w")
as f:
149 f.write(self.
preamblepreamble(walltime))
151 f.write(self.
executionexecution(command))
154 os.chmod(scriptName, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
158 """!Return command to submit script
160 @param scriptName: name of script on filesystem
162 raise NotImplementedError(
"No implementation for base class")
164 def run(self, command, walltime=None):
165 """!Run the batch system
167 Creates and submits the script to execute the provided command
169 @param command: command to run
170 @param walltime: maximum wall clock time, overrides value to constructor
171 @return name of script on filesystem
173 scriptName = self.createScriptcreateScript(command, walltime=walltime)
176 print(
"Would run: %s" % command)
178 os.execl(scriptName, scriptName)
185 """Batch submission with PBS"""
191 raise RuntimeError(
"Non-positive walltime: %s (did you forget '--time'?)" % (walltime,))
194 "Number of nodes (--nodes=%d) or number of processors per node (--procs=%d) not set" %
197 raise RuntimeError(
"PBS does not support setting the number of cores")
199 "#PBS %s" % self.
optionsoptions
if self.
optionsoptions
is not None else "",
201 "#PBS -l walltime=%d" % walltime
if walltime
is not None else "",
202 "#PBS -o %s" % self.
outputDiroutputDir
if self.
outputDiroutputDir
is not None else "",
203 "#PBS -N %s" % self.
jobNamejobName
if self.
jobNamejobName
is not None else "",
204 "#PBS -q %s" % self.
queuequeue
if self.
queuequeue
is not None else "",
206 "#PBS -W umask=%03o" % UMASK,
210 return "qsub %s -V %s" % (self.
submitsubmit
if self.
submitsubmit
is not None else "", scriptName)
214 """Batch submission with Slurm"""
218 """Format walltime (in seconds) as days-hours:minutes"""
222 days = walltime//secInDay
223 walltime -= days*secInDay
224 hours = walltime//secInHour
225 walltime -= hours*secInHour
226 minutes = walltime//secInMinute
227 walltime -= minutes*secInMinute
230 return "%d-%d:%d" % (days, hours, minutes)
236 raise RuntimeError(
"Non-positive walltime: %s (did you forget '--time'?)" % (walltime,))
239 "Number of nodes (--nodes=%d) and number of processors per node (--procs=%d) not set OR "
242 raise RuntimeError(
"Must set either --nodes,--procs or --cores: not both")
244 outputDir = self.
outputDiroutputDir
if self.
outputDiroutputDir
is not None else os.getcwd()
245 filename = os.path.join(outputDir, (self.
jobNamejobName
if self.
jobNamejobName
is not None else "slurm") +
".o%j")
246 return "\n".join([(
"#SBATCH --nodes=%d" % self.
numNodesnumNodes)
if self.
numNodesnumNodes > 0
else "",
247 (
"#SBATCH --ntasks-per-node=%d" % self.
numProcsPerNodenumProcsPerNode)
if
249 (
"#SBATCH --ntasks=%d" % self.
numCoresnumCores)
if self.
numCoresnumCores > 0
else "",
250 "#SBATCH --time=%s" % self.
formatWalltimeformatWalltime(walltime),
251 "#SBATCH --job-name=%s" % self.
jobNamejobName
if self.
jobNamejobName
is not None else "",
252 "#SBATCH -p %s" % self.
queuequeue
if self.
queuequeue
is not None else "",
253 "#SBATCH --output=%s" % filename,
254 "#SBATCH --error=%s" % filename,
255 "#SBATCH %s" % self.
optionsoptions
if self.
optionsoptions
is not None else "",
259 return "sbatch %s %s" % (self.
submitsubmit
if self.
submitsubmit
is not None else "", scriptName)
263 """Not-really-Batch submission with multiple cores on the current node
265 The job is run immediately.
269 super(SmpBatch, self).
__init__(*args, **kwargs)
276 raise RuntimeError(
"SMP does not support the --nodes and --procs command-line options; "
277 "use --cores to specify the number of cores to use")
287 return "exec %s" % scriptName
290BATCH_TYPES = {
'none':
None,
299 """An argument parser to get relevant parameters for batch submission
301 We want to be able to display the help for a
'parent' ArgumentParser
302 along
with the batch-specific options we introduce
in this
class, but
303 we don
't want to swallow the parent (i.e., ArgumentParser(parents=[parent]))
304 because we want to save the list of arguments that this particular
305 BatchArgumentParser doesn't parse, so they can be passed on to a different
306 program (though we also want to parse them to check that they can be parsed).
310 super(BatchArgumentParser, self).
__init__(*args, **kwargs)
312 group = self.add_argument_group(
"Batch submission options")
313 group.add_argument(
"--queue", help=
"Queue name")
314 group.add_argument(
"--job", help=
"Job name")
315 group.add_argument(
"--nodes", type=int, default=0, help=
"Number of nodes")
316 group.add_argument(
"--procs", type=int, default=0, help=
"Number of processors per node")
317 group.add_argument(
"--cores", type=int, default=0, help=
"Number of cores (Slurm/SMP only)")
318 group.add_argument(
"--time", type=float, default=0,
319 help=
"Expected execution time per element (sec)")
320 group.add_argument(
"--walltime", type=float, default=0,
321 help=
"Expected total execution walltime (sec); overrides cores & time")
322 group.add_argument(
"--batch-type", dest=
"batchType", choices=
list(BATCH_TYPES.keys()), default=
"smp",
323 help=
"Batch system to use")
324 group.add_argument(
"--batch-verbose", dest=
"batchVerbose", action=
"store_true", default=
False,
325 help=(
"Enable verbose output in batch script "
326 "(including system environment information at batch start)?"))
327 group.add_argument(
"--batch-output", dest=
"batchOutput", help=
"Output directory")
328 group.add_argument(
"--batch-submit", dest=
"batchSubmit", help=
"Batch submission command-line flags")
329 group.add_argument(
"--batch-options", dest=
"batchOptions", help=
"Header options for batch script")
330 group.add_argument(
"--batch-profile", dest=
"batchProfile", action=
"store_true", default=
False,
331 help=
"Enable profiling on batch job?")
332 group.add_argument(
"--batch-stats", dest=
"batchStats", action=
"store_true", default=
False,
333 help=
"Print process stats on completion (Linux only)?")
334 group.add_argument(
"--dry-run", dest=
"dryrun", default=
False, action=
"store_true",
336 group.add_argument(
"--do-exec", dest=
"doExec", default=
False, action=
"store_true",
337 help=
"Exec script instead of submit to batch system?")
338 group.add_argument(
"--mpiexec", default=
"", help=
"mpiexec options")
340 def parse_args(self, config=None, args=None, namespace=None, **kwargs):
341 args, leftover = super(BatchArgumentParser, self).parse_known_args(args=args, namespace=namespace)
344 if len(leftover) > 0:
346 if self.
_parent_parent
is None:
347 self.error(
"Unrecognised arguments: %s" % leftover)
349 args.leftover = leftover
350 args.batch = self.
makeBatchmakeBatch(args)
354 """Create a Batch object from the command-line arguments"""
356 argMapping = {
'outputDir':
'batchOutput',
358 'numProcsPerNode':
'procs',
365 'mpiexec':
'mpiexec',
366 'submit':
'batchSubmit',
367 'options':
'batchOptions',
368 'verbose':
'batchVerbose',
371 if BATCH_TYPES[args.batchType]
is None:
375 kwargs = {k: getattr(args, v)
for k, v
in argMapping.items()}
376 return BATCH_TYPES[args.batchType](**kwargs)
379 text =
"""This is a script for queue submission of a wrapped script.
381Use this program name and ignore that for the wrapped script (it will be
382passed on to the batch system). Arguments for *both* this wrapper script
or the
383wrapped script are valid (
if it
is required
for the wrapped script, it
384is required
for the wrapper
as well).
386*** Batch system submission wrapper:
389 text += super(BatchArgumentParser, self).format_help()
390 if self.
_parent_parent
is not None:
400 if self.
_parent_parent
is not None:
401 prog = self.
_parent_parent.prog
402 self.
_parent_parent.prog = self.prog
404 self.
_parent_parent.prog = prog
410 """Generate bash script to regenerate the current environment"""
412 for key, val
in os.environ.items():
413 if key
in (
"DISPLAY",):
415 if val.startswith(
"() {"):
422 if key.startswith(
"BASH_FUNC_")
and key.endswith(
"()"):
425 output +=
"{key} {val}\nexport -f {key}\n".
format(key=key, val=val)
428 output +=
"export {key}='{val}'\n".
format(key=key, val=val.replace(
"'",
"'\"'\"'"))
436 taskParser = cls._makeArgumentParser(doBatch=
True, add_help=
False)
438 batchArgs = batchParser.parse_args(config=cls.ConfigClass(), args=args, override=cls.applyOverrides,
441 if not cls.RunnerClass(cls, batchArgs.parent).precall(batchArgs.parent):
442 taskParser.error(
"Error in task preparation")
446 if batchArgs.batch
is None:
447 sys.argv = [sys.argv[0]] + batchArgs.leftover
449 return cls.parseAndRun()
451 if batchArgs.walltime > 0:
452 walltime = batchArgs.walltime
454 numCores = batchArgs.cores
if batchArgs.cores > 0
else batchArgs.nodes*batchArgs.procs
455 walltime = cls.
batchWallTimebatchWallTime(batchArgs.time, batchArgs.parent, numCores)
458 batchArgs.batch.run(command, walltime=walltime)
462 """!Return walltime request for batch job
464 Subclasses should override if the walltime should be calculated
465 differently (e.g., addition of some serial time).
468 @param time: Requested time per iteration
469 @param parsedCmd: Results of argument parsing
470 @param numCores: Number of cores
472 numTargets = len(cls.RunnerClass.getTargetList(parsedCmd))
473 return time*numTargets/
float(numCores)
477 """!Return command to run CmdLineTask
480 @param args: Parsed batch job arguments (
from BatchArgumentParser)
482 job = args.job if args.job
is not None else "job"
483 module = cls.__module__
484 script = (
"import os; os.umask(%#05o); " +
485 "import lsst.base; lsst.base.disableImplicitThreading(); " +
486 "import lsst.ctrl.pool.log; lsst.ctrl.pool.log.jobLog(\"%s\"); ") % (UMASK, job)
489 script += (
"import lsst.ctrl.pool.parallel; import atexit; " +
490 "atexit.register(lsst.ctrl.pool.parallel.printProcessStats); ")
492 script +=
"import %s; %s.%s.parseAndRun();" % (module, module, cls.__name__)
494 profilePre =
"import cProfile; import os; cProfile.run(\"\"\""
495 profilePost =
"\"\"\", filename=\"profile-" + job +
"-%s-%d.dat\" % (os.uname()[1], os.getpid()))"
497 return (
"python -c '" + (profilePre
if args.batchProfile
else "") + script +
498 (profilePost
if args.batchProfile
else "") +
"' " +
shCommandFromArgs(args.leftover) +
501 @contextlib.contextmanager
503 """!Provide a context manager for logging an operation
505 @param operation: description of operation (string)
506 @param catch: Catch all exceptions?
507 @param trace: Log a traceback of caught exception?
509 Note that
if 'catch' is True, all exceptions are swallowed, but there may
510 be other side-effects such
as undefined variables.
512 self.log.info("%s: Start %s" % (NODE, operation))
517 cls, e, _ = sys.exc_info()
518 self.log.
warn(
"%s: Caught %s while %s: %s" % (NODE, cls.__name__, operation, e))
520 self.log.
info(
"%s: Traceback:\n%s" % (NODE, traceback.format_exc()))
524 self.log.
info(
"%s: Finished %s" % (NODE, operation))
528 """Starts a BatchCmdLineTask with an MPI process pool
530 Use this subclass of BatchCmdLineTask if you want to use the Pool directly.
535 """Run with a MPI process pool"""
537 super(BatchPoolTask, cls).
parseAndRun(*args, **kwargs)
542 """Run a Task individually on a list of inputs using the MPI process pool"""
547 Warn if the user specified multiprocessing.
549 TaskRunner.__init__(self, *args, **kwargs)
551 self.log.
warn(
"Multiprocessing arguments (-j %d) ignored since using batch processing" %
556 """Run the task on all targets
558 Sole input is the result of parsing the command-line
with the ArgumentParser.
560 Output
is None if 'precall' failed; otherwise it
is a list of calling ourself
561 on each element of the target list
from the
'getTargetList' method.
565 self.prepareForMultiProcessing()
568 if self.precall(parsedCmd):
569 targetList = self.getTargetList(parsedCmd)
570 if len(targetList) > 0:
571 parsedCmd.log.info(
"Processing %d targets with a pool of %d processes..." %
572 (len(targetList), pool.size))
574 resultList = pool.map(self, targetList)
576 parsedCmd.log.warn(
"Not running the task because there is no data to process; "
577 "you may preview data using \"--show data\"")
584 """Run the Task on a single target
586 Strips out the process pool 'cache' argument.
588 'args' are those arguments provided by the getTargetList method.
590 Brings down the entire job
if an exception
is not caught (i.e., --doraise).
592 return TaskRunner.__call__(self, args)
596 """Runs the BatchCmdLineTask in parallel
598 Use this subclass of BatchCmdLineTask if you don
't need to use the Pool
599 directly, but just want to iterate over many objects (like a multi-node
600 version of the '-j' command-line argument).
602 RunnerClass = BatchTaskRunner
605 def _makeArgumentParser(cls, *args, **kwargs):
606 """Build an ArgumentParser
608 Removes the batch-specific parts in order to delegate to the parent classes.
610 kwargs.pop("doBatch",
False)
611 kwargs.pop(
"add_help",
False)
612 return super(BatchCmdLineTask, cls)._makeArgumentParser(*args, **kwargs)
616 """Parse an argument list and run the command
618 This is the entry point when we run
in earnest, so start the process pool
619 so that the worker nodes don
't go any further.
622 results = super(BatchParallelTask, cls).parseAndRun(*args, **kwargs)
def makeBatch(self, args)
def parse_args(self, config=None, args=None, namespace=None, **kwargs)
def __init__(self, parent=None, *args, **kwargs)
def batchWallTime(cls, time, parsedCmd, numCores)
Return walltime request for batch job.
def logOperation(self, operation, catch=False, trace=True)
Provide a context manager for logging an operation.
def parseAndSubmit(cls, args=None, **kwargs)
def batchCommand(cls, args)
Return command to run CmdLineTask.
def execution(self, command)
def submitCommand(self, scriptName)
Return command to submit script.
def run(self, command, walltime=None)
Run the batch system.
def __init__(self, outputDir=None, numNodes=0, numProcsPerNode=0, numCores=0, queue=None, jobName=None, walltime=0.0, dryrun=False, doExec=False, mpiexec="", submit=None, options=None, verbose=False)
Constructor.
def preamble(self, command, walltime=None)
def createScript(self, command, walltime=None)
Create script to be submitted.
def parseAndRun(cls, *args, **kwargs)
def parseAndRun(cls, *args, **kwargs)
def __call__(self, cache, args)
def __init__(self, *args, **kwargs)
def preamble(self, walltime=None)
def submitCommand(self, scriptName)
Return command to submit script.
def formatWalltime(walltime)
def submitCommand(self, scriptName)
Return command to submit script.
def preamble(self, walltime=None)
def __init__(self, *args, **kwargs)
Constructor.
def preamble(self, walltime=None)
def submitCommand(self, scriptName)
Return command to submit script.
daf::base::PropertyList * list
def shCommandFromArgs(args)
def startPool(comm=None, root=0, killSlaves=True)
Start a process pool.
def setBatchType(batchType)
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)