LSST Applications g00274db5b6+edbf708997,g00d0e8bbd7+edbf708997,g199a45376c+5137f08352,g1fd858c14a+1d4b6db739,g262e1987ae+f4d9505c4f,g29ae962dfc+7156fb1a53,g2cef7863aa+73c82f25e4,g35bb328faa+edbf708997,g3e17d7035e+5b3adc59f5,g3fd5ace14f+852fa6fbcb,g47891489e3+6dc8069a4c,g53246c7159+edbf708997,g64539dfbff+9f17e571f4,g67b6fd64d1+6dc8069a4c,g74acd417e5+ae494d68d9,g786e29fd12+af89c03590,g7ae74a0b1c+a25e60b391,g7aefaa3e3d+536efcc10a,g7cc15d900a+d121454f8d,g87389fa792+a4172ec7da,g89139ef638+6dc8069a4c,g8d7436a09f+28c28d8d6d,g8ea07a8fe4+db21c37724,g92c671f44c+9f17e571f4,g98df359435+b2e6376b13,g99af87f6a8+b0f4ad7b8d,gac66b60396+966efe6077,gb88ae4c679+7dec8f19df,gbaa8f7a6c5+38b34f4976,gbf99507273+edbf708997,gc24b5d6ed1+9f17e571f4,gca7fc764a6+6dc8069a4c,gcc769fe2a4+97d0256649,gd7ef33dd92+6dc8069a4c,gdab6d2f7ff+ae494d68d9,gdbb4c4dda9+9f17e571f4,ge410e46f29+6dc8069a4c,geaed405ab2+e194be0d2b,w.2025.47
LSST Data Management Base Package
Loading...
Searching...
No Matches
lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica Class Reference
Inheritance diagram for lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica:
lsst.dax.apdb.apdbReplica.ApdbReplica

Public Member Functions

 __init__ (self, ApdbCassandra apdb)
 
VersionTuple schemaVersion (self)
 
VersionTuple apdbReplicaImplementationVersion (cls)
 
bool hasChunkSubPartitions (cls, VersionTuple version)
 
bool hasUpdateRecordChunks (cls, VersionTuple version)
 
list[ReplicaChunk]|None getReplicaChunks (self)
 
None deleteReplicaChunks (self, Iterable[int] chunks)
 
ApdbTableData getTableDataChunks (self, ApdbTables table, Iterable[int] chunks)
 
Sequence[ApdbUpdateRecord] getUpdateRecordChunks (self, Iterable[int] chunks)
 
ApdbReplica from_config (cls, ApdbConfig config)
 
ApdbReplica from_uri (cls, ResourcePathExpression uri)
 

Protected Member Functions

Timer _timer (self, str name, *, Mapping[str, str|int]|None tags=None)
 

Protected Attributes

 _apdb = apdb
 

Detailed Description

Implementation of `ApdbReplica` for Cassandra backend.

Parameters
----------
apdb : `ApdbCassandra`
    Instance of ApdbCassandra for the database.

Definition at line 61 of file apdbCassandraReplica.py.

Constructor & Destructor Documentation

◆ __init__()

lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica.__init__ ( self,
ApdbCassandra apdb )

Definition at line 70 of file apdbCassandraReplica.py.

70 def __init__(self, apdb: ApdbCassandra):
71 # Note that ApdbCassandra instance must stay alive while this object
72 # exists, so we keep reference to it.
73 self._apdb = apdb
74

Member Function Documentation

◆ _timer()

Timer lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica._timer ( self,
str name,
* ,
Mapping[str, str | int] | None tags = None )
protected
Create `Timer` instance given its name.

Definition at line 75 of file apdbCassandraReplica.py.

75 def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
76 """Create `Timer` instance given its name."""
77 return Timer(name, _MON, tags=tags)
78

◆ apdbReplicaImplementationVersion()

VersionTuple lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica.apdbReplicaImplementationVersion ( cls)
Return version number for current ApdbReplica implementation.

Returns
-------
version : `VersionTuple`
    Version of the code defined in implementation class.

Reimplemented from lsst.dax.apdb.apdbReplica.ApdbReplica.

Definition at line 85 of file apdbCassandraReplica.py.

85 def apdbReplicaImplementationVersion(cls) -> VersionTuple:
86 # Docstring inherited from base class.
87 return VERSION
88

◆ deleteReplicaChunks()

None lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica.deleteReplicaChunks ( self,
Iterable[int] chunks )
Remove replication chunks from the database.

Parameters
----------
chunks : `~collections.abc.Iterable` [`int`]
    Chunk identifiers to remove.

Notes
-----
This method causes Apdb to forget about the specified chunks. If there
is any auxiliary data associated with the identifiers, it is also
removed from the database (but data in regular tables is not removed).
This method should be called after successful transfer of data from
APDB to PPDB to free space used by replicas.

Reimplemented from lsst.dax.apdb.apdbReplica.ApdbReplica.

Definition at line 136 of file apdbCassandraReplica.py.

136 def deleteReplicaChunks(self, chunks: Iterable[int]) -> None:
137 # docstring is inherited from a base class
138 context = self._apdb._context
139 config = context.config
140
141 if not context.schema.replication_enabled:
142 raise ValueError("APDB is not configured for replication")
143
144 # everything goes into a single partition
145 partition = 0
146
147 # Iterable can be single pass, make everything that we need from it
148 # in a single loop.
149 repl_table_params = []
150 chunk_table_params: list[tuple] = []
151 for chunk in chunks:
152 repl_table_params.append((partition, chunk))
153 if context.has_chunk_sub_partitions:
154 for subchunk in range(config.replica_sub_chunk_count):
155 chunk_table_params.append((chunk, subchunk))
156 else:
157 chunk_table_params.append((chunk,))
158 # Anything to do att all?
159 if not repl_table_params:
160 return
161
162 table_name = context.schema.tableName(ExtraTables.ApdbReplicaChunks)
163 query = (
164 f'DELETE FROM "{config.keyspace}"."{table_name}" WHERE partition = ? AND apdb_replica_chunk = ?'
165 )
166 statement = context.preparer.prepare(query)
167
168 queries = [(statement, param) for param in repl_table_params]
169 with self._timer("chunks_delete_time") as timer:
170 execute_concurrent(context.session, queries)
171 timer.add_values(row_count=len(queries))
172
173 # Also remove those chunk_ids from Dia*Chunks tables.
174 tables = list(ExtraTables.replica_chunk_tables(context.has_chunk_sub_partitions).values())
175 if context.has_update_record_chunks_table:
176 tables.append(ExtraTables.ApdbUpdateRecordChunks)
177 for table in tables:
178 table_name = context.schema.tableName(table)
179 query = f'DELETE FROM "{config.keyspace}"."{table_name}" WHERE apdb_replica_chunk = ?'
180 if context.has_chunk_sub_partitions:
181 query += " AND apdb_replica_subchunk = ?"
182 statement = context.preparer.prepare(query)
183
184 queries = [(statement, param) for param in chunk_table_params]
185 with self._timer("table_chunk_detele_time", tags={"table": table_name}) as timer:
186 execute_concurrent(context.session, queries)
187 timer.add_values(row_count=len(queries))
188

◆ from_config()

ApdbReplica lsst.dax.apdb.apdbReplica.ApdbReplica.from_config ( cls,
ApdbConfig config )
inherited
Create ApdbReplica instance from configuration object.

Parameters
----------
config : `ApdbConfig`
    Configuration object, type of this object determines type of the
    ApdbReplica implementation.

Returns
-------
replica : `ApdbReplica`
    Instance of `ApdbReplica` class.

Definition at line 127 of file apdbReplica.py.

127 def from_config(cls, config: ApdbConfig) -> ApdbReplica:
128 """Create ApdbReplica instance from configuration object.
129
130 Parameters
131 ----------
132 config : `ApdbConfig`
133 Configuration object, type of this object determines type of the
134 ApdbReplica implementation.
135
136 Returns
137 -------
138 replica : `ApdbReplica`
139 Instance of `ApdbReplica` class.
140 """
141 return make_apdb_replica(config)
142

◆ from_uri()

ApdbReplica lsst.dax.apdb.apdbReplica.ApdbReplica.from_uri ( cls,
ResourcePathExpression uri )
inherited
Make ApdbReplica instance from a serialized configuration.

Parameters
----------
uri : `~lsst.resources.ResourcePathExpression`
    URI or local file path pointing to a file with serialized
    configuration, or a string with a "label:" prefix. In the latter
    case, the configuration will be looked up from an APDB index file
    using the label name that follows the prefix. The APDB index file's
    location is determined by the ``DAX_APDB_INDEX_URI`` environment
    variable.

Returns
-------
replica : `ApdbReplica`
    Instance of `ApdbReplica` class, the type of the returned instance
    is determined by configuration.

Definition at line 144 of file apdbReplica.py.

144 def from_uri(cls, uri: ResourcePathExpression) -> ApdbReplica:
145 """Make ApdbReplica instance from a serialized configuration.
146
147 Parameters
148 ----------
149 uri : `~lsst.resources.ResourcePathExpression`
150 URI or local file path pointing to a file with serialized
151 configuration, or a string with a "label:" prefix. In the latter
152 case, the configuration will be looked up from an APDB index file
153 using the label name that follows the prefix. The APDB index file's
154 location is determined by the ``DAX_APDB_INDEX_URI`` environment
155 variable.
156
157 Returns
158 -------
159 replica : `ApdbReplica`
160 Instance of `ApdbReplica` class, the type of the returned instance
161 is determined by configuration.
162 """
163 config = ApdbConfig.from_uri(uri)
164 return make_apdb_replica(config)
165

◆ getReplicaChunks()

list[ReplicaChunk] | None lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica.getReplicaChunks ( self)
Return collection of replication chunks known to the database.

Returns
-------
chunks : `list` [`ReplicaChunk`] or `None`
    List of chunks; they may be time-ordered if the database supports
    ordering. `None` is returned if the database is not configured for
    replication.

Reimplemented from lsst.dax.apdb.apdbReplica.ApdbReplica.

Definition at line 99 of file apdbCassandraReplica.py.

99 def getReplicaChunks(self) -> list[ReplicaChunk] | None:
100 # docstring is inherited from a base class
101 context = self._apdb._context
102 config = context.config
103
104 if not context.schema.replication_enabled:
105 return None
106
107 # everything goes into a single partition
108 partition = 0
109
110 table_name = context.schema.tableName(ExtraTables.ApdbReplicaChunks)
111 # We want to avoid timezone mess so return timestamps as milliseconds.
112 query = (
113 "SELECT toUnixTimestamp(last_update_time), apdb_replica_chunk, unique_id "
114 f'FROM "{config.keyspace}"."{table_name}" WHERE partition = %s'
115 )
116
117 with self._timer("chunks_select_time") as timer:
118 result = context.session.execute(
119 query,
120 (partition,),
121 timeout=config.connection_config.read_timeout,
122 execution_profile="read_tuples",
123 )
124 # order by last_update_time
125 rows = sorted(result)
126 timer.add_values(row_count=len(rows))
127 return [
128 ReplicaChunk(
129 id=row[1],
130 last_update_time=astropy.time.Time(row[0] / 1000, format="unix_tai"),
131 unique_id=row[2],
132 )
133 for row in rows
134 ]
135

◆ getTableDataChunks()

ApdbTableData lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica.getTableDataChunks ( self,
ApdbTables table,
Iterable[int] chunks )
Return catalog of new records for a table from given replica chunks.

Parameters
----------
table : `ApdbTables`
    Table for which to return the data. Acceptable tables are
    `ApdbTables.DiaObject`, `ApdbTables.DiaSource`, and
    `ApdbTables.DiaForcedSource`.
chunks : `~collections.abc.Iterable` [`int`]
    Chunk identifiers to return.

Returns
-------
data : `ApdbTableData`
    Catalog containing table records. In addition to all regular
    columns it will contain ``apdb_replica_chunk`` column.

Notes
-----
This method returns new records that have been added to the table by
`Apdb.store()` method. Updates to the records that happen at later time
are available from the `getUpdateRecordChunks` method.

This part of API may not be very stable and can change before the
implementation finalizes.

Reimplemented from lsst.dax.apdb.apdbReplica.ApdbReplica.

Definition at line 189 of file apdbCassandraReplica.py.

189 def getTableDataChunks(self, table: ApdbTables, chunks: Iterable[int]) -> ApdbTableData:
190 # docstring is inherited from a base class
191 context = self._apdb._context
192 config = context.config
193
194 if not context.schema.replication_enabled:
195 raise ValueError("APDB is not configured for replication")
196 if table not in ExtraTables.replica_chunk_tables(False):
197 raise ValueError(f"Table {table} does not support replica chunks.")
198
199 # We need to iterate few times.
200 chunks = list(chunks)
201
202 # If schema was migrated then a chunk can appear in either old or new
203 # chunk table (e.g. DiaObjectChunks or DiaObjectChunks2). Chunk table
204 # has a column which will be set to true for new table.
205 has_chunk_sub_partitions: dict[int, bool] = {}
206 if context.has_chunk_sub_partitions:
207 table_name = context.schema.tableName(ExtraTables.ApdbReplicaChunks)
208 chunks_str = ",".join(str(chunk_id) for chunk_id in chunks)
209 query = (
210 f'SELECT apdb_replica_chunk, has_subchunks FROM "{config.keyspace}"."{table_name}" '
211 f"WHERE partition = %s and apdb_replica_chunk IN ({chunks_str})"
212 )
213 partition = 0
214 result = context.session.execute(
215 query,
216 (partition,),
217 timeout=config.connection_config.read_timeout,
218 execution_profile="read_tuples",
219 )
220 has_chunk_sub_partitions = dict(result)
221 else:
222 has_chunk_sub_partitions = dict.fromkeys(chunks, False)
223
224 # Check what kind of tables we want to query, if chunk list is empty
225 # then use tables which should exist in the schema.
226 if has_chunk_sub_partitions:
227 have_subchunks = any(has_chunk_sub_partitions.values())
228 have_non_subchunks = not all(has_chunk_sub_partitions.values())
229 else:
230 have_subchunks = context.has_chunk_sub_partitions
231 have_non_subchunks = not have_subchunks
232
233 # NOTE: if an existing database is migrated and has both types of chunk
234 # tables (e.g. DiaObjectChunks and DiaObjectChunks2) it is possible
235 # that the same chunk can appear in both tables. In reality schema
236 # migration should only happen during the downtime, so there will be
237 # suffient gap and a different chunk ID will be used for new chunks.
238
239 table_data: ApdbCassandraTableData | None = None
240 table_data_subchunk: ApdbCassandraTableData | None = None
241
242 table_name = context.schema.tableName(ExtraTables.replica_chunk_tables(False)[table])
243 with self._timer("table_chunk_select_time", tags={"table": table_name}) as timer:
244 if have_subchunks:
245 replica_table = ExtraTables.replica_chunk_tables(True)[table]
246 table_name = context.schema.tableName(replica_table)
247 query = (
248 f'SELECT * FROM "{config.keyspace}"."{table_name}" '
249 "WHERE apdb_replica_chunk = ? AND apdb_replica_subchunk = ?"
250 )
251 statement = context.preparer.prepare(query)
252
253 queries: list[tuple] = []
254 for chunk in chunks:
255 if has_chunk_sub_partitions.get(chunk, False):
256 for subchunk in range(config.replica_sub_chunk_count):
257 queries.append((statement, (chunk, subchunk)))
258 if not queries and not have_non_subchunks:
259 # Add a dummy query to return correct set of columns.
260 queries.append((statement, (-1, -1)))
261
262 if queries:
263 table_data_subchunk = cast(
264 ApdbCassandraTableData,
265 select_concurrent(
266 context.session,
267 queries,
268 "read_raw_multi",
269 config.connection_config.read_concurrency,
270 ),
271 )
272
273 if have_non_subchunks:
274 replica_table = ExtraTables.replica_chunk_tables(False)[table]
275 table_name = context.schema.tableName(replica_table)
276 query = f'SELECT * FROM "{config.keyspace}"."{table_name}" WHERE apdb_replica_chunk = ?'
277 statement = context.preparer.prepare(query)
278
279 queries = []
280 for chunk in chunks:
281 if not has_chunk_sub_partitions.get(chunk, True):
282 queries.append((statement, (chunk,)))
283 if not queries and not table_data_subchunk:
284 # Add a dummy query to return correct set of columns.
285 queries.append((statement, (-1,)))
286
287 if queries:
288 table_data = cast(
289 ApdbCassandraTableData,
290 select_concurrent(
291 context.session,
292 queries,
293 "read_raw_multi",
294 config.connection_config.read_concurrency,
295 ),
296 )
297
298 # Merge if both are non-empty.
299 if table_data and table_data_subchunk:
300 table_data_subchunk.project(drop=["apdb_replica_subchunk"])
301 table_data.append(table_data_subchunk)
302 elif table_data_subchunk:
303 table_data = table_data_subchunk
304 elif not table_data:
305 raise AssertionError("above logic is incorrect")
306
307 timer.add_values(row_count=len(table_data.rows()))
308
309 table_schema = self._apdb._schema.tableSchemas[table]
310 # Regular tables should never have columns of ExtraDataTypes, this
311 # is just to make mypy happy.
312 column_types = {
313 column.name: column.datatype
314 for column in table_schema.columns
315 if not isinstance(column.datatype, ExtraDataTypes)
316 }
317 column_types["apdb_replica_chunk"] = felis.datamodel.DataType.long
318 # It may also have subchunk column, we do not always drop it, and
319 # clients should not need it, but we need to provide type for it.
320 column_types["apdb_replica_subchunk"] = felis.datamodel.DataType.int
321 table_data.set_column_types(column_types)
322
323 return table_data
324

◆ getUpdateRecordChunks()

Sequence[ApdbUpdateRecord] lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica.getUpdateRecordChunks ( self,
Iterable[int] chunks )
Return the list of record updates from given replica chunks.

Parameters
----------
chunks : `~collections.abc.Iterable` [`int`]
    Chunk identifiers to return.

Returns
-------
records : `~collections.abc.Sequence` [`ApdbUpdateRecord`]
    Collection of update records. Records will be sorted according to
    their update time and update order.

Reimplemented from lsst.dax.apdb.apdbReplica.ApdbReplica.

Definition at line 325 of file apdbCassandraReplica.py.

325 def getUpdateRecordChunks(self, chunks: Iterable[int]) -> Sequence[ApdbUpdateRecord]:
326 # docstring is inherited from a base class
327 context = self._apdb._context
328 config = context.config
329
330 if not context.schema.replication_enabled:
331 raise ValueError("APDB is not configured for replication")
332
333 if not context.has_update_record_chunks_table:
334 # Table does not exist yet.
335 return []
336
337 table_name = context.schema.tableName(ExtraTables.ApdbUpdateRecordChunks)
338
339 records = []
340 if context.has_chunk_sub_partitions:
341 subchunks = ",".join(str(val) for val in range(config.replica_sub_chunk_count))
342 query = (
343 f'SELECT * FROM "{config.keyspace}"."{table_name}" '
344 f"WHERE apdb_replica_chunk = %s AND apdb_replica_subchunk IN ({subchunks})"
345 )
346
347 with self._timer("select_update_record_time", tags={"table": table_name}) as timer:
348 for chunk in chunks:
349 result = context.session.execute(query, [chunk])
350 for row in result:
351 records.append(
352 ApdbUpdateRecord.from_json(
353 row.update_time_ns, row.update_order, row.update_payload
354 )
355 )
356 timer.add_values(row_count=len(records))
357
358 else:
359 chunks_str = ",".join(str(val) for val in chunks)
360 query = (
361 f'SELECT * FROM "{config.keyspace}"."{table_name}" WHERE apdb_replica_chunk IN ({chunks_str})'
362 )
363
364 with self._timer("select_update_record_time", tags={"table": table_name}) as timer:
365 result = context.session.execute(query)
366 for row in result:
367 records.append(
368 ApdbUpdateRecord.from_json(row.update_time_ns, row.update_order, row.update_payload)
369 )
370 timer.add_values(row_count=len(records))
371
372 records.sort()
373 return records

◆ hasChunkSubPartitions()

bool lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica.hasChunkSubPartitions ( cls,
VersionTuple version )
Return True if replica chunk tables have sub-partitions.

Definition at line 90 of file apdbCassandraReplica.py.

90 def hasChunkSubPartitions(cls, version: VersionTuple) -> bool:
91 """Return True if replica chunk tables have sub-partitions."""
92 return version >= VersionTuple(1, 1, 0)
93

◆ hasUpdateRecordChunks()

bool lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica.hasUpdateRecordChunks ( cls,
VersionTuple version )
Return True if the ApdbUpdateRecordChunks table should exist.

Definition at line 95 of file apdbCassandraReplica.py.

95 def hasUpdateRecordChunks(cls, version: VersionTuple) -> bool:
96 """Return True if ApdbUpdateRecordChunks should exists."""
97 return version >= VersionTuple(1, 1, 1)
98

◆ schemaVersion()

VersionTuple lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica.schemaVersion ( self)
Return version number of the database schema.

Returns
-------
version : `VersionTuple`
    Version of the database schema.

Reimplemented from lsst.dax.apdb.apdbReplica.ApdbReplica.

Definition at line 79 of file apdbCassandraReplica.py.

79 def schemaVersion(self) -> VersionTuple:
80 # Docstring inherited from base class.
81 context = self._apdb._context
82 return context.db_versions.schema_version
83

Member Data Documentation

◆ _apdb

lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica._apdb = apdb
protected

Definition at line 73 of file apdbCassandraReplica.py.


The documentation for this class was generated from the following file: