LSST Applications g013ef56533+7c9321ec0f,g042eb84c57+c6cfa41bc3,g199a45376c+0ba108daf9,g1fd858c14a+fcad0d0313,g210f2d0738+c0f94c6586,g262e1987ae+a7e710680e,g29ae962dfc+fb55f2edb0,g2ac17093b6+61d6563b1e,g2b1d02342f+df6f932764,g2cef7863aa+aef1011c0b,g2f7ad74990+c0f94c6586,g35bb328faa+8c5ae1fdc5,g3fd5ace14f+53cf87ae69,g47891489e3+4316d04fff,g511e8cfd20+baa56acf6c,g53246c7159+8c5ae1fdc5,g54cd7ddccb+fd7ad03fde,g64539dfbff+c0f94c6586,g67b6fd64d1+4316d04fff,g67fd3c3899+c0f94c6586,g6985122a63+4316d04fff,g74acd417e5+ca833bee28,g786e29fd12+668abc6043,g81db2e9a8d+b2ec8e584f,g87389fa792+8856018cbb,g89139ef638+4316d04fff,g8d7436a09f+0a24083b20,g8ea07a8fe4+760ca7c3fc,g90f42f885a+033b1d468d,g97be763408+11eb8fd5b8,gbf99507273+8c5ae1fdc5,gcdda8b9158+e4c84c9d5c,gce8aa8abaa+8c5ae1fdc5,gd7ef33dd92+4316d04fff,gdab6d2f7ff+ca833bee28,ge410e46f29+4316d04fff,geaed405ab2+c4bbc419c6,gf9a733ac38+8c5ae1fdc5,w.2025.40
LSST Data Management Base Package
Loading...
Searching...
No Matches
connectionContext.py
Go to the documentation of this file.
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22from __future__ import annotations
23
24__all__ = ["ConnectionContext", "DbVersions"]
25
26import dataclasses
27import logging
28from collections.abc import Mapping
29
30# If cassandra-driver is not there the module can still be imported
31# but ApdbCassandra cannot be instantiated.
32try:
33 from cassandra.cluster import Session
34except ImportError:
35 pass
36
37from ..apdbConfigFreezer import ApdbConfigFreezer
38from ..apdbSchema import ApdbTables
39from ..monitor import MonAgent
40from ..schema_model import Table
41from ..versionTuple import VersionTuple
42from .apdbCassandraReplica import ApdbCassandraReplica
43from .apdbCassandraSchema import ApdbCassandraSchema
44from .apdbMetadataCassandra import ApdbMetadataCassandra
45from .cassandra_utils import PreparedStatementCache
46from .config import ApdbCassandraConfig, ApdbCassandraTimePartitionRange
47from .partitioner import Partitioner
48
49_LOG = logging.getLogger(__name__)
50
51_MON = MonAgent(__name__)
52
53
54@dataclasses.dataclass
56 """Versions defined in APDB metadata table."""
57
58 schema_version: VersionTuple
59 """Version of the schema from which database was created."""
60
61 code_version: VersionTuple
62 """Version of ApdbCassandra with which database was created."""
63
64 replica_version: VersionTuple | None
65 """Version of ApdbCassandraReplica with which database was created, None
66 if replication was not configured.
67 """
68
69
71 """Container for all kinds of objects that are instantiated once the
72 connection to Cassandra is established.
73
74 Parameters
75 ----------
76 session : `cassandra.cluster.Session`
77 Cassandra session.
78 config : `ApdbCassandraConfig`
79 Configuration object.
80 table_schemas : `~collections.abc.Mapping` [`ApdbTables`, `Table`]
81 Schema definitions for regular APDB tables.
82 """
83
84 metadataSchemaVersionKey = "version:schema"
85 """Name of the metadata key to store schema version number."""
86
87 metadataCodeVersionKey = "version:ApdbCassandra"
88 """Name of the metadata key to store code version number."""
89
90 metadataReplicaVersionKey = "version:ApdbCassandraReplica"
91 """Name of the metadata key to store replica code version number."""
92
93 metadataConfigKey = "config:apdb-cassandra.json"
94 """Name of the metadata key to store frozen part of the configuration."""
95
96 frozen_parameters = (
97 "enable_replica",
98 "ra_dec_columns",
99 "replica_skips_diaobjects",
100 "replica_sub_chunk_count",
101 "partitioning.part_pixelization",
102 "partitioning.part_pix_level",
103 "partitioning.time_partition_tables",
104 "partitioning.time_partition_days",
105 )
106 """Names of the config parameters to be frozen in metadata table."""
107
109 self, session: Session, config: ApdbCassandraConfig, table_schemas: Mapping[ApdbTables, Table]
110 ):
111 self.session = session
112
113 meta_table_name = ApdbTables.metadata.table_name(config.prefix)
115 self.session, meta_table_name, config.keyspace, "read_tuples", "write"
116 )
117
118 # Read frozen config from metadata.
119 config_json = self.metadata.get(self.metadataConfigKey)
120 if config_json is not None:
121 # Update config from metadata.
122 freezer = ApdbConfigFreezer[ApdbCassandraConfig](self.frozen_parameters)
123 self.config = freezer.update(config, config_json)
124 else:
125 self.config = config
126 del config
127
128 # Read versions stored in database.
130 _LOG.debug("Database versions: %s", self.db_versions)
131
132 # Since replica version 1.1.0 we use finer partitioning for replica
133 # chunk tables.
135 if self.config.enable_replica:
136 assert self.db_versions.replica_version is not None, "Replica version must be defined"
137 self.has_chunk_sub_partitions = ApdbCassandraReplica.hasChunkSubPartitions(
138 self.db_versions.replica_version
139 )
140
141 # Since version 0.1.3 we have metadata for time partitions.
142 self.has_time_partition_meta = self.db_versions.code_version >= VersionTuple(0, 1, 3)
143
144 # Since version 0.1.2 we have an extra table for visit/detector.
145 self.has_visit_detector_table = self.db_versions.code_version >= VersionTuple(0, 1, 2)
146
147 # Support for DiaObjectLastToPartition was added at code version 0.1.1
148 # in a backward-compatible way (we only use the table if it is there).
149 self.has_dia_object_last_to_partition = self.db_versions.code_version >= VersionTuple(0, 1, 1)
150
151 # Cache for prepared statements
153
155
157 session=self.session,
158 keyspace=self.config.keyspace,
159 table_schemas=table_schemas,
160 prefix=self.config.prefix,
161 time_partition_tables=self.config.partitioning.time_partition_tables,
162 enable_replica=self.config.enable_replica,
163 replica_skips_diaobjects=self.config.replica_skips_diaobjects,
164 has_chunk_sub_partitions=self.has_chunk_sub_partitions,
165 has_visit_detector_table=self.has_visit_detector_table,
166 )
167
168 @property
169 def time_partitions_range(self) -> ApdbCassandraTimePartitionRange | None:
170 """Time partition range or None if instance does not use
171 time-partitioned tables (`ApdbCassandraTimePartitionRange` or `None`).
172 """
173 if not self.config.partitioning.time_partition_tables:
174 return None
175
177 return ApdbCassandraTimePartitionRange.from_meta(self.metadata)
178 else:
179 # Scan DiaSource tables and see which partitions are present.
180 partitions = set()
181 tables = self.schema.existing_tables(ApdbTables.DiaSource)
182 for table_name in tables[ApdbTables.DiaSource]:
183 _, _, part_str = table_name.rpartition("_")
184 partitions.add(int(part_str))
185 if not partitions:
186 raise LookupError("Failed to find any partitioned DiaSource table.")
187 return ApdbCassandraTimePartitionRange(start=min(partitions), end=max(partitions))
188
189 def _readVersions(self, metadata: ApdbMetadataCassandra) -> DbVersions:
190 """Read versions of all objects from metadata."""
191
192 def _get_version(key: str) -> VersionTuple:
193 """Retrieve version number from given metadata key."""
194 version_str = metadata.get(key)
195 if version_str is None:
196 # Should not happen with existing metadata table.
197 raise RuntimeError(f"Version key {key!r} does not exist in metadata table.")
198 return VersionTuple.fromString(version_str)
199
200 db_schema_version = _get_version(self.metadataSchemaVersionKey)
201 db_code_version = _get_version(self.metadataCodeVersionKey)
202
203 # Check replica code version only if replica is enabled.
204 db_replica_version: VersionTuple | None = None
205 if self.config.enable_replica:
206 db_replica_version = _get_version(self.metadataReplicaVersionKey)
207
208 return DbVersions(
209 schema_version=db_schema_version, code_version=db_code_version, replica_version=db_replica_version
210 )
__init__(self, Session session, ApdbCassandraConfig config, Mapping[ApdbTables, Table] table_schemas)
ApdbCassandraTimePartitionRange|None time_partitions_range(self)
DbVersions _readVersions(self, ApdbMetadataCassandra metadata)