from __future__ import annotations

__all__ = ["ApdbCassandraSchema"]

import enum
import logging
from typing import List, Mapping, Optional, TYPE_CHECKING, Tuple, Union

from .apdbSchema import ApdbSchema, ApdbTables, ColumnDef, IndexDef, IndexType, TableDef

if TYPE_CHECKING:
    import cassandra.cluster

_LOG = logging.getLogger(__name__)
@enum.unique
class ExtraTables(enum.Enum):
    """Names of the extra tables used by Cassandra implementation."""

    # Maps diaSourceId to its partition values (pixel and time).
    DiaSourceToPartition = "DiaSourceToPartition"

    def table_name(self, prefix: str = "") -> str:
        """Return full table name.

        Parameters
        ----------
        prefix : `str`, optional
            Prefix to prepend to the table name.

        Returns
        -------
        name : `str`
            Full name of the Cassandra table for this enum member.
        """
        return prefix + self.value
class ApdbCassandraSchema(ApdbSchema):
    """Class for management of APDB schema.

    Parameters
    ----------
    session : `cassandra.cluster.Session`
        Cassandra session object.
    keyspace : `str`
        Name of the Cassandra keyspace holding APDB tables.
    schema_file : `str`
        Name of the YAML schema file.
    schema_name : `str`, optional
        Name of the schema in YAML files.
    prefix : `str`, optional
        Prefix to add to all schema elements.
    time_partition_tables : `bool`
        If True then schema will have a separate table for each time
        partition.
    """

    # Map YAML column types to Cassandra CQL types.
    # NOTE(review): only the "double" and "timestamp" entries survive in the
    # recovered source; the remaining mappings are reconstructed and should be
    # confirmed against the original file.
    _type_map = dict(
        double="DOUBLE",
        int="INT",
        long="BIGINT",
        string="TEXT",
        unicode="TEXT",
        char="TEXT",
        boolean="BOOLEAN",
        unsigned_long="BIGINT",
        unsigned_int="INT",
        timestamp="TIMESTAMP",
    )
    """Map YAML column types to Cassandra"""

    # Tables whose data can be split into per-time-partition tables.
    _time_partitioned_tables = [
        ApdbTables.DiaObject,
        ApdbTables.DiaSource,
        ApdbTables.DiaForcedSource,
    ]

    # Tables partitioned on the spatial (pixelization) column only.
    _spatially_partitioned_tables = [ApdbTables.DiaObjectLast]
92 session: cassandra.cluster.Session,
95 schema_name: str =
"ApdbSchema",
97 time_partition_tables: bool =
False
100 super().
__init__(schema_file, schema_name)
114 columns = [
"apdb_part"]
116 if time_partition_tables:
117 columns = [
"apdb_part"]
119 columns = [
"apdb_part",
"apdb_time_part"]
120 elif table
is ApdbTables.SSObject:
125 columns = [
"ssObjectId"]
127 index
for index
in tableDef.indices
if index.type
is not IndexType.PRIMARY
138 ColumnDef(name=name, type=
"long", nullable=
False)
for name
in columns
140 tableDef.columns = columnDefs + tableDef.columns
144 index =
IndexDef(name=f
"Part_{tableDef.name}", type=IndexType.PARTITION, columns=columns)
145 tableDef.indices.append(index)
149 def _extraTableSchema(self) -> Mapping[ExtraTables, TableDef]:
150 """Generate schema for extra tables."""
152 ExtraTables.DiaSourceToPartition:
TableDef(
153 name=ExtraTables.DiaSourceToPartition.value,
155 ColumnDef(name=
"diaSourceId", type=
"long", nullable=
False),
156 ColumnDef(name=
"apdb_part", type=
"long", nullable=
False),
157 ColumnDef(name=
"apdb_time_part", type=
"int", nullable=
False),
161 name=f
"Part_{ExtraTables.DiaSourceToPartition.value}",
162 type=IndexType.PARTITION,
163 columns=[
"diaSourceId"],
169 def tableName(self, table_name: Union[ApdbTables, ExtraTables]) -> str:
170 """Return Cassandra table name for APDB table.
172 return table_name.table_name(self.
_prefix_prefix)
174 def getColumnMap(self, table_name: Union[ApdbTables, ExtraTables]) -> Mapping[str, ColumnDef]:
175 """Returns mapping of column names to Column definitions.
179 table_name : `ApdbTables`
180 One of known APDB table names.
185 Mapping of column names to `ColumnDef` instances.
187 if isinstance(table_name, ApdbTables):
188 table_schema = self.
tableSchemastableSchemas[table_name]
191 cmap = {column.name: column
for column
in table_schema.columns}
195 """Return a list of columns used for table partitioning.
199 table_name : `ApdbTables`
200 Table name in APDB schema
204 columns : `list` of `str`
205 Names of columns
for used
for partitioning.
207 if isinstance(table_name, ApdbTables):
208 table_schema = self.
tableSchemastableSchemas[table_name]
211 for index
in table_schema.indices:
212 if index.type
is IndexType.PARTITION:
218 """Return a list of columns used for clustering.
222 table_name : `ApdbTables`
223 Table name in APDB schema
227 columns : `list` of `str`
228 Names of columns
for used
for clustering.
230 if isinstance(table_name, ApdbTables):
231 table_schema = self.
tableSchemastableSchemas[table_name]
234 for index
in table_schema.indices:
235 if index.type
is IndexType.PRIMARY:
239 def makeSchema(self, drop: bool =
False, part_range: Optional[Tuple[int, int]] =
None) ->
None:
240 """Create or re-create all tables.
245 If True then drop tables before creating new ones.
246 part_range : `tuple` [ `int` ]
or `
None`
247 Start
and end partition number
for time partitions, end
is not
248 inclusive. Used to create per-partition DiaObject, DiaSource,
and
249 DiaForcedSource tables. If `
None` then per-partition tables are
257 def _makeTableSchema(
259 table: Union[ApdbTables, ExtraTables],
261 part_range: Optional[Tuple[int, int]] =
None
264 _LOG.debug(
"Skipping schema for table %s", table)
266 _LOG.debug(
"Making table %s", table)
268 fullTable = table.table_name(self.
_prefix_prefix)
270 table_list = [fullTable]
271 if part_range
is not None:
273 partitions = range(*part_range)
274 table_list = [f
"{fullTable}_{part}" for part
in partitions]
278 f
'DROP TABLE IF EXISTS "{self._keyspace}"."{table_name}"' for table_name
in table_list
280 futures = [self.
_session_session.execute_async(query, timeout=
None)
for query
in queries]
281 for future
in futures:
282 _LOG.debug(
"wait for query: %s", future.query)
284 _LOG.debug(
"query finished: %s", future.query)
287 for table_name
in table_list:
288 if_not_exists =
"" if drop
else "IF NOT EXISTS"
290 query = f
'CREATE TABLE {if_not_exists} "{self._keyspace}"."{table_name}" ({columns})'
291 _LOG.debug(
"query: %s", query)
292 queries.append(query)
293 futures = [self.
_session_session.execute_async(query, timeout=
None)
for query
in queries]
294 for future
in futures:
295 _LOG.debug(
"wait for query: %s", future.query)
297 _LOG.debug(
"query finished: %s", future.query)
299 def _tableColumns(self, table_name: Union[ApdbTables, ExtraTables]) -> List[str]:
300 """Return set of columns in a table
304 table_name : `ApdbTables`
310 List of strings in the format
"column_name type".
312 if isinstance(table_name, ApdbTables):
313 table_schema = self.
tableSchemastableSchemas[table_name]
320 index_columns =
set()
321 for index
in table_schema.indices:
322 if index.type
is IndexType.PARTITION:
323 part_columns = index.columns
324 elif index.type
is IndexType.PRIMARY:
325 clust_columns = index.columns
326 index_columns.update(index.columns)
327 _LOG.debug(
"part_columns: %s", part_columns)
328 _LOG.debug(
"clust_columns: %s", clust_columns)
330 raise ValueError(f
"Table {table_name} configuration is missing partition index")
334 for column
in table_schema.columns:
335 ctype = self.
_type_map_type_map[column.type]
336 column_defs.append(f
'"{column.name}" {ctype}')
339 part_columns = [f
'"{col}"' for col
in part_columns]
340 clust_columns = [f
'"{col}"' for col
in clust_columns]
341 if len(part_columns) > 1:
342 columns =
", ".join(part_columns)
343 part_columns = [f
"({columns})"]
344 pkey =
", ".join(part_columns + clust_columns)
345 _LOG.debug(
"pkey: %s", pkey)
346 column_defs.append(f
"PRIMARY KEY ({pkey})")
std::vector< SchemaItem< Flag > > * items
list _time_partitioned_tables
List[str] clusteringColumns(self, Union[ApdbTables, ExtraTables] table_name)
Mapping[str, ColumnDef] getColumnMap(self, Union[ApdbTables, ExtraTables] table_name)
None makeSchema(self, bool drop=False, Optional[Tuple[int, int]] part_range=None)
Mapping[ExtraTables, TableDef] _extraTableSchema(self)
List[str] partitionColumns(self, Union[ApdbTables, ExtraTables] table_name)
str tableName(self, Union[ApdbTables, ExtraTables] table_name)
def __init__(self, cassandra.cluster.Session session, str keyspace, str schema_file, str schema_name="ApdbSchema", str prefix="", bool time_partition_tables=False)
List[str] _tableColumns(self, Union[ApdbTables, ExtraTables] table_name)
None _makeTableSchema(self, Union[ApdbTables, ExtraTables] table, bool drop=False, Optional[Tuple[int, int]] part_range=None)
list _spatially_partitioned_tables
str table_name(self, str prefix="")
daf::base::PropertySet * set
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.