LSSTApplications  11.0-13-gbb96280,12.1+18,12.1+7,12.1-1-g14f38d3+72,12.1-1-g16c0db7+5,12.1-1-g5961e7a+84,12.1-1-ge22e12b+23,12.1-11-g06625e2+4,12.1-11-g0d7f63b+4,12.1-19-gd507bfc,12.1-2-g7dda0ab+38,12.1-2-gc0bc6ab+81,12.1-21-g6ffe579+2,12.1-21-gbdb6c2a+4,12.1-24-g941c398+5,12.1-3-g57f6835+7,12.1-3-gf0736f3,12.1-37-g3ddd237,12.1-4-gf46015e+5,12.1-5-g06c326c+20,12.1-5-g648ee80+3,12.1-5-gc2189d7+4,12.1-6-ga608fc0+1,12.1-7-g3349e2a+5,12.1-7-gfd75620+9,12.1-9-g577b946+5,12.1-9-gc4df26a+10
LSSTDataManagementBasePackage
WorkflowManager.py
Go to the documentation of this file.
1 #
2 # LSST Data Management System
3 # Copyright 2008, 2009, 2010 LSST Corporation.
4 #
5 # This product includes software developed by the
6 # LSST Project (http://www.lsst.org/).
7 #
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the LSST License Statement and
19 # the GNU General Public License along with this program. If not,
20 # see <http://www.lsstcorp.org/LegalNotices/>.
21 #
22 
23 from __future__ import print_function
24 from builtins import object
25 from lsst.ctrl.orca.NamedClassFactory import NamedClassFactory
26 from lsst.ctrl.orca.multithreading import SharedData
27 from lsst.ctrl.orca.exceptions import MultiIssueConfigurationError
28 import lsst.log as log
29 
30 ##
31 # @brief workflow manager base class
32 #
33 
34 
35 class WorkflowManager(object):
36  """Manage lifecycle of this workflow.
37 
38  Parameters
39  ----------
40  name : `str`
41  name of this workflow.
42  runid : `str`
43  run id.
44  repository : `str`
45  repository directory
46  prodConfig : `Config`
47  production Config
48  wfConfig : `Config`
49  workflow Config
50  """
51 
52  def __init__(self, name, runid, repository, prodConfig, wfConfig):
53 
54  # _locked: a container for data to be shared across threads that
55  # have access to this object.
56  self._locked = SharedData.SharedData(False)
57 
58  # workflow name
59  self.name = "unnamed"
60  if name is not None:
61  self.name = name
62 
63  # run id of this workflow
64  self.runid = runid
65 
66  # repository where the configuration is kept
67  self.repository = repository
68 
69  # workflow configuration
70  self.wfConfig = wfConfig
71 
72  # production configuration
73  self.prodConfig = prodConfig
74 
76 
77  log.debug("WorkflowManager:__init__")
78 
79  # the urgency level of how fast to stop the workflow
80  self.urgency = 0
81  self._launcher = None
82  self._monitor = None
83 
84  # @deprecated return the name of this workflow
85  def getName(self):
86  return self.name
87 
88  ##
89  # @brief setup, launch and monitor a workflow to its completion, and then
90  # clean-up.
91  #
92  def runWorkflow(self, statusListener):
93  """ setup, launch and monitor a workflow to its completion, and then clean up
94  """
95  log.debug("WorkflowManager:runWorkflow")
96 
97  if not self.isRunnable():
98  if self.isRunning():
99  log.info("Workflow %s is already running" % self.runid)
100  if self.isDone():
101  log.info("Workflow %s has already run; start with new runid" % self.runid)
102  return False
103 
104  try:
105  self._locked.acquire()
106 
107  if self._workflowConfigurator is None:
109  self._monitor = self._workflowLauncher.launch(statusListener)
110 
111  self.cleanUp()
112 
113  finally:
114  self._locked.release()
115  return self._monitor
116 
117  def stopWorkflow(self, urgency):
118  """Stop the workflow
119 
120  Parameters
121  ----------
122  urgency : `int`
123  urgency at which to shut down this workflow
124  """
125 
126  log.debug("WorkflowManager:stopWorkflow")
127  if self._monitor:
128  self._monitor.stopWorkflow(urgency)
129  else:
130  log.info("Workflow %s is not running" % self.name)
131 
132  def cleanUp(self):
133  """Carry out post-execution tasks for removing workflow data and state from the
134  platform and archiving/ingesting products as needed.
135  """
136 
137  log.debug("WorkflowManager:cleanUp")
138 
139  def configure(self, provSetup=None, workflowVerbosity=None):
140  """Prepare a workflow for launching
141  Parameters
142  ----------
143  provSetup : `object`
144  A provenance setup object to pass to Configurator instances.
145  workflowVerbosity : `int`
146  The logging verbosity level to set for workflows
147 
148  Returns
149  -------
150  WorkflowLauncher
151  """
152  log.debug("WorkflowManager:configure")
153  if self._workflowConfigurator:
154  log.info("production has already been configured.")
155  return
156 
157  # lock this branch of code
158  try:
159  self._locked.acquire()
160 
162  self.runid, self.repository, self.name, self.wfConfig, self.prodConfig)
163  self._workflowLauncher = self._workflowConfigurator.configure(provSetup, workflowVerbosity)
164  finally:
165  self._locked.release()
166 
167  # do specialized workflow level configuration here, this may include
168  # calling ProvenanceSetup.getWorkflowCommands()
169  return self._workflowLauncher
170 
171  def createConfigurator(self, runid, repository, wfName, wfConfig, prodConfig):
172  """Create a Workflow configurator for this workflow.
173 
174  Parameters
175  ----------
176  runid : `str`
177  the production run id
178  repository : `str`
179  the directory location of the repository
180  wfName : `str`
181  the workflow name
182  wfConfig : Config
183  the config describing the workflow
184  prodConfig : Config
185  the config describing the overall production. This provides common data
186  that needs to be shared with all pipelines.
187 
188  Returns
189  -------
190  WorkflowConfigurator
191  """
192  log.debug("WorkflowManager:createConfigurator")
193 
194  className = wfConfig.configurationClass
195  classFactory = NamedClassFactory()
196 
197  configuratorClass = classFactory.createClass(className)
198  configurator = configuratorClass(self.runid, repository, prodConfig, wfConfig, wfName)
199  return configurator
200 
201  def isRunning(self):
202  """Report whether workflow is currently running
203  """
204  if self._monitor:
205  return self._monitor.isRunning()
206  return False
207 
208  def isDone(self):
209  """Report whether workflow has completed
210 
211  Returns
212  -------
213  True if the workflow has been run to completion.
214 
215  Notes
216  -----
217  This will be true if the workflow has run normally through cleaned up or if it was stopped
218  and clean-up has been called.
219  """
220  log.debug("WorkflowManager:isDone")
221  if self._monitor:
222  return self._monitor.isDone()
223  return False
224 
225  def isRunnable(self):
226  """Report whether workflow is capable of running
227 
228  Returns
229  -------
230  True if the workflow can still be called.
231 
232  Notes
233  -----
234  This may return False because the workflow has already been run and cannot be re-run.
235  """
236  log.debug("WorkflowManager:isRunnable")
237  return not self.isRunning() and not self.isDone()
238 
239  def checkConfiguration(self, care=1, issueExc=None):
240  """Runs checks that ensure that the Workflow has been properly set up.
241 
242  Raises
243  ------
244  MultiIssueConfigurationError if problems are found
245 
246  Parameters
247  ----------
248  care : `int`
249  the thoroughness of the checks. In general, a higher number will result in more checks.
250  issueExc : `MultiIssueConfigurationError`
251  An instance of MultiIssueConfigurationError to add problems to. If not None, this
252  function will not raise an exception when problems are encountered; they will
253  merely be added to the instance. It is assumed that the caller will raise that
254  exception is necessary.
255  """
256  log.debug("WorkflowManager:createConfiguration")
257 
258  myProblems = issueExc
259  if myProblems is None:
260  myProblems = MultiIssueConfigurationError("problems encountered while checking configuration")
261 
262  # do the checks
263 
264  # raise exception if problems found
265  if not issueExc and myProblems.hasProblems():
266  raise myProblems
267 
268  def getWorkflowName(self):
269  """Accessor to workflow name
270 
271  Returns
272  -------
273  name : `str`
274  The name of this workflow.
275  """
276  return self.name
277 
278  def getNodeCount(self):
279  """Accessor to the number of nodes specified in this configuration
280 
281  Returns
282  -------
283  The number of nodes used by this workflow.
284  """
285  return self._workflowConfigurator.getNodeCount()
def runWorkflow
setup, launch and monitor a workflow to its completion, and then clean-up.