22 from dataclasses
import dataclass, field
23 from typing
import List, Union, Generator
31 """An exception that is raised when a pipeline contract is not satisfied
38 """Intermediate representation of contracts read from a pipeline yaml file.
41 """A string of python code representing one or more conditions on configs
42 in a pipeline. This code-as-string should, once evaluated, should be True
43 if the configs are fine, and False otherwise.
45 msg: Union[str,
None] =
None
46 """An optional message to be shown to the user if a contract fails
50 """Convert to a representation used in yaml serialization
52 accumulate = {
"contract": self.
contract}
53 if self.
msg is not None:
54 accumulate[
'msg'] = self.
msg
57 def __eq__(self, other:
"ContractIR"):
58 if not isinstance(other, ContractIR):
60 elif self.
contract == other.contract
and self.
msg == other.msg:
68 """Intermediate representation of configurations read from a pipeline yaml
71 python: Union[str,
None] =
None
72 """A string of python code that is used to modify a configuration. This can
73 also be None if there are no modifications to do.
75 dataId: Union[dict,
None] =
None
76 """A dataId that is used to constrain these config overrides to only quanta
77 with matching dataIds. This field can be None if there is no constraint.
78 This is currently an unimplemented feature, and is placed here for future
81 file: List[str] =
field(default_factory=list)
82 """A list of paths which points to a file containing config overrides to be
83 applied. This value may be an empty list if there are no overrides to apply.
85 rest: dict =
field(default_factory=dict)
86 """This is a dictionary of key value pairs, where the keys are strings
87 corresponding to qualified fields on a config to override, and the values
88 are strings representing the values to apply.
92 """Convert to a representation used in yaml serialization
95 for name
in (
"python",
"dataId",
"file"):
97 if getattr(self, name):
98 accumulate[name] = getattr(self, name)
101 accumulate.update(self.rest)
104 def maybe_merge(self, other_config:
"ConfigIR") -> Generator[
"ConfigIR",
None,
None]:
105 """Merges another instance of a `ConfigIR` into this instance if
106 possible. This function returns a generator that is either self
107 if the configs were merged, or self, and other_config if that could
112 other_config : `ConfigIR`
113 An instance of `ConfigIR` to merge into this instance.
117 Generator : `ConfigIR`
118 A generator containing either self, or self and other_config if
119 the configs could be merged or not respectively.
122 if self.dataId != other_config.dataId
or self.python
or other_config.python
or\
123 self.
file or other_config.file:
124 yield from (self, other_config)
129 key_union = self.rest.
keys() & other_config.rest.keys()
130 for key
in key_union:
131 if self.rest[key] != other_config.rest[key]:
132 yield from (self, other_config)
134 self.rest.update(other_config.rest)
138 other_file_set =
set(other_config.file)
139 self.
file =
list(self_file_set.union(other_file_set))
144 if not isinstance(other, ConfigIR):
146 elif all(getattr(self, attr) == getattr(other, attr)
for attr
in
147 (
"python",
"dataId",
"file",
"rest")):
155 """Intermediate representation of tasks read from a pipeline yaml file.
158 """An identifier used to refer to a task.
161 """A string containing a fully qualified python class to be run in a
164 config: Union[List[ConfigIR],
None] =
None
165 """List of all configs overrides associated with this task, and may be
166 `None` if there are no config overrides.
170 """Convert to a representation used in yaml serialization
172 accumulate = {
'class': self.klass}
174 accumulate[
'config'] = [c.to_primitives()
for c
in self.
config]
178 """Adds a `ConfigIR` to this task if one is not present. Merges configs
179 if there is a `ConfigIR` present and the dataId keys of both configs
180 match, otherwise adds a new entry to the config list. The exception to
181 the above is that if either the last config or other_config has a python
182 block, then other_config is always added, as python blocks can modify
183 configs in ways that cannot be predicted.
187 other_config : `ConfigIR`
188 A `ConfigIR` instance to add or merge into the config attribute of
194 self.
config.extend(self.
config.pop().maybe_merge(other_config))
197 if not isinstance(other, TaskIR):
199 elif all(getattr(self, attr) == getattr(other, attr)
for attr
in
200 (
"label",
"klass",
"config")):
208 """An intermediate representation of inherited pipelines
211 """This is the location of the pipeline to inherit. The path should be
212 specified as an absolute path. Environment variables may be used in the path
213 and should be specified as a python string template, with the name of the
214 environment variable inside braces.
216 include: Union[List[str],
None] =
None
217 """List of tasks that should be included when inheriting this pipeline.
218 Either the include or exclude attributes may be specified, but not both.
220 exclude: Union[List[str],
None] =
None
221 """List of tasks that should be excluded when inheriting this pipeline.
222 Either the include or exclude attributes may be specified, but not both.
224 importContracts: bool =
True
225 """Boolean attribute to dictate if contracts should be inherited with the
230 """Convert to a representation used in yaml serialization
232 if self.include
and self.exclude:
233 raise ValueError(
"Both an include and an exclude list cant be specified"
234 " when declaring a pipeline import")
235 tmp_pipeline = PipelineIR.from_file(os.path.expandvars(self.location))
236 if tmp_pipeline.instrument
is not None:
237 warnings.warn(
"Any instrument definitions in imported pipelines are ignored. "
238 "if an instrument is desired please define it in the top most pipeline")
241 for label, task
in tmp_pipeline.tasks.items():
242 if (self.include
and label
in self.include)
or (self.exclude
and label
not in self.exclude)\
243 or (self.include
is None and self.exclude
is None):
244 new_tasks[label] = task
245 tmp_pipeline.tasks = new_tasks
247 if not self.importContracts:
248 tmp_pipeline.contracts = []
253 if not isinstance(other, InheritIR):
255 elif all(getattr(self, attr) == getattr(other, attr)
for attr
in
256 (
"location",
"include",
"exclude",
"importContracts")):
263 """Intermediate representation of a pipeline definition
268 A dictionary which matches the structure that would be produced by a
269 yaml reader which parses a pipeline definition document
274 - If a pipeline is declared without a description
275 - If no tasks are declared in a pipeline, and no pipelines are to be
277 - If more than one instrument is specified
278 - If more than one inherited pipeline share a label
282 if "description" not in loaded_yaml:
283 raise ValueError(
"A pipeline must be declared with a description")
284 if "tasks" not in loaded_yaml
and "inherits" not in loaded_yaml:
285 raise ValueError(
"A pipeline must be declared with one or more tasks")
294 inst = loaded_yaml.pop(
"instrument",
None)
295 if isinstance(inst, list):
296 raise ValueError(
"Only one top level instrument can be defined in a pipeline")
305 def _read_contracts(self, loaded_yaml):
306 """Process the contracts portion of the loaded yaml document
311 A dictionary which matches the structure that would be produced by a
312 yaml reader which parses a pipeline definition document
314 loaded_contracts = loaded_yaml.pop(
"contracts", [])
315 if isinstance(loaded_contracts, str):
316 loaded_contracts = [loaded_contracts]
318 for contract
in loaded_contracts:
319 if isinstance(contract, dict):
321 if isinstance(contract, str):
324 def _read_inherits(self, loaded_yaml):
325 """Process the inherits portion of the loaded yaml document
330 A dictionary which matches the structure that would be produced by a
331 yaml reader which parses a pipeline definition document
333 def process_args(argument: Union[str, dict]) -> dict:
334 if isinstance(argument, str):
335 return {
"location": argument}
336 elif isinstance(argument, dict):
337 if "exclude" in argument
and isinstance(argument[
"exclude"], str):
338 argument[
"exclude"] = [argument[
"exclude"]]
339 if "include" in argument
and isinstance(argument[
"include"], str):
340 argument[
"include"] = [argument[
"include"]]
342 tmp_inherit = loaded_yaml.pop(
"inherits",
None)
343 if tmp_inherit
is None:
345 elif isinstance(tmp_inherit, list):
351 accumulate_tasks = {}
352 for other_pipeline
in self.
inherits:
353 tmp_IR = other_pipeline.toPipelineIR()
354 if accumulate_tasks.keys() & tmp_IR.tasks.keys():
355 raise ValueError(
"Task labels in the imported pipelines must "
357 accumulate_tasks.update(tmp_IR.tasks)
359 accumulate_tasks.update(self.
tasks)
362 def _read_tasks(self, loaded_yaml):
363 """Process the tasks portion of the loaded yaml document
368 A dictionary which matches the structure that would be produced by a
369 yaml reader which parses a pipeline definition document
372 tmp_tasks = loaded_yaml.pop(
"tasks",
None)
373 if tmp_tasks
is None:
376 for label, definition
in tmp_tasks.items():
377 if isinstance(definition, str):
378 definition = {
"class": definition}
379 config = definition.get(
'config',
None)
381 task_config_ir =
None
383 if isinstance(config, dict):
387 file = c.pop(
"file",
None)
390 elif not isinstance(file, list):
392 task_config_ir.append(
ConfigIR(python=c.pop(
"python",
None),
393 dataId=c.pop(
"dataId",
None),
396 self.
tasks[label] =
TaskIR(label, definition[
"class"], task_config_ir)
400 """Create a `PipelineIR` object from a string formatted like a pipeline
405 pipeline_string : `str`
406 A string that is formatted according like a pipeline document
408 loaded_yaml = yaml.safe_load(pipeline_string)
409 return cls(loaded_yaml)
413 """Create a `PipelineIR` object from the document specified by the
419 Location of document to use in creating a `PipelineIR` object.
421 with open(filename,
'r')
as f:
422 loaded_yaml = yaml.safe_load(f)
423 return cls(loaded_yaml)
426 """Serialize this `PipelineIR` object into a yaml formatted string and
427 write the output to a file at the specified path.
432 Location of document to write a `PipelineIR` object.
434 with open(filename,
'w')
as f:
438 """Convert to a representation used in yaml serialization
443 accumulate[
'tasks'] = {m: t.to_primitives()
for m, t
in self.
tasks.
items()}
445 accumulate[
'contracts'] = [c.to_primitives()
for c
in self.
contracts]
449 """Instance formatting as how it would look in yaml representation
454 """Instance formatting as how it would look in yaml representation
459 if not isinstance(other, PipelineIR):
461 elif all(getattr(self, attr) == getattr(other, attr)
for attr
in
462 (
"contracts",
"tasks",
"instrument")):