from __future__ import annotations
__all__ = ("ConfigIR", "ContractError", "ContractIR", "InheritIR",
           "PipelineIR", "TaskIR", "LabeledSubset")
from collections import Counter
from collections.abc import Iterable as abcIterable
from dataclasses import dataclass, field
from typing import Any, List, Set, Union, Generator, MutableMapping, Optional, Dict

import copy
import os
import re
import warnings

import yaml
38 """This is a specialized version of yaml's SafeLoader. It checks and raises
39 an exception if it finds that there are multiple instances of the same key
40 found inside a pipeline file at a given scope.
51 all_keys = Counter(key_node.value
for key_node, _
in node.value)
52 duplicates = {k
for k, i
in all_keys.items()
if i != 1}
54 raise KeyError(
"Pipeline files must not have duplicated keys, "
55 f
"{duplicates} appeared multiple times")
60 """An exception that is raised when a pipeline contract is not satisfied
67 """Intermediate representation of contracts read from a pipeline yaml file.
70 """A string of python code representing one or more conditions on configs
71 in a pipeline. This code-as-string should, once evaluated, should be True
72 if the configs are fine, and False otherwise.
74 msg: Union[str,
None] =
None
75 """An optional message to be shown to the user if a contract fails
79 """Convert to a representation used in yaml serialization
81 accumulate = {
"contract": self.
contract}
82 if self.
msg is not None:
83 accumulate[
'msg'] = self.
msg
    def __eq__(self, other: "ContractIR"):
        if not isinstance(other, ContractIR):
            return False
        elif self.contract == other.contract and self.msg == other.msg:
            return True
        else:
            return False
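# A minimal sketch (hypothetical labels, not part of the original module) of
# how a contract is expressed and serialized:
#
#     >>> c = ContractIR(contract="taskA.connections.out == taskB.connections.inp",
#     ...                msg="taskA must feed taskB")
#     >>> c.to_primitives()
#     {'contract': 'taskA.connections.out == taskB.connections.inp', 'msg': 'taskA must feed taskB'}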
97 """Intermediate representation of named subset of task labels read from
101 """The label used to identify the subset of task labels.
104 """A set of task labels contained in this subset.
106 description: Optional[str]
107 """A description of what this subset of tasks is intended to do
112 """Generate `LabeledSubset` objects given a properly formatted object
113 that as been created by a yaml loader.
118 The label that will be used to identify this labeled subset.
119 value : `list` of `str` or `dict`
120 Object returned from loading a labeled subset section from a yaml
125 labeledSubset : `LabeledSubset`
126 A `LabeledSubset` object build from the inputs.
131 Raised if the value input is not properly formatted for parsing
        if isinstance(value, MutableMapping):
            subset = value.pop("subset", None)
            if subset is None:
                raise ValueError("If a labeled subset is specified as a mapping, it must contain the key "
                                 "'subset'")
            description = value.pop("description", None)
        elif isinstance(value, abcIterable):
            subset = value
            description = None
        else:
            raise ValueError(f"There was a problem parsing the labeled subset {label}, make sure the "
                             "definition is either a valid yaml list, or a mapping with keys "
                             "(subset, description) where subset points to a yaml list, and description is "
                             "associated with a string")
        return LabeledSubset(label, set(subset), description)
150 """Convert to a representation used in yaml serialization
152 accumulate: Dict[str, Any] = {
"subset":
list(self.subset)}
153 if self.description
is not None:
154 accumulate[
"description"] = self.description
160 """Intermediate representation of configurations read from a pipeline yaml
163 python: Union[str,
None] =
None
164 """A string of python code that is used to modify a configuration. This can
165 also be None if there are no modifications to do.
167 dataId: Union[dict,
None] =
None
168 """A dataId that is used to constrain these config overrides to only quanta
169 with matching dataIds. This field can be None if there is no constraint.
170 This is currently an unimplemented feature, and is placed here for future
173 file: List[str] =
field(default_factory=list)
174 """A list of paths which points to a file containing config overrides to be
175 applied. This value may be an empty list if there are no overrides to
178 rest: dict =
field(default_factory=dict)
179 """This is a dictionary of key value pairs, where the keys are strings
180 corresponding to qualified fields on a config to override, and the values
181 are strings representing the values to apply.
185 """Convert to a representation used in yaml serialization
188 for name
in (
"python",
"dataId",
"file"):
191 if getattr(self, name):
192 accumulate[name] = getattr(self, name)
195 accumulate.update(self.rest)
    def maybe_merge(self, other_config: "ConfigIR") -> Generator["ConfigIR", None, None]:
        """Merges another instance of a `ConfigIR` into this instance if
        possible. This function returns a generator that yields only self
        if the configs were merged, or self and other_config if they could
        not be merged.

        Parameters
        ----------
        other_config : `ConfigIR`
            An instance of `ConfigIR` to merge into this instance.

        Returns
        -------
        Generator : `ConfigIR`
            A generator containing either self, or self and other_config if
            the configs could or could not be merged, respectively.
        """
        # Configs with differing dataIds, python blocks, or override files
        # cannot be merged safely
        if self.dataId != other_config.dataId or self.python or other_config.python or\
                self.file or other_config.file:
            yield from (self, other_config)
            return

        # Create a set of all shared keys, and verify no key has two
        # different values
        key_union = self.rest.keys() & other_config.rest.keys()
        for key in key_union:
            if self.rest[key] != other_config.rest[key]:
                yield from (self, other_config)
                return
        self.rest.update(other_config.rest)

        # Combine the lists of override files, filtering out duplicates
        self_file_set = set(self.file)
        other_file_set = set(other_config.file)
        self.file = list(self_file_set.union(other_file_set))
        yield self
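    # A minimal sketch (hypothetical keys, not part of the original module) of
    # the merge behaviour: compatible blocks collapse into one, conflicting
    # values keep both blocks:
    #
    #     >>> a = ConfigIR(rest={"task.threshold": "5.0"})
    #     >>> b = ConfigIR(rest={"task.nSigma": "3.0"})
    #     >>> [c.rest for c in a.maybe_merge(b)]
    #     [{'task.threshold': '5.0', 'task.nSigma': '3.0'}]
    #     >>> c = ConfigIR(rest={"task.threshold": "7.0"})
    #     >>> len(list(a.maybe_merge(c)))  # conflicting value: both are kept
    #     2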
    def __eq__(self, other: "ConfigIR"):
        if not isinstance(other, ConfigIR):
            return False
        elif all(getattr(self, attr) == getattr(other, attr) for attr in
                 ("python", "dataId", "file", "rest")):
            return True
        else:
            return False
249 """Intermediate representation of tasks read from a pipeline yaml file.
252 """An identifier used to refer to a task.
255 """A string containing a fully qualified python class to be run in a
258 config: Union[List[ConfigIR],
None] =
None
259 """List of all configs overrides associated with this task, and may be
260 `None` if there are no config overrides.
264 """Convert to a representation used in yaml serialization
266 accumulate = {
'class': self.klass}
268 accumulate[
'config'] = [c.to_primitives()
for c
in self.
config]
272 """Adds a `ConfigIR` to this task if one is not present. Merges configs
273 if there is a `ConfigIR` present and the dataId keys of both configs
274 match, otherwise adds a new entry to the config list. The exception to
275 the above is that if either the last config or other_config has a
276 python block, then other_config is always added, as python blocks can
277 modify configs in ways that cannot be predicted.
281 other_config : `ConfigIR`
282 A `ConfigIR` instance to add or merge into the config attribute of
288 self.
config.extend(self.
config.pop().maybe_merge(other_config))
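    # A minimal sketch (hypothetical names, not part of the original module):
    # successive compatible overrides merge into a single ConfigIR entry:
    #
    #     >>> t = TaskIR("isr", "lsst.ip.isr.IsrTask")
    #     >>> t.add_or_update_config(ConfigIR(rest={"doDark": "True"}))
    #     >>> t.add_or_update_config(ConfigIR(rest={"doBias": "True"}))
    #     >>> len(t.config)
    #     1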
    def __eq__(self, other: "TaskIR"):
        if not isinstance(other, TaskIR):
            return False
        elif all(getattr(self, attr) == getattr(other, attr) for attr in
                 ("label", "klass", "config")):
            return True
        else:
            return False
302 """An intermediate representation of inherited pipelines
305 """This is the location of the pipeline to inherit. The path should be
306 specified as an absolute path. Environment variables may be used in the
307 path and should be specified as a python string template, with the name of
308 the environment variable inside braces.
310 include: Union[List[str],
None] =
None
311 """List of tasks that should be included when inheriting this pipeline.
312 Either the include or exclude attributes may be specified, but not both.
314 exclude: Union[List[str],
None] =
None
315 """List of tasks that should be excluded when inheriting this pipeline.
316 Either the include or exclude attributes may be specified, but not both.
318 importContracts: bool =
True
319 """Boolean attribute to dictate if contracts should be inherited with the
324 """Convert to a representation used in yaml serialization
326 if self.include
and self.exclude:
327 raise ValueError(
"Both an include and an exclude list cant be specified"
328 " when declaring a pipeline import")
329 tmp_pipeline = PipelineIR.from_file(os.path.expandvars(self.location))
330 if tmp_pipeline.instrument
is not None:
331 warnings.warn(
"Any instrument definitions in imported pipelines are ignored. "
332 "if an instrument is desired please define it in the top most pipeline")
334 included_labels =
set()
335 for label
in tmp_pipeline.tasks:
336 if (self.include
and label
in self.include)
or (self.exclude
and label
not in self.exclude)\
337 or (self.include
is None and self.exclude
is None):
338 included_labels.add(label)
342 if self.include
is not None:
343 subsets_in_include = tmp_pipeline.labeled_subsets.keys() & self.include
344 for label
in subsets_in_include:
345 included_labels.update(tmp_pipeline.labeled_subsets[label].subset)
347 elif self.exclude
is not None:
348 subsets_in_exclude = tmp_pipeline.labeled_subsets.keys() & self.exclude
349 for label
in subsets_in_exclude:
350 included_labels.difference_update(tmp_pipeline.labeled_subsets[label].subset)
352 tmp_pipeline = tmp_pipeline.subset_from_labels(included_labels)
354 if not self.importContracts:
355 tmp_pipeline.contracts = []
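    # A minimal sketch (hypothetical path and labels, not part of the original
    # module) of an import with filtering; exactly one of include/exclude may
    # be given, and either list may name a labeled subset from the imported
    # file:
    #
    #     >>> ir = InheritIR(location="${PIPE_DIR}/pipelines/base.yaml",
    #     ...                exclude=["makeWarp"], importContracts=False)
    #     >>> sub = ir.toPipelineIR()  # loads, filters, and drops contracts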
    def __eq__(self, other: "InheritIR"):
        if not isinstance(other, InheritIR):
            return False
        elif all(getattr(self, attr) == getattr(other, attr) for attr in
                 ("location", "include", "exclude", "importContracts")):
            return True
        else:
            return False
370 """Intermediate representation of a pipeline definition
375 A dictionary which matches the structure that would be produced by a
376 yaml reader which parses a pipeline definition document
381 - If a pipeline is declared without a description
382 - If no tasks are declared in a pipeline, and no pipelines are to be
384 - If more than one instrument is specified
385 - If more than one inherited pipeline share a label
389 if "description" not in loaded_yaml:
390 raise ValueError(
"A pipeline must be declared with a description")
391 if "tasks" not in loaded_yaml
and "inherits" not in loaded_yaml:
392 raise ValueError(
"A pipeline must be declared with one or more tasks")
403 inst = loaded_yaml.pop(
"instrument",
None)
404 if isinstance(inst, list):
405 raise ValueError(
"Only one top level instrument can be defined in a pipeline")
    def _read_contracts(self, loaded_yaml):
        """Process the contracts portion of the loaded yaml document

        Parameters
        ----------
        loaded_yaml : `dict`
            A dictionary which matches the structure that would be produced by
            a yaml reader which parses a pipeline definition document
        """
        loaded_contracts = loaded_yaml.pop("contracts", [])
        if isinstance(loaded_contracts, str):
            loaded_contracts = [loaded_contracts]
        self.contracts = []
        for contract in loaded_contracts:
            # A contract may be a bare string, or a mapping with a contract
            # and an optional msg
            if isinstance(contract, dict):
                self.contracts.append(ContractIR(**contract))
            if isinstance(contract, str):
                self.contracts.append(ContractIR(contract=contract))
    def _read_labeled_subsets(self, loaded_yaml: dict):
        """Process the subsets portion of the loaded yaml document

        Parameters
        ----------
        loaded_yaml : `MutableMapping`
            A dictionary which matches the structure that would be produced
            by a yaml reader which parses a pipeline definition document
        """
        loaded_subsets = loaded_yaml.pop("subsets", {})
        self.labeled_subsets = {}
        if not loaded_subsets and "subset" in loaded_yaml:
            raise ValueError("Top level key should be subsets and not subset, add an s")
        for key, value in loaded_subsets.items():
            self.labeled_subsets[key] = LabeledSubset.from_primitives(key, value)
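    # A minimal sketch (hypothetical labels, not part of the original module)
    # of the yaml shapes this method accepts under the top-level "subsets"
    # key:
    #
    #     subsets:
    #         processCcd:            # list form
    #             - isr
    #             - characterizeImage
    #         coaddition:            # mapping form
    #             subset:
    #                 - makeWarp
    #             description: warp and coadd the images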
    def _verify_labeled_subsets(self):
        """Verifies that all the labels in each named subset exist within the
        pipeline.
        """
        # Verify that all labels defined in a labeled subset are in the
        # pipeline
        for labeled_subset in self.labeled_subsets.values():
            if not labeled_subset.subset.issubset(self.tasks.keys()):
                raise ValueError(f"Labels {labeled_subset.subset - self.tasks.keys()} were not found in the "
                                 "declared pipeline")
        # Verify that no subset label clashes with a task label
        label_intersection = self.labeled_subsets.keys() & self.tasks.keys()
        if label_intersection:
            raise ValueError(f"Labeled subsets can not use the same label as a task: {label_intersection}")
    def _read_inherits(self, loaded_yaml):
        """Process the inherits portion of the loaded yaml document

        Parameters
        ----------
        loaded_yaml : `dict`
            A dictionary which matches the structure that would be produced by
            a yaml reader which parses a pipeline definition document
        """
        def process_args(argument: Union[str, dict]) -> dict:
            # A bare string is shorthand for {"location": string}; in the
            # mapping form, scalar include/exclude values are normalized to
            # single element lists
            if isinstance(argument, str):
                return {"location": argument}
            elif isinstance(argument, dict):
                if "exclude" in argument and isinstance(argument["exclude"], str):
                    argument["exclude"] = [argument["exclude"]]
                if "include" in argument and isinstance(argument["include"], str):
                    argument["include"] = [argument["include"]]
                return argument
        tmp_inherit = loaded_yaml.pop("inherits", None)
        if tmp_inherit is None:
            self.inherits = []
        elif isinstance(tmp_inherit, list):
            self.inherits = [InheritIR(**process_args(args)) for args in tmp_inherit]
        else:
            self.inherits = [InheritIR(**process_args(tmp_inherit))]
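        # A minimal sketch (hypothetical paths, not part of the original
        # module) of the accepted shapes of the top-level "inherits" key:
        #
        #     inherits: /path/to/base.yaml            # single string
        #
        #     inherits:                               # list of strings/maps
        #         - /path/to/base.yaml
        #         - location: ${ENV_VAR}/other.yaml
        #           include: processCcd               # normalized to a list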
        accumulate_tasks = {}
        accumulate_labeled_subsets = {}
        for other_pipeline in self.inherits:
            tmp_IR = other_pipeline.toPipelineIR()
            if accumulate_tasks.keys() & tmp_IR.tasks.keys():
                raise ValueError("Task labels in the imported pipelines must "
                                 "be unique")
            accumulate_tasks.update(tmp_IR.tasks)
            self.contracts.extend(tmp_IR.contracts)
            # Verify that tmp_IR's labeled subsets are unique among the
            # accumulated labeled subsets and the accumulated task labels
            overlapping_subsets = accumulate_labeled_subsets.keys() & tmp_IR.labeled_subsets.keys()
            task_subset_overlap = ((accumulate_labeled_subsets.keys() | tmp_IR.labeled_subsets.keys())
                                   & accumulate_tasks.keys())
            if overlapping_subsets or task_subset_overlap:
                raise ValueError("Labeled subset names must be unique amongst imports in both labels and "
                                 f"named subsets. Duplicate: {overlapping_subsets | task_subset_overlap}")
            accumulate_labeled_subsets.update(tmp_IR.labeled_subsets)

        # Verify that the accumulated labeled subsets do not clash with any
        # task labels defined in this pipeline
        if accumulate_labeled_subsets.keys() & self.tasks.keys():
            raise ValueError("Labeled subset names must be unique amongst imports in both labels and "
                             "task labels")
        accumulate_labeled_subsets.update(self.labeled_subsets)
        self.labeled_subsets = accumulate_labeled_subsets

        # Merge the imported tasks with those declared in this document,
        # preserving imported configs when a label points at the same class
        for label, task in self.tasks.items():
            if label not in accumulate_tasks:
                accumulate_tasks[label] = task
            elif accumulate_tasks[label].klass == task.klass:
                if task.config is not None:
                    for config in task.config:
                        accumulate_tasks[label].add_or_update_config(config)
            else:
                accumulate_tasks[label] = task
        self.tasks = accumulate_tasks
    def _read_tasks(self, loaded_yaml):
        """Process the tasks portion of the loaded yaml document

        Parameters
        ----------
        loaded_yaml : `dict`
            A dictionary which matches the structure that would be produced by
            a yaml reader which parses a pipeline definition document
        """
        self.tasks = {}
        tmp_tasks = loaded_yaml.pop("tasks", None)
        if tmp_tasks is None:
            tmp_tasks = {}

        for label, definition in tmp_tasks.items():
            # A task may be declared as a bare string naming the class, or as
            # a mapping with a class and optional config overrides
            if isinstance(definition, str):
                definition = {"class": definition}
            config = definition.get('config', None)
            if config is None:
                task_config_ir = None
            else:
                if isinstance(config, dict):
                    config = [config]
                task_config_ir = []
                for c in config:
                    # Normalize the file key to a list of paths
                    file = c.pop("file", None)
                    if file is None:
                        file = []
                    elif not isinstance(file, list):
                        file = [file]
                    task_config_ir.append(ConfigIR(python=c.pop("python", None),
                                                   dataId=c.pop("dataId", None),
                                                   file=file,
                                                   rest=c))
            self.tasks[label] = TaskIR(label, definition["class"], task_config_ir)
    def _remove_contracts(self, label: str):
        """Remove any contracts that contain the given label

        String comparison used in this way is not the most elegant and may
        have issues, but it is the only feasible way when users can specify
        contracts with generic strings.
        """
        new_contracts = []
        for contract in self.contracts:
            # Match the label as a standalone identifier followed by an
            # attribute access, e.g. "label.something"
            if re.match(f".*([^A-Za-z0-9_]|^){label}[.]", contract.contract):
                continue
            new_contracts.append(contract)
        self.contracts = new_contracts
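    # A minimal sketch (hypothetical contract strings, not part of the
    # original module) of what the pattern does and does not match for the
    # label "isr":
    #
    #     >>> bool(re.match(".*([^A-Za-z0-9_]|^)isr[.]", "isr.doDark == True"))
    #     True
    #     >>> bool(re.match(".*([^A-Za-z0-9_]|^)isr[.]", "myisr.doDark == True"))
    #     False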
592 """Subset a pipelineIR to contain only labels specified in
597 labelSpecifier : `set` of `str`
598 Set containing labels that describes how to subset a pipeline.
602 pipeline : `PipelineIR`
603 A new pipelineIR object that is a subset of the old pipelineIR
608 Raised if there is an issue with specified labels
612 This method attempts to prune any contracts that contain labels which
613 are not in the declared subset of labels. This pruning is done using a
614 string based matching due to the nature of contracts and may prune more
615 than it should. Any labeled subsets defined that no longer have all
616 members of the subset present in the pipeline will be removed from the
620 pipeline = copy.deepcopy(self)
625 for label
in labelSpecifier:
626 if label
in pipeline.labeled_subsets:
628 toAdd.update(pipeline.labeled_subsets[label].subset)
629 labelSpecifier.difference_update(toRemove)
630 labelSpecifier.update(toAdd)
632 if not labelSpecifier.issubset(pipeline.tasks.keys()
633 | pipeline.labeled_subsets):
634 difference = labelSpecifier.difference(pipeline.tasks.keys())
635 raise ValueError(
"Not all supplied labels (specified or named subsets) are in the pipeline "
636 f
"definition, extra labels: {difference}")
638 pipeline_labels =
set(pipeline.tasks.keys())
642 for label
in pipeline_labels:
643 if label
not in labelSpecifier:
644 pipeline.tasks.pop(label)
645 pipeline._remove_contracts(label)
648 labeled_subsets = copy.copy(pipeline.labeled_subsets)
650 for label, labeled_subset
in labeled_subsets.items():
651 if labeled_subset.subset - pipeline.tasks.keys():
652 pipeline.labeled_subsets.pop(label)
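    # A minimal sketch (hypothetical document and labels, not part of the
    # original module): requesting a named subset expands to its member tasks,
    # and any now-incomplete subsets are dropped from the result:
    #
    #     >>> full = PipelineIR.from_string(some_pipeline_yaml)  # hypothetical
    #     >>> small = full.subset_from_labels({"processCcd"})
    #     >>> set(small.tasks) <= set(full.tasks)
    #     True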
658 """Create a `PipelineIR` object from a string formatted like a pipeline
663 pipeline_string : `str`
664 A string that is formatted according like a pipeline document
666 loaded_yaml = yaml.load(pipeline_string, Loader=PipelineYamlLoader)
667 return cls(loaded_yaml)
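    # A minimal sketch (hypothetical content, not part of the original module)
    # of the smallest document from_string will accept: a description plus at
    # least one task (or an inherits section):
    #
    #     >>> ir = PipelineIR.from_string("""
    #     ... description: A trivial pipeline
    #     ... tasks:
    #     ...     demo: some.package.DemoTask
    #     ... """)
    #     >>> list(ir.tasks)
    #     ['demo']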
671 """Create a `PipelineIR` object from the document specified by the
677 Location of document to use in creating a `PipelineIR` object.
679 with open(filename,
'r')
as f:
680 loaded_yaml = yaml.load(f, Loader=PipelineYamlLoader)
681 return cls(loaded_yaml)
684 """Serialize this `PipelineIR` object into a yaml formatted string and
685 write the output to a file at the specified path.
690 Location of document to write a `PipelineIR` object.
692 with open(filename,
'w')
as f:
696 """Convert to a representation used in yaml serialization
701 accumulate[
'tasks'] = {m: t.to_primitives()
for m, t
in self.
tasks.
items()}
703 accumulate[
'contracts'] = [c.to_primitives()
for c
in self.
contracts]
709 """Instance formatting as how it would look in yaml representation
714 """Instance formatting as how it would look in yaml representation
    def __eq__(self, other: "PipelineIR"):
        if not isinstance(other, PipelineIR):
            return False
        elif all(getattr(self, attr) == getattr(other, attr) for attr in
                 ("contracts", "tasks", "instrument")):
            return True
        else:
            return False