LSST Applications g04e9c324dd+8c5ae1fdc5,g134cb467dc+b203dec576,g18429d2f64+358861cd2c,g199a45376c+0ba108daf9,g1fd858c14a+dd066899e3,g262e1987ae+ebfced1d55,g29ae962dfc+72fd90588e,g2cef7863aa+aef1011c0b,g35bb328faa+8c5ae1fdc5,g3fd5ace14f+b668f15bc5,g4595892280+3897dae354,g47891489e3+abcf9c3559,g4d44eb3520+fb4ddce128,g53246c7159+8c5ae1fdc5,g67b6fd64d1+abcf9c3559,g67fd3c3899+1f72b5a9f7,g74acd417e5+cb6b47f07b,g786e29fd12+668abc6043,g87389fa792+8856018cbb,g89139ef638+abcf9c3559,g8d7436a09f+bcf525d20c,g8ea07a8fe4+9f5ccc88ac,g90f42f885a+6054cc57f1,g97be763408+06f794da49,g9dd6db0277+1f72b5a9f7,ga681d05dcb+7e36ad54cd,gabf8522325+735880ea63,gac2eed3f23+abcf9c3559,gb89ab40317+abcf9c3559,gbf99507273+8c5ae1fdc5,gd8ff7fe66e+1f72b5a9f7,gdab6d2f7ff+cb6b47f07b,gdc713202bf+1f72b5a9f7,gdfd2d52018+8225f2b331,ge365c994fd+375fc21c71,ge410e46f29+abcf9c3559,geaed405ab2+562b3308c0,gf9a733ac38+8c5ae1fdc5,w.2025.35
LSST Data Management Base Package
Loading...
Searching...
No Matches
apdbReplica.py
Go to the documentation of this file.
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22from __future__ import annotations
23
24__all__ = ["ApdbReplica", "ApdbTableData", "ReplicaChunk"]
25
26import uuid
27from abc import ABC, abstractmethod
28from collections.abc import Collection, Iterable, Sequence
29from dataclasses import dataclass
30from typing import TYPE_CHECKING
31
32import astropy.time
33import felis.datamodel
34
35from lsst.resources import ResourcePathExpression
36
37from .apdb import ApdbConfig, ApdbTables
38from .factory import make_apdb_replica
39
40if TYPE_CHECKING:
41 from .versionTuple import VersionTuple
42
43
class ApdbTableData(ABC):
    """Abstract class for representing table data.

    Concrete subclasses must implement all three accessors: `column_names`,
    `column_defs` and `rows`.
    """

    @abstractmethod
    def column_names(self) -> Sequence[str]:
        """Return ordered sequence of column names in the table.

        Returns
        -------
        names : `~collections.abc.Sequence` [`str`]
            Column names.
        """
        raise NotImplementedError()

    @abstractmethod
    def column_defs(self) -> Sequence[tuple[str, felis.datamodel.DataType]]:
        """Return ordered sequence of column names and their types.

        Returns
        -------
        columns : `~collections.abc.Sequence` \
                [`tuple`[`str`, `felis.datamodel.DataType`]]
            Sequence of 2-tuples, each tuple consists of column name and its
            type.
        """
        raise NotImplementedError()

    @abstractmethod
    def rows(self) -> Collection[tuple]:
        """Return table rows, each row is a tuple of values.

        Returns
        -------
        rows : `~collections.abc.Collection` [`tuple`]
            Collection of tuples.
        """
        raise NotImplementedError()
81
82
@dataclass(frozen=True)
class ReplicaChunk:
    """Class used for identification of replication chunks.

    Instances of this class are used to identify the units of transfer from
    APDB to PPDB. Usually single `ReplicaChunk` corresponds to multiple
    consecutive calls to `Apdb.store` method.

    Every ``store`` with the same ``id`` value will update ``unique_id`` with
    some unique value so that it can be verified on PPDB side.
    """

    id: int
    """A number identifying replication chunk (`int`)."""

    last_update_time: astropy.time.Time
    """Time of last insert for this chunk, usually corresponds to visit time
    (`astropy.time.Time`).
    """

    unique_id: uuid.UUID
    """Unique value updated on each new store (`uuid.UUID`)."""

    @classmethod
    def make_replica_chunk(
        cls, last_update_time: astropy.time.Time, chunk_window_seconds: int
    ) -> ReplicaChunk:
        """Generate new unique insert identifier.

        Parameters
        ----------
        last_update_time : `astropy.time.Time`
            Time of the last insert for the chunk; it is stored unchanged in
            the returned instance.
        chunk_window_seconds : `int`
            Width of the chunk window in seconds, used to quantize the chunk
            identifier.

        Returns
        -------
        chunk : `ReplicaChunk`
            New instance whose ``id`` is the integer part of
            ``last_update_time.unix_tai`` floored to a multiple of
            ``chunk_window_seconds`` and whose ``unique_id`` is a newly
            generated random UUID.
        """
        seconds = int(last_update_time.unix_tai)
        # Snap to the start of the window so that all times falling into the
        # same window produce the same chunk id.
        seconds = (seconds // chunk_window_seconds) * chunk_window_seconds
        unique_id = uuid.uuid4()
        return ReplicaChunk(id=seconds, last_update_time=last_update_time, unique_id=unique_id)

    def __str__(self) -> str:
        """Return readable representation with the update time shown as an
        ISO string on the TAI scale.
        """
        class_name = self.__class__.__name__
        time_str = str(self.last_update_time.tai.isot)
        return f"{class_name}(id={self.id:10d}, last_update_time={time_str}/tai, unique_id={self.unique_id})"
120
121
class ApdbReplica(ABC):
    """Abstract interface for APDB replication methods."""

    @classmethod
    def from_config(cls, config: ApdbConfig) -> ApdbReplica:
        """Create ApdbReplica instance from configuration object.

        Parameters
        ----------
        config : `ApdbConfig`
            Configuration object, type of this object determines type of the
            ApdbReplica implementation.

        Returns
        -------
        replica : `ApdbReplica`
            Instance of `ApdbReplica` class.
        """
        return make_apdb_replica(config)

    @classmethod
    def from_uri(cls, uri: ResourcePathExpression) -> ApdbReplica:
        """Make ApdbReplica instance from a serialized configuration.

        Parameters
        ----------
        uri : `~lsst.resources.ResourcePathExpression`
            URI or local file path pointing to a file with serialized
            configuration, or a string with a "label:" prefix. In the latter
            case, the configuration will be looked up from an APDB index file
            using the label name that follows the prefix. The APDB index file's
            location is determined by the ``DAX_APDB_INDEX_URI`` environment
            variable.

        Returns
        -------
        replica : `ApdbReplica`
            Instance of `ApdbReplica` class, the type of the returned instance
            is determined by configuration.
        """
        config = ApdbConfig.from_uri(uri)
        return make_apdb_replica(config)

    @classmethod
    @abstractmethod
    def apdbReplicaImplementationVersion(cls) -> VersionTuple:
        """Return version number for current ApdbReplica implementation.

        Returns
        -------
        version : `VersionTuple`
            Version of the code defined in implementation class.
        """
        raise NotImplementedError()

    @abstractmethod
    def schemaVersion(self) -> VersionTuple:
        """Return version number of the database schema.

        Returns
        -------
        version : `VersionTuple`
            Version of the database schema.
        """
        raise NotImplementedError()

    @abstractmethod
    def getReplicaChunks(self) -> list[ReplicaChunk] | None:
        """Return collection of replication chunks known to the database.

        Returns
        -------
        chunks : `list` [`ReplicaChunk`] or `None`
            List of chunks, they may be time-ordered if database supports
            ordering. `None` is returned if database is not configured for
            replication.
        """
        raise NotImplementedError()

    @abstractmethod
    def deleteReplicaChunks(self, chunks: Iterable[int]) -> None:
        """Remove replication chunks from the database.

        Parameters
        ----------
        chunks : `~collections.abc.Iterable` [`int`]
            Chunk identifiers to remove.

        Notes
        -----
        This method causes Apdb to forget about specified chunks. If there
        are any auxiliary data associated with the identifiers, it is also
        removed from database (but data in regular tables is not removed).
        This method should be called after successful transfer of data from
        APDB to PPDB to free space used by replicas.
        """
        raise NotImplementedError()

    @abstractmethod
    def getTableDataChunks(self, table: ApdbTables, chunks: Iterable[int]) -> ApdbTableData:
        """Return catalog of new records for a table from given replica chunks.

        Parameters
        ----------
        table : `ApdbTables`
            Table for which to return the data. Acceptable tables are
            `ApdbTables.DiaObject`, `ApdbTables.DiaSource`, and
            `ApdbTables.DiaForcedSource`.
        chunks : `~collections.abc.Iterable` [`int`]
            Chunk identifiers to return.

        Returns
        -------
        data : `ApdbTableData`
            Catalog containing table records. In addition to all regular
            columns it will contain ``apdb_replica_chunk`` column.

        Notes
        -----
        This method returns new records that have been added to the table by
        `Apdb.store()` method. Updates to the records that happen at later time
        are available from `getTableUpdateChunks` method.

        This part of API may not be very stable and can change before the
        implementation finalizes.
        """
        raise NotImplementedError()

    @abstractmethod
    def getTableUpdateChunks(self, table: ApdbTables, chunks: Iterable[int]) -> ApdbTableData:
        """Return the list of record updates for a table from given replica
        chunks.

        Parameters
        ----------
        table : `ApdbTables`
            Table for which to return the updates. Acceptable tables are
            `ApdbTables.DiaObject`, `ApdbTables.DiaSource`, and
            `ApdbTables.DiaForcedSource`.
        chunks : `~collections.abc.Iterable` [`int`]
            Chunk identifiers to return.

        Returns
        -------
        data : `ApdbTableData`
            Catalog containing table updates.

        Notes
        -----
        The returned catalog will include all primary key columns,
        ``timestamp`` column, ``apdb_replica_chunk`` column, and
        ``update_action`` column containing a string with JSON-serialized
        description of the update.

        This part of API may not be very stable and can change before the
        implementation finalizes.
        """
        raise NotImplementedError()
ApdbReplica from_config(cls, ApdbConfig config)
None deleteReplicaChunks(self, Iterable[int] chunks)
ApdbTableData getTableUpdateChunks(self, ApdbTables table, Iterable[int] chunks)
ApdbTableData getTableDataChunks(self, ApdbTables table, Iterable[int] chunks)
list[ReplicaChunk]|None getReplicaChunks(self)
VersionTuple apdbReplicaImplementationVersion(cls)
ApdbReplica from_uri(cls, ResourcePathExpression uri)
Sequence[tuple[str, felis.datamodel.DataType]] column_defs(self)
ReplicaChunk make_replica_chunk(cls, astropy.time.Time last_update_time, int chunk_window_seconds)