LSST Applications 22.0.1
LSST Data Management Base Package
lsst.pipe.base.graphBuilder._PipelineScaffolding Class Reference

Public Member Functions

def __init__ (self, pipeline, *, registry)
 
def __repr__ (self)
 
def connectDataIds (self, registry, collections, userQuery, externalDataId)
 
def resolveDatasetRefs (self, registry, collections, run, commonDataIds, *, skipExisting=True)
 
def makeQuantumGraph (self)
 

Public Attributes

 tasks
 
 dimensions
 

Detailed Description

A helper data structure that organizes the information involved in
constructing a `QuantumGraph` for a `Pipeline`.

Parameters
----------
pipeline : `Pipeline`
    Sequence of tasks from which a graph is to be constructed.  Must
    have nested task classes already imported.
universe : `DimensionUniverse`
    Universe of all possible dimensions.

Notes
-----
The scaffolding data structure contains nested data structures for both
tasks (`_TaskScaffolding`) and datasets (`_DatasetDict`).  The dataset
data structures are shared between the pipeline-level structure (which
aggregates all datasets and categorizes them from the perspective of the
complete pipeline) and the individual tasks that use them as inputs and
outputs.

`QuantumGraph` construction proceeds in four steps, with each corresponding
to a different `_PipelineScaffolding` method:

1. When `_PipelineScaffolding` is constructed, we extract and categorize
   the DatasetTypes used by the pipeline (delegating to
   `PipelineDatasetTypes.fromPipeline`), then use these to construct the
   nested `_TaskScaffolding` and `_DatasetDict` objects.

2. In `connectDataIds`, we construct and run the "Big Join Query", which
   returns related tuples of all dimensions used to identify any regular
   input, output, and intermediate datasets (not prerequisites).  We then
   iterate over these tuples of related dimensions, identifying the subsets
   that correspond to distinct data IDs for each task and dataset type,
   and then create `_QuantumScaffolding` objects.

3. In `resolveDatasetRefs`, we run follow-up queries against all of the
   dataset data IDs previously identified, transforming unresolved
   DatasetRefs into resolved DatasetRefs where appropriate.  We then look
   up prerequisite datasets for all quanta.

4. In `makeQuantumGraph`, we construct a `QuantumGraph` from the lists of
   per-task `_QuantumScaffolding` objects.
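
The four steps can be chained as in the following minimal driver sketch.
This is illustrative only, not the actual `GraphBuilder.makeGraph`
implementation; ``pipeline``, ``registry``, ``collections``, ``run``,
``userQuery``, and ``externalDataId`` are assumed to be supplied by the
caller:

    # Hypothetical driver for the four steps above.
    scaffolding = _PipelineScaffolding(pipeline, registry=registry)  # step 1
    # connectDataIds returns its results inside a context manager, so the
    # remaining steps must run before the temporary table is dropped.
    with scaffolding.connectDataIds(registry, collections, userQuery,
                                    externalDataId) as commonDataIds:  # step 2
        scaffolding.resolveDatasetRefs(registry, collections, run,
                                       commonDataIds, skipExisting=True)  # step 3
        graph = scaffolding.makeQuantumGraph()  # step 4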

Definition at line 357 of file graphBuilder.py.

Constructor & Destructor Documentation

◆ __init__()

def lsst.pipe.base.graphBuilder._PipelineScaffolding.__init__(self, pipeline, *, registry)

Definition at line 401 of file graphBuilder.py.

401  def __init__(self, pipeline, *, registry):
402      _LOG.debug("Initializing data structures for QuantumGraph generation.")
403      self.tasks = []
404      # Aggregate and categorize the DatasetTypes in the Pipeline.
405      datasetTypes = PipelineDatasetTypes.fromPipeline(pipeline, registry=registry)
406      # Construct dictionaries that map those DatasetTypes to structures
407      # that will (later) hold additional information about them.
408      for attr in ("initInputs", "initIntermediates", "initOutputs",
409                   "inputs", "intermediates", "outputs", "prerequisites"):
410          setattr(self, attr, _DatasetDict.fromDatasetTypes(getattr(datasetTypes, attr),
411                                                            universe=registry.dimensions))
412      # Aggregate all dimensions for all non-init, non-prerequisite
413      # DatasetTypes.  These are the ones we'll include in the big join
414      # query.
415      self.dimensions = self.inputs.dimensions.union(self.intermediates.dimensions,
416                                                     self.outputs.dimensions)
417      # Construct scaffolding nodes for each Task, and add backreferences
418      # to the Task from each DatasetScaffolding node.
419      # Note that there's only one scaffolding node for each DatasetType,
420      # shared by _PipelineScaffolding and all _TaskScaffoldings that
421      # reference it.
422      if isinstance(pipeline, Pipeline):
423          pipeline = pipeline.toExpandedPipeline()
424      self.tasks = [_TaskScaffolding(taskDef=taskDef, parent=self, datasetTypes=taskDatasetTypes)
425                    for taskDef, taskDatasetTypes in zip(pipeline,
426                                                         datasetTypes.byTask.values())]
427 

Member Function Documentation

◆ __repr__()

def lsst.pipe.base.graphBuilder._PipelineScaffolding.__repr__(self)

Definition at line 428 of file graphBuilder.py.

428  def __repr__(self):
429      # Default dataclass-injected __repr__ gets caught in an infinite loop
430      # because of back-references.
431      return f"_PipelineScaffolding(tasks={self.tasks}, ...)"
432 

◆ connectDataIds()

def lsst.pipe.base.graphBuilder._PipelineScaffolding.connectDataIds(self, registry, collections, userQuery, externalDataId)
Query for the data IDs that connect nodes in the `QuantumGraph`.

This method populates `_TaskScaffolding.dataIds` and
`_DatasetScaffolding.dataIds` (except for those in `prerequisites`).

Parameters
----------
registry : `lsst.daf.butler.Registry`
    Registry for the data repository; used for all data ID queries.
collections
    Expressions representing the collections to search for input
    datasets.  May be any of the types accepted by
    `lsst.daf.butler.CollectionSearch.fromExpression`.
userQuery : `str` or `None`
    User-provided expression to limit the data IDs processed.
externalDataId : `DataCoordinate`
    Externally-provided data ID that should be used to restrict the
    results, just as if these constraints had been included via ``AND``
    in ``userQuery``.  This includes (at least) any instrument named
    in the pipeline definition.

Returns
-------
commonDataIds : \
        `lsst.daf.butler.registry.queries.DataCoordinateQueryResults`
    An interface to a database temporary table containing all data IDs
    that will appear in this `QuantumGraph`.  Returned inside a
    context manager, which will drop the temporary table at the end of
    the `with` block in which this method is called.
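
While the `with` block is open, the returned object supports iteration and
follow-up dataset searches against the same temporary table, which is how
`resolveDatasetRefs` uses it.  A minimal sketch (``scaffolding`` and the
query arguments are assumed to be defined; ``datasetType`` is any dataset
type from the pipeline):

    with scaffolding.connectDataIds(registry, collections, userQuery,
                                    externalDataId) as commonDataIds:
        for dataId in commonDataIds:  # iterate over the common data IDs
            ...
        # Follow-up search against the same temporary table, as done in
        # resolveDatasetRefs:
        refs = commonDataIds.subset(datasetType.dimensions,
                                    unique=True).findDatasets(
                                        datasetType,
                                        collections=collections,
                                        findFirst=True)
    # The temporary table is dropped when the with block exits.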

Definition at line 482 of file graphBuilder.py.

482  def connectDataIds(self, registry, collections, userQuery, externalDataId):
483      """Query for the data IDs that connect nodes in the `QuantumGraph`.
484
485      This method populates `_TaskScaffolding.dataIds` and
486      `_DatasetScaffolding.dataIds` (except for those in `prerequisites`).
487
488      Parameters
489      ----------
490      registry : `lsst.daf.butler.Registry`
491          Registry for the data repository; used for all data ID queries.
492      collections
493          Expressions representing the collections to search for input
494          datasets.  May be any of the types accepted by
495          `lsst.daf.butler.CollectionSearch.fromExpression`.
496      userQuery : `str` or `None`
497          User-provided expression to limit the data IDs processed.
498      externalDataId : `DataCoordinate`
499          Externally-provided data ID that should be used to restrict the
500          results, just as if these constraints had been included via ``AND``
501          in ``userQuery``.  This includes (at least) any instrument named
502          in the pipeline definition.
503
504      Returns
505      -------
506      commonDataIds : \
507              `lsst.daf.butler.registry.queries.DataCoordinateQueryResults`
508          An interface to a database temporary table containing all data IDs
509          that will appear in this `QuantumGraph`.  Returned inside a
510          context manager, which will drop the temporary table at the end of
511          the `with` block in which this method is called.
512      """
513      _LOG.debug("Building query for data IDs.")
514      # Initialization datasets always have empty data IDs.
515      emptyDataId = DataCoordinate.makeEmpty(registry.dimensions)
516      for datasetType, refs in itertools.chain(self.initInputs.items(),
517                                               self.initIntermediates.items(),
518                                               self.initOutputs.items()):
519          refs[emptyDataId] = DatasetRef(datasetType, emptyDataId)
520      # Run one big query for the data IDs for task dimensions and regular
521      # inputs and outputs.  We limit the query to only dimensions that are
522      # associated with the input dataset types, but don't (yet) try to
523      # obtain the dataset_ids for those inputs.
524      _LOG.debug("Submitting data ID query and materializing results.")
525      with registry.queryDataIds(self.dimensions,
526                                 datasets=list(self.inputs),
527                                 collections=collections,
528                                 where=userQuery,
529                                 dataId=externalDataId,
530                                 ).materialize() as commonDataIds:
531          _LOG.debug("Expanding data IDs.")
532          commonDataIds = commonDataIds.expanded()
533          _LOG.debug("Iterating over query results to associate quanta with datasets.")
534          # Iterate over query results, populating data IDs for datasets and
535          # quanta and then connecting them to each other.
536          n = 0
537          for n, commonDataId in enumerate(commonDataIds):
538              # Create DatasetRefs for all DatasetTypes from this result row,
539              # noting that we might have created some already.
540              # We remember both those that already existed and those that we
541              # create now.
542              refsForRow = {}
543              for datasetType, refs in itertools.chain(self.inputs.items(), self.intermediates.items(),
544                                                       self.outputs.items()):
545                  datasetDataId = commonDataId.subset(datasetType.dimensions)
546                  ref = refs.get(datasetDataId)
547                  if ref is None:
548                      ref = DatasetRef(datasetType, datasetDataId)
549                      refs[datasetDataId] = ref
550                  refsForRow[datasetType.name] = ref
551              # Create _QuantumScaffolding objects for all tasks from this
552              # result row, noting that we might have created some already.
553              for task in self.tasks:
554                  quantumDataId = commonDataId.subset(task.dimensions)
555                  quantum = task.quanta.get(quantumDataId)
556                  if quantum is None:
557                      quantum = _QuantumScaffolding(task=task, dataId=quantumDataId)
558                      task.quanta[quantumDataId] = quantum
559                  # Whether this is a new quantum or an existing one, we can
560                  # now associate the DatasetRefs for this row with it.  The
561                  # fact that a Quantum data ID and a dataset data ID both
562                  # came from the same result row is what tells us they
563                  # should be associated.
564                  # Many of these associations will be duplicates (because
565                  # another query row that differed from this one only in
566                  # irrelevant dimensions already added them), and we use
567                  # sets to skip them.
568                  for datasetType in task.inputs:
569                      ref = refsForRow[datasetType.name]
570                      quantum.inputs[datasetType.name][ref.dataId] = ref
571                  for datasetType in task.outputs:
572                      ref = refsForRow[datasetType.name]
573                      quantum.outputs[datasetType.name][ref.dataId] = ref
574          _LOG.debug("Finished processing %d rows from data ID query.", n)
575          yield commonDataIds
576 

◆ makeQuantumGraph()

def lsst.pipe.base.graphBuilder._PipelineScaffolding.makeQuantumGraph(self)
Create a `QuantumGraph` from the quanta already present in
the scaffolding data structure.

Returns
-------
graph : `QuantumGraph`
    The full `QuantumGraph`.

Definition at line 755 of file graphBuilder.py.

755  def makeQuantumGraph(self):
756      """Create a `QuantumGraph` from the quanta already present in
757      the scaffolding data structure.
758
759      Returns
760      -------
761      graph : `QuantumGraph`
762          The full `QuantumGraph`.
763      """
764      graph = QuantumGraph({task.taskDef: task.makeQuantumSet() for task in self.tasks})
765      return graph
766 

◆ resolveDatasetRefs()

def lsst.pipe.base.graphBuilder._PipelineScaffolding.resolveDatasetRefs(self, registry, collections, run, commonDataIds, *, skipExisting=True)
Perform follow-up queries for each dataset data ID produced in
`connectDataIds`.

This method populates `_DatasetScaffolding.refs` (except for those in
`prerequisites`).

Parameters
----------
registry : `lsst.daf.butler.Registry`
    Registry for the data repository; used for all data ID queries.
collections
    Expressions representing the collections to search for input
    datasets.  May be any of the types accepted by
    `lsst.daf.butler.CollectionSearch.fromExpression`.
run : `str`, optional
    Name of the `~lsst.daf.butler.CollectionType.RUN` collection for
    output datasets, if it already exists.
commonDataIds : \
        `lsst.daf.butler.registry.queries.DataCoordinateQueryResults`
    Result of a previous call to `connectDataIds`.
skipExisting : `bool`, optional
    If `True` (default), a Quantum is not created if all its outputs
    already exist in ``run``.  Ignored if ``run`` is `None`.

Raises
------
OutputExistsError
    Raised if an output dataset already exists in the output run
    and ``skipExisting`` is `False`.  The case where some but not all
    of a quantum's outputs are present and ``skipExisting`` is `True`
    cannot be identified at this stage; it is handled later in this
    method, when resolved refs are copied into each quantum.
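
A hedged sketch of the ``run``/``skipExisting`` interaction from the
caller's side (``scaffolding`` and ``commonDataIds`` come from the earlier
steps; the error handling shown is illustrative, not part of this class):

    try:
        # skipExisting=False: any output already present in `run` raises.
        # skipExisting=True (default): quanta whose outputs all exist are
        # skipped; a quantum with only *some* outputs present still raises,
        # but later, in the per-quantum loop of this method.
        scaffolding.resolveDatasetRefs(registry, collections, run,
                                       commonDataIds, skipExisting=False)
    except OutputExistsError:
        # The caller could retry with a fresh RUN collection, or re-run
        # with skipExisting=True.
        raise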

Definition at line 577 of file graphBuilder.py.

577  def resolveDatasetRefs(self, registry, collections, run, commonDataIds, *, skipExisting=True):
578      """Perform follow-up queries for each dataset data ID produced in
579      `connectDataIds`.
580
581      This method populates `_DatasetScaffolding.refs` (except for those in
582      `prerequisites`).
583
584      Parameters
585      ----------
586      registry : `lsst.daf.butler.Registry`
587          Registry for the data repository; used for all data ID queries.
588      collections
589          Expressions representing the collections to search for input
590          datasets.  May be any of the types accepted by
591          `lsst.daf.butler.CollectionSearch.fromExpression`.
592      run : `str`, optional
593          Name of the `~lsst.daf.butler.CollectionType.RUN` collection for
594          output datasets, if it already exists.
595      commonDataIds : \
596              `lsst.daf.butler.registry.queries.DataCoordinateQueryResults`
597          Result of a previous call to `connectDataIds`.
598      skipExisting : `bool`, optional
599          If `True` (default), a Quantum is not created if all its outputs
600          already exist in ``run``.  Ignored if ``run`` is `None`.
601
602      Raises
603      ------
604      OutputExistsError
605          Raised if an output dataset already exists in the output run
606          and ``skipExisting`` is `False`.  The case where some but not all
607          of a quantum's outputs are present and ``skipExisting`` is `True`
608          cannot be identified at this stage; it is handled later in this
609          method, when resolved refs are copied into each quantum.
610      """
611      # Look up [init] intermediate and output datasets in the output
612      # collection, if there is an output collection.
613      if run is not None:
614          for datasetType, refs in itertools.chain(self.initIntermediates.items(),
615                                                   self.initOutputs.items(),
616                                                   self.intermediates.items(),
617                                                   self.outputs.items()):
618              _LOG.debug("Resolving %d datasets for intermediate and/or output dataset %s.",
619                         len(refs), datasetType.name)
620              isInit = datasetType in self.initIntermediates or datasetType in self.initOutputs
621              resolvedRefQueryResults = commonDataIds.subset(
622                  datasetType.dimensions,
623                  unique=True
624              ).findDatasets(
625                  datasetType,
626                  collections=run,
627                  findFirst=True
628              )
629              for resolvedRef in resolvedRefQueryResults:
630                  # TODO: we could easily support per-DatasetType
631                  # skipExisting and I could imagine that being useful - it's
632                  # probably required in order to support writing initOutputs
633                  # before QuantumGraph generation.
634                  assert resolvedRef.dataId in refs
635                  if skipExisting or isInit:
636                      refs[resolvedRef.dataId] = resolvedRef
637                  else:
638                      raise OutputExistsError(f"Output dataset {datasetType.name} already exists in "
639                                              f"output RUN collection '{run}' with data ID"
640                                              f" {resolvedRef.dataId}.")
641      # Look up input and initInput datasets in the input collection(s).
642      for datasetType, refs in itertools.chain(self.initInputs.items(), self.inputs.items()):
643          _LOG.debug("Resolving %d datasets for input dataset %s.", len(refs), datasetType.name)
644          resolvedRefQueryResults = commonDataIds.subset(
645              datasetType.dimensions,
646              unique=True
647          ).findDatasets(
648              datasetType,
649              collections=collections,
650              findFirst=True
651          )
652          dataIdsNotFoundYet = set(refs.keys())
653          for resolvedRef in resolvedRefQueryResults:
654              dataIdsNotFoundYet.discard(resolvedRef.dataId)
655              refs[resolvedRef.dataId] = resolvedRef
656          if dataIdsNotFoundYet:
657              raise RuntimeError(
658                  f"{len(dataIdsNotFoundYet)} dataset(s) of type "
659                  f"'{datasetType.name}' was/were present in a previous "
660                  f"query, but could not be found now. "
661                  f"This is either a logic bug in QuantumGraph generation "
662                  f"or the input collections have been modified since "
663                  f"QuantumGraph generation began."
664              )
665      # Copy the resolved DatasetRefs to the _QuantumScaffolding objects,
666      # replacing the unresolved refs there, and then look up prerequisites.
667      for task in self.tasks:
668          _LOG.debug(
669              "Applying resolutions and finding prerequisites for %d quanta of task with label '%s'.",
670              len(task.quanta),
671              task.taskDef.label
672          )
673          lookupFunctions = {
674              c.name: c.lookupFunction
675              for c in iterConnections(task.taskDef.connections, "prerequisiteInputs")
676              if c.lookupFunction is not None
677          }
678          dataIdsToSkip = []
679          for quantum in task.quanta.values():
680              # Process output datasets only if there is a run to look for
681              # outputs in and skipExisting is True.  Note that if
682              # skipExisting is False, any output datasets that already exist
683              # would have already caused an exception to be raised.
684              # We never update the DatasetRefs in the quantum because those
685              # should never be resolved.
686              if run is not None and skipExisting:
687                  resolvedRefs = []
688                  unresolvedRefs = []
689                  for datasetType, originalRefs in quantum.outputs.items():
690                      for ref in task.outputs.extract(datasetType, originalRefs.keys()):
691                          if ref.id is not None:
692                              resolvedRefs.append(ref)
693                          else:
694                              unresolvedRefs.append(ref)
695                  if resolvedRefs:
696                      if unresolvedRefs:
697                          raise OutputExistsError(
698                              f"Quantum {quantum.dataId} of task with label "
699                              f"'{quantum.task.taskDef.label}' has some outputs that exist "
700                              f"({resolvedRefs}) "
701                              f"and others that don't ({unresolvedRefs})."
702                          )
703                      else:
704                          # All outputs are already present; skip this
705                          # quantum and continue to the next.
706                          dataIdsToSkip.append(quantum.dataId)
707                          continue
708              # Update the input DatasetRefs to the resolved ones we already
709              # searched for.
710              for datasetType, refs in quantum.inputs.items():
711                  for ref in task.inputs.extract(datasetType, refs.keys()):
712                      refs[ref.dataId] = ref
713              # Look up prerequisite datasets in the input collection(s).
714              # These may have dimensions that extend beyond those we queried
715              # for originally, because we want to permit those data ID
716              # values to differ across quanta and dataset types.
717              for datasetType in task.prerequisites:
718                  lookupFunction = lookupFunctions.get(datasetType.name)
719                  if lookupFunction is not None:
720                      # The PipelineTask has provided its own function to do
721                      # the lookup.  This always takes precedence.
722                      refs = list(
723                          lookupFunction(datasetType, registry, quantum.dataId, collections)
724                      )
725                  elif (datasetType.isCalibration()
726                        and datasetType.dimensions <= quantum.dataId.graph
727                        and quantum.dataId.graph.temporal):
728                      # This is a master calibration lookup, which we have to
729                      # handle specially because the query system can't do a
730                      # temporal join on a non-dimension-based timespan yet.
731                      timespan = quantum.dataId.timespan
732                      try:
733                          refs = [registry.findDataset(datasetType, quantum.dataId,
734                                                       collections=collections,
735                                                       timespan=timespan)]
736                      except KeyError:
737                          # This dataset type is not present in the registry,
738                          # which just means there are no datasets here.
739                          refs = []
740                  else:
741                      # Most general case.
742                      refs = list(registry.queryDatasets(datasetType,
743                                                         collections=collections,
744                                                         dataId=quantum.dataId,
745                                                         findFirst=True).expanded())
746                  quantum.prerequisites[datasetType].update({ref.dataId: ref for ref in refs
747                                                             if ref is not None})
748          # Actually remove any quanta that we decided to skip above.
749          if dataIdsToSkip:
750              _LOG.debug("Pruning %d quanta for task with label '%s' because all of their outputs exist.",
751                         len(dataIdsToSkip), task.taskDef.label)
752              for dataId in dataIdsToSkip:
753                  del task.quanta[dataId]
754 
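
The ``lookupFunctions`` branch above calls a task-supplied function as
``lookupFunction(datasetType, registry, quantumDataId, collections)`` and
gives it precedence over the default queries.  A hedged sketch of providing
one from the connections side; the class, names, storage class, and
dimensions below are illustrative assumptions, not taken from
graphBuilder.py:

    from lsst.pipe.base import PipelineTaskConnections, connectionTypes

    def _lookupMyPrereq(datasetType, registry, quantumDataId, collections):
        # Must return an iterable of DatasetRefs, per the call site in
        # resolveDatasetRefs.  Here we simply delegate to the same registry
        # query the default branch uses; a real function could rewrite the
        # data ID or apply its own selection first.
        return registry.queryDatasets(datasetType, collections=collections,
                                      dataId=quantumDataId, findFirst=True)

    class MyConnections(PipelineTaskConnections,
                        dimensions=("instrument", "visit")):
        myPrereq = connectionTypes.PrerequisiteInput(
            name="my_prereq",  # hypothetical dataset type name
            storageClass="StructuredDataDict",
            dimensions=("instrument",),
            lookupFunction=_lookupMyPrereq,
        )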

Member Data Documentation

◆ dimensions

lsst.pipe.base.graphBuilder._PipelineScaffolding.dimensions

Definition at line 415 of file graphBuilder.py.

◆ tasks

lsst.pipe.base.graphBuilder._PipelineScaffolding.tasks

Definition at line 403 of file graphBuilder.py.


The documentation for this class was generated from the following file:

graphBuilder.py