LSST Applications g04e9c324dd+8c5ae1fdc5,g134cb467dc+b203dec576,g18429d2f64+358861cd2c,g199a45376c+0ba108daf9,g1fd858c14a+dd066899e3,g262e1987ae+ebfced1d55,g29ae962dfc+72fd90588e,g2cef7863aa+aef1011c0b,g35bb328faa+8c5ae1fdc5,g3fd5ace14f+b668f15bc5,g4595892280+3897dae354,g47891489e3+abcf9c3559,g4d44eb3520+fb4ddce128,g53246c7159+8c5ae1fdc5,g67b6fd64d1+abcf9c3559,g67fd3c3899+1f72b5a9f7,g74acd417e5+cb6b47f07b,g786e29fd12+668abc6043,g87389fa792+8856018cbb,g89139ef638+abcf9c3559,g8d7436a09f+bcf525d20c,g8ea07a8fe4+9f5ccc88ac,g90f42f885a+6054cc57f1,g97be763408+06f794da49,g9dd6db0277+1f72b5a9f7,ga681d05dcb+7e36ad54cd,gabf8522325+735880ea63,gac2eed3f23+abcf9c3559,gb89ab40317+abcf9c3559,gbf99507273+8c5ae1fdc5,gd8ff7fe66e+1f72b5a9f7,gdab6d2f7ff+cb6b47f07b,gdc713202bf+1f72b5a9f7,gdfd2d52018+8225f2b331,ge365c994fd+375fc21c71,ge410e46f29+abcf9c3559,geaed405ab2+562b3308c0,gf9a733ac38+8c5ae1fdc5,w.2025.35
LSST Data Management Base Package
Loading...
Searching...
No Matches
connectionContext.py
Go to the documentation of this file.
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22from __future__ import annotations
23
24__all__ = ["ConnectionContext", "DbVersions"]
25
26import dataclasses
27import logging
28from collections.abc import Mapping
29
30# If cassandra-driver is not there the module can still be imported
31# but ApdbCassandra cannot be instantiated.
32try:
33 from cassandra.cluster import Session
34except ImportError:
35 pass
36
37from ..apdbConfigFreezer import ApdbConfigFreezer
38from ..apdbSchema import ApdbTables
39from ..monitor import MonAgent
40from ..schema_model import Table
41from ..versionTuple import VersionTuple
42from .apdbCassandraReplica import ApdbCassandraReplica
43from .apdbCassandraSchema import ApdbCassandraSchema
44from .apdbMetadataCassandra import ApdbMetadataCassandra
45from .cassandra_utils import PreparedStatementCache
46from .config import ApdbCassandraConfig, ApdbCassandraTimePartitionRange
47from .partitioner import Partitioner
48
49_LOG = logging.getLogger(__name__)
50
51_MON = MonAgent(__name__)
52
53
54@dataclasses.dataclass
56 """Versions defined in APDB metadata table."""
57
58 schema_version: VersionTuple
59 """Version of the schema from which database was created."""
60
61 code_version: VersionTuple
62 """Version of ApdbCassandra with which database was created."""
63
64 replica_version: VersionTuple | None
65 """Version of ApdbCassandraReplica with which database was created, None
66 if replication was not configured.
67 """
68
69
71 """Container for all kinds ob objects that are instantiated once the
72 connection to Cassandra is established.
73
74 Parameters
75 ----------
76 session : `cassandra.cluster.Sesion`
77 Cassandra session.
78 config : `ApdbCassandraConfig`
79 Configuration object.
80 table_schemas : `~collection.abc.Mapping` [`ApdbTables`, `Table`]
81 Schema definitions for regular APDB tables.
82 """
83
84 metadataSchemaVersionKey = "version:schema"
85 """Name of the metadata key to store schema version number."""
86
87 metadataCodeVersionKey = "version:ApdbCassandra"
88 """Name of the metadata key to store code version number."""
89
90 metadataReplicaVersionKey = "version:ApdbCassandraReplica"
91 """Name of the metadata key to store replica code version number."""
92
93 metadataConfigKey = "config:apdb-cassandra.json"
94 """Name of the metadata key to store frozen part of the configuration."""
95
96 frozen_parameters = (
97 "enable_replica",
98 "ra_dec_columns",
99 "replica_skips_diaobjects",
100 "replica_sub_chunk_count",
101 "partitioning.part_pixelization",
102 "partitioning.part_pix_level",
103 "partitioning.time_partition_tables",
104 "partitioning.time_partition_days",
105 )
106 """Names of the config parameters to be frozen in metadata table."""
107
109 self, session: Session, config: ApdbCassandraConfig, table_schemas: Mapping[ApdbTables, Table]
110 ):
111 self.session = session
112
113 meta_table_name = ApdbTables.metadata.table_name(config.prefix)
115 self.session, meta_table_name, config.keyspace, "read_tuples", "write"
116 )
117
118 # Read frozen config from metadata.
119 config_json = self.metadata.get(self.metadataConfigKey)
120 if config_json is not None:
121 # Update config from metadata.
122 freezer = ApdbConfigFreezer[ApdbCassandraConfig](self.frozen_parameters)
123 self.config = freezer.update(config, config_json)
124 else:
125 self.config = config
126 del config
127
128 # Read versions stored in database.
130 _LOG.debug("Database versions: %s", self.db_versions)
131
132 # Since replica version 1.1.0 we use finer partitioning for replica
133 # chunk tables.
135 if self.config.enable_replica:
136 assert self.db_versions.replica_version is not None, "Replica version must be defined"
137 self.has_chunk_sub_partitions = ApdbCassandraReplica.hasChunkSubPartitions(
138 self.db_versions.replica_version
139 )
140
141 # Since version 0.1.3 we have metadata for time partitions.
142 self.has_time_partition_meta = self.db_versions.code_version >= VersionTuple(0, 1, 3)
143
144 # Since version 0.1.2 we have an extra table for visit/detector.
145 self.has_visit_detector_table = self.db_versions.code_version >= VersionTuple(0, 1, 2)
146
147 # Support for DiaObjectLastToPartition was added at code version 0.1.1
148 # in a backward-compatible way (we only use the table if it is there).
149 self.has_dia_object_last_to_partition = self.db_versions.code_version >= VersionTuple(0, 1, 1)
150
151 # Cache for prepared statements
153
155
157 session=self.session,
158 keyspace=self.config.keyspace,
159 table_schemas=table_schemas,
160 prefix=self.config.prefix,
161 time_partition_tables=self.config.partitioning.time_partition_tables,
162 enable_replica=self.config.enable_replica,
163 has_chunk_sub_partitions=self.has_chunk_sub_partitions,
164 has_visit_detector_table=self.has_visit_detector_table,
165 )
166
167 @property
168 def time_partitions_range(self) -> ApdbCassandraTimePartitionRange | None:
169 """Time partition range or None if instance does not use
170 time-partitioned tables (`ApdbCassandraTimePartitionRange` or `None`).
171 """
172 if not self.config.partitioning.time_partition_tables:
173 return None
174
176 return ApdbCassandraTimePartitionRange.from_meta(self.metadata)
177 else:
178 # Scan DiaSource tables and see which partitions are present.
179 partitions = set()
180 tables = self.schema.existing_tables(ApdbTables.DiaSource)
181 for table_name in tables[ApdbTables.DiaSource]:
182 _, _, part_str = table_name.rpartition("_")
183 partitions.add(int(part_str))
184 if not partitions:
185 raise LookupError("Failed to find any partitioned DiaSource table.")
186 return ApdbCassandraTimePartitionRange(start=min(partitions), end=max(partitions))
187
188 def _readVersions(self, metadata: ApdbMetadataCassandra) -> DbVersions:
189 """Read versions of all objects from metadata."""
190
191 def _get_version(key: str) -> VersionTuple:
192 """Retrieve version number from given metadata key."""
193 version_str = metadata.get(key)
194 if version_str is None:
195 # Should not happen with existing metadata table.
196 raise RuntimeError(f"Version key {key!r} does not exist in metadata table.")
197 return VersionTuple.fromString(version_str)
198
199 db_schema_version = _get_version(self.metadataSchemaVersionKey)
200 db_code_version = _get_version(self.metadataCodeVersionKey)
201
202 # Check replica code version only if replica is enabled.
203 db_replica_version: VersionTuple | None = None
204 if self.config.enable_replica:
205 db_replica_version = _get_version(self.metadataReplicaVersionKey)
206
207 return DbVersions(
208 schema_version=db_schema_version, code_version=db_code_version, replica_version=db_replica_version
209 )
__init__(self, Session session, ApdbCassandraConfig config, Mapping[ApdbTables, Table] table_schemas)
ApdbCassandraTimePartitionRange|None time_partitions_range(self)
DbVersions _readVersions(self, ApdbMetadataCassandra metadata)