Loading [MathJax]/extensions/tex2jax.js
LSST Applications g0fba68d861+b562e0a09f,g1ec0fe41b4+3ea9d11450,g1fd858c14a+9be2b0f3b9,g2440f9efcc+8c5ae1fdc5,g33b6eb7922+23bc9e47ac,g35bb328faa+8c5ae1fdc5,g4a4af6cd76+d25431c27e,g4d2262a081+e64e5ff751,g53246c7159+8c5ae1fdc5,g55585698de+be1c65ba71,g56a49b3a55+92a7603e7a,g60b5630c4e+be1c65ba71,g67b6fd64d1+3fc8cb0b9e,g78460c75b0+7e33a9eb6d,g786e29fd12+668abc6043,g8352419a5c+8c5ae1fdc5,g8852436030+60e38ee5ff,g89139ef638+3fc8cb0b9e,g94187f82dc+be1c65ba71,g989de1cb63+3fc8cb0b9e,g9d31334357+be1c65ba71,g9f33ca652e+69d6bbdd4b,gabe3b4be73+8856018cbb,gabf8522325+977d9fabaf,gb1101e3267+b0077987df,gb89ab40317+3fc8cb0b9e,gc91f06edcd+2e2ca305f6,gcf25f946ba+60e38ee5ff,gd6cbbdb0b4+1cc2750d2e,gdb1c4ca869+be65c9c1d7,gde0f65d7ad+b038c5c67d,ge278dab8ac+6b863515ed,ge410e46f29+3fc8cb0b9e,geb5476ad96+a886b35a30,gf35d7ec915+97dd712d81,gf5e32f922b+8c5ae1fdc5,gf618743f1b+3164b09b60,gf67bdafdda+3fc8cb0b9e,w.2025.18
LSST Data Management Base Package
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
apdbCassandraReplica.py
Go to the documentation of this file.
# This file is part of dax_apdb.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22from __future__ import annotations
23
# Names exported by ``from <module> import *``; only the replica class is
# public.
__all__ = [
    "ApdbCassandraReplica",
]
25
26import logging
27from collections.abc import Iterable, Mapping
28from typing import TYPE_CHECKING, Any, cast
29
30import astropy.time
31
32from ..apdbReplica import ApdbReplica, ApdbTableData, ReplicaChunk
33from ..apdbSchema import ApdbTables
34from ..monitor import MonAgent
35from ..timer import Timer
36from ..versionTuple import VersionTuple
37from .apdbCassandraSchema import ApdbCassandraSchema, ExtraTables
38from .cassandra_utils import (
39 ApdbCassandraTableData,
40 PreparedStatementCache,
41 execute_concurrent,
42 select_concurrent,
43)
44
45if TYPE_CHECKING:
46 from .apdbCassandra import ApdbCassandra
47
# Logger named after this module.
_LOG: logging.Logger = logging.getLogger(__name__)

# Monitoring agent handed to the `Timer` objects created in this module.
_MON: MonAgent = MonAgent(__name__)

VERSION: VersionTuple = VersionTuple(1, 1, 0)
"""Version of the code controlling replication tables. It must be updated,
following compatibility rules, whenever the schema produced by this code
changes.
"""
57
class ApdbCassandraReplica(ApdbReplica):
    """Implementation of `ApdbReplica` for Cassandra backend.

    Parameters
    ----------
    apdb : `ApdbCassandra`
        Instance of ApdbCassandra for database.
    schema : `ApdbCassandraSchema`
        Instance of ApdbCassandraSchema for database.
    session
        Instance of cassandra session type.
    """

    def __init__(self, apdb: ApdbCassandra, schema: ApdbCassandraSchema, session: Any):
        # Note that ApdbCassandra instance must stay alive while this object
        # exists, so we keep reference to it.
        self._apdb = apdb
        self._schema = schema
        self._session = session
        self._config = apdb.config

        # Cache for prepared statements; ``self._preparer.prepare()`` is used
        # by the DELETE/SELECT statements built in the methods below.
        self._preparer = PreparedStatementCache(self._session)

    def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
        """Create `Timer` instance given its name.

        Parameters
        ----------
        name : `str`
            Timer name.
        tags : `~collections.abc.Mapping` [`str`, `str` | `int`], optional
            Extra tags attached to the timer.

        Returns
        -------
        timer : `Timer`
            New timer bound to the module monitoring agent; callers use it as
            a context manager.
        """
        return Timer(name, _MON, tags=tags)

    @classmethod
    def apdbReplicaImplementationVersion(cls) -> VersionTuple:
        # Docstring inherited from base class.
        return VERSION

    @classmethod
    def hasChunkSubPartitions(cls, version: VersionTuple) -> bool:
        """Return True if replica chunk tables have sub-partitions.

        Parameters
        ----------
        version : `VersionTuple`
            Replica schema version to check.

        Returns
        -------
        has_sub_partitions : `bool`
            `True` for versions 1.1.0 and later.
        """
        return version >= VersionTuple(1, 1, 0)

    def getReplicaChunks(self) -> list[ReplicaChunk] | None:
        # docstring is inherited from a base class
        if not self._schema.replication_enabled:
            return None

        # everything goes into a single partition
        partition = 0

        table_name = self._schema.tableName(ExtraTables.ApdbReplicaChunks)
        # We want to avoid timezone mess so return timestamps as milliseconds.
        query = (
            "SELECT toUnixTimestamp(last_update_time), apdb_replica_chunk, unique_id "
            f'FROM "{self._config.keyspace}"."{table_name}" WHERE partition = %s'
        )

        with self._timer("chunks_select_time") as timer:
            result = self._session.execute(
                query,
                (partition,),
                timeout=self._config.connection_config.read_timeout,
                execution_profile="read_tuples",
            )
            # Order by last_update_time (first element of each row tuple).
            rows = sorted(result)
            timer.add_values(row_count=len(rows))
        return [
            ReplicaChunk(
                id=row[1],
                # Timestamps were selected as milliseconds, convert to seconds.
                last_update_time=astropy.time.Time(row[0] / 1000, format="unix_tai"),
                unique_id=row[2],
            )
            for row in rows
        ]

    def deleteReplicaChunks(self, chunks: Iterable[int]) -> None:
        # docstring is inherited from a base class
        if not self._schema.replication_enabled:
            raise ValueError("APDB is not configured for replication")

        # everything goes into a single partition
        partition = 0

        # Iterable can be single pass, make everything that we need from it
        # in a single loop.
        repl_table_params = []
        chunk_table_params: list[tuple] = []
        for chunk in chunks:
            repl_table_params.append((partition, chunk))
            if self._schema.has_chunk_sub_partitions:
                # Each chunk is spread over all sub-chunks, delete every one.
                for subchunk in range(self._config.replica_sub_chunk_count):
                    chunk_table_params.append((chunk, subchunk))
            else:
                chunk_table_params.append((chunk,))
        # Anything to do at all?
        if not repl_table_params:
            return

        table_name = self._schema.tableName(ExtraTables.ApdbReplicaChunks)
        query = (
            f'DELETE FROM "{self._config.keyspace}"."{table_name}" '
            f"WHERE partition = ? AND apdb_replica_chunk = ?"
        )
        statement = self._preparer.prepare(query)

        queries = [(statement, param) for param in repl_table_params]
        with self._timer("chunks_delete_time") as timer:
            execute_concurrent(self._session, queries)
            timer.add_values(row_count=len(queries))

        # Also remove those chunk_ids from Dia*Chunks tables.
        # NOTE(review): only the chunk tables matching the current schema
        # flavour are cleaned; after a migration, rows of old chunks stored in
        # pre-migration tables (which ``_get_chunks`` can still read) are
        # presumably left in place - confirm this is intended.
        tables = list(ExtraTables.replica_chunk_tables(self._schema.has_chunk_sub_partitions).values())
        for table in tables:
            table_name = self._schema.tableName(table)
            query = f'DELETE FROM "{self._config.keyspace}"."{table_name}" WHERE apdb_replica_chunk = ?'
            if self._schema.has_chunk_sub_partitions:
                query += " AND apdb_replica_subchunk = ?"
            statement = self._preparer.prepare(query)

            queries = [(statement, param) for param in chunk_table_params]
            # NOTE: metric name is misspelled ("detele") but kept unchanged,
            # monitoring consumers may depend on it.
            with self._timer("table_chunk_detele_time", tags={"table": table_name}) as timer:
                execute_concurrent(self._session, queries)
                timer.add_values(row_count=len(queries))

    def getDiaObjectsChunks(self, chunks: Iterable[int]) -> ApdbTableData:
        # docstring is inherited from a base class
        return self._get_chunks(ApdbTables.DiaObject, chunks)

    def getDiaSourcesChunks(self, chunks: Iterable[int]) -> ApdbTableData:
        # docstring is inherited from a base class
        return self._get_chunks(ApdbTables.DiaSource, chunks)

    def getDiaForcedSourcesChunks(self, chunks: Iterable[int]) -> ApdbTableData:
        # docstring is inherited from a base class
        return self._get_chunks(ApdbTables.DiaForcedSource, chunks)

    def _get_chunks(self, table: ApdbTables, chunks: Iterable[int]) -> ApdbTableData:
        """Return records from a particular table given set of replica chunk
        IDs.

        Parameters
        ----------
        table : `ApdbTables`
            Table whose replica chunk data is returned; used as a key into
            ``ExtraTables.replica_chunk_tables()``.
        chunks : `~collections.abc.Iterable` [`int`]
            Replica chunk IDs, may be a single-pass iterable.

        Returns
        -------
        data : `ApdbTableData`
            Records for the requested chunks. When the schema has chunk
            sub-partitions, chunk IDs missing from the ``ApdbReplicaChunks``
            table are ignored. For an empty result the data still carries the
            table columns.

        Raises
        ------
        ValueError
            Raised if APDB is not configured for replication.
        """
        if not self._schema.replication_enabled:
            raise ValueError("APDB is not configured for replication")

        # We need to iterate few times.
        chunks = list(chunks)

        # If schema was migrated then a chunk can appear in either old or new
        # chunk table (e.g. DiaObjectChunks or DiaObjectChunks2). Chunk table
        # has a column which will be set to true for new table.
        has_chunk_sub_partitions: dict[int, bool] = {}
        if self._schema.has_chunk_sub_partitions:
            # Nothing to look up for an empty chunk list; querying would only
            # produce an ``IN ()`` clause and a useless round-trip.
            if chunks:
                table_name = self._schema.tableName(ExtraTables.ApdbReplicaChunks)
                chunks_str = ",".join(str(chunk_id) for chunk_id in chunks)
                query = (
                    f'SELECT apdb_replica_chunk, has_subchunks FROM "{self._config.keyspace}"."{table_name}" '
                    f"WHERE partition = %s and apdb_replica_chunk IN ({chunks_str})"
                )
                # everything goes into a single partition
                partition = 0
                result = self._session.execute(
                    query,
                    (partition,),
                    timeout=self._config.connection_config.read_timeout,
                    execution_profile="read_tuples",
                )
                has_chunk_sub_partitions = {chunk_id: has_subchunk for chunk_id, has_subchunk in result}
        else:
            has_chunk_sub_partitions = {chunk_id: False for chunk_id in chunks}

        # Check what kind of tables we want to query. If no chunks are known
        # (empty chunk list or none found) then use tables which should exist
        # in the current schema.
        if has_chunk_sub_partitions:
            have_subchunks = any(has_chunk_sub_partitions.values())
            have_non_subchunks = not all(has_chunk_sub_partitions.values())
        else:
            have_subchunks = self._schema.has_chunk_sub_partitions
            have_non_subchunks = not have_subchunks

        # NOTE: if an existing database is migrated and has both types of chunk
        # tables (e.g. DiaObjectChunks and DiaObjectChunks2) it is possible
        # that the same chunk can appear in both tables. In reality schema
        # migration should only happen during the downtime, so there will be
        # sufficient gap and a different chunk ID will be used for new chunks.

        table_data: ApdbCassandraTableData | None = None
        table_data_subchunk: ApdbCassandraTableData | None = None

        table_name = self._schema.tableName(ExtraTables.replica_chunk_tables(False)[table])
        with self._timer("table_chunk_select_time", tags={"table": table_name}) as timer:
            if have_subchunks:
                replica_table = ExtraTables.replica_chunk_tables(True)[table]
                table_name = self._schema.tableName(replica_table)
                query = (
                    f'SELECT * FROM "{self._config.keyspace}"."{table_name}" '
                    "WHERE apdb_replica_chunk = ? AND apdb_replica_subchunk = ?"
                )
                statement = self._preparer.prepare(query)

                queries: list[tuple] = []
                for chunk in chunks:
                    if has_chunk_sub_partitions.get(chunk, False):
                        for subchunk in range(self._config.replica_sub_chunk_count):
                            queries.append((statement, (chunk, subchunk)))
                if not queries and not have_non_subchunks:
                    # Add a dummy query to return correct set of columns.
                    queries.append((statement, (-1, -1)))

                if queries:
                    table_data_subchunk = cast(
                        ApdbCassandraTableData,
                        select_concurrent(
                            self._session,
                            queries,
                            "read_raw_multi",
                            self._config.connection_config.read_concurrency,
                        ),
                    )

            if have_non_subchunks:
                replica_table = ExtraTables.replica_chunk_tables(False)[table]
                table_name = self._schema.tableName(replica_table)
                query = f'SELECT * FROM "{self._config.keyspace}"."{table_name}" WHERE apdb_replica_chunk = ?'
                statement = self._preparer.prepare(query)

                queries = []
                for chunk in chunks:
                    if not has_chunk_sub_partitions.get(chunk, True):
                        queries.append((statement, (chunk,)))
                # Use an explicit None check: an empty sub-chunk result is
                # still valid data and already defines the columns.
                if not queries and table_data_subchunk is None:
                    # Add a dummy query to return correct set of columns.
                    queries.append((statement, (-1,)))

                if queries:
                    table_data = cast(
                        ApdbCassandraTableData,
                        select_concurrent(
                            self._session,
                            queries,
                            "read_raw_multi",
                            self._config.connection_config.read_concurrency,
                        ),
                    )

            # Merge if both kinds of tables were queried. Explicit None checks
            # so that a query returning zero rows is not mistaken for a query
            # that was never run.
            if table_data is not None and table_data_subchunk is not None:
                table_data_subchunk.project(drop=["apdb_replica_subchunk"])
                table_data.append(table_data_subchunk)
            elif table_data_subchunk is not None:
                # NOTE(review): unlike the merged case above, this result keeps
                # the apdb_replica_subchunk column - confirm consumers expect it.
                table_data = table_data_subchunk
            elif table_data is None:
                raise AssertionError("above logic is incorrect")

            timer.add_values(row_count=len(table_data.rows()))

        return table_data
__init__(self, ApdbCassandra apdb, ApdbCassandraSchema schema, Any session)
Timer _timer(self, str name, *, Mapping[str, str|int]|None tags=None)
ApdbTableData _get_chunks(self, ApdbTables table, Iterable[int] chunks)