LSST Data Management Base Package
lsst.pipe.base.pipeline.Pipeline Class Reference

Public Member Functions

def __init__ (self, str description)
 
Pipeline fromFile (cls, str filename)
 
Pipeline from_uri (cls, Union[str, ButlerURI] uri)
 
Pipeline subsetFromLabels (self, LabelSpecifier labelSpecifier)
 
Pipeline fromString (cls, str pipeline_string)
 
Pipeline fromIR (cls, pipelineIR.PipelineIR deserialized_pipeline)
 
Pipeline fromPipeline (cls, pipelineIR.PipelineIR pipeline)
 
str __str__ (self)
 
None addInstrument (self, Union[Instrument, str] instrument)
 
Instrument getInstrument (self)
 
None addTask (self, Union[PipelineTask, str] task, str label)
 
None removeTask (self, str label)
 
None addConfigOverride (self, str label, str key, object value)
 
None addConfigFile (self, str label, str filename)
 
None addConfigPython (self, str label, str pythonString)
 
None toFile (self, str filename)
 
None write_to_uri (self, Union[str, ButlerURI] uri)
 
Generator[TaskDef, None, None] toExpandedPipeline (self)
 
def __len__ (self)
 
def __eq__ (self, object other)
 

Detailed Description

A `Pipeline` is a representation of a series of tasks to run, and the
configuration for those tasks.

Parameters
----------
description : `str`
    A description of what this pipeline does.

Definition at line 151 of file pipeline.py.
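For orientation, here is a minimal usage sketch of building a pipeline programmatically; the task class path, the "isr" label, and the config field are illustrative placeholders rather than values defined by this class.

from lsst.pipe.base import Pipeline

# Build an empty pipeline with a human-readable description.
pipeline = Pipeline("A short demonstration pipeline")

# Declare a task under a label; the class path and label are placeholders.
pipeline.addTask("lsst.ip.isr.IsrTask", "isr")

# Apply a simple config override to that task (field name is illustrative).
pipeline.addConfigOverride("isr", "doDark", False)

print(len(pipeline))  # number of declared tasks -> 1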

Constructor & Destructor Documentation

◆ __init__()

def lsst.pipe.base.pipeline.Pipeline.__init__ (   self,
str  description 
)

Definition at line 160 of file pipeline.py.

160  def __init__(self, description: str):
161  pipeline_dict = {"description": description, "tasks": {}}
162  self._pipelineIR = pipelineIR.PipelineIR(pipeline_dict)
163 

Member Function Documentation

◆ __eq__()

def lsst.pipe.base.pipeline.Pipeline.__eq__ (   self,
object  other 
)

Definition at line 598 of file pipeline.py.

598  def __eq__(self, other: object):
599  if not isinstance(other, Pipeline):
600  return False
601  return self._pipelineIR == other._pipelineIR
602 
603 
604 @dataclass(frozen=True)

◆ __len__()

def lsst.pipe.base.pipeline.Pipeline.__len__ (   self)

Definition at line 595 of file pipeline.py.

595  def __len__(self):
596  return len(self._pipelineIR.tasks)
597 

◆ __str__()

str lsst.pipe.base.pipeline.Pipeline.__str__ (   self)

Definition at line 398 of file pipeline.py.

398  def __str__(self) -> str:
399  return str(self._pipelineIR)
400 

◆ addConfigFile()

None lsst.pipe.base.pipeline.Pipeline.addConfigFile (   self,
str  label,
str  filename 
)
Add overrides from a specified file.

Parameters
----------
label : `str`
    The label used to identify the task whose configuration is to be
    modified.
filename : `str`
    Path to the override file.

Definition at line 490 of file pipeline.py.

490  def addConfigFile(self, label: str, filename: str) -> None:
491  """Add overrides from a specified file.
492 
493  Parameters
494  ----------
495  label : `str`
496  The label used to identify the task associated with config to
497  modify
498  filename : `str`
499  Path to the override file.
500  """
501  self._addConfigImpl(label, pipelineIR.ConfigIR(file=[filename]))
502 
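As a hedged illustration, the call below applies an override file to a task already declared in the pipeline; the "isr" label and the file path are placeholders.

# Apply all overrides defined in a config file to the task labeled "isr".
pipeline.addConfigFile("isr", "/path/to/isrOverrides.py")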

◆ addConfigOverride()

None lsst.pipe.base.pipeline.Pipeline.addConfigOverride (   self,
str  label,
str  key,
object  value 
)
Apply a single config override.

Parameters
----------
label : `str`
    Label of the task.
key : `str`
    Fully-qualified field name.
value : `object`
    Value to be given to a field.

Definition at line 476 of file pipeline.py.

476  def addConfigOverride(self, label: str, key: str, value: object) -> None:
477  """Apply single config override.
478 
479  Parameters
480  ----------
481  label : `str`
482  Label of the task.
483  key: `str`
484  Fully-qualified field name.
485  value : object
486  Value to be given to a field.
487  """
488  self._addConfigImpl(label, pipelineIR.ConfigIR(rest={key: value}))
489 
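A hedged sketch of single-value overrides; the label and the fully-qualified field names are placeholders chosen for illustration.

# Override a top-level config field on the task labeled "isr".
pipeline.addConfigOverride("isr", "doDark", False)

# Nested fields are addressed with dotted, fully-qualified names.
pipeline.addConfigOverride("isr", "fringe.filters", ["i", "z"])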

◆ addConfigPython()

None lsst.pipe.base.pipeline.Pipeline.addConfigPython (   self,
str  label,
str  pythonString 
)
Add overrides by running a snippet of Python code against a config.

Parameters
----------
label : `str`
    The label used to identify the task whose configuration is to be
    modified.
pythonString : `str`
    A string of valid Python code to be executed, with `config` as the
    only locally accessible value.

Definition at line 503 of file pipeline.py.

503  def addConfigPython(self, label: str, pythonString: str) -> None:
504  """Add Overrides by running a snippet of python code against a config.
505 
506  Parameters
507  ----------
508  label : `str`
509  The label used to identity the task associated with config to
510  modify.
511  pythonString: `str`
512  A string which is valid python code to be executed. This is done
513  with config as the only local accessible value.
514  """
515  self._addConfigImpl(label, pipelineIR.ConfigIR(python=pythonString))
516 
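A hedged sketch of a Python override; the snippet is executed with the task's config bound to the name config, and the label and field names are placeholders.

# The string must be valid Python; `config` is the task's config object.
pipeline.addConfigPython(
    "isr",
    "config.doDark = False\n"
    "config.doFlat = False",
)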

◆ addInstrument()

None lsst.pipe.base.pipeline.Pipeline.addInstrument (   self,
Union[Instrument, str]  instrument 
)
Add an instrument to the pipeline, or replace an instrument that is
already defined.

Parameters
----------
instrument : `~lsst.daf.butler.instrument.Instrument` or `str`
    Either a subclass of `lsst.daf.butler.instrument.Instrument` or a
    string corresponding to the fully qualified name of such a class.

Definition at line 401 of file pipeline.py.

401  def addInstrument(self, instrument: Union[Instrument, str]) -> None:
402  """Add an instrument to the pipeline, or replace an instrument that is
403  already defined.
404 
405  Parameters
406  ----------
407  instrument : `~lsst.daf.butler.instrument.Instrument` or `str`
408  Either a derived class object of a `lsst.daf.butler.instrument` or
409  a string corresponding to a fully qualified
410  `lsst.daf.butler.instrument` name.
411  """
412  if isinstance(instrument, str):
413  pass
414  else:
415  # TODO: assume that this is a subclass of Instrument, no type
416  # checking
417  instrument = f"{instrument.__module__}.{instrument.__qualname__}"
418  self._pipelineIR.instrument = instrument
419 
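A hedged sketch of both accepted forms; the instrument class path is a placeholder for a concrete `Instrument` subclass available in your environment.

# By fully qualified class name...
pipeline.addInstrument("lsst.obs.subaru.HyperSuprimeCam")

# ...or by passing the Instrument subclass itself (illustrative import).
# from lsst.obs.subaru import HyperSuprimeCam
# pipeline.addInstrument(HyperSuprimeCam)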

◆ addTask()

None lsst.pipe.base.pipeline.Pipeline.addTask (   self,
Union[PipelineTask, str]  task,
str  label 
)
Add a new task to the pipeline, or replace a task that is already
associated with the supplied label.

Parameters
----------
task : `PipelineTask` or `str`
    Either a subclass of `PipelineTask` or a string corresponding to a
    fully qualified `PipelineTask` name.
label : `str`
    A label that is used to identify the `PipelineTask` being added.

Definition at line 432 of file pipeline.py.

432  def addTask(self, task: Union[PipelineTask, str], label: str) -> None:
433  """Add a new task to the pipeline, or replace a task that is already
434  associated with the supplied label.
435 
436  Parameters
437  ----------
438  task: `PipelineTask` or `str`
439  Either a derived class object of a `PipelineTask` or a string
440  corresponding to a fully qualified `PipelineTask` name.
441  label: `str`
442  A label that is used to identify the `PipelineTask` being added
443  """
444  if isinstance(task, str):
445  taskName = task
446  elif issubclass(task, PipelineTask):
447  taskName = f"{task.__module__}.{task.__qualname__}"
448  else:
449  raise ValueError("task must be either a child class of PipelineTask or a string containing"
450  " a fully qualified name to one")
451  if not label:
452  # in some cases (with command line-generated pipeline) tasks can
453  # be defined without label which is not acceptable, use task
454  # _DefaultName in that case
455  if isinstance(task, str):
456  task = doImport(task)
457  label = task._DefaultName
458  self._pipelineIR.tasks[label] = pipelineIR.TaskIR(label, taskName)
459 
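A hedged sketch of both accepted forms; the task class and the "isr" label are placeholders.

# By fully qualified class name...
pipeline.addTask("lsst.ip.isr.IsrTask", "isr")

# ...or by passing the PipelineTask subclass directly (illustrative import).
# from lsst.ip.isr import IsrTask
# pipeline.addTask(IsrTask, "isr")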

◆ from_uri()

Pipeline lsst.pipe.base.pipeline.Pipeline.from_uri (   cls,
Union[str, ButlerURI]  uri 
)
Load a pipeline defined in a pipeline yaml file at a location
specified by a URI.

Parameters
----------
uri : `str` or `ButlerURI`
   If a string is supplied, this should be a URI path that points to a
   pipeline defined in yaml format. The URI may also supply additional
   labels to be used in subsetting the loaded `Pipeline`. These labels
   are separated from the path by a `#`, and may be specified as a
   comma-separated list or as a range denoted as `beginning..end`.
   Either bound may be empty, in which case the range is a half-open
   interval. Unlike Python iteration bounds, the end bound is
   *INCLUDED*. Note that range-based selection is not well defined for
   pipelines that are not linear in nature; correct behavior is not
   guaranteed and may vary from run to run. The same specifiers can be
   used with a `ButlerURI` object, supplied as the sole contents of its
   fragment attribute.

Returns
-------
pipeline : `Pipeline`
    The pipeline loaded from the specified location, with any requested
    subsetting applied.

Notes
-----
This method attempts to prune any contracts that contain labels which
are not in the declared subset of labels. This pruning is done using
string-based matching, due to the nature of contracts, and may prune
more than it should.

Definition at line 198 of file pipeline.py.

198  def from_uri(cls, uri: Union[str, ButlerURI]) -> Pipeline:
199  """Load a pipeline defined in a pipeline yaml file at a location
200  specified by a URI.
201 
202  Parameters
203  ----------
204  uri: `str` or `ButlerURI`
205  If a string is supplied this should be a URI path that points to a
206  pipeline defined in yaml format. This uri may also supply
207  additional labels to be used in subsetting the loaded Pipeline.
208  These labels are separated from the path by a \\#, and may be
209  specified as a comma separated list, or a range denoted as
210  beginning..end. Beginning or end may be empty, in which case the
211  range will be a half open interval. Unlike python iteration
212  bounds, end bounds are *INCLUDED*. Note that range based selection
213  is not well defined for pipelines that are not linear in nature,
214  and correct behavior is not guaranteed, or may vary from run to
215  run. The same specifiers can be used with a ButlerURI object, by
216  being the sole contents in the fragments attribute.
217 
218  Returns
219  -------
220  pipeline: `Pipeline`
221  The pipeline loaded from specified location with appropriate (if
222  any) subsetting
223 
224  Notes
225  -----
226  This method attempts to prune any contracts that contain labels which
227  are not in the declared subset of labels. This pruning is done using a
228  string based matching due to the nature of contracts and may prune more
229  than it should.
230  """
231  # Split up the uri and any labels that were supplied
232  uri, label_specifier = cls._parse_file_specifier(uri)
233  pipeline: Pipeline = cls.fromIR(pipelineIR.PipelineIR.from_uri(uri))
234 
235  # If there are labels supplied, only keep those
236  if label_specifier is not None:
237  pipeline = pipeline.subsetFromLabels(label_specifier)
238  return pipeline
239 
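A hedged sketch of the URI forms described above; the paths and labels are placeholders.

# Load a whole pipeline from a local or remote YAML document.
pipeline = Pipeline.from_uri("s3://some-bucket/pipelines/DRP.yaml")

# Keep only the listed labels.
pipeline = Pipeline.from_uri("/path/to/DRP.yaml#isr,characterizeImage")

# Keep an inclusive range of labels in dependency-sorted order.
pipeline = Pipeline.from_uri("/path/to/DRP.yaml#isr..calibrate")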

◆ fromFile()

Pipeline lsst.pipe.base.pipeline.Pipeline.fromFile (   cls,
str  filename 
)
Load a pipeline defined in a pipeline yaml file.

Parameters
----------
filename : `str`
   A path that points to a pipeline defined in yaml format. The
   filename may also supply additional labels to be used in subsetting
   the loaded `Pipeline`. These labels are separated from the path by a
   `#`, and may be specified as a comma-separated list or as a range
   denoted as `beginning..end`. Either bound may be empty, in which
   case the range is a half-open interval. Unlike Python iteration
   bounds, the end bound is *INCLUDED*. Note that range-based selection
   is not well defined for pipelines that are not linear in nature;
   correct behavior is not guaranteed and may vary from run to run.

Returns
-------
pipeline : `Pipeline`
    The pipeline loaded from the specified location, with any requested
    subsetting applied.

Notes
-----
This method attempts to prune any contracts that contain labels which
are not in the declared subset of labels. This pruning is done using
string-based matching, due to the nature of contracts, and may prune
more than it should.

Definition at line 165 of file pipeline.py.

165  def fromFile(cls, filename: str) -> Pipeline:
166  """Load a pipeline defined in a pipeline yaml file.
167 
168  Parameters
169  ----------
170  filename: `str`
171  A path that points to a pipeline defined in yaml format. This
172  filename may also supply additional labels to be used in
173  subsetting the loaded Pipeline. These labels are separated from
174  the path by a \\#, and may be specified as a comma separated
175  list, or a range denoted as beginning..end. Beginning or end may
176  be empty, in which case the range will be a half open interval.
177  Unlike python iteration bounds, end bounds are *INCLUDED*. Note
178  that range based selection is not well defined for pipelines that
179  are not linear in nature, and correct behavior is not guaranteed,
180  or may vary from run to run.
181 
182  Returns
183  -------
184  pipeline: `Pipeline`
185  The pipeline loaded from specified location with appropriate (if
186  any) subsetting
187 
188  Notes
189  -----
190  This method attempts to prune any contracts that contain labels which
191  are not in the declared subset of labels. This pruning is done using a
192  string based matching due to the nature of contracts and may prune more
193  than it should.
194  """
195  return cls.from_uri(filename)
196 
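A hedged sketch; fromFile simply forwards to from_uri, so the same label-subsetting syntax applies (the path and labels are placeholders).

# Load a pipeline from a local file, keeping an inclusive label range.
pipeline = Pipeline.fromFile("/path/to/DRP.yaml#isr..calibrate")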

◆ fromIR()

Pipeline lsst.pipe.base.pipeline.Pipeline.fromIR (   cls,
pipelineIR.PipelineIR  deserialized_pipeline 
)
Create a pipeline from an already created `PipelineIR` object.

Parameters
----------
deserialized_pipeline : `PipelineIR`
    An already created pipeline intermediate representation object.

Returns
-------
pipeline: `Pipeline`

Definition at line 367 of file pipeline.py.

367  def fromIR(cls, deserialized_pipeline: pipelineIR.PipelineIR) -> Pipeline:
368  """Create a pipeline from an already created `PipelineIR` object.
369 
370  Parameters
371  ----------
372  deserialized_pipeline: `PipelineIR`
373  An already created pipeline intermediate representation object
374 
375  Returns
376  -------
377  pipeline: `Pipeline`
378  """
379  pipeline = cls.__new__(cls)
380  pipeline._pipelineIR = deserialized_pipeline
381  return pipeline
382 

◆ fromPipeline()

Pipeline lsst.pipe.base.pipeline.Pipeline.fromPipeline (   cls,
pipelineIR.PipelineIR  pipeline 
)
Create a new pipeline by copying an already existing `Pipeline`.

Parameters
----------
pipeline : `Pipeline`
    An existing `Pipeline` object to copy.

Returns
-------
pipeline: `Pipeline`

Definition at line 384 of file pipeline.py.

384  def fromPipeline(cls, pipeline: pipelineIR.PipelineIR) -> Pipeline:
385  """Create a new pipeline by copying an already existing `Pipeline`.
386 
387  Parameters
388  ----------
389  pipeline: `Pipeline`
390  An already created pipeline intermediate representation object
391 
392  Returns
393  -------
394  pipeline: `Pipeline`
395  """
396  return cls.fromIR(copy.deepcopy(pipeline._pipelineIR))
397 

◆ fromString()

Pipeline lsst.pipe.base.pipeline.Pipeline.fromString (   cls,
str  pipeline_string 
)
Create a pipeline from a string formatted as a pipeline document.

Parameters
----------
pipeline_string : `str`
    A string formatted like a pipeline document.

Returns
-------
pipeline: `Pipeline`

Definition at line 351 of file pipeline.py.

351  def fromString(cls, pipeline_string: str) -> Pipeline:
352  """Create a pipeline from string formatted as a pipeline document.
353 
354  Parameters
355  ----------
356  pipeline_string : `str`
357  A string that is formatted according like a pipeline document
358 
359  Returns
360  -------
361  pipeline: `Pipeline`
362  """
363  pipeline = cls.fromIR(pipelineIR.PipelineIR.from_string(pipeline_string))
364  return pipeline
365 
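A hedged sketch of an inline pipeline document; the exact schema is set by the pipeline YAML format, and the task class path and label are placeholders.

pipeline_yaml = """
description: An inline demonstration pipeline
tasks:
  isr:
    class: lsst.ip.isr.IsrTask
"""
pipeline = Pipeline.fromString(pipeline_yaml)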

◆ getInstrument()

Instrument lsst.pipe.base.pipeline.Pipeline.getInstrument (   self)
Get the instrument from the pipeline.

Returns
-------
instrument : `~lsst.daf.butler.instrument.Instrument`, `str`, or None
    A subclass of `lsst.daf.butler.instrument.Instrument`, a string
    corresponding to a fully qualified `Instrument` class name, or
    `None` if the pipeline does not have an instrument.

Definition at line 420 of file pipeline.py.

420  def getInstrument(self) -> Instrument:
421  """Get the instrument from the pipeline.
422 
423  Returns
424  -------
425  instrument : `~lsst.daf.butler.instrument.Instrument`, `str`, or None
426  A derived class object of a `lsst.daf.butler.instrument`, a string
427  corresponding to a fully qualified `lsst.daf.butler.instrument`
428  name, or None if the pipeline does not have an instrument.
429  """
430  return self._pipelineIR.instrument
431 

◆ removeTask()

None lsst.pipe.base.pipeline.Pipeline.removeTask (   self,
str  label 
)
Remove a task from the pipeline.

Parameters
----------
label : `str`
    The label used to identify the task that is to be removed

Raises
------
KeyError
    If no task with that label exists in the pipeline

Definition at line 460 of file pipeline.py.

460  def removeTask(self, label: str) -> None:
461  """Remove a task from the pipeline.
462 
463  Parameters
464  ----------
465  label : `str`
466  The label used to identify the task that is to be removed
467 
468  Raises
469  ------
470  KeyError
471  If no task with that label exists in the pipeline
472 
473  """
474  self._pipelineIR.tasks.pop(label)
475 

◆ subsetFromLabels()

Pipeline lsst.pipe.base.pipeline.Pipeline.subsetFromLabels (   self,
LabelSpecifier  labelSpecifier 
)
Subset a pipeline to contain only the labels specified in labelSpecifier.

Parameters
----------
labelSpecifier : `LabelSpecifier`
    Object containing labels that describe how to subset a pipeline.

Returns
-------
pipeline : `Pipeline`
    A new pipeline object that is a subset of the old pipeline.

Raises
------
ValueError
    Raised if there is an issue with the specified labels.

Notes
-----
This method attempts to prune any contracts that contain labels which
are not in the declared subset of labels. This pruning is done using
string-based matching, due to the nature of contracts, and may prune
more than it should.

Definition at line 240 of file pipeline.py.

240  def subsetFromLabels(self, labelSpecifier: LabelSpecifier) -> Pipeline:
241  """Subset a pipeline to contain only labels specified in labelSpecifier
242 
243  Parameters
244  ----------
245  labelSpecifier : `labelSpecifier`
246  Object containing labels that describes how to subset a pipeline.
247 
248  Returns
249  -------
250  pipeline : `Pipeline`
251  A new pipeline object that is a subset of the old pipeline
252 
253  Raises
254  ------
255  ValueError
256  Raised if there is an issue with specified labels
257 
258  Notes
259  -----
260  This method attempts to prune any contracts that contain labels which
261  are not in the declared subset of labels. This pruning is done using a
262  string based matching due to the nature of contracts and may prune more
263  than it should.
264  """
265  # Labels supplied as a set
266  if labelSpecifier.labels:
267  labelSet = labelSpecifier.labels
268  # Labels supplied as a range, first create a list of all the labels
269  # in the pipeline sorted according to task dependency. Then only
270  # keep labels that lie between the supplied bounds
271  else:
272  # Create a copy of the pipeline to use when assessing the label
273  # ordering. Use a dict for fast searching while preserving order.
274  # Remove contracts so they do not fail in the expansion step. This
275  # is needed because a user may only configure the tasks they intend
276  # to run, which may cause some contracts to fail if they will later
277  # be dropped
278  pipeline = copy.deepcopy(self)
279  pipeline._pipelineIR.contracts = []
280  labels = {taskdef.label: True for taskdef in pipeline.toExpandedPipeline()}
281 
282  # Verify the bounds are in the labels
283  if labelSpecifier.begin is not None:
284  if labelSpecifier.begin not in labels:
285  raise ValueError(f"Beginning of range subset, {labelSpecifier.begin}, not found in "
286  "pipeline definition")
287  if labelSpecifier.end is not None:
288  if labelSpecifier.end not in labels:
289  raise ValueError(f"End of range subset, {labelSpecifier.end}, not found in pipeline "
290  "definition")
291 
292  labelSet = set()
293  for label in labels:
294  if labelSpecifier.begin is not None:
295  if label != labelSpecifier.begin:
296  continue
297  else:
298  labelSpecifier.begin = None
299  labelSet.add(label)
300  if labelSpecifier.end is not None and label == labelSpecifier.end:
301  break
302  return Pipeline.fromIR(self._pipelineIR.subset_from_labels(labelSet))
303 
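A hedged sketch, assuming `LabelSpecifier` (defined in this module) is constructed with either a `labels` set or `begin`/`end` bounds, consistent with the attributes used in the listing above; the labels themselves are placeholders.

from lsst.pipe.base.pipeline import LabelSpecifier

# Keep an explicit set of labels.
subset = pipeline.subsetFromLabels(LabelSpecifier(labels={"isr", "calibrate"}))

# Or keep an inclusive, dependency-ordered range of labels.
subset = pipeline.subsetFromLabels(LabelSpecifier(begin="isr", end="calibrate"))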

◆ toExpandedPipeline()

Generator[TaskDef, None, None] lsst.pipe.base.pipeline.Pipeline.toExpandedPipeline (   self)
Returns a generator of TaskDefs which can be used to create quantum
graphs.

Returns
-------
generator : generator of `TaskDef`
    The returned generator yields, in sorted order, the tasks which are
    to be used in constructing a quantum graph.

Raises
------
NotImplementedError
    Raised if a dataId is supplied in a config block. This is in place
    for future use.

Definition at line 537 of file pipeline.py.

537  def toExpandedPipeline(self) -> Generator[TaskDef, None, None]:
538  """Returns a generator of TaskDefs which can be used to create quantum
539  graphs.
540 
541  Returns
542  -------
543  generator : generator of `TaskDef`
544  The generator returned will be the sorted iterator of tasks which
545  are to be used in constructing a quantum graph.
546 
547  Raises
548  ------
549  NotImplementedError
550  If a dataId is supplied in a config block. This is in place for
551  future use
552  """
553  taskDefs = []
554  for label, taskIR in self._pipelineIR.tasks.items():
555  taskClass = doImport(taskIR.klass)
556  taskName = taskClass.__qualname__
557  config = taskClass.ConfigClass()
558  overrides = ConfigOverrides()
559  if self._pipelineIR.instrument is not None:
560  overrides.addInstrumentOverride(self._pipelineIR.instrument, taskClass._DefaultName)
561  if taskIR.config is not None:
562  for configIR in (configIr.formatted(self._pipelineIR.parameters)
563  for configIr in taskIR.config):
564  if configIR.dataId is not None:
565  raise NotImplementedError("Specializing a config on a partial data id is not yet "
566  "supported in Pipeline definition")
567  # only apply override if it applies to everything
568  if configIR.dataId is None:
569  if configIR.file:
570  for configFile in configIR.file:
571  overrides.addFileOverride(os.path.expandvars(configFile))
572  if configIR.python is not None:
573  overrides.addPythonOverride(configIR.python)
574  for key, value in configIR.rest.items():
575  overrides.addValueOverride(key, value)
576  overrides.applyTo(config)
577  # This may need to be revisited
578  config.validate()
579  taskDefs.append(TaskDef(taskName=taskName, config=config, taskClass=taskClass, label=label))
580 
581  # lets evaluate the contracts
582  if self._pipelineIR.contracts is not None:
583  label_to_config = {x.label: x.config for x in taskDefs}
584  for contract in self._pipelineIR.contracts:
585  # execute this in its own line so it can raise a good error
586  # message if there was problems with the eval
587  success = eval(contract.contract, None, label_to_config)
588  if not success:
589  extra_info = f": {contract.msg}" if contract.msg is not None else ""
590  raise pipelineIR.ContractError(f"Contract(s) '{contract.contract}' were not "
591  f"satisfied{extra_info}")
592 
593  yield from pipeTools.orderPipeline(taskDefs)
594 
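A hedged sketch of consuming the generator; the attribute names follow the `TaskDef` construction shown in the listing above.

# Tasks are yielded in dependency-sorted order, ready for graph building.
for taskDef in pipeline.toExpandedPipeline():
    print(taskDef.label, taskDef.taskName)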

◆ toFile()

None lsst.pipe.base.pipeline.Pipeline.toFile (   self,
str  filename 
)

Definition at line 531 of file pipeline.py.

531  def toFile(self, filename: str) -> None:
532  self._pipelineIR.to_file(filename)
533 

◆ write_to_uri()

None lsst.pipe.base.pipeline.Pipeline.write_to_uri (   self,
Union[str, ButlerURI]  uri 
)

Definition at line 534 of file pipeline.py.

534  def write_to_uri(self, uri: Union[str, ButlerURI]) -> None:
535  self._pipelineIR.write_to_uri(uri)
536 
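A hedged round-trip sketch covering toFile, fromFile, and write_to_uri; the paths are placeholders.

# Persist the pipeline as a YAML document and read it back.
pipeline.toFile("/tmp/my_pipeline.yaml")
restored = Pipeline.fromFile("/tmp/my_pipeline.yaml")

# write_to_uri accepts a plain string URI or a ButlerURI object.
pipeline.write_to_uri("s3://some-bucket/pipelines/my_pipeline.yaml")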

The documentation for this class was generated from the following file: pipeline.py