LSST Applications g013ef56533+63812263fb,g083dd6704c+a047e97985,g199a45376c+0ba108daf9,g1fd858c14a+fde7a7a78c,g210f2d0738+db0c280453,g262e1987ae+abed931625,g29ae962dfc+058d1915d8,g2cef7863aa+aef1011c0b,g35bb328faa+8c5ae1fdc5,g3fd5ace14f+64337f1634,g47891489e3+f459a6810c,g53246c7159+8c5ae1fdc5,g54cd7ddccb+890c8e1e5d,g5a60e81ecd+d9e514a434,g64539dfbff+db0c280453,g67b6fd64d1+f459a6810c,g6ebf1fc0d4+8c5ae1fdc5,g7382096ae9+36d16ea71a,g74acd417e5+c70e70fbf6,g786e29fd12+668abc6043,g87389fa792+8856018cbb,g89139ef638+f459a6810c,g8d7436a09f+1b779678e3,g8ea07a8fe4+81eaaadc04,g90f42f885a+34c0557caf,g97be763408+9583a964dd,g98a1a72a9c+028271c396,g98df359435+530b675b85,gb8cb2b794d+4e54f68785,gbf99507273+8c5ae1fdc5,gc2a301910b+db0c280453,gca7fc764a6+f459a6810c,gd7ef33dd92+f459a6810c,gdab6d2f7ff+c70e70fbf6,ge410e46f29+f459a6810c,ge41e95a9f2+db0c280453,geaed405ab2+e3b4b2a692,gf9a733ac38+8c5ae1fdc5,w.2025.43
LSST Data Management Base Package
Loading...
Searching...
No Matches
apdbCassandraSchema.py
Go to the documentation of this file.
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22from __future__ import annotations
23
24__all__ = ["ApdbCassandraSchema", "CreateTableOptions", "TableOptions"]
25
26import enum
27import logging
28from collections.abc import Mapping
29from typing import TYPE_CHECKING, cast
30
31import felis.datamodel
32import pydantic
33
34from .. import schema_model
35from ..apdbSchema import ApdbTables
36
37if TYPE_CHECKING:
38 import cassandra.cluster
39
40 from ..schema_model import Table
41 from .config import ApdbCassandraTimePartitionRange
42
43
44_LOG = logging.getLogger(__name__)
45
46
47class InconsistentSchemaError(RuntimeError):
48 """Exception raised when schema state is inconsistent."""
49
50
51class TableOptions(pydantic.BaseModel):
52 """Set of per-table options for creating Cassandra tables."""
53
54 model_config = pydantic.ConfigDict(extra="forbid")
55
56 tables: list[str]
57 """List of table names for which the options should be applied."""
58
59 options: str
60
61
62class CreateTableOptions(pydantic.BaseModel):
63 """Set of options for creating Cassandra tables."""
64
65 model_config = pydantic.ConfigDict(extra="forbid")
66
67 table_options: list[TableOptions] = pydantic.Field(default_factory=list)
68 """Collection of per-table options."""
69
70 default_table_options: str = ""
71 """Default options used for tables that are not in the above list."""
72
73 def get_options(self, table_name: str) -> str:
74 """Find table options for a given table name."""
75 for table_options in self.table_options:
76 if table_name in table_options.tables:
77 return table_options.options
78 return self.default_table_options
79
80
81@enum.unique
82class ExtraTables(enum.Enum):
83 """Names of the extra tables used by Cassandra implementation.
84
85 Chunk tables exist in two versions now to support both old and new schema.
86 Eventually we will drop support for old tables.
87 """
88
89 ApdbReplicaChunks = "ApdbReplicaChunks"
90 """Name of the table for replica chunk records."""
91
92 DiaObjectChunks = "DiaObjectChunks"
93 """Name of the table for DIAObject chunk data."""
94
95 DiaSourceChunks = "DiaSourceChunks"
96 """Name of the table for DIASource chunk data."""
97
98 DiaForcedSourceChunks = "DiaForcedSourceChunks"
99 """Name of the table for DIAForcedSource chunk data."""
100
101 DiaObjectChunks2 = "DiaObjectChunks2"
102 """Name of the table for DIAObject chunk data."""
103
104 DiaSourceChunks2 = "DiaSourceChunks2"
105 """Name of the table for DIASource chunk data."""
106
107 DiaForcedSourceChunks2 = "DiaForcedSourceChunks2"
108 """Name of the table for DIAForcedSource chunk data."""
109
110 ApdbUpdateRecordChunks = "ApdbUpdateRecordChunks"
111 """Name of the table for ApdbUpdateRecord chunk data."""
112
113 DiaSourceToPartition = "DiaSourceToPartition"
114 """Maps diaSourceId to its partition values (pixel and time)."""
115
116 DiaObjectLastToPartition = "DiaObjectLastToPartition"
117 """Maps last diaObjectId version to its partition (pixel)."""
118
119 ApdbVisitDetector = "ApdbVisitDetector"
120 """Records attempted processing of visit/detector."""
121
122 def table_name(self, prefix: str = "", time_partition: int | None = None) -> str:
123 """Return full table name.
124
125 Parameters
126 ----------
127 prefix : `str`, optional
128 Optional prefix for table name.
129 time_partition : `int`, optional
130 Optional time partition, only used for tables that support time
131 patitioning.
132 """
133 return f"{prefix}{self.value}"
134
135 @classmethod
136 def replica_chunk_tables(cls, has_subchunks: bool) -> Mapping[ApdbTables, ExtraTables]:
137 """Return mapping of APDB tables to corresponding replica chunks
138 tables.
139 """
140 if has_subchunks:
141 return {
142 ApdbTables.DiaObject: cls.DiaObjectChunks2,
143 ApdbTables.DiaSource: cls.DiaSourceChunks2,
144 ApdbTables.DiaForcedSource: cls.DiaForcedSourceChunks2,
145 }
146 else:
147 return {
148 ApdbTables.DiaObject: cls.DiaObjectChunks,
149 ApdbTables.DiaSource: cls.DiaSourceChunks,
150 ApdbTables.DiaForcedSource: cls.DiaForcedSourceChunks,
151 }
152
153
155 """Class for management of APDB schema.
156
157 Parameters
158 ----------
159 session : `cassandra.cluster.Session`
160 Cassandra session object
161 keyspace : `str`
162 Keyspace name for all tables.
163 schema_file : `str`
164 Name of the YAML schema file.
165 schema_name : `str`, optional
166 Name of the schema in YAML files.
167 prefix : `str`, optional
168 Prefix to add to all schema elements.
169 time_partition_tables : `bool`
170 If `True` then schema will have a separate table for each time
171 partition.
172 enable_replica : `bool`, optional
173 If `True` then use additional tables for replica chunks.
174 has_chunk_sub_partitions : `bool`, optional
175 If `True` then replica chunk tables have sub-partition columns. Only
176 used if ``enable_replica`` is `True`.
177 """
178
179 _type_map = {
180 felis.datamodel.DataType.double: "DOUBLE",
181 felis.datamodel.DataType.float: "FLOAT",
182 felis.datamodel.DataType.timestamp: "TIMESTAMP",
183 felis.datamodel.DataType.long: "BIGINT",
184 felis.datamodel.DataType.int: "INT",
185 felis.datamodel.DataType.short: "SMALLINT",
186 felis.datamodel.DataType.byte: "TINYINT",
187 felis.datamodel.DataType.binary: "BLOB",
188 felis.datamodel.DataType.char: "TEXT",
189 felis.datamodel.DataType.string: "TEXT",
190 felis.datamodel.DataType.unicode: "TEXT",
191 felis.datamodel.DataType.text: "TEXT",
192 felis.datamodel.DataType.boolean: "BOOLEAN",
193 schema_model.ExtraDataTypes.UUID: "UUID",
194 }
195 """Map YAML column types to Cassandra"""
196
197 _time_partitioned_tables = [
198 ApdbTables.DiaObject,
199 ApdbTables.DiaSource,
200 ApdbTables.DiaForcedSource,
201 ]
202 _spatially_partitioned_tables = [ApdbTables.DiaObjectLast]
203
205 self,
206 session: cassandra.cluster.Session,
207 keyspace: str,
208 table_schemas: Mapping[ApdbTables, Table],
209 prefix: str = "",
210 time_partition_tables: bool = False,
211 enable_replica: bool = False,
212 replica_skips_diaobjects: bool = False,
213 has_chunk_sub_partitions: bool = True,
214 has_visit_detector_table: bool = True,
215 ):
216 self._session = session
217 self._keyspace = keyspace
218 self._table_schemas = table_schemas
219 self._prefix = prefix
220 self._time_partition_tables = time_partition_tables
221 self._enable_replica = enable_replica
222 self._replica_skips_diaobjects = replica_skips_diaobjects
223 self._has_chunk_sub_partitions = has_chunk_sub_partitions
224 self._has_visit_detector_table = has_visit_detector_table
225
226 self._apdb_tables = self._apdb_tables_schema(time_partition_tables)
228
229 def _apdb_tables_schema(self, time_partition_tables: bool) -> Mapping[ApdbTables, schema_model.Table]:
230 """Generate schema for regular APDB tables."""
231 apdb_tables: dict[ApdbTables, schema_model.Table] = {}
232
233 # add columns and index for partitioning.
234 for table, apdb_table_def in self._table_schemas.items():
235 part_columns = []
236 add_columns = []
237 primary_key = apdb_table_def.primary_key[:]
238 if table in self._spatially_partitioned_tables:
239 # DiaObjectLast does not need temporal partitioning
240 part_columns = ["apdb_part"]
241 add_columns = part_columns
242 elif table in self._time_partitioned_tables:
243 if time_partition_tables:
244 part_columns = ["apdb_part"]
245 else:
246 part_columns = ["apdb_part", "apdb_time_part"]
247 add_columns = part_columns
248 elif table is ApdbTables.SSObject:
249 # For SSObject there is no natural partition key but we have
250 # to partition it because there are too many of them. I'm
251 # going to partition on its primary key (and drop separate
252 # primary key index).
253 part_columns = ["ssObjectId"]
254 primary_key = []
255 elif table is ApdbTables.metadata:
256 # Metadata is in one partition because we want to read all of
257 # it in one query, add an extra column for partition.
258 part_columns = ["meta_part"]
259 add_columns = part_columns
260 else:
261 # TODO: Do not know what to do with the other tables
262 continue
263
264 column_defs = []
265 if add_columns:
266 column_defs = [
268 id=f"#{name}", name=name, datatype=felis.datamodel.DataType.long, nullable=False
269 )
270 for name in add_columns
271 ]
272
273 annotations = dict(apdb_table_def.annotations)
274 annotations["cassandra:apdb_column_names"] = [column.name for column in apdb_table_def.columns]
275 if part_columns:
276 annotations["cassandra:partitioning_columns"] = part_columns
277
278 apdb_tables[table] = schema_model.Table(
279 id=apdb_table_def.id,
280 name=apdb_table_def.name,
281 columns=column_defs + apdb_table_def.columns,
282 primary_key=primary_key,
283 indexes=[],
284 constraints=[],
285 annotations=annotations,
286 )
287
288 return apdb_tables
289
290 def _extra_tables_schema(self) -> Mapping[ExtraTables, schema_model.Table]:
291 """Generate schema for extra tables."""
292 extra_tables: dict[ExtraTables, schema_model.Table] = {}
293
295 columns = [
297 id="#visit",
298 name="visit",
299 datatype=felis.datamodel.DataType.long,
300 nullable=False,
301 ),
303 id="#detector",
304 name="detector",
305 datatype=felis.datamodel.DataType.short,
306 nullable=False,
307 ),
308 ]
309 extra_tables[ExtraTables.ApdbVisitDetector] = schema_model.Table(
310 id="#" + ExtraTables.ApdbVisitDetector.value,
311 name=ExtraTables.ApdbVisitDetector.table_name(self._prefix),
312 columns=columns,
313 primary_key=[],
314 indexes=[],
315 constraints=[],
316 annotations={"cassandra:partitioning_columns": ["visit", "detector"]},
317 )
318
319 # This table maps DiaSource ID to its partitions in DiaSource table and
320 # DiaSourceChunks tables.
321 extra_tables[ExtraTables.DiaSourceToPartition] = schema_model.Table(
322 id="#" + ExtraTables.DiaSourceToPartition.value,
323 name=ExtraTables.DiaSourceToPartition.table_name(self._prefix),
324 columns=[
326 id="#diaSourceId",
327 name="diaSourceId",
328 datatype=felis.datamodel.DataType.long,
329 nullable=False,
330 ),
332 id="#apdb_part", name="apdb_part", datatype=felis.datamodel.DataType.long, nullable=False
333 ),
335 id="#apdb_time_part",
336 name="apdb_time_part",
337 datatype=felis.datamodel.DataType.int,
338 nullable=False,
339 ),
341 id="#apdb_replica_chunk",
342 name="apdb_replica_chunk",
343 datatype=felis.datamodel.DataType.long,
344 nullable=True,
345 ),
347 id="#apdb_replica_subchunk",
348 name="apdb_replica_subchunk",
349 datatype=felis.datamodel.DataType.int,
350 nullable=True,
351 ),
352 ],
353 primary_key=[],
354 indexes=[],
355 constraints=[],
356 annotations={"cassandra:partitioning_columns": ["diaSourceId"]},
357 )
358
359 # This table maps diaObjectId to its partition in DiaObjectLast table.
360 extra_tables[ExtraTables.DiaObjectLastToPartition] = schema_model.Table(
361 id="#" + ExtraTables.DiaObjectLastToPartition.value,
362 name=ExtraTables.DiaObjectLastToPartition.table_name(self._prefix),
363 columns=[
365 id="#diaObjectId",
366 name="diaObjectId",
367 datatype=felis.datamodel.DataType.long,
368 nullable=False,
369 ),
371 id="#apdb_part", name="apdb_part", datatype=felis.datamodel.DataType.long, nullable=False
372 ),
373 ],
374 primary_key=[],
375 indexes=[],
376 constraints=[],
377 annotations={"cassandra:partitioning_columns": ["diaObjectId"]},
378 )
379
380 if not self._enable_replica:
381 return extra_tables
382
383 replica_chunk_column = schema_model.Column(
384 id="#apdb_replica_chunk",
385 name="apdb_replica_chunk",
386 datatype=felis.datamodel.DataType.long,
387 nullable=False,
388 )
389
390 replica_chunk_columns = [replica_chunk_column]
392 replica_chunk_columns.append(
394 id="#apdb_replica_subchunk",
395 name="apdb_replica_subchunk",
396 datatype=felis.datamodel.DataType.int,
397 nullable=False,
398 )
399 )
400
401 # Table containing replica chunks, this one is not partitioned, but
402 # partition key must be defined.
403 extra_tables[ExtraTables.ApdbReplicaChunks] = schema_model.Table(
404 id="#" + ExtraTables.ApdbReplicaChunks.value,
405 name=ExtraTables.ApdbReplicaChunks.table_name(self._prefix),
406 columns=[
408 id="#partition", name="partition", datatype=felis.datamodel.DataType.int, nullable=False
409 ),
410 replica_chunk_column,
412 id="#last_update_time",
413 name="last_update_time",
414 datatype=felis.datamodel.DataType.timestamp,
415 nullable=False,
416 ),
418 id="#unique_id",
419 name="unique_id",
420 datatype=schema_model.ExtraDataTypes.UUID,
421 nullable=False,
422 ),
424 id="#has_subchunks",
425 name="has_subchunks",
426 datatype=felis.datamodel.DataType.boolean,
427 nullable=True,
428 ),
429 ],
430 primary_key=[replica_chunk_column],
431 indexes=[],
432 constraints=[],
433 annotations={"cassandra:partitioning_columns": ["partition"]},
434 )
435
436 replica_chunk_tables = ExtraTables.replica_chunk_tables(self._has_chunk_sub_partitions)
437 for apdb_table_enum, chunk_table_enum in replica_chunk_tables.items():
438 apdb_table_def = self._table_schemas[apdb_table_enum]
439
440 extra_tables[chunk_table_enum] = schema_model.Table(
441 id="#" + chunk_table_enum.value,
442 name=chunk_table_enum.table_name(self._prefix),
443 columns=replica_chunk_columns + apdb_table_def.columns,
444 primary_key=apdb_table_def.primary_key[:],
445 indexes=[],
446 constraints=[],
447 annotations={
448 "cassandra:partitioning_columns": [column.name for column in replica_chunk_columns],
449 "cassandra:apdb_column_names": [column.name for column in apdb_table_def.columns],
450 },
451 )
452
453 # Table with replica chunk data for ApdbUpdateRecord.
454 columns = [
456 id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}.update_time_ns",
457 name="update_time_ns",
458 datatype=felis.datamodel.DataType.long,
459 nullable=False,
460 ),
462 id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}.update_order",
463 name="update_order",
464 datatype=felis.datamodel.DataType.int,
465 nullable=False,
466 ),
468 id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}.update_unique_id",
469 name="update_unique_id",
470 datatype=schema_model.ExtraDataTypes.UUID,
471 nullable=False,
472 ),
474 id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}.update_payload",
475 name="update_payload",
476 datatype=felis.datamodel.DataType.string,
477 nullable=False,
478 ),
479 ]
480 extra_tables[ExtraTables.ApdbUpdateRecordChunks] = schema_model.Table(
481 id=f"#{ExtraTables.ApdbUpdateRecordChunks.value}",
482 name=ExtraTables.ApdbUpdateRecordChunks.table_name(self._prefix),
483 columns=replica_chunk_columns + columns,
484 primary_key=columns[:3],
485 indexes=[],
486 constraints=[],
487 annotations={
488 "cassandra:partitioning_columns": [column.name for column in replica_chunk_columns],
489 },
490 )
491
492 return extra_tables
493
494 @property
495 def replication_enabled(self) -> bool:
496 """True when replication is enabled (`bool`)."""
497 return self._enable_replica
498
499 def empty(self) -> bool:
500 """Return True if database schema is empty.
501
502 Returns
503 -------
504 empty : `bool`
505 `True` if none of the required APDB tables exist in the database,
506 `False` if all required tables exist.
507
508 Raises
509 ------
510 InconsistentSchemaError
511 Raised when some of the required tables exist but not all.
512 """
513 query = "SELECT table_name FROM system_schema.tables WHERE keyspace_name = %s"
514 result = self._session.execute(query, (self._keyspace,))
515 table_names = {row[0] for row in result.all()}
516
517 existing_tables = []
518 missing_tables = []
519 for table_enum in self._apdb_tables:
520 table_name = table_enum.table_name(self._prefix)
521 if self._time_partition_tables and table_enum in self._time_partitioned_tables:
522 # Check prefix for time-partitioned tables.
523 exists = any(table.startswith(f"{table_name}_") for table in table_names)
524 else:
525 exists = table_name in table_names
526 if exists:
527 existing_tables.append(table_name)
528 else:
529 missing_tables.append(table_name)
530
531 if not missing_tables:
532 return False
533 elif not existing_tables:
534 return True
535 else:
537 f"Only some required APDB tables exist: {existing_tables}, missing tables: {missing_tables}"
538 )
539
540 def existing_tables(self, *args: ApdbTables) -> dict[ApdbTables, list[str]]:
541 """Return the list of existing table names for given table.
542
543 Parameters
544 ----------
545 *args : `ApdbTables`
546 Tables for which to return their existing table names.
547
548 Returns
549 -------
550 tables : `dict` [`ApdbTables`, `list`[`str`]]
551 Mapping of the APDB table to the list of the existing table names.
552 More than one name can be present in the list if configuration
553 specifies per-partition tables.
554 """
555 if self._time_partition_tables and not set(args).isdisjoint(self._time_partitioned_tables):
556 # Some of the tables should have per-partition tables.
557 query = "SELECT table_name FROM system_schema.tables WHERE keyspace_name = %s"
558 result = self._session.execute(query, (self._keyspace,))
559 table_names = {row[0] for row in result.all()}
560
561 tables = {}
562 for table_enum in args:
563 base_name = table_enum.table_name(self._prefix)
564 if table_enum in self._time_partitioned_tables:
565 tables[table_enum] = [table for table in table_names if table.startswith(f"{base_name}_")]
566 else:
567 tables[table_enum] = [base_name]
568 return tables
569 else:
570 # Do not check that they exist, we know that they should.
571 return {table_enum: [table_enum.table_name(self._prefix)] for table_enum in args}
572
573 def check_column(self, table_enum: ApdbTables | ExtraTables, column: str) -> bool:
574 """Check for the existence of the column in a given table.
575
576 Parameters
577 ----------
578 table_enum : `ApdbTables` or `ExtraTables`
579 Table to check for a column.
580 column : `str`
581 Name of the column to check.
582
583 Returns
584 -------
585 exists : `bool`
586 True if column exists, False otherwise.
587 """
588 if self._time_partition_tables and table_enum in self._time_partitioned_tables:
589 query = (
590 "SELECT table_name FROM system_schema.columns WHERE keyspace_name = %s AND column_name = %s "
591 "ALLOW FILTERING"
592 )
593 result = self._session.execute(query, (self._keyspace, column))
594 base_name = table_enum.table_name(self._prefix)
595 for row in result.all():
596 table_name = row[0]
597 if table_name.startswith(f"{base_name}_"):
598 return True
599 return False
600 else:
601 table_name = table_enum.table_name(self._prefix)
602 query = (
603 "SELECT column_name FROM system_schema.columns "
604 "WHERE keyspace_name = %s AND table_name = %s AND column_name = %s"
605 )
606 result = self._session.execute(query, (self._keyspace, table_name, column))
607 row = result.one()
608 return row is not None
609
610 def tableName(self, table_name: ApdbTables | ExtraTables, time_partition: int | None = None) -> str:
611 """Return Cassandra table name for APDB table.
612
613 Parameters
614 ----------
615 table_name : `ApdbTables` or `ExtraTables`
616 Table enum for which to generate table name.
617 time_partition : `int`, optional
618 Optional time partition, only used for tables that support time
619 patitioning.
620 """
621 return table_name.table_name(self._prefix, time_partition)
622
623 def keyspace(self) -> str:
624 """Return Cassandra keyspace for APDB tables."""
625 return self._keyspace
626
627 def getColumnMap(self, table_name: ApdbTables | ExtraTables) -> Mapping[str, schema_model.Column]:
628 """Return mapping of column names to Column definitions.
629
630 Parameters
631 ----------
632 table_name : `ApdbTables`
633 One of known APDB table names.
634
635 Returns
636 -------
637 column_map : `dict`
638 Mapping of column names to `ColumnDef` instances.
639 """
640 table_schema = self._table_schema(table_name)
641 cmap = {column.name: column for column in table_schema.columns}
642 return cmap
643
644 def apdbColumnNames(self, table_name: ApdbTables | ExtraTables) -> list[str]:
645 """Return a list of columns names for a table as defined in APDB
646 schema.
647
648 Parameters
649 ----------
650 table_name : `ApdbTables` or `ExtraTables`
651 Enum for a table in APDB schema.
652
653 Returns
654 -------
655 columns : `list` of `str`
656 Names of regular columns in the table.
657 """
658 table_schema = self._table_schema(table_name)
659 return table_schema.annotations["cassandra:apdb_column_names"]
660
661 def partitionColumns(self, table_name: ApdbTables | ExtraTables) -> list[str]:
662 """Return a list of columns used for table partitioning.
663
664 Parameters
665 ----------
666 table_name : `ApdbTables`
667 Table name in APDB schema
668
669 Returns
670 -------
671 columns : `list` of `str`
672 Names of columns used for partitioning.
673 """
674 table_schema = self._table_schema(table_name)
675 return table_schema.annotations.get("cassandra:partitioning_columns", [])
676
677 def clusteringColumns(self, table_name: ApdbTables | ExtraTables) -> list[str]:
678 """Return a list of columns used for clustering.
679
680 Parameters
681 ----------
682 table_name : `ApdbTables`
683 Table name in APDB schema
684
685 Returns
686 -------
687 columns : `list` of `str`
688 Names of columns for used for clustering.
689 """
690 table_schema = self._table_schema(table_name)
691 return [column.name for column in table_schema.primary_key]
692
694 self,
695 *,
696 drop: bool = False,
697 part_range: ApdbCassandraTimePartitionRange | None = None,
698 replication_factor: int | None = None,
699 table_options: CreateTableOptions | None = None,
700 ) -> None:
701 """Create or re-create all tables.
702
703 Parameters
704 ----------
705 drop : `bool`
706 If True then drop tables before creating new ones. Note that
707 only tables are dropped and not the whole keyspace.
708 part_range : `ApdbCassandraTimePartitionRange` or `None`
709 Start and end partition number for time partitions. Used to create
710 per-partition DiaObject, DiaSource, and DiaForcedSource tables. If
711 `None` then per-partition tables are not created.
712 replication_factor : `int`, optional
713 Replication factor used when creating new keyspace, if keyspace
714 already exists its replication factor is not changed.
715 """
716 # Try to create keyspace if it does not exist
717 if replication_factor is None:
718 replication_factor = 1
719
720 # If keyspace exists check its replication factor.
721 query = "SELECT replication FROM system_schema.keyspaces WHERE keyspace_name = %s"
722 result = self._session.execute(query, (self._keyspace,))
723 if row := result.one():
724 # Check replication factor, ignore strategy class.
725 repl_config = cast(Mapping[str, str], row[0])
726 current_repl = int(repl_config["replication_factor"])
727 if replication_factor != current_repl:
728 raise ValueError(
729 f"New replication factor {replication_factor} differs from the replication factor "
730 f"for already existing keyspace: {current_repl}"
731 )
732 else:
733 # Need a new keyspace.
734 query = (
735 f'CREATE KEYSPACE "{self._keyspace}"'
736 " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': "
737 f"{replication_factor}"
738 "}"
739 )
740 self._session.execute(query)
741
742 table_options = self._update_table_options(table_options)
743 for table in self._apdb_tables:
744 if table is ApdbTables.DiaObject and self._enable_replica and self._replica_skips_diaobjects:
745 continue
746 if table is ApdbTables.SSSource:
747 # We do not support SSSource table yet.
748 continue
749 self._makeTableSchema(table, drop, part_range, table_options)
750 for extra_table in self._extra_tables:
751 self._makeTableSchema(extra_table, drop, part_range, table_options)
752
753 def _update_table_options(self, options: CreateTableOptions | None) -> CreateTableOptions | None:
754 """Extend table options with options for internal tables."""
755 # We want to add TTL option to ApdbVisitDetector table.
756 if not self._has_visit_detector_table:
757 return options
758
759 if not options:
760 options = CreateTableOptions()
761
762 # set both TTL and gc_grace_seconds to 24h.
763 options.table_options.append(
765 tables=[ExtraTables.ApdbVisitDetector.table_name(self._prefix)],
766 options="default_time_to_live=86400 AND gc_grace_seconds=86400",
767 )
768 )
769
770 return options
771
773 self,
774 table: ApdbTables | ExtraTables,
775 drop: bool = False,
776 part_range: ApdbCassandraTimePartitionRange | None = None,
777 table_options: CreateTableOptions | None = None,
778 ) -> None:
779 _LOG.debug("Making table %s", table)
780
781 fullTable = table.table_name(self._prefix)
782
783 table_list = [fullTable]
784 if part_range is not None:
785 if table in self._time_partitioned_tables:
786 table_list = [table.table_name(self._prefix, part) for part in part_range.range()]
787
788 if drop:
789 queries = [f'DROP TABLE IF EXISTS "{self._keyspace}"."{table_name}"' for table_name in table_list]
790 futures = [self._session.execute_async(query, timeout=None) for query in queries]
791 for future in futures:
792 _LOG.debug("wait for query: %s", future.query)
793 future.result()
794 _LOG.debug("query finished: %s", future.query)
795
796 queries = []
797 options = table_options.get_options(fullTable).strip() if table_options else None
798 for table_name in table_list:
799 if_not_exists = "" if drop else "IF NOT EXISTS"
800 columns = ", ".join(self._tableColumns(table))
801 query = f'CREATE TABLE {if_not_exists} "{self._keyspace}"."{table_name}" ({columns})'
802 if options:
803 query = f"{query} WITH {options}"
804 _LOG.debug("query: %s", query)
805 queries.append(query)
806 futures = [self._session.execute_async(query, timeout=None) for query in queries]
807 for future in futures:
808 _LOG.debug("wait for query: %s", future.query)
809 future.result()
810 _LOG.debug("query finished: %s", future.query)
811
812 def _tableColumns(self, table_name: ApdbTables | ExtraTables) -> list[str]:
813 """Return set of columns in a table
814
815 Parameters
816 ----------
817 table_name : `ApdbTables`
818 Name of the table.
819
820 Returns
821 -------
822 column_defs : `list`
823 List of strings in the format "column_name type".
824 """
825 table_schema = self._table_schema(table_name)
826
827 # must have partition columns and clustering columns
828 part_columns = table_schema.annotations.get("cassandra:partitioning_columns", [])
829 clust_columns = [column.name for column in table_schema.primary_key]
830 _LOG.debug("part_columns: %s", part_columns)
831 _LOG.debug("clust_columns: %s", clust_columns)
832 if not part_columns:
833 raise ValueError(f"Table {table_name} configuration is missing partition index")
834
835 # all columns
836 column_defs = []
837 for column in table_schema.columns:
838 ctype = self._type_map[column.datatype]
839 column_defs.append(f'"{column.name}" {ctype}')
840
841 # primary key definition
842 part_columns = [f'"{col}"' for col in part_columns]
843 clust_columns = [f'"{col}"' for col in clust_columns]
844 if len(part_columns) > 1:
845 columns = ", ".join(part_columns)
846 part_columns = [f"({columns})"]
847 pkey = ", ".join(part_columns + clust_columns)
848 _LOG.debug("pkey: %s", pkey)
849 column_defs.append(f"PRIMARY KEY ({pkey})")
850
851 return column_defs
852
853 def _table_schema(self, table: ApdbTables | ExtraTables) -> schema_model.Table:
854 """Return schema definition for a table."""
855 if isinstance(table, ApdbTables):
856 table_schema = self._apdb_tables[table]
857 else:
858 table_schema = self._extra_tables[table]
859 return table_schema
860
861 def table_row_size(self, table: ApdbTables | ExtraTables) -> int:
862 """Return an estimate of the row size of a given table.
863
864 Parameters
865 ----------
866 table : `ApdbTables` or `ExtraTables`
867
868 Returns
869 -------
870 size : `int`
871 An estimate of a table row size.
872
873 Notes
874 -----
875 Returned size is not exact. When table has variable-size columns (e.g.
876 strings) may be incorrect. Stored data size or wire-level protocol size
877 can be smaller if some columns are not set or set to NULL.
878 """
879 table_schema = self._table_schema(table)
880 size = sum(column.size() for column in table_schema.columns)
881 return size
882
883 def time_partitioned_tables(self) -> list[ApdbTables]:
884 """Make the list of time-partitioned tables.
885
886 Returns
887 -------
888 tables : `list` [`ApdbTables`]
889 Tables the are time-partitioned.
890 """
891 if not self._time_partition_tables:
892 return []
893 has_dia_object_table = not (self._enable_replica and self._replica_skips_diaobjects)
894 tables = [ApdbTables.DiaSource, ApdbTables.DiaForcedSource]
895 if has_dia_object_table:
896 tables.append(ApdbTables.DiaObject)
897 return tables
Mapping[ExtraTables, schema_model.Table] _extra_tables_schema(self)
schema_model.Table _table_schema(self, ApdbTables|ExtraTables table)
None _makeTableSchema(self, ApdbTables|ExtraTables table, bool drop=False, ApdbCassandraTimePartitionRange|None part_range=None, CreateTableOptions|None table_options=None)
None makeSchema(self, *, bool drop=False, ApdbCassandraTimePartitionRange|None part_range=None, int|None replication_factor=None, CreateTableOptions|None table_options=None)
__init__(self, cassandra.cluster.Session session, str keyspace, Mapping[ApdbTables, Table] table_schemas, str prefix="", bool time_partition_tables=False, bool enable_replica=False, bool replica_skips_diaobjects=False, bool has_chunk_sub_partitions=True, bool has_visit_detector_table=True)
str tableName(self, ApdbTables|ExtraTables table_name, int|None time_partition=None)
list[str] _tableColumns(self, ApdbTables|ExtraTables table_name)
list[str] clusteringColumns(self, ApdbTables|ExtraTables table_name)
CreateTableOptions|None _update_table_options(self, CreateTableOptions|None options)
Mapping[ApdbTables, schema_model.Table] _apdb_tables_schema(self, bool time_partition_tables)
Mapping[str, schema_model.Column] getColumnMap(self, ApdbTables|ExtraTables table_name)
list[str] apdbColumnNames(self, ApdbTables|ExtraTables table_name)
list[str] partitionColumns(self, ApdbTables|ExtraTables table_name)
dict[ApdbTables, list[str]] existing_tables(self, *ApdbTables args)
bool check_column(self, ApdbTables|ExtraTables table_enum, str column)
Mapping[ApdbTables, ExtraTables] replica_chunk_tables(cls, bool has_subchunks)
str table_name(self, str prefix="", int|None time_partition=None)