LSST Applications  21.0.0-172-gfb10e10a+18fedfabac,22.0.0+297cba6710,22.0.0+80564b0ff1,22.0.0+8d77f4f51a,22.0.0+a28f4c53b1,22.0.0+dcf3732eb2,22.0.1-1-g7d6de66+2a20fdde0d,22.0.1-1-g8e32f31+297cba6710,22.0.1-1-geca5380+7fa3b7d9b6,22.0.1-12-g44dc1dc+2a20fdde0d,22.0.1-15-g6a90155+515f58c32b,22.0.1-16-g9282f48+790f5f2caa,22.0.1-2-g92698f7+dcf3732eb2,22.0.1-2-ga9b0f51+7fa3b7d9b6,22.0.1-2-gd1925c9+bf4f0e694f,22.0.1-24-g1ad7a390+a9625a72a8,22.0.1-25-g5bf6245+3ad8ecd50b,22.0.1-25-gb120d7b+8b5510f75f,22.0.1-27-g97737f7+2a20fdde0d,22.0.1-32-gf62ce7b1+aa4237961e,22.0.1-4-g0b3f228+2a20fdde0d,22.0.1-4-g243d05b+871c1b8305,22.0.1-4-g3a563be+32dcf1063f,22.0.1-4-g44f2e3d+9e4ab0f4fa,22.0.1-42-gca6935d93+ba5e5ca3eb,22.0.1-5-g15c806e+85460ae5f3,22.0.1-5-g58711c4+611d128589,22.0.1-5-g75bb458+99c117b92f,22.0.1-6-g1c63a23+7fa3b7d9b6,22.0.1-6-g50866e6+84ff5a128b,22.0.1-6-g8d3140d+720564cf76,22.0.1-6-gd805d02+cc5644f571,22.0.1-8-ge5750ce+85460ae5f3,master-g6e05de7fdc+babf819c66,master-g99da0e417a+8d77f4f51a,w.2021.48
LSST Data Management Base Package
apdbSchema.py
Go to the documentation of this file.
1 # This file is part of dax_apdb.
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (http://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 
22 """This module contains methods and classes for generic APDB schema operations.
23 
24 The code in this module is independent of the specific technology used to
25 implement APDB.
26 """
27 
28 from __future__ import annotations
29 
30 __all__ = ["ColumnDef", "IndexType", "IndexDef", "TableDef", "ApdbTables", "ApdbSchema"]
31 
32 import enum
33 from dataclasses import dataclass
34 import logging
35 import numpy
36 import os
37 from typing import Any, List, Mapping, Optional, Type, Union
38 import yaml
39 
40 
_LOG = logging.getLogger(__name__)

# The Cassandra driver usually decides column types on its own.  When a
# Pandas DataFrame has to be constructed by hand instead, this table is
# consulted to translate a column type name from the YAML schema into the
# corresponding dtype.
_dtype_map: Mapping[str, Union[Type, str]] = {
    "DOUBLE": numpy.float64,
    "FLOAT": numpy.float32,
    "DATETIME": "datetime64[ms]",
    "BIGINT": numpy.int64,
    "INTEGER": numpy.int32,
    "INT": numpy.int32,
    "TINYINT": numpy.int8,
    "BLOB": object,
    "CHAR": object,
    "BOOL": bool,
}
58 
59 
@dataclass
class ColumnDef:
    """Description of a single column in a table schema.

    Attributes
    ----------
    name : `str`
        Column name.
    type : `str`
        Name of the cat type (INT, FLOAT, etc.).
    nullable : `bool`
        True for nullable columns.
    default : `Any`
        Default value for the column, can be `None`.
    description : `str` or `None`
        Documentation, can be `None` or empty.
    unit : `str` or `None`
        String with unit name, can be `None`.
    ucd : `str` or `None`
        String with ucd, can be `None`.
    """
    name: str
    type: str
    nullable: bool
    default: Any
    description: Optional[str]
    unit: Optional[str]
    ucd: Optional[str]

    @property
    def dtype(self) -> Union[Type, str]:
        """Pandas dtype for this column.

        Type names that are missing from the module-level dtype table map
        to `object`.
        """
        try:
            return _dtype_map[self.type]
        except KeyError:
            return object
83 
84 
@enum.unique
class IndexType(enum.Enum):
    """Kinds of index that a table definition can declare.

    Each member's value is the string used to select it when constructing
    the enum by value, e.g. ``IndexType("PRIMARY")``.
    """
    PRIMARY = "PRIMARY"
    UNIQUE = "UNIQUE"
    INDEX = "INDEX"
    PARTITION = "PARTITION"
93 
94 
@dataclass
class IndexDef:
    """Definition of one index on a table.

    Attributes
    ----------
    name : `str`
        Index name, can be empty.
    type : `IndexType`
        Type of the index.
    columns : `list` [`str`]
        List of column names in the index.
    """
    name: str
    type: IndexType
    columns: List[str]
105 
106 
@dataclass
class TableDef:
    """Definition of one table: its columns and indices.

    Attributes
    ----------
    name : `str`
        Table name.
    description : `str` or `None`
        Documentation, can be `None` or empty.
    columns : `list` [`ColumnDef`]
        List of `ColumnDef` instances.
    indices : `list` [`IndexDef`]
        List of `IndexDef` instances, can be empty.
    """
    name: str
    description: Optional[str]
    columns: List[ColumnDef]
    indices: List[IndexDef]

    @property
    def primary_key(self) -> IndexDef:
        """Primary key index.

        Returns the first index whose type is `IndexType.PRIMARY`.

        Raises
        ------
        ValueError
            Raised if the table has no index of that type.
        """
        primary = next(
            (candidate for candidate in self.indices if candidate.type is IndexType.PRIMARY),
            None,
        )
        if primary is None:
            raise ValueError(f"Table {self.name} has no primary key.")
        return primary
127 
128 
@enum.unique
class ApdbTables(enum.Enum):
    """Enumeration of the tables that make up the APDB schema.

    The value of each member is the bare table name; `table_name` returns
    that name with an optional prefix prepended.
    """

    # Table for DIAObject records.
    DiaObject = "DiaObject"

    # Table for DIASource records.
    DiaSource = "DiaSource"

    # Table for DIAForcedSource records.
    DiaForcedSource = "DiaForcedSource"

    # Table for the last version of DIAObject records.  This table may be
    # optional for some implementations.
    DiaObjectLast = "DiaObjectLast"

    # Table for SSObject records.
    SSObject = "SSObject"

    # Table for DiaObject_To_Object_Match records.
    DiaObject_To_Object_Match = "DiaObject_To_Object_Match"

    def table_name(self, prefix: str = "") -> str:
        """Return full table name.

        Parameters
        ----------
        prefix : `str`, optional
            String placed in front of the table name; empty by default.

        Returns
        -------
        name : `str`
            ``prefix`` immediately followed by this member's value.
        """
        return "".join((prefix, self.value))
159 
160 
class ApdbSchema:
    """Class for management of APDB schema.

    Attributes
    ----------
    tableSchemas : `dict`
        Maps `ApdbTables` enum member to `TableDef` instance.

    Parameters
    ----------
    schema_file : `str`
        Name of the YAML schema file.
    extra_schema_file : `str`, optional
        Name of the YAML schema file with extra column definitions.
    """
    def __init__(self, schema_file: str, extra_schema_file: Optional[str] = None):
        # build complete table schema
        self.tableSchemas = self._buildSchemas(schema_file, extra_schema_file)

    def _buildSchemas(self, schema_file: str, extra_schema_file: Optional[str] = None,
                      ) -> Mapping[ApdbTables, TableDef]:
        """Create schema definitions for all tables.

        Reads YAML schemas and builds dictionary containing `TableDef`
        instances for each table.

        Parameters
        ----------
        schema_file : `str`
            Name of YAML file with standard cat schema. Environment
            variables in the name are expanded.
        extra_schema_file : `str`, optional
            Name of YAML file with extra table information or `None`.
            Environment variables in the name are expanded.

        Returns
        -------
        schemas : `dict`
            Mapping of `ApdbTables` members to `TableDef` instances.

        Raises
        ------
        RuntimeError
            Raised if the extra schema defines indices for a table that is
            also present in the standard schema (merging of indices is not
            implemented).
        ValueError
            Raised if a table name is not a member of `ApdbTables` or an
            index type is not a member of `IndexType`.
        """

        schema_file = os.path.expandvars(schema_file)
        _LOG.debug("Reading schema file %s", schema_file)
        with open(schema_file) as yaml_stream:
            # Each YAML document in the stream describes one table.
            tables = list(yaml.load_all(yaml_stream, Loader=yaml.SafeLoader))
        _LOG.debug("Read %d tables from schema", len(tables))

        if extra_schema_file:
            extra_schema_file = os.path.expandvars(extra_schema_file)
            _LOG.debug("Reading extra schema file %s", extra_schema_file)
            with open(extra_schema_file) as yaml_stream:
                extras = list(yaml.load_all(yaml_stream, Loader=yaml.SafeLoader))
            # index it by table name
            schemas_extra = {table['table']: table for table in extras}
        else:
            schemas_extra = {}

        # merge extra schema into a regular schema, for now only columns are
        # merged
        for table in tables:
            table_name = table['table']
            if table_name in schemas_extra:
                extra_columns = schemas_extra[table_name].get('columns', [])
                extra_columns = {col['name']: col for col in extra_columns}
                _LOG.debug("Extra columns for table %s: %s", table_name, extra_columns.keys())
                # An extra column with the same name replaces the standard
                # column in place, keeping the standard column order.
                columns = []
                for col in table['columns']:
                    if col['name'] in extra_columns:
                        columns.append(extra_columns.pop(col['name']))
                    else:
                        columns.append(col)
                # add all remaining extra columns
                table['columns'] = columns + list(extra_columns.values())

                if 'indices' in schemas_extra[table_name]:
                    raise RuntimeError("Extra table definition contains indices, "
                                       "merging is not implemented")

                # Merged entries are removed so that only pure "extra"
                # tables remain in schemas_extra.
                del schemas_extra[table_name]

        # Pure "extra" table definitions may contain indices
        tables += schemas_extra.values()

        # convert all dicts into TableDef/ColumnDef/IndexDef instances
        schemas = {}
        for table in tables:

            columns = table.get('columns', [])

            try:
                table_enum = ApdbTables(table['table'])
            except ValueError as exc:
                raise ValueError(f"{table['table']} is not a valid APDB table name") from exc

            table_columns = []
            for col in columns:
                # For prototype set default to 0 even if columns don't
                # specify it; BLOB and DATETIME columns default to None.
                if "default" in col:
                    default = col["default"]
                elif col['type'] in ("BLOB", "DATETIME"):
                    default = None
                else:
                    default = 0

                column = ColumnDef(name=col['name'],
                                   type=col['type'],
                                   nullable=col.get("nullable"),
                                   default=default,
                                   description=col.get("description"),
                                   unit=col.get("unit"),
                                   ucd=col.get("ucd"))
                table_columns.append(column)

            table_indices = []
            for idx in table.get('indices', []):
                try:
                    index_type = IndexType(idx.get('type'))
                except ValueError as exc:
                    raise ValueError(f"{idx.get('type')} is not a valid index type") from exc
                index = IndexDef(name=idx.get('name'),
                                 type=index_type,
                                 columns=idx.get('columns'))
                table_indices.append(index)

            schemas[table_enum] = TableDef(name=table_enum.value,
                                           description=table.get('description'),
                                           columns=table_columns,
                                           indices=table_indices)

        return schemas
def __init__(self, str schema_file, Optional[str] extra_schema_file=None)
Definition: apdbSchema.py:176
Mapping[ApdbTables, TableDef] _buildSchemas(self, str schema_file, Optional[str] extra_schema_file=None)
Definition: apdbSchema.py:181
str table_name(self, str prefix="")
Definition: apdbSchema.py:155
Union[Type, str] dtype(self)
Definition: apdbSchema.py:80
daf::base::PropertyList * list
Definition: fits.cc:913