apdbCassandraSchema.py
# This file is part of dax_apdb.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ["ApdbCassandraSchema", "CreateTableOptions", "TableOptions"]

import enum
import logging
from collections.abc import Mapping
from typing import TYPE_CHECKING, cast

import felis.datamodel
import pydantic

from .. import schema_model
from ..apdbSchema import ApdbSchema, ApdbTables

if TYPE_CHECKING:
    import cassandra.cluster


_LOG = logging.getLogger(__name__)


class InconsistentSchemaError(RuntimeError):
    """Exception raised when schema state is inconsistent."""


class TableOptions(pydantic.BaseModel):
    """Set of per-table options for creating Cassandra tables."""

    model_config = pydantic.ConfigDict(extra="forbid")

    tables: list[str]
    """List of table names for which the options should be applied."""

    options: str
    """Table options string, appended to the CREATE TABLE statement after
    the WITH keyword."""


class CreateTableOptions(pydantic.BaseModel):
    """Set of options for creating Cassandra tables."""

    model_config = pydantic.ConfigDict(extra="forbid")

    table_options: list[TableOptions] = pydantic.Field(default_factory=list)
    """Collection of per-table options."""

    default_table_options: str = ""
    """Default options used for tables that are not in the above list."""

    def get_options(self, table_name: str) -> str:
        """Find table options for a given table name."""
        for table_options in self.table_options:
            if table_name in table_options.tables:
                return table_options.options
        return self.default_table_options

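
# Illustrative usage sketch (not part of the original module): shows how
# per-table options are resolved by get_options(). The option strings below
# are hypothetical examples of valid Cassandra table options.
def _example_create_table_options() -> str:
    options = CreateTableOptions(
        table_options=[
            TableOptions(tables=["DiaObjectLast"], options="gc_grace_seconds = 3600"),
        ],
        default_table_options="compaction = {'class': 'SizeTieredCompactionStrategy'}",
    )
    # Tables listed in a TableOptions entry get that entry's options string;
    # all other tables fall back to default_table_options.
    assert options.get_options("DiaObjectLast") == "gc_grace_seconds = 3600"
    return options.get_options("DiaSource")
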

@enum.unique
class ExtraTables(enum.Enum):
    """Names of the extra tables used by the Cassandra implementation.

    Chunk tables exist in two versions now to support both old and new schema.
    Eventually we will drop support for old tables.
    """

    ApdbReplicaChunks = "ApdbReplicaChunks"
    """Name of the table for replica chunk records."""

    DiaObjectChunks = "DiaObjectChunks"
    """Name of the table for DIAObject chunk data."""

    DiaSourceChunks = "DiaSourceChunks"
    """Name of the table for DIASource chunk data."""

    DiaForcedSourceChunks = "DiaForcedSourceChunks"
    """Name of the table for DIAForcedSource chunk data."""

    DiaObjectChunks2 = "DiaObjectChunks2"
    """Name of the table for DIAObject chunk data (new schema, with
    sub-chunks)."""

    DiaSourceChunks2 = "DiaSourceChunks2"
    """Name of the table for DIASource chunk data (new schema, with
    sub-chunks)."""

    DiaForcedSourceChunks2 = "DiaForcedSourceChunks2"
    """Name of the table for DIAForcedSource chunk data (new schema, with
    sub-chunks)."""

    DiaSourceToPartition = "DiaSourceToPartition"
    """Maps diaSourceId to its partition values (pixel and time)."""

    DiaObjectLastToPartition = "DiaObjectLastToPartition"
    """Maps last diaObjectId version to its partition (pixel)."""

    def table_name(self, prefix: str = "") -> str:
        """Return full table name."""
        return prefix + self.value

    @classmethod
    def replica_chunk_tables(cls, has_subchunks: bool) -> Mapping[ApdbTables, ExtraTables]:
        """Return mapping of APDB tables to corresponding replica chunks
        tables.
        """
        if has_subchunks:
            return {
                ApdbTables.DiaObject: cls.DiaObjectChunks2,
                ApdbTables.DiaSource: cls.DiaSourceChunks2,
                ApdbTables.DiaForcedSource: cls.DiaForcedSourceChunks2,
            }
        else:
            return {
                ApdbTables.DiaObject: cls.DiaObjectChunks,
                ApdbTables.DiaSource: cls.DiaSourceChunks,
                ApdbTables.DiaForcedSource: cls.DiaForcedSourceChunks,
            }

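# Illustrative sketch (not part of the original module): table_name() simply
# prepends an optional prefix, and replica_chunk_tables() selects between the
# old and new chunk tables.
def _example_extra_tables() -> None:
    assert ExtraTables.DiaSourceChunks2.table_name("pfx_") == "pfx_DiaSourceChunks2"
    chunk_tables = ExtraTables.replica_chunk_tables(has_subchunks=True)
    assert chunk_tables[ApdbTables.DiaSource] is ExtraTables.DiaSourceChunks2
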

class ApdbCassandraSchema(ApdbSchema):
    """Class for management of APDB schema.

    Parameters
    ----------
    session : `cassandra.cluster.Session`
        Cassandra session object.
    keyspace : `str`
        Keyspace name for all tables.
    schema_file : `str`
        Name of the YAML schema file.
    schema_name : `str`, optional
        Name of the schema in YAML files.
    prefix : `str`, optional
        Prefix to add to all schema elements.
    time_partition_tables : `bool`
        If `True` then schema will have a separate table for each time
        partition.
    enable_replica : `bool`, optional
        If `True` then use additional tables for replica chunks.
    has_chunk_sub_partitions : `bool`, optional
        If `True` then replica chunk tables have sub-partition columns. Only
        used if ``enable_replica`` is `True`.
    """

    _type_map = {
        felis.datamodel.DataType.double: "DOUBLE",
        felis.datamodel.DataType.float: "FLOAT",
        felis.datamodel.DataType.timestamp: "TIMESTAMP",
        felis.datamodel.DataType.long: "BIGINT",
        felis.datamodel.DataType.int: "INT",
        felis.datamodel.DataType.short: "SMALLINT",
        felis.datamodel.DataType.byte: "TINYINT",
        felis.datamodel.DataType.binary: "BLOB",
        felis.datamodel.DataType.char: "TEXT",
        felis.datamodel.DataType.string: "TEXT",
        felis.datamodel.DataType.unicode: "TEXT",
        felis.datamodel.DataType.text: "TEXT",
        felis.datamodel.DataType.boolean: "BOOLEAN",
        schema_model.ExtraDataTypes.UUID: "UUID",
    }
    """Map YAML column types to Cassandra type names."""

    _time_partitioned_tables = [
        ApdbTables.DiaObject,
        ApdbTables.DiaSource,
        ApdbTables.DiaForcedSource,
    ]
    _spatially_partitioned_tables = [ApdbTables.DiaObjectLast]

    def __init__(
        self,
        session: cassandra.cluster.Session,
        keyspace: str,
        schema_file: str,
        schema_name: str = "ApdbSchema",
        prefix: str = "",
        time_partition_tables: bool = False,
        enable_replica: bool = False,
        has_chunk_sub_partitions: bool = True,
    ):
        super().__init__(schema_file, schema_name)

        self._session = session
        self._keyspace = keyspace
        self._prefix = prefix
        self._time_partition_tables = time_partition_tables
        self._enable_replica = enable_replica
        self._has_chunk_sub_partitions = has_chunk_sub_partitions

        self._apdb_tables = self._apdb_tables_schema(time_partition_tables)
        self._extra_tables = self._extra_tables_schema()

    def _apdb_tables_schema(self, time_partition_tables: bool) -> Mapping[ApdbTables, schema_model.Table]:
        """Generate schema for regular APDB tables."""
        apdb_tables: dict[ApdbTables, schema_model.Table] = {}

        # Add columns and index for partitioning.
        for table, apdb_table_def in self.tableSchemas.items():
            part_columns = []
            add_columns = []
            primary_key = apdb_table_def.primary_key[:]
            if table in self._spatially_partitioned_tables:
                # DiaObjectLast does not need temporal partitioning.
                part_columns = ["apdb_part"]
                add_columns = part_columns
            elif table in self._time_partitioned_tables:
                if time_partition_tables:
                    part_columns = ["apdb_part"]
                else:
                    part_columns = ["apdb_part", "apdb_time_part"]
                add_columns = part_columns
            elif table is ApdbTables.SSObject:
                # For SSObject there is no natural partition key but we have
                # to partition it because there are too many of them. I'm
                # going to partition on its primary key (and drop separate
                # primary key index).
                part_columns = ["ssObjectId"]
                primary_key = []
            elif table is ApdbTables.metadata:
                # Metadata is in one partition because we want to read all of
                # it in one query, add an extra column for partition.
                part_columns = ["meta_part"]
                add_columns = part_columns
            else:
                # TODO: Do not know what to do with the other tables.
                continue

            column_defs = []
            if add_columns:
                column_defs = [
                    schema_model.Column(
                        id=f"#{name}", name=name, datatype=felis.datamodel.DataType.long, nullable=False
                    )
                    for name in add_columns
                ]

            annotations = dict(apdb_table_def.annotations)
            annotations["cassandra:apdb_column_names"] = [column.name for column in apdb_table_def.columns]
            if part_columns:
                annotations["cassandra:partitioning_columns"] = part_columns

            apdb_tables[table] = schema_model.Table(
                id=apdb_table_def.id,
                name=apdb_table_def.name,
                columns=column_defs + apdb_table_def.columns,
                primary_key=primary_key,
                indexes=[],
                constraints=[],
                annotations=annotations,
            )

        return apdb_tables

    def _extra_tables_schema(self) -> Mapping[ExtraTables, schema_model.Table]:
        """Generate schema for extra tables."""
        extra_tables: dict[ExtraTables, schema_model.Table] = {}

        # This table maps DiaSource ID to its partitions in DiaSource table
        # and DiaSourceChunks tables.
        extra_tables[ExtraTables.DiaSourceToPartition] = schema_model.Table(
            id="#" + ExtraTables.DiaSourceToPartition.value,
            name=ExtraTables.DiaSourceToPartition.table_name(self._prefix),
            columns=[
                schema_model.Column(
                    id="#diaSourceId",
                    name="diaSourceId",
                    datatype=felis.datamodel.DataType.long,
                    nullable=False,
                ),
                schema_model.Column(
                    id="#apdb_part", name="apdb_part", datatype=felis.datamodel.DataType.long, nullable=False
                ),
                schema_model.Column(
                    id="#apdb_time_part",
                    name="apdb_time_part",
                    datatype=felis.datamodel.DataType.int,
                    nullable=False,
                ),
                schema_model.Column(
                    id="#apdb_replica_chunk",
                    name="apdb_replica_chunk",
                    datatype=felis.datamodel.DataType.long,
                    nullable=True,
                ),
                schema_model.Column(
                    id="#apdb_replica_subchunk",
                    name="apdb_replica_subchunk",
                    datatype=felis.datamodel.DataType.int,
                    nullable=True,
                ),
            ],
            primary_key=[],
            indexes=[],
            constraints=[],
            annotations={"cassandra:partitioning_columns": ["diaSourceId"]},
        )

        # This table maps diaObjectId to its partition in DiaObjectLast table.
        extra_tables[ExtraTables.DiaObjectLastToPartition] = schema_model.Table(
            id="#" + ExtraTables.DiaObjectLastToPartition.value,
            name=ExtraTables.DiaObjectLastToPartition.table_name(self._prefix),
            columns=[
                schema_model.Column(
                    id="#diaObjectId",
                    name="diaObjectId",
                    datatype=felis.datamodel.DataType.long,
                    nullable=False,
                ),
                schema_model.Column(
                    id="#apdb_part", name="apdb_part", datatype=felis.datamodel.DataType.long, nullable=False
                ),
            ],
            primary_key=[],
            indexes=[],
            constraints=[],
            annotations={"cassandra:partitioning_columns": ["diaObjectId"]},
        )

        if not self._enable_replica:
            return extra_tables

        replica_chunk_column = schema_model.Column(
            id="#apdb_replica_chunk",
            name="apdb_replica_chunk",
            datatype=felis.datamodel.DataType.long,
            nullable=False,
        )

        replica_chunk_columns = [replica_chunk_column]
        if self._has_chunk_sub_partitions:
            replica_chunk_columns.append(
                schema_model.Column(
                    id="#apdb_replica_subchunk",
                    name="apdb_replica_subchunk",
                    datatype=felis.datamodel.DataType.int,
                    nullable=False,
                )
            )

        # Table containing replica chunks, this one is not partitioned, but
        # partition key must be defined.
        extra_tables[ExtraTables.ApdbReplicaChunks] = schema_model.Table(
            id="#" + ExtraTables.ApdbReplicaChunks.value,
            name=ExtraTables.ApdbReplicaChunks.table_name(self._prefix),
            columns=[
                schema_model.Column(
                    id="#partition", name="partition", datatype=felis.datamodel.DataType.int, nullable=False
                ),
                replica_chunk_column,
                schema_model.Column(
                    id="#last_update_time",
                    name="last_update_time",
                    datatype=felis.datamodel.DataType.timestamp,
                    nullable=False,
                ),
                schema_model.Column(
                    id="#unique_id",
                    name="unique_id",
                    datatype=schema_model.ExtraDataTypes.UUID,
                    nullable=False,
                ),
                schema_model.Column(
                    id="#has_subchunks",
                    name="has_subchunks",
                    datatype=felis.datamodel.DataType.boolean,
                    nullable=True,
                ),
            ],
            primary_key=[replica_chunk_column],
            indexes=[],
            constraints=[],
            annotations={"cassandra:partitioning_columns": ["partition"]},
        )

        replica_chunk_tables = ExtraTables.replica_chunk_tables(self._has_chunk_sub_partitions)
        for apdb_table_enum, chunk_table_enum in replica_chunk_tables.items():
            apdb_table_def = self.tableSchemas[apdb_table_enum]

            extra_tables[chunk_table_enum] = schema_model.Table(
                id="#" + chunk_table_enum.value,
                name=chunk_table_enum.table_name(self._prefix),
                columns=replica_chunk_columns + apdb_table_def.columns,
                primary_key=apdb_table_def.primary_key[:],
                indexes=[],
                constraints=[],
                annotations={
                    "cassandra:partitioning_columns": [column.name for column in replica_chunk_columns],
                    "cassandra:apdb_column_names": [column.name for column in apdb_table_def.columns],
                },
            )

        return extra_tables

    @property
    def replication_enabled(self) -> bool:
        """True when replication is enabled (`bool`)."""
        return self._enable_replica

    @property
    def has_chunk_sub_partitions(self) -> bool:
        """True when chunk tables have sub-partitions (`bool`)."""
        return self._has_chunk_sub_partitions

    def empty(self) -> bool:
        """Return True if database schema is empty.

        Returns
        -------
        empty : `bool`
            `True` if none of the required APDB tables exist in the database,
            `False` if all required tables exist.

        Raises
        ------
        InconsistentSchemaError
            Raised when some of the required tables exist but not all.
        """
        query = "SELECT table_name FROM system_schema.tables WHERE keyspace_name = %s"
        result = self._session.execute(query, (self._keyspace,))
        table_names = set(row[0] for row in result.all())

        existing_tables = []
        missing_tables = []
        for table_enum in self._apdb_tables:
            table_name = table_enum.table_name(self._prefix)
            if self._time_partition_tables and table_enum in self._time_partitioned_tables:
                # Check prefix for time-partitioned tables.
                exists = any(table.startswith(f"{table_name}_") for table in table_names)
            else:
                exists = table_name in table_names
            if exists:
                existing_tables.append(table_name)
            else:
                missing_tables.append(table_name)

        if not missing_tables:
            return False
        elif not existing_tables:
            return True
        else:
            raise InconsistentSchemaError(
                f"Only some required APDB tables exist: {existing_tables}, missing tables: {missing_tables}"
            )

    def existing_tables(self, *args: ApdbTables) -> dict[ApdbTables, list[str]]:
        """Return the list of existing table names for given tables.

        Parameters
        ----------
        *args : `ApdbTables`
            Tables for which to return their existing table names.

        Returns
        -------
        tables : `dict` [`ApdbTables`, `list`[`str`]]
            Mapping of the APDB table to the list of the existing table names.
            More than one name can be present in the list if configuration
            specifies per-partition tables.
        """
        if self._time_partition_tables and not set(args).isdisjoint(self._time_partitioned_tables):
            # Some of the tables should have per-partition tables.
            query = "SELECT table_name FROM system_schema.tables WHERE keyspace_name = %s"
            result = self._session.execute(query, (self._keyspace,))
            table_names = set(row[0] for row in result.all())

            tables = {}
            for table_enum in args:
                base_name = table_enum.table_name(self._prefix)
                if table_enum in self._time_partitioned_tables:
                    tables[table_enum] = [table for table in table_names if table.startswith(f"{base_name}_")]
                else:
                    tables[table_enum] = [base_name]
            return tables
        else:
            # Do not check that they exist, we know that they should.
            return {table_enum: [table_enum.table_name(self._prefix)] for table_enum in args}

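    # For example, with time_partition_tables enabled and time partitions
    # 615-617 created, existing_tables(ApdbTables.DiaSource) would return
    # something like {ApdbTables.DiaSource: ["DiaSource_615", "DiaSource_616",
    # "DiaSource_617"]} (the partition numbers here are hypothetical).
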
    def tableName(self, table_name: ApdbTables | ExtraTables) -> str:
        """Return Cassandra table name for APDB table."""
        return table_name.table_name(self._prefix)

    def keyspace(self) -> str:
        """Return Cassandra keyspace for APDB tables."""
        return self._keyspace

    def getColumnMap(self, table_name: ApdbTables | ExtraTables) -> Mapping[str, schema_model.Column]:
        """Return mapping of column names to Column definitions.

        Parameters
        ----------
        table_name : `ApdbTables` or `ExtraTables`
            One of known APDB table names.

        Returns
        -------
        column_map : `dict`
            Mapping of column names to `Column` instances.
        """
        table_schema = self._table_schema(table_name)
        cmap = {column.name: column for column in table_schema.columns}
        return cmap

    def apdbColumnNames(self, table_name: ApdbTables | ExtraTables) -> list[str]:
        """Return a list of column names for a table as defined in APDB
        schema.

        Parameters
        ----------
        table_name : `ApdbTables` or `ExtraTables`
            Enum for a table in APDB schema.

        Returns
        -------
        columns : `list` of `str`
            Names of regular columns in the table.
        """
        table_schema = self._table_schema(table_name)
        return table_schema.annotations["cassandra:apdb_column_names"]

    def partitionColumns(self, table_name: ApdbTables | ExtraTables) -> list[str]:
        """Return a list of columns used for table partitioning.

        Parameters
        ----------
        table_name : `ApdbTables` or `ExtraTables`
            Table name in APDB schema.

        Returns
        -------
        columns : `list` of `str`
            Names of columns used for partitioning.
        """
        table_schema = self._table_schema(table_name)
        return table_schema.annotations.get("cassandra:partitioning_columns", [])

    def clusteringColumns(self, table_name: ApdbTables | ExtraTables) -> list[str]:
        """Return a list of columns used for clustering.

        Parameters
        ----------
        table_name : `ApdbTables` or `ExtraTables`
            Table name in APDB schema.

        Returns
        -------
        columns : `list` of `str`
            Names of columns used for clustering.
        """
        table_schema = self._table_schema(table_name)
        return [column.name for column in table_schema.primary_key]

    def makeSchema(
        self,
        *,
        drop: bool = False,
        part_range: tuple[int, int] | None = None,
        replication_factor: int | None = None,
        table_options: CreateTableOptions | None = None,
    ) -> None:
        """Create or re-create all tables.

        Parameters
        ----------
        drop : `bool`
            If True then drop tables before creating new ones. Note that
            only tables are dropped and not the whole keyspace.
        part_range : `tuple` [ `int` ] or `None`
            Start and end partition number for time partitions, end is not
            inclusive. Used to create per-partition DiaObject, DiaSource, and
            DiaForcedSource tables. If `None` then per-partition tables are
            not created.
        replication_factor : `int`, optional
            Replication factor used when creating new keyspace, if keyspace
            already exists its replication factor is not changed.
        table_options : `CreateTableOptions`, optional
            Extra options used when creating the tables.
        """
        # Try to create keyspace if it does not exist.
        if replication_factor is None:
            replication_factor = 1

        # If keyspace exists check its replication factor.
        query = "SELECT replication FROM system_schema.keyspaces WHERE keyspace_name = %s"
        result = self._session.execute(query, (self._keyspace,))
        if row := result.one():
            # Check replication factor, ignore strategy class.
            repl_config = cast(Mapping[str, str], row[0])
            current_repl = int(repl_config["replication_factor"])
            if replication_factor != current_repl:
                raise ValueError(
                    f"New replication factor {replication_factor} differs from the replication factor "
                    f"for already existing keyspace: {current_repl}"
                )
        else:
            # Need a new keyspace.
            query = (
                f'CREATE KEYSPACE "{self._keyspace}"'
                " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': "
                f"{replication_factor}"
                "}"
            )
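            # The rendered statement looks like, e.g. (keyspace name and
            # replication factor here are illustrative):
            #   CREATE KEYSPACE "apdb" WITH replication =
            #       {'class': 'SimpleStrategy', 'replication_factor': 1}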
            self._session.execute(query)

        for table in self._apdb_tables:
            self._makeTableSchema(table, drop, part_range, table_options)
        for extra_table in self._extra_tables:
            self._makeTableSchema(extra_table, drop, part_range, table_options)

    def _makeTableSchema(
        self,
        table: ApdbTables | ExtraTables,
        drop: bool = False,
        part_range: tuple[int, int] | None = None,
        table_options: CreateTableOptions | None = None,
    ) -> None:
        _LOG.debug("Making table %s", table)

        fullTable = table.table_name(self._prefix)

        table_list = [fullTable]
        if part_range is not None:
            if table in self._time_partitioned_tables:
                partitions = range(*part_range)
                table_list = [f"{fullTable}_{part}" for part in partitions]

        if drop:
            queries = [
                f'DROP TABLE IF EXISTS "{self._keyspace}"."{table_name}"' for table_name in table_list
            ]
            futures = [self._session.execute_async(query, timeout=None) for query in queries]
            for future in futures:
                _LOG.debug("wait for query: %s", future.query)
                future.result()
                _LOG.debug("query finished: %s", future.query)

        queries = []
        options = table_options.get_options(fullTable).strip() if table_options else None
        for table_name in table_list:
            if_not_exists = "" if drop else "IF NOT EXISTS"
            columns = ", ".join(self._tableColumns(table))
            query = f'CREATE TABLE {if_not_exists} "{self._keyspace}"."{table_name}" ({columns})'
            if options:
                query = f"{query} WITH {options}"
            _LOG.debug("query: %s", query)
            queries.append(query)
        futures = [self._session.execute_async(query, timeout=None) for query in queries]
        for future in futures:
            _LOG.debug("wait for query: %s", future.query)
            future.result()
            _LOG.debug("query finished: %s", future.query)

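    # A generated statement looks like, e.g. (keyspace, table, and column
    # names here are illustrative, and most columns are elided):
    #   CREATE TABLE IF NOT EXISTS "apdb"."DiaObjectLast"
    #       ("apdb_part" BIGINT, "diaObjectId" BIGINT, ...,
    #        PRIMARY KEY (("apdb_part"), "diaObjectId"))
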
    def _tableColumns(self, table_name: ApdbTables | ExtraTables) -> list[str]:
        """Return the list of column definitions for a table.

        Parameters
        ----------
        table_name : `ApdbTables` or `ExtraTables`
            Name of the table.

        Returns
        -------
        column_defs : `list`
            List of strings in the format "column_name type".
        """
        table_schema = self._table_schema(table_name)

        # Must have partition columns and clustering columns.
        part_columns = table_schema.annotations.get("cassandra:partitioning_columns", [])
        clust_columns = [column.name for column in table_schema.primary_key]
        _LOG.debug("part_columns: %s", part_columns)
        _LOG.debug("clust_columns: %s", clust_columns)
        if not part_columns:
            raise ValueError(f"Table {table_name} configuration is missing partition index")

        # All columns.
        column_defs = []
        for column in table_schema.columns:
            ctype = self._type_map[column.datatype]
            column_defs.append(f'"{column.name}" {ctype}')

        # Primary key definition.
        part_columns = [f'"{col}"' for col in part_columns]
        clust_columns = [f'"{col}"' for col in clust_columns]
        if len(part_columns) > 1:
            columns = ", ".join(part_columns)
            part_columns = [f"({columns})"]
        pkey = ", ".join(part_columns + clust_columns)
        _LOG.debug("pkey: %s", pkey)
        column_defs.append(f"PRIMARY KEY ({pkey})")

        return column_defs

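    # For example, part_columns=["apdb_part", "apdb_time_part"] and
    # clust_columns=["diaObjectId"] produce the clause
    #   PRIMARY KEY (("apdb_part", "apdb_time_part"), "diaObjectId")
    # where the inner parentheses make the composite Cassandra partition key.
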
    def _table_schema(self, table: ApdbTables | ExtraTables) -> schema_model.Table:
        """Return schema definition for a table."""
        if isinstance(table, ApdbTables):
            table_schema = self._apdb_tables[table]
        else:
            table_schema = self._extra_tables[table]
        return table_schema

    def table_row_size(self, table: ApdbTables | ExtraTables) -> int:
        """Return an estimate of the row size of a given table.

        Parameters
        ----------
        table : `ApdbTables` or `ExtraTables`

        Returns
        -------
        size : `int`
            An estimate of a table row size.

        Notes
        -----
        The returned size is not exact; it may be incorrect when a table has
        variable-size columns (e.g. strings). Stored data size or wire-level
        protocol size can be smaller if some columns are not set or set to
        NULL.
        """
        table_schema = self._table_schema(table)
        size = sum(column.size() for column in table_schema.columns)
        return size
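

# A minimal end-to-end sketch (not part of the original module). The host,
# keyspace, and schema file path are hypothetical placeholders; the
# Cluster/connect calls are standard cassandra-driver API.
def _example_make_schema() -> None:
    from cassandra.cluster import Cluster

    cluster = Cluster(["cassandra-host.example.org"])
    session = cluster.connect()
    schema = ApdbCassandraSchema(
        session=session,
        keyspace="apdb",
        schema_file="apdb-schema.yaml",
        enable_replica=True,
    )
    if schema.empty():
        # Create all tables (and the keyspace if needed) with RF=3.
        schema.makeSchema(replication_factor=3)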