from __future__ import annotations

__all__ = ["ApdbCassandra"]

import dataclasses
import logging
import random
import warnings
from collections.abc import Iterable, Iterator, Mapping, Set
from typing import TYPE_CHECKING, Any, cast

import astropy.time
import felis.datamodel
import numpy as np
import pandas

try:
    import cassandra
    import cassandra.query
    from cassandra.auth import AuthProvider, PlainTextAuthProvider
    from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile, Session
    from cassandra.policies import AddressTranslator, RoundRobinPolicy, WhiteListRoundRobinPolicy
    from cassandra.query import UNSET_VALUE

    CASSANDRA_IMPORTED = True
except ImportError:
    CASSANDRA_IMPORTED = False
from lsst import sphgeom
from lsst.utils.db_auth import DbAuth, DbAuthNotFoundError
from lsst.utils.iteration import chunk_iterable

from ..apdb import Apdb, ApdbConfig
from ..apdbConfigFreezer import ApdbConfigFreezer
from ..apdbReplica import ApdbTableData, ReplicaChunk
from ..apdbSchema import ApdbTables
from ..monitor import MonAgent
from ..pixelization import Pixelization
from ..schema_model import Table
from ..timer import Timer
from ..versionTuple import IncompatibleVersionError, VersionTuple
from .apdbCassandraReplica import ApdbCassandraReplica
from .apdbCassandraSchema import ApdbCassandraSchema, CreateTableOptions, ExtraTables
from .apdbMetadataCassandra import ApdbMetadataCassandra
from .cassandra_utils import (
    PreparedStatementCache,
    literal,
    pandas_dataframe_factory,
    quote_id,
    raw_data_factory,
    select_concurrent,
)
from .config import ApdbCassandraConfig, ApdbCassandraConnectionConfig

if TYPE_CHECKING:
    from ..apdbMetadata import ApdbMetadata
_LOG = logging.getLogger(__name__)

_MON = MonAgent(__name__)

# Version for the code controlling non-replication tables. This needs to be
# updated following compatibility rules when the schema produced by this code
# changes.
93 """Dump cassandra query to debug log."""
94 _LOG.debug(
"Cassandra query: %s", rf.query)
99 super().
__init__(
"cassandra-driver module cannot be imported")
@dataclasses.dataclass
class DatabaseInfo:
    """Collection of information about a specific database."""

    name: str
    """Keyspace name."""

    permissions: dict[str, set[str]] | None = None
    """Roles that can access the database and their permissions.

    `None` means that authentication information is not accessible due to
    system table permissions. If anonymous access is enabled then dictionary
    will be empty but not `None`.
    """
@dataclasses.dataclass
class _DbVersions:
    """Versions defined in APDB metadata table."""

    schema_version: VersionTuple
    """Version of the schema from which database was created."""

    code_version: VersionTuple
    """Version of ApdbCassandra with which database was created."""

    replica_version: VersionTuple | None
    """Version of ApdbCassandraReplica with which database was created, None
    if replication was not configured.
    """
if CASSANDRA_IMPORTED:

    class _AddressTranslator(AddressTranslator):
        """Translate internal IP address to external.

        Only used for docker-based setup, not viable long-term solution.
        """

        def __init__(self, public_ips: tuple[str, ...], private_ips: tuple[str, ...]):
            self._map = dict((k, v) for k, v in zip(private_ips, public_ips))

        def translate(self, private_ip: str) -> str:
            return self._map.get(private_ip, private_ip)
150 """Implementation of APDB database on to of Apache Cassandra.
152 The implementation is configured via standard ``pex_config`` mechanism
153 using `ApdbCassandraConfig` configuration class. For an example of
154 different configurations check config/ folder.
158 config : `ApdbCassandraConfig`
159 Configuration object.
162 metadataSchemaVersionKey =
"version:schema"
163 """Name of the metadata key to store schema version number."""
165 metadataCodeVersionKey =
"version:ApdbCassandra"
166 """Name of the metadata key to store code version number."""
168 metadataReplicaVersionKey =
"version:ApdbCassandraReplica"
169 """Name of the metadata key to store replica code version number."""
171 metadataConfigKey =
"config:apdb-cassandra.json"
172 """Name of the metadata key to store code version number."""
174 _frozen_parameters = (
177 "replica_skips_diaobjects",
178 "replica_sub_chunk_count",
179 "partitioning.part_pixelization",
180 "partitioning.part_pix_level",
181 "partitioning.time_partition_tables",
182 "partitioning.time_partition_days",
184 """Names of the config parameters to be frozen in metadata table."""
186 partition_zero_epoch = astropy.time.Time(0, format=
"unix_tai")
187 """Start time for partition 0, this should never be changed."""
    def __init__(self, config: ApdbCassandraConfig):
        if not CASSANDRA_IMPORTED:
            raise CassandraMissingError()

        self._keyspace = config.keyspace

        self._cluster, self._session = self._make_session(config)

        meta_table_name = ApdbTables.metadata.table_name(config.prefix)
        self._metadata = ApdbMetadataCassandra(
            self._session, meta_table_name, config.keyspace, "read_tuples", "write"
        )

        # Read frozen config from metadata.
        freezer = ApdbConfigFreezer[ApdbCassandraConfig](self._frozen_parameters)
        with self._timer("read_metadata_config"):
            config_json = self._metadata.get(self.metadataConfigKey)
        if config_json is not None:
            # Update config from metadata.
            self.config = freezer.update(config, config_json)
        else:
            self.config = config

        self._db_versions = self._readVersions(self._metadata)
        if self.config.enable_replica:
            assert self._db_versions.replica_version is not None, "Replica version must be defined"

        self._pixelization = Pixelization(
            self.config.partitioning.part_pixelization,
            self.config.partitioning.part_pix_level,
            config.partitioning.part_pix_max_ranges,
        )

        self._schema = ApdbCassandraSchema(
            session=self._session,
            keyspace=self._keyspace,
            schema_file=self.config.schema_file,
            schema_name=self.config.schema_name,
            prefix=self.config.prefix,
            time_partition_tables=self.config.partitioning.time_partition_tables,
            enable_replica=self.config.enable_replica,
        )
        self._partition_zero_epoch_mjd = float(self.partition_zero_epoch.mjd)

        current_versions = _DbVersions(
            schema_version=self._schema.schemaVersion(),
            code_version=self.apdbImplementationVersion(),
            replica_version=(
                ApdbCassandraReplica.apdbReplicaImplementationVersion()
                if self.config.enable_replica
                else None
            ),
        )
        _LOG.debug("Current versions: %s", current_versions)
        self._versionCheck(current_versions, self._db_versions)

        # Cache of prepared statements.
        self._preparer = PreparedStatementCache(self._session)

        if self.config.enable_replica:
            assert self._db_versions.replica_version is not None, "Replica version must be defined"

        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug("ApdbCassandra Configuration: %s", self.config.model_dump())

    def __del__(self) -> None:
        if hasattr(self, "_cluster"):
            self._cluster.shutdown()
    def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
        """Create `Timer` instance given its name."""
        return Timer(name, _MON, tags=tags)
    @classmethod
    def _make_session(cls, config: ApdbCassandraConfig) -> tuple[Cluster, Session]:
        """Make Cassandra session."""
        addressTranslator: AddressTranslator | None = None
        if config.connection_config.private_ips:
            addressTranslator = _AddressTranslator(
                config.contact_points, config.connection_config.private_ips
            )

        with Timer("cluster_connect", _MON):
            cluster = Cluster(
                execution_profiles=cls._makeProfiles(config),
                contact_points=config.contact_points,
                port=config.connection_config.port,
                address_translator=addressTranslator,
                protocol_version=config.connection_config.protocol_version,
                auth_provider=cls._make_auth_provider(config),
                **config.connection_config.extra_parameters,
            )
            session = cluster.connect()

        # Dump queries if debug level is enabled.
        if _LOG.isEnabledFor(logging.DEBUG):
            session.add_request_init_listener(_dump_query)

        # Disable result paging.
        session.default_fetch_size = None

        return cluster, session
314 """Make Cassandra authentication provider instance."""
317 except DbAuthNotFoundError:
321 empty_username =
True
323 for hostname
in config.contact_points:
325 username, password = dbauth.getAuth(
327 config.connection_config.username,
329 config.connection_config.port,
335 empty_username =
True
337 return PlainTextAuthProvider(username=username, password=password)
338 except DbAuthNotFoundError:
343 f
"Credentials file ({dbauth.db_auth_path}) provided password but not "
344 "user name, anonymous Cassandra logon will be attempted."
350 """Check schema version compatibility."""
352 def _get_version(key: str) -> VersionTuple:
353 """Retrieve version number from given metadata key."""
354 version_str = metadata.get(key)
355 if version_str
is None:
357 raise RuntimeError(f
"Version key {key!r} does not exist in metadata table.")
358 return VersionTuple.fromString(version_str)
364 db_replica_version: VersionTuple |
None =
None
365 if self.
config.enable_replica:
369 schema_version=db_schema_version, code_version=db_code_version, replica_version=db_replica_version
    def _versionCheck(self, current_versions: _DbVersions, db_versions: _DbVersions) -> None:
        """Check schema version compatibility."""
        if not current_versions.schema_version.checkCompatibility(db_versions.schema_version):
            raise IncompatibleVersionError(
                f"Configured schema version {current_versions.schema_version} "
                f"is not compatible with database version {db_versions.schema_version}"
            )
        if not current_versions.code_version.checkCompatibility(db_versions.code_version):
            raise IncompatibleVersionError(
                f"Current code version {current_versions.code_version} "
                f"is not compatible with database version {db_versions.code_version}"
            )

        # Check replica versions only when both sides define them.
        match current_versions.replica_version, db_versions.replica_version:
            case None, None:
                pass
            case VersionTuple() as current, VersionTuple() as stored:
                if not current.checkCompatibility(stored):
                    raise IncompatibleVersionError(
                        f"Current replication code version {current} "
                        f"is not compatible with database version {stored}"
                    )
            case _:
                raise IncompatibleVersionError(
                    f"Current replication code version {current_versions.replica_version} "
                    f"is not compatible with database version {db_versions.replica_version}"
                )
403 """Return version number for current APDB implementation.
407 version : `VersionTuple`
408 Version of the code defined in implementation class.
412 def tableDef(self, table: ApdbTables) -> Table |
None:
414 return self.
_schema.tableSchemas.get(table)
412 def tableDef(self, table: ApdbTables) -> Table |
None:
…
    @classmethod
    def init_database(
        cls,
        hosts: tuple[str, ...],
        keyspace: str,
        *,
        schema_file: str | None = None,
        schema_name: str | None = None,
        read_sources_months: int | None = None,
        read_forced_sources_months: int | None = None,
        enable_replica: bool = False,
        replica_skips_diaobjects: bool = False,
        port: int | None = None,
        username: str | None = None,
        prefix: str | None = None,
        part_pixelization: str | None = None,
        part_pix_level: int | None = None,
        time_partition_tables: bool = True,
        time_partition_start: str | None = None,
        time_partition_end: str | None = None,
        read_consistency: str | None = None,
        write_consistency: str | None = None,
        read_timeout: int | None = None,
        write_timeout: int | None = None,
        ra_dec_columns: tuple[str, str] | None = None,
        replication_factor: int | None = None,
        drop: bool = False,
        table_options: CreateTableOptions | None = None,
    ) -> ApdbCassandraConfig:
        """Initialize new APDB instance and make configuration object for it.

        Parameters
        ----------
        hosts : `tuple` [`str`, ...]
            List of host names or IP addresses for Cassandra cluster.
        keyspace : `str`
            Name of the keyspace for APDB tables.
        schema_file : `str`, optional
            Location of (YAML) configuration file with APDB schema. If not
            specified then default location will be used.
        schema_name : `str`, optional
            Name of the schema in YAML configuration file. If not specified
            then default name will be used.
        read_sources_months : `int`, optional
            Number of months of history to read from DiaSource.
        read_forced_sources_months : `int`, optional
            Number of months of history to read from DiaForcedSource.
        enable_replica : `bool`, optional
            If True, make additional tables used for replication to PPDB.
        replica_skips_diaobjects : `bool`, optional
            If `True` then do not fill regular ``DiaObject`` table when
            ``enable_replica`` is `True`.
        port : `int`, optional
            Port number to use for Cassandra connections.
        username : `str`, optional
            User name for Cassandra connections.
        prefix : `str`, optional
            Optional prefix for all table names.
        part_pixelization : `str`, optional
            Name of the MOC pixelization used for partitioning.
        part_pix_level : `int`, optional
            Pixelization level used for partitioning.
        time_partition_tables : `bool`, optional
            Create per-partition tables.
        time_partition_start : `str`, optional
            Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss
            format.
        time_partition_end : `str`, optional
            Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss
            format.
        read_consistency : `str`, optional
            Name of the consistency level for read operations.
        write_consistency : `str`, optional
            Name of the consistency level for write operations.
        read_timeout : `int`, optional
            Read timeout in seconds.
        write_timeout : `int`, optional
            Write timeout in seconds.
        ra_dec_columns : `tuple` [`str`, `str`], optional
            Names of ra/dec columns in DiaObject table.
        replication_factor : `int`, optional
            Replication factor used when creating new keyspace, if keyspace
            already exists its replication factor is not changed.
        drop : `bool`, optional
            If `True` then drop existing tables before re-creating the schema.
        table_options : `CreateTableOptions`, optional
            Options used when creating Cassandra tables.

        Returns
        -------
        config : `ApdbCassandraConfig`
            Resulting configuration object for a created APDB instance.
        """
        # Some non-standard defaults for connection parameters.
        connection_config = ApdbCassandraConnectionConfig(
            extra_parameters={
                "idle_heartbeat_interval": 0,
                "idle_heartbeat_timeout": 30,
                "control_connection_timeout": 100,
            },
        )
        config = ApdbCassandraConfig(
            contact_points=hosts,
            keyspace=keyspace,
            enable_replica=enable_replica,
            replica_skips_diaobjects=replica_skips_diaobjects,
            connection_config=connection_config,
        )
        config.partitioning.time_partition_tables = time_partition_tables
        if schema_file is not None:
            config.schema_file = schema_file
        if schema_name is not None:
            config.schema_name = schema_name
        if read_sources_months is not None:
            config.read_sources_months = read_sources_months
        if read_forced_sources_months is not None:
            config.read_forced_sources_months = read_forced_sources_months
        if port is not None:
            config.connection_config.port = port
        if username is not None:
            config.connection_config.username = username
        if prefix is not None:
            config.prefix = prefix
        if part_pixelization is not None:
            config.partitioning.part_pixelization = part_pixelization
        if part_pix_level is not None:
            config.partitioning.part_pix_level = part_pix_level
        if time_partition_start is not None:
            config.partitioning.time_partition_start = time_partition_start
        if time_partition_end is not None:
            config.partitioning.time_partition_end = time_partition_end
        if read_consistency is not None:
            config.connection_config.read_consistency = read_consistency
        if write_consistency is not None:
            config.connection_config.write_consistency = write_consistency
        if read_timeout is not None:
            config.connection_config.read_timeout = read_timeout
        if write_timeout is not None:
            config.connection_config.write_timeout = write_timeout
        if ra_dec_columns is not None:
            config.ra_dec_columns = ra_dec_columns

        cls._makeSchema(
            config, drop=drop, replication_factor=replication_factor, table_options=table_options
        )

        return config
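    # Example (illustrative sketch, not from this module): creating a new APDB
    # instance; host names and keyspace are made up, and the returned
    # ApdbCassandraConfig is what the caller would persist for later use.
    #
    #     config = ApdbCassandra.init_database(
    #         hosts=("cassandra01.example.com", "cassandra02.example.com"),
    #         keyspace="apdb_dev",
    #         enable_replica=True,
    #         replication_factor=3,
    #     )
    #     apdb = ApdbCassandra(config)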
568 """Return the list of keyspaces with APDB databases.
573 Name of one of the hosts in Cassandra cluster.
577 databases : `~collections.abc.Iterable` [`DatabaseInfo`]
578 Information about databases that contain APDB instance.
585 with cluster, session:
587 table_name = ApdbTables.DiaSource.table_name()
588 query =
"select keyspace_name from system_schema.tables where table_name = %s ALLOW FILTERING"
589 result = session.execute(query, (table_name,))
590 keyspaces = [row[0]
for row
in result.all()]
596 template =
", ".join([
"%s"] * len(keyspaces))
598 "SELECT resource, role, permissions FROM system_auth.role_permissions "
599 f
"WHERE resource IN ({template}) ALLOW FILTERING"
601 resources = [f
"data/{keyspace}" for keyspace
in keyspaces]
603 result = session.execute(query, resources)
606 infos = {keyspace:
DatabaseInfo(name=keyspace, permissions={})
for keyspace
in keyspaces}
608 _, _, keyspace = row[0].partition(
"/")
610 role_permissions: set[str] = set(row[2])
611 infos[keyspace].permissions[role] = role_permissions
612 except cassandra.Unauthorized
as exc:
616 f
"Authentication information is not accessible to current user - {exc}", stacklevel=2
618 infos = {keyspace:
DatabaseInfo(name=keyspace)
for keyspace
in keyspaces}
622 return infos.values()
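    # Example (illustrative; the host name is made up):
    #
    #     for info in ApdbCassandra.list_databases("cassandra01.example.com"):
    #         print(info.name, info.permissions)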
626 """Delete APDB database by dropping its keyspace.
631 Name of one of the hosts in Cassandra cluster.
633 Name of keyspace to delete.
634 timeout : `int`, optional
635 Timeout for delete operation in seconds. Dropping a large keyspace
636 can be a long operation, but this default value of one hour should
637 be sufficient for most or all cases.
643 with cluster, session:
644 query = f
"DROP KEYSPACE {quote_id(keyspace)}"
645 session.execute(query, timeout=timeout)
648 """Return `ApdbReplica` instance for this database."""
    @classmethod
    def _makeSchema(
        cls,
        config: ApdbConfig,
        *,
        drop: bool = False,
        replication_factor: int | None = None,
        table_options: CreateTableOptions | None = None,
    ) -> None:
        # docstring is inherited from a base class
        if not isinstance(config, ApdbCassandraConfig):
            raise TypeError(f"Unexpected type of configuration object: {type(config)}")

        cluster, session = cls._make_session(config)
        with cluster, session:
            schema = ApdbCassandraSchema(
                session=session,
                keyspace=config.keyspace,
                schema_file=config.schema_file,
                schema_name=config.schema_name,
                prefix=config.prefix,
                time_partition_tables=config.partitioning.time_partition_tables,
                enable_replica=config.enable_replica,
            )

            # Create tables; for time-partitioned tables the range of
            # partitions is determined by the configured start/end times.
            if config.partitioning.time_partition_tables:
                time_partition_start = astropy.time.Time(
                    config.partitioning.time_partition_start, format="isot", scale="tai"
                )
                time_partition_end = astropy.time.Time(
                    config.partitioning.time_partition_end, format="isot", scale="tai"
                )
                part_epoch = float(cls.partition_zero_epoch.mjd)
                part_days = config.partitioning.time_partition_days
                part_range = (
                    cls._time_partition_cls(time_partition_start, part_epoch, part_days),
                    cls._time_partition_cls(time_partition_end, part_epoch, part_days) + 1,
                )
                schema.makeSchema(
                    drop=drop,
                    part_range=part_range,
                    replication_factor=replication_factor,
                    table_options=table_options,
                )
            else:
                schema.makeSchema(
                    drop=drop, replication_factor=replication_factor, table_options=table_options
                )

            meta_table_name = ApdbTables.metadata.table_name(config.prefix)
            metadata = ApdbMetadataCassandra(
                session, meta_table_name, config.keyspace, "read_tuples", "write"
            )

            if config.enable_replica:
                # Store replica code version only if replication is enabled.
                metadata.set(
                    cls.metadataReplicaVersionKey,
                    str(ApdbCassandraReplica.apdbReplicaImplementationVersion()),
                )
    def getDiaObjects(self, region: sphgeom.Region) -> pandas.DataFrame:
        # docstring is inherited from a base class
        sp_where = self._spatial_where(region)
        _LOG.debug("getDiaObjects: #partitions: %s", len(sp_where))

        # We need to exclude extra partitioning columns from result.
        column_names = self._schema.apdbColumnNames(ApdbTables.DiaObjectLast)
        what = ",".join(quote_id(column) for column in column_names)

        table_name = self._schema.tableName(ApdbTables.DiaObjectLast)
        query = f'SELECT {what} from "{self._keyspace}"."{table_name}"'
        statements: list[tuple] = []
        for where, params in sp_where:
            full_query = f"{query} WHERE {where}"
            if params:
                statement = self._preparer.prepare(full_query)
            else:
                # If there are no parameters then the query is not prepared,
                # it may contain multiple literal values.
                statement = cassandra.query.SimpleStatement(full_query)
            statements.append((statement, params))
        _LOG.debug("getDiaObjects: #queries: %s", len(statements))

        with _MON.context_tags({"table": "DiaObject"}):
            _MON.add_record(
                "select_query_stats", values={"num_sp_part": len(sp_where), "num_queries": len(statements)}
            )
            with self._timer("select_time") as timer:
                objects = cast(
                    pandas.DataFrame,
                    select_concurrent(
                        self._session,
                        statements,
                        "read_pandas_multi",
                        self.config.connection_config.read_concurrency,
                    ),
                )
                timer.add_values(row_count=len(objects))

        _LOG.debug("found %s DiaObjects", objects.shape[0])
        return objects
    def getDiaSources(
        self, region: sphgeom.Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
    ) -> pandas.DataFrame | None:
        # docstring is inherited from a base class
        months = self.config.read_sources_months
        if months == 0:
            return None
        mjd_end = float(visit_time.mjd)
        mjd_start = mjd_end - months * 30

        return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaSource)

    def getDiaForcedSources(
        self, region: sphgeom.Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
    ) -> pandas.DataFrame | None:
        # docstring is inherited from a base class
        months = self.config.read_forced_sources_months
        if months == 0:
            return None
        mjd_end = float(visit_time.mjd)
        mjd_start = mjd_end - months * 30

        return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaForcedSource)
    def containsVisitDetector(
        self, visit: int, detector: int, region: sphgeom.Region, visit_time: astropy.time.Time
    ) -> bool:
        # docstring is inherited from a base class
        sp_where = self._spatial_where(region, use_ranges=True)
        visit_detector_where = ("visit = ? AND detector = ?", (visit, detector))

        # Sources are time-partitioned on midpointMjdTai; allow a margin of
        # one hour on each side of the visit time.
        mjd_start = float(visit_time.mjd) - 1.0 / 24
        mjd_end = float(visit_time.mjd) + 1.0 / 24

        statements: list[tuple] = []
        for table_type in ApdbTables.DiaSource, ApdbTables.DiaForcedSource:
            tables, temporal_where = self._temporal_where(
                table_type, mjd_start, mjd_end, query_per_time_part=True
            )
            for table in tables:
                prefix = f'SELECT apdb_part FROM "{self._keyspace}"."{table}"'
                suffix = "PER PARTITION LIMIT 1 LIMIT 1 ALLOW FILTERING"
                statements += list(
                    self._combine_where(prefix, sp_where, temporal_where, visit_detector_where, suffix)
                )

        with self._timer("contains_visit_detector_time"):
            result = cast(
                list[tuple[int] | None],
                select_concurrent(
                    self._session,
                    statements,
                    "read_tuples",
                    self.config.connection_config.read_concurrency,
                ),
            )
        return bool(result)
    def getSSObjects(self) -> pandas.DataFrame:
        # docstring is inherited from a base class
        tableName = self._schema.tableName(ApdbTables.SSObject)
        query = f'SELECT * from "{self._keyspace}"."{tableName}"'

        with self._timer("select_time", tags={"table": "SSObject"}) as timer:
            result = self._session.execute(query, execution_profile="read_pandas")
            objects = result._current_rows
            timer.add_values(row_count=len(objects))

        _LOG.debug("found %s SSObjects", objects.shape[0])
        return objects
    def store(
        self,
        visit_time: astropy.time.Time,
        objects: pandas.DataFrame,
        sources: pandas.DataFrame | None = None,
        forced_sources: pandas.DataFrame | None = None,
    ) -> None:
        # docstring is inherited from a base class
        objects = self._fix_input_timestamps(objects)
        if sources is not None:
            sources = self._fix_input_timestamps(sources)
        if forced_sources is not None:
            forced_sources = self._fix_input_timestamps(forced_sources)

        replica_chunk: ReplicaChunk | None = None
        if self._schema.replication_enabled:
            replica_chunk = ReplicaChunk.make_replica_chunk(visit_time, self.config.replica_chunk_seconds)
            self._storeReplicaChunk(replica_chunk, visit_time)

        # Fill DiaObject tables.
        self._storeDiaObjects(objects, visit_time, replica_chunk)

        if sources is not None and len(sources) > 0:
            subchunk = self._storeDiaSources(ApdbTables.DiaSource, sources, replica_chunk)
            self._storeDiaSourcesPartitions(sources, visit_time, replica_chunk, subchunk)

        if forced_sources is not None and len(forced_sources) > 0:
            self._storeDiaSources(ApdbTables.DiaForcedSource, forced_sources, replica_chunk)
    def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
        # docstring is inherited from a base class

        # To update a record we need its full primary key, so start by
        # looking up the partitioning values for each diaSourceId.
        table_name = self._schema.tableName(ExtraTables.DiaSourceToPartition)
        # Split lookups into chunks of 1k IDs per query.
        selects: list[tuple] = []
        for ids in chunk_iterable(idMap.keys(), 1_000):
            ids_str = ",".join(str(item) for item in ids)
            query = (
                'SELECT "diaSourceId", "apdb_part", "apdb_time_part", "apdb_replica_chunk" '
                f'FROM "{self._keyspace}"."{table_name}" WHERE "diaSourceId" IN ({ids_str})'
            )
            selects.append((query, ()))

        # No need for DataFrame here, read data as tuples.
        result = cast(
            list[tuple[int, int, int, int | None]],
            select_concurrent(
                self._session, selects, "read_tuples", self.config.connection_config.read_concurrency
            ),
        )

        # Make mapping from source ID to its partitions.
        id2partitions: dict[int, tuple[int, int]] = {}
        id2chunk_id: dict[int, int] = {}
        for row in result:
            id2partitions[row[0]] = row[1:3]
            if row[3] is not None:
                id2chunk_id[row[0]] = row[3]

        # Make sure we know partitions for each ID.
        if set(id2partitions) != set(idMap):
            missing = ",".join(str(item) for item in set(idMap) - set(id2partitions))
            raise ValueError(f"Following DiaSource IDs do not exist in the database: {missing}")

        # Reassign in the standard tables.
        queries = cassandra.query.BatchStatement()
        table_name = self._schema.tableName(ApdbTables.DiaSource)
        for diaSourceId, ssObjectId in idMap.items():
            apdb_part, apdb_time_part = id2partitions[diaSourceId]
            if self.config.partitioning.time_partition_tables:
                query = (
                    f'UPDATE "{self._keyspace}"."{table_name}_{apdb_time_part}"'
                    ' SET "ssObjectId" = ?, "diaObjectId" = NULL'
                    ' WHERE "apdb_part" = ? AND "diaSourceId" = ?'
                )
                values = (ssObjectId, apdb_part, diaSourceId)
            else:
                query = (
                    f'UPDATE "{self._keyspace}"."{table_name}"'
                    ' SET "ssObjectId" = ?, "diaObjectId" = NULL'
                    ' WHERE "apdb_part" = ? AND "apdb_time_part" = ? AND "diaSourceId" = ?'
                )
                values = (ssObjectId, apdb_part, apdb_time_part, diaSourceId)
            queries.add(self._preparer.prepare(query), values)

        # Updating replicated records is not implemented.
        if id2chunk_id:
            warnings.warn("Replication of reassigned DiaSource records is not implemented.", stacklevel=2)

        _LOG.debug("%s: will update %d records", table_name, len(idMap))
        with self._timer("source_reassign_time") as timer:
            self._session.execute(queries, execution_profile="write")
            timer.add_values(source_count=len(idMap))
    def countUnassociatedObjects(self) -> int:
        # docstring is inherited from a base class
        raise NotImplementedError()
    @classmethod
    def _makeProfiles(cls, config: ApdbCassandraConfig) -> Mapping[Any, ExecutionProfile]:
        """Make all execution profiles used in the code."""
        if config.connection_config.private_ips:
            loadBalancePolicy = WhiteListRoundRobinPolicy(hosts=config.contact_points)
        else:
            loadBalancePolicy = RoundRobinPolicy()

        read_tuples_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.connection_config.read_consistency),
            request_timeout=config.connection_config.read_timeout,
            row_factory=cassandra.query.tuple_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        read_pandas_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.connection_config.read_consistency),
            request_timeout=config.connection_config.read_timeout,
            row_factory=pandas_dataframe_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        read_raw_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.connection_config.read_consistency),
            request_timeout=config.connection_config.read_timeout,
            row_factory=raw_data_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        read_pandas_multi_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.connection_config.read_consistency),
            request_timeout=config.connection_config.read_timeout,
            row_factory=pandas_dataframe_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        read_raw_multi_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.connection_config.read_consistency),
            request_timeout=config.connection_config.read_timeout,
            row_factory=raw_data_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        write_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.connection_config.write_consistency),
            request_timeout=config.connection_config.write_timeout,
            load_balancing_policy=loadBalancePolicy,
        )
        default_profile = ExecutionProfile(
            load_balancing_policy=loadBalancePolicy,
        )
        return {
            "read_tuples": read_tuples_profile,
            "read_pandas": read_pandas_profile,
            "read_raw": read_raw_profile,
            "read_pandas_multi": read_pandas_multi_profile,
            "read_raw_multi": read_raw_multi_profile,
            "write": write_profile,
            EXEC_PROFILE_DEFAULT: default_profile,
        }
    def _getSources(
        self,
        region: sphgeom.Region,
        object_ids: Iterable[int] | None,
        mjd_start: float,
        mjd_end: float,
        table_name: ApdbTables,
    ) -> pandas.DataFrame:
        """Return catalog of DiaSource instances given set of DiaObject IDs.

        Parameters
        ----------
        region : `lsst.sphgeom.Region`
            Spherical region of interest.
        object_ids : `collections.abc.Iterable` [`int`] or `None`
            Collection of DiaObject IDs
        mjd_start : `float`
            Lower bound of time interval.
        mjd_end : `float`
            Upper bound of time interval.
        table_name : `ApdbTables`
            Name of the table to query.

        Returns
        -------
        catalog : `pandas.DataFrame`, or `None`
            Catalog containing DiaSource records. Empty catalog is returned if
            ``object_ids`` is empty.
        """
        object_id_set: Set[int] = set()
        if object_ids is not None:
            object_id_set = set(object_ids)
            if len(object_id_set) == 0:
                return self._make_empty_catalog(table_name)

        sp_where = self._spatial_where(region)
        tables, temporal_where = self._temporal_where(table_name, mjd_start, mjd_end)

        # We need to exclude extra partitioning columns from result.
        column_names = self._schema.apdbColumnNames(table_name)
        what = ",".join(quote_id(column) for column in column_names)

        # Build all queries.
        statements: list[tuple] = []
        for table in tables:
            prefix = f'SELECT {what} from "{self._keyspace}"."{table}"'
            statements += list(self._combine_where(prefix, sp_where, temporal_where))
        _LOG.debug("_getSources %s: #queries: %s", table_name, len(statements))

        with _MON.context_tags({"table": table_name.name}):
            _MON.add_record(
                "select_query_stats", values={"num_sp_part": len(sp_where), "num_queries": len(statements)}
            )
            with self._timer("select_time") as timer:
                catalog = cast(
                    pandas.DataFrame,
                    select_concurrent(
                        self._session,
                        statements,
                        "read_pandas_multi",
                        self.config.connection_config.read_concurrency,
                    ),
                )
                timer.add_values(row_count=len(catalog))

        # Filter by given object IDs.
        if len(object_id_set) > 0:
            catalog = cast(pandas.DataFrame, catalog[catalog["diaObjectId"].isin(object_id_set)])

        # Precise filtering on midpointMjdTai.
        catalog = cast(pandas.DataFrame, catalog[catalog["midpointMjdTai"] > mjd_start])

        _LOG.debug("found %d %ss", catalog.shape[0], table_name.name)
        return catalog
    def _storeReplicaChunk(self, replica_chunk: ReplicaChunk, visit_time: astropy.time.Time) -> None:
        # Cassandra timestamps are milliseconds since epoch.
        timestamp = int(replica_chunk.last_update_time.unix_tai * 1000)

        # All chunk records go into a single partition.
        partition = 0

        table_name = self._schema.tableName(ExtraTables.ApdbReplicaChunks)

        columns = ["partition", "apdb_replica_chunk", "last_update_time", "unique_id"]
        values = [partition, replica_chunk.id, timestamp, replica_chunk.unique_id]
        if self._has_chunk_sub_partitions:
            columns.append("has_subchunks")
            values.append(True)

        column_list = ", ".join(columns)
        placeholders = ",".join(["%s"] * len(columns))
        query = f'INSERT INTO "{self._keyspace}"."{table_name}" ({column_list}) VALUES ({placeholders})'

        self._session.execute(
            query,
            values,
            timeout=self.config.connection_config.write_timeout,
            execution_profile="write",
        )
1142 """Return existing mapping of diaObjectId to its last partition."""
1143 table_name = self.
_schema.tableName(ExtraTables.DiaObjectLastToPartition)
1146 for id_chunk
in chunk_iterable(ids, 10_000):
1147 id_chunk_list = list(id_chunk)
1149 f
'SELECT "diaObjectId", apdb_part FROM "{self._keyspace}"."{table_name}" '
1150 f
'WHERE "diaObjectId" in ({",".join(str(oid) for oid in id_chunk_list)})'
1152 queries.append((query, ()))
1153 object_count += len(id_chunk_list)
1155 with self.
_timer(
"query_object_last_partitions")
as timer:
1159 self.
_session, queries,
"read_raw_multi", self.
config.connection_config.read_concurrency
1162 timer.add_values(object_count=object_count, row_count=len(data.rows()))
1164 if data.column_names() != [
"diaObjectId",
"apdb_part"]:
1165 raise RuntimeError(f
"Unexpected column names in query result: {data.column_names()}")
1167 return {row[0]: row[1]
for row
in data.rows()}
1170 """Objects in DiaObjectsLast can move from one spatial partition to
1171 another. For those objects inserting new version does not replace old
1172 one, so we need to explicitly remove old versions before inserting new
1176 new_partitions = {oid: part
for oid, part
in zip(objs[
"diaObjectId"], objs[
"apdb_part"])}
1179 moved_oids: dict[int, tuple[int, int]] = {}
1180 for oid, old_part
in old_partitions.items():
1181 new_part = new_partitions.get(oid, old_part)
1182 if new_part != old_part:
1183 moved_oids[oid] = (old_part, new_part)
1184 _LOG.debug(
"DiaObject IDs that moved to new partition: %s", moved_oids)
1188 table_name = self.
_schema.tableName(ApdbTables.DiaObjectLast)
1189 query = f
'DELETE FROM "{self._keyspace}"."{table_name}" WHERE apdb_part = ? AND "diaObjectId" = ?'
1190 statement = self.
_preparer.prepare(query)
1191 batch = cassandra.query.BatchStatement()
1192 for oid, (old_part, _)
in moved_oids.items():
1193 batch.add(statement, (old_part, oid))
1194 with self.
_timer(
"delete_object_last")
as timer:
1196 batch, timeout=self.
config.connection_config.write_timeout, execution_profile=
"write"
1198 timer.add_values(row_count=len(moved_oids))
1201 table_name = self.
_schema.tableName(ExtraTables.DiaObjectLastToPartition)
1202 query = f
'INSERT INTO "{self._keyspace}"."{table_name}" ("diaObjectId", apdb_part) VALUES (?,?)'
1203 statement = self.
_preparer.prepare(query)
1205 batch_size = self.
_batch_size(ExtraTables.DiaObjectLastToPartition)
1207 for chunk
in chunk_iterable(new_partitions.items(), batch_size):
1208 batch = cassandra.query.BatchStatement()
1209 for oid, new_part
in chunk:
1210 batch.add(statement, (oid, new_part))
1211 batches.append(batch)
1213 with self.
_timer(
"update_object_last_partition")
as timer:
1214 for batch
in batches:
1216 batch, timeout=self.
config.connection_config.write_timeout, execution_profile=
"write"
1218 timer.add_values(row_count=len(batch))
    def _storeDiaObjects(
        self, objs: pandas.DataFrame, visit_time: astropy.time.Time, replica_chunk: ReplicaChunk | None
    ) -> None:
        """Store catalog of DiaObjects from current visit.

        Parameters
        ----------
        objs : `pandas.DataFrame`
            Catalog with DiaObject records
        visit_time : `astropy.time.Time`
            Time of the current visit.
        replica_chunk : `ReplicaChunk` or `None`
            Replica chunk identifier if replication is configured.
        """
        if len(objs) == 0:
            _LOG.debug("No objects to write to database.")
            return

        if self._has_dia_object_last_to_partition:
            self._deleteMovingObjects(objs)

        visit_time_dt = visit_time.datetime
        extra_columns = dict(lastNonForcedSource=visit_time_dt)
        self._storeObjectsPandas(objs, ApdbTables.DiaObjectLast, extra_columns=extra_columns)

        extra_columns["validityStart"] = visit_time_dt
        time_part: int | None = self._time_partition(visit_time)
        if not self.config.partitioning.time_partition_tables:
            extra_columns["apdb_time_part"] = time_part
            time_part = None

        # Store DiaObject records unless replication is enabled and
        # configured to skip the regular table.
        if replica_chunk is None or not self.config.replica_skips_diaobjects:
            self._storeObjectsPandas(
                objs, ApdbTables.DiaObject, extra_columns=extra_columns, time_part=time_part
            )

        if replica_chunk is not None:
            extra_columns = dict(apdb_replica_chunk=replica_chunk.id, validityStart=visit_time_dt)
            if not self._has_chunk_sub_partitions:
                table = ExtraTables.DiaObjectChunks
            else:
                table = ExtraTables.DiaObjectChunks2
                extra_columns["apdb_replica_subchunk"] = random.randrange(self.config.replica_sub_chunk_count)
            self._storeObjectsPandas(objs, table, extra_columns=extra_columns)
    def _storeDiaSources(
        self,
        table_name: ApdbTables,
        sources: pandas.DataFrame,
        replica_chunk: ReplicaChunk | None,
    ) -> int | None:
        """Store catalog of DIASources or DIAForcedSources from current visit.

        Parameters
        ----------
        table_name : `ApdbTables`
            Table where to store the data.
        sources : `pandas.DataFrame`
            Catalog containing DiaSource records
        replica_chunk : `ReplicaChunk` or `None`
            Replica chunk identifier if replication is configured.

        Returns
        -------
        subchunk : `int` or `None`
            Subchunk number for resulting replica data, `None` if replication
            is not enabled or subchunking is not enabled.
        """
        # Time partition is derived from midpointMjdTai of each source.
        tp_sources = sources.copy(deep=False)
        tp_sources["apdb_time_part"] = tp_sources["midpointMjdTai"].apply(self._time_partition)
        extra_columns: dict[str, Any] = {}
        if not self.config.partitioning.time_partition_tables:
            # Single table, time partition is stored as a regular column.
            self._storeObjectsPandas(tp_sources, table_name)
        else:
            # Group by time partition.
            partitions = set(tp_sources["apdb_time_part"])
            if len(partitions) == 1:
                # Single partition, can store the whole catalog at once.
                time_part = partitions.pop()
                self._storeObjectsPandas(sources, table_name, time_part=time_part)
            else:
                for time_part, sub_frame in tp_sources.groupby(by="apdb_time_part"):
                    sub_frame.drop(columns="apdb_time_part", inplace=True)
                    self._storeObjectsPandas(sub_frame, table_name, time_part=time_part)

        subchunk: int | None = None
        if replica_chunk is not None:
            extra_columns = dict(apdb_replica_chunk=replica_chunk.id)
            if self._has_chunk_sub_partitions:
                subchunk = random.randrange(self.config.replica_sub_chunk_count)
                extra_columns["apdb_replica_subchunk"] = subchunk
                if table_name is ApdbTables.DiaSource:
                    extra_table = ExtraTables.DiaSourceChunks2
                else:
                    extra_table = ExtraTables.DiaForcedSourceChunks2
            else:
                if table_name is ApdbTables.DiaSource:
                    extra_table = ExtraTables.DiaSourceChunks
                else:
                    extra_table = ExtraTables.DiaForcedSourceChunks
            self._storeObjectsPandas(sources, extra_table, extra_columns=extra_columns)

        return subchunk
    def _storeDiaSourcesPartitions(
        self,
        sources: pandas.DataFrame,
        visit_time: astropy.time.Time,
        replica_chunk: ReplicaChunk | None,
        subchunk: int | None,
    ) -> None:
        """Store mapping of diaSourceId to its partitioning values.

        Parameters
        ----------
        sources : `pandas.DataFrame`
            Catalog containing DiaSource records
        visit_time : `astropy.time.Time`
            Time of the current visit.
        replica_chunk : `ReplicaChunk` or `None`
            Replication chunk, or `None` when replication is disabled.
        subchunk : `int` or `None`
            Replication sub-chunk, or `None` when replication is disabled or
            sub-chunking is not used.
        """
        id_map = cast(pandas.DataFrame, sources[["diaSourceId", "apdb_part"]])
        extra_columns = {
            "apdb_time_part": self._time_partition(visit_time),
            "apdb_replica_chunk": replica_chunk.id if replica_chunk is not None else None,
        }
        if subchunk is not None:
            extra_columns["apdb_replica_subchunk"] = subchunk

        self._storeObjectsPandas(
            id_map, ExtraTables.DiaSourceToPartition, extra_columns=extra_columns, time_part=None
        )
    def _storeObjectsPandas(
        self,
        records: pandas.DataFrame,
        table_name: ApdbTables | ExtraTables,
        extra_columns: Mapping | None = None,
        time_part: int | None = None,
    ) -> None:
        """Store generic objects.

        Takes Pandas catalog and stores a bunch of records in a table.

        Parameters
        ----------
        records : `pandas.DataFrame`
            Catalog containing object records
        table_name : `ApdbTables`
            Name of the table as defined in APDB schema.
        extra_columns : `dict`, optional
            Mapping (column_name, column_value) which gives fixed values for
            columns in each row, overrides values in ``records`` if matching
            columns exist there.
        time_part : `int`, optional
            If not `None` then insert into a per-partition table.

        Notes
        -----
        If Pandas catalog contains additional columns not defined in table
        schema they are ignored. Catalog does not have to contain all columns
        defined in a table, but partition and clustering keys must be present
        in a catalog or ``extra_columns``.
        """
        # Use extra columns if specified.
        if extra_columns is None:
            extra_columns = {}
        extra_fields = list(extra_columns.keys())

        # Fields that will come from the dataframe.
        df_fields = [column for column in records.columns if column not in extra_fields]

        column_map = self._schema.getColumnMap(table_name)
        # List of columns as defined in the felis schema.
        fields = [column_map[field].name for field in df_fields if field in column_map]
        fields += extra_fields

        # Check that all partitioning and clustering columns are defined.
        required_columns = self._schema.partitionColumns(table_name) + self._schema.clusteringColumns(
            table_name
        )
        missing_columns = [column for column in required_columns if column not in fields]
        if missing_columns:
            raise ValueError(f"Primary key columns are missing from catalog: {missing_columns}")

        qfields = [quote_id(field) for field in fields]
        qfields_str = ",".join(qfields)

        batch_size = self._batch_size(table_name)

        with self._timer("insert_build_time", tags={"table": table_name.name}):
            table = self._schema.tableName(table_name)
            if time_part is not None:
                table = f"{table}_{time_part}"

            holders = ",".join(["?"] * len(qfields))
            query = f'INSERT INTO "{self._keyspace}"."{table}" ({qfields_str}) VALUES ({holders})'
            statement = self._preparer.prepare(query)

            queries = []
            for rec_chunk in chunk_iterable(records.itertuples(index=False), batch_size):
                batch = cassandra.query.BatchStatement()
                for rec in rec_chunk:
                    values = []
                    for field in df_fields:
                        if field not in column_map:
                            continue
                        value = getattr(rec, field)
                        if column_map[field].datatype is felis.datamodel.DataType.timestamp:
                            if isinstance(value, pandas.Timestamp):
                                value = value.to_pydatetime()
                            elif value is pandas.NaT:
                                value = None
                            else:
                                # Assume it is seconds since epoch, Cassandra
                                # timestamps are in milliseconds.
                                value = int(value * 1000)
                        value = literal(value)
                        values.append(UNSET_VALUE if value is None else value)
                    for field in extra_fields:
                        value = literal(extra_columns[field])
                        values.append(UNSET_VALUE if value is None else value)
                    batch.add(statement, values)
                queries.append(batch)

        _LOG.debug("%s: will store %d records", self._schema.tableName(table_name), records.shape[0])
        with self._timer("insert_time", tags={"table": table_name.name}) as timer:
            for batch in queries:
                self._session.execute(
                    batch, timeout=self.config.connection_config.write_timeout, execution_profile="write"
                )
            timer.add_values(row_count=len(records))
1468 """Calculate spatial partition for each record and add it to a
1473 df : `pandas.DataFrame`
1474 DataFrame which has to contain ra/dec columns, names of these
1475 columns are defined by configuration ``ra_dec_columns`` field.
1479 df : `pandas.DataFrame`
1480 DataFrame with ``apdb_part`` column which contains pixel index
1481 for ra/dec coordinates.
1485 This overrides any existing column in a DataFrame with the same name
1486 (``apdb_part``). Original DataFrame is not changed, copy of a DataFrame
1490 apdb_part = np.zeros(df.shape[0], dtype=np.int64)
1491 ra_col, dec_col = self.
config.ra_dec_columns
1492 for i, (ra, dec)
in enumerate(zip(df[ra_col], df[dec_col])):
1497 df[
"apdb_part"] = apdb_part
1502 """Calculate time partition number for a given time.
1506 time : `float` or `astropy.time.Time`
1507 Time for which to calculate partition number. Can be float to mean
1508 MJD or `astropy.time.Time`
1510 Epoch time for partition 0.
1512 Number of days per partition.
1517 Partition number for a given time.
1519 if isinstance(time, astropy.time.Time):
1520 mjd = float(time.mjd)
1523 days_since_epoch = mjd - epoch_mjd
1524 partition = int(days_since_epoch) // part_days
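    # Worked example (illustrative): partition 0 starts at the unix_tai epoch,
    # 1970-01-01, i.e. MJD 40587.0. With 30-day partitions an observation at
    # MJD 60000.0 maps to int(60000.0 - 40587.0) // 30 == 647; the 30-day
    # value stands in for ``partitioning.time_partition_days``.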
1528 """Calculate time partition number for a given time.
1532 time : `float` or `astropy.time.Time`
1533 Time for which to calculate partition number. Can be float to mean
1534 MJD or `astropy.time.Time`
1539 Partition number for a given time.
1541 if isinstance(time, astropy.time.Time):
1542 mjd = float(time.mjd)
1546 partition = int(days_since_epoch) // self.
config.partitioning.time_partition_days
1550 """Make an empty catalog for a table with a given name.
1554 table_name : `ApdbTables`
1559 catalog : `pandas.DataFrame`
1562 table = self.
_schema.tableSchemas[table_name]
1565 columnDef.name: pandas.Series(dtype=self.
_schema.column_dtype(columnDef.datatype))
1566 for columnDef
in table.columns
1568 return pandas.DataFrame(data)
    def _combine_where(
        self,
        prefix: str,
        where1: list[tuple[str, tuple]],
        where2: list[tuple[str, tuple]],
        where3: tuple[str, tuple] | None = None,
        suffix: str | None = None,
    ) -> Iterator[tuple[cassandra.query.Statement, tuple]]:
        """Make cartesian product of two parts of WHERE clause into a series
        of statements to execute.

        Parameters
        ----------
        prefix : `str`
            Initial statement prefix that comes before WHERE clause, e.g.
            "SELECT * from Table"
        """
        # If lists are empty use special sentinels.
        if not where1:
            where1 = [("", ())]
        if not where2:
            where2 = [("", ())]

        for expr1, params1 in where1:
            for expr2, params2 in where2:
                full_query = prefix
                wheres = []
                params = params1 + params2
                if expr1:
                    wheres.append(expr1)
                if expr2:
                    wheres.append(expr2)
                if where3:
                    wheres.append(where3[0])
                    params += where3[1]
                if wheres:
                    full_query += " WHERE " + " AND ".join(wheres)
                if suffix:
                    full_query += " " + suffix
                if params:
                    statement = self._preparer.prepare(full_query)
                else:
                    # Without parameters there is no point preparing or
                    # caching the statement.
                    statement = cassandra.query.SimpleStatement(full_query)
                yield (statement, params)
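    # Example (illustrative): two spatial expressions combined with one
    # temporal expression yield two statements, e.g.
    #
    #     prefix = 'SELECT * from "apdb"."DiaSource_647"'
    #     where1 = [('"apdb_part" = ?', (1234,)), ('"apdb_part" = ?', (1235,))]
    #     where2 = [('"apdb_time_part" = ?', (647,))]
    #
    # produces 'SELECT ... WHERE "apdb_part" = ? AND "apdb_time_part" = ?'
    # with parameters (1234, 647) and (1235, 647).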
    def _spatial_where(
        self, region: sphgeom.Region | None, use_ranges: bool = False
    ) -> list[tuple[str, tuple]]:
        """Generate expressions for spatial part of WHERE clause.

        Parameters
        ----------
        region : `sphgeom.Region`
            Spatial region for query results.
        use_ranges : `bool`
            If True then use pixel ranges ("apdb_part >= p1 AND apdb_part <=
            p2") instead of exact list of pixels. Should be set to True for
            large regions covering very many pixels.

        Returns
        -------
        expressions : `list` [ `tuple` ]
            Empty list is returned if ``region`` is `None`, otherwise a list
            of one or more (expression, parameters) tuples
        """
        if region is None:
            return []
        if use_ranges:
            pixel_ranges = self._pixelization.envelope(region)
            expressions: list[tuple[str, tuple]] = []
            for lower, upper in pixel_ranges:
                upper -= 1
                if lower == upper:
                    expressions.append(('"apdb_part" = ?', (lower,)))
                else:
                    expressions.append(('"apdb_part" >= ? AND "apdb_part" <= ?', (lower, upper)))
            return expressions
        else:
            pixels = self._pixelization.pixels(region)
            if self.config.partitioning.query_per_spatial_part:
                return [('"apdb_part" = ?', (pixel,)) for pixel in pixels]
            else:
                pixels_str = ",".join([str(pix) for pix in pixels])
                return [(f'"apdb_part" IN ({pixels_str})', ())]
    def _temporal_where(
        self,
        table: ApdbTables,
        start_time: float | astropy.time.Time,
        end_time: float | astropy.time.Time,
        query_per_time_part: bool | None = None,
    ) -> tuple[list[str], list[tuple[str, tuple]]]:
        """Generate table names and expressions for temporal part of WHERE
        clauses.

        Parameters
        ----------
        table : `ApdbTables`
            Table to select from.
        start_time : `astropy.time.Time` or `float`
            Starting Datetime or MJD value of the time range.
        end_time : `astropy.time.Time` or `float`
            Ending Datetime or MJD value of the time range.
        query_per_time_part : `bool`, optional
            If None then use ``query_per_time_part`` from configuration.

        Returns
        -------
        tables : `list` [ `str` ]
            List of the table names to query.
        expressions : `list` [ `tuple` ]
            A list of zero or more (expression, parameters) tuples.
        """
        tables: list[str]
        temporal_where: list[tuple[str, tuple]] = []
        table_name = self._schema.tableName(table)
        time_part_start = self._time_partition(start_time)
        time_part_end = self._time_partition(end_time)
        time_parts = list(range(time_part_start, time_part_end + 1))
        if self.config.partitioning.time_partition_tables:
            tables = [f"{table_name}_{part}" for part in time_parts]
        else:
            tables = [table_name]
            if query_per_time_part is None:
                query_per_time_part = self.config.partitioning.query_per_time_part
            if query_per_time_part:
                temporal_where = [('"apdb_time_part" = ?', (time_part,)) for time_part in time_parts]
            else:
                time_part_list = ",".join([str(part) for part in time_parts])
                temporal_where = [(f'"apdb_time_part" IN ({time_part_list})', ())]

        return tables, temporal_where
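    # Example (illustrative): a time range spanning partitions 647-648 gives
    #
    #     time_partition_tables=True  -> (["DiaSource_647", "DiaSource_648"], [])
    #     time_partition_tables=False -> (["DiaSource"],
    #                                     [('"apdb_time_part" IN (647,648)', ())])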
1707 """Update timestamp columns in input DataFrame to be naive datetime
1710 Clients may or may not generate aware timestamps, code in this class
1711 assumes that timestamps are naive, so we convert them to UTC and
1715 columns = [column
for column, dtype
in df.dtypes.items()
if isinstance(dtype, pandas.DatetimeTZDtype)]
1716 for column
in columns:
1718 df[column] = df[column].dt.tz_convert(
None)
1722 """Calculate batch size based on config parameters."""
1725 if 0 < self.
config.batch_statement_limit < batch_size:
1726 batch_size = self.
config.batch_statement_limit
1727 if self.
config.batch_size_limit > 0:
1735 row_size = self.
_schema.table_row_size(table)
1736 row_size += 4 * len(self.
_schema.getColumnMap(table))
1737 batch_size = min(batch_size, (self.
config.batch_size_limit // row_size) + 1)