from __future__ import annotations

__all__ = ["ApdbCassandraConfig", "ApdbCassandra"]

import dataclasses
import logging
import warnings
from collections.abc import Iterable, Iterator, Mapping, Set
from typing import TYPE_CHECKING, Any, cast

import astropy.time
import numpy as np
import pandas

try:
    import cassandra.query
    from cassandra.auth import AuthProvider, PlainTextAuthProvider
    from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile, Session
    from cassandra.policies import AddressTranslator, RoundRobinPolicy, WhiteListRoundRobinPolicy
    from cassandra.query import UNSET_VALUE

    CASSANDRA_IMPORTED = True
except ImportError:
    CASSANDRA_IMPORTED = False

import felis.datamodel
from lsst import sphgeom
from lsst.pex.config import ChoiceField, Field, ListField
from lsst.utils.db_auth import DbAuth, DbAuthNotFoundError
from lsst.utils.iteration import chunk_iterable

from .._auth import DB_AUTH_ENVVAR, DB_AUTH_PATH
from ..apdb import Apdb, ApdbConfig
from ..apdbConfigFreezer import ApdbConfigFreezer
from ..apdbReplica import ApdbTableData, ReplicaChunk
from ..apdbSchema import ApdbTables
from ..monitor import MonAgent
from ..pixelization import Pixelization
from ..schema_model import Table
from ..timer import Timer
from ..versionTuple import IncompatibleVersionError, VersionTuple
from .apdbCassandraReplica import ApdbCassandraReplica
from .apdbCassandraSchema import ApdbCassandraSchema, CreateTableOptions, ExtraTables
from .apdbMetadataCassandra import ApdbMetadataCassandra
from .cassandra_utils import (
    PreparedStatementCache,
    literal,
    pandas_dataframe_factory,
    quote_id,
    raw_data_factory,
    select_concurrent,
)

if TYPE_CHECKING:
    from ..apdbMetadata import ApdbMetadata

_LOG = logging.getLogger(__name__)

_MON = MonAgent(__name__)
86"""Version for the code controlling non-replication tables. This needs to be
87updated following compatibility rules when schema produced by this code
93 """Dump cassandra query to debug log."""
94 _LOG.debug(
"Cassandra query: %s", rf.query)
99 super().
__init__(
"cassandra-driver module cannot be imported")
103 """Configuration class for Cassandra-based APDB implementation."""
105 contact_points = ListField[str](
106 doc=
"The list of contact points to try connecting for cluster discovery.", default=[
"127.0.0.1"]
108 private_ips = ListField[str](doc=
"List of internal IP addresses for contact_points.", default=[])
109 port = Field[int](doc=
"Port number to connect to.", default=9042)
110 keyspace = Field[str](doc=
"Default keyspace for operations.", default=
"apdb")
111 username = Field[str](
112 doc=f
"Cassandra user name, if empty then {DB_AUTH_PATH} has to provide it with password.",
115 read_consistency = Field[str](
116 doc=
"Name for consistency level of read operations, default: QUORUM, can be ONE.", default=
"QUORUM"
118 write_consistency = Field[str](
119 doc=
"Name for consistency level of write operations, default: QUORUM, can be ONE.", default=
"QUORUM"
121 read_timeout = Field[float](doc=
"Timeout in seconds for read operations.", default=120.0)
122 write_timeout = Field[float](doc=
"Timeout in seconds for write operations.", default=60.0)
123 remove_timeout = Field[float](doc=
"Timeout in seconds for remove operations.", default=600.0)
124 read_concurrency = Field[int](doc=
"Concurrency level for read operations.", default=500)
125 protocol_version = Field[int](
126 doc=
"Cassandra protocol version to use, default is V4",
127 default=cassandra.ProtocolVersion.V4
if CASSANDRA_IMPORTED
else 0,
129 dia_object_columns = ListField[str](
130 doc=
"List of columns to read from DiaObject[Last], by default read all columns", default=[]
132 prefix = Field[str](doc=
"Prefix to add to table names", default=
"")
133 part_pixelization = ChoiceField[str](
134 allowed=dict(htm=
"HTM pixelization", q3c=
"Q3C pixelization", mq3c=
"MQ3C pixelization"),
135 doc=
"Pixelization used for partitioning index.",
138 part_pix_level = Field[int](doc=
"Pixelization level used for partitioning index.", default=11)
139 part_pix_max_ranges = Field[int](doc=
"Max number of ranges in pixelization envelope", default=128)
140 ra_dec_columns = ListField[str](default=[
"ra",
"dec"], doc=
"Names of ra/dec columns in DiaObject table")
141 timer = Field[bool](doc=
"If True then print/log timing information", default=
False)
142 time_partition_tables = Field[bool](
143 doc=
"Use per-partition tables for sources instead of partitioning by time", default=
False
145 time_partition_days = Field[int](
147 "Time partitioning granularity in days, this value must not be changed after database is "
152 time_partition_start = Field[str](
154 "Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI. "
155 "This is used only when time_partition_tables is True."
157 default=
"2018-12-01T00:00:00",
159 time_partition_end = Field[str](
161 "Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI. "
162 "This is used only when time_partition_tables is True."
164 default=
"2030-01-01T00:00:00",
166 query_per_time_part = Field[bool](
169 "If True then build separate query for each time partition, otherwise build one single query. "
170 "This is only used when time_partition_tables is False in schema config."
173 query_per_spatial_part = Field[bool](
175 doc=
"If True then build one query per spatial partition, otherwise build single query.",
177 use_insert_id_skips_diaobjects = Field[bool](
180 "If True then do not store DiaObjects when use_insert_id is True "
181 "(DiaObjectsChunks has the same data)."
184 idle_heartbeat_interval = Field[int](
186 "Interval, in seconds, on which to heartbeat idle connections. "
187 "Zero (default) disables heartbeats."
191 idle_heartbeat_timeout = Field[int](
192 doc=
"Timeout, in seconds, on which the heartbeat wait for idle connection responses.",
@dataclasses.dataclass
class DatabaseInfo:
    """Collection of information about a specific database."""

    name: str
    """Keyspace name."""

    permissions: dict[str, set[str]] | None = None
    """Roles that can access the database and their permissions.

    `None` means that authentication information is not accessible due to
    system table permissions. If anonymous access is enabled then dictionary
    will be empty but not `None`.
    """
@dataclasses.dataclass
class _DbVersions:
    """Versions defined in APDB metadata table."""

    schema_version: VersionTuple
    """Version of the schema from which database was created."""

    code_version: VersionTuple
    """Version of ApdbCassandra with which database was created."""

    replica_version: VersionTuple | None
    """Version of ApdbCassandraReplica with which database was created, None
    if replication was not configured.
    """
if CASSANDRA_IMPORTED:

    class _AddressTranslator(AddressTranslator):
        """Translate internal IP address to external.

        Only used for docker-based setup, not viable long-term solution.
        """

        def __init__(self, public_ips: list[str], private_ips: list[str]):
            self._map = dict((k, v) for k, v in zip(private_ips, public_ips))

        def translate(self, private_ip: str) -> str:
            return self._map.get(private_ip, private_ip)
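# A minimal sketch of the translation behaviour, assuming example addresses:
# with private_ips=["10.0.0.1"] and public_ips=["203.0.113.5"],
# translate("10.0.0.1") returns "203.0.113.5", while addresses missing from the
# map pass through unchanged.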
245 """Implementation of APDB database on to of Apache Cassandra.
247 The implementation is configured via standard ``pex_config`` mechanism
248 using `ApdbCassandraConfig` configuration class. For an example of
249 different configurations check config/ folder.
253 config : `ApdbCassandraConfig`
254 Configuration object.
257 metadataSchemaVersionKey =
"version:schema"
258 """Name of the metadata key to store schema version number."""
260 metadataCodeVersionKey =
"version:ApdbCassandra"
261 """Name of the metadata key to store code version number."""
263 metadataReplicaVersionKey =
"version:ApdbCassandraReplica"
264 """Name of the metadata key to store replica code version number."""
266 metadataConfigKey =
"config:apdb-cassandra.json"
267 """Name of the metadata key to store code version number."""
269 _frozen_parameters = (
274 "time_partition_tables",
275 "time_partition_days",
276 "use_insert_id_skips_diaobjects",
278 """Names of the config parameters to be frozen in metadata table."""
280 partition_zero_epoch = astropy.time.Time(0, format=
"unix_tai")
281 """Start time for partition 0, this should never be changed."""
    def __init__(self, config: ApdbCassandraConfig):
        if not CASSANDRA_IMPORTED:
            raise CassandraMissingError()

        self._timer_args: list[MonAgent | logging.Logger] = [_MON]
        if config.timer:
            self._timer_args.append(_LOG)

        self._keyspace = config.keyspace
        self._cluster, self._session = self._make_session(config)

        meta_table_name = ApdbTables.metadata.table_name(config.prefix)
        self._metadata = ApdbMetadataCassandra(
            self._session, meta_table_name, config.keyspace, "read_tuples", "write"
        )

        # Read frozen config from metadata.
        with self._timer("read_metadata_config"):
            config_json = self._metadata.get(self.metadataConfigKey)
        if config_json is not None:
            # Update config from metadata.
            freezer = ApdbConfigFreezer[ApdbCassandraConfig](self._frozen_parameters)
            self.config = freezer.update(config, config_json)
        else:
            self.config = config

        self._pixelization = Pixelization(
            self.config.part_pixelization,
            self.config.part_pix_level,
            config.part_pix_max_ranges,
        )

        self._schema = ApdbCassandraSchema(
            session=self._session,
            keyspace=self._keyspace,
            schema_file=self.config.schema_file,
            schema_name=self.config.schema_name,
            prefix=self.config.prefix,
            time_partition_tables=self.config.time_partition_tables,
            enable_replica=self.config.use_insert_id,
        )
        self._partition_zero_epoch_mjd = float(self.partition_zero_epoch.mjd)
        self._preparer = PreparedStatementCache(self._session)

        with self._timer("version_check"):
            self._versionCheck(self._metadata)

        _LOG.debug("ApdbCassandra Configuration:")
        for key, value in self.config.items():
            _LOG.debug("    %s: %s", key, value)

    def __del__(self) -> None:
        if hasattr(self, "_cluster"):
            self._cluster.shutdown()
    def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
        """Create `Timer` instance given its name."""
        return Timer(name, *self._timer_args, tags=tags)
    @classmethod
    def _make_session(cls, config: ApdbCassandraConfig) -> tuple[Cluster, Session]:
        """Make Cassandra session."""
        addressTranslator: AddressTranslator | None = None
        if config.private_ips:
            addressTranslator = _AddressTranslator(list(config.contact_points), list(config.private_ips))

        timer_args: list[MonAgent | logging.Logger] = [_MON]
        if config.timer:
            timer_args.append(_LOG)
        with Timer("cluster_connect", *timer_args):
            cluster = Cluster(
                execution_profiles=cls._makeProfiles(config),
                contact_points=config.contact_points,
                port=config.port,
                auth_provider=cls._make_auth_provider(config),
                address_translator=addressTranslator,
                protocol_version=config.protocol_version,
                control_connection_timeout=100,
                idle_heartbeat_interval=config.idle_heartbeat_interval,
                idle_heartbeat_timeout=config.idle_heartbeat_timeout,
            )
            session = cluster.connect()

        # Dump queries if debug level is enabled.
        if _LOG.isEnabledFor(logging.DEBUG):
            session.add_request_init_listener(_dump_query)

        # Disable result paging.
        session.default_fetch_size = None

        return cluster, session
393 """Make Cassandra authentication provider instance."""
395 dbauth = DbAuth(DB_AUTH_PATH, DB_AUTH_ENVVAR)
396 except DbAuthNotFoundError:
400 empty_username =
True
402 for hostname
in config.contact_points:
404 username, password = dbauth.getAuth(
405 "cassandra", config.username, hostname, config.port, config.keyspace
410 empty_username =
True
412 return PlainTextAuthProvider(username=username, password=password)
413 except DbAuthNotFoundError:
418 f
"Credentials file ({DB_AUTH_PATH} or ${DB_AUTH_ENVVAR}) provided password but not "
419 f
"user name, anonymous Cassandra logon will be attempted."
425 """Check schema version compatibility."""
427 def _get_version(key: str, default: VersionTuple) -> VersionTuple:
428 """Retrieve version number from given metadata key."""
429 if metadata.table_exists():
430 version_str = metadata.get(key)
431 if version_str
is None:
433 raise RuntimeError(f
"Version key {key!r} does not exist in metadata table.")
434 return VersionTuple.fromString(version_str)
445 if not self.
_schema.schemaVersion().checkCompatibility(db_schema_version):
447 f
"Configured schema version {self._schema.schemaVersion()} "
448 f
"is not compatible with database version {db_schema_version}"
452 f
"Current code version {self.apdbImplementationVersion()} "
453 f
"is not compatible with database version {db_code_version}"
457 db_replica_version: VersionTuple |
None =
None
458 if self.
_schema.has_replica_chunks:
460 code_replica_version = ApdbCassandraReplica.apdbReplicaImplementationVersion()
461 if not code_replica_version.checkCompatibility(db_replica_version):
463 f
"Current replication code version {code_replica_version} "
464 f
"is not compatible with database version {db_replica_version}"
468 schema_version=db_schema_version, code_version=db_code_version, replica_version=db_replica_version
473 """Return version number for current APDB implementation.
477 version : `VersionTuple`
478 Version of the code defined in implementation class.
482 def tableDef(self, table: ApdbTables) -> Table |
None:
484 return self.
_schema.tableSchemas.get(table)
    @classmethod
    def init_database(
        cls,
        hosts: list[str],
        keyspace: str,
        *,
        schema_file: str | None = None,
        schema_name: str | None = None,
        read_sources_months: int | None = None,
        read_forced_sources_months: int | None = None,
        use_insert_id: bool = False,
        use_insert_id_skips_diaobjects: bool = False,
        port: int | None = None,
        username: str | None = None,
        prefix: str | None = None,
        part_pixelization: str | None = None,
        part_pix_level: int | None = None,
        time_partition_tables: bool = True,
        time_partition_start: str | None = None,
        time_partition_end: str | None = None,
        read_consistency: str | None = None,
        write_consistency: str | None = None,
        read_timeout: int | None = None,
        write_timeout: int | None = None,
        ra_dec_columns: list[str] | None = None,
        replication_factor: int | None = None,
        drop: bool = False,
        table_options: CreateTableOptions | None = None,
    ) -> ApdbCassandraConfig:
        """Initialize new APDB instance and make configuration object for it.

        Parameters
        ----------
        hosts : `list` [`str`]
            List of host names or IP addresses for Cassandra cluster.
        keyspace : `str`
            Name of the keyspace for APDB tables.
        schema_file : `str`, optional
            Location of (YAML) configuration file with APDB schema. If not
            specified then default location will be used.
        schema_name : `str`, optional
            Name of the schema in YAML configuration file. If not specified
            then default name will be used.
        read_sources_months : `int`, optional
            Number of months of history to read from DiaSource.
        read_forced_sources_months : `int`, optional
            Number of months of history to read from DiaForcedSource.
        use_insert_id : `bool`, optional
            If True, make additional tables used for replication to PPDB.
        use_insert_id_skips_diaobjects : `bool`, optional
            If `True` then do not fill regular ``DiaObject`` table when
            ``use_insert_id`` is `True`.
        port : `int`, optional
            Port number to use for Cassandra connections.
        username : `str`, optional
            User name for Cassandra connections.
        prefix : `str`, optional
            Optional prefix for all table names.
        part_pixelization : `str`, optional
            Name of the MOC pixelization used for partitioning.
        part_pix_level : `int`, optional
            Pixelization level used for partitioning index.
        time_partition_tables : `bool`, optional
            Create per-partition tables.
        time_partition_start : `str`, optional
            Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss
            format, in TAI.
        time_partition_end : `str`, optional
            Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss
            format, in TAI.
        read_consistency : `str`, optional
            Name of the consistency level for read operations.
        write_consistency : `str`, optional
            Name of the consistency level for write operations.
        read_timeout : `int`, optional
            Read timeout in seconds.
        write_timeout : `int`, optional
            Write timeout in seconds.
        ra_dec_columns : `list` [`str`], optional
            Names of ra/dec columns in DiaObject table.
        replication_factor : `int`, optional
            Replication factor used when creating new keyspace, if keyspace
            already exists its replication factor is not changed.
        drop : `bool`, optional
            If `True` then drop existing tables before re-creating the schema.
        table_options : `CreateTableOptions`, optional
            Options used when creating Cassandra tables.

        Returns
        -------
        config : `ApdbCassandraConfig`
            Resulting configuration object for a created APDB instance.
        """
        config = ApdbCassandraConfig(
            contact_points=hosts,
            keyspace=keyspace,
            use_insert_id=use_insert_id,
            use_insert_id_skips_diaobjects=use_insert_id_skips_diaobjects,
            time_partition_tables=time_partition_tables,
        )
        if schema_file is not None:
            config.schema_file = schema_file
        if schema_name is not None:
            config.schema_name = schema_name
        if read_sources_months is not None:
            config.read_sources_months = read_sources_months
        if read_forced_sources_months is not None:
            config.read_forced_sources_months = read_forced_sources_months
        if port is not None:
            config.port = port
        if username is not None:
            config.username = username
        if prefix is not None:
            config.prefix = prefix
        if part_pixelization is not None:
            config.part_pixelization = part_pixelization
        if part_pix_level is not None:
            config.part_pix_level = part_pix_level
        if time_partition_start is not None:
            config.time_partition_start = time_partition_start
        if time_partition_end is not None:
            config.time_partition_end = time_partition_end
        if read_consistency is not None:
            config.read_consistency = read_consistency
        if write_consistency is not None:
            config.write_consistency = write_consistency
        if read_timeout is not None:
            config.read_timeout = read_timeout
        if write_timeout is not None:
            config.write_timeout = write_timeout
        if ra_dec_columns is not None:
            config.ra_dec_columns = ra_dec_columns

        cls._makeSchema(config, drop=drop, replication_factor=replication_factor, table_options=table_options)

        return config
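    # A minimal usage sketch, assuming placeholder host names and keyspace:
    #
    #     config = ApdbCassandra.init_database(
    #         hosts=["cassandra-host-1", "cassandra-host-2"],
    #         keyspace="apdb_example",
    #         use_insert_id=True,
    #         replication_factor=3,
    #     )
    #     config.save("apdb-cassandra-config.py")  # persist pex_config overrides
    #     apdb = ApdbCassandra(config)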
625 """Return the list of keyspaces with APDB databases.
630 Name of one of the hosts in Cassandra cluster.
634 databases : `~collections.abc.Iterable` [`DatabaseInfo`]
635 Information about databases that contain APDB instance.
642 with cluster, session:
644 table_name = ApdbTables.DiaSource.table_name()
645 query =
"select keyspace_name from system_schema.tables where table_name = %s ALLOW FILTERING"
646 result = session.execute(query, (table_name,))
647 keyspaces = [row[0]
for row
in result.all()]
653 template =
", ".join([
"%s"] * len(keyspaces))
655 "SELECT resource, role, permissions FROM system_auth.role_permissions "
656 f
"WHERE resource IN ({template}) ALLOW FILTERING"
658 resources = [f
"data/{keyspace}" for keyspace
in keyspaces]
660 result = session.execute(query, resources)
663 infos = {keyspace:
DatabaseInfo(name=keyspace, permissions={})
for keyspace
in keyspaces}
665 _, _, keyspace = row[0].partition(
"/")
667 role_permissions: set[str] = set(row[2])
668 infos[keyspace].permissions[role] = role_permissions
669 except cassandra.Unauthorized
as exc:
673 f
"Authentication information is not accessible to current user - {exc}", stacklevel=2
675 infos = {keyspace:
DatabaseInfo(name=keyspace)
for keyspace
in keyspaces}
679 return infos.values()
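    # A minimal usage sketch, assuming a placeholder host name:
    #
    #     for info in ApdbCassandra.list_databases("cassandra-host-1"):
    #         roles = sorted(info.permissions) if info.permissions else []
    #         print(info.name, roles)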
683 """Delete APDB database by dropping its keyspace.
688 Name of one of the hosts in Cassandra cluster.
690 Name of keyspace to delete.
691 timeout : `int`, optional
692 Timeout for delete operation in seconds. Dropping a large keyspace
693 can be a long operation, but this default value of one hour should
694 be sufficient for most or all cases.
700 with cluster, session:
701 query = f
"DROP KEYSPACE {quote_id(keyspace)}"
702 session.execute(query, timeout=timeout)
705 """Return `ApdbReplica` instance for this database."""
    @classmethod
    def _makeSchema(
        cls,
        config: ApdbConfig,
        *,
        drop: bool = False,
        replication_factor: int | None = None,
        table_options: CreateTableOptions | None = None,
    ) -> None:
        # docstring is inherited from a base class
        if not isinstance(config, ApdbCassandraConfig):
            raise TypeError(f"Unexpected type of configuration object: {type(config)}")

        cluster, session = cls._make_session(config)
        with cluster, session:
            schema = ApdbCassandraSchema(
                session=session,
                keyspace=config.keyspace,
                schema_file=config.schema_file,
                schema_name=config.schema_name,
                prefix=config.prefix,
                time_partition_tables=config.time_partition_tables,
                enable_replica=config.use_insert_id,
            )

            # Create all tables, with a pre-defined range of time partitions
            # when per-partition tables are requested.
            if config.time_partition_tables:
                time_partition_start = astropy.time.Time(
                    config.time_partition_start, format="isot", scale="tai"
                )
                time_partition_end = astropy.time.Time(config.time_partition_end, format="isot", scale="tai")
                part_epoch = float(cls.partition_zero_epoch.mjd)
                part_days = config.time_partition_days
                part_range = (
                    cls._time_partition_cls(time_partition_start, part_epoch, part_days),
                    cls._time_partition_cls(time_partition_end, part_epoch, part_days) + 1,
                )
                schema.makeSchema(
                    drop=drop,
                    part_range=part_range,
                    replication_factor=replication_factor,
                    table_options=table_options,
                )
            else:
                schema.makeSchema(
                    drop=drop, replication_factor=replication_factor, table_options=table_options
                )

            meta_table_name = ApdbTables.metadata.table_name(config.prefix)
            metadata = ApdbMetadataCassandra(
                session, meta_table_name, config.keyspace, "read_tuples", "write"
            )

            # Fill version numbers, overwriting them if they existed before.
            if metadata.table_exists():
                metadata.set(cls.metadataSchemaVersionKey, str(schema.schemaVersion()), force=True)
                metadata.set(cls.metadataCodeVersionKey, str(cls.apdbImplementationVersion()), force=True)
                if config.use_insert_id:
                    # Store replica code version only if replica is enabled.
                    metadata.set(
                        cls.metadataReplicaVersionKey,
                        str(ApdbCassandraReplica.apdbReplicaImplementationVersion()),
                        force=True,
                    )
                # Store frozen part of the configuration in metadata.
                freezer = ApdbConfigFreezer[ApdbCassandraConfig](cls._frozen_parameters)
                metadata.set(cls.metadataConfigKey, freezer.to_json(config), force=True)
    def getDiaObjects(self, region: sphgeom.Region) -> pandas.DataFrame:
        # docstring is inherited from a base class
        sp_where = self._spatial_where(region)
        _LOG.debug("getDiaObjects: #partitions: %s", len(sp_where))

        # We need to exclude extra partitioning columns from result.
        column_names = self._schema.apdbColumnNames(ApdbTables.DiaObjectLast)
        what = ",".join(quote_id(column) for column in column_names)

        table_name = self._schema.tableName(ApdbTables.DiaObjectLast)
        query = f'SELECT {what} from "{self._keyspace}"."{table_name}"'
        statements: list[tuple] = []
        for where, params in sp_where:
            full_query = f"{query} WHERE {where}"
            if params:
                statement = self._preparer.prepare(full_query)
            else:
                # If there are no params then the query likely has literals
                # rendered already, no point trying to prepare it.
                statement = cassandra.query.SimpleStatement(full_query)
            statements.append((statement, params))
        _LOG.debug("getDiaObjects: #queries: %s", len(statements))

        with _MON.context_tags({"table": "DiaObject"}):
            _MON.add_record(
                "select_query_stats", values={"num_sp_part": len(sp_where), "num_queries": len(statements)}
            )
            with self._timer("select_time") as timer:
                objects = cast(
                    pandas.DataFrame,
                    select_concurrent(
                        self._session, statements, "read_pandas_multi", self.config.read_concurrency
                    ),
                )
                timer.add_values(row_count=len(objects))

        _LOG.debug("found %s DiaObjects", objects.shape[0])
        return objects
    def getDiaSources(
        self, region: sphgeom.Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
    ) -> pandas.DataFrame | None:
        # docstring is inherited from a base class
        months = self.config.read_sources_months
        if months == 0:
            return None
        mjd_end = visit_time.mjd
        mjd_start = mjd_end - months * 30

        return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaSource)

    def getDiaForcedSources(
        self, region: sphgeom.Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
    ) -> pandas.DataFrame | None:
        # docstring is inherited from a base class
        months = self.config.read_forced_sources_months
        if months == 0:
            return None
        mjd_end = visit_time.mjd
        mjd_start = mjd_end - months * 30

        return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaForcedSource)
    def containsVisitDetector(self, visit: int, detector: int) -> bool:
        # docstring is inherited from a base class
        # The order of checks corresponds to order in store(), on potential
        # store failure earlier tables have a higher chance of containing
        # stored records.
        existing_tables = self._schema.existing_tables(ApdbTables.DiaSource, ApdbTables.DiaForcedSource)
        tables_to_check = existing_tables[ApdbTables.DiaSource][:]
        if self.config.use_insert_id:
            tables_to_check.append(self._schema.tableName(ExtraTables.DiaSourceChunks))
        tables_to_check.extend(existing_tables[ApdbTables.DiaForcedSource])
        if self.config.use_insert_id:
            tables_to_check.append(self._schema.tableName(ExtraTables.DiaForcedSourceChunks))

        for table_name in tables_to_check:
            # Try to find a single record with given visit/detector; this is a
            # full-scan query so ALLOW FILTERING is required.
            query = self._preparer.prepare(
                f'SELECT * from "{self._keyspace}"."{table_name}" '
                "WHERE visit = ? AND detector = ? "
                "PER PARTITION LIMIT 1 LIMIT 1 ALLOW FILTERING"
            )
            with self._timer("contains_visit_detector_time", tags={"table": table_name}) as timer:
                result = self._session.execute(query, (visit, detector), execution_profile="read_tuples")
                found = result.one() is not None
                timer.add_values(found=int(found))
                if found:
                    return True
        return False
    def getSSObjects(self) -> pandas.DataFrame:
        # docstring is inherited from a base class
        tableName = self._schema.tableName(ApdbTables.SSObject)
        query = f'SELECT * from "{self._keyspace}"."{tableName}"'

        with self._timer("select_time", tags={"table": "SSObject"}) as timer:
            result = self._session.execute(query, execution_profile="read_pandas")
            objects = result._current_rows
            timer.add_values(row_count=len(objects))

        _LOG.debug("found %s SSObjects", objects.shape[0])
        return objects
    def store(
        self,
        visit_time: astropy.time.Time,
        objects: pandas.DataFrame,
        sources: pandas.DataFrame | None = None,
        forced_sources: pandas.DataFrame | None = None,
    ) -> None:
        # docstring is inherited from a base class
        objects = self._fix_input_timestamps(objects)
        if sources is not None:
            sources = self._fix_input_timestamps(sources)
        if forced_sources is not None:
            forced_sources = self._fix_input_timestamps(forced_sources)

        replica_chunk: ReplicaChunk | None = None
        if self._schema.has_replica_chunks:
            replica_chunk = ReplicaChunk.make_replica_chunk(visit_time, self.config.replica_chunk_seconds)
            self._storeReplicaChunk(replica_chunk, visit_time)

        # Fill spatial partition column for DiaObjects and store them.
        objects = self._add_apdb_part(objects)
        self._storeDiaObjects(objects, visit_time, replica_chunk)

        if sources is not None:
            sources = self._add_apdb_part(sources)
            self._storeDiaSources(ApdbTables.DiaSource, sources, replica_chunk)
            self._storeDiaSourcesPartitions(sources, visit_time, replica_chunk)

        if forced_sources is not None:
            forced_sources = self._add_apdb_part(forced_sources)
            self._storeDiaSources(ApdbTables.DiaForcedSource, forced_sources, replica_chunk)
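    # A minimal usage sketch, assuming an `apdb` instance and DataFrames that
    # follow the APDB schema (including the configured ra/dec columns):
    #
    #     apdb.store(
    #         visit_time=astropy.time.Time.now(),
    #         objects=dia_objects_df,
    #         sources=dia_sources_df,
    #         forced_sources=dia_forced_sources_df,
    #     )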
    def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
        # docstring is inherited from a base class

        # To update a record we need to know its exact primary key (including
        # partition key), so we start by querying the partition map for each
        # diaSourceId.
        table_name = self._schema.tableName(ExtraTables.DiaSourceToPartition)
        # Split IDs into chunks of 1k per query.
        selects: list[tuple] = []
        for ids in chunk_iterable(idMap.keys(), 1_000):
            ids_str = ",".join(str(item) for item in ids)
            selects.append(
                (
                    (
                        'SELECT "diaSourceId", "apdb_part", "apdb_time_part", "apdb_replica_chunk" '
                        f'FROM "{self._keyspace}"."{table_name}" WHERE "diaSourceId" IN ({ids_str})'
                    ),
                    (),
                )
            )

        # No need for DataFrame here, read data as tuples.
        result = cast(
            list[tuple[int, int, int, int | None]],
            select_concurrent(self._session, selects, "read_tuples", self.config.read_concurrency),
        )

        # Make mapping from source ID to its partition.
        id2partitions: dict[int, tuple[int, int]] = {}
        id2chunk_id: dict[int, int] = {}
        for row in result:
            id2partitions[row[0]] = row[1:3]
            if row[3] is not None:
                id2chunk_id[row[0]] = row[3]

        # Make sure we know partitions for each ID.
        if set(id2partitions) != set(idMap):
            missing = ",".join(str(item) for item in set(idMap) - set(id2partitions))
            raise ValueError(f"Following DiaSource IDs do not exist in the database: {missing}")

        # Reassign in the regular DiaSource table(s).
        queries = cassandra.query.BatchStatement()
        table_name = self._schema.tableName(ApdbTables.DiaSource)
        for diaSourceId, ssObjectId in idMap.items():
            apdb_part, apdb_time_part = id2partitions[diaSourceId]
            values: tuple
            if self.config.time_partition_tables:
                query = (
                    f'UPDATE "{self._keyspace}"."{table_name}_{apdb_time_part}"'
                    ' SET "ssObjectId" = ?, "diaObjectId" = NULL'
                    ' WHERE "apdb_part" = ? AND "diaSourceId" = ?'
                )
                values = (ssObjectId, apdb_part, diaSourceId)
            else:
                query = (
                    f'UPDATE "{self._keyspace}"."{table_name}"'
                    ' SET "ssObjectId" = ?, "diaObjectId" = NULL'
                    ' WHERE "apdb_part" = ? AND "apdb_time_part" = ? AND "diaSourceId" = ?'
                )
                values = (ssObjectId, apdb_part, apdb_time_part, diaSourceId)
            queries.add(self._preparer.prepare(query), values)

        # Reassign in replica tables, but only for chunks that still exist.
        if replica_chunks := self.get_replica().getReplicaChunks():
            known_ids = set(replica_chunk.id for replica_chunk in replica_chunks)
            id2chunk_id = {key: value for key, value in id2chunk_id.items() if value in known_ids}
            if id2chunk_id:
                table_name = self._schema.tableName(ExtraTables.DiaSourceChunks)
                for diaSourceId, ssObjectId in idMap.items():
                    if replica_chunk := id2chunk_id.get(diaSourceId):
                        query = (
                            f'UPDATE "{self._keyspace}"."{table_name}" '
                            ' SET "ssObjectId" = ?, "diaObjectId" = NULL '
                            'WHERE "apdb_replica_chunk" = ? AND "diaSourceId" = ?'
                        )
                        values = (ssObjectId, replica_chunk, diaSourceId)
                        queries.add(self._preparer.prepare(query), values)

        _LOG.debug("%s: will update %d records", table_name, len(idMap))
        with self._timer("source_reassign_time") as timer:
            self._session.execute(queries, execution_profile="write")
            timer.add_values(source_count=len(idMap))
    def countUnassociatedObjects(self) -> int:
        # docstring is inherited from a base class
        raise NotImplementedError()

    @property
    def metadata(self) -> ApdbMetadata:
        # docstring is inherited from a base class
        if self._metadata is None:
            raise RuntimeError("Database schema was not initialized.")
        return self._metadata
    @classmethod
    def _makeProfiles(cls, config: ApdbCassandraConfig) -> Mapping[Any, ExecutionProfile]:
        """Make all execution profiles used in the code."""
        if config.private_ips:
            loadBalancePolicy = WhiteListRoundRobinPolicy(hosts=config.contact_points)
        else:
            loadBalancePolicy = RoundRobinPolicy()

        read_tuples_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
            request_timeout=config.read_timeout,
            row_factory=cassandra.query.tuple_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        read_pandas_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
            request_timeout=config.read_timeout,
            row_factory=pandas_dataframe_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        read_raw_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
            request_timeout=config.read_timeout,
            row_factory=raw_data_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        # Profile to use with select_concurrent to return pandas data frame.
        read_pandas_multi_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
            request_timeout=config.read_timeout,
            row_factory=pandas_dataframe_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        # Profile to use with select_concurrent to return raw data (columns
        # and rows).
        read_raw_multi_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
            request_timeout=config.read_timeout,
            row_factory=raw_data_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        write_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.write_consistency),
            request_timeout=config.write_timeout,
            load_balancing_policy=loadBalancePolicy,
        )
        # Default profile, mostly to replace default load balancing policy.
        default_profile = ExecutionProfile(
            load_balancing_policy=loadBalancePolicy,
        )
        return {
            "read_tuples": read_tuples_profile,
            "read_pandas": read_pandas_profile,
            "read_raw": read_raw_profile,
            "read_pandas_multi": read_pandas_multi_profile,
            "read_raw_multi": read_raw_multi_profile,
            "write": write_profile,
            EXEC_PROFILE_DEFAULT: default_profile,
        }
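    # A usage note: the mapping returned above is passed to
    # Cluster(execution_profiles=...), and individual requests then pick a
    # profile by name, e.g.
    #
    #     session.execute(query, execution_profile="read_pandas")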
    def _getSources(
        self,
        region: sphgeom.Region,
        object_ids: Iterable[int] | None,
        mjd_start: float,
        mjd_end: float,
        table_name: ApdbTables,
    ) -> pandas.DataFrame:
        """Return catalog of DiaSource instances given set of DiaObject IDs.

        Parameters
        ----------
        region : `lsst.sphgeom.Region`
            Spherical region of interest.
        object_ids :
            Collection of DiaObject IDs
        mjd_start : `float`
            Lower bound of time interval.
        mjd_end : `float`
            Upper bound of time interval.
        table_name : `ApdbTables`
            Name of the table to query.

        Returns
        -------
        catalog : `pandas.DataFrame`, or `None`
            Catalog containing DiaSource records. Empty catalog is returned if
            ``object_ids`` is empty.
        """
        object_id_set: Set[int] = set()
        if object_ids is not None:
            object_id_set = set(object_ids)
            if len(object_id_set) == 0:
                return self._make_empty_catalog(table_name)

        sp_where = self._spatial_where(region)
        tables, temporal_where = self._temporal_where(table_name, mjd_start, mjd_end)

        # We need to exclude extra partitioning columns from result.
        column_names = self._schema.apdbColumnNames(table_name)
        what = ",".join(quote_id(column) for column in column_names)

        # Build all queries.
        statements: list[tuple] = []
        for table in tables:
            prefix = f'SELECT {what} from "{self._keyspace}"."{table}"'
            statements += list(self._combine_where(prefix, sp_where, temporal_where))
        _LOG.debug("_getSources %s: #queries: %s", table_name, len(statements))

        with _MON.context_tags({"table": table_name.name}):
            _MON.add_record(
                "select_query_stats", values={"num_sp_part": len(sp_where), "num_queries": len(statements)}
            )
            with self._timer("select_time") as timer:
                catalog = cast(
                    pandas.DataFrame,
                    select_concurrent(
                        self._session, statements, "read_pandas_multi", self.config.read_concurrency
                    ),
                )
                timer.add_values(row_count=len(catalog))

        # Filter by given object IDs.
        if len(object_id_set) > 0:
            catalog = cast(pandas.DataFrame, catalog[catalog["diaObjectId"].isin(object_id_set)])

        # Precise filtering on midpointMjdTai.
        catalog = cast(pandas.DataFrame, catalog[catalog["midpointMjdTai"] > mjd_start])

        _LOG.debug("found %d %ss", catalog.shape[0], table_name.name)
        return catalog
    def _storeReplicaChunk(self, replica_chunk: ReplicaChunk, visit_time: astropy.time.Time) -> None:
        # Cassandra timestamps are milliseconds since epoch.
        timestamp = int(replica_chunk.last_update_time.unix_tai * 1000)

        # All chunk records go into a single partition.
        partition = 0

        table_name = self._schema.tableName(ExtraTables.ApdbReplicaChunks)
        query = (
            f'INSERT INTO "{self._keyspace}"."{table_name}" '
            "(partition, apdb_replica_chunk, last_update_time, unique_id) "
            "VALUES (?, ?, ?, ?)"
        )
        self._session.execute(
            self._preparer.prepare(query),
            (partition, replica_chunk.id, timestamp, replica_chunk.unique_id),
            timeout=self.config.write_timeout,
            execution_profile="write",
        )
1193 """Return existing mapping of diaObjectId to its last partition."""
1194 table_name = self.
_schema.tableName(ExtraTables.DiaObjectLastToPartition)
1197 for id_chunk
in chunk_iterable(ids, 10_000):
1198 id_chunk_list = list(id_chunk)
1200 f
'SELECT "diaObjectId", apdb_part FROM "{self._keyspace}"."{table_name}" '
1201 f
'WHERE "diaObjectId" in ({",".join(str(oid) for oid in id_chunk_list)})'
1203 queries.append((query, ()))
1204 object_count += len(id_chunk_list)
1206 with self.
_timer(
"query_object_last_partitions")
as timer:
1209 select_concurrent(self.
_session, queries,
"read_raw_multi", self.
config.read_concurrency),
1211 timer.add_values(object_count=object_count, row_count=len(data.rows()))
1213 if data.column_names() != [
"diaObjectId",
"apdb_part"]:
1214 raise RuntimeError(f
"Unexpected column names in query result: {data.column_names()}")
1216 return {row[0]: row[1]
for row
in data.rows()}
1219 """Objects in DiaObjectsLast can move from one spatial partition to
1220 another. For those objects inserting new version does not replace old
1221 one, so we need to explicitly remove old versions before inserting new
1225 new_partitions = {oid: part
for oid, part
in zip(objs[
"diaObjectId"], objs[
"apdb_part"])}
1228 moved_oids: dict[int, tuple[int, int]] = {}
1229 for oid, old_part
in old_partitions.items():
1230 new_part = new_partitions.get(oid, old_part)
1231 if new_part != old_part:
1232 moved_oids[oid] = (old_part, new_part)
1233 _LOG.debug(
"DiaObject IDs that moved to new partition: %s", moved_oids)
1237 table_name = self.
_schema.tableName(ApdbTables.DiaObjectLast)
1238 query = f
'DELETE FROM "{self._keyspace}"."{table_name}" WHERE apdb_part = ? AND "diaObjectId" = ?'
1239 statement = self.
_preparer.prepare(query)
1240 batch = cassandra.query.BatchStatement()
1241 for oid, (old_part, _)
in moved_oids.items():
1242 batch.add(statement, (old_part, oid))
1243 with self.
_timer(
"delete_object_last")
as timer:
1244 self.
_session.execute(batch, timeout=self.
config.write_timeout, execution_profile=
"write")
1245 timer.add_values(row_count=len(moved_oids))
1248 table_name = self.
_schema.tableName(ExtraTables.DiaObjectLastToPartition)
1249 query = f
'INSERT INTO "{self._keyspace}"."{table_name}" ("diaObjectId", apdb_part) VALUES (?,?)'
1250 statement = self.
_preparer.prepare(query)
1251 batch = cassandra.query.BatchStatement()
1252 for oid, new_part
in new_partitions.items():
1253 batch.add(statement, (oid, new_part))
1255 with self.
_timer(
"update_object_last_partition")
as timer:
1256 self.
_session.execute(batch, timeout=self.
config.write_timeout, execution_profile=
"write")
1257 timer.add_values(row_count=len(batch))
    def _storeDiaObjects(
        self, objs: pandas.DataFrame, visit_time: astropy.time.Time, replica_chunk: ReplicaChunk | None
    ) -> None:
        """Store catalog of DiaObjects from current visit.

        Parameters
        ----------
        objs : `pandas.DataFrame`
            Catalog with DiaObject records
        visit_time : `astropy.time.Time`
            Time of the current visit.
        replica_chunk : `ReplicaChunk` or `None`
            Replica chunk identifier if replication is configured.
        """
        if len(objs) == 0:
            _LOG.debug("No objects to write to database.")
            return

        if self._has_dia_object_last_to_partition:
            self._deleteMovingObjects(objs)

        visit_time_dt = visit_time.datetime
        extra_columns = dict(lastNonForcedSource=visit_time_dt)
        self._storeObjectsPandas(objs, ApdbTables.DiaObjectLast, extra_columns=extra_columns)

        extra_columns["validityStart"] = visit_time_dt
        time_part: int | None = self._time_partition(visit_time)
        if not self.config.time_partition_tables:
            extra_columns["apdb_time_part"] = time_part
            time_part = None

        # Only store DiaObjects if not doing replication or if explicitly
        # configured to keep them in both places.
        if replica_chunk is None or not self.config.use_insert_id_skips_diaobjects:
            self._storeObjectsPandas(
                objs, ApdbTables.DiaObject, extra_columns=extra_columns, time_part=time_part
            )

        if replica_chunk is not None:
            extra_columns = dict(apdb_replica_chunk=replica_chunk.id, validityStart=visit_time_dt)
            self._storeObjectsPandas(objs, ExtraTables.DiaObjectChunks, extra_columns=extra_columns)
    def _storeDiaSources(
        self,
        table_name: ApdbTables,
        sources: pandas.DataFrame,
        replica_chunk: ReplicaChunk | None,
    ) -> None:
        """Store catalog of DIASources or DIAForcedSources from current visit.

        Parameters
        ----------
        table_name : `ApdbTables`
            Table where to store the data.
        sources : `pandas.DataFrame`
            Catalog containing DiaSource records
        replica_chunk : `ReplicaChunk` or `None`
            Replica chunk identifier if replication is configured.
        """
        # Time partitioning.
        tp_sources = sources.copy(deep=False)
        tp_sources["apdb_time_part"] = tp_sources["midpointMjdTai"].apply(self._time_partition)
        extra_columns: dict[str, Any] = {}
        if not self.config.time_partition_tables:
            self._storeObjectsPandas(tp_sources, table_name)
        else:
            # Group by time partition.
            partitions = set(tp_sources["apdb_time_part"])
            if len(partitions) == 1:
                # Single partition - just save the whole thing.
                time_part = partitions.pop()
                self._storeObjectsPandas(sources, table_name, time_part=time_part)
            else:
                for time_part, sub_frame in tp_sources.groupby(by="apdb_time_part"):
                    sub_frame.drop(columns="apdb_time_part", inplace=True)
                    self._storeObjectsPandas(sub_frame, table_name, time_part=time_part)

        if replica_chunk is not None:
            extra_columns = dict(apdb_replica_chunk=replica_chunk.id)
            if table_name is ApdbTables.DiaSource:
                extra_table = ExtraTables.DiaSourceChunks
            else:
                extra_table = ExtraTables.DiaForcedSourceChunks
            self._storeObjectsPandas(sources, extra_table, extra_columns=extra_columns)
    def _storeDiaSourcesPartitions(
        self, sources: pandas.DataFrame, visit_time: astropy.time.Time, replica_chunk: ReplicaChunk | None
    ) -> None:
        """Store mapping of diaSourceId to its partitioning values.

        Parameters
        ----------
        sources : `pandas.DataFrame`
            Catalog containing DiaSource records
        visit_time : `astropy.time.Time`
            Time of the current visit.
        replica_chunk : `ReplicaChunk` or `None`
            Replica chunk identifier if replication is configured.
        """
        id_map = cast(pandas.DataFrame, sources[["diaSourceId", "apdb_part"]])
        extra_columns = {
            "apdb_time_part": self._time_partition(visit_time),
            "apdb_replica_chunk": replica_chunk.id if replica_chunk is not None else None,
        }

        self._storeObjectsPandas(
            id_map, ExtraTables.DiaSourceToPartition, extra_columns=extra_columns, time_part=None
        )
    def _storeObjectsPandas(
        self,
        records: pandas.DataFrame,
        table_name: ApdbTables | ExtraTables,
        extra_columns: Mapping | None = None,
        time_part: int | None = None,
    ) -> None:
        """Store generic objects.

        Takes Pandas catalog and stores a bunch of records in a table.

        Parameters
        ----------
        records : `pandas.DataFrame`
            Catalog containing object records
        table_name : `ApdbTables`
            Name of the table as defined in APDB schema.
        extra_columns : `dict`, optional
            Mapping (column_name, column_value) which gives fixed values for
            columns in each row, overrides values in ``records`` if matching
            columns exist there.
        time_part : `int`, optional
            If not `None` then insert into a per-partition table.

        Notes
        -----
        If Pandas catalog contains additional columns not defined in table
        schema they are ignored. Catalog does not have to contain all columns
        defined in a table, but partition and clustering keys must be present
        in a catalog or ``extra_columns``.
        """
        # Use extra columns if specified.
        if extra_columns is None:
            extra_columns = {}
        extra_fields = list(extra_columns.keys())

        # Fields that will come from the DataFrame.
        df_fields = [column for column in records.columns if column not in extra_fields]

        column_map = self._schema.getColumnMap(table_name)
        # List of columns as defined in the schema.
        fields = [column_map[field].name for field in df_fields if field in column_map]
        fields += extra_fields

        # Check that all partitioning and clustering columns are defined.
        required_columns = self._schema.partitionColumns(table_name) + self._schema.clusteringColumns(
            table_name
        )
        missing_columns = [column for column in required_columns if column not in fields]
        if missing_columns:
            raise ValueError(f"Primary key columns are missing from catalog: {missing_columns}")

        qfields = [quote_id(field) for field in fields]
        qfields_str = ",".join(qfields)

        with self._timer("insert_build_time", tags={"table": table_name.name}):
            table = self._schema.tableName(table_name)
            if time_part is not None:
                table = f"{table}_{time_part}"

            holders = ",".join(["?"] * len(qfields))
            query = f'INSERT INTO "{self._keyspace}"."{table}" ({qfields_str}) VALUES ({holders})'
            statement = self._preparer.prepare(query)

            # Build one batch statement per chunk of records.
            queries = []
            for rec_chunk in chunk_iterable(records.itertuples(index=False), 50_000_000):
                batch = cassandra.query.BatchStatement()
                for rec in rec_chunk:
                    values = []
                    for field in df_fields:
                        if field not in column_map:
                            continue
                        value = getattr(rec, field)
                        if column_map[field].datatype is felis.datamodel.DataType.timestamp:
                            if isinstance(value, pandas.Timestamp):
                                value = value.to_pydatetime()
                            elif value is pandas.NaT:
                                value = None
                            else:
                                # Assume it is seconds since epoch; Cassandra
                                # timestamps are in milliseconds.
                                value = int(value * 1000)
                        value = literal(value)
                        values.append(UNSET_VALUE if value is None else value)
                    for field in extra_fields:
                        value = literal(extra_columns[field])
                        values.append(UNSET_VALUE if value is None else value)
                    batch.add(statement, values)
                queries.append(batch)

        _LOG.debug("%s: will store %d records", self._schema.tableName(table_name), records.shape[0])
        with self._timer("insert_time", tags={"table": table_name.name}) as timer:
            for batch in queries:
                self._session.execute(batch, timeout=self.config.write_timeout, execution_profile="write")
            timer.add_values(row_count=len(records))
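    # A minimal sketch of how the store methods above call this helper,
    # assuming arbitrary example columns and extra column values:
    #
    #     df = pandas.DataFrame({"diaObjectId": [1, 2], "ra": [10.0, 10.1], "dec": [-5.0, -5.1]})
    #     self._storeObjectsPandas(
    #         df, ApdbTables.DiaObjectLast, extra_columns={"lastNonForcedSource": visit_time.datetime}
    #     )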
1468 """Calculate spatial partition for each record and add it to a
1473 df : `pandas.DataFrame`
1474 DataFrame which has to contain ra/dec columns, names of these
1475 columns are defined by configuration ``ra_dec_columns`` field.
1479 df : `pandas.DataFrame`
1480 DataFrame with ``apdb_part`` column which contains pixel index
1481 for ra/dec coordinates.
1485 This overrides any existing column in a DataFrame with the same name
1486 (``apdb_part``). Original DataFrame is not changed, copy of a DataFrame
1490 apdb_part = np.zeros(df.shape[0], dtype=np.int64)
1491 ra_col, dec_col = self.
config.ra_dec_columns
1492 for i, (ra, dec)
in enumerate(zip(df[ra_col], df[dec_col])):
1497 df[
"apdb_part"] = apdb_part
1502 """Calculate time partition number for a given time.
1506 time : `float` or `astropy.time.Time`
1507 Time for which to calculate partition number. Can be float to mean
1508 MJD or `astropy.time.Time`
1510 Epoch time for partition 0.
1512 Number of days per partition.
1517 Partition number for a given time.
1519 if isinstance(time, astropy.time.Time):
1520 mjd = float(time.mjd)
1523 days_since_epoch = mjd - epoch_mjd
1524 partition = int(days_since_epoch) // part_days
1528 """Calculate time partition number for a given time.
1532 time : `float` or `astropy.time.Time`
1533 Time for which to calculate partition number. Can be float to mean
1534 MJD or `astropy.time.Time`
1539 Partition number for a given time.
1541 if isinstance(time, astropy.time.Time):
1542 mjd = float(time.mjd)
1546 partition = int(days_since_epoch) // self.
config.time_partition_days
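    # A worked example of the partition arithmetic, assuming the unix_tai
    # partition-zero epoch (MJD 40587) and, say, 30-day partitions: a time of
    # MJD 60000.0 maps to partition int(60000.0 - 40587.0) // 30 == 647.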
1550 """Make an empty catalog for a table with a given name.
1554 table_name : `ApdbTables`
1559 catalog : `pandas.DataFrame`
1562 table = self.
_schema.tableSchemas[table_name]
1565 columnDef.name: pandas.Series(dtype=self.
_schema.column_dtype(columnDef.datatype))
1566 for columnDef
in table.columns
1568 return pandas.DataFrame(data)
    def _combine_where(
        self,
        prefix: str,
        where1: list[tuple[str, tuple]],
        where2: list[tuple[str, tuple]],
        suffix: str | None = None,
    ) -> Iterator[tuple[cassandra.query.Statement, tuple]]:
        """Make cartesian product of two parts of WHERE clause into a series
        of statements to execute.

        Parameters
        ----------
        prefix : `str`
            Initial statement prefix that comes before WHERE clause, e.g.
            "SELECT * from Table"
        """
        # If lists are empty use special sentinels.
        if not where1:
            where1 = [("", ())]
        if not where2:
            where2 = [("", ())]

        for expr1, params1 in where1:
            for expr2, params2 in where2:
                full_query = prefix
                wheres = []
                if expr1:
                    wheres.append(expr1)
                if expr2:
                    wheres.append(expr2)
                if wheres:
                    full_query += " WHERE " + " AND ".join(wheres)
                if suffix:
                    full_query += " " + suffix
                params = params1 + params2
                if params:
                    statement = self._preparer.prepare(full_query)
                else:
                    # If there are no params then the query likely has
                    # literals rendered already, no point trying to prepare it.
                    statement = cassandra.query.SimpleStatement(full_query)
                yield (statement, params)
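    # An illustrative note on the cartesian product, assuming example inputs:
    # with where1 = [('"apdb_part" = ?', (1,)), ('"apdb_part" = ?', (2,))] and
    # where2 = [('"apdb_time_part" = ?', (647,))], the generator yields two
    # statements of the form
    #     <prefix> WHERE "apdb_part" = ? AND "apdb_time_part" = ?
    # with parameters (1, 647) and (2, 647) respectively.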
    def _spatial_where(
        self, region: sphgeom.Region | None, use_ranges: bool = False
    ) -> list[tuple[str, tuple]]:
        """Generate expressions for spatial part of WHERE clause.

        Parameters
        ----------
        region : `sphgeom.Region`
            Spatial region for query results.
        use_ranges : `bool`
            If True then use pixel ranges ("apdb_part >= p1 AND apdb_part <=
            p2") instead of exact list of pixels. Should be set to True for
            large regions covering very many pixels.

        Returns
        -------
        expressions : `list` [ `tuple` ]
            Empty list is returned if ``region`` is `None`, otherwise a list
            of one or more (expression, parameters) tuples
        """
        if region is None:
            return []
        if use_ranges:
            pixel_ranges = self._pixelization.envelope(region)
            expressions: list[tuple[str, tuple]] = []
            for lower, upper in pixel_ranges:
                upper -= 1
                if lower == upper:
                    expressions.append(('"apdb_part" = ?', (lower,)))
                else:
                    expressions.append(('"apdb_part" >= ? AND "apdb_part" <= ?', (lower, upper)))
            return expressions
        else:
            pixels = self._pixelization.pixels(region)
            if self.config.query_per_spatial_part:
                return [('"apdb_part" = ?', (pixel,)) for pixel in pixels]

            pixels_str = ",".join([str(pix) for pix in pixels])
            return [(f'"apdb_part" IN ({pixels_str})', ())]
    def _temporal_where(
        self,
        table: ApdbTables,
        start_time: float | astropy.time.Time,
        end_time: float | astropy.time.Time,
        query_per_time_part: bool | None = None,
    ) -> tuple[list[str], list[tuple[str, tuple]]]:
        """Generate table names and expressions for temporal part of WHERE
        clauses.

        Parameters
        ----------
        table : `ApdbTables`
            Table to select from.
        start_time : `astropy.time.Time` or `float`
            Starting Datetime or MJD value of the time range.
        end_time : `astropy.time.Time` or `float`
            Ending Datetime or MJD value of the time range.
        query_per_time_part : `bool`, optional
            If None then use ``query_per_time_part`` from configuration.

        Returns
        -------
        tables : `list` [ `str` ]
            List of the table names to query.
        expressions : `list` [ `tuple` ]
            A list of zero or more (expression, parameters) tuples.
        """
        tables: list[str]
        temporal_where: list[tuple[str, tuple]] = []
        table_name = self._schema.tableName(table)
        time_part_start = self._time_partition(start_time)
        time_part_end = self._time_partition(end_time)
        time_parts = list(range(time_part_start, time_part_end + 1))
        if self.config.time_partition_tables:
            tables = [f"{table_name}_{part}" for part in time_parts]
        else:
            tables = [table_name]
            if query_per_time_part is None:
                query_per_time_part = self.config.query_per_time_part
            if query_per_time_part:
                temporal_where = [('"apdb_time_part" = ?', (time_part,)) for time_part in time_parts]
            else:
                time_part_list = ",".join([str(part) for part in time_parts])
                temporal_where = [(f'"apdb_time_part" IN ({time_part_list})', ())]

        return tables, temporal_where
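    # An illustrative note, assuming a time range covering partitions 646..648
    # and no table prefix: with time_partition_tables=False and
    # query_per_time_part=False this returns
    #     (["DiaSource"], [('"apdb_time_part" IN (646,647,648)', ())])
    # while with time_partition_tables=True it returns the per-partition table
    # names ["DiaSource_646", "DiaSource_647", "DiaSource_648"] and no
    # temporal WHERE expressions.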
1703 """Update timestamp columns in input DataFrame to be naive datetime
1706 Clients may or may not generate aware timestamps, code in this class
1707 assumes that timestamps are naive, so we convert them to UTC and
1711 columns = [column
for column, dtype
in df.dtypes.items()
if isinstance(dtype, pandas.DatetimeTZDtype)]
1712 for column
in columns:
1714 df[column] = df[column].dt.tz_convert(
None)