apdbCassandraSchema.py
LSST Data Management Base Package

# This file is part of dax_apdb.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ["ApdbCassandraSchema"]

import enum
import logging
from typing import List, Mapping, Optional, TYPE_CHECKING, Tuple, Union

from .apdbSchema import ApdbSchema, ApdbTables, ColumnDef, IndexDef, IndexType, TableDef

if TYPE_CHECKING:
    import cassandra.cluster


_LOG = logging.getLogger(__name__)


@enum.unique
class ExtraTables(enum.Enum):
    """Names of the extra tables used by Cassandra implementation."""

    DiaSourceToPartition = "DiaSourceToPartition"
44 "Maps diaSourceId ro its partition values (pixel and time)."

    def table_name(self, prefix: str = "") -> str:
        """Return full table name."""
        return prefix + self.value

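# A quick usage note for ExtraTables above: table_name() simply prepends
# the given prefix to the enum value, so with a hypothetical prefix "test_"
# ExtraTables.DiaSourceToPartition.table_name("test_") returns
# "test_DiaSourceToPartition".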

class ApdbCassandraSchema(ApdbSchema):
    """Class for management of APDB schema.

    Parameters
    ----------
    session : `cassandra.cluster.Session`
        Cassandra session object.
    keyspace : `str`
        Name of Cassandra keyspace for APDB tables.
    schema_file : `str`
        Name of the YAML schema file.
    schema_name : `str`, optional
        Name of the schema in YAML files.
    prefix : `str`, optional
        Prefix to add to all schema elements.
    time_partition_tables : `bool`
        If True then schema will have a separate table for each time
        partition.
    """

    _type_map = dict(double="DOUBLE",
                     float="FLOAT",
                     timestamp="TIMESTAMP",
                     long="BIGINT",
                     int="INT",
                     short="INT",
                     byte="TINYINT",
                     binary="BLOB",
                     char="TEXT",
                     string="TEXT",
                     unicode="TEXT",
                     text="TEXT",
                     boolean="BOOLEAN")
    """Map YAML column types to Cassandra."""

    _time_partitioned_tables = [
        ApdbTables.DiaObject,
        ApdbTables.DiaSource,
        ApdbTables.DiaForcedSource,
    ]
    _spatially_partitioned_tables = [ApdbTables.DiaObjectLast]

    def __init__(
        self,
        session: cassandra.cluster.Session,
        keyspace: str,
        schema_file: str,
        schema_name: str = "ApdbSchema",
        prefix: str = "",
        time_partition_tables: bool = False
    ):

        super().__init__(schema_file, schema_name)

        self._session = session
        self._keyspace = keyspace
        self._prefix = prefix
        self._time_partition_tables = time_partition_tables

        # add columns and index for partitioning.
        self._ignore_tables = []
        for table, tableDef in self.tableSchemas.items():
            columns = []
            add_columns = True
            if table in self._spatially_partitioned_tables:
                # DiaObjectLast does not need temporal partitioning
                columns = ["apdb_part"]
            elif table in self._time_partitioned_tables:
                if time_partition_tables:
                    columns = ["apdb_part"]
                else:
                    columns = ["apdb_part", "apdb_time_part"]
            elif table is ApdbTables.SSObject:
                # For SSObject there is no natural partition key but we have
                # to partition it because there are too many of them. I'm
                # going to partition on its primary key (and drop separate
                # primary key index).
                columns = ["ssObjectId"]
                tableDef.indices = [
                    index for index in tableDef.indices if index.type is not IndexType.PRIMARY
                ]
                add_columns = False
            else:
                # TODO: Do not know yet how other tables can be partitioned
                self._ignore_tables.append(table)
                add_columns = False

            if add_columns:
                # add partitioning columns to the column list
                columnDefs = [
                    ColumnDef(name=name, type="long", nullable=False) for name in columns
                ]
                tableDef.columns = columnDefs + tableDef.columns

            # make an index for the partitioning columns
            if columns:
                index = IndexDef(name=f"Part_{tableDef.name}", type=IndexType.PARTITION, columns=columns)
                tableDef.indices.append(index)

        self._extra_tables = self._extraTableSchema()

    def _extraTableSchema(self) -> Mapping[ExtraTables, TableDef]:
        """Generate schema for extra tables."""
        return {
            ExtraTables.DiaSourceToPartition: TableDef(
                name=ExtraTables.DiaSourceToPartition.value,
                columns=[
                    ColumnDef(name="diaSourceId", type="long", nullable=False),
                    ColumnDef(name="apdb_part", type="long", nullable=False),
                    ColumnDef(name="apdb_time_part", type="int", nullable=False),
                ],
                indices=[
                    IndexDef(
                        name=f"Part_{ExtraTables.DiaSourceToPartition.value}",
                        type=IndexType.PARTITION,
                        columns=["diaSourceId"],
                    ),
                ],
            ),
        }
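
    # Illustrative query shape against this mapping table; the "apdb"
    # keyspace name is a hypothetical placeholder:
    #   SELECT "apdb_part", "apdb_time_part"
    #   FROM "apdb"."DiaSourceToPartition" WHERE "diaSourceId" = ?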

    def tableName(self, table_name: Union[ApdbTables, ExtraTables]) -> str:
        """Return Cassandra table name for APDB table."""
        return table_name.table_name(self._prefix)

    def getColumnMap(self, table_name: Union[ApdbTables, ExtraTables]) -> Mapping[str, ColumnDef]:
        """Return mapping of column names to column definitions.

        Parameters
        ----------
        table_name : `ApdbTables` or `ExtraTables`
            One of the known APDB table names.

        Returns
        -------
        column_map : `dict`
            Mapping of column names to `ColumnDef` instances.
        """
        if isinstance(table_name, ApdbTables):
            table_schema = self.tableSchemas[table_name]
        else:
            table_schema = self._extra_tables[table_name]
        cmap = {column.name: column for column in table_schema.columns}
        return cmap

    def partitionColumns(self, table_name: Union[ApdbTables, ExtraTables]) -> List[str]:
        """Return a list of columns used for table partitioning.

        Parameters
        ----------
        table_name : `ApdbTables` or `ExtraTables`
            Table name in APDB schema.

        Returns
        -------
        columns : `list` of `str`
            Names of columns used for partitioning.
        """
        if isinstance(table_name, ApdbTables):
            table_schema = self.tableSchemas[table_name]
        else:
            table_schema = self._extra_tables[table_name]
        for index in table_schema.indices:
            if index.type is IndexType.PARTITION:
                # there can be only one partitioning index (possibly with
                # multiple columns)
                return index.columns
        return []

    def clusteringColumns(self, table_name: Union[ApdbTables, ExtraTables]) -> List[str]:
        """Return a list of columns used for clustering.

        Parameters
        ----------
        table_name : `ApdbTables` or `ExtraTables`
            Table name in APDB schema.

        Returns
        -------
        columns : `list` of `str`
            Names of columns used for clustering.
        """
        if isinstance(table_name, ApdbTables):
            table_schema = self.tableSchemas[table_name]
        else:
            table_schema = self._extra_tables[table_name]
        for index in table_schema.indices:
            if index.type is IndexType.PRIMARY:
                return index.columns
        return []
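
    # For instance, with the partitioning configured in __init__, a
    # time-partitioned table stored as a single table reports
    # partitionColumns() == ["apdb_part", "apdb_time_part"], while
    # clusteringColumns() returns the primary-key columns declared in the
    # YAML schema file.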

    def makeSchema(self, drop: bool = False, part_range: Optional[Tuple[int, int]] = None) -> None:
        """Create or re-create all tables.

        Parameters
        ----------
        drop : `bool`
            If True then drop tables before creating new ones.
        part_range : `tuple` [ `int` ] or `None`
            Start and end partition number for time partitions, end is not
            inclusive. Used to create per-partition DiaObject, DiaSource, and
            DiaForcedSource tables. If `None` then per-partition tables are
            not created.
        """
        for table in self.tableSchemas:
            self._makeTableSchema(table, drop, part_range)
        for extra_table in self._extra_tables:
            self._makeTableSchema(extra_table, drop, part_range)

    def _makeTableSchema(
        self,
        table: Union[ApdbTables, ExtraTables],
        drop: bool = False,
        part_range: Optional[Tuple[int, int]] = None
    ) -> None:
        if table in self._ignore_tables:
            _LOG.debug("Skipping schema for table %s", table)
            return
        _LOG.debug("Making table %s", table)

        fullTable = table.table_name(self._prefix)

        table_list = [fullTable]
        if part_range is not None:
            if table in self._time_partitioned_tables:
                partitions = range(*part_range)
                table_list = [f"{fullTable}_{part}" for part in partitions]

        if drop:
            queries = [
                f'DROP TABLE IF EXISTS "{self._keyspace}"."{table_name}"' for table_name in table_list
            ]
            futures = [self._session.execute_async(query, timeout=None) for query in queries]
            for future in futures:
                _LOG.debug("wait for query: %s", future.query)
                future.result()
                _LOG.debug("query finished: %s", future.query)

        queries = []
        for table_name in table_list:
            if_not_exists = "" if drop else "IF NOT EXISTS"
            columns = ", ".join(self._tableColumns(table))
            query = f'CREATE TABLE {if_not_exists} "{self._keyspace}"."{table_name}" ({columns})'
            _LOG.debug("query: %s", query)
            queries.append(query)
        futures = [self._session.execute_async(query, timeout=None) for query in queries]
        for future in futures:
            _LOG.debug("wait for query: %s", future.query)
            future.result()
            _LOG.debug("query finished: %s", future.query)

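    # Illustrative shape of a statement generated above; the keyspace and
    # clustering column are hypothetical, and the real column list comes
    # from the YAML schema:
    #   CREATE TABLE IF NOT EXISTS "apdb"."DiaSource" (
    #       "apdb_part" BIGINT, "apdb_time_part" BIGINT, ...,
    #       PRIMARY KEY (("apdb_part", "apdb_time_part"), "diaSourceId")
    #   )
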
    def _tableColumns(self, table_name: Union[ApdbTables, ExtraTables]) -> List[str]:
        """Return a list of column definitions for a table.

        Parameters
        ----------
        table_name : `ApdbTables` or `ExtraTables`
            Name of the table.

        Returns
        -------
        column_defs : `list`
            List of strings in the format "column_name type".
        """
        if isinstance(table_name, ApdbTables):
            table_schema = self.tableSchemas[table_name]
        else:
            table_schema = self._extra_tables[table_name]

        # must have partition columns and clustering columns
        part_columns = []
        clust_columns = []
        index_columns = set()
        for index in table_schema.indices:
            if index.type is IndexType.PARTITION:
                part_columns = index.columns
            elif index.type is IndexType.PRIMARY:
                clust_columns = index.columns
            index_columns.update(index.columns)
        _LOG.debug("part_columns: %s", part_columns)
        _LOG.debug("clust_columns: %s", clust_columns)
        if not part_columns:
            raise ValueError(f"Table {table_name} configuration is missing partition index")

        # all columns
        column_defs = []
        for column in table_schema.columns:
            ctype = self._type_map[column.type]
            column_defs.append(f'"{column.name}" {ctype}')

        # primary key definition
        part_columns = [f'"{col}"' for col in part_columns]
        clust_columns = [f'"{col}"' for col in clust_columns]
        if len(part_columns) > 1:
            columns = ", ".join(part_columns)
            part_columns = [f"({columns})"]
        pkey = ", ".join(part_columns + clust_columns)
        _LOG.debug("pkey: %s", pkey)
        column_defs.append(f"PRIMARY KEY ({pkey})")

        return column_defs
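
# A minimal usage sketch, assuming a local Cassandra cluster and a
# hypothetical keyspace and schema file; Cluster/connect are standard
# cassandra-driver calls, everything else is defined in this module:
#
#     from cassandra.cluster import Cluster
#
#     session = Cluster(["127.0.0.1"]).connect()
#     schema = ApdbCassandraSchema(
#         session=session,
#         keyspace="apdb",
#         schema_file="apdb-schema.yaml",
#         time_partition_tables=True,
#     )
#     # create per-partition tables for time partitions 100..109
#     # (the end of part_range is not inclusive)
#     schema.makeSchema(part_range=(100, 110))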