LSSTApplications  11.0-13-gbb96280,12.1.rc1,12.1.rc1+1,12.1.rc1+2,12.1.rc1+5,12.1.rc1+8,12.1.rc1-1-g06d7636+1,12.1.rc1-1-g253890b+5,12.1.rc1-1-g3d31b68+7,12.1.rc1-1-g3db6b75+1,12.1.rc1-1-g5c1385a+3,12.1.rc1-1-g83b2247,12.1.rc1-1-g90cb4cf+6,12.1.rc1-1-g91da24b+3,12.1.rc1-2-g3521f8a,12.1.rc1-2-g39433dd+4,12.1.rc1-2-g486411b+2,12.1.rc1-2-g4c2be76,12.1.rc1-2-gc9c0491,12.1.rc1-2-gda2cd4f+6,12.1.rc1-3-g3391c73+2,12.1.rc1-3-g8c1bd6c+1,12.1.rc1-3-gcf4b6cb+2,12.1.rc1-4-g057223e+1,12.1.rc1-4-g19ed13b+2,12.1.rc1-4-g30492a7
LSSTDataManagementBasePackage
CondorJobs.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 #
4 # LSST Data Management System
5 # Copyright 2008, 2009, 2010 LSST Corporation.
6 #
7 # This product includes software developed by the
8 # LSST Project (http://www.lsst.org/).
9 #
10 # This program is free software: you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License as published by
12 # the Free Software Foundation, either version 3 of the License, or
13 # (at your option) any later version.
14 #
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 # GNU General Public License for more details.
19 #
20 # You should have received a copy of the LSST License Statement and
21 # the GNU General Public License along with this program. If not,
22 # see <http://www.lsstcorp.org/LegalNotices/>.
23 #
24 
25 
26 import os
27 import subprocess
28 import sys
29 import re
30 import time
31 import lsst.log as log
32 
33 
34 ## CondorJobs - handles interaction with HTCondor
35 # This class is highly dependent on the output of the condor commands
36 # condor_submit and condor_q
37 #
class CondorJobs:
    ## pattern for the success line printed by condor_submit and
    # condor_submit_dag; the captured group is the cluster id.  The trailing
    # period of the message is deliberately not part of the pattern, so the
    # full number is captured whether or not the period is present.
    _CLUSTER_RE = re.compile(r"1 job\(s\) submitted to cluster (\d+)")

    ## pattern used to split a line of condor_q output into its
    # whitespace-separated fields.
    _FIELD_RE = re.compile(r"\S+")

    ## zero-based index of the ST (job state) column in a condor_q job row:
    #  ID      OWNER   SUBMITTED     RUN_TIME ST PRI SIZE CMD
    # 1017.0   srp    5/24 09:18   0+00:00:00 R  0   0.0  launch_joboffices_
    # (the SUBMITTED column contributes two fields, a date and a time).
    _STATE_FIELD = 5

    ## initializer
    def __init__(self):
        log.debug("CondorJobs:__init__")

    ## return the cluster id announced in the output of a submit command.
    # Every line is examined because the position of the
    # "1 job(s) submitted to cluster N." line is not stable.
    # @param lines iterable of output lines (text)
    # @return the cluster id as a string, or None if no line announces one
    @classmethod
    def _findClusterId(cls, lines):
        for line in lines:
            found = cls._CLUSTER_RE.findall(line)
            if found:
                return found[0]
        return None

    ## split one line of condor_q output into fields.
    # @param line a single line of condor_q output
    # @return the list of fields, or None when the line is blank or too short
    # to contain a job state column (so callers can index it safely)
    @classmethod
    def _splitQueueLine(cls, line):
        fields = cls._FIELD_RE.findall(line)
        if len(fields) <= cls._STATE_FIELD:
            return None
        return fields

    ## run a command line through os.popen and return everything it wrote
    # to stdout.  The pipe is always closed, even if reading fails.
    # @param command the command line to execute
    # @return list of output lines
    @staticmethod
    def _readShellOutput(command):
        pipe = os.popen(command, "r")
        try:
            return pipe.readlines()
        finally:
            pipe.close()

    ## run a command without a shell and return everything it wrote to
    # stdout.  The pipe is opened in text mode so that the lines are strings
    # under both Python 2 and Python 3.
    # @param args the argument list (or program name) to execute
    # @return list of output lines, without line terminators
    @staticmethod
    def _readProcessOutput(args):
        process = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE,
                                   universal_newlines=True)
        # communicate() drains stdout and waits for the process to exit
        output = process.communicate()[0]
        return output.splitlines()

    ## submit a condor file, and return the job number associated with it.
    # expected output:
    # Submitting job(s).
    # Logging submit event(s).
    # 1 job(s) submitted to cluster 1317.
    # @param condorFile the submit file handed to condor_submit
    # @return the cluster id as a string, or None if condor_submit did not
    # report a submitted job
    def submitJob(self, condorFile):
        log.debug("CondorJobs:submitJob")
        submitRequest = "condor_submit %s" % condorFile
        clusterId = self._findClusterId(self._readShellOutput(submitRequest))
        if clusterId is None:
            return None
        print("submitted job # %s as file %s" % (clusterId, condorFile))
        return clusterId

    ## wait for a condor job to reach its run state.
    # expected output:
    #-- Submitter: srp@lsst6.ncsa.uiuc.edu : <141.142.15.103:40900> : lsst6.ncsa.uiuc.edu
    # ID OWNER SUBMITTED RUN_TIME ST PRI SIZE CMD
    #1016.0 srp 5/24 09:17 0+00:00:00 I 0 0.0 launch_joboffices_
    #1017.0 srp 5/24 09:18 0+00:00:00 R 0 0.0 launch_joboffices_
    # @param num the cluster id of the job; it is looked up as "<num>.0"
    # @param extramsg optional message printed once before waiting starts
    # @return 'R', 'H', 'X' or 'C' as soon as the job is seen in that state,
    # or None if the job was seen in the queue earlier but is no longer listed
    def waitForJobToRun(self, num, extramsg=None):
        log.debug("CondorJobs:waitForJobToRun")
        jobNum = "%s.0" % num
        # states that end the wait, with the message printed for each
        endMessages = {
            'R': "Job %s is now being run.",
            'H': "Job %s is being held. Please review the logs.",
            'X': "Saw job %s, but it was being aborted",
            'C': "Job %s is being cancelled.",
        }
        print("waiting for job %s to run." % num)
        if extramsg is not None:
            print(extramsg)
        jobSeenBefore = False
        secondsWaited = 0
        while True:
            if secondsWaited > 0 and secondsWaited % 60 == 0:
                minutes = secondsWaited // 60
                print("waited %d minute%s so far. still waiting for job %s to run."
                      % (minutes, "" if minutes == 1 else "s", num))
            jobSeenNow = False
            for line in self._readShellOutput("condor_q"):
                fields = self._splitQueueLine(line)
                if fields is None or fields[0] != jobNum:
                    continue
                jobSeenBefore = True
                jobSeenNow = True
                runstate = fields[self._STATE_FIELD]
                if runstate not in endMessages:
                    continue
                if runstate == 'X':
                    print(fields)
                print(endMessages[runstate] % num)
                return runstate
            # check to see if we've seen the job before, but that
            # it disappeared
            if jobSeenBefore and not jobSeenNow:
                print("Was monitoring job %s, but it exitted." % num)
                return None
            time.sleep(1)
            secondsWaited = secondsWaited + 1

    ## waits for all jobs to enter the run state
    # Returns immediately when numList is empty.  Returns early, after
    # printing a message, if one of the jobs is found in the held state.
    # @param numList the cluster ids to wait for; each is looked up as
    # "<id>.0" in the condor_q output
    # @return None
    def waitForAllJobsToRun(self, numList):
        log.debug("CondorJobs:waitForAllJobsToRun")
        # condor_q job id -> caller-supplied entry, for jobs not yet running
        pending = dict(("%s.0" % jobEntry, jobEntry) for jobEntry in numList)
        while pending:
            for line in self._readShellOutput("condor_q"):
                fields = self._splitQueueLine(line)
                if fields is None or fields[0] not in pending:
                    continue
                runstate = fields[self._STATE_FIELD]
                if runstate == 'R':
                    del pending[fields[0]]
                    if not pending:
                        return
                elif runstate == 'H':
                    print("Job %s is being held. Please review the logs."
                          % pending[fields[0]])
                    return
            time.sleep(1)

    ## submit a condor dag and return its cluster number
    # Just a note about why this was done this way...
    # There's something weird about how "condor_submit_dag" prints its output.
    # If you run it on the command line, it'll print the "1 job(s) submitted"
    # message as one of the last lines of output.
    # If you redirect output, even on the command line, to a file, it will
    # be one of the first lines.
    # In an effort to avoid having to fix any output behavior issues in the
    # future, we just try and match every line of output with
    # "1 job(s) submitted" and if we find it, we grab the cluster id out of
    # that line.
    # @param filename the DAG file; the command line is split on whitespace
    # before it is executed
    # @return the cluster id as a string, or -1 if none was reported
    def condorSubmitDag(self, filename):
        log.debug("CondorJobs: submitCondorDag "+filename)
        cmd = "condor_submit_dag %s" % filename
        print(cmd)
        clusterId = self._findClusterId(self._readProcessOutput(cmd.split()))
        if clusterId is None:
            return -1
        return clusterId

    ## kill the HTCondor job with this id
    # The output of condor_rm is echoed line by line.
    # @param cid the id passed to condor_rm
    def killCondorId(self, cid):
        log.debug("CondorJobs: killCondorId"+str(cid))
        cmd = "condor_rm "+str(cid)
        for line in self._readProcessOutput(cmd.split()):
            print(line.strip())

    ## check to see if the job with id "cid" is still alive
    # @param cid the cluster id; it is looked up as "<cid>.0"
    # @return True if a line of condor_q output starts with that job id,
    # False otherwise
    def isJobAlive(self, cid):
        jobNum = "%s.0" % cid
        for line in self._readProcessOutput(["condor_q"]):
            fields = self._FIELD_RE.findall(line)
            if fields and fields[0] == jobNum:
                return True
        return False
def killCondorId
kill the HTCondor job with this id
Definition: CondorJobs.py:201
def submitJob
submit a condor file, and return the job number associated with it.
Definition: CondorJobs.py:51
def waitForAllJobsToRun
waits for all jobs to enter the run state
Definition: CondorJobs.py:135
Definition: Log.h:716
def condorSubmitDag
submit a condor dag and return its cluster number
Definition: CondorJobs.py:167
CondorJobs - handles interaction with HTCondor This class is highly dependent on the output of the co...
Definition: CondorJobs.py:38
def waitForJobToRun
wait for a condor job to reach its run state.
Definition: CondorJobs.py:77
def isJobAlive
check to see if the job with id "cid" is still alive
Definition: CondorJobs.py:214