LSST Applications  22.0.1,22.0.1+01bcf6a671,22.0.1+046ee49490,22.0.1+05c7de27da,22.0.1+0c6914dbf6,22.0.1+1220d50b50,22.0.1+12fd109e95,22.0.1+1a1dd69893,22.0.1+1c910dc348,22.0.1+1ef34551f5,22.0.1+30170c3d08,22.0.1+39153823fd,22.0.1+611137eacc,22.0.1+771eb1e3e8,22.0.1+94e66cc9ed,22.0.1+9a075d06e2,22.0.1+a5ff6e246e,22.0.1+a7db719c1a,22.0.1+ba0d97e778,22.0.1+bfe1ee9056,22.0.1+c4e1e0358a,22.0.1+cc34b8281e,22.0.1+d640e2c0fa,22.0.1+d72a2e677a,22.0.1+d9a6b571bd,22.0.1+e485e9761b,22.0.1+ebe8d3385e
LSST Data Management Base Package
pipelineIR.py
Go to the documentation of this file.
1 # This file is part of pipe_base.
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (http://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 from __future__ import annotations
22 
23 __all__ = ("ConfigIR", "ContractError", "ContractIR", "ImportIR", "PipelineIR", "TaskIR", "LabeledSubset")
24 
25 from collections import Counter
26 from collections.abc import Iterable as abcIterable
27 from dataclasses import dataclass, field
28 from deprecated.sphinx import deprecated
29 from typing import Any, List, Set, Union, Generator, MutableMapping, Optional, Dict, Type
30 
31 import copy
32 import re
33 import os
34 import yaml
35 import warnings
36 
37 from lsst.daf.butler import ButlerURI
38 
39 
41  pass
42 
43 
44 class PipelineYamlLoader(yaml.SafeLoader):
45  """This is a specialized version of yaml's SafeLoader. It checks and raises
46  an exception if it finds that there are multiple instances of the same key
47  found inside a pipeline file at a given scope.
48  """
49  def construct_mapping(self, node, deep=False):
50  # do the call to super first so that it can do all the other forms of
51  # checking on this node. If you check the uniqueness of keys first
52  # it would save the work that super does in the case of a failure, but
53  # it might fail in the case that the node was the incorrect node due
54  # to a parsing error, and the resulting exception would be difficult to
55  # understand.
56  mapping = super().construct_mapping(node, deep)
57  # Check if there are any duplicate keys
58  all_keys = Counter(key_node.value for key_node, _ in node.value)
59  duplicates = {k for k, i in all_keys.items() if i != 1}
60  if duplicates:
61  raise KeyError("Pipeline files must not have duplicated keys, "
62  f"{duplicates} appeared multiple times")
63  return mapping
64 
65 
66 class ContractError(Exception):
67  """An exception that is raised when a pipeline contract is not satisfied
68  """
69  pass
70 
71 
72 @dataclass
73 class ContractIR:
74  """Intermediate representation of contracts read from a pipeline yaml file.
75  """
76  contract: str
77  """A string of python code representing one or more conditions on configs
78  in a pipeline. This code-as-string should, once evaluated, should be True
79  if the configs are fine, and False otherwise.
80  """
81  msg: Union[str, None] = None
82  """An optional message to be shown to the user if a contract fails
83  """
84 
85  def to_primitives(self) -> Dict[str, str]:
86  """Convert to a representation used in yaml serialization
87  """
88  accumulate = {"contract": self.contractcontract}
89  if self.msgmsg is not None:
90  accumulate['msg'] = self.msgmsg
91  return accumulate
92 
93  def __eq__(self, other: object):
94  if not isinstance(other, ContractIR):
95  return False
96  elif self.contractcontract == other.contract and self.msgmsg == other.msg:
97  return True
98  else:
99  return False
100 
101 
102 @dataclass
104  """Intermediate representation of named subset of task labels read from
105  a pipeline yaml file.
106  """
107  label: str
108  """The label used to identify the subset of task labels.
109  """
110  subset: Set[str]
111  """A set of task labels contained in this subset.
112  """
113  description: Optional[str]
114  """A description of what this subset of tasks is intended to do
115  """
116 
117  @staticmethod
118  def from_primitives(label: str, value: Union[List[str], dict]) -> LabeledSubset:
119  """Generate `LabeledSubset` objects given a properly formatted object
120  that as been created by a yaml loader.
121 
122  Parameters
123  ----------
124  label : `str`
125  The label that will be used to identify this labeled subset.
126  value : `list` of `str` or `dict`
127  Object returned from loading a labeled subset section from a yaml
128  document.
129 
130  Returns
131  -------
132  labeledSubset : `LabeledSubset`
133  A `LabeledSubset` object build from the inputs.
134 
135  Raises
136  ------
137  ValueError
138  Raised if the value input is not properly formatted for parsing
139  """
140  if isinstance(value, MutableMapping):
141  subset = value.pop("subset", None)
142  if subset is None:
143  raise ValueError("If a labeled subset is specified as a mapping, it must contain the key "
144  "'subset'")
145  description = value.pop("description", None)
146  elif isinstance(value, abcIterable):
147  subset = value
148  description = None
149  else:
150  raise ValueError(f"There was a problem parsing the labeled subset {label}, make sure the "
151  "definition is either a valid yaml list, or a mapping with keys "
152  "(subset, description) where subset points to a yaml list, and description is "
153  "associated with a string")
154  return LabeledSubset(label, set(subset), description)
155 
156  def to_primitives(self) -> Dict[str, Union[List[str], str]]:
157  """Convert to a representation used in yaml serialization
158  """
159  accumulate: Dict[str, Union[List[str], str]] = {"subset": list(self.subset)}
160  if self.description is not None:
161  accumulate["description"] = self.description
162  return accumulate
163 
164 
165 @dataclass
167  """Intermediate representation of parameters that are global to a pipeline
168 
169  These parameters are specified under a top level key named `parameters`
170  and are declared as a yaml mapping. These entries can then be used inside
171  task configuration blocks to specify configuration values. They may not be
172  used in the special ``file`` or ``python`` blocks.
173 
174  Example:
175  paramters:
176  shared_value: 14
177  tasks:
178  taskA:
179  class: modA
180  config:
181  field1: parameters.shared_value
182  taskB:
183  class: modB
184  config:
185  field2: parameters.shared_value
186  """
187  mapping: MutableMapping[str, str]
188  """A mutable mapping of identifiers as keys, and shared configuration
189  as values.
190  """
191  def update(self, other: Optional[ParametersIR]):
192  if other is not None:
193  self.mapping.update(other.mapping)
194 
195  def to_primitives(self) -> MutableMapping[str, str]:
196  """Convert to a representation used in yaml serialization
197  """
198  return self.mapping
199 
200  def __contains__(self, value: str) -> bool:
201  return value in self.mapping
202 
203  def __getitem__(self, item: str) -> Any:
204  return self.mapping[item]
205 
206  def __bool__(self) -> bool:
207  return bool(self.mapping)
208 
209 
210 @dataclass
211 class ConfigIR:
212  """Intermediate representation of configurations read from a pipeline yaml
213  file.
214  """
215  python: Union[str, None] = None
216  """A string of python code that is used to modify a configuration. This can
217  also be None if there are no modifications to do.
218  """
219  dataId: Union[dict, None] = None
220  """A dataId that is used to constrain these config overrides to only quanta
221  with matching dataIds. This field can be None if there is no constraint.
222  This is currently an unimplemented feature, and is placed here for future
223  use.
224  """
225  file: List[str] = field(default_factory=list)
226  """A list of paths which points to a file containing config overrides to be
227  applied. This value may be an empty list if there are no overrides to
228  apply.
229  """
230  rest: dict = field(default_factory=dict)
231  """This is a dictionary of key value pairs, where the keys are strings
232  corresponding to qualified fields on a config to override, and the values
233  are strings representing the values to apply.
234  """
235 
236  def to_primitives(self) -> Dict[str, Union[str, dict, List[str]]]:
237  """Convert to a representation used in yaml serialization
238  """
239  accumulate = {}
240  for name in ("python", "dataId", "file"):
241  # if this attribute is thruthy add it to the accumulation
242  # dictionary
243  if getattr(self, name):
244  accumulate[name] = getattr(self, name)
245  # Add the dictionary containing the rest of the config keys to the
246  # # accumulated dictionary
247  accumulate.update(self.rest)
248  return accumulate
249 
250  def formatted(self, parameters: ParametersIR) -> ConfigIR:
251  """Returns a new ConfigIR object that is formatted according to the
252  specified parameters
253 
254  Parameters
255  ----------
256  parameters : ParametersIR
257  Object that contains variable mappings used in substitution.
258 
259  Returns
260  -------
261  config : ConfigIR
262  A new ConfigIR object formatted with the input parameters
263  """
264  new_config = copy.deepcopy(self)
265  for key, value in new_config.rest.items():
266  if not isinstance(value, str):
267  continue
268  match = re.match("parameters[.](.*)", value)
269  if match and match.group(1) in parameters:
270  new_config.rest[key] = parameters[match.group(1)]
271  if match and match.group(1) not in parameters:
272  warnings.warn(f"config {key} contains value {match.group(0)} which is formatted like a "
273  "Pipeline parameter but was not found within the Pipeline, if this was not "
274  "intentional, check for a typo")
275  return new_config
276 
277  def maybe_merge(self, other_config: "ConfigIR") -> Generator["ConfigIR", None, None]:
278  """Merges another instance of a `ConfigIR` into this instance if
279  possible. This function returns a generator that is either self
280  if the configs were merged, or self, and other_config if that could
281  not be merged.
282 
283  Parameters
284  ----------
285  other_config : `ConfigIR`
286  An instance of `ConfigIR` to merge into this instance.
287 
288  Returns
289  -------
290  Generator : `ConfigIR`
291  A generator containing either self, or self and other_config if
292  the configs could be merged or not respectively.
293  """
294  # Verify that the config blocks can be merged
295  if self.dataId != other_config.dataId or self.python or other_config.python or\
296  self.filefile or other_config.file:
297  yield from (self, other_config)
298  return
299 
300  # create a set of all keys, and verify two keys do not have different
301  # values
302  key_union = self.rest.keys() & other_config.rest.keys()
303  for key in key_union:
304  if self.rest[key] != other_config.rest[key]:
305  yield from (self, other_config)
306  return
307  self.rest.update(other_config.rest)
308 
309  # Combine the lists of override files to load
310  self_file_set = set(self.filefile)
311  other_file_set = set(other_config.file)
312  self.filefile = list(self_file_set.union(other_file_set))
313 
314  yield self
315 
316  def __eq__(self, other: object):
317  if not isinstance(other, ConfigIR):
318  return False
319  elif all(getattr(self, attr) == getattr(other, attr) for attr in
320  ("python", "dataId", "file", "rest")):
321  return True
322  else:
323  return False
324 
325 
326 @dataclass
327 class TaskIR:
328  """Intermediate representation of tasks read from a pipeline yaml file.
329  """
330  label: str
331  """An identifier used to refer to a task.
332  """
333  klass: str
334  """A string containing a fully qualified python class to be run in a
335  pipeline.
336  """
337  config: Union[List[ConfigIR], None] = None
338  """List of all configs overrides associated with this task, and may be
339  `None` if there are no config overrides.
340  """
341 
342  def to_primitives(self) -> Dict[str, Union[str, List[dict]]]:
343  """Convert to a representation used in yaml serialization
344  """
345  accumulate: Dict[str, Union[str, List[dict]]] = {'class': self.klass}
346  if self.configconfig:
347  accumulate['config'] = [c.to_primitives() for c in self.configconfig]
348  return accumulate
349 
350  def add_or_update_config(self, other_config: ConfigIR):
351  """Adds a `ConfigIR` to this task if one is not present. Merges configs
352  if there is a `ConfigIR` present and the dataId keys of both configs
353  match, otherwise adds a new entry to the config list. The exception to
354  the above is that if either the last config or other_config has a
355  python block, then other_config is always added, as python blocks can
356  modify configs in ways that cannot be predicted.
357 
358  Parameters
359  ----------
360  other_config : `ConfigIR`
361  A `ConfigIR` instance to add or merge into the config attribute of
362  this task.
363  """
364  if not self.configconfig:
365  self.configconfig = [other_config]
366  return
367  self.configconfig.extend(self.configconfig.pop().maybe_merge(other_config))
368 
369  def __eq__(self, other: object):
370  if not isinstance(other, TaskIR):
371  return False
372  elif all(getattr(self, attr) == getattr(other, attr) for attr in
373  ("label", "klass", "config")):
374  return True
375  else:
376  return False
377 
378 
379 @dataclass
380 class ImportIR:
381  """An intermediate representation of imported pipelines
382  """
383  location: str
384  """This is the location of the pipeline to inherit. The path should be
385  specified as an absolute path. Environment variables may be used in the
386  path and should be specified as a python string template, with the name of
387  the environment variable inside braces.
388  """
389  include: Union[List[str], None] = None
390  """List of tasks that should be included when inheriting this pipeline.
391  Either the include or exclude attributes may be specified, but not both.
392  """
393  exclude: Union[List[str], None] = None
394  """List of tasks that should be excluded when inheriting this pipeline.
395  Either the include or exclude attributes may be specified, but not both.
396  """
397  importContracts: bool = True
398  """Boolean attribute to dictate if contracts should be inherited with the
399  pipeline or not.
400  """
401  instrument: Union[Type[KeepInstrument], str, None] = KeepInstrument
402  """Instrument to assign to the Pipeline at import. The default value of
403  KEEP_INSTRUMENT indicates that whatever instrument the pipeline is declared
404  with will not be modified. Setting this value to None will drop any
405  declared instrument prior to import.
406  """
407 
408  def toPipelineIR(self) -> "PipelineIR":
409  """Load in the Pipeline specified by this object, and turn it into a
410  PipelineIR instance.
411 
412  Returns
413  -------
414  pipeline : `PipelineIR`
415  A pipeline generated from the imported pipeline file
416  """
417  if self.include and self.exclude:
418  raise ValueError("Both an include and an exclude list cant be specified"
419  " when declaring a pipeline import")
420  tmp_pipeline = PipelineIR.from_uri(os.path.expandvars(self.location))
421  if self.instrument is not KeepInstrument:
422  tmp_pipeline.instrument = self.instrument
423 
424  included_labels = set()
425  for label in tmp_pipeline.tasks:
426  if (self.include and label in self.include) or (self.exclude and label not in self.exclude)\
427  or (self.include is None and self.exclude is None):
428  included_labels.add(label)
429 
430  # Handle labeled subsets being specified in the include or exclude
431  # list, adding or removing labels.
432  if self.include is not None:
433  subsets_in_include = tmp_pipeline.labeled_subsets.keys() & self.include
434  for label in subsets_in_include:
435  included_labels.update(tmp_pipeline.labeled_subsets[label].subset)
436 
437  elif self.exclude is not None:
438  subsets_in_exclude = tmp_pipeline.labeled_subsets.keys() & self.exclude
439  for label in subsets_in_exclude:
440  included_labels.difference_update(tmp_pipeline.labeled_subsets[label].subset)
441 
442  tmp_pipeline = tmp_pipeline.subset_from_labels(included_labels)
443 
444  if not self.importContracts:
445  tmp_pipeline.contracts = []
446 
447  return tmp_pipeline
448 
449  def __eq__(self, other: object):
450  if not isinstance(other, ImportIR):
451  return False
452  elif all(getattr(self, attr) == getattr(other, attr) for attr in
453  ("location", "include", "exclude", "importContracts")):
454  return True
455  else:
456  return False
457 
458 
460  """Intermediate representation of a pipeline definition
461 
462  Parameters
463  ----------
464  loaded_yaml : `dict`
465  A dictionary which matches the structure that would be produced by a
466  yaml reader which parses a pipeline definition document
467 
468  Raises
469  ------
470  ValueError :
471  - If a pipeline is declared without a description
472  - If no tasks are declared in a pipeline, and no pipelines are to be
473  inherited
474  - If more than one instrument is specified
475  - If more than one inherited pipeline share a label
476  """
477  def __init__(self, loaded_yaml):
478  # Check required fields are present
479  if "description" not in loaded_yaml:
480  raise ValueError("A pipeline must be declared with a description")
481  if "tasks" not in loaded_yaml and len({"imports", "inherits"} - loaded_yaml.keys()) == 2:
482  raise ValueError("A pipeline must be declared with one or more tasks")
483 
484  # These steps below must happen in this call order
485 
486  # Process pipeline description
487  self.descriptiondescription = loaded_yaml.pop("description")
488 
489  # Process tasks
490  self._read_tasks_read_tasks(loaded_yaml)
491 
492  # Process instrument keys
493  inst = loaded_yaml.pop("instrument", None)
494  if isinstance(inst, list):
495  raise ValueError("Only one top level instrument can be defined in a pipeline")
496  self.instrumentinstrument = inst
497 
498  # Process any contracts
499  self._read_contracts_read_contracts(loaded_yaml)
500 
501  # Process any defined parameters
502  self._read_parameters_read_parameters(loaded_yaml)
503 
504  # Process any named label subsets
505  self._read_labeled_subsets_read_labeled_subsets(loaded_yaml)
506 
507  # Process any inherited pipelines
508  self._read_imports_read_imports(loaded_yaml)
509 
510  # verify named subsets, must be done after inheriting
511  self._verify_labeled_subsets_verify_labeled_subsets()
512 
513  def _read_contracts(self, loaded_yaml):
514  """Process the contracts portion of the loaded yaml document
515 
516  Parameters
517  ---------
518  loaded_yaml : `dict`
519  A dictionary which matches the structure that would be produced by
520  a yaml reader which parses a pipeline definition document
521  """
522  loaded_contracts = loaded_yaml.pop("contracts", [])
523  if isinstance(loaded_contracts, str):
524  loaded_contracts = [loaded_contracts]
525  self.contractscontracts = []
526  for contract in loaded_contracts:
527  if isinstance(contract, dict):
528  self.contractscontracts.append(ContractIR(**contract))
529  if isinstance(contract, str):
530  self.contractscontracts.append(ContractIR(contract=contract))
531 
532  def _read_parameters(self, loaded_yaml):
533  """Process the parameters portion of the loaded yaml document
534 
535  Parameters
536  ---------
537  loaded_yaml : `dict`
538  A dictionary which matches the structure that would be produced by
539  a yaml reader which parses a pipeline definition document
540  """
541  loaded_parameters = loaded_yaml.pop("parameters", {})
542  if not isinstance(loaded_parameters, dict):
543  raise ValueError("The parameters section must be a yaml mapping")
544  self.parametersparameters = ParametersIR(loaded_parameters)
545 
546  def _read_labeled_subsets(self, loaded_yaml: dict):
547  """Process the subsets portion of the loaded yaml document
548 
549  Parameters
550  ----------
551  loaded_yaml: `MutableMapping`
552  A dictionary which matches the structure that would be produced
553  by a yaml reader which parses a pipeline definition document
554  """
555  loaded_subsets = loaded_yaml.pop("subsets", {})
556  self.labeled_subsetslabeled_subsets = {}
557  if not loaded_subsets and "subset" in loaded_yaml:
558  raise ValueError("Top level key should be subsets and not subset, add an s")
559  for key, value in loaded_subsets.items():
560  self.labeled_subsetslabeled_subsets[key] = LabeledSubset.from_primitives(key, value)
561 
562  def _verify_labeled_subsets(self):
563  """Verifies that all the labels in each named subset exist within the
564  pipeline.
565  """
566  # Verify that all labels defined in a labeled subset are in the
567  # Pipeline
568  for labeled_subset in self.labeled_subsetslabeled_subsets.values():
569  if not labeled_subset.subset.issubset(self.taskstasks.keys()):
570  raise ValueError(f"Labels {labeled_subset.subset - self.tasks.keys()} were not found in the "
571  "declared pipeline")
572  # Verify subset labels are not already task labels
573  label_intersection = self.labeled_subsetslabeled_subsets.keys() & self.taskstasks.keys()
574  if label_intersection:
575  raise ValueError(f"Labeled subsets can not use the same label as a task: {label_intersection}")
576 
577  def _read_imports(self, loaded_yaml):
578  """Process the inherits portion of the loaded yaml document
579 
580  Parameters
581  ---------
582  loaded_yaml : `dict`
583  A dictionary which matches the structure that would be produced by
584  a yaml reader which parses a pipeline definition document
585  """
586  def process_args(argument: Union[str, dict]) -> dict:
587  if isinstance(argument, str):
588  return {"location": argument}
589  elif isinstance(argument, dict):
590  if "exclude" in argument and isinstance(argument["exclude"], str):
591  argument["exclude"] = [argument["exclude"]]
592  if "include" in argument and isinstance(argument["include"], str):
593  argument["include"] = [argument["include"]]
594  if "instrument" in argument and argument["instrument"] == "None":
595  argument["instrument"] = None
596  return argument
597  if not {"inherits", "imports"} - loaded_yaml.keys():
598  raise ValueError("Cannot define both inherits and imports sections, use imports")
599  tmp_import = loaded_yaml.pop("inherits", None)
600  if tmp_import is None:
601  tmp_import = loaded_yaml.pop("imports", None)
602  else:
603  warnings.warn("The 'inherits' key is deprecated, and will be "
604  "removed around June 2021. Please use the key "
605  "'imports' instead")
606  if tmp_import is None:
607  self.importsimports = []
608  elif isinstance(tmp_import, list):
609  self.importsimports = [ImportIR(**process_args(args)) for args in tmp_import]
610  else:
611  self.importsimports = [ImportIR(**process_args(tmp_import))]
612 
613  # integrate any imported pipelines
614  accumulate_tasks = {}
615  accumulate_labeled_subsets = {}
616  accumulated_parameters = ParametersIR({})
617  for other_pipeline in self.importsimports:
618  tmp_IR = other_pipeline.toPipelineIR()
619  if self.instrumentinstrument is None:
620  self.instrumentinstrument = tmp_IR.instrument
621  elif self.instrumentinstrument != tmp_IR.instrument and tmp_IR.instrument is not None:
622  raise ValueError("Only one instrument can be declared in a pipeline or it's imports")
623  if accumulate_tasks.keys() & tmp_IR.tasks.keys():
624  raise ValueError("Task labels in the imported pipelines must "
625  "be unique")
626  accumulate_tasks.update(tmp_IR.tasks)
627  self.contractscontracts.extend(tmp_IR.contracts)
628  # verify that tmp_IR has unique labels for named subset among
629  # existing labeled subsets, and with existing task labels.
630  overlapping_subsets = accumulate_labeled_subsets.keys() & tmp_IR.labeled_subsets.keys()
631  task_subset_overlap = ((accumulate_labeled_subsets.keys() | tmp_IR.labeled_subsets.keys())
632  & accumulate_tasks.keys())
633  if overlapping_subsets or task_subset_overlap:
634  raise ValueError("Labeled subset names must be unique amongst imports in both labels and "
635  f" named Subsets. Duplicate: {overlapping_subsets | task_subset_overlap}")
636  accumulate_labeled_subsets.update(tmp_IR.labeled_subsets)
637  accumulated_parameters.update(tmp_IR.parameters)
638 
639  # verify that any accumulated labeled subsets dont clash with a label
640  # from this pipeline
641  if accumulate_labeled_subsets.keys() & self.taskstasks.keys():
642  raise ValueError("Labeled subset names must be unique amongst imports in both labels and "
643  " named Subsets")
644  # merge in the named subsets for self so this document can override any
645  # that have been delcared
646  accumulate_labeled_subsets.update(self.labeled_subsetslabeled_subsets)
647  self.labeled_subsetslabeled_subsets = accumulate_labeled_subsets
648 
649  # merge the dict of label:TaskIR objects, preserving any configs in the
650  # imported pipeline if the labels point to the same class
651  for label, task in self.taskstasks.items():
652  if label not in accumulate_tasks:
653  accumulate_tasks[label] = task
654  elif accumulate_tasks[label].klass == task.klass:
655  if task.config is not None:
656  for config in task.config:
657  accumulate_tasks[label].add_or_update_config(config)
658  else:
659  accumulate_tasks[label] = task
660  self.taskstasks = accumulate_tasks
661  self.parametersparameters.update(accumulated_parameters)
662 
663  def _read_tasks(self, loaded_yaml):
664  """Process the tasks portion of the loaded yaml document
665 
666  Parameters
667  ---------
668  loaded_yaml : `dict`
669  A dictionary which matches the structure that would be produced by
670  a yaml reader which parses a pipeline definition document
671  """
672  self.taskstasks = {}
673  tmp_tasks = loaded_yaml.pop("tasks", None)
674  if tmp_tasks is None:
675  tmp_tasks = {}
676 
677  if "parameters" in tmp_tasks:
678  raise ValueError("parameters is a reserved word and cannot be used as a task label")
679 
680  for label, definition in tmp_tasks.items():
681  if isinstance(definition, str):
682  definition = {"class": definition}
683  config = definition.get('config', None)
684  if config is None:
685  task_config_ir = None
686  else:
687  if isinstance(config, dict):
688  config = [config]
689  task_config_ir = []
690  for c in config:
691  file = c.pop("file", None)
692  if file is None:
693  file = []
694  elif not isinstance(file, list):
695  file = [file]
696  task_config_ir.append(ConfigIR(python=c.pop("python", None),
697  dataId=c.pop("dataId", None),
698  file=file,
699  rest=c))
700  self.taskstasks[label] = TaskIR(label, definition["class"], task_config_ir)
701 
702  def _remove_contracts(self, label: str):
703  """Remove any contracts that contain the given label
704 
705  String comparison used in this way is not the most elegant and may
706  have issues, but it is the only feasible way when users can specify
707  contracts with generic strings.
708  """
709  new_contracts = []
710  for contract in self.contractscontracts:
711  # match a label that is not preceded by an ASCII identifier, or
712  # is the start of a line and is followed by a dot
713  if re.match(f".*([^A-Za-z0-9_]|^){label}[.]", contract.contract):
714  continue
715  new_contracts.append(contract)
716  self.contractscontracts = new_contracts
717 
718  def subset_from_labels(self, labelSpecifier: Set[str]) -> PipelineIR:
719  """Subset a pipelineIR to contain only labels specified in
720  labelSpecifier.
721 
722  Parameters
723  ----------
724  labelSpecifier : `set` of `str`
725  Set containing labels that describes how to subset a pipeline.
726 
727  Returns
728  -------
729  pipeline : `PipelineIR`
730  A new pipelineIR object that is a subset of the old pipelineIR
731 
732  Raises
733  ------
734  ValueError
735  Raised if there is an issue with specified labels
736 
737  Notes
738  -----
739  This method attempts to prune any contracts that contain labels which
740  are not in the declared subset of labels. This pruning is done using a
741  string based matching due to the nature of contracts and may prune more
742  than it should. Any labeled subsets defined that no longer have all
743  members of the subset present in the pipeline will be removed from the
744  resulting pipeline.
745  """
746 
747  pipeline = copy.deepcopy(self)
748 
749  # update the label specifier to expand any named subsets
750  toRemove = set()
751  toAdd = set()
752  for label in labelSpecifier:
753  if label in pipeline.labeled_subsets:
754  toRemove.add(label)
755  toAdd.update(pipeline.labeled_subsets[label].subset)
756  labelSpecifier.difference_update(toRemove)
757  labelSpecifier.update(toAdd)
758  # verify all the labels are in the pipeline
759  if not labelSpecifier.issubset(pipeline.tasks.keys()
760  | pipeline.labeled_subsets):
761  difference = labelSpecifier.difference(pipeline.tasks.keys())
762  raise ValueError("Not all supplied labels (specified or named subsets) are in the pipeline "
763  f"definition, extra labels: {difference}")
764  # copy needed so as to not modify while iterating
765  pipeline_labels = set(pipeline.tasks.keys())
766  # Remove the labels from the pipelineIR, and any contracts that contain
767  # those labels (see docstring on _remove_contracts for why this may
768  # cause issues)
769  for label in pipeline_labels:
770  if label not in labelSpecifier:
771  pipeline.tasks.pop(label)
772  pipeline._remove_contracts(label)
773 
774  # create a copy of the object to iterate over
775  labeled_subsets = copy.copy(pipeline.labeled_subsets)
776  # remove any labeled subsets that no longer have a complete set
777  for label, labeled_subset in labeled_subsets.items():
778  if labeled_subset.subset - pipeline.tasks.keys():
779  pipeline.labeled_subsets.pop(label)
780 
781  return pipeline
782 
783  @classmethod
784  def from_string(cls, pipeline_string: str):
785  """Create a `PipelineIR` object from a string formatted like a pipeline
786  document
787 
788  Parameters
789  ----------
790  pipeline_string : `str`
791  A string that is formatted according like a pipeline document
792  """
793  loaded_yaml = yaml.load(pipeline_string, Loader=PipelineYamlLoader)
794  return cls(loaded_yaml)
795 
796  @classmethod
797  @deprecated(reason="This has been replaced with `from_uri`. will be removed after v23", version="v21.0,", category=FutureWarning) # type: ignore
798  def from_file(cls, filename: str) -> PipelineIR:
799  """Create a `PipelineIR` object from the document specified by the
800  input path.
801 
802  Parameters
803  ----------
804  filename : `str`
805  Location of document to use in creating a `PipelineIR` object.
806 
807  Returns
808  -------
809  pipelineIR : `PipelineIR`
810  The loaded pipeline
811 
812  Note
813  ----
814  This method is deprecated, please use from_uri
815  """
816  return cls.from_urifrom_uri(filename)
817 
818  @classmethod
819  def from_uri(cls, uri: Union[str, ButlerURI]) -> PipelineIR:
820  """Create a `PipelineIR` object from the document specified by the
821  input uri.
822 
823  Parameters
824  ----------
825  uri: `str` or `ButlerURI`
826  Location of document to use in creating a `PipelineIR` object.
827 
828  Returns
829  -------
830  pipelineIR : `PipelineIR`
831  The loaded pipeline
832  """
833  loaded_uri = ButlerURI(uri)
834  # With ButlerURI we have the choice of always using a local file or
835  # reading in the bytes directly. Reading in bytes can be more
836  # efficient for reasonably-sized files when the resource is remote.
837  # For now use the local file variant. For a local file as_local() does
838  # nothing.
839  with loaded_uri.as_local() as local:
840  # explicitly read here, there was some issue with yaml trying
841  # to read the ButlerURI itself (I think because it only
842  # pretends to be conformant to the io api)
843  loaded_yaml = yaml.load(local.read(), Loader=PipelineYamlLoader)
844  return cls(loaded_yaml)
845 
846  @deprecated(reason="This has been replaced with `write_to_uri`. will be removed after v23", version="v21.0,", category=FutureWarning) # type: ignore
847  def to_file(self, filename: str):
848  """Serialize this `PipelineIR` object into a yaml formatted string and
849  write the output to a file at the specified path.
850 
851  Parameters
852  ----------
853  filename : `str`
854  Location of document to write a `PipelineIR` object.
855  """
856  self.write_to_uriwrite_to_uri(filename)
857 
858  def write_to_uri(self, uri: Union[ButlerURI, str]):
859  """Serialize this `PipelineIR` object into a yaml formatted string and
860  write the output to a file at the specified uri.
861 
862  Parameters
863  ----------
864  uri: `str` or `ButlerURI`
865  Location of document to write a `PipelineIR` object.
866  """
867  butlerUri = ButlerURI(uri)
868  butlerUri.write(yaml.dump(self.to_primitivesto_primitives(), sort_keys=False).encode())
869 
870  def to_primitives(self) -> Dict[str, Any]:
871  """Convert to a representation used in yaml serialization
872  """
873  accumulate = {"description": self.descriptiondescription}
874  if self.instrumentinstrument is not None:
875  accumulate['instrument'] = self.instrumentinstrument
876  if self.parametersparameters:
877  accumulate['parameters'] = self.parametersparameters.to_primitives()
878  accumulate['tasks'] = {m: t.to_primitives() for m, t in self.taskstasks.items()}
879  if len(self.contractscontracts) > 0:
880  accumulate['contracts'] = [c.to_primitives() for c in self.contractscontracts]
881  if self.labeled_subsetslabeled_subsets:
882  accumulate['subsets'] = {k: v.to_primitives() for k, v in self.labeled_subsetslabeled_subsets.items()}
883  return accumulate
884 
885  def __str__(self) -> str:
886  """Instance formatting as how it would look in yaml representation
887  """
888  return yaml.dump(self.to_primitivesto_primitives(), sort_keys=False)
889 
890  def __repr__(self) -> str:
891  """Instance formatting as how it would look in yaml representation
892  """
893  return str(self)
894 
895  def __eq__(self, other: object):
896  if not isinstance(other, PipelineIR):
897  return False
898  elif all(getattr(self, attr) == getattr(other, attr) for attr in
899  ("contracts", "tasks", "instrument")):
900  return True
901  else:
902  return False
903 
std::vector< SchemaItem< Flag > > * items
Generator["ConfigIR", None, None] maybe_merge(self, "ConfigIR" other_config)
Definition: pipelineIR.py:277
Dict[str, Union[str, dict, List[str]]] to_primitives(self)
Definition: pipelineIR.py:236
ConfigIR formatted(self, ParametersIR parameters)
Definition: pipelineIR.py:250
def __eq__(self, object other)
Definition: pipelineIR.py:316
def __eq__(self, object other)
Definition: pipelineIR.py:93
Dict[str, str] to_primitives(self)
Definition: pipelineIR.py:85
def __eq__(self, object other)
Definition: pipelineIR.py:449
"PipelineIR" toPipelineIR(self)
Definition: pipelineIR.py:408
Dict[str, Union[List[str], str]] to_primitives(self)
Definition: pipelineIR.py:156
LabeledSubset from_primitives(str label, Union[List[str], dict] value)
Definition: pipelineIR.py:118
MutableMapping[str, str] to_primitives(self)
Definition: pipelineIR.py:195
def update(self, Optional[ParametersIR] other)
Definition: pipelineIR.py:191
bool __contains__(self, str value)
Definition: pipelineIR.py:200
def _read_tasks(self, loaded_yaml)
Definition: pipelineIR.py:663
def _read_labeled_subsets(self, dict loaded_yaml)
Definition: pipelineIR.py:546
def _read_parameters(self, loaded_yaml)
Definition: pipelineIR.py:532
def write_to_uri(self, Union[ButlerURI, str] uri)
Definition: pipelineIR.py:858
PipelineIR subset_from_labels(self, Set[str] labelSpecifier)
Definition: pipelineIR.py:718
def __init__(self, loaded_yaml)
Definition: pipelineIR.py:477
def _read_contracts(self, loaded_yaml)
Definition: pipelineIR.py:513
def from_string(cls, str pipeline_string)
Definition: pipelineIR.py:784
PipelineIR from_file(cls, str filename)
Definition: pipelineIR.py:798
PipelineIR from_uri(cls, Union[str, ButlerURI] uri)
Definition: pipelineIR.py:819
Dict[str, Any] to_primitives(self)
Definition: pipelineIR.py:870
def _read_imports(self, loaded_yaml)
Definition: pipelineIR.py:577
def __eq__(self, object other)
Definition: pipelineIR.py:895
def to_file(self, str filename)
Definition: pipelineIR.py:847
def construct_mapping(self, node, deep=False)
Definition: pipelineIR.py:49
Dict[str, Union[str, List[dict]]] to_primitives(self)
Definition: pipelineIR.py:342
def __eq__(self, object other)
Definition: pipelineIR.py:369
def add_or_update_config(self, ConfigIR other_config)
Definition: pipelineIR.py:350
daf::base::PropertyList * list
Definition: fits.cc:913
daf::base::PropertySet * set
Definition: fits.cc:912
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
Definition: functional.cc:33
bool all(CoordinateExpr< N > const &expr) noexcept
Return true if all elements are true.