22from __future__
import annotations
24__all__ = [
"ApdbCassandraSchema"]
29from typing
import TYPE_CHECKING, List, Optional, Tuple
32from felis
import simple
34from .apdbSchema
import ApdbSchema, ApdbTables
37 import cassandra.cluster
40_LOG = logging.getLogger(__name__)
class _FelisUUID(felis.types.FelisType, felis_name="uuid", votable_name="uuid"):
    """Special internal type for UUID columns.

    Felis does not support UUID; to keep the logic here simple it is easier
    to register a dedicated type class for it.
    """
class ExtraTables(enum.Enum):
    """Names of the extra tables used by Cassandra implementation."""

    # Table for insert ID records.
    DiaInsertId = "DiaInsertId"
    # Table for DIAObject insert ID records.
    DiaObjectInsertId = "DiaObjectInsertId"
    # Table for DIASource insert ID records.
    DiaSourceInsertId = "DiaSourceInsertId"
    # Table for DIAForcedSource insert ID records; note the shortened
    # Cassandra table-name value ("DiaFSourceInsertId").
    DiaForcedSourceInsertId = "DiaFSourceInsertId"
    # Maps diaSourceId to its partition values (pixel and time).
    DiaSourceToPartition = "DiaSourceToPartition"

    def table_name(self, prefix: str = "") -> str:
        """Return full table name, with ``prefix`` prepended to the value."""
        return f"{prefix}{self.value}"
74 """Return mapping of tables used for insert ID tracking to their
75 corresponding regular tables."""
84 """Class for management of APDB schema.
88 session : `cassandra.cluster.Session`
89 Cassandra session object
91 Name of the YAML schema file.
92 schema_name : `str`, optional
93 Name of the schema in YAML files.
94 prefix : `str`, optional
95 Prefix to add to all schema elements.
96 time_partition_tables : `bool`
97 If
True then schema will have a separate table
for each time partition.
101 felis.types.Double: "DOUBLE",
102 felis.types.Float:
"FLOAT",
103 felis.types.Timestamp:
"TIMESTAMP",
104 felis.types.Long:
"BIGINT",
105 felis.types.Int:
"INT",
106 felis.types.Short:
"INT",
107 felis.types.Byte:
"TINYINT",
108 felis.types.Binary:
"BLOB",
109 felis.types.Char:
"TEXT",
110 felis.types.String:
"TEXT",
111 felis.types.Unicode:
"TEXT",
112 felis.types.Text:
"TEXT",
113 felis.types.Boolean:
"BOOLEAN",
116 """Map YAML column types to Cassandra"""
118 _time_partitioned_tables = [
119 ApdbTables.DiaObject,
120 ApdbTables.DiaSource,
121 ApdbTables.DiaForcedSource,
123 _spatially_partitioned_tables = [ApdbTables.DiaObjectLast]
127 session: cassandra.cluster.Session,
130 schema_name: str =
"ApdbSchema",
132 time_partition_tables: bool =
False,
133 use_insert_id: bool =
False,
135 super().
__init__(schema_file, schema_name)
148 """Generate schema for regular APDB tables."""
149 apdb_tables: dict[ApdbTables, simple.Table] = {}
155 primary_key = apdb_table_def.primary_key[:]
158 part_columns = [
"apdb_part"]
159 add_columns = part_columns
161 if time_partition_tables:
162 part_columns = [
"apdb_part"]
164 part_columns = [
"apdb_part",
"apdb_time_part"]
165 add_columns = part_columns
166 elif table
is ApdbTables.SSObject:
171 part_columns = [
"ssObjectId"]
180 simple.Column(id=f
"#{name}", name=name, datatype=felis.types.Long, nullable=
False)
181 for name
in add_columns
184 annotations = dict(apdb_table_def.annotations)
185 annotations[
"cassandra:apdb_column_names"] = [column.name
for column
in apdb_table_def.columns]
187 annotations[
"cassandra:partitioning_columns"] = part_columns
189 apdb_tables[table] = simple.Table(
190 id=apdb_table_def.id,
191 name=apdb_table_def.name,
192 columns=column_defs + apdb_table_def.columns,
193 primary_key=primary_key,
196 annotations=annotations,
202 """Generate schema for extra tables."""
203 extra_tables: dict[ExtraTables, simple.Table] = {}
207 extra_tables[ExtraTables.DiaSourceToPartition] = simple.Table(
208 id=
"#" + ExtraTables.DiaSourceToPartition.value,
209 name=ExtraTables.DiaSourceToPartition.table_name(self.
_prefix),
212 id=
"#diaSourceId", name=
"diaSourceId", datatype=felis.types.Long, nullable=
False
214 simple.Column(id=
"#apdb_part", name=
"apdb_part", datatype=felis.types.Long, nullable=
False),
216 id=
"#apdb_time_part", name=
"apdb_time_part", datatype=felis.types.Int, nullable=
False
218 simple.Column(id=
"#insert_id", name=
"insert_id", datatype=_FelisUUID, nullable=
True),
223 annotations={
"cassandra:partitioning_columns": [
"diaSourceId"]},
226 insert_id_column = simple.Column(
227 id=
"#insert_id", name=
"insert_id", datatype=_FelisUUID, nullable=
False
235 extra_tables[ExtraTables.DiaInsertId] = simple.Table(
236 id=
"#" + ExtraTables.DiaInsertId.value,
237 name=ExtraTables.DiaInsertId.table_name(self.
_prefix),
239 simple.Column(id=
"#partition", name=
"partition", datatype=felis.types.Int, nullable=
False),
242 id=
"#insert_time", name=
"insert_time", datatype=felis.types.Timestamp, nullable=
False
245 primary_key=[insert_id_column],
248 annotations={
"cassandra:partitioning_columns": [
"partition"]},
251 for insert_id_table_enum, apdb_table_enum
in ExtraTables.insert_id_tables().
items():
254 extra_tables[insert_id_table_enum] = simple.Table(
255 id=
"#" + insert_id_table_enum.value,
256 name=insert_id_table_enum.table_name(self.
_prefix),
257 columns=[insert_id_column] + apdb_table_def.columns,
258 primary_key=apdb_table_def.primary_key[:],
262 "cassandra:partitioning_columns": [
"insert_id"],
263 "cassandra:apdb_column_names": [column.name
for column
in apdb_table_def.columns],
271 """Whether insert ID tables are to be used (`bool`)."""
277 """Check whether database has tables for tracking insert IDs."""
278 table_name = ExtraTables.DiaInsertId.table_name(self.
_prefix)
279 query =
"SELECT count(*) FROM system_schema.tables WHERE keyspace_name = %s and table_name = %s"
def tableName(self, table_name: ApdbTables | ExtraTables) -> str:
    """Return Cassandra table name for APDB table.

    Parameters
    ----------
    table_name : `ApdbTables` or `ExtraTables`
        Enum member identifying the table.

    Returns
    -------
    name : `str`
        Cassandra table name including the configured prefix.
    """
    prefix = self._prefix
    return table_name.table_name(prefix)
def getColumnMap(self, table_name: ApdbTables | ExtraTables) -> Mapping[str, simple.Column]:
    """Return mapping of column names to Column definitions.

    Parameters
    ----------
    table_name : `ApdbTables`
        One of known APDB table names.

    Returns
    -------
    column_map : `dict`
        Mapping of column names to `ColumnDef` instances.
    """
    # NOTE(review): the table-definition lookup and the return statement were
    # reconstructed from the visible comprehension — confirm against original.
    table_schema = self._table_schema(table_name)
    cmap = {column.name: column for column in table_schema.columns}
    return cmap
306 """Return a list of columns names for a table as defined in APDB schema.
310 table_name : `ApdbTables` or `ExtraTables`
311 Enum
for a table
in APDB schema.
315 columns : `list` of `str`
316 Names of regular columns
in the table.
319 return table_schema.annotations[
"cassandra:apdb_column_names"]
322 """Return a list of columns used for table partitioning.
326 table_name : `ApdbTables`
327 Table name in APDB schema
331 columns : `list` of `str`
332 Names of columns used
for partitioning.
335 return table_schema.annotations.get(
"cassandra:partitioning_columns", [])
338 """Return a list of columns used for clustering.
342 table_name : `ApdbTables`
343 Table name in APDB schema
347 columns : `list` of `str`
348 Names of columns
for used
for clustering.
351 return [column.name
for column
in table_schema.primary_key]
353 def makeSchema(self, drop: bool =
False, part_range: Optional[Tuple[int, int]] =
None) ->
None:
354 """Create or re-create all tables.
359 If True then drop tables before creating new ones.
360 part_range : `tuple` [ `int` ]
or `
None`
361 Start
and end partition number
for time partitions, end
is not
362 inclusive. Used to create per-partition DiaObject, DiaSource,
and
363 DiaForcedSource tables. If `
None` then per-partition tables are
375 table: ApdbTables | ExtraTables,
377 part_range: Optional[Tuple[int, int]] =
None,
379 _LOG.debug(
"Making table %s", table)
381 fullTable = table.table_name(self.
_prefix)
383 table_list = [fullTable]
384 if part_range
is not None:
386 partitions = range(*part_range)
387 table_list = [f
"{fullTable}_{part}" for part
in partitions]
390 queries = [f
'DROP TABLE IF EXISTS "{self._keyspace}"."{table_name}"' for table_name
in table_list]
391 futures = [self.
_session.execute_async(query, timeout=
None)
for query
in queries]
392 for future
in futures:
393 _LOG.debug(
"wait for query: %s", future.query)
395 _LOG.debug(
"query finished: %s", future.query)
398 for table_name
in table_list:
399 if_not_exists =
"" if drop
else "IF NOT EXISTS"
401 query = f
'CREATE TABLE {if_not_exists} "{self._keyspace}"."{table_name}" ({columns})'
402 _LOG.debug(
"query: %s", query)
403 queries.append(query)
404 futures = [self.
_session.execute_async(query, timeout=
None)
for query
in queries]
405 for future
in futures:
406 _LOG.debug(
"wait for query: %s", future.query)
408 _LOG.debug(
"query finished: %s", future.query)
411 """Return set of columns in a table
415 table_name : `ApdbTables`
421 List of strings in the format
"column_name type".
426 part_columns = table_schema.annotations.get(
"cassandra:partitioning_columns", [])
427 clust_columns = [column.name
for column
in table_schema.primary_key]
428 _LOG.debug(
"part_columns: %s", part_columns)
429 _LOG.debug(
"clust_columns: %s", clust_columns)
431 raise ValueError(f
"Table {table_name} configuration is missing partition index")
435 for column
in table_schema.columns:
437 column_defs.append(f
'"{column.name}" {ctype}')
440 part_columns = [f
'"{col}"' for col
in part_columns]
441 clust_columns = [f
'"{col}"' for col
in clust_columns]
442 if len(part_columns) > 1:
443 columns =
", ".join(part_columns)
444 part_columns = [f
"({columns})"]
445 pkey =
", ".join(part_columns + clust_columns)
446 _LOG.debug(
"pkey: %s", pkey)
447 column_defs.append(f
"PRIMARY KEY ({pkey})")
452 """Return schema definition for a table."""
453 if isinstance(table, ApdbTables):
std::vector< SchemaItem< Flag > > * items
list _time_partitioned_tables
List[str] _tableColumns(self, ApdbTables|ExtraTables table_name)
None makeSchema(self, bool drop=False, Optional[Tuple[int, int]] part_range=None)
List[str] partitionColumns(self, ApdbTables|ExtraTables table_name)
List[str] clusteringColumns(self, ApdbTables|ExtraTables table_name)
simple.Table _table_schema(self, ApdbTables|ExtraTables table)
Mapping[ExtraTables, simple.Table] _extra_tables_schema(self)
None _makeTableSchema(self, ApdbTables|ExtraTables table, bool drop=False, Optional[Tuple[int, int]] part_range=None)
Mapping[str, simple.Column] getColumnMap(self, ApdbTables|ExtraTables table_name)
Mapping[ApdbTables, simple.Table] _apdb_tables_schema(self, bool time_partition_tables)
str tableName(self, ApdbTables|ExtraTables table_name)
bool _check_insert_id(self)
__init__(self, cassandra.cluster.Session session, str keyspace, str schema_file, str schema_name="ApdbSchema", str prefix="", bool time_partition_tables=False, bool use_insert_id=False)
List[str] apdbColumnNames(self, ApdbTables|ExtraTables table_name)
list _spatially_partitioned_tables
Mapping[ExtraTables, ApdbTables] insert_id_tables(cls)
str table_name(self, str prefix="")
str DiaForcedSourceInsertId