LSST Applications g0f08755f38+82efc23009,g12f32b3c4e+e7bdf1200e,g1653933729+a8ce1bb630,g1a0ca8cf93+50eff2b06f,g28da252d5a+52db39f6a5,g2bbee38e9b+37c5a29d61,g2bc492864f+37c5a29d61,g2cdde0e794+c05ff076ad,g3156d2b45e+41e33cbcdc,g347aa1857d+37c5a29d61,g35bb328faa+a8ce1bb630,g3a166c0a6a+37c5a29d61,g3e281a1b8c+fb992f5633,g414038480c+7f03dfc1b0,g41af890bb2+11b950c980,g5fbc88fb19+17cd334064,g6b1c1869cb+12dd639c9a,g781aacb6e4+a8ce1bb630,g80478fca09+72e9651da0,g82479be7b0+04c31367b4,g858d7b2824+82efc23009,g9125e01d80+a8ce1bb630,g9726552aa6+8047e3811d,ga5288a1d22+e532dc0a0b,gae0086650b+a8ce1bb630,gb58c049af0+d64f4d3760,gc28159a63d+37c5a29d61,gcf0d15dbbd+2acd6d4d48,gd7358e8bfb+778a810b6e,gda3e153d99+82efc23009,gda6a2b7d83+2acd6d4d48,gdaeeff99f8+1711a396fd,ge2409df99d+6b12de1076,ge79ae78c31+37c5a29d61,gf0baf85859+d0a5978c5a,gf3967379c6+4954f8c433,gfb92a5be7c+82efc23009,gfec2e1e490+2aaed99252,w.2024.46
LSST Data Management Base Package
Loading...
Searching...
No Matches
apdbReplica.py
Go to the documentation of this file.
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22from __future__ import annotations
23
24__all__ = ["ApdbReplica", "ReplicaChunk", "ApdbTableData"]
25
26import uuid
27from abc import ABC, abstractmethod
28from collections.abc import Collection, Iterable, Sequence
29from dataclasses import dataclass
30from typing import TYPE_CHECKING, cast
31
32import astropy.time
33from lsst.pex.config import Config
34from lsst.resources import ResourcePath, ResourcePathExpression
35
36from .apdb import ApdbConfig
37from .apdbIndex import ApdbIndex
38from .factory import make_apdb_replica
39
40if TYPE_CHECKING:
41 from .versionTuple import VersionTuple
42
43
class ApdbTableData(ABC):
    """Abstract class for table data, exposed as column names plus row tuples.

    Concrete subclasses must implement both `column_names` and `rows`.
    """

    @abstractmethod
    def column_names(self) -> Sequence[str]:
        """Return the names of the table columns, in order.

        Returns
        -------
        names : `~collections.abc.Sequence` [`str`]
            Column names.
        """
        raise NotImplementedError

    @abstractmethod
    def rows(self) -> Collection[tuple]:
        """Return the table contents, one tuple of values per row.

        Returns
        -------
        rows : `~collections.abc.Collection` [`tuple`]
            Collection of tuples.
        """
        raise NotImplementedError
68
69
@dataclass(frozen=True)
class ReplicaChunk:
    """Class used for identification of replication chunks.

    Instances of this class are used to identify the units of transfer from
    APDB to PPDB. Usually single `ReplicaChunk` corresponds to multiple
    consecutive calls to `Apdb.store` method.

    Every ``store`` with the same ``id`` value will update ``unique_id`` with
    some unique value so that it can be verified on PPDB side.
    """

    id: int
    """A number identifying replication chunk (`int`)."""

    last_update_time: astropy.time.Time
    """Time of last insert for this chunk, usually corresponds to visit time
    (`astropy.time.Time`).
    """

    unique_id: uuid.UUID
    """Unique value updated on each new store (`uuid.UUID`)."""

    @classmethod
    def make_replica_chunk(
        cls, last_update_time: astropy.time.Time, chunk_window_seconds: int
    ) -> ReplicaChunk:
        """Generate new unique insert identifier.

        Parameters
        ----------
        last_update_time : `astropy.time.Time`
            Time of the insert. Its ``unix_tai`` value, truncated to whole
            seconds, determines the chunk ``id``.
        chunk_window_seconds : `int`
            Width of the chunk time window in seconds. All update times that
            fall into the same window map to the same chunk ``id``.

        Returns
        -------
        chunk : `ReplicaChunk`
            New chunk (an instance of ``cls``). Its ``id`` is the window
            boundary at or below the truncated seconds value, and its
            ``unique_id`` is a newly generated random UUID.

        Raises
        ------
        ValueError
            Raised if ``chunk_window_seconds`` is not positive.
        """
        # A zero window would divide by zero, and a negative window produces
        # a meaningless id, so reject both explicitly.
        if chunk_window_seconds <= 0:
            raise ValueError(f"chunk_window_seconds must be positive, got {chunk_window_seconds}")
        seconds = int(last_update_time.unix_tai)
        # Round down to the start of the window so that every time within the
        # window maps to the same chunk id.
        window_start = (seconds // chunk_window_seconds) * chunk_window_seconds
        # Use ``cls`` so that subclasses construct instances of themselves.
        return cls(id=window_start, last_update_time=last_update_time, unique_id=uuid.uuid4())

    def __str__(self) -> str:
        """Return readable form with the update time shown as TAI ISOT."""
        class_name = self.__class__.__name__
        time_str = str(self.last_update_time.tai.isot)
        return f"{class_name}(id={self.id:10d}, last_update_time={time_str}/tai, unique_id={self.unique_id})"
107
108
class ApdbReplica(ABC):
    """Abstract interface for APDB replication methods.

    Instances are normally obtained through the `from_config` or `from_uri`
    factory methods. Concrete subclasses implement the abstract methods that
    list, read, and delete replica chunks.
    """

    @classmethod
    def from_config(cls, config: ApdbConfig) -> ApdbReplica:
        """Create ApdbReplica instance from configuration object.

        Parameters
        ----------
        config : `ApdbConfig`
            Configuration object, type of this object determines type of the
            ApdbReplica implementation.

        Returns
        -------
        replica : `ApdbReplica`
            Instance of `ApdbReplica` class.
        """
        # The concrete implementation is selected by the factory function
        # from the configuration, so ``cls`` itself is not used.
        return make_apdb_replica(config)

    @classmethod
    def from_uri(cls, uri: ResourcePathExpression) -> ApdbReplica:
        """Make ApdbReplica instance from a serialized configuration.

        Parameters
        ----------
        uri : `~lsst.resources.ResourcePathExpression`
            URI or local file path pointing to a file with serialized
            configuration, or a string with a "label:" prefix. In the latter
            case, the configuration will be looked up from an APDB index file
            using the label name that follows the prefix. The APDB index file's
            location is determined by the ``DAX_APDB_INDEX_URI`` environment
            variable.

        Returns
        -------
        replica : `ApdbReplica`
            Instance of `ApdbReplica` class, the type of the returned instance
            is determined by configuration.
        """
        if isinstance(uri, str) and uri.startswith("label:"):
            # Everything after the first colon is the label name.
            _, _, label = uri.partition(":")
            index = ApdbIndex()
            # Current format for config files is "pex_config".
            config_format = "pex_config"
            uri = index.get_apdb_uri(label, config_format)
        path = ResourcePath(uri)
        config_str = path.read().decode()
        # NOTE(review): pex_config files are Python code, so the configuration
        # source is presumably trusted; confirm before loading untrusted URIs.
        # Assume that this is ApdbConfig; make_apdb_replica is expected to
        # raise if it is not.
        config = cast(ApdbConfig, Config._fromPython(config_str))
        return make_apdb_replica(config)

    @classmethod
    @abstractmethod
    def apdbReplicaImplementationVersion(cls) -> VersionTuple:
        """Return version number for current ApdbReplica implementation.

        Returns
        -------
        version : `VersionTuple`
            Version of the code defined in implementation class.
        """
        raise NotImplementedError()

    @abstractmethod
    def getReplicaChunks(self) -> list[ReplicaChunk] | None:
        """Return collection of replication chunks known to the database.

        Returns
        -------
        chunks : `list` [`ReplicaChunk`] or `None`
            List of chunks, they may be time-ordered if database supports
            ordering. `None` is returned if database is not configured for
            replication.
        """
        raise NotImplementedError()

    @abstractmethod
    def deleteReplicaChunks(self, chunks: Iterable[int]) -> None:
        """Remove replication chunks from the database.

        Parameters
        ----------
        chunks : `~collections.abc.Iterable` [`int`]
            Chunk identifiers to remove.

        Notes
        -----
        This method causes Apdb to forget about specified chunks. If there
        are any auxiliary data associated with the identifiers, it is also
        removed from database (but data in regular tables is not removed).
        This method should be called after successful transfer of data from
        APDB to PPDB to free space used by replicas.
        """
        raise NotImplementedError()

    @abstractmethod
    def getDiaObjectsChunks(self, chunks: Iterable[int]) -> ApdbTableData:
        """Return catalog of DiaObject records from given replica chunks.

        Parameters
        ----------
        chunks : `~collections.abc.Iterable` [`int`]
            Chunk identifiers to return.

        Returns
        -------
        data : `ApdbTableData`
            Catalog containing DiaObject records. In addition to all regular
            columns it will contain ``apdb_replica_chunk`` column.

        Notes
        -----
        This part of API may not be very stable and can change before the
        implementation finalizes.
        """
        raise NotImplementedError()

    @abstractmethod
    def getDiaSourcesChunks(self, chunks: Iterable[int]) -> ApdbTableData:
        """Return catalog of DiaSource records from given replica chunks.

        Parameters
        ----------
        chunks : `~collections.abc.Iterable` [`int`]
            Chunk identifiers to return.

        Returns
        -------
        data : `ApdbTableData`
            Catalog containing DiaSource records. In addition to all regular
            columns it will contain ``apdb_replica_chunk`` column.

        Notes
        -----
        This part of API may not be very stable and can change before the
        implementation finalizes.
        """
        raise NotImplementedError()

    @abstractmethod
    def getDiaForcedSourcesChunks(self, chunks: Iterable[int]) -> ApdbTableData:
        """Return catalog of DiaForcedSource records from given replica chunks.

        Parameters
        ----------
        chunks : `~collections.abc.Iterable` [`int`]
            Chunk identifiers to return.

        Returns
        -------
        data : `ApdbTableData`
            Catalog containing DiaForcedSource records. In addition to all
            regular columns it will contain ``apdb_replica_chunk`` column.

        Notes
        -----
        This part of API may not be very stable and can change before the
        implementation finalizes.
        """
        raise NotImplementedError()
ApdbReplica from_config(cls, ApdbConfig config)
None deleteReplicaChunks(self, Iterable[int] chunks)
ApdbTableData getDiaObjectsChunks(self, Iterable[int] chunks)
list[ReplicaChunk]|None getReplicaChunks(self)
VersionTuple apdbReplicaImplementationVersion(cls)
ApdbReplica from_uri(cls, ResourcePathExpression uri)
ApdbTableData getDiaSourcesChunks(self, Iterable[int] chunks)
ApdbTableData getDiaForcedSourcesChunks(self, Iterable[int] chunks)
ReplicaChunk make_replica_chunk(cls, astropy.time.Time last_update_time, int chunk_window_seconds)