22 __all__ = [
"CmdLineTask",
"TaskRunner",
"ButlerInitializedTaskRunner",
"LegacyTaskRunner"]
30 from lsst.base import disableImplicitThreading
32 from .task
import Task, TaskError
33 from .struct
import Struct
34 from .argumentParser
import ArgumentParser
39 def _runPool(pool, timeout, function, iterable):
40 """Wrapper around ``pool.map_async``, to handle timeout 42 This is required so as to trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see 43 http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool 45 return pool.map_async(function, iterable).get(timeout)
@contextlib.contextmanager
def profile(filename, log=None):
    """Context manager for profiling with cProfile.

    Parameters
    ----------
    filename : `str`
        Filename to which to write profile (profiling disabled if `None` or empty).
    log : `lsst.log.Log`, optional
        Log object for logging the profile operations.

    Yields
    ------
    profiler : `cProfile.Profile` or `None`
        If profiling is enabled, the context manager yields the cProfile.Profile object (otherwise
        it yields `None`), which allows additional control over profiling. You can obtain this using
        the "as" clause, e.g.::

            with profile(filename) as prof:
                ...  # code to profile

    Notes
    -----
    The profiler is disabled and its stats are written to ``filename`` when the ``with``
    block exits, even if the block raises; any exception is then propagated unchanged.

    The output cumulative profile can be printed with a command-line like::

        python -c 'import pstats; pstats.Stats("<filename>").sort_stats("cumtime").print_stats(30)'
    """
    if not filename:
        # Profiling disabled: hand back None and do nothing else.
        yield None
        return
    # Imported here so cProfile is only loaded when profiling is actually requested.
    from cProfile import Profile
    profiler = Profile()
    if log is not None:
        log.info("Enabling cProfile profiling")
    profiler.enable()
    try:
        yield profiler
    finally:
        # Always stop and persist the profile: a failing run still leaves a usable
        # profile behind, and the profiler is never left enabled after the block.
        profiler.disable()
        profiler.dump_stats(filename)
        if log is not None:
            log.info("cProfile stats written to %s", filename)
88 """Run a command-line task, using `multiprocessing` if requested. 92 TaskClass : `lsst.pipe.base.Task` subclass 93 The class of the task to run. 94 parsedCmd : `argparse.Namespace` 95 The parsed command-line arguments, as returned by the task's argument parser's 96 `~lsst.pipe.base.ArgumentParser.parse_args` method. 100 Do not store ``parsedCmd``, as this instance is pickled (if multiprocessing) and parsedCmd may 101 contain non-picklable elements. It certainly contains more data than we need to send to each 102 instance of the task. 103 doReturnResults : `bool`, optional 104 Should run return the collected result from each invocation of the task? This is only intended for 105 unit tests and similar use. It can easily exhaust memory (if the task returns enough data and you 106 call it enough times) and it will fail when using multiprocessing if the returned data cannot be 109 Note that even if ``doReturnResults`` is False a struct with a single member "exitStatus" is returned, 110 with value 0 or 1 to be returned to the unix shell. 115 If multiprocessing is requested (and the task supports it) but the multiprocessing library cannot be 120 Each command-line task (subclass of `lsst.pipe.base.CmdLineTask`) has a task runner. By default it is this 121 class, but some tasks require a subclass. See the manual :ref:`creating-a-command-line-task` for more 122 information. See `CmdLineTask.parseAndRun` to see how a task runner is used. 124 You may use this task runner for your command-line task if your task has a runDataRef method that takes 125 exactly one argument: a butler data reference. Otherwise you must provide a task-specific subclass of 126 this runner for your task's ``RunnerClass`` that overrides `TaskRunner.getTargetList` and possibly 127 `TaskRunner.__call__`. See `TaskRunner.getTargetList` for details. 129 This design matches the common pattern for command-line tasks: the runDataRef method takes a single data 130 reference, of some suitable name. 
Additional arguments are rare, and if present, require a subclass of 131 `TaskRunner` that calls these additional arguments by name. 133 Instances of this class must be picklable in order to be compatible with multiprocessing. If 134 multiprocessing is requested (``parsedCmd.numProcesses > 1``) then `runDataRef` calls 135 `prepareForMultiProcessing` to jettison optional non-picklable elements. If your task runner is not 136 compatible with multiprocessing then indicate this in your task by setting class variable 137 ``canMultiprocess=False``. 139 Due to a `python bug`__, handling a `KeyboardInterrupt` properly `requires specifying a timeout`__. This 140 timeout (in sec) can be specified as the ``timeout`` element in the output from 141 `~lsst.pipe.base.ArgumentParser` (the ``parsedCmd``), if available, otherwise we use `TaskRunner.TIMEOUT`. 143 By default, we disable "implicit" threading -- ie, as provided by underlying numerical libraries such as 144 MKL or BLAS. This is designed to avoid thread contention both when a single command line task spawns 145 multiple processes and when multiple users are running on a shared system. Users can override this 146 behaviour by setting the ``LSST_ALLOW_IMPLICIT_THREADS`` environment variable. 148 .. __: http://bugs.python.org/issue8296 149 .. __: http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool 153 """Default timeout (seconds) for multiprocessing.""" 155 def __init__(self, TaskClass, parsedCmd, doReturnResults=False):
165 self.
timeout = getattr(parsedCmd,
'timeout',
None)
170 if not TaskClass.canMultiprocess:
171 self.
log.
warn(
"This task does not support multiprocessing; using one process")
175 """Prepare this instance for multiprocessing 177 Optional non-picklable elements are removed. 179 This is only called if the task is run under multiprocessing. 184 """Run the task on all targets. 188 parsedCmd : `argparse.Namespace` 189 Parsed command `argparse.Namespace`. 194 A list of results returned by `TaskRunner.__call__`, or an empty list if `TaskRunner.__call__` 195 is not called (e.g. if `TaskRunner.precall` returns `False`). See `TaskRunner.__call__` 200 The task is run under multiprocessing if `TaskRunner.numProcesses` is more than 1; otherwise 201 processing is serial. 206 import multiprocessing
208 pool = multiprocessing.Pool(processes=self.
numProcesses, maxtasksperchild=1)
209 mapFunc = functools.partial(_runPool, pool, self.
timeout)
215 profileName = parsedCmd.profile
if hasattr(parsedCmd,
"profile")
else None 218 if len(targetList) > 0:
219 with
profile(profileName, log):
221 resultList =
list(mapFunc(self, targetList))
223 log.warn(
"Not running the task because there is no data to process; " 224 "you may preview data using \"--show data\"")
234 """Get a list of (dataRef, kwargs) for `TaskRunner.__call__`. 238 parsedCmd : `argparse.Namespace` 239 The parsed command object returned by `lsst.pipe.base.argumentParser.ArgumentParser.parse_args`. 241 Any additional keyword arguments. In the default `TaskRunner` this is an empty dict, but having 242 it simplifies overriding `TaskRunner` for tasks whose runDataRef method takes additional arguments 243 (see case (1) below). 247 The default implementation of `TaskRunner.getTargetList` and `TaskRunner.__call__` works for any 248 command-line task whose runDataRef method takes exactly one argument: a data reference. Otherwise you 249 must provide a variant of TaskRunner that overrides `TaskRunner.getTargetList` and possibly 250 `TaskRunner.__call__`. There are two cases. 254 If your command-line task has a ``runDataRef`` method that takes one data reference followed by 255 additional arguments, then you need only override `TaskRunner.getTargetList` to return the additional 256 arguments as an argument dict. To make this easier, your overridden version of 257 `~TaskRunner.getTargetList` may call `TaskRunner.getTargetList` with the extra arguments as keyword 258 arguments. For example, the following adds an argument dict containing a single key: "calExpList", 259 whose value is the list of data IDs for the calexp ID argument:: 261 def getTargetList(parsedCmd): 262 return TaskRunner.getTargetList( 264 calExpList=parsedCmd.calexp.idList 267 It is equivalent to this slightly longer version:: 270 def getTargetList(parsedCmd): 271 argDict = dict(calExpList=parsedCmd.calexp.idList) 272 return [(dataId, argDict) for dataId in parsedCmd.id.idList] 276 If your task does not meet condition (1) then you must override both TaskRunner.getTargetList and 277 `TaskRunner.__call__`. You may do this however you see fit, so long as `TaskRunner.getTargetList` 278 returns a list, each of whose elements is sent to `TaskRunner.__call__`, which runs your task. 280 return [(ref, kwargs)
for ref
in parsedCmd.id.refList]
283 """Create a Task instance. 288 Parsed command-line options (used for extra task args by some task runners). 290 Args tuple passed to `TaskRunner.__call__` (used for extra task arguments by some task runners). 294 ``makeTask`` can be called with either the ``parsedCmd`` argument or ``args`` argument set to None, 295 but it must construct identical Task instances in either case. 297 Subclasses may ignore this method entirely if they reimplement both `TaskRunner.precall` and 298 `TaskRunner.__call__`. 302 def _precallImpl(self, task, parsedCmd):
303 """The main work of `precall`. 305 We write package versions, schemas and configs, or compare these to existing files on disk if present. 307 if not parsedCmd.noVersions:
308 task.writePackageVersions(parsedCmd.butler, clobber=parsedCmd.clobberVersions)
313 """Hook for code that should run exactly once, before multiprocessing. 317 Must return True if `TaskRunner.__call__` should subsequently be called. 321 Implementations must take care to ensure that no unpicklable attributes are added to the 322 TaskRunner itself, for compatibility with multiprocessing. 324 The default implementation writes package versions, schemas and configs, or compares them to existing 325 files on disk if present. 327 task = self.
makeTask(parsedCmd=parsedCmd)
334 except Exception
as e:
335 task.log.fatal(
"Failed in task initialization: %s", e)
336 if not isinstance(e, TaskError):
337 traceback.print_exc(file=sys.stderr)
342 """Run the Task on a single target. 347 Arguments for Task.runDataRef() 351 struct : `lsst.pipe.base.Struct` 352 Contains these fields if ``doReturnResults`` is `True`: 354 - ``dataRef``: the provided data reference. 355 - ``metadata``: task metadata after execution of run. 356 - ``result``: result returned by task run, or `None` if the task fails. 357 - ``exitStatus``: 0 if the task completed successfully, 1 otherwise. 359 If ``doReturnResults`` is `False` the struct contains: 361 - ``exitStatus``: 0 if the task completed successfully, 1 otherwise. 365 This default implementation assumes that the ``args`` is a tuple containing a data reference and a 366 dict of keyword arguments. 370 If you override this method and wish to return something when ``doReturnResults`` is `False`, 371 then it must be picklable to support multiprocessing and it should be small enough that pickling 372 and unpickling do not add excessive overhead. 374 dataRef, kwargs = args
376 self.
log = Log.getDefaultLogger()
377 if hasattr(dataRef,
"dataId"):
378 self.
log.
MDC(
"LABEL", str(dataRef.dataId))
379 elif isinstance(dataRef, (list, tuple)):
380 self.
log.
MDC(
"LABEL", str([ref.dataId
for ref
in dataRef
if hasattr(ref,
"dataId")]))
385 result = self.
runTask(task, dataRef, kwargs)
388 result = self.
runTask(task, dataRef, kwargs)
389 except Exception
as e:
395 eName =
type(e).__name__
396 if hasattr(dataRef,
"dataId"):
397 task.log.fatal(
"Failed on dataId=%s: %s: %s", dataRef.dataId, eName, e)
398 elif isinstance(dataRef, (list, tuple)):
399 task.log.fatal(
"Failed on dataIds=[%s]: %s: %s",
400 ", ".join(str(ref.dataId)
for ref
in dataRef), eName, e)
402 task.log.fatal(
"Failed on dataRef=%s: %s: %s", dataRef, eName, e)
404 if not isinstance(e, TaskError):
405 traceback.print_exc(file=sys.stderr)
411 task.writeMetadata(dataRef)
418 exitStatus=exitStatus,
420 metadata=task.metadata,
425 exitStatus=exitStatus,
429 """Make the actual call to `runDataRef` for this task. 433 task : `lsst.pipe.base.CmdLineTask` class 434 The class of the task to run. 436 Butler data reference that contains the data the task will process. 438 Any additional keyword arguments. See `TaskRunner.getTargetList` above. 442 The default implementation of `TaskRunner.runTask` works for any command-line task which has a 443 runDataRef method that takes a data reference and an optional set of additional keyword arguments. 444 This method returns the results generated by the task's `runDataRef` method. 447 return task.runDataRef(dataRef, **kwargs)
451 r"""A `TaskRunner` for `CmdLineTask`\ s which calls the `Task`\ 's `run` method on a `dataRef` rather 452 than the `runDataRef` method. 456 """Call `run` for this task instead of `runDataRef`. See `TaskRunner.runTask` above for details. 458 return task.run(dataRef, **kwargs)
462 r"""A `TaskRunner` for `CmdLineTask`\ s that require a ``butler`` keyword argument to be passed to 467 """A variant of the base version that passes a butler argument to the task's constructor. 471 parsedCmd : `argparse.Namespace` 472 Parsed command-line options, as returned by the `~lsst.pipe.base.ArgumentParser`; if specified 473 then args is ignored. 475 Other arguments; if ``parsedCmd`` is `None` then this must be specified. 480 Raised if ``parsedCmd`` and ``args`` are both `None`. 482 if parsedCmd
is not None:
483 butler = parsedCmd.butler
484 elif args
is not None:
485 dataRef, kwargs = args
486 butler = dataRef.butlerSubset.butler
488 raise RuntimeError(
"parsedCmd or args must be specified")
493 """Base class for command-line tasks: tasks that may be executed from the command-line. 497 See :ref:`task-framework-overview` to learn what tasks are and :ref:`creating-a-command-line-task` for 498 more information about writing command-line tasks. 500 Subclasses must specify the following class variables: 502 - ``ConfigClass``: configuration class for your task (a subclass of `lsst.pex.config.Config`, or if your 503 task needs no configuration, then `lsst.pex.config.Config` itself). 504 - ``_DefaultName``: default name used for this task (a str). 506 Subclasses may also specify the following class variables: 508 - ``RunnerClass``: a task runner class. The default is ``TaskRunner``, which works for any task 509 with a runDataRef method that takes exactly one argument: a data reference. If your task does 510 not meet this requirement then you must supply a variant of ``TaskRunner``; see ``TaskRunner`` 511 for more information. 512 - ``canMultiprocess``: the default is `True`; set `False` if your task does not support multiprocessing. 514 Subclasses must specify a method named ``runDataRef``: 516 - By default ``runDataRef`` accepts a single butler data reference, but you can specify an alternate 517 task runner (subclass of ``TaskRunner``) as the value of class variable ``RunnerClass`` if your run 518 method needs something else. 519 - ``runDataRef`` is expected to return its data in a `lsst.pipe.base.Struct`. This provides safety for 520 evolution of the task since new values may be added without harming existing code. 521 - The data returned by ``runDataRef`` must be picklable if your task is to support multiprocessing. 523 RunnerClass = TaskRunner
524 canMultiprocess =
True 528 """A hook to allow a task to change the values of its config *after* the camera-specific 529 overrides are loaded but before any command-line overrides are applied. 533 config : instance of task's ``ConfigClass`` 538 This is necessary in some cases because the camera-specific overrides may retarget subtasks, 539 wiping out changes made in ConfigClass.setDefaults. See LSST Trac ticket #2282 for more discussion. 543 This is called by CmdLineTask.parseAndRun; other ways of constructing a config will not apply 549 def parseAndRun(cls, args=None, config=None, log=None, doReturnResults=False):
550 """Parse an argument list and run the command. 554 args : `list`, optional 555 List of command-line arguments; if `None` use `sys.argv`. 556 config : `lsst.pex.config.Config`-type, optional 557 Config for task. If `None` use `Task.ConfigClass`. 558 log : `lsst.log.Log`-type, optional 559 Log. If `None` use the default log. 560 doReturnResults : `bool`, optional 561 If `True`, return the results of this task. Default is `False`. This is only intended for 562 unit tests and similar use. It can easily exhaust memory (if the task returns enough data and you 563 call it enough times) and it will fail when using multiprocessing if the returned data cannot be 568 struct : `lsst.pipe.base.Struct` 572 the argument parser (`lsst.pipe.base.ArgumentParser`). 574 the parsed command returned by the argument parser's 575 `~lsst.pipe.base.ArgumentParser.parse_args` method 576 (`argparse.Namespace`). 578 the task runner used to run the task (an instance of `Task.RunnerClass`). 580 results returned by the task runner's ``run`` method, one entry 581 per invocation (`list`). This will typically be a list of 582 `Struct`, each containing at least an ``exitStatus`` integer 583 (0 or 1); see `Task.RunnerClass` (`TaskRunner` by default) for 588 Calling this method with no arguments specified is the standard way to run a command-line task 589 from the command-line. For an example see ``pipe_tasks`` ``bin/makeSkyMap.py`` or almost any other 590 file in that directory. 592 If one or more of the dataIds fails then this routine will exit (with a status giving the 593 number of failed dataIds) rather than returning this struct; this behaviour can be 594 overridden by specifying the ``--noExit`` command-line option. 597 commandAsStr =
" ".join(sys.argv)
604 config = cls.ConfigClass()
605 parsedCmd = argumentParser.parse_args(config=config, args=args, log=log, override=cls.
applyOverrides)
607 parsedCmd.log.info(
"Running: %s", commandAsStr)
609 taskRunner = cls.
RunnerClass(TaskClass=cls, parsedCmd=parsedCmd, doReturnResults=doReturnResults)
610 resultList = taskRunner.run(parsedCmd)
613 nFailed = sum(((res.exitStatus != 0)
for res
in resultList))
614 except (TypeError, AttributeError)
as e:
616 parsedCmd.log.warn(
"Unable to retrieve exit status (%s); assuming success", e)
621 parsedCmd.log.error(
"%d dataRefs failed; not exiting as --noExit was set", nFailed)
626 argumentParser=argumentParser,
628 taskRunner=taskRunner,
629 resultList=resultList,
633 def _makeArgumentParser(cls):
634 """Create and return an argument parser. 638 parser : `lsst.pipe.base.ArgumentParser` 639 The argument parser for this task. 643 By default this returns an `~lsst.pipe.base.ArgumentParser` with one ID argument named `--id` of 644 dataset type ``raw``. 646 Your task subclass may need to override this method to change the dataset type or data ref level, 647 or to add additional data ID arguments. If you add additional data ID arguments or your task's 648 runDataRef method takes more than a single data reference then you will also have to provide a 649 task-specific task runner (see TaskRunner for more information). 652 parser.add_id_argument(name=
"--id", datasetType=
"raw",
653 help=
"data IDs, e.g. --id visit=12345 ccd=1,2^0,3")
657 """Write the configuration used for processing the data, or check that an existing 658 one is equal to the new one if present. 662 butler : `lsst.daf.persistence.Butler` 663 Data butler used to write the config. The config is written to dataset type 664 `CmdLineTask._getConfigName`. 665 clobber : `bool`, optional 666 A boolean flag that controls what happens if a config already has been saved: 667 - `True`: overwrite or rename the existing config, depending on ``doBackup``. 668 - `False`: raise `TaskError` if this config does not match the existing config. 669 doBackup : bool, optional 670 Set to `True` to backup the config files if clobbering. 673 if configName
is None:
676 butler.put(self.
config, configName, doBackup=doBackup)
677 elif butler.datasetExists(configName, write=
True):
680 oldConfig = butler.get(configName, immediate=
True)
681 except Exception
as exc:
682 raise type(exc)(
"Unable to read stored config file %s (%s); consider using --clobber-config" %
685 def logConfigMismatch(msg):
686 self.
log.
fatal(
"Comparing configuration: %s", msg)
688 if not self.
config.compare(oldConfig, shortcut=
False, output=logConfigMismatch):
690 (
"Config does not match existing task config %r on disk; tasks configurations " +
691 "must be consistent within the same output repo (override with --clobber-config)") %
694 butler.put(self.
config, configName)
697 """Write the schemas returned by `lsst.pipe.base.Task.getAllSchemaCatalogs`. 701 butler : `lsst.daf.persistence.Butler` 702 Data butler used to write the schema. Each schema is written to the dataset type specified as the 703 key in the dict returned by `~lsst.pipe.base.Task.getAllSchemaCatalogs`. 704 clobber : `bool`, optional 705 A boolean flag that controls what happens if a schema already has been saved: 706 - `True`: overwrite or rename the existing schema, depending on ``doBackup``. 707 - `False`: raise `TaskError` if this schema does not match the existing schema. 708 doBackup : `bool`, optional 709 Set to `True` to backup the schema files if clobbering. 713 If ``clobber`` is `False` and an existing schema does not match a current schema, 714 then some schemas may have been saved successfully and others may not, and there is no easy way to 718 schemaDataset = dataset +
"_schema" 720 butler.put(catalog, schemaDataset, doBackup=doBackup)
721 elif butler.datasetExists(schemaDataset, write=
True):
722 oldSchema = butler.get(schemaDataset, immediate=
True).getSchema()
723 if not oldSchema.compare(catalog.getSchema(), afwTable.Schema.IDENTICAL):
725 (
"New schema does not match schema %r on disk; schemas must be " +
726 " consistent within the same output repo (override with --clobber-config)") %
729 butler.put(catalog, schemaDataset)
732 """Write the metadata produced from processing the data. 737 Butler data reference used to write the metadata. 738 The metadata is written to dataset type `CmdLineTask._getMetadataName`. 742 if metadataName
is not None:
744 except Exception
as e:
745 self.
log.
warn(
"Could not persist metadata for dataId=%s: %s", dataRef.dataId, e)
748 """Compare and write package versions. 752 butler : `lsst.daf.persistence.Butler` 753 Data butler used to read/write the package versions. 754 clobber : `bool`, optional 755 A boolean flag that controls what happens if versions already have been saved: 756 - `True`: overwrite or rename the existing version info, depending on ``doBackup``. 757 - `False`: raise `TaskError` if this version info does not match the existing. 758 doBackup : `bool`, optional 759 If `True` and clobbering, old package version files are backed up. 760 dataset : `str`, optional 761 Name of dataset to read/write. 766 Raised if there is a version mismatch with current and persisted lists of package versions. 770 Note that this operation is subject to a race condition. 772 packages = Packages.fromSystem()
775 return butler.put(packages, dataset, doBackup=doBackup)
776 if not butler.datasetExists(dataset, write=
True):
777 return butler.put(packages, dataset)
780 old = butler.get(dataset, immediate=
True)
781 except Exception
as exc:
782 raise type(exc)(
"Unable to read stored version dataset %s (%s); " 783 "consider using --clobber-versions or --no-versions" %
788 diff = packages.difference(old)
791 "Version mismatch (" +
792 "; ".join(
"%s: %s vs %s" % (pkg, diff[pkg][1], diff[pkg][0])
for pkg
in diff) +
793 "); consider using --clobber-versions or --no-versions")
795 extra = packages.extra(old)
798 butler.put(old, dataset, doBackup=doBackup)
800 def _getConfigName(self):
801 """Get the name of the config dataset type, or `None` if config is not to be persisted. 805 The name may depend on the config; that is why this is not a class method. 807 return self._DefaultName +
"_config" 809 def _getMetadataName(self):
810 """Get the name of the metadata dataset type, or `None` if metadata is not to be persisted. 814 The name may depend on the config; that is why this is not a class method. 816 return self._DefaultName +
"_metadata"
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
def _makeArgumentParser(cls)
def parseAndRun(cls, args=None, config=None, log=None, doReturnResults=False)
def _precallImpl(self, task, parsedCmd)
def runTask(self, task, dataRef, kwargs)
def getFullMetadata(self)
std::vector< SchemaItem< Flag > > * items
def writePackageVersions(self, butler, clobber=False, doBackup=True, dataset="packages")
def getAllSchemaCatalogs(self)
def writeSchemas(self, butler, clobber=False, doBackup=True)
def prepareForMultiProcessing(self)
def _getMetadataName(self)
bool disableImplicitThreading()
Disable threading that has not been set explicitly.
def makeTask(self, parsedCmd=None, args=None)
def writeMetadata(self, dataRef)
def precall(self, parsedCmd)
def __init__(self, TaskClass, parsedCmd, doReturnResults=False)
def profile(filename, log=None)
def makeTask(self, parsedCmd=None, args=None)
def getTargetList(parsedCmd, kwargs)
def writeConfig(self, butler, clobber=False, doBackup=True)
def applyOverrides(cls, config)
daf::base::PropertyList * list
def runTask(self, task, dataRef, kwargs)