22"""Module responsible for APDB schema operations.
25from __future__
import annotations
27__all__ = [
"ApdbSqlSchema",
"ExtraTables"]
32from typing
import Any, Dict, List, Mapping, Optional, Type
36from felis
import simple
37from sqlalchemy
import (
49from sqlalchemy.dialects.postgresql
import UUID
51from .apdbSchema
import ApdbSchema, ApdbTables
53_LOG = logging.getLogger(__name__)
59class GUID(sqlalchemy.TypeDecorator):
60 """Platform-independent GUID type.
62 Uses PostgreSQL's UUID type, otherwise uses CHAR(32), storing as
63 stringified hex values.
66 impl = sqlalchemy.CHAR
70 def load_dialect_impl(self, dialect: sqlalchemy.engine.Dialect) -> sqlalchemy.types.TypeEngine:
71 if dialect.name ==
"postgresql":
72 return dialect.type_descriptor(UUID())
74 return dialect.type_descriptor(sqlalchemy.CHAR(32))
82 if isinstance(value, int):
83 value = uuid.UUID(int=value)
84 elif isinstance(value, bytes):
85 value = uuid.UUID(bytes=value)
86 elif isinstance(value, str):
88 value = uuid.UUID(hex=value)
89 elif not isinstance(value, uuid.UUID):
90 raise TypeError(f
"Unexpected type of a bind value: {type(value)}")
92 if dialect.name ==
"postgresql":
95 return "%.32x" % value.int
98 self, value: str | uuid.UUID |
None, dialect: sqlalchemy.engine.Dialect
99 ) -> Optional[uuid.UUID]:
102 elif isinstance(value, uuid.UUID):
106 return uuid.UUID(hex=value)
111 """Names of the tables used for tracking insert IDs."""
113 DiaInsertId =
"DiaInsertId"
114 """Name of the table for insert ID records."""
116 DiaObjectInsertId =
"DiaObjectInsertId"
117 """Name of the table for DIAObject insert ID records."""
119 DiaSourceInsertId =
"DiaSourceInsertId"
120 """Name of the table for DIASource insert ID records."""
122 DiaForcedSourceInsertId =
"DiaFSourceInsertId"
123 """Name of the table for DIAForcedSource insert ID records."""
126 """Return full table name."""
127 return prefix + self.value
131 """Return mapping of tables used for insert ID tracking to their
132 corresponding regular tables."""
141 """Class for management of APDB schema.
145 objects : `sqlalchemy.Table`
146 DiaObject table instance
147 objects_last : `sqlalchemy.Table`
148 DiaObjectLast table instance, may be None
149 sources : `sqlalchemy.Table`
150 DiaSource table instance
151 forcedSources : `sqlalchemy.Table`
152 DiaForcedSource table instance
153 has_insert_id : `bool`
154 If true then schema has tables
for insert ID tracking.
158 engine : `sqlalchemy.engine.Engine`
159 SQLAlchemy engine instance
160 dia_object_index : `str`
161 Indexing mode
for DiaObject table, see `ApdbSqlConfig.dia_object_index`
163 htm_index_column : `str`
164 Name of a HTM index column
for DiaObject
and DiaSource tables.
166 Name of the YAML schema file.
167 schema_name : `str`, optional
168 Name of the schema
in YAML files.
169 prefix : `str`, optional
170 Prefix to add to all schema elements.
171 namespace : `str`, optional
172 Namespace (
or schema name) to use
for all APDB tables.
173 use_insert_id : `bool`, optional
177 pixel_id_tables = (ApdbTables.DiaObject, ApdbTables.DiaObjectLast, ApdbTables.DiaSource)
178 """Tables that need pixelId column for spatial indexing."""
182 engine: sqlalchemy.engine.Engine,
183 dia_object_index: str,
184 htm_index_column: str,
186 schema_name: str =
"ApdbSchema",
188 namespace: Optional[str] =
None,
189 use_insert_id: bool =
False,
191 super().
__init__(schema_file, schema_name)
204 felis.types.Float: sqlalchemy.types.Float,
205 felis.types.Timestamp: sqlalchemy.types.TIMESTAMP,
206 felis.types.Long: sqlalchemy.types.BigInteger,
207 felis.types.Int: sqlalchemy.types.Integer,
208 felis.types.Short: sqlalchemy.types.Integer,
209 felis.types.Byte: sqlalchemy.types.Integer,
210 felis.types.Binary: sqlalchemy.types.LargeBinary,
211 felis.types.Text: sqlalchemy.types.CHAR,
212 felis.types.String: sqlalchemy.types.CHAR,
213 felis.types.Char: sqlalchemy.types.CHAR,
214 felis.types.Unicode: sqlalchemy.types.CHAR,
215 felis.types.Boolean: sqlalchemy.types.Boolean,
223 column = simple.Column(
224 id=f
"#{htm_index_column}",
225 name=htm_index_column,
226 datatype=felis.types.Long,
229 description=
"Pixelization index column.",
232 tableDef.columns.append(column)
236 tableDef.primary_key.insert(0, column)
238 if table
is ApdbTables.DiaObjectLast:
240 tableDef.primary_key.insert(0, column)
243 name = f
"IDX_{tableDef.name}_{htm_index_column}"
244 index = simple.Index(id=f
"#{name}", name=name, columns=[column])
245 tableDef.indexes.append(index)
254 """Create or re-create all tables.
258 drop : `bool`, optional
259 If True then drop tables before creating new ones.
265 quoted_schema = dialect.preparer(dialect).quote_schema(self.
_metadata.schema)
267 "CREATE SCHEMA IF NOT EXISTS %(schema)s", context={
"schema": quoted_schema}
268 ).execute_if(dialect=
"postgresql")
269 event.listen(self.
_metadata,
"before_create", create_schema)
273 _LOG.info(
"dropping all tables")
275 _LOG.info(
"creating all tables")
281 def get_table(self, table_enum: ApdbTables | ExtraTables) -> Table:
282 """Return SQLAlchemy table instance for a specified table type/enum.
286 table_enum : `ApdbTables` or `ExtraTables`
287 Type of table to
return.
291 table : `sqlalchemy.schema.Table`
297 Raised
if ``table_enum``
is not valid
for this database.
300 if isinstance(table_enum, ApdbTables):
305 raise ValueError(f
"Table type {table_enum} does not exist in the schema")
from None
308 """Return list of columns defined for a table in APDB schema.
310 Returned list excludes columns that are implementation-specific, e.g.
311 ``pixelId`` column is not include
in the returned list.
315 table_enum : `ApdbTables`
or `ExtraTables`
320 table : `list` [`sqlalchemy.schema.Column`]
326 Raised
if ``table_enum``
is not valid
for this database.
329 exclude_columns = set()
332 return [column
for column
in table.columns
if column.name
not in exclude_columns]
336 """Whether insert ID tables are to be used (`bool`)."""
342 """Check whether database has tables for tracking insert IDs."""
343 inspector = inspect(self.
_engine)
344 db_tables =
set(inspector.get_table_names(schema=self.
_metadata.schema))
345 return ExtraTables.DiaInsertId.table_name(self.
_prefix)
in db_tables
348 """Generate schema for regular tables.
352 mysql_engine : `str`, optional
353 MySQL engine type to use for new tables.
356 for table_enum
in ApdbTables:
357 if table_enum
is ApdbTables.DiaObjectLast
and self.
_dia_object_index !=
"last_object_table":
363 table_enum.table_name(self.
_prefix),
367 mysql_engine=mysql_engine,
369 tables[table_enum] = table
374 self, apdb_tables: Mapping[ApdbTables, Table], mysql_engine: str =
"InnoDB"
375 ) -> Mapping[ExtraTables, Table]:
376 """Generate schema for insert ID tables."""
377 tables: dict[ExtraTables, Table] = {}
383 Column(
"insert_id", GUID, primary_key=
True),
384 Column(
"insert_time", sqlalchemy.types.TIMESTAMP, nullable=
False),
386 parent_table = Table(
387 ExtraTables.DiaInsertId.table_name(self.
_prefix),
390 mysql_engine=mysql_engine,
392 tables[ExtraTables.DiaInsertId] = parent_table
394 for table_enum, apdb_enum
in ExtraTables.insert_id_tables().
items():
395 apdb_table = apdb_tables[apdb_enum]
399 table_enum.table_name(self.
_prefix),
403 mysql_engine=mysql_engine,
405 tables[table_enum] = table
410 """Return set of columns in a table
414 table_name : `ApdbTables`
420 List of `Column` objects.
427 column_defs: list[Column] = []
428 for column
in table_schema.columns:
429 kwargs: Dict[str, Any] = dict(nullable=column.nullable)
430 if column.value
is not None:
431 kwargs.update(server_default=str(column.value))
432 if column
in table_schema.primary_key:
433 kwargs.update(autoincrement=
False)
435 column_defs.append(Column(column.name, ctype, **kwargs))
439 def _tableIndices(self, table_name: ApdbTables) -> List[sqlalchemy.schema.SchemaItem]:
440 """Return set of constraints/indices in a table
444 table_name : `ApdbTables`
447 Additional options passed to SQLAlchemy index constructor.
452 List of SQLAlchemy index/constraint objects.
458 index_defs: List[sqlalchemy.schema.SchemaItem] = []
459 if table_schema.primary_key:
460 index_defs.append(PrimaryKeyConstraint(*[column.name
for column
in table_schema.primary_key]))
461 for index
in table_schema.indexes:
462 name = self.
_prefix + index.name
if index.name
else ""
463 index_defs.append(Index(name, *[column.name
for column
in index.columns]))
464 for constraint
in table_schema.constraints:
465 constr_name: str |
None =
None
467 constr_name = self.
_prefix + constraint.name
468 if isinstance(constraint, simple.UniqueConstraint):
470 UniqueConstraint(*[column.name
for column
in constraint.columns], name=constr_name)
476 """Return list of columns for insert ID tables."""
477 column_defs: list[Column] = [Column(
"insert_id", GUID, nullable=
False)]
478 insert_id_tables = ExtraTables.insert_id_tables()
479 if table_enum
in insert_id_tables:
482 assert False,
"Above branches have to cover all enum values"
486 """Return a list of columns for table PK."""
488 column_defs: list[Column] = []
489 for column
in table_schema.primary_key:
491 column_defs.append(Column(column.name, ctype, nullable=
False, autoincrement=
False))
496 table_enum: ExtraTables,
497 apdb_table: sqlalchemy.schema.Table,
498 parent_table: sqlalchemy.schema.Table,
499 ) -> List[sqlalchemy.schema.SchemaItem]:
500 """Return set of constraints/indices for insert ID tables."""
501 index_defs: List[sqlalchemy.schema.SchemaItem] = []
504 insert_id_tables = ExtraTables.insert_id_tables()
505 if table_enum
in insert_id_tables:
507 pk_names = [column.name
for column
in self.
_tablePkColumns(insert_id_tables[table_enum])]
508 index_defs.append(PrimaryKeyConstraint(*pk_names))
510 name = self.
_prefix + table_enum.name +
"_idx"
511 index_defs.append(Index(name,
"insert_id"))
513 pk_columns = [apdb_table.columns[column]
for column
in pk_names]
515 ForeignKeyConstraint(pk_names, pk_columns, onupdate=
"CASCADE", ondelete=
"CASCADE")
519 ForeignKeyConstraint(
520 [
"insert_id"], [parent_table.columns[
"insert_id"]], onupdate=
"CASCADE", ondelete=
"CASCADE"
524 assert False,
"Above branches have to cover all enum values"
528 def _getDoubleType(cls, engine: sqlalchemy.engine.Engine) -> Type | sqlalchemy.types.TypeEngine:
529 """DOUBLE type is database-specific, select one based on dialect.
533 engine : `sqlalchemy.engine.Engine`
538 type_object : `object`
539 Database-specific type definition.
541 if engine.name ==
"mysql":
542 from sqlalchemy.dialects.mysql
import DOUBLE
544 return DOUBLE(asdecimal=
False)
545 elif engine.name ==
"postgresql":
546 from sqlalchemy.dialects.postgresql
import DOUBLE_PRECISION
548 return DOUBLE_PRECISION
549 elif engine.name ==
"oracle":
550 from sqlalchemy.dialects.oracle
import DOUBLE_PRECISION
552 return DOUBLE_PRECISION
553 elif engine.name ==
"sqlite":
555 from sqlalchemy.dialects.sqlite
import REAL
559 raise TypeError(
"cannot determine DOUBLE type, unexpected dialect: " + engine.name)
std::vector< SchemaItem< Flag > > * items
Type|sqlalchemy.types.TypeEngine _getDoubleType(cls, sqlalchemy.engine.Engine engine)
Table get_table(self, ApdbTables|ExtraTables table_enum)
Mapping[ApdbTables, Table] _make_apdb_tables(self, str mysql_engine="InnoDB")
Mapping[ExtraTables, Table] _make_extra_tables(self, Mapping[ApdbTables, Table] apdb_tables, str mysql_engine="InnoDB")
List[Column] _tableColumns(self, ApdbTables table_name)
__init__(self, sqlalchemy.engine.Engine engine, str dia_object_index, str htm_index_column, str schema_file, str schema_name="ApdbSchema", str prefix="", Optional[str] namespace=None, bool use_insert_id=False)
List[sqlalchemy.schema.SchemaItem] _insertIdIndices(self, ExtraTables table_enum, sqlalchemy.schema.Table apdb_table, sqlalchemy.schema.Table parent_table)
list[Column] _tablePkColumns(self, ApdbTables table_enum)
list[Column] get_apdb_columns(self, ApdbTables|ExtraTables table_enum)
None makeSchema(self, bool drop=False)
bool _check_insert_id(self)
List[Column] _insertIdColumns(self, ExtraTables table_enum)
List[sqlalchemy.schema.SchemaItem] _tableIndices(self, ApdbTables table_name)
Optional[uuid.UUID] process_result_value(self, str|uuid.UUID|None value, sqlalchemy.engine.Dialect dialect)
sqlalchemy.types.TypeEngine load_dialect_impl(self, sqlalchemy.engine.Dialect dialect)
Optional[str] process_bind_param(self, Any value, sqlalchemy.engine.Dialect dialect)
daf::base::PropertySet * set