LSST Applications 26.0.0,g0265f82a02+6660c170cc,g07994bdeae+30b05a742e,g0a0026dc87+17526d298f,g0a60f58ba1+17526d298f,g0e4bf8285c+96dd2c2ea9,g0ecae5effc+c266a536c8,g1e7d6db67d+6f7cb1f4bb,g26482f50c6+6346c0633c,g2bbee38e9b+6660c170cc,g2cc88a2952+0a4e78cd49,g3273194fdb+f6908454ef,g337abbeb29+6660c170cc,g337c41fc51+9a8f8f0815,g37c6e7c3d5+7bbafe9d37,g44018dc512+6660c170cc,g4a941329ef+4f7594a38e,g4c90b7bd52+5145c320d2,g58be5f913a+bea990ba40,g635b316a6c+8d6b3a3e56,g67924a670a+bfead8c487,g6ae5381d9b+81bc2a20b4,g93c4d6e787+26b17396bd,g98cecbdb62+ed2cb6d659,g98ffbb4407+81bc2a20b4,g9ddcbc5298+7f7571301f,ga1e77700b3+99e9273977,gae46bcf261+6660c170cc,gb2715bf1a1+17526d298f,gc86a011abf+17526d298f,gcf0d15dbbd+96dd2c2ea9,gdaeeff99f8+0d8dbea60f,gdb4ec4c597+6660c170cc,ge23793e450+96dd2c2ea9,gf041782ebf+171108ac67
LSST Data Management Base Package
Loading...
Searching...
No Matches
apdbSqlSchema.py
Go to the documentation of this file.
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22"""Module responsible for APDB schema operations.
23"""
24
25from __future__ import annotations
26
27__all__ = ["ApdbSqlSchema", "ExtraTables"]
28
29import enum
30import logging
31import uuid
32from typing import Any, Dict, List, Mapping, Optional, Type
33
34import felis.types
35import sqlalchemy
36from felis import simple
37from sqlalchemy import (
38 DDL,
39 Column,
40 ForeignKeyConstraint,
41 Index,
42 MetaData,
43 PrimaryKeyConstraint,
44 Table,
45 UniqueConstraint,
46 event,
47 inspect,
48)
49from sqlalchemy.dialects.postgresql import UUID
50
51from .apdbSchema import ApdbSchema, ApdbTables
52
53_LOG = logging.getLogger(__name__)
54
55
56#
57# Copied from daf_butler.
58#
59class GUID(sqlalchemy.TypeDecorator):
60 """Platform-independent GUID type.
61
62 Uses PostgreSQL's UUID type, otherwise uses CHAR(32), storing as
63 stringified hex values.
64 """
65
66 impl = sqlalchemy.CHAR
67
68 cache_ok = True
69
70 def load_dialect_impl(self, dialect: sqlalchemy.engine.Dialect) -> sqlalchemy.types.TypeEngine:
71 if dialect.name == "postgresql":
72 return dialect.type_descriptor(UUID())
73 else:
74 return dialect.type_descriptor(sqlalchemy.CHAR(32))
75
76 def process_bind_param(self, value: Any, dialect: sqlalchemy.engine.Dialect) -> Optional[str]:
77 if value is None:
78 return value
79
80 # Coerce input to UUID type, in general having UUID on input is the
81 # only thing that we want but there is code right now that uses ints.
82 if isinstance(value, int):
83 value = uuid.UUID(int=value)
84 elif isinstance(value, bytes):
85 value = uuid.UUID(bytes=value)
86 elif isinstance(value, str):
87 # hexstring
88 value = uuid.UUID(hex=value)
89 elif not isinstance(value, uuid.UUID):
90 raise TypeError(f"Unexpected type of a bind value: {type(value)}")
91
92 if dialect.name == "postgresql":
93 return str(value)
94 else:
95 return "%.32x" % value.int
96
98 self, value: str | uuid.UUID | None, dialect: sqlalchemy.engine.Dialect
99 ) -> Optional[uuid.UUID]:
100 if value is None:
101 return value
102 elif isinstance(value, uuid.UUID):
103 # sqlalchemy 2 converts to UUID internally
104 return value
105 else:
106 return uuid.UUID(hex=value)
107
108
@enum.unique
class ExtraTables(enum.Enum):
    """Names of the tables used for tracking insert IDs."""

    DiaInsertId = "DiaInsertId"
    """Name of the table for insert ID records."""

    DiaObjectInsertId = "DiaObjectInsertId"
    """Name of the table for DIAObject insert ID records."""

    DiaSourceInsertId = "DiaSourceInsertId"
    """Name of the table for DIASource insert ID records."""

    DiaForcedSourceInsertId = "DiaFSourceInsertId"
    """Name of the table for DIAForcedSource insert ID records."""

    def table_name(self, prefix: str = "") -> str:
        """Return the table name with an optional prefix prepended."""
        return f"{prefix}{self.value}"

    @classmethod
    def insert_id_tables(cls) -> Mapping[ExtraTables, ApdbTables]:
        """Return mapping from per-table insert ID tracking tables to the
        regular APDB tables they track."""
        mapping = {
            cls.DiaObjectInsertId: ApdbTables.DiaObject,
            cls.DiaSourceInsertId: ApdbTables.DiaSource,
            cls.DiaForcedSourceInsertId: ApdbTables.DiaForcedSource,
        }
        return mapping
138
139
class ApdbSqlSchema(ApdbSchema):
    """Class for management of APDB schema.

    Attributes
    ----------
    objects : `sqlalchemy.Table`
        DiaObject table instance
    objects_last : `sqlalchemy.Table`
        DiaObjectLast table instance, may be None
    sources : `sqlalchemy.Table`
        DiaSource table instance
    forcedSources : `sqlalchemy.Table`
        DiaForcedSource table instance
    has_insert_id : `bool`
        If true then schema has tables for insert ID tracking.

    Parameters
    ----------
    engine : `sqlalchemy.engine.Engine`
        SQLAlchemy engine instance
    dia_object_index : `str`
        Indexing mode for DiaObject table, see `ApdbSqlConfig.dia_object_index`
        for details.
    htm_index_column : `str`
        Name of a HTM index column for DiaObject and DiaSource tables.
    schema_file : `str`
        Name of the YAML schema file.
    schema_name : `str`, optional
        Name of the schema in YAML files.
    prefix : `str`, optional
        Prefix to add to all schema elements.
    namespace : `str`, optional
        Namespace (or schema name) to use for all APDB tables.
    use_insert_id : `bool`, optional
        If `True` then make schema with tables for insert ID tracking.
    """

    pixel_id_tables = (ApdbTables.DiaObject, ApdbTables.DiaObjectLast, ApdbTables.DiaSource)
    """Tables that need pixelId column for spatial indexing."""
179
181 self,
182 engine: sqlalchemy.engine.Engine,
183 dia_object_index: str,
184 htm_index_column: str,
185 schema_file: str,
186 schema_name: str = "ApdbSchema",
187 prefix: str = "",
188 namespace: Optional[str] = None,
189 use_insert_id: bool = False,
190 ):
191 super().__init__(schema_file, schema_name)
192
193 self._engine = engine
194 self._dia_object_index = dia_object_index
195 self._htm_index_column = htm_index_column
196 self._prefix = prefix
197 self._use_insert_id = use_insert_id
198
199 self._metadata = MetaData(schema=namespace)
200
201 # map YAML column types to SQLAlchemy
202 self._type_map = {
203 felis.types.Double: self._getDoubleType(engine),
204 felis.types.Float: sqlalchemy.types.Float,
205 felis.types.Timestamp: sqlalchemy.types.TIMESTAMP,
206 felis.types.Long: sqlalchemy.types.BigInteger,
207 felis.types.Int: sqlalchemy.types.Integer,
208 felis.types.Short: sqlalchemy.types.Integer,
209 felis.types.Byte: sqlalchemy.types.Integer,
210 felis.types.Binary: sqlalchemy.types.LargeBinary,
211 felis.types.Text: sqlalchemy.types.CHAR,
212 felis.types.String: sqlalchemy.types.CHAR,
213 felis.types.Char: sqlalchemy.types.CHAR,
214 felis.types.Unicode: sqlalchemy.types.CHAR,
215 felis.types.Boolean: sqlalchemy.types.Boolean,
216 }
217
218 # Add pixelId column and index to tables that need it
219 for table in self.pixel_id_tables:
220 tableDef = self.tableSchemas.get(table)
221 if not tableDef:
222 continue
223 column = simple.Column(
224 id=f"#{htm_index_column}",
225 name=htm_index_column,
226 datatype=felis.types.Long,
227 nullable=False,
228 value=None,
229 description="Pixelization index column.",
230 table=tableDef,
231 )
232 tableDef.columns.append(column)
233
234 # Adjust index if needed
235 if table == ApdbTables.DiaObject and self._dia_object_index == "pix_id_iov":
236 tableDef.primary_key.insert(0, column)
237
238 if table is ApdbTables.DiaObjectLast:
239 # use it as a leading PK column
240 tableDef.primary_key.insert(0, column)
241 else:
242 # make a regular index
243 name = f"IDX_{tableDef.name}_{htm_index_column}"
244 index = simple.Index(id=f"#{name}", name=name, columns=[column])
245 tableDef.indexes.append(index)
246
247 # generate schema for all tables, must be called last
250
251 self._has_insert_id: bool | None = None
252
253 def makeSchema(self, drop: bool = False) -> None:
254 """Create or re-create all tables.
255
256 Parameters
257 ----------
258 drop : `bool`, optional
259 If True then drop tables before creating new ones.
260 """
261 # Create namespace if it does not exist yet, for now this only makes
262 # sense for postgres.
263 if self._metadata.schema:
264 dialect = self._engine.dialect
265 quoted_schema = dialect.preparer(dialect).quote_schema(self._metadata.schema)
266 create_schema = DDL(
267 "CREATE SCHEMA IF NOT EXISTS %(schema)s", context={"schema": quoted_schema}
268 ).execute_if(dialect="postgresql")
269 event.listen(self._metadata, "before_create", create_schema)
270
271 # create all tables (optionally drop first)
272 if drop:
273 _LOG.info("dropping all tables")
274 self._metadata.drop_all(self._engine)
275 _LOG.info("creating all tables")
276 self._metadata.create_all(self._engine)
277
278 # Reset possibly cached value.
279 self._has_insert_id = None
280
281 def get_table(self, table_enum: ApdbTables | ExtraTables) -> Table:
282 """Return SQLAlchemy table instance for a specified table type/enum.
283
284 Parameters
285 ----------
286 table_enum : `ApdbTables` or `ExtraTables`
287 Type of table to return.
288
289 Returns
290 -------
291 table : `sqlalchemy.schema.Table`
292 Table instance.
293
294 Raises
295 ------
296 ValueError
297 Raised if ``table_enum`` is not valid for this database.
298 """
299 try:
300 if isinstance(table_enum, ApdbTables):
301 return self._apdb_tables[table_enum]
302 else:
303 return self._extra_tables[table_enum]
304 except LookupError:
305 raise ValueError(f"Table type {table_enum} does not exist in the schema") from None
306
307 def get_apdb_columns(self, table_enum: ApdbTables | ExtraTables) -> list[Column]:
308 """Return list of columns defined for a table in APDB schema.
309
310 Returned list excludes columns that are implementation-specific, e.g.
311 ``pixelId`` column is not include in the returned list.
312
313 Parameters
314 ----------
315 table_enum : `ApdbTables` or `ExtraTables`
316 Type of table.
317
318 Returns
319 -------
320 table : `list` [`sqlalchemy.schema.Column`]
321 Table instance.
322
323 Raises
324 ------
325 ValueError
326 Raised if ``table_enum`` is not valid for this database.
327 """
328 table = self.get_table(table_enum)
329 exclude_columns = set()
330 if table_enum in self.pixel_id_tables:
331 exclude_columns.add(self._htm_index_column)
332 return [column for column in table.columns if column.name not in exclude_columns]
333
334 @property
335 def has_insert_id(self) -> bool:
336 """Whether insert ID tables are to be used (`bool`)."""
337 if self._has_insert_id is None:
338 self._has_insert_id = self._use_insert_id and self._check_insert_id()
339 return self._has_insert_id
340
341 def _check_insert_id(self) -> bool:
342 """Check whether database has tables for tracking insert IDs."""
343 inspector = inspect(self._engine)
344 db_tables = set(inspector.get_table_names(schema=self._metadata.schema))
345 return ExtraTables.DiaInsertId.table_name(self._prefix) in db_tables
346
347 def _make_apdb_tables(self, mysql_engine: str = "InnoDB") -> Mapping[ApdbTables, Table]:
348 """Generate schema for regular tables.
349
350 Parameters
351 ----------
352 mysql_engine : `str`, optional
353 MySQL engine type to use for new tables.
354 """
355 tables = {}
356 for table_enum in ApdbTables:
357 if table_enum is ApdbTables.DiaObjectLast and self._dia_object_index != "last_object_table":
358 continue
359
360 columns = self._tableColumns(table_enum)
361 constraints = self._tableIndices(table_enum)
362 table = Table(
363 table_enum.table_name(self._prefix),
364 self._metadata,
365 *columns,
366 *constraints,
367 mysql_engine=mysql_engine,
368 )
369 tables[table_enum] = table
370
371 return tables
372
374 self, apdb_tables: Mapping[ApdbTables, Table], mysql_engine: str = "InnoDB"
375 ) -> Mapping[ExtraTables, Table]:
376 """Generate schema for insert ID tables."""
377 tables: dict[ExtraTables, Table] = {}
378 if not self._use_insert_id:
379 return tables
380
381 # Parent table needs to be defined first
382 column_defs = [
383 Column("insert_id", GUID, primary_key=True),
384 Column("insert_time", sqlalchemy.types.TIMESTAMP, nullable=False),
385 ]
386 parent_table = Table(
387 ExtraTables.DiaInsertId.table_name(self._prefix),
388 self._metadata,
389 *column_defs,
390 mysql_engine=mysql_engine,
391 )
392 tables[ExtraTables.DiaInsertId] = parent_table
393
394 for table_enum, apdb_enum in ExtraTables.insert_id_tables().items():
395 apdb_table = apdb_tables[apdb_enum]
396 columns = self._insertIdColumns(table_enum)
397 constraints = self._insertIdIndices(table_enum, apdb_table, parent_table)
398 table = Table(
399 table_enum.table_name(self._prefix),
400 self._metadata,
401 *columns,
402 *constraints,
403 mysql_engine=mysql_engine,
404 )
405 tables[table_enum] = table
406
407 return tables
408
409 def _tableColumns(self, table_name: ApdbTables) -> List[Column]:
410 """Return set of columns in a table
411
412 Parameters
413 ----------
414 table_name : `ApdbTables`
415 Name of the table.
416
417 Returns
418 -------
419 column_defs : `list`
420 List of `Column` objects.
421 """
422 # get the list of columns in primary key, they are treated somewhat
423 # specially below
424 table_schema = self.tableSchemas[table_name]
425
426 # convert all column dicts into alchemy Columns
427 column_defs: list[Column] = []
428 for column in table_schema.columns:
429 kwargs: Dict[str, Any] = dict(nullable=column.nullable)
430 if column.value is not None:
431 kwargs.update(server_default=str(column.value))
432 if column in table_schema.primary_key:
433 kwargs.update(autoincrement=False)
434 ctype = self._type_map[column.datatype]
435 column_defs.append(Column(column.name, ctype, **kwargs))
436
437 return column_defs
438
439 def _tableIndices(self, table_name: ApdbTables) -> List[sqlalchemy.schema.SchemaItem]:
440 """Return set of constraints/indices in a table
441
442 Parameters
443 ----------
444 table_name : `ApdbTables`
445 Name of the table.
446 info : `dict`
447 Additional options passed to SQLAlchemy index constructor.
448
449 Returns
450 -------
451 index_defs : `list`
452 List of SQLAlchemy index/constraint objects.
453 """
454
455 table_schema = self.tableSchemas[table_name]
456
457 # convert all index dicts into alchemy Columns
458 index_defs: List[sqlalchemy.schema.SchemaItem] = []
459 if table_schema.primary_key:
460 index_defs.append(PrimaryKeyConstraint(*[column.name for column in table_schema.primary_key]))
461 for index in table_schema.indexes:
462 name = self._prefix + index.name if index.name else ""
463 index_defs.append(Index(name, *[column.name for column in index.columns]))
464 for constraint in table_schema.constraints:
465 constr_name: str | None = None
466 if constraint.name:
467 constr_name = self._prefix + constraint.name
468 if isinstance(constraint, simple.UniqueConstraint):
469 index_defs.append(
470 UniqueConstraint(*[column.name for column in constraint.columns], name=constr_name)
471 )
472
473 return index_defs
474
475 def _insertIdColumns(self, table_enum: ExtraTables) -> List[Column]:
476 """Return list of columns for insert ID tables."""
477 column_defs: list[Column] = [Column("insert_id", GUID, nullable=False)]
478 insert_id_tables = ExtraTables.insert_id_tables()
479 if table_enum in insert_id_tables:
480 column_defs += self._tablePkColumns(insert_id_tables[table_enum])
481 else:
482 assert False, "Above branches have to cover all enum values"
483 return column_defs
484
485 def _tablePkColumns(self, table_enum: ApdbTables) -> list[Column]:
486 """Return a list of columns for table PK."""
487 table_schema = self.tableSchemas[table_enum]
488 column_defs: list[Column] = []
489 for column in table_schema.primary_key:
490 ctype = self._type_map[column.datatype]
491 column_defs.append(Column(column.name, ctype, nullable=False, autoincrement=False))
492 return column_defs
493
495 self,
496 table_enum: ExtraTables,
497 apdb_table: sqlalchemy.schema.Table,
498 parent_table: sqlalchemy.schema.Table,
499 ) -> List[sqlalchemy.schema.SchemaItem]:
500 """Return set of constraints/indices for insert ID tables."""
501 index_defs: List[sqlalchemy.schema.SchemaItem] = []
502
503 # Special case for insert ID tables that are not in felis schema.
504 insert_id_tables = ExtraTables.insert_id_tables()
505 if table_enum in insert_id_tables:
506 # PK is the same as for original table
507 pk_names = [column.name for column in self._tablePkColumns(insert_id_tables[table_enum])]
508 index_defs.append(PrimaryKeyConstraint(*pk_names))
509 # Non-unique index on insert_id column.
510 name = self._prefix + table_enum.name + "_idx"
511 index_defs.append(Index(name, "insert_id"))
512 # Foreign key to original table
513 pk_columns = [apdb_table.columns[column] for column in pk_names]
514 index_defs.append(
515 ForeignKeyConstraint(pk_names, pk_columns, onupdate="CASCADE", ondelete="CASCADE")
516 )
517 # Foreign key to parent table
518 index_defs.append(
519 ForeignKeyConstraint(
520 ["insert_id"], [parent_table.columns["insert_id"]], onupdate="CASCADE", ondelete="CASCADE"
521 )
522 )
523 else:
524 assert False, "Above branches have to cover all enum values"
525 return index_defs
526
527 @classmethod
528 def _getDoubleType(cls, engine: sqlalchemy.engine.Engine) -> Type | sqlalchemy.types.TypeEngine:
529 """DOUBLE type is database-specific, select one based on dialect.
530
531 Parameters
532 ----------
533 engine : `sqlalchemy.engine.Engine`
534 Database engine.
535
536 Returns
537 -------
538 type_object : `object`
539 Database-specific type definition.
540 """
541 if engine.name == "mysql":
542 from sqlalchemy.dialects.mysql import DOUBLE
543
544 return DOUBLE(asdecimal=False)
545 elif engine.name == "postgresql":
546 from sqlalchemy.dialects.postgresql import DOUBLE_PRECISION
547
548 return DOUBLE_PRECISION
549 elif engine.name == "oracle":
550 from sqlalchemy.dialects.oracle import DOUBLE_PRECISION
551
552 return DOUBLE_PRECISION
553 elif engine.name == "sqlite":
554 # all floats in sqlite are 8-byte
555 from sqlalchemy.dialects.sqlite import REAL
556
557 return REAL
558 else:
559 raise TypeError("cannot determine DOUBLE type, unexpected dialect: " + engine.name)
std::vector< SchemaItem< Flag > > * items
Type|sqlalchemy.types.TypeEngine _getDoubleType(cls, sqlalchemy.engine.Engine engine)
Table get_table(self, ApdbTables|ExtraTables table_enum)
Mapping[ApdbTables, Table] _make_apdb_tables(self, str mysql_engine="InnoDB")
Mapping[ExtraTables, Table] _make_extra_tables(self, Mapping[ApdbTables, Table] apdb_tables, str mysql_engine="InnoDB")
List[Column] _tableColumns(self, ApdbTables table_name)
__init__(self, sqlalchemy.engine.Engine engine, str dia_object_index, str htm_index_column, str schema_file, str schema_name="ApdbSchema", str prefix="", Optional[str] namespace=None, bool use_insert_id=False)
List[sqlalchemy.schema.SchemaItem] _insertIdIndices(self, ExtraTables table_enum, sqlalchemy.schema.Table apdb_table, sqlalchemy.schema.Table parent_table)
list[Column] _tablePkColumns(self, ApdbTables table_enum)
list[Column] get_apdb_columns(self, ApdbTables|ExtraTables table_enum)
None makeSchema(self, bool drop=False)
List[Column] _insertIdColumns(self, ExtraTables table_enum)
List[sqlalchemy.schema.SchemaItem] _tableIndices(self, ApdbTables table_name)
Mapping[ExtraTables, ApdbTables] insert_id_tables(cls)
Optional[uuid.UUID] process_result_value(self, str|uuid.UUID|None value, sqlalchemy.engine.Dialect dialect)
sqlalchemy.types.TypeEngine load_dialect_impl(self, sqlalchemy.engine.Dialect dialect)
Optional[str] process_bind_param(self, Any value, sqlalchemy.engine.Dialect dialect)
daf::base::PropertySet * set
Definition fits.cc:927