apdbCassandraSchema.py
# This file is part of dax_apdb.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ["ApdbCassandraSchema"]

import itertools
import logging
from typing import List, Mapping, Optional, TYPE_CHECKING, Tuple

from .apdbSchema import ApdbSchema, ApdbTables, ColumnDef, IndexDef, IndexType

if TYPE_CHECKING:
    import cassandra.cluster


_LOG = logging.getLogger(__name__)


class ApdbCassandraSchema(ApdbSchema):
    """Class for management of APDB schema.

    Parameters
    ----------
    session : `cassandra.cluster.Session`
        Cassandra session object.
    schema_file : `str`
        Name of the YAML schema file.
    extra_schema_file : `str`, optional
        Name of the YAML schema file with extra column definitions.
    prefix : `str`, optional
        Prefix to add to all schema elements.
    packing : `str`
        Type of packing to apply to columns; the string "none" disables
        packing, any other value enables it.
    time_partition_tables : `bool`
        If True then the schema will have a separate table for each time
        partition.
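
    Notes
    -----
    A minimal construction sketch (session creation elided; the schema file
    name here is illustrative, not part of this module)::

        schema = ApdbCassandraSchema(session, "apdb-schema.yaml",
                                     time_partition_tables=True)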
    """

    _type_map = dict(DOUBLE="DOUBLE",
                     FLOAT="FLOAT",
                     DATETIME="TIMESTAMP",
                     BIGINT="BIGINT",
                     INTEGER="INT",
                     INT="INT",
                     TINYINT="TINYINT",
                     BLOB="BLOB",
                     CHAR="TEXT",
                     BOOL="BOOLEAN")
    """Map YAML column types to Cassandra"""

    def __init__(self, session: cassandra.cluster.Session, schema_file: str,
                 extra_schema_file: Optional[str] = None, prefix: str = "",
                 packing: str = "none", time_partition_tables: bool = False):

        super().__init__(schema_file, extra_schema_file)

        self._session = session
        self._prefix = prefix
        self._packing = packing

        # add columns and index for partitioning.
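        # "apdb_part" is the spatial partition column; "apdb_time_part" is
        # the temporal partition column, added only when time partitioning
        # is not implemented with separate per-partition tables (see below).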
        self._ignore_tables = []
        for table, tableDef in self.tableSchemas.items():
            columns = []
            if table is ApdbTables.DiaObjectLast:
                # DiaObjectLast does not need temporal partitioning
                columns = ["apdb_part"]
            elif table in (ApdbTables.DiaObject, ApdbTables.DiaSource, ApdbTables.DiaForcedSource):
                # these three tables can use either pure spatial or combined
                if time_partition_tables:
                    columns = ["apdb_part"]
                else:
                    columns = ["apdb_part", "apdb_time_part"]
            else:
                # TODO: do not know yet how other tables can be partitioned
                self._ignore_tables.append(table)

            # add columns to the column list
            columnDefs = [ColumnDef(name=name,
                                    type="BIGINT",
                                    nullable=False,
                                    default=None,
                                    description="",
                                    unit=None,
                                    ucd=None) for name in columns]
            tableDef.columns = columnDefs + tableDef.columns

            # make an index
            index = IndexDef(name=f"Part_{tableDef.name}", type=IndexType.PARTITION, columns=columns)
            tableDef.indices.append(index)

        self._packed_columns = {}
        if self._packing != "none":
            for table, tableDef in self.tableSchemas.items():
                index_columns = set(itertools.chain.from_iterable(
                    index.columns for index in tableDef.indices
                ))
                columnsDefs = [column for column in tableDef.columns if column.name not in index_columns]
                self._packed_columns[table] = columnsDefs

    def tableName(self, table_name: ApdbTables) -> str:
        """Return Cassandra table name for APDB table."""
        return table_name.table_name(self._prefix)

    def getColumnMap(self, table_name: ApdbTables) -> Mapping[str, ColumnDef]:
        """Return mapping of column names to column definitions.

        Parameters
        ----------
        table_name : `ApdbTables`
            One of the known APDB table names.

        Returns
        -------
        column_map : `dict`
            Mapping of column names to `ColumnDef` instances.
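
        Notes
        -----
        A usage sketch; the table and column names are illustrative::

            cmap = schema.getColumnMap(ApdbTables.DiaSource)
            ctype = cmap["diaSourceId"].type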
        """
        table = self.tableSchemas[table_name]
        cmap = {column.name: column for column in table.columns}
        return cmap

    def partitionColumns(self, table_name: ApdbTables) -> List[str]:
        """Return a list of columns used for table partitioning.

        Parameters
        ----------
        table_name : `ApdbTables`
            Table name in APDB schema.

        Returns
        -------
        columns : `list` of `str`
            Names of columns used for partitioning.
        """
        table_schema = self.tableSchemas[table_name]
        for index in table_schema.indices:
            if index.type is IndexType.PARTITION:
                # there can be only one partitioning index (possibly with
                # several columns)
                return index.columns
        return []

    def clusteringColumns(self, table_name: ApdbTables) -> List[str]:
        """Return a list of columns used for clustering.

        Parameters
        ----------
        table_name : `ApdbTables`
            Table name in APDB schema.

        Returns
        -------
        columns : `list` of `str`
            Names of columns used for clustering.
        """
        table_schema = self.tableSchemas[table_name]
        for index in table_schema.indices:
            if index.type is IndexType.PRIMARY:
                return index.columns
        return []

    def makeSchema(self, drop: bool = False, part_range: Optional[Tuple[int, int]] = None) -> None:
        """Create or re-create all tables.

        Parameters
        ----------
        drop : `bool`
            If True then drop the tables before creating new ones.
        part_range : `tuple` [ `int` ] or `None`
            Start and end partition numbers for time partitions; the end is
            not inclusive. Used to create per-partition DiaObject, DiaSource,
            and DiaForcedSource tables. If `None` then per-partition tables
            are not created.
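
        Notes
        -----
        A sketch of a typical call; the partition numbers are illustrative.
        With the default prefix this creates tables such as "DiaObject_100"
        through "DiaObject_109"::

            schema.makeSchema(part_range=(100, 110))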
        """
        for table in self.tableSchemas:
            if table in self._ignore_tables:
                _LOG.debug("Skipping schema for table %s", table)
                continue
            _LOG.debug("Making table %s", table)

            fullTable = table.table_name(self._prefix)

            table_list = [fullTable]
            if part_range is not None:
                if table in (ApdbTables.DiaSource, ApdbTables.DiaForcedSource, ApdbTables.DiaObject):
                    partitions = range(*part_range)
                    table_list = [f"{fullTable}_{part}" for part in partitions]

            if drop:
                queries = [f'DROP TABLE IF EXISTS "{table_name}"' for table_name in table_list]
                futures = [self._session.execute_async(query, timeout=None) for query in queries]
                for future in futures:
                    _LOG.debug("wait for query: %s", future.query)
                    future.result()
                    _LOG.debug("query finished: %s", future.query)

            queries = []
            for table_name in table_list:
                if_not_exists = "" if drop else "IF NOT EXISTS"
                columns = ", ".join(self._tableColumns(table))
                query = f'CREATE TABLE {if_not_exists} "{table_name}" ({columns})'
                _LOG.debug("query: %s", query)
                queries.append(query)
            futures = [self._session.execute_async(query, timeout=None) for query in queries]
            for future in futures:
                _LOG.debug("wait for query: %s", future.query)
                future.result()
                _LOG.debug("query finished: %s", future.query)

    def _tableColumns(self, table_name: ApdbTables) -> List[str]:
        """Return set of columns in a table.

        Parameters
        ----------
        table_name : `ApdbTables`
            Name of the table.

        Returns
        -------
        column_defs : `list`
            List of strings in the format "column_name type".
        """
        table_schema = self.tableSchemas[table_name]

        # must have partition columns and clustering columns
        part_columns = []
        clust_columns = []
        index_columns = set()
        for index in table_schema.indices:
            if index.type is IndexType.PARTITION:
                part_columns = index.columns
            elif index.type is IndexType.PRIMARY:
                clust_columns = index.columns
            index_columns.update(index.columns)
        _LOG.debug("part_columns: %s", part_columns)
        _LOG.debug("clust_columns: %s", clust_columns)
        if not part_columns:
            raise ValueError(f"Table {table_name} configuration is missing partition index")
        if not clust_columns:
            raise ValueError(f"Table {table_name} configuration is missing primary index")

        # all columns
        column_defs = []
        for column in table_schema.columns:
            if self._packing != "none" and column.name not in index_columns:
                # when packing, all non-index columns are replaced by a BLOB
                continue
            ctype = self._type_map[column.type]
            column_defs.append(f'"{column.name}" {ctype}')

        # packed content goes to a single blob column
        if self._packing != "none":
            column_defs.append('"apdb_packed" blob')

        # primary key definition
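        # For example, two partition columns and one clustering column yield
        # PRIMARY KEY (("apdb_part", "apdb_time_part"), "diaObjectId");
        # the column names here are illustrative.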
        part_columns = [f'"{col}"' for col in part_columns]
        clust_columns = [f'"{col}"' for col in clust_columns]
        if len(part_columns) > 1:
            columns = ", ".join(part_columns)
            part_columns = [f"({columns})"]
        pkey = ", ".join(part_columns + clust_columns)
        _LOG.debug("pkey: %s", pkey)
        column_defs.append(f"PRIMARY KEY ({pkey})")

        return column_defs

    def packedColumns(self, table_name: ApdbTables) -> List[ColumnDef]:
        """Return set of columns that are packed into BLOB.

        Parameters
        ----------
        table_name : `ApdbTables`
            Name of the table.

        Returns
        -------
        columns : `list` [ `ColumnDef` ]
            List of column definitions. An empty list is returned if packing
            is not configured.
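
        Notes
        -----
        A usage sketch, assuming packing was enabled when this instance was
        constructed::

            for column in schema.packedColumns(ApdbTables.DiaObject):
                print(column.name, column.type)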
        """
        return self._packed_columns.get(table_name, [])
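

# A minimal usage sketch, not part of the original module; the contact
# point, keyspace name, and schema file path are illustrative assumptions,
# and a running Cassandra instance is required. Run via ``python -m`` so
# that the relative import above resolves.
if __name__ == "__main__":
    from cassandra.cluster import Cluster

    cluster = Cluster(["127.0.0.1"])   # assumed local Cassandra node
    session = cluster.connect("apdb")  # assumed pre-existing keyspace
    schema = ApdbCassandraSchema(session, schema_file="apdb-schema.yaml",
                                 time_partition_tables=True)
    # Create all tables, including per-partition DiaObject/DiaSource/
    # DiaForcedSource tables for time partitions [100, 110).
    schema.makeSchema(part_range=(100, 110))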