LSST Applications 27.0.0,g0265f82a02+469cd937ee,g02d81e74bb+21ad69e7e1,g1470d8bcf6+cbe83ee85a,g2079a07aa2+e67c6346a6,g212a7c68fe+04a9158687,g2305ad1205+94392ce272,g295015adf3+81dd352a9d,g2bbee38e9b+469cd937ee,g337abbeb29+469cd937ee,g3939d97d7f+72a9f7b576,g487adcacf7+71499e7cba,g50ff169b8f+5929b3527e,g52b1c1532d+a6fc98d2e7,g591dd9f2cf+df404f777f,g5a732f18d5+be83d3ecdb,g64a986408d+21ad69e7e1,g858d7b2824+21ad69e7e1,g8a8a8dda67+a6fc98d2e7,g99cad8db69+f62e5b0af5,g9ddcbc5298+d4bad12328,ga1e77700b3+9c366c4306,ga8c6da7877+71e4819109,gb0e22166c9+25ba2f69a1,gb6a65358fc+469cd937ee,gbb8dafda3b+69d3c0e320,gc07e1c2157+a98bf949bb,gc120e1dc64+615ec43309,gc28159a63d+469cd937ee,gcf0d15dbbd+72a9f7b576,gdaeeff99f8+a38ce5ea23,ge6526c86ff+3a7c1ac5f1,ge79ae78c31+469cd937ee,gee10cc3b42+a6fc98d2e7,gf1cff7945b+21ad69e7e1,gfbcc870c63+9a11dc8c8f
LSST Data Management Base Package
Loading...
Searching...
No Matches
Public Member Functions | Protected Member Functions | Protected Attributes | List of all members
lsst.dax.apdb.sql.apdbSqlReplica.ApdbSqlReplica Class Reference
Inheritance diagram for lsst.dax.apdb.sql.apdbSqlReplica.ApdbSqlReplica:
lsst.dax.apdb.apdbReplica.ApdbReplica

Public Member Functions

 __init__ (self, ApdbSqlSchema schema, sqlalchemy.engine.Engine engine, bool timer=False)
 
VersionTuple apdbReplicaImplementationVersion (cls)
 
list[ReplicaChunk]|None getReplicaChunks (self)
 
None deleteReplicaChunks (self, Iterable[int] chunks)
 
ApdbTableData getDiaObjectsChunks (self, Iterable[int] chunks)
 
ApdbTableData getDiaSourcesChunks (self, Iterable[int] chunks)
 
ApdbTableData getDiaForcedSourcesChunks (self, Iterable[int] chunks)
 

Protected Member Functions

ApdbTableData _get_chunks (self, Iterable[int] chunks, ApdbTables table_enum, ExtraTables chunk_table_enum)
 

Protected Attributes

 _schema
 
 _engine
 
 _timer
 

Detailed Description

Implementation of `ApdbReplica` for SQL backend.

Parameters
----------
schema : `ApdbSqlSchema`
    Instance of `ApdbSqlSchema` class for APDB database.
engine : `sqlalchemy.engine.Engine`
    Engine for database access.
timer : `bool`, optional
    If `True` then log timing information.

Definition at line 70 of file apdbSqlReplica.py.

Constructor & Destructor Documentation

◆ __init__()

lsst.dax.apdb.sql.apdbSqlReplica.ApdbSqlReplica.__init__ ( self,
ApdbSqlSchema schema,
sqlalchemy.engine.Engine engine,
bool timer = False )

Definition at line 83 of file apdbSqlReplica.py.

83 def __init__(self, schema: ApdbSqlSchema, engine: sqlalchemy.engine.Engine, timer: bool = False):
84 self._schema = schema
85 self._engine = engine
86 self._timer = timer
87

Member Function Documentation

◆ _get_chunks()

ApdbTableData lsst.dax.apdb.sql.apdbSqlReplica.ApdbSqlReplica._get_chunks ( self,
Iterable[int] chunks,
ApdbTables table_enum,
ExtraTables chunk_table_enum )
protected
Return catalog of records for given insert identifiers, common
implementation for all DIA tables.

Definition at line 135 of file apdbSqlReplica.py.

140 ) -> ApdbTableData:
141 """Return catalog of records for given insert identifiers, common
142 implementation for all DIA tables.
143 """
144 if not self._schema.has_replica_chunks:
145 raise ValueError("APDB is not configured for replication")
146
147 table = self._schema.get_table(table_enum)
148 chunk_table = self._schema.get_table(chunk_table_enum)
149
150 join = table.join(chunk_table)
151 chunk_id_column = chunk_table.columns["apdb_replica_chunk"]
152 apdb_columns = self._schema.get_apdb_columns(table_enum)
153 where_clause = chunk_id_column.in_(chunks)
154 query = sql.select(chunk_id_column, *apdb_columns).select_from(join).where(where_clause)
155
156 # execute select
157 with Timer(f"{table.name} replica chunk select", self._timer):
158 with self._engine.begin() as conn:
159 result = conn.execution_options(stream_results=True, max_row_buffer=10000).execute(query)
160 return ApdbSqlTableData(result)

◆ apdbReplicaImplementationVersion()

VersionTuple lsst.dax.apdb.sql.apdbSqlReplica.ApdbSqlReplica.apdbReplicaImplementationVersion ( cls)
Return version number for current ApdbReplica implementation.

Returns
-------
version : `VersionTuple`
    Version of the code defined in implementation class.

Reimplemented from lsst.dax.apdb.apdbReplica.ApdbReplica.

Definition at line 89 of file apdbSqlReplica.py.

89 def apdbReplicaImplementationVersion(cls) -> VersionTuple:
90 # Docstring inherited from base class.
91 return VERSION
92

◆ deleteReplicaChunks()

None lsst.dax.apdb.sql.apdbSqlReplica.ApdbSqlReplica.deleteReplicaChunks ( self,
Iterable[int] chunks )
Remove replication chunks from the database.

Parameters
----------
chunks : `~collections.abc.Iterable` [`int`]
    Chunk identifiers to remove.

Notes
-----
This method causes Apdb to forget about specified chunks. If there
are any auxiliary data associated with the identifiers, it is also
removed from database (but data in regular tables is not removed).
This method should be called after successful transfer of data from
APDB to PPDB to free space used by replicas.

Reimplemented from lsst.dax.apdb.apdbReplica.ApdbReplica.

Definition at line 112 of file apdbSqlReplica.py.

112 def deleteReplicaChunks(self, chunks: Iterable[int]) -> None:
113 # docstring is inherited from a base class
114 if not self._schema.has_replica_chunks:
115 raise ValueError("APDB is not configured for replication")
116
117 table = self._schema.get_table(ExtraTables.ApdbReplicaChunks)
118 where_clause = table.columns["apdb_replica_chunk"].in_(chunks)
119 stmt = table.delete().where(where_clause)
120 with self._engine.begin() as conn:
121 conn.execute(stmt)
122

◆ getDiaForcedSourcesChunks()

ApdbTableData lsst.dax.apdb.sql.apdbSqlReplica.ApdbSqlReplica.getDiaForcedSourcesChunks ( self,
Iterable[int] chunks )
Return catalog of DiaForcedSource records from given replica chunks.

Parameters
----------
chunks : `~collections.abc.Iterable` [`int`]
    Chunk identifiers to return.

Returns
-------
data : `ApdbTableData`
    Catalog containing DiaForcedSource records. In addition to all
    regular columns it will contain ``apdb_replica_chunk`` column.

Notes
-----
This part of API may not be very stable and can change before the
implementation finalizes.

Reimplemented from lsst.dax.apdb.apdbReplica.ApdbReplica.

Definition at line 131 of file apdbSqlReplica.py.

131 def getDiaForcedSourcesChunks(self, chunks: Iterable[int]) -> ApdbTableData:
132 # docstring is inherited from a base class
133 return self._get_chunks(chunks, ApdbTables.DiaForcedSource, ExtraTables.DiaForcedSourceChunks)
134

◆ getDiaObjectsChunks()

ApdbTableData lsst.dax.apdb.sql.apdbSqlReplica.ApdbSqlReplica.getDiaObjectsChunks ( self,
Iterable[int] chunks )
Return catalog of DiaObject records from given replica chunks.

Parameters
----------
chunks : `~collections.abc.Iterable` [`int`]
    Chunk identifiers to return.

Returns
-------
data : `ApdbTableData`
    Catalog containing DiaObject records. In addition to all regular
    columns it will contain ``apdb_replica_chunk`` column.

Notes
-----
This part of API may not be very stable and can change before the
implementation finalizes.

Reimplemented from lsst.dax.apdb.apdbReplica.ApdbReplica.

Definition at line 123 of file apdbSqlReplica.py.

123 def getDiaObjectsChunks(self, chunks: Iterable[int]) -> ApdbTableData:
124 # docstring is inherited from a base class
125 return self._get_chunks(chunks, ApdbTables.DiaObject, ExtraTables.DiaObjectChunks)
126

◆ getDiaSourcesChunks()

ApdbTableData lsst.dax.apdb.sql.apdbSqlReplica.ApdbSqlReplica.getDiaSourcesChunks ( self,
Iterable[int] chunks )
Return catalog of DiaSource records from given replica chunks.

Parameters
----------
chunks : `~collections.abc.Iterable` [`int`]
    Chunk identifiers to return.

Returns
-------
data : `ApdbTableData`
    Catalog containing DiaSource records. In addition to all regular
    columns it will contain ``apdb_replica_chunk`` column.

Notes
-----
This part of API may not be very stable and can change before the
implementation finalizes.

Reimplemented from lsst.dax.apdb.apdbReplica.ApdbReplica.

Definition at line 127 of file apdbSqlReplica.py.

127 def getDiaSourcesChunks(self, chunks: Iterable[int]) -> ApdbTableData:
128 # docstring is inherited from a base class
129 return self._get_chunks(chunks, ApdbTables.DiaSource, ExtraTables.DiaSourceChunks)
130

◆ getReplicaChunks()

list[ReplicaChunk] | None lsst.dax.apdb.sql.apdbSqlReplica.ApdbSqlReplica.getReplicaChunks ( self)
Return collection of replication chunks known to the database.

Returns
-------
chunks : `list` [`ReplicaChunk`] or `None`
    List of chunks, they may be time-ordered if database supports
    ordering. `None` is returned if database is not configured for
    replication.

Reimplemented from lsst.dax.apdb.apdbReplica.ApdbReplica.

Definition at line 93 of file apdbSqlReplica.py.

93 def getReplicaChunks(self) -> list[ReplicaChunk] | None:
94 # docstring is inherited from a base class
95 if not self._schema.has_replica_chunks:
96 return None
97
98 table = self._schema.get_table(ExtraTables.ApdbReplicaChunks)
99 assert table is not None, "has_replica_chunks=True means it must be defined"
100 query = sql.select(
101 table.columns["apdb_replica_chunk"], table.columns["last_update_time"], table.columns["unique_id"]
102 ).order_by(table.columns["last_update_time"])
103 with Timer("DiaObject insert id select", self._timer):
104 with self._engine.connect() as conn:
105 result = conn.execution_options(stream_results=True, max_row_buffer=10000).execute(query)
106 ids = []
107 for row in result:
108 last_update_time = astropy.time.Time(row[1].timestamp(), format="unix_tai")
109 ids.append(ReplicaChunk(id=row[0], last_update_time=last_update_time, unique_id=row[2]))
110 return ids
111

Member Data Documentation

◆ _engine

lsst.dax.apdb.sql.apdbSqlReplica.ApdbSqlReplica._engine
protected

Definition at line 85 of file apdbSqlReplica.py.

◆ _schema

lsst.dax.apdb.sql.apdbSqlReplica.ApdbSqlReplica._schema
protected

Definition at line 84 of file apdbSqlReplica.py.

◆ _timer

lsst.dax.apdb.sql.apdbSqlReplica.ApdbSqlReplica._timer
protected

Definition at line 86 of file apdbSqlReplica.py.


The documentation for this class was generated from the following file: