Loading [MathJax]/extensions/tex2jax.js
LSST Applications g0fba68d861+83433b07ee,g16d25e1f1b+23bc9e47ac,g1ec0fe41b4+3ea9d11450,g1fd858c14a+9be2b0f3b9,g2440f9efcc+8c5ae1fdc5,g35bb328faa+8c5ae1fdc5,g4a4af6cd76+d25431c27e,g4d2262a081+c74e83464e,g53246c7159+8c5ae1fdc5,g55585698de+1e04e59700,g56a49b3a55+92a7603e7a,g60b5630c4e+1e04e59700,g67b6fd64d1+3fc8cb0b9e,g78460c75b0+7e33a9eb6d,g786e29fd12+668abc6043,g8352419a5c+8c5ae1fdc5,g8852436030+60e38ee5ff,g89139ef638+3fc8cb0b9e,g94187f82dc+1e04e59700,g989de1cb63+3fc8cb0b9e,g9d31334357+1e04e59700,g9f33ca652e+0a83e03614,gabe3b4be73+8856018cbb,gabf8522325+977d9fabaf,gb1101e3267+8b4b9c8ed7,gb89ab40317+3fc8cb0b9e,gc0af124501+57ccba3ad1,gcf25f946ba+60e38ee5ff,gd6cbbdb0b4+1cc2750d2e,gd794735e4e+7be992507c,gdb1c4ca869+be65c9c1d7,gde0f65d7ad+c7f52e58fe,ge278dab8ac+6b863515ed,ge410e46f29+3fc8cb0b9e,gf35d7ec915+97dd712d81,gf5e32f922b+8c5ae1fdc5,gf618743f1b+747388abfa,gf67bdafdda+3fc8cb0b9e,w.2025.18
LSST Data Management Base Package
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
apdbReplica.py
Go to the documentation of this file.
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22from __future__ import annotations
23
24__all__ = ["ApdbReplica", "ReplicaChunk", "ApdbTableData"]
25
26import uuid
27from abc import ABC, abstractmethod
28from collections.abc import Collection, Iterable, Sequence
29from dataclasses import dataclass
30from typing import TYPE_CHECKING
31
32import astropy.time
33from lsst.resources import ResourcePathExpression
34
35from .apdb import ApdbConfig
36from .factory import make_apdb_replica
37
38if TYPE_CHECKING:
39 from .versionTuple import VersionTuple
40
41
42class ApdbTableData(ABC):
43 """Abstract class for representing table data."""
44
45 @abstractmethod
46 def column_names(self) -> Sequence[str]:
47 """Return ordered sequence of column names in the table.
48
49 Returns
50 -------
51 names : `~collections.abc.Sequence` [`str`]
52 Column names.
53 """
54 raise NotImplementedError()
55
56 @abstractmethod
57 def rows(self) -> Collection[tuple]:
58 """Return table rows, each row is a tuple of values.
59
60 Returns
61 -------
62 rows : `~collections.abc.Collection` [`tuple`]
63 Collection of tuples.
64 """
65 raise NotImplementedError()
66
67
68@dataclass(frozen=True)
70 """Class used for identification of replication chunks.
71
72 Instances of this class are used to identify the units of transfer from
73 APDB to PPDB. Usually single `ReplicaChunk` corresponds to multiple
74 consecutive calls to `Apdb.store` method.
75
76 Every ``store`` with the same ``id`` value will update ``unique_id`` with
77 some unique value so that it can be verified on PPDB side.
78 """
79
80 id: int
81 """A number identifying replication chunk (`int`)."""
82
83 last_update_time: astropy.time.Time
84 """Time of last insert for this chunk, usually corresponds to visit time
85 (`astropy.time.Time`).
86 """
87
88 unique_id: uuid.UUID
89 """Unique value updated on each new store (`uuid.UUID`)."""
90
91 @classmethod
93 cls, last_update_time: astropy.time.Time, chunk_window_seconds: int
94 ) -> ReplicaChunk:
95 """Generate new unique insert identifier."""
96 seconds = int(last_update_time.unix_tai)
97 seconds = (seconds // chunk_window_seconds) * chunk_window_seconds
98 unique_id = uuid.uuid4()
99 return ReplicaChunk(id=seconds, last_update_time=last_update_time, unique_id=unique_id)
100
101 def __str__(self) -> str:
102 class_name = self.__class__.__name__
103 time_str = str(self.last_update_time.tai.isot)
104 return f"{class_name}(id={self.id:10d}, last_update_time={time_str}/tai, unique_id={self.unique_id})"
105
106
107class ApdbReplica(ABC):
108 """Abstract interface for APDB replication methods."""
109
110 @classmethod
111 def from_config(cls, config: ApdbConfig) -> ApdbReplica:
112 """Create ApdbReplica instance from configuration object.
113
114 Parameters
115 ----------
116 config : `ApdbConfig`
117 Configuration object, type of this object determines type of the
118 ApdbReplica implementation.
119
120 Returns
121 -------
122 replica : `ApdbReplica`
123 Instance of `ApdbReplica` class.
124 """
125 return make_apdb_replica(config)
126
127 @classmethod
128 def from_uri(cls, uri: ResourcePathExpression) -> ApdbReplica:
129 """Make ApdbReplica instance from a serialized configuration.
130
131 Parameters
132 ----------
133 uri : `~lsst.resources.ResourcePathExpression`
134 URI or local file path pointing to a file with serialized
135 configuration, or a string with a "label:" prefix. In the latter
136 case, the configuration will be looked up from an APDB index file
137 using the label name that follows the prefix. The APDB index file's
138 location is determined by the ``DAX_APDB_INDEX_URI`` environment
139 variable.
140
141 Returns
142 -------
143 replica : `ApdbReplica`
144 Instance of `ApdbReplica` class, the type of the returned instance
145 is determined by configuration.
146 """
147 config = ApdbConfig.from_uri(uri)
148 return make_apdb_replica(config)
149
150 @classmethod
151 @abstractmethod
152 def apdbReplicaImplementationVersion(cls) -> VersionTuple:
153 """Return version number for current ApdbReplica implementation.
154
155 Returns
156 -------
157 version : `VersionTuple`
158 Version of the code defined in implementation class.
159 """
160 raise NotImplementedError()
161
162 @abstractmethod
163 def getReplicaChunks(self) -> list[ReplicaChunk] | None:
164 """Return collection of replication chunks known to the database.
165
166 Returns
167 -------
168 chunks : `list` [`ReplicaChunk`] or `None`
169 List of chunks, they may be time-ordered if database supports
170 ordering. `None` is returned if database is not configured for
171 replication.
172 """
173 raise NotImplementedError()
174
175 @abstractmethod
176 def deleteReplicaChunks(self, chunks: Iterable[int]) -> None:
177 """Remove replication chunks from the database.
178
179 Parameters
180 ----------
181 chunks : `~collections.abc.Iterable` [`int`]
182 Chunk identifiers to remove.
183
184 Notes
185 -----
186 This method causes Apdb to forget about specified chunks. If there
187 are any auxiliary data associated with the identifiers, it is also
188 removed from database (but data in regular tables is not removed).
189 This method should be called after successful transfer of data from
190 APDB to PPDB to free space used by replicas.
191 """
192 raise NotImplementedError()
193
194 @abstractmethod
195 def getDiaObjectsChunks(self, chunks: Iterable[int]) -> ApdbTableData:
196 """Return catalog of DiaObject records from given replica chunks.
197
198 Parameters
199 ----------
200 chunks : `~collections.abc.Iterable` [`int`]
201 Chunk identifiers to return.
202
203 Returns
204 -------
205 data : `ApdbTableData`
206 Catalog containing DiaObject records. In addition to all regular
207 columns it will contain ``apdb_replica_chunk`` column.
208
209 Notes
210 -----
211 This part of API may not be very stable and can change before the
212 implementation finalizes.
213 """
214 raise NotImplementedError()
215
216 @abstractmethod
217 def getDiaSourcesChunks(self, chunks: Iterable[int]) -> ApdbTableData:
218 """Return catalog of DiaSource records from given replica chunks.
219
220 Parameters
221 ----------
222 chunks : `~collections.abc.Iterable` [`int`]
223 Chunk identifiers to return.
224
225 Returns
226 -------
227 data : `ApdbTableData`
228 Catalog containing DiaSource records. In addition to all regular
229 columns it will contain ``apdb_replica_chunk`` column.
230
231 Notes
232 -----
233 This part of API may not be very stable and can change before the
234 implementation finalizes.
235 """
236 raise NotImplementedError()
237
238 @abstractmethod
239 def getDiaForcedSourcesChunks(self, chunks: Iterable[int]) -> ApdbTableData:
240 """Return catalog of DiaForcedSource records from given replica chunks.
241
242 Parameters
243 ----------
244 chunks : `~collections.abc.Iterable` [`int`]
245 Chunk identifiers to return.
246
247 Returns
248 -------
249 data : `ApdbTableData`
250 Catalog containing DiaForcedSource records. In addition to all
251 regular columns it will contain ``apdb_replica_chunk`` column.
252
253 Notes
254 -----
255 This part of API may not be very stable and can change before the
256 implementation finalizes.
257 """
258 raise NotImplementedError()
ApdbReplica from_config(cls, ApdbConfig config)
None deleteReplicaChunks(self, Iterable[int] chunks)
ApdbTableData getDiaObjectsChunks(self, Iterable[int] chunks)
list[ReplicaChunk]|None getReplicaChunks(self)
VersionTuple apdbReplicaImplementationVersion(cls)
ApdbReplica from_uri(cls, ResourcePathExpression uri)
ApdbTableData getDiaSourcesChunks(self, Iterable[int] chunks)
ApdbTableData getDiaForcedSourcesChunks(self, Iterable[int] chunks)
ReplicaChunk make_replica_chunk(cls, astropy.time.Time last_update_time, int chunk_window_seconds)