22"""Module responsible for APDB schema operations.
25from __future__
import annotations
27__all__ = [
"ApdbSqlSchema"]
30from typing
import Any, Dict, List, Mapping, Type
33from sqlalchemy
import (Column, Index, MetaData, PrimaryKeyConstraint,
34 UniqueConstraint, Table)
36from .apdbSchema
import ApdbSchema, ApdbTables, ColumnDef, IndexDef, IndexType
39_LOG = logging.getLogger(__name__)
43 """Class for management of APDB schema.
47 objects : `sqlalchemy.Table`
48 DiaObject table instance
49 objects_last : `sqlalchemy.Table`
50 DiaObjectLast table instance, may be None
51 sources : `sqlalchemy.Table`
52 DiaSource table instance
53 forcedSources : `sqlalchemy.Table`
54 DiaForcedSource table instance
58 engine : `sqlalchemy.engine.Engine`
59 SQLAlchemy engine instance
60 dia_object_index : `str`
61 Indexing mode
for DiaObject table, see `ApdbSqlConfig.dia_object_index`
63 htm_index_column : `str`
64 Name of a HTM index column
for DiaObject
and DiaSource tables.
66 Name of the YAML schema file.
67 schema_name : `str`, optional
68 Name of the schema
in YAML files.
69 prefix : `str`, optional
70 Prefix to add to all scheam elements.
72 def __init__(self, engine: sqlalchemy.engine.Engine, dia_object_index: str, htm_index_column: str,
73 schema_file: str, schema_name: str =
"ApdbSchema", prefix: str =
""):
75 super().
__init__(schema_file, schema_name)
85 float=sqlalchemy.types.Float,
86 timestamp=sqlalchemy.types.TIMESTAMP,
87 long=sqlalchemy.types.BigInteger,
88 int=sqlalchemy.types.Integer,
89 short=sqlalchemy.types.Integer,
90 byte=sqlalchemy.types.Integer,
91 binary=sqlalchemy.types.LargeBinary,
92 text=sqlalchemy.types.CHAR,
93 string=sqlalchemy.types.CHAR,
94 char=sqlalchemy.types.CHAR,
95 unicode=sqlalchemy.types.CHAR,
96 boolean=sqlalchemy.types.Boolean)
100 objects = self.
tableSchemastableSchemas[ApdbTables.DiaObject]
101 objects.primary_key.columns.insert(0, htm_index_column)
104 for table
in (ApdbTables.DiaObject, ApdbTables.DiaObjectLast, ApdbTables.DiaSource):
108 column =
ColumnDef(name=htm_index_column,
115 tableDef.columns.append(column)
117 if table
is ApdbTables.DiaObjectLast:
119 tableDef.primary_key.columns.insert(0, htm_index_column)
122 index =
IndexDef(name=f
"IDX_{tableDef.name}_{htm_index_column}",
123 type=IndexType.INDEX, columns=[htm_index_column])
124 tableDef.indices.append(index)
135 def _makeTables(self, mysql_engine: str =
'InnoDB') -> Mapping[ApdbTables, Table]:
136 """Generate schema for all tables.
140 mysql_engine : `str`, optional
141 MySQL engine type to use for new tables.
144 info: Dict[str, Any] = {}
147 for table_enum
in ApdbTables:
149 if table_enum
is ApdbTables.DiaObjectLast
and self.
_dia_object_index_dia_object_index !=
"last_object_table":
153 constraints = self.
_tableIndices_tableIndices(table_enum, info)
154 table = Table(table_enum.table_name(self.
_prefix_prefix),
158 mysql_engine=mysql_engine,
160 tables[table_enum] = table
164 def makeSchema(self, drop: bool =
False, mysql_engine: str =
'InnoDB') ->
None:
165 """Create or re-create all tables.
169 drop : `bool`, optional
170 If True then drop tables before creating new ones.
171 mysql_engine : `str`, optional
172 MySQL engine type to use
for new tables.
176 _LOG.debug(
"clear metadata")
178 _LOG.debug(
"re-do schema mysql_engine=%r", mysql_engine)
179 self.
_makeTables_makeTables(mysql_engine=mysql_engine)
183 _LOG.info(
'dropping all tables')
185 _LOG.info(
'creating all tables')
188 def _tableColumns(self, table_name: ApdbTables) -> List[Column]:
189 """Return set of columns in a table
193 table_name : `ApdbTables`
199 List of `Column` objects.
204 table_schema = self.
tableSchemastableSchemas[table_name]
206 for index
in table_schema.indices:
207 if index.type
is IndexType.PRIMARY:
208 pkey_columns =
set(index.columns)
213 for column
in table_schema.columns:
214 kwargs: Dict[str, Any] = dict(nullable=column.nullable)
215 if column.default
is not None:
216 kwargs.update(server_default=
str(column.default))
217 if column.name
in pkey_columns:
218 kwargs.update(autoincrement=
False)
219 ctype = self.
_type_map_type_map[column.type]
220 column_defs.append(Column(column.name, ctype, **kwargs))
224 def _tableIndices(self, table_name: ApdbTables, info: Dict) -> List[sqlalchemy.schema.Constraint]:
225 """Return set of constraints/indices in a table
229 table_name : `ApdbTables`
232 Additional options passed to SQLAlchemy index constructor.
237 List of SQLAlchemy index/constraint objects.
240 table_schema = self.tableSchemastableSchemas[table_name]
243 index_defs: List[sqlalchemy.schema.Constraint] = []
244 for index
in table_schema.indices:
245 if index.type
is IndexType.INDEX:
246 index_defs.append(Index(self.
_prefix_prefix + index.name, *index.columns, info=info))
250 kwargs[
'name'] = self.
_prefix_prefix + index.name
251 if index.type
is IndexType.PRIMARY:
252 index_defs.append(PrimaryKeyConstraint(*index.columns, **kwargs))
253 elif index.type
is IndexType.UNIQUE:
254 index_defs.append(UniqueConstraint(*index.columns, **kwargs))
259 def _getDoubleType(cls, engine: sqlalchemy.engine.Engine) -> Type:
260 """DOUBLE type is database-specific, select one based on dialect.
264 engine : `sqlalchemy.engine.Engine`
269 type_object : `object`
270 Database-specific type definition.
272 if engine.name ==
'mysql':
273 from sqlalchemy.dialects.mysql
import DOUBLE
274 return DOUBLE(asdecimal=
False)
275 elif engine.name ==
'postgresql':
276 from sqlalchemy.dialects.postgresql
import DOUBLE_PRECISION
277 return DOUBLE_PRECISION
278 elif engine.name ==
'oracle':
279 from sqlalchemy.dialects.oracle
import DOUBLE_PRECISION
280 return DOUBLE_PRECISION
281 elif engine.name ==
'sqlite':
283 from sqlalchemy.dialects.sqlite
import REAL
286 raise TypeError(
'cannot determine DOUBLE type, unexpected dialect: ' + engine.name)
Type _getDoubleType(cls, sqlalchemy.engine.Engine engine)
None makeSchema(self, bool drop=False, str mysql_engine='InnoDB')
List[Column] _tableColumns(self, ApdbTables table_name)
Mapping[ApdbTables, Table] _makeTables(self, str mysql_engine='InnoDB')
List[sqlalchemy.schema.Constraint] _tableIndices(self, ApdbTables table_name, Dict info)
def __init__(self, sqlalchemy.engine.Engine engine, str dia_object_index, str htm_index_column, str schema_file, str schema_name="ApdbSchema", str prefix="")
daf::base::PropertySet * set