LSST Applications g0b6bd0c080+a72a5dd7e6,g1182afd7b4+2a019aa3bb,g17e5ecfddb+2b8207f7de,g1d67935e3f+06cf436103,g38293774b4+ac198e9f13,g396055baef+6a2097e274,g3b44f30a73+6611e0205b,g480783c3b1+98f8679e14,g48ccf36440+89c08d0516,g4b93dc025c+98f8679e14,g5c4744a4d9+a302e8c7f0,g613e996a0d+e1c447f2e0,g6c8d09e9e7+25247a063c,g7271f0639c+98f8679e14,g7a9cd813b8+124095ede6,g9d27549199+a302e8c7f0,ga1cf026fa3+ac198e9f13,ga32aa97882+7403ac30ac,ga786bb30fb+7a139211af,gaa63f70f4e+9994eb9896,gabf319e997+ade567573c,gba47b54d5d+94dc90c3ea,gbec6a3398f+06cf436103,gc6308e37c7+07dd123edb,gc655b1545f+ade567573c,gcc9029db3c+ab229f5caf,gd01420fc67+06cf436103,gd877ba84e5+06cf436103,gdb4cecd868+6f279b5b48,ge2d134c3d5+cc4dbb2e3f,ge448b5faa6+86d1ceac1d,gecc7e12556+98f8679e14,gf3ee170dca+25247a063c,gf4ac96e456+ade567573c,gf9f5ea5b4d+ac198e9f13,gff490e6085+8c2580be5c,w.2022.27
LSST Data Management Base Package
apdbSchema.py
Go to the documentation of this file.
# This file is part of dax_apdb.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
21
"""This module contains methods and classes for generic APDB schema operations.

The code in this module is independent of the specific technology used to
implement APDB.
"""

from __future__ import annotations

# Public names exported by ``from <module> import *``.
__all__ = ["ColumnDef", "IndexType", "IndexDef", "TableDef", "ApdbTables", "ApdbSchema"]
31
import enum
import logging
import os
from dataclasses import dataclass
from typing import Any, Dict, List, Mapping, Optional, Type, Union

import numpy
import yaml
39
40
# Module-level logger, named after this module.
_LOG = logging.getLogger(__name__)

# In most cases column types are determined by Cassandra driver, but in some
# cases we need to create Pandas Dataframe ourselves and we use this map to
# infer types of columns from their YAML schema.
_dtype_map: Mapping[str, Union[Type, str]] = dict(
    double=numpy.float64,
    float=numpy.float32,
    timestamp="datetime64[ms]",
    long=numpy.int64,
    int=numpy.int32,
    short=numpy.int16,
    byte=numpy.int8,
    binary=object,
    char=object,
    text=object,
    string=object,
    unicode=object,
    boolean=bool,
)
61
62
@dataclass
class ColumnDef:
    """Column representation in schema."""

    name: str
    """column name"""
    type: str
    """name of cat type (INT, FLOAT, etc.)"""
    nullable: bool
    """True for nullable columns"""
    length: Optional[int] = None
    """Optional length for string/binary columns"""
    default: Any = None
    """default value for column, can be None"""
    description: Optional[str] = None
    """documentation, can be None or empty"""
    unit: Optional[str] = None
    """string with unit name, can be None"""
    ucd: Optional[str] = None
    """string with ucd, can be None"""

    @property
    def dtype(self) -> Union[Type, str]:
        """Pandas dtype for this column.

        The value is looked up in the module-level ``_dtype_map`` using the
        column ``type``; `object` is returned when the type is not in the
        map.
        """
        return _dtype_map.get(self.type, object)
88
89
@enum.unique
class IndexType(enum.Enum):
    """Types of indices."""

    PRIMARY = "PRIMARY"
    UNIQUE = "UNIQUE"
    INDEX = "INDEX"
    PARTITION = "PARTITION"
98
99
@dataclass
class IndexDef:
    """Index description."""

    name: str
    """index name, can be empty"""
    type: IndexType
    """Type of the index"""
    columns: List[str]
    """list of column names in index"""
110
111
@dataclass
class TableDef:
    """Table description"""

    name: str
    """table name"""
    columns: List[ColumnDef]
    """list of ColumnDef instances"""
    indices: List[IndexDef]
    """list of IndexDef instances, can be empty"""
    description: Optional[str] = None
    """documentation, can be None or empty"""

    @property
    def primary_key(self) -> IndexDef:
        """Primary key index.

        Returns the first entry of ``indices`` whose type is
        `IndexType.PRIMARY`.

        Raises
        ------
        ValueError
            Raised if no index of type `IndexType.PRIMARY` is present.
        """
        for index in self.indices:
            if index.type is IndexType.PRIMARY:
                return index
        raise ValueError(f"Table {self.name} has no primary key.")
132
133
@enum.unique
class ApdbTables(enum.Enum):
    """Names of the tables in APDB schema."""

    DiaObject = "DiaObject"
    """Name of the table for DIAObject records."""

    DiaSource = "DiaSource"
    """Name of the table for DIASource records."""

    DiaForcedSource = "DiaForcedSource"
    """Name of the table for DIAForcedSource records."""

    DiaObjectLast = "DiaObjectLast"
    """Name of the table for the last version of DIAObject records.

    This table may be optional for some implementations.
    """

    SSObject = "SSObject"
    """Name of the table for SSObject records."""

    DiaObject_To_Object_Match = "DiaObject_To_Object_Match"
    """Name of the table for DiaObject_To_Object_Match records."""

    def table_name(self, prefix: str = "") -> str:
        """Return full table name.

        Parameters
        ----------
        prefix : `str`, optional
            String prepended to the enum value; empty by default.

        Returns
        -------
        name : `str`
            ``prefix`` followed by the value of this enum member.
        """
        return prefix + self.value
162
163
class ApdbSchema:
    """Class for management of APDB schema.

    Attributes
    ----------
    tableSchemas : `dict`
        Maps `ApdbTables` enum member to `TableDef` instance.

    Parameters
    ----------
    schema_file : `str`
        Name of the YAML schema file.
    schema_name : `str`, optional
        Name of the schema in YAML files.
    """

    def __init__(
        self,
        schema_file: str,
        schema_name: str = "ApdbSchema",
    ):
        # build complete table schema
        self.tableSchemas = self._buildSchemas(schema_file, schema_name)

    def _readTables(self, schema_file: str, schema_name: str) -> List[Dict[str, Any]]:
        """Read table schema from YAML file.

        Parameters
        ----------
        schema_file : `str`
            Name of YAML file with ``felis`` schema. Environment variables
            in the name are expanded.
        schema_name : `str`, optional
            Name of the schema in YAML files.

        Returns
        -------
        tables : `list`
            List of table definition objects.

        Raises
        ------
        ValueError
            Raised if the file defines no schema or more than one schema
            with the given name, or if the selected schema has no
            ``tables`` key.
        """
        schema_file = os.path.expandvars(schema_file)
        _LOG.debug("Reading schema file %s", schema_file)
        with open(schema_file) as yaml_stream:
            documents = list(yaml.load_all(yaml_stream, Loader=yaml.SafeLoader))
        # An empty YAML document loads as None, so only mappings can be
        # inspected for a schema name.
        schemas = [
            doc for doc in documents if isinstance(doc, dict) and doc.get("name") == schema_name
        ]
        if not schemas:
            raise ValueError(f"Schema file {schema_file!r} does not define schema {schema_name!r}")
        elif len(schemas) > 1:
            raise ValueError(f"Schema file {schema_file!r} defines multiple schemas {schema_name!r}")
        schema = schemas[0]
        try:
            tables = schema["tables"]
        except KeyError:
            raise ValueError(f"Schema definition file {schema_file!r} defines no tables") from None
        _LOG.debug("Read %d tables from schema", len(tables))
        return tables

    def _buildSchemas(
        self,
        schema_file: str,
        schema_name: str = "ApdbSchema",
    ) -> Mapping[ApdbTables, TableDef]:
        """Create schema definitions for all tables.

        Reads YAML schemas and builds dictionary containing `TableDef`
        instances for each table.

        Parameters
        ----------
        schema_file : `str`
            Name of YAML file with ``felis`` schema.
        schema_name : `str`, optional
            Name of the schema in YAML files.

        Returns
        -------
        schemas : `dict`
            Mapping of `ApdbTables` members to `TableDef` instances.

        Raises
        ------
        ValueError
            Raised if a constraint has a ``@type`` which does not match any
            `IndexType` value.
        """

        schema_file = os.path.expandvars(schema_file)
        tables = self._readTables(schema_file, schema_name)

        # convert all dicts into classes
        schemas: Dict[ApdbTables, TableDef] = {}
        for table in tables:
            try:
                table_enum = ApdbTables(table["name"])
            except ValueError:
                # There may be other tables in the schema that do not belong
                # to APDB.
                continue

            table_columns = []
            # Maps column "@id" to its ColumnDef; index definitions refer to
            # columns by that id.
            column_map = {}
            for col in table.get("columns", []):
                column = ColumnDef(
                    name=col["name"],
                    type=col["datatype"],
                    nullable=col.get("nullable", True),
                    length=col.get("length"),
                    default=col.get("value"),
                    description=col.get("description"),
                    unit=col.get("fits:tunit"),
                    ucd=col.get("ivoa:ucd"),
                )
                table_columns.append(column)
                column_map[col["@id"]] = column

            table_indices = []

            # PK, given either as a single column id or a list of ids
            if (idx := table.get("primaryKey")) is not None:
                if isinstance(idx, list):
                    column_names = [column_map[col_id].name for col_id in idx]
                else:
                    column_names = [column_map[idx].name]
                index = IndexDef(name="", type=IndexType.PRIMARY, columns=column_names)
                table_indices.append(index)

            # usual indices
            for idx in table.get("indexes", []):
                column_names = [column_map[col_id].name for col_id in idx.get("columns")]
                index = IndexDef(name=idx.get("name"), type=IndexType.INDEX, columns=column_names)
                table_indices.append(index)

            # Other constraints, for now only Unique is going to work, foreign
            # keys support may be added later.
            for idx in table.get("constraints", []):
                # ``idx.get`` must be called, not subscripted.
                constraint_type = idx.get("@type")
                try:
                    # str() lets a missing "@type" produce the same
                    # ValueError as an unknown one.
                    index_type = IndexType(str(constraint_type).upper())
                except ValueError:
                    raise ValueError(f"{constraint_type} is not a valid index type") from None
                # Resolve column ids to names, same as for PK and indices.
                column_names = [column_map[col_id].name for col_id in idx.get("columns", [])]
                index = IndexDef(name=idx.get("name"), type=index_type, columns=column_names)
                table_indices.append(index)

            schemas[table_enum] = TableDef(
                name=table_enum.value,
                description=table.get("description"),
                columns=table_columns,
                indices=table_indices,
            )

        return schemas
def __init__(self, str schema_file, str schema_name="ApdbSchema")
Definition: apdbSchema.py:184
List[Dict[str, Any]] _readTables(self, str schema_file, str schema_name)
Definition: apdbSchema.py:188
Mapping[ApdbTables, TableDef] _buildSchemas(self, str schema_file, str schema_name="ApdbSchema")
Definition: apdbSchema.py:224
str table_name(self, str prefix="")
Definition: apdbSchema.py:159
Union[Type, str] dtype(self)
Definition: apdbSchema.py:85
daf::base::PropertyList * list
Definition: fits.cc:913