14 from .pool 
import startPool, Pool, NODE, abortOnError, setBatchType
    15 from . 
import log 
as dummyLog  
    17 __all__ = [
"Batch", 
"PbsBatch", 
"SlurmBatch", 
"SmpBatch", 
"BATCH_TYPES", 
"BatchArgumentParser",
    18            "BatchCmdLineTask", 
"BatchPoolTask", ]
    24 _quote_pos = re.compile(
'(?=[^-0-9a-zA-Z_./\n])')
    28     r"""Quote the argument for the shell.    37         return _quote_pos.sub(
'\\\\', arg).replace(
'\n', 
"'\n'")
    43     """Convert a list of shell arguments to a shell command-line"""    44     return ' '.join([
shQuote(a) 
for a 
in args])
    48     """Collect Linux-specific process statistics    50     Parses the /proc/self/status file (N.B. Linux-specific!) into a dict    54     with open(
"/proc/self/status") 
as f:
    56             key, _, value = line.partition(
":")
    57             result[key] = value.strip()
    62     """Print the process statistics to the log"""    64     log = Log.getDefaultLogger()
    65     log.info(
"Process stats for %s: %s" % (NODE, 
processStats()))
    69     """Base class for batch submission"""    71     def __init__(self, outputDir=None, numNodes=0, numProcsPerNode=0, numCores=0, queue=None, jobName=None,
    72                  walltime=0.0, dryrun=False, doExec=False, mpiexec="", submit=None, options=None,
    76         @param outputDir: output directory, or None    77         @param numNodes: number of nodes    78         @param numProcsPerNode: number of processors per node    79         @param numCores: number of cores (Slurm, SMP only)    80         @param queue: name of queue, or None    81         @param jobName: name of job, or None    82         @param walltime: maximum wall clock time for job    83         @param dryrun: Dry run (only print actions that would be taken)?    84         @param doExec: exec the script instead of submitting to batch system?    85         @param mpiexec: options for mpiexec    86         @param submit: command-line options for batch submission (e.g., for qsub, sbatch)    87         @param options: options to append to script header (e.g., #PBS or #SBATCH)    88         @param verbose: produce verbose output?    90         if (numNodes <= 0 
or numProcsPerNode <= 0) 
and numCores <= 0:
    91             raise RuntimeError(
"Must specify numNodes+numProcs or numCores")
   111         """Return preamble string for script to be submitted   113         Most batch systems allow you to embed submission options as comments here.   115         raise NotImplementedError(
"Not implemented for base class")
   118         """Return execution string for script to be submitted"""   120                   "umask %03o" % UMASK,
   121                   "cd %s" % pipes.quote(os.getcwd()),
   124             script += [
"echo \"mpiexec is at: $(which mpiexec)\"",
   126                        "echo 'umask: ' $(umask)",
   131         script += [
"mpiexec %s %s" % (self.
mpiexec, command)]
   136         return "\n".join(script)
   139         """!Create script to be submitted   141         @param command: command to run   142         @param walltime: maximum wall clock time, overrides value to constructor   143         @return name of script on filesystem   145         fd, scriptName = tempfile.mkstemp()
   146         with os.fdopen(fd, 
"w") 
as f:
   154         os.chmod(scriptName, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
   158         """!Return command to submit script   160         @param scriptName: name of script on filesystem   162         raise NotImplementedError(
"No implementation for base class")
   164     def run(self, command, walltime=None):
   165         """!Run the batch system   167         Creates and submits the script to execute the provided command   169         @param command: command to run   170         @param walltime: maximum wall clock time, overrides value to constructor   171         @return name of script on filesystem   173         scriptName = self.
createScript(command, walltime=walltime)
   176             print(
"Would run: %s" % command)
   178             os.execl(scriptName, scriptName)
   185     """Batch submission with PBS"""   191             raise RuntimeError(
"Non-positive walltime: %s (did you forget '--time'?)" % (walltime,))
   194                 "Number of nodes (--nodes=%d) or number of processors per node (--procs=%d) not set" %
   197             raise RuntimeError(
"PBS does not support setting the number of cores")
   201             "#PBS -l walltime=%d" % walltime 
if walltime 
is not None else "",
   204             "#PBS -q %s" % self.
queue if self.
queue is not None else "",
   206             "#PBS -W umask=%03o" % UMASK,
   210         return "qsub %s -V %s" % (self.
submit if self.
submit is not None else "", scriptName)
   214     """Batch submission with Slurm"""   218         """Format walltime (in seconds) as days-hours:minutes"""   222         days = walltime//secInDay
   223         walltime -= days*secInDay
   224         hours = walltime//secInHour
   225         walltime -= hours*secInHour
   226         minutes = walltime//secInMinute
   227         walltime -= minutes*secInMinute
   230         return "%d-%d:%d" % (days, hours, minutes)
   236             raise RuntimeError(
"Non-positive walltime: %s (did you forget '--time'?)" % (walltime,))
   239                 "Number of nodes (--nodes=%d) and number of processors per node (--procs=%d) not set OR "   242             raise RuntimeError(
"Must set either --nodes,--procs or --cores: not both")
   245         filename = os.path.join(outputDir, (self.
jobName if self.
jobName is not None else "slurm") + 
".o%j")
   246         return "\n".join([(
"#SBATCH --nodes=%d" % self.
numNodes) 
if self.
numNodes > 0 
else "",
   251                           "#SBATCH --job-name=%s" % self.
jobName if self.
jobName is not None else "",
   252                           "#SBATCH -p %s" % self.
queue if self.
queue is not None else "",
   253                           "#SBATCH --output=%s" % filename,
   254                           "#SBATCH --error=%s" % filename,
   259         return "sbatch %s %s" % (self.
submit if self.
submit is not None else "", scriptName)
   263     """Not-really-Batch submission with multiple cores on the current node   265     The job is run immediately.   269         super(SmpBatch, self).
__init__(*args, **kwargs)
   276             raise RuntimeError(
"SMP does not support the --nodes and --procs command-line options; "   277                                "use --cores to specify the number of cores to use")
   287         return "exec %s" % scriptName
   290 BATCH_TYPES = {
'none': 
None,
   299     """An argument parser to get relevant parameters for batch submission   301     We want to be able to display the help for a 'parent' ArgumentParser   302     along with the batch-specific options we introduce in this class, but   303     we don't want to swallow the parent (i.e., ArgumentParser(parents=[parent]))   304     because we want to save the list of arguments that this particular   305     BatchArgumentParser doesn't parse, so they can be passed on to a different   306     program (though we also want to parse them to check that they can be parsed).   310         super(BatchArgumentParser, self).
__init__(*args, **kwargs)
   312         group = self.add_argument_group(
"Batch submission options")
   313         group.add_argument(
"--queue", help=
"Queue name")
   314         group.add_argument(
"--job", help=
"Job name")
   315         group.add_argument(
"--nodes", type=int, default=0, help=
"Number of nodes")
   316         group.add_argument(
"--procs", type=int, default=0, help=
"Number of processors per node")
   317         group.add_argument(
"--cores", type=int, default=0, help=
"Number of cores (Slurm/SMP only)")
   318         group.add_argument(
"--time", type=float, default=0,
   319                            help=
"Expected execution time per element (sec)")
   320         group.add_argument(
"--batch-type", dest=
"batchType", choices=
list(BATCH_TYPES.keys()), default=
"smp",
   321                            help=
"Batch system to use")
   322         group.add_argument(
"--batch-verbose", dest=
"batchVerbose", action=
"store_true", default=
False,
   323                            help=(
"Enable verbose output in batch script "   324                                  "(including system environment information at batch start)?"))
   325         group.add_argument(
"--batch-output", dest=
"batchOutput", help=
"Output directory")
   326         group.add_argument(
"--batch-submit", dest=
"batchSubmit", help=
"Batch submission command-line flags")
   327         group.add_argument(
"--batch-options", dest=
"batchOptions", help=
"Header options for batch script")
   328         group.add_argument(
"--batch-profile", dest=
"batchProfile", action=
"store_true", default=
False,
   329                            help=
"Enable profiling on batch job?")
   330         group.add_argument(
"--batch-stats", dest=
"batchStats", action=
"store_true", default=
False,
   331                            help=
"Print process stats on completion (Linux only)?")
   332         group.add_argument(
"--dry-run", dest=
"dryrun", default=
False, action=
"store_true",
   334         group.add_argument(
"--do-exec", dest=
"doExec", default=
False, action=
"store_true",
   335                            help=
"Exec script instead of submit to batch system?")
   336         group.add_argument(
"--mpiexec", default=
"", help=
"mpiexec options")
   338     def parse_args(self, config=None, args=None, namespace=None, **kwargs):
   339         args, leftover = super(BatchArgumentParser, self).parse_known_args(args=args, namespace=namespace)
   342         if len(leftover) > 0:
   345                 self.error(
"Unrecognised arguments: %s" % leftover)
   347             args.leftover = leftover
   352         """Create a Batch object from the command-line arguments"""   354         argMapping = {
'outputDir': 
'batchOutput',
   356                       'numProcsPerNode': 
'procs',
   363                       'mpiexec': 
'mpiexec',
   364                       'submit': 
'batchSubmit',
   365                       'options': 
'batchOptions',
   366                       'verbose': 
'batchVerbose',
   369         if BATCH_TYPES[args.batchType] 
is None:
   373         kwargs = {k: getattr(args, v) 
for k, v 
in argMapping.items()}
   374         return BATCH_TYPES[args.batchType](**kwargs)
   377         text = 
"""This is a script for queue submission of a wrapped script.   379 Use this program name and ignore that for the wrapped script (it will be   380 passed on to the batch system).  Arguments for *both* this wrapper script or the   381 wrapped script are valid (if it is required for the wrapped script, it   382 is required for the wrapper as well).   384 *** Batch system submission wrapper:   387         text += super(BatchArgumentParser, self).
format_help()
   408     """Generate bash script to regenerate the current environment"""   410     for key, val 
in os.environ.items():
   411         if key 
in (
"DISPLAY",):
   413         if val.startswith(
"() {"):
   420             if key.startswith(
"BASH_FUNC_") 
and key.endswith(
"()"):
   423             output += 
"{key} {val}\nexport -f {key}\n".
format(key=key, val=val)
   426             output += 
"export {key}='{val}'\n".
format(key=key, val=val.replace(
"'", 
"'\"'\"'"))
   436         batchArgs = batchParser.parse_args(config=cls.ConfigClass(), args=args, override=cls.
applyOverrides,
   439         if not cls.
RunnerClass(cls, batchArgs.parent).precall(batchArgs.parent):  
   440             taskParser.error(
"Error in task preparation")
   444         if batchArgs.batch 
is None:     
   445             sys.argv = [sys.argv[0]] + batchArgs.leftover  
   449             numCores = batchArgs.cores 
if batchArgs.cores > 0 
else batchArgs.nodes*batchArgs.procs
   450             walltime = cls.
batchWallTime(batchArgs.time, batchArgs.parent, numCores)
   453             batchArgs.batch.run(command, walltime=walltime)
   457         """!Return walltime request for batch job   459         Subclasses should override if the walltime should be calculated   460         differently (e.g., addition of some serial time).   463         @param time: Requested time per iteration   464         @param parsedCmd: Results of argument parsing   465         @param numCores: Number of cores   467         numTargets = len(cls.
RunnerClass.getTargetList(parsedCmd))
   468         return time*numTargets/
float(numCores)
   472         """!Return command to run CmdLineTask   475         @param args: Parsed batch job arguments (from BatchArgumentParser)   477         job = args.job 
if args.job 
is not None else "job"   478         module = cls.__module__
   479         script = (
"import os; os.umask(%#05o); " +
   480                   "import lsst.base; lsst.base.disableImplicitThreading(); " +
   481                   "import lsst.ctrl.pool.log; lsst.ctrl.pool.log.jobLog(\"%s\"); ") % (UMASK, job)
   484             script += (
"import lsst.ctrl.pool.parallel; import atexit; " +
   485                        "atexit.register(lsst.ctrl.pool.parallel.printProcessStats); ")
   487         script += 
"import %s; %s.%s.parseAndRun();" % (module, module, cls.__name__)
   489         profilePre = 
"import cProfile; import os; cProfile.run(\"\"\""   490         profilePost = 
"\"\"\", filename=\"profile-" + job + 
"-%s-%d.dat\" % (os.uname()[1], os.getpid()))"   492         return (
"python -c '" + (profilePre 
if args.batchProfile 
else "") + script +
   493                 (profilePost 
if args.batchProfile 
else "") + 
"' " + 
shCommandFromArgs(args.leftover) +
   496     @contextlib.contextmanager
   498         """!Provide a context manager for logging an operation   500         @param operation: description of operation (string)   501         @param catch: Catch all exceptions?   502         @param trace: Log a traceback of caught exception?   504         Note that if 'catch' is True, all exceptions are swallowed, but there may   505         be other side-effects such as undefined variables.   507         self.
log.
info(
"%s: Start %s" % (NODE, operation))
   512                 cls, e, _ = sys.exc_info()
   513                 self.
log.
warn(
"%s: Caught %s while %s: %s" % (NODE, cls.__name__, operation, e))
   515                     self.
log.
info(
"%s: Traceback:\n%s" % (NODE, traceback.format_exc()))
   519             self.
log.
info(
"%s: Finished %s" % (NODE, operation))
   523     """Starts a BatchCmdLineTask with an MPI process pool   525     Use this subclass of BatchCmdLineTask if you want to use the Pool directly.   530         """Run with a MPI process pool"""   532         super(BatchPoolTask, cls).
parseAndRun(*args, **kwargs)
   537     """Run a Task individually on a list of inputs using the MPI process pool"""   542         Warn if the user specified multiprocessing.   544         TaskRunner.__init__(self, *args, **kwargs)
   546             self.
log.
warn(
"Multiprocessing arguments (-j %d) ignored since using batch processing" %
   551         """Run the task on all targets   553         Sole input is the result of parsing the command-line with the ArgumentParser.   555         Output is None if 'precall' failed; otherwise it is a list of calling ourself   556         on each element of the target list from the 'getTargetList' method.   565             if len(targetList) > 0:
   566                 parsedCmd.log.info(
"Processing %d targets with a pool of %d processes..." %
   567                                    (len(targetList), pool.size))
   569                 resultList = pool.map(self, targetList)
   571                 parsedCmd.log.warn(
"Not running the task because there is no data to process; "   572                                    "you may preview data using \"--show data\"")
   579         """Run the Task on a single target   581         Strips out the process pool 'cache' argument.   583         'args' are those arguments provided by the getTargetList method.   585         Brings down the entire job if an exception is not caught (i.e., --doraise).   587         return TaskRunner.__call__(self, args)
   591     """Runs the BatchCmdLineTask in parallel   593     Use this subclass of BatchCmdLineTask if you don't need to use the Pool   594     directly, but just want to iterate over many objects (like a multi-node   595     version of the '-j' command-line argument).   597     RunnerClass = BatchTaskRunner
   # NOTE(review): the first parameter is ``cls``, so this is presumably
   # decorated with @classmethod on a line that is not visible here -- confirm.
   def _makeArgumentParser(cls, *args, **kwargs):
       """Build an ArgumentParser

       Removes the batch-specific parts in order to delegate to the parent classes.

       The ``doBatch`` and ``add_help`` keyword arguments are discarded if
       present; every other positional and keyword argument is forwarded
       unchanged.

       @param args: positional arguments forwarded to the parent implementation
       @param kwargs: keyword arguments forwarded to the parent implementation,
                      minus ``doBatch`` and ``add_help``
       @return whatever the parent ``_makeArgumentParser`` returns
       """
       # Drop the batch-only keywords; the default value makes each pop a
       # no-op (rather than a KeyError) when the caller did not supply it.
       kwargs.pop("doBatch", False)
       kwargs.pop("add_help", False)
       # super() is anchored at BatchCmdLineTask, so the lookup starts *after*
       # BatchCmdLineTask in the MRO and any _makeArgumentParser defined on
       # BatchCmdLineTask itself is bypassed.
       return super(BatchCmdLineTask, cls)._makeArgumentParser(*args, **kwargs)
   611         """Parse an argument list and run the command   613         This is the entry point when we run in earnest, so start the process pool   614         so that the worker nodes don't go any further.   617         results = super(BatchParallelTask, cls).
parseAndRun(*args, **kwargs)
 def parseAndRun(cls, args, kwargs)
def __call__(self, cache, args)
def preamble(self, walltime=None)
def parseAndRun(cls, args, kwargs)
def _makeArgumentParser(cls)
def parseAndRun(cls, args=None, config=None, log=None, doReturnResults=False)
def formatWalltime(walltime)
def batchCommand(cls, args)
Return command to run CmdLineTask. 
def __init__(self, outputDir=None, numNodes=0, numProcsPerNode=0, numCores=0, queue=None, jobName=None, walltime=0.0, dryrun=False, doExec=False, mpiexec="", submit=None, options=None, verbose=False)
Constructor. 
def __init__(self, parent=None, args, kwargs)
def preamble(self, walltime=None)
def prepareForMultiProcessing(self)
def submitCommand(self, scriptName)
def startPool(comm=None, root=0, killSlaves=True)
Start a process pool. 
def parseAndSubmit(cls, args=None, kwargs)
def createScript(self, command, walltime=None)
Create script to be submitted. 
def __init__(self, args, kwargs)
def precall(self, parsedCmd)
def preamble(self, walltime=None)
def submitCommand(self, scriptName)
def parse_args(self, config=None, args=None, namespace=None, kwargs)
def submitCommand(self, scriptName)
Return command to submit script. 
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
def batchWallTime(cls, time, parsedCmd, numCores)
Return walltime request for batch job. 
def preamble(self, command, walltime=None)
def submitCommand(self, scriptName)
def setBatchType(batchType)
def logOperation(self, operation, catch=False, trace=True)
Provide a context manager for logging an operation. 
def __init__(self, args, kwargs)
def getTargetList(parsedCmd, kwargs)
def applyOverrides(cls, config)
daf::base::PropertyList * list
def run(self, command, walltime=None)
Run the batch system. 
def makeBatch(self, args)
def execution(self, command)
def shCommandFromArgs(args)