LSST Applications g013ef56533+b8d55c8942,g083dd6704c+a047e97985,g199a45376c+0ba108daf9,g1fd858c14a+7a3b874d60,g210f2d0738+7416ca6900,g262e1987ae+1d557ba9a3,g29ae962dfc+519d34895e,g2cef7863aa+aef1011c0b,g30d7c61c20+36d16ea71a,g35bb328faa+8c5ae1fdc5,g3fd5ace14f+cb326ad149,g47891489e3+f459a6810c,g53246c7159+8c5ae1fdc5,g54cd7ddccb+890c8e1e5d,g5a60e81ecd+6240c63dbc,g64539dfbff+7416ca6900,g67b6fd64d1+f459a6810c,g6ebf1fc0d4+8c5ae1fdc5,g74acd417e5+0bae3c876a,g786e29fd12+668abc6043,g87389fa792+8856018cbb,g89139ef638+f459a6810c,g8d7436a09f+dee7680868,g8ea07a8fe4+81eaaadc04,g90f42f885a+34c0557caf,g97be763408+14b8164b5b,g98a1a72a9c+8389601a76,g98df359435+fff771c62d,gb8cb2b794d+6728931916,gbf99507273+8c5ae1fdc5,gc2a301910b+7416ca6900,gca7fc764a6+f459a6810c,gd7ef33dd92+f459a6810c,gdab6d2f7ff+0bae3c876a,ge410e46f29+f459a6810c,ge41e95a9f2+7416ca6900,geaed405ab2+e3b4b2a692,gf9a733ac38+8c5ae1fdc5,w.2025.43
LSST Data Management Base Package
Loading...
Searching...
No Matches
apdbSqlReplica.py
Go to the documentation of this file.
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
"""Module defining the SQL-backend implementation of `ApdbReplica`
(`ApdbSqlReplica`) and its supporting `ApdbSqlTableData` class.
"""
23
24from __future__ import annotations
25
26__all__ = ["ApdbSqlReplica"]
27
28import logging
29from collections.abc import Collection, Iterable, Mapping, Sequence
30from typing import TYPE_CHECKING, cast
31
32import astropy.time
33import felis.datamodel
34import sqlalchemy
35from sqlalchemy import sql
36
37from ..apdbReplica import ApdbReplica, ApdbTableData, ReplicaChunk
38from ..apdbSchema import ApdbTables
39from ..apdbUpdateRecord import ApdbUpdateRecord
40from ..monitor import MonAgent
41from ..schema_model import ExtraDataTypes
42from ..timer import Timer
43from ..versionTuple import VersionTuple
44from .apdbSqlSchema import ExtraTables
45
46if TYPE_CHECKING:
47 from .apdbSqlSchema import ApdbSqlSchema
48
49
_LOG = logging.getLogger(__name__)

_MON = MonAgent(__name__)

VERSION = VersionTuple(1, 0, 0)
"""Version of the code that manages the replication tables. Whenever the
schema generated by this code changes, bump this version according to the
compatibility rules.
"""
59
60
class ApdbSqlTableData(ApdbTableData):
    """`ApdbTableData` implementation holding the rows of a SQLAlchemy
    query result.

    Parameters
    ----------
    result : `sqlalchemy.engine.Result`
        Query result. The constructor fetches all of its rows into memory.
    column_types : `dict` [`str`, `felis.datamodel.DataType`]
        Data type for each column name. It must have an entry for every
        column in ``result``, otherwise `KeyError` is raised.
    """

    def __init__(self, result: sqlalchemy.engine.Result, column_types: dict[str, felis.datamodel.DataType]):
        names = list(result.keys())
        # Column definitions keep the order of the result columns.
        self._column_defs = tuple(zip(names, [column_types[name] for name in names]))
        fetched = result.fetchall()
        self._rows: list[tuple] = cast(list[tuple], list(fetched))

    def column_names(self) -> Sequence[str]:
        # Docstring inherited from base class.
        return tuple(name for name, _ in self._column_defs)

    def column_defs(self) -> Sequence[tuple[str, felis.datamodel.DataType]]:
        # Docstring inherited from base class.
        return self._column_defs

    def rows(self) -> Collection[tuple]:
        # Docstring inherited from base class.
        return self._rows
76
77
class ApdbSqlReplica(ApdbReplica):
    """SQL-backend implementation of `ApdbReplica`.

    Parameters
    ----------
    schema : `ApdbSqlSchema`
        Schema object describing the APDB database.
    engine : `sqlalchemy.engine.Engine`
        Engine used for all database access.
    db_schema_version : `VersionTuple`
        Version of the database schema.
    timer : `bool`, optional
        If `True`, the module logger is passed to every `Timer` in addition
        to the monitoring agent, so timing information is also logged.
    """

    def __init__(
        self,
        schema: ApdbSqlSchema,
        engine: sqlalchemy.engine.Engine,
        db_schema_version: VersionTuple,
        timer: bool = False,
    ):
        self._schema = schema
        self._engine = engine
        self._db_schema_version = db_schema_version

        # The monitoring agent always receives timings; the logger only on request.
        self._timer_args: list[MonAgent | logging.Logger] = [_MON, _LOG] if timer else [_MON]

    def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
        """Make a `Timer` named ``name`` that reports to the configured sinks."""
        return Timer(name, *self._timer_args, tags=tags)

    def schemaVersion(self) -> VersionTuple:
        # Docstring inherited from base class.
        return self._db_schema_version

    @classmethod
    def apdbReplicaImplementationVersion(cls) -> VersionTuple:
        # Docstring inherited from base class.
        return VERSION

    def getReplicaChunks(self) -> list[ReplicaChunk] | None:
        # docstring is inherited from a base class
        if not self._schema.replication_enabled:
            return None

        chunks_table = self._schema.get_table(ExtraTables.ApdbReplicaChunks)
        assert chunks_table is not None, "replication_enabled=True means it must be defined"
        columns = chunks_table.columns
        query = sql.select(
            columns["apdb_replica_chunk"], columns["last_update_time"], columns["unique_id"]
        ).order_by(columns["last_update_time"])

        with self._timer("chunks_select_time") as timer:
            with self._engine.connect() as conn:
                result = conn.execution_options(stream_results=True, max_row_buffer=10000).execute(query)
                # NOTE(review): if the driver returns a naive datetime,
                # ``datetime.timestamp()`` interprets it in the host's local
                # time zone; confirm this matches how the values are written.
                replica_chunks = [
                    ReplicaChunk(
                        id=chunk_id,
                        last_update_time=astropy.time.Time(update_time.timestamp(), format="unix_tai"),
                        unique_id=unique_id,
                    )
                    for chunk_id, update_time, unique_id in result
                ]
            timer.add_values(row_count=len(replica_chunks))
        return replica_chunks

    def deleteReplicaChunks(self, chunks: Iterable[int]) -> None:
        # docstring is inherited from a base class
        if not self._schema.replication_enabled:
            raise ValueError("APDB is not configured for replication")

        chunks_table = self._schema.get_table(ExtraTables.ApdbReplicaChunks)
        # Materialize once: the IDs go into the query and also determine the
        # row count reported to the timer (requested IDs, not deleted rows).
        chunk_ids = list(chunks)
        delete_stmt = chunks_table.delete().where(chunks_table.columns["apdb_replica_chunk"].in_(chunk_ids))
        with self._timer("chunks_delete_time") as timer:
            with self._engine.begin() as conn:
                conn.execute(delete_stmt)
            timer.add_values(row_count=len(chunk_ids))

    def getTableDataChunks(self, table: ApdbTables, chunks: Iterable[int]) -> ApdbTableData:
        # docstring is inherited from a base class
        chunk_tables = ExtraTables.replica_chunk_tables()
        chunk_table = next(
            (chunk_enum for chunk_enum, table_enum in chunk_tables.items() if table_enum is table),
            None,
        )
        if chunk_table is None:
            raise ValueError(f"Table {table} does not support replica chunks.")
        return self._get_chunks(chunks, table, chunk_table)

    def _get_chunks(
        self,
        chunks: Iterable[int],
        table_enum: ApdbTables,
        chunk_table_enum: ExtraTables,
    ) -> ApdbTableData:
        """Select the records of a table that belong to the given replica
        chunks; this implementation is shared by all DIA tables.

        Parameters
        ----------
        chunks : `~collections.abc.Iterable` [`int`]
            Replica chunk IDs to select.
        table_enum : `ApdbTables`
            Table whose records are returned.
        chunk_table_enum : `ExtraTables`
            Table that associates records of ``table_enum`` with replica
            chunks.

        Returns
        -------
        data : `ApdbTableData`
            Selected records. The ``apdb_replica_chunk`` column comes first,
            followed by the APDB columns of ``table_enum``.

        Raises
        ------
        ValueError
            Raised if replication is not enabled for this APDB.
        """
        if not self._schema.replication_enabled:
            raise ValueError("APDB is not configured for replication")

        data_table = self._schema.get_table(table_enum)
        chunk_table = self._schema.get_table(chunk_table_enum)
        chunk_id_column = chunk_table.columns["apdb_replica_chunk"]

        # No explicit ON clause: SQLAlchemy derives the join condition from
        # the foreign key between the two tables.
        query = (
            sql.select(chunk_id_column, *self._schema.get_apdb_columns(table_enum))
            .select_from(data_table.join(chunk_table))
            .where(chunk_id_column.in_(chunks))
        )

        # Regular tables should never have columns of ExtraDataTypes; they are
        # filtered out here only to keep mypy happy.
        column_types = {
            column.name: column.datatype
            for column in self._schema.tableSchemas[table_enum].columns
            if not isinstance(column.datatype, ExtraDataTypes)
        }
        column_types["apdb_replica_chunk"] = felis.datamodel.DataType.long

        with self._timer("table_chunk_select_time", tags={"table": data_table.name}) as timer:
            with self._engine.begin() as conn:
                result = conn.execution_options(stream_results=True, max_row_buffer=10000).execute(query)
                table_data = ApdbSqlTableData(result, column_types)
            timer.add_values(row_count=len(table_data.rows()))
        return table_data

    def getUpdateRecordChunks(self, chunks: Iterable[int]) -> Sequence[ApdbUpdateRecord]:
        # docstring is inherited from a base class
        if not self._schema.replication_enabled:
            raise ValueError("APDB is not configured for replication")

        try:
            records_table = self._schema.get_table(ExtraTables.ApdbUpdateRecordChunks)
        except ValueError:
            # The table does not exist yet, so there are no update records.
            return []
        query = records_table.select().where(records_table.columns["apdb_replica_chunk"].in_(chunks))

        with self._timer("select_update_record_time", tags={"table": records_table.name}) as timer:
            with self._engine.begin() as conn:
                result = conn.execution_options(stream_results=True, max_row_buffer=10000).execute(query)
                records = [
                    ApdbUpdateRecord.from_json(row.update_time_ns, row.update_order, row.update_payload)
                    for row in result
                ]
            timer.add_values(row_count=len(records))

        # Sort using the ordering defined by `ApdbUpdateRecord` itself.
        return sorted(records)
ApdbTableData getTableDataChunks(self, ApdbTables table, Iterable[int] chunks)
list[ReplicaChunk]|None getReplicaChunks(self)
None deleteReplicaChunks(self, Iterable[int] chunks)
Sequence[ApdbUpdateRecord] getUpdateRecordChunks(self, Iterable[int] chunks)
Timer _timer(self, str name, *, Mapping[str, str|int]|None tags=None)
__init__(self, ApdbSqlSchema schema, sqlalchemy.engine.Engine engine, VersionTuple db_schema_version, bool timer=False)
ApdbTableData _get_chunks(self, Iterable[int] chunks, ApdbTables table_enum, ExtraTables chunk_table_enum)
Sequence[tuple[str, felis.datamodel.DataType]] column_defs(self)
__init__(self, sqlalchemy.engine.Result result, dict[str, felis.datamodel.DataType] column_types)