LSST Data Management Base Package
executionButlerBuilder.py
# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations

__all__ = ("buildExecutionButler", )

import io

from collections import defaultdict
from typing import Callable, DefaultDict, Mapping, Optional, Set, Tuple, Iterable, List, Union

from lsst.daf.butler import (DatasetRef, DatasetType, Butler, DataCoordinate, ButlerURI, Config)
from lsst.daf.butler.core.utils import getClassOf
from lsst.daf.butler.transfers import RepoExportContext
from lsst.daf.butler.core.repoRelocation import BUTLER_ROOT_TAG


from . import QuantumGraph, QuantumNode

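# Alias for the mapping from a DatasetType to the set of data IDs the
# QuantumGraph is expected to produce for it (the ``inserts`` value returned
# by ``_accumulate`` below).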
DataSetTypeMap = Mapping[DatasetType, Set[DataCoordinate]]


def _accumulate(graph: QuantumGraph) -> Tuple[Set[DatasetRef], DataSetTypeMap]:
    # accumulate the dataIds that will be transferred to the execution
    # registry

    # exports holds all the existing data that will be migrated to the
    # execution butler
    exports: Set[DatasetRef] = set()

    # inserts is the mapping of DatasetType to dataIds for what is to be
    # inserted into the registry. These are the products that are expected
    # to be produced during processing of the QuantumGraph
    inserts: DefaultDict[DatasetType, Set[DataCoordinate]] = defaultdict(set)
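    # As a purely illustrative example (these dataset type names are
    # hypothetical, not assumed by this code): after the loops below,
    # ``exports`` might hold resolved DatasetRefs for already-existing
    # "calexp" datasets, while ``inserts`` might map the "src" DatasetType to
    # the data IDs of catalogs the graph is expected to produce.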

    n: QuantumNode
    for quantum in (n.quantum for n in graph):
        for attrName in ("initInputs", "inputs", "outputs"):
            attr: Mapping[DatasetType, Union[DatasetRef, List[DatasetRef]]] = getattr(quantum, attrName)

            for type, refs in attr.items():
                # This if block is needed because initInputs has a different
                # signature for its items
                if not isinstance(refs, list):
                    refs = [refs]
                # Iterate over all the references. If a reference has an id
                # it already exists and should be exported; if not, it should
                # be inserted into the new registry
                for ref in refs:
                    if ref.isComponent():
                        # We can't insert a component, and a component will
                        # be part of some other upstream dataset, so it
                        # should be safe to skip them here
                        continue

                    if ref.id is not None:
                        exports.add(ref)
                    else:
                        inserts[type].add(ref.dataId)
    return exports, inserts


def _discoverCollections(butler: Butler, collections: Iterable[str]) -> set[str]:
    # Recurse through any discovered collections to make sure all collections
    # are exported. This exists because I ran into a situation where some
    # collections were not properly being discovered and exported. This
    # method may be removable in the future if the collection export
    # logic changes
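    # As a purely hypothetical illustration: a CHAINED collection such as
    # "HSC/defaults" might resolve to children like "HSC/raw/all" and
    # "HSC/calib"; those children may themselves be chains, which is why the
    # query below is repeated until no new collection names appear.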
    collections = set(collections)
    while True:
        discoveredCollections = set(butler.registry.queryCollections(collections, flattenChains=True,
                                                                     includeChains=True))
        if len(discoveredCollections) > len(collections):
            collections = discoveredCollections
        else:
            break
    return collections


def _export(butler: Butler, collections: Optional[Iterable[str]], exports: Set[DatasetRef]) -> io.StringIO:
    # This exports the datasets that exist in the input butler using
    # daf butler objects; however, it reaches in deep and does not use the
    # public methods, so that it can export to a string buffer and skip
    # disk access.
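    # The buffer ends up holding roughly the same YAML document that a
    # file-based butler export would produce (dataset and collection
    # records); it is consumed later by ``_import``.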
    yamlBuffer = io.StringIO()
    # Yaml is hard coded, since the class controls both ends of the
    # export/import
    BackendClass = getClassOf(butler._config["repo_transfer_formats", "yaml", "export"])
    backend = BackendClass(yamlBuffer)
    exporter = RepoExportContext(butler.registry, butler.datastore, backend, directory=None, transfer=None)
    exporter.saveDatasets(exports)

    # Use the supplied collections if any were given; otherwise fall back to
    # the registry defaults
    if collections is None:
        collections = butler.registry.defaults.collections

    # Look up all collections associated with those inputs; this follows
    # all chains to make sure everything is properly exported
    for c in _discoverCollections(butler, collections):
        exporter.saveCollection(c)
    exporter._finish()

    # Reset the string buffer to the beginning so the read operation will
    # actually *see* the data that was exported
    yamlBuffer.seek(0)
    return yamlBuffer


def _setupNewButler(butler: Butler, outputLocation: ButlerURI, dirExists: bool) -> Butler:
    # Set up the new butler object at the specified location
    if dirExists:
        # Remove the existing registry database; if the code got this far and
        # this exists, clobber must be true
        executionRegistry = outputLocation.join("gen3.sqlite3")
        if executionRegistry.exists():
            executionRegistry.remove()
    else:
        outputLocation.mkdir()

    # Copy the existing butler config, modifying the location of the
    # registry to the specified location.
    # Preserve the root path from the existing butler so things like
    # file data stores continue to look at the old location.
    config = Config(butler._config)
    config["root"] = outputLocation.geturl()
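    # "<butlerRoot>" (BUTLER_ROOT_TAG) is a placeholder that daf_butler
    # expands to the repository root when the config is read back, so the
    # new SQLite registry lives inside the execution butler directory.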
    config["registry", "db"] = "sqlite:///<butlerRoot>/gen3.sqlite3"
    # Record the current root of the datastore if it is specified relative
    # to the butler root
    if config.get(("datastore", "root")) == BUTLER_ROOT_TAG:
        config["datastore", "root"] = butler._config.configDir.geturl()
    config["datastore", "trust_get_request"] = True

    config = Butler.makeRepo(root=outputLocation, config=config, overwrite=True, forceConfigRoot=False)

    # Return a newly created butler
    return Butler(config, writeable=True)


def _import(yamlBuffer: io.StringIO,
            newButler: Butler,
            inserts: DataSetTypeMap,
            run: str,
            butlerModifier: Optional[Callable[[Butler], Butler]]
            ) -> Butler:
    # This method takes the exports from the existing butler, imports
    # them into the newly created butler, and then inserts the datasets
    # that are expected to be produced.

    # Import the existing datasets
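    # reuseIds=True asks the registry to keep the dataset IDs recorded in the
    # export rather than generating new ones, so that resolved DatasetRefs
    # already embedded in the QuantumGraph stay valid against this registry
    # (this matters for registries with integer auto-increment dataset IDs).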
    newButler.import_(filename=yamlBuffer, format="yaml", reuseIds=True)

    # If there is a modifier callable, run it to make necessary updates
    # to the new butler.
    if butlerModifier is not None:
        newButler = butlerModifier(newButler)

    # Register datasets to be produced and insert them into the registry
    for dsType, dataIds in inserts.items():
        newButler.registry.registerDatasetType(dsType)
        newButler.registry.insertDatasets(dsType, dataIds, run)

    return newButler


def buildExecutionButler(butler: Butler,
                         graph: QuantumGraph,
                         outputLocation: Union[str, ButlerURI],
                         run: str,
                         *,
                         clobber: bool = False,
                         butlerModifier: Optional[Callable[[Butler], Butler]] = None,
                         collections: Optional[Iterable[str]] = None
                         ) -> Butler:
    r"""Export an input `QuantumGraph` into a new minimal
    `~lsst.daf.butler.Butler` which only contains datasets specified by the
    `QuantumGraph`. These datasets are both those that already exist in the
    input `~lsst.daf.butler.Butler`, and those that are expected to be
    produced during the execution of the `QuantumGraph`.

    Parameters
    ----------
    butler : `lsst.daf.butler.Butler`
        This is the existing `~lsst.daf.butler.Butler` instance from which
        existing datasets will be exported. This should be the
        `~lsst.daf.butler.Butler` which was used to create any `QuantumGraphs`
        that will be converted with this function.
    graph : `QuantumGraph`
        Graph containing nodes that are to be exported into an execution
        butler.
    outputLocation : `str` or `~lsst.daf.butler.ButlerURI`
        URI location at which the execution butler is to be exported. May be
        specified as a string or a `~lsst.daf.butler.ButlerURI` instance.
    run : `str`, optional
        The run collection that the exported datasets are to be placed in. If
        `None`, the default value in registry.defaults will be used.
    clobber : `bool`, optional
        By default a butler will not be created if a file or directory
        already exists at the output location. If this is set to `True`
        what is at the location will be deleted prior to running the
        export. Defaults to `False`.
    butlerModifier : `~typing.Callable`, optional
        If supplied this should be a callable that accepts a
        `~lsst.daf.butler.Butler`, and returns an instantiated
        `~lsst.daf.butler.Butler`. This callable may be used to make any
        modifications to the `~lsst.daf.butler.Butler` desired. This
        will be called after importing all datasets that exist in the input
        `~lsst.daf.butler.Butler` but prior to inserting datasets expected
        to be produced. Examples of what this method could do include
        things such as creating collections/runs, etc.
    collections : `~typing.Iterable` of `str`, optional
        An iterable of collection names that will be exported from the input
        `~lsst.daf.butler.Butler` when creating the execution butler. If not
        supplied the `~lsst.daf.butler.Butler`\ 's `~lsst.daf.butler.Registry`
        default collections will be used.

    Returns
    -------
    executionButler : `lsst.daf.butler.Butler`
        An instance of the newly created execution butler.

    Raises
    ------
    FileExistsError
        Raised if something exists in the filesystem at the specified output
        location and clobber is `False`.
    NotADirectoryError
        Raised if the specified output URI does not correspond to a directory.
    """
    outputLocation = ButlerURI(outputLocation)

    # Do this first to fail fast if the output exists
    if (dirExists := outputLocation.exists()) and not clobber:
        raise FileExistsError("Cannot create a butler at specified location, location exists")
    if not outputLocation.isdir():
        raise NotADirectoryError("The specified output URI does not appear to correspond to a directory")

    exports, inserts = _accumulate(graph)
    yamlBuffer = _export(butler, collections, exports)

    newButler = _setupNewButler(butler, outputLocation, dirExists)

    return _import(yamlBuffer, newButler, inserts, run, butlerModifier)
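

# A minimal usage sketch (the repository paths and run collection name are
# purely illustrative, and ``graph`` is assumed to be a QuantumGraph that was
# built against the same input butler):
#
#     from lsst.daf.butler import Butler
#
#     inputButler = Butler("/path/to/input/repo")
#     executionButler = buildExecutionButler(
#         inputButler,
#         graph,
#         "/path/to/execution/butler",
#         run="u/example/run",
#         clobber=True,
#     )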