22from __future__
import annotations
24__all__ = [
"ApdbCassandraSchema"]
29from typing
import TYPE_CHECKING, List, Optional, Tuple, Union
32from felis
import simple
34from .apdbSchema
import ApdbSchema, ApdbTables
37 import cassandra.cluster
40_LOG = logging.getLogger(__name__)
# NOTE(review): fragmentary extract of an enum class (``ExtraTables`` per the
# __all__/usage elsewhere in this file).  The ``class`` header and the
# ``def`` line of its ``table_name`` method are missing from this view; the
# stray leading numbers are extraction artifacts of the original line numbers.
45 """Names of the extra tables used by Cassandra implementation."""
# Enum member whose value is the literal Cassandra table name.
47 DiaSourceToPartition =
"DiaSourceToPartition"
# Attribute docstring for the member above.
48 "Maps diaSourceId to its partition values (pixel and time)."
# Body of a method (its ``def table_name(self, prefix="")`` line is not
# visible here, but the signature survives in the trailing member listing):
# prepends the configured prefix to the enum value to form the full name.
51 """Return full table name."""
52 return prefix + self.value
"""Class for management of APDB schema.

Parameters
----------
session : `cassandra.cluster.Session`
    Cassandra session object.
keyspace : `str`
    Keyspace name — presumably where all tables live; parameter name taken
    from the trailing ``__init__`` signature listing, TODO confirm.
schema_file : `str`
    Name of the YAML schema file.
schema_name : `str`, optional
    Name of the schema in YAML files.
prefix : `str`, optional
    Prefix to add to all schema elements.
time_partition_tables : `bool`
    If True then schema will have a separate table for each time partition.
"""
# Class-level mapping of felis column types to Cassandra CQL type names.
# The dict's opening line (and its attribute name) is missing from this
# extract; the docstring at the end of the literal identifies its purpose.
# Visible choices: Short is widened to INT, Byte maps to TINYINT, and all
# character/string felis types collapse to Cassandra TEXT.
73 felis.types.Double: "DOUBLE",
74 felis.types.Float:
"FLOAT",
75 felis.types.Timestamp:
"TIMESTAMP",
76 felis.types.Long:
"BIGINT",
77 felis.types.Int:
"INT",
78 felis.types.Short:
"INT",
79 felis.types.Byte:
"TINYINT",
80 felis.types.Binary:
"BLOB",
81 felis.types.Char:
"TEXT",
82 felis.types.String:
"TEXT",
83 felis.types.Unicode:
"TEXT",
84 felis.types.Text:
"TEXT",
85 felis.types.Boolean:
"BOOLEAN",
# Attribute docstring for the mapping above.
87 """Map YAML column types to Cassandra"""
# Tables that get one Cassandra table per time partition; only
# DiaForcedSource is visible here — other members sit on lines missing
# from this extract (per the original numbering gap 89->92).
89 _time_partitioned_tables = [
92 ApdbTables.DiaForcedSource,
# Tables partitioned spatially (by pixel) only.
94 _spatially_partitioned_tables = [ApdbTables.DiaObjectLast]
# Fragment of ``__init__``: the ``def`` line and several parameters
# (keyspace, schema_file, prefix) are missing from this extract; the full
# signature survives in the trailing member listing.
98 session: cassandra.cluster.Session,
101 schema_name: str =
"ApdbSchema",
103 time_partition_tables: bool =
False
# Delegate YAML schema parsing to the ApdbSchema base class (imported at
# the top of the file).
106 super().
__init__(schema_file, schema_name)
# Interior of ``__init__`` (presumably a loop over APDB tables — the loop
# header is missing from this extract): chooses the partitioning columns
# for each table and records them in the felis table annotations.
120 columns = [
"apdb_part"]
# Time-partitioned layouts: either one table per time partition (spatial
# partition column only) or a single table with an extra time column.
122 if time_partition_tables:
123 columns = [
"apdb_part"]
125 columns = [
"apdb_part",
"apdb_time_part"]
126 elif table
is ApdbTables.SSObject:
# SSObject is partitioned by its own id and the declared primary key is
# cleared (it becomes the partition key instead).
131 columns = [
"ssObjectId"]
132 tableDef.primary_key = []
# Synthesizes Long-typed partition columns (construction lines missing
# from this extract) and prepends them to the table's column list.
145 datatype=felis.types.Long,
147 )
for name
in columns
149 tableDef.columns = columnDefs + tableDef.columns
# Record the partition columns in a copied annotations dict.
# NOTE(review): the key misspells "partitioning" as "paritioning"; the
# same spelling is read back elsewhere in this class, so it is
# load-bearing — do not fix it in isolation.
153 annotations = dict(tableDef.annotations)
154 annotations[
"cassandra:paritioning_columns"] = columns
155 tableDef.annotations = annotations
def _extraTableSchema(self) -> Mapping[ExtraTables, simple.Table]:
    """Generate schema for extra tables.

    Returns
    -------
    schema : `dict` [`ExtraTables`, `felis.simple.Table`]
        Definition of every extra (non-APDB) table; currently only the
        ``DiaSourceToPartition`` lookup table, partitioned by diaSourceId.
    """
    # NOTE(review): reconstructed from a fragmentary extract; verify the
    # simple.Table keyword arguments against the felis version in use.
    return {
        ExtraTables.DiaSourceToPartition: simple.Table(
            id="#" + ExtraTables.DiaSourceToPartition.value,
            name=ExtraTables.DiaSourceToPartition.value,
            columns=[
                simple.Column(
                    id="#diaSourceId", name="diaSourceId", datatype=felis.types.Long, nullable=False
                ),
                simple.Column(
                    id="#apdb_part", name="apdb_part", datatype=felis.types.Long, nullable=False
                ),
                simple.Column(
                    id="#apdb_time_part", name="apdb_time_part", datatype=felis.types.Int, nullable=False
                ),
            ],
            primary_key=[],
            indexes=[],
            constraints=[],
            # The misspelled "paritioning" key is intentional: it must match
            # the key written and read by the rest of this class.
            annotations={"cassandra:paritioning_columns": ["diaSourceId"]},
        ),
    }
def tableName(self, table_name: Union[ApdbTables, ExtraTables]) -> str:
    """Return Cassandra table name for APDB table.

    Parameters
    ----------
    table_name : `ApdbTables` or `ExtraTables`
        Table enum member; both enums provide ``table_name(prefix)``.

    Returns
    -------
    name : `str`
        Full Cassandra table name including the configured prefix.
    """
    return table_name.table_name(self._prefix)
def getColumnMap(self, table_name: Union[ApdbTables, ExtraTables]) -> Mapping[str, simple.Column]:
    """Returns mapping of column names to Column definitions.

    Parameters
    ----------
    table_name : `ApdbTables` or `ExtraTables`
        One of known APDB table names.

    Returns
    -------
    column_map : `dict`
        Mapping of column names to `ColumnDef` instances.
    """
    # NOTE(review): the schema-lookup lines were missing from this extract
    # and are reconstructed — confirm the ``tableSchemas`` /
    # ``_extra_tables`` attribute names against the base class.
    if isinstance(table_name, ApdbTables):
        table_schema = self.tableSchemas[table_name]
    else:
        table_schema = self._extra_tables[table_name]
    return {column.name: column for column in table_schema.columns}
def partitionColumns(self, table_name: Union[ApdbTables, ExtraTables]) -> List[str]:
    """Return a list of columns used for table partitioning.

    Parameters
    ----------
    table_name : `ApdbTables` or `ExtraTables`
        Table name in APDB schema.

    Returns
    -------
    columns : `list` of `str`
        Names of columns used for partitioning.
    """
    # NOTE(review): the schema-lookup lines were missing from this extract
    # and are reconstructed — confirm attribute names against the base class.
    if isinstance(table_name, ApdbTables):
        table_schema = self.tableSchemas[table_name]
    else:
        table_schema = self._extra_tables[table_name]
    # The misspelled "paritioning" key matches the writer in __init__ and
    # _extraTableSchema — it is load-bearing.
    return table_schema.annotations.get("cassandra:paritioning_columns", [])
def clusteringColumns(self, table_name: Union[ApdbTables, ExtraTables]) -> List[str]:
    """Return a list of columns used for clustering.

    Parameters
    ----------
    table_name : `ApdbTables` or `ExtraTables`
        Table name in APDB schema.

    Returns
    -------
    columns : `list` of `str`
        Names of columns used for clustering.
    """
    # NOTE(review): the schema-lookup lines were missing from this extract
    # and are reconstructed — confirm attribute names against the base class.
    if isinstance(table_name, ApdbTables):
        table_schema = self.tableSchemas[table_name]
    else:
        table_schema = self._extra_tables[table_name]
    # Clustering columns come from the felis primary key definition.
    return [column.name for column in table_schema.primary_key]
# Fragment of ``makeSchema``: only the signature and part of the docstring
# survive in this extract — the method body (original lines ~257+) is
# missing, so no behavior is documented beyond what the docstring states.
246 def makeSchema(self, drop: bool =
False, part_range: Optional[Tuple[int, int]] =
None) ->
None:
247 """Create or re-create all tables.
252 If True then drop tables before creating new ones.
253 part_range : `tuple` [ `int` ]
or `
None`
254 Start
and end partition number
for time partitions, end
is not
255 inclusive. Used to create per-partition DiaObject, DiaSource,
and
256 DiaForcedSource tables. If `
None` then per-partition tables are
# ``_makeTableSchema``: creates (and optionally drops first) the Cassandra
# table(s) backing one APDB/extra table.  Several interior lines are
# missing from this extract (e.g. the condition guarding the "Skipping"
# branch and the initial ``queries`` construction), so comments below only
# annotate what is visible.
264 def _makeTableSchema(
266 table: Union[ApdbTables, ExtraTables],
268 part_range: Optional[Tuple[int, int]] =
None
# Skip branch — guard condition not visible in this extract.
271 _LOG.debug(
"Skipping schema for table %s", table)
273 _LOG.debug(
"Making table %s", table)
# Full table name includes the configured prefix.
275 fullTable = table.table_name(self.
_prefix)
# One physical table by default; with a part_range, one table per time
# partition, suffixed "_<part>".
277 table_list = [fullTable]
278 if part_range
is not None:
280 partitions = range(*part_range)
281 table_list = [f
"{fullTable}_{part}" for part
in partitions]
# Drop phase: DROP TABLE IF EXISTS for every table in the list, issued
# asynchronously and then awaited (result-wait line missing from extract).
285 f
'DROP TABLE IF EXISTS "{self._keyspace}"."{table_name}"' for table_name
in table_list
287 futures = [self.
_session.execute_async(query, timeout=
None)
for query
in queries]
288 for future
in futures:
289 _LOG.debug(
"wait for query: %s", future.query)
291 _LOG.debug(
"query finished: %s", future.query)
# Create phase: when drop=True the tables were just removed so plain
# CREATE TABLE is used; otherwise IF NOT EXISTS makes creation idempotent.
# (``columns`` is presumably built from _tableColumns — line not visible.)
294 for table_name
in table_list:
295 if_not_exists =
"" if drop
else "IF NOT EXISTS"
297 query = f
'CREATE TABLE {if_not_exists} "{self._keyspace}"."{table_name}" ({columns})'
298 _LOG.debug(
"query: %s", query)
299 queries.append(query)
# Same async-issue-then-wait pattern as the drop phase.
300 futures = [self.
_session.execute_async(query, timeout=
None)
for query
in queries]
301 for future
in futures:
302 _LOG.debug(
"wait for query: %s", future.query)
304 _LOG.debug(
"query finished: %s", future.query)
# ``_tableColumns``: renders the column-definition list for a CQL CREATE
# TABLE statement.  Interior lines are missing from this extract (schema
# lookup, the condition that raises ValueError, the ``ctype`` mapping), so
# comments annotate only what is visible.
306 def _tableColumns(self, table_name: Union[ApdbTables, ExtraTables]) -> List[str]:
307 """Return set of columns in a table
311 table_name : `ApdbTables`
317 List of strings in the format
"column_name type".
319 if isinstance(table_name, ApdbTables):
# Partition columns come from the (misspelled, load-bearing) annotation
# key written in __init__; clustering columns from the felis primary key.
325 part_columns = table_schema.annotations.get(
"cassandra:paritioning_columns", [])
326 clust_columns = [column.name
for column
in table_schema.primary_key]
327 _LOG.debug(
"part_columns: %s", part_columns)
328 _LOG.debug(
"clust_columns: %s", clust_columns)
# A table with no partition columns is a configuration error.
330 raise ValueError(f
"Table {table_name} configuration is missing partition index")
# Per-column '"name" TYPE' entries (``ctype`` lookup line missing —
# presumably the felis->Cassandra type map declared on this class).
334 for column
in table_schema.columns:
336 column_defs.append(f
'"{column.name}" {ctype}')
# Quote all key column names; a composite partition key (more than one
# partition column) is wrapped in parentheses per CQL syntax.
339 part_columns = [f
'"{col}"' for col
in part_columns]
340 clust_columns = [f
'"{col}"' for col
in clust_columns]
341 if len(part_columns) > 1:
342 columns =
", ".join(part_columns)
343 part_columns = [f
"({columns})"]
# PRIMARY KEY (<partition key>, <clustering columns...>)
344 pkey =
", ".join(part_columns + clust_columns)
345 _LOG.debug(
"pkey: %s", pkey)
346 column_defs.append(f
"PRIMARY KEY ({pkey})")
std::vector< SchemaItem< Flag > > * items
list _time_partitioned_tables
List[str] clusteringColumns(self, Union[ApdbTables, ExtraTables] table_name)
None makeSchema(self, bool drop=False, Optional[Tuple[int, int]] part_range=None)
List[str] partitionColumns(self, Union[ApdbTables, ExtraTables] table_name)
Mapping[ExtraTables, simple.Table] _extraTableSchema(self)
Mapping[str, simple.Column] getColumnMap(self, Union[ApdbTables, ExtraTables] table_name)
str tableName(self, Union[ApdbTables, ExtraTables] table_name)
def __init__(self, cassandra.cluster.Session session, str keyspace, str schema_file, str schema_name="ApdbSchema", str prefix="", bool time_partition_tables=False)
List[str] _tableColumns(self, Union[ApdbTables, ExtraTables] table_name)
None _makeTableSchema(self, Union[ApdbTables, ExtraTables] table, bool drop=False, Optional[Tuple[int, int]] part_range=None)
list _spatially_partitioned_tables
str table_name(self, str prefix="")