LSST Applications g00274db5b6+edbf708997,g00d0e8bbd7+edbf708997,g199a45376c+5137f08352,g1fd858c14a+1d4b6db739,g262e1987ae+f4d9505c4f,g29ae962dfc+7156fb1a53,g2cef7863aa+73c82f25e4,g35bb328faa+edbf708997,g3e17d7035e+5b3adc59f5,g3fd5ace14f+852fa6fbcb,g47891489e3+6dc8069a4c,g53246c7159+edbf708997,g64539dfbff+9f17e571f4,g67b6fd64d1+6dc8069a4c,g74acd417e5+ae494d68d9,g786e29fd12+af89c03590,g7ae74a0b1c+a25e60b391,g7aefaa3e3d+536efcc10a,g7cc15d900a+d121454f8d,g87389fa792+a4172ec7da,g89139ef638+6dc8069a4c,g8d7436a09f+28c28d8d6d,g8ea07a8fe4+db21c37724,g92c671f44c+9f17e571f4,g98df359435+b2e6376b13,g99af87f6a8+b0f4ad7b8d,gac66b60396+966efe6077,gb88ae4c679+7dec8f19df,gbaa8f7a6c5+38b34f4976,gbf99507273+edbf708997,gc24b5d6ed1+9f17e571f4,gca7fc764a6+6dc8069a4c,gcc769fe2a4+97d0256649,gd7ef33dd92+6dc8069a4c,gdab6d2f7ff+ae494d68d9,gdbb4c4dda9+9f17e571f4,ge410e46f29+6dc8069a4c,geaed405ab2+e194be0d2b,w.2025.47
LSST Data Management Base Package
Loading...
Searching...
No Matches
apdbSchema.py
Go to the documentation of this file.
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22"""Module containing methods and classes for generic APDB schema operations.
23
24The code in this module is independent of the specific technology used to
25implement APDB.
26"""
27
28from __future__ import annotations
29
30__all__ = ["ApdbSchema", "ApdbTables"]
31
32import enum
33import logging
34from collections.abc import Mapping, MutableMapping
35from functools import cached_property
36
37import felis.datamodel
38import numpy
39
40from .schema_model import ExtraDataTypes, Schema, Table
41from .versionTuple import VersionTuple
42
43_LOG = logging.getLogger(__name__)
44
# Lookup table from Felis column types to the dtypes used for Pandas columns.
# Usually the Cassandra driver determines column types, but occasionally a
# Pandas DataFrame has to be constructed by hand, and then the column types
# are inferred from the YAML schema through this map. Note that Cassandra
# stores timestamps with millisecond precision, while pandas maps the
# datetime type to "datetime64[ns]".
_dtype_map: Mapping[felis.datamodel.DataType | ExtraDataTypes, type | str] = {
    # Floating point types.
    felis.datamodel.DataType.double: numpy.float64,
    felis.datamodel.DataType.float: numpy.float32,
    # Timestamps.
    felis.datamodel.DataType.timestamp: "datetime64[ns]",
    # Integer types, widest to narrowest.
    felis.datamodel.DataType.long: numpy.int64,
    felis.datamodel.DataType.int: numpy.int32,
    felis.datamodel.DataType.short: numpy.int16,
    felis.datamodel.DataType.byte: numpy.int8,
    # Binary and textual types are all stored as generic Python objects.
    felis.datamodel.DataType.binary: object,
    felis.datamodel.DataType.char: object,
    felis.datamodel.DataType.text: object,
    felis.datamodel.DataType.string: object,
    felis.datamodel.DataType.unicode: object,
    # Booleans.
    felis.datamodel.DataType.boolean: bool,
}
65
66
@enum.unique
class ApdbTables(enum.Enum):
    """Enumeration of the table names that make up the APDB schema."""

    DiaObject = "DiaObject"
    """Table holding DIAObject records."""

    DiaSource = "DiaSource"
    """Table holding DIASource records."""

    DiaForcedSource = "DiaForcedSource"
    """Table holding DIAForcedSource records."""

    DiaObjectLast = "DiaObjectLast"
    """Table holding the last version of DIAObject records.

    Some implementations may treat this table as optional.
    """

    SSObject = "SSObject"
    """Table holding SSObject records."""

    SSSource = "SSSource"
    """Table holding SSSource records."""

    DiaObject_To_Object_Match = "DiaObject_To_Object_Match"
    """Table holding DiaObject_To_Object_Match records."""

    metadata = "metadata"
    """Metadata table, this table may not always exist."""

    def table_name(self, prefix: str = "", time_partition: int | None = None) -> str:
        """Build the full name of the table.

        Parameters
        ----------
        prefix : `str`, optional
            String placed in front of the table name, empty by default.
        time_partition : `int`, optional
            Time partition number appended to the name after an underscore.
            Should only be given for tables that support time partitioning.

        Returns
        -------
        name : `str`
            Prefix followed by the enum value, followed by
            ``_<time_partition>`` when a partition is given.
        """
        # The partition suffix is only present when a partition is requested;
        # note that partition 0 is a valid value, hence the `None` check.
        partition_suffix = "" if time_partition is None else f"_{time_partition}"
        return f"{prefix}{self.value}{partition_suffix}"
113
114
class ApdbSchema:
    """Class for management of APDB schema.

    Attributes
    ----------
    tableSchemas : `dict` [`ApdbTables`, `.schema_model.Table`]
        Maps table enum to the `.schema_model.Table` instance read from the
        schema file(s).

    Parameters
    ----------
    schema_file : `str`
        Location of the YAML file with APDB schema.
    ss_schema_file : `str`
        Location of the YAML file with SSO schema. File will be loaded if APDB
        schema file does not contain SSObject/SSSource tables. Can be set to
        empty string to skip loading of SSObject/SSSource schema.

    Raises
    ------
    LookupError
        Raised if ``ss_schema_file`` had to be loaded but does not define
        both SSObject and SSSource tables.
    """

    def __init__(
        self,
        schema_file: str,
        ss_schema_file: str,
    ):
        # build complete table schema
        self.tableSchemas, self._schemaVersion = self._buildSchemas(schema_file)
        if ss_schema_file:
            if ApdbTables.SSObject not in self.tableSchemas or ApdbTables.SSSource not in self.tableSchemas:
                # Read additional SSP schema; its version is ignored, the
                # schema version always comes from the main APDB schema file.
                ssp_tables, _ = self._buildSchemas(ss_schema_file)
                if ApdbTables.SSObject not in ssp_tables or ApdbTables.SSSource not in ssp_tables:
                    raise LookupError(f"Cannot locate SSObject/SSSource table in {ss_schema_file}")
                # Both tables are taken from the SSP schema, even if one of
                # them was already present in the main schema.
                self.tableSchemas = dict(self.tableSchemas) | {
                    ApdbTables.SSObject: ssp_tables[ApdbTables.SSObject],
                    ApdbTables.SSSource: ssp_tables[ApdbTables.SSSource],
                }

    def column_dtype(self, felis_type: felis.datamodel.DataType | ExtraDataTypes) -> type | str:
        """Return Pandas data type for a given Felis column type.

        Parameters
        ----------
        felis_type : `felis.datamodel.DataType`
            Felis type, one of the enums defined in `felis.datamodel` module.

        Returns
        -------
        column_dtype : `type` or `str`
            Type that can be used for columns in Pandas.

        Raises
        ------
        TypeError
            Raised if type cannot be handled.
        """
        try:
            return _dtype_map[felis_type]
        except KeyError:
            # The failed dictionary lookup is an implementation detail, do
            # not expose it as the cause of the error in the traceback.
            raise TypeError(f"Unexpected Felis type: {felis_type}") from None

    def schemaVersion(self) -> VersionTuple:
        """Return schema version as defined in YAML schema file.

        Returns
        -------
        version : `VersionTuple`
            Version number read from YAML file, if YAML file does not define
            schema version then "0.1.0" is returned.
        """
        if self._schemaVersion is None:
            return VersionTuple(0, 1, 0)
        else:
            return self._schemaVersion

    @classmethod
    def _buildSchemas(cls, schema_file: str) -> tuple[Mapping[ApdbTables, Table], VersionTuple | None]:
        """Create schema definitions for tables from felis schema.

        Reads YAML schema and builds a dictionary containing
        `.schema_model.Table` instances for each APDB table appearing in that
        schema.

        Parameters
        ----------
        schema_file : `str`
            Name of YAML file with ``felis`` schema.

        Returns
        -------
        tables : `dict` [`ApdbTables`, `schema_model.Table`]
            Mapping of table names to `.schema_model.Table` instances.
        version : `VersionTuple` or `None`
            Schema version defined in schema file, `None` if version is not
            defined.
        """
        _LOG.debug("Loading felis schema from %s", schema_file)
        felis_schema = felis.datamodel.Schema.from_uri(schema_file, context={"id_generation": True})
        schema = Schema.from_felis(felis_schema)

        # Keep only the tables whose names are known to ApdbTables.
        tables: MutableMapping[ApdbTables, Table] = {}
        for table in schema.tables:
            try:
                table_enum = ApdbTables(table.name)
            except ValueError:
                # There may be other tables in the schema that do not belong
                # to APDB.
                continue
            else:
                tables[table_enum] = table

        version: VersionTuple | None = None
        if schema.version is not None:
            version = VersionTuple.fromString(schema.version.current)

        _LOG.debug("Loaded schema for tables %s", list(tables))
        return tables, version

    @cached_property
    def has_mjd_timestamps(self) -> bool:
        """True if timestamps columns are in MJD (`bool`).

        Determined by which validity-start column appears first in the
        DiaObject table schema; `LookupError` is raised if neither
        ``validityStartMjdTai`` nor ``validityStart`` is present.
        """
        table = self.tableSchemas[ApdbTables.DiaObject]
        # Look for validityStartMjdTai or validityStart
        for column in table.columns:
            if column.name == "validityStartMjdTai":
                return True
            elif column.name == "validityStart":
                return False
        raise LookupError(
            "Could not find validityStart or validityStartMjdTai column in DiaObject table schema."
        )
__init__(self, str schema_file, str ss_schema_file)
type|str column_dtype(self, felis.datamodel.DataType|ExtraDataTypes felis_type)
tuple[Mapping[ApdbTables, Table], VersionTuple|None] _buildSchemas(cls, str schema_file)
tuple[Mapping[ApdbTables, Table], VersionTuple|None] _schemaVersion
str table_name(self, str prefix="", int|None time_partition=None)
Definition apdbSchema.py:98