LSST Applications g013ef56533+63812263fb,g083dd6704c+a047e97985,g199a45376c+0ba108daf9,g1fd858c14a+fde7a7a78c,g210f2d0738+db0c280453,g262e1987ae+abed931625,g29ae962dfc+058d1915d8,g2cef7863aa+aef1011c0b,g35bb328faa+8c5ae1fdc5,g3fd5ace14f+64337f1634,g47891489e3+f459a6810c,g53246c7159+8c5ae1fdc5,g54cd7ddccb+890c8e1e5d,g5a60e81ecd+d9e514a434,g64539dfbff+db0c280453,g67b6fd64d1+f459a6810c,g6ebf1fc0d4+8c5ae1fdc5,g7382096ae9+36d16ea71a,g74acd417e5+c70e70fbf6,g786e29fd12+668abc6043,g87389fa792+8856018cbb,g89139ef638+f459a6810c,g8d7436a09f+1b779678e3,g8ea07a8fe4+81eaaadc04,g90f42f885a+34c0557caf,g97be763408+9583a964dd,g98a1a72a9c+028271c396,g98df359435+530b675b85,gb8cb2b794d+4e54f68785,gbf99507273+8c5ae1fdc5,gc2a301910b+db0c280453,gca7fc764a6+f459a6810c,gd7ef33dd92+f459a6810c,gdab6d2f7ff+c70e70fbf6,ge410e46f29+f459a6810c,ge41e95a9f2+db0c280453,geaed405ab2+e3b4b2a692,gf9a733ac38+8c5ae1fdc5,w.2025.43
LSST Data Management Base Package
Loading...
Searching...
No Matches
apdbCassandraReplica.py
Go to the documentation of this file.
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22from __future__ import annotations
23
24__all__ = ["ApdbCassandraReplica"]
25
26import logging
27from collections.abc import Iterable, Mapping, Sequence
28from typing import TYPE_CHECKING, cast
29
30import astropy.time
31import felis.datamodel
32
33from ..apdbReplica import ApdbReplica, ApdbTableData, ReplicaChunk
34from ..apdbSchema import ApdbTables
35from ..apdbUpdateRecord import ApdbUpdateRecord
36from ..monitor import MonAgent
37from ..schema_model import ExtraDataTypes
38from ..timer import Timer
39from ..versionTuple import VersionTuple
40from .apdbCassandraSchema import ExtraTables
41from .cassandra_utils import (
42 ApdbCassandraTableData,
43 execute_concurrent,
44 select_concurrent,
45)
46
47if TYPE_CHECKING:
48 from .apdbCassandra import ApdbCassandra
49
50_LOG = logging.getLogger(__name__)
51
52_MON = MonAgent(__name__)
53
54VERSION = VersionTuple(1, 1, 1)
55"""Version for the code controlling replication tables. This needs to be
56updated following compatibility rules when schema produced by this code
57changes.
58"""
59
60
62 """Implementation of `ApdbReplica` for Cassandra backend.
63
64 Parameters
65 ----------
66 apdb : `ApdbCassandra`
 67    Instance of ApdbCassandra for database.
68 """
69
70 def __init__(self, apdb: ApdbCassandra):
71 # Note that ApdbCassandra instance must stay alive while this object
72 # exists, so we keep reference to it.
73 self._apdb = apdb
74
75 def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
76 """Create `Timer` instance given its name."""
77 return Timer(name, _MON, tags=tags)
78
79 def schemaVersion(self) -> VersionTuple:
80 # Docstring inherited from base class.
81 context = self._apdb._context
82 return context.db_versions.schema_version
83
    @classmethod
    def apdbReplicaImplementationVersion(cls) -> VersionTuple:
        # Docstring inherited from base class.
        # Returns the module-level VERSION constant, which tracks the
        # replication table schema produced by this code.
        return VERSION
88
89 @classmethod
90 def hasChunkSubPartitions(cls, version: VersionTuple) -> bool:
91 """Return True if replica chunk tables have sub-partitions."""
92 return version >= VersionTuple(1, 1, 0)
93
94 @classmethod
95 def hasUpdateRecordChunks(cls, version: VersionTuple) -> bool:
96 """Return True if ApdbUpdateRecordChunks should exists."""
97 return version >= VersionTuple(1, 1, 1)
98
99 def getReplicaChunks(self) -> list[ReplicaChunk] | None:
100 # docstring is inherited from a base class
101 context = self._apdb._context
102 config = context.config
103
104 if not context.schema.replication_enabled:
105 return None
106
107 # everything goes into a single partition
108 partition = 0
109
110 table_name = context.schema.tableName(ExtraTables.ApdbReplicaChunks)
111 # We want to avoid timezone mess so return timestamps as milliseconds.
112 query = (
113 "SELECT toUnixTimestamp(last_update_time), apdb_replica_chunk, unique_id "
114 f'FROM "{config.keyspace}"."{table_name}" WHERE partition = %s'
115 )
116
117 with self._timer("chunks_select_time") as timer:
118 result = context.session.execute(
119 query,
120 (partition,),
121 timeout=config.connection_config.read_timeout,
122 execution_profile="read_tuples",
123 )
124 # order by last_update_time
125 rows = sorted(result)
126 timer.add_values(row_count=len(rows))
127 return [
129 id=row[1],
130 last_update_time=astropy.time.Time(row[0] / 1000, format="unix_tai"),
131 unique_id=row[2],
132 )
133 for row in rows
134 ]
135
    def deleteReplicaChunks(self, chunks: Iterable[int]) -> None:
        # docstring is inherited from a base class
        context = self._apdb._context
        config = context.config

        if not context.schema.replication_enabled:
            raise ValueError("APDB is not configured for replication")

        # Everything goes into a single partition.
        partition = 0

        # Iterable can be single pass, make everything that we need from it
        # in a single loop.
        repl_table_params = []
        chunk_table_params: list[tuple] = []
        for chunk in chunks:
            repl_table_params.append((partition, chunk))
            if context.has_chunk_sub_partitions:
                # Sub-partitioned chunk tables need one delete per
                # (chunk, subchunk) pair.
                for subchunk in range(config.replica_sub_chunk_count):
                    chunk_table_params.append((chunk, subchunk))
            else:
                chunk_table_params.append((chunk,))
        # Anything to do at all?
        if not repl_table_params:
            return

        # First delete the chunk records themselves from ApdbReplicaChunks.
        table_name = context.schema.tableName(ExtraTables.ApdbReplicaChunks)
        query = (
            f'DELETE FROM "{config.keyspace}"."{table_name}" WHERE partition = ? AND apdb_replica_chunk = ?'
        )
        statement = context.preparer.prepare(query)

        queries = [(statement, param) for param in repl_table_params]
        with self._timer("chunks_delete_time") as timer:
            execute_concurrent(context.session, queries)
            timer.add_values(row_count=len(queries))

        # Also remove those chunk_ids from Dia*Chunks tables.
        tables = list(ExtraTables.replica_chunk_tables(context.has_chunk_sub_partitions).values())
        if context.has_update_record_chunks_table:
            tables.append(ExtraTables.ApdbUpdateRecordChunks)
        for table in tables:
            table_name = context.schema.tableName(table)
            query = f'DELETE FROM "{config.keyspace}"."{table_name}" WHERE apdb_replica_chunk = ?'
            if context.has_chunk_sub_partitions:
                query += " AND apdb_replica_subchunk = ?"
            statement = context.preparer.prepare(query)

            queries = [(statement, param) for param in chunk_table_params]
            # NOTE(review): timer name "table_chunk_detele_time" is
            # misspelled ("detele" vs "delete"); kept as-is because renaming
            # would change emitted metric names — confirm with monitoring
            # consumers before fixing.
            with self._timer("table_chunk_detele_time", tags={"table": table_name}) as timer:
                execute_concurrent(context.session, queries)
                timer.add_values(row_count=len(queries))
188
    def getTableDataChunks(self, table: ApdbTables, chunks: Iterable[int]) -> ApdbTableData:
        # docstring is inherited from a base class
        context = self._apdb._context
        config = context.config

        if not context.schema.replication_enabled:
            raise ValueError("APDB is not configured for replication")
        if table not in ExtraTables.replica_chunk_tables(False):
            raise ValueError(f"Table {table} does not support replica chunks.")

        # We need to iterate few times.
        chunks = list(chunks)

        # If schema was migrated then a chunk can appear in either old or new
        # chunk table (e.g. DiaObjectChunks or DiaObjectChunks2). Chunk table
        # has a column which will be set to true for new table.
        has_chunk_sub_partitions: dict[int, bool] = {}
        if context.has_chunk_sub_partitions:
            # Ask ApdbReplicaChunks which of the requested chunks were
            # written with sub-partitioning.
            table_name = context.schema.tableName(ExtraTables.ApdbReplicaChunks)
            chunks_str = ",".join(str(chunk_id) for chunk_id in chunks)
            query = (
                f'SELECT apdb_replica_chunk, has_subchunks FROM "{config.keyspace}"."{table_name}" '
                f"WHERE partition = %s and apdb_replica_chunk IN ({chunks_str})"
            )
            partition = 0
            result = context.session.execute(
                query,
                (partition,),
                timeout=config.connection_config.read_timeout,
                execution_profile="read_tuples",
            )
            # Rows are (apdb_replica_chunk, has_subchunks) pairs.
            has_chunk_sub_partitions = dict(result)
        else:
            has_chunk_sub_partitions = dict.fromkeys(chunks, False)

        # Check what kind of tables we want to query, if chunk list is empty
        # then use tables which should exist in the schema.
        if has_chunk_sub_partitions:
            have_subchunks = any(has_chunk_sub_partitions.values())
            have_non_subchunks = not all(has_chunk_sub_partitions.values())
        else:
            have_subchunks = context.has_chunk_sub_partitions
            have_non_subchunks = not have_subchunks

        # NOTE: if an existing database is migrated and has both types of chunk
        # tables (e.g. DiaObjectChunks and DiaObjectChunks2) it is possible
        # that the same chunk can appear in both tables. In reality schema
        # migration should only happen during the downtime, so there will be
        # sufficient gap and a different chunk ID will be used for new chunks.

        # Results from the sub-partitioned and non-sub-partitioned tables are
        # collected separately and merged at the end.
        table_data: ApdbCassandraTableData | None = None
        table_data_subchunk: ApdbCassandraTableData | None = None

        table_name = context.schema.tableName(ExtraTables.replica_chunk_tables(False)[table])
        with self._timer("table_chunk_select_time", tags={"table": table_name}) as timer:
            if have_subchunks:
                replica_table = ExtraTables.replica_chunk_tables(True)[table]
                table_name = context.schema.tableName(replica_table)
                query = (
                    f'SELECT * FROM "{config.keyspace}"."{table_name}" '
                    "WHERE apdb_replica_chunk = ? AND apdb_replica_subchunk = ?"
                )
                statement = context.preparer.prepare(query)

                # One query per (chunk, subchunk) pair, executed concurrently.
                queries: list[tuple] = []
                for chunk in chunks:
                    if has_chunk_sub_partitions.get(chunk, False):
                        for subchunk in range(config.replica_sub_chunk_count):
                            queries.append((statement, (chunk, subchunk)))
                if not queries and not have_non_subchunks:
                    # Add a dummy query to return correct set of columns.
                    queries.append((statement, (-1, -1)))

                if queries:
                    table_data_subchunk = cast(
                        ApdbCassandraTableData,
                        select_concurrent(
                            context.session,
                            queries,
                            "read_raw_multi",
                            config.connection_config.read_concurrency,
                        ),
                    )

            if have_non_subchunks:
                replica_table = ExtraTables.replica_chunk_tables(False)[table]
                table_name = context.schema.tableName(replica_table)
                query = f'SELECT * FROM "{config.keyspace}"."{table_name}" WHERE apdb_replica_chunk = ?'
                statement = context.preparer.prepare(query)

                # One query per chunk that was not sub-partitioned. Chunks
                # missing from has_chunk_sub_partitions default to True here,
                # so they are intentionally skipped in this branch.
                queries = []
                for chunk in chunks:
                    if not has_chunk_sub_partitions.get(chunk, True):
                        queries.append((statement, (chunk,)))
                if not queries and not table_data_subchunk:
                    # Add a dummy query to return correct set of columns.
                    queries.append((statement, (-1,)))

                if queries:
                    table_data = cast(
                        ApdbCassandraTableData,
                        select_concurrent(
                            context.session,
                            queries,
                            "read_raw_multi",
                            config.connection_config.read_concurrency,
                        ),
                    )

            # Merge if both are non-empty.
            if table_data and table_data_subchunk:
                # Drop the subchunk column so the two column sets match
                # before appending.
                table_data_subchunk.project(drop=["apdb_replica_subchunk"])
                table_data.append(table_data_subchunk)
            elif table_data_subchunk:
                table_data = table_data_subchunk
            elif not table_data:
                # The dummy-query logic above guarantees at least one result.
                raise AssertionError("above logic is incorrect")

            timer.add_values(row_count=len(table_data.rows()))

        table_schema = self._apdb._schema.tableSchemas[table]
        # Regular tables should never have columns of ExtraDataTypes, this
        # is just to make mypy happy.
        column_types = {
            column.name: column.datatype
            for column in table_schema.columns
            if not isinstance(column.datatype, ExtraDataTypes)
        }
        column_types["apdb_replica_chunk"] = felis.datamodel.DataType.long
        # It may also have subchunk column, we do not always drop it, and
        # clients should not need it, but we need to provide type for it.
        column_types["apdb_replica_subchunk"] = felis.datamodel.DataType.int
        table_data.set_column_types(column_types)

        return table_data
324
    def getUpdateRecordChunks(self, chunks: Iterable[int]) -> Sequence[ApdbUpdateRecord]:
        # docstring is inherited from a base class
        context = self._apdb._context
        config = context.config

        if not context.schema.replication_enabled:
            raise ValueError("APDB is not configured for replication")

        table_name = context.schema.tableName(ExtraTables.ApdbUpdateRecordChunks)

        records = []
        if context.has_chunk_sub_partitions:
            # Sub-partitioned table: query all subchunks of each chunk with
            # an IN list, one execute per chunk.
            subchunks = ",".join(str(val) for val in range(config.replica_sub_chunk_count))
            query = (
                f'SELECT * FROM "{config.keyspace}"."{table_name}" '
                f"WHERE apdb_replica_chunk = %s AND apdb_replica_subchunk IN ({subchunks})"
            )

            with self._timer("select_update_record_time", tags={"table": table_name}) as timer:
                for chunk in chunks:
                    result = context.session.execute(query, [chunk])
                    for row in result:
                        records.append(
                            ApdbUpdateRecord.from_json(
                                row.update_time_ns, row.update_order, row.update_payload
                            )
                        )
                timer.add_values(row_count=len(records))

        else:
            # Non-sub-partitioned table: a single query with all chunk IDs
            # in an IN list.
            chunks_str = ",".join(str(val) for val in chunks)
            query = (
                f'SELECT * FROM "{config.keyspace}"."{table_name}" WHERE apdb_replica_chunk IN ({chunks_str})'
            )

            with self._timer("select_update_record_time", tags={"table": table_name}) as timer:
                result = context.session.execute(query)
                for row in result:
                    records.append(
                        ApdbUpdateRecord.from_json(row.update_time_ns, row.update_order, row.update_payload)
                    )
                timer.add_values(row_count=len(records))

        # Sort before returning — presumably ApdbUpdateRecord orders by
        # (update_time_ns, update_order); confirm against its definition.
        records.sort()
        return records
ApdbTableData getTableDataChunks(self, ApdbTables table, Iterable[int] chunks)
Timer _timer(self, str name, *, Mapping[str, str|int]|None tags=None)
Sequence[ApdbUpdateRecord] getUpdateRecordChunks(self, Iterable[int] chunks)