LSSTApplications  20.0.0
LSSTDataManagementBasePackage
pipelineIR.py
Go to the documentation of this file.
1 # This file is part of pipe_base.
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (http://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 
22 from dataclasses import dataclass, field
23 from typing import List, Union, Generator
24 
25 import os
26 import yaml
27 import warnings
28 
29 
class ContractError(Exception):
    """Exception raised when the configs in a pipeline fail to satisfy a
    pipeline contract.
    """
34 
35 
@dataclass
class ContractIR:
    """Intermediate representation of a contract read from a pipeline yaml
    file.
    """
    contract: str
    """Python code, held as a string, expressing one or more conditions on
    the configs in a pipeline. Once evaluated, it should be True if the
    configs are fine, and False otherwise.
    """
    msg: Union[str, None] = None
    """Optional message shown to the user when the contract fails.
    """

    def to_primitives(self) -> dict:
        """Convert to a representation used in yaml serialization.

        Returns
        -------
        primitives : `dict`
            Always holds the ``contract`` key; the ``msg`` key is present only
            when a message was supplied.
        """
        primitives = {"contract": self.contract}
        if self.msg is None:
            return primitives
        primitives['msg'] = self.msg
        return primitives

    def __eq__(self, other: "ContractIR"):
        # Anything that is not a ContractIR never compares equal.
        if isinstance(other, ContractIR):
            return bool(self.contract == other.contract and self.msg == other.msg)
        return False
64 
65 
@dataclass
class ConfigIR:
    """Intermediate representation of a block of configuration overrides read
    from a pipeline yaml file.
    """
    python: Union[str, None] = None
    """Python code, held as a string, that modifies a configuration. May be
    None when there is nothing to modify.
    """
    dataId: Union[dict, None] = None
    """A dataId constraining these config overrides to quanta with matching
    dataIds, or None for no constraint. This is currently an unimplemented
    feature, and is placed here for future use.
    """
    file: List[str] = field(default_factory=list)
    """Paths of files containing config overrides to apply. May be an empty
    list when there are no override files.
    """
    rest: dict = field(default_factory=dict)
    """Mapping whose keys are strings naming qualified fields on a config to
    override, and whose values are the values to apply.
    """

    def to_primitives(self) -> dict:
        """Convert to a representation used in yaml serialization.

        Returns
        -------
        primitives : `dict`
            The truthy members among ``python``, ``dataId`` and ``file``,
            followed by every entry of ``rest``.
        """
        named = ((name, getattr(self, name)) for name in ("python", "dataId", "file"))
        # Only truthy attributes are written out
        primitives = {name: value for name, value in named if value}
        # The remaining config keys sit at the same level as the named ones
        primitives.update(self.rest)
        return primitives

    def maybe_merge(self, other_config: "ConfigIR") -> Generator["ConfigIR", None, None]:
        """Merge another `ConfigIR` into this instance when that is possible.

        The blocks are merged only when their dataIds are equal, neither has
        a python block or override files, and no key present in both ``rest``
        mappings carries different values. A successful merge updates this
        instance in place.

        Parameters
        ----------
        other_config : `ConfigIR`
            An instance of `ConfigIR` to merge into this instance.

        Returns
        -------
        Generator : `ConfigIR`
            Yields only self when the configs were merged, otherwise yields
            self followed by other_config, both unmodified.
        """
        blockers = (self.dataId != other_config.dataId, self.python, other_config.python,
                    self.file, other_config.file)
        mergeable = not any(blockers)
        if mergeable:
            # A key present in both blocks must carry the same value in each
            shared_keys = self.rest.keys() & other_config.rest.keys()
            mergeable = not any(self.rest[key] != other_config.rest[key] for key in shared_keys)

        if not mergeable:
            yield self
            yield other_config
            return

        self.rest.update(other_config.rest)
        # Combine the lists of override files to load
        self.file = list(set(self.file) | set(other_config.file))
        yield self

    def __eq__(self, other: "ConfigIR"):
        if not isinstance(other, ConfigIR):
            return False
        for attr in ("python", "dataId", "file", "rest"):
            if not (getattr(self, attr) == getattr(other, attr)):
                return False
        return True
151 
152 
@dataclass
class TaskIR:
    """Intermediate representation of a task read from a pipeline yaml file.
    """
    label: str
    """Identifier used to refer to the task.
    """
    klass: str
    """Fully qualified name of the python class to be run in a pipeline.
    """
    config: Union[List[ConfigIR], None] = None
    """All config overrides associated with this task, or `None` if there
    are no config overrides.
    """

    def to_primitives(self) -> dict:
        """Convert to a representation used in yaml serialization.

        Returns
        -------
        primitives : `dict`
            Holds the ``class`` key and, when there are config overrides, a
            ``config`` key with the serialized overrides. The label is not
            part of the result.
        """
        primitives = {'class': self.klass}
        if not self.config:
            return primitives
        primitives['config'] = [override.to_primitives() for override in self.config]
        return primitives

    def add_or_update_config(self, other_config: ConfigIR):
        """Add a `ConfigIR` to this task, merging it into the most recently
        added config when possible.

        If the task has no configs yet, other_config becomes its only entry.
        Otherwise the last config in the list is asked to merge with
        other_config (see `ConfigIR.maybe_merge`); when the merge is refused
        other_config is appended as a new entry after it.

        Parameters
        ----------
        other_config : `ConfigIR`
            A `ConfigIR` instance to add or merge into the config attribute of
            this task.
        """
        if self.config:
            # Take the last entry off and put back whatever the merge yields:
            # either the merged entry alone, or both entries in order.
            last_config = self.config.pop()
            self.config.extend(last_config.maybe_merge(other_config))
        else:
            self.config = [other_config]

    def __eq__(self, other: "TaskIR"):
        if not isinstance(other, TaskIR):
            return False
        for attr in ("label", "klass", "config"):
            if not (getattr(self, attr) == getattr(other, attr)):
                return False
        return True
204 
205 
@dataclass
class InheritIR:
    """An intermediate representation of inherited pipelines
    """
    location: str
    """This is the location of the pipeline to inherit. The path should be
    specified as an absolute path. Environment variables may be used in the
    path; they are expanded with `os.path.expandvars`, so they are written as
    ``$NAME`` or ``${NAME}``.
    """
    include: Union[List[str], None] = None
    """List of tasks that should be included when inheriting this pipeline.
    Either the include or exclude attributes may be specified, but not both.
    """
    exclude: Union[List[str], None] = None
    """List of tasks that should be excluded when inheriting this pipeline.
    Either the include or exclude attributes may be specified, but not both.
    """
    importContracts: bool = True
    """Boolean attribute to dictate if contracts should be inherited with the
    pipeline or not.
    """

    def _selects(self, label: str) -> bool:
        """Decide whether the task with the given label is inherited.

        Parameters
        ----------
        label : `str`
            Label of a task in the pipeline being inherited.

        Returns
        -------
        selected : `bool`
            True if the task should be kept.
        """
        if self.include:
            return label in self.include
        if self.exclude:
            return label not in self.exclude
        # Neither list names a task. With no include list everything is kept,
        # which covers an empty exclude list (it excludes nothing). An
        # explicitly empty include list keeps nothing.
        return self.include is None

    def toPipelineIR(self) -> "PipelineIR":
        """Load the pipeline found at ``location`` and reduce it to the parts
        that are to be inherited.

        Environment variables in ``location`` are expanded before the file is
        read. Tasks are filtered according to ``include`` / ``exclude``, and
        the contracts are dropped when ``importContracts`` is False. A warning
        is issued if the loaded pipeline defines an instrument.

        Returns
        -------
        pipeline : `PipelineIR`
            The loaded pipeline holding only the selected tasks.

        Raises
        ------
        ValueError
            Raised if both an include and an exclude list are specified.
        """
        if self.include and self.exclude:
            raise ValueError("Both an include and an exclude list cant be specified"
                             " when declaring a pipeline import")
        tmp_pipeline = PipelineIR.from_file(os.path.expandvars(self.location))
        if tmp_pipeline.instrument is not None:
            warnings.warn("Any instrument definitions in imported pipelines are ignored. "
                          "if an instrument is desired please define it in the top most pipeline")

        tmp_pipeline.tasks = {label: task for label, task in tmp_pipeline.tasks.items()
                              if self._selects(label)}

        if not self.importContracts:
            tmp_pipeline.contracts = []

        return tmp_pipeline

    def __eq__(self, other: "InheritIR"):
        if not isinstance(other, InheritIR):
            return False
        return all(getattr(self, attr) == getattr(other, attr) for attr in
                   ("location", "include", "exclude", "importContracts"))
260 
261 
263  """Intermediate representation of a pipeline definition
264 
265  Parameters
266  ----------
267  loaded_yaml : `dict`
268  A dictionary which matches the structure that would be produced by a
269  yaml reader which parses a pipeline definition document
270 
271  Raises
272  ------
273  ValueError :
274  - If a pipeline is declared without a description
275  - If no tasks are declared in a pipeline, and no pipelines are to be
276  inherited
277  - If more than one instrument is specified
278  - If more than one inherited pipeline share a label
279  """
280  def __init__(self, loaded_yaml):
281  # Check required fields are present
282  if "description" not in loaded_yaml:
283  raise ValueError("A pipeline must be declared with a description")
284  if "tasks" not in loaded_yaml and "inherits" not in loaded_yaml:
285  raise ValueError("A pipeline must be declared with one or more tasks")
286 
287  # Process pipeline description
288  self.description = loaded_yaml.pop("description")
289 
290  # Process tasks
291  self._read_tasks(loaded_yaml)
292 
293  # Process instrument keys
294  inst = loaded_yaml.pop("instrument", None)
295  if isinstance(inst, list):
296  raise ValueError("Only one top level instrument can be defined in a pipeline")
297  self.instrument = inst
298 
299  # Process any contracts
300  self._read_contracts(loaded_yaml)
301 
302  # Process any inherited pipelines
303  self._read_inherits(loaded_yaml)
304 
305  def _read_contracts(self, loaded_yaml):
306  """Process the contracts portion of the loaded yaml document
307 
308  Parameters
309  ---------
310  loaded_yaml : `dict`
311  A dictionary which matches the structure that would be produced by a
312  yaml reader which parses a pipeline definition document
313  """
314  loaded_contracts = loaded_yaml.pop("contracts", [])
315  if isinstance(loaded_contracts, str):
316  loaded_contracts = [loaded_contracts]
317  self.contracts = []
318  for contract in loaded_contracts:
319  if isinstance(contract, dict):
320  self.contracts.append(ContractIR(**contract))
321  if isinstance(contract, str):
322  self.contracts.append(ContractIR(contract=contract))
323 
324  def _read_inherits(self, loaded_yaml):
325  """Process the inherits portion of the loaded yaml document
326 
327  Parameters
328  ---------
329  loaded_yaml : `dict`
330  A dictionary which matches the structure that would be produced by a
331  yaml reader which parses a pipeline definition document
332  """
333  def process_args(argument: Union[str, dict]) -> dict:
334  if isinstance(argument, str):
335  return {"location": argument}
336  elif isinstance(argument, dict):
337  if "exclude" in argument and isinstance(argument["exclude"], str):
338  argument["exclude"] = [argument["exclude"]]
339  if "include" in argument and isinstance(argument["include"], str):
340  argument["include"] = [argument["include"]]
341  return argument
342  tmp_inherit = loaded_yaml.pop("inherits", None)
343  if tmp_inherit is None:
344  self.inherits = []
345  elif isinstance(tmp_inherit, list):
346  self.inherits = [InheritIR(**process_args(args)) for args in tmp_inherit]
347  else:
348  self.inherits = [InheritIR(**process_args(tmp_inherit))]
349 
350  # integrate any imported pipelines
351  accumulate_tasks = {}
352  for other_pipeline in self.inherits:
353  tmp_IR = other_pipeline.toPipelineIR()
354  if accumulate_tasks.keys() & tmp_IR.tasks.keys():
355  raise ValueError("Task labels in the imported pipelines must "
356  "be unique")
357  accumulate_tasks.update(tmp_IR.tasks)
358  self.contracts.extend(tmp_IR.contracts)
359  accumulate_tasks.update(self.tasks)
360  self.tasks = accumulate_tasks
361 
362  def _read_tasks(self, loaded_yaml):
363  """Process the tasks portion of the loaded yaml document
364 
365  Parameters
366  ---------
367  loaded_yaml : `dict`
368  A dictionary which matches the structure that would be produced by a
369  yaml reader which parses a pipeline definition document
370  """
371  self.tasks = {}
372  tmp_tasks = loaded_yaml.pop("tasks", None)
373  if tmp_tasks is None:
374  tmp_tasks = {}
375 
376  for label, definition in tmp_tasks.items():
377  if isinstance(definition, str):
378  definition = {"class": definition}
379  config = definition.get('config', None)
380  if config is None:
381  task_config_ir = None
382  else:
383  if isinstance(config, dict):
384  config = [config]
385  task_config_ir = []
386  for c in config:
387  file = c.pop("file", None)
388  if file is None:
389  file = []
390  elif not isinstance(file, list):
391  file = [file]
392  task_config_ir.append(ConfigIR(python=c.pop("python", None),
393  dataId=c.pop("dataId", None),
394  file=file,
395  rest=c))
396  self.tasks[label] = TaskIR(label, definition["class"], task_config_ir)
397 
398  @classmethod
399  def from_string(cls, pipeline_string: str):
400  """Create a `PipelineIR` object from a string formatted like a pipeline
401  document
402 
403  Parameters
404  ----------
405  pipeline_string : `str`
406  A string that is formatted according like a pipeline document
407  """
408  loaded_yaml = yaml.safe_load(pipeline_string)
409  return cls(loaded_yaml)
410 
411  @classmethod
412  def from_file(cls, filename: str):
413  """Create a `PipelineIR` object from the document specified by the
414  input path.
415 
416  Parameters
417  ----------
418  filename : `str`
419  Location of document to use in creating a `PipelineIR` object.
420  """
421  with open(filename, 'r') as f:
422  loaded_yaml = yaml.safe_load(f)
423  return cls(loaded_yaml)
424 
425  def to_file(self, filename: str):
426  """Serialize this `PipelineIR` object into a yaml formatted string and
427  write the output to a file at the specified path.
428 
429  Parameters
430  ----------
431  filename : `str`
432  Location of document to write a `PipelineIR` object.
433  """
434  with open(filename, 'w') as f:
435  yaml.dump(self.to_primitives(), f, sort_keys=False)
436 
437  def to_primitives(self):
438  """Convert to a representation used in yaml serialization
439  """
440  accumulate = {"description": self.description}
441  if self.instrument is not None:
442  accumulate['instrument'] = self.instrument
443  accumulate['tasks'] = {m: t.to_primitives() for m, t in self.tasks.items()}
444  if len(self.contracts) > 0:
445  accumulate['contracts'] = [c.to_primitives() for c in self.contracts]
446  return accumulate
447 
448  def __str__(self) -> str:
449  """Instance formatting as how it would look in yaml representation
450  """
451  return yaml.dump(self.to_primitives(), sort_keys=False)
452 
453  def __repr__(self) -> str:
454  """Instance formatting as how it would look in yaml representation
455  """
456  return str(self)
457 
458  def __eq__(self, other: "PipelineIR"):
459  if not isinstance(other, PipelineIR):
460  return False
461  elif all(getattr(self, attr) == getattr(other, attr) for attr in
462  ("contracts", "tasks", "instrument")):
463  return True
464  else:
465  return False
lsst.pipe.base.pipelineIR.PipelineIR._read_contracts
def _read_contracts(self, loaded_yaml)
Definition: pipelineIR.py:305
lsst.pipe.base.pipelineIR.PipelineIR.__eq__
def __eq__(self, "PipelineIR" other)
Definition: pipelineIR.py:458
lsst.pipe.base.pipelineIR.TaskIR.to_primitives
dict to_primitives(self)
Definition: pipelineIR.py:169
lsst.pipe.base.pipelineIR.PipelineIR.tasks
tasks
Definition: pipelineIR.py:360
field
lsst.pipe.base.pipelineIR.PipelineIR.inherits
inherits
Definition: pipelineIR.py:344
lsst.pipe.base.pipelineIR.ContractError
Definition: pipelineIR.py:30
ast::append
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
Definition: functional.cc:33
lsst.pipe.base.pipelineIR.ConfigIR.to_primitives
dict to_primitives(self)
Definition: pipelineIR.py:91
astshim.keyMap.keyMapContinued.keys
def keys(self)
Definition: keyMapContinued.py:6
lsst.pipe.base.pipelineIR.PipelineIR.__str__
str __str__(self)
Definition: pipelineIR.py:448
lsst.pipe.base.pipelineIR.PipelineIR
Definition: pipelineIR.py:262
lsst.pipe.base.pipelineIR.PipelineIR.__repr__
str __repr__(self)
Definition: pipelineIR.py:453
lsst::afw::geom.transform.transformContinued.cls
cls
Definition: transformContinued.py:33
lsst::geom::all
bool all(CoordinateExpr< N > const &expr) noexcept
Return true if all elements are true.
Definition: CoordinateExpr.h:81
lsst.pipe.base.pipelineIR.ContractIR.msg
msg
Definition: pipelineIR.py:60
lsst.pipe.base.pipelineIR.PipelineIR.from_file
def from_file(cls, str filename)
Definition: pipelineIR.py:412
lsst.pipe.base.pipelineIR.TaskIR
Definition: pipelineIR.py:154
lsst.pipe.base.pipelineIR.ContractIR.__eq__
def __eq__(self, "ContractIR" other)
Definition: pipelineIR.py:57
lsst.pipe.base.pipelineIR.PipelineIR._read_tasks
def _read_tasks(self, loaded_yaml)
Definition: pipelineIR.py:362
lsst.pipe.base.pipelineIR.TaskIR.add_or_update_config
def add_or_update_config(self, ConfigIR other_config)
Definition: pipelineIR.py:177
lsst.pipe.base.pipelineIR.ContractIR.contract
contract
Definition: pipelineIR.py:60
lsst.pipe.base.pipelineIR.ConfigIR.maybe_merge
Generator["ConfigIR", None, None] maybe_merge(self, "ConfigIR" other_config)
Definition: pipelineIR.py:104
lsst.pipe.base.pipelineIR.ConfigIR.file
file
Definition: pipelineIR.py:139
lsst.pipe.base.pipelineIR.ContractIR.to_primitives
dict to_primitives(self)
Definition: pipelineIR.py:49
lsst.pipe.base.pipelineIR.PipelineIR.contracts
contracts
Definition: pipelineIR.py:317
lsst.pipe.base.pipelineIR.PipelineIR.description
description
Definition: pipelineIR.py:288
lsst.pipe.base.pipelineIR.InheritIR.__eq__
def __eq__(self, "InheritIR" other)
Definition: pipelineIR.py:252
lsst.pipe.base.pipelineIR.PipelineIR.instrument
instrument
Definition: pipelineIR.py:297
lsst.pipe.base.pipelineIR.ConfigIR
Definition: pipelineIR.py:67
lsst.pipe.base.pipelineIR.PipelineIR.from_string
def from_string(cls, str pipeline_string)
Definition: pipelineIR.py:399
lsst.pipe.base.pipelineIR.PipelineIR.__init__
def __init__(self, loaded_yaml)
Definition: pipelineIR.py:280
items
std::vector< SchemaItem< Flag > > * items
Definition: BaseColumnView.cc:142
list
daf::base::PropertyList * list
Definition: fits.cc:913
lsst.pipe.base.pipelineIR.PipelineIR.to_file
def to_file(self, str filename)
Definition: pipelineIR.py:425
lsst.pipe.base.pipelineIR.PipelineIR.to_primitives
def to_primitives(self)
Definition: pipelineIR.py:437
lsst.pipe.base.pipelineIR.InheritIR.toPipelineIR
"PipelineIR" toPipelineIR(self)
Definition: pipelineIR.py:229
lsst.pipe.base.pipelineIR.ContractIR
Definition: pipelineIR.py:37
lsst.pipe.base.pipelineIR.TaskIR.config
config
Definition: pipelineIR.py:192
lsst.pipe.base.pipelineIR.PipelineIR._read_inherits
def _read_inherits(self, loaded_yaml)
Definition: pipelineIR.py:324
lsst.pipe.base.pipelineIR.InheritIR
Definition: pipelineIR.py:207
set
daf::base::PropertySet * set
Definition: fits.cc:912
lsst.pipe.base.pipelineIR.TaskIR.__eq__
def __eq__(self, "TaskIR" other)
Definition: pipelineIR.py:196
lsst.pipe.base.pipelineIR.ConfigIR.__eq__
def __eq__(self, "ConfigIR" other)
Definition: pipelineIR.py:143