from __future__ import annotations

__all__ = ("buildExecutionButler", )

import io

from collections import defaultdict
from typing import Callable, DefaultDict, Mapping, Optional, Set, Tuple, Iterable, List, Union

from lsst.daf.butler import (DatasetRef, DatasetType, Butler, DataCoordinate, ButlerURI, Config)
from lsst.daf.butler.core.utils import getClassOf
from lsst.daf.butler.transfers import RepoExportContext
from lsst.daf.butler.core.repoRelocation import BUTLER_ROOT_TAG

from . import QuantumGraph, QuantumNode

DataSetTypeMap = Mapping[DatasetType, Set[DataCoordinate]]


def _accumulate(graph: QuantumGraph) -> Tuple[Set[DatasetRef], DataSetTypeMap]:
    # Accumulate the dataset references used by the graph.

    # exports holds all the datasets that already exist and will be migrated
    # to the execution butler.
    exports: Set[DatasetRef] = set()

    # inserts maps each DatasetType to the dataIds that are to be inserted
    # into the new registry; these are the products expected to be produced
    # during processing of the QuantumGraph.
    inserts: DefaultDict[DatasetType, Set[DataCoordinate]] = defaultdict(set)

    for quantum in (n.quantum for n in graph):
        for attrName in ("initInputs", "inputs", "outputs"):
            attr: Mapping[DatasetType, Union[DatasetRef, List[DatasetRef]]] = getattr(quantum, attrName)

            for type, refs in attr.items():
                # initInputs maps to a single ref rather than a list, so
                # normalize to a list here.
                if not isinstance(refs, list):
                    refs = [refs]
                for ref in refs:
                    # A resolved id means the dataset already exists in the
                    # input butler and should be exported; otherwise record
                    # its dataId for insertion into the new registry.
                    if ref.id is not None:
                        exports.add(ref)
                    else:
                        inserts[type].add(ref.dataId)
    return exports, inserts


def _discoverCollections(butler: Butler, collections: Iterable[str]) -> set[str]:
    # Iterate over the supplied collections, following chained collections,
    # until no new collections are discovered, so that everything associated
    # with the inputs is exported.
    collections = set(collections)
    while True:
        discoveredCollections = set(butler.registry.queryCollections(collections, flattenChains=True,
                                                                     includeChains=True))
        if len(discoveredCollections) > len(collections):
            collections = discoveredCollections
        else:
            break
    return collections


def _export(butler: Butler, collections: Optional[Iterable[str]], exports: Set[DatasetRef]) -> io.StringIO:
    # Export the existing datasets to an in-memory YAML buffer rather than a
    # file on disk.
    yamlBuffer = io.StringIO()
    # YAML is hard coded, since this module controls both ends of the
    # export/import.
    BackendClass = getClassOf(butler._config["repo_transfer_formats", "yaml", "export"])
    backend = BackendClass(yamlBuffer)
    exporter = RepoExportContext(butler.registry, butler.datastore, backend, directory=None, transfer=None)
    exporter.saveDatasets(exports)

    # If no collections were supplied, fall back to the registry defaults.
    if collections is None:
        collections = butler.registry.defaults.collections

    # Look up all collections associated with the inputs, following chains so
    # that everything is properly exported.
    for c in _discoverCollections(butler, collections):
        exporter.saveCollection(c)
    exporter._finish()

    # Rewind the string buffer so the subsequent read operation actually sees
    # the exported data.
    yamlBuffer.seek(0)
    return yamlBuffer


def _setupNewButler(butler: Butler, outputLocation: ButlerURI, dirExists: bool) -> Butler:
    # Set up the new butler repository at the specified location.
    if dirExists:
        # The output location already exists (clobber was requested), so
        # remove any stale execution registry before recreating the repo.
        executionRegistry = outputLocation.join("gen3.sqlite3")
        if executionRegistry.exists():
            executionRegistry.remove()
    else:
        outputLocation.mkdir()

    # Copy the existing butler config, pointing the registry at a local
    # SQLite database inside the new butler root.
    config = Config(butler._config)
    config["root"] = outputLocation.geturl()
    config["registry", "db"] = "sqlite:///<butlerRoot>/gen3.sqlite3"
    # If the datastore root is specified relative to the butler root, pin it
    # to the original location so the datastore keeps reading from there.
    if config.get(("datastore", "root")) == BUTLER_ROOT_TAG:
        config["datastore", "root"] = butler._config.configDir.geturl()
    config["datastore", "trust_get_request"] = True

    config = Butler.makeRepo(root=outputLocation, config=config, overwrite=True, forceConfigRoot=False)

    # Return a writeable butler for the newly created repository.
    return Butler(config, writeable=True)


def _import(yamlBuffer: io.StringIO,
            newButler: Butler,
            inserts: DataSetTypeMap,
            run: str,
            butlerModifier: Optional[Callable[[Butler], Butler]]
            ) -> Butler:
    # Import the exported datasets into the newly created butler, then insert
    # the datasets that are expected to be produced.
    newButler.import_(filename=yamlBuffer, format="yaml", reuseIds=True)

    # If a modifier callable was supplied, run it now so it can make any
    # desired changes before the expected outputs are registered.
    if butlerModifier is not None:
        newButler = butlerModifier(newButler)

    # Register the dataset types to be produced and insert their dataIds into
    # the registry.
    for dsType, dataIds in inserts.items():
        newButler.registry.registerDatasetType(dsType)
        newButler.registry.insertDatasets(dsType, dataIds, run)

    return newButler


def buildExecutionButler(butler: Butler,
                         graph: QuantumGraph,
                         outputLocation: Union[str, ButlerURI],
                         run: str,
                         *,
                         clobber: bool = False,
                         butlerModifier: Optional[Callable[[Butler], Butler]] = None,
                         collections: Optional[Iterable[str]] = None
                         ) -> Butler:
192 r"""buildExecutionButler is a function that is responsible for exporting
193 input `QuantumGraphs` into a new minimal `~lsst.daf.butler.Butler` which
194 only contains datasets specified by the `QuantumGraph`. These datasets are
195 both those that already exist in the input `~lsst.daf.butler.Butler`, and
196 those that are expected to be produced during the execution of the
201 butler : `lsst.daf.butler.Bulter`
202 This is the existing `~lsst.daf.butler.Butler` instance from which
203 existing datasets will be exported. This should be the
204 `~lsst.daf.butler.Butler` which was used to create any `QuantumGraphs`
205 that will be converted with this object.
206 graph : `QuantumGraph`
207 Graph containing nodes that are to be exported into an execution
209 outputLocation : `str` or `~lsst.daf.butler.ButlerURI`
210 URI Location at which the execution butler is to be exported. May be
211 specified as a string or a ButlerURI instance.
213 The run collection that the exported datasets are to be placed in. If
214 None, the default value in registry.defaults will be used.
215 clobber : `bool`, Optional
216 By default a butler will not be created if a file or directory
217 already exists at the output location. If this is set to `True`
218 what is at the location will be deleted prior to running the
219 export. Defaults to `False`
220 butlerModifier : `~typing.Callable`, Optional
221 If supplied this should be a callable that accepts a
222 `~lsst.daf.butler.Butler`, and returns an instantiated
223 `~lsst.daf.butler.Butler`. This callable may be used to make any
224 modifications to the `~lsst.daf.butler.Butler` desired. This
225 will be called after importing all datasets that exist in the input
226 `~lsst.daf.butler.Butler` but prior to inserting Datasets expected
227 to be produced. Examples of what this method could do include
228 things such as creating collections/runs/ etc.
229 collections : `~typing.Iterable` of `str`, Optional
230 An iterable of collection names that will be exported from the input
231 `~lsst.daf.butler.Butler` when creating the execution butler. If not
232 supplied the `~lsst.daf.butler.Butler`\ 's `~lsst.daf.butler.Registry`
233 default collections will be used.
237 executionButler : `lsst.daf.butler.Butler`
238 An instance of the newly created execution butler
243 Raised if something exists in the filesystem at the specified output
244 location and clobber is `False`
246 Raised if specified output URI does not correspond to a directory
    outputLocation = ButlerURI(outputLocation)

    # Do this check first to fail fast if something already exists at the
    # output location.
    if (dirExists := outputLocation.exists()) and not clobber:
        raise FileExistsError("Cannot create a butler at specified location, location exists")
    if not outputLocation.isdir():
        raise NotADirectoryError("The specified output URI does not appear to correspond to a directory")

    exports, inserts = _accumulate(graph)
    yamlBuffer = _export(butler, collections, exports)

    newButler = _setupNewButler(butler, outputLocation, dirExists)

    return _import(yamlBuffer, newButler, inserts, run, butlerModifier)
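

# ---------------------------------------------------------------------------
# Illustrative usage sketch, not part of the module API. This is a minimal
# example of how buildExecutionButler might be invoked, assuming an existing
# butler repository and a QuantumGraph previously saved to disk. The
# repository paths, graph file name, run and collection names, and the
# _exampleModifier helper below are hypothetical placeholders.
def _exampleModifier(newButler: Butler) -> Butler:
    # Example butlerModifier callable: runs after existing datasets have been
    # imported but before the expected outputs are inserted.
    newButler.registry.registerRun("u/example/extra_run")  # hypothetical run name
    return newButler


def _exampleUsage() -> Butler:
    # Load a previously saved QuantumGraph, using the dimension universe of
    # the existing repository (paths are hypothetical).
    existingButler = Butler("/path/to/existing/repo")
    with open("pipeline.qgraph", "rb") as fd:
        graph = QuantumGraph.load(fd, existingButler.registry.dimensions)

    # Build the execution butler next to the graph, clobbering any previous
    # attempt and restricting the export to a hypothetical input collection.
    return buildExecutionButler(
        existingButler,
        graph,
        outputLocation="/path/to/execution/butler",
        run="u/example/run",
        clobber=True,
        butlerModifier=_exampleModifier,
        collections=["HSC/defaults"],
    )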