21 from __future__
import annotations
23 __all__ = (
"LoadHelper")
25 from lsst.daf.butler
import ButlerURI, Quantum
26 from lsst.daf.butler.core._butlerUri.s3
import ButlerS3URI
27 from lsst.daf.butler.core._butlerUri.file
import ButlerFileURI
29 from ..pipeline
import TaskDef
30 from .quantumNode
import NodeId
32 from dataclasses
import dataclass
39 from collections
import defaultdict, UserDict
40 from typing
import (Optional, Iterable, DefaultDict, Set, Dict, TYPE_CHECKING, Type, Union)
43 from .
import QuantumGraph
49 return DefaultLoadHelper
57 """Used to register classes as Load helpers
59 When decorating a class the parameter is the class of "handle type", i.e.
60 a ButlerURI type or open file handle that will be used to do the loading.
61 This is then associated with the decorated class such that when the
62 parameter type is used to load data, the appropriate helper to work with
63 that data type can be returned.
65 A decorator is used so that in theory someone could define another handler
66 in a different module and register it for use.
70 URICLass : Type of `~lsst.daf.butler.ButlerURI` or `~io.IO` of bytes
71 type for which the decorated class should be mapped to
74 HELPER_REGISTRY[URICLass] = class_
80 """Default load helper for `QuantumGraph` save files
82 This class, and its subclasses, are used to unpack a quantum graph save
83 file. This file is a binary representation of the graph in a format that
84 allows individual nodes to be loaded without needing to load the entire
87 This default implementation has the interface to load select nodes
88 from disk, but actually always loads the entire save file and simply
89 returns what nodes (or all) are requested. This is intended to serve for
90 all cases where there is a read method on the input parameter, but it is
91 unknown how to read select bytes of the stream. It is the responsibility of
92 subclasses to implement the method responsible for loading individual
93 bytes from the stream.
97 uriObject : `~lsst.daf.butler.ButlerURI` or `io.IO` of bytes
98 This is the object that will be used to retrieve the raw bytes of the
104 Raised if the specified file contains the wrong file signature and is
105 not a `QuantumGraph` save
107 def __init__(self, uriObject: Union[ButlerURI, io.IO[bytes]]):
110 preambleSize, taskDefSize, nodeSize = self.
_readSizes_readSizes()
113 self.
headerSizeheaderSize = preambleSize + taskDefSize + nodeSize
117 def _readSizes(self):
119 from .graph
import STRUCT_FMT_STRING, MAGIC_BYTES
124 magicSize = len(MAGIC_BYTES)
125 fmt = STRUCT_FMT_STRING
126 fmtSize = struct.calcsize(fmt)
127 preambleSize = magicSize + fmtSize
129 headerBytes = self.
_readBytes_readBytes(0, preambleSize)
130 magic = headerBytes[:magicSize]
131 sizeBytes = headerBytes[magicSize:]
133 if magic != MAGIC_BYTES:
134 raise ValueError(
"This file does not appear to be a quantum graph save got magic bytes "
135 f
"{magic}, expected {MAGIC_BYTES}")
138 save_version, taskDefSize, nodeSize = struct.unpack(
'>HQQ', sizeBytes)
144 return preambleSize, taskDefSize, nodeSize
146 def _readByteMappings(self, preambleSize, headerSize, taskDefSize):
151 headerMaps = self.
_readBytes_readBytes(preambleSize, headerSize)
154 self.
taskDefMaptaskDefMap = pickle.loads(headerMaps[:taskDefSize])
161 self.
mapmap = pickle.loads(headerMaps[taskDefSize:])
163 def load(self, nodes: Optional[Iterable[int]] =
None, graphID: Optional[str] =
None) -> QuantumGraph:
164 """Loads in the specified nodes from the graph
166 Load in the `QuantumGraph` containing only the nodes specified in the
167 ``nodes`` parameter from the graph specified at object creation. If
168 ``nodes`` is None (the default) the whole graph is loaded.
172 nodes : `Iterable` of `int` or `None`
173 The nodes to load from the graph, loads all if value is None
175 graphID : `str` or `None`
176 If specified this ID is verified against the loaded graph prior to
177 loading any Nodes. This defaults to None in which case no
182 graph : `QuantumGraph`
183 The loaded `QuantumGraph` object
188 Raised if one or more of the nodes requested is not in the
189 `QuantumGraph` or if graphID parameter does not match the graph
193 from .
import QuantumGraph
194 if graphID
is not None and self.
_buildId_buildId != graphID:
195 raise ValueError(
'graphID does not match that of the graph being loaded')
201 _readBytes = functools.partial(DefaultLoadHelper._readBytes, self)
208 remainder = nodes - self.
mapmap.
keys()
210 raise ValueError(
"Nodes {remainder} were requested, but could not be found in the input "
214 quanta: DefaultDict[TaskDef, Set[Quantum]] = defaultdict(set)
215 quantumToNodeId: Dict[Quantum, NodeId] = {}
220 start, stop = self.
mapmap[node]
226 dump = lzma.decompress(_readBytes(start, stop))
229 qNode = pickle.loads(dump)
233 nodeTask = qNode.taskDef
234 if nodeTask
not in loadedTaskDef:
236 start, stop = self.
taskDefMaptaskDefMap[nodeTask]
243 taskDef = pickle.loads(lzma.decompress(_readBytes(start, stop)))
244 loadedTaskDef[nodeTask] = taskDef
247 object.__setattr__(qNode,
'taskDef', loadedTaskDef[nodeTask])
248 quanta[qNode.taskDef].add(qNode.quantum)
251 quantumToNodeId[qNode.quantum] = qNode.nodeId
255 qGraph = object.__new__(QuantumGraph)
256 qGraph._buildGraphs(quanta, _quantumToNodeId=quantumToNodeId, _buildId=self.
_buildId_buildId)
259 def _readBytes(self, start: int, stop: int) -> bytes:
260 """Loads the specified byte range from the ButlerURI object
262 In the base class, this actually will read all the bytes into a buffer
263 from the specified ButlerURI object. Then for each method call will
264 return the requested byte range. This is the most flexible
265 implementation, as no special read is required. This will not give a
266 speed up with any sub graph reads though.
268 if not hasattr(self,
'buffer'):
270 return self.
bufferbuffer[start:stop]
273 """Cleans up an instance if needed. Base class does nothing
281 def _readBytes(self, start: int, stop: int) -> bytes:
285 args[
"Range"] = f
"bytes={start}-{stop-1}"
287 response = self.
uriObjecturiObject.client.get_object(Bucket=self.
uriObjecturiObject.netloc,
288 Key=self.
uriObjecturiObject.relativeToPathRoot,
290 except (self.
uriObjecturiObject.client.exceptions.NoSuchKey,
291 self.
uriObjecturiObject.client.exceptions.NoSuchBucket)
as err:
292 raise FileNotFoundError(f
"No such resource: {self.uriObject}")
from err
293 body = response[
"Body"].read()
294 response[
"Body"].
close()
298 @register_helper(ButlerFileURI)
301 def _readBytes(self, start: int, stop: int) -> bytes:
302 if not hasattr(self,
'fileHandle'):
305 return self.
fileHandlefileHandle.read(stop-start)
308 if hasattr(self,
'fileHandle'):
312 @register_helper(io.IOBase)
333 def _readBytes(self, start: int, stop: int) -> bytes:
335 result = self.
uriObjecturiObject.read(stop-start)
341 """This is a helper class to assist with selecting the appropriate loader
342 and managing any contexts that may be needed.
346 This class may go away or be modified in the future if some of the
347 features of this module can be propagated to `~lsst.daf.butler.ButlerURI`.
349 This helper will raise a `ValueError` if the specified file does not appear
350 to be a valid `QuantumGraph` save file.
353 """ButlerURI object from which the `QuantumGraph` is to be loaded
359 if isinstance(self.uri, io.IOBase):
363 self.
_loaded_loaded = HELPER_REGISTRY[key](self.uri)
bytes _readBytes(self, int start, int stop)
def __init__(self, Union[ButlerURI, io.IO[bytes]] uriObject)
QuantumGraph load(self, Optional[Iterable[int]] nodes=None, Optional[str] graphID=None)
def _readByteMappings(self, preambleSize, headerSize, taskDefSize)
def __exit__(self, type, value, traceback)
def __init__(self, io.IO[bytes] uriObject)
def __missing__(self, key)
daf::base::PropertyList * list
daf::base::PropertySet * set
def register_helper(Union[Type[ButlerURI], Type[io.IO[bytes]]] URICLass)