LSST Data Management Base Package
apdbCassandraReplica.py
# This file is part of dax_apdb.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ["ApdbCassandraReplica"]

import logging
from collections.abc import Iterable, Mapping
from typing import TYPE_CHECKING, cast

import astropy.time
import felis.datamodel

from ..apdbReplica import ApdbReplica, ApdbTableData, ReplicaChunk
from ..apdbSchema import ApdbTables
from ..monitor import MonAgent
from ..schema_model import ExtraDataTypes
from ..timer import Timer
from ..versionTuple import VersionTuple
from .apdbCassandraSchema import ExtraTables
from .cassandra_utils import (
    ApdbCassandraTableData,
    execute_concurrent,
    select_concurrent,
)

if TYPE_CHECKING:
    from .apdbCassandra import ApdbCassandra

_LOG = logging.getLogger(__name__)

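# Monitoring agent passed to `Timer` by the `_timer` method below.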
_MON = MonAgent(__name__)

VERSION = VersionTuple(1, 1, 0)
"""Version for the code controlling replication tables. This needs to be
updated following compatibility rules when the schema produced by this code
changes.
"""


class ApdbCassandraReplica(ApdbReplica):
    """Implementation of `ApdbReplica` for Cassandra backend.

    Parameters
    ----------
    apdb : `ApdbCassandra`
        Instance of ApdbCassandra for database.
    """

    def __init__(self, apdb: ApdbCassandra):
        # Note that the ApdbCassandra instance must stay alive while this
        # object exists, so we keep a reference to it.
        self._apdb = apdb

    def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
        """Create `Timer` instance given its name."""
        return Timer(name, _MON, tags=tags)

    def schemaVersion(self) -> VersionTuple:
        # Docstring inherited from base class.
        context = self._apdb._context
        return context.db_versions.schema_version

    @classmethod
    def apdbReplicaImplementationVersion(cls) -> VersionTuple:
        # Docstring inherited from base class.
        return VERSION

    @classmethod
    def hasChunkSubPartitions(cls, version: VersionTuple) -> bool:
        """Return True if replica chunk tables have sub-partitions."""
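        # Sub-partitioned chunk tables were introduced in schema version 1.1.0.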
        return version >= VersionTuple(1, 1, 0)

    def getReplicaChunks(self) -> list[ReplicaChunk] | None:
        # docstring is inherited from a base class
        context = self._apdb._context
        config = context.config

        if not context.schema.replication_enabled:
            return None

        # everything goes into a single partition
        partition = 0

        table_name = context.schema.tableName(ExtraTables.ApdbReplicaChunks)
        # We want to avoid timezone mess so return timestamps as milliseconds.
        query = (
            "SELECT toUnixTimestamp(last_update_time), apdb_replica_chunk, unique_id "
            f'FROM "{config.keyspace}"."{table_name}" WHERE partition = %s'
        )

        with self._timer("chunks_select_time") as timer:
            result = context.session.execute(
                query,
                (partition,),
                timeout=config.connection_config.read_timeout,
                execution_profile="read_tuples",
            )
            # order by last_update_time
            rows = sorted(result)
            timer.add_values(row_count=len(rows))
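        # Timestamps are returned as TAI milliseconds; convert to seconds
        # for astropy.time.Time with the "unix_tai" format.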
        return [
            ReplicaChunk(
                id=row[1],
                last_update_time=astropy.time.Time(row[0] / 1000, format="unix_tai"),
                unique_id=row[2],
            )
            for row in rows
        ]

    def deleteReplicaChunks(self, chunks: Iterable[int]) -> None:
        # docstring is inherited from a base class
        context = self._apdb._context
        config = context.config

        if not context.schema.replication_enabled:
            raise ValueError("APDB is not configured for replication")

        # everything goes into a single partition
        partition = 0

        # The iterable can be single-pass; build everything we need from it
        # in a single loop.
        repl_table_params = []
        chunk_table_params: list[tuple] = []
        for chunk in chunks:
            repl_table_params.append((partition, chunk))
            if context.schema.has_chunk_sub_partitions:
                for subchunk in range(config.replica_sub_chunk_count):
                    chunk_table_params.append((chunk, subchunk))
            else:
                chunk_table_params.append((chunk,))
        # Anything to do at all?
        if not repl_table_params:
            return

        table_name = context.schema.tableName(ExtraTables.ApdbReplicaChunks)
        query = (
            f'DELETE FROM "{config.keyspace}"."{table_name}" WHERE partition = ? AND apdb_replica_chunk = ?'
        )
        statement = context.preparer.prepare(query)

        queries = [(statement, param) for param in repl_table_params]
        with self._timer("chunks_delete_time") as timer:
            execute_concurrent(context.session, queries)
            timer.add_values(row_count=len(queries))

        # Also remove those chunk_ids from Dia*Chunks tables.
        tables = list(ExtraTables.replica_chunk_tables(context.schema.has_chunk_sub_partitions).values())
        for table in tables:
            table_name = context.schema.tableName(table)
            query = f'DELETE FROM "{config.keyspace}"."{table_name}" WHERE apdb_replica_chunk = ?'
            if context.schema.has_chunk_sub_partitions:
                query += " AND apdb_replica_subchunk = ?"
            statement = context.preparer.prepare(query)

            queries = [(statement, param) for param in chunk_table_params]
            with self._timer("table_chunk_delete_time", tags={"table": table_name}) as timer:
                execute_concurrent(context.session, queries)
                timer.add_values(row_count=len(queries))

    def getTableDataChunks(self, table: ApdbTables, chunks: Iterable[int]) -> ApdbTableData:
        # docstring is inherited from a base class
        context = self._apdb._context
        config = context.config

        if not context.schema.replication_enabled:
            raise ValueError("APDB is not configured for replication")
        if table not in ExtraTables.replica_chunk_tables(False):
            raise ValueError(f"Table {table} does not support replica chunks.")

        # We need to iterate a few times.
        chunks = list(chunks)

        # If schema was migrated then a chunk can appear in either the old or
        # the new chunk table (e.g. DiaObjectChunks or DiaObjectChunks2). The
        # chunks table has a column which is set to True for the new table.
        has_chunk_sub_partitions: dict[int, bool] = {}
        if context.schema.has_chunk_sub_partitions:
            table_name = context.schema.tableName(ExtraTables.ApdbReplicaChunks)
            chunks_str = ",".join(str(chunk_id) for chunk_id in chunks)
            query = (
                f'SELECT apdb_replica_chunk, has_subchunks FROM "{config.keyspace}"."{table_name}" '
                f"WHERE partition = %s and apdb_replica_chunk IN ({chunks_str})"
            )
            partition = 0
            result = context.session.execute(
                query,
                (partition,),
                timeout=config.connection_config.read_timeout,
                execution_profile="read_tuples",
            )
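            # Each result row is an (apdb_replica_chunk, has_subchunks) pair.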
            has_chunk_sub_partitions = dict(result)
        else:
            has_chunk_sub_partitions = dict.fromkeys(chunks, False)

        # Check what kind of tables we want to query; if the chunk list is
        # empty then use tables which should exist in the schema.
        if has_chunk_sub_partitions:
            have_subchunks = any(has_chunk_sub_partitions.values())
            have_non_subchunks = not all(has_chunk_sub_partitions.values())
        else:
            have_subchunks = context.schema.has_chunk_sub_partitions
            have_non_subchunks = not have_subchunks

        # NOTE: if an existing database is migrated and has both types of
        # chunk tables (e.g. DiaObjectChunks and DiaObjectChunks2) it is
        # possible that the same chunk can appear in both tables. In reality
        # schema migration should only happen during downtime, so there will
        # be a sufficient gap and a different chunk ID will be used for new
        # chunks.

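        # Results from the sub-partitioned and the plain chunk tables; they
        # are merged below when both queries return data.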
        table_data: ApdbCassandraTableData | None = None
        table_data_subchunk: ApdbCassandraTableData | None = None

        table_name = context.schema.tableName(ExtraTables.replica_chunk_tables(False)[table])
        with self._timer("table_chunk_select_time", tags={"table": table_name}) as timer:
            if have_subchunks:
                replica_table = ExtraTables.replica_chunk_tables(True)[table]
                table_name = context.schema.tableName(replica_table)
                query = (
                    f'SELECT * FROM "{config.keyspace}"."{table_name}" '
                    "WHERE apdb_replica_chunk = ? AND apdb_replica_subchunk = ?"
                )
                statement = context.preparer.prepare(query)

                queries: list[tuple] = []
                for chunk in chunks:
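                    # Chunks missing from the map are not registered in
                    # ApdbReplicaChunks and are skipped by both branches.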
                    if has_chunk_sub_partitions.get(chunk, False):
                        for subchunk in range(config.replica_sub_chunk_count):
                            queries.append((statement, (chunk, subchunk)))
                if not queries and not have_non_subchunks:
                    # Add a dummy query to return correct set of columns.
                    queries.append((statement, (-1, -1)))

                if queries:
                    table_data_subchunk = cast(
                        ApdbCassandraTableData,
                        select_concurrent(
                            context.session,
                            queries,
                            "read_raw_multi",
                            config.connection_config.read_concurrency,
                        ),
                    )

            if have_non_subchunks:
                replica_table = ExtraTables.replica_chunk_tables(False)[table]
                table_name = context.schema.tableName(replica_table)
                query = f'SELECT * FROM "{config.keyspace}"."{table_name}" WHERE apdb_replica_chunk = ?'
                statement = context.preparer.prepare(query)

                queries = []
                for chunk in chunks:
                    if not has_chunk_sub_partitions.get(chunk, True):
                        queries.append((statement, (chunk,)))
                if not queries and not table_data_subchunk:
                    # Add a dummy query to return correct set of columns.
                    queries.append((statement, (-1,)))

                if queries:
                    table_data = cast(
                        ApdbCassandraTableData,
                        select_concurrent(
                            context.session,
                            queries,
                            "read_raw_multi",
                            config.connection_config.read_concurrency,
                        ),
                    )

            # Merge if both are non-empty.
            if table_data and table_data_subchunk:
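                # Drop the subchunk column so that both results have the same
                # set of columns before appending.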
                table_data_subchunk.project(drop=["apdb_replica_subchunk"])
                table_data.append(table_data_subchunk)
            elif table_data_subchunk:
                table_data = table_data_subchunk
            elif not table_data:
                raise AssertionError("above logic is incorrect")

            timer.add_values(row_count=len(table_data.rows()))

        table_schema = self._apdb._schema.tableSchemas[table]
        # Regular tables should never have columns of ExtraDataTypes; this
        # is just to make mypy happy.
        column_types = {
            column.name: column.datatype
            for column in table_schema.columns
            if not isinstance(column.datatype, ExtraDataTypes)
        }
        column_types["apdb_replica_chunk"] = felis.datamodel.DataType.long
        # The data may also have a subchunk column; we do not always drop it,
        # and clients should not need it, but we need to provide a type for it.
        column_types["apdb_replica_subchunk"] = felis.datamodel.DataType.int
        table_data.set_column_types(column_types)

        return table_data

    def getTableUpdateChunks(self, table: ApdbTables, chunks: Iterable[int]) -> ApdbTableData:
        # docstring is inherited from a base class
        raise NotImplementedError()