22 from dataclasses
import dataclass, field
23 from typing
import List, Union, Generator
31 """An exception that is raised when a pipeline contract is not satisfied 38 """Intermediate representation of contracts read from a pipeline yaml file. 41 """A string of python code representing one or more conditions on configs 42 in a pipeline. This code-as-string should, once evaluated, be True 43 if the configs are fine, and False otherwise. 45 msg: Union[str,
None] =
None 46 """An optional message to be shown to the user if a contract fails 50 """Convert to a representation used in yaml serialization 52 accumulate = {
"contract": self.
contract}
53 if self.
msg is not None:
54 accumulate[
'msg'] = self.
msg 57 def __eq__(self, other:
"ContractIR"):
58 if not isinstance(other, ContractIR):
60 elif self.
contract == other.contract
and self.
msg == other.msg:
68 """Intermediate representation of configurations read from a pipeline yaml 71 python: Union[str,
None] =
None 72 """A string of python code that is used to modify a configuration. This can 73 also be None if there are no modifications to do. 75 dataId: Union[dict,
None] =
None 76 """A dataId that is used to constrain these config overrides to only quanta 77 with matching dataIds. This field can be None if there is no constraint. 78 This is currently an unimplemented feature, and is placed here for future 81 file: List[str] =
field(default_factory=list)
82 """A list of paths, each pointing to a file containing config overrides to be 83 applied. This value may be an empty list if there are no overrides to apply. 85 rest: dict =
field(default_factory=dict)
86 """This is a dictionary of key value pairs, where the keys are strings 87 corresponding to qualified fields on a config to override, and the values 88 are strings representing the values to apply. 92 """Convert to a representation used in yaml serialization 95 for name
in (
"python",
"dataId",
"file"):
97 if getattr(self, name):
98 accumulate[name] = getattr(self, name)
101 accumulate.update(self.rest)
104 def maybe_merge(self, other_config:
"ConfigIR") -> Generator[
"ConfigIR",
None,
None]:
105 """Merges another instance of a `ConfigIR` into this instance if 106 possible. This function returns a generator that is either self 107 if the configs were merged, or self, and other_config if that could 112 other_config : `ConfigIR` 113 An instance of `ConfigIR` to merge into this instance. 117 Generator : `ConfigIR` 118 A generator containing either self, or self and other_config if 119 the configs could be merged or not respectively. 122 if self.dataId != other_config.dataId
or self.python
or other_config.python
or\
123 self.
file or other_config.file:
124 yield from (self, other_config)
129 key_union = self.rest.
keys() & other_config.rest.keys()
130 for key
in key_union:
131 if self.rest[key] != other_config.rest[key]:
132 yield from (self, other_config)
134 self.rest.update(other_config.rest)
138 other_file_set =
set(other_config.file)
139 self.
file =
list(self_file_set.union(other_file_set))
144 if not isinstance(other, ConfigIR):
146 elif all(getattr(self, attr) == getattr(other, attr)
for attr
in 147 (
"python",
"dataId",
"file",
"rest")):
155 """Intermediate representation of tasks read from a pipeline yaml file. 158 """An identifier used to refer to a task. 161 """A string containing a fully qualified python class to be run in a 164 config: Union[List[ConfigIR],
None] =
None 165 """List of all configs overrides associated with this task, and may be 166 `None` if there are no config overrides. 170 """Convert to a representation used in yaml serialization 172 accumulate = {
'class': self.klass}
174 accumulate[
'config'] = [c.to_primitives()
for c
in self.
config]
178 """Adds a `ConfigIR` to this task if one is not present. Merges configs 179 if there is a `ConfigIR` present and the dataId keys of both configs 180 match, otherwise adds a new entry to the config list. The exception to 181 the above is that if either the last config or other_config has a python 182 block, then other_config is always added, as python blocks can modify 183 configs in ways that cannot be predicted. 187 other_config : `ConfigIR` 188 A `ConfigIR` instance to add or merge into the config attribute of 194 self.
config.extend(self.
config.pop().maybe_merge(other_config))
197 if not isinstance(other, TaskIR):
199 elif all(getattr(self, attr) == getattr(other, attr)
for attr
in 200 (
"label",
"klass",
"config")):
208 """An intermediate representation of inherited pipelines 211 """This is the location of the pipeline to inherit. The path should be 212 specified as an absolute path. Environment variables may be used in the path 213 and should be specified as a python string template, with the name of the 214 environment variable inside braces. 216 include: Union[List[str],
None] =
None 217 """List of tasks that should be included when inheriting this pipeline. 218 Either the include or exclude attributes may be specified, but not both. 220 exclude: Union[List[str],
None] =
None 221 """List of tasks that should be excluded when inheriting this pipeline. 222 Either the include or exclude attributes may be specified, but not both. 224 importContracts: bool =
True 225 """Boolean attribute to dictate if contracts should be inherited with the 230 """Convert to a representation used in yaml serialization 232 if self.include
and self.exclude:
233 raise ValueError(
"Both an include and an exclude list cant be specified" 234 " when declaring a pipeline import")
235 tmp_pipeline = PipelineIR.from_file(os.path.expandvars(self.location))
236 if tmp_pipeline.instrument
is not None:
237 warnings.warn(
"Any instrument definitions in imported pipelines are ignored. " 238 "if an instrument is desired please define it in the top most pipeline")
241 for label, task
in tmp_pipeline.tasks.items():
242 if (self.include
and label
in self.include)
or (self.exclude
and label
not in self.exclude)\
243 or (self.include
is None and self.exclude
is None):
244 new_tasks[label] = task
245 tmp_pipeline.tasks = new_tasks
247 if not self.importContracts:
248 tmp_pipeline.contracts = []
253 if not isinstance(other, InheritIR):
255 elif all(getattr(self, attr) == getattr(other, attr)
for attr
in 256 (
"location",
"include",
"exclude",
"importContracts")):
263 """Intermediate representation of a pipeline definition 268 A dictionary which matches the structure that would be produced by a 269 yaml reader which parses a pipeline definition document 274 - If a pipeline is declared without a description 275 - If no tasks are declared in a pipeline, and no pipelines are to be 277 - If more than one instrument is specified 278 - If more than one inherited pipeline share a label 282 if "description" not in loaded_yaml:
283 raise ValueError(
"A pipeline must be declared with a description")
284 if "tasks" not in loaded_yaml
and "inherits" not in loaded_yaml:
285 raise ValueError(
"A pipeline must be declared with one or more tasks")
294 inst = loaded_yaml.pop(
"instrument",
None)
295 if isinstance(inst, list):
296 raise ValueError(
"Only one top level instrument can be defined in a pipeline")
305 def _read_contracts(self, loaded_yaml):
306 """Process the contracts portion of the loaded yaml document 311 A dictionary which matches the structure that would be produced by a 312 yaml reader which parses a pipeline definition document 314 loaded_contracts = loaded_yaml.pop(
"contracts", [])
315 if isinstance(loaded_contracts, str):
316 loaded_contracts = [loaded_contracts]
318 for contract
in loaded_contracts:
319 if isinstance(contract, dict):
321 if isinstance(contract, str):
324 def _read_inherits(self, loaded_yaml):
325 """Process the inherits portion of the loaded yaml document 330 A dictionary which matches the structure that would be produced by a 331 yaml reader which parses a pipeline definition document 333 def process_args(argument: Union[str, dict]) -> dict:
334 if isinstance(argument, str):
335 return {
"location": argument}
336 elif isinstance(argument, dict):
337 if "exclude" in argument
and isinstance(argument[
"exclude"], str):
338 argument[
"exclude"] = [argument[
"exclude"]]
339 if "include" in argument
and isinstance(argument[
"include"], str):
340 argument[
"include"] = [argument[
"include"]]
342 tmp_inherit = loaded_yaml.pop(
"inherits",
None)
343 if tmp_inherit
is None:
345 elif isinstance(tmp_inherit, list):
351 accumulate_tasks = {}
352 for other_pipeline
in self.
inherits:
353 tmp_IR = other_pipeline.toPipelineIR()
354 if accumulate_tasks.keys() & tmp_IR.tasks.keys():
355 raise ValueError(
"Task labels in the imported pipelines must " 357 accumulate_tasks.update(tmp_IR.tasks)
359 accumulate_tasks.update(self.
tasks)
362 def _read_tasks(self, loaded_yaml):
363 """Process the tasks portion of the loaded yaml document 368 A dictionary which matches the structure that would be produced by a 369 yaml reader which parses a pipeline definition document 372 tmp_tasks = loaded_yaml.pop(
"tasks",
None)
373 if tmp_tasks
is None:
376 for label, definition
in tmp_tasks.items():
377 if isinstance(definition, str):
378 definition = {
"class": definition}
379 config = definition.get(
'config',
None)
381 task_config_ir =
None 383 if isinstance(config, dict):
387 file = c.pop(
"file",
None)
390 elif not isinstance(file, list):
392 task_config_ir.append(
ConfigIR(python=c.pop(
"python",
None),
393 dataId=c.pop(
"dataId",
None),
396 self.
tasks[label] =
TaskIR(label, definition[
"class"], task_config_ir)
400 """Create a `PipelineIR` object from a string formatted like a pipeline 405 pipeline_string : `str` 406 A string that is formatted like a pipeline document 408 loaded_yaml = yaml.safe_load(pipeline_string)
409 return cls(loaded_yaml)
413 """Create a `PipelineIR` object from the document specified by the 419 Location of document to use in creating a `PipelineIR` object. 421 with open(filename,
'r') as f: 422 loaded_yaml = yaml.safe_load(f) 423 return cls(loaded_yaml)
426 """Serialize this `PipelineIR` object into a yaml formatted string and 427 write the output to a file at the specified path. 432 Location of document to write a `PipelineIR` object. 434 with open(filename,
'w')
as f:
438 """Convert to a representation used in yaml serialization 443 accumulate[
'tasks'] = {l: t.to_primitives()
for l, t
in self.
tasks.
items()}
445 accumulate[
'contracts'] = [c.to_primitives()
for c
in self.
contracts]
449 """Instance formatting as how it would look in yaml representation 454 """Instance formatting as how it would look in yaml representation 459 if not isinstance(other, PipelineIR):
461 elif all(getattr(self, attr) == getattr(other, attr)
for attr
in 462 (
"contracts",
"tasks",
"instrument")):
std::vector< SchemaItem< Flag > > * items
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
def _read_tasks(self, loaded_yaml)
daf::base::PropertySet * set
bool all(CoordinateExpr< N > const &expr) noexcept
Return true if all elements are true.
def __init__(self, loaded_yaml)
daf::base::PropertyList * list
def _read_contracts(self, loaded_yaml)
def _read_inherits(self, loaded_yaml)