230 """Generate schema for regular APDB tables."""
        apdb_tables: dict[ApdbTables, schema_model.Table] = {}

        # Add columns used for partitioning.
        for table, apdb_table_def in self.tableSchemas.items():
            part_columns: list[str] = []
            add_columns: list[str] = []
            primary_key = apdb_table_def.primary_key[:]
            if table in self._spatially_partitioned_tables:
                # Spatial partitioning only.
                part_columns = ["apdb_part"]
                add_columns = part_columns
            elif table in self._time_partitioned_tables:
                if time_partition_tables:
                    part_columns = ["apdb_part"]
                else:
                    part_columns = ["apdb_part", "apdb_time_part"]
                add_columns = part_columns
            elif table is ApdbTables.SSObject:
                # SSObject has no natural partitioning column; partition on
                # ssObjectId to spread rows across the cluster.
                part_columns = ["ssObjectId"]
            elif table is ApdbTables.metadata:
                # Metadata is kept in one partition so it can be read with a
                # single query.
                part_columns = ["meta_part"]
                add_columns = part_columns
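            # Here "apdb_part" is the spatial (pixelization) partition and
            # "apdb_time_part" is the temporal partition.  When
            # time_partition_tables is enabled the temporal partition is
            # encoded in the table name instead, so only the spatial column
            # is added.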
            column_defs = [
                schema_model.Column(
                    id=f"#{name}", name=name, datatype=felis.datamodel.DataType.long, nullable=False
                )
                for name in add_columns
            ]
            if table is ApdbTables.DiaObjectLast:
                validity_start_column = "validityStartMjdTai"
                if not self.check_column(ApdbTables.DiaObjectLast, validity_start_column):
                    for column in apdb_table_def.columns:
                        if column.name == validity_start_column:
                            apdb_table_def.columns.remove(column)
                            break
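            # Presumably this keeps DiaObjectLast compatible with schema
            # versions that do not define validityStartMjdTai: the column is
            # dropped from the felis definition when check_column() reports it
            # as absent.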
            annotations = dict(apdb_table_def.annotations)
            annotations["cassandra:apdb_column_names"] = [column.name for column in apdb_table_def.columns]
            if part_columns:
                annotations["cassandra:partitioning_columns"] = part_columns

            apdb_tables[table] = schema_model.Table(
                id=apdb_table_def.id,
                name=apdb_table_def.name,
                columns=column_defs + apdb_table_def.columns,
                primary_key=primary_key,
                annotations=annotations,
            )
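            # When the CREATE TABLE statement is generated, the partitioning
            # columns become the Cassandra partition key and the felis primary
            # key columns become clustering columns, i.e. roughly
            #   PRIMARY KEY ((apdb_part, apdb_time_part), diaObjectId, ...)
            # for a time-partitioned table.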
304 """Generate schema for extra tables."""
        extra_tables: dict[ExtraTables, schema_model.Table] = {}

        columns = [
            schema_model.Column(
                id="#visit",
                name="visit",
                datatype=felis.datamodel.DataType.long,
            ),
            schema_model.Column(
                id="#detector",
                name="detector",
                datatype=felis.datamodel.DataType.short,
            ),
        ]
        extra_tables[ExtraTables.ApdbVisitDetector] = schema_model.Table(
            id="#" + ExtraTables.ApdbVisitDetector.value,
            name=ExtraTables.ApdbVisitDetector.table_name(self._prefix),
            columns=columns,
            annotations={"cassandra:partitioning_columns": ["visit", "detector"]},
        )
        extra_tables[ExtraTables.DiaSourceToPartition] = schema_model.Table(
            id="#" + ExtraTables.DiaSourceToPartition.value,
            name=ExtraTables.DiaSourceToPartition.table_name(self._prefix),
            columns=[
                schema_model.Column(
                    id="#diaSourceId",
                    name="diaSourceId",
                    datatype=felis.datamodel.DataType.long,
                ),
                schema_model.Column(
                    id="#apdb_part", name="apdb_part", datatype=felis.datamodel.DataType.long, nullable=False
                ),
                schema_model.Column(
                    id="#apdb_time_part",
                    name="apdb_time_part",
                    datatype=felis.datamodel.DataType.int,
                ),
                schema_model.Column(
                    id="#apdb_replica_chunk",
                    name="apdb_replica_chunk",
                    datatype=felis.datamodel.DataType.long,
                ),
                schema_model.Column(
                    id="#apdb_replica_subchunk",
                    name="apdb_replica_subchunk",
                    datatype=felis.datamodel.DataType.int,
                ),
            ],
            annotations={"cassandra:partitioning_columns": ["diaSourceId"]},
        )
        extra_tables[ExtraTables.DiaObjectLastToPartition] = schema_model.Table(
            id="#" + ExtraTables.DiaObjectLastToPartition.value,
            name=ExtraTables.DiaObjectLastToPartition.table_name(self._prefix),
            columns=[
                schema_model.Column(
                    id="#diaObjectId",
                    name="diaObjectId",
                    datatype=felis.datamodel.DataType.long,
                ),
                schema_model.Column(
                    id="#apdb_part", name="apdb_part", datatype=felis.datamodel.DataType.long, nullable=False
                ),
            ],
            annotations={"cassandra:partitioning_columns": ["diaObjectId"]},
        )
        replica_chunk_column = schema_model.Column(
            id="#apdb_replica_chunk",
            name="apdb_replica_chunk",
            datatype=felis.datamodel.DataType.long,
        )

        replica_chunk_columns = [replica_chunk_column]
        # The sub-chunk column is only added when replica chunk
        # sub-partitioning is enabled.
        replica_chunk_columns.append(
            schema_model.Column(
                id="#apdb_replica_subchunk",
                name="apdb_replica_subchunk",
                datatype=felis.datamodel.DataType.int,
            )
        )
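        # Replica chunk tables are partitioned by these columns: rows stored
        # within the same chunk window share a chunk id, so a whole chunk can
        # be read or dropped with partition-level operations.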
        extra_tables[ExtraTables.ApdbReplicaChunks] = schema_model.Table(
            id="#" + ExtraTables.ApdbReplicaChunks.value,
            name=ExtraTables.ApdbReplicaChunks.table_name(self._prefix),
            columns=[
                schema_model.Column(
                    id="#partition", name="partition", datatype=felis.datamodel.DataType.int, nullable=False
                ),
                replica_chunk_column,
                schema_model.Column(
                    id="#last_update_time",
                    name="last_update_time",
                    datatype=felis.datamodel.DataType.timestamp,
                ),
                schema_model.Column(
                    id="#unique_id",
                    name="unique_id",
                    datatype=schema_model.ExtraDataTypes.UUID,
                ),
                schema_model.Column(
                    id="#has_subchunks",
                    name="has_subchunks",
                    datatype=felis.datamodel.DataType.boolean,
                ),
            ],
            primary_key=[replica_chunk_column],
            annotations={"cassandra:partitioning_columns": ["partition"]},
        )
        for apdb_table_enum, chunk_table_enum in replica_chunk_tables.items():
            apdb_table_def = self.tableSchemas[apdb_table_enum]
            extra_tables[chunk_table_enum] = schema_model.Table(
                id="#" + chunk_table_enum.value,
                name=chunk_table_enum.table_name(self._prefix),
                columns=replica_chunk_columns + apdb_table_def.columns,
                primary_key=apdb_table_def.primary_key[:],
                annotations={
                    "cassandra:partitioning_columns": [column.name for column in replica_chunk_columns],
                    "cassandra:apdb_column_names": [column.name for column in apdb_table_def.columns],
                },
            )
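        # Each chunk table mirrors the columns of its parent APDB table,
        # prefixed with the chunk (and optional sub-chunk) columns, which also
        # serve as the Cassandra partition key.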
        columns = [
            schema_model.Column(
                id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}.update_time_ns",
                name="update_time_ns",
                datatype=felis.datamodel.DataType.long,
            ),
            schema_model.Column(
                id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}.update_order",
                name="update_order",
                datatype=felis.datamodel.DataType.int,
            ),
            schema_model.Column(
                id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}.update_unique_id",
                name="update_unique_id",
                datatype=schema_model.ExtraDataTypes.UUID,
            ),
            schema_model.Column(
                id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}.update_payload",
                name="update_payload",
                datatype=felis.datamodel.DataType.string,
            ),
        ]
        extra_tables[ExtraTables.ApdbUpdateRecordChunks] = schema_model.Table(
            id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}",
            name=ExtraTables.ApdbUpdateRecordChunks.table_name(self._prefix),
            columns=replica_chunk_columns + columns,
            primary_key=columns[:3],
            annotations={
                "cassandra:partitioning_columns": [column.name for column in replica_chunk_columns],
            },
        )
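        # Update records are partitioned by the replica chunk columns and
        # clustered by (update_time_ns, update_order, update_unique_id); the
        # payload itself is stored as a serialized string.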
    def makeSchema(
        self,
        drop: bool = False,
        part_range: ApdbCassandraTimePartitionRange | None = None,
        replication_factor: int | None = None,
        table_options: CreateTableOptions | None = None,
    ) -> None:
        """Create or re-create all tables.

        Parameters
        ----------
        drop : `bool`
            If True then drop tables before creating new ones. Note that
            only tables are dropped and not the whole keyspace.
        part_range : `ApdbCassandraTimePartitionRange` or `None`
            Start and end partition number for time partitions. Used to create
            per-partition DiaObject, DiaSource, and DiaForcedSource tables. If
            `None` then per-partition tables are not created.
        replication_factor : `int`, optional
            Replication factor used when creating a new keyspace; if the
            keyspace already exists its replication factor is not changed.
        table_options : `CreateTableOptions`, optional
            Options used when creating Cassandra tables.
        """
        if replication_factor is None:
            replication_factor = 1
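        # A replication factor of 1 is only suitable for single-node or test
        # clusters; production keyspaces normally use a higher value.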
        # If the keyspace already exists, check that its replication factor
        # matches the requested one.
        query = "SELECT replication FROM system_schema.keyspaces WHERE keyspace_name = %s"
        result = self._session.execute(query, (self._keyspace,))
        if row := result.one():
            repl_config = cast(Mapping[str, str], row[0])
            current_repl = int(repl_config["replication_factor"])
            if replication_factor != current_repl:
                raise ValueError(
                    f"New replication factor {replication_factor} differs from the replication factor "
                    f"for already existing keyspace: {current_repl}"
                )
        else:
            query = (
                f'CREATE KEYSPACE "{self._keyspace}"'
                " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': "
                f"{replication_factor}"
                "}"
            )
            self._session.execute(query)
            if table in (ApdbTables.SSObject, ApdbTables.SSSource):
    def _makeTableSchema(
        self,
        table: ApdbTables | ExtraTables,
        drop: bool = False,
        part_range: ApdbCassandraTimePartitionRange | None = None,
        table_options: CreateTableOptions | None = None,
    ) -> None:
        _LOG.debug("Making table %s", table)

        fullTable = table.table_name(self._prefix)

        table_list = [fullTable]
        if part_range is not None:
            table_list = [table.table_name(self._prefix, part) for part in part_range.range()]

        if drop:
            queries = [
                f'DROP TABLE IF EXISTS "{self._keyspace}"."{table_name}"' for table_name in table_list
            ]
            futures = [self._session.execute_async(query, timeout=None) for query in queries]
            for future in futures:
                _LOG.debug("wait for query: %s", future.query)
                future.result()
                _LOG.debug("query finished: %s", future.query)
        queries = []
        options = table_options.get_options(fullTable).strip() if table_options else None
        for table_name in table_list:
            if_not_exists = "" if drop else "IF NOT EXISTS"
            # "columns" holds the CQL column and key definitions generated
            # from the table schema.
            query = f'CREATE TABLE {if_not_exists} "{self._keyspace}"."{table_name}" ({columns})'
            if options:
                query = f"{query} WITH {options}"
            _LOG.debug("query: %s", query)
            queries.append(query)
        futures = [self._session.execute_async(query, timeout=None) for query in queries]
        for future in futures:
            _LOG.debug("wait for query: %s", future.query)
            future.result()
            _LOG.debug("query finished: %s", future.query)