LSSTApplications  11.0-13-gbb96280,12.1+18,12.1+7,12.1-1-g14f38d3+72,12.1-1-g16c0db7+5,12.1-1-g5961e7a+84,12.1-1-ge22e12b+23,12.1-11-g06625e2+4,12.1-11-g0d7f63b+4,12.1-19-gd507bfc,12.1-2-g7dda0ab+38,12.1-2-gc0bc6ab+81,12.1-21-g6ffe579+2,12.1-21-gbdb6c2a+4,12.1-24-g941c398+5,12.1-3-g57f6835+7,12.1-3-gf0736f3,12.1-37-g3ddd237,12.1-4-gf46015e+5,12.1-5-g06c326c+20,12.1-5-g648ee80+3,12.1-5-gc2189d7+4,12.1-6-ga608fc0+1,12.1-7-g3349e2a+5,12.1-7-gfd75620+9,12.1-9-g577b946+5,12.1-9-gc4df26a+10
LSSTDataManagementBasePackage
CondorJobs.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 #
4 # LSST Data Management System
5 # Copyright 2008, 2009, 2010 LSST Corporation.
6 #
7 # This product includes software developed by the
8 # LSST Project (http://www.lsst.org/).
9 #
10 # This program is free software: you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License as published by
12 # the Free Software Foundation, either version 3 of the License, or
13 # (at your option) any later version.
14 #
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 # GNU General Public License for more details.
19 #
20 # You should have received a copy of the LSST License Statement and
21 # the GNU General Public License along with this program. If not,
22 # see <http://www.lsstcorp.org/LegalNotices/>.
23 #
24 
25 
26 from __future__ import print_function
27 from builtins import str
28 from builtins import object
29 import os
30 import subprocess
31 import re
32 import time
33 import lsst.log as log
34 
35 
class CondorJobs(object):
    """Handles interaction with HTCondor

    This class is highly dependent on the output of the condor commands
    condor_submit, condor_submit_dag and condor_q
    """

    # Matches the line condor_submit and condor_submit_dag print on success.
    # Nothing is required after the digits, so the cluster id is never
    # truncated when the line does not end in a period.
    _clusterExp = re.compile(r"1 job\(s\) submitted to cluster (\d+)")

    # Splits a line of condor_q output into whitespace separated columns.
    _queueExp = re.compile(r"\S+")

    # Position of the run state ("ST") column in a condor_q job line; the
    # SUBMITTED column occupies two whitespace separated tokens.
    _STATE_COLUMN = 5

    def __init__(self):
        log.debug("CondorJobs:__init__")

    @staticmethod
    def _toText(data):
        """Return ``data`` as text, decoding it first if it is a byte string.

        Pipes opened in text mode already yield text, which has no
        ``decode`` method on Python 3, so only byte strings are decoded.
        """
        if isinstance(data, bytes):
            return data.decode()
        return data

    @classmethod
    def _readShellOutput(cls, command):
        """Run ``command`` with os.popen and return its stdout as text lines.

        The pipe is always closed, even if reading raises.
        """
        pop = os.popen(command, "r")
        try:
            return [cls._toText(line) for line in pop.readlines()]
        finally:
            pop.close()

    @classmethod
    def _findClusterId(cls, lines):
        """Return the cluster id from the first "submitted" line, else None."""
        for line in lines:
            found = cls._clusterExp.findall(line)
            if found:
                return found[0]
        return None

    @classmethod
    def _splitQueueLine(cls, line):
        """Split a condor_q line into columns.

        Returns None for lines too short to contain a run state column
        (blank lines, short headers or error messages).
        """
        values = cls._queueExp.findall(line)
        if len(values) <= cls._STATE_COLUMN:
            return None
        return values

    def submitJob(self, condorFile):
        """Submit a condor file, and return the job number associated with it.

        Parameters
        ----------
        condorFile: `str`
            condor submit file.

        Returns
        -------
        num : `str` or None
            cluster number reported by condor_submit, or None if no line of
            its output reported a submission.

        Notes
        -----
        expected output:
        Submitting job(s).
        Logging submit event(s).
        1 job(s) submitted to cluster 1317.
        """
        log.debug("CondorJobs:submitJob")

        submitRequest = "condor_submit %s" % condorFile

        # Every line is checked rather than a fixed one, so the position of
        # the "submitted" message in the output does not matter.
        num = self._findClusterId(self._readShellOutput(submitRequest))
        if num is None:
            return None
        print("submitted job # %s as file %s" % (num, condorFile))
        return num

    def waitForJobToRun(self, num, extramsg=None):
        """Wait for a condor job to reach its run state.

        Parameters
        ----------
        num : `str`
            job number.
        extramsg : `str`, optional
            additional message to print to stdout

        Returns
        -------
        runstate : `str` or None
            'R' once the job runs; 'H', 'X' or 'C' if the job is seen held,
            aborted or cancelled; None if the job was seen in the queue
            earlier but is no longer listed.

        Notes
        -----
        expected output:
        -- Submitter: srp@lsst6.ncsa.uiuc.edu : <141.142.15.103:40900> : lsst6.ncsa.uiuc.edu
        ID OWNER SUBMITTED RUN_TIME ST PRI SIZE CMD
        1016.0 srp 5/24 09:17 0+00:00:00 I 0 0.0 launch_joboffices_
        1017.0 srp 5/24 09:18 0+00:00:00 R 0 0.0 launch_joboffices_
        """
        log.debug("CondorJobs:waitForJobToRun")
        jobNum = "%s.0" % num
        # run states that end the wait, with the message printed for each
        finalStates = {
            'R': "Job %s is now being run.",
            'H': "Job %s is being held. Please review the logs.",
            'X': "Saw job %s, but it was being aborted",
            'C': "Job %s is being cancelled.",
        }
        jobSeenBefore = False
        print("waiting for job %s to run." % num)
        if extramsg is not None:
            print(extramsg)
        # counts polling iterations (one sleep each), not wall clock time
        secondsWaited = 0
        while True:
            if (secondsWaited > 0) and ((secondsWaited % 60) == 0):
                minutes = secondsWaited // 60
                msg = "waited %d minute%s so far. still waiting for job %s to run."
                print(msg % (minutes, "" if minutes == 1 else "s", num))
            jobSeenNow = False
            for line in self._readShellOutput("condor_q"):
                values = self._splitQueueLine(line)
                if values is None or values[0] != jobNum:
                    continue
                jobSeenBefore = True
                jobSeenNow = True
                runstate = values[self._STATE_COLUMN]
                if runstate in finalStates:
                    if runstate == 'X':
                        print(values)
                    print(finalStates[runstate] % num)
                    return runstate
            # check to see if we've seen the job before, but that
            # it disappeared
            if jobSeenBefore and not jobSeenNow:
                print("Was monitoring job %s, but it exitted." % num)
                return None
            time.sleep(1)
            secondsWaited += 1

    def waitForAllJobsToRun(self, numList):
        """Waits for all jobs to enter the run state

        Returns as soon as every job has been seen running, or as soon as
        any of the jobs is seen in the held state. Nothing is returned in
        either case.

        Parameters
        ----------
        numList : `list`
            list of condor job ids
        """
        log.debug("CondorJobs:waitForAllJobsToRun")
        # condor_q lists jobs as "<cluster>.0"; map that form back to the
        # caller's ids for the jobs not yet seen running
        waiting = {"%s.0" % entry: entry for entry in numList}
        if not waiting:
            # nothing to wait for
            return
        while True:
            for line in self._readShellOutput("condor_q"):
                values = self._splitQueueLine(line)
                if values is None:
                    continue
                jobId = values[0]
                if jobId not in waiting:
                    continue
                runstate = values[self._STATE_COLUMN]
                if runstate == 'R':
                    del waiting[jobId]
                    if not waiting:
                        return
                elif runstate == 'H':
                    # a held job will not start on its own, so stop waiting
                    print("Job %s is being held. Please review the logs." % waiting[jobId])
                    return
            time.sleep(1)

    def condorSubmitDag(self, filename):
        """Submit a condor dag and return its cluster number

        Parameters
        ----------
        filename : `str`
            name of condor DAG file

        Returns
        -------
        num : `str` or `int`
            cluster number as a string, or -1 if no line of the output
            reported a submission.
        """
        log.debug("CondorJobs: condorSubmitDag %s", filename)
        # Just a note about why this was done this way...
        # There's something weird about how "condor_submit_dag" prints its output.
        # If you run it on the command line, it'll print the "1 job(s) submitted"
        # message as one of the last lines of output.
        # If you redirect output, even on the command line, to a file, it will
        # be one of the first lines.
        # In an effort to avoid having to fix any output behavior issues in the
        # future, we just try and match every line of output with "1 job(s) submitted"
        # and if we find it, we grab the cluster id out of that line.
        cmd = "condor_submit_dag %s" % filename
        log.debug(cmd)
        process = subprocess.Popen(cmd.split(), shell=False, stdout=subprocess.PIPE)
        # read all of the output and wait for the command to terminate
        stdoutdata, stderrdata = process.communicate()
        output = [line.strip() for line in self._toText(stdoutdata).splitlines()]
        num = self._findClusterId(output)
        if num is None:
            return -1
        return num

    def killCondorId(self, cid):
        """Kill the HTCondor job with this id

        Parameters
        ----------
        cid : `str`
            condor job id
        """
        log.debug("CondorJobs: killCondorId %s", str(cid))
        cmd = "condor_rm " + str(cid)
        process = subprocess.Popen(cmd.split(), shell=False, stdout=subprocess.PIPE)
        # the output is not used; drain it and wait for the command to terminate
        process.communicate()

    def isJobAlive(self, cid):
        """Check to see if the job with id "cid" is still alive

        Parameters
        ----------
        cid : `str`
            condor job id

        Returns
        -------
        alive : `bool`
            True if condor_q lists a job whose ClusterId equals ``cid``.
        """
        jobNum = int(cid)
        cmd = "condor_q -af ClusterId"
        process = subprocess.Popen(cmd.split(), shell=False, stdout=subprocess.PIPE)
        # read all of the output and wait for the command to terminate
        stdoutdata, stderrdata = process.communicate()
        for line in stdoutdata.splitlines():
            line = line.strip()
            if not line:
                continue
            try:
                value = int(line)
            except ValueError:
                # not a cluster id (e.g. a diagnostic message); ignore it
                continue
            if value == jobNum:
                return True
        return False