LSST Applications g04e9c324dd+8c5ae1fdc5,g134cb467dc+b203dec576,g18429d2f64+358861cd2c,g199a45376c+0ba108daf9,g1fd858c14a+dd066899e3,g262e1987ae+ebfced1d55,g29ae962dfc+72fd90588e,g2cef7863aa+aef1011c0b,g35bb328faa+8c5ae1fdc5,g3fd5ace14f+b668f15bc5,g4595892280+3897dae354,g47891489e3+abcf9c3559,g4d44eb3520+fb4ddce128,g53246c7159+8c5ae1fdc5,g67b6fd64d1+abcf9c3559,g67fd3c3899+1f72b5a9f7,g74acd417e5+cb6b47f07b,g786e29fd12+668abc6043,g87389fa792+8856018cbb,g89139ef638+abcf9c3559,g8d7436a09f+bcf525d20c,g8ea07a8fe4+9f5ccc88ac,g90f42f885a+6054cc57f1,g97be763408+06f794da49,g9dd6db0277+1f72b5a9f7,ga681d05dcb+7e36ad54cd,gabf8522325+735880ea63,gac2eed3f23+abcf9c3559,gb89ab40317+abcf9c3559,gbf99507273+8c5ae1fdc5,gd8ff7fe66e+1f72b5a9f7,gdab6d2f7ff+cb6b47f07b,gdc713202bf+1f72b5a9f7,gdfd2d52018+8225f2b331,ge365c994fd+375fc21c71,ge410e46f29+abcf9c3559,geaed405ab2+562b3308c0,gf9a733ac38+8c5ae1fdc5,w.2025.35
LSST Data Management Base Package
Loading...
Searching...
No Matches
apdbSqlReplica.py
Go to the documentation of this file.
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22"""Module defining ApdbSqlReplica class and related methods."""
23
24from __future__ import annotations
25
26__all__ = ["ApdbSqlReplica"]
27
28import logging
29from collections.abc import Collection, Iterable, Mapping, Sequence
30from typing import TYPE_CHECKING, cast
31
32import astropy.time
33import felis.datamodel
34import sqlalchemy
35from sqlalchemy import sql
36
37from ..apdbReplica import ApdbReplica, ApdbTableData, ReplicaChunk
38from ..apdbSchema import ApdbTables
39from ..monitor import MonAgent
40from ..schema_model import ExtraDataTypes
41from ..timer import Timer
42from ..versionTuple import VersionTuple
43from .apdbSqlSchema import ExtraTables
44
45if TYPE_CHECKING:
46 from .apdbSqlSchema import ApdbSqlSchema
47
48
# Module-level logger, also used as an optional timer destination.
_LOG = logging.getLogger(__name__)

# Monitoring agent for this module, always receives timer reports.
_MON = MonAgent(__name__)

VERSION = VersionTuple(1, 0, 0)
"""Version for the code controlling replication tables. This needs to be
updated following compatibility rules when schema produced by this code
changes.
"""
58
59
class ApdbSqlTableData(ApdbTableData):
    """Implementation of ApdbTableData that wraps sqlalchemy Result.

    Parameters
    ----------
    result : `sqlalchemy.engine.Result`
        Result of an executed query. All of its rows are fetched during
        construction, the result object itself is not retained.
    column_types : `dict` [`str`, `felis.datamodel.DataType`]
        Mapping of column name to column type. It has to contain an entry
        for every column returned by ``result``.

    Raises
    ------
    KeyError
        Raised if ``result`` has a column which is missing from
        ``column_types``.
    """

    def __init__(self, result: sqlalchemy.engine.Result, column_types: dict[str, felis.datamodel.DataType]):
        # Pair each column name with its type, in the order in which columns
        # appear in the result.
        self._column_defs = tuple((column, column_types[column]) for column in result.keys())
        # Read all rows now so that data stays available after the
        # connection that produced the result is closed.
        self._rows: list[tuple] = cast(list[tuple], list(result.fetchall()))

    def column_names(self) -> Sequence[str]:
        # Names only, same order as in `column_defs`.
        return tuple(column_def[0] for column_def in self._column_defs)

    def column_defs(self) -> Sequence[tuple[str, felis.datamodel.DataType]]:
        return self._column_defs

    def rows(self) -> Collection[tuple]:
        return self._rows
75
76
class ApdbSqlReplica(ApdbReplica):
    """Implementation of `ApdbReplica` for SQL backend.

    Parameters
    ----------
    schema : `ApdbSqlSchema`
        Instance of `ApdbSqlSchema` class for APDB database.
    engine : `sqlalchemy.engine.Engine`
        Engine for database access.
    db_schema_version : `VersionTuple`
        Version of the database schema.
    timer : `bool`, optional
        If `True` then log timing information.
    """

    def __init__(
        self,
        schema: ApdbSqlSchema,
        engine: sqlalchemy.engine.Engine,
        db_schema_version: VersionTuple,
        timer: bool = False,
    ):
        self._schema = schema
        self._engine = engine
        self._db_schema_version = db_schema_version

        # Timers always report to the monitoring agent, the logger is added
        # as a second destination only when ``timer`` is set.
        self._timer_args: list[MonAgent | logging.Logger] = [_MON]
        if timer:
            self._timer_args.append(_LOG)

    def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
        """Create `Timer` instance given its name.

        Parameters
        ----------
        name : `str`
            Name passed to the `Timer`.
        tags : `~collections.abc.Mapping` [`str`, `str` or `int`], optional
            Tags passed to the `Timer`.

        Returns
        -------
        timer : `Timer`
            Timer reporting to the destinations configured in the
            constructor.
        """
        return Timer(name, *self._timer_args, tags=tags)

    def schemaVersion(self) -> VersionTuple:
        # Docstring inherited from base class.
        return self._db_schema_version

    @classmethod
    def apdbReplicaImplementationVersion(cls) -> VersionTuple:
        # Docstring inherited from base class.
        return VERSION

    def getReplicaChunks(self) -> list[ReplicaChunk] | None:
        # docstring is inherited from a base class
        if not self._schema.replication_enabled:
            return None

        table = self._schema.get_table(ExtraTables.ApdbReplicaChunks)
        assert table is not None, "replication_enabled=True means it must be defined"
        query = sql.select(
            table.columns["apdb_replica_chunk"], table.columns["last_update_time"], table.columns["unique_id"]
        ).order_by(table.columns["last_update_time"])
        with self._timer("chunks_select_time") as timer:
            with self._engine.connect() as conn:
                # Ask the driver to stream rows with a bounded buffer.
                result = conn.execution_options(stream_results=True, max_row_buffer=10000).execute(query)
                ids = []
                for row in result:
                    # Row layout follows the select above: chunk ID, update
                    # time, unique ID. The stored timestamp is converted to
                    # seconds and handed to astropy as "unix_tai".
                    last_update_time = astropy.time.Time(row[1].timestamp(), format="unix_tai")
                    ids.append(ReplicaChunk(id=row[0], last_update_time=last_update_time, unique_id=row[2]))
                timer.add_values(row_count=len(ids))
                return ids

    def deleteReplicaChunks(self, chunks: Iterable[int]) -> None:
        # docstring is inherited from a base class
        if not self._schema.replication_enabled:
            raise ValueError("APDB is not configured for replication")

        table = self._schema.get_table(ExtraTables.ApdbReplicaChunks)
        # Materialize once, the list is needed both for the query and for
        # the reported row count.
        chunk_list = list(chunks)
        where_clause = table.columns["apdb_replica_chunk"].in_(chunk_list)
        stmt = table.delete().where(where_clause)
        with self._timer("chunks_delete_time") as timer:
            # begin() commits the delete when the block exits normally.
            with self._engine.begin() as conn:
                conn.execute(stmt)
            # This is the number of requested chunks, not the number of rows
            # actually deleted.
            timer.add_values(row_count=len(chunk_list))

    def getTableDataChunks(self, table: ApdbTables, chunks: Iterable[int]) -> ApdbTableData:
        # docstring is inherited from a base class
        for chunk_table, table_enum in ExtraTables.replica_chunk_tables().items():
            if table is table_enum:
                return self._get_chunks(chunks, table, chunk_table)
        raise ValueError(f"Table {table} does not support replica chunks.")

    def _get_chunks(
        self,
        chunks: Iterable[int],
        table_enum: ApdbTables,
        chunk_table_enum: ExtraTables,
    ) -> ApdbTableData:
        """Return catalog of records for given insert identifiers, common
        implementation for all DIA tables.

        Parameters
        ----------
        chunks : `~collections.abc.Iterable` [`int`]
            Replica chunk identifiers to select.
        table_enum : `ApdbTables`
            Table holding the records.
        chunk_table_enum : `ExtraTables`
            Table associating records of ``table_enum`` with replica chunks.

        Returns
        -------
        data : `ApdbTableData`
            Selected records; the first column is ``apdb_replica_chunk``,
            followed by the APDB columns of the table.

        Raises
        ------
        ValueError
            Raised if replication is not enabled.
        """
        if not self._schema.replication_enabled:
            raise ValueError("APDB is not configured for replication")

        table = self._schema.get_table(table_enum)
        chunk_table = self._schema.get_table(chunk_table_enum)

        # Materialize identifiers first, same as `deleteReplicaChunks`, so
        # that one-shot iterators are consumed exactly once and the IN
        # clause always receives a plain list.
        chunk_list = list(chunks)

        # No explicit ON clause, this relies on SQLAlchemy deriving it from
        # the foreign key relation between the two tables.
        join = table.join(chunk_table)
        chunk_id_column = chunk_table.columns["apdb_replica_chunk"]
        apdb_columns = self._schema.get_apdb_columns(table_enum)
        where_clause = chunk_id_column.in_(chunk_list)
        query = sql.select(chunk_id_column, *apdb_columns).select_from(join).where(where_clause)

        table_schema = self._schema.tableSchemas[table_enum]
        # Regular tables should never have columns of ExtraDataTypes, this is
        # just to make mypy happy.
        column_types = {
            column.name: column.datatype
            for column in table_schema.columns
            if not isinstance(column.datatype, ExtraDataTypes)
        }
        # The chunk ID column comes from the chunk table, so it is not in
        # the schema of the regular table and has to be added by hand.
        column_types["apdb_replica_chunk"] = felis.datamodel.DataType.long

        # execute select
        with self._timer("table_chunk_select_time", tags={"table": table.name}) as timer:
            with self._engine.begin() as conn:
                result = conn.execution_options(stream_results=True, max_row_buffer=10000).execute(query)
                # All rows are read here, while the connection is still open.
                table_data = ApdbSqlTableData(result, column_types)
                timer.add_values(row_count=len(table_data.rows()))
                return table_data

    def getTableUpdateChunks(self, table: ApdbTables, chunks: Iterable[int]) -> ApdbTableData:
        # docstring is inherited from a base class
        raise NotImplementedError()
ApdbTableData getTableDataChunks(self, ApdbTables table, Iterable[int] chunks)
list[ReplicaChunk]|None getReplicaChunks(self)
None deleteReplicaChunks(self, Iterable[int] chunks)
Timer _timer(self, str name, *, Mapping[str, str|int]|None tags=None)
__init__(self, ApdbSqlSchema schema, sqlalchemy.engine.Engine engine, VersionTuple db_schema_version, bool timer=False)
ApdbTableData _get_chunks(self, Iterable[int] chunks, ApdbTables table_enum, ExtraTables chunk_table_enum)
ApdbTableData getTableUpdateChunks(self, ApdbTables table, Iterable[int] chunks)
Sequence[tuple[str, felis.datamodel.DataType]] column_defs(self)
__init__(self, sqlalchemy.engine.Result result, dict[str, felis.datamodel.DataType] column_types)