22 """Module responsible for APDB schema operations.
25 from __future__
import annotations
27 __all__ = [
"ApdbSqlSchema"]
30 from typing
import Any, Dict, List, Mapping, Optional, Type
33 from sqlalchemy
import (Column, Index, MetaData, PrimaryKeyConstraint,
34 UniqueConstraint, Table)
36 from .apdbSchema
import ApdbSchema, ApdbTables, ColumnDef, IndexDef, IndexType
39 _LOG = logging.getLogger(__name__)
43 """Class for management of APDB schema.
47 objects : `sqlalchemy.Table`
48 DiaObject table instance
49 objects_last : `sqlalchemy.Table`
50 DiaObjectLast table instance, may be None
51 sources : `sqlalchemy.Table`
52 DiaSource table instance
53 forcedSources : `sqlalchemy.Table`
54 DiaForcedSource table instance
58 engine : `sqlalchemy.engine.Engine`
59 SQLAlchemy engine instance
60 dia_object_index : `str`
61 Indexing mode for DiaObject table, see `ApdbSqlConfig.dia_object_index`
63 htm_index_column : `str`
64 Name of a HTM index column for DiaObject and DiaSource tables.
66 Name of the YAML schema file.
67 extra_schema_file : `str`, optional
68 Name of the YAML schema file with extra column definitions.
69 prefix : `str`, optional
70 Prefix to add to all scheam elements.
72 def __init__(self, engine: sqlalchemy.engine.Engine, dia_object_index: str, htm_index_column: str,
73 schema_file: str, extra_schema_file: Optional[str] =
None, prefix: str =
""):
75 super().
__init__(schema_file, extra_schema_file)
85 FLOAT=sqlalchemy.types.Float,
86 DATETIME=sqlalchemy.types.TIMESTAMP,
87 BIGINT=sqlalchemy.types.BigInteger,
88 INTEGER=sqlalchemy.types.Integer,
89 INT=sqlalchemy.types.Integer,
90 TINYINT=sqlalchemy.types.Integer,
91 BLOB=sqlalchemy.types.LargeBinary,
92 CHAR=sqlalchemy.types.CHAR,
93 BOOL=sqlalchemy.types.Boolean)
97 objects = self.
tableSchemastableSchemas[ApdbTables.DiaObject]
98 objects.primary_key.columns.insert(0, htm_index_column)
101 for table
in (ApdbTables.DiaObject, ApdbTables.DiaObjectLast, ApdbTables.DiaSource):
112 tableDef.columns.append(column)
114 if table
is ApdbTables.DiaObjectLast:
116 tableDef.primary_key.columns.insert(0,
"pixelId")
119 index =
IndexDef(name=f
"IDX_{tableDef.name}_pixelId",
120 type=IndexType.INDEX, columns=[
"pixelId"])
121 tableDef.indices.append(index)
131 def _makeTables(self, mysql_engine: str =
'InnoDB') -> Mapping[ApdbTables, Table]:
132 """Generate schema for all tables.
136 mysql_engine : `str`, optional
137 MySQL engine type to use for new tables.
140 info: Dict[str, Any] = {}
143 for table_enum
in ApdbTables:
145 if table_enum
is ApdbTables.DiaObjectLast
and self.
_dia_object_index_dia_object_index !=
"last_object_table":
149 constraints = self.
_tableIndices_tableIndices(table_enum, info)
150 table = Table(table_enum.table_name(self.
_prefix_prefix),
154 mysql_engine=mysql_engine,
156 tables[table_enum] = table
160 def makeSchema(self, drop: bool =
False, mysql_engine: str =
'InnoDB') ->
None:
161 """Create or re-create all tables.
165 drop : `bool`, optional
166 If True then drop tables before creating new ones.
167 mysql_engine : `str`, optional
168 MySQL engine type to use for new tables.
172 _LOG.debug(
"clear metadata")
174 _LOG.debug(
"re-do schema mysql_engine=%r", mysql_engine)
175 self.
_makeTables_makeTables(mysql_engine=mysql_engine)
179 _LOG.info(
'dropping all tables')
181 _LOG.info(
'creating all tables')
184 def _tableColumns(self, table_name: ApdbTables) -> List[Column]:
185 """Return set of columns in a table
189 table_name : `ApdbTables`
195 List of `Column` objects.
200 table_schema = self.
tableSchemastableSchemas[table_name]
202 for index
in table_schema.indices:
203 if index.type
is IndexType.PRIMARY:
204 pkey_columns =
set(index.columns)
209 for column
in table_schema.columns:
210 kwargs: Dict[str, Any] = dict(nullable=column.nullable)
211 if column.default
is not None:
212 kwargs.update(server_default=str(column.default))
213 if column.name
in pkey_columns:
214 kwargs.update(autoincrement=
False)
215 ctype = self.
_type_map_type_map[column.type]
216 column_defs.append(Column(column.name, ctype, **kwargs))
220 def _tableIndices(self, table_name: ApdbTables, info: Dict) -> List[sqlalchemy.schema.Constraint]:
221 """Return set of constraints/indices in a table
225 table_name : `ApdbTables`
228 Additional options passed to SQLAlchemy index constructor.
233 List of SQLAlchemy index/constraint objects.
236 table_schema = self.
tableSchemastableSchemas[table_name]
239 index_defs: List[sqlalchemy.schema.Constraint] = []
240 for index
in table_schema.indices:
241 if index.type
is IndexType.INDEX:
242 index_defs.append(Index(self.
_prefix_prefix + index.name, *index.columns, info=info))
246 kwargs[
'name'] = self.
_prefix_prefix + index.name
247 if index.type
is IndexType.PRIMARY:
248 index_defs.append(PrimaryKeyConstraint(*index.columns, **kwargs))
249 elif index.type
is IndexType.UNIQUE:
250 index_defs.append(UniqueConstraint(*index.columns, **kwargs))
255 def _getDoubleType(cls, engine: sqlalchemy.engine.Engine) -> Type:
256 """DOUBLE type is database-specific, select one based on dialect.
260 engine : `sqlalchemy.engine.Engine`
265 type_object : `object`
266 Database-specific type definition.
268 if engine.name ==
'mysql':
269 from sqlalchemy.dialects.mysql
import DOUBLE
270 return DOUBLE(asdecimal=
False)
271 elif engine.name ==
'postgresql':
272 from sqlalchemy.dialects.postgresql
import DOUBLE_PRECISION
273 return DOUBLE_PRECISION
274 elif engine.name ==
'oracle':
275 from sqlalchemy.dialects.oracle
import DOUBLE_PRECISION
276 return DOUBLE_PRECISION
277 elif engine.name ==
'sqlite':
279 from sqlalchemy.dialects.sqlite
import REAL
282 raise TypeError(
'cannot determine DOUBLE type, unexpected dialect: ' + engine.name)
Type _getDoubleType(cls, sqlalchemy.engine.Engine engine)
None makeSchema(self, bool drop=False, str mysql_engine='InnoDB')
List[Column] _tableColumns(self, ApdbTables table_name)
Mapping[ApdbTables, Table] _makeTables(self, str mysql_engine='InnoDB')
List[sqlalchemy.schema.Constraint] _tableIndices(self, ApdbTables table_name, Dict info)
def __init__(self, sqlalchemy.engine.Engine engine, str dia_object_index, str htm_index_column, str schema_file, Optional[str] extra_schema_file=None, str prefix="")
daf::base::PropertySet * set