context = self._apdb._context
config = context.config

if not context.schema.replication_enabled:
    return None

# Everything goes into a single partition.
partition = 0

table_name = context.schema.tableName(ExtraTables.ApdbReplicaChunks)
# last_update_time is selected as Unix milliseconds and converted to astropy Time below.
query = (
    "SELECT toUnixTimestamp(last_update_time), apdb_replica_chunk, unique_id "
    f'FROM "{config.keyspace}"."{table_name}" WHERE partition = %s'
)

with self._timer("chunks_select_time") as timer:
    result = context.session.execute(
        query,
        (partition,),
        timeout=config.connection_config.read_timeout,
        execution_profile="read_tuples",
    )
    rows = sorted(result)
    timer.add_values(row_count=len(rows))

return [
    ReplicaChunk(
        id=row[1],
        last_update_time=astropy.time.Time(row[0] / 1000, format="unix_tai"),
        unique_id=row[2],
    )
    for row in rows
]

context = self._apdb._context
config = context.config

if not context.schema.replication_enabled:
    raise ValueError("APDB is not configured for replication")

# Everything goes into a single partition.
partition = 0

# Build parameters for both delete statements in a single pass over chunks.
repl_table_params = []
chunk_table_params: list[tuple] = []
for chunk in chunks:
    repl_table_params.append((partition, chunk))
    if context.schema.has_chunk_sub_partitions:
        for subchunk in range(config.replica_sub_chunk_count):
            chunk_table_params.append((chunk, subchunk))
    else:
        chunk_table_params.append((chunk,))
if not repl_table_params:
    # Nothing to delete.
    return

table_name = context.schema.tableName(ExtraTables.ApdbReplicaChunks)
query = (
    f'DELETE FROM "{config.keyspace}"."{table_name}" WHERE partition = ? AND apdb_replica_chunk = ?'
)
statement = context.preparer.prepare(query)

queries = [(statement, param) for param in repl_table_params]
with self._timer("chunks_delete_time") as timer:
    execute_concurrent(context.session, queries)
    timer.add_values(row_count=len(queries))
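
# Remove the same chunk IDs from every replicated data table; with chunk
# sub-partitioning enabled each (chunk, subchunk) pair is deleted separately.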
tables = list(ExtraTables.replica_chunk_tables(context.schema.has_chunk_sub_partitions).values())
for table in tables:
    table_name = context.schema.tableName(table)
    query = f'DELETE FROM "{config.keyspace}"."{table_name}" WHERE apdb_replica_chunk = ?'
    if context.schema.has_chunk_sub_partitions:
        query += " AND apdb_replica_subchunk = ?"
    statement = context.preparer.prepare(query)

    queries = [(statement, param) for param in chunk_table_params]
    with self._timer("table_chunk_detele_time", tags={"table": table_name}) as timer:
        execute_concurrent(context.session, queries)
        timer.add_values(row_count=len(queries))
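
# The fragment below reads replicated rows for a single table given a set of
# chunk IDs, handling databases where some chunks were written with
# sub-partitioning and some without.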

context = self._apdb._context
config = context.config

if not context.schema.replication_enabled:
    raise ValueError("APDB is not configured for replication")
if table not in ExtraTables.replica_chunk_tables(False):
    raise ValueError(f"Table {table} does not support replica chunks.")

# The chunks iterable may be single-pass, materialize it once.
chunks = list(chunks)

# Everything goes into a single partition.
partition = 0

# Check which of the requested chunks have sub-partitioned data.
has_chunk_sub_partitions: dict[int, bool] = {}
if context.schema.has_chunk_sub_partitions:
    table_name = context.schema.tableName(ExtraTables.ApdbReplicaChunks)
    chunks_str = ",".join(str(chunk_id) for chunk_id in chunks)
    query = (
        f'SELECT apdb_replica_chunk, has_subchunks FROM "{config.keyspace}"."{table_name}" '
        f"WHERE partition = %s and apdb_replica_chunk IN ({chunks_str})"
    )
    result = context.session.execute(
        query,
        (partition,),
        timeout=config.connection_config.read_timeout,
        execution_profile="read_tuples",
    )
    has_chunk_sub_partitions = dict(result)
else:
    has_chunk_sub_partitions = dict.fromkeys(chunks, False)
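
# has_chunk_sub_partitions maps each requested chunk ID to whether its rows
# were written with sub-partitioning, so databases with a mix of both layouts
# can still be read.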
if has_chunk_sub_partitions:
    have_subchunks = any(has_chunk_sub_partitions.values())
    have_non_subchunks = not all(has_chunk_sub_partitions.values())
else:
    have_subchunks = context.schema.has_chunk_sub_partitions
    have_non_subchunks = not have_subchunks

table_data: ApdbCassandraTableData | None = None
table_data_subchunk: ApdbCassandraTableData | None = None

table_name = context.schema.tableName(ExtraTables.replica_chunk_tables(False)[table])
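
# Depending on the chunk mix, data may come from the sub-partitioned table,
# the plain table, or both; the partial results are merged below.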
with self._timer("table_chunk_select_time", tags={"table": table_name}) as timer:
    if have_subchunks:
        replica_table = ExtraTables.replica_chunk_tables(True)[table]
        table_name = context.schema.tableName(replica_table)
        query = (
            f'SELECT * FROM "{config.keyspace}"."{table_name}" '
            "WHERE apdb_replica_chunk = ? AND apdb_replica_subchunk = ?"
        )
        statement = context.preparer.prepare(query)

        queries: list[tuple] = []
        for chunk in chunks:
            if has_chunk_sub_partitions.get(chunk, False):
                for subchunk in range(config.replica_sub_chunk_count):
                    queries.append((statement, (chunk, subchunk)))
        if not queries and not have_non_subchunks:
            # Dummy query so that an empty result still has the correct columns.
            queries.append((statement, (-1, -1)))

        if queries:
            table_data_subchunk = cast(
                ApdbCassandraTableData,
                select_concurrent(
                    context.session,
                    queries,
                    "read_raw_multi",
                    config.connection_config.read_concurrency,
                ),
            )
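
    # Chunks stored without sub-partitioning are read from the plain chunk
    # table, one query per chunk.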
    if have_non_subchunks:
        replica_table = ExtraTables.replica_chunk_tables(False)[table]
        table_name = context.schema.tableName(replica_table)
        query = f'SELECT * FROM "{config.keyspace}"."{table_name}" WHERE apdb_replica_chunk = ?'
        statement = context.preparer.prepare(query)

        queries = []
        for chunk in chunks:
            if not has_chunk_sub_partitions.get(chunk, True):
                queries.append((statement, (chunk,)))
        if not queries and not table_data_subchunk:
            # Dummy query so that an empty result still has the correct columns.
            queries.append((statement, (-1,)))

        if queries:
            table_data = cast(
                ApdbCassandraTableData,
                select_concurrent(
                    context.session,
                    queries,
                    "read_raw_multi",
                    config.connection_config.read_concurrency,
                ),
            )
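
    # Merge the partial results: drop the subchunk column so both pieces have
    # identical columns before appending.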
    if table_data and table_data_subchunk:
        table_data_subchunk.project(drop=["apdb_replica_subchunk"])
        table_data.append(table_data_subchunk)
    elif table_data_subchunk:
        table_data = table_data_subchunk
    elif not table_data:
        raise AssertionError("above logic is incorrect")

    timer.add_values(row_count=len(table_data.rows()))
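
# Attach column types from the Felis table schema, plus the replica chunk
# bookkeeping columns, so that consumers of the returned data get explicit types.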
table_schema = self._apdb._schema.tableSchemas[table]
column_types = {
    column.name: column.datatype
    for column in table_schema.columns
    if not isinstance(column.datatype, ExtraDataTypes)
}
column_types["apdb_replica_chunk"] = felis.datamodel.DataType.long
# The subchunk column is only present when sub-partitioned data was read.
if "apdb_replica_subchunk" in table_data.column_names():
    column_types["apdb_replica_subchunk"] = felis.datamodel.DataType.int
table_data.set_column_types(column_types)

return table_data
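
# Usage sketch (illustration only, not part of the original module): one way a
# replication client might drive the operations above through the public
# ApdbReplica interface.  `ApdbReplica.from_uri`, `getReplicaChunks`,
# `getDiaObjectsChunks`, and `deleteReplicaChunks` are assumed names based on
# the code in this file; verify them against the installed dax_apdb API.
def _example_drain_replica_chunks(config_uri: str) -> None:
    from lsst.dax.apdb import ApdbReplica  # assumed import location

    replica = ApdbReplica.from_uri(config_uri)
    chunks = replica.getReplicaChunks() or []
    chunk_ids = [chunk.id for chunk in chunks]
    if not chunk_ids:
        return
    # Read replicated DiaObject rows for all outstanding chunks.
    table_data = replica.getDiaObjectsChunks(chunk_ids)
    for row in table_data.rows():
        pass  # copy each row to the downstream replica store
    # Drop the chunks on the APDB side only after the copy has succeeded.
    replica.deleteReplicaChunks(chunk_ids)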