LSSTApplications  11.0-13-gbb96280,12.1+18,12.1+7,12.1-1-g14f38d3+72,12.1-1-g16c0db7+5,12.1-1-g5961e7a+84,12.1-1-ge22e12b+23,12.1-11-g06625e2+4,12.1-11-g0d7f63b+4,12.1-19-gd507bfc,12.1-2-g7dda0ab+38,12.1-2-gc0bc6ab+81,12.1-21-g6ffe579+2,12.1-21-gbdb6c2a+4,12.1-24-g941c398+5,12.1-3-g57f6835+7,12.1-3-gf0736f3,12.1-37-g3ddd237,12.1-4-gf46015e+5,12.1-5-g06c326c+20,12.1-5-g648ee80+3,12.1-5-gc2189d7+4,12.1-6-ga608fc0+1,12.1-7-g3349e2a+5,12.1-7-gfd75620+9,12.1-9-g577b946+5,12.1-9-gc4df26a+10
LSSTDataManagementBasePackage
PegasusJobs.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 #
4 # LSST Data Management System
5 # Copyright 2017 LSST Corporation.
6 #
7 # This product includes software developed by the
8 # LSST Project (http://www.lsst.org/).
9 #
10 # This program is free software: you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License as published by
12 # the Free Software Foundation, either version 3 of the License, or
13 # (at your option) any later version.
14 #
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 # GNU General Public License for more details.
19 #
20 # You should have received a copy of the LSST License Statement and
21 # the GNU General Public License along with this program. If not,
22 # see <http://www.lsstcorp.org/LegalNotices/>.
23 #
24 
25 
26 from __future__ import print_function
27 from builtins import str
28 from builtins import object
29 import os
30 import subprocess
31 import re
32 import time
33 import lsst.log as log
34 from lsst.ctrl.orca.CondorJobs import CondorJobs
35 
37  """Handles interaction with Pegasus
38  This class is highly dependent on the output of the pegasus commands
39  """
40 
def __init__(self):
    """Construct the object; the only action taken is a debug log entry."""
    log.debug("PegasusJobs:__init__")
44 
def pegasusSubmitDax(self, sitesFile, transformationFile, daxFile):
    """Submit a pegasus dax and return its cluster number

    Runs ``pegasus-plan ... --submit`` and scans everything the command
    writes to standard output for the HTCondor cluster number and for the
    ``pegasus-status`` / ``pegasus-remove`` command lines it prints.

    Parameters
    ----------
    sitesFile : `str`
        value passed as ``-Dpegasus.catalog.site.file``
    transformationFile : `str`
        value passed as ``-Dpegasus.catalog.transformation.file``
    daxFile : `str`
        name of pegasus DAX file

    Returns
    -------
    condorClusterId : `str` or `int`
        the digits captured from the "1 job(s) submitted to cluster N."
        line, as text; the integer -1 if no such line was seen
    statusInfo : `tuple` or `None`
        capture groups from the "pegasus-status -l <path>" line, or None
        if no such line was seen.  The pattern has two groups, so this is
        a 2-tuple: (text following "pegasus-status -l ", last character of
        that text).
    removeInfo : `tuple` or `None`
        same as ``statusInfo``, for the "pegasus-remove <path>" line

    Notes
    -----
    This is the type of output we're dealing with....

    ---expected output begin---
    2017.02.07 14:06:53.482 CST:
    2017.02.07 14:06:53.488 CST: -----------------------------------------------------------------------
    2017.02.07 14:06:53.493 CST: File for submitting this DAG to HTCondor : CiHscDax-0.dag.condor.sub
    2017.02.07 14:06:53.498 CST: Log of DAGMan debugging messages : CiHscDax-0.dag.dagman.out
    2017.02.07 14:06:53.504 CST: Log of HTCondor library output : CiHscDax-0.dag.lib.out
    2017.02.07 14:06:53.509 CST: Log of HTCondor library error messages : CiHscDax-0.dag.lib.err
    2017.02.07 14:06:53.514 CST: Log of the life of condor_dagman itself : CiHscDax-0.dag.dagman.log
    2017.02.07 14:06:53.519 CST:

    2017.02.07 14:06:53.525 CST: -no_submit given, not submitting DAG to HTCondor. You can do this with:
    2017.02.07 14:06:53.535 CST: -----------------------------------------------------------------------
    2017.02.07 14:06:55.207 CST: Your database is compatible with Pegasus version: 4.7.0
    2017.02.07 14:06:55.326 CST: Submitting to condor CiHscDax-0.dag.condor.sub
    2017.02.07 14:06:55.382 CST: Submitting job(s).
    2017.02.07 14:06:55.387 CST: 1 job(s) submitted to cluster 164870.
    2017.02.07 14:06:55.392 CST:

    2017.02.07 14:06:55.398 CST: Your workflow has been started and is running in the base directory:
    2017.02.07 14:06:55.403 CST:
    2017.02.07 14:06:55.408 CST: /scratch/srp/condor_scratch/srp_2017_0207_110530/scripts/submit/srp/pegasus/CiHscDax/run0008
    2017.02.07 14:06:55.414 CST:
    2017.02.07 14:06:55.419 CST: *** To monitor the workflow you can run ***
    2017.02.07 14:06:55.424 CST:
    2017.02.07 14:06:55.429 CST: pegasus-status -l /scratch/srp/condor_scratch/srp_2017_0207_110530/scripts/submit/srp/pegasus/CiHscDax/run0008
    2017.02.07 14:06:55.435 CST:
    2017.02.07 14:06:55.440 CST: *** To remove your workflow run ***
    2017.02.07 14:06:55.445 CST:
    2017.02.07 14:06:55.451 CST: pegasus-remove /scratch/srp/condor_scratch/srp_2017_0207_110530/scripts/submit/srp/pegasus/CiHscDax/run0008
    2017.02.07 14:06:55.456 CST:
    2017.02.07 14:06:55.637 CST: Time taken to execute is 3.555 seconds
    ---expected output end---
    """
    log.debug("PegasusJobs: pegasusSubmitDax %s", daxFile)

    # expressions to match; raw strings so that "\(", "\d", "\W" and "\w"
    # reach the regex engine instead of being treated as (invalid) string
    # escapes.
    clusterexp = re.compile(r".*1 job\(s\) submitted to cluster (\d+).")
    statusexp = re.compile(r".*pegasus-status -l ((\W|\w)+)")
    removeexp = re.compile(r".*pegasus-remove ((\W|\w)+)")

    # Build the argument vector directly rather than splitting a formatted
    # string, so a file name containing whitespace stays a single argument.
    planArgs = [
        "pegasus-plan",
        "-Dpegasus.transfer.links=true",
        "-Dpegasus.catalog.site.file=%s" % (sitesFile,),
        "-Dpegasus.catalog.transformation.file=%s" % (transformationFile,),
        "-Dpegasus.data.configuration=sharedfs",
        "--sites", "lsstvc",
        "--output-dir", "output",
        "--dir", "submit",
        "--dax", "%s" % (daxFile,),
        "--submit",
    ]

    # Same text the command line was historically echoed and logged as.
    cmd = " ".join(planArgs)
    print(cmd)
    log.debug(cmd)

    process = subprocess.Popen(planArgs, shell=False, stdout=subprocess.PIPE)

    # Collect every line of standard output until the pipe reaches EOF.
    output = [rawLine.decode().strip() for rawLine in process.stdout]

    condorClusterId = -1
    statusInfo = None
    removeInfo = None
    for line in output:
        # A line is tested against the patterns in this order and the
        # first hit wins; if a pattern hits on several lines the value
        # from the last such line is kept.
        cluster = clusterexp.findall(line)
        if cluster:
            condorClusterId = cluster[0]
            continue
        status = statusexp.findall(line)
        if status:
            # NOTE(review): findall returns a tuple per match here because
            # the pattern has two groups; callers presumably want
            # element 0 (the path) - confirm before changing the shape.
            statusInfo = status[0]
            continue
        remove = removeexp.findall(line)
        if remove:
            removeInfo = remove[0]

    # read the rest (if any) and wait for the process to terminate
    process.communicate()
    return condorClusterId, statusInfo, removeInfo