14 from .pool
import startPool, Pool, NODE, abortOnError, setBatchType
15 from .
import log
as dummyLog
17 __all__ = [
"Batch",
"PbsBatch",
"SlurmBatch",
"SmpBatch",
"BATCH_TYPES",
"BatchArgumentParser",
18 "BatchCmdLineTask",
"BatchPoolTask", ]
24 _quote_pos = re.compile(
'(?=[^-0-9a-zA-Z_./\n])')
28 r"""Quote the argument for the shell. 37 return _quote_pos.sub(
'\\\\', arg).replace(
'\n',
"'\n'")
43 """Convert a list of shell arguments to a shell command-line""" 44 return ' '.join([
shQuote(a)
for a
in args])
48 """Collect Linux-specific process statistics 50 Parses the /proc/self/status file (N.B. Linux-specific!) into a dict 54 with open(
"/proc/self/status")
as f:
56 key, _, value = line.partition(
":")
57 result[key] = value.strip()
62 """Print the process statistics to the log""" 64 log = Log.getDefaultLogger()
65 log.info(
"Process stats for %s: %s" % (NODE,
processStats()))
69 """Base class for batch submission""" 71 def __init__(self, outputDir=None, numNodes=0, numProcsPerNode=0, numCores=0, queue=None, jobName=None,
72 walltime=0.0, dryrun=False, doExec=False, mpiexec="", submit=None, options=None,
76 @param outputDir: output directory, or None 77 @param numNodes: number of nodes 78 @param numProcsPerNode: number of processors per node 79 @param numCores: number of cores (Slurm, SMP only) 80 @param queue: name of queue, or None 81 @param jobName: name of job, or None 82 @param walltime: maximum wall clock time for job 83 @param dryrun: Dry run (only print actions that would be taken)? 84 @param doExec: exec the script instead of submitting to batch system? 85 @param mpiexec: options for mpiexec 86 @param submit: command-line options for batch submission (e.g., for qsub, sbatch) 87 @param options: options to append to script header (e.g., #PBS or #SBATCH) 88 @param verbose: produce verbose output? 90 if (numNodes <= 0
or numProcsPerNode <= 0)
and numCores <= 0:
91 raise RuntimeError(
"Must specify numNodes+numProcs or numCores")
111 """Return preamble string for script to be submitted 113 Most batch systems allow you to embed submission options as comments here. 115 raise NotImplementedError(
"Not implemented for base class")
118 """Return execution string for script to be submitted""" 120 "umask %03o" % UMASK,
121 "cd %s" % pipes.quote(os.getcwd()),
124 script += [
"echo \"mpiexec is at: $(which mpiexec)\"",
126 "echo 'umask: ' $(umask)",
131 script += [
"mpiexec %s %s" % (self.
mpiexec, command)]
136 return "\n".join(script)
139 """!Create script to be submitted 141 @param command: command to run 142 @param walltime: maximum wall clock time, overrides value to constructor 143 @return name of script on filesystem 145 fd, scriptName = tempfile.mkstemp()
146 with os.fdopen(fd,
"w")
as f:
154 os.chmod(scriptName, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
158 """!Return command to submit script 160 @param scriptName: name of script on filesystem 162 raise NotImplementedError(
"No implementation for base class")
164 def run(self, command, walltime=None):
165 """!Run the batch system 167 Creates and submits the script to execute the provided command 169 @param command: command to run 170 @param walltime: maximum wall clock time, overrides value to constructor 171 @return name of script on filesystem 173 scriptName = self.
createScript(command, walltime=walltime)
176 print(
"Would run: %s" % command)
178 os.execl(scriptName, scriptName)
185 """Batch submission with PBS""" 191 raise RuntimeError(
"Non-positive walltime: %s (did you forget '--time'?)" % (walltime,))
194 "Number of nodes (--nodes=%d) or number of processors per node (--procs=%d) not set" %
197 raise RuntimeError(
"PBS does not support setting the number of cores")
201 "#PBS -l walltime=%d" % walltime
if walltime
is not None else "",
204 "#PBS -q %s" % self.
queue if self.
queue is not None else "",
206 "#PBS -W umask=%03o" % UMASK,
210 return "qsub %s -V %s" % (self.
submit if self.
submit is not None else "", scriptName)
214 """Batch submission with Slurm""" 218 """Format walltime (in seconds) as days-hours:minutes""" 222 days = walltime//secInDay
223 walltime -= days*secInDay
224 hours = walltime//secInHour
225 walltime -= hours*secInHour
226 minutes = walltime//secInMinute
227 walltime -= minutes*secInMinute
230 return "%d-%d:%d" % (days, hours, minutes)
236 raise RuntimeError(
"Non-positive walltime: %s (did you forget '--time'?)" % (walltime,))
239 "Number of nodes (--nodes=%d) and number of processors per node (--procs=%d) not set OR " 242 raise RuntimeError(
"Must set either --nodes,--procs or --cores: not both")
245 filename = os.path.join(outputDir, (self.
jobName if self.
jobName is not None else "slurm") +
".o%j")
246 return "\n".join([(
"#SBATCH --nodes=%d" % self.
numNodes)
if self.
numNodes > 0
else "",
251 "#SBATCH --job-name=%s" % self.
jobName if self.
jobName is not None else "",
252 "#SBATCH -p %s" % self.
queue if self.
queue is not None else "",
253 "#SBATCH --output=%s" % filename,
254 "#SBATCH --error=%s" % filename,
259 return "sbatch %s %s" % (self.
submit if self.
submit is not None else "", scriptName)
263 """Not-really-Batch submission with multiple cores on the current node 265 The job is run immediately. 269 super(SmpBatch, self).
__init__(*args, **kwargs)
276 raise RuntimeError(
"SMP does not support the --nodes and --procs command-line options; " 277 "use --cores to specify the number of cores to use")
287 return "exec %s" % scriptName
290 BATCH_TYPES = {
'none':
None,
299 """An argument parser to get relevant parameters for batch submission 301 We want to be able to display the help for a 'parent' ArgumentParser 302 along with the batch-specific options we introduce in this class, but 303 we don't want to swallow the parent (i.e., ArgumentParser(parents=[parent])) 304 because we want to save the list of arguments that this particular 305 BatchArgumentParser doesn't parse, so they can be passed on to a different 306 program (though we also want to parse them to check that they can be parsed). 310 super(BatchArgumentParser, self).
__init__(*args, **kwargs)
312 group = self.add_argument_group(
"Batch submission options")
313 group.add_argument(
"--queue", help=
"Queue name")
314 group.add_argument(
"--job", help=
"Job name")
315 group.add_argument(
"--nodes", type=int, default=0, help=
"Number of nodes")
316 group.add_argument(
"--procs", type=int, default=0, help=
"Number of processors per node")
317 group.add_argument(
"--cores", type=int, default=0, help=
"Number of cores (Slurm/SMP only)")
318 group.add_argument(
"--time", type=float, default=0,
319 help=
"Expected execution time per element (sec)")
320 group.add_argument(
"--batch-type", dest=
"batchType", choices=
list(BATCH_TYPES.keys()), default=
"smp",
321 help=
"Batch system to use")
322 group.add_argument(
"--batch-verbose", dest=
"batchVerbose", action=
"store_true", default=
False,
323 help=(
"Enable verbose output in batch script " 324 "(including system environment information at batch start)?"))
325 group.add_argument(
"--batch-output", dest=
"batchOutput", help=
"Output directory")
326 group.add_argument(
"--batch-submit", dest=
"batchSubmit", help=
"Batch submission command-line flags")
327 group.add_argument(
"--batch-options", dest=
"batchOptions", help=
"Header options for batch script")
328 group.add_argument(
"--batch-profile", dest=
"batchProfile", action=
"store_true", default=
False,
329 help=
"Enable profiling on batch job?")
330 group.add_argument(
"--batch-stats", dest=
"batchStats", action=
"store_true", default=
False,
331 help=
"Print process stats on completion (Linux only)?")
332 group.add_argument(
"--dry-run", dest=
"dryrun", default=
False, action=
"store_true",
334 group.add_argument(
"--do-exec", dest=
"doExec", default=
False, action=
"store_true",
335 help=
"Exec script instead of submit to batch system?")
336 group.add_argument(
"--mpiexec", default=
"", help=
"mpiexec options")
338 def parse_args(self, config=None, args=None, namespace=None, **kwargs):
339 args, leftover = super(BatchArgumentParser, self).parse_known_args(args=args, namespace=namespace)
342 if len(leftover) > 0:
345 self.error(
"Unrecognised arguments: %s" % leftover)
347 args.leftover = leftover
352 """Create a Batch object from the command-line arguments""" 354 argMapping = {
'outputDir':
'batchOutput',
356 'numProcsPerNode':
'procs',
363 'mpiexec':
'mpiexec',
364 'submit':
'batchSubmit',
365 'options':
'batchOptions',
366 'verbose':
'batchVerbose',
369 if BATCH_TYPES[args.batchType]
is None:
373 kwargs = {k: getattr(args, v)
for k, v
in argMapping.items()}
374 return BATCH_TYPES[args.batchType](**kwargs)
377 text =
"""This is a script for queue submission of a wrapped script. 379 Use this program name and ignore that for the wrapped script (it will be 380 passed on to the batch system). Arguments for *both* this wrapper script or the 381 wrapped script are valid (if it is required for the wrapped script, it 382 is required for the wrapper as well). 384 *** Batch system submission wrapper: 387 text += super(BatchArgumentParser, self).
format_help()
408 """Generate bash script to regenerate the current environment""" 410 for key, val
in os.environ.items():
411 if key
in (
"DISPLAY",):
413 if val.startswith(
"() {"):
420 if key.startswith(
"BASH_FUNC_")
and key.endswith(
"()"):
423 output +=
"{key} {val}\nexport -f {key}\n".
format(key=key, val=val)
426 output +=
"export {key}='{val}'\n".
format(key=key, val=val.replace(
"'",
"'\"'\"'"))
436 batchArgs = batchParser.parse_args(config=cls.ConfigClass(), args=args, override=cls.
applyOverrides,
439 if not cls.
RunnerClass(cls, batchArgs.parent).precall(batchArgs.parent):
440 taskParser.error(
"Error in task preparation")
444 if batchArgs.batch
is None:
445 sys.argv = [sys.argv[0]] + batchArgs.leftover
449 numCores = batchArgs.cores
if batchArgs.cores > 0
else batchArgs.nodes*batchArgs.procs
450 walltime = cls.
batchWallTime(batchArgs.time, batchArgs.parent, numCores)
453 batchArgs.batch.run(command, walltime=walltime)
457 """!Return walltime request for batch job 459 Subclasses should override if the walltime should be calculated 460 differently (e.g., addition of some serial time). 463 @param time: Requested time per iteration 464 @param parsedCmd: Results of argument parsing 465 @param numCores: Number of cores 467 numTargets = len(cls.
RunnerClass.getTargetList(parsedCmd))
468 return time*numTargets/float(numCores)
472 """!Return command to run CmdLineTask 475 @param args: Parsed batch job arguments (from BatchArgumentParser) 477 job = args.job
if args.job
is not None else "job" 478 module = cls.__module__
479 script = (
"import os; os.umask(%#05o); " +
480 "import lsst.base; lsst.base.disableImplicitThreading(); " +
481 "import lsst.ctrl.pool.log; lsst.ctrl.pool.log.jobLog(\"%s\"); ") % (UMASK, job)
484 script += (
"import lsst.ctrl.pool.parallel; import atexit; " +
485 "atexit.register(lsst.ctrl.pool.parallel.printProcessStats); ")
487 script +=
"import %s; %s.%s.parseAndRun();" % (module, module, cls.__name__)
489 profilePre =
"import cProfile; import os; cProfile.run(\"\"\"" 490 profilePost =
"\"\"\", filename=\"profile-" + job +
"-%s-%d.dat\" % (os.uname()[1], os.getpid()))" 492 return (
"python -c '" + (profilePre
if args.batchProfile
else "") + script +
493 (profilePost
if args.batchProfile
else "") +
"' " +
shCommandFromArgs(args.leftover) +
496 @contextlib.contextmanager
498 """!Provide a context manager for logging an operation 500 @param operation: description of operation (string) 501 @param catch: Catch all exceptions? 502 @param trace: Log a traceback of caught exception? 504 Note that if 'catch' is True, all exceptions are swallowed, but there may 505 be other side-effects such as undefined variables. 507 self.
log.
info(
"%s: Start %s" % (NODE, operation))
512 cls, e, _ = sys.exc_info()
513 self.
log.
warn(
"%s: Caught %s while %s: %s" % (NODE, cls.__name__, operation, e))
515 self.
log.
info(
"%s: Traceback:\n%s" % (NODE, traceback.format_exc()))
519 self.
log.
info(
"%s: Finished %s" % (NODE, operation))
523 """Starts a BatchCmdLineTask with an MPI process pool 525 Use this subclass of BatchCmdLineTask if you want to use the Pool directly. 530 """Run with a MPI process pool""" 532 super(BatchPoolTask, cls).
parseAndRun(*args, **kwargs)
537 """Run a Task individually on a list of inputs using the MPI process pool""" 542 Warn if the user specified multiprocessing. 544 TaskRunner.__init__(self, *args, **kwargs)
546 self.
log.
warn(
"Multiprocessing arguments (-j %d) ignored since using batch processing" %
551 """Run the task on all targets 553 Sole input is the result of parsing the command-line with the ArgumentParser. 555 Output is None if 'precall' failed; otherwise it is a list of calling ourself 556 on each element of the target list from the 'getTargetList' method. 565 if len(targetList) > 0:
566 parsedCmd.log.info(
"Processing %d targets with a pool of %d processes..." %
567 (len(targetList), pool.size))
569 resultList = pool.map(self, targetList)
571 parsedCmd.log.warn(
"Not running the task because there is no data to process; " 572 "you may preview data using \"--show data\"")
579 """Run the Task on a single target 581 Strips out the process pool 'cache' argument. 583 'args' are those arguments provided by the getTargetList method. 585 Brings down the entire job if an exception is not caught (i.e., --doraise). 587 return TaskRunner.__call__(self, args)
591 """Runs the BatchCmdLineTask in parallel 593 Use this subclass of BatchCmdLineTask if you don't need to use the Pool 594 directly, but just want to iterate over many objects (like a multi-node 595 version of the '-j' command-line argument). 597 RunnerClass = BatchTaskRunner
600 def _makeArgumentParser(cls, *args, **kwargs):
601 """Build an ArgumentParser 603 Removes the batch-specific parts in order to delegate to the parent classes. 605 kwargs.pop(
"doBatch",
False)
606 kwargs.pop(
"add_help",
False)
607 return super(BatchCmdLineTask, cls)._makeArgumentParser(*args, **kwargs)
611 """Parse an argument list and run the command 613 This is the entry point when we run in earnest, so start the process pool 614 so that the worker nodes don't go any further. 617 results = super(BatchParallelTask, cls).
parseAndRun(*args, **kwargs)
def parseAndRun(cls, args, kwargs)
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
def __call__(self, cache, args)
def preamble(self, walltime=None)
def parseAndRun(cls, args, kwargs)
def _makeArgumentParser(cls)
def parseAndRun(cls, args=None, config=None, log=None, doReturnResults=False)
def formatWalltime(walltime)
def batchCommand(cls, args)
Return command to run CmdLineTask.
def __init__(self, outputDir=None, numNodes=0, numProcsPerNode=0, numCores=0, queue=None, jobName=None, walltime=0.0, dryrun=False, doExec=False, mpiexec="", submit=None, options=None, verbose=False)
Constructor.
def __init__(self, parent=None, args, kwargs)
def preamble(self, walltime=None)
def prepareForMultiProcessing(self)
def submitCommand(self, scriptName)
def startPool(comm=None, root=0, killSlaves=True)
Start a process pool.
def parseAndSubmit(cls, args=None, kwargs)
def createScript(self, command, walltime=None)
Create script to be submitted.
def __init__(self, args, kwargs)
def precall(self, parsedCmd)
def preamble(self, walltime=None)
def submitCommand(self, scriptName)
def parse_args(self, config=None, args=None, namespace=None, kwargs)
def submitCommand(self, scriptName)
Return command to submit script.
def batchWallTime(cls, time, parsedCmd, numCores)
Return walltime request for batch job.
def preamble(self, command, walltime=None)
def submitCommand(self, scriptName)
def setBatchType(batchType)
def logOperation(self, operation, catch=False, trace=True)
Provide a context manager for logging an operation.
def __init__(self, args, kwargs)
def getTargetList(parsedCmd, kwargs)
def applyOverrides(cls, config)
daf::base::PropertyList * list
def run(self, command, walltime=None)
Run the batch system.
def makeBatch(self, args)
def execution(self, command)
def shCommandFromArgs(args)