LSST Applications g013ef56533+63812263fb,g083dd6704c+a047e97985,g199a45376c+0ba108daf9,g1fd858c14a+fde7a7a78c,g210f2d0738+db0c280453,g262e1987ae+abed931625,g29ae962dfc+058d1915d8,g2cef7863aa+aef1011c0b,g35bb328faa+8c5ae1fdc5,g3fd5ace14f+64337f1634,g47891489e3+f459a6810c,g53246c7159+8c5ae1fdc5,g54cd7ddccb+890c8e1e5d,g5a60e81ecd+d9e514a434,g64539dfbff+db0c280453,g67b6fd64d1+f459a6810c,g6ebf1fc0d4+8c5ae1fdc5,g7382096ae9+36d16ea71a,g74acd417e5+c70e70fbf6,g786e29fd12+668abc6043,g87389fa792+8856018cbb,g89139ef638+f459a6810c,g8d7436a09f+1b779678e3,g8ea07a8fe4+81eaaadc04,g90f42f885a+34c0557caf,g97be763408+9583a964dd,g98a1a72a9c+028271c396,g98df359435+530b675b85,gb8cb2b794d+4e54f68785,gbf99507273+8c5ae1fdc5,gc2a301910b+db0c280453,gca7fc764a6+f459a6810c,gd7ef33dd92+f459a6810c,gdab6d2f7ff+c70e70fbf6,ge410e46f29+f459a6810c,ge41e95a9f2+db0c280453,geaed405ab2+e3b4b2a692,gf9a733ac38+8c5ae1fdc5,w.2025.43
LSST Data Management Base Package
Loading...
Searching...
No Matches
connectionContext.py
Go to the documentation of this file.
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22from __future__ import annotations
23
24__all__ = ["ConnectionContext", "DbVersions"]
25
26import dataclasses
27import logging
28from collections.abc import Mapping
29
30# If cassandra-driver is not there the module can still be imported
31# but ApdbCassandra cannot be instantiated.
32try:
33 from cassandra.cluster import Session
34except ImportError:
35 pass
36
37from ..apdbConfigFreezer import ApdbConfigFreezer
38from ..apdbSchema import ApdbTables
39from ..monitor import MonAgent
40from ..schema_model import Table
41from ..versionTuple import VersionTuple
42from .apdbCassandraReplica import ApdbCassandraReplica
43from .apdbCassandraSchema import ApdbCassandraSchema
44from .apdbMetadataCassandra import ApdbMetadataCassandra
45from .cassandra_utils import PreparedStatementCache
46from .config import ApdbCassandraConfig, ApdbCassandraTimePartitionRange
47from .partitioner import Partitioner
48
49_LOG = logging.getLogger(__name__)
50
51_MON = MonAgent(__name__)
52
53
54@dataclasses.dataclass
56 """Versions defined in APDB metadata table."""
57
58 schema_version: VersionTuple
59 """Version of the schema from which database was created."""
60
61 code_version: VersionTuple
62 """Version of ApdbCassandra with which database was created."""
63
64 replica_version: VersionTuple | None
65 """Version of ApdbCassandraReplica with which database was created, None
66 if replication was not configured.
67 """
68
69
71 """Container for all kinds of objects that are instantiated once the
72 connection to Cassandra is established.
73
74 Parameters
75 ----------
76 session : `cassandra.cluster.Session`
77 Cassandra session.
78 config : `ApdbCassandraConfig`
79 Configuration object.
80 table_schemas : `~collections.abc.Mapping` [`ApdbTables`, `Table`]
81 Schema definitions for regular APDB tables.
82 """
83
84 metadataSchemaVersionKey = "version:schema"
85 """Name of the metadata key to store schema version number."""
86
87 metadataCodeVersionKey = "version:ApdbCassandra"
88 """Name of the metadata key to store code version number."""
89
90 metadataReplicaVersionKey = "version:ApdbCassandraReplica"
91 """Name of the metadata key to store replica code version number."""
92
93 metadataConfigKey = "config:apdb-cassandra.json"
94 """Name of the metadata key to store frozen part of the configuration."""
95
96 frozen_parameters = (
97 "enable_replica",
98 "ra_dec_columns",
99 "replica_skips_diaobjects",
100 "replica_sub_chunk_count",
101 "partitioning.part_pixelization",
102 "partitioning.part_pix_level",
103 "partitioning.time_partition_tables",
104 "partitioning.time_partition_days",
105 )
106 """Names of the config parameters to be frozen in metadata table."""
107
109 self, session: Session, config: ApdbCassandraConfig, table_schemas: Mapping[ApdbTables, Table]
110 ):
111 self.session = session
112
113 meta_table_name = ApdbTables.metadata.table_name(config.prefix)
115 self.session, meta_table_name, config.keyspace, "read_tuples", "write"
116 )
117
118 # Read frozen config from metadata.
119 config_json = self.metadata.get(self.metadataConfigKey)
120 if config_json is not None:
121 # Update config from metadata.
122 freezer = ApdbConfigFreezer[ApdbCassandraConfig](self.frozen_parameters)
123 self.config = freezer.update(config, config_json)
124 else:
125 self.config = config
126 del config
127
128 # Read versions stored in database.
130 _LOG.debug("Database versions: %s", self.db_versions)
131
132 # Since replica version 1.1.0 we use finer partitioning for replica
133 # chunk tables.
136 if self.config.enable_replica:
137 assert self.db_versions.replica_version is not None, "Replica version must be defined"
138 self.has_chunk_sub_partitions = ApdbCassandraReplica.hasChunkSubPartitions(
139 self.db_versions.replica_version
140 )
141 self.has_update_record_chunks_table = ApdbCassandraReplica.hasUpdateRecordChunks(
142 self.db_versions.replica_version
143 )
144
145 # Since version 0.1.3 we have metadata for time partitions.
146 self.has_time_partition_meta = self.db_versions.code_version >= VersionTuple(0, 1, 3)
147
148 # Since version 0.1.2 we have an extra table for visit/detector.
149 self.has_visit_detector_table = self.db_versions.code_version >= VersionTuple(0, 1, 2)
150
151 # Support for DiaObjectLastToPartition was added at code version 0.1.1
152 # in a backward-compatible way (we only use the table if it is there).
153 self.has_dia_object_last_to_partition = self.db_versions.code_version >= VersionTuple(0, 1, 1)
154
155 # Cache for prepared statements
157
159
161 session=self.session,
162 keyspace=self.config.keyspace,
163 table_schemas=table_schemas,
164 prefix=self.config.prefix,
165 time_partition_tables=self.config.partitioning.time_partition_tables,
166 enable_replica=self.config.enable_replica,
167 replica_skips_diaobjects=self.config.replica_skips_diaobjects,
168 has_chunk_sub_partitions=self.has_chunk_sub_partitions,
169 has_visit_detector_table=self.has_visit_detector_table,
170 )
171
@property
def time_partitions_range(self) -> ApdbCassandraTimePartitionRange | None:
    """Time partition range or None if instance does not use
    time-partitioned tables (`ApdbCassandraTimePartitionRange` or `None`).

    Raises
    ------
    LookupError
        Raised if the range has to be determined by scanning tables and no
        partitioned DiaSource table is found.
    """
    if not self.config.partitioning.time_partition_tables:
        return None

    # NOTE(review): the condition line was missing from the rendered
    # listing (dangling ``else``); restored from the
    # ``has_time_partition_meta`` flag set in ``__init__`` -- confirm
    # against the repository source.
    if self.has_time_partition_meta:
        return ApdbCassandraTimePartitionRange.from_meta(self.metadata)

    # Scan DiaSource tables and see which partitions are present; the
    # partition number is the integer suffix after the last underscore.
    tables = self.schema.existing_tables(ApdbTables.DiaSource)
    partitions = {int(table_name.rpartition("_")[2]) for table_name in tables[ApdbTables.DiaSource]}
    if not partitions:
        raise LookupError("Failed to find any partitioned DiaSource table.")
    return ApdbCassandraTimePartitionRange(start=min(partitions), end=max(partitions))
192
def _readVersions(self, metadata: ApdbMetadataCassandra) -> DbVersions:
    """Read versions of all objects from metadata.

    Parameters
    ----------
    metadata : `ApdbMetadataCassandra`
        Metadata accessor from which version strings are read.

    Returns
    -------
    versions : `DbVersions`
        Schema and code versions; the replica version is `None` unless
        ``self.config.enable_replica`` is set.

    Raises
    ------
    RuntimeError
        Raised if a requested version key is not present in metadata.
    """

    def _load(key: str) -> VersionTuple:
        """Parse the version stored under ``key``; fail if it is absent."""
        raw = metadata.get(key)
        if raw is not None:
            return VersionTuple.fromString(raw)
        # Should not happen with existing metadata table.
        raise RuntimeError(f"Version key {key!r} does not exist in metadata table.")

    schema_version = _load(self.metadataSchemaVersionKey)
    code_version = _load(self.metadataCodeVersionKey)

    # Replica code version is looked up only if replica is enabled.
    replica_version: VersionTuple | None = (
        _load(self.metadataReplicaVersionKey) if self.config.enable_replica else None
    )

    return DbVersions(
        schema_version=schema_version,
        code_version=code_version,
        replica_version=replica_version,
    )
__init__(self, Session session, ApdbCassandraConfig config, Mapping[ApdbTables, Table] table_schemas)
ApdbCassandraTimePartitionRange|None time_partitions_range(self)
DbVersions _readVersions(self, ApdbMetadataCassandra metadata)