LSST Applications 27.0.0
LSST Data Management Base Package
lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica Class Reference
Inheritance diagram for lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica:
lsst.dax.apdb.apdbReplica.ApdbReplica

Public Member Functions

 __init__ (self, ApdbCassandra apdb, ApdbCassandraSchema schema, Any session)
 
VersionTuple apdbReplicaImplementationVersion (cls)
 
list[ReplicaChunk]|None getReplicaChunks (self)
 
None deleteReplicaChunks (self, Iterable[int] chunks)
 
ApdbTableData getDiaObjectsChunks (self, Iterable[int] chunks)
 
ApdbTableData getDiaSourcesChunks (self, Iterable[int] chunks)
 
ApdbTableData getDiaForcedSourcesChunks (self, Iterable[int] chunks)
 

Protected Member Functions

ApdbTableData _get_chunks (self, ExtraTables table, Iterable[int] chunks)
 

Protected Attributes

 _apdb
 
 _schema
 
 _session
 
 _config
 
 _preparer
 

Detailed Description

Implementation of `ApdbReplica` for the Cassandra backend.

Parameters
----------
apdb : `ApdbCassandra`
    Instance of ApdbCassandra for database.
schema : `ApdbCassandraSchema`
    Instance of ApdbCassandraSchema for database.
session
    Instance of Cassandra session type.

Definition at line 51 of file apdbCassandraReplica.py.

Constructor & Destructor Documentation

◆ __init__()

lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica.__init__ ( self,
ApdbCassandra apdb,
ApdbCassandraSchema schema,
Any session )

Definition at line 64 of file apdbCassandraReplica.py.

64     def __init__(self, apdb: ApdbCassandra, schema: ApdbCassandraSchema, session: Any):
65         # Note that ApdbCassandra instance must stay alive while this object
66         # exists, so we keep reference to it.
67         self._apdb = apdb
68         self._schema = schema
69         self._session = session
70         self._config = apdb.config
71
72         # Cache for prepared statements
73         self._preparer = PreparedStatementCache(self._session)
74

Member Function Documentation

◆ _get_chunks()

ApdbTableData lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica._get_chunks ( self,
ExtraTables table,
Iterable[int] chunks )
protected
Return records from a particular table given a set of insert IDs.

Definition at line 165 of file apdbCassandraReplica.py.

165     def _get_chunks(self, table: ExtraTables, chunks: Iterable[int]) -> ApdbTableData:
166         """Return records from a particular table given set of insert IDs."""
167         if not self._schema.has_replica_chunks:
168             raise ValueError("APDB is not configured for replication")
169
170         # We do not expect too many chunks in this query.
171         chunks = list(chunks)
172         params = ",".join("?" * len(chunks))
173
174         table_name = self._schema.tableName(table)
175         # I know that chunk table schema has only regular APDB columns plus
176         # apdb_replica_chunk column, and this is exactly what we need to return
177         # from this method, so selecting a star is fine here.
178         query = (
179             f'SELECT * FROM "{self._config.keyspace}"."{table_name}" WHERE apdb_replica_chunk IN ({params})'
180         )
181         statement = self._preparer.prepare(query)
182
183         with Timer(f"{table_name} select chunk", self._config.timer):
184             result = self._session.execute(statement, chunks, execution_profile="read_raw")
185         table_data = cast(ApdbCassandraTableData, result._current_rows)
186         return table_data
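
The query construction used by `_get_chunks` can be looked at in isolation. The sketch below is not part of the package: `build_chunk_select` is a hypothetical helper, and the keyspace and table names are made-up values. It only illustrates how one `?` bind marker per chunk ID produces the `IN (...)` clause that the prepared statement is later executed with.

```python
def build_chunk_select(keyspace: str, table_name: str, chunks: list[int]) -> str:
    """Build a SELECT with one bind marker per chunk ID (hypothetical helper)."""
    # "?" * 3 gives "???"; joining its characters with "," gives "?,?,?".
    params = ",".join("?" * len(chunks))
    return f'SELECT * FROM "{keyspace}"."{table_name}" WHERE apdb_replica_chunk IN ({params})'


# Keyspace and table names are illustrative assumptions, not real configuration.
query = build_chunk_select("apdb", "DiaObjectChunks", [10, 11, 12])
print(query)
```

Because the statement text depends only on the number of markers, a statement cache keyed on the query string can presumably reuse one prepared statement for all calls with the same chunk count.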

◆ apdbReplicaImplementationVersion()

VersionTuple lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica.apdbReplicaImplementationVersion ( cls)
Return version number for current ApdbReplica implementation.

Returns
-------
version : `VersionTuple`
    Version of the code defined in implementation class.

Reimplemented from lsst.dax.apdb.apdbReplica.ApdbReplica.

Definition at line 76 of file apdbCassandraReplica.py.

76     def apdbReplicaImplementationVersion(cls) -> VersionTuple:
77         # Docstring inherited from base class.
78         return VERSION
79

◆ deleteReplicaChunks()

None lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica.deleteReplicaChunks ( self,
Iterable[int] chunks )
Remove replication chunks from the database.

Parameters
----------
chunks : `~collections.abc.Iterable` [`int`]
    Chunk identifiers to remove.

Notes
-----
This method causes Apdb to forget about the specified chunks. If there
is any auxiliary data associated with the identifiers, it is also
removed from the database (but data in regular tables is not removed).
This method should be called after a successful transfer of data from
APDB to PPDB to free the space used by replicas.

Reimplemented from lsst.dax.apdb.apdbReplica.ApdbReplica.

Definition at line 112 of file apdbCassandraReplica.py.

112     def deleteReplicaChunks(self, chunks: Iterable[int]) -> None:
113         # docstring is inherited from a base class
114         if not self._schema.has_replica_chunks:
115             raise ValueError("APDB is not configured for replication")
116
117         # There is 64k limit on number of markers in Cassandra CQL
118         for chunk_ids in chunk_iterable(chunks, 20_000):
119             params = ",".join("?" * len(chunk_ids))
120
121             # everything goes into a single partition
122             partition = 0
123
124             table_name = self._schema.tableName(ExtraTables.ApdbReplicaChunks)
125             query = (
126                 f'DELETE FROM "{self._config.keyspace}"."{table_name}" '
127                 f"WHERE partition = ? AND apdb_replica_chunk IN ({params})"
128             )
129
130             self._session.execute(
131                 self._preparer.prepare(query),
132                 [partition] + list(chunk_ids),
133                 timeout=self._config.remove_timeout,
134             )
135
136             # Also remove those chunk_ids from Dia*Chunks tables.
137             for table in (
138                 ExtraTables.DiaObjectChunks,
139                 ExtraTables.DiaSourceChunks,
140                 ExtraTables.DiaForcedSourceChunks,
141             ):
142                 table_name = self._schema.tableName(table)
143                 query = (
144                     f'DELETE FROM "{self._config.keyspace}"."{table_name}"'
145                     f" WHERE apdb_replica_chunk IN ({params})"
146                 )
147                 self._session.execute(
148                     self._preparer.prepare(query),
149                     chunk_ids,
150                     timeout=self._config.remove_timeout,
151                 )
152
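
The batching in `deleteReplicaChunks` keeps each statement well under the marker limit mentioned in the listing. A minimal sketch of that idea follows, using only the standard library. `batched_ids` is a hypothetical stand-in whose behaviour is assumed to resemble `chunk_iterable`, `plan_deletes` is likewise invented for illustration, and the keyspace and table names are made up. No database is contacted; the function only returns the statements and parameters that would be executed.

```python
from collections.abc import Iterable, Iterator
from itertools import islice


def batched_ids(ids: Iterable[int], size: int) -> Iterator[list[int]]:
    """Yield successive lists of at most ``size`` IDs (hypothetical stand-in)."""
    it = iter(ids)
    while batch := list(islice(it, size)):
        yield batch


def plan_deletes(
    keyspace: str, table_name: str, ids: Iterable[int], size: int = 20_000
) -> list[tuple[str, list[int]]]:
    """Return one (query, parameters) pair per batch of chunk IDs."""
    plans = []
    for batch in batched_ids(ids, size):
        # One bind marker per ID in this batch, never more than ``size``.
        markers = ",".join("?" * len(batch))
        query = f'DELETE FROM "{keyspace}"."{table_name}" WHERE apdb_replica_chunk IN ({markers})'
        plans.append((query, batch))
    return plans


# A tiny batch size makes the splitting visible.
plans = plan_deletes("apdb", "DiaSourceChunks", range(5), size=2)
```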

◆ getDiaForcedSourcesChunks()

ApdbTableData lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica.getDiaForcedSourcesChunks ( self,
Iterable[int] chunks )
Return catalog of DiaForcedSource records from given replica chunks.

Parameters
----------
chunks : `~collections.abc.Iterable` [`int`]
    Chunk identifiers to return.

Returns
-------
data : `ApdbTableData`
    Catalog containing DiaForcedSource records. In addition to all
    regular columns it will contain the ``apdb_replica_chunk`` column.

Notes
-----
This part of the API may be unstable and can change before the
implementation is finalized.

Reimplemented from lsst.dax.apdb.apdbReplica.ApdbReplica.

Definition at line 161 of file apdbCassandraReplica.py.

161     def getDiaForcedSourcesChunks(self, chunks: Iterable[int]) -> ApdbTableData:
162         # docstring is inherited from a base class
163         return self._get_chunks(ExtraTables.DiaForcedSourceChunks, chunks)
164

◆ getDiaObjectsChunks()

ApdbTableData lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica.getDiaObjectsChunks ( self,
Iterable[int] chunks )
Return catalog of DiaObject records from given replica chunks.

Parameters
----------
chunks : `~collections.abc.Iterable` [`int`]
    Chunk identifiers to return.

Returns
-------
data : `ApdbTableData`
    Catalog containing DiaObject records. In addition to all regular
    columns it will contain the ``apdb_replica_chunk`` column.

Notes
-----
This part of the API may be unstable and can change before the
implementation is finalized.

Reimplemented from lsst.dax.apdb.apdbReplica.ApdbReplica.

Definition at line 153 of file apdbCassandraReplica.py.

153     def getDiaObjectsChunks(self, chunks: Iterable[int]) -> ApdbTableData:
154         # docstring is inherited from a base class
155         return self._get_chunks(ExtraTables.DiaObjectChunks, chunks)
156

◆ getDiaSourcesChunks()

ApdbTableData lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica.getDiaSourcesChunks ( self,
Iterable[int] chunks )
Return catalog of DiaSource records from given replica chunks.

Parameters
----------
chunks : `~collections.abc.Iterable` [`int`]
    Chunk identifiers to return.

Returns
-------
data : `ApdbTableData`
    Catalog containing DiaSource records. In addition to all regular
    columns it will contain the ``apdb_replica_chunk`` column.

Notes
-----
This part of the API may be unstable and can change before the
implementation is finalized.

Reimplemented from lsst.dax.apdb.apdbReplica.ApdbReplica.

Definition at line 157 of file apdbCassandraReplica.py.

157     def getDiaSourcesChunks(self, chunks: Iterable[int]) -> ApdbTableData:
158         # docstring is inherited from a base class
159         return self._get_chunks(ExtraTables.DiaSourceChunks, chunks)
160

◆ getReplicaChunks()

list[ReplicaChunk] | None lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica.getReplicaChunks ( self)
Return collection of replication chunks known to the database.

Returns
-------
chunks : `list` [`ReplicaChunk`] or `None`
    List of chunks; they may be time-ordered if the database supports
    ordering. `None` is returned if the database is not configured for
    replication.

Reimplemented from lsst.dax.apdb.apdbReplica.ApdbReplica.

Definition at line 80 of file apdbCassandraReplica.py.

80     def getReplicaChunks(self) -> list[ReplicaChunk] | None:
81         # docstring is inherited from a base class
82         if not self._schema.has_replica_chunks:
83             return None
84
85         # everything goes into a single partition
86         partition = 0
87
88         table_name = self._schema.tableName(ExtraTables.ApdbReplicaChunks)
89         # We want to avoid timezone mess so return timestamps as milliseconds.
90         query = (
91             "SELECT toUnixTimestamp(last_update_time), apdb_replica_chunk, unique_id "
92             f'FROM "{self._config.keyspace}"."{table_name}" WHERE partition = ?'
93         )
94
95         result = self._session.execute(
96             self._preparer.prepare(query),
97             (partition,),
98             timeout=self._config.read_timeout,
99             execution_profile="read_tuples",
100         )
101         # order by last_update_time
102         rows = sorted(result)
103         return [
104             ReplicaChunk(
105                 id=row[1],
106                 last_update_time=astropy.time.Time(row[0] / 1000, format="unix_tai"),
107                 unique_id=row[2],
108             )
109             for row in rows
110         ]
111
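
The row handling at the end of `getReplicaChunks` relies on two small facts: tuples sort element by element, so putting the timestamp first orders rows by update time, and the millisecond values need dividing by 1000 to become seconds. The sketch below shows both with made-up rows. `ChunkInfo` is a hypothetical stand-in for `ReplicaChunk`, and `datetime` in UTC is substituted for `astropy.time.Time` with the `unix_tai` format, so the TAI/UTC distinction of the real code is deliberately ignored here.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from uuid import UUID


@dataclass(frozen=True)
class ChunkInfo:
    """Hypothetical stand-in for ReplicaChunk."""

    id: int
    last_update_time: datetime
    unique_id: UUID


def rows_to_chunks(rows: list[tuple[int, int, UUID]]) -> list[ChunkInfo]:
    """Convert (milliseconds, chunk_id, unique_id) rows, oldest first."""
    return [
        ChunkInfo(
            id=row[1],
            # Milliseconds since the epoch are converted to seconds.
            last_update_time=datetime.fromtimestamp(row[0] / 1000, tz=timezone.utc),
            unique_id=row[2],
        )
        # Tuples compare element by element, so the timestamp decides the order.
        for row in sorted(rows)
    ]


# Made-up rows in the column order selected by the query above.
rows = [(2000, 7, UUID(int=1)), (1000, 9, UUID(int=2))]
chunks = rows_to_chunks(rows)
```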

Member Data Documentation

◆ _apdb

lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica._apdb
protected

Definition at line 67 of file apdbCassandraReplica.py.

◆ _config

lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica._config
protected

Definition at line 70 of file apdbCassandraReplica.py.

◆ _preparer

lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica._preparer
protected

Definition at line 73 of file apdbCassandraReplica.py.

◆ _schema

lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica._schema
protected

Definition at line 68 of file apdbCassandraReplica.py.

◆ _session

lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica._session
protected

Definition at line 69 of file apdbCassandraReplica.py.


The documentation for this class was generated from the following file: