101 context = self.
_apdb._context
102 config = context.config
104 if not context.schema.replication_enabled:
110 table_name = context.schema.tableName(ExtraTables.ApdbReplicaChunks)
113 "SELECT toUnixTimestamp(last_update_time), apdb_replica_chunk, unique_id "
114 f
'FROM "{config.keyspace}"."{table_name}" WHERE partition = %s'
117 with self.
_timer(
"chunks_select_time")
as timer:
118 result = context.session.execute(
121 timeout=config.connection_config.read_timeout,
122 execution_profile=
"read_tuples",
125 rows = sorted(result)
126 timer.add_values(row_count=len(rows))
130 last_update_time=astropy.time.Time(row[0] / 1000, format=
"unix_tai"),
138 context = self.
_apdb._context
139 config = context.config
141 if not context.schema.replication_enabled:
142 raise ValueError(
"APDB is not configured for replication")
149 repl_table_params = []
150 chunk_table_params: list[tuple] = []
152 repl_table_params.append((partition, chunk))
153 if context.has_chunk_sub_partitions:
154 for subchunk
in range(config.replica_sub_chunk_count):
155 chunk_table_params.append((chunk, subchunk))
157 chunk_table_params.append((chunk,))
159 if not repl_table_params:
162 table_name = context.schema.tableName(ExtraTables.ApdbReplicaChunks)
164 f
'DELETE FROM "{config.keyspace}"."{table_name}" WHERE partition = ? AND apdb_replica_chunk = ?'
166 statement = context.preparer.prepare(query)
168 queries = [(statement, param)
for param
in repl_table_params]
169 with self.
_timer(
"chunks_delete_time")
as timer:
170 execute_concurrent(context.session, queries)
171 timer.add_values(row_count=len(queries))
174 tables = list(ExtraTables.replica_chunk_tables(context.has_chunk_sub_partitions).values())
175 if context.has_update_record_chunks_table:
176 tables.append(ExtraTables.ApdbUpdateRecordChunks)
178 table_name = context.schema.tableName(table)
179 query = f
'DELETE FROM "{config.keyspace}"."{table_name}" WHERE apdb_replica_chunk = ?'
180 if context.has_chunk_sub_partitions:
181 query +=
" AND apdb_replica_subchunk = ?"
182 statement = context.preparer.prepare(query)
184 queries = [(statement, param)
for param
in chunk_table_params]
185 with self.
_timer(
"table_chunk_detele_time", tags={
"table": table_name})
as timer:
186 execute_concurrent(context.session, queries)
187 timer.add_values(row_count=len(queries))
191 context = self.
_apdb._context
192 config = context.config
194 if not context.schema.replication_enabled:
195 raise ValueError(
"APDB is not configured for replication")
196 if table
not in ExtraTables.replica_chunk_tables(
False):
197 raise ValueError(f
"Table {table} does not support replica chunks.")
200 chunks = list(chunks)
205 has_chunk_sub_partitions: dict[int, bool] = {}
206 if context.has_chunk_sub_partitions:
207 table_name = context.schema.tableName(ExtraTables.ApdbReplicaChunks)
208 chunks_str =
",".join(str(chunk_id)
for chunk_id
in chunks)
210 f
'SELECT apdb_replica_chunk, has_subchunks FROM "{config.keyspace}"."{table_name}" '
211 f
"WHERE partition = %s and apdb_replica_chunk IN ({chunks_str})"
214 result = context.session.execute(
217 timeout=config.connection_config.read_timeout,
218 execution_profile=
"read_tuples",
220 has_chunk_sub_partitions = dict(result)
222 has_chunk_sub_partitions = dict.fromkeys(chunks,
False)
226 if has_chunk_sub_partitions:
227 have_subchunks = any(has_chunk_sub_partitions.values())
228 have_non_subchunks =
not all(has_chunk_sub_partitions.values())
230 have_subchunks = context.has_chunk_sub_partitions
231 have_non_subchunks =
not have_subchunks
239 table_data: ApdbCassandraTableData |
None =
None
240 table_data_subchunk: ApdbCassandraTableData |
None =
None
242 table_name = context.schema.tableName(ExtraTables.replica_chunk_tables(
False)[table])
243 with self.
_timer(
"table_chunk_select_time", tags={
"table": table_name})
as timer:
245 replica_table = ExtraTables.replica_chunk_tables(
True)[table]
246 table_name = context.schema.tableName(replica_table)
248 f
'SELECT * FROM "{config.keyspace}"."{table_name}" '
249 "WHERE apdb_replica_chunk = ? AND apdb_replica_subchunk = ?"
251 statement = context.preparer.prepare(query)
253 queries: list[tuple] = []
255 if has_chunk_sub_partitions.get(chunk,
False):
256 for subchunk
in range(config.replica_sub_chunk_count):
257 queries.append((statement, (chunk, subchunk)))
258 if not queries
and not have_non_subchunks:
260 queries.append((statement, (-1, -1)))
263 table_data_subchunk = cast(
264 ApdbCassandraTableData,
269 config.connection_config.read_concurrency,
273 if have_non_subchunks:
274 replica_table = ExtraTables.replica_chunk_tables(
False)[table]
275 table_name = context.schema.tableName(replica_table)
276 query = f
'SELECT * FROM "{config.keyspace}"."{table_name}" WHERE apdb_replica_chunk = ?'
277 statement = context.preparer.prepare(query)
281 if not has_chunk_sub_partitions.get(chunk,
True):
282 queries.append((statement, (chunk,)))
283 if not queries
and not table_data_subchunk:
285 queries.append((statement, (-1,)))
289 ApdbCassandraTableData,
294 config.connection_config.read_concurrency,
299 if table_data
and table_data_subchunk:
300 table_data_subchunk.project(drop=[
"apdb_replica_subchunk"])
301 table_data.append(table_data_subchunk)
302 elif table_data_subchunk:
303 table_data = table_data_subchunk
305 raise AssertionError(
"above logic is incorrect")
307 timer.add_values(row_count=len(table_data.rows()))
309 table_schema = self.
_apdb._schema.tableSchemas[table]
313 column.name: column.datatype
314 for column
in table_schema.columns
315 if not isinstance(column.datatype, ExtraDataTypes)
317 column_types[
"apdb_replica_chunk"] = felis.datamodel.DataType.long
320 column_types[
"apdb_replica_subchunk"] = felis.datamodel.DataType.int
321 table_data.set_column_types(column_types)
327 context = self.
_apdb._context
328 config = context.config
330 if not context.schema.replication_enabled:
331 raise ValueError(
"APDB is not configured for replication")
333 table_name = context.schema.tableName(ExtraTables.ApdbUpdateRecordChunks)
336 if context.has_chunk_sub_partitions:
337 subchunks =
",".join(str(val)
for val
in range(config.replica_sub_chunk_count))
339 f
'SELECT * FROM "{config.keyspace}"."{table_name}" '
340 f
"WHERE apdb_replica_chunk = %s AND apdb_replica_subchunk IN ({subchunks})"
343 with self.
_timer(
"select_update_record_time", tags={
"table": table_name})
as timer:
345 result = context.session.execute(query, [chunk])
348 ApdbUpdateRecord.from_json(
349 row.update_time_ns, row.update_order, row.update_payload
352 timer.add_values(row_count=len(records))
355 chunks_str =
",".join(str(val)
for val
in chunks)
357 f
'SELECT * FROM "{config.keyspace}"."{table_name}" WHERE apdb_replica_chunk IN ({chunks_str})'
360 with self.
_timer(
"select_update_record_time", tags={
"table": table_name})
as timer:
361 result = context.session.execute(query)
364 ApdbUpdateRecord.from_json(row.update_time_ns, row.update_order, row.update_payload)
366 timer.add_values(row_count=len(records))