LSSTApplications  8.0.0.0+107,8.0.0.1+13,9.1+18,9.2,master-g084aeec0a4,master-g0aced2eed8+6,master-g15627eb03c,master-g28afc54ef9,master-g3391ba5ea0,master-g3d0fb8ae5f,master-g4432ae2e89+36,master-g5c3c32f3ec+17,master-g60f1e072bb+1,master-g6a3ac32d1b,master-g76a88a4307+1,master-g7bce1f4e06+57,master-g8ff4092549+31,master-g98e65bf68e,master-ga6b77976b1+53,master-gae20e2b580+3,master-gb584cd3397+53,master-gc5448b162b+1,master-gc54cf9771d,master-gc69578ece6+1,master-gcbf758c456+22,master-gcec1da163f+63,master-gcf15f11bcc,master-gd167108223,master-gf44c96c709
LSSTDataManagementBasePackage
Functions | Variables
lsst.pex.harness.startPipeline Namespace Reference

Functions

def startPipeline
 
def parseNodeList
 

Variables

string RootIoDir = "/share/DC2root"
 
string Dc2PipeDir = "/share/stack/dc2pipe"
 
string DbHost = "lsst10.ncsa.uiuc.edu "
 
string DbUser = "test"
 
string DbPassword = "globular.test"
 
list DbCmdFiles = ["lsstSchema4mysql.sql", "lsstPipelineSetup4mysql.sql"]
 

Function Documentation

def lsst.pex.harness.startPipeline.parseNodeList (   nodeList)
Return (nnodes, ncores)
where:
- nnodes = number of nodes (host IP addresses) in file
- ncores = total number of CPU cores specified    

Definition at line 126 of file startPipeline.py.

def _parseCoreCount(hostInfo):
    """Return the core count encoded in a "host[:ncores]" token (default 1).

    Raises ValueError if the token contains more than one ":", names no host,
    has a non-integer core count, or has a core count less than 1.
    """
    hostSlice = hostInfo.split(":")
    if len(hostSlice) > 2 or not hostSlice[0]:
        raise ValueError("Could not parse host info %r" % (hostInfo,))
    if len(hostSlice) == 1:
        return 1
    count = int(hostSlice[1])
    # a zero/negative count would make the computed number of slices meaningless
    if count < 1:
        raise ValueError("core count must be at least 1 in host info %r" % (hostInfo,))
    return count


def parseNodeList(nodeList):
    """Return (nnodes, ncores)
    where:
    - nnodes = number of nodes (host lines) in file
    - ncores = total number of CPU cores specified

    Each non-blank line not starting with "#" describes one node as
    "host[:ncores] [extra options...]"; only the first whitespace-separated
    token is parsed, and ncores defaults to 1.

    Raises RuntimeError if a node line cannot be parsed.
    """
    nnodes = 0
    ncores = 0
    # open() instead of the Python-2-only file() builtin
    with open(nodeList, "r") as nodeFile:
        for rawLine in nodeFile:
            line = rawLine.strip()
            if not line or line.startswith("#"):
                continue

            # strip optional extra arguments; only "host[:ncores]" matters here
            hostInfo = line.split()[0]
            try:
                lineCores = _parseCoreCount(hostInfo)
            except ValueError as e:
                raise RuntimeError("Cannot parse nodeList line %r; error = %s" % (line, e))
            ncores += lineCores
            nnodes += 1
    return (nnodes, ncores)
158 
159 
def lsst.pex.harness.startPipeline.startPipeline (   nodeList,
  pipelinePolicy,
  runId,
  createIoDirs = False,
  createDbTables = False 
)
Start pipeline execution

Inputs:
- nodeList: path to mpi machine file; environment variables and relative paths are expanded
  relative policy paths are relative to the directory containing nodeList
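- pipelinePolicy: path to pipeline policy file
- runId: run identifier; names the run's I/O directory and database and is passed to runPipeline.py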
- createIoDirs: create I/O directories for pipeline run
- createDbTables: create database tables

Set createIoDirs and createDbTables True for DC2 operation, False for running local tests.

The node list file contains information about the nodes on which to run the pipeline.
In its simplest form there is one line per node in the format:
   ipaddress[:ncores]
where ncores is the number of cores that you wish to use on that node; the default is 1.
Blank lines and lines beginning with # are ignored.
Additional options may be specified; see documentation for MPICH2 used with
the MPD process management environment.

The pipeline uses one slice just to run the preprocess and postprocess phases;
all other slices are used to run slices of the process phase.
For example:
- To run one slice of the process phase on one host (that has at least 2 CPUs):
  specify one host with 2 slices (1 for pre/postprocess and 1 for process)
- To run three slices of the process phase on two hosts (each with at least 2 CPUs):
  specify two hosts with 2 slices each (4 slices: 1 for pre/postprocess and 3 for process)

Definition at line 39 of file startPipeline.py.

39 
def _trace(message):
    """Emit a level-3 trace message under this module's single trace component."""
    lsst.pex.logging.Trace("lsst.pex.harness.startPipeline", 3, message)


def _createIoDirs(runId):
    """Create the per-run I/O directory tree <RootIoDir>/<runId>/{ipd,mops,assoc}/...

    Raises RuntimeError if the run directory already exists.
    """
    runRoot = os.path.join(RootIoDir, runId)
    if os.path.exists(runRoot):
        raise RuntimeError("runId %s already exists" % (runId,))
    os.mkdir(runRoot)
    # parents are listed before their children so plain mkdir suffices
    for subdir in (
        ("ipd",), ("ipd", "input"), ("ipd", "output"),
        ("mops",), ("mops", "input"), ("mops", "output"),
        ("assoc",), ("assoc", "input"), ("assoc", "update"), ("assoc", "output"),
    ):
        os.mkdir(os.path.join(runRoot, *subdir))


def _createDbTables(runId):
    """Create a MySQL database named runId and run each of DbCmdFiles against it.

    mysql exit statuses are not checked (same as before).
    """
    # DbHost is stripped because the constant carries a trailing space.
    # NOTE(review): passing -p<password> on the command line exposes it in process listings.
    dbBaseArgs = ["mysql", "-h", DbHost.strip(), "-u" + DbUser, "-p" + DbPassword]
    # MySQL quotes identifiers with backticks; double quotes denote a string
    # literal in the default sql_mode, so 'create database "x"' is a syntax error.
    quotedRunId = "`%s`" % (runId.replace("`", "``"),)
    subprocess.call(dbBaseArgs + ["-e", "create database %s" % (quotedRunId,)])
    for sqlCmdFile in DbCmdFiles:
        with open(os.path.join(Dc2PipeDir, sqlCmdFile)) as sqlFile:
            subprocess.call(dbBaseArgs + [runId], stdin=sqlFile)


def startPipeline(nodeList, pipelinePolicy, runId, createIoDirs=False, createDbTables=False):
    """Start pipeline execution

    Inputs:
    - nodeList: path to mpi machine file; environment variables and relative paths are expanded
      relative policy paths are relative to the directory containing nodeList
    - pipelinePolicy: path to pipeline policy file
    - runId: run identifier; names the run's I/O directory and database and is passed to runPipeline.py
    - createIoDirs: create I/O directories for pipeline run
    - createDbTables: create database tables

    Set createIoDirs and createDbTables True for DC2 operation, False for running local tests.

    The node list file contains information about the nodes on which to run the pipeline.
    In its simplest form there is one line per node in the format:
       ipaddress[:ncores]
    where ncores is the number of cores that you wish to use on that node; the default is 1.
    Blank lines and lines beginning with # are ignored.
    Additional options may be specified; see documentation for MPICH2 used with
    the MPD process management environment.

    The pipeline uses one slice just to run the preprocess and postprocess phases;
    all other slices are used to run slices of the process phase.
    For example:
    - To run one slice of the process phase on one host (that has at least 2 CPUs):
      specify one host with 2 slices (1 for pre/postprocess and 1 for process)
    - To run three slices of the process phase on two hosts (each with at least 2 CPUs):
      specify two hosts with 2 slices each (4 slices: 1 for pre/postprocess and 3 for process)

    Raises RuntimeError if createIoDirs is set and the run directory already exists,
    or if the node list cannot be parsed. Exit statuses of the external commands
    (mysql, mpdboot, mpdtrace, mpiexec, mpdallexit) are not checked.
    """
    nodeList = os.path.abspath(os.path.expandvars(nodeList))
    pipelineDir = os.path.dirname(nodeList)

    if createIoDirs:
        _trace("Creating I/O directories for run %s" % (runId,))
        _createIoDirs(runId)
    else:
        _trace("Not creating I/O directories for run %s" % (runId,))

    if createDbTables:
        _trace("Creating database tables for run %s" % (runId,))
        _createDbTables(runId)
    else:
        _trace("Not creating database tables for run %s" % (runId,))

    _trace("pipelineDir=%s" % (pipelineDir,))

    nnodes, ncores = parseNodeList(nodeList)
    _trace("nnodes=%s; ncores=%s; nslices=%s" % (nnodes, ncores, ncores - 1))

    _trace("Running mpdboot")
    subprocess.call(["mpdboot", "--totalnum=%s" % (nnodes,), "--file=%s" % (nodeList,), "--verbose"])
    try:
        # give the MPD ring time to come up before querying it
        time.sleep(3)
        _trace("Running mpdtrace")
        subprocess.call(["mpdtrace", "-l"])
        time.sleep(2)

        _trace("Running mpiexec")
        subprocess.call(
            ["mpiexec", "-usize", str(ncores), "-machinefile", nodeList, "-n", "1",
             "runPipeline.py", pipelinePolicy, runId],
            cwd=pipelineDir,
        )
        time.sleep(1)
    finally:
        # always tear down the MPD ring booted above, even if a later step raised
        _trace("Running mpdallexit")
        subprocess.call(["mpdallexit"])
limited backward compatibility to the DC2 run-time trace facilities
Definition: Trace.h:93

Variable Documentation

list lsst.pex.harness.startPipeline.DbCmdFiles = ["lsstSchema4mysql.sql", "lsstPipelineSetup4mysql.sql"]

Definition at line 37 of file startPipeline.py.

string lsst.pex.harness.startPipeline.DbHost = "lsst10.ncsa.uiuc.edu "

Definition at line 34 of file startPipeline.py.

string lsst.pex.harness.startPipeline.DbPassword = "globular.test"

Definition at line 36 of file startPipeline.py.

string lsst.pex.harness.startPipeline.DbUser = "test"

Definition at line 35 of file startPipeline.py.

string lsst.pex.harness.startPipeline.Dc2PipeDir = "/share/stack/dc2pipe"

Definition at line 33 of file startPipeline.py.

string lsst.pex.harness.startPipeline.RootIoDir = "/share/DC2root"

Definition at line 32 of file startPipeline.py.