LSSTApplications  17.0+120,17.0+13,17.0+70,18.0.0+34,18.0.0+75,18.0.0-4-g68ffd23+3,18.1.0-1-g0001055+11,18.1.0-1-g03d53ef+4,18.1.0-1-g1349e88+50,18.1.0-1-g2505f39+40,18.1.0-1-g5315e5e+3,18.1.0-1-g5e4b7ea+13,18.1.0-1-g7e8fceb+3,18.1.0-1-g85f8cd4+43,18.1.0-1-g8ff0b9f+2,18.1.0-1-ga2c679d,18.1.0-1-gd55f500+31,18.1.0-13-gfe4edf0b+6,18.1.0-14-g259bd21+15,18.1.0-17-gf19619b+1,18.1.0-2-g5f9922c+19,18.1.0-2-gd3b74e5+8,18.1.0-2-gfbf3545+27,18.1.0-24-ged780bc+4,18.1.0-25-g75534f69+1,18.1.0-28-ge996dbe42+1,18.1.0-3-g52aa583+22,18.1.0-3-g8ea57af+4,18.1.0-3-gb69f684+36,18.1.0-3-gfcaddf3+1,18.1.0-4-gf3f9b77+1,18.1.0-5-g1dd662b+1,18.1.0-5-g6dbcb01+36,18.1.0-6-gae77429+2,18.1.0-7-g9d75d83+4,18.1.0-7-gae09a6d+24,18.1.0-8-gc69d46e+22,18.1.0-9-g1af92ce+4,18.1.0-9-gee19f03+8,w.2019.44
LSSTDataManagementBasePackage
pipelineBuilder.py
Go to the documentation of this file.
1 # This file is part of pipe_base.
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (http://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 
22 """Module defining PipelineBuilder class and related methods.
23 """
24 
25 __all__ = ["PipelineBuilder"]
26 
27 # -------------------------------
28 # Imports of standard modules --
29 # -------------------------------
30 import logging
31 
32 # -----------------------------
33 # Imports for other modules --
34 # -----------------------------
35 from .configOverrides import ConfigOverrides
36 from .pipeline import Pipeline, TaskDef
37 from . import pipeTools
38 
39 # ----------------------------------
40 # Local non-exported definitions --
41 # ----------------------------------
42 
43 _LOG = logging.getLogger(__name__.partition(".")[2])
44 
45 # ------------------------
46 # Exported definitions --
47 # ------------------------
48 
49 
51  """PipelineBuilder class is responsible for building task pipeline.
52 
53  The class provides a set of methods to manipulate pipeline by adding,
54  deleting, re-ordering tasks in pipeline and changing their labels or
55  configuration.
56 
57  Parameters
58  ----------
59  taskFactory : `TaskFactory`
60  Factory object used to load/instantiate PipelineTasks
61  pipeline : `Pipeline`, optional
62  Initial pipeline to be modified, if `None` then new empty pipeline
63  will be created.
64  """
65  def __init__(self, taskFactory, pipeline=None):
66  if pipeline is None:
67  pipeline = Pipeline()
68  self._taskFactory = taskFactory
69  self._pipeline = pipeline
70 
71  def pipeline(self, ordered=False):
72  """Return updated pipeline instance.
73 
74  Pipeline will be checked for possible inconsistencies before
75  returning.
76 
77  Parameters
78  ----------
79  ordered : `bool`, optional
80  If `True` then order resulting pipeline according to Task data
81  dependencies.
82 
83  Returns
84  -------
85  pipeline : `Pipeline`
86 
87  Raises
88  ------
89  Exception
90  Raised if any inconsistencies are detected in pipeline definition,
91  see `pipeTools.orderPipeline` for list of exception types.
92  """
93  for taskDef in self._pipeline:
94  taskDef.connections = taskDef.config.connections.ConnectionsClass(config=taskDef.config)
95 
96  # conditionally re-order pipeline if requested, but unconditionally
97  # check for possible errors
98  orderedPipeline = pipeTools.orderPipeline(self._pipeline, self._taskFactory)
99  if ordered:
100  return orderedPipeline
101  else:
102  return self._pipeline
103 
104  def addTask(self, taskName, label=None):
105  """Append new task to a pipeline.
106 
107  Parameters
108  ----------
109  taskName : `str`
110  Name of the new task, can be either full class name including
111  package and module, or just a class name to be searched in
112  known packages and modules.
113  label : `str`, optional
114  Label for new task, if `None` then task class name is used as
115  label.
116  """
117  # load task class, will throw on errors
118  taskClass, taskName = self._taskFactory.loadTaskClass(taskName)
119 
120  # get label and check that it is unique
121  if not label:
122  label = taskName.rpartition('.')[2]
123  if self._pipeline.labelIndex(label) >= 0:
124  raise LookupError("Task label (or name) is not unique: " + label)
125 
126  # make config instance with defaults
127  config = taskClass.ConfigClass()
128 
129  self._pipeline.append(TaskDef(taskName=taskName, config=config,
130  taskClass=taskClass, label=label))
131 
132  def deleteTask(self, label):
133  """Remove task from a pipeline.
134 
135  Parameters
136  ----------
137  label : `str`
138  Label of the task to remove.
139  """
140  idx = self._pipeline.labelIndex(label)
141  if idx < 0:
142  raise LookupError("Task label is not found: " + label)
143  del self._pipeline[idx]
144 
145  def moveTask(self, label, newIndex):
146  """Move task to a new position in a pipeline.
147 
148  Parameters
149  ----------
150  label : `str`
151  Label of the task to move.
152  newIndex : `int`
153  New position.
154  """
155  idx = self._pipeline.labelIndex(label)
156  if idx < 0:
157  raise LookupError("Task label is not found: " + label)
158  self._pipeline.insert(newIndex, self._pipeline.pop(idx))
159 
160  def labelTask(self, label, newLabel):
161  """Change task label.
162 
163  Parameters
164  ----------
165  label : `str`
166  Existing label of the task.
167  newLabel : `str`
168  New label of the task.
169  """
170  idx = self._pipeline.labelIndex(label)
171  if idx < 0:
172  raise LookupError("Task label is not found: " + label)
173  # check that new one is unique
174  if newLabel != label and self._pipeline.labelIndex(newLabel) >= 0:
175  raise LookupError("New task label is not unique: " + label)
176  self._pipeline[idx].label = newLabel
177 
178  def configOverride(self, label, value):
179  """Apply single config override.
180 
181  Parameters
182  ----------
183  label : `str`
184  Label of the task.
185  value : `str`
186  String in the form ``"param=value"`` or ``"parm.subpar=value"``,
187  ``value`` can be a Python constant or a list of constants.
188  """
189  idx = self._pipeline.labelIndex(label)
190  if idx < 0:
191  raise LookupError("Task label is not found: " + label)
192  key, sep, val = value.partition('=')
193  overrides = ConfigOverrides()
194  overrides.addValueOverride(key, val)
195  overrides.applyTo(self._pipeline[idx].config)
196 
197  def configOverrideFile(self, label, path):
198  """Apply overrides from file.
199 
200  Parameters
201  ----------
202  label : `str`
203  Label of the task.
204  path : `str`
205  Path to file with overrides.
206  """
207  idx = self._pipeline.labelIndex(label)
208  if idx < 0:
209  raise LookupError("Task label is not found: " + label)
210  overrides = ConfigOverrides()
211  overrides.addFileOverride(path)
212  overrides.applyTo(self._pipeline[idx].config)
def addTask(self, taskName, label=None)
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
Definition: functional.cc:33
def __init__(self, taskFactory, pipeline=None)