apdbCassandraSchema.py
# This file is part of dax_apdb.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ["ApdbCassandraSchema", "CreateTableOptions", "TableOptions"]

import enum
import logging
from collections.abc import Mapping
from typing import TYPE_CHECKING, cast

import felis.datamodel
import pydantic

from .. import schema_model
from ..apdbSchema import ApdbTables

if TYPE_CHECKING:
    import cassandra.cluster

    from ..schema_model import Table
    from .config import ApdbCassandraTimePartitionRange


_LOG = logging.getLogger(__name__)

class InconsistentSchemaError(RuntimeError):
    """Exception raised when schema state is inconsistent."""

class TableOptions(pydantic.BaseModel):
    """Set of per-table options for creating Cassandra tables."""

    model_config = pydantic.ConfigDict(extra="forbid")

    tables: list[str]
    """List of table names to which the options should be applied."""

    options: str

class CreateTableOptions(pydantic.BaseModel):
    """Set of options for creating Cassandra tables."""

    model_config = pydantic.ConfigDict(extra="forbid")

    table_options: list[TableOptions] = pydantic.Field(default_factory=list)
    """Collection of per-table options."""

    default_table_options: str = ""
    """Default options used for tables that are not in the above list."""

    def get_options(self, table_name: str) -> str:
        """Find table options for a given table name."""
        for table_options in self.table_options:
            if table_name in table_options.tables:
                return table_options.options
        return self.default_table_options

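# A minimal usage sketch (illustrative only; the table names and option
# strings below are made-up examples, not defaults of this module): route one
# table to specific options and everything else to the shared default.
#
#     opts = CreateTableOptions(
#         table_options=[
#             TableOptions(tables=["DiaSource"], options="gc_grace_seconds=3600"),
#         ],
#         default_table_options="comment='apdb table'",
#     )
#     opts.get_options("DiaSource")  # -> "gc_grace_seconds=3600"
#     opts.get_options("DiaObject")  # -> "comment='apdb table'"
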
@enum.unique
class ExtraTables(enum.Enum):
    """Names of the extra tables used by Cassandra implementation.

    Chunk tables currently exist in two versions to support both the old and
    the new schema. Eventually support for the old tables will be dropped.
    """

    ApdbReplicaChunks = "ApdbReplicaChunks"
    """Name of the table for replica chunk records."""

    DiaObjectChunks = "DiaObjectChunks"
    """Name of the table for DIAObject chunk data."""

    DiaSourceChunks = "DiaSourceChunks"
    """Name of the table for DIASource chunk data."""

    DiaForcedSourceChunks = "DiaForcedSourceChunks"
    """Name of the table for DIAForcedSource chunk data."""

    DiaObjectChunks2 = "DiaObjectChunks2"
    """Name of the table for DIAObject chunk data."""

    DiaSourceChunks2 = "DiaSourceChunks2"
    """Name of the table for DIASource chunk data."""

    DiaForcedSourceChunks2 = "DiaForcedSourceChunks2"
    """Name of the table for DIAForcedSource chunk data."""

    DiaSourceToPartition = "DiaSourceToPartition"
    """Maps diaSourceId to its partition values (pixel and time)."""

    DiaObjectLastToPartition = "DiaObjectLastToPartition"
    """Maps last diaObjectId version to its partition (pixel)."""

    ApdbVisitDetector = "ApdbVisitDetector"
    """Records attempted processing of visit/detector."""

    def table_name(self, prefix: str = "", time_partition: int | None = None) -> str:
        """Return full table name.

        Parameters
        ----------
        prefix : `str`, optional
            Optional prefix for table name.
        time_partition : `int`, optional
            Optional time partition, only used for tables that support time
            partitioning.
        """
        return f"{prefix}{self.value}"

    @classmethod
    def replica_chunk_tables(cls, has_subchunks: bool) -> Mapping[ApdbTables, ExtraTables]:
        """Return mapping of APDB tables to corresponding replica chunk
        tables.
        """
        if has_subchunks:
            return {
                ApdbTables.DiaObject: cls.DiaObjectChunks2,
                ApdbTables.DiaSource: cls.DiaSourceChunks2,
                ApdbTables.DiaForcedSource: cls.DiaForcedSourceChunks2,
            }
        else:
            return {
                ApdbTables.DiaObject: cls.DiaObjectChunks,
                ApdbTables.DiaSource: cls.DiaSourceChunks,
                ApdbTables.DiaForcedSource: cls.DiaForcedSourceChunks,
            }

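# Illustrative example: with sub-chunking enabled the mapping selects the
# newer "*Chunks2" tables, otherwise the original chunk tables.
#
#     ExtraTables.replica_chunk_tables(has_subchunks=True)[ApdbTables.DiaSource]
#     # -> ExtraTables.DiaSourceChunks2
#     ExtraTables.replica_chunk_tables(has_subchunks=False)[ApdbTables.DiaSource]
#     # -> ExtraTables.DiaSourceChunks
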
152 """Class for management of APDB schema.
153
154 Parameters
155 ----------
156 session : `cassandra.cluster.Session`
157 Cassandra session object
158 keyspace : `str`
159 Keyspace name for all tables.
160 schema_file : `str`
161 Name of the YAML schema file.
162 schema_name : `str`, optional
163 Name of the schema in YAML files.
164 prefix : `str`, optional
165 Prefix to add to all schema elements.
166 time_partition_tables : `bool`
167 If `True` then schema will have a separate table for each time
168 partition.
169 enable_replica : `bool`, optional
170 If `True` then use additional tables for replica chunks.
171 has_chunk_sub_partitions : `bool`, optional
172 If `True` then replica chunk tables have sub-partition columns. Only
173 used if ``enable_replica`` is `True`.
174 """
175
    _type_map = {
        felis.datamodel.DataType.double: "DOUBLE",
        felis.datamodel.DataType.float: "FLOAT",
        felis.datamodel.DataType.timestamp: "TIMESTAMP",
        felis.datamodel.DataType.long: "BIGINT",
        felis.datamodel.DataType.int: "INT",
        felis.datamodel.DataType.short: "SMALLINT",
        felis.datamodel.DataType.byte: "TINYINT",
        felis.datamodel.DataType.binary: "BLOB",
        felis.datamodel.DataType.char: "TEXT",
        felis.datamodel.DataType.string: "TEXT",
        felis.datamodel.DataType.unicode: "TEXT",
        felis.datamodel.DataType.text: "TEXT",
        felis.datamodel.DataType.boolean: "BOOLEAN",
        schema_model.ExtraDataTypes.UUID: "UUID",
    }
    """Map YAML column types to Cassandra"""

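    # Illustrative lookup: _type_map[felis.datamodel.DataType.long] yields the
    # Cassandra type string "BIGINT" that is interpolated into generated
    # CREATE TABLE statements (see _tableColumns below).
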
    _time_partitioned_tables = [
        ApdbTables.DiaObject,
        ApdbTables.DiaSource,
        ApdbTables.DiaForcedSource,
    ]
    _spatially_partitioned_tables = [ApdbTables.DiaObjectLast]

    def __init__(
        self,
        session: cassandra.cluster.Session,
        keyspace: str,
        table_schemas: Mapping[ApdbTables, Table],
        prefix: str = "",
        time_partition_tables: bool = False,
        enable_replica: bool = False,
        replica_skips_diaobjects: bool = False,
        has_chunk_sub_partitions: bool = True,
        has_visit_detector_table: bool = True,
    ):
        self._session = session
        self._keyspace = keyspace
        self._table_schemas = table_schemas
        self._prefix = prefix
        self._time_partition_tables = time_partition_tables
        self._enable_replica = enable_replica
        self._replica_skips_diaobjects = replica_skips_diaobjects
        self._has_chunk_sub_partitions = has_chunk_sub_partitions
        self._has_visit_detector_table = has_visit_detector_table

        self._apdb_tables = self._apdb_tables_schema(time_partition_tables)
        self._extra_tables = self._extra_tables_schema()

225
226 def _apdb_tables_schema(self, time_partition_tables: bool) -> Mapping[ApdbTables, schema_model.Table]:
227 """Generate schema for regular APDB tables."""
228 apdb_tables: dict[ApdbTables, schema_model.Table] = {}
229
230 # add columns and index for partitioning.
231 for table, apdb_table_def in self._table_schemas.items():
232 part_columns = []
233 add_columns = []
234 primary_key = apdb_table_def.primary_key[:]
235 if table in self._spatially_partitioned_tables:
236 # DiaObjectLast does not need temporal partitioning
237 part_columns = ["apdb_part"]
238 add_columns = part_columns
239 elif table in self._time_partitioned_tables:
240 if time_partition_tables:
241 part_columns = ["apdb_part"]
242 else:
243 part_columns = ["apdb_part", "apdb_time_part"]
244 add_columns = part_columns
245 elif table is ApdbTables.SSObject:
246 # For SSObject there is no natural partition key but we have
247 # to partition it because there are too many of them. I'm
248 # going to partition on its primary key (and drop separate
249 # primary key index).
250 part_columns = ["ssObjectId"]
251 primary_key = []
252 elif table is ApdbTables.metadata:
253 # Metadata is in one partition because we want to read all of
254 # it in one query, add an extra column for partition.
255 part_columns = ["meta_part"]
256 add_columns = part_columns
257 else:
258 # TODO: Do not know what to do with the other tables
259 continue
260
261 column_defs = []
262 if add_columns:
263 column_defs = [
265 id=f"#{name}", name=name, datatype=felis.datamodel.DataType.long, nullable=False
266 )
267 for name in add_columns
268 ]
269
270 annotations = dict(apdb_table_def.annotations)
271 annotations["cassandra:apdb_column_names"] = [column.name for column in apdb_table_def.columns]
272 if part_columns:
273 annotations["cassandra:partitioning_columns"] = part_columns
274
275 apdb_tables[table] = schema_model.Table(
276 id=apdb_table_def.id,
277 name=apdb_table_def.name,
278 columns=column_defs + apdb_table_def.columns,
279 primary_key=primary_key,
280 indexes=[],
281 constraints=[],
282 annotations=annotations,
283 )
284
285 return apdb_tables
286
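    # Illustrative result: when time_partition_tables is False, DiaSource gets
    # two extra BIGINT columns and the annotation
    # {"cassandra:partitioning_columns": ["apdb_part", "apdb_time_part"]},
    # so the generated table is partitioned by (pixel, time-bin) pairs.
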
    def _extra_tables_schema(self) -> Mapping[ExtraTables, schema_model.Table]:
        """Generate schema for extra tables."""
        extra_tables: dict[ExtraTables, schema_model.Table] = {}

        if self._has_visit_detector_table:
            columns = [
                schema_model.Column(
                    id="#visit",
                    name="visit",
                    datatype=felis.datamodel.DataType.long,
                    nullable=False,
                ),
                schema_model.Column(
                    id="#detector",
                    name="detector",
                    datatype=felis.datamodel.DataType.short,
                    nullable=False,
                ),
            ]
            extra_tables[ExtraTables.ApdbVisitDetector] = schema_model.Table(
                id="#" + ExtraTables.ApdbVisitDetector.value,
                name=ExtraTables.ApdbVisitDetector.table_name(self._prefix),
                columns=columns,
                primary_key=[],
                indexes=[],
                constraints=[],
                annotations={"cassandra:partitioning_columns": ["visit", "detector"]},
            )

        # This table maps DiaSource ID to its partitions in the DiaSource
        # table and DiaSourceChunks tables.
        extra_tables[ExtraTables.DiaSourceToPartition] = schema_model.Table(
            id="#" + ExtraTables.DiaSourceToPartition.value,
            name=ExtraTables.DiaSourceToPartition.table_name(self._prefix),
            columns=[
                schema_model.Column(
                    id="#diaSourceId",
                    name="diaSourceId",
                    datatype=felis.datamodel.DataType.long,
                    nullable=False,
                ),
                schema_model.Column(
                    id="#apdb_part", name="apdb_part", datatype=felis.datamodel.DataType.long, nullable=False
                ),
                schema_model.Column(
                    id="#apdb_time_part",
                    name="apdb_time_part",
                    datatype=felis.datamodel.DataType.int,
                    nullable=False,
                ),
                schema_model.Column(
                    id="#apdb_replica_chunk",
                    name="apdb_replica_chunk",
                    datatype=felis.datamodel.DataType.long,
                    nullable=True,
                ),
                schema_model.Column(
                    id="#apdb_replica_subchunk",
                    name="apdb_replica_subchunk",
                    datatype=felis.datamodel.DataType.int,
                    nullable=True,
                ),
            ],
            primary_key=[],
            indexes=[],
            constraints=[],
            annotations={"cassandra:partitioning_columns": ["diaSourceId"]},
        )

        # This table maps diaObjectId to its partition in the DiaObjectLast
        # table.
        extra_tables[ExtraTables.DiaObjectLastToPartition] = schema_model.Table(
            id="#" + ExtraTables.DiaObjectLastToPartition.value,
            name=ExtraTables.DiaObjectLastToPartition.table_name(self._prefix),
            columns=[
                schema_model.Column(
                    id="#diaObjectId",
                    name="diaObjectId",
                    datatype=felis.datamodel.DataType.long,
                    nullable=False,
                ),
                schema_model.Column(
                    id="#apdb_part", name="apdb_part", datatype=felis.datamodel.DataType.long, nullable=False
                ),
            ],
            primary_key=[],
            indexes=[],
            constraints=[],
            annotations={"cassandra:partitioning_columns": ["diaObjectId"]},
        )

        if not self._enable_replica:
            return extra_tables

        replica_chunk_column = schema_model.Column(
            id="#apdb_replica_chunk",
            name="apdb_replica_chunk",
            datatype=felis.datamodel.DataType.long,
            nullable=False,
        )

        replica_chunk_columns = [replica_chunk_column]
        if self._has_chunk_sub_partitions:
            replica_chunk_columns.append(
                schema_model.Column(
                    id="#apdb_replica_subchunk",
                    name="apdb_replica_subchunk",
                    datatype=felis.datamodel.DataType.int,
                    nullable=False,
                )
            )

        # Table containing replica chunks. This table is not partitioned, but
        # a partition key must still be defined.
        extra_tables[ExtraTables.ApdbReplicaChunks] = schema_model.Table(
            id="#" + ExtraTables.ApdbReplicaChunks.value,
            name=ExtraTables.ApdbReplicaChunks.table_name(self._prefix),
            columns=[
                schema_model.Column(
                    id="#partition", name="partition", datatype=felis.datamodel.DataType.int, nullable=False
                ),
                replica_chunk_column,
                schema_model.Column(
                    id="#last_update_time",
                    name="last_update_time",
                    datatype=felis.datamodel.DataType.timestamp,
                    nullable=False,
                ),
                schema_model.Column(
                    id="#unique_id",
                    name="unique_id",
                    datatype=schema_model.ExtraDataTypes.UUID,
                    nullable=False,
                ),
                schema_model.Column(
                    id="#has_subchunks",
                    name="has_subchunks",
                    datatype=felis.datamodel.DataType.boolean,
                    nullable=True,
                ),
            ],
            primary_key=[replica_chunk_column],
            indexes=[],
            constraints=[],
            annotations={"cassandra:partitioning_columns": ["partition"]},
        )

        replica_chunk_tables = ExtraTables.replica_chunk_tables(self._has_chunk_sub_partitions)
        for apdb_table_enum, chunk_table_enum in replica_chunk_tables.items():
            apdb_table_def = self._table_schemas[apdb_table_enum]

            extra_tables[chunk_table_enum] = schema_model.Table(
                id="#" + chunk_table_enum.value,
                name=chunk_table_enum.table_name(self._prefix),
                columns=replica_chunk_columns + apdb_table_def.columns,
                primary_key=apdb_table_def.primary_key[:],
                indexes=[],
                constraints=[],
                annotations={
                    "cassandra:partitioning_columns": [column.name for column in replica_chunk_columns],
                    "cassandra:apdb_column_names": [column.name for column in apdb_table_def.columns],
                },
            )

        return extra_tables

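    # Illustrative layout: with sub-chunking enabled, DiaSourceChunks2 holds
    # (apdb_replica_chunk, apdb_replica_subchunk) as its partition key
    # followed by all regular DiaSource columns, so a single replica chunk is
    # spread over one Cassandra partition per sub-chunk.
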
    @property
    def replication_enabled(self) -> bool:
        """True when replication is enabled (`bool`)."""
        return self._enable_replica

    @property
    def has_chunk_sub_partitions(self) -> bool:
        """True when chunk tables have sub-partitions (`bool`)."""
        return self._has_chunk_sub_partitions

    def empty(self) -> bool:
        """Return `True` if the database schema is empty.

        Returns
        -------
        empty : `bool`
            `True` if none of the required APDB tables exist in the database,
            `False` if all required tables exist.

        Raises
        ------
        InconsistentSchemaError
            Raised when only some of the required tables exist.
        """
        query = "SELECT table_name FROM system_schema.tables WHERE keyspace_name = %s"
        result = self._session.execute(query, (self._keyspace,))
        table_names = {row[0] for row in result.all()}

        existing_tables = []
        missing_tables = []
        for table_enum in self._apdb_tables:
            table_name = table_enum.table_name(self._prefix)
            if self._time_partition_tables and table_enum in self._time_partitioned_tables:
                # Check prefix for time-partitioned tables.
                exists = any(table.startswith(f"{table_name}_") for table in table_names)
            else:
                exists = table_name in table_names
            if exists:
                existing_tables.append(table_name)
            else:
                missing_tables.append(table_name)

        if not missing_tables:
            return False
        elif not existing_tables:
            return True
        else:
            raise InconsistentSchemaError(
                f"Only some required APDB tables exist: {existing_tables}, missing tables: {missing_tables}"
            )

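    # Sketch of intended use (assumes a configured ApdbCassandraSchema
    # instance named `schema`; illustrative only):
    #
    #     if schema.empty():
    #         schema.makeSchema()
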
    def existing_tables(self, *args: ApdbTables) -> dict[ApdbTables, list[str]]:
        """Return the list of existing table names for the given tables.

        Parameters
        ----------
        *args : `ApdbTables`
            Tables for which to return their existing table names.

        Returns
        -------
        tables : `dict` [`ApdbTables`, `list` [`str`]]
            Mapping of the APDB table to the list of the existing table names.
            More than one name can be present in the list if configuration
            specifies per-partition tables.
        """
        if self._time_partition_tables and not set(args).isdisjoint(self._time_partitioned_tables):
            # Some of the tables should have per-partition tables.
            query = "SELECT table_name FROM system_schema.tables WHERE keyspace_name = %s"
            result = self._session.execute(query, (self._keyspace,))
            table_names = {row[0] for row in result.all()}

            tables = {}
            for table_enum in args:
                base_name = table_enum.table_name(self._prefix)
                if table_enum in self._time_partitioned_tables:
                    tables[table_enum] = [table for table in table_names if table.startswith(f"{base_name}_")]
                else:
                    tables[table_enum] = [base_name]
            return tables
        else:
            # Do not check that they exist, we know that they should.
            return {table_enum: [table_enum.table_name(self._prefix)] for table_enum in args}

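    # Illustrative return value with per-partition tables enabled (the
    # partition numbers here are made up):
    #
    #     schema.existing_tables(ApdbTables.DiaSource)
    #     # -> {ApdbTables.DiaSource: ["DiaSource_630", "DiaSource_631"]}
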
    def tableName(self, table_name: ApdbTables | ExtraTables, time_partition: int | None = None) -> str:
        """Return Cassandra table name for APDB table.

        Parameters
        ----------
        table_name : `ApdbTables` or `ExtraTables`
            Table enum for which to generate table name.
        time_partition : `int`, optional
            Optional time partition, only used for tables that support time
            partitioning.
        """
        return table_name.table_name(self._prefix, time_partition)

    def keyspace(self) -> str:
        """Return Cassandra keyspace for APDB tables."""
        return self._keyspace

    def getColumnMap(self, table_name: ApdbTables | ExtraTables) -> Mapping[str, schema_model.Column]:
        """Return mapping of column names to Column definitions.

        Parameters
        ----------
        table_name : `ApdbTables` or `ExtraTables`
            One of the known APDB table names.

        Returns
        -------
        column_map : `dict`
            Mapping of column names to `Column` instances.
        """
        table_schema = self._table_schema(table_name)
        cmap = {column.name: column for column in table_schema.columns}
        return cmap

    def apdbColumnNames(self, table_name: ApdbTables | ExtraTables) -> list[str]:
        """Return a list of column names for a table as defined in APDB
        schema.

        Parameters
        ----------
        table_name : `ApdbTables` or `ExtraTables`
            Enum for a table in APDB schema.

        Returns
        -------
        columns : `list` of `str`
            Names of regular columns in the table.
        """
        table_schema = self._table_schema(table_name)
        return table_schema.annotations["cassandra:apdb_column_names"]

    def partitionColumns(self, table_name: ApdbTables | ExtraTables) -> list[str]:
        """Return a list of columns used for table partitioning.

        Parameters
        ----------
        table_name : `ApdbTables` or `ExtraTables`
            Table name in APDB schema.

        Returns
        -------
        columns : `list` of `str`
            Names of columns used for partitioning.
        """
        table_schema = self._table_schema(table_name)
        return table_schema.annotations.get("cassandra:partitioning_columns", [])

    def clusteringColumns(self, table_name: ApdbTables | ExtraTables) -> list[str]:
        """Return a list of columns used for clustering.

        Parameters
        ----------
        table_name : `ApdbTables` or `ExtraTables`
            Table name in APDB schema.

        Returns
        -------
        columns : `list` of `str`
            Names of columns used for clustering.
        """
        table_schema = self._table_schema(table_name)
        return [column.name for column in table_schema.primary_key]

    def makeSchema(
        self,
        *,
        drop: bool = False,
        part_range: ApdbCassandraTimePartitionRange | None = None,
        replication_factor: int | None = None,
        table_options: CreateTableOptions | None = None,
    ) -> None:
        """Create or re-create all tables.

        Parameters
        ----------
        drop : `bool`
            If `True` then drop tables before creating new ones. Note that
            only tables are dropped, not the whole keyspace.
        part_range : `ApdbCassandraTimePartitionRange` or `None`
            Start and end partition number for time partitions. Used to create
            per-partition DiaObject, DiaSource, and DiaForcedSource tables. If
            `None` then per-partition tables are not created.
        replication_factor : `int`, optional
            Replication factor used when creating a new keyspace; if the
            keyspace already exists its replication factor is not changed.
        table_options : `CreateTableOptions`, optional
            Options used when creating Cassandra tables.
        """
        # Try to create keyspace if it does not exist
        if replication_factor is None:
            replication_factor = 1

        # If keyspace exists check its replication factor.
        query = "SELECT replication FROM system_schema.keyspaces WHERE keyspace_name = %s"
        result = self._session.execute(query, (self._keyspace,))
        if row := result.one():
            # Check replication factor, ignore strategy class.
            repl_config = cast(Mapping[str, str], row[0])
            current_repl = int(repl_config["replication_factor"])
            if replication_factor != current_repl:
                raise ValueError(
                    f"New replication factor {replication_factor} differs from the replication factor "
                    f"for already existing keyspace: {current_repl}"
                )
        else:
            # Need a new keyspace.
            query = (
                f'CREATE KEYSPACE "{self._keyspace}"'
                " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': "
                f"{replication_factor}"
                "}"
            )
            self._session.execute(query)

        table_options = self._update_table_options(table_options)
        for table in self._apdb_tables:
            if table is ApdbTables.DiaObject and self._enable_replica and self._replica_skips_diaobjects:
                continue
            if table is ApdbTables.SSSource:
                # We do not support SSSource table yet.
                continue
            self._makeTableSchema(table, drop, part_range, table_options)
        for extra_table in self._extra_tables:
            self._makeTableSchema(extra_table, drop, part_range, table_options)

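    # Illustrative CQL emitted for a missing keyspace named "apdb" with
    # replication_factor=1 (the keyspace name is an example, not a default):
    #
    #     CREATE KEYSPACE "apdb" WITH replication =
    #         {'class': 'SimpleStrategy', 'replication_factor': 1}
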
    def _update_table_options(self, options: CreateTableOptions | None) -> CreateTableOptions | None:
        """Extend table options with options for internal tables."""
        # We want to add a TTL option to the ApdbVisitDetector table.
        if not self._has_visit_detector_table:
            return options

        if not options:
            options = CreateTableOptions()

        # Set both TTL and gc_grace_seconds to 24h.
        options.table_options.append(
            TableOptions(
                tables=[ExtraTables.ApdbVisitDetector.table_name(self._prefix)],
                options="default_time_to_live=86400 AND gc_grace_seconds=86400",
            )
        )

        return options

    def _makeTableSchema(
        self,
        table: ApdbTables | ExtraTables,
        drop: bool = False,
        part_range: ApdbCassandraTimePartitionRange | None = None,
        table_options: CreateTableOptions | None = None,
    ) -> None:
        _LOG.debug("Making table %s", table)

        fullTable = table.table_name(self._prefix)

        table_list = [fullTable]
        if part_range is not None:
            if table in self._time_partitioned_tables:
                table_list = [table.table_name(self._prefix, part) for part in part_range.range()]

        if drop:
            queries = [f'DROP TABLE IF EXISTS "{self._keyspace}"."{table_name}"' for table_name in table_list]
            futures = [self._session.execute_async(query, timeout=None) for query in queries]
            for future in futures:
                _LOG.debug("wait for query: %s", future.query)
                future.result()
                _LOG.debug("query finished: %s", future.query)

        queries = []
        options = table_options.get_options(fullTable).strip() if table_options else None
        for table_name in table_list:
            if_not_exists = "" if drop else "IF NOT EXISTS"
            columns = ", ".join(self._tableColumns(table))
            query = f'CREATE TABLE {if_not_exists} "{self._keyspace}"."{table_name}" ({columns})'
            if options:
                query = f"{query} WITH {options}"
            _LOG.debug("query: %s", query)
            queries.append(query)
        futures = [self._session.execute_async(query, timeout=None) for query in queries]
        for future in futures:
            _LOG.debug("wait for query: %s", future.query)
            future.result()
            _LOG.debug("query finished: %s", future.query)

    def _tableColumns(self, table_name: ApdbTables | ExtraTables) -> list[str]:
        """Return the list of columns in a table.

        Parameters
        ----------
        table_name : `ApdbTables` or `ExtraTables`
            Name of the table.

        Returns
        -------
        column_defs : `list`
            List of strings in the format "column_name type".
        """
        table_schema = self._table_schema(table_name)

        # Must have partition columns and clustering columns.
        part_columns = table_schema.annotations.get("cassandra:partitioning_columns", [])
        clust_columns = [column.name for column in table_schema.primary_key]
        _LOG.debug("part_columns: %s", part_columns)
        _LOG.debug("clust_columns: %s", clust_columns)
        if not part_columns:
            raise ValueError(f"Table {table_name} configuration is missing partition index")

        # All columns.
        column_defs = []
        for column in table_schema.columns:
            ctype = self._type_map[column.datatype]
            column_defs.append(f'"{column.name}" {ctype}')

        # Primary key definition.
        part_columns = [f'"{col}"' for col in part_columns]
        clust_columns = [f'"{col}"' for col in clust_columns]
        if len(part_columns) > 1:
            columns = ", ".join(part_columns)
            part_columns = [f"({columns})"]
        pkey = ", ".join(part_columns + clust_columns)
        _LOG.debug("pkey: %s", pkey)
        column_defs.append(f"PRIMARY KEY ({pkey})")

        return column_defs

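    # Illustrative output for a table partitioned on ["apdb_part",
    # "apdb_time_part"] and clustered on "diaSourceId" (the column list is a
    # truncated example, not the full schema):
    #
    #     ['"apdb_part" BIGINT', '"apdb_time_part" BIGINT',
    #      '"diaSourceId" BIGINT',
    #      'PRIMARY KEY (("apdb_part", "apdb_time_part"), "diaSourceId")']
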
    def _table_schema(self, table: ApdbTables | ExtraTables) -> schema_model.Table:
        """Return schema definition for a table."""
        if isinstance(table, ApdbTables):
            table_schema = self._apdb_tables[table]
        else:
            table_schema = self._extra_tables[table]
        return table_schema

    def table_row_size(self, table: ApdbTables | ExtraTables) -> int:
        """Return an estimate of the row size of a given table.

        Parameters
        ----------
        table : `ApdbTables` or `ExtraTables`
            Table enum for which to estimate the row size.

        Returns
        -------
        size : `int`
            An estimate of a table row size.

        Notes
        -----
        The returned size is not exact; it may be incorrect when a table has
        variable-size columns (e.g. strings). Stored data size or wire-level
        protocol size can be smaller if some columns are not set or are set
        to NULL.
        """
        table_schema = self._table_schema(table)
        size = sum(column.size() for column in table_schema.columns)
        return size

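    # Illustrative arithmetic (assuming column.size() reports 8 bytes for
    # BIGINT and 4 for INT): a table with two BIGINT columns and one INT
    # column yields table_row_size(...) == 20.
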
    def time_partitioned_tables(self) -> list[ApdbTables]:
        """Make the list of time-partitioned tables.

        Returns
        -------
        tables : `list` [`ApdbTables`]
            Tables that are time-partitioned.
        """
        if not self._time_partition_tables:
            return []
        has_dia_object_table = not (self._enable_replica and self._replica_skips_diaobjects)
        tables = [ApdbTables.DiaSource, ApdbTables.DiaForcedSource]
        if has_dia_object_table:
            tables.append(ApdbTables.DiaObject)
        return tables