Loading [MathJax]/extensions/tex2jax.js
LSST Applications g04a91732dc+cc8870d3f5,g07dc498a13+5aa0b8792f,g0fba68d861+80045be308,g1409bbee79+5aa0b8792f,g1a7e361dbc+5aa0b8792f,g1fd858c14a+f64bc332a9,g208c678f98+1ae86710ed,g35bb328faa+fcb1d3bbc8,g4d2262a081+47ad8a29a8,g4d39ba7253+9633a327c1,g4e0f332c67+5d362be553,g53246c7159+fcb1d3bbc8,g60b5630c4e+9633a327c1,g668ecb457e+25d63fd678,g78460c75b0+2f9a1b4bcd,g786e29fd12+cf7ec2a62a,g7b71ed6315+fcb1d3bbc8,g8852436030+8b64ca622a,g89139ef638+5aa0b8792f,g89e1512fd8+04325574d3,g8d6b6b353c+9633a327c1,g9125e01d80+fcb1d3bbc8,g989de1cb63+5aa0b8792f,g9f33ca652e+b196626af7,ga9baa6287d+9633a327c1,gaaedd4e678+5aa0b8792f,gabe3b4be73+1e0a283bba,gb1101e3267+71e32094df,gb58c049af0+f03b321e39,gb90eeb9370+2807b1ad02,gcf25f946ba+8b64ca622a,gd315a588df+a39986a76f,gd6cbbdb0b4+c8606af20c,gd9a9a58781+fcb1d3bbc8,gde0f65d7ad+4e42d81ab7,ge278dab8ac+932305ba37,ge82c20c137+76d20ab76d,gfe73954cf8+a1301e4c20,w.2025.11
LSST Data Management Base Package
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
apdbSqlReplica.py
Go to the documentation of this file.
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22"""Module defining Apdb class and related methods."""
23
24from __future__ import annotations
25
26__all__ = ["ApdbSqlReplica"]
27
28import logging
29from collections.abc import Collection, Iterable, Mapping, Sequence
30from typing import TYPE_CHECKING, cast
31
32import astropy.time
33import sqlalchemy
34from sqlalchemy import sql
35
36from ..apdbReplica import ApdbReplica, ApdbTableData, ReplicaChunk
37from ..apdbSchema import ApdbTables
38from ..monitor import MonAgent
39from ..timer import Timer
40from ..versionTuple import VersionTuple
41from .apdbSqlSchema import ExtraTables
42
43if TYPE_CHECKING:
44 from .apdbSqlSchema import ApdbSqlSchema
45
46
47_LOG = logging.getLogger(__name__)
48
49_MON = MonAgent(__name__)
50
51VERSION = VersionTuple(1, 0, 0)
52"""Version for the code controlling replication tables. This needs to be
53updated following compatibility rules when schema produced by this code
54changes.
55"""
56
57
59 """Implementation of ApdbTableData that wraps sqlalchemy Result."""
60
61 def __init__(self, result: sqlalchemy.engine.Result):
62 self._keys = list(result.keys())
63 self._rows: list[tuple] = cast(list[tuple], list(result.fetchall()))
64
65 def column_names(self) -> Sequence[str]:
66 return self._keys
67
68 def rows(self) -> Collection[tuple]:
69 return self._rows
70
71
73 """Implementation of `ApdbReplica` for SQL backend.
74
75 Parameters
76 ----------
77 schema : `ApdbSqlSchema`
78 Instance of `ApdbSqlSchema` class for APDB database.
79 engine : `sqlalchemy.engine.Engine`
80 Engine for database access.
81 timer : `bool`, optional
82 If `True` then log timing information.
83 """
84
85 def __init__(self, schema: ApdbSqlSchema, engine: sqlalchemy.engine.Engine, timer: bool = False):
86 self._schema = schema
87 self._engine = engine
88
89 self._timer_args: list[MonAgent | logging.Logger] = [_MON]
90 if timer:
91 self._timer_args.append(_LOG)
92
93 def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
94 """Create `Timer` instance given its name."""
95 return Timer(name, *self._timer_args, tags=tags)
96
97 @classmethod
98 def apdbReplicaImplementationVersion(cls) -> VersionTuple:
99 # Docstring inherited from base class.
100 return VERSION
101
102 def getReplicaChunks(self) -> list[ReplicaChunk] | None:
103 # docstring is inherited from a base class
104 if not self._schema.has_replica_chunks:
105 return None
106
107 table = self._schema.get_table(ExtraTables.ApdbReplicaChunks)
108 assert table is not None, "has_replica_chunks=True means it must be defined"
109 query = sql.select(
110 table.columns["apdb_replica_chunk"], table.columns["last_update_time"], table.columns["unique_id"]
111 ).order_by(table.columns["last_update_time"])
112 with self._timer("chunks_select_time") as timer:
113 with self._engine.connect() as conn:
114 result = conn.execution_options(stream_results=True, max_row_buffer=10000).execute(query)
115 ids = []
116 for row in result:
117 last_update_time = astropy.time.Time(row[1].timestamp(), format="unix_tai")
118 ids.append(ReplicaChunk(id=row[0], last_update_time=last_update_time, unique_id=row[2]))
119 timer.add_values(row_count=len(ids))
120 return ids
121
122 def deleteReplicaChunks(self, chunks: Iterable[int]) -> None:
123 # docstring is inherited from a base class
124 if not self._schema.has_replica_chunks:
125 raise ValueError("APDB is not configured for replication")
126
127 table = self._schema.get_table(ExtraTables.ApdbReplicaChunks)
128 chunk_list = list(chunks)
129 where_clause = table.columns["apdb_replica_chunk"].in_(chunk_list)
130 stmt = table.delete().where(where_clause)
131 with self._timer("chunks_delete_time") as timer:
132 with self._engine.begin() as conn:
133 conn.execute(stmt)
134 timer.add_values(row_count=len(chunk_list))
135
136 def getDiaObjectsChunks(self, chunks: Iterable[int]) -> ApdbTableData:
137 # docstring is inherited from a base class
138 return self._get_chunks(chunks, ApdbTables.DiaObject, ExtraTables.DiaObjectChunks)
139
140 def getDiaSourcesChunks(self, chunks: Iterable[int]) -> ApdbTableData:
141 # docstring is inherited from a base class
142 return self._get_chunks(chunks, ApdbTables.DiaSource, ExtraTables.DiaSourceChunks)
143
144 def getDiaForcedSourcesChunks(self, chunks: Iterable[int]) -> ApdbTableData:
145 # docstring is inherited from a base class
146 return self._get_chunks(chunks, ApdbTables.DiaForcedSource, ExtraTables.DiaForcedSourceChunks)
147
149 self,
150 chunks: Iterable[int],
151 table_enum: ApdbTables,
152 chunk_table_enum: ExtraTables,
153 ) -> ApdbTableData:
154 """Return catalog of records for given insert identifiers, common
155 implementation for all DIA tables.
156 """
157 if not self._schema.has_replica_chunks:
158 raise ValueError("APDB is not configured for replication")
159
160 table = self._schema.get_table(table_enum)
161 chunk_table = self._schema.get_table(chunk_table_enum)
162
163 join = table.join(chunk_table)
164 chunk_id_column = chunk_table.columns["apdb_replica_chunk"]
165 apdb_columns = self._schema.get_apdb_columns(table_enum)
166 where_clause = chunk_id_column.in_(chunks)
167 query = sql.select(chunk_id_column, *apdb_columns).select_from(join).where(where_clause)
168
169 # execute select
170 with self._timer("table_chunk_select_time", tags={"table": table.name}) as timer:
171 with self._engine.begin() as conn:
172 result = conn.execution_options(stream_results=True, max_row_buffer=10000).execute(query)
173 table_data = ApdbSqlTableData(result)
174 timer.add_values(row_count=len(table_data.rows()))
175 return table_data
list[ReplicaChunk]|None getReplicaChunks(self)
ApdbTableData getDiaSourcesChunks(self, Iterable[int] chunks)
None deleteReplicaChunks(self, Iterable[int] chunks)
Timer _timer(self, str name, *, Mapping[str, str|int]|None tags=None)
ApdbTableData _get_chunks(self, Iterable[int] chunks, ApdbTables table_enum, ExtraTables chunk_table_enum)
ApdbTableData getDiaObjectsChunks(self, Iterable[int] chunks)
__init__(self, ApdbSqlSchema schema, sqlalchemy.engine.Engine engine, bool timer=False)
ApdbTableData getDiaForcedSourcesChunks(self, Iterable[int] chunks)
__init__(self, sqlalchemy.engine.Result result)