LSST Applications g013ef56533+b8d55c8942,g083dd6704c+a047e97985,g199a45376c+0ba108daf9,g1fd858c14a+7a3b874d60,g210f2d0738+7416ca6900,g262e1987ae+1d557ba9a3,g29ae962dfc+519d34895e,g2cef7863aa+aef1011c0b,g30d7c61c20+36d16ea71a,g35bb328faa+8c5ae1fdc5,g3fd5ace14f+cb326ad149,g47891489e3+f459a6810c,g53246c7159+8c5ae1fdc5,g54cd7ddccb+890c8e1e5d,g5a60e81ecd+6240c63dbc,g64539dfbff+7416ca6900,g67b6fd64d1+f459a6810c,g6ebf1fc0d4+8c5ae1fdc5,g74acd417e5+0bae3c876a,g786e29fd12+668abc6043,g87389fa792+8856018cbb,g89139ef638+f459a6810c,g8d7436a09f+dee7680868,g8ea07a8fe4+81eaaadc04,g90f42f885a+34c0557caf,g97be763408+14b8164b5b,g98a1a72a9c+8389601a76,g98df359435+fff771c62d,gb8cb2b794d+6728931916,gbf99507273+8c5ae1fdc5,gc2a301910b+7416ca6900,gca7fc764a6+f459a6810c,gd7ef33dd92+f459a6810c,gdab6d2f7ff+0bae3c876a,ge410e46f29+f459a6810c,ge41e95a9f2+7416ca6900,geaed405ab2+e3b4b2a692,gf9a733ac38+8c5ae1fdc5,w.2025.43
LSST Data Management Base Package
Loading...
Searching...
No Matches
apdbReplica.py
Go to the documentation of this file.
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22from __future__ import annotations
23
24__all__ = ["ApdbReplica", "ApdbTableData", "ReplicaChunk"]
25
26import uuid
27from abc import ABC, abstractmethod
28from collections.abc import Collection, Iterable, Sequence
29from dataclasses import dataclass
30from typing import TYPE_CHECKING
31
32import astropy.time
33import felis.datamodel
34
35from lsst.resources import ResourcePathExpression
36
37from .apdb import ApdbConfig, ApdbTables
38from .factory import make_apdb_replica
39
40if TYPE_CHECKING:
41 from .apdbUpdateRecord import ApdbUpdateRecord
42 from .versionTuple import VersionTuple
43
44
class ApdbTableData(ABC):
    """Abstract class for representing table data.

    Concrete implementations describe a table through its ordered column
    names, the matching column type definitions, and a collection of row
    tuples.
    """

    @abstractmethod
    def column_names(self) -> Sequence[str]:
        """Return ordered sequence of column names in the table.

        Returns
        -------
        names : `~collections.abc.Sequence` [`str`]
            Column names.
        """
        raise NotImplementedError()

    @abstractmethod
    def column_defs(self) -> Sequence[tuple[str, felis.datamodel.DataType]]:
        """Return ordered sequence of column names and their types.

        Returns
        -------
        columns : `~collections.abc.Sequence` \
                [`tuple`[`str`, `felis.datamodel.DataType`]]
            Sequence of 2-tuples, each tuple consists of column name and its
            type.
        """
        raise NotImplementedError()

    @abstractmethod
    def rows(self) -> Collection[tuple]:
        """Return table rows, each row is a tuple of values.

        Returns
        -------
        rows : `~collections.abc.Collection` [`tuple`]
            Collection of tuples.
        """
        raise NotImplementedError()
82
83
@dataclass(frozen=True)
class ReplicaChunk:
    """Class used for identification of replication chunks.

    Instances of this class are used to identify the units of transfer from
    APDB to PPDB. Usually a single `ReplicaChunk` corresponds to multiple
    consecutive calls to `Apdb.store` method.

    Every ``store`` with the same ``id`` value will update ``unique_id`` with
    some unique value so that it can be verified on PPDB side.
    """

    id: int
    """A number identifying replication chunk (`int`)."""

    last_update_time: astropy.time.Time
    """Time of last insert for this chunk, usually corresponds to visit time
    (`astropy.time.Time`).
    """

    unique_id: uuid.UUID
    """Unique value updated on each new store (`uuid.UUID`)."""

    @classmethod
    def make_replica_chunk(
        cls, last_update_time: astropy.time.Time, chunk_window_seconds: int
    ) -> ReplicaChunk:
        """Generate new unique insert identifier.

        Parameters
        ----------
        last_update_time : `astropy.time.Time`
            Time of the insert; its ``unix_tai`` value is used to derive the
            chunk ``id``.
        chunk_window_seconds : `int`
            Width of the chunk window in seconds. The chunk ``id`` is the
            integer ``unix_tai`` value rounded down to a multiple of this
            number.

        Returns
        -------
        chunk : `ReplicaChunk`
            New chunk with the computed ``id``, the given
            ``last_update_time`` and a freshly generated random
            ``unique_id``.

        Raises
        ------
        ZeroDivisionError
            Raised if ``chunk_window_seconds`` is zero.
        """
        # Drop the fractional part first, then align to the window boundary
        # so that all times inside one window map to the same chunk id.
        seconds = int(last_update_time.unix_tai)
        seconds = (seconds // chunk_window_seconds) * chunk_window_seconds
        unique_id = uuid.uuid4()
        return ReplicaChunk(id=seconds, last_update_time=last_update_time, unique_id=unique_id)

    def __str__(self) -> str:
        """Return readable representation with the update time in TAI."""
        class_name = self.__class__.__name__
        time_str = str(self.last_update_time.tai.isot)
        return f"{class_name}(id={self.id:10d}, last_update_time={time_str}/tai, unique_id={self.unique_id})"
121
122
class ApdbReplica(ABC):
    """Abstract interface for APDB replication methods."""

    @classmethod
    def from_config(cls, config: ApdbConfig) -> ApdbReplica:
        """Create ApdbReplica instance from configuration object.

        Parameters
        ----------
        config : `ApdbConfig`
            Configuration object, type of this object determines type of the
            ApdbReplica implementation.

        Returns
        -------
        replica : `ApdbReplica`
            Instance of `ApdbReplica` class.
        """
        return make_apdb_replica(config)

    @classmethod
    def from_uri(cls, uri: ResourcePathExpression) -> ApdbReplica:
        """Make ApdbReplica instance from a serialized configuration.

        Parameters
        ----------
        uri : `~lsst.resources.ResourcePathExpression`
            URI or local file path pointing to a file with serialized
            configuration, or a string with a "label:" prefix. In the latter
            case, the configuration will be looked up from an APDB index file
            using the label name that follows the prefix. The APDB index file's
            location is determined by the ``DAX_APDB_INDEX_URI`` environment
            variable.

        Returns
        -------
        replica : `ApdbReplica`
            Instance of `ApdbReplica` class, the type of the returned instance
            is determined by configuration.
        """
        config = ApdbConfig.from_uri(uri)
        return make_apdb_replica(config)

    @classmethod
    @abstractmethod
    def apdbReplicaImplementationVersion(cls) -> VersionTuple:
        """Return version number for current ApdbReplica implementation.

        Returns
        -------
        version : `VersionTuple`
            Version of the code defined in implementation class.
        """
        raise NotImplementedError()

    @abstractmethod
    def schemaVersion(self) -> VersionTuple:
        """Return version number of the database schema.

        Returns
        -------
        version : `VersionTuple`
            Version of the database schema.
        """
        raise NotImplementedError()

    @abstractmethod
    def getReplicaChunks(self) -> list[ReplicaChunk] | None:
        """Return collection of replication chunks known to the database.

        Returns
        -------
        chunks : `list` [`ReplicaChunk`] or `None`
            List of chunks, they may be time-ordered if database supports
            ordering. `None` is returned if database is not configured for
            replication.
        """
        raise NotImplementedError()

    @abstractmethod
    def deleteReplicaChunks(self, chunks: Iterable[int]) -> None:
        """Remove replication chunks from the database.

        Parameters
        ----------
        chunks : `~collections.abc.Iterable` [`int`]
            Chunk identifiers to remove.

        Notes
        -----
        This method causes Apdb to forget about specified chunks. If there
        are any auxiliary data associated with the identifiers, it is also
        removed from database (but data in regular tables is not removed).
        This method should be called after successful transfer of data from
        APDB to PPDB to free space used by replicas.
        """
        raise NotImplementedError()

    @abstractmethod
    def getTableDataChunks(self, table: ApdbTables, chunks: Iterable[int]) -> ApdbTableData:
        """Return catalog of new records for a table from given replica chunks.

        Parameters
        ----------
        table : `ApdbTables`
            Table for which to return the data. Acceptable tables are
            `ApdbTables.DiaObject`, `ApdbTables.DiaSource`, and
            `ApdbTables.DiaForcedSource`.
        chunks : `~collections.abc.Iterable` [`int`]
            Chunk identifiers to return.

        Returns
        -------
        data : `ApdbTableData`
            Catalog containing table records. In addition to all regular
            columns it will contain ``apdb_replica_chunk`` column.

        Notes
        -----
        This method returns new records that have been added to the table by
        `Apdb.store()` method. Updates to the records that happen at a later
        time are available from `getUpdateRecordChunks` method.

        This part of API may not be very stable and can change before the
        implementation finalizes.
        """
        raise NotImplementedError()

    @abstractmethod
    def getUpdateRecordChunks(self, chunks: Iterable[int]) -> Sequence[ApdbUpdateRecord]:
        """Return the list of record updates from given replica chunks.

        Parameters
        ----------
        chunks : `~collections.abc.Iterable` [`int`]
            Chunk identifiers to return.

        Returns
        -------
        records : `~collections.abc.Sequence` [`ApdbUpdateRecord`]
            Collection of update records. Records will be sorted according
            to their update time and update order.
        """
        raise NotImplementedError()
ApdbReplica from_config(cls, ApdbConfig config)
None deleteReplicaChunks(self, Iterable[int] chunks)
ApdbTableData getTableDataChunks(self, ApdbTables table, Iterable[int] chunks)
list[ReplicaChunk]|None getReplicaChunks(self)
VersionTuple apdbReplicaImplementationVersion(cls)
ApdbReplica from_uri(cls, ResourcePathExpression uri)
Sequence[ApdbUpdateRecord] getUpdateRecordChunks(self, Iterable[int] chunks)
Sequence[tuple[str, felis.datamodel.DataType]] column_defs(self)
ReplicaChunk make_replica_chunk(cls, astropy.time.Time last_update_time, int chunk_window_seconds)