LSSTDataManagementBasePackage
Template for testing an application Stage in a Pipeline

The template example demonstrates how to set up a simple Pipeline with a single Slice executing a single Stage. This example is found under pex_harness/examples/template/.

The template example has a single application stage, SampleStage (which may be found in pex_harness/examples/stages/lsst/pexhexamples/pipeline.py).

Example Stage Classes

The first component of SampleStage is the class SampleStageSerial, which inherits from lsst.pex.harness.stage.SerialProcessing. This class performs a few simple tasks:

  1. overrides the preprocess() and postprocess() methods of SerialProcessing,
  2. implements a setup() method that performs initialization at construction time (for example, reading values from the stage policy and storing them in instance attributes),
  3. accesses the default logger (self.log) within preprocess() and postprocess(),
  4. writes fields provided to the class by the harness framework (e.g., self.stageId, self.rank) to the log.

If SampleStageSerial is designated as the "serialClass" of the stage in the Pipeline policy (see below), its preprocess() method is executed by the Pipeline before the parallel Slices do their work, and its postprocess() method is executed by the Pipeline after the Slices finish their processing.

import lsst.pex.harness.stage as harnessStage
from lsst.pex.logging import Log, LogRec


class SampleStageSerial(harnessStage.SerialProcessing):

    def setup(self):
        # Default value, replaced by the stage policy if RunMode is set there
        self.runmode = "None"
        if self.policy.exists('RunMode'):
            self.runmode = self.policy.getString('RunMode')

    def preprocess(self, clipboard):
        """
        Processing code for this Stage to be executed by the main Pipeline
        prior to invoking Slice process
        """
        log = Log(self.log, "lsst.pexhexamples.pipeline.SampleStageSerial.preprocess")
        log.log(Log.INFO, 'Executing SampleStageSerial preprocess')

    def postprocess(self, clipboard):
        """
        Processing code for this Stage to be executed by the main Pipeline
        after the completion of Slice process
        """
        log = Log(self.log, "lsst.pexhexamples.pipeline.SampleStageSerial.postprocess")
        log.log(Log.INFO, 'Executing SampleStageSerial postprocess')
        # Assemble one log record from the fields set by the harness
        lr = LogRec(log, Log.INFO)
        lr << " rank " + str(self.rank)
        lr << " stageId " + str(self.stageId)
        lr << " universeSize " + str(self.universeSize)
        lr << " RunMode from Policy " + self.runmode
        lr << LogRec.endr
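The ordering described above can be illustrated with a small, self-contained sketch. It does not use pex_harness at all: MockSerialStage and run_stage_iteration are hypothetical names, a plain dict stands in for the clipboard, and the Slice work is reduced to a placeholder callable. It only shows that preprocess() runs before the Slices and postprocess() runs after them.

```python
class MockSerialStage:
    """Hypothetical stand-in for a SerialProcessing subclass (not pex_harness)."""

    def __init__(self, trace):
        self.trace = trace

    def preprocess(self, clipboard):
        self.trace.append("preprocess")

    def postprocess(self, clipboard):
        self.trace.append("postprocess")


def run_stage_iteration(serial_stage, run_slices, clipboard):
    """Hypothetical driver: one stage iteration as seen from the Pipeline."""
    serial_stage.preprocess(clipboard)   # Pipeline, before the Slices
    run_slices(clipboard)                # placeholder for the parallel Slice work
    serial_stage.postprocess(clipboard)  # Pipeline, after the Slices finish


trace = []
run_stage_iteration(MockSerialStage(trace),
                    lambda clipboard: trace.append("slices"),
                    {})
print(trace)  # ['preprocess', 'slices', 'postprocess']
```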

The next component of SampleStage is the class SampleStageParallel, which inherits from lsst.pex.harness.stage.ParallelProcessing. This class does the following:

  1. overrides the process() method of ParallelProcessing,
  2. implements a setup() method that performs initialization at the time of Slice construction,
  3. accesses the default logger (self.log) within process(),
  4. writes fields provided to the class by the harness framework (e.g., self.stageId, self.rank) to the log.

If SampleStageParallel is designated as the "parallelClass" of the stage in the Pipeline policy (see below), its process() method is executed by every parallel Slice worker.

# Defined in the same module as SampleStageSerial, sharing its
# module-level imports of harnessStage, Log and LogRec.
class SampleStageParallel(harnessStage.ParallelProcessing):

    def setup(self):
        # Default value, replaced by the stage policy if RunMode is set there
        self.runmode = "None"
        if self.policy.exists('RunMode'):
            self.runmode = self.policy.getString('RunMode')

    def process(self, clipboard):
        """
        Processing code for this Stage to be executed within a Slice
        """
        log = Log(self.log, "lsst.pexhexamples.pipeline.SampleStageParallel.process")
        # Assemble one log record from the fields set by the harness
        lr = LogRec(log, Log.INFO)
        lr << " rank " + str(self.rank)
        lr << " stageId " + str(self.stageId)
        lr << " runId " + str(self.runId)
        lr << " universeSize " + str(self.universeSize)
        lr << " RunMode from Policy " + self.runmode
        lr << LogRec.endr
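Since pex_harness 3.5 runs parallel Slices as Python threads on a single node (as noted under Sample Pipeline Configuration), the per-Slice execution of a process() method like the one above can be sketched with the standard threading module. This is an illustration under assumptions, not the harness implementation: MockParallelStage and run_slices are hypothetical, each Slice gets its own dict as a clipboard, and rank is simply the Slice index.

```python
import threading


class MockParallelStage:
    """Hypothetical stand-in for a ParallelProcessing subclass (not pex_harness)."""

    def __init__(self, rank):
        self.rank = rank

    def process(self, clipboard):
        # Record which Slice handled this clipboard
        clipboard["processedBy"] = self.rank


def run_slices(stage_class, n_slices):
    """Run stage_class(rank).process() once per Slice, one thread per Slice."""
    clipboards = [{} for _ in range(n_slices)]  # one clipboard per Slice
    threads = []
    for rank in range(n_slices):
        stage = stage_class(rank)
        thread = threading.Thread(target=stage.process, args=(clipboards[rank],))
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()  # every Slice finishes before any serial postprocess step
    return clipboards


results = run_slices(MockParallelStage, 4)
print([c["processedBy"] for c in results])  # [0, 1, 2, 3]
```

Giving each Slice its own clipboard keeps the threads from writing to shared state, so no locking is needed in this sketch.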

Sample Pipeline Configuration

A simple Pipeline that runs one Slice and the single application stage SampleStage might be configured by the following policy.

As of pex_harness 3.5, the explicit dependence on MPI has been removed: by default, a Pipeline runs on a single node and uses Python threads to run parallel Slices. The number of Slices is therefore set with the attribute "nSlices" in the Pipeline policy file, rather than through an MPI machine file as in earlier versions of pex_harness.

At least one of serialClass and parallelClass must be designated for the stage to do any nontrivial work; specifying both is not required.

nSlices: 1
executionMode: "oneloop"
logThreshold: -3
localLogMode: true
eventBrokerHost: "lsst8.ncsa.uiuc.edu"
appStage: {
name: "SampleStage"
serialClass: "lsst.pexhexamples.pipeline.SampleStageSerial"
parallelClass: "lsst.pexhexamples.pipeline.SampleStageParallel"
eventTopic: "None"
stagePolicy: @policy/samplestage.paf
}
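The serialClass and parallelClass entries in the policy above are fully qualified Python class names, so the modules that define them must be importable when the Pipeline starts. A minimal sketch of turning such a name into a class with importlib, and of enforcing the at-least-one rule, is shown below. resolve_class and stage_classes are hypothetical helpers, not pex_harness functions; a plain dict stands in for the appStage policy, and a standard-library class replaces the lsst.pexhexamples ones so the sketch runs without the LSST stack.

```python
import importlib


def resolve_class(dotted_name):
    """Import 'package.module.ClassName' and return the class object."""
    module_name, _, class_name = dotted_name.rpartition(".")
    if not module_name:
        raise ValueError("expected a fully qualified name: %r" % dotted_name)
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


def stage_classes(app_stage):
    """Return (serialClass, parallelClass); either may be None, but not both."""
    serial = app_stage.get("serialClass")
    parallel = app_stage.get("parallelClass")
    if serial is None and parallel is None:
        raise ValueError("stage %r needs serialClass and/or parallelClass"
                         % app_stage.get("name"))
    return (resolve_class(serial) if serial else None,
            resolve_class(parallel) if parallel else None)


# A standard-library class stands in for lsst.pexhexamples.pipeline.SampleStageSerial
serial_cls, parallel_cls = stage_classes({
    "name": "SampleStage",
    "serialClass": "collections.OrderedDict",
})
print(serial_cls, parallel_cls)  # <class 'collections.OrderedDict'> None
```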

Sample Pipeline Execution

In order to execute this sample Pipeline, the directory pex_harness/examples/stages must be added to PYTHONPATH. From within pex_harness/examples/template:

% export PYTHONPATH=${PWD}/../stages:${PYTHONPATH} (bash)
% setenv PYTHONPATH ${PWD}/../stages:${PYTHONPATH} (tcsh)

The pipeline is then executed via

% launchPipeline.py template_policy.paf <some-run-id>

such as

% launchPipeline.py template_policy.paf test_1090
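For scripted runs, the same launch can be prepared from Python. This sketch only builds the command line and environment; pipeline_command is a hypothetical helper, and it assumes launchPipeline.py is on the PATH and that the script is run from pex_harness/examples/template, so that ../stages resolves to the stages directory.

```python
import os


def pipeline_command(policy_file, run_id, stages_dir, base_env=None):
    """Return (argv, env) for launchPipeline.py without running it."""
    env = dict(os.environ if base_env is None else base_env)
    existing = env.get("PYTHONPATH", "")
    # Prepend the stages directory, like the export/setenv commands above
    env["PYTHONPATH"] = stages_dir + (os.pathsep + existing if existing else "")
    argv = ["launchPipeline.py", policy_file, run_id]
    return argv, env


stages_dir = os.path.abspath(os.path.join(os.getcwd(), "..", "stages"))
argv, env = pipeline_command("template_policy.paf", "test_1090", stages_dir)
print(argv)  # ['launchPipeline.py', 'template_policy.paf', 'test_1090']

# To launch for real (requires an environment with pex_harness set up):
# import subprocess
# subprocess.run(argv, env=env, check=True)
```

Unlike the shell one-liners, the helper avoids leaving a trailing path separator when PYTHONPATH was previously unset.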

Although this example does not use events, the events system connects to an ActiveMQ broker during initialization. If an ActiveMQ broker other than the LSST default (lsst8.ncsa.uiuc.edu) is used, its host name must be specified with the "eventBrokerHost" attribute in the Pipeline policy file.