makeLightWeightButler.py
# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations

__all__ = ("buildLightweightButler", )

import io
import os
import shutil
from collections import defaultdict
from typing import Callable, DefaultDict, Iterable, List, Mapping, Optional, Set, Tuple, Union

from lsst.daf.butler import (DatasetRef, DatasetType, Butler, ButlerConfig, Registry, DataCoordinate,
                             RegistryConfig)
from lsst.daf.butler.core.utils import getClassOf
from lsst.daf.butler.transfers import RepoExportContext

from . import QuantumGraph, QuantumNode

DataSetTypeMap = Mapping[DatasetType, Set[DataCoordinate]]

def _accumulate(graph: QuantumGraph) -> Tuple[Set[DatasetRef], DataSetTypeMap]:
    # Accumulate the dataIds that will be transferred to the lightweight
    # registry.

    # exports holds all the existing data that will be migrated to the
    # lightweight butler.
    exports: Set[DatasetRef] = set()

    # inserts is the mapping of DatasetType to dataIds for what is to be
    # inserted into the registry. These are the products that are expected
    # to be produced during processing of the QuantumGraph.
    inserts: DefaultDict[DatasetType, Set[DataCoordinate]] = defaultdict(set)

    n: QuantumNode
    for quantum in (n.quantum for n in graph):
        for attrName in ("initInputs", "inputs", "outputs"):
            attr: Mapping[DatasetType, Union[DatasetRef, List[DatasetRef]]] = getattr(quantum, attrName)

            for datasetType, refs in attr.items():
                # initInputs maps each DatasetType to a single DatasetRef
                # rather than a list, so normalize to a list here.
                if not isinstance(refs, list):
                    refs = [refs]
                # Iterate over all the references. If a reference has an id,
                # it already exists and should be exported; if not, it should
                # be inserted into the new registry.
                for ref in refs:
                    if ref.isComponent():
                        # We cannot insert a component, and a component will
                        # be part of some other upstream dataset, so it
                        # should be safe to skip them here.
                        continue

                    if ref.id is not None:
                        exports.add(ref)
                    else:
                        inserts[datasetType].add(ref.dataId)
    return exports, inserts

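
# A self-contained sketch (hypothetical dataset names and ids) of the
# export/insert partition performed by _accumulate above: refs that already
# carry an id exist in the input butler and are exported, while refs without
# one are grouped by dataset type for later insertion. Not used by the
# pipeline itself.
def _demoPartitionRefs() -> Tuple[Set[str], Mapping[str, Set[int]]]:
    # (datasetType, dataId, id) triples; an id of None means the dataset has
    # not been produced yet.
    refs = [("raw", 1, 101), ("raw", 2, 102), ("pvi", 1, None), ("pvi", 2, None)]
    exports: Set[str] = set()
    inserts: DefaultDict[str, Set[int]] = defaultdict(set)
    for datasetType, dataId, refId in refs:
        if refId is not None:
            exports.add(f"{datasetType}@{dataId}")
        else:
            inserts[datasetType].add(dataId)
    # exports == {"raw@1", "raw@2"}; inserts == {"pvi": {1, 2}}
    return exports, inserts
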
def _discoverCollections(butler: Butler, collections: Iterable[str]) -> Set[str]:
    # Recurse through any discovered collections to make sure all collections
    # are exported. This exists because I ran into a situation where some
    # collections were not properly being discovered and exported. This
    # method may be able to be removed in the future if collection export
    # logic changes.
    collections = set(collections)
    while True:
        discoveredCollections = set(butler.registry.queryCollections(collections, flattenChains=True,
                                                                     includeChains=True))
        if len(discoveredCollections) > len(collections):
            collections = discoveredCollections
        else:
            break
    return collections

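
# A minimal, self-contained sketch of the fixed-point expansion used by
# _discoverCollections above, with a plain dict standing in for the
# registry's chained-collection lookup. The chain names are hypothetical;
# the real lookup is Registry.queryCollections.
def _demoDiscoverFixedPoint() -> Set[str]:
    chains = {"all": {"calib", "raw"}, "calib": {"calib/v1"}}
    collections: Set[str] = {"all"}
    while True:
        # One round of expansion: add every child of every chained collection.
        discovered = set(collections)
        for name in collections:
            discovered |= chains.get(name, set())
        if len(discovered) > len(collections):
            collections = discovered
        else:
            break
    # At the fixed point every nested collection is present.
    return collections  # == {"all", "calib", "raw", "calib/v1"}
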
def _export(butler: Butler, collections: Optional[Iterable[str]], exports: Set[DatasetRef]) -> io.StringIO:
    # This exports the datasets that exist in the input butler using
    # daf butler objects. It reaches in deep and does not use the public
    # methods so that it can export to a string buffer and skip disk access.
    yamlBuffer = io.StringIO()
    # YAML is hard coded, since the class controls both ends of the
    # export/import.
    BackendClass = getClassOf(butler._config["repo_transfer_formats", "yaml", "export"])
    backend = BackendClass(yamlBuffer)
    exporter = RepoExportContext(butler.registry, butler.datastore, backend, directory=None, transfer=None)
    exporter.saveDatasets(exports)

    # Use any explicitly supplied collections; if none were given, fall back
    # to the registry defaults.
    if collections is None:
        collections = butler.registry.defaults.collections

    # Look up all collections associated with those inputs, following all
    # chains to make sure everything is properly exported.
    for c in _discoverCollections(butler, collections):
        exporter.saveCollection(c)
    exporter._finish()

    # Rewind the string buffer to the beginning so the subsequent read
    # operation will actually *see* the data that was exported.
    yamlBuffer.seek(0)
    return yamlBuffer

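
# A self-contained sketch of the in-memory buffer pattern used by _export
# above: write serialized data to a StringIO, rewind with seek(0), then hand
# the buffer to a consumer as if it were a file on disk. The YAML payload
# here is a hypothetical placeholder.
def _demoBufferRoundTrip() -> str:
    buffer = io.StringIO()
    buffer.write("datasets:\n- example\n")
    # Without this rewind a reader would start at the end of the buffer and
    # see an empty stream.
    buffer.seek(0)
    return buffer.read()  # == "datasets:\n- example\n"
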
def _setupNewButler(butler: Butler, outputLocation: str, dirExists: bool) -> Butler:
    # Set up the new butler object at the specified location.
    if dirExists:
        if os.path.isfile(outputLocation):
            os.remove(outputLocation)
        else:
            shutil.rmtree(outputLocation)
    os.mkdir(outputLocation)

    # Copy the existing butler config, modifying the location of the
    # registry to the specified location.
    # Preserve the root path from the existing butler so things like
    # file data stores continue to look at the old location.
    config = ButlerConfig(butler._config)
    config["registry", "db"] = f"sqlite:///{outputLocation}/gen3.sqlite3"
    config["root"] = butler._config.configDir.ospath

    # Create the new registry, which will create and populate the sqlite
    # file.
    Registry.createFromConfig(RegistryConfig(config))

    # Return a newly created butler.
    return Butler(config, writeable=True)

def _import(yamlBuffer: io.StringIO,
            newButler: Butler,
            inserts: DataSetTypeMap,
            run: str,
            butlerModifier: Optional[Callable[[Butler], Butler]]
            ) -> Butler:
    # This method takes the exports from the existing butler, imports
    # them into the newly created butler, and then inserts the datasets
    # that are expected to be produced.

    # Import the existing datasets.
    newButler.import_(filename=yamlBuffer, format="yaml", reuseIds=True)

    # If there is a modifier callable, run it to make necessary updates
    # to the new butler.
    if butlerModifier is not None:
        newButler = butlerModifier(newButler)

    # Register the dataset types to be produced and insert the expected
    # dataIds into the registry.
    for dsType, dataIds in inserts.items():
        newButler.registry.registerDatasetType(dsType)
        newButler.registry.insertDatasets(dsType, dataIds, run)

    return newButler

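
# A hedged sketch of the kind of butlerModifier callable accepted by _import
# above (and by buildLightweightButler below): it receives the freshly
# imported butler and must return a butler. The run name is a hypothetical
# placeholder.
def _demoButlerModifier(newButler: Butler) -> Butler:
    # For example, pre-register an extra run collection before the expected
    # outputs are inserted.
    newButler.registry.registerRun("u/demo/extra-run")
    return newButler
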
def buildLightweightButler(butler: Butler,
                           graph: QuantumGraph,
                           outputLocation: str,
                           run: str,
                           *,
                           clobber: bool = False,
                           butlerModifier: Optional[Callable[[Butler], Butler]] = None,
                           collections: Optional[Iterable[str]] = None
                           ) -> None:
    r"""Export an input `QuantumGraph` into a new minimal
    `~lsst.daf.butler.Butler` which only contains datasets specified by the
    `QuantumGraph`. These datasets are both those that already exist in the
    input `~lsst.daf.butler.Butler`, and those that are expected to be
    produced during the execution of the `QuantumGraph`.

    Parameters
    ----------
    butler : `lsst.daf.butler.Butler`
        This is the existing `~lsst.daf.butler.Butler` instance from which
        existing datasets will be exported. This should be the
        `~lsst.daf.butler.Butler` which was used to create any `QuantumGraphs`
        that will be converted with this function.
    graph : `QuantumGraph`
        Graph containing nodes that are to be exported into a lightweight
        butler.
    outputLocation : `str`
        Location at which the lightweight butler is to be exported.
    run : `str`
        The run collection that the exported datasets are to be placed in.
    clobber : `bool`, optional
        By default a butler will not be created if a file or directory
        already exists at the output location. If this is set to `True`,
        whatever is at the location will be deleted prior to running the
        export. Defaults to `False`.
    butlerModifier : `~typing.Callable`, optional
        If supplied, this should be a callable that accepts a
        `~lsst.daf.butler.Butler` and returns an instantiated
        `~lsst.daf.butler.Butler`. This callable may be used to make any
        modifications to the `~lsst.daf.butler.Butler` desired. It will be
        called after importing all datasets that exist in the input
        `~lsst.daf.butler.Butler` but prior to inserting datasets expected
        to be produced. Examples of what this callable could do include
        creating collections, runs, etc.
    collections : `~typing.Iterable` of `str`, optional
        An iterable of collection names that will be exported from the input
        `~lsst.daf.butler.Butler` when creating the lightweight butler. If not
        supplied, the `~lsst.daf.butler.Butler`\ 's `~lsst.daf.butler.Registry`
        default collections will be used.

    Raises
    ------
    FileExistsError
        Raised if something exists in the filesystem at the specified output
        location and ``clobber`` is `False`.
    """
    # Do this first to fail fast if the output already exists and clobbering
    # is not allowed.
    if (dirExists := os.path.exists(outputLocation)) and not clobber:
        raise FileExistsError("Cannot create a butler at specified location, location exists")

    exports, inserts = _accumulate(graph)
    yamlBuffer = _export(butler, collections, exports)

    newButler = _setupNewButler(butler, outputLocation, dirExists)

    newButler = _import(yamlBuffer, newButler, inserts, run, butlerModifier)
    newButler._config.dumpToUri(f"{outputLocation}/butler.yaml")
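
# A hedged usage sketch for buildLightweightButler, not executed on import.
# The repository path, graph file, run name, and collection below are
# hypothetical placeholders, and loading the graph assumes the
# QuantumGraph.loadUri class method provided by recent pipe_base versions.
def _demoBuildLightweightButler() -> None:
    from lsst.daf.butler import DimensionUniverse

    inputButler = Butler("/repo/main", writeable=False)
    qgraph = QuantumGraph.loadUri("pipeline.qgraph", DimensionUniverse())
    buildLightweightButler(inputButler, qgraph, "/tmp/lwb", run="u/demo/run",
                           collections=["HSC/defaults"])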