Loading [MathJax]/extensions/tex2jax.js
LSST Applications g04a91732dc+cc8870d3f5,g07dc498a13+5aa0b8792f,g0fba68d861+80045be308,g1409bbee79+5aa0b8792f,g1a7e361dbc+5aa0b8792f,g1fd858c14a+f64bc332a9,g208c678f98+1ae86710ed,g35bb328faa+fcb1d3bbc8,g4d2262a081+47ad8a29a8,g4d39ba7253+9633a327c1,g4e0f332c67+5d362be553,g53246c7159+fcb1d3bbc8,g60b5630c4e+9633a327c1,g668ecb457e+25d63fd678,g78460c75b0+2f9a1b4bcd,g786e29fd12+cf7ec2a62a,g7b71ed6315+fcb1d3bbc8,g8852436030+8b64ca622a,g89139ef638+5aa0b8792f,g89e1512fd8+04325574d3,g8d6b6b353c+9633a327c1,g9125e01d80+fcb1d3bbc8,g989de1cb63+5aa0b8792f,g9f33ca652e+b196626af7,ga9baa6287d+9633a327c1,gaaedd4e678+5aa0b8792f,gabe3b4be73+1e0a283bba,gb1101e3267+71e32094df,gb58c049af0+f03b321e39,gb90eeb9370+2807b1ad02,gcf25f946ba+8b64ca622a,gd315a588df+a39986a76f,gd6cbbdb0b4+c8606af20c,gd9a9a58781+fcb1d3bbc8,gde0f65d7ad+4e42d81ab7,ge278dab8ac+932305ba37,ge82c20c137+76d20ab76d,gfe73954cf8+a1301e4c20,w.2025.11
LSST Data Management Base Package
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
apdbSqlSchema.py
Go to the documentation of this file.
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22"""Module responsible for APDB schema operations."""
23
24from __future__ import annotations
25
26__all__ = ["ApdbSqlSchema", "ExtraTables"]
27
28import enum
29import itertools
30import logging
31from collections.abc import Mapping
32
33import felis.datamodel
34import sqlalchemy
35
36from .. import schema_model
37from ..apdbSchema import ApdbSchema, ApdbTables
38from .modelToSql import ModelToSql
39
40_LOG = logging.getLogger(__name__)
41
42
class InconsistentSchemaError(RuntimeError):
    """Exception raised when schema state is inconsistent."""
45
46
@enum.unique
class ExtraTables(enum.Enum):
    """Names of the tables used for tracking insert IDs.

    The value of each member is the table name without any prefix; use
    `table_name` to obtain the name with a prefix applied.
    """

    ApdbReplicaChunks = "ApdbReplicaChunks"
    """Name of the table for replica chunks records."""

    DiaObjectChunks = "DiaObjectChunks"
    """Name of the table for DIAObject chunk data."""

    DiaSourceChunks = "DiaSourceChunks"
    """Name of the table for DIASource chunk data."""

    DiaForcedSourceChunks = "DiaForcedSourceChunks"
    """Name of the table for DIAForcedSource chunk data."""

    def table_name(self, prefix: str = "") -> str:
        """Return full table name.

        Parameters
        ----------
        prefix : `str`, optional
            String prepended to the member value.

        Returns
        -------
        name : `str`
            ``prefix`` immediately followed by the member value.
        """
        return prefix + self.value

    @classmethod
    def replica_chunk_tables(cls) -> Mapping[ExtraTables, ApdbTables]:
        """Return mapping of tables used for replica chunk storage to their
        corresponding regular tables.

        Returns
        -------
        tables : `~collections.abc.Mapping` [`ExtraTables`, `ApdbTables`]
            Maps each per-table chunk enum to its regular APDB table enum.
            `ApdbReplicaChunks` is not a key of this mapping.
        """
        return {
            cls.DiaObjectChunks: ApdbTables.DiaObject,
            cls.DiaSourceChunks: ApdbTables.DiaSource,
            cls.DiaForcedSourceChunks: ApdbTables.DiaForcedSource,
        }
77
78
80 """Class for management of APDB schema.
81
82 Attributes
83 ----------
84 objects : `sqlalchemy.Table`
85 DiaObject table instance
86 objects_last : `sqlalchemy.Table`
87 DiaObjectLast table instance, may be None
88 sources : `sqlalchemy.Table`
89 DiaSource table instance
90 forcedSources : `sqlalchemy.Table`
91 DiaForcedSource table instance
92 has_replica_chunks : `bool`
93 If true then schema has tables for replication chunks.
94
95 Parameters
96 ----------
97 engine : `sqlalchemy.engine.Engine`
98 SQLAlchemy engine instance
99 dia_object_index : `str`
100 Indexing mode for DiaObject table, see `ApdbSqlConfig.dia_object_index`
101 for details.
102 htm_index_column : `str`
103 Name of a HTM index column for DiaObject and DiaSource tables.
104 schema_file : `str`
105 Name of the YAML schema file.
106 schema_name : `str`, optional
107 Name of the schema in YAML files.
108 prefix : `str`, optional
109 Prefix to add to all schema elements.
110 namespace : `str`, optional
111 Namespace (or schema name) to use for all APDB tables.
112 enable_replica : `bool`, optional
113 If `True` then use additional tables for replica chunks.
114 """
115
116 pixel_id_tables = (ApdbTables.DiaObject, ApdbTables.DiaObjectLast, ApdbTables.DiaSource)
117 """Tables that need pixelId column for spatial indexing."""
118
def __init__(
    self,
    engine: sqlalchemy.engine.Engine,
    dia_object_index: str,
    htm_index_column: str,
    schema_file: str,
    schema_name: str = "ApdbSchema",
    prefix: str = "",
    namespace: str | None = None,
    enable_replica: bool = False,
):
    """Build SQLAlchemy table definitions for the configured schema.

    Parameters are described in the class docstring.
    """
    super().__init__(schema_file, schema_name)

    self._engine = engine
    self._dia_object_index = dia_object_index
    self._htm_index_column = htm_index_column
    self._prefix = prefix
    self._enable_replica = enable_replica

    self._metadata = sqlalchemy.schema.MetaData(schema=namespace)

    # Add pixelId column and index to tables that need it
    for table in self.pixel_id_tables:
        tableDef = self.tableSchemas.get(table)
        if not tableDef:
            continue
        column = schema_model.Column(
            id=f"#{htm_index_column}",
            name=htm_index_column,
            datatype=felis.datamodel.DataType.long,
            nullable=False,
            value=None,
            description="Pixelization index column.",
            table=tableDef,
        )
        tableDef.columns.append(column)

        # Adjust index if needed
        if table is ApdbTables.DiaObject and self._dia_object_index == "pix_id_iov":
            tableDef.primary_key.insert(0, column)

        if table is ApdbTables.DiaObjectLast:
            # use it as a leading PK column
            tableDef.primary_key.insert(0, column)
        else:
            # make a regular index
            name = f"IDX_{tableDef.name}_{htm_index_column}"
            index = schema_model.Index(id=f"#{name}", name=name, columns=[column])
            tableDef.indexes.append(index)

    # generate schema for all tables, must be called last
    apdb_tables = self._make_apdb_tables()
    extra_tables = self._make_extra_tables(apdb_tables)

    converter = ModelToSql(metadata=self._metadata, prefix=self._prefix)
    id_to_table = converter.make_tables(itertools.chain(apdb_tables.values(), extra_tables.values()))

    # Map each enum to the SQLAlchemy table produced for its model, keyed
    # through the model ID returned by the converter.
    self._apdb_tables = {
        apdb_enum: id_to_table[table_model.id] for apdb_enum, table_model in apdb_tables.items()
    }
    self._extra_tables = {
        extra_enum: id_to_table[table_model.id] for extra_enum, table_model in extra_tables.items()
    }

    # Lazily populated caches of database state; `None` means "not checked
    # yet". Both are reset by `makeSchema`.
    self._has_replica_chunks: bool | None = None
    self._metadata_check: bool | None = None
185
def empty(self) -> bool:
    """Return True if database schema is empty.

    Returns
    -------
    empty : `bool`
        `True` if none of the required APDB tables exist in the database,
        `False` if all required tables exist.

    Raises
    ------
    InconsistentSchemaError
        Raised when some of the required tables exist but not all.
    """
    inspector = sqlalchemy.inspect(self._engine)
    table_names = set(inspector.get_table_names(self._metadata.schema))

    # Split the expected (prefixed) table names by whether the database
    # already has them.
    existing_tables = []
    missing_tables = []
    for table_enum in self._apdb_tables:
        table_name = table_enum.table_name(self._prefix)
        if table_name in table_names:
            existing_tables.append(table_name)
        else:
            missing_tables.append(table_name)

    if not missing_tables:
        return False
    elif not existing_tables:
        return True
    else:
        raise InconsistentSchemaError(
            f"Only some required APDB tables exist: {existing_tables}, missing tables: {missing_tables}"
        )
220
def makeSchema(self, drop: bool = False) -> None:
    """Create or re-create all tables.

    Parameters
    ----------
    drop : `bool`, optional
        If True then drop tables before creating new ones.
    """
    # Create namespace if it does not exist yet, for now this only makes
    # sense for postgres.
    if self._metadata.schema:
        dialect = self._engine.dialect
        quoted_schema = dialect.preparer(dialect).quote_schema(self._metadata.schema)
        create_schema = sqlalchemy.schema.DDL(
            "CREATE SCHEMA IF NOT EXISTS %(schema)s", context={"schema": quoted_schema}
        ).execute_if(dialect="postgresql")
        sqlalchemy.event.listen(self._metadata, "before_create", create_schema)

    # create all tables (optionally drop first)
    if drop:
        _LOG.info("dropping all tables")
        self._metadata.drop_all(self._engine)
    _LOG.info("creating all tables")
    self._metadata.create_all(self._engine)

    # Reset possibly cached value.
    self._has_replica_chunks = None
    self._metadata_check = None
249
def get_table(self, table_enum: ApdbTables | ExtraTables) -> sqlalchemy.schema.Table:
    """Return SQLAlchemy table instance for a specified table type/enum.

    Parameters
    ----------
    table_enum : `ApdbTables` or `ExtraTables`
        Type of table to return.

    Returns
    -------
    table : `sqlalchemy.schema.Table`
        Table instance.

    Raises
    ------
    ValueError
        Raised if ``table_enum`` is not valid for this database.
    """
    try:
        if isinstance(table_enum, ApdbTables):
            if table_enum is ApdbTables.metadata:
                # There may be cases when schema is configured with the
                # metadata table but database is still missing it. Check
                # that table actually exists in the database. Note that
                # this may interact with `makeSchema`.
                if self._metadata_check is None:
                    inspector = sqlalchemy.inspect(self._engine)
                    table_name = table_enum.table_name(self._prefix)
                    self._metadata_check = inspector.has_table(table_name, schema=self._metadata.schema)
                if not self._metadata_check:
                    # this will be caught below
                    raise LookupError("metadata table is missing")
            return self._apdb_tables[table_enum]
        else:
            return self._extra_tables[table_enum]
    except LookupError:
        # Covers both a missing dictionary key (KeyError is a LookupError)
        # and the explicit "metadata table is missing" case above.
        raise ValueError(f"Table type {table_enum} does not exist in the schema") from None
287
def get_apdb_columns(self, table_enum: ApdbTables | ExtraTables) -> list[sqlalchemy.schema.Column]:
    """Return list of columns defined for a table in APDB schema.

    Returned list excludes columns that are implementation-specific, e.g.
    ``pixelId`` column is not included in the returned list.

    Parameters
    ----------
    table_enum : `ApdbTables` or `ExtraTables`
        Type of table.

    Returns
    -------
    columns : `list` [`sqlalchemy.schema.Column`]
        Columns of the table, in table order, without the spatial index
        column for tables listed in ``pixel_id_tables``.

    Raises
    ------
    ValueError
        Raised if ``table_enum`` is not valid for this database.
    """
    table = self.get_table(table_enum)
    exclude_columns = set()
    if table_enum in self.pixel_id_tables:
        # The pixelization column is added by this class, it is not part of
        # the APDB schema proper.
        exclude_columns.add(self._htm_index_column)
    return [column for column in table.columns if column.name not in exclude_columns]
314
315 @property
316 def has_replica_chunks(self) -> bool:
317 """Whether insert ID tables are to be used (`bool`)."""
318 if self._has_replica_chunks is None:
320 return self._has_replica_chunks
321
def _check_replica_chunks(self) -> bool:
    """Check whether database has tables for tracking insert IDs.

    Returns
    -------
    exists : `bool`
        `True` if the (prefixed) ``ApdbReplicaChunks`` table is present in
        the configured database schema.
    """
    inspector = sqlalchemy.inspect(self._engine)
    db_tables = set(inspector.get_table_names(schema=self._metadata.schema))
    return ExtraTables.ApdbReplicaChunks.table_name(self._prefix) in db_tables
327
def _make_apdb_tables(self, mysql_engine: str = "InnoDB") -> Mapping[ApdbTables, schema_model.Table]:
    """Generate schema for regular tables.

    Parameters
    ----------
    mysql_engine : `str`, optional
        MySQL engine type to use for new tables. Not used by the current
        implementation.

    Returns
    -------
    tables : `~collections.abc.Mapping` [`ApdbTables`, `schema_model.Table`]
        Table models taken from ``tableSchemas`` for every table that
        applies to this configuration.
    """
    tables = {}
    for table_enum in ApdbTables:
        if table_enum is ApdbTables.DiaObjectLast and self._dia_object_index != "last_object_table":
            # DiaObjectLast only exists in "last_object_table" mode.
            continue
        if table_enum is ApdbTables.metadata and table_enum not in self.tableSchemas:
            # Schema does not define metadata.
            continue
        tables[table_enum] = self.tableSchemas[table_enum]

    return tables
347
def _make_extra_tables(
    self, apdb_tables: Mapping[ApdbTables, schema_model.Table]
) -> Mapping[ExtraTables, schema_model.Table]:
    """Generate schema for insert ID tables.

    Parameters
    ----------
    apdb_tables : `~collections.abc.Mapping` [`ApdbTables`, `schema_model.Table`]
        Models of the regular tables; must contain every table that
        `ExtraTables.replica_chunk_tables` maps to.

    Returns
    -------
    tables : `~collections.abc.Mapping` [`ExtraTables`, `schema_model.Table`]
        Models for the replica chunk tables, empty if replication is not
        enabled.
    """
    if not self._enable_replica:
        return {}

    tables = {}
    column_defs: list[schema_model.Column] = [
        schema_model.Column(
            name="apdb_replica_chunk",
            id="#ApdbReplicaChunks.apdb_replica_chunk",
            datatype=felis.datamodel.DataType.long,
        ),
        schema_model.Column(
            name="last_update_time",
            id="#ApdbReplicaChunks.last_update_time",
            datatype=felis.datamodel.DataType.timestamp,
            nullable=False,
        ),
        schema_model.Column(
            name="unique_id",
            id="#ApdbReplicaChunks.unique_id",
            datatype=schema_model.ExtraDataTypes.UUID,
            nullable=False,
        ),
    ]
    parent_table = schema_model.Table(
        name=ExtraTables.ApdbReplicaChunks.table_name(self._prefix),
        id="#ApdbReplicaChunks",
        columns=column_defs,
        primary_key=[column_defs[0]],
        constraints=[],
        indexes=[],
    )
    tables[ExtraTables.ApdbReplicaChunks] = parent_table

    for table_enum, apdb_enum in ExtraTables.replica_chunk_tables().items():
        apdb_table = apdb_tables[apdb_enum]
        table_name = table_enum.table_name(self._prefix)

        columns = self._replicaChunkColumns(table_enum, apdb_enum)
        column_map = {column.name: column for column in columns}
        # PK is the same as for original table
        pk_columns = [column_map[column.name] for column in apdb_table.primary_key]

        indices = self._replicaChunkIndices(table_enum, column_map)
        constraints = self._replicaChunkConstraints(table_enum, apdb_table, parent_table, column_map)
        table = schema_model.Table(
            name=table_name,
            id=f"#{table_name}",
            columns=columns,
            primary_key=pk_columns,
            indexes=indices,
            constraints=constraints,
        )
        tables[table_enum] = table

    return tables
407
def _replicaChunkColumns(
    self, table_enum: ExtraTables, apdb_enum: ApdbTables
) -> list[schema_model.Column]:
    """Return list of columns for replica chunks tables.

    Parameters
    ----------
    table_enum : `ExtraTables`
        Replica chunk table; must be a key of
        `ExtraTables.replica_chunk_tables`.
    apdb_enum : `ApdbTables`
        Regular table whose primary key columns are copied.

    Returns
    -------
    columns : `list` [`schema_model.Column`]
        The ``apdb_replica_chunk`` column followed by clones of the primary
        key columns of the regular table.

    Raises
    ------
    AssertionError
        Raised if ``table_enum`` is not a per-table replica chunk enum.
    """
    table_name = table_enum.table_name()
    column_defs: list[schema_model.Column] = [
        schema_model.Column(
            name="apdb_replica_chunk",
            id=f"#{table_name}.apdb_replica_chunk",
            datatype=felis.datamodel.DataType.long,
            nullable=False,
        )
    ]
    if table_enum in ExtraTables.replica_chunk_tables():
        table_model = self.tableSchemas[apdb_enum]
        column_defs += [column.clone() for column in table_model.primary_key]
    else:
        # Raised explicitly rather than via ``assert`` so that the check is
        # not stripped when Python runs with -O.
        raise AssertionError("Above branches have to cover all enum values")
    return column_defs
427
def _replicaChunkIndices(
    self,
    table_enum: ExtraTables,
    column_map: Mapping[str, schema_model.Column],
) -> list[schema_model.Index]:
    """Return set of indices for replica chunk table.

    Parameters
    ----------
    table_enum : `ExtraTables`
        Replica chunk table to make indices for.
    column_map : `~collections.abc.Mapping` [`str`, `schema_model.Column`]
        Columns of that table indexed by column name; must contain
        ``apdb_replica_chunk`` for per-table chunk enums.

    Returns
    -------
    indices : `list` [`schema_model.Index`]
        A single index on ``apdb_replica_chunk`` for per-table chunk enums,
        empty list otherwise.
    """
    index_defs: list[schema_model.Index] = []
    if table_enum in ExtraTables.replica_chunk_tables():
        # Non-unique index on replica chunk column.
        name = self._prefix + table_enum.name + "_apdb_replica_chunk_idx"
        column = column_map["apdb_replica_chunk"]
        index_defs.append(schema_model.Index(name=name, id=f"#{name}", columns=[column]))
    return index_defs
441
443 self,
444 table_enum: ExtraTables,
445 apdb_table: schema_model.Table,
446 parent_table: schema_model.Table,
447 column_map: Mapping[str, schema_model.Column],
448 ) -> list[schema_model.Constraint]:
449 """Return set of constraints for replica chunk table."""
450 constraints: list[schema_model.Constraint] = []
451 replica_chunk_tables = ExtraTables.replica_chunk_tables()
452 if table_enum in replica_chunk_tables:
453 # Foreign key to original table
454 name = f"{table_enum.table_name()}_fk_{apdb_table.name}"
455 other_columns = apdb_table.primary_key
456 this_columns = [column_map[column.name] for column in apdb_table.primary_key]
457 constraints.append(
459 name=name,
460 id=f"#{name}",
461 columns=this_columns,
462 referenced_columns=other_columns,
463 onupdate="CASCADE",
464 ondelete="CASCADE",
465 )
466 )
467
468 # Foreign key to parent chunk ID table
469 name = f"{table_enum.table_name()}_fk_{parent_table.name}"
470 other_columns = parent_table.primary_key
471 this_columns = [column_map[column.name] for column in parent_table.primary_key]
472 constraints.append(
474 name=name,
475 id=f"#{name}",
476 columns=this_columns,
477 referenced_columns=other_columns,
478 onupdate="CASCADE",
479 ondelete="CASCADE",
480 )
481 )
482 return constraints
__init__(self, sqlalchemy.engine.Engine engine, str dia_object_index, str htm_index_column, str schema_file, str schema_name="ApdbSchema", str prefix="", str|None namespace=None, bool enable_replica=False)
Mapping[ApdbTables, schema_model.Table] _make_apdb_tables(self, str mysql_engine="InnoDB")
Mapping[ExtraTables, schema_model.Table] _make_extra_tables(self, Mapping[ApdbTables, schema_model.Table] apdb_tables)
sqlalchemy.schema.Table get_table(self, ApdbTables|ExtraTables table_enum)
list[schema_model.Constraint] _replicaChunkConstraints(self, ExtraTables table_enum, schema_model.Table apdb_table, schema_model.Table parent_table, Mapping[str, schema_model.Column] column_map)
list[schema_model.Index] _replicaChunkIndices(self, ExtraTables table_enum, Mapping[str, schema_model.Column] column_map)
list[sqlalchemy.schema.Column] get_apdb_columns(self, ApdbTables|ExtraTables table_enum)
list[schema_model.Column] _replicaChunkColumns(self, ExtraTables table_enum, ApdbTables apdb_enum)
Mapping[ExtraTables, ApdbTables] replica_chunk_tables(cls)