from __future__ import annotations

__all__ = ["ApdbCassandra"]

import dataclasses
import logging
import warnings
from collections.abc import Iterable, Iterator, Mapping, Set
from typing import TYPE_CHECKING, Any, cast

import astropy.time
import numpy as np
import pandas

try:
    import cassandra
    import cassandra.query
    from cassandra.auth import AuthProvider, PlainTextAuthProvider
    from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile, Session
    from cassandra.policies import AddressTranslator, RoundRobinPolicy, WhiteListRoundRobinPolicy
    from cassandra.query import UNSET_VALUE

    CASSANDRA_IMPORTED = True
except ImportError:
    CASSANDRA_IMPORTED = False

import felis.datamodel
from lsst import sphgeom
from lsst.utils.db_auth import DbAuth, DbAuthNotFoundError
from lsst.utils.iteration import chunk_iterable

from .._auth import DB_AUTH_ENVVAR, DB_AUTH_PATH
from ..apdb import Apdb, ApdbConfig
from ..apdbConfigFreezer import ApdbConfigFreezer
from ..apdbReplica import ApdbTableData, ReplicaChunk
from ..apdbSchema import ApdbTables
from ..monitor import MonAgent
from ..pixelization import Pixelization
from ..schema_model import Table
from ..timer import Timer
from ..versionTuple import IncompatibleVersionError, VersionTuple
from .apdbCassandraReplica import ApdbCassandraReplica
from .apdbCassandraSchema import ApdbCassandraSchema, CreateTableOptions, ExtraTables
from .apdbMetadataCassandra import ApdbMetadataCassandra
from .cassandra_utils import (
    PreparedStatementCache,
    literal,
    pandas_dataframe_factory,
    quote_id,
    raw_data_factory,
    select_concurrent,
)
from .config import ApdbCassandraConfig, ApdbCassandraConnectionConfig

if TYPE_CHECKING:
    from ..apdbMetadata import ApdbMetadata

_LOG = logging.getLogger(__name__)

_MON = MonAgent(__name__)
86"""Version for the code controlling non-replication tables. This needs to be
87updated following compatibility rules when schema produced by this code
93 """Dump cassandra query to debug log."""
94 _LOG.debug(
"Cassandra query: %s", rf.query)
class CassandraMissingError(Exception):
    def __init__(self) -> None:
        super().__init__("cassandra-driver module cannot be imported")
@dataclasses.dataclass
class DatabaseInfo:
    """Collection of information about a specific database."""

    name: str
    """Keyspace name."""

    permissions: dict[str, set[str]] | None = None
    """Roles that can access the database and their permissions.

    `None` means that authentication information is not accessible due to
    system table permissions. If anonymous access is enabled then dictionary
    will be empty but not `None`.
    """
@dataclasses.dataclass
class _DbVersions:
    """Versions defined in APDB metadata table."""

    schema_version: VersionTuple
    """Version of the schema from which database was created."""

    code_version: VersionTuple
    """Version of ApdbCassandra with which database was created."""

    replica_version: VersionTuple | None
    """Version of ApdbCassandraReplica with which database was created, None
    if replication was not configured.
    """
if CASSANDRA_IMPORTED:

    class _AddressTranslator(AddressTranslator):
        """Translate internal IP address to external.

        Only used for docker-based setup, not viable long-term solution.
        """

        def __init__(self, public_ips: tuple[str, ...], private_ips: tuple[str, ...]):
            self._map = dict(zip(private_ips, public_ips))

        def translate(self, private_ip: str) -> str:
            return self._map.get(private_ip, private_ip)
150 """Implementation of APDB database on to of Apache Cassandra.
152 The implementation is configured via standard ``pex_config`` mechanism
153 using `ApdbCassandraConfig` configuration class. For an example of
154 different configurations check config/ folder.
158 config : `ApdbCassandraConfig`
159 Configuration object.
    metadataSchemaVersionKey = "version:schema"
    """Name of the metadata key to store schema version number."""

    metadataCodeVersionKey = "version:ApdbCassandra"
    """Name of the metadata key to store code version number."""

    metadataReplicaVersionKey = "version:ApdbCassandraReplica"
    """Name of the metadata key to store replica code version number."""

    metadataConfigKey = "config:apdb-cassandra.json"
    """Name of the metadata key to store the frozen part of the configuration."""
    _frozen_parameters = (
        "replica_skips_diaobjects",
        "partitioning.part_pixelization",
        "partitioning.part_pix_level",
        "partitioning.time_partition_tables",
        "partitioning.time_partition_days",
    )
    """Names of the config parameters to be frozen in metadata table."""
    partition_zero_epoch = astropy.time.Time(0, format="unix_tai")
    """Start time for partition 0, this should never be changed."""
    def __init__(self, config: ApdbCassandraConfig):
        if not CASSANDRA_IMPORTED:
            raise CassandraMissingError()

        self._keyspace = config.keyspace
        self._cluster, self._session = self._make_session(config)

        meta_table_name = ApdbTables.metadata.table_name(config.prefix)
        self._metadata = ApdbMetadataCassandra(
            self._session, meta_table_name, config.keyspace, "read_tuples", "write"
        )

        # Read frozen config from the metadata table, if it is there.
        freezer = ApdbConfigFreezer[ApdbCassandraConfig](self._frozen_parameters)
        with self._timer("read_metadata_config"):
            config_json = self._metadata.get(self.metadataConfigKey)
        if config_json is not None:
            # Update config parameters from the frozen copy stored in metadata.
            self.config = freezer.update(config, config_json)
        else:
            self.config = config

        self._pixelization = Pixelization(
            self.config.partitioning.part_pixelization,
            self.config.partitioning.part_pix_level,
            config.partitioning.part_pix_max_ranges,
        )

        self._schema = ApdbCassandraSchema(
            session=self._session,
            keyspace=self._keyspace,
            schema_file=self.config.schema_file,
            schema_name=self.config.schema_name,
            prefix=self.config.prefix,
            time_partition_tables=self.config.partitioning.time_partition_tables,
            enable_replica=self.config.enable_replica,
        )
        self._partition_zero_epoch_mjd = float(self.partition_zero_epoch.mjd)

        with self._timer("version_check"):
            self._db_versions = self._versionCheck(self._metadata)

        # Cache for prepared statements.
        self._preparer = PreparedStatementCache(self._session)

        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug("ApdbCassandra Configuration: %s", self.config.model_dump())
    def __del__(self) -> None:
        if hasattr(self, "_cluster"):
            self._cluster.shutdown()
    def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
        """Create `Timer` instance given its name."""
        return Timer(name, _MON, tags=tags)
    @classmethod
    def _make_session(cls, config: ApdbCassandraConfig) -> tuple[Cluster, Session]:
        """Make Cassandra session."""
        addressTranslator: AddressTranslator | None = None
        if config.connection_config.private_ips:
            addressTranslator = _AddressTranslator(
                config.contact_points, config.connection_config.private_ips
            )

        with Timer("cluster_connect", _MON):
            cluster = Cluster(
                execution_profiles=cls._makeProfiles(config),
                contact_points=config.contact_points,
                port=config.connection_config.port,
                address_translator=addressTranslator,
                protocol_version=config.connection_config.protocol_version,
                auth_provider=cls._make_auth_provider(config),
                **config.connection_config.extra_parameters,
            )
            session = cluster.connect()

        # Dump queries if debug level is enabled.
        if _LOG.isEnabledFor(logging.DEBUG):
            session.add_request_init_listener(_dump_query)

        # Disable result paging.
        session.default_fetch_size = None

        return cluster, session
286 """Make Cassandra authentication provider instance."""
288 dbauth = DbAuth(DB_AUTH_PATH, DB_AUTH_ENVVAR)
289 except DbAuthNotFoundError:
293 empty_username =
True
295 for hostname
in config.contact_points:
297 username, password = dbauth.getAuth(
299 config.connection_config.username,
301 config.connection_config.port,
307 empty_username =
True
309 return PlainTextAuthProvider(username=username, password=password)
310 except DbAuthNotFoundError:
315 f
"Credentials file ({DB_AUTH_PATH} or ${DB_AUTH_ENVVAR}) provided password but not "
316 f
"user name, anonymous Cassandra logon will be attempted."
322 """Check schema version compatibility."""
324 def _get_version(key: str, default: VersionTuple) -> VersionTuple:
325 """Retrieve version number from given metadata key."""
326 if metadata.table_exists():
327 version_str = metadata.get(key)
328 if version_str
is None:
330 raise RuntimeError(f
"Version key {key!r} does not exist in metadata table.")
331 return VersionTuple.fromString(version_str)
342 if not self.
_schema.schemaVersion().checkCompatibility(db_schema_version):
344 f
"Configured schema version {self._schema.schemaVersion()} "
345 f
"is not compatible with database version {db_schema_version}"
349 f
"Current code version {self.apdbImplementationVersion()} "
350 f
"is not compatible with database version {db_code_version}"
354 db_replica_version: VersionTuple |
None =
None
355 if self.
_schema.has_replica_chunks:
357 code_replica_version = ApdbCassandraReplica.apdbReplicaImplementationVersion()
358 if not code_replica_version.checkCompatibility(db_replica_version):
360 f
"Current replication code version {code_replica_version} "
361 f
"is not compatible with database version {db_replica_version}"
365 schema_version=db_schema_version, code_version=db_code_version, replica_version=db_replica_version
370 """Return version number for current APDB implementation.
374 version : `VersionTuple`
375 Version of the code defined in implementation class.
    def tableDef(self, table: ApdbTables) -> Table | None:
        return self._schema.tableSchemas.get(table)
    @classmethod
    def init_database(
        cls,
        hosts: tuple[str, ...],
        keyspace: str,
        *,
        schema_file: str | None = None,
        schema_name: str | None = None,
        read_sources_months: int | None = None,
        read_forced_sources_months: int | None = None,
        enable_replica: bool = False,
        replica_skips_diaobjects: bool = False,
        port: int | None = None,
        username: str | None = None,
        prefix: str | None = None,
        part_pixelization: str | None = None,
        part_pix_level: int | None = None,
        time_partition_tables: bool = True,
        time_partition_start: str | None = None,
        time_partition_end: str | None = None,
        read_consistency: str | None = None,
        write_consistency: str | None = None,
        read_timeout: int | None = None,
        write_timeout: int | None = None,
        ra_dec_columns: tuple[str, str] | None = None,
        replication_factor: int | None = None,
        drop: bool = False,
        table_options: CreateTableOptions | None = None,
    ) -> ApdbCassandraConfig:
412 """Initialize new APDB instance and make configuration object for it.
416 hosts : `tuple` [`str`, ...]
417 List of host names or IP addresses for Cassandra cluster.
419 Name of the keyspace for APDB tables.
420 schema_file : `str`, optional
421 Location of (YAML) configuration file with APDB schema. If not
422 specified then default location will be used.
423 schema_name : `str`, optional
424 Name of the schema in YAML configuration file. If not specified
425 then default name will be used.
426 read_sources_months : `int`, optional
427 Number of months of history to read from DiaSource.
428 read_forced_sources_months : `int`, optional
429 Number of months of history to read from DiaForcedSource.
430 enable_replica : `bool`, optional
431 If True, make additional tables used for replication to PPDB.
432 replica_skips_diaobjects : `bool`, optional
433 If `True` then do not fill regular ``DiaObject`` table when
434 ``enable_replica`` is `True`.
435 port : `int`, optional
436 Port number to use for Cassandra connections.
437 username : `str`, optional
438 User name for Cassandra connections.
439 prefix : `str`, optional
440 Optional prefix for all table names.
441 part_pixelization : `str`, optional
442 Name of the MOC pixelization used for partitioning.
443 part_pix_level : `int`, optional
445 time_partition_tables : `bool`, optional
446 Create per-partition tables.
447 time_partition_start : `str`, optional
448 Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss
450 time_partition_end : `str`, optional
451 Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss
453 read_consistency : `str`, optional
454 Name of the consistency level for read operations.
455 write_consistency : `str`, optional
456 Name of the consistency level for write operations.
457 read_timeout : `int`, optional
458 Read timeout in seconds.
459 write_timeout : `int`, optional
460 Write timeout in seconds.
461 ra_dec_columns : `tuple` [`str`, `str`], optional
462 Names of ra/dec columns in DiaObject table.
463 replication_factor : `int`, optional
464 Replication factor used when creating new keyspace, if keyspace
465 already exists its replication factor is not changed.
466 drop : `bool`, optional
467 If `True` then drop existing tables before re-creating the schema.
468 table_options : `CreateTableOptions`, optional
469 Options used when creating Cassandra tables.
473 config : `ApdbCassandraConfig`
474 Resulting configuration object for a created APDB instance.
483 "idle_heartbeat_interval": 0,
484 "idle_heartbeat_timeout": 30,
485 "control_connection_timeout": 100,
489 contact_points=hosts,
491 enable_replica=enable_replica,
492 replica_skips_diaobjects=replica_skips_diaobjects,
493 connection_config=connection_config,
        config.partitioning.time_partition_tables = time_partition_tables
        if schema_file is not None:
            config.schema_file = schema_file
        if schema_name is not None:
            config.schema_name = schema_name
        if read_sources_months is not None:
            config.read_sources_months = read_sources_months
        if read_forced_sources_months is not None:
            config.read_forced_sources_months = read_forced_sources_months
        if port is not None:
            config.connection_config.port = port
        if username is not None:
            config.connection_config.username = username
        if prefix is not None:
            config.prefix = prefix
        if part_pixelization is not None:
            config.partitioning.part_pixelization = part_pixelization
        if part_pix_level is not None:
            config.partitioning.part_pix_level = part_pix_level
        if time_partition_start is not None:
            config.partitioning.time_partition_start = time_partition_start
        if time_partition_end is not None:
            config.partitioning.time_partition_end = time_partition_end
        if read_consistency is not None:
            config.connection_config.read_consistency = read_consistency
        if write_consistency is not None:
            config.connection_config.write_consistency = write_consistency
        if read_timeout is not None:
            config.connection_config.read_timeout = read_timeout
        if write_timeout is not None:
            config.connection_config.write_timeout = write_timeout
        if ra_dec_columns is not None:
            config.ra_dec_columns = ra_dec_columns

        cls._makeSchema(config, drop=drop, replication_factor=replication_factor, table_options=table_options)

        return config
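    # A minimal usage sketch (not part of the original module): how a new APDB
    # instance might be created and then opened. The host names, keyspace and
    # pixelization values below are hypothetical examples, not defaults.
    #
    #     config = ApdbCassandra.init_database(
    #         hosts=("cassandra-host-1", "cassandra-host-2"),
    #         keyspace="apdb_example",
    #         enable_replica=True,
    #         part_pixelization="mq3c",
    #         part_pix_level=10,
    #     )
    #     apdb = ApdbCassandra(config)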
535 """Return the list of keyspaces with APDB databases.
540 Name of one of the hosts in Cassandra cluster.
544 databases : `~collections.abc.Iterable` [`DatabaseInfo`]
545 Information about databases that contain APDB instance.
552 with cluster, session:
554 table_name = ApdbTables.DiaSource.table_name()
555 query =
"select keyspace_name from system_schema.tables where table_name = %s ALLOW FILTERING"
556 result = session.execute(query, (table_name,))
557 keyspaces = [row[0]
for row
in result.all()]
563 template =
", ".join([
"%s"] * len(keyspaces))
565 "SELECT resource, role, permissions FROM system_auth.role_permissions "
566 f
"WHERE resource IN ({template}) ALLOW FILTERING"
568 resources = [f
"data/{keyspace}" for keyspace
in keyspaces]
570 result = session.execute(query, resources)
573 infos = {keyspace:
DatabaseInfo(name=keyspace, permissions={})
for keyspace
in keyspaces}
575 _, _, keyspace = row[0].partition(
"/")
577 role_permissions: set[str] = set(row[2])
578 infos[keyspace].permissions[role] = role_permissions
579 except cassandra.Unauthorized
as exc:
583 f
"Authentication information is not accessible to current user - {exc}", stacklevel=2
585 infos = {keyspace:
DatabaseInfo(name=keyspace)
for keyspace
in keyspaces}
589 return infos.values()
593 """Delete APDB database by dropping its keyspace.
598 Name of one of the hosts in Cassandra cluster.
600 Name of keyspace to delete.
601 timeout : `int`, optional
602 Timeout for delete operation in seconds. Dropping a large keyspace
603 can be a long operation, but this default value of one hour should
604 be sufficient for most or all cases.
610 with cluster, session:
611 query = f
"DROP KEYSPACE {quote_id(keyspace)}"
612 session.execute(query, timeout=timeout)
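    # Hedged usage sketch (not in the original source): listing APDB keyspaces
    # on a cluster and dropping one of them. Host and keyspace names are
    # hypothetical.
    #
    #     for info in ApdbCassandra.list_databases("cassandra-host-1"):
    #         print(info.name, info.permissions)
    #     ApdbCassandra.delete_database("cassandra-host-1", "apdb_old", timeout=3600)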
615 """Return `ApdbReplica` instance for this database."""
    @classmethod
    def _makeSchema(
        cls,
        config: ApdbConfig,
        *,
        drop: bool = False,
        replication_factor: int | None = None,
        table_options: CreateTableOptions | None = None,
    ) -> None:
        if not isinstance(config, ApdbCassandraConfig):
            raise TypeError(f"Unexpected type of configuration object: {type(config)}")

        cluster, session = cls._make_session(config)
        with cluster, session:
            schema = ApdbCassandraSchema(
                session=session,
                keyspace=config.keyspace,
                schema_file=config.schema_file,
                schema_name=config.schema_name,
                prefix=config.prefix,
                time_partition_tables=config.partitioning.time_partition_tables,
                enable_replica=config.enable_replica,
            )

            # Create a range of per-partition tables if requested.
            if config.partitioning.time_partition_tables:
                time_partition_start = astropy.time.Time(
                    config.partitioning.time_partition_start, format="isot", scale="tai"
                )
                time_partition_end = astropy.time.Time(
                    config.partitioning.time_partition_end, format="isot", scale="tai"
                )
                part_epoch = float(cls.partition_zero_epoch.mjd)
                part_days = config.partitioning.time_partition_days
                part_range = (
                    cls._time_partition_cls(time_partition_start, part_epoch, part_days),
                    cls._time_partition_cls(time_partition_end, part_epoch, part_days) + 1,
                )
                schema.makeSchema(
                    drop=drop,
                    part_range=part_range,
                    replication_factor=replication_factor,
                    table_options=table_options,
                )
            else:
                schema.makeSchema(
                    drop=drop, replication_factor=replication_factor, table_options=table_options
                )

            meta_table_name = ApdbTables.metadata.table_name(config.prefix)
            metadata = ApdbMetadataCassandra(
                session, meta_table_name, config.keyspace, "read_tuples", "write"
            )

            # Fill version numbers, overwrite if they are already there.
            if metadata.table_exists():
                metadata.set(cls.metadataSchemaVersionKey, str(schema.schemaVersion()), force=True)
                metadata.set(cls.metadataCodeVersionKey, str(cls.apdbImplementationVersion()), force=True)
                if config.enable_replica:
                    # Only store replica code version if replica is enabled.
                    metadata.set(
                        cls.metadataReplicaVersionKey,
                        str(ApdbCassandraReplica.apdbReplicaImplementationVersion()),
                        force=True,
                    )
                # Store frozen part of the configuration in metadata.
                freezer = ApdbConfigFreezer[ApdbCassandraConfig](cls._frozen_parameters)
                metadata.set(cls.metadataConfigKey, freezer.to_json(config), force=True)
    def getDiaObjects(self, region: sphgeom.Region) -> pandas.DataFrame:
        sp_where = self._spatial_where(region)
        _LOG.debug("getDiaObjects: #partitions: %s", len(sp_where))

        # We need to exclude extra partitioning columns from result.
        column_names = self._schema.apdbColumnNames(ApdbTables.DiaObjectLast)
        what = ",".join(quote_id(column) for column in column_names)

        table_name = self._schema.tableName(ApdbTables.DiaObjectLast)
        query = f'SELECT {what} from "{self._keyspace}"."{table_name}"'
        statements: list[tuple] = []
        for where, params in sp_where:
            full_query = f"{query} WHERE {where}"
            if params:
                statement = self._preparer.prepare(full_query)
            else:
                # If there are no params then there is no need to prepare.
                statement = cassandra.query.SimpleStatement(full_query)
            statements.append((statement, params))
        _LOG.debug("getDiaObjects: #queries: %s", len(statements))

        with _MON.context_tags({"table": "DiaObject"}):
            _MON.add_record(
                "select_query_stats", values={"num_sp_part": len(sp_where), "num_queries": len(statements)}
            )
            with self._timer("select_time") as timer:
                objects = cast(
                    pandas.DataFrame,
                    select_concurrent(
                        self._session,
                        statements,
                        "read_pandas_multi",
                        self.config.connection_config.read_concurrency,
                    ),
                )
                timer.add_values(row_count=len(objects))

        _LOG.debug("found %s DiaObjects", objects.shape[0])
        return objects
    def getDiaSources(
        self, region: sphgeom.Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
    ) -> pandas.DataFrame | None:
        months = self.config.read_sources_months
        if months == 0:
            return None
        mjd_end = float(visit_time.mjd)
        mjd_start = mjd_end - months * 30

        return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaSource)
    def getDiaForcedSources(
        self, region: sphgeom.Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
    ) -> pandas.DataFrame | None:
        months = self.config.read_forced_sources_months
        if months == 0:
            return None
        mjd_end = float(visit_time.mjd)
        mjd_start = mjd_end - months * 30

        return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaForcedSource)
    def containsVisitDetector(self, visit: int, detector: int) -> bool:
        # Check all tables that may contain records for this visit/detector.
        existing_tables = self._schema.existing_tables(ApdbTables.DiaSource, ApdbTables.DiaForcedSource)
        tables_to_check = existing_tables[ApdbTables.DiaSource][:]
        if self.config.enable_replica:
            tables_to_check.append(self._schema.tableName(ExtraTables.DiaSourceChunks))
        tables_to_check.extend(existing_tables[ApdbTables.DiaForcedSource])
        if self.config.enable_replica:
            tables_to_check.append(self._schema.tableName(ExtraTables.DiaForcedSourceChunks))

        for table_name in tables_to_check:
            # Try to find a single record with given visit/detector.
            prepared_query = self._preparer.prepare(
                f'SELECT * from "{self._keyspace}"."{table_name}" '
                "WHERE visit = ? AND detector = ? "
                "PER PARTITION LIMIT 1 LIMIT 1 ALLOW FILTERING"
            )
            with self._timer("contains_visit_detector_time", tags={"table": table_name}) as timer:
                result = self._session.execute(prepared_query, (visit, detector))
                found = result.one() is not None
                timer.add_values(found=int(found))
                if found:
                    return True

        return False
    def getSSObjects(self) -> pandas.DataFrame:
        tableName = self._schema.tableName(ApdbTables.SSObject)
        query = f'SELECT * from "{self._keyspace}"."{tableName}"'

        with self._timer("select_time", tags={"table": "SSObject"}) as timer:
            result = self._session.execute(query, execution_profile="read_pandas")
            objects = result._current_rows
            timer.add_values(row_count=len(objects))

        _LOG.debug("found %s SSObjects", objects.shape[0])
        return objects
    def store(
        self,
        visit_time: astropy.time.Time,
        objects: pandas.DataFrame,
        sources: pandas.DataFrame | None = None,
        forced_sources: pandas.DataFrame | None = None,
    ) -> None:
        objects = self._fix_input_timestamps(objects)
        if sources is not None:
            sources = self._fix_input_timestamps(sources)
        if forced_sources is not None:
            forced_sources = self._fix_input_timestamps(forced_sources)

        replica_chunk: ReplicaChunk | None = None
        if self._schema.has_replica_chunks:
            replica_chunk = ReplicaChunk.make_replica_chunk(visit_time, self.config.replica_chunk_seconds)
            self._storeReplicaChunk(replica_chunk, visit_time)

        # Fill spatial partition column for DiaObjects and store them.
        objects = self._add_apdb_part(objects)
        self._storeDiaObjects(objects, visit_time, replica_chunk)

        if sources is not None:
            sources = self._add_apdb_part(sources)
            self._storeDiaSources(ApdbTables.DiaSource, sources, replica_chunk)
            self._storeDiaSourcesPartitions(sources, visit_time, replica_chunk)

        if forced_sources is not None:
            forced_sources = self._add_apdb_part(forced_sources)
            self._storeDiaSources(ApdbTables.DiaForcedSource, forced_sources, replica_chunk)
    def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
        # To update a record we need to know its exact primary key (including
        # partition key), so we start by querying for the partitioning values
        # of every diaSourceId.
        table_name = self._schema.tableName(ExtraTables.DiaSourceToPartition)
        # Split IDs into chunks of 1k per query.
        selects: list[tuple] = []
        for ids in chunk_iterable(idMap.keys(), 1_000):
            ids_str = ",".join(str(item) for item in ids)
            selects.append(
                (
                    'SELECT "diaSourceId", "apdb_part", "apdb_time_part", "apdb_replica_chunk" '
                    f'FROM "{self._keyspace}"."{table_name}" WHERE "diaSourceId" IN ({ids_str})',
                    (),
                )
            )

        # No need for a DataFrame here, read data as tuples.
        result = cast(
            list[tuple[int, int, int, int | None]],
            select_concurrent(
                self._session, selects, "read_tuples", self.config.connection_config.read_concurrency
            ),
        )

        # Make mapping from source ID to its partition values.
        id2partitions: dict[int, tuple[int, int]] = {}
        id2chunk_id: dict[int, int] = {}
        for row in result:
            id2partitions[row[0]] = row[1:3]
            if row[3] is not None:
                id2chunk_id[row[0]] = row[3]

        # Make sure we know partitions for each ID.
        if set(id2partitions) != set(idMap):
            missing = ",".join(str(item) for item in set(idMap) - set(id2partitions))
            raise ValueError(f"Following DiaSource IDs do not exist in the database: {missing}")

        # Reassign in regular tables.
        queries = cassandra.query.BatchStatement()
        table_name = self._schema.tableName(ApdbTables.DiaSource)
        for diaSourceId, ssObjectId in idMap.items():
            apdb_part, apdb_time_part = id2partitions[diaSourceId]
            values: tuple
            if self.config.partitioning.time_partition_tables:
                query = (
                    f'UPDATE "{self._keyspace}"."{table_name}_{apdb_time_part}"'
                    ' SET "ssObjectId" = ?, "diaObjectId" = NULL'
                    ' WHERE "apdb_part" = ? AND "diaSourceId" = ?'
                )
                values = (ssObjectId, apdb_part, diaSourceId)
            else:
                query = (
                    f'UPDATE "{self._keyspace}"."{table_name}"'
                    ' SET "ssObjectId" = ?, "diaObjectId" = NULL'
                    ' WHERE "apdb_part" = ? AND "apdb_time_part" = ? AND "diaSourceId" = ?'
                )
                values = (ssObjectId, apdb_part, apdb_time_part, diaSourceId)
            queries.add(self._preparer.prepare(query), values)

        # Reassign in replica tables, but only for chunks that still exist.
        if replica_chunks := self.get_replica().getReplicaChunks():
            known_ids = set(replica_chunk.id for replica_chunk in replica_chunks)
            id2chunk_id = {key: value for key, value in id2chunk_id.items() if value in known_ids}
            table_name = self._schema.tableName(ExtraTables.DiaSourceChunks)
            for diaSourceId, ssObjectId in idMap.items():
                if replica_chunk := id2chunk_id.get(diaSourceId):
                    query = (
                        f'UPDATE "{self._keyspace}"."{table_name}" '
                        ' SET "ssObjectId" = ?, "diaObjectId" = NULL '
                        'WHERE "apdb_replica_chunk" = ? AND "diaSourceId" = ?'
                    )
                    values = (ssObjectId, replica_chunk, diaSourceId)
                    queries.add(self._preparer.prepare(query), values)

        _LOG.debug("%s: will update %d records", table_name, len(idMap))
        with self._timer("source_reassign_time") as timer:
            self._session.execute(queries, execution_profile="write")
            timer.add_values(source_count=len(idMap))
    def countUnassociatedObjects(self) -> int:
        raise NotImplementedError()
    @property
    def metadata(self) -> ApdbMetadata:
        if self._metadata is None:
            raise RuntimeError("Database schema was not initialized.")
        return self._metadata
    @classmethod
    def _makeProfiles(cls, config: ApdbCassandraConfig) -> Mapping[Any, ExecutionProfile]:
        """Make all execution profiles used in the code."""
        if config.connection_config.private_ips:
            loadBalancePolicy = WhiteListRoundRobinPolicy(hosts=config.contact_points)
        else:
            loadBalancePolicy = RoundRobinPolicy()

        read_tuples_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.connection_config.read_consistency),
            request_timeout=config.connection_config.read_timeout,
            row_factory=cassandra.query.tuple_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        read_pandas_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.connection_config.read_consistency),
            request_timeout=config.connection_config.read_timeout,
            row_factory=pandas_dataframe_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        read_raw_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.connection_config.read_consistency),
            request_timeout=config.connection_config.read_timeout,
            row_factory=raw_data_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        read_pandas_multi_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.connection_config.read_consistency),
            request_timeout=config.connection_config.read_timeout,
            row_factory=pandas_dataframe_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        read_raw_multi_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.connection_config.read_consistency),
            request_timeout=config.connection_config.read_timeout,
            row_factory=raw_data_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        write_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.connection_config.write_consistency),
            request_timeout=config.connection_config.write_timeout,
            load_balancing_policy=loadBalancePolicy,
        )
        default_profile = ExecutionProfile(
            load_balancing_policy=loadBalancePolicy,
        )
        return {
            "read_tuples": read_tuples_profile,
            "read_pandas": read_pandas_profile,
            "read_raw": read_raw_profile,
            "read_pandas_multi": read_pandas_multi_profile,
            "read_raw_multi": read_raw_multi_profile,
            "write": write_profile,
            EXEC_PROFILE_DEFAULT: default_profile,
        }
    def _getSources(
        self,
        region: sphgeom.Region,
        object_ids: Iterable[int] | None,
        mjd_start: float,
        mjd_end: float,
        table_name: ApdbTables,
    ) -> pandas.DataFrame:
        """Return catalog of DiaSource instances given set of DiaObject IDs.

        Parameters
        ----------
        region : `lsst.sphgeom.Region`
            Spherical region.
        object_ids :
            Collection of DiaObject IDs
        mjd_start : `float`
            Lower bound of time interval.
        mjd_end : `float`
            Upper bound of time interval.
        table_name : `ApdbTables`
            Name of the table to query.

        Returns
        -------
        catalog : `pandas.DataFrame`, or `None`
            Catalog containing DiaSource records. Empty catalog is returned if
            ``object_ids`` is empty.
        """
        object_id_set: Set[int] = set()
        if object_ids is not None:
            object_id_set = set(object_ids)
            if len(object_id_set) == 0:
                return self._make_empty_catalog(table_name)

        sp_where = self._spatial_where(region)
        tables, temporal_where = self._temporal_where(table_name, mjd_start, mjd_end)

        # We need to exclude extra partitioning columns from result.
        column_names = self._schema.apdbColumnNames(table_name)
        what = ",".join(quote_id(column) for column in column_names)

        # Build all queries.
        statements: list[tuple] = []
        for table in tables:
            prefix = f'SELECT {what} from "{self._keyspace}"."{table}"'
            statements += list(self._combine_where(prefix, sp_where, temporal_where))
        _LOG.debug("_getSources %s: #queries: %s", table_name, len(statements))

        with _MON.context_tags({"table": table_name.name}):
            _MON.add_record(
                "select_query_stats", values={"num_sp_part": len(sp_where), "num_queries": len(statements)}
            )
            with self._timer("select_time") as timer:
                catalog = cast(
                    pandas.DataFrame,
                    select_concurrent(
                        self._session,
                        statements,
                        "read_pandas_multi",
                        self.config.connection_config.read_concurrency,
                    ),
                )
                timer.add_values(row_count=len(catalog))

        # Filter by given object IDs.
        if len(object_id_set) > 0:
            catalog = cast(pandas.DataFrame, catalog[catalog["diaObjectId"].isin(object_id_set)])

        # Precise filtering on midpointMjdTai.
        catalog = cast(pandas.DataFrame, catalog[catalog["midpointMjdTai"] > mjd_start])

        _LOG.debug("found %d %ss", catalog.shape[0], table_name.name)
        return catalog
    def _storeReplicaChunk(self, replica_chunk: ReplicaChunk, visit_time: astropy.time.Time) -> None:
        # Cassandra timestamps are milliseconds since epoch.
        timestamp = int(replica_chunk.last_update_time.unix_tai * 1000)

        # Everything goes into a single partition of this small table.
        partition = 0

        table_name = self._schema.tableName(ExtraTables.ApdbReplicaChunks)
        query = (
            f'INSERT INTO "{self._keyspace}"."{table_name}" '
            "(partition, apdb_replica_chunk, last_update_time, unique_id) "
            "VALUES (?, ?, ?, ?)"
        )
        self._session.execute(
            self._preparer.prepare(query),
            (partition, replica_chunk.id, timestamp, replica_chunk.unique_id),
            timeout=self.config.connection_config.write_timeout,
            execution_profile="write",
        )
1113 """Return existing mapping of diaObjectId to its last partition."""
1114 table_name = self.
_schema.tableName(ExtraTables.DiaObjectLastToPartition)
1117 for id_chunk
in chunk_iterable(ids, 10_000):
1118 id_chunk_list = list(id_chunk)
1120 f
'SELECT "diaObjectId", apdb_part FROM "{self._keyspace}"."{table_name}" '
1121 f
'WHERE "diaObjectId" in ({",".join(str(oid) for oid in id_chunk_list)})'
1123 queries.append((query, ()))
1124 object_count += len(id_chunk_list)
1126 with self.
_timer(
"query_object_last_partitions")
as timer:
1130 self.
_session, queries,
"read_raw_multi", self.
config.connection_config.read_concurrency
1133 timer.add_values(object_count=object_count, row_count=len(data.rows()))
1135 if data.column_names() != [
"diaObjectId",
"apdb_part"]:
1136 raise RuntimeError(f
"Unexpected column names in query result: {data.column_names()}")
1138 return {row[0]: row[1]
for row
in data.rows()}
1141 """Objects in DiaObjectsLast can move from one spatial partition to
1142 another. For those objects inserting new version does not replace old
1143 one, so we need to explicitly remove old versions before inserting new
1147 new_partitions = {oid: part
for oid, part
in zip(objs[
"diaObjectId"], objs[
"apdb_part"])}
1150 moved_oids: dict[int, tuple[int, int]] = {}
1151 for oid, old_part
in old_partitions.items():
1152 new_part = new_partitions.get(oid, old_part)
1153 if new_part != old_part:
1154 moved_oids[oid] = (old_part, new_part)
1155 _LOG.debug(
"DiaObject IDs that moved to new partition: %s", moved_oids)
1159 table_name = self.
_schema.tableName(ApdbTables.DiaObjectLast)
1160 query = f
'DELETE FROM "{self._keyspace}"."{table_name}" WHERE apdb_part = ? AND "diaObjectId" = ?'
1161 statement = self.
_preparer.prepare(query)
1162 batch = cassandra.query.BatchStatement()
1163 for oid, (old_part, _)
in moved_oids.items():
1164 batch.add(statement, (old_part, oid))
1165 with self.
_timer(
"delete_object_last")
as timer:
1167 batch, timeout=self.
config.connection_config.write_timeout, execution_profile=
"write"
1169 timer.add_values(row_count=len(moved_oids))
1172 table_name = self.
_schema.tableName(ExtraTables.DiaObjectLastToPartition)
1173 query = f
'INSERT INTO "{self._keyspace}"."{table_name}" ("diaObjectId", apdb_part) VALUES (?,?)'
1174 statement = self.
_preparer.prepare(query)
1176 batch_size = self.
_batch_size(ExtraTables.DiaObjectLastToPartition)
1178 for chunk
in chunk_iterable(new_partitions.items(), batch_size):
1179 batch = cassandra.query.BatchStatement()
1180 for oid, new_part
in chunk:
1181 batch.add(statement, (oid, new_part))
1182 batches.append(batch)
1184 with self.
_timer(
"update_object_last_partition")
as timer:
1185 for batch
in batches:
1187 batch, timeout=self.
config.connection_config.write_timeout, execution_profile=
"write"
1189 timer.add_values(row_count=len(batch))
    def _storeDiaObjects(
        self, objs: pandas.DataFrame, visit_time: astropy.time.Time, replica_chunk: ReplicaChunk | None
    ) -> None:
        """Store catalog of DiaObjects from current visit.

        Parameters
        ----------
        objs : `pandas.DataFrame`
            Catalog with DiaObject records
        visit_time : `astropy.time.Time`
            Time of the current visit.
        replica_chunk : `ReplicaChunk` or `None`
            Replica chunk identifier if replication is configured.
        """
        if len(objs) == 0:
            _LOG.debug("No objects to write to database.")
            return

        visit_time_dt = visit_time.datetime
        extra_columns = dict(lastNonForcedSource=visit_time_dt)
        self._storeObjectsPandas(objs, ApdbTables.DiaObjectLast, extra_columns=extra_columns)

        extra_columns["validityStart"] = visit_time_dt
        time_part: int | None = self._time_partition(visit_time)
        if not self.config.partitioning.time_partition_tables:
            extra_columns["apdb_time_part"] = time_part
            time_part = None

        # Only store DiaObjects if not doing replication or explicitly
        # configured to always store them.
        if replica_chunk is None or not self.config.replica_skips_diaobjects:
            self._storeObjectsPandas(
                objs, ApdbTables.DiaObject, extra_columns=extra_columns, time_part=time_part
            )

        if replica_chunk is not None:
            extra_columns = dict(apdb_replica_chunk=replica_chunk.id, validityStart=visit_time_dt)
            self._storeObjectsPandas(objs, ExtraTables.DiaObjectChunks, extra_columns=extra_columns)
    def _storeDiaSources(
        self,
        table_name: ApdbTables,
        sources: pandas.DataFrame,
        replica_chunk: ReplicaChunk | None,
    ) -> None:
        """Store catalog of DIASources or DIAForcedSources from current visit.

        Parameters
        ----------
        table_name : `ApdbTables`
            Table where to store the data.
        sources : `pandas.DataFrame`
            Catalog containing DiaSource records
        replica_chunk : `ReplicaChunk` or `None`
            Replica chunk identifier if replication is configured.
        """
        # Time partitioning is based on the midpointMjdTai of each source.
        tp_sources = sources.copy(deep=False)
        tp_sources["apdb_time_part"] = tp_sources["midpointMjdTai"].apply(self._time_partition)
        extra_columns: dict[str, Any] = {}
        if not self.config.partitioning.time_partition_tables:
            # Single table that includes apdb_time_part as a regular column.
            self._storeObjectsPandas(tp_sources, table_name, extra_columns=extra_columns)
        else:
            # Group sources by their time partition.
            partitions = set(tp_sources["apdb_time_part"])
            if len(partitions) == 1:
                # Single partition, write the whole catalog at once.
                time_part = partitions.pop()
                self._storeObjectsPandas(sources, table_name, extra_columns=extra_columns, time_part=time_part)
            else:
                for time_part, sub_frame in tp_sources.groupby(by="apdb_time_part"):
                    sub_frame.drop(columns="apdb_time_part", inplace=True)
                    self._storeObjectsPandas(
                        sub_frame, table_name, extra_columns=extra_columns, time_part=time_part
                    )

        if replica_chunk is not None:
            extra_columns = dict(apdb_replica_chunk=replica_chunk.id)
            if table_name is ApdbTables.DiaSource:
                extra_table = ExtraTables.DiaSourceChunks
            else:
                extra_table = ExtraTables.DiaForcedSourceChunks
            self._storeObjectsPandas(sources, extra_table, extra_columns=extra_columns)
    def _storeDiaSourcesPartitions(
        self, sources: pandas.DataFrame, visit_time: astropy.time.Time, replica_chunk: ReplicaChunk | None
    ) -> None:
        """Store mapping of diaSourceId to its partitioning values.

        Parameters
        ----------
        sources : `pandas.DataFrame`
            Catalog containing DiaSource records
        visit_time : `astropy.time.Time`
            Time of the current visit.
        replica_chunk : `ReplicaChunk` or `None`
            Replica chunk identifier if replication is configured.
        """
        id_map = cast(pandas.DataFrame, sources[["diaSourceId", "apdb_part"]])
        extra_columns = {
            "apdb_time_part": self._time_partition(visit_time),
            "apdb_replica_chunk": replica_chunk.id if replica_chunk is not None else None,
        }

        self._storeObjectsPandas(
            id_map, ExtraTables.DiaSourceToPartition, extra_columns=extra_columns, time_part=None
        )
    def _storeObjectsPandas(
        self,
        records: pandas.DataFrame,
        table_name: ApdbTables | ExtraTables,
        extra_columns: Mapping | None = None,
        time_part: int | None = None,
    ) -> None:
        """Store generic objects.

        Takes Pandas catalog and stores a bunch of records in a table.

        Parameters
        ----------
        records : `pandas.DataFrame`
            Catalog containing object records
        table_name : `ApdbTables`
            Name of the table as defined in APDB schema.
        extra_columns : `dict`, optional
            Mapping (column_name, column_value) which gives fixed values for
            columns in each row, overrides values in ``records`` if matching
            columns exist there.
        time_part : `int`, optional
            If not `None` then insert into a per-partition table.

        Notes
        -----
        If Pandas catalog contains additional columns not defined in table
        schema they are ignored. Catalog does not have to contain all columns
        defined in a table, but partition and clustering keys must be present
        in a catalog or ``extra_columns``.
        """
        # Use extra columns if specified.
        if extra_columns is None:
            extra_columns = {}
        extra_fields = list(extra_columns.keys())

        # Fields that will come from the DataFrame.
        df_fields = [column for column in records.columns if column not in extra_fields]

        column_map = self._schema.getColumnMap(table_name)
        # List of columns as defined in the schema.
        fields = [column_map[field].name for field in df_fields if field in column_map]
        fields += extra_fields

        # Check that all partitioning and clustering columns are defined.
        required_columns = self._schema.partitionColumns(table_name) + self._schema.clusteringColumns(
            table_name
        )
        missing_columns = [column for column in required_columns if column not in fields]
        if missing_columns:
            raise ValueError(f"Primary key columns are missing from catalog: {missing_columns}")

        qfields = [quote_id(field) for field in fields]
        qfields_str = ",".join(qfields)

        batch_size = self._batch_size(table_name)

        with self._timer("insert_build_time", tags={"table": table_name.name}):
            table = self._schema.tableName(table_name)
            if time_part is not None:
                table = f"{table}_{time_part}"

            holders = ",".join(["?"] * len(qfields))
            query = f'INSERT INTO "{self._keyspace}"."{table}" ({qfields_str}) VALUES ({holders})'
            statement = self._preparer.prepare(query)
            queries = []
            for rec_chunk in chunk_iterable(records.itertuples(index=False), batch_size):
                batch = cassandra.query.BatchStatement()
                for rec in rec_chunk:
                    values = []
                    for field in df_fields:
                        if field not in column_map:
                            continue
                        value = getattr(rec, field)
                        if column_map[field].datatype is felis.datamodel.DataType.timestamp:
                            if isinstance(value, pandas.Timestamp):
                                value = value.to_pydatetime()
                            elif value is pandas.NaT:
                                value = None
                            else:
                                # Assume it is in seconds; Cassandra timestamps
                                # are in milliseconds.
                                value = int(value * 1000)
                        value = literal(value)
                        values.append(UNSET_VALUE if value is None else value)
                    for field in extra_fields:
                        value = literal(extra_columns[field])
                        values.append(UNSET_VALUE if value is None else value)
                    batch.add(statement, values)
                queries.append(batch)

        _LOG.debug("%s: will store %d records", self._schema.tableName(table_name), records.shape[0])
        with self._timer("insert_time", tags={"table": table_name.name}) as timer:
            for batch in queries:
                self._session.execute(
                    batch, timeout=self.config.connection_config.write_timeout, execution_profile="write"
                )
            timer.add_values(row_count=len(records))
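    # Illustrative note (an assumption, not taken from the original source): a
    # call such as
    #
    #     self._storeObjectsPandas(
    #         objects, ApdbTables.DiaObject,
    #         extra_columns={"validityStart": visit_time.datetime},
    #         time_part=705,
    #     )
    #
    # would insert every row of ``objects`` into the hypothetical per-partition
    # table ``DiaObject_705``, with ``validityStart`` taking the fixed value
    # from ``extra_columns`` regardless of what the catalog holds.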
1404 """Calculate spatial partition for each record and add it to a
1409 df : `pandas.DataFrame`
1410 DataFrame which has to contain ra/dec columns, names of these
1411 columns are defined by configuration ``ra_dec_columns`` field.
1415 df : `pandas.DataFrame`
1416 DataFrame with ``apdb_part`` column which contains pixel index
1417 for ra/dec coordinates.
1421 This overrides any existing column in a DataFrame with the same name
1422 (``apdb_part``). Original DataFrame is not changed, copy of a DataFrame
1426 apdb_part = np.zeros(df.shape[0], dtype=np.int64)
1427 ra_col, dec_col = self.
config.ra_dec_columns
1428 for i, (ra, dec)
in enumerate(zip(df[ra_col], df[dec_col])):
1433 df[
"apdb_part"] = apdb_part
1438 """Calculate time partition number for a given time.
1442 time : `float` or `astropy.time.Time`
1443 Time for which to calculate partition number. Can be float to mean
1444 MJD or `astropy.time.Time`
1446 Epoch time for partition 0.
1448 Number of days per partition.
1453 Partition number for a given time.
1455 if isinstance(time, astropy.time.Time):
1456 mjd = float(time.mjd)
1459 days_since_epoch = mjd - epoch_mjd
1460 partition = int(days_since_epoch) // part_days
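    # Worked example of the partition arithmetic above (values are
    # illustrative, not defaults): with epoch_mjd = 40587.0 and part_days = 30,
    # an observation at MJD 60000.5 gives
    #
    #     days_since_epoch = 60000.5 - 40587.0   # 19413.5
    #     partition = int(19413.5) // 30         # 647
    #
    # so the record lands in time partition 647.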
1464 """Calculate time partition number for a given time.
1468 time : `float` or `astropy.time.Time`
1469 Time for which to calculate partition number. Can be float to mean
1470 MJD or `astropy.time.Time`
1475 Partition number for a given time.
1477 if isinstance(time, astropy.time.Time):
1478 mjd = float(time.mjd)
1482 partition = int(days_since_epoch) // self.
config.partitioning.time_partition_days
1486 """Make an empty catalog for a table with a given name.
1490 table_name : `ApdbTables`
1495 catalog : `pandas.DataFrame`
1498 table = self.
_schema.tableSchemas[table_name]
1501 columnDef.name: pandas.Series(dtype=self.
_schema.column_dtype(columnDef.datatype))
1502 for columnDef
in table.columns
1504 return pandas.DataFrame(data)
    def _combine_where(
        self,
        prefix: str,
        where1: list[tuple[str, tuple]],
        where2: list[tuple[str, tuple]],
        suffix: str | None = None,
    ) -> Iterator[tuple[cassandra.query.Statement, tuple]]:
        """Make cartesian product of two parts of WHERE clause into a series
        of statements to execute.

        Parameters
        ----------
        prefix : `str`
            Initial statement prefix that comes before WHERE clause, e.g.
            "SELECT * from Table"
        where1 : `list` [ `tuple` ]
            First set of (expression, parameters) tuples.
        where2 : `list` [ `tuple` ]
            Second set of (expression, parameters) tuples.
        suffix : `str`, optional
            Optional suffix appended after the WHERE clause.
        """
        for expr1, params1 in where1:
            for expr2, params2 in where2:
                full_query = prefix
                wheres = []
                if expr1:
                    wheres.append(expr1)
                if expr2:
                    wheres.append(expr2)
                if wheres:
                    full_query += " WHERE " + " AND ".join(wheres)
                if suffix:
                    full_query += " " + suffix
                params = params1 + params2
                if params:
                    statement = self._preparer.prepare(full_query)
                else:
                    # If there are no parameters then there is no reason to
                    # prepare the statement.
                    statement = cassandra.query.SimpleStatement(full_query)
                yield (statement, params)
    def _spatial_where(
        self, region: sphgeom.Region | None, use_ranges: bool = False
    ) -> list[tuple[str, tuple]]:
        """Generate expressions for spatial part of WHERE clause.

        Parameters
        ----------
        region : `sphgeom.Region`
            Spatial region for query results.
        use_ranges : `bool`
            If True then use pixel ranges ("apdb_part >= p1 AND apdb_part <=
            p2") instead of exact list of pixels. Should be set to True for
            large regions covering very many pixels.

        Returns
        -------
        expressions : `list` [ `tuple` ]
            Empty list is returned if ``region`` is `None`, otherwise a list
            of one or more (expression, parameters) tuples
        """
        if region is None:
            return []
        if use_ranges:
            pixel_ranges = self._pixelization.envelope(region)
            expressions: list[tuple[str, tuple]] = []
            for lower, upper in pixel_ranges:
                upper -= 1
                if lower == upper:
                    expressions.append(('"apdb_part" = ?', (lower,)))
                else:
                    expressions.append(('"apdb_part" >= ? AND "apdb_part" <= ?', (lower, upper)))
            return expressions
        else:
            pixels = self._pixelization.pixels(region)
            if self.config.partitioning.query_per_spatial_part:
                return [('"apdb_part" = ?', (pixel,)) for pixel in pixels]
            else:
                pixels_str = ",".join([str(pix) for pix in pixels])
                return [(f'"apdb_part" IN ({pixels_str})', ())]
    def _temporal_where(
        self,
        table: ApdbTables,
        start_time: float | astropy.time.Time,
        end_time: float | astropy.time.Time,
        query_per_time_part: bool | None = None,
    ) -> tuple[list[str], list[tuple[str, tuple]]]:
        """Generate table names and expressions for temporal part of WHERE
        clause.

        Parameters
        ----------
        table : `ApdbTables`
            Table to select from.
        start_time : `astropy.time.Time` or `float`
            Starting Datetime or MJD value of the time range.
        end_time : `astropy.time.Time` or `float`
            Ending Datetime or MJD value of the time range.
        query_per_time_part : `bool`, optional
            If None then use ``query_per_time_part`` from configuration.

        Returns
        -------
        tables : `list` [ `str` ]
            List of the table names to query.
        expressions : `list` [ `tuple` ]
            A list of zero or more (expression, parameters) tuples.
        """
        tables: list[str]
        temporal_where: list[tuple[str, tuple]] = []
        table_name = self._schema.tableName(table)
        time_part_start = self._time_partition(start_time)
        time_part_end = self._time_partition(end_time)
        time_parts = list(range(time_part_start, time_part_end + 1))
        if self.config.partitioning.time_partition_tables:
            tables = [f"{table_name}_{part}" for part in time_parts]
        else:
            tables = [table_name]
            if query_per_time_part is None:
                query_per_time_part = self.config.partitioning.query_per_time_part
            if query_per_time_part:
                temporal_where = [('"apdb_time_part" = ?', (time_part,)) for time_part in time_parts]
            else:
                time_part_list = ",".join([str(part) for part in time_parts])
                temporal_where = [(f'"apdb_time_part" IN ({time_part_list})', ())]

        return tables, temporal_where
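    # Illustrative sketch (assumed values): querying DiaSource for an MJD range
    # that maps to time partitions 646..648 yields, with per-partition tables,
    # tables = ["DiaSource_646", "DiaSource_647", "DiaSource_648"] and an empty
    # WHERE list; without per-partition tables it yields tables = ["DiaSource"]
    # and a single expression ('"apdb_time_part" IN (646,647,648)', ()).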
1639 """Update timestamp columns in input DataFrame to be naive datetime
1642 Clients may or may not generate aware timestamps, code in this class
1643 assumes that timestamps are naive, so we convert them to UTC and
1647 columns = [column
for column, dtype
in df.dtypes.items()
if isinstance(dtype, pandas.DatetimeTZDtype)]
1648 for column
in columns:
1650 df[column] = df[column].dt.tz_convert(
None)
1654 """Calculate batch size based on config parameters."""
1657 if 0 < self.
config.batch_statement_limit < batch_size:
1658 batch_size = self.
config.batch_statement_limit
1659 if self.
config.batch_size_limit > 0:
1667 row_size = self.
_schema.table_row_size(table)
1668 row_size += 4 * len(self.
_schema.getColumnMap(table))
1669 batch_size = min(batch_size, (self.
config.batch_size_limit // row_size) + 1)
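    # Worked example of the estimate above (hypothetical config values): with
    # batch_size_limit = 262144 bytes, an estimated row_size of 400 bytes and a
    # 40-column table, row_size becomes 400 + 4 * 40 = 560 and the batch is
    # capped at 262144 // 560 + 1 = 469 statements (or fewer if
    # batch_statement_limit is smaller).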