LSST Applications g063fba187b+cac8b7c890,g0f08755f38+6aee506743,g1653933729+a8ce1bb630,g168dd56ebc+a8ce1bb630,g1a2382251a+b4475c5878,g1dcb35cd9c+8f9bc1652e,g20f6ffc8e0+6aee506743,g217e2c1bcf+73dee94bd0,g28da252d5a+1f19c529b9,g2bbee38e9b+3f2625acfc,g2bc492864f+3f2625acfc,g3156d2b45e+6e55a43351,g32e5bea42b+1bb94961c2,g347aa1857d+3f2625acfc,g35bb328faa+a8ce1bb630,g3a166c0a6a+3f2625acfc,g3e281a1b8c+c5dd892a6c,g3e8969e208+a8ce1bb630,g414038480c+5927e1bc1e,g41af890bb2+8a9e676b2a,g7af13505b9+809c143d88,g80478fca09+6ef8b1810f,g82479be7b0+f568feb641,g858d7b2824+6aee506743,g89c8672015+f4add4ffd5,g9125e01d80+a8ce1bb630,ga5288a1d22+2903d499ea,gb58c049af0+d64f4d3760,gc28159a63d+3f2625acfc,gcab2d0539d+b12535109e,gcf0d15dbbd+46a3f46ba9,gda6a2b7d83+46a3f46ba9,gdaeeff99f8+1711a396fd,ge79ae78c31+3f2625acfc,gef2f8181fd+0a71e47438,gf0baf85859+c1f95f4921,gfa517265be+6aee506743,gfa999e8aa5+17cd334064,w.2024.51
LSST Data Management Base Package
Loading...
Searching...
No Matches
apdbSqlSchema.py
Go to the documentation of this file.
# This file is part of dax_apdb.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

22"""Module responsible for APDB schema operations.
23"""
24
25from __future__ import annotations
26
27__all__ = ["ApdbSqlSchema", "ExtraTables"]
28
29import enum
30import itertools
31import logging
32from collections.abc import Mapping
33
34import felis.datamodel
35import sqlalchemy
36
37from .. import schema_model
38from ..apdbSchema import ApdbSchema, ApdbTables
39from .modelToSql import ModelToSql
40
41_LOG = logging.getLogger(__name__)
42
43
class InconsistentSchemaError(RuntimeError):
    """Exception raised when schema state is inconsistent."""
46
47
@enum.unique
class ExtraTables(enum.Enum):
    """Names of the extra tables used for storing replica chunk data."""

    ApdbReplicaChunks = "ApdbReplicaChunks"
    """Name of the table for replica chunks records."""

    DiaObjectChunks = "DiaObjectChunks"
    """Name of the table for DIAObject chunk data."""

    DiaSourceChunks = "DiaSourceChunks"
    """Name of the table for DIASource chunk data."""

    DiaForcedSourceChunks = "DiaForcedSourceChunks"
    """Name of the table for DIAForcedSource chunk data."""

    def table_name(self, prefix: str = "") -> str:
        """Return full table name.

        Parameters
        ----------
        prefix : `str`, optional
            String prepended to the enum value to form the table name.

        Returns
        -------
        name : `str`
            ``prefix`` followed by the value of this enum member.
        """
        return prefix + self.value

    @classmethod
    def replica_chunk_tables(cls) -> Mapping[ExtraTables, ApdbTables]:
        """Return mapping of tables used for replica chunk storage to their
        corresponding regular tables.

        Returns
        -------
        tables : `~collections.abc.Mapping` [`ExtraTables`, `ApdbTables`]
            Newly created mapping; `ApdbReplicaChunks` is not included as it
            has no corresponding regular table.
        """
        return {
            cls.DiaObjectChunks: ApdbTables.DiaObject,
            cls.DiaSourceChunks: ApdbTables.DiaSource,
            cls.DiaForcedSourceChunks: ApdbTables.DiaForcedSource,
        }
78
79
class ApdbSqlSchema(ApdbSchema):
    """Class for management of APDB schema.

    Attributes
    ----------
    objects : `sqlalchemy.Table`
        DiaObject table instance
    objects_last : `sqlalchemy.Table`
        DiaObjectLast table instance, may be None
    sources : `sqlalchemy.Table`
        DiaSource table instance
    forcedSources : `sqlalchemy.Table`
        DiaForcedSource table instance
    has_replica_chunks : `bool`
        If true then schema has tables for replication chunks.

    Parameters
    ----------
    engine : `sqlalchemy.engine.Engine`
        SQLAlchemy engine instance
    dia_object_index : `str`
        Indexing mode for DiaObject table, see `ApdbSqlConfig.dia_object_index`
        for details.
    htm_index_column : `str`
        Name of a HTM index column for DiaObject and DiaSource tables.
    schema_file : `str`
        Name of the YAML schema file.
    schema_name : `str`, optional
        Name of the schema in YAML files.
    prefix : `str`, optional
        Prefix to add to all schema elements.
    namespace : `str`, optional
        Namespace (or schema name) to use for all APDB tables.
    enable_replica : `bool`, optional
        If `True` then use additional tables for replica chunks.
    """

    pixel_id_tables = (ApdbTables.DiaObject, ApdbTables.DiaObjectLast, ApdbTables.DiaSource)
    """Tables that need pixelId column for spatial indexing."""

    def __init__(
        self,
        engine: sqlalchemy.engine.Engine,
        dia_object_index: str,
        htm_index_column: str,
        schema_file: str,
        schema_name: str = "ApdbSchema",
        prefix: str = "",
        namespace: str | None = None,
        enable_replica: bool = False,
    ):
        super().__init__(schema_file, schema_name)

        self._engine = engine
        self._dia_object_index = dia_object_index
        self._htm_index_column = htm_index_column
        self._prefix = prefix
        self._enable_replica = enable_replica

        self._metadata = sqlalchemy.schema.MetaData(schema=namespace)

        # Add pixelId column and index to tables that need it
        for table in self.pixel_id_tables:
            tableDef = self.tableSchemas.get(table)
            if not tableDef:
                continue
            column = schema_model.Column(
                id=f"#{htm_index_column}",
                name=htm_index_column,
                datatype=felis.datamodel.DataType.long,
                nullable=False,
                value=None,
                description="Pixelization index column.",
                table=tableDef,
            )
            tableDef.columns.append(column)

            # Adjust index if needed
            if table == ApdbTables.DiaObject and self._dia_object_index == "pix_id_iov":
                tableDef.primary_key.insert(0, column)

            if table is ApdbTables.DiaObjectLast:
                # use it as a leading PK column
                tableDef.primary_key.insert(0, column)
            else:
                # make a regular index
                name = f"IDX_{tableDef.name}_{htm_index_column}"
                index = schema_model.Index(id=f"#{name}", name=name, columns=[column])
                tableDef.indexes.append(index)

        # generate schema for all tables, must be called last
        apdb_tables = self._make_apdb_tables()
        extra_tables = self._make_extra_tables(apdb_tables)

        converter = ModelToSql(metadata=self._metadata, prefix=self._prefix)
        id_to_table = converter.make_tables(itertools.chain(apdb_tables.values(), extra_tables.values()))

        # Map table enums to the SQLAlchemy tables produced by the converter,
        # using the model table ID as the join key.
        self._apdb_tables: Mapping[ApdbTables, sqlalchemy.schema.Table] = {
            apdb_enum: id_to_table[table_model.id] for apdb_enum, table_model in apdb_tables.items()
        }
        self._extra_tables: Mapping[ExtraTables, sqlalchemy.schema.Table] = {
            extra_enum: id_to_table[table_model.id] for extra_enum, table_model in extra_tables.items()
        }

        # Lazily evaluated results of database inspection, `None` means
        # "not checked yet".
        self._has_replica_chunks: bool | None = None
        self._metadata_check: bool | None = None

    def empty(self) -> bool:
        """Return True if database schema is empty.

        Returns
        -------
        empty : `bool`
            `True` if none of the required APDB tables exist in the database,
            `False` if all required tables exist.

        Raises
        ------
        InconsistentSchemaError
            Raised when some of the required tables exist but not all.
        """
        inspector = sqlalchemy.inspect(self._engine)
        table_names = set(inspector.get_table_names(self._metadata.schema))

        existing_tables = []
        missing_tables = []
        for table_enum in self._apdb_tables:
            table_name = table_enum.table_name(self._prefix)
            if table_name in table_names:
                existing_tables.append(table_name)
            else:
                missing_tables.append(table_name)

        if not missing_tables:
            return False
        elif not existing_tables:
            return True
        else:
            raise InconsistentSchemaError(
                f"Only some required APDB tables exist: {existing_tables}, missing tables: {missing_tables}"
            )

    def makeSchema(self, drop: bool = False) -> None:
        """Create or re-create all tables.

        Parameters
        ----------
        drop : `bool`, optional
            If True then drop tables before creating new ones.
        """
        # Create namespace if it does not exist yet, for now this only makes
        # sense for postgres.
        if self._metadata.schema:
            dialect = self._engine.dialect
            quoted_schema = dialect.preparer(dialect).quote_schema(self._metadata.schema)
            create_schema = sqlalchemy.schema.DDL(
                "CREATE SCHEMA IF NOT EXISTS %(schema)s", context={"schema": quoted_schema}
            ).execute_if(dialect="postgresql")
            sqlalchemy.event.listen(self._metadata, "before_create", create_schema)

        # create all tables (optionally drop first)
        if drop:
            _LOG.info("dropping all tables")
            self._metadata.drop_all(self._engine)
        _LOG.info("creating all tables")
        self._metadata.create_all(self._engine)

        # Reset possibly cached value.
        self._has_replica_chunks = None
        self._metadata_check = None

    def get_table(self, table_enum: ApdbTables | ExtraTables) -> sqlalchemy.schema.Table:
        """Return SQLAlchemy table instance for a specified table type/enum.

        Parameters
        ----------
        table_enum : `ApdbTables` or `ExtraTables`
            Type of table to return.

        Returns
        -------
        table : `sqlalchemy.schema.Table`
            Table instance.

        Raises
        ------
        ValueError
            Raised if ``table_enum`` is not valid for this database.
        """
        try:
            if isinstance(table_enum, ApdbTables):
                if table_enum is ApdbTables.metadata:
                    # There may be cases when schema is configured with the
                    # metadata table but database is still missing it. Check
                    # that table actually exists in the database. Note that
                    # this may interact with `makeSchema`.
                    if self._metadata_check is None:
                        inspector = sqlalchemy.inspect(self._engine)
                        table_name = table_enum.table_name(self._prefix)
                        self._metadata_check = inspector.has_table(table_name, schema=self._metadata.schema)
                    if not self._metadata_check:
                        # this will be caught below
                        raise LookupError("metadata table is missing")
                return self._apdb_tables[table_enum]
            else:
                return self._extra_tables[table_enum]
        except LookupError:
            raise ValueError(f"Table type {table_enum} does not exist in the schema") from None

    def get_apdb_columns(self, table_enum: ApdbTables | ExtraTables) -> list[sqlalchemy.schema.Column]:
        """Return list of columns defined for a table in APDB schema.

        Returned list excludes columns that are implementation-specific, e.g.
        ``pixelId`` column is not included in the returned list.

        Parameters
        ----------
        table_enum : `ApdbTables` or `ExtraTables`
            Type of table.

        Returns
        -------
        columns : `list` [`sqlalchemy.schema.Column`]
            Columns of the table, without the pixelization index column.

        Raises
        ------
        ValueError
            Raised if ``table_enum`` is not valid for this database.
        """
        table = self.get_table(table_enum)
        exclude_columns = set()
        if table_enum in self.pixel_id_tables:
            exclude_columns.add(self._htm_index_column)
        return [column for column in table.columns if column.name not in exclude_columns]

    @property
    def has_replica_chunks(self) -> bool:
        """Whether replica chunk tables exist in the database (`bool`).

        The database is inspected on first access and the result is cached
        until `makeSchema` resets it.
        """
        if self._has_replica_chunks is None:
            self._has_replica_chunks = self._check_replica_chunks()
        return self._has_replica_chunks

    def _check_replica_chunks(self) -> bool:
        """Check whether database has the ``ApdbReplicaChunks`` table."""
        inspector = sqlalchemy.inspect(self._engine)
        db_tables = set(inspector.get_table_names(schema=self._metadata.schema))
        return ExtraTables.ApdbReplicaChunks.table_name(self._prefix) in db_tables

    def _make_apdb_tables(self, mysql_engine: str = "InnoDB") -> Mapping[ApdbTables, schema_model.Table]:
        """Generate schema for regular tables.

        Parameters
        ----------
        mysql_engine : `str`, optional
            MySQL engine type to use for new tables. Currently not used by
            this method.

        Returns
        -------
        tables : `~collections.abc.Mapping` [`ApdbTables`, `schema_model.Table`]
            Table models for all regular tables that apply to the configured
            indexing mode and schema.
        """
        tables = {}
        for table_enum in ApdbTables:
            if table_enum is ApdbTables.DiaObjectLast and self._dia_object_index != "last_object_table":
                continue
            if table_enum is ApdbTables.metadata and table_enum not in self.tableSchemas:
                # Schema does not define metadata.
                continue
            table = self.tableSchemas[table_enum]
            tables[table_enum] = table

        return tables

    def _make_extra_tables(
        self, apdb_tables: Mapping[ApdbTables, schema_model.Table]
    ) -> Mapping[ExtraTables, schema_model.Table]:
        """Generate schema for replica chunk tables.

        Parameters
        ----------
        apdb_tables : `~collections.abc.Mapping`
            Models of the regular tables, as returned from
            `_make_apdb_tables`.

        Returns
        -------
        tables : `~collections.abc.Mapping` [`ExtraTables`, `schema_model.Table`]
            Models of the extra tables, empty if replication is not enabled.
        """
        if not self._enable_replica:
            return {}

        tables = {}
        column_defs: list[schema_model.Column] = [
            schema_model.Column(
                name="apdb_replica_chunk",
                id="#ApdbReplicaChunks.apdb_replica_chunk",
                datatype=felis.datamodel.DataType.long,
            ),
            schema_model.Column(
                name="last_update_time",
                id="#ApdbReplicaChunks.last_update_time",
                datatype=felis.datamodel.DataType.timestamp,
                nullable=False,
            ),
            schema_model.Column(
                name="unique_id",
                id="#ApdbReplicaChunks.unique_id",
                datatype=schema_model.ExtraDataTypes.UUID,
                nullable=False,
            ),
        ]
        parent_table = schema_model.Table(
            name=ExtraTables.ApdbReplicaChunks.table_name(self._prefix),
            id="#ApdbReplicaChunks",
            columns=column_defs,
            primary_key=[column_defs[0]],
            constraints=[],
            indexes=[],
        )
        tables[ExtraTables.ApdbReplicaChunks] = parent_table

        for table_enum, apdb_enum in ExtraTables.replica_chunk_tables().items():
            apdb_table = apdb_tables[apdb_enum]
            table_name = table_enum.table_name(self._prefix)

            columns = self._replicaChunkColumns(table_enum, apdb_enum)
            column_map = {column.name: column for column in columns}
            # PK is the same as for original table
            pk_columns = [column_map[column.name] for column in apdb_table.primary_key]

            indices = self._replicaChunkIndices(table_enum, column_map)
            constraints = self._replicaChunkConstraints(table_enum, apdb_table, parent_table, column_map)
            table = schema_model.Table(
                name=table_name,
                id=f"#{table_name}",
                columns=columns,
                primary_key=pk_columns,
                indexes=indices,
                constraints=constraints,
            )
            tables[table_enum] = table

        return tables

    def _replicaChunkColumns(
        self, table_enum: ExtraTables, apdb_enum: ApdbTables
    ) -> list[schema_model.Column]:
        """Return list of columns for replica chunks tables.

        The list starts with the ``apdb_replica_chunk`` column, followed by
        clones of the primary key columns of the corresponding regular table.
        """
        table_name = table_enum.table_name()
        column_defs: list[schema_model.Column] = [
            schema_model.Column(
                name="apdb_replica_chunk",
                id=f"#{table_name}.apdb_replica_chunk",
                datatype=felis.datamodel.DataType.long,
                nullable=False,
            )
        ]
        if table_enum in ExtraTables.replica_chunk_tables():
            table_model = self.tableSchemas[apdb_enum]
            column_defs += [column.clone() for column in table_model.primary_key]
        else:
            # Explicit raise instead of ``assert False`` so that the check is
            # not stripped when Python runs with ``-O``.
            raise AssertionError("Above branches have to cover all enum values")
        return column_defs

    def _replicaChunkIndices(
        self,
        table_enum: ExtraTables,
        column_map: Mapping[str, schema_model.Column],
    ) -> list[schema_model.Index]:
        """Return set of indices for replica chunk table."""
        index_defs: list[schema_model.Index] = []
        if table_enum in ExtraTables.replica_chunk_tables():
            # Non-unique index on replica chunk column.
            name = self._prefix + table_enum.name + "_apdb_replica_chunk_idx"
            column = column_map["apdb_replica_chunk"]
            index_defs.append(schema_model.Index(name=name, id=f"#{name}", columns=[column]))
        return index_defs

    def _replicaChunkConstraints(
        self,
        table_enum: ExtraTables,
        apdb_table: schema_model.Table,
        parent_table: schema_model.Table,
        column_map: Mapping[str, schema_model.Column],
    ) -> list[schema_model.Constraint]:
        """Return set of constraints for replica chunk table.

        Two cascading foreign keys are defined: one to the primary key of the
        corresponding regular table and one to the primary key of the parent
        replica chunks table.
        """
        constraints: list[schema_model.Constraint] = []
        replica_chunk_tables = ExtraTables.replica_chunk_tables()
        if table_enum in replica_chunk_tables:
            # Foreign key to original table
            name = f"{table_enum.table_name()}_fk_{apdb_table.name}"
            other_columns = apdb_table.primary_key
            this_columns = [column_map[column.name] for column in apdb_table.primary_key]
            constraints.append(
                schema_model.ForeignKeyConstraint(
                    name=name,
                    id=f"#{name}",
                    columns=this_columns,
                    referenced_columns=other_columns,
                    onupdate="CASCADE",
                    ondelete="CASCADE",
                )
            )

            # Foreign key to parent chunk ID table
            name = f"{table_enum.table_name()}_fk_{parent_table.name}"
            other_columns = parent_table.primary_key
            this_columns = [column_map[column.name] for column in parent_table.primary_key]
            constraints.append(
                schema_model.ForeignKeyConstraint(
                    name=name,
                    id=f"#{name}",
                    columns=this_columns,
                    referenced_columns=other_columns,
                    onupdate="CASCADE",
                    ondelete="CASCADE",
                )
            )
        return constraints
std::vector< SchemaItem< Flag > > * items
__init__(self, sqlalchemy.engine.Engine engine, str dia_object_index, str htm_index_column, str schema_file, str schema_name="ApdbSchema", str prefix="", str|None namespace=None, bool enable_replica=False)
Mapping[ApdbTables, schema_model.Table] _make_apdb_tables(self, str mysql_engine="InnoDB")
Mapping[ExtraTables, schema_model.Table] _make_extra_tables(self, Mapping[ApdbTables, schema_model.Table] apdb_tables)
sqlalchemy.schema.Table get_table(self, ApdbTables|ExtraTables table_enum)
list[schema_model.Constraint] _replicaChunkConstraints(self, ExtraTables table_enum, schema_model.Table apdb_table, schema_model.Table parent_table, Mapping[str, schema_model.Column] column_map)
list[schema_model.Index] _replicaChunkIndices(self, ExtraTables table_enum, Mapping[str, schema_model.Column] column_map)
list[sqlalchemy.schema.Column] get_apdb_columns(self, ApdbTables|ExtraTables table_enum)
list[schema_model.Column] _replicaChunkColumns(self, ExtraTables table_enum, ApdbTables apdb_enum)
Mapping[ExtraTables, ApdbTables] replica_chunk_tables(cls)