LSSTApplications  18.1.0
LSSTDataManagementBasePackage
pipelineBuilder.py
Go to the documentation of this file.
1 # This file is part of pipe_base.
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (http://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 
22 """Module defining PipelineBuilder class and related methods.
23 """
24 
# Only the builder class is exported as this module's public API.
__all__ = ["PipelineBuilder"]
26 
27 # -------------------------------
28 # Imports of standard modules --
29 # -------------------------------
30 import logging
31 
32 # -----------------------------
33 # Imports for other modules --
34 # -----------------------------
35 from .configOverrides import ConfigOverrides
36 from .pipeline import Pipeline, TaskDef
37 from . import pipeTools
38 
39 # ----------------------------------
40 # Local non-exported definitions --
41 # ----------------------------------
42 
# Module-level logger. Its name is the module's dotted name with everything
# up to and including the first "." removed (an empty string when __name__
# contains no dot). Not referenced by any code in this module as shown.
_LOG = logging.getLogger(__name__.partition(".")[2])
44 
45 # ------------------------
46 # Exported definitions --
47 # ------------------------
48 
49 
"""Build a task pipeline incrementally.

The class provides methods to manipulate a pipeline by adding,
deleting and re-ordering tasks, and by changing their labels or
configuration.

Parameters
----------
taskFactory : `TaskFactory`
    Factory object used to load/instantiate PipelineTasks.
pipeline : `Pipeline`, optional
    Initial pipeline to be modified; if `None`, a new empty pipeline
    is created.
"""
def __init__(self, taskFactory, pipeline=None):
    """Create a builder around an optional starting pipeline.

    Parameters
    ----------
    taskFactory : `TaskFactory`
        Factory used to load PipelineTask classes by name.
    pipeline : `Pipeline`, optional
        Pipeline to start from; a fresh empty `Pipeline` is created when
        `None` is given.
    """
    initial = Pipeline() if pipeline is None else pipeline
    self._taskFactory = taskFactory
    self._pipeline = initial
70 
def pipeline(self, ordered=False):
    """Return the pipeline being built.

    The pipeline is always passed through `pipeTools.orderPipeline`, so
    any inconsistency that function detects is raised even when
    ``ordered`` is false.

    Parameters
    ----------
    ordered : `bool`, optional
        If `True`, return the pipeline ordered according to task data
        dependencies (as produced by `pipeTools.orderPipeline`);
        otherwise return the pipeline in its current order.

    Returns
    -------
    pipeline : `Pipeline`

    Raises
    ------
    Exception
        Propagated from `pipeTools.orderPipeline` when the pipeline
        definition is inconsistent; see that function for the specific
        exception types.
    """
    # Ordering doubles as validation, so it runs unconditionally.
    sortedPipeline = pipeTools.orderPipeline(self._pipeline, self._taskFactory)
    return sortedPipeline if ordered else self._pipeline
100 
def addTask(self, taskName, label=None):
    """Append a task to the end of the pipeline.

    Parameters
    ----------
    taskName : `str`
        Task class name: either fully qualified (package and module
        included) or a bare class name that the task factory resolves.
    label : `str`, optional
        Label for the new task. When `None` (or any false value), the
        last dot-separated component of the resolved task name is used.

    Raises
    ------
    LookupError
        If the chosen label is already used by a task in the pipeline.
    """
    # The factory is expected to raise on unknown names; it also returns
    # the resolved (possibly fully qualified) task name.
    taskClass, resolvedName = self._taskFactory.loadTaskClass(taskName)

    taskLabel = label or resolvedName.rpartition('.')[2]
    if self._pipeline.labelIndex(taskLabel) >= 0:
        raise LookupError("Task label (or name) is not unique: " + taskLabel)

    # New tasks start from their class's default configuration.
    taskDef = TaskDef(taskName=resolvedName, config=taskClass.ConfigClass(),
                      taskClass=taskClass, label=taskLabel)
    self._pipeline.append(taskDef)
128 
def deleteTask(self, label):
    """Remove the task with the given label from the pipeline.

    Parameters
    ----------
    label : `str`
        Label of the task to remove.

    Raises
    ------
    LookupError
        If no task in the pipeline has this label.
    """
    position = self._pipeline.labelIndex(label)
    if position >= 0:
        del self._pipeline[position]
        return
    raise LookupError("Task label is not found: " + label)
141 
def moveTask(self, label, newIndex):
    """Relocate a task within the pipeline.

    The task is removed from its current position first, and
    ``newIndex`` is then passed to the pipeline's ``insert``; it
    therefore refers to a position in the pipeline without the task.

    Parameters
    ----------
    label : `str`
        Label of the task to move.
    newIndex : `int`
        New position of the task.

    Raises
    ------
    LookupError
        If no task in the pipeline has this label.
    """
    src = self._pipeline.labelIndex(label)
    if src < 0:
        raise LookupError("Task label is not found: " + label)
    task = self._pipeline.pop(src)
    self._pipeline.insert(newIndex, task)
156 
def labelTask(self, label, newLabel):
    """Change the label of an existing task.

    Parameters
    ----------
    label : `str`
        Current label of the task.
    newLabel : `str`
        Label to assign. Passing the task's current label is allowed.

    Raises
    ------
    LookupError
        If no task has ``label``, or if ``newLabel`` is already used by
        a different task.
    """
    idx = self._pipeline.labelIndex(label)
    if idx < 0:
        raise LookupError("Task label is not found: " + label)
    # Uniqueness only matters when the label actually changes.
    if newLabel != label and self._pipeline.labelIndex(newLabel) >= 0:
        # Report the conflicting *new* label; the old one is not the problem.
        raise LookupError("New task label is not unique: " + newLabel)
    self._pipeline[idx].label = newLabel
174 
def configOverride(self, label, value):
    """Apply a single config override to a task.

    Parameters
    ----------
    label : `str`
        Label of the task.
    value : `str`
        String in the form ``"param=value"`` or ``"param.subpar=value"``;
        the part after the first ``=`` can be a Python constant or a list
        of constants.

    Raises
    ------
    LookupError
        If no task has ``label``.
    ValueError
        If ``value`` has no ``=`` separator or an empty parameter name.
    """
    idx = self._pipeline.labelIndex(label)
    if idx < 0:
        raise LookupError("Task label is not found: " + label)
    key, sep, val = value.partition('=')
    # Without this check a malformed string such as "param" would be
    # recorded as an override of "param" with an empty value.
    if not sep or not key:
        raise ValueError(
            "Config override must have the form 'param=value', got: " + repr(value))
    overrides = ConfigOverrides()
    overrides.addValueOverride(key, val)
    overrides.applyTo(self._pipeline[idx].config)
193 
def configOverrideFile(self, label, path):
    """Apply config overrides read from a file to a task.

    Parameters
    ----------
    label : `str`
        Label of the task whose config is modified.
    path : `str`
        Path to the file containing the overrides.

    Raises
    ------
    LookupError
        If no task has ``label``.
    """
    taskIndex = self._pipeline.labelIndex(label)
    if taskIndex < 0:
        raise LookupError("Task label is not found: " + label)

    fromFile = ConfigOverrides()
    fromFile.addFileOverride(path)
    targetConfig = self._pipeline[taskIndex].config
    fromFile.applyTo(targetConfig)
210 
def substituteDatatypeNames(self, label, value):
    """Apply dataset name template substitutions to a task's config.

    Parameters
    ----------
    label : `str`
        Label of the task whose config is modified.
    value : `dict`
        Mapping handed to ``ConfigOverrides.addDatasetNameSubstitution``
        and used in formatting the config's name templates.

    Raises
    ------
    LookupError
        If no task has ``label``.
    """
    where = self._pipeline.labelIndex(label)
    if where < 0:
        raise LookupError("Task label is not found: " + label)

    substitution = ConfigOverrides()
    substitution.addDatasetNameSubstitution(value)
    substitution.applyTo(self._pipeline[where].config)
def addTask(self, taskName, label=None)
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
Definition: functional.cc:33
def __init__(self, taskFactory, pipeline=None)