from __future__ import annotations
 
__all__ = ["ApdbCassandraSchema"]
 
import itertools
import logging
from typing import List, Mapping, Optional, TYPE_CHECKING, Tuple
 
from .apdbSchema import ApdbSchema, ApdbTables, ColumnDef, IndexDef, IndexType
 
if TYPE_CHECKING:
    import cassandra.cluster
 
_LOG = logging.getLogger(__name__)
 
class ApdbCassandraSchema(ApdbSchema):
    """Class for management of APDB schema.

    Parameters
    ----------
    session : `cassandra.cluster.Session`
        Cassandra session object.
    schema_file : `str`
        Name of the YAML schema file.
    extra_schema_file : `str`, optional
        Name of the YAML schema file with extra column definitions.
    prefix : `str`, optional
        Prefix to add to all schema elements.
    packing : `str`
        Type of packing to apply to columns; the string "none" disables
        packing, any other value enables it.
    time_partition_tables : `bool`
        If `True` then the schema will have a separate table for each time
        partition.
    """
    _type_map = dict(DOUBLE="DOUBLE",
                     # ... remaining YAML-to-Cassandra type mappings ...
                     )
 
    """Map YAML column types to Cassandra."""
    def __init__(self, session: cassandra.cluster.Session, schema_file: str,
                 extra_schema_file: Optional[str] = None, prefix: str = "",
                 packing: str = "none", time_partition_tables: bool = False):

        super().__init__(schema_file, extra_schema_file)

        self._session = session
        self._prefix = prefix
        self._packing = packing
        # Storage for packedColumns() results, filled in the loop below.
        self._packed_columns = {}
 
        # Add partitioning columns and a partitioning index to each table.
        self._ignore_tables = []
        for table, tableDef in self.tableSchemas.items():
            columns = []
            if table is ApdbTables.DiaObjectLast:
                columns = ["apdb_part"]
            elif table in (ApdbTables.DiaObject, ApdbTables.DiaSource, ApdbTables.DiaForcedSource):
                if time_partition_tables:
                    columns = ["apdb_part"]
                else:
                    columns = ["apdb_part", "apdb_time_part"]
            else:
                # Tables without partitioning columns are skipped by makeSchema().
                self._ignore_tables.append(table)

            # Prepend partitioning columns to the table column list.
            columnDefs = [ColumnDef(name=name,
                                    type="BIGINT",
                                    nullable=False,
                                    default=None,
                                    description="",
                                    unit=None,
                                    ucd=None) for name in columns]
 
            tableDef.columns = columnDefs + tableDef.columns
 
            # Make a partitioning index for those columns.
            index = IndexDef(name=f"Part_{tableDef.name}", type=IndexType.PARTITION, columns=columns)
 
            tableDef.indices.append(index)
 
            if self._packing != "none":
                # Columns that are not used by any index are packed into a
                # single blob; remember them for packedColumns().
                index_columns = set(itertools.chain.from_iterable(
                    index.columns for index in tableDef.indices
                ))
                columnsDefs = [column for column in tableDef.columns if column.name not in index_columns]
                self._packed_columns[table] = columnsDefs
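
    # Construction sketch (the contact point, keyspace, schema file name, and
    # "generic" packing value below are hypothetical, not defined here):
    #
    #     from cassandra.cluster import Cluster
    #     session = Cluster(["127.0.0.1"]).connect("apdb")
    #     schema = ApdbCassandraSchema(session, "apdb-schema.yaml",
    #                                  packing="generic", time_partition_tables=True)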
 
    def tableName(self, table_name: ApdbTables) -> str:
        """Return Cassandra table name for APDB table.
        """
        return table_name.table_name(self._prefix)
 
    def getColumnMap(self, table_name: ApdbTables) -> Mapping[str, ColumnDef]:
 
        """Returns mapping of column names to Column definitions.

        Parameters
        ----------
        table_name : `ApdbTables`
            One of known APDB table names.

        Returns
        -------
        column_map : `dict`
            Mapping of column names to `ColumnDef` instances.
        """
        table = self.tableSchemas[table_name]
        cmap = {column.name: column for column in table.columns}
        return cmap
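
    # For illustration ("diaObjectId" is a hypothetical column name):
    #
    #     cmap = schema.getColumnMap(ApdbTables.DiaObject)
    #     col_def = cmap["diaObjectId"]  # ColumnDef for that column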
 
    def partitionColumns(self, table_name: ApdbTables) -> List[str]:
        """Return a list of columns used for table partitioning.

        Parameters
        ----------
        table_name : `ApdbTables`
            Table name in APDB schema.

        Returns
        -------
        columns : `list` of `str`
            Names of columns used for partitioning.
        """
        table_schema = self.tableSchemas[table_name]
        for index in table_schema.indices:
            if index.type is IndexType.PARTITION:
                # There is at most one partitioning index per table.
                return index.columns
        return []
 
    def clusteringColumns(self, table_name: ApdbTables) -> List[str]:
        """Return a list of columns used for clustering.

        Parameters
        ----------
        table_name : `ApdbTables`
            Table name in APDB schema.

        Returns
        -------
        columns : `list` of `str`
            Names of columns used for clustering.
        """
        table_schema = self.tableSchemas[table_name]
        for index in table_schema.indices:
            if index.type is IndexType.PRIMARY:
                return index.columns
        return []
 
    def makeSchema(self, drop: bool = False, part_range: Optional[Tuple[int, int]] = None) -> None:
        """Create or re-create all tables.

        Parameters
        ----------
        drop : `bool`
            If True then drop tables before creating new ones.
        part_range : `tuple` [ `int` ] or `None`
            Start and end partition number for time partitions, end is not
            inclusive. Used to create per-partition DiaObject, DiaSource, and
            DiaForcedSource tables. If `None` then per-partition tables are
            not created.
        """
        for table in self.tableSchemas:
            if table in self._ignore_tables:
                _LOG.debug("Skipping schema for table %s", table)
                continue
 
            _LOG.debug("Making table %s", table)
 
            fullTable = table.table_name(self._prefix)
 
            table_list = [fullTable]
 
            if part_range is not None:
                if table in (ApdbTables.DiaSource, ApdbTables.DiaForcedSource, ApdbTables.DiaObject):
                    partitions = range(*part_range)
                    table_list = [f"{fullTable}_{part}" for part in partitions]
 
            if drop:
                queries = [f'DROP TABLE IF EXISTS "{table_name}"' for table_name in table_list]
                futures = [self._session.execute_async(query, timeout=None) for query in queries]
                for future in futures:
                    _LOG.debug("wait for query: %s", future.query)
                    future.result()
                    _LOG.debug("query finished: %s", future.query)
 
            queries = []
            for table_name in table_list:
                if_not_exists = "" if drop else "IF NOT EXISTS"
                columns = ", ".join(self._tableColumns(table))
                query = f'CREATE TABLE {if_not_exists} "{table_name}" ({columns})'
                _LOG.debug("query: %s", query)
                queries.append(query)
            futures = [self._session.execute_async(query, timeout=None) for query in queries]
 
            for future in futures:
                _LOG.debug("wait for query: %s", future.query)
                future.result()
                _LOG.debug("query finished: %s", future.query)
 
    def _tableColumns(self, table_name: ApdbTables) -> List[str]:
 
        """Return set of columns in a table.

        Parameters
        ----------
        table_name : `ApdbTables`
            Name of the table.

        Returns
        -------
        column_defs : `list` of `str`
            List of strings in the format "column_name type".
        """
        table_schema = self.tableSchemas[table_name]
 
        # Find partitioning and clustering columns from the indices.
        part_columns = []
        clust_columns = []
        index_columns = set()
        for index in table_schema.indices:
            if index.type is IndexType.PARTITION:
                part_columns = index.columns
            elif index.type is IndexType.PRIMARY:
                clust_columns = index.columns
            index_columns.update(index.columns)
 
        _LOG.debug("part_columns: %s", part_columns)
        _LOG.debug("clust_columns: %s", clust_columns)
 
        if not part_columns:
            raise ValueError(f"Table {table_name} configuration is missing partition index")
        if not clust_columns:
            raise ValueError(f"Table {table_name} configuration is missing primary index")
 
        # All columns, skipping those that will be packed into a blob.
        column_defs = []
        for column in table_schema.columns:
            if self._packing != "none" and column.name not in index_columns:
                continue
            ctype = self._type_map[column.type]
            column_defs.append(f'"{column.name}" {ctype}')
 
        # Packed columns are stored together in a single blob column.
        if self._packing != "none":
            column_defs.append('"apdb_packed" blob')
 
        # Primary key definition.
        part_columns = [f'"{col}"' for col in part_columns]
        clust_columns = [f'"{col}"' for col in clust_columns]
        if len(part_columns) > 1:
            columns = ", ".join(part_columns)
            part_columns = [f"({columns})"]
        pkey = ", ".join(part_columns + clust_columns)
        _LOG.debug("pkey: %s", pkey)
        column_defs.append(f"PRIMARY KEY ({pkey})")

        return column_defs
 
    def packedColumns(self, table_name: ApdbTables) -> List[ColumnDef]:
        """Return set of columns that are packed into BLOB.

        Parameters
        ----------
        table_name : `ApdbTables`
            Name of the table.

        Returns
        -------
        columns : `list` [ `ColumnDef` ]
            List of column definitions. Empty list is returned if packing is
            not enabled.
        """
        if self._packing == "none":
            return []
        return self._packed_columns[table_name]
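
# Packing sketch ("generic" is an illustrative non-"none" value; the schema
# file name is hypothetical):
#
#     schema = ApdbCassandraSchema(session, "apdb-schema.yaml", packing="generic")
#     for column in schema.packedColumns(ApdbTables.DiaSource):
#         print(column.name, column.type)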