LSSTApplications  8.0.0.0+107,8.0.0.1+13,9.1+18,9.2,master-g084aeec0a4,master-g0aced2eed8+6,master-g15627eb03c,master-g28afc54ef9,master-g3391ba5ea0,master-g3d0fb8ae5f,master-g4432ae2e89+36,master-g5c3c32f3ec+17,master-g60f1e072bb+1,master-g6a3ac32d1b,master-g76a88a4307+1,master-g7bce1f4e06+57,master-g8ff4092549+31,master-g98e65bf68e,master-ga6b77976b1+53,master-gae20e2b580+3,master-gb584cd3397+53,master-gc5448b162b+1,master-gc54cf9771d,master-gc69578ece6+1,master-gcbf758c456+22,master-gcec1da163f+63,master-gcf15f11bcc,master-gd167108223,master-gf44c96c709
LSSTDataManagementBasePackage
startPipeline.py
Go to the documentation of this file.
1 #
2 # LSST Data Management System
3 # Copyright 2008, 2009, 2010 LSST Corporation.
4 #
5 # This product includes software developed by the
6 # LSST Project (http://www.lsst.org/).
7 #
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the LSST License Statement and
19 # the GNU General Public License along with this program. If not,
20 # see <http://www.lsstcorp.org/LegalNotices/>.
21 #
22 
23 from __future__ import with_statement
24 import sys
25 import os
26 import subprocess
27 import time
28 
29 import eups
30 import lsst.pex.logging
31 
# Site-specific settings for DC2 runs.
RootIoDir = "/share/DC2root"  # parent directory of the per-run I/O directory trees
Dc2PipeDir = "/share/stack/dc2pipe"  # directory holding the SQL files listed in DbCmdFiles
# MySQL server host name; must not carry surrounding whitespace so that it can be
# handed to a client verbatim (the previous value ended with a stray space).
DbHost = "lsst10.ncsa.uiuc.edu"
DbUser = "test"
DbPassword = "globular.test"
# SQL scripts (relative to Dc2PipeDir), listed in the order they are to be run.
DbCmdFiles = ["lsstSchema4mysql.sql", "lsstPipelineSetup4mysql.sql"]
38 
def startPipeline(nodeList, pipelinePolicy, runId, createIoDirs=False, createDbTables=False):
    """Start pipeline execution

    Inputs:
    - nodeList: path to mpi machine file; environment variables and relative paths are expanded
        relative policy paths are relative to the directory containing nodeList
    - pipelinePolicy: path to pipeline policy file
    - runId: identifier of this run; it is passed to runPipeline.py and, when the corresponding
        flags are set, used as the name of the run's I/O directory under RootIoDir and as the
        name of the run's database
    - createIoDirs: create I/O directories for pipeline run
    - createDbTables: create database tables

    Set createIoDirs and createDbTables True for DC2 operation, False for running local tests.

    Raises RuntimeError if createIoDirs is true and the I/O directory for runId already exists,
    or if the node list cannot be parsed.

    The node list file contains information about the nodes on which to run the pipeline.
    In its simplest form there is one line per node in the format:
       ipaddress[:ncores]
    where ncores is the number of cores that you wish to use on that node; the default is 1.
    Blank lines and lines beginning with # are ignored.
    Additional options may be specified; see documentation for MPICH2 used with
    the MPD process management environment.

    The pipeline uses one slice just to run the preprocess and postprocess phase;
    all other slices are used to run slices of the process phase.
    For example:
    - To run one slice of the process phase on one host (that has at least 2 CPUs):
      specify one host with 2 slices (1 for pre/postprocess and 1 for process)
    - To run three slices of the process phase on two hosts (each with at least 2 CPUs):
      specify two hosts with 2 slices each (4 slices: 1 for pre/postprocess and 3 for process)
    """
    nodeList = os.path.abspath(os.path.expandvars(nodeList))
    pipelineDir = os.path.dirname(nodeList)

    # NOTE(review): the setup messages below are traced under "pex.harness.startPipeline" while
    # the later ones use "lsst.pex.harness.startPipeline". The names are kept as they were so
    # that existing trace verbosity settings keep working; confirm which one is intended.

    # create I/O directories for this run
    if createIoDirs:
        lsst.pex.logging.Trace("pex.harness.startPipeline", 3, "Creating I/O directories for run %s" % (runId,))
        runRoot = os.path.join(RootIoDir, runId)
        if os.path.exists(runRoot):
            raise RuntimeError("runId %s already exists" % (runId,))
        os.mkdir(runRoot)
        # one directory per stage, each with its own set of subdirectories
        ioLayout = (
            ("ipd", ("input", "output")),
            ("mops", ("input", "output")),
            ("assoc", ("input", "update", "output")),
        )
        for stageName, subDirNames in ioLayout:
            stageDir = os.path.join(runRoot, stageName)
            os.mkdir(stageDir)
            for subDirName in subDirNames:
                os.mkdir(os.path.join(stageDir, subDirName))
    else:
        lsst.pex.logging.Trace("pex.harness.startPipeline", 3, "Not creating I/O directories for run %s" % (runId,))

    # create database tables for this run
    if createDbTables:
        lsst.pex.logging.Trace("pex.harness.startPipeline", 3, "Creating database tables for run %s" % (runId,))
        # Use the module-level host setting instead of repeating the host name here;
        # strip() guards against stray whitespace in the configured value.
        # NOTE(review): the password is passed as a command-line argument to mysql.
        dbBaseArgs = ["mysql", "-h", DbHost.strip(), "-u"+DbUser, "-p"+DbPassword]
        subprocess.call(dbBaseArgs + ["-e", 'create database "%s"' % (runId,)])
        for sqlCmdFile in DbCmdFiles:
            # feed each SQL script to mysql on stdin, with the new database selected
            with open(os.path.join(Dc2PipeDir, sqlCmdFile)) as sqlFile:
                subprocess.call(dbBaseArgs + [runId], stdin=sqlFile)
    else:
        lsst.pex.logging.Trace("pex.harness.startPipeline", 3, "Not creating database tables for run %s" % (runId,))

    lsst.pex.logging.Trace("lsst.pex.harness.startPipeline", 3, "pipelineDir=%s" % (pipelineDir,))

    nnodes, ncores = parseNodeList(nodeList)
    lsst.pex.logging.Trace("lsst.pex.harness.startPipeline", 3, "nnodes=%s; ncores=%s; nslices=%s" % \
        (nnodes, ncores, ncores-1))

    lsst.pex.logging.Trace("lsst.pex.harness.startPipeline", 3, "Running mpdboot")
    subprocess.call(["mpdboot", "--totalnum=%s" % (nnodes,), "--file=%s" % (nodeList,), "--verbose"])

    # Once mpdboot has been run, always run mpdallexit, even if a later step raises
    # (for example because an executable is missing or the user interrupts the run).
    try:
        time.sleep(3)
        lsst.pex.logging.Trace("lsst.pex.harness.startPipeline", 3, "Running mpdtrace")
        subprocess.call(["mpdtrace", "-l"])
        time.sleep(2)

        lsst.pex.logging.Trace("lsst.pex.harness.startPipeline", 3, "Running mpiexec")
        subprocess.call(
            ["mpiexec", "-usize", str(ncores), "-machinefile", nodeList, "-n", "1",
             "runPipeline.py", pipelinePolicy, runId],
            cwd = pipelineDir,
        )

        time.sleep(1)
    finally:
        lsst.pex.logging.Trace("lsst.pex.harness.startPipeline", 3, "Running mpdallexit")
        subprocess.call(["mpdallexit"])
125 
def parseNodeList(nodeList):
    """Count the nodes and CPU cores listed in a node list (mpi machine) file

    Inputs:
    - nodeList: path to the node list file

    Blank lines and lines whose first non-blank character is # are skipped.
    On every other line only the first whitespace-delimited token is examined;
    it must have the form host[:ncores].

    Return (nnodes, ncores)
    where:
    - nnodes = number of nodes (host lines) in file; a host that is listed twice is counted twice
    - ncores = total number of CPU cores specified; a host without ":ncores" counts as 1 core

    Raises RuntimeError if a host entry contains more than one ":" or if ncores is not an integer.
    """
    nnodes = 0
    ncores = 0
    # open() and "except ... as" work on both Python 2.6+ and Python 3,
    # unlike the file() builtin and the "except Exception, e" form.
    with open(nodeList, "r") as nodeFile:
        for line in nodeFile:
            line = line.strip()
            if not line or line.startswith("#"):
                continue

            try:
                # strip optional extra arguments
                hostInfo = line.split()[0]
                # parse optional number of slices
                hostSlice = hostInfo.split(":")
                if len(hostSlice) == 1:
                    ncores += 1
                elif len(hostSlice) == 2:
                    ncores += int(hostSlice[1])
                else:
                    raise RuntimeError("Could not parse host info %r" % (hostInfo,))
            except Exception as e:
                # report every parsing problem uniformly, quoting the offending line
                raise RuntimeError("Cannot parse nodeList line %r; error = %s" % (line, e))
            nnodes += 1
    return (nnodes, ncores)
157 
158 
Limited backward compatibility with the DC2 run-time trace facilities.
Definition: Trace.h:93