LSSTApplications  8.0.0.0+107,8.0.0.1+13,9.1+18,9.2,master-g084aeec0a4,master-g0aced2eed8+6,master-g15627eb03c,master-g28afc54ef9,master-g3391ba5ea0,master-g3d0fb8ae5f,master-g4432ae2e89+36,master-g5c3c32f3ec+17,master-g60f1e072bb+1,master-g6a3ac32d1b,master-g76a88a4307+1,master-g7bce1f4e06+57,master-g8ff4092549+31,master-g98e65bf68e,master-ga6b77976b1+53,master-gae20e2b580+3,master-gb584cd3397+53,master-gc5448b162b+1,master-gc54cf9771d,master-gc69578ece6+1,master-gcbf758c456+22,master-gcec1da163f+63,master-gcf15f11bcc,master-gd167108223,master-gf44c96c709
LSSTDataManagementBasePackage
stage.py
Go to the documentation of this file.
1 #
2 # LSST Data Management System
3 # Copyright 2008, 2009, 2010 LSST Corporation.
4 #
5 # This product includes software developed by the
6 # LSST Project (http://www.lsst.org/).
7 #
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the LSST License Statement and
19 # the GNU General Public License along with this program. If not,
20 # see <http://www.lsstcorp.org/LegalNotices/>.
21 #
22 
23 import os, sys, re
24 from Queue import Queue
25 from lsst.pex.logging import Log
26 
class StageProcessing(object):
    """
    A container for processing that handles one step in a pipeline.  This
    container will be realized either via the subclass, SerialProcessing
    (when held by a Pipeline) or via the subclass, ParallelProcessing (when
    held by a Slice).

    This abstract class is implemented (via one of the subclasses,
    SerialProcessing or ParallelProcessing) to apply a specific
    science-oriented algorithm to data drawn from the queue.
    """

    def __init__(self, policy=None, log=None, eventBroker=None, sysdata=None,
                 callSetup=True):
        """
        initialize this stage.  This will call setup() after all internal
        attributes are initialized unless callSetup is False.

        The sysdata is a dictionary intended to include data describing
        the execution context to assume for this instance.  This constructor
        will recognize the following named items:
          name          a string name for display purposes that identifies
                          this stage.  Default: "processing stage".
          stageId       the integer identifier for this stage within the
                          sequence of stages that makes up a pipeline.
                          A value of -1 (the default) means that the stage
                          is not part of a pipeline sequence.
          rank          the integer identifier for the parallel thread that
                          hosts this instance.  A value of -1 (the default)
                          indicates the master thread for the pipeline.
          runId         the string identifier for the production run that
                          this stage is a part of.  Default: "user-driven".
          universeSize  the total number of parallel threads.  Default: 1.

        The dictionary may contain other arbitrary data.  In general,
        constructors for specific stage subclasses that provide application
        algorithms will not be able to make use of this information without
        special knowledge of the execution context.

        @param policy       the policy for configuring this stage.  This may
                              be None if this stage does not require a policy
                              for configuration.
        @param log          the log object the stage instance should use.
                              It is stored as given (None is not replaced).
        @param eventBroker  the name of the host where the event broker is
                              running.  If not provided, an eventBroker will
                              not be available to the stage.
        @param sysdata      a dictionary of data describing the execution
                              context.  A shallow copy is kept, so later
                              changes to the caller's dictionary do not
                              affect this instance.
        @param callSetup    if true, the setup() function will be called as
                              the last step in this constructor, after all
                              data is initialized.  Default: True.
        """

        # the policy that configures this stage
        self.policy = policy

        # arbitrary data describing the execution context
        if sysdata is None:
            sysdata = {}
        self.sysdata = sysdata.copy()

        # the integer identifier for this stage within the sequence of stages
        # that makes up a pipeline.
        self.stageId = sysdata.get("stageId", -1)

        # the integer identifier for the parallel thread that hosts this
        # instance
        self.rank = sysdata.get("rank", -1)

        # the string identifier for the production run that this stage is a
        # part of
        self.runId = sysdata.get("runId", "user-driven")

        # a string name for display purposes that identifies this stage
        self.name = sysdata.get("name", "processing stage")

        # the total number of parallel threads within the pipeline hosting
        # this stage
        self.universeSize = sysdata.get("universeSize", 1)

        # the host where the event broker is running.  If None, then an
        # event broker is not available.
        self.eventBroker = eventBroker

        # the root log stages should derive their log instances from
        self.log = log

        # the input queue (used in scripting mode)
        self.inputQueue = None

        # the output queue (used in scripting mode)
        self.outputQueue = None

        if callSetup:
            self.setup()

    def setup(self):
        """
        setup the initial, internal state for this stage.  This default
        implementation does nothing; however, subclasses may override it.
        It will be called during construction after this parent class's
        attributes are initialized (unless the constructor is called with
        callSetup=False).
        """
        pass

    def initialize(self, outQueue, inQueue):
        """
        called once when the stage is plugged into a pipeline by the manager
        of the pipeline (either a Pipeline or Slice instance), this method
        initializes the stage so that it is ready to process datasets.  It
        passes in the data queues it will use.  Subclasses may override this
        but should call the parent's implementation.

        @param outQueue   the queue this stage posts processed data to
        @param inQueue    the queue this stage draws its data from
        """
        self.outputQueue = outQueue
        self.inputQueue = inQueue

    # NOTE(review): the "def" line of this method was missing from the
    # listing this file was recovered from; the name getEventBrokerHost is
    # reconstructed from the docstring -- confirm against the repository.
    def getEventBrokerHost(self):
        """
        get the hostname where the event broker currently in use is located
        """
        return self.eventBroker

    def setRun(self, runId):
        """
        set the runid
        """
        self.runId = runId

    def getRun(self):
        """
        get the runid
        """
        return self.runId

    def setRank(self, rank):
        """
        set the MPI rank of the process running this stage
        """
        self.rank = rank

    def getRank(self):
        """
        get the MPI rank of the process running this stage
        """
        return self.rank

    def setUniverseSize(self, universeSize):
        """
        set the MPI universe size
        """
        self.universeSize = universeSize

    def getUniverseSize(self):
        """
        get the MPI universe size, an integer
        """
        return self.universeSize

    def getName(self):
        """
        return the name assigned to this stage.
        """
        return self.name

    def shutdown(self):
        """
        discontinue processing and clean up and release resources.  This
        default implementation does nothing.
        """
        # "self" was missing from the original signature, which made any
        # call on an instance fail with a TypeError.
        pass
213 
214 
class SerialProcessing(StageProcessing):
    """
    The serial half of a stage: the work done before and after the parallel
    part.  Instances run in the context of a Pipeline.
    """

    def __init__(self, policy=None, log=None, eventBroker=None,
                 sysdata=None, callSetup=True):
        """
        initialize this stage.  setup() is called once all internal
        attributes are in place, unless callSetup is False.

        @param policy       the policy for configuring this stage; may be
                              None if the stage needs no configuration.
        @param log          the log object the stage instance should use.
        @param eventBroker  the name of the host where the event broker is
                              running.  If not provided, an eventBroker will
                              not be available to the stage.
        @param sysdata      a dictionary of data describing the execution
                              context.  See the StageProcessing
                              documentation for details.
        @param callSetup    if true, setup() is called as the last step of
                              this constructor.  Default: True.
        """
        # The parent constructor is told not to call setup(); it is invoked
        # here instead, after the parent's attributes exist.
        StageProcessing.__init__(self, policy, log, eventBroker, sysdata,
                                 callSetup=False)
        if callSetup:
            self.setup()

    def applyPreprocess(self):
        """
        Run preprocess() on the clipboard at the head of the input queue and
        return a new queue holding that clipboard; the returned queue is
        what should be handed to applyPostprocess().

        Exactly one clipboard is handled per call.  Most subclasses inherit
        this implementation, but a subclass may override it to take more
        control over how data is drawn from the input queue.
        """
        # Peek rather than pop: the clipboard has to stay on the input queue
        # while preprocess() runs so that a failure stage can still reach it.
        current = self.inputQueue.element()
        self.preprocess(current)

        # Only now remove it from the queue; the returned value is the same
        # clipboard and is not needed.
        self.inputQueue.getNextDataset()

        # presumably Queue is the pipeline package's own queue class (it
        # must provide addDataset()) -- verify against the import.
        processed = Queue()
        processed.addDataset(current)
        return processed

    def preprocess(self, clipboard):
        """
        execute the serial processing that should occur before the parallel
        processing part of the stage on the data on the given clipboard.
        This base implementation always raises RuntimeError; subclasses
        must override it.

        @param clipboard    the data to process, packaged as a Clipboard
        """
        raise RuntimeError("Not Implemented: preprocess()")

    def applyPostprocess(self, queue):
        """
        Run postprocess() on every clipboard in the given queue, in order,
        moving each one to this stage's output queue once it is processed.

        @param queue    the queue instance returned by applyPreprocess()
        """
        while queue.size() > 0:
            # Peek rather than pop so that a failure stage can still reach
            # the clipboard if postprocess() fails.
            current = queue.element()
            self.postprocess(current)

            # Remove it from the queue now that it has been processed.
            queue.getNextDataset()
            self.outputQueue.addDataset(current)

    def postprocess(self, clipboard):
        """
        execute the serial processing that should happen after the parallel
        processing part of the stage.  This base implementation always
        raises RuntimeError; subclasses must override it.

        @param clipboard    the data to process, packaged as a Clipboard
        """
        raise RuntimeError("Not Implemented: postprocess()")
306 
307 
class ParallelProcessing(StageProcessing):
    """
    a container class for the parallel processing part of a pipeline stage.
    This processing will happen in the context of a Slice.
    """

    def __init__(self, policy=None, log=None, eventBroker=None,
                 sysdata=None, callSetup=True):
        """
        initialize this stage.  This will call setup() after all internal
        attributes are initialized unless callSetup is False.

        @param policy       the policy for configuring this stage.  This may
                              be None if this stage does not require a policy
                              for configuration.
        @param log          the log object the stage instance should use.
        @param eventBroker  the name of the host where the event broker is
                              running.  If not provided, an eventBroker will
                              not be available to the stage.
        @param sysdata      a dictionary of data describing the execution
                              context.  The stage uses this information to
                              set some of its internal data.  See
                              StageProcessing documentation for details.
        @param callSetup    if true, the setup() function will be called as
                              the last step in this constructor, after all
                              data is initialized.  Default: True.
        """
        # The parent constructor is told not to call setup(); it is invoked
        # here instead, after the parent's attributes exist.
        StageProcessing.__init__(self, policy, log, eventBroker, sysdata, False)
        if callSetup:
            self.setup()

    def applyProcess(self):
        """
        apply the process() function to data from the input queue.  This
        implementation will take one clipboard from the input queue, call
        process() on it, and post it to the output queue.  While most
        subclasses will inherit this default implementation, some may
        override it to take more control over how much data to process.
        """
        # Don't pop it off yet because failureStage will then not be able to
        # access it; element() gives a reference.
        clipboard = self.inputQueue.element()
        self.process(clipboard)

        # Pop it off at this point; the returned reference is not needed.
        self.inputQueue.getNextDataset()
        self.outputQueue.addDataset(clipboard)

    def process(self, clipboard):
        """
        execute the parallel processing part of the stage within one thread
        (Slice) of the pipeline.  This base implementation always raises
        RuntimeError; subclasses must override it.

        @param clipboard    the data to process, packaged as a Clipboard
        """
        raise RuntimeError("Not Implemented: process()")
364 
365 
class NoOpSerialProcessing(SerialProcessing):
    """
    A SerialProcessing subclass that provides no-op implementations
    of preprocess() and postprocess().

    The default SerialProcessing implementations normally throw Runtime
    exceptions.
    """

    def preprocess(self, clipboard):
        """
        execute the serial processing that should occur before the parallel
        processing part of the stage on the data on the given clipboard.
        This implementation does nothing.

        @param clipboard    the data to process, packaged as a Clipboard
        """
        pass

    def postprocess(self, clipboard):
        """
        execute the serial processing that should happen after the parallel
        processing part of the stage.  This implementation does nothing.

        @param clipboard    the data to process, packaged as a Clipboard
        """
        pass
392 
393 
394 
395 
class NoOpParallelProcessing(ParallelProcessing):
    """
    A ParallelProcessing subclass whose process() does nothing.

    It stands in where a stage has no parallel component: the inherited
    ParallelProcessing.process() raises a RuntimeError, whereas this one
    leaves the clipboard untouched.
    """

    def process(self, clipboard):
        """
        no-op replacement for the parallel processing part of the stage
        within one thread (Slice) of the pipeline.

        @param clipboard    the data to process, packaged as a Clipboard;
                              it is neither read nor modified
        """
        return None
413 
class Stage(object):
    """
    a class that will create and initialize the StageProcessing classes that
    constitute a stage.

    There are three intended ways to create a Stage instance.  First is by
    the makeStageFromPolicy() module function.  Intended for use inside the
    Pipeline-constructing classes (like Pipeline and Slice), this method
    takes as input a policy that provides the names of the SerialProcessing
    and ParallelProcessing classes that define the stage.  Alternatively,
    one can pass these names explicitly via the makeStage() module function.
    The third way is by directly constructing a subclass of Stage.  The
    simplest way to create this subclass is as follows:

        class MyStage(Stage):
            serialClass = MySerialProcessing
            parallelClass = MyParallelProcessing

    If the stage does not have a parallel component, then the 'parallelClass'
    variable does not need to be updated; likewise, for serial component.
    """

    # the classes instantiated by createSerialProcessing() and
    # createParallelProcessing(); the defaults do nothing.
    serialClass = NoOpSerialProcessing
    parallelClass = NoOpParallelProcessing

    def __init__(self, policy, log=None, stageId=-1, eventBroker=None,
                 sysdata=None):
        """
        initialize this stage with the policy that defines the stage and
        some contextual system data.  Applications normally do not directly
        call this constructor.  Instead they either construct a Stage subclass
        or create a Stage instance using makeStage() or makeStageFromPolicy().

        @param policy       the policy that will configure the SerialProcessing
                              and ParallelProcessing
        @param log          the log object the stage instance should use.
                              If not provided, a child of the default log
                              named "stage" is created.
        @param stageId      accepted for interface compatibility; it is
                              not used by this constructor.  Supply a
                              "stageId" item in sysdata instead.
        @param eventBroker  the name of the host where the event broker is
                              running.  If not provided, an eventBroker will
                              not be available to the stage.
        @param sysdata      a dictionary of data describing the execution
                              context.  The stage components use this
                              information to set some of their internal
                              data.  See StageProcessing documentation for
                              details.  A shallow copy is kept so that
                              updateSysProperty() never alters the caller's
                              dictionary.
        """
        if sysdata is None:
            sysdata = {}
        self.sysdata = sysdata.copy()
        self.stagePolicy = policy
        self.eventBroker = eventBroker
        if log is None:
            log = Log(Log.getDefaultLog(), "stage")
        self.log = log

    def setLog(self, log):
        """
        set the logger that should be assigned to the stage components when
        they are created.
        """
        self.log = log

    def setEventBroker(self, broker):
        """
        set the instance of the event broker that the stage components should
        use.
        """
        self.eventBroker = broker

    def updateSysProperty(self, sysdata):
        """
        override the current execution context data that will be passed to
        the stage components when they are created with the given data.
        Currently set data with names that are not included in the given
        dictionary will not be affected.

        @param sysdata   a dictionary of items to merge in; None or an empty
                           dictionary leaves the current data unchanged.
        """
        if not sysdata:
            return
        self.sysdata.update(sysdata)

    def getSysProperty(self, name):
        """
        return the currently set value for the execution context property or
        None if it is not set.
        @param name   the name of the property
        """
        return self.sysdata.get(name, None)

    def createSerialProcessing(self, log=None, extraSysdata=None):
        """
        create a new SerialProcessing instance (an instance of serialClass).
        It is created with a rank of -1.

        All of the parameters are optional.  If not provided, they will be
        set to the values provided by the constructor.
        @param log          the log object the stage instance should use.
                              If not provided, this Stage's log is used.
        @param extraSysdata a dictionary of execution context data whose items
                              will override those set via the Stage constructor
        """
        if log is None:
            log = self.log
        return self._create(self.serialClass, -1, log, extraSysdata)

    def createParallelProcessing(self, rank=1, log=None, extraSysdata=None):
        """
        create a new ParallelProcessing instance (an instance of
        parallelClass).

        All of the parameters are optional.  If not provided, they will be
        set to the values provided by the constructor (except for rank).
        @param rank         the integer identifier for the parallel thread
                              that hosts this instance.  Default=1.
        @param log          the log object the stage instance should use.
                              If not provided, this Stage's log is used.
        @param extraSysdata a dictionary of execution context data whose items
                              will override those set via the Stage constructor
        """
        if not log:
            log = self.log
        return self._create(self.parallelClass, rank, log, extraSysdata)

    def _create(self, cls, rank, log, extraSysdata):
        """
        instantiate a stage component class.

        @param cls          the class to instantiate; it is called as
                              cls(policy, log, eventBroker, sysdata)
        @param rank         the value stored as the "rank" item of the
                              sysdata handed to the new instance; it takes
                              precedence over any "rank" item in the Stage's
                              own data or in extraSysdata
        @param log          the log to pass on; None selects this Stage's log
        @param extraSysdata a dictionary whose items override this Stage's
                              execution context data, or None for no overrides
        """
        # work on a copy so that per-instance overrides never leak back into
        # the Stage's own data
        sysdata = self.sysdata.copy()

        # extraSysdata defaults to None in both public creator methods, so
        # it must be guarded before merging
        if extraSysdata:
            sysdata.update(extraSysdata)

        # set the rank into the system data
        sysdata["rank"] = rank

        if log is None:
            log = self.log

        return cls(self.stagePolicy, log, self.eventBroker, sysdata)
557 
558 def _createClass(name):
559  (modn, cln) = name.rsplit('.', 1)
560  mod = __import__(modn, globals(), locals(), [cln], -1)
561  stageClass = getattr(mod, cln)
562  return stageClass
563 
def makeStageFromPolicy(stageDefPolicy, log=None, eventBroker=None,
                        sysdata=None):
    """
    create a Stage instance from a "stage definition policy".  This policy
    corresponds to the "appStage" parameter in a pipeline policy file that
    defines a pipeline (described below).  The resulting instance is used
    as the factory for creating instances of SerialProcessing and
    ParallelProcessing.

    The stage definition policy (the "appStage" parameter) has the following
    contents:

      name            a short name identifying the stage (e.g. with logs)
      serialClass     a string containing the fully qualified
                         SerialProcessing class name.  If the stage does not
                         have a serial component, this parameter is not
                         provided.
      parallelClass   a string containing the fully qualified
                         ParallelProcessing class name.  If the stage does not
                         have a parallel component, this parameter is not
                         provided.
      eventTopic      the name of an event topic that the stage expects to
                         be received and placed on the clipboard.  If no
                         event is expected, this parameter is not provided.
                         (It is not read by this function.)
      stagePolicy     the policy that configures the stage and thus should
                         be passed to the SerialProcessing and
                         ParallelProcessing constructors.

    The stage name is resolved in this order: a "name" item already present
    in sysdata, then the policy's "name" parameter, then "unknown stage".

    @param stageDefPolicy   the policy for defining this stage (see above).
    @param log          the log object the stage instance should use.
                          If not provided, a default will be
                          used unless one is provided via setLog() on the
                          returned Stage instance.
    @param eventBroker  the name of the host where the event broker is
                          running.  If not provided, an eventBroker will
                          not be available to the stage unless setEventBroker()
                          is called on the returned Stage instance.
    @param sysdata      a dictionary of data describing the execution
                          context.  The stage uses this information to
                          set some of its internal data.  It is not
                          modified by this function.
    @return  the value returned by makeStage() for the resolved settings
    """
    if sysdata is None:
        sysdata = {}
    mysysdata = sysdata

    if "name" not in mysysdata:
        # add the name to a copy so the caller's dictionary is left alone
        mysysdata = sysdata.copy()
        if stageDefPolicy.exists("name"):
            mysysdata["name"] = stageDefPolicy.get("name")
        else:
            mysysdata["name"] = "unknown stage"

    stageConfigPolicy = stageDefPolicy.get("stagePolicy")

    # a class name left as None tells makeStage() to keep the Stage default
    serialClass = None
    parallelClass = None
    if stageDefPolicy.exists("serialClass"):
        serialClass = stageDefPolicy.get("serialClass")
    if stageDefPolicy.exists("parallelClass"):
        parallelClass = stageDefPolicy.get("parallelClass")

    return makeStage(stageConfigPolicy, serialClass, parallelClass,
                     log, eventBroker, mysysdata)
627 
628 
def makeStage(policy=None, serClsName=None, paraClsName=None, log=None,
              eventBroker=None, sysdata=None):
    """
    create a Stage instance that is defined by a SerialProcessing class
    and/or a ParallelProcessing class.
    @param policy       the stage configuration policy.  This policy will be
                          passed to the SerialProcessing and ParallelProcessing
                          class constructors.
    @param serClsName   the fully qualified name of the SerialProcessing
                          class.  If not provided, the Stage default is kept.
    @param paraClsName  the fully qualified name of the ParallelProcessing
                          class.  If not provided, the Stage default is kept.
    @param log          the log object the stage instance should use.
                          If not provided, a default will be
                          used unless one is provided via setLog() on the
                          returned Stage instance.
    @param eventBroker  the name of the host where the event broker is
                          running.  If not provided, an eventBroker will
                          not be available to the stage unless setEventBroker()
                          is called on the returned Stage instance.
    @param sysdata      a dictionary of data describing the execution
                          context.  The stage uses this information to
                          set some of its internal data.
    @return  the configured Stage instance
    @throws ValueError  if a named class is not a subclass of
                          SerialProcessing or ParallelProcessing, respectively
    """
    # Stage.__init__ takes (policy, log, stageId, eventBroker, sysdata), so
    # eventBroker and sysdata must be passed by keyword; positionally they
    # would land in the stageId and eventBroker slots.
    out = Stage(policy, log, eventBroker=eventBroker, sysdata=sysdata)

    # the classes are set on the instance only, leaving the Stage class
    # defaults untouched
    if serClsName:
        out.serialClass = _createClass(serClsName)
        if not issubclass(out.serialClass, SerialProcessing):
            raise ValueError("Not a SerialProcessing subclass: " + serClsName)
    if paraClsName:
        out.parallelClass = _createClass(paraClsName)
        if not issubclass(out.parallelClass, ParallelProcessing):
            raise ValueError("Not a ParallelProcessing subclass: "+paraClsName)

    return out
a place to record messages and descriptions of the state of processing.
Definition: Log.h:154