from __future__ import annotations

__all__ = ["ApdbCassandraSchema"]

import itertools
import logging
from typing import List, Mapping, Optional, TYPE_CHECKING, Tuple

from .apdbSchema import ApdbSchema, ApdbTables, ColumnDef, IndexDef, IndexType

if TYPE_CHECKING:
    import cassandra.cluster

_LOG = logging.getLogger(__name__)
40 """Class for management of APDB schema.
44 session : `cassandra.cluster.Session`
45 Cassandra session object
47 Name of the YAML schema file.
48 extra_schema_file : `str`, optional
49 Name of the YAML schema file with extra column definitions.
50 prefix : `str`, optional
51 Prefix to add to all schema elements.
53 Type of packing to apply to columns, string "none" disable packing,
54 any other value enables it.
55 time_partition_tables : `bool`
56 If True then schema will have a separate table for each time partition.
    _type_map = dict(DOUBLE="DOUBLE", FLOAT="FLOAT", DATETIME="TIMESTAMP",
                     BIGINT="BIGINT", INTEGER="INT", INT="INT", SMALLINT="INT",
                     TINYINT="INT", BOOL="BOOLEAN", BLOB="BLOB", CHAR="TEXT",
                     TEXT="TEXT")
    """Map YAML column types to Cassandra"""
    def __init__(self, session: cassandra.cluster.Session, schema_file: str,
                 extra_schema_file: Optional[str] = None, prefix: str = "",
                 packing: str = "none", time_partition_tables: bool = False):

        super().__init__(schema_file, extra_schema_file)

        self._session = session
        self._prefix = prefix
        self._packing = packing
        # Add partitioning columns and a PARTITION index to every table
        # definition that supports partitioning.
        self._ignore_tables = []
        for table, tableDef in self.tableSchemas.items():
            columns = []
            if table is ApdbTables.DiaObjectLast:
                # DiaObjectLast is partitioned spatially only.
                columns = ["apdb_part"]
            elif table in (ApdbTables.DiaObject, ApdbTables.DiaSource,
                           ApdbTables.DiaForcedSource):
                if time_partition_tables:
                    # Time partitioning is done with per-partition tables,
                    # only the spatial partitioning column is added here.
                    columns = ["apdb_part"]
                else:
                    # Partition on both space and time in a single table.
                    columns = ["apdb_part", "apdb_time_part"]
            else:
                # Do not know how to partition other tables, skip them when
                # making the schema.
                self._ignore_tables.append(table)
                continue

            # Prepend the partitioning columns to the table column list.
            columnDefs = [ColumnDef(name=name,
                                    type="BIGINT",
                                    nullable=False,
                                    default=None,
                                    description="",
                                    unit=None,
                                    ucd=None) for name in columns]
            tableDef.columns = columnDefs + tableDef.columns

            # Describe the partitioning columns as a PARTITION index.
            index = IndexDef(name=f"Part_{tableDef.name}", type=IndexType.PARTITION, columns=columns)
            tableDef.indices.append(index)
        # Pre-compute the columns that are packed into a BLOB, i.e. all
        # columns that do not appear in any index.
        self._packed_columns = {}
        if self._packing != "none":
            for table, tableDef in self.tableSchemas.items():
                index_columns = set(itertools.chain.from_iterable(
                    index.columns for index in tableDef.indices
                ))
                columnsDefs = [column for column in tableDef.columns
                               if column.name not in index_columns]
                self._packed_columns[table] = columnsDefs
122 """Return Cassandra table name for APDB table.
124 return table_name.table_name(self.
_prefix_prefix)
    def getColumnMap(self, table_name: ApdbTables) -> Mapping[str, ColumnDef]:
        """Returns mapping of column names to Column definitions.

        Parameters
        ----------
        table_name : `ApdbTables`
            One of known APDB table names.

        Returns
        -------
        column_map : `dict`
            Mapping of column names to `ColumnDef` instances.
        """
        table = self.tableSchemas[table_name]
        cmap = {column.name: column for column in table.columns}
        return cmap
144 """Return a list of columns used for table partitioning.
148 table_name : `ApdbTables`
149 Table name in APDB schema
153 columns : `list` of `str`
154 Names of columns for used for partitioning.
156 table_schema = self.
tableSchemastableSchemas[table_name]
157 for index
in table_schema.indices:
158 if index.type
is IndexType.PARTITION:
164 """Return a list of columns used for clustering.
168 table_name : `ApdbTables`
169 Table name in APDB schema
173 columns : `list` of `str`
174 Names of columns for used for partitioning.
176 table_schema = self.
tableSchemastableSchemas[table_name]
177 for index
in table_schema.indices:
178 if index.type
is IndexType.PRIMARY:
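    # A hedged illustration of the two accessors above, using hypothetical
    # index definitions: for a PARTITION index on ("apdb_part",) and a
    # PRIMARY index on ("diaObjectId", "validityStart"), partitionColumns()
    # returns ["apdb_part"] and clusteringColumns() returns
    # ["diaObjectId", "validityStart"].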
    def makeSchema(self, drop: bool = False, part_range: Optional[Tuple[int, int]] = None) -> None:
        """Create or re-create all tables.

        Parameters
        ----------
        drop : `bool`
            If True then drop tables before creating new ones.
        part_range : `tuple` [ `int` ] or `None`
            Start and end partition number for time partitions, end is not
            inclusive. Used to create per-partition DiaObject, DiaSource, and
            DiaForcedSource tables. If `None` then per-partition tables are
            not created.
        """
        for table in self.tableSchemas:
            if table in self._ignore_tables:
                _LOG.debug("Skipping schema for table %s", table)
                continue
            _LOG.debug("Making table %s", table)

            fullTable = table.table_name(self._prefix)

            table_list = [fullTable]
            if part_range is not None:
                if table in (ApdbTables.DiaSource, ApdbTables.DiaForcedSource,
                             ApdbTables.DiaObject):
                    partitions = range(*part_range)
                    table_list = [f"{fullTable}_{part}" for part in partitions]

            if drop:
                queries = [f'DROP TABLE IF EXISTS "{table_name}"' for table_name in table_list]
                futures = [self._session.execute_async(query, timeout=None) for query in queries]
                for future in futures:
                    _LOG.debug("wait for query: %s", future.query)
                    future.result()
                    _LOG.debug("query finished: %s", future.query)

            queries = []
            for table_name in table_list:
                if_not_exists = "" if drop else "IF NOT EXISTS"
                columns = ", ".join(self._tableColumns(table))
                query = f'CREATE TABLE {if_not_exists} "{table_name}" ({columns})'
                _LOG.debug("query: %s", query)
                queries.append(query)
            futures = [self._session.execute_async(query, timeout=None) for query in queries]
            for future in futures:
                _LOG.debug("wait for query: %s", future.query)
                future.result()
                _LOG.debug("query finished: %s", future.query)
    def _tableColumns(self, table_name: ApdbTables) -> List[str]:
        """Return set of columns in a table.

        Parameters
        ----------
        table_name : `ApdbTables`
            Name of the table.

        Returns
        -------
        column_defs : `list` of `str`
            List of strings in the format "column_name type".
        """
        table_schema = self.tableSchemas[table_name]

        # Must have both partition columns and clustering columns.
        part_columns = []
        clust_columns = []
        index_columns = set()
        for index in table_schema.indices:
            if index.type is IndexType.PARTITION:
                part_columns = index.columns
            elif index.type is IndexType.PRIMARY:
                clust_columns = index.columns
            index_columns.update(index.columns)
        _LOG.debug("part_columns: %s", part_columns)
        _LOG.debug("clust_columns: %s", clust_columns)
        if not part_columns:
            raise ValueError(f"Table {table_name} configuration is missing partition index")
        if not clust_columns:
            raise ValueError(f"Table {table_name} configuration is missing primary index")
        # Build the list of regular column definitions.
        column_defs = []
        for column in table_schema.columns:
            if self._packing != "none" and column.name not in index_columns:
                # With packing enabled non-index columns are stored in the
                # packed BLOB instead of individual columns.
                continue
            ctype = self._type_map[column.type]
            column_defs.append(f'"{column.name}" {ctype}')

        # Add a single BLOB column that holds all packed columns.
        if self._packing != "none":
            column_defs.append('"apdb_packed" blob')

        # Build the PRIMARY KEY definition; a multi-column partition key
        # must be parenthesized to make a composite key.
        part_columns = [f'"{col}"' for col in part_columns]
        clust_columns = [f'"{col}"' for col in clust_columns]
        if len(part_columns) > 1:
            columns = ", ".join(part_columns)
            part_columns = [f"({columns})"]
        pkey = ", ".join(part_columns + clust_columns)
        _LOG.debug("pkey: %s", pkey)
        column_defs.append(f"PRIMARY KEY ({pkey})")

        return column_defs
289 """Return set of columns that are packed into BLOB.
293 table_name : `ApdbTables`
298 columns : `list` [ `ColumnDef` ]
299 List of column definitions. Empty list is returned if packing is
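
# A minimal usage sketch, with hypothetical connection details and file
# names (a local Cassandra cluster with an "apdb" keyspace and a schema
# file "apdb-schema.yaml"):
#
#     from cassandra.cluster import Cluster
#
#     session = Cluster(["127.0.0.1"]).connect("apdb")
#     schema = ApdbCassandraSchema(session, "apdb-schema.yaml",
#                                  time_partition_tables=True)
#     # Create per-partition DiaObject/DiaSource/DiaForcedSource tables for
#     # time partitions 600 (inclusive) through 612 (exclusive).
#     schema.makeSchema(part_range=(600, 612))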