LSST Applications g070148d5b3+33e5256705,g0d53e28543+25c8b88941,g0da5cf3356+2dd1178308,g1081da9e2a+62d12e78cb,g17e5ecfddb+7e422d6136,g1c76d35bf8+ede3a706f7,g295839609d+225697d880,g2e2c1a68ba+cc1f6f037e,g2ffcdf413f+853cd4dcde,g38293774b4+62d12e78cb,g3b44f30a73+d953f1ac34,g48ccf36440+885b902d19,g4b2f1765b6+7dedbde6d2,g5320a0a9f6+0c5d6105b6,g56b687f8c9+ede3a706f7,g5c4744a4d9+ef6ac23297,g5ffd174ac0+0c5d6105b6,g6075d09f38+66af417445,g667d525e37+2ced63db88,g670421136f+2ced63db88,g71f27ac40c+2ced63db88,g774830318a+463cbe8d1f,g7876bc68e5+1d137996f1,g7985c39107+62d12e78cb,g7fdac2220c+0fd8241c05,g96f01af41f+368e6903a7,g9ca82378b8+2ced63db88,g9d27549199+ef6ac23297,gabe93b2c52+e3573e3735,gb065e2a02a+3dfbe639da,gbc3249ced9+0c5d6105b6,gbec6a3398f+0c5d6105b6,gc9534b9d65+35b9f25267,gd01420fc67+0c5d6105b6,geee7ff78d7+a14128c129,gf63283c776+ede3a706f7,gfed783d017+0c5d6105b6,w.2022.47
LSST Data Management Base Package
Loading...
Searching...
No Matches
apdbSqlSchema.py
Go to the documentation of this file.
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22"""Module responsible for APDB schema operations.
23"""
24
25from __future__ import annotations
26
# Public API of this module.
__all__ = ["ApdbSqlSchema"]
28
29import logging
30from typing import Any, Dict, List, Mapping, Optional, Type
31
32import felis.types
33import sqlalchemy
34from felis import simple
35from sqlalchemy import DDL, Column, Index, MetaData, PrimaryKeyConstraint, Table, UniqueConstraint, event
36
37from .apdbSchema import ApdbSchema, ApdbTables
38
# Module-level logger, named after this module.
_LOG = logging.getLogger(__name__)
40
41
class ApdbSqlSchema(ApdbSchema):
    """Class for management of APDB schema.

    Attributes
    ----------
    objects : `sqlalchemy.Table`
        DiaObject table instance
    objects_last : `sqlalchemy.Table` or `None`
        DiaObjectLast table instance, `None` unless ``dia_object_index`` is
        ``"last_object_table"``.
    sources : `sqlalchemy.Table`
        DiaSource table instance
    forcedSources : `sqlalchemy.Table`
        DiaForcedSource table instance
    ssObjects : `sqlalchemy.Table`
        SSObject table instance

    Parameters
    ----------
    engine : `sqlalchemy.engine.Engine`
        SQLAlchemy engine instance
    dia_object_index : `str`
        Indexing mode for DiaObject table, see `ApdbSqlConfig.dia_object_index`
        for details.
    htm_index_column : `str`
        Name of a HTM index column for DiaObject and DiaSource tables.
    schema_file : `str`
        Name of the YAML schema file.
    schema_name : `str`, optional
        Name of the schema in YAML files.
    prefix : `str`, optional
        Prefix to add to all schema elements.
    namespace : `str`, optional
        Namespace (or schema name) to use for all APDB tables.
    """

    def __init__(
        self,
        engine: sqlalchemy.engine.Engine,
        dia_object_index: str,
        htm_index_column: str,
        schema_file: str,
        schema_name: str = "ApdbSchema",
        prefix: str = "",
        namespace: Optional[str] = None,
    ):

        super().__init__(schema_file, schema_name)

        self._engine = engine
        self._dia_object_index = dia_object_index
        self._prefix = prefix

        self._metadata = MetaData(self._engine, schema=namespace)

        # map YAML column types to SQLAlchemy
        # NOTE(review): string-like types map to CHAR without a length;
        # confirm whether a column length from the YAML schema should be used.
        self._type_map = {
            felis.types.Double: self._getDoubleType(engine),
            felis.types.Float: sqlalchemy.types.Float,
            felis.types.Timestamp: sqlalchemy.types.TIMESTAMP,
            felis.types.Long: sqlalchemy.types.BigInteger,
            felis.types.Int: sqlalchemy.types.Integer,
            felis.types.Short: sqlalchemy.types.Integer,
            felis.types.Byte: sqlalchemy.types.Integer,
            felis.types.Binary: sqlalchemy.types.LargeBinary,
            felis.types.Text: sqlalchemy.types.CHAR,
            felis.types.String: sqlalchemy.types.CHAR,
            felis.types.Char: sqlalchemy.types.CHAR,
            felis.types.Unicode: sqlalchemy.types.CHAR,
            felis.types.Boolean: sqlalchemy.types.Boolean,
        }

        # Add pixelId column and index to tables that need it
        for table in (ApdbTables.DiaObject, ApdbTables.DiaObjectLast, ApdbTables.DiaSource):
            tableDef = self.tableSchemas.get(table)
            if not tableDef:
                continue
            column = simple.Column(
                id=f"#{htm_index_column}",
                name=htm_index_column,
                datatype=felis.types.Long,
                nullable=False,
                value=None,
                description="Pixelization index column.",
                table=tableDef,
            )
            tableDef.columns.append(column)

            # Adjust index if needed
            if table is ApdbTables.DiaObject and self._dia_object_index == 'pix_id_iov':
                # NOTE(review): DiaObject also falls into the `else` branch
                # below, so in this mode the pixel column is both the leading
                # PK column and a separate index; the index looks redundant.
                tableDef.primary_key.insert(0, column)

            if table is ApdbTables.DiaObjectLast:
                # use it as a leading PK column
                tableDef.primary_key.insert(0, column)
            else:
                # make a regular index
                name = f"IDX_{tableDef.name}_{htm_index_column}"
                index = simple.Index(id=f"#{name}", name=name, columns=[column])
                tableDef.indexes.append(index)

        # generate schema for all tables, must be called last
        self._tables = self._makeTables()
        self._setTableAttributes()

    def _setTableAttributes(self) -> None:
        """Point the public table attributes at the tables in ``self._tables``."""
        self.objects = self._tables[ApdbTables.DiaObject]
        # Only present when dia_object_index is "last_object_table".
        self.objects_last = self._tables.get(ApdbTables.DiaObjectLast)
        self.sources = self._tables[ApdbTables.DiaSource]
        self.forcedSources = self._tables[ApdbTables.DiaForcedSource]
        self.ssObjects = self._tables[ApdbTables.SSObject]

    def _makeTables(self, mysql_engine: str = 'InnoDB') -> Mapping[ApdbTables, Table]:
        """Generate schema for all tables.

        Parameters
        ----------
        mysql_engine : `str`, optional
            MySQL engine type to use for new tables.

        Returns
        -------
        tables : `dict` [`ApdbTables`, `sqlalchemy.Table`]
            Tables registered in ``self._metadata``; DiaObjectLast is
            included only when ``dia_object_index`` is
            ``"last_object_table"``.
        """
        tables = {}
        for table_enum in ApdbTables:

            if table_enum is ApdbTables.DiaObjectLast and self._dia_object_index != "last_object_table":
                continue

            # A fresh dict per table, so that the `info` dictionaries of
            # different tables (and their indices) are not the same object.
            info: Dict[str, Any] = {}

            columns = self._tableColumns(table_enum)
            constraints = self._tableIndices(table_enum, info)
            tables[table_enum] = Table(
                table_enum.table_name(self._prefix),
                self._metadata,
                *columns,
                *constraints,
                # dialect-prefixed option, used only by the MySQL dialect
                mysql_engine=mysql_engine,
                info=info,
            )

        return tables

    def makeSchema(self, drop: bool = False, mysql_engine: str = 'InnoDB') -> None:
        """Create or re-create all tables.

        Parameters
        ----------
        drop : `bool`, optional
            If True then drop tables before creating new ones.
        mysql_engine : `str`, optional
            MySQL engine type to use for new tables.
        """

        # re-make table schema for all needed tables with possibly different options
        _LOG.debug("clear metadata")
        self._metadata.clear()
        _LOG.debug("re-do schema mysql_engine=%r", mysql_engine)
        # clear() removed the previous Table objects from the metadata, so the
        # cached mapping and public attributes must refer to the new ones.
        self._tables = self._makeTables(mysql_engine=mysql_engine)
        self._setTableAttributes()

        # Create namespace if it does not exist yet, for now this only makes
        # sense for postgres.
        if self._metadata.schema:
            dialect = self._engine.dialect
            quoted_schema = dialect.preparer(dialect).quote_schema(self._metadata.schema)
            create_schema = DDL(
                "CREATE SCHEMA IF NOT EXISTS %(schema)s", context={"schema": quoted_schema}
            ).execute_if(dialect='postgresql')
            # A new listener is added on every call; IF NOT EXISTS keeps the
            # repeated statement harmless.
            event.listen(self._metadata, "before_create", create_schema)

        # create all tables (optionally drop first); bind explicitly rather
        # than relying only on the engine bound to the MetaData.
        if drop:
            _LOG.info('dropping all tables')
            self._metadata.drop_all(bind=self._engine)
        _LOG.info('creating all tables')
        self._metadata.create_all(bind=self._engine)

    def _tableColumns(self, table_name: ApdbTables) -> List[Column]:
        """Return set of columns in a table

        Parameters
        ----------
        table_name : `ApdbTables`
            Name of the table.

        Returns
        -------
        column_defs : `list`
            List of `Column` objects.

        Raises
        ------
        KeyError
            Raised if the table is missing from the schema or a column uses a
            datatype that has no SQLAlchemy mapping.
        """
        table_schema = self.tableSchemas[table_name]

        # convert all schema columns into SQLAlchemy Columns
        column_defs = []
        for column in table_schema.columns:
            kwargs: Dict[str, Any] = dict(nullable=column.nullable)
            if column.value is not None:
                kwargs.update(server_default=str(column.value))
            # primary key columns are never auto-incremented
            if column in table_schema.primary_key:
                kwargs.update(autoincrement=False)
            ctype = self._type_map[column.datatype]
            column_defs.append(Column(column.name, ctype, **kwargs))

        return column_defs

    def _tableIndices(self, table_name: ApdbTables, info: Dict) -> List[sqlalchemy.schema.Constraint]:
        """Return set of constraints/indices in a table

        Parameters
        ----------
        table_name : `ApdbTables`
            Name of the table.
        info : `dict`
            Contents for the SQLAlchemy ``info`` dictionary of every created
            index; each index receives its own copy.

        Returns
        -------
        index_defs : `list`
            List of SQLAlchemy index/constraint objects.

        Notes
        -----
        Only unique constraints are translated from the schema constraints;
        other constraint kinds are ignored.
        """

        table_schema = self.tableSchemas[table_name]

        # convert primary key, indices and constraints into SQLAlchemy objects
        index_defs: List[sqlalchemy.schema.Constraint] = []
        if table_schema.primary_key:
            index_defs.append(PrimaryKeyConstraint(*[column.name for column in table_schema.primary_key]))
        for index in table_schema.indexes:
            # Unnamed indices get None (not ""), so that SQLAlchemy's MetaData
            # naming convention can generate a name; an empty string would
            # produce CREATE INDEX DDL with no index name.
            name = self._prefix + index.name if index.name else None
            index_defs.append(Index(name, *[column.name for column in index.columns], info=dict(info)))
        for constraint in table_schema.constraints:
            kwargs: Dict[str, Any] = {}
            if constraint.name:
                kwargs['name'] = self._prefix + constraint.name
            if isinstance(constraint, simple.UniqueConstraint):
                index_defs.append(UniqueConstraint(*[column.name for column in constraint.columns], **kwargs))

        return index_defs

    @classmethod
    def _getDoubleType(cls, engine: sqlalchemy.engine.Engine) -> Type:
        """DOUBLE type is database-specific, select one based on dialect.

        Parameters
        ----------
        engine : `sqlalchemy.engine.Engine`
            Database engine.

        Returns
        -------
        type_object : `object`
            Database-specific type definition; a type instance for MySQL
            (``DOUBLE(asdecimal=False)``), a type class otherwise.

        Raises
        ------
        TypeError
            Raised if the engine dialect is not one of mysql, postgresql,
            oracle or sqlite.
        """
        if engine.name == 'mysql':
            from sqlalchemy.dialects.mysql import DOUBLE
            return DOUBLE(asdecimal=False)
        elif engine.name == 'postgresql':
            from sqlalchemy.dialects.postgresql import DOUBLE_PRECISION
            return DOUBLE_PRECISION
        elif engine.name == 'oracle':
            from sqlalchemy.dialects.oracle import DOUBLE_PRECISION
            return DOUBLE_PRECISION
        elif engine.name == 'sqlite':
            # all floats in sqlite are 8-byte
            from sqlalchemy.dialects.sqlite import REAL
            return REAL
        else:
            raise TypeError('cannot determine DOUBLE type, unexpected dialect: ' + engine.name)
Type _getDoubleType(cls, sqlalchemy.engine.Engine engine)
None makeSchema(self, bool drop=False, str mysql_engine='InnoDB')
List[Column] _tableColumns(self, ApdbTables table_name)
Mapping[ApdbTables, Table] _makeTables(self, str mysql_engine='InnoDB')
List[sqlalchemy.schema.Constraint] _tableIndices(self, ApdbTables table_name, Dict info)
def __init__(self, sqlalchemy.engine.Engine engine, str dia_object_index, str htm_index_column, str schema_file, str schema_name="ApdbSchema", str prefix="", Optional[str] namespace=None)