LSST Applications g070148d5b3+33e5256705,g0d53e28543+25c8b88941,g0da5cf3356+2dd1178308,g1081da9e2a+62d12e78cb,g17e5ecfddb+7e422d6136,g1c76d35bf8+ede3a706f7,g295839609d+225697d880,g2e2c1a68ba+cc1f6f037e,g2ffcdf413f+853cd4dcde,g38293774b4+62d12e78cb,g3b44f30a73+d953f1ac34,g48ccf36440+885b902d19,g4b2f1765b6+7dedbde6d2,g5320a0a9f6+0c5d6105b6,g56b687f8c9+ede3a706f7,g5c4744a4d9+ef6ac23297,g5ffd174ac0+0c5d6105b6,g6075d09f38+66af417445,g667d525e37+2ced63db88,g670421136f+2ced63db88,g71f27ac40c+2ced63db88,g774830318a+463cbe8d1f,g7876bc68e5+1d137996f1,g7985c39107+62d12e78cb,g7fdac2220c+0fd8241c05,g96f01af41f+368e6903a7,g9ca82378b8+2ced63db88,g9d27549199+ef6ac23297,gabe93b2c52+e3573e3735,gb065e2a02a+3dfbe639da,gbc3249ced9+0c5d6105b6,gbec6a3398f+0c5d6105b6,gc9534b9d65+35b9f25267,gd01420fc67+0c5d6105b6,geee7ff78d7+a14128c129,gf63283c776+ede3a706f7,gfed783d017+0c5d6105b6,w.2022.47
LSST Data Management Base Package
Loading...
Searching...
No Matches
apdbCassandraSchema.py
Go to the documentation of this file.
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22from __future__ import annotations
23
24__all__ = ["ApdbCassandraSchema"]
25
26import enum
27import logging
28from collections.abc import Mapping
29from typing import TYPE_CHECKING, List, Optional, Tuple, Union
30
31import felis.types
32from felis import simple
33
34from .apdbSchema import ApdbSchema, ApdbTables
35
36if TYPE_CHECKING:
37 import cassandra.cluster
38
39
40_LOG = logging.getLogger(__name__)
41
42
@enum.unique
class ExtraTables(enum.Enum):
    """Names of the extra tables used by Cassandra implementation."""

    DiaSourceToPartition = "DiaSourceToPartition"
    """Maps diaSourceId to its partition values (pixel and time)."""

    def table_name(self, prefix: str = "") -> str:
        """Return full table name.

        Parameters
        ----------
        prefix : `str`, optional
            Prefix prepended to the base table name.

        Returns
        -------
        name : `str`
            Table name including the prefix.
        """
        return prefix + self.value
53
54
56 """Class for management of APDB schema.
57
58 Parameters
59 ----------
60 session : `cassandra.cluster.Session`
61 Cassandra session object
62 schema_file : `str`
63 Name of the YAML schema file.
64 schema_name : `str`, optional
65 Name of the schema in YAML files.
66 prefix : `str`, optional
67 Prefix to add to all schema elements.
68 time_partition_tables : `bool`
69 If True then schema will have a separate table for each time partition.
70 """
71
72 _type_map = {
73 felis.types.Double: "DOUBLE",
74 felis.types.Float: "FLOAT",
75 felis.types.Timestamp: "TIMESTAMP",
76 felis.types.Long: "BIGINT",
77 felis.types.Int: "INT",
78 felis.types.Short: "INT",
79 felis.types.Byte: "TINYINT",
80 felis.types.Binary: "BLOB",
81 felis.types.Char: "TEXT",
82 felis.types.String: "TEXT",
83 felis.types.Unicode: "TEXT",
84 felis.types.Text: "TEXT",
85 felis.types.Boolean: "BOOLEAN",
86 }
87 """Map YAML column types to Cassandra"""
88
89 _time_partitioned_tables = [
90 ApdbTables.DiaObject,
91 ApdbTables.DiaSource,
92 ApdbTables.DiaForcedSource,
93 ]
94 _spatially_partitioned_tables = [ApdbTables.DiaObjectLast]
95
97 self,
98 session: cassandra.cluster.Session,
99 keyspace: str,
100 schema_file: str,
101 schema_name: str = "ApdbSchema",
102 prefix: str = "",
103 time_partition_tables: bool = False
104 ):
105
106 super().__init__(schema_file, schema_name)
107
108 self._session = session
109 self._keyspace = keyspace
110 self._prefix = prefix
111 self._time_partition_tables = time_partition_tables
112
113 # add columns and index for partitioning.
114 self._ignore_tables = []
115 for table, tableDef in self.tableSchemas.items():
116 columns = []
117 add_columns = True
118 if table in self._spatially_partitioned_tables:
119 # DiaObjectLast does not need temporal partitioning
120 columns = ["apdb_part"]
121 elif table in self._time_partitioned_tables:
122 if time_partition_tables:
123 columns = ["apdb_part"]
124 else:
125 columns = ["apdb_part", "apdb_time_part"]
126 elif table is ApdbTables.SSObject:
127 # For SSObject there is no natural partition key but we have
128 # to partition it because there are too many of them. I'm
129 # going to partition on its primary key (and drop separate
130 # primary key index).
131 columns = ["ssObjectId"]
132 tableDef.primary_key = []
133 add_columns = False
134 else:
135 # TODO: Do not know yet how other tables can be partitioned
136 self._ignore_tables.append(table)
137 add_columns = False
138
139 if add_columns:
140 # add columns to the column list
141 columnDefs = [
142 simple.Column(
143 id=f"#{name}",
144 name=name,
145 datatype=felis.types.Long,
146 nullable=False,
147 ) for name in columns
148 ]
149 tableDef.columns = columnDefs + tableDef.columns
150
151 # make a partitioning index
152 if columns:
153 annotations = dict(tableDef.annotations)
154 annotations["cassandra:paritioning_columns"] = columns
155 tableDef.annotations = annotations
156
157 self._extra_tables = self._extraTableSchema()
158
159 def _extraTableSchema(self) -> Mapping[ExtraTables, simple.Table]:
160 """Generate schema for extra tables."""
161 return {
162 ExtraTables.DiaSourceToPartition: simple.Table(
163 id="#" + ExtraTables.DiaSourceToPartition.value,
164 name=ExtraTables.DiaSourceToPartition.value,
165 columns=[
166 simple.Column(
167 id="#diaSourceId", name="diaSourceId", datatype=felis.types.Long, nullable=False
168 ),
169 simple.Column(
170 id="#apdb_part", name="apdb_part", datatype=felis.types.Long, nullable=False
171 ),
172 simple.Column(
173 id="#apdb_time_part", name="apdb_time_part", datatype=felis.types.Int, nullable=False
174 ),
175 ],
176 primary_key=[],
177 indexes=[],
178 constraints=[],
179 annotations={"cassandra:paritioning_columns": ["diaSourceId"]},
180 ),
181 }
182
183 def tableName(self, table_name: Union[ApdbTables, ExtraTables]) -> str:
184 """Return Cassandra table name for APDB table.
185 """
186 return table_name.table_name(self._prefix)
187
188 def getColumnMap(self, table_name: Union[ApdbTables, ExtraTables]) -> Mapping[str, simple.Column]:
189 """Returns mapping of column names to Column definitions.
190
191 Parameters
192 ----------
193 table_name : `ApdbTables`
194 One of known APDB table names.
195
196 Returns
197 -------
198 column_map : `dict`
199 Mapping of column names to `ColumnDef` instances.
200 """
201 if isinstance(table_name, ApdbTables):
202 table_schema = self.tableSchemas[table_name]
203 else:
204 table_schema = self._extra_tables[table_name]
205 cmap = {column.name: column for column in table_schema.columns}
206 return cmap
207
208 def partitionColumns(self, table_name: Union[ApdbTables, ExtraTables]) -> List[str]:
209 """Return a list of columns used for table partitioning.
210
211 Parameters
212 ----------
213 table_name : `ApdbTables`
214 Table name in APDB schema
215
216 Returns
217 -------
218 columns : `list` of `str`
219 Names of columns for used for partitioning.
220 """
221 if isinstance(table_name, ApdbTables):
222 table_schema = self.tableSchemas[table_name]
223 else:
224 table_schema = self._extra_tables[table_name]
225 return table_schema.annotations.get("cassandra:paritioning_columns", [])
226
227 def clusteringColumns(self, table_name: Union[ApdbTables, ExtraTables]) -> List[str]:
228 """Return a list of columns used for clustering.
229
230 Parameters
231 ----------
232 table_name : `ApdbTables`
233 Table name in APDB schema
234
235 Returns
236 -------
237 columns : `list` of `str`
238 Names of columns for used for clustering.
239 """
240 if isinstance(table_name, ApdbTables):
241 table_schema = self.tableSchemas[table_name]
242 else:
243 table_schema = self._extra_tables[table_name]
244 return [column.name for column in table_schema.primary_key]
245
246 def makeSchema(self, drop: bool = False, part_range: Optional[Tuple[int, int]] = None) -> None:
247 """Create or re-create all tables.
248
249 Parameters
250 ----------
251 drop : `bool`
252 If True then drop tables before creating new ones.
253 part_range : `tuple` [ `int` ] or `None`
254 Start and end partition number for time partitions, end is not
255 inclusive. Used to create per-partition DiaObject, DiaSource, and
256 DiaForcedSource tables. If `None` then per-partition tables are
257 not created.
258 """
259 for table in self.tableSchemas:
260 self._makeTableSchema(table, drop, part_range)
261 for extra_table in self._extra_tables:
262 self._makeTableSchema(extra_table, drop, part_range)
263
264 def _makeTableSchema(
265 self,
266 table: Union[ApdbTables, ExtraTables],
267 drop: bool = False,
268 part_range: Optional[Tuple[int, int]] = None
269 ) -> None:
270 if table in self._ignore_tables:
271 _LOG.debug("Skipping schema for table %s", table)
272 return
273 _LOG.debug("Making table %s", table)
274
275 fullTable = table.table_name(self._prefix)
276
277 table_list = [fullTable]
278 if part_range is not None:
279 if table in self._time_partitioned_tables:
280 partitions = range(*part_range)
281 table_list = [f"{fullTable}_{part}" for part in partitions]
282
283 if drop:
284 queries = [
285 f'DROP TABLE IF EXISTS "{self._keyspace}"."{table_name}"' for table_name in table_list
286 ]
287 futures = [self._session.execute_async(query, timeout=None) for query in queries]
288 for future in futures:
289 _LOG.debug("wait for query: %s", future.query)
290 future.result()
291 _LOG.debug("query finished: %s", future.query)
292
293 queries = []
294 for table_name in table_list:
295 if_not_exists = "" if drop else "IF NOT EXISTS"
296 columns = ", ".join(self._tableColumns(table))
297 query = f'CREATE TABLE {if_not_exists} "{self._keyspace}"."{table_name}" ({columns})'
298 _LOG.debug("query: %s", query)
299 queries.append(query)
300 futures = [self._session.execute_async(query, timeout=None) for query in queries]
301 for future in futures:
302 _LOG.debug("wait for query: %s", future.query)
303 future.result()
304 _LOG.debug("query finished: %s", future.query)
305
306 def _tableColumns(self, table_name: Union[ApdbTables, ExtraTables]) -> List[str]:
307 """Return set of columns in a table
308
309 Parameters
310 ----------
311 table_name : `ApdbTables`
312 Name of the table.
313
314 Returns
315 -------
316 column_defs : `list`
317 List of strings in the format "column_name type".
318 """
319 if isinstance(table_name, ApdbTables):
320 table_schema = self.tableSchemas[table_name]
321 else:
322 table_schema = self._extra_tables[table_name]
323
324 # must have partition columns and clustering columns
325 part_columns = table_schema.annotations.get("cassandra:paritioning_columns", [])
326 clust_columns = [column.name for column in table_schema.primary_key]
327 _LOG.debug("part_columns: %s", part_columns)
328 _LOG.debug("clust_columns: %s", clust_columns)
329 if not part_columns:
330 raise ValueError(f"Table {table_name} configuration is missing partition index")
331
332 # all columns
333 column_defs = []
334 for column in table_schema.columns:
335 ctype = self._type_map[column.datatype]
336 column_defs.append(f'"{column.name}" {ctype}')
337
338 # primary key definition
339 part_columns = [f'"{col}"' for col in part_columns]
340 clust_columns = [f'"{col}"' for col in clust_columns]
341 if len(part_columns) > 1:
342 columns = ", ".join(part_columns)
343 part_columns = [f"({columns})"]
344 pkey = ", ".join(part_columns + clust_columns)
345 _LOG.debug("pkey: %s", pkey)
346 column_defs.append(f"PRIMARY KEY ({pkey})")
347
348 return column_defs
std::vector< SchemaItem< Flag > > * items
List[str] clusteringColumns(self, Union[ApdbTables, ExtraTables] table_name)
None makeSchema(self, bool drop=False, Optional[Tuple[int, int]] part_range=None)
List[str] partitionColumns(self, Union[ApdbTables, ExtraTables] table_name)
Mapping[ExtraTables, simple.Table] _extraTableSchema(self)
Mapping[str, simple.Column] getColumnMap(self, Union[ApdbTables, ExtraTables] table_name)
str tableName(self, Union[ApdbTables, ExtraTables] table_name)
def __init__(self, cassandra.cluster.Session session, str keyspace, str schema_file, str schema_name="ApdbSchema", str prefix="", bool time_partition_tables=False)
List[str] _tableColumns(self, Union[ApdbTables, ExtraTables] table_name)
None _makeTableSchema(self, Union[ApdbTables, ExtraTables] table, bool drop=False, Optional[Tuple[int, int]] part_range=None)