LSSTApplications  8.0.0.0+107,8.0.0.1+13,9.1+18,9.2,master-g084aeec0a4,master-g0aced2eed8+6,master-g15627eb03c,master-g28afc54ef9,master-g3391ba5ea0,master-g3d0fb8ae5f,master-g4432ae2e89+36,master-g5c3c32f3ec+17,master-g60f1e072bb+1,master-g6a3ac32d1b,master-g76a88a4307+1,master-g7bce1f4e06+57,master-g8ff4092549+31,master-g98e65bf68e,master-ga6b77976b1+53,master-gae20e2b580+3,master-gb584cd3397+53,master-gc5448b162b+1,master-gc54cf9771d,master-gc69578ece6+1,master-gcbf758c456+22,master-gcec1da163f+63,master-gcf15f11bcc,master-gd167108223,master-gf44c96c709
LSSTDataManagementBasePackage
ShutdownThread.py
Go to the documentation of this file.
1 #
2 # LSST Data Management System
3 # Copyright 2008, 2009, 2010 LSST Corporation.
4 #
5 # This product includes software developed by the
6 # LSST Project (http://www.lsst.org/).
7 #
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the LSST License Statement and
19 # the GNU General Public License along with this program. If not,
20 # see <http://www.lsstcorp.org/LegalNotices/>.
21 #
22 
23 
24 import thread
25 import threading
26 import os, sys, re, traceback, time
27 from threading import Thread
28 from threading import Event as PyEvent
29 
30 from lsst.pex.logging import Log, BlockTimingLog
31 
32 import lsst.ctrl.events as events
33 
34 """
35 ShutdownThread class manages a separate Python thread that runs
36 alongside a Pipeline for the purpose of 1) listening for Shutdown
37 events and 2) taking appropriate action for processing shutdown
38 in response. The levels that designate the manner in which the shutdown
39 proceeds are :
40  * Shutdown at level 1 : stop immediately by killing process (ugly)
41  * Shutdown at level 2 : exit in a clean manner (Pipeline and Slices) at a synchronization point
42  * Shutdown at level 3 : exit in a clean manner (Pipeline and Slices) at the end of a Stage
43  * Shutdown at level 4 : exit in a clean manner (Pipeline and Slices) at the end of a Visit
44 """
45 
46 
class ShutdownThread(threading.Thread):
    """Background thread that waits for a Shutdown event for a Pipeline.

    The thread polls an event receiver on the pipeline's shutdown topic,
    filtered on the pipeline's run id.  When an event arrives, its integer
    "level" property selects how the pipeline is asked to stop:

     * level 1 : stop immediately by killing the process (ugly)
     * level 2 : exit cleanly (Pipeline and Slices) at a synchronization point
     * level 3 : exit cleanly (Pipeline and Slices) at the end of a Stage
     * level 4 : exit cleanly (Pipeline and Slices) at the end of a Visit

    Levels 2-4 all call ``pipeline.setStop()`` here; the level itself is
    handed to the pipeline through ``pipeline.setExitLevel()``.
    """

    def __init__(self, pipeline):
        """Remember the owning pipeline and prepare the stop flag.

        pipeline -- object providing getLogThreshold(), getRun(),
                    getShutdownTopic(), getEventBrokerHost(),
                    setExitLevel(), exit() and setStop().
        """
        Thread.__init__(self)
        self.pipeline = pipeline
        # Flag set through setStop() to make run() give up polling.
        # NOTE(review): on Python 3, threading.Thread has an internal
        # method named _stop; this attribute would shadow it if the module
        # is ever ported -- confirm before porting.
        self._stop = PyEvent()
        self.pid = os.getpid()

        # log message levels
        self.TRACE = BlockTimingLog.INSTRUM + 2
        self.VERB1 = self.TRACE
        self.VERB2 = self.VERB1 - 1
        self.VERB3 = self.VERB2 - 1
        self.logthresh = self.pipeline.getLogThreshold()

    def getPid(self):
        """Return the process id recorded when this object was constructed."""
        return self.pid

    def setStop(self):
        """Set the stop flag; run() checks it after each receive attempt."""
        self._stop.set()

    def exit(self):
        """Call thread.exit(), which raises SystemExit in the calling thread."""
        thread.exit()

    def run(self):
        """Poll for a Shutdown event and forward its level to the pipeline.

        Each iteration sleeps, then attempts one receive.  The loop ends when
        an event is received; if the stop flag is found set after a receive
        attempt, the thread ends itself through sys.exit() (SystemExit raised
        inside a thread terminates only that thread) without notifying the
        pipeline.
        """
        runId = self.pipeline.getRun()
        shutdownTopic = self.pipeline.getShutdownTopic()
        eventBrokerHost = self.pipeline.getEventBrokerHost()

        # Only events tagged with this pipeline's run id are of interest.
        # The unfiltered form, events.EventReceiver(eventBrokerHost,
        # shutdownTopic), was marked "works" by the original author.
        clause = "RUNID = '" + runId + "'"
        recv = events.EventReceiver(eventBrokerHost, shutdownTopic, clause)

        sleepTimeout = 2.0  # seconds, passed to time.sleep()
        transTimeout = 900  # passed to receiveEvent(); units not visible here

        shutdownEvent = None
        while shutdownEvent is None:
            if self.logthresh == Log.DEBUG:
                print("ShutdownThread Looping : checking for Shutdown event ... \n")
                print("ShutdownThread Looping : " + clause)

            time.sleep(sleepTimeout)
            shutdownEvent = recv.receiveEvent(transTimeout)
            if shutdownEvent is not None:
                self.level = shutdownEvent.getCustomPropertySet().getInt("level")

            # The pipeline may have asked this thread to stop while it was
            # sleeping or waiting on the receiver.
            if self._stop.isSet():
                if self.logthresh == self.VERB3:
                    print("ShutdownThread exiting from loop : Stop from Pipeline \n")
                sys.exit()

        self.pipeline.setExitLevel(self.level)

        if self.level == 1:
            # Shutdown at level 1 : stop immediately by killing process (ugly)
            self.pipeline.exit()
        elif self.level in (2, 3, 4):
            # Levels 2, 3 and 4 : exit in a clean manner (Pipeline and Slices)
            # at a synchronization point, the end of a Stage, or the end of
            # a Visit respectively.
            self.pipeline.setStop()

        # Technique for handling a no-more-data scenario (level 5) is still
        # in discussion; no action is taken for it here.
138 
139