22 from __future__
import absolute_import, division
28 from builtins
import str
29 from builtins
import object
31 from lsst.base import disableImplicitThreading
33 from .task
import Task, TaskError
34 from .struct
import Struct
35 from .argumentParser
import ArgumentParser
# Public names exported by this module.
__all__ = ["CmdLineTask", "TaskRunner", "ButlerInitializedTaskRunner"]
43 """Wrapper around function to catch exceptions that don't inherit from Exception
45 Such exceptions aren't caught by multiprocessing, which causes the slave
46 process to crash and you end up hitting the timeout.
54 cls, exc, tb = sys.exc_info()
55 log = Log.getDefaultLogger()
56 log.warn(
"Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc()))
57 raise Exception(
"Unhandled exception: %s (%s)" % (cls.__name__, exc))
def _runPool(pool, timeout, function, iterable):
    """Wrapper around pool.map_async, to handle timeout

    This is required so as to trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    # Bind the target function into the exception-catching wrapper so each
    # worker invocation is protected against non-Exception-derived errors.
    wrapped = functools.partial(_poolFunctionWrapper, function)
    # map_async + get(timeout) instead of plain map(): the timeout is what
    # allows a KeyboardInterrupt to be delivered promptly (python bug 8296).
    asyncResult = pool.map_async(wrapped, iterable)
    return asyncResult.get(timeout)
72 @contextlib.contextmanager
74 """!Context manager for profiling with cProfile
76 @param filename filename to which to write profile (profiling disabled if None or empty)
77 @param log log object for logging the profile operations
79 If profiling is enabled, the context manager returns the cProfile.Profile object (otherwise
80 it returns None), which allows additional control over profiling. You can obtain this using
81 the "as" clause, e.g.:
83 with profile(filename) as prof:
86 The output cumulative profile can be printed with a command-line like:
88 python -c 'import pstats; pstats.Stats("<filename>").sort_stats("cumtime").print_stats(30)'
94 from cProfile
import Profile
97 log.info(
"Enabling cProfile profiling")
101 profile.dump_stats(filename)
103 log.info(
"cProfile stats written to %s" % filename)
107 """!Run a command-line task, using multiprocessing if requested.
109 Each command-line task (subclass of CmdLineTask) has a task runner. By default it is
110 this class, but some tasks require a subclass. See the manual "how to write a command-line task"
111 in the pipe_tasks documentation for more information.
112 See CmdLineTask.parseAndRun to see how a task runner is used.
114 You may use this task runner for your command-line task if your task has a run method
115 that takes exactly one argument: a butler data reference. Otherwise you must
116 provide a task-specific subclass of this runner for your task's `RunnerClass`
117 that overrides TaskRunner.getTargetList and possibly TaskRunner.\_\_call\_\_.
118 See TaskRunner.getTargetList for details.
120 This design matches the common pattern for command-line tasks: the run method takes a single
121 data reference, of some suitable name. Additional arguments are rare, and if present, require
122 a subclass of TaskRunner that calls these additional arguments by name.
124 Instances of this class must be picklable in order to be compatible with multiprocessing.
125 If multiprocessing is requested (parsedCmd.numProcesses > 1) then run() calls prepareForMultiProcessing
126 to jettison optional non-picklable elements. If your task runner is not compatible with multiprocessing
127 then indicate this in your task by setting class variable canMultiprocess=False.
129 Due to a python bug [1], handling a KeyboardInterrupt properly requires specifying a timeout [2]. This
130 timeout (in sec) can be specified as the "timeout" element in the output from ArgumentParser
131 (the "parsedCmd"), if available, otherwise we use TaskRunner.TIMEOUT_DEFAULT.
133 [1] http://bugs.python.org/issue8296
134 [2] http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool)
138 def __init__(self, TaskClass, parsedCmd, doReturnResults=False):
139 """!Construct a TaskRunner
141 @warning Do not store parsedCmd, as this instance is pickled (if multiprocessing) and parsedCmd may
142 contain non-picklable elements. It certainly contains more data than we need to send to each
143 instance of the task.
145 @param TaskClass The class of the task to run
146 @param parsedCmd The parsed command-line arguments, as returned by the task's argument parser's
148 @param doReturnResults Should run return the collected result from each invocation of the task?
149 This is only intended for unit tests and similar use.
150 It can easily exhaust memory (if the task returns enough data and you call it enough times)
151 and it will fail when using multiprocessing if the returned data cannot be pickled.
153 @throws ImportError if multiprocessing requested (and the task supports it)
154 but the multiprocessing library cannot be imported.
165 self.
timeout = getattr(parsedCmd,
'timeout',
None)
170 if not TaskClass.canMultiprocess:
171 self.log.warn(
"This task does not support multiprocessing; using one process")
175 """!Prepare this instance for multiprocessing by removing optional non-picklable elements.
177 This is only called if the task is run under multiprocessing.
182 """!Run the task on all targets.
184 The task is run under multiprocessing if numProcesses > 1; otherwise processing is serial.
186 @return a list of results returned by TaskRunner.\_\_call\_\_, or an empty list if
187 TaskRunner.\_\_call\_\_ is not called (e.g. if TaskRunner.precall returns `False`).
188 See TaskRunner.\_\_call\_\_ for details.
193 import multiprocessing
195 pool = multiprocessing.Pool(processes=self.
numProcesses, maxtasksperchild=1)
196 mapFunc = functools.partial(_runPool, pool, self.
timeout)
202 profileName = parsedCmd.profile
if hasattr(parsedCmd,
"profile")
else None
205 if len(targetList) > 0:
206 with
profile(profileName, log):
208 resultList = list(mapFunc(self, targetList))
210 log.warn(
"Not running the task because there is no data to process; "
211 "you may preview data using \"--show data\"")
221 """!Return a list of (dataRef, kwargs) to be used as arguments for TaskRunner.\_\_call\_\_.
223 @param parsedCmd the parsed command object (an argparse.Namespace) returned by
224 \ref argumentParser.ArgumentParser.parse_args "ArgumentParser.parse_args".
225 @param **kwargs any additional keyword arguments. In the default TaskRunner
226 this is an empty dict, but having it simplifies overriding TaskRunner for tasks
227 whose run method takes additional arguments (see case (1) below).
229 The default implementation of TaskRunner.getTargetList and TaskRunner.\_\_call\_\_ works for any
230 command-line task whose run method takes exactly one argument: a data reference.
231 Otherwise you must provide a variant of TaskRunner that overrides TaskRunner.getTargetList
232 and possibly TaskRunner.\_\_call\_\_. There are two cases:
234 (1) If your command-line task has a `run` method that takes one data reference followed by additional
235 arguments, then you need only override TaskRunner.getTargetList to return the additional arguments as
236 an argument dict. To make this easier, your overridden version of getTargetList may call
237 TaskRunner.getTargetList with the extra arguments as keyword arguments. For example,
238 the following adds an argument dict containing a single key: "calExpList", whose value is the list
239 of data IDs for the calexp ID argument:
243 def getTargetList(parsedCmd):
244 return TaskRunner.getTargetList(parsedCmd, calExpList=parsedCmd.calexp.idList)
247 It is equivalent to this slightly longer version:
251 def getTargetList(parsedCmd):
252 argDict = dict(calExpList=parsedCmd.calexp.idList)
253 return [(dataId, argDict) for dataId in parsedCmd.id.idList]
256 (2) If your task does not meet condition (1) then you must override both TaskRunner.getTargetList
257 and TaskRunner.\_\_call\_\_. You may do this however you see fit, so long as TaskRunner.getTargetList
258 returns a list, each of whose elements is sent to TaskRunner.\_\_call\_\_, which runs your task.
260 return [(ref, kwargs)
for ref
in parsedCmd.id.refList]
263 """!Create a Task instance
265 @param[in] parsedCmd parsed command-line options (used for extra task args by some task runners)
266 @param[in] args args tuple passed to TaskRunner.\_\_call\_\_ (used for extra task arguments
267 by some task runners)
269 makeTask() can be called with either the 'parsedCmd' argument or 'args' argument set to None,
270 but it must construct identical Task instances in either case.
272 Subclasses may ignore this method entirely if they reimplement both TaskRunner.precall and
273 TaskRunner.\_\_call\_\_
278 """The main work of 'precall'
280 We write package versions, schemas and configs, or compare these to existing
281 files on disk if present.
283 if not parsedCmd.noVersions:
284 task.writePackageVersions(parsedCmd.butler, clobber=parsedCmd.clobberVersions)
289 """!Hook for code that should run exactly once, before multiprocessing is invoked.
291 Must return True if TaskRunner.\_\_call\_\_ should subsequently be called.
293 @warning Implementations must take care to ensure that no unpicklable attributes are added to
294 the TaskRunner itself, for compatibility with multiprocessing.
296 The default implementation writes package versions, schemas and configs, or compares
297 them to existing files on disk if present.
299 task = self.
makeTask(parsedCmd=parsedCmd)
306 except Exception
as e:
307 task.log.fatal(
"Failed in task initialization: %s" % e)
308 if not isinstance(e, TaskError):
309 traceback.print_exc(file=sys.stderr)
314 """!Run the Task on a single target.
316 This default implementation assumes that the 'args' is a tuple
317 containing a data reference and a dict of keyword arguments.
319 @warning if you override this method and wish to return something when
320 doReturnResults is false, then it must be picklable to support
321 multiprocessing and it should be small enough that pickling and
322 unpickling do not add excessive overhead.
324 @param args Arguments for Task.run()
327 - None if doReturnResults false
328 - A pipe_base Struct containing these fields if doReturnResults true:
329 - dataRef: the provided data reference
330 - metadata: task metadata after execution of run
331 - result: result returned by task run, or None if the task fails
333 dataRef, kwargs = args
335 self.
log = Log.getDefaultLogger()
336 if hasattr(dataRef,
"dataId"):
337 self.log.MDC(
"LABEL", str(dataRef.dataId))
338 elif isinstance(dataRef, (list, tuple)):
339 self.log.MDC(
"LABEL", str([ref.dataId
for ref
in dataRef
if hasattr(ref,
"dataId")]))
343 result = task.run(dataRef, **kwargs)
346 result = task.run(dataRef, **kwargs)
347 except Exception
as e:
349 if hasattr(dataRef,
"dataId"):
350 task.log.fatal(
"Failed on dataId=%s: %s" % (dataRef.dataId, e))
351 elif isinstance(dataRef, (list, tuple)):
352 task.log.fatal(
"Failed on dataId=[%s]: %s" %
353 (
",".join([str(_.dataId)
for _
in dataRef]), e))
355 task.log.fatal(
"Failed on dataRef=%s: %s" % (dataRef, e))
357 if not isinstance(e, TaskError):
358 traceback.print_exc(file=sys.stderr)
359 task.writeMetadata(dataRef)
364 metadata=task.metadata,
370 """!A TaskRunner for CmdLineTasks that require a 'butler' keyword argument to be passed to
375 """!A variant of the base version that passes a butler argument to the task's constructor
377 @param[in] parsedCmd parsed command-line options, as returned by the argument parser;
378 if specified then args is ignored
379 @param[in] args other arguments; if parsedCmd is None then this must be specified
381 @throw RuntimeError if parsedCmd and args are both None
383 if parsedCmd
is not None:
384 butler = parsedCmd.butler
385 elif args
is not None:
386 dataRef, kwargs = args
387 butler = dataRef.butlerSubset.butler
389 raise RuntimeError(
"parsedCmd or args must be specified")
394 """!Base class for command-line tasks: tasks that may be executed from the command line
396 See \ref pipeBase_introduction "pipe_base introduction" to learn what tasks are,
397 and \ref pipeTasks_writeCmdLineTask "how to write a command-line task" for more information
398 about writing command-line tasks.
399 If the second link is broken (as it will be before the documentation is cross-linked)
400 then look at the main page of pipe_tasks documentation for a link.
402 Subclasses must specify the following class variables:
403 * ConfigClass: configuration class for your task (a subclass of \ref lsst.pex.config.config.Config
404 "lsst.pex.config.Config", or if your task needs no configuration, then
405 \ref lsst.pex.config.config.Config "lsst.pex.config.Config" itself)
406 * _DefaultName: default name used for this task (a str)
408 Subclasses may also specify the following class variables:
409 * RunnerClass: a task runner class. The default is TaskRunner, which works for any task
410 with a run method that takes exactly one argument: a data reference. If your task does
411 not meet this requirement then you must supply a variant of TaskRunner; see TaskRunner
412 for more information.
413 * canMultiprocess: the default is True; set False if your task does not support multiprocessing.
415 Subclasses must specify a method named "run":
416 - By default `run` accepts a single butler data reference, but you can specify an alternate task runner
417 (subclass of TaskRunner) as the value of class variable `RunnerClass` if your run method needs
419 - `run` is expected to return its data in a Struct. This provides safety for evolution of the task
420 since new values may be added without harming existing code.
421 - The data returned by `run` must be picklable if your task is to support multiprocessing.
423 RunnerClass = TaskRunner
424 canMultiprocess =
True
428 """!A hook to allow a task to change the values of its config *after* the camera-specific
429 overrides are loaded but before any command-line overrides are applied.
431 This is necessary in some cases because the camera-specific overrides may retarget subtasks,
432 wiping out changes made in ConfigClass.setDefaults. See LSST Trac ticket #2282 for more discussion.
434 @warning This is called by CmdLineTask.parseAndRun; other ways of constructing a config
435 will not apply these overrides.
437 @param[in] cls the class object
438 @param[in] config task configuration (an instance of cls.ConfigClass)
443 def parseAndRun(cls, args=None, config=None, log=None, doReturnResults=False):
444 """!Parse an argument list and run the command
446 Calling this method with no arguments specified is the standard way to run a command-line task
447 from the command line. For an example see pipe_tasks `bin/makeSkyMap.py` or almost any other
448 file in that directory.
450 @param cls the class object
451 @param args list of command-line arguments; if `None` use sys.argv
452 @param config config for task (instance of pex_config Config); if `None` use cls.ConfigClass()
453 @param log log (instance of lsst.log.Log); if `None` use the default log
454 @param doReturnResults Return the collected results from each invocation of the task?
455 This is only intended for unit tests and similar use.
456 It can easily exhaust memory (if the task returns enough data and you call it enough times)
457 and it will fail when using multiprocessing if the returned data cannot be pickled.
459 @return a Struct containing:
460 - argumentParser: the argument parser
461 - parsedCmd: the parsed command returned by the argument parser's parse_args method
462 - taskRunner: the task runner used to run the task (an instance of cls.RunnerClass)
463 - resultList: results returned by the task runner's run method, one entry per invocation.
464 This will typically be a list of `None` unless doReturnResults is `True`;
465 see cls.RunnerClass (TaskRunner by default) for more information.
467 argumentParser = cls._makeArgumentParser()
469 config = cls.ConfigClass()
470 parsedCmd = argumentParser.parse_args(config=config, args=args, log=log, override=cls.applyOverrides)
471 taskRunner = cls.RunnerClass(TaskClass=cls, parsedCmd=parsedCmd, doReturnResults=doReturnResults)
472 resultList = taskRunner.run(parsedCmd)
474 argumentParser=argumentParser,
476 taskRunner=taskRunner,
477 resultList=resultList,
482 """!Create and return an argument parser
484 @param[in] cls the class object
485 @return the argument parser for this task.
487 By default this returns an ArgumentParser with one ID argument named `--id` of dataset type "raw".
489 Your task subclass may need to override this method to change the dataset type or data ref level,
490 or to add additional data ID arguments. If you add additional data ID arguments or your task's
491 run method takes more than a single data reference then you will also have to provide a task-specific
492 task runner (see TaskRunner for more information).
494 parser = ArgumentParser(name=cls._DefaultName)
495 parser.add_id_argument(name=
"--id", datasetType=
"raw",
496 help=
"data IDs, e.g. --id visit=12345 ccd=1,2^0,3")
500 """!Write the configuration used for processing the data, or check that an existing
501 one is equal to the new one if present.
503 @param[in] butler data butler used to write the config.
504 The config is written to dataset type self._getConfigName()
505 @param[in] clobber a boolean flag that controls what happens if a config already has been saved:
506 - True: overwrite the existing config
507 - False: raise TaskError if this config does not match the existing config
510 if configName
is None:
513 butler.put(self.config, configName, doBackup=doBackup)
514 elif butler.datasetExists(configName):
517 oldConfig = butler.get(configName, immediate=
True)
518 except Exception
as exc:
519 raise type(exc)(
"Unable to read stored config file %s (%s); consider using --clobber-config" %
521 output =
lambda msg: self.log.fatal(
"Comparing configuration: " + msg)
522 if not self.config.compare(oldConfig, shortcut=
False, output=output):
524 (
"Config does not match existing task config %r on disk; tasks configurations " +
525 "must be consistent within the same output repo (override with --clobber-config)") %
528 butler.put(self.config, configName)
531 """!Write the schemas returned by \ref task.Task.getAllSchemaCatalogs "getAllSchemaCatalogs"
533 @param[in] butler data butler used to write the schema.
534 Each schema is written to the dataset type specified as the key in the dict returned by
535 \ref task.Task.getAllSchemaCatalogs "getAllSchemaCatalogs".
536 @param[in] clobber a boolean flag that controls what happens if a schema already has been saved:
537 - True: overwrite the existing schema
538 - False: raise TaskError if this schema does not match the existing schema
540 @warning if clobber is False and an existing schema does not match a current schema,
541 then some schemas may have been saved successfully and others may not, and there is no easy way to
544 for dataset, catalog
in self.getAllSchemaCatalogs().items():
545 schemaDataset = dataset +
"_schema"
547 butler.put(catalog, schemaDataset, doBackup=doBackup)
548 elif butler.datasetExists(schemaDataset):
549 oldSchema = butler.get(schemaDataset, immediate=
True).getSchema()
550 if not oldSchema.compare(catalog.getSchema(), afwTable.Schema.IDENTICAL):
552 (
"New schema does not match schema %r on disk; schemas must be " +
553 " consistent within the same output repo (override with --clobber-config)") %
556 butler.put(catalog, schemaDataset)
559 """!Write the metadata produced from processing the data
561 @param[in] dataRef butler data reference used to write the metadata.
562 The metadata is written to dataset type self._getMetadataName()
566 if metadataName
is not None:
567 dataRef.put(self.getFullMetadata(), metadataName)
568 except Exception
as e:
569 self.log.warn(
"Could not persist metadata for dataId=%s: %s" % (dataRef.dataId, e,))
572 """!Compare and write package versions
574 We retrieve the persisted list of packages and compare with what we're currently using.
575 We raise TaskError if there's a version mismatch.
577 Note that this operation is subject to a race condition.
579 @param[in] butler data butler used to read/write the package versions
580 @param[in] clobber overwrite any existing config? no comparison will be made
581 @param[in] doBackup if clobbering, should we backup the old files?
582 @param[in] dataset name of dataset to read/write
584 packages = Packages.fromSystem()
587 return butler.put(packages, dataset, doBackup=doBackup)
588 if not butler.datasetExists(dataset):
589 return butler.put(packages, dataset)
592 old = butler.get(dataset, immediate=
True)
593 except Exception
as exc:
594 raise type(exc)(
"Unable to read stored version dataset %s (%s); "
595 "consider using --clobber-versions or --no-versions" %
600 diff = packages.difference(old)
603 "Version mismatch (" +
604 "; ".join(
"%s: %s vs %s" % (pkg, diff[pkg][1], diff[pkg][0])
for pkg
in diff) +
605 "); consider using --clobber-versions or --no-versions")
607 extra = packages.extra(old)
610 butler.put(old, dataset, doBackup=doBackup)
613 """!Return the name of the config dataset type, or None if config is not to be persisted
615 @note The name may depend on the config; that is why this is not a class method.
617 return self._DefaultName +
"_config"
620 """!Return the name of the metadata dataset type, or None if metadata is not to be persisted
622 @note The name may depend on the config; that is why this is not a class method.
624 return self._DefaultName +
"_metadata"
def precall
Hook for code that should run exactly once, before multiprocessing is invoked.
def __init__
Construct a TaskRunner.
def makeTask
Create a Task instance.
def writePackageVersions
Compare and write package versions.
bool disableImplicitThreading()
def _makeArgumentParser
Create and return an argument parser.
def makeTask
A variant of the base version that passes a butler argument to the task's constructor.
def run
Run the task on all targets.
def __call__
Run the Task on a single target.
def writeConfig
Write the configuration used for processing the data, or check that an existing one is equal to the n...
def _getConfigName
Return the name of the config dataset type, or None if config is not to be persisted.
def applyOverrides
A hook to allow a task to change the values of its config after the camera-specific overrides are loa...
def parseAndRun
Parse an argument list and run the command.
def _getMetadataName
Return the name of the metadata dataset type, or None if metadata is not to be persisted.
def writeMetadata
Write the metadata produced from processing the data.
def prepareForMultiProcessing
Prepare this instance for multiprocessing by removing optional non-picklable elements.
def getTargetList
Return a list of (dataRef, kwargs) to be used as arguments for TaskRunner.
A TaskRunner for CmdLineTasks that require a 'butler' keyword argument to be passed to their construc...
Run a command-line task, using multiprocessing if requested.
def profile
Context manager for profiling with cProfile.
def writeSchemas
Write the schemas returned by getAllSchemaCatalogs.
Base class for command-line tasks: tasks that may be executed from the command line.