230 """Generate schema for regular APDB tables."""
237 primary_key = apdb_table_def.primary_key[:]
240 part_columns = [
"apdb_part"]
241 add_columns = part_columns
243 if time_partition_tables:
244 part_columns = [
"apdb_part"]
246 part_columns = [
"apdb_part",
"apdb_time_part"]
247 add_columns = part_columns
248 elif table
is ApdbTables.SSObject:
253 part_columns = [
"ssObjectId"]
255 elif table
is ApdbTables.metadata:
258 part_columns = [
"meta_part"]
259 add_columns = part_columns
268 id=f
"#{name}", name=name, datatype=felis.datamodel.DataType.long, nullable=
False
270 for name
in add_columns
273 annotations = dict(apdb_table_def.annotations)
274 annotations[
"cassandra:apdb_column_names"] = [column.name
for column
in apdb_table_def.columns]
276 annotations[
"cassandra:partitioning_columns"] = part_columns
279 id=apdb_table_def.id,
280 name=apdb_table_def.name,
281 columns=column_defs + apdb_table_def.columns,
282 primary_key=primary_key,
285 annotations=annotations,
291 """Generate schema for extra tables."""
299 datatype=felis.datamodel.DataType.long,
305 datatype=felis.datamodel.DataType.short,
310 id=
"#" + ExtraTables.ApdbVisitDetector.value,
311 name=ExtraTables.ApdbVisitDetector.table_name(self.
_prefix),
316 annotations={
"cassandra:partitioning_columns": [
"visit",
"detector"]},
322 id=
"#" + ExtraTables.DiaSourceToPartition.value,
323 name=ExtraTables.DiaSourceToPartition.table_name(self.
_prefix),
328 datatype=felis.datamodel.DataType.long,
332 id=
"#apdb_part", name=
"apdb_part", datatype=felis.datamodel.DataType.long, nullable=
False
335 id=
"#apdb_time_part",
336 name=
"apdb_time_part",
337 datatype=felis.datamodel.DataType.int,
341 id=
"#apdb_replica_chunk",
342 name=
"apdb_replica_chunk",
343 datatype=felis.datamodel.DataType.long,
347 id=
"#apdb_replica_subchunk",
348 name=
"apdb_replica_subchunk",
349 datatype=felis.datamodel.DataType.int,
356 annotations={
"cassandra:partitioning_columns": [
"diaSourceId"]},
361 id=
"#" + ExtraTables.DiaObjectLastToPartition.value,
362 name=ExtraTables.DiaObjectLastToPartition.table_name(self.
_prefix),
367 datatype=felis.datamodel.DataType.long,
371 id=
"#apdb_part", name=
"apdb_part", datatype=felis.datamodel.DataType.long, nullable=
False
377 annotations={
"cassandra:partitioning_columns": [
"diaObjectId"]},
384 id=
"#apdb_replica_chunk",
385 name=
"apdb_replica_chunk",
386 datatype=felis.datamodel.DataType.long,
390 replica_chunk_columns = [replica_chunk_column]
392 replica_chunk_columns.append(
394 id=
"#apdb_replica_subchunk",
395 name=
"apdb_replica_subchunk",
396 datatype=felis.datamodel.DataType.int,
404 id=
"#" + ExtraTables.ApdbReplicaChunks.value,
405 name=ExtraTables.ApdbReplicaChunks.table_name(self.
_prefix),
408 id=
"#partition", name=
"partition", datatype=felis.datamodel.DataType.int, nullable=
False
410 replica_chunk_column,
412 id=
"#last_update_time",
413 name=
"last_update_time",
414 datatype=felis.datamodel.DataType.timestamp,
420 datatype=schema_model.ExtraDataTypes.UUID,
425 name=
"has_subchunks",
426 datatype=felis.datamodel.DataType.boolean,
430 primary_key=[replica_chunk_column],
433 annotations={
"cassandra:partitioning_columns": [
"partition"]},
437 for apdb_table_enum, chunk_table_enum
in replica_chunk_tables.items():
441 id=
"#" + chunk_table_enum.value,
442 name=chunk_table_enum.table_name(self.
_prefix),
443 columns=replica_chunk_columns + apdb_table_def.columns,
444 primary_key=apdb_table_def.primary_key[:],
448 "cassandra:partitioning_columns": [column.name
for column
in replica_chunk_columns],
449 "cassandra:apdb_column_names": [column.name
for column
in apdb_table_def.columns],
456 id=f
"#{ExtraTables.ApdbUpdateRecordChunks.value}.update_time_ns",
457 name=
"update_time_ns",
458 datatype=felis.datamodel.DataType.long,
462 id=f
"#{ExtraTables.ApdbUpdateRecordChunks.value}.update_order",
464 datatype=felis.datamodel.DataType.int,
468 id=f
"#{ExtraTables.ApdbUpdateRecordChunks.value}.update_unique_id",
469 name=
"update_unique_id",
470 datatype=schema_model.ExtraDataTypes.UUID,
474 id=f
"#{ExtraTables.ApdbUpdateRecordChunks.value}.update_payload",
475 name=
"update_payload",
476 datatype=felis.datamodel.DataType.string,
481 id=f
"#{ExtraTables.ApdbUpdateRecordChunks.value}",
482 name=ExtraTables.ApdbUpdateRecordChunks.table_name(self.
_prefix),
483 columns=replica_chunk_columns + columns,
484 primary_key=columns[:3],
488 "cassandra:partitioning_columns": [column.name
for column
in replica_chunk_columns],
697 part_range: ApdbCassandraTimePartitionRange |
None =
None,
698 replication_factor: int |
None =
None,
699 table_options: CreateTableOptions |
None =
None,
701 """Create or re-create all tables.
706 If True then drop tables before creating new ones. Note that
707 only tables are dropped and not the whole keyspace.
708 part_range : `ApdbCassandraTimePartitionRange` or `None`
709 Start and end partition number for time partitions. Used to create
710 per-partition DiaObject, DiaSource, and DiaForcedSource tables. If
711 `None` then per-partition tables are not created.
712 replication_factor : `int`, optional
713 Replication factor used when creating a new keyspace. If the keyspace
714 already exists, its replication factor is not changed.
717 if replication_factor
is None:
718 replication_factor = 1
721 query =
"SELECT replication FROM system_schema.keyspaces WHERE keyspace_name = %s"
723 if row := result.one():
725 repl_config = cast(Mapping[str, str], row[0])
726 current_repl = int(repl_config[
"replication_factor"])
727 if replication_factor != current_repl:
729 f
"New replication factor {replication_factor} differs from the replication factor "
730 f
"for already existing keyspace: {current_repl}"
735 f
'CREATE KEYSPACE "{self._keyspace}"'
736 " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': "
737 f
"{replication_factor}"
746 if table
is ApdbTables.SSSource:
774 table: ApdbTables | ExtraTables,
776 part_range: ApdbCassandraTimePartitionRange |
None =
None,
777 table_options: CreateTableOptions |
None =
None,
779 _LOG.debug(
"Making table %s", table)
781 fullTable = table.table_name(self.
_prefix)
783 table_list = [fullTable]
784 if part_range
is not None:
786 table_list = [table.table_name(self.
_prefix, part)
for part
in part_range.range()]
789 queries = [f
'DROP TABLE IF EXISTS "{self._keyspace}"."{table_name}"' for table_name
in table_list]
790 futures = [self.
_session.execute_async(query, timeout=
None)
for query
in queries]
791 for future
in futures:
792 _LOG.debug(
"wait for query: %s", future.query)
794 _LOG.debug(
"query finished: %s", future.query)
797 options = table_options.get_options(fullTable).strip()
if table_options
else None
798 for table_name
in table_list:
799 if_not_exists =
"" if drop
else "IF NOT EXISTS"
801 query = f
'CREATE TABLE {if_not_exists} "{self._keyspace}"."{table_name}" ({columns})'
803 query = f
"{query} WITH {options}"
804 _LOG.debug(
"query: %s", query)
805 queries.append(query)
806 futures = [self.
_session.execute_async(query, timeout=
None)
for query
in queries]
807 for future
in futures:
808 _LOG.debug(
"wait for query: %s", future.query)
810 _LOG.debug(
"query finished: %s", future.query)