LSST Applications g013ef56533+b8d55c8942,g083dd6704c+a047e97985,g199a45376c+0ba108daf9,g1fd858c14a+7a3b874d60,g210f2d0738+7416ca6900,g262e1987ae+1d557ba9a3,g29ae962dfc+519d34895e,g2cef7863aa+aef1011c0b,g30d7c61c20+36d16ea71a,g35bb328faa+8c5ae1fdc5,g3fd5ace14f+cb326ad149,g47891489e3+f459a6810c,g53246c7159+8c5ae1fdc5,g54cd7ddccb+890c8e1e5d,g5a60e81ecd+6240c63dbc,g64539dfbff+7416ca6900,g67b6fd64d1+f459a6810c,g6ebf1fc0d4+8c5ae1fdc5,g74acd417e5+0bae3c876a,g786e29fd12+668abc6043,g87389fa792+8856018cbb,g89139ef638+f459a6810c,g8d7436a09f+dee7680868,g8ea07a8fe4+81eaaadc04,g90f42f885a+34c0557caf,g97be763408+14b8164b5b,g98a1a72a9c+8389601a76,g98df359435+fff771c62d,gb8cb2b794d+6728931916,gbf99507273+8c5ae1fdc5,gc2a301910b+7416ca6900,gca7fc764a6+f459a6810c,gd7ef33dd92+f459a6810c,gdab6d2f7ff+0bae3c876a,ge410e46f29+f459a6810c,ge41e95a9f2+7416ca6900,geaed405ab2+e3b4b2a692,gf9a733ac38+8c5ae1fdc5,w.2025.43
LSST Data Management Base Package
Loading...
Searching...
No Matches
apdbSqlSchema.py
Go to the documentation of this file.
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22"""Module responsible for APDB schema operations."""
23
24from __future__ import annotations
25
26__all__ = ["ApdbSqlSchema", "ExtraTables"]
27
28import enum
29import itertools
30import logging
31from collections.abc import Mapping
32
33import felis.datamodel
34import sqlalchemy
35
36from .. import schema_model
37from ..apdbSchema import ApdbSchema, ApdbTables
38from .modelToSql import ModelToSql
39
40_LOG = logging.getLogger(__name__)
41
42
class InconsistentSchemaError(RuntimeError):
    """Exception raised when schema state is inconsistent.

    Raised by `ApdbSqlSchema.empty` when only a subset of the required
    APDB tables exists in the database.
    """
46
@enum.unique
class ExtraTables(enum.Enum):
    """Names of the tables used for tracking insert IDs.

    Enum values are the bare (un-prefixed) table names; use `table_name`
    to obtain the full name including any configured prefix.
    """

    ApdbReplicaChunks = "ApdbReplicaChunks"
    """Name of the table for replica chunks records."""

    DiaObjectChunks = "DiaObjectChunks"
    """Name of the table for DIAObject chunk data."""

    DiaSourceChunks = "DiaSourceChunks"
    """Name of the table for DIASource chunk data."""

    DiaForcedSourceChunks = "DiaForcedSourceChunks"
    """Name of the table for DIAForcedSource chunk data."""

    ApdbUpdateRecordChunks = "ApdbUpdateRecordChunks"
    """Name of the table for ApdbUpdateRecord chunk data."""

    def table_name(self, prefix: str = "") -> str:
        """Return full table name.

        Parameters
        ----------
        prefix : `str`, optional
            Prefix prepended to the bare table name.
        """
        return prefix + self.value

    @classmethod
    def replica_chunk_tables(cls) -> Mapping[ExtraTables, ApdbTables]:
        """Return mapping of tables used for replica chunk storage to their
        corresponding regular tables.
        """
        return {
            cls.DiaObjectChunks: ApdbTables.DiaObject,
            cls.DiaSourceChunks: ApdbTables.DiaSource,
            cls.DiaForcedSourceChunks: ApdbTables.DiaForcedSource,
        }
80
81
83 """Class for management of APDB schema.
84
85 Attributes
86 ----------
87 objects : `sqlalchemy.Table`
88 DiaObject table instance
89 objects_last : `sqlalchemy.Table`
90 DiaObjectLast table instance, may be None
91 sources : `sqlalchemy.Table`
92 DiaSource table instance
93 forcedSources : `sqlalchemy.Table`
94 DiaForcedSource table instance
95 replication_enabled : `bool`
96 If true then schema has tables for replication chunks.
97
98 Parameters
99 ----------
100 engine : `sqlalchemy.engine.Engine`
101 SQLAlchemy engine instance
102 dia_object_index : `str`
103 Indexing mode for DiaObject table, see `ApdbSqlConfig.dia_object_index`
104 for details.
105 htm_index_column : `str`
106 Name of a HTM index column for DiaObject and DiaSource tables.
107 schema_file : `str`
108 Name of the YAML schema file.
109 schema_name : `str`, optional
110 Name of the schema in YAML files.
111 prefix : `str`, optional
112 Prefix to add to all schema elements.
113 namespace : `str`, optional
114 Namespace (or schema name) to use for all APDB tables.
115 enable_replica : `bool`, optional
116 If `True` then use additional tables for replica chunks.
117 """
118
119 pixel_id_tables = (ApdbTables.DiaObject, ApdbTables.DiaObjectLast, ApdbTables.DiaSource)
120 """Tables that need pixelId column for spatial indexing."""
121
123 self,
124 engine: sqlalchemy.engine.Engine,
125 dia_object_index: str,
126 htm_index_column: str,
127 schema_file: str,
128 schema_name: str = "ApdbSchema",
129 prefix: str = "",
130 namespace: str | None = None,
131 enable_replica: bool = False,
132 ):
133 super().__init__(schema_file, schema_name)
134
135 self._engine = engine
136 self._dia_object_index = dia_object_index
137 self._htm_index_column = htm_index_column
138 self._prefix = prefix
139 self._enable_replica = enable_replica
140
141 self._metadata = sqlalchemy.schema.MetaData(schema=namespace)
142
143 # Add pixelId column and index to tables that need it
144 for table in self.pixel_id_tables:
145 tableDef = self.tableSchemas.get(table)
146 if not tableDef:
147 continue
148 column = schema_model.Column(
149 id=f"#{htm_index_column}",
150 name=htm_index_column,
151 datatype=felis.datamodel.DataType.long,
152 nullable=False,
153 value=None,
154 description="Pixelization index column.",
155 table=tableDef,
156 )
157 tableDef.columns.append(column)
158
159 # Adjust index if needed
160 if table == ApdbTables.DiaObject and self._dia_object_index == "pix_id_iov":
161 tableDef.primary_key.insert(0, column)
162
163 if table is ApdbTables.DiaObjectLast:
164 # use it as a leading PK column
165 tableDef.primary_key.insert(0, column)
166 else:
167 # make a regular index
168 name = f"IDX_{tableDef.name}_{htm_index_column}"
169 index = schema_model.Index(id=f"#{name}", name=name, columns=[column])
170 tableDef.indexes.append(index)
171
172 # generate schema for all tables, must be called last
173 apdb_tables = self._make_apdb_tables()
174 extra_tables = self._make_extra_tables(apdb_tables)
175
176 converter = ModelToSql(metadata=self._metadata, prefix=self._prefix)
177 id_to_table = converter.make_tables(itertools.chain(apdb_tables.values(), extra_tables.values()))
178
180 apdb_enum: id_to_table[table_model.id] for apdb_enum, table_model in apdb_tables.items()
181 }
183 extra_enum: id_to_table[table_model.id] for extra_enum, table_model in extra_tables.items()
184 }
185
186 self._has_replica_chunks: bool | None = None
187 self._metadata_check: bool | None = None
188
189 def empty(self) -> bool:
190 """Return True if database schema is empty.
191
192 Returns
193 -------
194 empty : `bool`
195 `True` if none of the required APDB tables exist in the database,
196 `False` if all required tables exist.
197
198 Raises
199 ------
200 InconsistentSchemaError
201 Raised when some of the required tables exist but not all.
202 """
203 inspector = sqlalchemy.inspect(self._engine)
204 table_names = set(inspector.get_table_names(self._metadata.schema))
205
206 existing_tables = []
207 missing_tables = []
208 for table_enum in self._apdb_tables:
209 table_name = table_enum.table_name(self._prefix)
210 if table_name in table_names:
211 existing_tables.append(table_name)
212 else:
213 missing_tables.append(table_name)
214
215 if not missing_tables:
216 return False
217 elif not existing_tables:
218 return True
219 else:
221 f"Only some required APDB tables exist: {existing_tables}, missing tables: {missing_tables}"
222 )
223
224 def makeSchema(self, drop: bool = False) -> None:
225 """Create or re-create all tables.
226
227 Parameters
228 ----------
229 drop : `bool`, optional
230 If True then drop tables before creating new ones.
231 """
232 # Create namespace if it does not exist yet, for now this only makes
233 # sense for postgres.
234 if self._metadata.schema:
235 dialect = self._engine.dialect
236 quoted_schema = dialect.preparer(dialect).quote_schema(self._metadata.schema)
237 create_schema = sqlalchemy.schema.DDL(
238 "CREATE SCHEMA IF NOT EXISTS %(schema)s", context={"schema": quoted_schema}
239 ).execute_if(dialect="postgresql")
240 sqlalchemy.event.listen(self._metadata, "before_create", create_schema)
241
242 # create all tables (optionally drop first)
243 if drop:
244 _LOG.info("dropping all tables")
245 self._metadata.drop_all(self._engine)
246 _LOG.info("creating all tables")
247 self._metadata.create_all(self._engine)
248
249 # Reset possibly cached value.
250 self._has_replica_chunks = None
251 self._metadata_check = None
252
253 def get_table(self, table_enum: ApdbTables | ExtraTables) -> sqlalchemy.schema.Table:
254 """Return SQLAlchemy table instance for a specified table type/enum.
255
256 Parameters
257 ----------
258 table_enum : `ApdbTables` or `ExtraTables`
259 Type of table to return.
260
261 Returns
262 -------
263 table : `sqlalchemy.schema.Table`
264 Table instance.
265
266 Raises
267 ------
268 ValueError
269 Raised if ``table_enum`` is not valid for this database.
270 """
271 try:
272 if isinstance(table_enum, ApdbTables):
273 if table_enum is ApdbTables.metadata:
274 # There may be cases when schema is configured with the
275 # metadata table but database is still missing it. Check
276 # that table actually exists in the database. Note that
277 # this may interact with `makeSchema`.
278 if self._metadata_check is None:
279 inspector = sqlalchemy.inspect(self._engine)
280 table_name = table_enum.table_name(self._prefix)
281 self._metadata_check = inspector.has_table(table_name, schema=self._metadata.schema)
282 if not self._metadata_check:
283 # this will be caught below
284 raise LookupError("metadata table is missing")
285 return self._apdb_tables[table_enum]
286 else:
287 return self._extra_tables[table_enum]
288 except LookupError:
289 raise ValueError(f"Table type {table_enum} does not exist in the schema") from None
290
291 def check_column(self, table_enum: ApdbTables | ExtraTables, column: str) -> bool:
292 """Check for the existence of the column in a given table, checking is
293 done against database, not APDB schema.
294
295 Parameters
296 ----------
297 table_enum : `ApdbTables` or `ExtraTables`
298 Table to check for a column.
299 column : `str`
300 Name of the column to check.
301
302 Returns
303 -------
304 exists : `bool`
305 True if column exists, False otherwise.
306 """
307 inspector = sqlalchemy.inspect(self._engine)
308 table_name = table_enum.table_name(self._prefix)
309 columns = inspector.get_columns(table_name, schema=self._metadata.schema)
310 for col in columns:
311 if col["name"] == column:
312 return True
313 return False
314
315 def get_apdb_columns(self, table_enum: ApdbTables | ExtraTables) -> list[sqlalchemy.schema.Column]:
316 """Return list of columns defined for a table in APDB schema.
317
318 Returned list excludes columns that are implementation-specific, e.g.
319 ``pixelId`` column is not include in the returned list.
320
321 Parameters
322 ----------
323 table_enum : `ApdbTables` or `ExtraTables`
324 Type of table.
325
326 Returns
327 -------
328 table : `list` [`sqlalchemy.schema.Column`]
329 Table instance.
330
331 Raises
332 ------
333 ValueError
334 Raised if ``table_enum`` is not valid for this database.
335 """
336 table = self.get_table(table_enum)
337 exclude_columns = set()
338 if table_enum in self.pixel_id_tables:
339 exclude_columns.add(self._htm_index_column)
340 return [column for column in table.columns if column.name not in exclude_columns]
341
342 @property
343 def replication_enabled(self) -> bool:
344 """True if replication is enabled (`bool`)."""
345 return self._enable_replica
346
347 def _make_apdb_tables(self, mysql_engine: str = "InnoDB") -> Mapping[ApdbTables, schema_model.Table]:
348 """Generate schema for regular tables.
349
350 Parameters
351 ----------
352 mysql_engine : `str`, optional
353 MySQL engine type to use for new tables.
354 """
355 tables: dict[ApdbTables, schema_model.Table] = {}
356 for table_enum in ApdbTables:
357 if table_enum is ApdbTables.DiaObjectLast and self._dia_object_index != "last_object_table":
358 continue
359 if table_enum is ApdbTables.metadata and table_enum not in self.tableSchemas:
360 # Schema does not define metadata.
361 continue
362 if table_enum is ApdbTables.SSSource:
363 # We do not support SSSource table yet.
364 continue
365 table = self.tableSchemas[table_enum]
366
367 if table_enum is ApdbTables.DiaObjectLast:
368 # In the past DiaObjectLast table did not have validityStart.
369 validity_start_column = "validityStartMjdTai"
370 try:
371 if not self.check_column(ApdbTables.DiaObjectLast, validity_start_column):
372 for column in table.columns:
373 if column.name == validity_start_column:
374 table.columns.remove(column)
375 break
376 except sqlalchemy.exc.NoSuchTableError:
377 # Table does not exist yet, will be created later.
378 pass
379
380 tables[table_enum] = table
381
382 return tables
383
385 self, apdb_tables: Mapping[ApdbTables, schema_model.Table]
386 ) -> Mapping[ExtraTables, schema_model.Table]:
387 """Generate schema for insert ID tables."""
388 if not self._enable_replica:
389 return {}
390
391 tables = {}
392 column_defs: list[schema_model.Column] = [
394 name="apdb_replica_chunk",
395 id="#ApdbReplicaChunks.apdb_replica_chunk",
396 datatype=felis.datamodel.DataType.long,
397 ),
399 name="last_update_time",
400 id="#ApdbReplicaChunks.last_update_time",
401 datatype=felis.datamodel.DataType.timestamp,
402 nullable=False,
403 ),
405 name="unique_id",
406 id="#ApdbReplicaChunks.unique_id",
407 datatype=schema_model.ExtraDataTypes.UUID,
408 nullable=False,
409 ),
410 ]
411 parent_table = schema_model.Table(
412 name=ExtraTables.ApdbReplicaChunks.table_name(self._prefix),
413 id="#ApdbReplicaChunks",
414 columns=column_defs,
415 primary_key=[column_defs[0]],
416 constraints=[],
417 indexes=[],
418 )
419 tables[ExtraTables.ApdbReplicaChunks] = parent_table
420
421 for table_enum, apdb_enum in ExtraTables.replica_chunk_tables().items():
422 apdb_table = apdb_tables[apdb_enum]
423 table_name = table_enum.table_name(self._prefix)
424
425 columns = self._replicaChunkColumns(table_enum, apdb_enum)
426 column_map = {column.name: column for column in columns}
427 # PK is the same as for original table
428 pk_columns = [column_map[column.name] for column in apdb_table.primary_key]
429
430 indices = self._replicaChunkIndices(table_enum, column_map)
431 constraints = self._replicaChunkConstraints(table_enum, apdb_table, parent_table, column_map)
432 table = schema_model.Table(
433 name=table_name,
434 id=f"#{table_name}",
435 columns=columns,
436 primary_key=pk_columns,
437 indexes=indices,
438 constraints=constraints,
439 )
440 tables[table_enum] = table
441
442 # ApdbUpdateRecordChunks table.
443 table_name = ExtraTables.ApdbUpdateRecordChunks.table_name(self._prefix)
444 columns = [
446 name="apdb_replica_chunk",
447 id=f"#{table_name}.apdb_replica_chunk",
448 datatype=felis.datamodel.DataType.long,
449 nullable=False,
450 ),
452 id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}.update_time_ns",
453 name="update_time_ns",
454 datatype=felis.datamodel.DataType.long,
455 nullable=False,
456 ),
458 id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}.update_order",
459 name="update_order",
460 datatype=felis.datamodel.DataType.int,
461 nullable=False,
462 ),
464 id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}.update_unique_id",
465 name="update_unique_id",
466 datatype=schema_model.ExtraDataTypes.UUID,
467 nullable=False,
468 ),
470 id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}.update_payload",
471 name="update_payload",
472 datatype=felis.datamodel.DataType.string,
473 nullable=False,
474 ),
475 ]
476 tables[ExtraTables.ApdbUpdateRecordChunks] = schema_model.Table(
477 name=table_name,
478 id=f"#{table_name}",
479 columns=columns,
480 primary_key=columns[:4],
481 indexes=[],
482 constraints=[],
483 )
484
485 return tables
486
488 self, table_enum: ExtraTables, apdb_enum: ApdbTables
489 ) -> list[schema_model.Column]:
490 """Return list of columns for replica chunks tables."""
491 table_name = table_enum.table_name()
492 column_defs: list[schema_model.Column] = [
494 name="apdb_replica_chunk",
495 id=f"#{table_name}.apdb_replica_chunk",
496 datatype=felis.datamodel.DataType.long,
497 nullable=False,
498 )
499 ]
500 if table_enum in ExtraTables.replica_chunk_tables():
501 table_model = self.tableSchemas[apdb_enum]
502 column_defs += [column.clone() for column in table_model.primary_key]
503 else:
504 assert False, "Above branches have to cover all enum values"
505 return column_defs
506
508 self,
509 table_enum: ExtraTables,
510 column_map: Mapping[str, schema_model.Column],
511 ) -> list[schema_model.Index]:
512 """Return set of indices for replica chunk table."""
513 index_defs: list[schema_model.Index] = []
514 if table_enum in ExtraTables.replica_chunk_tables():
515 # Non-unique index on replica chunk column.
516 name = self._prefix + table_enum.name + "_apdb_replica_chunk_idx"
517 column = column_map["apdb_replica_chunk"]
518 index_defs.append(schema_model.Index(name=name, id=f"#{name}", columns=[column]))
519 return index_defs
520
522 self,
523 table_enum: ExtraTables,
524 apdb_table: schema_model.Table,
525 parent_table: schema_model.Table,
526 column_map: Mapping[str, schema_model.Column],
527 ) -> list[schema_model.Constraint]:
528 """Return set of constraints for replica chunk table."""
529 constraints: list[schema_model.Constraint] = []
530 replica_chunk_tables = ExtraTables.replica_chunk_tables()
531 if table_enum in replica_chunk_tables:
532 # Foreign key to original table
533 name = f"{table_enum.table_name()}_fk_{apdb_table.name}"
534 other_columns = apdb_table.primary_key
535 this_columns = [column_map[column.name] for column in apdb_table.primary_key]
536 constraints.append(
538 name=name,
539 id=f"#{name}",
540 columns=this_columns,
541 referenced_columns=other_columns,
542 onupdate="CASCADE",
543 ondelete="CASCADE",
544 )
545 )
546
547 # Foreign key to parent chunk ID table
548 name = f"{table_enum.table_name()}_fk_{parent_table.name}"
549 other_columns = parent_table.primary_key
550 this_columns = [column_map[column.name] for column in parent_table.primary_key]
551 constraints.append(
553 name=name,
554 id=f"#{name}",
555 columns=this_columns,
556 referenced_columns=other_columns,
557 onupdate="CASCADE",
558 ondelete="CASCADE",
559 )
560 )
561 return constraints
__init__(self, sqlalchemy.engine.Engine engine, str dia_object_index, str htm_index_column, str schema_file, str schema_name="ApdbSchema", str prefix="", str|None namespace=None, bool enable_replica=False)
Mapping[ApdbTables, schema_model.Table] _make_apdb_tables(self, str mysql_engine="InnoDB")
Mapping[ExtraTables, schema_model.Table] _make_extra_tables(self, Mapping[ApdbTables, schema_model.Table] apdb_tables)
sqlalchemy.schema.Table get_table(self, ApdbTables|ExtraTables table_enum)
list[schema_model.Constraint] _replicaChunkConstraints(self, ExtraTables table_enum, schema_model.Table apdb_table, schema_model.Table parent_table, Mapping[str, schema_model.Column] column_map)
bool check_column(self, ApdbTables|ExtraTables table_enum, str column)
list[schema_model.Index] _replicaChunkIndices(self, ExtraTables table_enum, Mapping[str, schema_model.Column] column_map)
list[sqlalchemy.schema.Column] get_apdb_columns(self, ApdbTables|ExtraTables table_enum)
list[schema_model.Column] _replicaChunkColumns(self, ExtraTables table_enum, ApdbTables apdb_enum)
Mapping[ExtraTables, ApdbTables] replica_chunk_tables(cls)