from __future__ import annotations

__all__ = ("ConfigIR", "ContractError", "ContractIR", "ImportIR", "PipelineIR", "TaskIR", "LabeledSubset")

import copy
import os
import re
import warnings

import yaml

from collections import Counter
from collections.abc import Iterable as abcIterable
from dataclasses import dataclass, field
from deprecated.sphinx import deprecated
from typing import Any, List, Set, Union, Generator, MutableMapping, Optional, Dict, Type

from lsst.daf.butler import ButlerURI
45 """This is a specialized version of yaml's SafeLoader. It checks and raises
46 an exception if it finds that there are multiple instances of the same key
47 found inside a pipeline file at a given scope.
58 all_keys = Counter(key_node.value
for key_node, _
in node.value)
59 duplicates = {k
for k, i
in all_keys.items()
if i != 1}
61 raise KeyError(
"Pipeline files must not have duplicated keys, "
62 f
"{duplicates} appeared multiple times")
67 """An exception that is raised when a pipeline contract is not satisfied
74 """Intermediate representation of contracts read from a pipeline yaml file.
77 """A string of python code representing one or more conditions on configs
78 in a pipeline. This code-as-string should, once evaluated, should be True
79 if the configs are fine, and False otherwise.
81 msg: Union[str,
None] =
None
82 """An optional message to be shown to the user if a contract fails
86 """Convert to a representation used in yaml serialization
88 accumulate = {
"contract": self.
contractcontract}
89 if self.
msgmsg
is not None:
90 accumulate[
'msg'] = self.
msgmsg
94 if not isinstance(other, ContractIR):
96 elif self.
contractcontract == other.contract
and self.
msgmsg == other.msg:
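

# Illustrative sketch (hypothetical task labels): a contract is a python
# expression over task configs, with an optional failure message.
#
#     >>> c = ContractIR(contract="taskA.threshold <= taskB.threshold",
#     ...                msg="taskA and taskB thresholds are inconsistent")
#     >>> c.to_primitives()["msg"]
#     'taskA and taskB thresholds are inconsistent'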
104 """Intermediate representation of named subset of task labels read from
105 a pipeline yaml file.
108 """The label used to identify the subset of task labels.
111 """A set of task labels contained in this subset.
113 description: Optional[str]
114 """A description of what this subset of tasks is intended to do
119 """Generate `LabeledSubset` objects given a properly formatted object
120 that as been created by a yaml loader.
125 The label that will be used to identify this labeled subset.
126 value : `list` of `str` or `dict`
127 Object returned from loading a labeled subset section from a yaml
132 labeledSubset : `LabeledSubset`
133 A `LabeledSubset` object build from the inputs.
138 Raised if the value input is not properly formatted for parsing
140 if isinstance(value, MutableMapping):
141 subset = value.pop(
"subset",
None)
143 raise ValueError(
"If a labeled subset is specified as a mapping, it must contain the key "
145 description = value.pop(
"description",
None)
146 elif isinstance(value, abcIterable):
150 raise ValueError(f
"There was a problem parsing the labeled subset {label}, make sure the "
151 "definition is either a valid yaml list, or a mapping with keys "
152 "(subset, description) where subset points to a yaml list, and description is "
153 "associated with a string")
157 """Convert to a representation used in yaml serialization
159 accumulate: Dict[str, Union[List[str], str]] = {
"subset":
list(self.subset)}
160 if self.description
is not None:
161 accumulate[
"description"] = self.description
167 """Intermediate representation of parameters that are global to a pipeline
169 These parameters are specified under a top level key named `parameters`
170 and are declared as a yaml mapping. These entries can then be used inside
171 task configuration blocks to specify configuration values. They may not be
172 used in the special ``file`` or ``python`` blocks.
181 field1: parameters.shared_value
185 field2: parameters.shared_value
187 mapping: MutableMapping[str, str]
188 """A mutable mapping of identifiers as keys, and shared configuration
191 def update(self, other: Optional[ParametersIR]):
192 if other
is not None:
193 self.mapping.
update(other.mapping)
196 """Convert to a representation used in yaml serialization
201 return value
in self.mapping
204 return self.mapping[item]
207 return bool(self.mapping)
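

# Illustrative sketch: ParametersIR behaves like a mapping, and update()
# merges another instance in place.
#
#     >>> p = ParametersIR({"shared_value": "14"})
#     >>> "shared_value" in p, p["shared_value"]
#     (True, '14')
#     >>> p.update(ParametersIR({"other_value": "15"}))
#     >>> p["other_value"]
#     '15'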
212 """Intermediate representation of configurations read from a pipeline yaml
215 python: Union[str,
None] =
None
216 """A string of python code that is used to modify a configuration. This can
217 also be None if there are no modifications to do.
219 dataId: Union[dict,
None] =
None
220 """A dataId that is used to constrain these config overrides to only quanta
221 with matching dataIds. This field can be None if there is no constraint.
222 This is currently an unimplemented feature, and is placed here for future
225 file: List[str] =
field(default_factory=list)
226 """A list of paths which points to a file containing config overrides to be
227 applied. This value may be an empty list if there are no overrides to
230 rest: dict =
field(default_factory=dict)
231 """This is a dictionary of key value pairs, where the keys are strings
232 corresponding to qualified fields on a config to override, and the values
233 are strings representing the values to apply.
237 """Convert to a representation used in yaml serialization
240 for name
in (
"python",
"dataId",
"file"):
243 if getattr(self, name):
244 accumulate[name] = getattr(self, name)
247 accumulate.update(self.rest)

    def formatted(self, parameters: ParametersIR) -> ConfigIR:
        """Returns a new ConfigIR object that is formatted according to the
        specified parameters.

        Parameters
        ----------
        parameters : `ParametersIR`
            Object that contains variable mappings used in substitution.

        Returns
        -------
        config : `ConfigIR`
            A new ConfigIR object formatted with the input parameters.
        """
        new_config = copy.deepcopy(self)
        for key, value in new_config.rest.items():
            if not isinstance(value, str):
                continue
            match = re.match("parameters[.](.*)", value)
            if match and match.group(1) in parameters:
                new_config.rest[key] = parameters[match.group(1)]
            if match and match.group(1) not in parameters:
                warnings.warn(f"config {key} contains value {match.group(0)} which is formatted like a "
                              "Pipeline parameter but was not found within the Pipeline, if this was not "
                              "intentional, check for a typo")
        return new_config

    def maybe_merge(self, other_config: "ConfigIR") -> Generator["ConfigIR", None, None]:
        """Merges another instance of a `ConfigIR` into this instance if
        possible. This function returns a generator that yields either self
        if the configs were merged, or self and other_config if they could
        not be merged.

        Parameters
        ----------
        other_config : `ConfigIR`
            An instance of `ConfigIR` to merge into this instance.

        Returns
        -------
        generator : `Generator` of `ConfigIR`
            A generator containing either self, or self and other_config,
            depending on whether the configs could be merged.
        """
        # Configs can only be merged if they share a dataId and neither has a
        # python block or override files.
        if self.dataId != other_config.dataId or self.python or other_config.python or\
                self.file or other_config.file:
            yield from (self, other_config)
            return

        # Collect all shared keys, and verify that no key maps to two
        # different values.
        key_union = self.rest.keys() & other_config.rest.keys()
        for key in key_union:
            if self.rest[key] != other_config.rest[key]:
                yield from (self, other_config)
                return
        self.rest.update(other_config.rest)

        # Combine the lists of override files to load.
        self_file_set = set(self.file)
        other_file_set = set(other_config.file)
        self.file = list(self_file_set.union(other_file_set))

        yield self
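
    # Illustrative sketch: two config blocks with the same (default) dataId
    # and no python or file overrides merge into a single block.
    #
    #     >>> a = ConfigIR(rest={"field1": "1"})
    #     >>> merged = list(a.maybe_merge(ConfigIR(rest={"field2": "2"})))
    #     >>> len(merged), merged[0].rest
    #     (1, {'field1': '1', 'field2': '2'})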

    def __eq__(self, other: object):
        if not isinstance(other, ConfigIR):
            return False
        elif all(getattr(self, attr) == getattr(other, attr) for attr in
                 ("python", "dataId", "file", "rest")):
            return True
        else:
            return False
328 """Intermediate representation of tasks read from a pipeline yaml file.
331 """An identifier used to refer to a task.
334 """A string containing a fully qualified python class to be run in a
337 config: Union[List[ConfigIR],
None] =
None
338 """List of all configs overrides associated with this task, and may be
339 `None` if there are no config overrides.
343 """Convert to a representation used in yaml serialization
345 accumulate: Dict[str, Union[str, List[dict]]] = {
'class': self.klass}
347 accumulate[
'config'] = [c.to_primitives()
for c
in self.
configconfig]
351 """Adds a `ConfigIR` to this task if one is not present. Merges configs
352 if there is a `ConfigIR` present and the dataId keys of both configs
353 match, otherwise adds a new entry to the config list. The exception to
354 the above is that if either the last config or other_config has a
355 python block, then other_config is always added, as python blocks can
356 modify configs in ways that cannot be predicted.
360 other_config : `ConfigIR`
361 A `ConfigIR` instance to add or merge into the config attribute of
367 self.
configconfig.extend(self.
configconfig.pop().maybe_merge(other_config))
370 if not isinstance(other, TaskIR):
372 elif all(getattr(self, attr) == getattr(other, attr)
for attr
in
373 (
"label",
"klass",
"config")):
381 """An intermediate representation of imported pipelines
384 """This is the location of the pipeline to inherit. The path should be
385 specified as an absolute path. Environment variables may be used in the
386 path and should be specified as a python string template, with the name of
387 the environment variable inside braces.
389 include: Union[List[str],
None] =
None
390 """List of tasks that should be included when inheriting this pipeline.
391 Either the include or exclude attributes may be specified, but not both.
393 exclude: Union[List[str],
None] =
None
394 """List of tasks that should be excluded when inheriting this pipeline.
395 Either the include or exclude attributes may be specified, but not both.
397 importContracts: bool =
True
398 """Boolean attribute to dictate if contracts should be inherited with the
401 instrument: Union[Type[KeepInstrument], str,
None] = KeepInstrument
402 """Instrument to assign to the Pipeline at import. The default value of
403 KEEP_INSTRUMENT indicates that whatever instrument the pipeline is declared
404 with will not be modified. Setting this value to None will drop any
405 declared instrument prior to import.
409 """Load in the Pipeline specified by this object, and turn it into a
414 pipeline : `PipelineIR`
415 A pipeline generated from the imported pipeline file
417 if self.include
and self.exclude:
418 raise ValueError(
"Both an include and an exclude list cant be specified"
419 " when declaring a pipeline import")
420 tmp_pipeline = PipelineIR.from_uri(os.path.expandvars(self.location))
421 if self.instrument
is not KeepInstrument:
422 tmp_pipeline.instrument = self.instrument
424 included_labels =
set()
425 for label
in tmp_pipeline.tasks:
426 if (self.include
and label
in self.include)
or (self.exclude
and label
not in self.exclude)\
427 or (self.include
is None and self.exclude
is None):
428 included_labels.add(label)
432 if self.include
is not None:
433 subsets_in_include = tmp_pipeline.labeled_subsets.keys() & self.include
434 for label
in subsets_in_include:
435 included_labels.update(tmp_pipeline.labeled_subsets[label].subset)
437 elif self.exclude
is not None:
438 subsets_in_exclude = tmp_pipeline.labeled_subsets.keys() & self.exclude
439 for label
in subsets_in_exclude:
440 included_labels.difference_update(tmp_pipeline.labeled_subsets[label].subset)
442 tmp_pipeline = tmp_pipeline.subset_from_labels(included_labels)
444 if not self.importContracts:
445 tmp_pipeline.contracts = []
450 if not isinstance(other, ImportIR):
452 elif all(getattr(self, attr) == getattr(other, attr)
for attr
in
453 (
"location",
"include",
"exclude",
"importContracts")):
460 """Intermediate representation of a pipeline definition
465 A dictionary which matches the structure that would be produced by a
466 yaml reader which parses a pipeline definition document
471 - If a pipeline is declared without a description
472 - If no tasks are declared in a pipeline, and no pipelines are to be
474 - If more than one instrument is specified
475 - If more than one inherited pipeline share a label
479 if "description" not in loaded_yaml:
480 raise ValueError(
"A pipeline must be declared with a description")
481 if "tasks" not in loaded_yaml
and len({
"imports",
"inherits"} - loaded_yaml.keys()) == 2:
482 raise ValueError(
"A pipeline must be declared with one or more tasks")
493 inst = loaded_yaml.pop(
"instrument",
None)
494 if isinstance(inst, list):
495 raise ValueError(
"Only one top level instrument can be defined in a pipeline")

    def _read_contracts(self, loaded_yaml):
        """Process the contracts portion of the loaded yaml document.

        Parameters
        ----------
        loaded_yaml : `dict`
            A dictionary which matches the structure that would be produced by
            a yaml reader which parses a pipeline definition document.
        """
        loaded_contracts = loaded_yaml.pop("contracts", [])
        if isinstance(loaded_contracts, str):
            loaded_contracts = [loaded_contracts]
        self.contracts = []
        for contract in loaded_contracts:
            if isinstance(contract, dict):
                self.contracts.append(ContractIR(**contract))
            if isinstance(contract, str):
                self.contracts.append(ContractIR(contract=contract))

    def _read_parameters(self, loaded_yaml):
        """Process the parameters portion of the loaded yaml document.

        Parameters
        ----------
        loaded_yaml : `dict`
            A dictionary which matches the structure that would be produced by
            a yaml reader which parses a pipeline definition document.
        """
        loaded_parameters = loaded_yaml.pop("parameters", {})
        if not isinstance(loaded_parameters, dict):
            raise ValueError("The parameters section must be a yaml mapping")
        self.parameters = ParametersIR(loaded_parameters)

    def _read_labeled_subsets(self, loaded_yaml: dict):
        """Process the subsets portion of the loaded yaml document.

        Parameters
        ----------
        loaded_yaml : `MutableMapping`
            A dictionary which matches the structure that would be produced
            by a yaml reader which parses a pipeline definition document.
        """
        loaded_subsets = loaded_yaml.pop("subsets", {})
        self.labeled_subsets = {}
        if not loaded_subsets and "subset" in loaded_yaml:
            raise ValueError("Top level key should be subsets and not subset, add an s")
        for key, value in loaded_subsets.items():
            self.labeled_subsets[key] = LabeledSubset.from_primitives(key, value)
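
    # Illustrative sketch of a subsets section accepted by this method
    # (hypothetical labels); both value forms are handled by
    # LabeledSubset.from_primitives:
    #
    #     subsets:
    #       step1: [taskA, taskB]
    #       step2:
    #         subset: [taskC]
    #         description: second processing step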

    def _verify_labeled_subsets(self):
        """Verifies that all the labels in each named subset exist within the
        pipeline.
        """
        # Verify that all labels defined in a labeled subset are in the
        # pipeline.
        for labeled_subset in self.labeled_subsets.values():
            if not labeled_subset.subset.issubset(self.tasks.keys()):
                raise ValueError(f"Labels {labeled_subset.subset - self.tasks.keys()} were not found in the "
                                 "declared pipeline")
        # Verify that no subset label clashes with a task label.
        label_intersection = self.labeled_subsets.keys() & self.tasks.keys()
        if label_intersection:
            raise ValueError(f"Labeled subsets can not use the same label as a task: {label_intersection}")

    def _read_imports(self, loaded_yaml):
        """Process the inherits portion of the loaded yaml document.

        Parameters
        ----------
        loaded_yaml : `dict`
            A dictionary which matches the structure that would be produced by
            a yaml reader which parses a pipeline definition document.
        """
        def process_args(argument: Union[str, dict]) -> dict:
            if isinstance(argument, str):
                return {"location": argument}
            elif isinstance(argument, dict):
                if "exclude" in argument and isinstance(argument["exclude"], str):
                    argument["exclude"] = [argument["exclude"]]
                if "include" in argument and isinstance(argument["include"], str):
                    argument["include"] = [argument["include"]]
                if "instrument" in argument and argument["instrument"] == "None":
                    argument["instrument"] = None
                return argument

        if not {"inherits", "imports"} - loaded_yaml.keys():
            raise ValueError("Cannot define both inherits and imports sections, use imports")
        tmp_import = loaded_yaml.pop("inherits", None)
        if tmp_import is None:
            tmp_import = loaded_yaml.pop("imports", None)
        else:
            warnings.warn("The 'inherits' key is deprecated, and will be "
                          "removed around June 2021. Please use the key "
                          "'imports' instead", FutureWarning)
        if tmp_import is None:
            self.imports = []
        elif isinstance(tmp_import, list):
            self.imports = [ImportIR(**process_args(args)) for args in tmp_import]
        else:
            self.imports = [ImportIR(**process_args(tmp_import))]

        # Accumulate the tasks, labeled subsets, and parameters from each
        # imported pipeline.
        accumulate_tasks = {}
        accumulate_labeled_subsets = {}
        accumulated_parameters = ParametersIR({})
        for other_pipeline in self.imports:
            tmp_IR = other_pipeline.toPipelineIR()
            if self.instrument is None:
                self.instrument = tmp_IR.instrument
            elif self.instrument != tmp_IR.instrument and tmp_IR.instrument is not None:
                raise ValueError("Only one instrument can be declared in a pipeline or it's imports")
            if accumulate_tasks.keys() & tmp_IR.tasks.keys():
                raise ValueError("Task labels in the imported pipelines must be unique")
            accumulate_tasks.update(tmp_IR.tasks)
            self.contracts.extend(tmp_IR.contracts)
            # Verify that tmp_IR has unique labels for named subsets, both
            # among existing labeled subsets and existing task labels.
            overlapping_subsets = accumulate_labeled_subsets.keys() & tmp_IR.labeled_subsets.keys()
            task_subset_overlap = ((accumulate_labeled_subsets.keys() | tmp_IR.labeled_subsets.keys())
                                   & accumulate_tasks.keys())
            if overlapping_subsets or task_subset_overlap:
                raise ValueError("Labeled subset names must be unique amongst imports in both labels and "
                                 f"named Subsets. Duplicate: {overlapping_subsets | task_subset_overlap}")
            accumulate_labeled_subsets.update(tmp_IR.labeled_subsets)
            accumulated_parameters.update(tmp_IR.parameters)

        # Verify that the accumulated labeled subsets do not clash with any
        # task label from this pipeline.
        if accumulate_labeled_subsets.keys() & self.tasks.keys():
            raise ValueError("Labeled subset names must be unique amongst imports in both labels and "
                             "named Subsets")
        # Merge in the labeled subsets declared in this document so they can
        # override any that were imported.
        accumulate_labeled_subsets.update(self.labeled_subsets)
        self.labeled_subsets = accumulate_labeled_subsets

        # Merge the task definitions, preserving imported configs when a label
        # points at the same class in both pipelines.
        for label, task in self.tasks.items():
            if label not in accumulate_tasks:
                accumulate_tasks[label] = task
            elif accumulate_tasks[label].klass == task.klass:
                if task.config is not None:
                    for config in task.config:
                        accumulate_tasks[label].add_or_update_config(config)
            else:
                accumulate_tasks[label] = task
        self.tasks = accumulate_tasks
        self.parameters.update(accumulated_parameters)

    def _read_tasks(self, loaded_yaml):
        """Process the tasks portion of the loaded yaml document.

        Parameters
        ----------
        loaded_yaml : `dict`
            A dictionary which matches the structure that would be produced by
            a yaml reader which parses a pipeline definition document.
        """
        self.tasks = {}
        tmp_tasks = loaded_yaml.pop("tasks", None)
        if tmp_tasks is None:
            tmp_tasks = {}

        if "parameters" in tmp_tasks:
            raise ValueError("parameters is a reserved word and cannot be used as a task label")

        for label, definition in tmp_tasks.items():
            if isinstance(definition, str):
                definition = {"class": definition}
            config = definition.get('config', None)
            if config is None:
                task_config_ir = None
            else:
                if isinstance(config, dict):
                    config = [config]
                task_config_ir = []
                for c in config:
                    file = c.pop("file", None)
                    if file is None:
                        file = []
                    elif not isinstance(file, list):
                        file = [file]
                    task_config_ir.append(ConfigIR(python=c.pop("python", None),
                                                   dataId=c.pop("dataId", None),
                                                   file=file,
                                                   rest=c))
            self.tasks[label] = TaskIR(label, definition["class"], task_config_ir)

    def _remove_contracts(self, label: str):
        """Remove any contracts that contain the given label.

        String comparison used in this way is not the most elegant and may
        have issues, but it is the only feasible way when users can specify
        contracts with generic strings.
        """
        new_contracts = []
        for contract in self.contracts:
            # Match the label when it is not preceded by an identifier
            # character (or is at the start) and is followed by a dot.
            if re.match(f".*([^A-Za-z0-9_]|^){label}[.]", contract.contract):
                continue
            new_contracts.append(contract)
        self.contracts = new_contracts
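
    # Illustrative sketch: the pattern only matches the label when it appears
    # as a standalone identifier followed by a dot.
    #
    #     >>> bool(re.match(".*([^A-Za-z0-9_]|^)taskA[.]", "taskA.field == 1"))
    #     True
    #     >>> bool(re.match(".*([^A-Za-z0-9_]|^)taskA[.]", "mytaskA.field == 1"))
    #     False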
719 """Subset a pipelineIR to contain only labels specified in
724 labelSpecifier : `set` of `str`
725 Set containing labels that describes how to subset a pipeline.
729 pipeline : `PipelineIR`
730 A new pipelineIR object that is a subset of the old pipelineIR
735 Raised if there is an issue with specified labels
739 This method attempts to prune any contracts that contain labels which
740 are not in the declared subset of labels. This pruning is done using a
741 string based matching due to the nature of contracts and may prune more
742 than it should. Any labeled subsets defined that no longer have all
743 members of the subset present in the pipeline will be removed from the
747 pipeline = copy.deepcopy(self)
752 for label
in labelSpecifier:
753 if label
in pipeline.labeled_subsets:
755 toAdd.update(pipeline.labeled_subsets[label].subset)
756 labelSpecifier.difference_update(toRemove)
757 labelSpecifier.update(toAdd)
759 if not labelSpecifier.issubset(pipeline.tasks.keys()
760 | pipeline.labeled_subsets):
761 difference = labelSpecifier.difference(pipeline.tasks.keys())
762 raise ValueError(
"Not all supplied labels (specified or named subsets) are in the pipeline "
763 f
"definition, extra labels: {difference}")
765 pipeline_labels =
set(pipeline.tasks.keys())
769 for label
in pipeline_labels:
770 if label
not in labelSpecifier:
771 pipeline.tasks.pop(label)
772 pipeline._remove_contracts(label)
775 labeled_subsets = copy.copy(pipeline.labeled_subsets)
777 for label, labeled_subset
in labeled_subsets.items():
778 if labeled_subset.subset - pipeline.tasks.keys():
779 pipeline.labeled_subsets.pop(label)
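
    # Illustrative usage (hypothetical labels): named subsets in the specifier
    # expand to their member labels before pruning.
    #
    #     subset_ir = pipeline_ir.subset_from_labels({"taskA", "step2"})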
785 """Create a `PipelineIR` object from a string formatted like a pipeline
790 pipeline_string : `str`
791 A string that is formatted according like a pipeline document
793 loaded_yaml = yaml.load(pipeline_string, Loader=PipelineYamlLoader)
794 return cls(loaded_yaml)

    @classmethod
    @deprecated(reason="This has been replaced with `from_uri` and will be removed after v23",
                version="v21.0", category=FutureWarning)
    def from_file(cls, filename: str) -> PipelineIR:
        """Create a `PipelineIR` object from the document specified by the
        input path.

        Parameters
        ----------
        filename : `str`
            Location of document to use in creating a `PipelineIR` object.

        Returns
        -------
        pipelineIR : `PipelineIR`
            The loaded pipeline.

        Notes
        -----
        This method is deprecated, please use `from_uri`.
        """
        return cls.from_uri(filename)

    @classmethod
    def from_uri(cls, uri: Union[str, ButlerURI]) -> PipelineIR:
        """Create a `PipelineIR` object from the document specified by the
        input uri.

        Parameters
        ----------
        uri : `str` or `ButlerURI`
            Location of document to use in creating a `PipelineIR` object.

        Returns
        -------
        pipelineIR : `PipelineIR`
            The loaded pipeline.
        """
        loaded_uri = ButlerURI(uri)
        # Use a local copy so parse errors can reference a real file.
        with loaded_uri.as_local() as local:
            # Read explicitly; yaml handles a ButlerURI's bytes differently.
            loaded_yaml = yaml.load(local.read(), Loader=PipelineYamlLoader)
        return cls(loaded_yaml)
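
    # Illustrative usage (hypothetical path):
    #
    #     pipeline_ir = PipelineIR.from_uri("/path/to/pipeline.yaml")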

    @deprecated(reason="This has been replaced with `write_to_uri` and will be removed after v23",
                version="v21.0", category=FutureWarning)
    def to_file(self, filename: str):
        """Serialize this `PipelineIR` object into a yaml formatted string and
        write the output to a file at the specified path.

        Parameters
        ----------
        filename : `str`
            Location of document to write a `PipelineIR` object.
        """
        self.write_to_uri(filename)
859 """Serialize this `PipelineIR` object into a yaml formatted string and
860 write the output to a file at the specified uri.
864 uri: `str` or `ButlerURI`
865 Location of document to write a `PipelineIR` object.
867 butlerUri = ButlerURI(uri)
868 butlerUri.write(yaml.dump(self.
to_primitivesto_primitives(), sort_keys=
False).encode())
871 """Convert to a representation used in yaml serialization
873 accumulate = {
"description": self.
descriptiondescription}
875 accumulate[
'instrument'] = self.
instrumentinstrument
878 accumulate[
'tasks'] = {m: t.to_primitives()
for m, t
in self.
taskstasks.
items()}
880 accumulate[
'contracts'] = [c.to_primitives()
for c
in self.
contractscontracts]
882 accumulate[
'subsets'] = {k: v.to_primitives()
for k, v
in self.
labeled_subsetslabeled_subsets.
items()}
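
    # Illustrative sketch: to_primitives() mirrors the yaml document, so a
    # loaded pipeline can be re-serialized.
    #
    #     >>> ir = PipelineIR.from_string("description: demo\ntasks:\n  taskA: mod.TaskA")
    #     >>> ir.to_primitives()["tasks"]["taskA"]["class"]
    #     'mod.TaskA'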
886 """Instance formatting as how it would look in yaml representation
888 return yaml.dump(self.
to_primitivesto_primitives(), sort_keys=
False)
891 """Instance formatting as how it would look in yaml representation
896 if not isinstance(other, PipelineIR):
898 elif all(getattr(self, attr) == getattr(other, attr)
for attr
in
899 (
"contracts",
"tasks",
"instrument")):