pipeline.py
1 # This file is part of pipe_base.
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (http://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 from __future__ import annotations
22 
23 """Module defining Pipeline class and related methods.
24 """
25 
26 __all__ = ["Pipeline", "TaskDef", "TaskDatasetTypes", "PipelineDatasetTypes"]
27 
28 # -------------------------------
29 # Imports of standard modules --
30 # -------------------------------
31 from dataclasses import dataclass
32 from types import MappingProxyType
33 from typing import FrozenSet, Mapping, Union, Generator, TYPE_CHECKING
34 
35 import copy
36 
37 # -----------------------------
38 # Imports for other modules --
39 from lsst.daf.butler import DatasetType, Registry, SkyPixDimension
40 from lsst.utils import doImport
41 from .configOverrides import ConfigOverrides
42 from .connections import PipelineTaskConnections, iterConnections
43 from .pipelineTask import PipelineTask
44 
45 from . import pipelineIR
46 from . import pipeTools
47 
48 if TYPE_CHECKING: # Imports needed only for type annotations; may be circular.
49  from lsst.obs.base.instrument import Instrument
50 
51 # ----------------------------------
52 # Local non-exported definitions --
53 # ----------------------------------
54 
55 # ------------------------
56 # Exported definitions --
57 # ------------------------
58 
59 
60 class TaskDef:
61  """TaskDef is a collection of information about task needed by Pipeline.
62 
63  The information includes task name, configuration object and optional
64  task class. This class is just a collection of attributes and it exposes
65  all of them so that attributes could potentially be modified in place
66  (e.g. if configuration needs extra overrides).
67 
68  Attributes
69  ----------
70  taskName : `str`
71  `PipelineTask` class name; it is currently not specified whether this
72  is a fully-qualified or a partial name (e.g. ``module.TaskClass``).
73  The framework should be prepared to handle both cases.
74  config : `lsst.pex.config.Config`
75  Instance of the configuration class corresponding to this task class,
76  usually with all overrides applied.
77  taskClass : `type` or ``None``
78  `PipelineTask` class object; may be ``None``. If ``None``, the
79  framework will have to locate and load the class.
80  label : `str`, optional
81  Task label, usually a short string unique in a pipeline.
82  """
83  def __init__(self, taskName, config, taskClass=None, label=""):
84  self.taskName = taskName
85  self.config = config
86  self.taskClass = taskClass
87  self.label = label
88  self.connections = config.connections.ConnectionsClass(config=config)
89 
90  def __str__(self):
91  rep = "TaskDef(" + self.taskName
92  if self.label:
93  rep += ", label=" + self.label
94  rep += ")"
95  return rep
96 
97 
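# Illustrative sketch (not part of the original source): a TaskDef is normally
# produced by Pipeline.toExpandedPipeline() rather than built by hand, but it
# can be constructed directly from any PipelineTask subclass.  "MyTask" and
# "mypackage" below are hypothetical names used only for illustration.
#
#     config = MyTask.ConfigClass()
#     taskDef = TaskDef(taskName="mypackage.MyTask", config=config,
#                       taskClass=MyTask, label="myTask")
#     print(taskDef)           # TaskDef(mypackage.MyTask, label=myTask)
#     taskDef.connections      # PipelineTaskConnections built from the config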
98 class Pipeline:
99  """A `Pipeline` is a representation of a series of tasks to run, and the
100  configuration for those tasks.
101 
102  Parameters
103  ----------
104  description : `str`
105  A description of what this pipeline does.
106  """
107  def __init__(self, description: str):
108  pipeline_dict = {"description": description, "tasks": {}}
109  self._pipelineIR = pipelineIR.PipelineIR(pipeline_dict)
110 
111  @classmethod
112  def fromFile(cls, filename: str) -> Pipeline:
113  """Load a pipeline defined in a pipeline yaml file.
114 
115  Parameters
116  ----------
117  filename: `str`
118  A path that points to a pipeline defined in YAML format.
119 
120  Returns
121  -------
122  pipeline: `Pipeline`
123  """
124  pipeline = cls.fromIR(pipelineIR.PipelineIR.from_file(filename))
125  return pipeline
126 
127  @classmethod
128  def fromString(cls, pipeline_string: str) -> Pipeline:
129  """Create a pipeline from string formatted as a pipeline document.
130 
131  Parameters
132  ----------
133  pipeline_string : `str`
134  A string formatted like a pipeline document.
135 
136  Returns
137  -------
138  pipeline: `Pipeline`
139  """
140  pipeline = cls.fromIR(pipelineIR.PipelineIR.from_string(pipeline_string))
141  return pipeline
142 
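# Illustrative sketch (not part of the original source): loading a pipeline
# from a YAML document.  The YAML below assumes the simple label -> task-class
# mapping form understood by PipelineIR; the task class, label and file name
# are only examples.
#
#     pipeline_str = """
#     description: A demo pipeline
#     tasks:
#       isr: lsst.ip.isr.IsrTask
#     """
#     pipeline = Pipeline.fromString(pipeline_str)
#     # or, equivalently, from a file on disk:
#     # pipeline = Pipeline.fromFile("demo_pipeline.yaml")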
143  @classmethod
144  def fromIR(cls, deserialized_pipeline: pipelineIR.PipelineIR) -> Pipeline:
145  """Create a pipeline from an already created `PipelineIR` object.
146 
147  Parameters
148  ----------
149  deserialized_pipeline: `PipelineIR`
150  An already created pipeline intermediate representation object
151 
152  Returns
153  -------
154  pipeline: `Pipeline`
155  """
156  pipeline = cls.__new__(cls)
157  pipeline._pipelineIR = deserialized_pipeline
158  return pipeline
159 
160  @classmethod
161  def fromPipeline(cls, pipeline: Pipeline) -> Pipeline:
162  """Create a new pipeline by copying an already existing `Pipeline`.
163 
164  Parameters
165  ----------
166  pipeline: `Pipeline`
167  An already created `Pipeline` object to copy.
168 
169  Returns
170  -------
171  pipeline: `Pipeline`
172  """
173  return cls.fromIR(copy.deepcopy(pipeline._pipelineIR))
174 
175  def __str__(self) -> str:
176  return str(self._pipelineIR)
177 
178  def addInstrument(self, instrument: Union[Instrument, str]):
179  """Add an instrument to the pipeline, or replace an instrument that is
180  already defined.
181 
182  Parameters
183  ----------
184  instrument : `~lsst.obs.base.instrument.Instrument` or `str`
185  Either a subclass of `lsst.obs.base.instrument.Instrument` or a
186  string corresponding to a fully qualified
187  `lsst.obs.base.instrument.Instrument` name.
188  """
189  if isinstance(instrument, str):
190  pass
191  else:
192  # TODO: assume that this is a subclass of Instrument, no type checking
193  instrument = f"{instrument.__module__}.{instrument.__qualname__}"
194  self._pipelineIR.instrument = instrument
195 
196  def addTask(self, task: Union[PipelineTask, str], label: str):
197  """Add a new task to the pipeline, or replace a task that is already
198  associated with the supplied label.
199 
200  Parameters
201  ----------
202  task: `PipelineTask` or `str`
203  Either a derived class object of a `PipelineTask` or a string
204  corresponding to a fully qualified `PipelineTask` name.
205  label: `str`
206  A label that is used to identify the `PipelineTask` being added
207  """
208  if isinstance(task, str):
209  taskName = task
210  elif issubclass(task, PipelineTask):
211  taskName = f"{task.__module__}.{task.__qualname__}"
212  else:
213  raise ValueError("task must be either a child class of PipelineTask or a string containing"
214  " a fully qualified name to one")
215  self._pipelineIR.tasks[label] = pipelineIR.TaskIR(label, taskName)
216 
217  def removeTask(self, label: str):
218  """Remove a task from the pipeline.
219 
220  Parameters
221  ----------
222  label : `str`
223  The label used to identify the task that is to be removed
224 
225  Raises
226  ------
227  KeyError
228  If no task with that label exists in the pipeline
229 
230  """
231  self._pipelineIR.tasks.pop(label)
232 
233  def addConfigOverride(self, label: str, key: str, value: object):
234  """Apply single config override.
235 
236  Parameters
237  ----------
238  label : `str`
239  Label of the task.
240  key: `str`
241  Fully-qualified field name.
242  value : object
243  Value to be given to a field.
244  """
245  self._addConfigImpl(label, pipelineIR.ConfigIR(rest={key: value}))
246 
247  def addConfigFile(self, label: str, filename: str):
248  """Add overrides from a specified file.
249 
250  Parameters
251  ----------
252  label : `str`
253  The label used to identify the task associated with config to
254  modify
255  filename : `str`
256  Path to the override file.
257  """
258  self._addConfigImpl(label, pipelineIR.ConfigIR(file=[filename]))
259 
260  def addConfigPython(self, label: str, pythonString: str):
261  """Add Overrides by running a snippet of python code against a config.
262 
263  Parameters
264  ----------
265  label : `str`
266  The label used to identify the task associated with config to
267  modify.
268  pythonString: `str`
269  A string which is valid Python code to be executed. This is done
270  with ``config`` as the only locally accessible value.
271  """
272  self._addConfigImpl(label, pipelineIR.ConfigIR(python=pythonString))
273 
274  def _addConfigImpl(self, label: str, newConfig: pipelineIR.ConfigIR):
275  if label not in self._pipelineIR.tasks:
276  raise LookupError(f"There are no tasks labeled '{label}' in the pipeline")
277  self._pipelineIR.tasks[label].add_or_update_config(newConfig)
278 
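# Illustrative sketch (not part of the original source): the three override
# mechanisms applied to a task labeled "isr" (label, field names and paths are
# hypothetical).
#
#     pipeline.addConfigOverride("isr", "doDark", False)          # single field
#     pipeline.addConfigFile("isr", "/path/to/isrOverrides.py")   # override file
#     pipeline.addConfigPython("isr", "config.doFlat = False")    # python snippet
#
# Each call is stored as a ConfigIR block on the task and is only applied to
# the task's Config when toExpandedPipeline() is called.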
279  def toFile(self, filename: str):
280  self._pipelineIR.to_file(filename)
281 
282  def toExpandedPipeline(self) -> Generator[TaskDef, None, None]:
283  """Returns a generator of TaskDefs which can be used to create quantum
284  graphs.
285 
286  Returns
287  -------
288  generator : generator of `TaskDef`
289  The generator returned will be the sorted iterator of tasks which
290  are to be used in constructing a quantum graph.
291 
292  Raises
293  ------
294  NotImplementedError
295  If a dataId is supplied in a config block. This is in place for
296  future use.
297  """
298  taskDefs = []
299  for label, taskIR in self._pipelineIR.tasks.items():
300  taskClass = doImport(taskIR.klass)
301  taskName = taskClass.__qualname__
302  config = taskClass.ConfigClass()
303  overrides = ConfigOverrides()
304  if self._pipelineIR.instrument is not None:
305  overrides.addInstrumentOverride(self._pipelineIR.instrument, taskClass._DefaultName)
306  if taskIR.config is not None:
307  for configIR in taskIR.config:
308  if configIR.dataId is not None:
309  raise NotImplementedError("Specializing a config on a partial data id is not yet "
310  "supported in Pipeline definition")
311  # only apply override if it applies to everything
312  if configIR.dataId is None:
313  if configIR.file:
314  for configFile in configIR.file:
315  overrides.addFileOverride(configFile)
316  if configIR.python is not None:
317  overrides.addPythonOverride(configIR.python)
318  for key, value in configIR.rest.items():
319  overrides.addValueOverride(key, value)
320  overrides.applyTo(config)
321  # This may need to be revisited
322  config.validate()
323  taskDefs.append(TaskDef(taskName=taskName, config=config, taskClass=taskClass, label=label))
324 
325  # let's evaluate the contracts
326  if self._pipelineIR.contracts is not None:
327  label_to_config = {x.label: x.config for x in taskDefs}
328  for contract in self._pipelineIR.contracts:
329  # execute this in its own line so it can raise a good error message if there were problems
330  # with the eval
331  success = eval(contract.contract, None, label_to_config)
332  if not success:
333  extra_info = f": {contract.msg}" if contract.msg is not None else ""
334  raise pipelineIR.ContractError(f"Contract(s) '{contract.contract}' were not "
335  f"satisfied{extra_info}")
336 
337  yield from pipeTools.orderPipeline(taskDefs)
338 
339  def __len__(self):
340  return len(self._pipelineIR.tasks)
341 
342  def __eq__(self, other: "Pipeline"):
343  if not isinstance(other, Pipeline):
344  return False
345  return self._pipelineIR == other._pipelineIR
346 
347 
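# Illustrative sketch (not part of the original source): building a pipeline
# programmatically and expanding it into ordered TaskDefs.  The instrument and
# task names, the label, and the output file name are hypothetical.
#
#     pipeline = Pipeline("A demo pipeline")
#     pipeline.addInstrument("lsst.obs.subaru.HyperSuprimeCam")
#     pipeline.addTask("lsst.ip.isr.IsrTask", label="isr")
#     pipeline.addConfigOverride("isr", "doDark", False)
#     for taskDef in pipeline.toExpandedPipeline():
#         print(taskDef.label, taskDef.taskName)
#     pipeline.toFile("demo_pipeline.yaml")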
348 @dataclass(frozen=True)
349 class TaskDatasetTypes:
350  """An immutable struct that extracts and classifies the dataset types used
351  by a `PipelineTask`
352  """
353 
354  initInputs: FrozenSet[DatasetType]
355  """Dataset types that are needed as inputs in order to construct this Task.
356 
357  Task-level `initInputs` may be classified as either
358  `~PipelineDatasetTypes.initInputs` or
359  `~PipelineDatasetTypes.initIntermediates` at the Pipeline level.
360  """
361 
362  initOutputs: FrozenSet[DatasetType]
363  """Dataset types that may be written after constructing this Task.
364 
365  Task-level `initOutputs` may be classified as either
366  `~PipelineDatasetTypes.initOutputs` or
367  `~PipelineDatasetTypes.initIntermediates` at the Pipeline level.
368  """
369 
370  inputs: FrozenSet[DatasetType]
371  """Dataset types that are regular inputs to this Task.
372 
373  If an input dataset needed for a Quantum cannot be found in the input
374  collection(s) or produced by another Task in the Pipeline, that Quantum
375  (and all dependent Quanta) will not be produced.
376 
377  Task-level `inputs` may be classified as either
378  `~PipelineDatasetTypes.inputs` or `~PipelineDatasetTypes.intermediates`
379  at the Pipeline level.
380  """
381 
382  prerequisites: FrozenSet[DatasetType]
383  """Dataset types that are prerequisite inputs to this Task.
384 
385  Prerequisite inputs must exist in the input collection(s) before the
386  pipeline is run, but do not constrain the graph - if a prerequisite is
387  missing for a Quantum, `PrerequisiteMissingError` is raised.
388 
389  Prerequisite inputs are not resolved until the second stage of
390  QuantumGraph generation.
391  """
392 
393  outputs: FrozenSet[DatasetType]
394  """Dataset types that are produced by this Task.
395 
396  Task-level `outputs` may be classified as either
397  `~PipelineDatasetTypes.outputs` or `~PipelineDatasetTypes.intermediates`
398  at the Pipeline level.
399  """
400 
401  @classmethod
402  def fromConnections(cls, connectionsInstance: PipelineTaskConnections, *,
403  registry: Registry) -> TaskDatasetTypes:
404  """Extract and classify the dataset types from a single `PipelineTask`.
405 
406  Parameters
407  ----------
408  connectionsInstance: `PipelineTaskConnections`
409  An instance of a `PipelineTaskConnections` class for a particular
410  `PipelineTask`.
411  registry: `Registry`
412  Registry used to construct normalized `DatasetType` objects and
413  retrieve those that are incomplete.
414 
415  Returns
416  -------
417  types: `TaskDatasetTypes`
418  The dataset types used by this task.
419  """
420  def makeDatasetTypesSet(connectionType):
421  """Constructs a set of true `DatasetType` objects
422 
423  Parameters
424  ----------
425  connectionType : `str`
426  Name of the connection type to produce a set for, corresponds
427  to an attribute of type `list` on the connection class instance
428 
429  Returns
430  -------
431  datasetTypes : `frozenset`
432  A set of all datasetTypes which correspond to the input
433  connection type specified in the connection class of this
434  `PipelineTask`
435 
436  Notes
437  -----
438  This function is a closure over the variables ``registry`` and
439  ``connectionsInstance``.
440  """
441  datasetTypes = []
442  for c in iterConnections(connectionsInstance, connectionType):
443  dimensions = set(getattr(c, 'dimensions', set()))
444  if "skypix" in dimensions:
445  try:
446  datasetType = registry.getDatasetType(c.name)
447  except LookupError as err:
448  raise LookupError(
449  f"DatasetType '{c.name}' referenced by "
450  f"{type(connectionsInstance).__name__} uses 'skypix' as a dimension "
451  f"placeholder, but does not already exist in the registry. "
452  f"Note that reference catalog names are now used as the dataset "
453  f"type name instead of 'ref_cat'."
454  ) from err
455  rest1 = set(registry.dimensions.extract(dimensions - set(["skypix"])).names)
456  rest2 = set(dim.name for dim in datasetType.dimensions
457  if not isinstance(dim, SkyPixDimension))
458  if rest1 != rest2:
459  raise ValueError(f"Non-skypix dimensions for dataset type {c.name} declared in "
460  f"connections ({rest1}) are inconsistent with those in "
461  f"registry's version of this dataset ({rest2}).")
462  else:
463  datasetType = DatasetType(c.name, registry.dimensions.extract(dimensions),
464  c.storageClass)
465  datasetTypes.append(datasetType)
466  return frozenset(datasetTypes)
467 
468  return cls(
469  initInputs=makeDatasetTypesSet("initInputs"),
470  initOutputs=makeDatasetTypesSet("initOutputs"),
471  inputs=makeDatasetTypesSet("inputs"),
472  prerequisites=makeDatasetTypesSet("prerequisiteInputs"),
473  outputs=makeDatasetTypesSet("outputs"),
474  )
475 
476 
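# Illustrative sketch (not part of the original source): extracting the dataset
# types of a single task.  Assumes ``taskDef`` came from
# Pipeline.toExpandedPipeline() and ``registry`` is an already-configured
# lsst.daf.butler.Registry.
#
#     types = TaskDatasetTypes.fromConnections(taskDef.connections, registry=registry)
#     types.inputs         # frozenset of DatasetType consumed by the task
#     types.outputs        # frozenset of DatasetType produced by the task
#     types.prerequisites  # inputs that must pre-exist in the input collections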
477 @dataclass(frozen=True)
478 class PipelineDatasetTypes:
479  """An immutable struct that classifies the dataset types used in a
480  `Pipeline`.
481  """
482 
483  initInputs: FrozenSet[DatasetType]
484  """Dataset types that are needed as inputs in order to construct the Tasks
485  in this Pipeline.
486 
487  This does not include dataset types that are produced when constructing
488  other Tasks in the Pipeline (these are classified as `initIntermediates`).
489  """
490 
491  initOutputs: FrozenSet[DatasetType]
492  """Dataset types that may be written after constructing the Tasks in this
493  Pipeline.
494 
495  This does not include dataset types that are also used as inputs when
496  constructing other Tasks in the Pipeline (these are classified as
497  `initIntermediates`).
498  """
499 
500  initIntermediates: FrozenSet[DatasetType]
501  """Dataset types that are both used when constructing one or more Tasks
502  in the Pipeline and produced as a side-effect of constructing another
503  Task in the Pipeline.
504  """
505 
506  inputs: FrozenSet[DatasetType]
507  """Dataset types that are regular inputs for the full pipeline.
508 
509  If an input dataset needed for a Quantum cannot be found in the input
510  collection(s), that Quantum (and all dependent Quanta) will not be
511  produced.
512  """
513 
514  prerequisites: FrozenSet[DatasetType]
515  """Dataset types that are prerequisite inputs for the full Pipeline.
516 
517  Prerequisite inputs must exist in the input collection(s) before the
518  pipeline is run, but do not constrain the graph - if a prerequisite is
519  missing for a Quantum, `PrerequisiteMissingError` is raised.
520 
521  Prerequisite inputs are not resolved until the second stage of
522  QuantumGraph generation.
523  """
524 
525  intermediates: FrozenSet[DatasetType]
526  """Dataset types that are output by one Task in the Pipeline and consumed
527  as inputs by one or more other Tasks in the Pipeline.
528  """
529 
530  outputs: FrozenSet[DatasetType]
531  """Dataset types that are output by a Task in the Pipeline and not consumed
532  by any other Task in the Pipeline.
533  """
534 
535  byTask: Mapping[str, TaskDatasetTypes]
536  """Per-Task dataset types, keyed by label in the `Pipeline`.
537 
538  This is guaranteed to be zip-iterable with the `Pipeline` itself (assuming
539  neither has been modified since the dataset types were extracted, of
540  course).
541  """
542 
543  @classmethod
544  def fromPipeline(cls, pipeline, *, registry: Registry) -> PipelineDatasetTypes:
545  """Extract and classify the dataset types from all tasks in a
546  `Pipeline`.
547 
548  Parameters
549  ----------
550  pipeline: `Pipeline`
551  An ordered collection of tasks that can be run together.
552  registry: `Registry`
553  Registry used to construct normalized `DatasetType` objects and
554  retrieve those that are incomplete.
555 
556  Returns
557  -------
558  types: `PipelineDatasetTypes`
559  The dataset types used by this `Pipeline`.
560 
561  Raises
562  ------
563  ValueError
564  Raised if Tasks are inconsistent about which datasets are marked
565  prerequisite. This indicates that the Tasks cannot be run as part
566  of the same `Pipeline`.
567  """
568  allInputs = set()
569  allOutputs = set()
570  allInitInputs = set()
571  allInitOutputs = set()
572  prerequisites = set()
573  byTask = dict()
574  if isinstance(pipeline, Pipeline):
575  pipeline = pipeline.toExpandedPipeline()
576  for taskDef in pipeline:
577  thisTask = TaskDatasetTypes.fromConnections(taskDef.connections, registry=registry)
578  allInitInputs.update(thisTask.initInputs)
579  allInitOutputs.update(thisTask.initOutputs)
580  allInputs.update(thisTask.inputs)
581  prerequisites.update(thisTask.prerequisites)
582  allOutputs.update(thisTask.outputs)
583  byTask[taskDef.label] = thisTask
584  if not prerequisites.isdisjoint(allInputs):
585  raise ValueError("{} marked as both prerequisites and regular inputs".format(
586  {dt.name for dt in allInputs & prerequisites}
587  ))
588  if not prerequisites.isdisjoint(allOutputs):
589  raise ValueError("{} marked as both prerequisites and outputs".format(
590  {dt.name for dt in allOutputs & prerequisites}
591  ))
592  # Make sure that components which are marked as inputs get treated as
593  # intermediates if there is an output which produces the composite
594  # containing the component
595  intermediateComponents = set()
596  intermediateComposites = set()
597  outputNameMapping = {dsType.name: dsType for dsType in allOutputs}
598  for dsType in allInputs:
599  # get the name of a possible component
600  name, component = dsType.nameAndComponent()
601  # if there is a component name, that means this is a component
602  # DatasetType, if there is an output which produces the parent of
603  # this component, treat this input as an intermediate
604  if component is not None:
605  if name in outputNameMapping and outputNameMapping[name].dimensions == dsType.dimensions:
606  composite = DatasetType(name, dsType.dimensions, outputNameMapping[name].storageClass,
607  universe=registry.dimensions)
608  intermediateComponents.add(dsType)
609  intermediateComposites.add(composite)
610  return cls(
611  initInputs=frozenset(allInitInputs - allInitOutputs),
612  initIntermediates=frozenset(allInitInputs & allInitOutputs),
613  initOutputs=frozenset(allInitOutputs - allInitInputs),
614  inputs=frozenset(allInputs - allOutputs - intermediateComponents),
615  intermediates=frozenset(allInputs & allOutputs | intermediateComponents),
616  outputs=frozenset(allOutputs - allInputs - intermediateComposites),
617  prerequisites=frozenset(prerequisites),
618  byTask=MappingProxyType(byTask), # MappingProxyType -> frozen view of dict for immutability
619  )
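# Illustrative sketch (not part of the original source): classifying dataset
# types across a whole pipeline.  Assumes ``pipeline`` is a Pipeline instance,
# ``registry`` is an already-configured lsst.daf.butler.Registry, and "isr" is
# a hypothetical task label.
#
#     datasetTypes = PipelineDatasetTypes.fromPipeline(pipeline, registry=registry)
#     datasetTypes.inputs         # consumed but never produced by the pipeline
#     datasetTypes.intermediates  # produced by one task and consumed by another
#     datasetTypes.outputs        # produced but not consumed within the pipeline
#     datasetTypes.byTask["isr"]  # per-task TaskDatasetTypes, keyed by label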