LSST Applications g00274db5b6+edbf708997,g00d0e8bbd7+edbf708997,g199a45376c+5137f08352,g1fd858c14a+1d4b6db739,g262e1987ae+f4d9505c4f,g29ae962dfc+7156fb1a53,g2cef7863aa+73c82f25e4,g35bb328faa+edbf708997,g3e17d7035e+5b3adc59f5,g3fd5ace14f+852fa6fbcb,g47891489e3+6dc8069a4c,g53246c7159+edbf708997,g64539dfbff+9f17e571f4,g67b6fd64d1+6dc8069a4c,g74acd417e5+ae494d68d9,g786e29fd12+af89c03590,g7ae74a0b1c+a25e60b391,g7aefaa3e3d+536efcc10a,g7cc15d900a+d121454f8d,g87389fa792+a4172ec7da,g89139ef638+6dc8069a4c,g8d7436a09f+28c28d8d6d,g8ea07a8fe4+db21c37724,g92c671f44c+9f17e571f4,g98df359435+b2e6376b13,g99af87f6a8+b0f4ad7b8d,gac66b60396+966efe6077,gb88ae4c679+7dec8f19df,gbaa8f7a6c5+38b34f4976,gbf99507273+edbf708997,gc24b5d6ed1+9f17e571f4,gca7fc764a6+6dc8069a4c,gcc769fe2a4+97d0256649,gd7ef33dd92+6dc8069a4c,gdab6d2f7ff+ae494d68d9,gdbb4c4dda9+9f17e571f4,ge410e46f29+6dc8069a4c,geaed405ab2+e194be0d2b,w.2025.47
LSST Data Management Base Package
Loading...
Searching...
No Matches
lsst.dax.apdb.cassandra.connectionContext.ConnectionContext Class Reference

Public Member Functions

 __init__ (self, Session session, ApdbCassandraConfig config, Mapping[ApdbTables, Table] table_schemas)
 
ApdbCassandraTimePartitionRange|None time_partitions_range (self)
 

Public Attributes

 session = session
 
 metadata
 
 config = freezer.update(config, config_json)
 
DbVersions db_versions = self._readVersions(self.metadata)
 
bool has_chunk_sub_partitions = False
 
bool has_update_record_chunks_table = False
 
DbVersions has_time_partition_meta = VersionTuple(0, 1, 3)
 
DbVersions has_visit_detector_table = VersionTuple(0, 1, 2)
 
DbVersions has_dia_object_last_to_partition = VersionTuple(0, 1, 1)
 
 preparer = PreparedStatementCache(self.session)
 
 partitioner = Partitioner(self.config)
 
 schema
 

Static Public Attributes

str metadataSchemaVersionKey = "version:schema"
 
str metadataCodeVersionKey = "version:ApdbCassandra"
 
str metadataReplicaVersionKey = "version:ApdbCassandraReplica"
 
str metadataConfigKey = "config:apdb-cassandra.json"
 
tuple frozen_parameters
 

Protected Member Functions

DbVersions _readVersions (self, ApdbMetadataCassandra metadata)
 

Detailed Description

Container for all kinds ob objects that are instantiated once the
connection to Cassandra is established.

Parameters
----------
session : `cassandra.cluster.Sesion`
    Cassandra session.
config : `ApdbCassandraConfig`
    Configuration object.
table_schemas : `~collection.abc.Mapping` [`ApdbTables`, `Table`]
    Schema definitions for regular APDB tables.

Definition at line 70 of file connectionContext.py.

Constructor & Destructor Documentation

◆ __init__()

lsst.dax.apdb.cassandra.connectionContext.ConnectionContext.__init__ ( self,
Session session,
ApdbCassandraConfig config,
Mapping[ApdbTables, Table] table_schemas )

Definition at line 108 of file connectionContext.py.

110 ):
111 self.session = session
112
113 meta_table_name = ApdbTables.metadata.table_name(config.prefix)
114 self.metadata = ApdbMetadataCassandra(
115 self.session, meta_table_name, config.keyspace, "read_tuples", "write"
116 )
117
118 # Read frozen config from metadata.
119 config_json = self.metadata.get(self.metadataConfigKey)
120 if config_json is not None:
121 # Update config from metadata.
122 freezer = ApdbConfigFreezer[ApdbCassandraConfig](self.frozen_parameters)
123 self.config = freezer.update(config, config_json)
124 else:
125 self.config = config
126 del config
127
128 # Read versions stored in database.
129 self.db_versions = self._readVersions(self.metadata)
130 _LOG.debug("Database versions: %s", self.db_versions)
131
132 # Since replica version 1.1.0 we use finer partitioning for replica
133 # chunk tables.
134 self.has_chunk_sub_partitions = False
135 self.has_update_record_chunks_table = False
136 if self.config.enable_replica:
137 assert self.db_versions.replica_version is not None, "Replica version must be defined"
138 self.has_chunk_sub_partitions = ApdbCassandraReplica.hasChunkSubPartitions(
139 self.db_versions.replica_version
140 )
141 self.has_update_record_chunks_table = ApdbCassandraReplica.hasUpdateRecordChunks(
142 self.db_versions.replica_version
143 )
144
145 # Since version 0.1.3 we have metadata for time partitions.
146 self.has_time_partition_meta = self.db_versions.code_version >= VersionTuple(0, 1, 3)
147
148 # Since version 0.1.2 we have an extra table for visit/detector.
149 self.has_visit_detector_table = self.db_versions.code_version >= VersionTuple(0, 1, 2)
150
151 # Support for DiaObjectLastToPartition was added at code version 0.1.1
152 # in a backward-compatible way (we only use the table if it is there).
153 self.has_dia_object_last_to_partition = self.db_versions.code_version >= VersionTuple(0, 1, 1)
154
155 # Cache for prepared statements
156 self.preparer = PreparedStatementCache(self.session)
157
158 self.partitioner = Partitioner(self.config)
159
160 self.schema = ApdbCassandraSchema(
161 session=self.session,
162 keyspace=self.config.keyspace,
163 table_schemas=table_schemas,
164 prefix=self.config.prefix,
165 time_partition_tables=self.config.partitioning.time_partition_tables,
166 enable_replica=self.config.enable_replica,
167 replica_skips_diaobjects=self.config.replica_skips_diaobjects,
168 has_chunk_sub_partitions=self.has_chunk_sub_partitions,
169 has_visit_detector_table=self.has_visit_detector_table,
170 )
171

Member Function Documentation

◆ _readVersions()

DbVersions lsst.dax.apdb.cassandra.connectionContext.ConnectionContext._readVersions ( self,
ApdbMetadataCassandra metadata )
protected
Read versions of all objects from metadata.

Definition at line 193 of file connectionContext.py.

193 def _readVersions(self, metadata: ApdbMetadataCassandra) -> DbVersions:
194 """Read versions of all objects from metadata."""
195
196 def _get_version(key: str) -> VersionTuple:
197 """Retrieve version number from given metadata key."""
198 version_str = metadata.get(key)
199 if version_str is None:
200 # Should not happen with existing metadata table.
201 raise RuntimeError(f"Version key {key!r} does not exist in metadata table.")
202 return VersionTuple.fromString(version_str)
203
204 db_schema_version = _get_version(self.metadataSchemaVersionKey)
205 db_code_version = _get_version(self.metadataCodeVersionKey)
206
207 # Check replica code version only if replica is enabled.
208 db_replica_version: VersionTuple | None = None
209 if self.config.enable_replica:
210 db_replica_version = _get_version(self.metadataReplicaVersionKey)
211
212 return DbVersions(
213 schema_version=db_schema_version, code_version=db_code_version, replica_version=db_replica_version
214 )

◆ time_partitions_range()

ApdbCassandraTimePartitionRange | None lsst.dax.apdb.cassandra.connectionContext.ConnectionContext.time_partitions_range ( self)
Time partition range or None if instance does not use
time-partitioned tables (`ApdbCassandraTimePartitionRange` or `None`).

Definition at line 173 of file connectionContext.py.

173 def time_partitions_range(self) -> ApdbCassandraTimePartitionRange | None:
174 """Time partition range or None if instance does not use
175 time-partitioned tables (`ApdbCassandraTimePartitionRange` or `None`).
176 """
177 if not self.config.partitioning.time_partition_tables:
178 return None
179
180 if self.has_time_partition_meta:
181 return ApdbCassandraTimePartitionRange.from_meta(self.metadata)
182 else:
183 # Scan DiaSource tables and see which partitions are present.
184 partitions = set()
185 tables = self.schema.existing_tables(ApdbTables.DiaSource)
186 for table_name in tables[ApdbTables.DiaSource]:
187 _, _, part_str = table_name.rpartition("_")
188 partitions.add(int(part_str))
189 if not partitions:
190 raise LookupError("Failed to find any partitioned DiaSource table.")
191 return ApdbCassandraTimePartitionRange(start=min(partitions), end=max(partitions))
192

Member Data Documentation

◆ config

lsst.dax.apdb.cassandra.connectionContext.ConnectionContext.config = freezer.update(config, config_json)

Definition at line 123 of file connectionContext.py.

◆ db_versions

lsst.dax.apdb.cassandra.connectionContext.ConnectionContext.db_versions = self._readVersions(self.metadata)

Definition at line 129 of file connectionContext.py.

◆ frozen_parameters

tuple lsst.dax.apdb.cassandra.connectionContext.ConnectionContext.frozen_parameters
static
Initial value:
= (
"enable_replica",
"ra_dec_columns",
"replica_skips_diaobjects",
"replica_sub_chunk_count",
"partitioning.part_pixelization",
"partitioning.part_pix_level",
"partitioning.time_partition_tables",
"partitioning.time_partition_days",
)

Definition at line 96 of file connectionContext.py.

◆ has_chunk_sub_partitions

bool lsst.dax.apdb.cassandra.connectionContext.ConnectionContext.has_chunk_sub_partitions = False

Definition at line 134 of file connectionContext.py.

◆ has_dia_object_last_to_partition

DbVersions lsst.dax.apdb.cassandra.connectionContext.ConnectionContext.has_dia_object_last_to_partition = VersionTuple(0, 1, 1)

Definition at line 153 of file connectionContext.py.

◆ has_time_partition_meta

DbVersions lsst.dax.apdb.cassandra.connectionContext.ConnectionContext.has_time_partition_meta = VersionTuple(0, 1, 3)

Definition at line 146 of file connectionContext.py.

◆ has_update_record_chunks_table

bool lsst.dax.apdb.cassandra.connectionContext.ConnectionContext.has_update_record_chunks_table = False

Definition at line 135 of file connectionContext.py.

◆ has_visit_detector_table

DbVersions lsst.dax.apdb.cassandra.connectionContext.ConnectionContext.has_visit_detector_table = VersionTuple(0, 1, 2)

Definition at line 149 of file connectionContext.py.

◆ metadata

lsst.dax.apdb.cassandra.connectionContext.ConnectionContext.metadata
Initial value:
= ApdbMetadataCassandra(
self.session, meta_table_name, config.keyspace, "read_tuples", "write"
)

Definition at line 114 of file connectionContext.py.

◆ metadataCodeVersionKey

str lsst.dax.apdb.cassandra.connectionContext.ConnectionContext.metadataCodeVersionKey = "version:ApdbCassandra"
static

Definition at line 87 of file connectionContext.py.

◆ metadataConfigKey

str lsst.dax.apdb.cassandra.connectionContext.ConnectionContext.metadataConfigKey = "config:apdb-cassandra.json"
static

Definition at line 93 of file connectionContext.py.

◆ metadataReplicaVersionKey

str lsst.dax.apdb.cassandra.connectionContext.ConnectionContext.metadataReplicaVersionKey = "version:ApdbCassandraReplica"
static

Definition at line 90 of file connectionContext.py.

◆ metadataSchemaVersionKey

str lsst.dax.apdb.cassandra.connectionContext.ConnectionContext.metadataSchemaVersionKey = "version:schema"
static

Definition at line 84 of file connectionContext.py.

◆ partitioner

lsst.dax.apdb.cassandra.connectionContext.ConnectionContext.partitioner = Partitioner(self.config)

Definition at line 158 of file connectionContext.py.

◆ preparer

lsst.dax.apdb.cassandra.connectionContext.ConnectionContext.preparer = PreparedStatementCache(self.session)

Definition at line 156 of file connectionContext.py.

◆ schema

lsst.dax.apdb.cassandra.connectionContext.ConnectionContext.schema
Initial value:
= ApdbCassandraSchema(
session=self.session,
keyspace=self.config.keyspace,
table_schemas=table_schemas,
prefix=self.config.prefix,
time_partition_tables=self.config.partitioning.time_partition_tables,
enable_replica=self.config.enable_replica,
replica_skips_diaobjects=self.config.replica_skips_diaobjects,
has_chunk_sub_partitions=self.has_chunk_sub_partitions,
has_visit_detector_table=self.has_visit_detector_table,
)

Definition at line 160 of file connectionContext.py.

◆ session

lsst.dax.apdb.cassandra.connectionContext.ConnectionContext.session = session

Definition at line 111 of file connectionContext.py.


The documentation for this class was generated from the following file: