22"""Module responsible for APDB schema operations.
25from __future__
import annotations
27__all__ = [
"ApdbSqlSchema"]
30from typing
import Any, Dict, List, Mapping, Optional, Type
34from felis
import simple
35from sqlalchemy
import DDL, Column, Index, MetaData, PrimaryKeyConstraint, Table, UniqueConstraint, event
37from .apdbSchema
import ApdbSchema, ApdbTables
39_LOG = logging.getLogger(__name__)
43 """Class for management of APDB schema.
47 objects : `sqlalchemy.Table`
48 DiaObject table instance
49 objects_last : `sqlalchemy.Table`
50 DiaObjectLast table instance, may be None
51 sources : `sqlalchemy.Table`
52 DiaSource table instance
53 forcedSources : `sqlalchemy.Table`
54 DiaForcedSource table instance
58 engine : `sqlalchemy.engine.Engine`
59 SQLAlchemy engine instance
60 dia_object_index : `str`
61 Indexing mode
for DiaObject table, see `ApdbSqlConfig.dia_object_index`
63 htm_index_column : `str`
64 Name of a HTM index column
for DiaObject
and DiaSource tables.
66 Name of the YAML schema file.
67 schema_name : `str`, optional
68 Name of the schema
in YAML files.
69 prefix : `str`, optional
70 Prefix to add to all scheam elements.
71 namespace : `str`, optional
72 Namespace (
or schema name) to use
for all APDB tables.
76 engine: sqlalchemy.engine.Engine,
77 dia_object_index: str,
78 htm_index_column: str,
80 schema_name: str =
"ApdbSchema",
82 namespace: Optional[str] =
None,
85 super().
__init__(schema_file, schema_name)
96 felis.types.Float: sqlalchemy.types.Float,
97 felis.types.Timestamp: sqlalchemy.types.TIMESTAMP,
98 felis.types.Long: sqlalchemy.types.BigInteger,
99 felis.types.Int: sqlalchemy.types.Integer,
100 felis.types.Short: sqlalchemy.types.Integer,
101 felis.types.Byte: sqlalchemy.types.Integer,
102 felis.types.Binary: sqlalchemy.types.LargeBinary,
103 felis.types.Text: sqlalchemy.types.CHAR,
104 felis.types.String: sqlalchemy.types.CHAR,
105 felis.types.Char: sqlalchemy.types.CHAR,
106 felis.types.Unicode: sqlalchemy.types.CHAR,
107 felis.types.Boolean: sqlalchemy.types.Boolean
111 for table
in (ApdbTables.DiaObject, ApdbTables.DiaObjectLast, ApdbTables.DiaSource):
115 column = simple.Column(
116 id=f
"#{htm_index_column}",
117 name=htm_index_column,
118 datatype=felis.types.Long,
121 description=
"Pixelization index column.",
124 tableDef.columns.append(column)
128 tableDef.primary_key.insert(0, column)
130 if table
is ApdbTables.DiaObjectLast:
132 tableDef.primary_key.insert(0, column)
135 name = f
"IDX_{tableDef.name}_{htm_index_column}"
136 index = simple.Index(id=f
"#{name}", name=name, columns=[column])
137 tableDef.indexes.append(index)
148 def _makeTables(self, mysql_engine: str =
'InnoDB') -> Mapping[ApdbTables, Table]:
149 """Generate schema for all tables.
153 mysql_engine : `str`, optional
154 MySQL engine type to use for new tables.
157 info: Dict[str, Any] = {}
160 for table_enum
in ApdbTables:
162 if table_enum
is ApdbTables.DiaObjectLast
and self.
_dia_object_index !=
"last_object_table":
167 table = Table(table_enum.table_name(self.
_prefix),
171 mysql_engine=mysql_engine,
173 tables[table_enum] = table
177 def makeSchema(self, drop: bool =
False, mysql_engine: str =
'InnoDB') ->
None:
178 """Create or re-create all tables.
182 drop : `bool`, optional
183 If True then drop tables before creating new ones.
184 mysql_engine : `str`, optional
185 MySQL engine type to use
for new tables.
189 _LOG.debug(
"clear metadata")
191 _LOG.debug(
"re-do schema mysql_engine=%r", mysql_engine)
198 quoted_schema = dialect.preparer(dialect).quote_schema(self.
_metadata.schema)
200 "CREATE SCHEMA IF NOT EXISTS %(schema)s", context={
"schema": quoted_schema}
201 ).execute_if(dialect=
'postgresql')
202 event.listen(self.
_metadata,
"before_create", create_schema)
206 _LOG.info(
'dropping all tables')
208 _LOG.info(
'creating all tables')
211 def _tableColumns(self, table_name: ApdbTables) -> List[Column]:
212 """Return set of columns in a table
216 table_name : `ApdbTables`
222 List of `Column` objects.
231 for column
in table_schema.columns:
232 kwargs: Dict[str, Any] = dict(nullable=column.nullable)
233 if column.value
is not None:
234 kwargs.update(server_default=
str(column.value))
235 if column
in table_schema.primary_key:
236 kwargs.update(autoincrement=
False)
238 column_defs.append(Column(column.name, ctype, **kwargs))
242 def _tableIndices(self, table_name: ApdbTables, info: Dict) -> List[sqlalchemy.schema.Constraint]:
243 """Return set of constraints/indices in a table
247 table_name : `ApdbTables`
250 Additional options passed to SQLAlchemy index constructor.
255 List of SQLAlchemy index/constraint objects.
261 index_defs: List[sqlalchemy.schema.Constraint] = []
262 if table_schema.primary_key:
263 index_defs.append(PrimaryKeyConstraint(*[column.name
for column
in table_schema.primary_key]))
264 for index
in table_schema.indexes:
265 name = self.
_prefix + index.name
if index.name
else ""
266 index_defs.append(Index(name, *[column.name
for column
in index.columns], info=info))
267 for constraint
in table_schema.constraints:
270 kwargs[
'name'] = self.
_prefix + constraint.name
271 if isinstance(constraint, simple.UniqueConstraint):
272 index_defs.append(UniqueConstraint(*[column.name
for column
in constraint.columns], **kwargs))
277 def _getDoubleType(cls, engine: sqlalchemy.engine.Engine) -> Type:
278 """DOUBLE type is database-specific, select one based on dialect.
282 engine : `sqlalchemy.engine.Engine`
287 type_object : `object`
288 Database-specific type definition.
290 if engine.name ==
'mysql':
291 from sqlalchemy.dialects.mysql
import DOUBLE
292 return DOUBLE(asdecimal=
False)
293 elif engine.name ==
'postgresql':
294 from sqlalchemy.dialects.postgresql
import DOUBLE_PRECISION
295 return DOUBLE_PRECISION
296 elif engine.name ==
'oracle':
297 from sqlalchemy.dialects.oracle
import DOUBLE_PRECISION
298 return DOUBLE_PRECISION
299 elif engine.name ==
'sqlite':
301 from sqlalchemy.dialects.sqlite
import REAL
304 raise TypeError(
'cannot determine DOUBLE type, unexpected dialect: ' + engine.name)
Type _getDoubleType(cls, sqlalchemy.engine.Engine engine)
None makeSchema(self, bool drop=False, str mysql_engine='InnoDB')
List[Column] _tableColumns(self, ApdbTables table_name)
Mapping[ApdbTables, Table] _makeTables(self, str mysql_engine='InnoDB')
List[sqlalchemy.schema.Constraint] _tableIndices(self, ApdbTables table_name, Dict info)
def __init__(self, sqlalchemy.engine.Engine engine, str dia_object_index, str htm_index_column, str schema_file, str schema_name="ApdbSchema", str prefix="", Optional[str] namespace=None)