pipelineIR.py
# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations

__all__ = ("ConfigIR", "ContractError", "ContractIR", "InheritIR", "PipelineIR", "TaskIR", "LabeledSubset")

from collections import Counter
from collections.abc import Iterable as abcIterable
from dataclasses import dataclass, field
from typing import Any, List, Set, Union, Generator, MutableMapping, Optional, Dict

import copy
import re
import os
import yaml
import warnings


class PipelineYamlLoader(yaml.SafeLoader):
    """This is a specialized version of yaml's SafeLoader. It raises an
    exception if it finds multiple instances of the same key at a given
    scope inside a pipeline file.
    """
    def construct_mapping(self, node, deep=False):
        # Call super first so that it can run all its other checks on this
        # node. Checking key uniqueness first would save super's work in the
        # failure case, but if the node were the wrong type due to a parsing
        # error, the resulting exception would be difficult to understand.
        mapping = super().construct_mapping(node, deep)
        # Check if there are any duplicate keys
        all_keys = Counter(key_node.value for key_node, _ in node.value)
        duplicates = {k for k, i in all_keys.items() if i != 1}
        if duplicates:
            raise KeyError("Pipeline files must not have duplicated keys, "
                           f"{duplicates} appeared multiple times")
        return mapping


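# Example (illustrative sketch, not part of the module): plain
# yaml.SafeLoader silently keeps only the last of two duplicated keys,
# while PipelineYamlLoader rejects the document outright:
#
#     text = "a: 1\na: 2"
#     yaml.load(text, Loader=yaml.SafeLoader)     # -> {'a': 2}
#     yaml.load(text, Loader=PipelineYamlLoader)  # raises KeyError

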
class ContractError(Exception):
    """An exception that is raised when a pipeline contract is not satisfied
    """
    pass


@dataclass
class ContractIR:
    """Intermediate representation of contracts read from a pipeline yaml file.
    """
    contract: str
    """A string of python code representing one or more conditions on configs
    in a pipeline. This code-as-string should, once evaluated, be True if the
    configs are fine, and False otherwise.
    """
    msg: Union[str, None] = None
    """An optional message to be shown to the user if a contract fails
    """

    def to_primitives(self) -> dict:
        """Convert to a representation used in yaml serialization
        """
        accumulate = {"contract": self.contract}
        if self.msg is not None:
            accumulate['msg'] = self.msg
        return accumulate

    def __eq__(self, other: "ContractIR"):
        if not isinstance(other, ContractIR):
            return False
        elif self.contract == other.contract and self.msg == other.msg:
            return True
        else:
            return False


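# Example (illustrative sketch; "isr" and "calibrate" are hypothetical task
# labels): in a pipeline document a contract is a python expression over
# task configs, written either as a bare string or as a mapping with an
# optional failure message:
#
#     contracts:
#         - "isr.doFlat == True"
#         - contract: "calibrate.astrom.matcher.maxOffsetPix <= 300"
#           msg: "matcher offset is too large"
#
# Both forms are parsed into ContractIR instances, the second equivalent to
#     ContractIR(contract="calibrate.astrom.matcher.maxOffsetPix <= 300",
#                msg="matcher offset is too large")

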
@dataclass
class LabeledSubset:
    """Intermediate representation of a named subset of task labels read from
    a pipeline yaml file.
    """
    label: str
    """The label used to identify the subset of task labels.
    """
    subset: Set[str]
    """A set of task labels contained in this subset.
    """
    description: Optional[str]
    """A description of what this subset of tasks is intended to do
    """

    @staticmethod
    def from_primatives(label: str, value: Union[List[str], dict]) -> LabeledSubset:
        """Generate `LabeledSubset` objects given a properly formatted object
        that has been created by a yaml loader.

        Parameters
        ----------
        label : `str`
            The label that will be used to identify this labeled subset.
        value : `list` of `str` or `dict`
            Object returned from loading a labeled subset section from a yaml
            document.

        Returns
        -------
        labeledSubset : `LabeledSubset`
            A `LabeledSubset` object built from the inputs.

        Raises
        ------
        ValueError
            Raised if the value input is not properly formatted for parsing
        """
        if isinstance(value, MutableMapping):
            subset = value.pop("subset", None)
            if subset is None:
                raise ValueError("If a labeled subset is specified as a mapping, it must contain the key "
                                 "'subset'")
            description = value.pop("description", None)
        elif isinstance(value, abcIterable):
            subset = value
            description = None
        else:
            raise ValueError(f"There was a problem parsing the labeled subset {label}; make sure the "
                             "definition is either a valid yaml list, or a mapping with keys "
                             "(subset, description) where subset points to a yaml list, and description is "
                             "associated with a string")
        return LabeledSubset(label, set(subset), description)

    def to_primitives(self) -> dict:
        """Convert to a representation used in yaml serialization
        """
        accumulate: Dict[str, Any] = {"subset": list(self.subset)}
        if self.description is not None:
            accumulate["description"] = self.description
        return accumulate


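# Example (illustrative sketch; the labels are hypothetical): a labeled
# subset may be declared in yaml either as a bare list or as a mapping
# with a description, and from_primatives accepts both shapes:
#
#     LabeledSubset.from_primatives("step1", ["isr", "characterizeImage"])
#     LabeledSubset.from_primatives(
#         "step1",
#         {"subset": ["isr", "characterizeImage"],
#          "description": "single frame processing"})

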
@dataclass
class ConfigIR:
    """Intermediate representation of configurations read from a pipeline yaml
    file.
    """
    python: Union[str, None] = None
    """A string of python code that is used to modify a configuration. This can
    also be None if there are no modifications to do.
    """
    dataId: Union[dict, None] = None
    """A dataId that is used to constrain these config overrides to only quanta
    with matching dataIds. This field can be None if there is no constraint.
    This is currently an unimplemented feature, and is placed here for future
    use.
    """
    file: List[str] = field(default_factory=list)
    """A list of paths which point to files containing config overrides to be
    applied. This value may be an empty list if there are no overrides to
    apply.
    """
    rest: dict = field(default_factory=dict)
    """This is a dictionary of key value pairs, where the keys are strings
    corresponding to qualified fields on a config to override, and the values
    are strings representing the values to apply.
    """

    def to_primitives(self) -> dict:
        """Convert to a representation used in yaml serialization
        """
        accumulate = {}
        for name in ("python", "dataId", "file"):
            # if this attribute is truthy add it to the accumulation
            # dictionary
            if getattr(self, name):
                accumulate[name] = getattr(self, name)
        # Add the dictionary containing the rest of the config keys to the
        # accumulated dictionary
        accumulate.update(self.rest)
        return accumulate

    def maybe_merge(self, other_config: "ConfigIR") -> Generator["ConfigIR", None, None]:
        """Merge another instance of a `ConfigIR` into this instance if
        possible. This function returns a generator that yields either self,
        if the configs were merged, or self and other_config if they could
        not be merged.

        Parameters
        ----------
        other_config : `ConfigIR`
            An instance of `ConfigIR` to merge into this instance.

        Returns
        -------
        Generator : `ConfigIR`
            A generator yielding either self, or self and other_config,
            depending on whether the configs could be merged.
        """
        # Verify that the config blocks can be merged
        if self.dataId != other_config.dataId or self.python or other_config.python or\
                self.file or other_config.file:
            yield from (self, other_config)
            return

        # Find the keys common to both configs, and verify that no shared key
        # has two different values
        key_union = self.rest.keys() & other_config.rest.keys()
        for key in key_union:
            if self.rest[key] != other_config.rest[key]:
                yield from (self, other_config)
                return
        self.rest.update(other_config.rest)

        # Combine the lists of override files to load
        self_file_set = set(self.file)
        other_file_set = set(other_config.file)
        self.file = list(self_file_set.union(other_file_set))

        yield self

    def __eq__(self, other: "ConfigIR"):
        if not isinstance(other, ConfigIR):
            return False
        elif all(getattr(self, attr) == getattr(other, attr) for attr in
                 ("python", "dataId", "file", "rest")):
            return True
        else:
            return False


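# Example (illustrative sketch): two ConfigIR blocks that only set simple
# key/value overrides collapse into one, while a python block (or a file
# list, or differing dataIds) keeps them separate:
#
#     a = ConfigIR(rest={"doWrite": "True"})
#     b = ConfigIR(rest={"connections.coaddName": "deep"})
#     list(a.maybe_merge(b))  # -> [a], with b's keys folded into a.rest
#
#     c = ConfigIR(python="config.doWrite = False")
#     list(a.maybe_merge(c))  # -> [a, c], unmerged

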
@dataclass
class TaskIR:
    """Intermediate representation of tasks read from a pipeline yaml file.
    """
    label: str
    """An identifier used to refer to a task.
    """
    klass: str
    """A string containing a fully qualified python class to be run in a
    pipeline.
    """
    config: Union[List[ConfigIR], None] = None
    """List of all config overrides associated with this task; may be
    `None` if there are no config overrides.
    """

    def to_primitives(self) -> dict:
        """Convert to a representation used in yaml serialization
        """
        accumulate = {'class': self.klass}
        if self.config:
            accumulate['config'] = [c.to_primitives() for c in self.config]
        return accumulate

    def add_or_update_config(self, other_config: ConfigIR):
        """Adds a `ConfigIR` to this task if one is not present. Merges configs
        if there is a `ConfigIR` present and the dataId keys of both configs
        match, otherwise adds a new entry to the config list. The exception to
        the above is that if either the last config or other_config has a
        python block, then other_config is always added, as python blocks can
        modify configs in ways that cannot be predicted.

        Parameters
        ----------
        other_config : `ConfigIR`
            A `ConfigIR` instance to add or merge into the config attribute of
            this task.
        """
        if not self.config:
            self.config = [other_config]
            return
        self.config.extend(self.config.pop().maybe_merge(other_config))

    def __eq__(self, other: "TaskIR"):
        if not isinstance(other, TaskIR):
            return False
        elif all(getattr(self, attr) == getattr(other, attr) for attr in
                 ("label", "klass", "config")):
            return True
        else:
            return False


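# Example (illustrative sketch; the label and class string are hypothetical):
# successive python-free overrides with matching (here absent) dataIds fold
# into a single ConfigIR entry:
#
#     task = TaskIR("isr", "lsst.ip.isr.IsrTask")
#     task.add_or_update_config(ConfigIR(rest={"doFlat": "True"}))
#     task.add_or_update_config(ConfigIR(rest={"doDark": "True"}))
#     assert len(task.config) == 1

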
@dataclass
class InheritIR:
    """An intermediate representation of inherited pipelines
    """
    location: str
    """This is the location of the pipeline to inherit. The path should be
    specified as an absolute path. Environment variables may be used in the
    path and should be specified as a python string template, with the name of
    the environment variable inside braces.
    """
    include: Union[List[str], None] = None
    """List of tasks that should be included when inheriting this pipeline.
    Either the include or exclude attributes may be specified, but not both.
    """
    exclude: Union[List[str], None] = None
    """List of tasks that should be excluded when inheriting this pipeline.
    Either the include or exclude attributes may be specified, but not both.
    """
    importContracts: bool = True
    """Boolean attribute to dictate if contracts should be inherited with the
    pipeline or not.
    """

    def toPipelineIR(self) -> "PipelineIR":
        """Load the pipeline specified by this object and convert it into a
        `PipelineIR` instance, applying any include or exclude filtering.
        """
        if self.include and self.exclude:
            raise ValueError("Both an include and an exclude list cannot be specified"
                             " when declaring a pipeline import")
        tmp_pipeline = PipelineIR.from_file(os.path.expandvars(self.location))
        if tmp_pipeline.instrument is not None:
            warnings.warn("Any instrument definitions in imported pipelines are ignored. "
                          "If an instrument is desired, please define it in the top-most pipeline")

        included_labels = set()
        for label in tmp_pipeline.tasks:
            if (self.include and label in self.include) or (self.exclude and label not in self.exclude)\
                    or (self.include is None and self.exclude is None):
                included_labels.add(label)

        # Handle labeled subsets being specified in the include or exclude
        # list, adding or removing labels.
        if self.include is not None:
            subsets_in_include = tmp_pipeline.labeled_subsets.keys() & self.include
            for label in subsets_in_include:
                included_labels.update(tmp_pipeline.labeled_subsets[label].subset)

        elif self.exclude is not None:
            subsets_in_exclude = tmp_pipeline.labeled_subsets.keys() & self.exclude
            for label in subsets_in_exclude:
                included_labels.difference_update(tmp_pipeline.labeled_subsets[label].subset)

        tmp_pipeline = tmp_pipeline.subset_from_labels(included_labels)

        if not self.importContracts:
            tmp_pipeline.contracts = []

        return tmp_pipeline

    def __eq__(self, other: "InheritIR"):
        if not isinstance(other, InheritIR):
            return False
        elif all(getattr(self, attr) == getattr(other, attr) for attr in
                 ("location", "include", "exclude", "importContracts")):
            return True
        else:
            return False


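# Example (illustrative sketch; the paths and label are hypothetical): an
# inherits entry may be a bare location or a mapping, with environment
# variables in the path expanded through os.path.expandvars:
#
#     inherits:
#         - ${PIPE_TASKS_DIR}/pipelines/DRP.yaml
#         - location: ${OBS_SUBARU_DIR}/pipelines/DRP.yaml
#           exclude: someTaskLabel
#           importContracts: false
#
# A bare string include/exclude such as the one above is normalized to a
# one-element list by PipelineIR._read_inherits before InheritIR is built.

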
class PipelineIR:
    """Intermediate representation of a pipeline definition

    Parameters
    ----------
    loaded_yaml : `dict`
        A dictionary which matches the structure that would be produced by a
        yaml reader which parses a pipeline definition document

    Raises
    ------
    ValueError :
        - If a pipeline is declared without a description
        - If no tasks are declared in a pipeline, and no pipelines are to be
          inherited
        - If more than one instrument is specified
        - If more than one inherited pipeline shares a label
    """
    def __init__(self, loaded_yaml):
        # Check required fields are present
        if "description" not in loaded_yaml:
            raise ValueError("A pipeline must be declared with a description")
        if "tasks" not in loaded_yaml and "inherits" not in loaded_yaml:
            raise ValueError("A pipeline must be declared with one or more tasks")

        # These steps below must happen in this call order

        # Process pipeline description
        self.description = loaded_yaml.pop("description")

        # Process tasks
        self._read_tasks(loaded_yaml)

        # Process instrument keys
        inst = loaded_yaml.pop("instrument", None)
        if isinstance(inst, list):
            raise ValueError("Only one top level instrument can be defined in a pipeline")
        self.instrument = inst

        # Process any contracts
        self._read_contracts(loaded_yaml)

        # Process any named label subsets
        self._read_labeled_subsets(loaded_yaml)

        # Process any inherited pipelines
        self._read_inherits(loaded_yaml)

        # verify named subsets, must be done after inheriting
        self._verify_labeled_subsets()

    def _read_contracts(self, loaded_yaml):
        """Process the contracts portion of the loaded yaml document

        Parameters
        ----------
        loaded_yaml : `dict`
            A dictionary which matches the structure that would be produced by
            a yaml reader which parses a pipeline definition document
        """
        loaded_contracts = loaded_yaml.pop("contracts", [])
        if isinstance(loaded_contracts, str):
            loaded_contracts = [loaded_contracts]
        self.contracts = []
        for contract in loaded_contracts:
            if isinstance(contract, dict):
                self.contracts.append(ContractIR(**contract))
            if isinstance(contract, str):
                self.contracts.append(ContractIR(contract=contract))

    def _read_labeled_subsets(self, loaded_yaml: dict):
        """Process the subsets portion of the loaded yaml document

        Parameters
        ----------
        loaded_yaml : `MutableMapping`
            A dictionary which matches the structure that would be produced
            by a yaml reader which parses a pipeline definition document
        """
        loaded_subsets = loaded_yaml.pop("subsets", {})
        self.labeled_subsets = {}
        if not loaded_subsets and "subset" in loaded_yaml:
            raise ValueError("Top level key should be subsets and not subset, add an s")
        for key, value in loaded_subsets.items():
            self.labeled_subsets[key] = LabeledSubset.from_primatives(key, value)

    def _verify_labeled_subsets(self):
        """Verifies that all the labels in each named subset exist within the
        pipeline.
        """
        # Verify that all labels defined in a labeled subset are in the
        # Pipeline
        for labeled_subset in self.labeled_subsets.values():
            if not labeled_subset.subset.issubset(self.tasks.keys()):
                raise ValueError(f"Labels {labeled_subset.subset - self.tasks.keys()} were not found in the "
                                 "declared pipeline")
        # Verify subset labels are not already task labels
        label_intersection = self.labeled_subsets.keys() & self.tasks.keys()
        if label_intersection:
            raise ValueError(f"Labeled subsets can not use the same label as a task: {label_intersection}")

    def _read_inherits(self, loaded_yaml):
        """Process the inherits portion of the loaded yaml document

        Parameters
        ----------
        loaded_yaml : `dict`
            A dictionary which matches the structure that would be produced by
            a yaml reader which parses a pipeline definition document
        """
        def process_args(argument: Union[str, dict]) -> dict:
            if isinstance(argument, str):
                return {"location": argument}
            elif isinstance(argument, dict):
                if "exclude" in argument and isinstance(argument["exclude"], str):
                    argument["exclude"] = [argument["exclude"]]
                if "include" in argument and isinstance(argument["include"], str):
                    argument["include"] = [argument["include"]]
                return argument
        tmp_inherit = loaded_yaml.pop("inherits", None)
        if tmp_inherit is None:
            self.inherits = []
        elif isinstance(tmp_inherit, list):
            self.inherits = [InheritIR(**process_args(args)) for args in tmp_inherit]
        else:
            self.inherits = [InheritIR(**process_args(tmp_inherit))]

        # integrate any imported pipelines
        accumulate_tasks = {}
        accumulate_labeled_subsets = {}
        for other_pipeline in self.inherits:
            tmp_IR = other_pipeline.toPipelineIR()
            if accumulate_tasks.keys() & tmp_IR.tasks.keys():
                raise ValueError("Task labels in the imported pipelines must "
                                 "be unique")
            accumulate_tasks.update(tmp_IR.tasks)
            self.contracts.extend(tmp_IR.contracts)
            # verify that tmp_IR has unique labels for its named subsets among
            # existing labeled subsets, and with existing task labels.
            overlapping_subsets = accumulate_labeled_subsets.keys() & tmp_IR.labeled_subsets.keys()
            task_subset_overlap = ((accumulate_labeled_subsets.keys() | tmp_IR.labeled_subsets.keys())
                                   & accumulate_tasks.keys())
            if overlapping_subsets or task_subset_overlap:
                raise ValueError("Labeled subset names must be unique amongst imports in both labels and "
                                 f"named subsets. Duplicate: {overlapping_subsets | task_subset_overlap}")
            accumulate_labeled_subsets.update(tmp_IR.labeled_subsets)

        # verify that any accumulated labeled subsets don't clash with a label
        # from this pipeline
        if accumulate_labeled_subsets.keys() & self.tasks.keys():
            raise ValueError("Labeled subset names must be unique amongst imports in both labels and "
                             "named subsets")
        # merge in the named subsets for self so this document can override any
        # that have been declared
        accumulate_labeled_subsets.update(self.labeled_subsets)
        self.labeled_subsets = accumulate_labeled_subsets

        # merge the dict of label:TaskIR objects, preserving any configs in the
        # imported pipeline if the labels point to the same class
        for label, task in self.tasks.items():
            if label not in accumulate_tasks:
                accumulate_tasks[label] = task
            elif accumulate_tasks[label].klass == task.klass:
                if task.config is not None:
                    for config in task.config:
                        accumulate_tasks[label].add_or_update_config(config)
            else:
                accumulate_tasks[label] = task
        self.tasks = accumulate_tasks

    def _read_tasks(self, loaded_yaml):
        """Process the tasks portion of the loaded yaml document

        Parameters
        ----------
        loaded_yaml : `dict`
            A dictionary which matches the structure that would be produced by
            a yaml reader which parses a pipeline definition document
        """
        self.tasks = {}
        tmp_tasks = loaded_yaml.pop("tasks", None)
        if tmp_tasks is None:
            tmp_tasks = {}

        for label, definition in tmp_tasks.items():
            if isinstance(definition, str):
                definition = {"class": definition}
            config = definition.get('config', None)
            if config is None:
                task_config_ir = None
            else:
                if isinstance(config, dict):
                    config = [config]
                task_config_ir = []
                for c in config:
                    file = c.pop("file", None)
                    if file is None:
                        file = []
                    elif not isinstance(file, list):
                        file = [file]
                    task_config_ir.append(ConfigIR(python=c.pop("python", None),
                                                   dataId=c.pop("dataId", None),
                                                   file=file,
                                                   rest=c))
            self.tasks[label] = TaskIR(label, definition["class"], task_config_ir)

    def _remove_contracts(self, label: str):
        """Remove any contracts that contain the given label

        String comparison used in this way is not the most elegant and may
        have issues, but it is the only feasible way when users can specify
        contracts with generic strings.
        """
        new_contracts = []
        for contract in self.contracts:
            # match a label that is not preceded by an ASCII identifier, or
            # is the start of a line and is followed by a dot
            if re.match(f".*([^A-Za-z0-9_]|^){label}[.]", contract.contract):
                continue
            new_contracts.append(contract)
        self.contracts = new_contracts

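    # Example of the matching above (illustrative): for a hypothetical
    # label "isr", contracts such as "isr.doFlat == True" or
    # "foo.bar == isr.doFlat" are removed, while "visr.doFlat == True"
    # survives, since there "isr" is preceded by an identifier character.
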
    def subset_from_labels(self, labelSpecifier: Set[str]) -> PipelineIR:
        """Subset a pipelineIR to contain only labels specified in
        labelSpecifier.

        Parameters
        ----------
        labelSpecifier : `set` of `str`
            Set containing labels that describe how to subset a pipeline.

        Returns
        -------
        pipeline : `PipelineIR`
            A new pipelineIR object that is a subset of the old pipelineIR

        Raises
        ------
        ValueError
            Raised if there is an issue with specified labels

        Notes
        -----
        This method attempts to prune any contracts that contain labels which
        are not in the declared subset of labels. This pruning is done using a
        string based matching due to the nature of contracts and may prune more
        than it should. Any labeled subsets defined that no longer have all
        members of the subset present in the pipeline will be removed from the
        resulting pipeline.
        """

        pipeline = copy.deepcopy(self)

        # update the label specifier to expand any named subsets
        toRemove = set()
        toAdd = set()
        for label in labelSpecifier:
            if label in pipeline.labeled_subsets:
                toRemove.add(label)
                toAdd.update(pipeline.labeled_subsets[label].subset)
        labelSpecifier.difference_update(toRemove)
        labelSpecifier.update(toAdd)
        # verify all the labels are in the pipeline
        if not labelSpecifier.issubset(pipeline.tasks.keys()
                                       | pipeline.labeled_subsets):
            difference = labelSpecifier.difference(pipeline.tasks.keys())
            raise ValueError("Not all supplied labels (specified or named subsets) are in the pipeline "
                             f"definition, extra labels: {difference}")
        # copy needed so as to not modify while iterating
        pipeline_labels = set(pipeline.tasks.keys())
        # Remove the labels from the pipelineIR, and any contracts that contain
        # those labels (see docstring on _remove_contracts for why this may
        # cause issues)
        for label in pipeline_labels:
            if label not in labelSpecifier:
                pipeline.tasks.pop(label)
                pipeline._remove_contracts(label)

        # create a copy of the object to iterate over
        labeled_subsets = copy.copy(pipeline.labeled_subsets)
        # remove any labeled subsets that no longer have a complete set
        for label, labeled_subset in labeled_subsets.items():
            if labeled_subset.subset - pipeline.tasks.keys():
                pipeline.labeled_subsets.pop(label)

        return pipeline

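    # Example (illustrative sketch; labels are hypothetical): for a pipeline
    # with tasks {"isr", "characterizeImage", "calibrate"} and a named subset
    # step1 = {"isr", "characterizeImage"},
    #
    #     pipeline.subset_from_labels({"step1"})
    #
    # expands step1, keeps only those two tasks, prunes contracts mentioning
    # "calibrate", and drops any labeled subsets left incomplete.
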
    @classmethod
    def from_string(cls, pipeline_string: str):
        """Create a `PipelineIR` object from a string formatted like a pipeline
        document

        Parameters
        ----------
        pipeline_string : `str`
            A string that is formatted like a pipeline document
        """
        loaded_yaml = yaml.load(pipeline_string, Loader=PipelineYamlLoader)
        return cls(loaded_yaml)

    @classmethod
    def from_file(cls, filename: str):
        """Create a `PipelineIR` object from the document specified by the
        input path.

        Parameters
        ----------
        filename : `str`
            Location of document to use in creating a `PipelineIR` object.
        """
        with open(filename, 'r') as f:
            loaded_yaml = yaml.load(f, Loader=PipelineYamlLoader)
        return cls(loaded_yaml)

    def to_file(self, filename: str):
        """Serialize this `PipelineIR` object into a yaml formatted string and
        write the output to a file at the specified path.

        Parameters
        ----------
        filename : `str`
            Location at which to write the serialized `PipelineIR` document.
        """
        with open(filename, 'w') as f:
            yaml.dump(self.to_primitives(), f, sort_keys=False)

    def to_primitives(self):
        """Convert to a representation used in yaml serialization
        """
        accumulate = {"description": self.description}
        if self.instrument is not None:
            accumulate['instrument'] = self.instrument
        accumulate['tasks'] = {m: t.to_primitives() for m, t in self.tasks.items()}
        if len(self.contracts) > 0:
            accumulate['contracts'] = [c.to_primitives() for c in self.contracts]
        if self.labeled_subsets:
            accumulate['subsets'] = {k: v.to_primitives() for k, v in self.labeled_subsets.items()}
        return accumulate

    def __str__(self) -> str:
        """Format the instance as it would appear in yaml representation
        """
        return yaml.dump(self.to_primitives(), sort_keys=False)

    def __repr__(self) -> str:
        """Format the instance as it would appear in yaml representation
        """
        return str(self)

    def __eq__(self, other: "PipelineIR"):
        if not isinstance(other, PipelineIR):
            return False
        elif all(getattr(self, attr) == getattr(other, attr) for attr in
                 ("contracts", "tasks", "instrument")):
            return True
        else:
            return False
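

# Example (illustrative sketch; the task class string is hypothetical): a
# minimal pipeline document round-trips through PipelineIR:
#
#     pipeline_doc = """
#     description: A trivial single-task pipeline
#     tasks:
#         demo: some.package.DemoTask
#     """
#     ir = PipelineIR.from_string(pipeline_doc)
#     ir.tasks["demo"].klass  # -> 'some.package.DemoTask'
#     print(ir)               # serializes back to yaml via to_primitives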