LSST Applications 27.0.0,g0265f82a02+469cd937ee,g02d81e74bb+21ad69e7e1,g1470d8bcf6+cbe83ee85a,g2079a07aa2+e67c6346a6,g212a7c68fe+04a9158687,g2305ad1205+94392ce272,g295015adf3+81dd352a9d,g2bbee38e9b+469cd937ee,g337abbeb29+469cd937ee,g3939d97d7f+72a9f7b576,g487adcacf7+71499e7cba,g50ff169b8f+5929b3527e,g52b1c1532d+a6fc98d2e7,g591dd9f2cf+df404f777f,g5a732f18d5+be83d3ecdb,g64a986408d+21ad69e7e1,g858d7b2824+21ad69e7e1,g8a8a8dda67+a6fc98d2e7,g99cad8db69+f62e5b0af5,g9ddcbc5298+d4bad12328,ga1e77700b3+9c366c4306,ga8c6da7877+71e4819109,gb0e22166c9+25ba2f69a1,gb6a65358fc+469cd937ee,gbb8dafda3b+69d3c0e320,gc07e1c2157+a98bf949bb,gc120e1dc64+615ec43309,gc28159a63d+469cd937ee,gcf0d15dbbd+72a9f7b576,gdaeeff99f8+a38ce5ea23,ge6526c86ff+3a7c1ac5f1,ge79ae78c31+469cd937ee,gee10cc3b42+a6fc98d2e7,gf1cff7945b+21ad69e7e1,gfbcc870c63+9a11dc8c8f
LSST Data Management Base Package
Loading...
Searching...
No Matches
apdbSqlReplica.py
Go to the documentation of this file.
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22"""Module defining ApdbSqlReplica class and related methods.
23"""
24
25from __future__ import annotations
26
27__all__ = ["ApdbSqlReplica"]
28
29import logging
30from collections.abc import Collection, Iterable, Sequence
31from typing import TYPE_CHECKING, cast
32
33import astropy.time
34import sqlalchemy
35from sqlalchemy import sql
36
37from ..apdbReplica import ApdbReplica, ApdbTableData, ReplicaChunk
38from ..apdbSchema import ApdbTables
39from ..timer import Timer
40from ..versionTuple import VersionTuple
41from .apdbSqlSchema import ExtraTables
42
43if TYPE_CHECKING:
44 from .apdbSqlSchema import ApdbSqlSchema
45
46
47_LOG = logging.getLogger(__name__)
48
49VERSION = VersionTuple(1, 0, 0)
50"""Version for the code controlling replication tables. This needs to be
51updated following compatibility rules when schema produced by this code
52changes.
53"""
54
55
57 """Implementation of ApdbTableData that wraps sqlalchemy Result."""
58
def __init__(self, result: sqlalchemy.engine.Result):
    """Capture the column names and every row of a query result.

    Parameters
    ----------
    result : `sqlalchemy.engine.Result`
        Result to wrap; its ``keys()`` and ``fetchall()`` methods are
        called once each, in that order.
    """
    column_keys = result.keys()
    self._keys = [*column_keys]
    # All rows are fetched eagerly and stored as a plain list.
    fetched_rows = list(result.fetchall())
    self._rows: list[tuple] = cast(list[tuple], fetched_rows)
62
def column_names(self) -> Sequence[str]:
    # Hand back the key list stored by the constructor (same object,
    # no copy is made).
    names = self._keys
    return names
65
def rows(self) -> Collection[tuple]:
    # Hand back the row list stored by the constructor (same object,
    # no copy is made).
    stored_rows = self._rows
    return stored_rows
68
69
71 """Implementation of `ApdbReplica` for SQL backend.
72
73 Parameters
74 ----------
75 schema : `ApdbSqlSchema`
76 Instance of `ApdbSqlSchema` class for APDB database.
77 engine : `sqlalchemy.engine.Engine`
78 Engine for database access.
79 timer : `bool`, optional
80 If `True` then log timing information.
81 """
82
def __init__(self, schema: ApdbSqlSchema, engine: sqlalchemy.engine.Engine, timer: bool = False):
    # Nothing is validated or contacted here; the three arguments are
    # only remembered for use by the other methods.
    self._engine = engine
    self._schema = schema
    self._timer = timer
87
@classmethod
def apdbReplicaImplementationVersion(cls) -> VersionTuple:
    # Docstring inherited from base class.
    # The version is the module-level ``VERSION`` constant.
    implementation_version = VERSION
    return implementation_version
92
def getReplicaChunks(self) -> list[ReplicaChunk] | None:
    # docstring is inherited from a base class
    #
    # Returns `None` when the schema reports no replica chunk support,
    # otherwise one `ReplicaChunk` per row of the ApdbReplicaChunks table,
    # ordered by ``last_update_time``.
    if not self._schema.has_replica_chunks:
        return None

    table = self._schema.get_table(ExtraTables.ApdbReplicaChunks)
    assert table is not None, "has_replica_chunks=True means it must be defined"
    columns = table.columns
    query = sql.select(
        columns["apdb_replica_chunk"], columns["last_update_time"], columns["unique_id"]
    ).order_by(columns["last_update_time"])
    # The timer label names what is actually selected; the previous
    # "DiaObject insert id select" label did not match this query.
    with Timer("replica chunks select", self._timer):
        with self._engine.connect() as conn:
            result = conn.execution_options(stream_results=True, max_row_buffer=10000).execute(query)
            # Rows are consumed while the connection is still open because
            # the result is streamed. Columns arrive in SELECT order:
            # chunk id, last update time, unique id.
            return [
                ReplicaChunk(
                    id=chunk_id,
                    last_update_time=astropy.time.Time(update_time.timestamp(), format="unix_tai"),
                    unique_id=unique_id,
                )
                for chunk_id, update_time, unique_id in result
            ]
111
def deleteReplicaChunks(self, chunks: Iterable[int]) -> None:
    # docstring is inherited from a base class
    #
    # Raises `ValueError` when the schema reports no replica chunk support;
    # otherwise deletes the matching rows of the ApdbReplicaChunks table.
    if not self._schema.has_replica_chunks:
        raise ValueError("APDB is not configured for replication")

    table = self._schema.get_table(ExtraTables.ApdbReplicaChunks)
    # `chunks` is only promised to be an iterable, while `in_()` is
    # documented to take a list of values, so materialize it first. This
    # keeps generators and sets working regardless of SQLAlchemy version.
    chunk_ids = list(chunks)
    where_clause = table.columns["apdb_replica_chunk"].in_(chunk_ids)
    stmt = table.delete().where(where_clause)
    # Engine.begin() wraps the delete in a transaction.
    with self._engine.begin() as conn:
        conn.execute(stmt)
122
def getDiaObjectsChunks(self, chunks: Iterable[int]) -> ApdbTableData:
    # docstring is inherited from a base class
    # Delegates to the shared implementation for the DiaObject table and
    # its chunk table.
    return self._get_chunks(
        chunks,
        table_enum=ApdbTables.DiaObject,
        chunk_table_enum=ExtraTables.DiaObjectChunks,
    )
126
def getDiaSourcesChunks(self, chunks: Iterable[int]) -> ApdbTableData:
    # docstring is inherited from a base class
    # Delegates to the shared implementation for the DiaSource table and
    # its chunk table.
    return self._get_chunks(
        chunks,
        table_enum=ApdbTables.DiaSource,
        chunk_table_enum=ExtraTables.DiaSourceChunks,
    )
130
def getDiaForcedSourcesChunks(self, chunks: Iterable[int]) -> ApdbTableData:
    # docstring is inherited from a base class
    # Delegates to the shared implementation for the DiaForcedSource table
    # and its chunk table.
    return self._get_chunks(
        chunks,
        table_enum=ApdbTables.DiaForcedSource,
        chunk_table_enum=ExtraTables.DiaForcedSourceChunks,
    )
134
def _get_chunks(
    self,
    chunks: Iterable[int],
    table_enum: ApdbTables,
    chunk_table_enum: ExtraTables,
) -> ApdbTableData:
    """Return catalog of records for given replica chunk identifiers,
    common implementation for all DIA tables.

    Parameters
    ----------
    chunks : `~collections.abc.Iterable` [`int`]
        Replica chunk identifiers to select; consumed exactly once.
    table_enum : `ApdbTables`
        Identifies the APDB table whose records are returned.
    chunk_table_enum : `ExtraTables`
        Identifies the chunk table that is joined to the APDB table.

    Returns
    -------
    data : `ApdbTableData`
        Selected rows; the first column is ``apdb_replica_chunk``, followed
        by the columns returned by the schema's ``get_apdb_columns``.

    Raises
    ------
    ValueError
        Raised if the schema reports no replica chunk support.
    """
    if not self._schema.has_replica_chunks:
        raise ValueError("APDB is not configured for replication")

    table = self._schema.get_table(table_enum)
    chunk_table = self._schema.get_table(chunk_table_enum)

    # No ON clause is given, so the join condition is left for SQLAlchemy
    # to derive from the two table definitions.
    join = table.join(chunk_table)
    chunk_id_column = chunk_table.columns["apdb_replica_chunk"]
    apdb_columns = self._schema.get_apdb_columns(table_enum)
    # `chunks` is only promised to be an iterable, while `in_()` is
    # documented to take a list of values, so materialize it first.
    where_clause = chunk_id_column.in_(list(chunks))
    query = sql.select(chunk_id_column, *apdb_columns).select_from(join).where(where_clause)

    # execute select
    with Timer(f"{table.name} replica chunk select", self._timer):
        with self._engine.begin() as conn:
            result = conn.execution_options(stream_results=True, max_row_buffer=10000).execute(query)
            # ApdbSqlTableData fetches all rows in its constructor, so the
            # data is complete before the connection is released.
            return ApdbSqlTableData(result)
list[ReplicaChunk]|None getReplicaChunks(self)
ApdbTableData getDiaSourcesChunks(self, Iterable[int] chunks)
None deleteReplicaChunks(self, Iterable[int] chunks)
ApdbTableData _get_chunks(self, Iterable[int] chunks, ApdbTables table_enum, ExtraTables chunk_table_enum)
ApdbTableData getDiaObjectsChunks(self, Iterable[int] chunks)
__init__(self, ApdbSqlSchema schema, sqlalchemy.engine.Engine engine, bool timer=False)
ApdbTableData getDiaForcedSourcesChunks(self, Iterable[int] chunks)
__init__(self, sqlalchemy.engine.Result result)