LSST Applications g04e9c324dd+8c5ae1fdc5,g134cb467dc+b203dec576,g18429d2f64+358861cd2c,g199a45376c+0ba108daf9,g1fd858c14a+dd066899e3,g262e1987ae+ebfced1d55,g29ae962dfc+72fd90588e,g2cef7863aa+aef1011c0b,g35bb328faa+8c5ae1fdc5,g3fd5ace14f+b668f15bc5,g4595892280+3897dae354,g47891489e3+abcf9c3559,g4d44eb3520+fb4ddce128,g53246c7159+8c5ae1fdc5,g67b6fd64d1+abcf9c3559,g67fd3c3899+1f72b5a9f7,g74acd417e5+cb6b47f07b,g786e29fd12+668abc6043,g87389fa792+8856018cbb,g89139ef638+abcf9c3559,g8d7436a09f+bcf525d20c,g8ea07a8fe4+9f5ccc88ac,g90f42f885a+6054cc57f1,g97be763408+06f794da49,g9dd6db0277+1f72b5a9f7,ga681d05dcb+7e36ad54cd,gabf8522325+735880ea63,gac2eed3f23+abcf9c3559,gb89ab40317+abcf9c3559,gbf99507273+8c5ae1fdc5,gd8ff7fe66e+1f72b5a9f7,gdab6d2f7ff+cb6b47f07b,gdc713202bf+1f72b5a9f7,gdfd2d52018+8225f2b331,ge365c994fd+375fc21c71,ge410e46f29+abcf9c3559,geaed405ab2+562b3308c0,gf9a733ac38+8c5ae1fdc5,w.2025.35
LSST Data Management Base Package
Loading...
Searching...
No Matches
apdbSchema.py
Go to the documentation of this file.
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
"""Generic operations on the APDB schema.

Nothing in this module depends on the particular technology used to
implement APDB.
"""

from __future__ import annotations

__all__ = ["ApdbSchema", "ApdbTables"]
31
32import enum
33import logging
34import os
35from collections.abc import Mapping, MutableMapping
36from functools import cached_property
37
38import felis.datamodel
39import numpy
40import yaml
41from felis.datamodel import Schema as FelisSchema
42
43from .schema_model import ExtraDataTypes, Schema, Table
44from .versionTuple import VersionTuple
45
_LOG = logging.getLogger(__name__)

# Column types normally come from the Cassandra driver. In some cases we have
# to build a Pandas DataFrame ourselves, and this map is used to infer column
# types from the YAML schema. Cassandra stores timestamps with millisecond
# precision, whereas pandas maps the datetime type to "datetime64[ns]".
_dtype_map: Mapping[felis.datamodel.DataType | ExtraDataTypes, type | str] = {
    felis.datamodel.DataType.double: numpy.float64,
    felis.datamodel.DataType.float: numpy.float32,
    felis.datamodel.DataType.timestamp: "datetime64[ns]",
    felis.datamodel.DataType.long: numpy.int64,
    felis.datamodel.DataType.int: numpy.int32,
    felis.datamodel.DataType.short: numpy.int16,
    felis.datamodel.DataType.byte: numpy.int8,
    # Binary and all string-like types are stored as generic Python objects.
    **{
        _object_type: object
        for _object_type in (
            felis.datamodel.DataType.binary,
            felis.datamodel.DataType.char,
            felis.datamodel.DataType.text,
            felis.datamodel.DataType.string,
            felis.datamodel.DataType.unicode,
        )
    },
    felis.datamodel.DataType.boolean: bool,
}
68
69
@enum.unique
class ApdbTables(enum.Enum):
    """Enumeration of the table names that make up the APDB schema."""

    DiaObject = "DiaObject"
    """Name of the table for DIAObject records."""

    DiaSource = "DiaSource"
    """Name of the table for DIASource records."""

    DiaForcedSource = "DiaForcedSource"
    """Name of the table for DIAForcedSource records."""

    DiaObjectLast = "DiaObjectLast"
    """Name of the table for the last version of DIAObject records.

    This table may be optional for some implementations.
    """

    SSObject = "SSObject"
    """Name of the table for SSObject records."""

    SSSource = "SSSource"
    """Name of the table for SSSource records."""

    DiaObject_To_Object_Match = "DiaObject_To_Object_Match"
    """Name of the table for DiaObject_To_Object_Match records."""

    metadata = "metadata"
    """Name of the metadata table, this table may not always exist."""

    def table_name(self, prefix: str = "", time_partition: int | None = None) -> str:
        """Build the full name of this table.

        Parameters
        ----------
        prefix : `str`, optional
            String prepended to the base table name.
        time_partition : `int`, optional
            Time partition number. When given (including ``0``) it is
            appended to the name as ``_<time_partition>``. Should only be
            used for tables that support time partitioning.

        Returns
        -------
        name : `str`
            Full table name.
        """
        base_name = f"{prefix}{self.value}"
        if time_partition is None:
            return base_name
        return f"{base_name}_{time_partition}"
116
117
class ApdbSchema:
    """Class for management of APDB schema.

    Attributes
    ----------
    tableSchemas : `dict` [`ApdbTables`, `.schema_model.Table`]
        Maps each APDB table enum to its `.schema_model.Table` definition.
        Only tables whose names match an `ApdbTables` member are included.

    Parameters
    ----------
    schema_file : `str`
        Name of the YAML schema file; environment variables in the path are
        expanded.
    schema_name : `str`, optional
        Name of the schema in YAML files.
    """

    def __init__(
        self,
        schema_file: str,
        schema_name: str = "ApdbSchema",
    ):
        # build complete table schema
        self.tableSchemas, self._schemaVersion = self._buildSchemas(schema_file, schema_name)

    def column_dtype(self, felis_type: felis.datamodel.DataType | ExtraDataTypes) -> type | str:
        """Return Pandas data type for a given Felis column type.

        Parameters
        ----------
        felis_type : `felis.datamodel.DataType` or `.schema_model.ExtraDataTypes`
            Felis type, one of the enums defined in `felis.datamodel` module.

        Returns
        -------
        column_dtype : `type` or `str`
            Type that can be used for columns in Pandas.

        Raises
        ------
        TypeError
            Raised if the type cannot be handled.
        """
        try:
            return _dtype_map[felis_type]
        except KeyError:
            # The TypeError message carries everything useful; hide the
            # internal KeyError from the traceback.
            raise TypeError(f"Unexpected Felis type: {felis_type}") from None

    def schemaVersion(self) -> VersionTuple:
        """Return schema version as defined in YAML schema file.

        Returns
        -------
        version : `VersionTuple`
            Version number read from YAML file, if YAML file does not define
            schema version then "0.1.0" is returned.
        """
        if self._schemaVersion is None:
            return VersionTuple(0, 1, 0)
        else:
            return self._schemaVersion

    @classmethod
    def _buildSchemas(
        cls, schema_file: str, schema_name: str = "ApdbSchema"
    ) -> tuple[Mapping[ApdbTables, Table], VersionTuple | None]:
        """Create schema definitions for all tables.

        Reads YAML schema and builds a dictionary containing
        `.schema_model.Table` instances for each table.

        Parameters
        ----------
        schema_file : `str`
            Name of YAML file with ``felis`` schema; environment variables in
            the path are expanded.
        schema_name : `str`, optional
            Name of the schema in YAML files.

        Returns
        -------
        tables : `dict` [`ApdbTables`, `.schema_model.Table`]
            Mapping of APDB table enums to `.schema_model.Table` instances.
            Tables in the schema that are not APDB tables are skipped.
        version : `VersionTuple` or `None`
            Schema version defined in schema file, `None` if version is not
            defined.

        Raises
        ------
        ValueError
            Raised if the file defines no schema named ``schema_name`` or
            defines more than one.
        """
        schema_file = os.path.expandvars(schema_file)
        # YAML is specified as UTF-encoded; do not depend on the locale.
        with open(schema_file, encoding="utf-8") as yaml_stream:
            schemas_list = list(yaml.load_all(yaml_stream, Loader=yaml.SafeLoader))
        # A multi-document stream can contain empty documents (loaded as None)
        # or non-mapping documents; neither can define a named schema.
        schemas_list = [
            schema
            for schema in schemas_list
            if isinstance(schema, Mapping) and schema.get("name") == schema_name
        ]
        if not schemas_list:
            raise ValueError(f"Schema file {schema_file!r} does not define schema {schema_name!r}")
        elif len(schemas_list) > 1:
            raise ValueError(f"Schema file {schema_file!r} defines multiple schemas {schema_name!r}")
        felis_schema: FelisSchema = felis.datamodel.Schema.model_validate(schemas_list[0])
        schema = Schema.from_felis(felis_schema)

        # Keep only the tables that are known APDB tables.
        tables: MutableMapping[ApdbTables, Table] = {}
        for table in schema.tables:
            try:
                table_enum = ApdbTables(table.name)
            except ValueError:
                # There may be other tables in the schema that do not belong
                # to APDB.
                continue
            tables[table_enum] = table

        version: VersionTuple | None = None
        if schema.version is not None:
            version = VersionTuple.fromString(schema.version.current)

        return tables, version

    @cached_property
    def has_mjd_timestamps(self) -> bool:
        """True if timestamps columns are in MJD (`bool`).

        Decided from the DiaObject table schema: the first of the columns
        ``validityStartMjdTai`` (MJD timestamps) or ``validityStart``
        (non-MJD timestamps) found in column order determines the result.

        Raises
        ------
        LookupError
            Raised if neither column exists in the DiaObject table schema
            (a `KeyError`, also a `LookupError`, is raised if the schema has
            no DiaObject table).
        """
        table = self.tableSchemas[ApdbTables.DiaObject]
        # Look for validityStartMjdTai or validityStart
        for column in table.columns:
            if column.name == "validityStartMjdTai":
                return True
            elif column.name == "validityStart":
                return False
        raise LookupError(
            "Could not find validityStart or validityStartMjdTai column in DiaObject table schema."
        )
type|str column_dtype(self, felis.datamodel.DataType|ExtraDataTypes felis_type)
__init__(self, str schema_file, str schema_name="ApdbSchema")
tuple[Mapping[ApdbTables, Table], VersionTuple|None] _buildSchemas(cls, str schema_file, str schema_name="ApdbSchema")
tuple[Mapping[ApdbTables, Table], VersionTuple|None] _schemaVersion
str table_name(self, str prefix="", int|None time_partition=None)