apdbCassandraSchema.py
# This file is part of dax_apdb.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ["ApdbCassandraSchema"]

import enum
import logging
from collections.abc import Mapping
from typing import TYPE_CHECKING, List, Optional, Tuple

import felis.types
from felis import simple

from .apdbSchema import ApdbSchema, ApdbTables

if TYPE_CHECKING:
    import cassandra.cluster


_LOG = logging.getLogger(__name__)


class _FelisUUID(felis.types.FelisType, felis_name="uuid", votable_name="uuid"):
    """Special internal type for UUID columns. Felis does not support UUID,
    but we need it here; defining a special class is the simplest way to
    handle it.
    """

@enum.unique
class ExtraTables(enum.Enum):
    """Names of the extra tables used by Cassandra implementation."""

    DiaInsertId = "DiaInsertId"
    """Name of the table for insert ID records."""

    DiaObjectInsertId = "DiaObjectInsertId"
    """Name of the table for DIAObject insert ID records."""

    DiaSourceInsertId = "DiaSourceInsertId"
    """Name of the table for DIASource insert ID records."""

    DiaForcedSourceInsertId = "DiaFSourceInsertId"
    """Name of the table for DIAForcedSource insert ID records."""

    DiaSourceToPartition = "DiaSourceToPartition"
    "Maps diaSourceId to its partition values (pixel and time)."

    def table_name(self, prefix: str = "") -> str:
        """Return full table name."""
        return prefix + self.value

    @classmethod
    def insert_id_tables(cls) -> Mapping[ExtraTables, ApdbTables]:
        """Return mapping of tables used for insert ID tracking to their
        corresponding regular tables."""
        return {
            cls.DiaObjectInsertId: ApdbTables.DiaObject,
            cls.DiaSourceInsertId: ApdbTables.DiaSource,
            cls.DiaForcedSourceInsertId: ApdbTables.DiaForcedSource,
        }

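# A minimal usage sketch for the enum above (the "pfx_" prefix is made up for
# this example):
#
#     >>> ExtraTables.DiaSourceToPartition.table_name("pfx_")
#     'pfx_DiaSourceToPartition'
#     >>> ExtraTables.insert_id_tables()[ExtraTables.DiaObjectInsertId] is ApdbTables.DiaObject
#     True
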

class ApdbCassandraSchema(ApdbSchema):
    """Class for management of APDB schema.

    Parameters
    ----------
    session : `cassandra.cluster.Session`
        Cassandra session object.
    keyspace : `str`
        Name of the Cassandra keyspace with APDB tables.
    schema_file : `str`
        Name of the YAML schema file.
    schema_name : `str`, optional
        Name of the schema in YAML files.
    prefix : `str`, optional
        Prefix to add to all schema elements.
    time_partition_tables : `bool`
        If True then schema will have a separate table for each time partition.
    use_insert_id : `bool`
        If True then schema will include tables for insert ID tracking.
    """

    _type_map = {
        felis.types.Double: "DOUBLE",
        felis.types.Float: "FLOAT",
        felis.types.Timestamp: "TIMESTAMP",
        felis.types.Long: "BIGINT",
        felis.types.Int: "INT",
        felis.types.Short: "INT",
        felis.types.Byte: "TINYINT",
        felis.types.Binary: "BLOB",
        felis.types.Char: "TEXT",
        felis.types.String: "TEXT",
        felis.types.Unicode: "TEXT",
        felis.types.Text: "TEXT",
        felis.types.Boolean: "BOOLEAN",
        _FelisUUID: "UUID",
    }
    """Map YAML column types to Cassandra"""

    _time_partitioned_tables = [
        ApdbTables.DiaObject,
        ApdbTables.DiaSource,
        ApdbTables.DiaForcedSource,
    ]
    _spatially_partitioned_tables = [ApdbTables.DiaObjectLast]
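
    # Illustrative sketch: CREATE TABLE statements translate felis column
    # types through ``_type_map``; note that short integers are widened since
    # Cassandra's smallest practical integer here is INT:
    #
    #     >>> ApdbCassandraSchema._type_map[felis.types.Short]
    #     'INT'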

    def __init__(
        self,
        session: cassandra.cluster.Session,
        keyspace: str,
        schema_file: str,
        schema_name: str = "ApdbSchema",
        prefix: str = "",
        time_partition_tables: bool = False,
        use_insert_id: bool = False,
    ):
        super().__init__(schema_file, schema_name)

        self._session = session
        self._keyspace = keyspace
        self._prefix = prefix
        self._time_partition_tables = time_partition_tables
        self._use_insert_id = use_insert_id
        self._has_insert_id: bool | None = None

        self._apdb_tables = self._apdb_tables_schema(time_partition_tables)
        self._extra_tables = self._extra_tables_schema()

    def _apdb_tables_schema(self, time_partition_tables: bool) -> Mapping[ApdbTables, simple.Table]:
        """Generate schema for regular APDB tables."""
        apdb_tables: dict[ApdbTables, simple.Table] = {}

        # add columns and index for partitioning.
        for table, apdb_table_def in self.tableSchemas.items():
            part_columns = []
            add_columns = []
            primary_key = apdb_table_def.primary_key[:]
            if table in self._spatially_partitioned_tables:
                # DiaObjectLast does not need temporal partitioning
                part_columns = ["apdb_part"]
                add_columns = part_columns
            elif table in self._time_partitioned_tables:
                if time_partition_tables:
                    part_columns = ["apdb_part"]
                else:
                    part_columns = ["apdb_part", "apdb_time_part"]
                add_columns = part_columns
            elif table is ApdbTables.SSObject:
                # For SSObject there is no natural partition key but we have
                # to partition it because there are too many of them. I'm
                # going to partition on its primary key (and drop separate
                # primary key index).
                part_columns = ["ssObjectId"]
                primary_key = []
            else:
                # TODO: Do not know what to do with the other tables
                continue

            column_defs = []
            if add_columns:
                column_defs = [
                    simple.Column(id=f"#{name}", name=name, datatype=felis.types.Long, nullable=False)
                    for name in add_columns
                ]

            annotations = dict(apdb_table_def.annotations)
            annotations["cassandra:apdb_column_names"] = [column.name for column in apdb_table_def.columns]
            if part_columns:
                annotations["cassandra:partitioning_columns"] = part_columns

            apdb_tables[table] = simple.Table(
                id=apdb_table_def.id,
                name=apdb_table_def.name,
                columns=column_defs + apdb_table_def.columns,
                primary_key=primary_key,
                indexes=[],
                constraints=[],
                annotations=annotations,
            )

        return apdb_tables

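    # Sketch of the result for the default layout (no per-partition tables):
    # each time-partitioned table gains two extra BIGINT columns and the
    # annotation
    #
    #     annotations["cassandra:partitioning_columns"] == ["apdb_part", "apdb_time_part"]
    #
    # while DiaObjectLast is partitioned spatially on ["apdb_part"] alone.
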
    def _extra_tables_schema(self) -> Mapping[ExtraTables, simple.Table]:
        """Generate schema for extra tables."""
        extra_tables: dict[ExtraTables, simple.Table] = {}

        # This table maps DiaSource ID to its partitions in the DiaSource and
        # DiaSourceInsertId tables.
        extra_tables[ExtraTables.DiaSourceToPartition] = simple.Table(
            id="#" + ExtraTables.DiaSourceToPartition.value,
            name=ExtraTables.DiaSourceToPartition.table_name(self._prefix),
            columns=[
                simple.Column(
                    id="#diaSourceId", name="diaSourceId", datatype=felis.types.Long, nullable=False
                ),
                simple.Column(id="#apdb_part", name="apdb_part", datatype=felis.types.Long, nullable=False),
                simple.Column(
                    id="#apdb_time_part", name="apdb_time_part", datatype=felis.types.Int, nullable=False
                ),
                simple.Column(id="#insert_id", name="insert_id", datatype=_FelisUUID, nullable=True),
            ],
            primary_key=[],
            indexes=[],
            constraints=[],
            annotations={"cassandra:partitioning_columns": ["diaSourceId"]},
        )

        insert_id_column = simple.Column(
            id="#insert_id", name="insert_id", datatype=_FelisUUID, nullable=False
        )

        if not self._use_insert_id:
            return extra_tables

        # Table containing insert IDs; this one is not partitioned, but a
        # partition key must still be defined.
        extra_tables[ExtraTables.DiaInsertId] = simple.Table(
            id="#" + ExtraTables.DiaInsertId.value,
            name=ExtraTables.DiaInsertId.table_name(self._prefix),
            columns=[
                simple.Column(id="#partition", name="partition", datatype=felis.types.Int, nullable=False),
                insert_id_column,
                simple.Column(
                    id="#insert_time", name="insert_time", datatype=felis.types.Timestamp, nullable=False
                ),
            ],
            primary_key=[insert_id_column],
            indexes=[],
            constraints=[],
            annotations={"cassandra:partitioning_columns": ["partition"]},
        )

        for insert_id_table_enum, apdb_table_enum in ExtraTables.insert_id_tables().items():
            apdb_table_def = self.tableSchemas[apdb_table_enum]

            extra_tables[insert_id_table_enum] = simple.Table(
                id="#" + insert_id_table_enum.value,
                name=insert_id_table_enum.table_name(self._prefix),
                columns=[insert_id_column] + apdb_table_def.columns,
                primary_key=apdb_table_def.primary_key[:],
                indexes=[],
                constraints=[],
                annotations={
                    "cassandra:partitioning_columns": ["insert_id"],
                    "cassandra:apdb_column_names": [column.name for column in apdb_table_def.columns],
                },
            )

        return extra_tables

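    # Sketch: with use_insert_id enabled, each of the three regular DIA tables
    # gets a companion table (see ``ExtraTables.insert_id_tables``) holding
    # the same columns plus ``insert_id``, partitioned by ``insert_id`` so
    # that all rows from one insert land in the same partition.
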
    @property
    def has_insert_id(self) -> bool:
        """Whether insert ID tables are to be used (`bool`)."""
        if self._has_insert_id is None:
            # Reconstructed from context: cache the result so the database is
            # queried at most once.
            self._has_insert_id = self._use_insert_id and self._check_insert_id()
        return self._has_insert_id

    def _check_insert_id(self) -> bool:
        """Check whether database has tables for tracking insert IDs."""
        table_name = ExtraTables.DiaInsertId.table_name(self._prefix)
        query = "SELECT count(*) FROM system_schema.tables WHERE keyspace_name = %s and table_name = %s"
        result = self._session.execute(query, (self._keyspace, table_name))
        row = result.one()
        return bool(row[0])

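    # Usage sketch (``schema`` is a hypothetical configured instance): the
    # first access of ``has_insert_id`` may query ``system_schema.tables``;
    # later accesses are served from the cache until ``makeSchema`` resets it.
    #
    #     >>> schema.has_insert_id  # queries Cassandra at most once
    #     >>> schema.has_insert_id  # cached
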
    def tableName(self, table_name: ApdbTables | ExtraTables) -> str:
        """Return Cassandra table name for APDB table."""
        return table_name.table_name(self._prefix)

    def getColumnMap(self, table_name: ApdbTables | ExtraTables) -> Mapping[str, simple.Column]:
        """Return mapping of column names to Column definitions.

        Parameters
        ----------
        table_name : `ApdbTables` or `ExtraTables`
            One of known APDB table names.

        Returns
        -------
        column_map : `dict`
            Mapping of column names to `simple.Column` instances.
        """
        table_schema = self._table_schema(table_name)
        cmap = {column.name: column for column in table_schema.columns}
        return cmap

    def apdbColumnNames(self, table_name: ApdbTables | ExtraTables) -> List[str]:
        """Return a list of column names for a table as defined in APDB
        schema.

        Parameters
        ----------
        table_name : `ApdbTables` or `ExtraTables`
            Enum for a table in APDB schema.

        Returns
        -------
        columns : `list` of `str`
            Names of regular columns in the table.
        """
        table_schema = self._table_schema(table_name)
        return table_schema.annotations["cassandra:apdb_column_names"]

    def partitionColumns(self, table_name: ApdbTables | ExtraTables) -> List[str]:
        """Return a list of columns used for table partitioning.

        Parameters
        ----------
        table_name : `ApdbTables` or `ExtraTables`
            Table name in APDB schema.

        Returns
        -------
        columns : `list` of `str`
            Names of columns used for partitioning.
        """
        table_schema = self._table_schema(table_name)
        return table_schema.annotations.get("cassandra:partitioning_columns", [])

    def clusteringColumns(self, table_name: ApdbTables | ExtraTables) -> List[str]:
        """Return a list of columns used for clustering.

        Parameters
        ----------
        table_name : `ApdbTables` or `ExtraTables`
            Table name in APDB schema.

        Returns
        -------
        columns : `list` of `str`
            Names of columns used for clustering.
        """
        table_schema = self._table_schema(table_name)
        return [column.name for column in table_schema.primary_key]

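    # Usage sketch (``schema`` is a hypothetical configured instance using the
    # default layout without per-partition tables):
    #
    #     >>> schema.partitionColumns(ApdbTables.DiaObjectLast)
    #     ['apdb_part']
    #     >>> schema.partitionColumns(ApdbTables.DiaObject)
    #     ['apdb_part', 'apdb_time_part']
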
    def makeSchema(self, drop: bool = False, part_range: Optional[Tuple[int, int]] = None) -> None:
        """Create or re-create all tables.

        Parameters
        ----------
        drop : `bool`
            If True then drop tables before creating new ones.
        part_range : `tuple` [ `int` ] or `None`
            Start and end partition number for time partitions, end is not
            inclusive. Used to create per-partition DiaObject, DiaSource, and
            DiaForcedSource tables. If `None` then per-partition tables are
            not created.
        """
        for table in self._apdb_tables:
            self._makeTableSchema(table, drop, part_range)
        for extra_table in self._extra_tables:
            self._makeTableSchema(extra_table, drop, part_range)
        # Reset cached information.
        self._has_insert_id = None
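
    # Sketch of the ``part_range`` effect: for time-partitioned tables,
    # ``part_range=(10, 13)`` creates per-partition tables named
    # ``DiaObject_10``, ``DiaObject_11`` and ``DiaObject_12`` (see the
    # ``range(*part_range)`` expansion in ``_makeTableSchema`` below).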
    def _makeTableSchema(
        self,
        table: ApdbTables | ExtraTables,
        drop: bool = False,
        part_range: Optional[Tuple[int, int]] = None,
    ) -> None:
        _LOG.debug("Making table %s", table)

        fullTable = table.table_name(self._prefix)

        table_list = [fullTable]
        if part_range is not None:
            if table in self._time_partitioned_tables:
                partitions = range(*part_range)
                table_list = [f"{fullTable}_{part}" for part in partitions]

        if drop:
            queries = [f'DROP TABLE IF EXISTS "{self._keyspace}"."{table_name}"' for table_name in table_list]
            futures = [self._session.execute_async(query, timeout=None) for query in queries]
            for future in futures:
                _LOG.debug("wait for query: %s", future.query)
                future.result()
                _LOG.debug("query finished: %s", future.query)

        queries = []
        for table_name in table_list:
            if_not_exists = "" if drop else "IF NOT EXISTS"
            columns = ", ".join(self._tableColumns(table))
            query = f'CREATE TABLE {if_not_exists} "{self._keyspace}"."{table_name}" ({columns})'
            _LOG.debug("query: %s", query)
            queries.append(query)
        futures = [self._session.execute_async(query, timeout=None) for query in queries]
        for future in futures:
            _LOG.debug("wait for query: %s", future.query)
            future.result()
            _LOG.debug("query finished: %s", future.query)

    def _tableColumns(self, table_name: ApdbTables | ExtraTables) -> List[str]:
        """Return a list of column definitions for a table.

        Parameters
        ----------
        table_name : `ApdbTables` or `ExtraTables`
            Name of the table.

        Returns
        -------
        column_defs : `list`
            List of strings in the format "column_name type".
        """
        table_schema = self._table_schema(table_name)

        # must have partition columns and clustering columns
        part_columns = table_schema.annotations.get("cassandra:partitioning_columns", [])
        clust_columns = [column.name for column in table_schema.primary_key]
        _LOG.debug("part_columns: %s", part_columns)
        _LOG.debug("clust_columns: %s", clust_columns)
        if not part_columns:
            raise ValueError(f"Table {table_name} configuration is missing partition index")

        # all columns
        column_defs = []
        for column in table_schema.columns:
            ctype = self._type_map[column.datatype]
            column_defs.append(f'"{column.name}" {ctype}')

        # primary key definition
        part_columns = [f'"{col}"' for col in part_columns]
        clust_columns = [f'"{col}"' for col in clust_columns]
        if len(part_columns) > 1:
            columns = ", ".join(part_columns)
            part_columns = [f"({columns})"]
        pkey = ", ".join(part_columns + clust_columns)
        _LOG.debug("pkey: %s", pkey)
        column_defs.append(f"PRIMARY KEY ({pkey})")

        return column_defs

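    # Sketch of the output for a composite partition key (the clustering
    # column name is illustrative): multiple partition columns are wrapped in
    # parentheses so Cassandra treats them as one composite key, e.g.
    #
    #     ['"apdb_part" BIGINT', '"apdb_time_part" BIGINT', ...,
    #      'PRIMARY KEY (("apdb_part", "apdb_time_part"), "diaObjectId")']
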
    def _table_schema(self, table: ApdbTables | ExtraTables) -> simple.Table:
        """Return schema definition for a table."""
        if isinstance(table, ApdbTables):
            table_schema = self._apdb_tables[table]
        else:
            table_schema = self._extra_tables[table]
        return table_schema
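
# End-to-end usage sketch (a minimal sketch, not part of this module; the
# contact point, keyspace, and schema file names are made up for the example):
#
#     >>> from cassandra.cluster import Cluster
#     >>> session = Cluster(["cassandra.example.org"]).connect()
#     >>> schema = ApdbCassandraSchema(
#     ...     session=session,
#     ...     keyspace="apdb",
#     ...     schema_file="apdb-schema.yaml",
#     ...     use_insert_id=True,
#     ... )
#     >>> schema.makeSchema(drop=False)
#     >>> schema.tableName(ApdbTables.DiaObject)
#     'DiaObject'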