def fromPipeline(cls, pipeline, *, registry: Registry) -> PipelineDatasetTypes:
    """Extract and classify the dataset types from all tasks in a
    `Pipeline`.

    Parameters
    ----------
    pipeline: `Pipeline`
        An ordered collection of tasks that can be run together.  A
        `Pipeline` instance is expanded with ``toExpandedPipeline()``
        before use; any other object is iterated over directly.
    registry: `Registry`
        Registry used to construct normalized `DatasetType` objects and
        retrieve those that are incomplete.

    Returns
    -------
    types: `PipelineDatasetTypes`
        The dataset types used by this `Pipeline`.

    Raises
    ------
    ValueError
        Raised if Tasks are inconsistent about which datasets are marked
        prerequisite.  This indicates that the Tasks cannot be run as part
        of the same `Pipeline`.  Also raised if a component input has
        different dimensions than the composite output it belongs to, or
        if two collections hold conflicting definitions for the same
        dataset type name.
    """
    allInputs = NamedValueSet()
    allOutputs = NamedValueSet()
    allInitInputs = NamedValueSet()
    allInitOutputs = NamedValueSet()
    prerequisites = NamedValueSet()
    # Per-task classification keyed by task label; exposed read-only below.
    byTask = {}

    if isinstance(pipeline, Pipeline):
        pipeline = pipeline.toExpandedPipeline()
    for taskDef in pipeline:
        thisTask = TaskDatasetTypes.fromTaskDef(taskDef, registry=registry)
        allInitInputs |= thisTask.initInputs
        allInitOutputs |= thisTask.initOutputs
        allInputs |= thisTask.inputs
        prerequisites |= thisTask.prerequisites
        allOutputs |= thisTask.outputs
        byTask[taskDef.label] = thisTask

    # A prerequisite may not also appear as a regular input or an output.
    if not prerequisites.isdisjoint(allInputs):
        clashing = {dt.name for dt in allInputs & prerequisites}
        raise ValueError(f"{clashing} marked as both prerequisites and regular inputs")
    if not prerequisites.isdisjoint(allOutputs):
        clashing = {dt.name for dt in allOutputs & prerequisites}
        raise ValueError(f"{clashing} marked as both prerequisites and outputs")

    # An input that is a component of a composite produced by some task in
    # this pipeline is an intermediate rather than an overall input, and the
    # composite that feeds it is not an overall output.
    intermediateComponents = NamedValueSet()
    intermediateComposites = NamedValueSet()
    outputNameMapping = {dsType.name: dsType for dsType in allOutputs}
    for dsType in allInputs:
        # ``component`` is None when ``dsType`` is not a component.
        name, component = dsType.nameAndComponent()
        if component is not None and name in outputNameMapping:
            parent = outputNameMapping[name]
            if parent.dimensions != dsType.dimensions:
                raise ValueError(f"Component dataset type {dsType.name} has different "
                                 f"dimensions ({dsType.dimensions}) than its parent "
                                 f"({parent.dimensions}).")
            composite = DatasetType(name, dsType.dimensions, parent.storageClass,
                                    universe=registry.dimensions)
            intermediateComponents.add(dsType)
            intermediateComposites.add(composite)

    def checkConsistency(a: NamedValueSet, b: NamedValueSet):
        """Raise `ValueError` if a name shared by ``a`` and ``b`` maps to
        unequal dataset types.
        """
        common = a.names & b.names
        for name in common:
            if a[name] != b[name]:
                raise ValueError(f"Conflicting definitions for dataset type: {a[name]} != {b[name]}.")

    checkConsistency(allInitInputs, allInitOutputs)
    checkConsistency(allInputs, allOutputs)
    checkConsistency(allInputs, intermediateComposites)
    checkConsistency(allOutputs, intermediateComposites)

    def frozen(s: NamedValueSet) -> NamedValueSet:
        """Return ``s`` after making it immutable."""
        # NOTE(review): the body of this helper was not visible in the
        # reviewed text; freezing in place and returning the same object is
        # assumed from the helper's name and signature -- confirm.
        s.freeze()
        return s

    # NOTE(review): the call receiving these keyword arguments was not
    # visible; ``cls(...)`` is assumed from the return annotation -- confirm.
    return cls(
        initInputs=frozen(allInitInputs - allInitOutputs),
        initIntermediates=frozen(allInitInputs & allInitOutputs),
        initOutputs=frozen(allInitOutputs - allInitInputs),
        inputs=frozen(allInputs - allOutputs - intermediateComponents),
        intermediates=frozen((allInputs & allOutputs) | intermediateComponents),
        outputs=frozen(allOutputs - allInputs - intermediateComposites),
        prerequisites=frozen(prerequisites),
        byTask=MappingProxyType(byTask),
    )
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)