from __future__ import annotations

__all__ = ("buildLightweightButler",)

import io
import os
import shutil

from collections import defaultdict
from typing import Callable, DefaultDict, Mapping, Optional, Set, Tuple, Iterable, List, Union

from lsst.daf.butler import (DatasetRef, DatasetType, Butler, ButlerConfig, Registry, DataCoordinate,
                             RegistryConfig)
from lsst.daf.butler.core.utils import getClassOf
from lsst.daf.butler.transfers import RepoExportContext

from . import QuantumGraph, QuantumNode

DataSetTypeMap = Mapping[DatasetType, Set[DataCoordinate]]

def _accumulate(graph: QuantumGraph) -> Tuple[Set[DatasetRef], DataSetTypeMap]:
    # Accumulate the datasets referenced by the graph.

    # exports holds all the datasets that already exist and will be
    # exported from the input butler.
    exports: Set[DatasetRef] = set()

    # inserts maps each DatasetType to the data IDs of datasets that are
    # expected to be produced during execution and so must be registered
    # in the new registry.
    inserts: DefaultDict[DatasetType, Set[DataCoordinate]] = defaultdict(set)

    for quantum in (n.quantum for n in graph):
        for attrName in ("initInputs", "inputs", "outputs"):
            attr: Mapping[DatasetType, Union[DatasetRef, List[DatasetRef]]] = getattr(quantum, attrName)

            for type, refs in attr.items():
                # initInputs yield a single ref rather than a list.
                if not isinstance(refs, list):
                    refs = [refs]
                # A resolved id means the dataset already exists and should
                # be exported; otherwise its data ID is recorded for
                # insertion into the new registry.
                for ref in refs:
                    if ref.id is not None:
                        exports.add(ref)
                    else:
                        inserts[type].add(ref.dataId)
    return exports, inserts

def _discoverCollections(butler: Butler, collections: Iterable[str]) -> set[str]:
    # Recurse through any chained collections until no new collections are
    # discovered, so that every collection reachable from the inputs gets
    # exported.
    collections = set(collections)
    while True:
        discoveredCollections = set(butler.registry.queryCollections(collections, flattenChains=True,
                                                                     includeChains=True))
        if len(discoveredCollections) > len(collections):
            collections = discoveredCollections
        else:
            break
    return collections

def _export(butler: Butler, collections: Optional[Iterable[str]], exports: Set[DatasetRef]) -> io.StringIO:
    # Export the existing datasets to an in-memory yaml buffer rather than
    # to disk.
    yamlBuffer = io.StringIO()
    # Yaml is hard coded, since this module controls both the export and
    # the import.
    BackendClass = getClassOf(butler._config["repo_transfer_formats", "yaml", "export"])
    backend = BackendClass(yamlBuffer)
    exporter = RepoExportContext(butler.registry, butler.datastore, backend, directory=None, transfer=None)
    exporter.saveDatasets(exports)

    # If no collections were supplied, fall back to the registry defaults.
    if collections is None:
        collections = butler.registry.defaults.collections

    # Export every collection associated with the inputs, following chains
    # so that nothing is missed.
    for c in _discoverCollections(butler, collections):
        exporter.saveCollection(c)
    exporter._finish()

    # Rewind the buffer so that the subsequent import reads the exported
    # data from the beginning.
    yamlBuffer.seek(0)
    return yamlBuffer

def _setupNewButler(butler: Butler, outputLocation: str, dirExists: bool) -> Butler:
    # Set up the new butler object at the specified location.
    if dirExists:
        # Remove whatever already exists at the output location; reaching
        # this point means clobber was True.
        if os.path.isfile(outputLocation):
            os.remove(outputLocation)
        else:
            shutil.rmtree(outputLocation)
    os.mkdir(outputLocation)

    # Copy the existing butler config, pointing the registry at a new
    # sqlite database in the output location while keeping the original
    # root so existing file datastores still resolve.
    config = ButlerConfig(butler._config)
    config["registry", "db"] = f"sqlite:///{outputLocation}/gen3.sqlite3"
    config["root"] = butler._config.configDir.ospath

    # Create (and thereby initialize) the new registry.
    Registry.createFromConfig(RegistryConfig(config))

    # Return a newly created, writeable butler.
    return Butler(config, writeable=True)

def _import(yamlBuffer: io.StringIO,
            newButler: Butler,
            inserts: DataSetTypeMap,
            run: str,
            butlerModifier: Optional[Callable[[Butler], Butler]]
            ) -> Butler:
    # Import the previously exported datasets into the new butler.
    newButler.import_(filename=yamlBuffer, format="yaml", reuseIds=True)

    # If a modifier callable was supplied, let it adjust the new butler
    # before the expected outputs are inserted.
    if butlerModifier is not None:
        newButler = butlerModifier(newButler)

    # Register the dataset types of the expected outputs and insert
    # datasets for them in the given run.
    for dsType, dataIds in inserts.items():
        newButler.registry.registerDatasetType(dsType)
        newButler.registry.insertDatasets(dsType, dataIds, run)

    return newButler

def buildLightweightButler(butler: Butler,
                           graph: QuantumGraph,
                           outputLocation: str,
                           run: str,
                           *,
                           clobber: bool = False,
                           butlerModifier: Optional[Callable[[Butler], Butler]] = None,
                           collections: Optional[Iterable[str]] = None
                           ) -> None:
    r"""Export an input `QuantumGraph` into a new minimal
    `~lsst.daf.butler.Butler` which only contains datasets specified by the
    `QuantumGraph`. These datasets are both those that already exist in the
    input `~lsst.daf.butler.Butler`, and those that are expected to be
    produced during the execution of the `QuantumGraph`.

    Parameters
    ----------
    butler : `lsst.daf.butler.Butler`
        This is the existing `~lsst.daf.butler.Butler` instance from which
        existing datasets will be exported. This should be the
        `~lsst.daf.butler.Butler` which was used to create any `QuantumGraphs`
        that will be converted with this object.
    graph : `QuantumGraph`
        Graph containing nodes that are to be exported into a lightweight
        butler.
    outputLocation : `str`
        Location at which the lightweight butler is to be exported.
    run : `str`
        The run collection that the exported datasets are to be placed in.
    clobber : `bool`, optional
        By default a butler will not be created if a file or directory
        already exists at the output location. If this is set to `True`
        what is at the location will be deleted prior to running the
        export. Defaults to `False`.
    butlerModifier : `~typing.Callable`, optional
        If supplied this should be a callable that accepts a
        `~lsst.daf.butler.Butler`, and returns an instantiated
        `~lsst.daf.butler.Butler`. This callable may be used to make any
        modifications to the `~lsst.daf.butler.Butler` desired. It will be
        called after importing all datasets that exist in the input
        `~lsst.daf.butler.Butler` but prior to inserting datasets expected
        to be produced. Examples of what this callable could do include
        creating collections or runs.
    collections : `~typing.Iterable` of `str`, optional
        An iterable of collection names that will be exported from the input
        `~lsst.daf.butler.Butler` when creating the lightweight butler. If not
        supplied the `~lsst.daf.butler.Butler`\ 's `~lsst.daf.butler.Registry`
        default collections will be used.

    Raises
    ------
    FileExistsError
        Raised if something exists in the filesystem at the specified output
        location and ``clobber`` is `False`.
    """
    # Do this first to fail fast if the output location already exists.
    if (dirExists := os.path.exists(outputLocation)) and not clobber:
        raise FileExistsError("Cannot create a butler at specified location, location exists")

    exports, inserts = _accumulate(graph)
    yamlBuffer = _export(butler, collections, exports)

    newButler = _setupNewButler(butler, outputLocation, dirExists)

    newButler = _import(yamlBuffer, newButler, inserts, run, butlerModifier)
    newButler._config.dumpToUri(f"{outputLocation}/butler.yaml")
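

# Example usage (an illustrative sketch, not part of the original module).
# The output location, run name, collection name, and the helper names
# ``_exampleUsage`` and ``addExtraRun`` are assumptions made for this example;
# ``graph`` is taken as an already-built `QuantumGraph` for the given butler.
# ``addExtraRun`` demonstrates the ``butlerModifier`` hook described above,
# which runs after existing datasets are imported but before expected outputs
# are inserted.
def _exampleUsage(butler: Butler, graph: QuantumGraph) -> None:
    def addExtraRun(newButler: Butler) -> Butler:
        # Create an additional RUN collection in the lightweight butler.
        newButler.registry.registerRun("u/example/extra")
        return newButler

    buildLightweightButler(
        butler,
        graph,
        outputLocation="/scratch/lightweight-butler",  # assumed path
        run="u/example/run1",
        clobber=True,
        butlerModifier=addExtraRun,
        collections=["HSC/defaults"],  # assumed input collection
    )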