LSSTApplications  11.0-13-gbb96280,12.1.rc1,12.1.rc1+1,12.1.rc1+2,12.1.rc1+5,12.1.rc1+8,12.1.rc1-1-g06d7636+1,12.1.rc1-1-g253890b+5,12.1.rc1-1-g3d31b68+7,12.1.rc1-1-g3db6b75+1,12.1.rc1-1-g5c1385a+3,12.1.rc1-1-g83b2247,12.1.rc1-1-g90cb4cf+6,12.1.rc1-1-g91da24b+3,12.1.rc1-2-g3521f8a,12.1.rc1-2-g39433dd+4,12.1.rc1-2-g486411b+2,12.1.rc1-2-g4c2be76,12.1.rc1-2-gc9c0491,12.1.rc1-2-gda2cd4f+6,12.1.rc1-3-g3391c73+2,12.1.rc1-3-g8c1bd6c+1,12.1.rc1-3-gcf4b6cb+2,12.1.rc1-4-g057223e+1,12.1.rc1-4-g19ed13b+2,12.1.rc1-4-g30492a7
LSSTDataManagementBasePackage
WorkflowManager.py
Go to the documentation of this file.
1 #
2 # LSST Data Management System
3 # Copyright 2008, 2009, 2010 LSST Corporation.
4 #
5 # This product includes software developed by the
6 # LSST Project (http://www.lsst.org/).
7 #
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the LSST License Statement and
19 # the GNU General Public License along with this program. If not,
20 # see <http://www.lsstcorp.org/LegalNotices/>.
21 #
22 
23 from lsst.ctrl.orca.NamedClassFactory import NamedClassFactory
24 from lsst.ctrl.orca.StatusListener import StatusListener
25 import lsst.log as log
26 import lsst.pex.config as pexConfig
27 from lsst.ctrl.orca.multithreading import SharedData
28 from lsst.ctrl.orca.DataAnnouncer import DataAnnouncer
29 
30 ## workflow manager base class
32  ##
33  # @brief Manage lifecycle of this workflow
34  #
35  def __init__(self, name, runid, repository, prodConfig, wfConfig):
36 
37  # _locked: a container for data to be shared across threads that
38  # have access to this object.
39  self._locked = SharedData(False)
40 
41  ## workflow name
42  self.name = "unnamed"
43  if name != None:
44  self.name = name
45 
46  ## run id of this workflow
47  self.runid = runid
48 
49  ## repository where the configuration is kept
50  self.repository = repository
51 
52  ## workflow configuration
53  self.wfConfig = wfConfig
54 
55  ## production configuration
56  self.prodConfig = prodConfig
57 
59 
60  log.debug("WorkflowManager:__init__")
61 
62  ## the urgency level of how fast to stop the workflow
63  self.urgency = 0
64  self._launcher = None
65  self._monitor = None
66 
67  ##
68  # @deprecated return the name of this workflow
69  #
70  def getName(self):
71  return self.name
72 
73  ##
74  # @brief setup, launch and monitor a workflow to its completion, and then
75  # clean-up.
76  #
77  def runWorkflow(self, statusListener, loggerManagers):
78  log.debug("WorkflowManager:runWorkflow")
79 
80  if not self.isRunnable():
81  if self.isRunning():
82  log.info("Workflow %s is already running" % self.runid)
83  if self.isDone():
84  log.info("Workflow %s has already run; start with new runid" % self.runid)
85  return False
86 
87  try:
88  self._locked.acquire()
89 
90  if self._workflowConfigurator == None:
92  self._monitor = self._workflowLauncher.launch(statusListener, loggerManagers)
93 
94  # self.cleanUp()
95 
96  finally:
97  self._locked.release()
98  return self._monitor
99 
100  ##
101  # @brief stop the workflow.
102  #
103  def stopWorkflow(self, urgency):
104  log.debug("WorkflowManager:stopWorkflow")
105  if self._monitor:
106  self._monitor.stopWorkflow(urgency)
107  else:
108  log.info("Workflow %s is not running" % self.name)
109 
110  ##
111  # @brief carry out post-execution tasks for removing workflow data and
112  # state from the platform and archiving/ingesting products as
113  # needed.
114  #
115  def cleanUp(self):
116  log.debug("WorkflowManager:cleanUp")
117 
118 
119 
120  ##
121  # @brief prepare a workflow for launching.
122  # @param provSetup a provenance setup object to pass to
123  # DatabaseConfigurator instances
124  # @param workflowVerbosity the log level at which to emit messages
125  # @return WorkflowLauncher
126  def configure(self, provSetup=None, workflowVerbosity=None):
127  log.debug("WorkflowManager:configure")
128  if self._workflowConfigurator:
129  log.info("production has already been configured.")
130  return
131 
132  # lock this branch of code
133  try:
134  self._locked.acquire()
135 
136  self._workflowConfigurator = self.createConfigurator(self.runid, self.repository, self.name, self.wfConfig, self.prodConfig)
137  self._workflowLauncher = self._workflowConfigurator.configure(provSetup, workflowVerbosity)
138  finally:
139  self._locked.release()
140 
141  # do specialized workflow level configuration here, this may include
142  # calling ProvenanceSetup.getWorkflowCommands()
143  return self._workflowLauncher
144 
145  ##
146  # @brief create a Workflow configurator for this workflow.
147  #
148  # @param runid the production run id
149  # @param repository the directory location of the repository
150  # @param wfName the workflow name
151  # @param wfConfig the config describing the workflow
152  # @param prodConfig the config describing the overall production. This
153  # provides common data (e.g. event broker host)
154  # that needs to be shared with all pipelines.
155  def createConfigurator(self, runid, repository, wfName, wfConfig, prodConfig):
156  log.debug("WorkflowManager:createConfigurator")
157 
158  className = wfConfig.configurationClass
159  classFactory = NamedClassFactory()
160 
161  configuratorClass = classFactory.createClass(className)
162  configurator = configuratorClass(self.runid, repository, prodConfig, wfConfig, wfName)
163  return configurator
164 
165  ##
166  # @brief determine whether production is currently running
167  #
168  def isRunning(self):
169  if self._monitor:
170  return self._monitor.isRunning()
171  return False
172 
173  ##
174  # @brief return True if the workflow has been run to completion. This will
175  # be true if the workflow has run normally through cleaned up or
176  # if it was stopped and clean-up has been called.
177  #
178  def isDone(self):
179  log.debug("WorkflowManager:isDone")
180  if self._monitor:
181  return self._monitor.isDone()
182  return False
183 
184  ##
185  # @brief return True if the workflow can still be called. This may return
186  # False because the workflow has already been run and cannot be
187  # re-run.
188  #
189  def isRunnable(self):
190  log.debug("WorkflowManager:isRunnable")
191  return not self.isRunning() and not self.isDone()
192 
193  ##
194  # @brief Runs checks that ensure that the Workflow has been properly set up.
195  # @param care the thoroughness of the checks.
196  # @param issueExc an instance of MultiIssueConfigurationError to add
197  # problems to. If not None, this function will not
198  # raise an exception when problems are encountered; they
199  # will merely be added to the instance. It is assumed
200  # that the caller will raise that exception is necessary.
201  #
202  def checkConfiguration(self, care=1, issueExc=None):
203  # care - an indication of how throughly to check. In general, a
204  # higher number will result in more checks being run.
205  log.debug("WorkflowManager:createConfiguration")
206 
207  myProblems = issueExc
208  if myProblems is None:
209  myProblems = MultiIssueConfigurationError("problems encountered while checking configuration")
210 
211  # do the checks
212 
213  # raise exception if problems found
214  if not issueExc and myProblems.hasProblems():
215  raise myProblems
216 
217 
218  ##
219  # return the name of this workflow
220  #
221  def getWorkflowName(self):
222  return self.name
223 
224  ##
225  # return the number of nodes used by f this workflow
226  #
227  def getNodeCount(self):
228  return self._workflowConfigurator.getNodeCount()
229 
230  ##
231  # Announce that data is available for this workflow
232  #
233  def announceData(self):
234  announcer = DataAnnouncer(self.runid, self.prodConfig, self.wfConfig)
235  if announcer.announce():
236  print "Data announced via config for %s" % self.name
237  else:
238  print "No data announced for %s. Waiting for events from external source" % self.name
def isRunning
determine whether production is currently running
def isRunnable
return True if the workflow can still be called.
def cleanUp
carry out post-execution tasks for removing workflow data and state from the platform and archiving/i...
def __init__
Manage lifecycle of this workflow.
Definition: Log.h:716
repository
repository where the configuration is kept
def createConfigurator
create a Workflow configurator for this workflow.
def runWorkflow
setup, launch and monitor a workflow to its completion, and then clean-up.
def checkConfiguration
Runs checks that ensure that the Workflow has been properly set up.
urgency
the urgency level of how fast to stop the workflow
def isDone
return True if the workflow has been run to completion.
def getNodeCount
return the number of nodes used by f this workflow
def announceData
Announce that data is available for this workflow.
def getWorkflowName
return the name of this workflow
def configure
prepare a workflow for launching.