LSST Applications g0603fd7c41+501e3db9f9,g0aad566f14+23d8574c86,g0dd44d6229+a1a4c8b791,g2079a07aa2+86d27d4dc4,g2305ad1205+a62672bbc1,g2bbee38e9b+047b288a59,g337abbeb29+047b288a59,g33d1c0ed96+047b288a59,g3a166c0a6a+047b288a59,g3d1719c13e+23d8574c86,g487adcacf7+cb7fd919b2,g4be5004598+23d8574c86,g50ff169b8f+96c6868917,g52b1c1532d+585e252eca,g591dd9f2cf+4a9e435310,g63cd9335cc+585e252eca,g858d7b2824+23d8574c86,g88963caddf+0cb8e002cc,g99cad8db69+43388bcaec,g9ddcbc5298+9a081db1e4,ga1e77700b3+a912195c07,gae0086650b+585e252eca,gb0e22166c9+60f28cb32d,gb2522980b2+793639e996,gb3a676b8dc+b4feba26a1,gb4b16eec92+63f8520565,gba4ed39666+c2a2e4ac27,gbb8dafda3b+a5d255a82e,gc120e1dc64+d820f8acdb,gc28159a63d+047b288a59,gc3e9b769f7+f4f1cc6b50,gcf0d15dbbd+a1a4c8b791,gdaeeff99f8+f9a426f77a,gdb0af172c8+b6d5496702,ge79ae78c31+047b288a59,w.2024.19
LSST Data Management Base Package
Loading...
Searching...
No Matches
Public Member Functions | Protected Member Functions | Protected Attributes | List of all members
lsst.dax.apdb.sql.apdbSqlReplica.ApdbSqlReplica Class Reference
Inheritance diagram for lsst.dax.apdb.sql.apdbSqlReplica.ApdbSqlReplica:
lsst.dax.apdb.apdbReplica.ApdbReplica

Public Member Functions

 __init__ (self, ApdbSqlSchema schema, sqlalchemy.engine.Engine engine, bool timer=False)
 
VersionTuple apdbReplicaImplementationVersion (cls)
 
list[ReplicaChunk]|None getReplicaChunks (self)
 
None deleteReplicaChunks (self, Iterable[int] chunks)
 
ApdbTableData getDiaObjectsChunks (self, Iterable[int] chunks)
 
ApdbTableData getDiaSourcesChunks (self, Iterable[int] chunks)
 
ApdbTableData getDiaForcedSourcesChunks (self, Iterable[int] chunks)
 

Protected Member Functions

Timer _timer (self, str name, *Mapping[str, str|int]|None tags=None)
 
ApdbTableData _get_chunks (self, Iterable[int] chunks, ApdbTables table_enum, ExtraTables chunk_table_enum)
 

Protected Attributes

 _schema
 
 _engine
 
 _timer_args
 

Detailed Description

Implementation of `ApdbReplica` for SQL backend.

Parameters
----------
schema : `ApdbSqlSchema`
    Instance of `ApdbSqlSchema` class for APDB database.
engine : `sqlalchemy.engine.Engine`
    Engine for database access.
timer : `bool`, optional
    If `True` then log timing information.

Definition at line 73 of file apdbSqlReplica.py.

Constructor & Destructor Documentation

◆ __init__()

lsst.dax.apdb.sql.apdbSqlReplica.ApdbSqlReplica.__init__ ( self,
ApdbSqlSchema schema,
sqlalchemy.engine.Engine engine,
bool timer = False )

Definition at line 86 of file apdbSqlReplica.py.

86 def __init__(self, schema: ApdbSqlSchema, engine: sqlalchemy.engine.Engine, timer: bool = False):
87 self._schema = schema
88 self._engine = engine
89
90 self._timer_args: list[MonAgent | logging.Logger] = [_MON]
91 if timer:
92 self._timer_args.append(_LOG)
93

Member Function Documentation

◆ _get_chunks()

ApdbTableData lsst.dax.apdb.sql.apdbSqlReplica.ApdbSqlReplica._get_chunks ( self,
Iterable[int] chunks,
ApdbTables table_enum,
ExtraTables chunk_table_enum )
protected
Return catalog of records for given insert identifiers, common
implementation for all DIA tables.

Definition at line 146 of file apdbSqlReplica.py.

151 ) -> ApdbTableData:
152 """Return catalog of records for given insert identifiers, common
153 implementation for all DIA tables.
154 """
155 if not self._schema.has_replica_chunks:
156 raise ValueError("APDB is not configured for replication")
157
158 table = self._schema.get_table(table_enum)
159 chunk_table = self._schema.get_table(chunk_table_enum)
160
161 join = table.join(chunk_table)
162 chunk_id_column = chunk_table.columns["apdb_replica_chunk"]
163 apdb_columns = self._schema.get_apdb_columns(table_enum)
164 where_clause = chunk_id_column.in_(chunks)
165 query = sql.select(chunk_id_column, *apdb_columns).select_from(join).where(where_clause)
166
167 # execute select
168 with self._timer("table_chunk_select_time", tags={"table": table.name}):
169 with self._engine.begin() as conn:
170 result = conn.execution_options(stream_results=True, max_row_buffer=10000).execute(query)
171 return ApdbSqlTableData(result)

◆ _timer()

Timer lsst.dax.apdb.sql.apdbSqlReplica.ApdbSqlReplica._timer ( self,
str name,
*Mapping[str, str | int] | None tags = None )
protected
Create `Timer` instance given its name.

Definition at line 94 of file apdbSqlReplica.py.

94 def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
95 """Create `Timer` instance given its name."""
96 return Timer(name, *self._timer_args, tags=tags)
97

◆ apdbReplicaImplementationVersion()

VersionTuple lsst.dax.apdb.sql.apdbSqlReplica.ApdbSqlReplica.apdbReplicaImplementationVersion ( cls)
Return version number for current ApdbReplica implementation.

Returns
-------
version : `VersionTuple`
    Version of the code defined in implementation class.

Reimplemented from lsst.dax.apdb.apdbReplica.ApdbReplica.

Definition at line 99 of file apdbSqlReplica.py.

99 def apdbReplicaImplementationVersion(cls) -> VersionTuple:
100 # Docstring inherited from base class.
101 return VERSION
102

◆ deleteReplicaChunks()

None lsst.dax.apdb.sql.apdbSqlReplica.ApdbSqlReplica.deleteReplicaChunks ( self,
Iterable[int] chunks )
Remove replication chunks from the database.

Parameters
----------
chunks : `~collections.abc.Iterable` [`int`]
    Chunk identifiers to remove.

Notes
-----
This method causes Apdb to forget about specified chunks. If there
are any auxiliary data associated with the identifiers, it is also
removed from database (but data in regular tables is not removed).
This method should be called after successful transfer of data from
APDB to PPDB to free space used by replicas.

Reimplemented from lsst.dax.apdb.apdbReplica.ApdbReplica.

Definition at line 122 of file apdbSqlReplica.py.

122 def deleteReplicaChunks(self, chunks: Iterable[int]) -> None:
123 # docstring is inherited from a base class
124 if not self._schema.has_replica_chunks:
125 raise ValueError("APDB is not configured for replication")
126
127 table = self._schema.get_table(ExtraTables.ApdbReplicaChunks)
128 where_clause = table.columns["apdb_replica_chunk"].in_(chunks)
129 stmt = table.delete().where(where_clause)
130 with self._timer("chunks_delete_time"):
131 with self._engine.begin() as conn:
132 conn.execute(stmt)
133

◆ getDiaForcedSourcesChunks()

ApdbTableData lsst.dax.apdb.sql.apdbSqlReplica.ApdbSqlReplica.getDiaForcedSourcesChunks ( self,
Iterable[int] chunks )
Return catalog of DiaForcedSource records from given replica chunks.

Parameters
----------
chunks : `~collections.abc.Iterable` [`int`]
    Chunk identifiers to return.

Returns
-------
data : `ApdbTableData`
    Catalog containing DiaForcedSource records. In addition to all
    regular columns it will contain ``apdb_replica_chunk`` column.

Notes
-----
This part of API may not be very stable and can change before the
implementation finalizes.

Reimplemented from lsst.dax.apdb.apdbReplica.ApdbReplica.

Definition at line 142 of file apdbSqlReplica.py.

142 def getDiaForcedSourcesChunks(self, chunks: Iterable[int]) -> ApdbTableData:
143 # docstring is inherited from a base class
144 return self._get_chunks(chunks, ApdbTables.DiaForcedSource, ExtraTables.DiaForcedSourceChunks)
145

◆ getDiaObjectsChunks()

ApdbTableData lsst.dax.apdb.sql.apdbSqlReplica.ApdbSqlReplica.getDiaObjectsChunks ( self,
Iterable[int] chunks )
Return catalog of DiaObject records from given replica chunks.

Parameters
----------
chunks : `~collections.abc.Iterable` [`int`]
    Chunk identifiers to return.

Returns
-------
data : `ApdbTableData`
    Catalog containing DiaObject records. In addition to all regular
    columns it will contain ``apdb_replica_chunk`` column.

Notes
-----
This part of API may not be very stable and can change before the
implementation finalizes.

Reimplemented from lsst.dax.apdb.apdbReplica.ApdbReplica.

Definition at line 134 of file apdbSqlReplica.py.

134 def getDiaObjectsChunks(self, chunks: Iterable[int]) -> ApdbTableData:
135 # docstring is inherited from a base class
136 return self._get_chunks(chunks, ApdbTables.DiaObject, ExtraTables.DiaObjectChunks)
137

◆ getDiaSourcesChunks()

ApdbTableData lsst.dax.apdb.sql.apdbSqlReplica.ApdbSqlReplica.getDiaSourcesChunks ( self,
Iterable[int] chunks )
Return catalog of DiaSource records from given replica chunks.

Parameters
----------
chunks : `~collections.abc.Iterable` [`int`]
    Chunk identifiers to return.

Returns
-------
data : `ApdbTableData`
    Catalog containing DiaSource records. In addition to all regular
    columns it will contain ``apdb_replica_chunk`` column.

Notes
-----
This part of API may not be very stable and can change before the
implementation finalizes.

Reimplemented from lsst.dax.apdb.apdbReplica.ApdbReplica.

Definition at line 138 of file apdbSqlReplica.py.

138 def getDiaSourcesChunks(self, chunks: Iterable[int]) -> ApdbTableData:
139 # docstring is inherited from a base class
140 return self._get_chunks(chunks, ApdbTables.DiaSource, ExtraTables.DiaSourceChunks)
141

◆ getReplicaChunks()

list[ReplicaChunk] | None lsst.dax.apdb.sql.apdbSqlReplica.ApdbSqlReplica.getReplicaChunks ( self)
Return collection of replication chunks known to the database.

Returns
-------
chunks : `list` [`ReplicaChunk`] or `None`
    List of chunks, they may be time-ordered if database supports
    ordering. `None` is returned if database is not configured for
    replication.

Reimplemented from lsst.dax.apdb.apdbReplica.ApdbReplica.

Definition at line 103 of file apdbSqlReplica.py.

103 def getReplicaChunks(self) -> list[ReplicaChunk] | None:
104 # docstring is inherited from a base class
105 if not self._schema.has_replica_chunks:
106 return None
107
108 table = self._schema.get_table(ExtraTables.ApdbReplicaChunks)
109 assert table is not None, "has_replica_chunks=True means it must be defined"
110 query = sql.select(
111 table.columns["apdb_replica_chunk"], table.columns["last_update_time"], table.columns["unique_id"]
112 ).order_by(table.columns["last_update_time"])
113 with self._timer("chunks_select_time"):
114 with self._engine.connect() as conn:
115 result = conn.execution_options(stream_results=True, max_row_buffer=10000).execute(query)
116 ids = []
117 for row in result:
118 last_update_time = astropy.time.Time(row[1].timestamp(), format="unix_tai")
119 ids.append(ReplicaChunk(id=row[0], last_update_time=last_update_time, unique_id=row[2]))
120 return ids
121

Member Data Documentation

◆ _engine

lsst.dax.apdb.sql.apdbSqlReplica.ApdbSqlReplica._engine
protected

Definition at line 88 of file apdbSqlReplica.py.

◆ _schema

lsst.dax.apdb.sql.apdbSqlReplica.ApdbSqlReplica._schema
protected

Definition at line 87 of file apdbSqlReplica.py.

◆ _timer_args

lsst.dax.apdb.sql.apdbSqlReplica.ApdbSqlReplica._timer_args
protected

Definition at line 96 of file apdbSqlReplica.py.


The documentation for this class was generated from the following file: