LSST Applications g0603fd7c41+501e3db9f9,g0aad566f14+23d8574c86,g0dd44d6229+a1a4c8b791,g2079a07aa2+86d27d4dc4,g2305ad1205+a62672bbc1,g2bbee38e9b+047b288a59,g337abbeb29+047b288a59,g33d1c0ed96+047b288a59,g3a166c0a6a+047b288a59,g3d1719c13e+23d8574c86,g487adcacf7+cb7fd919b2,g4be5004598+23d8574c86,g50ff169b8f+96c6868917,g52b1c1532d+585e252eca,g591dd9f2cf+4a9e435310,g63cd9335cc+585e252eca,g858d7b2824+23d8574c86,g88963caddf+0cb8e002cc,g99cad8db69+43388bcaec,g9ddcbc5298+9a081db1e4,ga1e77700b3+a912195c07,gae0086650b+585e252eca,gb0e22166c9+60f28cb32d,gb2522980b2+793639e996,gb3a676b8dc+b4feba26a1,gb4b16eec92+63f8520565,gba4ed39666+c2a2e4ac27,gbb8dafda3b+a5d255a82e,gc120e1dc64+d820f8acdb,gc28159a63d+047b288a59,gc3e9b769f7+f4f1cc6b50,gcf0d15dbbd+a1a4c8b791,gdaeeff99f8+f9a426f77a,gdb0af172c8+b6d5496702,ge79ae78c31+047b288a59,w.2024.19
LSST Data Management Base Package
Loading...
Searching...
No Matches
Public Member Functions | Protected Member Functions | Protected Attributes | List of all members
lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica Class Reference
Inheritance diagram for lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica:
lsst.dax.apdb.apdbReplica.ApdbReplica

Public Member Functions

 __init__ (self, ApdbCassandra apdb, ApdbCassandraSchema schema, Any session)
 
VersionTuple apdbReplicaImplementationVersion (cls)
 
list[ReplicaChunk]|None getReplicaChunks (self)
 
None deleteReplicaChunks (self, Iterable[int] chunks)
 
ApdbTableData getDiaObjectsChunks (self, Iterable[int] chunks)
 
ApdbTableData getDiaSourcesChunks (self, Iterable[int] chunks)
 
ApdbTableData getDiaForcedSourcesChunks (self, Iterable[int] chunks)
 

Protected Member Functions

Timer _timer (self, str name, *, Mapping[str, str|int]|None tags=None)
 
ApdbTableData _get_chunks (self, ExtraTables table, Iterable[int] chunks)
 

Protected Attributes

 _apdb
 
 _schema
 
 _session
 
 _config
 
 _preparer
 
 _timer_args
 

Detailed Description

Implementation of `ApdbReplica` for Cassandra backend.

Parameters
----------
apdb : `ApdbCassandra`
    Instance of ApdbCassandra for the database.
schema : `ApdbCassandraSchema`
    Instance of ApdbCassandraSchema for database.
session
    Instance of cassandra session type.

Definition at line 54 of file apdbCassandraReplica.py.

Constructor & Destructor Documentation

◆ __init__()

lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica.__init__ ( self,
ApdbCassandra apdb,
ApdbCassandraSchema schema,
Any session )

Definition at line 67 of file apdbCassandraReplica.py.

def __init__(self, apdb: ApdbCassandra, schema: ApdbCassandraSchema, session: Any):
    # A reference to ``apdb`` is kept on purpose: the ApdbCassandra
    # instance must stay alive for as long as this replica object exists.
    self._apdb = apdb
    self._schema = schema
    self._session = session
    self._config = apdb.config

    # Cache of prepared statements bound to this session.
    self._preparer = PreparedStatementCache(self._session)

    # Timers always receive ``_MON``; ``_LOG`` is appended only when the
    # ``timer`` configuration option is enabled.
    self._timer_args: list[MonAgent | logging.Logger] = (
        [_MON, _LOG] if self._config.timer else [_MON]
    )

Member Function Documentation

◆ _get_chunks()

ApdbTableData lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica._get_chunks ( self,
ExtraTables table,
Iterable[int] chunks )
protected
Return records from a particular table given a set of replica chunk IDs.

Definition at line 179 of file apdbCassandraReplica.py.

def _get_chunks(self, table: ExtraTables, chunks: Iterable[int]) -> ApdbTableData:
    """Return all records of a replica-chunk table for the given chunks.

    Parameters
    ----------
    table : `ExtraTables`
        Replica-chunk table to read from.
    chunks : `~collections.abc.Iterable` [`int`]
        Replica chunk identifiers to select.

    Returns
    -------
    data : `ApdbTableData`
        Selected records, including the ``apdb_replica_chunk`` column.

    Raises
    ------
    ValueError
        Raised if the database is not configured for replication.
    """
    if not self._schema.has_replica_chunks:
        raise ValueError("APDB is not configured for replication")

    # Only a few chunks are expected per call, so a single IN clause with
    # one bind marker per chunk is used (no batching).
    chunk_ids = list(chunks)
    markers = ",".join(["?"] * len(chunk_ids))

    table_name = self._schema.tableName(table)
    # A chunk table holds the regular APDB columns plus
    # ``apdb_replica_chunk``, which is exactly what callers expect back,
    # so selecting all columns is intentional.
    keyspace = self._config.keyspace
    statement = self._preparer.prepare(
        f'SELECT * FROM "{keyspace}"."{table_name}" WHERE apdb_replica_chunk IN ({markers})'
    )

    with self._timer("table_chunk_select_time", tags={"table": table_name}):
        rows = self._session.execute(statement, chunk_ids, execution_profile="read_raw")
        # NOTE(review): reads the driver's private ``_current_rows``;
        # presumably the "read_raw" profile yields ApdbCassandraTableData
        # there — confirm against the profile's row factory.
        return cast(ApdbCassandraTableData, rows._current_rows)

◆ _timer()

Timer lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica._timer ( self,
str name,
*, Mapping[str, str | int] | None tags = None )
protected
Create `Timer` instance given its name.

Definition at line 82 of file apdbCassandraReplica.py.

def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
    """Build a `Timer` named ``name`` that reports to this object's
    configured timer targets, optionally annotated with ``tags``."""
    targets = self._timer_args
    return Timer(name, *targets, tags=tags)

◆ apdbReplicaImplementationVersion()

VersionTuple lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica.apdbReplicaImplementationVersion ( cls)
Return version number for current ApdbReplica implementation.

Returns
-------
version : `VersionTuple`
    Version of the code defined in implementation class.

Reimplemented from lsst.dax.apdb.apdbReplica.ApdbReplica.

Definition at line 87 of file apdbCassandraReplica.py.

def apdbReplicaImplementationVersion(cls) -> VersionTuple:
    # Docstring inherited from base class.
    # The implementation version is simply the ``VERSION`` constant.
    implementation_version = VERSION
    return implementation_version

◆ deleteReplicaChunks()

None lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica.deleteReplicaChunks ( self,
Iterable[int] chunks )
Remove replication chunks from the database.

Parameters
----------
chunks : `~collections.abc.Iterable` [`int`]
    Chunk identifiers to remove.

Notes
-----
This method causes APDB to forget about the specified chunks. If there
is any auxiliary data associated with the identifiers, it is also
removed from the database (but data in regular tables is not removed).
This method should be called after successful transfer of data from
APDB to PPDB to free space used by replicas.

Reimplemented from lsst.dax.apdb.apdbReplica.ApdbReplica.

Definition at line 124 of file apdbCassandraReplica.py.

def deleteReplicaChunks(self, chunks: Iterable[int]) -> None:
    # docstring is inherited from a base class
    #
    # Removes the given chunk IDs from the Dia*Chunks tables and from the
    # ApdbReplicaChunks registry table. Raises ValueError when replication
    # is not configured.
    if not self._schema.has_replica_chunks:
        raise ValueError("APDB is not configured for replication")

    # Everything in the registry table goes into a single partition.
    partition = 0
    keyspace = self._config.keyspace

    # Table names do not depend on the batch, so resolve them once.
    registry_table = self._schema.tableName(ExtraTables.ApdbReplicaChunks)
    data_tables = [
        self._schema.tableName(table)
        for table in (
            ExtraTables.DiaObjectChunks,
            ExtraTables.DiaSourceChunks,
            ExtraTables.DiaForcedSourceChunks,
        )
    ]

    # There is a 64k limit on the number of bind markers in Cassandra CQL,
    # so chunk IDs are deleted in batches.
    for chunk_ids in chunk_iterable(chunks, 20_000):
        chunk_list = list(chunk_ids)
        params = ",".join("?" * len(chunk_list))

        # Delete the per-table chunk data first and the registry entry
        # last. If any statement fails, the chunk is still reported by
        # getReplicaChunks() (which reads the registry table), so the
        # deletion can be retried. Deleting the registry entry first would
        # leave orphaned Dia*Chunks rows that nothing references anymore.
        for table_name in data_tables:
            query = (
                f'DELETE FROM "{keyspace}"."{table_name}"'
                f" WHERE apdb_replica_chunk IN ({params})"
            )
            # NOTE(review): the metric name is misspelled ("detele"); it is
            # kept unchanged in case existing consumers rely on it —
            # consider renaming to "table_chunk_delete_time".
            with self._timer("table_chunk_detele_time", tags={"table": table_name}):
                self._session.execute(
                    self._preparer.prepare(query),
                    chunk_list,
                    timeout=self._config.remove_timeout,
                )

        query = (
            f'DELETE FROM "{keyspace}"."{registry_table}" '
            f"WHERE partition = ? AND apdb_replica_chunk IN ({params})"
        )
        with self._timer("chunks_delete_time"):
            self._session.execute(
                self._preparer.prepare(query),
                [partition] + chunk_list,
                timeout=self._config.remove_timeout,
            )

◆ getDiaForcedSourcesChunks()

ApdbTableData lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica.getDiaForcedSourcesChunks ( self,
Iterable[int] chunks )
Return catalog of DiaForcedSource records from given replica chunks.

Parameters
----------
chunks : `~collections.abc.Iterable` [`int`]
    Chunk identifiers to return.

Returns
-------
data : `ApdbTableData`
    Catalog containing DiaForcedSource records. In addition to all
    regular columns it will contain ``apdb_replica_chunk`` column.

Notes
-----
This part of API may not be very stable and can change before the
implementation finalizes.

Reimplemented from lsst.dax.apdb.apdbReplica.ApdbReplica.

Definition at line 175 of file apdbCassandraReplica.py.

def getDiaForcedSourcesChunks(self, chunks: Iterable[int]) -> ApdbTableData:
    # docstring is inherited from a base class
    # Records come from the DiaForcedSource replica-chunk table.
    source_table = ExtraTables.DiaForcedSourceChunks
    return self._get_chunks(source_table, chunks)

◆ getDiaObjectsChunks()

ApdbTableData lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica.getDiaObjectsChunks ( self,
Iterable[int] chunks )
Return catalog of DiaObject records from given replica chunks.

Parameters
----------
chunks : `~collections.abc.Iterable` [`int`]
    Chunk identifiers to return.

Returns
-------
data : `ApdbTableData`
    Catalog containing DiaObject records. In addition to all regular
    columns it will contain ``apdb_replica_chunk`` column.

Notes
-----
This part of API may not be very stable and can change before the
implementation finalizes.

Reimplemented from lsst.dax.apdb.apdbReplica.ApdbReplica.

Definition at line 167 of file apdbCassandraReplica.py.

def getDiaObjectsChunks(self, chunks: Iterable[int]) -> ApdbTableData:
    # docstring is inherited from a base class
    # Records come from the DiaObject replica-chunk table.
    source_table = ExtraTables.DiaObjectChunks
    return self._get_chunks(source_table, chunks)

◆ getDiaSourcesChunks()

ApdbTableData lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica.getDiaSourcesChunks ( self,
Iterable[int] chunks )
Return catalog of DiaSource records from given replica chunks.

Parameters
----------
chunks : `~collections.abc.Iterable` [`int`]
    Chunk identifiers to return.

Returns
-------
data : `ApdbTableData`
    Catalog containing DiaSource records. In addition to all regular
    columns it will contain ``apdb_replica_chunk`` column.

Notes
-----
This part of API may not be very stable and can change before the
implementation finalizes.

Reimplemented from lsst.dax.apdb.apdbReplica.ApdbReplica.

Definition at line 171 of file apdbCassandraReplica.py.

def getDiaSourcesChunks(self, chunks: Iterable[int]) -> ApdbTableData:
    # docstring is inherited from a base class
    # Records come from the DiaSource replica-chunk table.
    source_table = ExtraTables.DiaSourceChunks
    return self._get_chunks(source_table, chunks)

◆ getReplicaChunks()

list[ReplicaChunk] | None lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica.getReplicaChunks ( self)
Return collection of replication chunks known to the database.

Returns
-------
chunks : `list` [`ReplicaChunk`] or `None`
    List of chunks, they may be time-ordered if database supports
    ordering. `None` is returned if database is not configured for
    replication.

Reimplemented from lsst.dax.apdb.apdbReplica.ApdbReplica.

Definition at line 91 of file apdbCassandraReplica.py.

def getReplicaChunks(self) -> list[ReplicaChunk] | None:
    # docstring is inherited from a base class
    if not self._schema.has_replica_chunks:
        # Replication is not configured, nothing to report.
        return None

    # All registry records live in one partition.
    partition = 0
    table_name = self._schema.tableName(ExtraTables.ApdbReplicaChunks)

    # Timestamps are fetched as integer milliseconds via toUnixTimestamp()
    # to avoid any timezone handling.
    query = (
        "SELECT toUnixTimestamp(last_update_time), apdb_replica_chunk, unique_id "
        f'FROM "{self._config.keyspace}"."{table_name}" WHERE partition = ?'
    )

    with self._timer("chunks_select_time"):
        result = self._session.execute(
            self._preparer.prepare(query),
            (partition,),
            timeout=self._config.read_timeout,
            execution_profile="read_tuples",
        )

    # Each row is (timestamp_ms, chunk_id, unique_id); sorting the tuples
    # orders chunks by last update time first.
    replica_chunks: list[ReplicaChunk] = []
    for timestamp_ms, chunk_id, unique_id in sorted(result):
        update_time = astropy.time.Time(timestamp_ms / 1000, format="unix_tai")
        replica_chunks.append(
            ReplicaChunk(id=chunk_id, last_update_time=update_time, unique_id=unique_id)
        )
    return replica_chunks

Member Data Documentation

◆ _apdb

lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica._apdb
protected

Definition at line 70 of file apdbCassandraReplica.py.

◆ _config

lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica._config
protected

Definition at line 73 of file apdbCassandraReplica.py.

◆ _preparer

lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica._preparer
protected

Definition at line 76 of file apdbCassandraReplica.py.

◆ _schema

lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica._schema
protected

Definition at line 71 of file apdbCassandraReplica.py.

◆ _session

lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica._session
protected

Definition at line 72 of file apdbCassandraReplica.py.

◆ _timer_args

lsst.dax.apdb.cassandra.apdbCassandraReplica.ApdbCassandraReplica._timer_args
protected

Definition at line 78 of file apdbCassandraReplica.py.


The documentation for this class was generated from the following file: