14 from .pool 
import startPool, Pool, NODE, abortOnError, setBatchType
 
   15 from . 
import log 
as dummyLog  
 
   17 __all__ = [
"Batch", 
"PbsBatch", 
"SlurmBatch", 
"SmpBatch", 
"BATCH_TYPES", 
"BatchArgumentParser",
 
   18            "BatchCmdLineTask", 
"BatchPoolTask", ]
 
   24 _quote_pos = re.compile(
'(?=[^-0-9a-zA-Z_./\n])')
 
   28     r"""Quote the argument for the shell. 
   37         return _quote_pos.sub(
'\\\\', arg).replace(
'\n', 
"'\n'")
 
   43     """Convert a list of shell arguments to a shell command-line""" 
   44     return ' '.join([
shQuote(a) 
for a 
in args])
 
   48     """Collect Linux-specific process statistics 
   50     Parses the /proc/self/status file (N.B. Linux-specific!) into a dict 
   54     with open(
"/proc/self/status") 
as f:
 
   56             key, _, value = line.partition(
":")
 
   57             result[key] = value.strip()
 
   62     """Print the process statistics to the log""" 
   64     log = Log.getDefaultLogger()
 
   65     log.info(
"Process stats for %s: %s" % (NODE, 
processStats()))
 
   69     """Base class for batch submission""" 
   71     def __init__(self, outputDir=None, numNodes=0, numProcsPerNode=0, numCores=0, queue=None, jobName=None,
 
   72                  walltime=0.0, dryrun=False, doExec=False, mpiexec="", submit=None, options=None,
 
   76         @param outputDir: output directory, or None 
   77         @param numNodes: number of nodes 
   78         @param numProcsPerNode: number of processors per node 
   79         @param numCores: number of cores (Slurm, SMP only) 
   80         @param queue: name of queue, or None 
   81         @param jobName: name of job, or None 
   82         @param walltime: maximum wall clock time for job 
   83         @param dryrun: Dry run (only print actions that would be taken)? 
   84         @param doExec: exec the script instead of submitting to batch system? 
   85         @param mpiexec: options for mpiexec 
   86         @param submit: command-line options for batch submission (e.g., for qsub, sbatch) 
   87         @param options: options to append to script header (e.g., #PBS or #SBATCH) 
   88         @param verbose: produce verbose output? 
   90         if (numNodes <= 0 
or numProcsPerNode <= 0) 
and numCores <= 0:
 
   91             raise RuntimeError(
"Must specify numNodes+numProcs or numCores")
 
  111         """Return preamble string for script to be submitted 
  113         Most batch systems allow you to embed submission options as comments here. 
  115         raise NotImplementedError(
"Not implemented for base class")
 
  118         """Return execution string for script to be submitted""" 
  120                   "umask %03o" % UMASK,
 
  121                   "cd %s" % pipes.quote(os.getcwd()),
 
  124             script += [
"echo \"mpiexec is at: $(which mpiexec)\"",
 
  126                        "echo 'umask: ' $(umask)",
 
  131         script += [
"mpiexec %s %s" % (self.
mpiexec, command)]
 
  136         return "\n".join(script)
 
  139         """!Create script to be submitted 
  141         @param command: command to run 
  142         @param walltime: maximum wall clock time, overrides value to constructor 
  143         @return name of script on filesystem 
  145         fd, scriptName = tempfile.mkstemp()
 
  146         with os.fdopen(fd, 
"w") 
as f:
 
  154         os.chmod(scriptName, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
 
  158         """!Return command to submit script 
  160         @param scriptName: name of script on filesystem 
  162         raise NotImplementedError(
"No implementation for base class")
 
  164     def run(self, command, walltime=None):
 
  165         """!Run the batch system 
  167         Creates and submits the script to execute the provided command 
  169         @param command: command to run 
  170         @param walltime: maximum wall clock time, overrides value to constructor 
  171         @return name of script on filesystem 
  173         scriptName = self.
createScript(command, walltime=walltime)
 
  176             print(
"Would run: %s" % command)
 
  178             os.execl(scriptName, scriptName)
 
  185     """Batch submission with PBS""" 
  191             raise RuntimeError(
"Non-positive walltime: %s (did you forget '--time'?)" % (walltime,))
 
  194                 "Number of nodes (--nodes=%d) or number of processors per node (--procs=%d) not set" %
 
  197             raise RuntimeError(
"PBS does not support setting the number of cores")
 
  201             "#PBS -l walltime=%d" % walltime 
if walltime 
is not None else "",
 
  204             "#PBS -q %s" % self.
queue if self.
queue is not None else "",
 
  206             "#PBS -W umask=%03o" % UMASK,
 
  210         return "qsub %s -V %s" % (self.
submit if self.
submit is not None else "", scriptName)
 
  214     """Batch submission with Slurm""" 
  218         """Format walltime (in seconds) as days-hours:minutes""" 
  222         days = walltime//secInDay
 
  223         walltime -= days*secInDay
 
  224         hours = walltime//secInHour
 
  225         walltime -= hours*secInHour
 
  226         minutes = walltime//secInMinute
 
  227         walltime -= minutes*secInMinute
 
  230         return "%d-%d:%d" % (days, hours, minutes)
 
  236             raise RuntimeError(
"Non-positive walltime: %s (did you forget '--time'?)" % (walltime,))
 
  239                 "Number of nodes (--nodes=%d) and number of processors per node (--procs=%d) not set OR " 
  242             raise RuntimeError(
"Must set either --nodes,--procs or --cores: not both")
 
  245         filename = os.path.join(outputDir, (self.
jobName if self.
jobName is not None else "slurm") + 
".o%j")
 
  246         return "\n".join([(
"#SBATCH --nodes=%d" % self.
numNodes) 
if self.
numNodes > 0 
else "",
 
  251                           "#SBATCH --job-name=%s" % self.
jobName if self.
jobName is not None else "",
 
  252                           "#SBATCH -p %s" % self.
queue if self.
queue is not None else "",
 
  253                           "#SBATCH --output=%s" % filename,
 
  254                           "#SBATCH --error=%s" % filename,
 
  259         return "sbatch %s %s" % (self.
submit if self.
submit is not None else "", scriptName)
 
  263     """Not-really-Batch submission with multiple cores on the current node 
  265     The job is run immediately. 
  269         super(SmpBatch, self).
__init__(*args, **kwargs)
 
  276             raise RuntimeError(
"SMP does not support the --nodes and --procs command-line options; " 
  277                                "use --cores to specify the number of cores to use")
 
  287         return "exec %s" % scriptName
 
  290 BATCH_TYPES = {
'none': 
None,
 
  299     """An argument parser to get relevant parameters for batch submission 
  301     We want to be able to display the help for a 'parent' ArgumentParser 
  302     along with the batch-specific options we introduce in this class, but 
  303     we don't want to swallow the parent (i.e., ArgumentParser(parents=[parent])) 
  304     because we want to save the list of arguments that this particular 
  305     BatchArgumentParser doesn't parse, so they can be passed on to a different 
  306     program (though we also want to parse them to check that they can be parsed). 
  310         super(BatchArgumentParser, self).
__init__(*args, **kwargs)
 
  312         group = self.add_argument_group(
"Batch submission options")
 
  313         group.add_argument(
"--queue", help=
"Queue name")
 
  314         group.add_argument(
"--job", help=
"Job name")
 
  315         group.add_argument(
"--nodes", type=int, default=0, help=
"Number of nodes")
 
  316         group.add_argument(
"--procs", type=int, default=0, help=
"Number of processors per node")
 
  317         group.add_argument(
"--cores", type=int, default=0, help=
"Number of cores (Slurm/SMP only)")
 
  318         group.add_argument(
"--time", type=float, default=0,
 
  319                            help=
"Expected execution time per element (sec)")
 
  320         group.add_argument(
"--walltime", type=float, default=0,
 
  321                            help=
"Expected total execution walltime (sec); overrides cores & time")
 
  322         group.add_argument(
"--batch-type", dest=
"batchType", choices=
list(BATCH_TYPES.keys()), default=
"smp",
 
  323                            help=
"Batch system to use")
 
  324         group.add_argument(
"--batch-verbose", dest=
"batchVerbose", action=
"store_true", default=
False,
 
  325                            help=(
"Enable verbose output in batch script " 
  326                                  "(including system environment information at batch start)?"))
 
  327         group.add_argument(
"--batch-output", dest=
"batchOutput", help=
"Output directory")
 
  328         group.add_argument(
"--batch-submit", dest=
"batchSubmit", help=
"Batch submission command-line flags")
 
  329         group.add_argument(
"--batch-options", dest=
"batchOptions", help=
"Header options for batch script")
 
  330         group.add_argument(
"--batch-profile", dest=
"batchProfile", action=
"store_true", default=
False,
 
  331                            help=
"Enable profiling on batch job?")
 
  332         group.add_argument(
"--batch-stats", dest=
"batchStats", action=
"store_true", default=
False,
 
  333                            help=
"Print process stats on completion (Linux only)?")
 
  334         group.add_argument(
"--dry-run", dest=
"dryrun", default=
False, action=
"store_true",
 
  336         group.add_argument(
"--do-exec", dest=
"doExec", default=
False, action=
"store_true",
 
  337                            help=
"Exec script instead of submit to batch system?")
 
  338         group.add_argument(
"--mpiexec", default=
"", help=
"mpiexec options")
 
  340     def parse_args(self, config=None, args=None, namespace=None, **kwargs):
 
  341         args, leftover = super(BatchArgumentParser, self).parse_known_args(args=args, namespace=namespace)
 
  344         if len(leftover) > 0:
 
  347                 self.error(
"Unrecognised arguments: %s" % leftover)
 
  349             args.leftover = leftover
 
  354         """Create a Batch object from the command-line arguments""" 
  356         argMapping = {
'outputDir': 
'batchOutput',
 
  358                       'numProcsPerNode': 
'procs',
 
  365                       'mpiexec': 
'mpiexec',
 
  366                       'submit': 
'batchSubmit',
 
  367                       'options': 
'batchOptions',
 
  368                       'verbose': 
'batchVerbose',
 
  371         if BATCH_TYPES[args.batchType] 
is None:
 
  375         kwargs = {k: getattr(args, v) 
for k, v 
in argMapping.items()}
 
  376         return BATCH_TYPES[args.batchType](**kwargs)
 
  379         text = 
"""This is a script for queue submission of a wrapped script. 
  381 Use this program name and ignore that for the wrapped script (it will be 
  382 passed on to the batch system).  Arguments for *both* this wrapper script or the 
  383 wrapped script are valid (if it is required for the wrapped script, it 
  384 is required for the wrapper as well). 
  386 *** Batch system submission wrapper: 
  389         text += super(BatchArgumentParser, self).
format_help()
 
  410     """Generate bash script to regenerate the current environment""" 
  412     for key, val 
in os.environ.items():
 
  413         if key 
in (
"DISPLAY",):
 
  415         if val.startswith(
"() {"):
 
  422             if key.startswith(
"BASH_FUNC_") 
and key.endswith(
"()"):
 
  425             output += 
"{key} {val}\nexport -f {key}\n".
format(key=key, val=val)
 
  428             output += 
"export {key}='{val}'\n".
format(key=key, val=val.replace(
"'", 
"'\"'\"'"))
 
  438         batchArgs = batchParser.parse_args(config=cls.ConfigClass(), args=args, override=cls.
applyOverrides,
 
  441         if not cls.
RunnerClass(cls, batchArgs.parent).precall(batchArgs.parent):  
 
  442             taskParser.error(
"Error in task preparation")
 
  446         if batchArgs.batch 
is None:     
 
  447             sys.argv = [sys.argv[0]] + batchArgs.leftover  
 
  451             if batchArgs.walltime > 0:
 
  452                 walltime = batchArgs.walltime
 
  454                 numCores = batchArgs.cores 
if batchArgs.cores > 0 
else batchArgs.nodes*batchArgs.procs
 
  455                 walltime = cls.
batchWallTime(batchArgs.time, batchArgs.parent, numCores)
 
  458             batchArgs.batch.run(command, walltime=walltime)
 
  462         """!Return walltime request for batch job 
  464         Subclasses should override if the walltime should be calculated 
  465         differently (e.g., addition of some serial time). 
  468         @param time: Requested time per iteration 
  469         @param parsedCmd: Results of argument parsing 
  470         @param numCores: Number of cores 
  472         numTargets = len(cls.
RunnerClass.getTargetList(parsedCmd))
 
  473         return time*numTargets/float(numCores)
 
  477         """!Return command to run CmdLineTask 
  480         @param args: Parsed batch job arguments (from BatchArgumentParser) 
  482         job = args.job 
if args.job 
is not None else "job" 
  483         module = cls.__module__
 
  484         script = (
"import os; os.umask(%#05o); " +
 
  485                   "import lsst.base; lsst.base.disableImplicitThreading(); " +
 
  486                   "import lsst.ctrl.pool.log; lsst.ctrl.pool.log.jobLog(\"%s\"); ") % (UMASK, job)
 
  489             script += (
"import lsst.ctrl.pool.parallel; import atexit; " +
 
  490                        "atexit.register(lsst.ctrl.pool.parallel.printProcessStats); ")
 
  492         script += 
"import %s; %s.%s.parseAndRun();" % (module, module, cls.__name__)
 
  494         profilePre = 
"import cProfile; import os; cProfile.run(\"\"\"" 
  495         profilePost = 
"\"\"\", filename=\"profile-" + job + 
"-%s-%d.dat\" % (os.uname()[1], os.getpid()))" 
  497         return (
"python -c '" + (profilePre 
if args.batchProfile 
else "") + script +
 
  498                 (profilePost 
if args.batchProfile 
else "") + 
"' " + 
shCommandFromArgs(args.leftover) +
 
  501     @contextlib.contextmanager
 
  503         """!Provide a context manager for logging an operation 
  505         @param operation: description of operation (string) 
  506         @param catch: Catch all exceptions? 
  507         @param trace: Log a traceback of caught exception? 
  509         Note that if 'catch' is True, all exceptions are swallowed, but there may 
  510         be other side-effects such as undefined variables. 
  512         self.
log.
info(
"%s: Start %s" % (NODE, operation))
 
  517                 cls, e, _ = sys.exc_info()
 
  518                 self.
log.
warn(
"%s: Caught %s while %s: %s" % (NODE, cls.__name__, operation, e))
 
  520                     self.
log.
info(
"%s: Traceback:\n%s" % (NODE, traceback.format_exc()))
 
  524             self.
log.
info(
"%s: Finished %s" % (NODE, operation))
 
  528     """Starts a BatchCmdLineTask with an MPI process pool 
  530     Use this subclass of BatchCmdLineTask if you want to use the Pool directly. 
  535         """Run with a MPI process pool""" 
  537         super(BatchPoolTask, cls).
parseAndRun(*args, **kwargs)
 
  542     """Run a Task individually on a list of inputs using the MPI process pool""" 
  547         Warn if the user specified multiprocessing. 
  549         TaskRunner.__init__(self, *args, **kwargs)
 
  551             self.
log.
warn(
"Multiprocessing arguments (-j %d) ignored since using batch processing" %
 
  556         """Run the task on all targets 
  558         Sole input is the result of parsing the command-line with the ArgumentParser. 
  560         Output is None if 'precall' failed; otherwise it is a list of calling ourself 
  561         on each element of the target list from the 'getTargetList' method. 
  570             if len(targetList) > 0:
 
  571                 parsedCmd.log.info(
"Processing %d targets with a pool of %d processes..." %
 
  572                                    (len(targetList), pool.size))
 
  574                 resultList = pool.map(self, targetList)
 
  576                 parsedCmd.log.warn(
"Not running the task because there is no data to process; " 
  577                                    "you may preview data using \"--show data\"")
 
  584         """Run the Task on a single target 
  586         Strips out the process pool 'cache' argument. 
  588         'args' are those arguments provided by the getTargetList method. 
  590         Brings down the entire job if an exception is not caught (i.e., --doraise). 
  592         return TaskRunner.__call__(self, args)
 
  596     """Runs the BatchCmdLineTask in parallel 
  598     Use this subclass of BatchCmdLineTask if you don't need to use the Pool 
  599     directly, but just want to iterate over many objects (like a multi-node 
  600     version of the '-j' command-line argument). 
  602     RunnerClass = BatchTaskRunner
 
  605     def _makeArgumentParser(cls, *args, **kwargs):
 
  606         """Build an ArgumentParser 
  608         Removes the batch-specific parts in order to delegate to the parent classes. 
  610         kwargs.pop(
"doBatch", 
False)
 
  611         kwargs.pop(
"add_help", 
False)
 
  612         return super(BatchCmdLineTask, cls)._makeArgumentParser(*args, **kwargs)
 
  616         """Parse an argument list and run the command 
  618         This is the entry point when we run in earnest, so start the process pool 
  619         so that the worker nodes don't go any further. 
  622         results = super(BatchParallelTask, cls).
parseAndRun(*args, **kwargs)