cmdLineTask.py
1 #
2 # LSST Data Management System
3 # Copyright 2008-2015 AURA/LSST.
4 #
5 # This product includes software developed by the
6 # LSST Project (http://www.lsst.org/).
7 #
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the LSST License Statement and
19 # the GNU General Public License along with this program. If not,
20 # see <https://www.lsstcorp.org/LegalNotices/>.
21 #
22 from __future__ import absolute_import, division
23 import sys
24 import traceback
25 import functools
26 import contextlib
27 
28 from builtins import str
29 from builtins import object
30 
31 from lsst.base import disableImplicitThreading
32 import lsst.afw.table as afwTable
33 from .task import Task, TaskError
34 from .struct import Struct
35 from .argumentParser import ArgumentParser
36 from lsst.base import Packages
37 from lsst.log import Log
38 
39 __all__ = ["CmdLineTask", "TaskRunner", "ButlerInitializedTaskRunner"]
40 
41 
42 def _poolFunctionWrapper(function, arg):
43  """Wrapper around function to catch exceptions that don't inherit from Exception
44 
45  Such exceptions aren't caught by multiprocessing, which causes the slave
46  process to crash and you end up hitting the timeout.
47  """
48  try:
49  return function(arg)
50  except Exception:
51  raise # No worries
52  except:
53  # Need to wrap the exception with something multiprocessing will recognise
54  cls, exc, tb = sys.exc_info()
55  log = Log.getDefaultLogger()
56  log.warn("Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc()))
57  raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))
58 
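# Illustrative sketch (editorial addition; HardStop is a hypothetical class): an exception that
# derives from BaseException but not Exception escapes the "except Exception" clause above and
# is re-raised as a plain Exception, so multiprocessing can propagate it instead of the worker
# dying silently and the caller waiting for the timeout:
#
#     class HardStop(BaseException):
#         pass
#
#     def work(arg):
#         raise HardStop("stop requested")
#
#     # _poolFunctionWrapper(work, 1) raises Exception("Unhandled exception: HardStop (stop requested)")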
59 
60 def _runPool(pool, timeout, function, iterable):
61  """Wrapper around pool.map_async, to handle timeout
62 
63  This is required so as to trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
64  http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool
65 
66  Further wraps the function in _poolFunctionWrapper to catch exceptions
67  that don't inherit from Exception.
68  """
69  return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)
70 
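# Illustrative usage sketch (editorial addition; "square" is a hypothetical function).
# pool.map(...) would swallow Ctrl-C until every task finished, whereas
# map_async(...).get(timeout), as used above, lets KeyboardInterrupt through promptly:
#
#     import multiprocessing
#
#     def square(x):
#         return x * x
#
#     pool = multiprocessing.Pool(processes=2, maxtasksperchild=1)
#     results = _runPool(pool, 9999, square, range(10))
#     pool.close()
#     pool.join()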
71 
72 @contextlib.contextmanager
73 def profile(filename, log=None):
74  """!Context manager for profiling with cProfile
75 
76  @param filename filename to which to write profile (profiling disabled if None or empty)
77  @param log log object for logging the profile operations
78 
79  If profiling is enabled, the context manager returns the cProfile.Profile object (otherwise
80  it returns None), which allows additional control over profiling. You can obtain this using
81  the "as" clause, e.g.:
82 
83  with profile(filename) as prof:
84  runYourCodeHere()
85 
86  The output cumulative profile can be printed with a command-line like:
87 
88  python -c 'import pstats; pstats.Stats("<filename>").sort_stats("cumtime").print_stats(30)'
89  """
90  if not filename:
91  # Nothing to do
92  yield
93  return
94  from cProfile import Profile
95  profile = Profile()
96  if log is not None:
97  log.info("Enabling cProfile profiling")
98  profile.enable()
99  yield profile
100  profile.disable()
101  profile.dump_stats(filename)
102  if log is not None:
103  log.info("cProfile stats written to %s" % filename)
104 
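# Illustrative sketch (editorial addition; "profile.out" is a hypothetical filename): the dumped
# stats can also be inspected from Python rather than with the command line shown in the docstring:
#
#     import pstats
#     stats = pstats.Stats("profile.out")
#     stats.sort_stats("cumtime").print_stats(30)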
105 
106 class TaskRunner(object):
107  """!Run a command-line task, using multiprocessing if requested.
108 
109  Each command-line task (subclass of CmdLineTask) has a task runner. By default it is
110  this class, but some tasks require a subclass. See the manual "how to write a command-line task"
111  in the pipe_tasks documentation for more information.
112  See CmdLineTask.parseAndRun to see how a task runner is used.
113 
114  You may use this task runner for your command-line task if your task has a run method
115  that takes exactly one argument: a butler data reference. Otherwise you must
116  provide a task-specific subclass of this runner for your task's `RunnerClass`
117  that overrides TaskRunner.getTargetList and possibly TaskRunner.\_\_call\_\_.
118  See TaskRunner.getTargetList for details.
119 
120  This design matches the common pattern for command-line tasks: the run method takes a single
121  data reference, of some suitable name. Additional arguments are rare, and if present, require
122  a subclass of TaskRunner that passes these additional arguments by name.
123 
124  Instances of this class must be picklable in order to be compatible with multiprocessing.
125  If multiprocessing is requested (parsedCmd.numProcesses > 1) then run() calls prepareForMultiProcessing
126  to jettison optional non-picklable elements. If your task runner is not compatible with multiprocessing
127  then indicate this in your task by setting class variable canMultiprocess=False.
128 
129  Due to a python bug [1], handling a KeyboardInterrupt properly requires specifying a timeout [2]. This
130  timeout (in sec) can be specified as the "timeout" element in the output from ArgumentParser
131  (the "parsedCmd"), if available, otherwise we use TaskRunner.TIMEOUT.
132 
133  [1] http://bugs.python.org/issue8296
134  [2] http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool
135  """
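 # Illustrative sketch (editorial addition; ExampleTask and ExampleTaskRunner are hypothetical):
 # a task selects a non-default runner by pointing its RunnerClass class variable at a
 # TaskRunner subclass, typically one that overrides getTargetList:
 #
 #     class ExampleTaskRunner(TaskRunner):
 #         @staticmethod
 #         def getTargetList(parsedCmd):
 #             return TaskRunner.getTargetList(parsedCmd, calExpList=parsedCmd.calexp.idList)
 #
 #     class ExampleTask(CmdLineTask):
 #         RunnerClass = ExampleTaskRunner
 #         ...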
136  TIMEOUT = 9999 # Default timeout (sec) for multiprocessing
137 
138  def __init__(self, TaskClass, parsedCmd, doReturnResults=False):
139  """!Construct a TaskRunner
140 
141  @warning Do not store parsedCmd, as this instance is pickled (if multiprocessing) and parsedCmd may
142  contain non-picklable elements. It certainly contains more data than we need to send to each
143  instance of the task.
144 
145  @param TaskClass The class of the task to run
146  @param parsedCmd The parsed command-line arguments, as returned by the task's argument parser's
147  parse_args method.
148  @param doReturnResults Should run return the collected result from each invocation of the task?
149  This is only intended for unit tests and similar use.
150  It can easily exhaust memory (if the task returns enough data and you call it enough times)
151  and it will fail when using multiprocessing if the returned data cannot be pickled.
152 
153  @throws ImportError if multiprocessing requested (and the task supports it)
154  but the multiprocessing library cannot be imported.
155  """
156  self.TaskClass = TaskClass
157  self.doReturnResults = bool(doReturnResults)
158  self.config = parsedCmd.config
159  self.log = parsedCmd.log
160  self.doRaise = bool(parsedCmd.doraise)
161  self.clobberConfig = bool(parsedCmd.clobberConfig)
162  self.doBackup = not bool(parsedCmd.noBackupConfig)
163  self.numProcesses = int(getattr(parsedCmd, 'processes', 1))
164 
165  self.timeout = getattr(parsedCmd, 'timeout', None)
166  if self.timeout is None or self.timeout <= 0:
167  self.timeout = self.TIMEOUT
168 
169  if self.numProcesses > 1:
170  if not TaskClass.canMultiprocess:
171  self.log.warn("This task does not support multiprocessing; using one process")
172  self.numProcesses = 1
173 
174  def prepareForMultiProcessing(self):
175  """!Prepare this instance for multiprocessing by removing optional non-picklable elements.
176 
177  This is only called if the task is run under multiprocessing.
178  """
179  self.log = None
180 
181  def run(self, parsedCmd):
182  """!Run the task on all targets.
183 
184  The task is run under multiprocessing if numProcesses > 1; otherwise processing is serial.
185 
186  @return a list of results returned by TaskRunner.\_\_call\_\_, or an empty list if
187  TaskRunner.\_\_call\_\_ is not called (e.g. if TaskRunner.precall returns `False`).
188  See TaskRunner.\_\_call\_\_ for details.
189  """
190  resultList = []
191  if self.numProcesses > 1:
192  disableImplicitThreading() # To prevent thread contention
193  import multiprocessing
194  self.prepareForMultiProcessing()
195  pool = multiprocessing.Pool(processes=self.numProcesses, maxtasksperchild=1)
196  mapFunc = functools.partial(_runPool, pool, self.timeout)
197  else:
198  pool = None
199  mapFunc = map
200 
201  if self.precall(parsedCmd):
202  profileName = parsedCmd.profile if hasattr(parsedCmd, "profile") else None
203  log = parsedCmd.log
204  targetList = self.getTargetList(parsedCmd)
205  if len(targetList) > 0:
206  with profile(profileName, log):
207  # Run the task using self.__call__
208  resultList = list(mapFunc(self, targetList))
209  else:
210  log.warn("Not running the task because there is no data to process; "
211  "you may preview data using \"--show data\"")
212 
213  if pool is not None:
214  pool.close()
215  pool.join()
216 
217  return resultList
218 
219  @staticmethod
220  def getTargetList(parsedCmd, **kwargs):
221  """!Return a list of (dataRef, kwargs) to be used as arguments for TaskRunner.\_\_call\_\_.
222 
223  @param parsedCmd the parsed command object (an argparse.Namespace) returned by
224  \ref argumentParser.ArgumentParser.parse_args "ArgumentParser.parse_args".
225  @param **kwargs any additional keyword arguments. In the default TaskRunner
226  this is an empty dict, but having it simplifies overriding TaskRunner for tasks
227  whose run method takes additional arguments (see case (1) below).
228 
229  The default implementation of TaskRunner.getTargetList and TaskRunner.\_\_call\_\_ works for any
230  command-line task whose run method takes exactly one argument: a data reference.
231  Otherwise you must provide a variant of TaskRunner that overrides TaskRunner.getTargetList
232  and possibly TaskRunner.\_\_call\_\_. There are two cases:
233 
234  (1) If your command-line task has a `run` method that takes one data reference followed by additional
235  arguments, then you need only override TaskRunner.getTargetList to return the additional arguments as
236  an argument dict. To make this easier, your overridden version of getTargetList may call
237  TaskRunner.getTargetList with the extra arguments as keyword arguments. For example,
238  the following adds an argument dict containing a single key: "calExpList", whose value is the list
239  of data IDs for the calexp ID argument:
240 
241  \code
242  \@staticmethod
243  def getTargetList(parsedCmd):
244  return TaskRunner.getTargetList(parsedCmd, calExpList=parsedCmd.calexp.idList)
245  \endcode
246 
247  It is equivalent to this slightly longer version:
248 
249  \code
250  \@staticmethod
251  def getTargetList(parsedCmd):
252  argDict = dict(calExpList=parsedCmd.calexp.idList)
253  return [(dataId, argDict) for dataId in parsedCmd.id.idList]
254  \endcode
255 
256  (2) If your task does not meet condition (1) then you must override both TaskRunner.getTargetList
257  and TaskRunner.\_\_call\_\_. You may do this however you see fit, so long as TaskRunner.getTargetList
258  returns a list, each of whose elements is sent to TaskRunner.\_\_call\_\_, which runs your task.
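 
 For example, a sketch of case (2) for a hypothetical task whose run method takes a butler and
 the full list of data references (the names below are illustrative, not part of this module):
 
 \code
 \@staticmethod
 def getTargetList(parsedCmd):
     return [(parsedCmd.butler, parsedCmd.id.refList)]
 
 def __call__(self, args):
     butler, refList = args
     task = self.makeTask(args=args)
     return task.run(butler, refList)
 \endcode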
259  """
260  return [(ref, kwargs) for ref in parsedCmd.id.refList]
261 
262  def makeTask(self, parsedCmd=None, args=None):
263  """!Create a Task instance
264 
265  @param[in] parsedCmd parsed command-line options (used for extra task args by some task runners)
266  @param[in] args args tuple passed to TaskRunner.\_\_call\_\_ (used for extra task arguments
267  by some task runners)
268 
269  makeTask() can be called with either the 'parsedCmd' argument or 'args' argument set to None,
270  but it must construct identical Task instances in either case.
271 
272  Subclasses may ignore this method entirely if they reimplement both TaskRunner.precall and
273  TaskRunner.\_\_call\_\_
274  """
275  return self.TaskClass(config=self.config, log=self.log)
276 
277  def _precallImpl(self, task, parsedCmd):
278  """The main work of 'precall'
279 
280  We write package versions, schemas and configs, or compare these to existing
281  files on disk if present.
282  """
283  if not parsedCmd.noVersions:
284  task.writePackageVersions(parsedCmd.butler, clobber=parsedCmd.clobberVersions)
285  task.writeConfig(parsedCmd.butler, clobber=self.clobberConfig, doBackup=self.doBackup)
286  task.writeSchemas(parsedCmd.butler, clobber=self.clobberConfig, doBackup=self.doBackup)
287 
288  def precall(self, parsedCmd):
289  """!Hook for code that should run exactly once, before multiprocessing is invoked.
290 
291  Must return True if TaskRunner.\_\_call\_\_ should subsequently be called.
292 
293  @warning Implementations must take care to ensure that no unpicklable attributes are added to
294  the TaskRunner itself, for compatibility with multiprocessing.
295 
296  The default implementation writes package versions, schemas and configs, or compares
297  them to existing files on disk if present.
298  """
299  task = self.makeTask(parsedCmd=parsedCmd)
300 
301  if self.doRaise:
302  self._precallImpl(task, parsedCmd)
303  else:
304  try:
305  self._precallImpl(task, parsedCmd)
306  except Exception as e:
307  task.log.fatal("Failed in task initialization: %s" % e)
308  if not isinstance(e, TaskError):
309  traceback.print_exc(file=sys.stderr)
310  return False
311  return True
312 
313  def __call__(self, args):
314  """!Run the Task on a single target.
315 
316  This default implementation assumes that the 'args' is a tuple
317  containing a data reference and a dict of keyword arguments.
318 
319  @warning if you override this method and wish to return something when
320  doReturnResults is false, then it must be picklable to support
321  multiprocessing and it should be small enough that pickling and
322  unpickling do not add excessive overhead.
323 
324  @param args Arguments for Task.run()
325 
326  @return:
327  - None if doReturnResults false
328  - A pipe_base Struct containing these fields if doReturnResults true:
329  - dataRef: the provided data reference
330  - metadata: task metadata after execution of run
331  - result: result returned by task run, or None if the task fails
332  """
333  dataRef, kwargs = args
334  if self.log is None:
335  self.log = Log.getDefaultLogger()
336  if hasattr(dataRef, "dataId"):
337  self.log.MDC("LABEL", str(dataRef.dataId))
338  elif isinstance(dataRef, (list, tuple)):
339  self.log.MDC("LABEL", str([ref.dataId for ref in dataRef if hasattr(ref, "dataId")]))
340  task = self.makeTask(args=args)
341  result = None # in case the task fails
342  if self.doRaise:
343  result = task.run(dataRef, **kwargs)
344  else:
345  try:
346  result = task.run(dataRef, **kwargs)
347  except Exception as e:
348  # don't use a try block as we need to preserve the original exception
349  if hasattr(dataRef, "dataId"):
350  task.log.fatal("Failed on dataId=%s: %s" % (dataRef.dataId, e))
351  elif isinstance(dataRef, (list, tuple)):
352  task.log.fatal("Failed on dataId=[%s]: %s" %
353  (",".join([str(_.dataId) for _ in dataRef]), e))
354  else:
355  task.log.fatal("Failed on dataRef=%s: %s" % (dataRef, e))
356 
357  if not isinstance(e, TaskError):
358  traceback.print_exc(file=sys.stderr)
359  task.writeMetadata(dataRef)
360 
361  if self.doReturnResults:
362  return Struct(
363  dataRef=dataRef,
364  metadata=task.metadata,
365  result=result,
366  )
367 
368 
369 class ButlerInitializedTaskRunner(TaskRunner):
370  """!A TaskRunner for CmdLineTasks that require a 'butler' keyword argument to be passed to
371  their constructor.
372  """
373 
374  def makeTask(self, parsedCmd=None, args=None):
375  """!A variant of the base version that passes a butler argument to the task's constructor
376 
377  @param[in] parsedCmd parsed command-line options, as returned by the argument parser;
378  if specified then args is ignored
379  @param[in] args other arguments; if parsedCmd is None then this must be specified
380 
381  @throw RuntimeError if parsedCmd and args are both None
382  """
383  if parsedCmd is not None:
384  butler = parsedCmd.butler
385  elif args is not None:
386  dataRef, kwargs = args
387  butler = dataRef.butlerSubset.butler
388  else:
389  raise RuntimeError("parsedCmd or args must be specified")
390  return self.TaskClass(config=self.config, log=self.log, butler=butler)
391 
392 
393 class CmdLineTask(Task):
394  """!Base class for command-line tasks: tasks that may be executed from the command line
395 
396  See \ref pipeBase_introduction "pipe_base introduction" to learn what tasks are,
397  and \ref pipeTasks_writeCmdLineTask "how to write a command-line task" for more information
398  about writing command-line tasks.
399  If the second link is broken (as it will be before the documentation is cross-linked)
400  then look at the main page of pipe_tasks documentation for a link.
401 
402  Subclasses must specify the following class variables:
403  * ConfigClass: configuration class for your task (a subclass of \ref lsst.pex.config.config.Config
404  "lsst.pex.config.Config", or if your task needs no configuration, then
405  \ref lsst.pex.config.config.Config "lsst.pex.config.Config" itself)
406  * _DefaultName: default name used for this task (a str)
407 
408  Subclasses may also specify the following class variables:
409  * RunnerClass: a task runner class. The default is TaskRunner, which works for any task
410  with a run method that takes exactly one argument: a data reference. If your task does
411  not meet this requirement then you must supply a variant of TaskRunner; see TaskRunner
412  for more information.
413  * canMultiprocess: the default is True; set False if your task does not support multiprocessing.
414 
415  Subclasses must specify a method named "run":
416  - By default `run` accepts a single butler data reference, but you can specify an alternate task runner
417  (subclass of TaskRunner) as the value of class variable `RunnerClass` if your run method needs
418  something else.
419  - `run` is expected to return its data in a Struct. This provides safety for evolution of the task
420  since new values may be added without harming existing code.
421  - The data returned by `run` must be picklable if your task is to support multiprocessing.
422  """
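 # Illustrative sketch of a minimal subclass (editorial addition; every name below is
 # hypothetical, and pexConfig refers to lsst.pex.config):
 #
 #     import lsst.pex.config as pexConfig
 #
 #     class ExampleConfig(pexConfig.Config):
 #         doSmooth = pexConfig.Field(dtype=bool, default=True, doc="smooth the exposure?")
 #
 #     class ExampleTask(CmdLineTask):
 #         ConfigClass = ExampleConfig
 #         _DefaultName = "example"
 #
 #         def run(self, dataRef):
 #             exposure = dataRef.get("calexp")
 #             # ... process the exposure ...
 #             return Struct(exposure=exposure)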
423  RunnerClass = TaskRunner
424  canMultiprocess = True
425 
426  @classmethod
427  def applyOverrides(cls, config):
428  """!A hook to allow a task to change the values of its config *after* the camera-specific
429  overrides are loaded but before any command-line overrides are applied.
430 
431  This is necessary in some cases because the camera-specific overrides may retarget subtasks,
432  wiping out changes made in ConfigClass.setDefaults. See LSST Trac ticket #2282 for more discussion.
433 
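 A sketch of a typical override (the subtask and field names below are purely illustrative):
 
 \code
 \@classmethod
 def applyOverrides(cls, config):
     config.calibrate.doPhotoCal = False
 \endcode
 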
434  @warning This is called by CmdLineTask.parseAndRun; other ways of constructing a config
435  will not apply these overrides.
436 
437  @param[in] cls the class object
438  @param[in] config task configuration (an instance of cls.ConfigClass)
439  """
440  pass
441 
442  @classmethod
443  def parseAndRun(cls, args=None, config=None, log=None, doReturnResults=False):
444  """!Parse an argument list and run the command
445 
446  Calling this method with no arguments specified is the standard way to run a command-line task
447  from the command line. For an example see pipe_tasks `bin/makeSkyMap.py` or almost any other
448  file in that directory.
449 
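 For example, a minimal executable script is just two lines (the import path below is
 hypothetical; the real bin scripts follow the same pattern):
 
 \code
 #!/usr/bin/env python
 from lsst.example.exampleTask import ExampleTask
 ExampleTask.parseAndRun()
 \endcode
 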
450  @param cls the class object
451  @param args list of command-line arguments; if `None` use sys.argv
452  @param config config for task (instance of pex_config Config); if `None` use cls.ConfigClass()
453  @param log log (instance of lsst.log.Log); if `None` use the default log
454  @param doReturnResults Return the collected results from each invocation of the task?
455  This is only intended for unit tests and similar use.
456  It can easily exhaust memory (if the task returns enough data and you call it enough times)
457  and it will fail when using multiprocessing if the returned data cannot be pickled.
458 
459  @return a Struct containing:
460  - argumentParser: the argument parser
461  - parsedCmd: the parsed command returned by the argument parser's parse_args method
462  - taskRunner: the task runner used to run the task (an instance of cls.RunnerClass)
463  - resultList: results returned by the task runner's run method, one entry per invocation.
464  This will typically be a list of `None` unless doReturnResults is `True`;
465  see cls.RunnerClass (TaskRunner by default) for more information.
466  """
467  argumentParser = cls._makeArgumentParser()
468  if config is None:
469  config = cls.ConfigClass()
470  parsedCmd = argumentParser.parse_args(config=config, args=args, log=log, override=cls.applyOverrides)
471  taskRunner = cls.RunnerClass(TaskClass=cls, parsedCmd=parsedCmd, doReturnResults=doReturnResults)
472  resultList = taskRunner.run(parsedCmd)
473  return Struct(
474  argumentParser=argumentParser,
475  parsedCmd=parsedCmd,
476  taskRunner=taskRunner,
477  resultList=resultList,
478  )
479 
480  @classmethod
481  def _makeArgumentParser(cls):
482  """!Create and return an argument parser
483 
484  @param[in] cls the class object
485  @return the argument parser for this task.
486 
487  By default this returns an ArgumentParser with one ID argument named `--id` of dataset type "raw".
488 
489  Your task subclass may need to override this method to change the dataset type or data ref level,
490  or to add additional data ID arguments. If you add additional data ID arguments or your task's
491  run method takes more than a single data reference then you will also have to provide a task-specific
492  task runner (see TaskRunner for more information).
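 
 A sketch of such an override (the "calexp" dataset type here is purely illustrative):
 
 \code
 \@classmethod
 def _makeArgumentParser(cls):
     parser = ArgumentParser(name=cls._DefaultName)
     parser.add_id_argument(name="--id", datasetType="calexp",
                            help="data IDs, e.g. --id visit=12345 ccd=1,2^0,3")
     return parser
 \endcode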
493  """
494  parser = ArgumentParser(name=cls._DefaultName)
495  parser.add_id_argument(name="--id", datasetType="raw",
496  help="data IDs, e.g. --id visit=12345 ccd=1,2^0,3")
497  return parser
498 
499  def writeConfig(self, butler, clobber=False, doBackup=True):
500  """!Write the configuration used for processing the data, or check that an existing
501  one is equal to the new one if present.
502 
503  @param[in] butler data butler used to write the config.
504  The config is written to dataset type self._getConfigName()
505  @param[in] clobber a boolean flag that controls what happens if a config already has been saved:
506  - True: overwrite the existing config
507  - False: raise TaskError if this config does not match the existing config
508  """
509  configName = self._getConfigName()
510  if configName is None:
511  return
512  if clobber:
513  butler.put(self.config, configName, doBackup=doBackup)
514  elif butler.datasetExists(configName):
515  # this may be subject to a race condition; see #2789
516  try:
517  oldConfig = butler.get(configName, immediate=True)
518  except Exception as exc:
519  raise type(exc)("Unable to read stored config file %s (%s); consider using --clobber-config" %
520  (configName, exc))
521  output = lambda msg: self.log.fatal("Comparing configuration: " + msg)
522  if not self.config.compare(oldConfig, shortcut=False, output=output):
523  raise TaskError(
524  ("Config does not match existing task config %r on disk; task configurations " +
525  "must be consistent within the same output repo (override with --clobber-config)") %
526  (configName,))
527  else:
528  butler.put(self.config, configName)
529 
530  def writeSchemas(self, butler, clobber=False, doBackup=True):
531  """!Write the schemas returned by \ref task.Task.getAllSchemaCatalogs "getAllSchemaCatalogs"
532 
533  @param[in] butler data butler used to write the schema.
534  Each schema is written to the dataset type specified as the key in the dict returned by
535  \ref task.Task.getAllSchemaCatalogs "getAllSchemaCatalogs".
536  @param[in] clobber a boolean flag that controls what happens if a schema already has been saved:
537  - True: overwrite the existing schema
538  - False: raise TaskError if this schema does not match the existing schema
539 
540  @warning if clobber is False and an existing schema does not match a current schema,
541  then some schemas may have been saved successfully and others may not, and there is no easy way to
542  tell which is which.
543  """
544  for dataset, catalog in self.getAllSchemaCatalogs().items():
545  schemaDataset = dataset + "_schema"
546  if clobber:
547  butler.put(catalog, schemaDataset, doBackup=doBackup)
548  elif butler.datasetExists(schemaDataset):
549  oldSchema = butler.get(schemaDataset, immediate=True).getSchema()
550  if not oldSchema.compare(catalog.getSchema(), afwTable.Schema.IDENTICAL):
551  raise TaskError(
552  ("New schema does not match schema %r on disk; schemas must be " +
553  "consistent within the same output repo (override with --clobber-config)") %
554  (dataset,))
555  else:
556  butler.put(catalog, schemaDataset)
557 
558  def writeMetadata(self, dataRef):
559  """!Write the metadata produced from processing the data
560 
561  @param[in] dataRef butler data reference used to write the metadata.
562  The metadata is written to dataset type self._getMetadataName()
563  """
564  try:
565  metadataName = self._getMetadataName()
566  if metadataName is not None:
567  dataRef.put(self.getFullMetadata(), metadataName)
568  except Exception as e:
569  self.log.warn("Could not persist metadata for dataId=%s: %s" % (dataRef.dataId, e,))
570 
571  def writePackageVersions(self, butler, clobber=False, doBackup=True, dataset="packages"):
572  """!Compare and write package versions
573 
574  We retrieve the persisted list of packages and compare with what we're currently using.
575  We raise TaskError if there's a version mismatch.
576 
577  Note that this operation is subject to a race condition.
578 
579  @param[in] butler data butler used to read/write the package versions
580  @param[in] clobber overwrite any existing packages dataset? If so, no comparison will be made
581  @param[in] doBackup if clobbering, should we backup the old files?
582  @param[in] dataset name of dataset to read/write
583  """
584  packages = Packages.fromSystem()
585 
586  if clobber:
587  return butler.put(packages, dataset, doBackup=doBackup)
588  if not butler.datasetExists(dataset):
589  return butler.put(packages, dataset)
590 
591  try:
592  old = butler.get(dataset, immediate=True)
593  except Exception as exc:
594  raise type(exc)("Unable to read stored version dataset %s (%s); "
595  "consider using --clobber-versions or --no-versions" %
596  (dataset, exc))
597  # Note that because we can only detect python modules that have been imported, the stored
598  # list of products may be more or less complete than what we have now. What's important is
599  # that the products that are in common have the same version.
600  diff = packages.difference(old)
601  if diff:
602  raise TaskError(
603  "Version mismatch (" +
604  "; ".join("%s: %s vs %s" % (pkg, diff[pkg][1], diff[pkg][0]) for pkg in diff) +
605  "); consider using --clobber-versions or --no-versions")
606  # Update the old set of packages in case we have more packages that haven't been persisted.
607  extra = packages.extra(old)
608  if extra:
609  old.update(packages)
610  butler.put(old, dataset, doBackup=doBackup)
611 
612  def _getConfigName(self):
613  """!Return the name of the config dataset type, or None if config is not to be persisted
614 
615  @note The name may depend on the config; that is why this is not a class method.
616  """
617  return self._DefaultName + "_config"
618 
619  def _getMetadataName(self):
620  """!Return the name of the metadata dataset type, or None if metadata is not to be persisted
621 
622  @note The name may depend on the config; that is why this is not a class method.
623  """
624  return self._DefaultName + "_metadata"