LSST Applications g0b6bd0c080+a72a5dd7e6,g1182afd7b4+2a019aa3bb,g17e5ecfddb+2b8207f7de,g1d67935e3f+06cf436103,g38293774b4+ac198e9f13,g396055baef+6a2097e274,g3b44f30a73+6611e0205b,g480783c3b1+98f8679e14,g48ccf36440+89c08d0516,g4b93dc025c+98f8679e14,g5c4744a4d9+a302e8c7f0,g613e996a0d+e1c447f2e0,g6c8d09e9e7+25247a063c,g7271f0639c+98f8679e14,g7a9cd813b8+124095ede6,g9d27549199+a302e8c7f0,ga1cf026fa3+ac198e9f13,ga32aa97882+7403ac30ac,ga786bb30fb+7a139211af,gaa63f70f4e+9994eb9896,gabf319e997+ade567573c,gba47b54d5d+94dc90c3ea,gbec6a3398f+06cf436103,gc6308e37c7+07dd123edb,gc655b1545f+ade567573c,gcc9029db3c+ab229f5caf,gd01420fc67+06cf436103,gd877ba84e5+06cf436103,gdb4cecd868+6f279b5b48,ge2d134c3d5+cc4dbb2e3f,ge448b5faa6+86d1ceac1d,gecc7e12556+98f8679e14,gf3ee170dca+25247a063c,gf4ac96e456+ade567573c,gf9f5ea5b4d+ac198e9f13,gff490e6085+8c2580be5c,w.2022.27
LSST Data Management Base Package
apdbSqlSchema.py
Go to the documentation of this file.
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22"""Module responsible for APDB schema operations.
23"""
24
25from __future__ import annotations
26
27__all__ = ["ApdbSqlSchema"]
28
29import logging
30from typing import Any, Dict, List, Mapping, Type
31
32import sqlalchemy
33from sqlalchemy import (Column, Index, MetaData, PrimaryKeyConstraint,
34 UniqueConstraint, Table)
35
36from .apdbSchema import ApdbSchema, ApdbTables, ColumnDef, IndexDef, IndexType
37
38
39_LOG = logging.getLogger(__name__)
40
41
43 """Class for management of APDB schema.
44
45 Attributes
46 ----------
47 objects : `sqlalchemy.Table`
48 DiaObject table instance
49 objects_last : `sqlalchemy.Table`
50 DiaObjectLast table instance, may be None
51 sources : `sqlalchemy.Table`
52 DiaSource table instance
53 forcedSources : `sqlalchemy.Table`
54 DiaForcedSource table instance
55
56 Parameters
57 ----------
58 engine : `sqlalchemy.engine.Engine`
59 SQLAlchemy engine instance
60 dia_object_index : `str`
61 Indexing mode for DiaObject table, see `ApdbSqlConfig.dia_object_index`
62 for details.
63 htm_index_column : `str`
64 Name of a HTM index column for DiaObject and DiaSource tables.
65 schema_file : `str`
66 Name of the YAML schema file.
67 schema_name : `str`, optional
68 Name of the schema in YAML files.
69 prefix : `str`, optional
70 Prefix to add to all scheam elements.
71 """
72 def __init__(self, engine: sqlalchemy.engine.Engine, dia_object_index: str, htm_index_column: str,
73 schema_file: str, schema_name: str = "ApdbSchema", prefix: str = ""):
74
75 super().__init__(schema_file, schema_name)
76
77 self._engine_engine = engine
78 self._dia_object_index_dia_object_index = dia_object_index
79 self._prefix_prefix = prefix
80
81 self._metadata_metadata = MetaData(self._engine_engine)
82
83 # map YAML column types to SQLAlchemy
84 self._type_map_type_map = dict(double=self._getDoubleType_getDoubleType(engine),
85 float=sqlalchemy.types.Float,
86 timestamp=sqlalchemy.types.TIMESTAMP,
87 long=sqlalchemy.types.BigInteger,
88 int=sqlalchemy.types.Integer,
89 short=sqlalchemy.types.Integer,
90 byte=sqlalchemy.types.Integer,
91 binary=sqlalchemy.types.LargeBinary,
92 text=sqlalchemy.types.CHAR,
93 string=sqlalchemy.types.CHAR,
94 char=sqlalchemy.types.CHAR,
95 unicode=sqlalchemy.types.CHAR,
96 boolean=sqlalchemy.types.Boolean)
97
98 # Adjust index if needed
99 if self._dia_object_index_dia_object_index == 'pix_id_iov':
100 objects = self.tableSchemastableSchemas[ApdbTables.DiaObject]
101 objects.primary_key.columns.insert(0, htm_index_column)
102
103 # Add pixelId column and index to tables that need it
104 for table in (ApdbTables.DiaObject, ApdbTables.DiaObjectLast, ApdbTables.DiaSource):
105 tableDef = self.tableSchemastableSchemas.get(table)
106 if not tableDef:
107 continue
108 column = ColumnDef(name=htm_index_column,
109 type="long",
110 nullable=False,
111 default=None,
112 description="",
113 unit="",
114 ucd="")
115 tableDef.columns.append(column)
116
117 if table is ApdbTables.DiaObjectLast:
118 # use it as a leading PK column
119 tableDef.primary_key.columns.insert(0, htm_index_column)
120 else:
121 # make a regular index
122 index = IndexDef(name=f"IDX_{tableDef.name}_{htm_index_column}",
123 type=IndexType.INDEX, columns=[htm_index_column])
124 tableDef.indices.append(index)
125
126 # generate schema for all tables, must be called last
127 self._tables_tables = self._makeTables_makeTables()
128
129 self.objectsobjects = self._tables_tables[ApdbTables.DiaObject]
130 self.objects_lastobjects_last = self._tables_tables.get(ApdbTables.DiaObjectLast)
131 self.sourcessources = self._tables_tables[ApdbTables.DiaSource]
132 self.forcedSourcesforcedSources = self._tables_tables[ApdbTables.DiaForcedSource]
133 self.ssObjectsssObjects = self._tables_tables[ApdbTables.SSObject]
134
135 def _makeTables(self, mysql_engine: str = 'InnoDB') -> Mapping[ApdbTables, Table]:
136 """Generate schema for all tables.
137
138 Parameters
139 ----------
140 mysql_engine : `str`, optional
141 MySQL engine type to use for new tables.
142 """
143
144 info: Dict[str, Any] = {}
145
146 tables = {}
147 for table_enum in ApdbTables:
148
149 if table_enum is ApdbTables.DiaObjectLast and self._dia_object_index_dia_object_index != "last_object_table":
150 continue
151
152 columns = self._tableColumns_tableColumns(table_enum)
153 constraints = self._tableIndices_tableIndices(table_enum, info)
154 table = Table(table_enum.table_name(self._prefix_prefix),
155 self._metadata_metadata,
156 *columns,
157 *constraints,
158 mysql_engine=mysql_engine,
159 info=info)
160 tables[table_enum] = table
161
162 return tables
163
164 def makeSchema(self, drop: bool = False, mysql_engine: str = 'InnoDB') -> None:
165 """Create or re-create all tables.
166
167 Parameters
168 ----------
169 drop : `bool`, optional
170 If True then drop tables before creating new ones.
171 mysql_engine : `str`, optional
172 MySQL engine type to use for new tables.
173 """
174
175 # re-make table schema for all needed tables with possibly different options
176 _LOG.debug("clear metadata")
177 self._metadata_metadata.clear()
178 _LOG.debug("re-do schema mysql_engine=%r", mysql_engine)
179 self._makeTables_makeTables(mysql_engine=mysql_engine)
180
181 # create all tables (optionally drop first)
182 if drop:
183 _LOG.info('dropping all tables')
184 self._metadata_metadata.drop_all()
185 _LOG.info('creating all tables')
186 self._metadata_metadata.create_all()
187
188 def _tableColumns(self, table_name: ApdbTables) -> List[Column]:
189 """Return set of columns in a table
190
191 Parameters
192 ----------
193 table_name : `ApdbTables`
194 Name of the table.
195
196 Returns
197 -------
198 column_defs : `list`
199 List of `Column` objects.
200 """
201
202 # get the list of columns in primary key, they are treated somewhat
203 # specially below
204 table_schema = self.tableSchemastableSchemas[table_name]
205 pkey_columns = set()
206 for index in table_schema.indices:
207 if index.type is IndexType.PRIMARY:
208 pkey_columns = set(index.columns)
209 break
210
211 # convert all column dicts into alchemy Columns
212 column_defs = []
213 for column in table_schema.columns:
214 kwargs: Dict[str, Any] = dict(nullable=column.nullable)
215 if column.default is not None:
216 kwargs.update(server_default=str(column.default))
217 if column.name in pkey_columns:
218 kwargs.update(autoincrement=False)
219 ctype = self._type_map_type_map[column.type]
220 column_defs.append(Column(column.name, ctype, **kwargs))
221
222 return column_defs
223
224 def _tableIndices(self, table_name: ApdbTables, info: Dict) -> List[sqlalchemy.schema.Constraint]:
225 """Return set of constraints/indices in a table
226
227 Parameters
228 ----------
229 table_name : `ApdbTables`
230 Name of the table.
231 info : `dict`
232 Additional options passed to SQLAlchemy index constructor.
233
234 Returns
235 -------
236 index_defs : `list`
237 List of SQLAlchemy index/constraint objects.
238 """
239
240 table_schema = self.tableSchemastableSchemas[table_name]
241
242 # convert all index dicts into alchemy Columns
243 index_defs: List[sqlalchemy.schema.Constraint] = []
244 for index in table_schema.indices:
245 if index.type is IndexType.INDEX:
246 index_defs.append(Index(self._prefix_prefix + index.name, *index.columns, info=info))
247 else:
248 kwargs = {}
249 if index.name:
250 kwargs['name'] = self._prefix_prefix + index.name
251 if index.type is IndexType.PRIMARY:
252 index_defs.append(PrimaryKeyConstraint(*index.columns, **kwargs))
253 elif index.type is IndexType.UNIQUE:
254 index_defs.append(UniqueConstraint(*index.columns, **kwargs))
255
256 return index_defs
257
258 @classmethod
259 def _getDoubleType(cls, engine: sqlalchemy.engine.Engine) -> Type:
260 """DOUBLE type is database-specific, select one based on dialect.
261
262 Parameters
263 ----------
264 engine : `sqlalchemy.engine.Engine`
265 Database engine.
266
267 Returns
268 -------
269 type_object : `object`
270 Database-specific type definition.
271 """
272 if engine.name == 'mysql':
273 from sqlalchemy.dialects.mysql import DOUBLE
274 return DOUBLE(asdecimal=False)
275 elif engine.name == 'postgresql':
276 from sqlalchemy.dialects.postgresql import DOUBLE_PRECISION
277 return DOUBLE_PRECISION
278 elif engine.name == 'oracle':
279 from sqlalchemy.dialects.oracle import DOUBLE_PRECISION
280 return DOUBLE_PRECISION
281 elif engine.name == 'sqlite':
282 # all floats in sqlite are 8-byte
283 from sqlalchemy.dialects.sqlite import REAL
284 return REAL
285 else:
286 raise TypeError('cannot determine DOUBLE type, unexpected dialect: ' + engine.name)
Type _getDoubleType(cls, sqlalchemy.engine.Engine engine)
None makeSchema(self, bool drop=False, str mysql_engine='InnoDB')
List[Column] _tableColumns(self, ApdbTables table_name)
Mapping[ApdbTables, Table] _makeTables(self, str mysql_engine='InnoDB')
List[sqlalchemy.schema.Constraint] _tableIndices(self, ApdbTables table_name, Dict info)
def __init__(self, sqlalchemy.engine.Engine engine, str dia_object_index, str htm_index_column, str schema_file, str schema_name="ApdbSchema", str prefix="")
daf::base::PropertySet * set
Definition: fits.cc:912