22 from dataclasses 
import dataclass, field
 
   23 from typing 
import List, Union, Generator
 
   31     """An exception that is raised when a pipeline contract is not satisfied 
   38     """Intermediate representation of contracts read from a pipeline yaml file. 
   41     """A string of python code representing one or more conditions on configs 
   42     in a pipeline. This code-as-string, once evaluated, should be True 
   43     if the configs are fine, and False otherwise. 
   45     msg: Union[str, 
None] = 
None 
   46     """An optional message to be shown to the user if a contract fails 
   50         """Convert to a representation used in yaml serialization 
   52         accumulate = {
"contract": self.
contract}
 
   53         if self.
msg is not None:
 
   54             accumulate[
'msg'] = self.
msg 
   57     def __eq__(self, other: 
"ContractIR"):
 
   58         if not isinstance(other, ContractIR):
 
   60         elif self.
contract == other.contract 
and self.
msg == other.msg:
 
   68     """Intermediate representation of configurations read from a pipeline yaml 
   71     python: Union[str, 
None] = 
None 
   72     """A string of python code that is used to modify a configuration. This can 
   73     also be None if there are no modifications to do. 
   75     dataId: Union[dict, 
None] = 
None 
   76     """A dataId that is used to constrain these config overrides to only quanta 
   77     with matching dataIds. This field can be None if there is no constraint. 
   78     This is currently an unimplemented feature, and is placed here for future 
   81     file: List[str] = 
field(default_factory=list)
 
   82     """A list of paths, each pointing to a file containing config overrides to be 
   83     applied. This value may be an empty list if there are no overrides to apply. 
   85     rest: dict = 
field(default_factory=dict)
 
   86     """This is a dictionary of key value pairs, where the keys are strings 
   87     corresponding to qualified fields on a config to override, and the values 
   88     are strings representing the values to apply. 
   92         """Convert to a representation used in yaml serialization 
   95         for name 
in (
"python", 
"dataId", 
"file"):
 
   97             if getattr(self, name):
 
   98                 accumulate[name] = getattr(self, name)
 
  101         accumulate.update(self.rest)
 
  104     def maybe_merge(self, other_config: 
"ConfigIR") -> Generator[
"ConfigIR", 
None, 
None]:
 
  105         """Merges another instance of a `ConfigIR` into this instance if 
  106         possible. This function returns a generator that is either self 
  107         if the configs were merged, or self, and other_config if that could 
  112         other_config : `ConfigIR` 
  113             An instance of `ConfigIR` to merge into this instance. 
  117         Generator : `ConfigIR` 
  118             A generator containing either self, or self and other_config if 
  119             the configs could be merged or not respectively. 
  122         if self.dataId != other_config.dataId 
or self.python 
or other_config.python 
or\
 
  123                 self.
file or other_config.file:
 
  124             yield from (self, other_config)
 
  129         key_union = self.rest.
keys() & other_config.rest.keys()
 
  130         for key 
in key_union:
 
  131             if self.rest[key] != other_config.rest[key]:
 
  132                 yield from (self, other_config)
 
  134         self.rest.update(other_config.rest)
 
  138         other_file_set = 
set(other_config.file)
 
  139         self.
file = 
list(self_file_set.union(other_file_set))
 
  144         if not isinstance(other, ConfigIR):
 
  146         elif all(getattr(self, attr) == getattr(other, attr) 
for attr 
in 
  147                  (
"python", 
"dataId", 
"file", 
"rest")):
 
  155     """Intermediate representation of tasks read from a pipeline yaml file. 
  158     """An identifier used to refer to a task. 
  161     """A string containing a fully qualified python class to be run in a 
  164     config: Union[List[ConfigIR], 
None] = 
None 
  165     """List of all config overrides associated with this task; this may be 
  166     `None` if there are no config overrides. 
  170         """Convert to a representation used in yaml serialization 
  172         accumulate = {
'class': self.klass}
 
  174             accumulate[
'config'] = [c.to_primitives() 
for c 
in self.
config]
 
  178         """Adds a `ConfigIR` to this task if one is not present. Merges configs 
  179         if there is a `ConfigIR` present and the dataId keys of both configs 
  180         match, otherwise adds a new entry to the config list. The exception to 
  181         the above is that if either the last config or other_config has a python 
  182         block, then other_config is always added, as python blocks can modify 
  183         configs in ways that cannot be predicted. 
  187         other_config : `ConfigIR` 
  188             A `ConfigIR` instance to add or merge into the config attribute of 
  194         self.
config.extend(self.
config.pop().maybe_merge(other_config))
 
  197         if not isinstance(other, TaskIR):
 
  199         elif all(getattr(self, attr) == getattr(other, attr) 
for attr 
in 
  200                  (
"label", 
"klass", 
"config")):
 
  208     """An intermediate representation of inherited pipelines 
  211     """This is the location of the pipeline to inherit. The path should be 
  212     specified as an absolute path. Environment variables may be used in the path 
  213     and should be specified as a python string template, with the name of the 
  214     environment variable inside braces. 
  216     include: Union[List[str], 
None] = 
None 
  217     """List of tasks that should be included when inheriting this pipeline. 
  218     Either the include or exclude attributes may be specified, but not both. 
  220     exclude: Union[List[str], 
None] = 
None 
  221     """List of tasks that should be excluded when inheriting this pipeline. 
  222     Either the include or exclude attributes may be specified, but not both. 
  224     importContracts: bool = 
True 
  225     """Boolean attribute to dictate if contracts should be inherited with the 
  230         """Convert to a representation used in yaml serialization 
  232         if self.include 
and self.exclude:
 
  233             raise ValueError(
"Both an include and an exclude list cant be specified" 
  234                              " when declaring a pipeline import")
 
  235         tmp_pipeline = PipelineIR.from_file(os.path.expandvars(self.location))
 
  236         if tmp_pipeline.instrument 
is not None:
 
  237             warnings.warn(
"Any instrument definitions in imported pipelines are ignored. " 
  238                           "if an instrument is desired please define it in the top most pipeline")
 
  241         for label, task 
in tmp_pipeline.tasks.items():
 
  242             if (self.include 
and label 
in self.include) 
or (self.exclude 
and label 
not in self.exclude)\
 
  243                     or (self.include 
is None and self.exclude 
is None):
 
  244                 new_tasks[label] = task
 
  245         tmp_pipeline.tasks = new_tasks
 
  247         if not self.importContracts:
 
  248             tmp_pipeline.contracts = []
 
  253         if not isinstance(other, InheritIR):
 
  255         elif all(getattr(self, attr) == getattr(other, attr) 
for attr 
in 
  256                  (
"location", 
"include", 
"exclude", 
"importContracts")):
 
  263     """Intermediate representation of a pipeline definition 
  268         A dictionary which matches the structure that would be produced by a 
  269         yaml reader which parses a pipeline definition document 
  274         - If a pipeline is declared without a description 
  275         - If no tasks are declared in a pipeline, and no pipelines are to be 
  277         - If more than one instrument is specified 
  278         - If more than one inherited pipeline share a label 
  282         if "description" not in loaded_yaml:
 
  283             raise ValueError(
"A pipeline must be declared with a description")
 
  284         if "tasks" not in loaded_yaml 
and "inherits" not in loaded_yaml:
 
  285             raise ValueError(
"A pipeline must be declared with one or more tasks")
 
  294         inst = loaded_yaml.pop(
"instrument", 
None)
 
  295         if isinstance(inst, list):
 
  296             raise ValueError(
"Only one top level instrument can be defined in a pipeline")
 
  305     def _read_contracts(self, loaded_yaml):
 
  306         """Process the contracts portion of the loaded yaml document 
  311             A dictionary which matches the structure that would be produced by a 
  312             yaml reader which parses a pipeline definition document 
  314         loaded_contracts = loaded_yaml.pop(
"contracts", [])
 
  315         if isinstance(loaded_contracts, str):
 
  316             loaded_contracts = [loaded_contracts]
 
  318         for contract 
in loaded_contracts:
 
  319             if isinstance(contract, dict):
 
  321             if isinstance(contract, str):
 
  324     def _read_inherits(self, loaded_yaml):
 
  325         """Process the inherits portion of the loaded yaml document 
  330             A dictionary which matches the structure that would be produced by a 
  331             yaml reader which parses a pipeline definition document 
  333         def process_args(argument: Union[str, dict]) -> dict:
 
  334             if isinstance(argument, str):
 
  335                 return {
"location": argument}
 
  336             elif isinstance(argument, dict):
 
  337                 if "exclude" in argument 
and isinstance(argument[
"exclude"], str):
 
  338                     argument[
"exclude"] = [argument[
"exclude"]]
 
  339                 if "include" in argument 
and isinstance(argument[
"include"], str):
 
  340                     argument[
"include"] = [argument[
"include"]]
 
  342         tmp_inherit = loaded_yaml.pop(
"inherits", 
None)
 
  343         if tmp_inherit 
is None:
 
  345         elif isinstance(tmp_inherit, list):
 
  351         accumulate_tasks = {}
 
  352         for other_pipeline 
in self.
inherits:
 
  353             tmp_IR = other_pipeline.toPipelineIR()
 
  354             if accumulate_tasks.keys() & tmp_IR.tasks.keys():
 
  355                 raise ValueError(
"Task labels in the imported pipelines must " 
  357             accumulate_tasks.update(tmp_IR.tasks)
 
  359         accumulate_tasks.update(self.
tasks)
 
  362     def _read_tasks(self, loaded_yaml):
 
  363         """Process the tasks portion of the loaded yaml document 
  368             A dictionary which matches the structure that would be produced by a 
  369             yaml reader which parses a pipeline definition document 
  372         tmp_tasks = loaded_yaml.pop(
"tasks", 
None)
 
  373         if tmp_tasks 
is None:
 
  376         for label, definition 
in tmp_tasks.items():
 
  377             if isinstance(definition, str):
 
  378                 definition = {
"class": definition}
 
  379             config = definition.get(
'config', 
None)
 
  381                 task_config_ir = 
None 
  383                 if isinstance(config, dict):
 
  387                     file = c.pop(
"file", 
None)
 
  390                     elif not isinstance(file, list):
 
  392                     task_config_ir.append(
ConfigIR(python=c.pop(
"python", 
None),
 
  393                                                    dataId=c.pop(
"dataId", 
None),
 
  396             self.
tasks[label] = 
TaskIR(label, definition[
"class"], task_config_ir)
 
  400         """Create a `PipelineIR` object from a string formatted like a pipeline 
  405         pipeline_string : `str` 
  406             A string that is formatted like a pipeline document 
  408         loaded_yaml = yaml.safe_load(pipeline_string)
 
  409         return cls(loaded_yaml)
 
  413         """Create a `PipelineIR` object from the document specified by the 
  419             Location of document to use in creating a `PipelineIR` object. 
  421         with open(filename, 
'r') 
as f:
 
  422             loaded_yaml = yaml.safe_load(f)
 
  423         return cls(loaded_yaml)
 
  426         """Serialize this `PipelineIR` object into a yaml formatted string and 
  427         write the output to a file at the specified path. 
  432             Location of document to write a `PipelineIR` object. 
  434         with open(filename, 
'w') 
as f:
 
  438         """Convert to a representation used in yaml serialization 
  443         accumulate[
'tasks'] = {m: t.to_primitives() 
for m, t 
in self.
tasks.
items()}
 
  445             accumulate[
'contracts'] = [c.to_primitives() 
for c 
in self.
contracts]
 
  449         """Instance formatting as how it would look in yaml representation 
  454         """Instance formatting as how it would look in yaml representation 
  459         if not isinstance(other, PipelineIR):
 
  461         elif all(getattr(self, attr) == getattr(other, attr) 
for attr 
in 
  462                  (
"contracts", 
"tasks", 
"instrument")):