from __future__ import annotations

__all__ = ["ApdbCassandraConfig", "ApdbCassandra"]

import dataclasses
import json
import logging
from collections.abc import Iterable, Iterator, Mapping, Set
from typing import TYPE_CHECKING, Any, cast

import astropy.time
import numpy as np
import pandas

# If cassandra-driver is not installed this module can still be imported, but
# instantiating ApdbCassandra will raise CassandraMissingError.
try:
    import cassandra
    import cassandra.query
    from cassandra.auth import AuthProvider, PlainTextAuthProvider
    from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile, Session
    from cassandra.policies import AddressTranslator, RoundRobinPolicy, WhiteListRoundRobinPolicy

    CASSANDRA_IMPORTED = True
except ImportError:
    CASSANDRA_IMPORTED = False
import felis.datamodel
from lsst import sphgeom
from lsst.pex.config import ChoiceField, Field, ListField
from lsst.utils.db_auth import DbAuth, DbAuthNotFoundError
from lsst.utils.iteration import chunk_iterable

from ..apdb import Apdb, ApdbConfig
from ..apdbConfigFreezer import ApdbConfigFreezer
from ..apdbReplica import ReplicaChunk
from ..apdbSchema import ApdbTables
from ..monitor import MonAgent
from ..pixelization import Pixelization
from ..schema_model import Table
from ..timer import Timer
from ..versionTuple import IncompatibleVersionError, VersionTuple
from .apdbCassandraReplica import ApdbCassandraReplica
from .apdbCassandraSchema import ApdbCassandraSchema, ExtraTables
from .apdbMetadataCassandra import ApdbMetadataCassandra
from .cassandra_utils import (
    PreparedStatementCache,
    literal,
    pandas_dataframe_factory,
    quote_id,
    raw_data_factory,
    select_concurrent,
)

if TYPE_CHECKING:
    from ..apdbMetadata import ApdbMetadata
_MON = MonAgent(__name__)

_LOG = logging.getLogger(__name__)

VERSION = VersionTuple(0, 1, 0)
"""Version for the code controlling non-replication tables. This needs to be
updated following compatibility rules when schema produced by this code
changes.
"""

DB_AUTH_ENVVAR = "LSST_DB_AUTH"
"""Default name of the environmental variable that will be used to locate DB
credentials configuration file."""

DB_AUTH_PATH = "~/.lsst/db-auth.yaml"
"""Default path at which it is expected that DB credentials are found."""
class CassandraMissingError(Exception):
    """Exception raised when the cassandra-driver module cannot be imported."""

    def __init__(self) -> None:
        super().__init__("cassandra-driver module cannot be imported")
104 """Configuration class for Cassandra-based APDB implementation."""
106 contact_points = ListField[str](
107 doc=
"The list of contact points to try connecting for cluster discovery.", default=[
"127.0.0.1"]
109 private_ips = ListField[str](doc=
"List of internal IP addresses for contact_points.", default=[])
110 port = Field[int](doc=
"Port number to connect to.", default=9042)
111 keyspace = Field[str](doc=
"Default keyspace for operations.", default=
"apdb")
112 username = Field[str](
113 doc=f
"Cassandra user name, if empty then {DB_AUTH_PATH} has to provide it with password.",
116 read_consistency = Field[str](
117 doc=
"Name for consistency level of read operations, default: QUORUM, can be ONE.", default=
"QUORUM"
119 write_consistency = Field[str](
120 doc=
"Name for consistency level of write operations, default: QUORUM, can be ONE.", default=
"QUORUM"
122 read_timeout = Field[float](doc=
"Timeout in seconds for read operations.", default=120.0)
123 write_timeout = Field[float](doc=
"Timeout in seconds for write operations.", default=60.0)
124 remove_timeout = Field[float](doc=
"Timeout in seconds for remove operations.", default=600.0)
125 read_concurrency = Field[int](doc=
"Concurrency level for read operations.", default=500)
126 protocol_version = Field[int](
127 doc=
"Cassandra protocol version to use, default is V4",
128 default=cassandra.ProtocolVersion.V4
if CASSANDRA_IMPORTED
else 0,
130 dia_object_columns = ListField[str](
131 doc=
"List of columns to read from DiaObject[Last], by default read all columns", default=[]
133 prefix = Field[str](doc=
"Prefix to add to table names", default=
"")
134 part_pixelization = ChoiceField[str](
135 allowed=dict(htm=
"HTM pixelization", q3c=
"Q3C pixelization", mq3c=
"MQ3C pixelization"),
136 doc=
"Pixelization used for partitioning index.",
139 part_pix_level = Field[int](doc=
"Pixelization level used for partitioning index.", default=10)
140 part_pix_max_ranges = Field[int](doc=
"Max number of ranges in pixelization envelope", default=64)
141 ra_dec_columns = ListField[str](default=[
"ra",
"dec"], doc=
"Names of ra/dec columns in DiaObject table")
142 timer = Field[bool](doc=
"If True then print/log timing information", default=
False)
143 time_partition_tables = Field[bool](
144 doc=
"Use per-partition tables for sources instead of partitioning by time", default=
False
146 time_partition_days = Field[int](
148 "Time partitioning granularity in days, this value must not be changed after database is "
153 time_partition_start = Field[str](
155 "Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI. "
156 "This is used only when time_partition_tables is True."
158 default=
"2018-12-01T00:00:00",
160 time_partition_end = Field[str](
162 "Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI. "
163 "This is used only when time_partition_tables is True."
165 default=
"2030-01-01T00:00:00",
167 query_per_time_part = Field[bool](
170 "If True then build separate query for each time partition, otherwise build one single query. "
171 "This is only used when time_partition_tables is False in schema config."
174 query_per_spatial_part = Field[bool](
176 doc=
"If True then build one query per spatial partition, otherwise build single query.",
178 use_insert_id_skips_diaobjects = Field[bool](
181 "If True then do not store DiaObjects when use_insert_id is True "
182 "(DiaObjectsChunks has the same data)."
@dataclasses.dataclass
class _FrozenApdbCassandraConfig:
    """Part of the configuration that is saved in metadata table and read back.

    The attributes are a subset of attributes in `ApdbCassandraConfig` class.

    Parameters
    ----------
    config : `ApdbCassandraConfig`
        Configuration used to copy initial values of attributes.
    """

    use_insert_id: bool
    part_pixelization: str
    part_pix_level: int
    ra_dec_columns: list[str]
    time_partition_tables: bool
    time_partition_days: int
    use_insert_id_skips_diaobjects: bool

    def __init__(self, config: ApdbCassandraConfig):
        self.use_insert_id = config.use_insert_id
        self.part_pixelization = config.part_pixelization
        self.part_pix_level = config.part_pix_level
        self.ra_dec_columns = list(config.ra_dec_columns)
        self.time_partition_tables = config.time_partition_tables
        self.time_partition_days = config.time_partition_days
        self.use_insert_id_skips_diaobjects = config.use_insert_id_skips_diaobjects

    def to_json(self) -> str:
        """Convert this instance to JSON representation."""
        return json.dumps(dataclasses.asdict(self))

    def update(self, json_str: str) -> None:
        """Update attribute values from a JSON string.

        Parameters
        ----------
        json_str : `str`
            String containing JSON representation of configuration.
        """
        data = json.loads(json_str)
        if not isinstance(data, dict):
            raise TypeError(f"JSON string must be convertible to object: {json_str!r}")
        allowed_names = {field.name for field in dataclasses.fields(self)}
        for key, value in data.items():
            if key not in allowed_names:
                raise ValueError(f"JSON object contains unknown key: {key}")
            setattr(self, key, value)
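
# Example (illustrative): the JSON round-trip used when the configuration is
# frozen into the metadata table. Attribute values are arbitrary.
#
#     frozen = _FrozenApdbCassandraConfig(config)
#     json_str = frozen.to_json()
#     frozen.update('{"part_pix_level": 12}')  # updates a known attribute
#     frozen.update('{"no_such_key": 1}')      # raises ValueError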
if CASSANDRA_IMPORTED:

    class _AddressTranslator(AddressTranslator):
        """Translate internal IP address to external.

        Only used for docker-based setup, not viable long-term solution.
        """

        def __init__(self, public_ips: list[str], private_ips: list[str]):
            self._map = dict((k, v) for k, v in zip(private_ips, public_ips))

        def translate(self, private_ip: str) -> str:
            return self._map.get(private_ip, private_ip)
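
        # Example (illustrative, placeholder addresses): with
        # public_ips=["203.0.113.5"] and private_ips=["172.17.0.2"],
        # translate("172.17.0.2") returns "203.0.113.5"; any address without
        # a mapping is returned unchanged.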
254 """Implementation of APDB database on to of Apache Cassandra.
256 The implementation is configured via standard ``pex_config`` mechanism
257 using `ApdbCassandraConfig` configuration class. For an example of
258 different configurations check config/ folder.
262 config : `ApdbCassandraConfig`
263 Configuration object.
266 metadataSchemaVersionKey =
"version:schema"
267 """Name of the metadata key to store schema version number."""
269 metadataCodeVersionKey =
"version:ApdbCassandra"
270 """Name of the metadata key to store code version number."""
272 metadataReplicaVersionKey =
"version:ApdbCassandraReplica"
273 """Name of the metadata key to store replica code version number."""
275 metadataConfigKey =
"config:apdb-cassandra.json"
276 """Name of the metadata key to store code version number."""
278 _frozen_parameters = (
283 "time_partition_tables",
284 "time_partition_days",
285 "use_insert_id_skips_diaobjects",
287 """Names of the config parameters to be frozen in metadata table."""
289 partition_zero_epoch = astropy.time.Time(0, format=
"unix_tai")
290 """Start time for partition 0, this should never be changed."""
    def __init__(self, config: ApdbCassandraConfig):
        if not CASSANDRA_IMPORTED:
            raise CassandraMissingError()

        self._keyspace = config.keyspace

        self._cluster, self._session = self._make_session(config)

        meta_table_name = ApdbTables.metadata.table_name(config.prefix)
        self._metadata = ApdbMetadataCassandra(
            self._session, meta_table_name, config.keyspace, "read_tuples", "write"
        )

        # Read frozen config from metadata.
        config_json = self._metadata.get(self.metadataConfigKey)
        if config_json is not None:
            # Update config from metadata.
            freezer = ApdbConfigFreezer[ApdbCassandraConfig](self._frozen_parameters)
            self.config = freezer.update(config, config_json)
        else:
            self.config = config
        self.config.validate()

        self._pixelization = Pixelization(
            self.config.part_pixelization,
            self.config.part_pix_level,
            config.part_pix_max_ranges,
        )

        self._schema = ApdbCassandraSchema(
            session=self._session,
            keyspace=self._keyspace,
            schema_file=self.config.schema_file,
            schema_name=self.config.schema_name,
            prefix=self.config.prefix,
            time_partition_tables=self.config.time_partition_tables,
            enable_replica=self.config.use_insert_id,
        )
        self._partition_zero_epoch_mjd = float(self.partition_zero_epoch.mjd)

        if self._metadata.table_exists():
            self._versionCheck(self._metadata)

        # Cache for prepared statements
        self._preparer = PreparedStatementCache(self._session)

        _LOG.debug("ApdbCassandra Configuration:")
        for key, value in self.config.items():
            _LOG.debug("    %s: %s", key, value)

        self._timer_args: list[MonAgent | logging.Logger] = [_MON]
        if self.config.timer:
            self._timer_args.append(_LOG)

    def __del__(self) -> None:
        if hasattr(self, "_cluster"):
            self._cluster.shutdown()
    def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
        """Create `Timer` instance given its name."""
        return Timer(name, *self._timer_args, tags=tags)
    @classmethod
    def _make_session(cls, config: ApdbCassandraConfig) -> tuple[Cluster, Session]:
        """Make Cassandra session."""
        addressTranslator: AddressTranslator | None = None
        if config.private_ips:
            addressTranslator = _AddressTranslator(list(config.contact_points), list(config.private_ips))

        cluster = Cluster(
            execution_profiles=cls._makeProfiles(config),
            contact_points=config.contact_points,
            port=config.port,
            address_translator=addressTranslator,
            protocol_version=config.protocol_version,
            auth_provider=cls._make_auth_provider(config),
        )
        session = cluster.connect()
        # Disable result paging.
        session.default_fetch_size = None

        return cluster, session
377 """Make Cassandra authentication provider instance."""
379 dbauth = DbAuth(DB_AUTH_PATH, DB_AUTH_ENVVAR)
380 except DbAuthNotFoundError:
384 empty_username =
True
386 for hostname
in config.contact_points:
388 username, password = dbauth.getAuth(
389 "cassandra", config.username, hostname, config.port, config.keyspace
394 empty_username =
True
396 return PlainTextAuthProvider(username=username, password=password)
397 except DbAuthNotFoundError:
402 f
"Credentials file ({DB_AUTH_PATH} or ${DB_AUTH_ENVVAR}) provided password but not "
403 f
"user name, anonymous Cassandra logon will be attempted."
409 """Check schema version compatibility."""
411 def _get_version(key: str, default: VersionTuple) -> VersionTuple:
412 """Retrieve version number from given metadata key."""
413 if metadata.table_exists():
414 version_str = metadata.get(key)
415 if version_str
is None:
417 raise RuntimeError(f
"Version key {key!r} does not exist in metadata table.")
418 return VersionTuple.fromString(version_str)
429 if not self.
_schema.schemaVersion().checkCompatibility(db_schema_version):
431 f
"Configured schema version {self._schema.schemaVersion()} "
432 f
"is not compatible with database version {db_schema_version}"
436 f
"Current code version {self.apdbImplementationVersion()} "
437 f
"is not compatible with database version {db_code_version}"
441 if self.
_schema.has_replica_chunks:
443 code_replica_version = ApdbCassandraReplica.apdbReplicaImplementationVersion()
444 if not code_replica_version.checkCompatibility(db_replica_version):
446 f
"Current replication code version {code_replica_version} "
447 f
"is not compatible with database version {db_replica_version}"
452 """Return version number for current APDB implementation.
456 version : `VersionTuple`
457 Version of the code defined in implementation class.
461 def tableDef(self, table: ApdbTables) -> Table |
None:
463 return self.
_schema.tableSchemas.get(table)
    @classmethod
    def init_database(
        cls,
        hosts: list[str],
        keyspace: str,
        *,
        schema_file: str | None = None,
        schema_name: str | None = None,
        read_sources_months: int | None = None,
        read_forced_sources_months: int | None = None,
        use_insert_id: bool = False,
        use_insert_id_skips_diaobjects: bool = False,
        port: int | None = None,
        username: str | None = None,
        prefix: str | None = None,
        part_pixelization: str | None = None,
        part_pix_level: int | None = None,
        time_partition_tables: bool = True,
        time_partition_start: str | None = None,
        time_partition_end: str | None = None,
        read_consistency: str | None = None,
        write_consistency: str | None = None,
        read_timeout: int | None = None,
        write_timeout: int | None = None,
        ra_dec_columns: list[str] | None = None,
        replication_factor: int | None = None,
        drop: bool = False,
    ) -> ApdbCassandraConfig:
493 """Initialize new APDB instance and make configuration object for it.
497 hosts : `list` [`str`]
498 List of host names or IP addresses for Cassandra cluster.
500 Name of the keyspace for APDB tables.
501 schema_file : `str`, optional
502 Location of (YAML) configuration file with APDB schema. If not
503 specified then default location will be used.
504 schema_name : `str`, optional
505 Name of the schema in YAML configuration file. If not specified
506 then default name will be used.
507 read_sources_months : `int`, optional
508 Number of months of history to read from DiaSource.
509 read_forced_sources_months : `int`, optional
510 Number of months of history to read from DiaForcedSource.
511 use_insert_id : `bool`, optional
512 If True, make additional tables used for replication to PPDB.
513 use_insert_id_skips_diaobjects : `bool`, optional
514 If `True` then do not fill regular ``DiaObject`` table when
515 ``use_insert_id`` is `True`.
516 port : `int`, optional
517 Port number to use for Cassandra connections.
518 username : `str`, optional
519 User name for Cassandra connections.
520 prefix : `str`, optional
521 Optional prefix for all table names.
522 part_pixelization : `str`, optional
523 Name of the MOC pixelization used for partitioning.
524 part_pix_level : `int`, optional
526 time_partition_tables : `bool`, optional
527 Create per-partition tables.
528 time_partition_start : `str`, optional
529 Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss
531 time_partition_end : `str`, optional
532 Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss
534 read_consistency : `str`, optional
535 Name of the consistency level for read operations.
536 write_consistency : `str`, optional
537 Name of the consistency level for write operations.
538 read_timeout : `int`, optional
539 Read timeout in seconds.
540 write_timeout : `int`, optional
541 Write timeout in seconds.
542 ra_dec_columns : `list` [`str`], optional
543 Names of ra/dec columns in DiaObject table.
544 replication_factor : `int`, optional
545 Replication factor used when creating new keyspace, if keyspace
546 already exists its replication factor is not changed.
547 drop : `bool`, optional
548 If `True` then drop existing tables before re-creating the schema.
552 config : `ApdbCassandraConfig`
553 Resulting configuration object for a created APDB instance.
        config = ApdbCassandraConfig(
            contact_points=hosts,
            keyspace=keyspace,
            use_insert_id=use_insert_id,
            use_insert_id_skips_diaobjects=use_insert_id_skips_diaobjects,
            time_partition_tables=time_partition_tables,
        )
        if schema_file is not None:
            config.schema_file = schema_file
        if schema_name is not None:
            config.schema_name = schema_name
        if read_sources_months is not None:
            config.read_sources_months = read_sources_months
        if read_forced_sources_months is not None:
            config.read_forced_sources_months = read_forced_sources_months
        if port is not None:
            config.port = port
        if username is not None:
            config.username = username
        if prefix is not None:
            config.prefix = prefix
        if part_pixelization is not None:
            config.part_pixelization = part_pixelization
        if part_pix_level is not None:
            config.part_pix_level = part_pix_level
        if time_partition_start is not None:
            config.time_partition_start = time_partition_start
        if time_partition_end is not None:
            config.time_partition_end = time_partition_end
        if read_consistency is not None:
            config.read_consistency = read_consistency
        if write_consistency is not None:
            config.write_consistency = write_consistency
        if read_timeout is not None:
            config.read_timeout = read_timeout
        if write_timeout is not None:
            config.write_timeout = write_timeout
        if ra_dec_columns is not None:
            config.ra_dec_columns = ra_dec_columns

        cls._makeSchema(config, drop=drop, replication_factor=replication_factor)

        return config
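
    # Example (illustrative): initializing a development database. The host
    # name, keyspace, and replication factor are placeholders.
    #
    #     config = ApdbCassandra.init_database(
    #         hosts=["cassandra.example.org"],
    #         keyspace="apdb_dev",
    #         use_insert_id=True,
    #         replication_factor=3,
    #     )
    #     apdb = ApdbCassandra(config)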
600 """Return `ApdbReplica` instance for this database."""
    @classmethod
    def _makeSchema(
        cls, config: ApdbConfig, *, drop: bool = False, replication_factor: int | None = None
    ) -> None:
        # docstring is inherited from a base class
        if not isinstance(config, ApdbCassandraConfig):
            raise TypeError(f"Unexpected type of configuration object: {type(config)}")

        cluster, session = cls._make_session(config)

        schema = ApdbCassandraSchema(
            session=session,
            keyspace=config.keyspace,
            schema_file=config.schema_file,
            schema_name=config.schema_name,
            prefix=config.prefix,
            time_partition_tables=config.time_partition_tables,
            enable_replica=config.use_insert_id,
        )

        # Create APDB tables, possibly using per-partition tables for sources.
        if config.time_partition_tables:
            time_partition_start = astropy.time.Time(config.time_partition_start, format="isot", scale="tai")
            time_partition_end = astropy.time.Time(config.time_partition_end, format="isot", scale="tai")
            part_epoch = float(cls.partition_zero_epoch.mjd)
            part_days = config.time_partition_days
            part_range = (
                cls._time_partition_cls(time_partition_start, part_epoch, part_days),
                cls._time_partition_cls(time_partition_end, part_epoch, part_days) + 1,
            )
            schema.makeSchema(drop=drop, part_range=part_range, replication_factor=replication_factor)
        else:
            schema.makeSchema(drop=drop, replication_factor=replication_factor)

        meta_table_name = ApdbTables.metadata.table_name(config.prefix)
        metadata = ApdbMetadataCassandra(session, meta_table_name, config.keyspace, "read_tuples", "write")

        # Fill version numbers, overwrite if they existed before.
        if metadata.table_exists():
            metadata.set(cls.metadataSchemaVersionKey, str(schema.schemaVersion()), force=True)
            metadata.set(cls.metadataCodeVersionKey, str(cls.apdbImplementationVersion()), force=True)

            if config.use_insert_id:
                # Only store replica code version if replica is enabled.
                metadata.set(
                    cls.metadataReplicaVersionKey,
                    str(ApdbCassandraReplica.apdbReplicaImplementationVersion()),
                    force=True,
                )

            # Store frozen part of the configuration in metadata.
            freezer = ApdbConfigFreezer[ApdbCassandraConfig](cls._frozen_parameters)
            metadata.set(cls.metadataConfigKey, freezer.to_json(config), force=True)

        cluster.shutdown()
    def getDiaObjects(self, region: sphgeom.Region) -> pandas.DataFrame:
        # docstring is inherited from a base class
        sp_where = self._spatial_where(region)
        _LOG.debug("getDiaObjects: #partitions: %s", len(sp_where))

        # We need to exclude extra partitioning columns from result.
        column_names = self._schema.apdbColumnNames(ApdbTables.DiaObjectLast)
        what = ",".join(quote_id(column) for column in column_names)

        table_name = self._schema.tableName(ApdbTables.DiaObjectLast)
        query = f'SELECT {what} from "{self._keyspace}"."{table_name}"'
        statements: list[tuple] = []
        for where, params in sp_where:
            full_query = f"{query} WHERE {where}"
            if params:
                statement = self._preparer.prepare(full_query)
            else:
                # If there are no params then it is likely that the query
                # contains multiple values for IN operator, which is not
                # supported by prepared statements.
                statement = cassandra.query.SimpleStatement(full_query)
            statements.append((statement, params))
        _LOG.debug("getDiaObjects: #queries: %s", len(statements))

        with _MON.context_tags({"table": "DiaObject"}):
            _MON.add_record(
                "select_query_stats", values={"num_sp_part": len(sp_where), "num_queries": len(statements)}
            )
            with self._timer("select_time"):
                objects = cast(
                    pandas.DataFrame,
                    select_concurrent(
                        self._session, statements, "read_pandas_multi", self.config.read_concurrency
                    ),
                )

        _LOG.debug("found %s DiaObjects", objects.shape[0])
        return objects
    def getDiaSources(
        self, region: sphgeom.Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
    ) -> pandas.DataFrame | None:
        # docstring is inherited from a base class
        months = self.config.read_sources_months
        if months == 0:
            return None
        mjd_end = visit_time.mjd
        mjd_start = mjd_end - months * 30

        return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaSource)

    def getDiaForcedSources(
        self, region: sphgeom.Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
    ) -> pandas.DataFrame | None:
        # docstring is inherited from a base class
        months = self.config.read_forced_sources_months
        if months == 0:
            return None
        mjd_end = visit_time.mjd
        mjd_start = mjd_end - months * 30

        return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaForcedSource)
    def containsVisitDetector(self, visit: int, detector: int) -> bool:
        # docstring is inherited from a base class
        existing_tables = self._schema.existing_tables(ApdbTables.DiaSource, ApdbTables.DiaForcedSource)
        tables_to_check = existing_tables[ApdbTables.DiaSource][:]
        if self.config.use_insert_id:
            tables_to_check.append(self._schema.tableName(ExtraTables.DiaSourceChunks))
        tables_to_check.extend(existing_tables[ApdbTables.DiaForcedSource])
        if self.config.use_insert_id:
            tables_to_check.append(self._schema.tableName(ExtraTables.DiaForcedSourceChunks))

        for table_name in tables_to_check:
            # Try to find a single record with given visit/detector. This is
            # a full scan of the table (ALLOW FILTERING), but it stops at the
            # first matching record.
            statement = self._preparer.prepare(
                f'SELECT * from "{self._keyspace}"."{table_name}" '
                "WHERE visit = ? AND detector = ? "
                "PER PARTITION LIMIT 1 LIMIT 1 ALLOW FILTERING"
            )
            with self._timer("contains_visit_detector_time", tags={"table": table_name}):
                result = self._session.execute(statement, (visit, detector), execution_profile="read_tuples")
                if result.one() is not None:
                    return True
        return False
    def getSSObjects(self) -> pandas.DataFrame:
        # docstring is inherited from a base class
        tableName = self._schema.tableName(ApdbTables.SSObject)
        query = f'SELECT * from "{self._keyspace}"."{tableName}"'

        with self._timer("select_time", tags={"table": "SSObject"}):
            result = self._session.execute(query, execution_profile="read_pandas")
            objects = result._current_rows

        _LOG.debug("found %s SSObjects", objects.shape[0])
        return objects
    def store(
        self,
        visit_time: astropy.time.Time,
        objects: pandas.DataFrame,
        sources: pandas.DataFrame | None = None,
        forced_sources: pandas.DataFrame | None = None,
    ) -> None:
        # docstring is inherited from a base class

        replica_chunk: ReplicaChunk | None = None
        if self._schema.has_replica_chunks:
            replica_chunk = ReplicaChunk.make_replica_chunk(visit_time, self.config.replica_chunk_seconds)
            self._storeReplicaChunk(replica_chunk, visit_time)

        # Fill region partition column for DiaObjects.
        objects = self._add_obj_part(objects)
        self._storeDiaObjects(objects, visit_time, replica_chunk)

        if sources is not None:
            # Copy apdb_part column from DiaObjects to DiaSources.
            sources = self._add_src_part(sources, objects)
            self._storeDiaSources(ApdbTables.DiaSource, sources, visit_time, replica_chunk)
            self._storeDiaSourcesPartitions(sources, visit_time, replica_chunk)

        if forced_sources is not None:
            forced_sources = self._add_fsrc_part(forced_sources, objects)
            self._storeDiaSources(ApdbTables.DiaForcedSource, forced_sources, visit_time, replica_chunk)
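
    # Example (illustrative): storing one visit worth of catalogs, where
    # ``objects``, ``sources``, and ``forced_sources`` are pandas DataFrames
    # produced by the association pipeline.
    #
    #     visit_time = astropy.time.Time("2021-01-01T00:00:00", format="isot", scale="tai")
    #     apdb.store(visit_time, objects, sources, forced_sources)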
    def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
        # docstring is inherited from a base class

        # To update a record we need to know its exact primary key (including
        # partition key) so we start by querying for diaSourceId to find the
        # primary keys.
        table_name = self._schema.tableName(ExtraTables.DiaSourceToPartition)
        # Split IDs into chunks of 1k per query.
        selects: list[tuple] = []
        for ids in chunk_iterable(idMap.keys(), 1_000):
            ids_str = ",".join(str(item) for item in ids)
            selects.append(
                (
                    (
                        'SELECT "diaSourceId", "apdb_part", "apdb_time_part", "apdb_replica_chunk" '
                        f'FROM "{self._keyspace}"."{table_name}" WHERE "diaSourceId" IN ({ids_str})'
                    ),
                    {},
                )
            )

        # No need for DataFrame here, read data as tuples.
        result = cast(
            list[tuple[int, int, int, int | None]],
            select_concurrent(self._session, selects, "read_tuples", self.config.read_concurrency),
        )

        # Make mapping from source ID to its partition.
        id2partitions: dict[int, tuple[int, int]] = {}
        id2chunk_id: dict[int, int] = {}
        for row in result:
            id2partitions[row[0]] = row[1:3]
            if row[3] is not None:
                id2chunk_id[row[0]] = row[3]

        # Make sure we know partitions for each ID.
        if set(id2partitions) != set(idMap):
            missing = ",".join(str(item) for item in set(idMap) - set(id2partitions))
            raise ValueError(f"Following DiaSource IDs do not exist in the database: {missing}")
        # Reassign in standard tables.
        queries = cassandra.query.BatchStatement()
        table_name = self._schema.tableName(ApdbTables.DiaSource)
        for diaSourceId, ssObjectId in idMap.items():
            apdb_part, apdb_time_part = id2partitions[diaSourceId]
            values: tuple
            if self.config.time_partition_tables:
                query = (
                    f'UPDATE "{self._keyspace}"."{table_name}_{apdb_time_part}"'
                    ' SET "ssObjectId" = ?, "diaObjectId" = NULL'
                    ' WHERE "apdb_part" = ? AND "diaSourceId" = ?'
                )
                values = (ssObjectId, apdb_part, diaSourceId)
            else:
                query = (
                    f'UPDATE "{self._keyspace}"."{table_name}"'
                    ' SET "ssObjectId" = ?, "diaObjectId" = NULL'
                    ' WHERE "apdb_part" = ? AND "apdb_time_part" = ? AND "diaSourceId" = ?'
                )
                values = (ssObjectId, apdb_part, apdb_time_part, diaSourceId)
            queries.add(self._preparer.prepare(query), values)

        # Reassign in replica tables, only if replication is enabled.
        if id2chunk_id:
            # Filter out chunks that have been removed already.
            known_ids = set()
            if replica_chunks := self.get_replica().getReplicaChunks():
                known_ids = set(replica_chunk.id for replica_chunk in replica_chunks)
            id2chunk_id = {key: value for key, value in id2chunk_id.items() if value in known_ids}
            if id2chunk_id:
                table_name = self._schema.tableName(ExtraTables.DiaSourceChunks)
                for diaSourceId, ssObjectId in idMap.items():
                    if replica_chunk := id2chunk_id.get(diaSourceId):
                        query = (
                            f'UPDATE "{self._keyspace}"."{table_name}" '
                            'SET "ssObjectId" = ?, "diaObjectId" = NULL '
                            'WHERE "apdb_replica_chunk" = ? AND "diaSourceId" = ?'
                        )
                        values = (ssObjectId, replica_chunk, diaSourceId)
                        queries.add(self._preparer.prepare(query), values)

        _LOG.debug("%s: will update %d records", table_name, len(idMap))
        with self._timer("update_time", tags={"table": table_name}):
            self._session.execute(queries, execution_profile="write")
    def countUnassociatedObjects(self) -> int:
        # docstring is inherited from a base class
        raise NotImplementedError()

    @property
    def metadata(self) -> ApdbMetadata:
        # docstring is inherited from a base class
        if self._metadata is None:
            raise RuntimeError("Database schema was not initialized.")
        return self._metadata
    @classmethod
    def _makeProfiles(cls, config: ApdbCassandraConfig) -> Mapping[Any, ExecutionProfile]:
        """Make all execution profiles used in the code."""
        if config.private_ips:
            loadBalancePolicy = WhiteListRoundRobinPolicy(hosts=config.contact_points)
        else:
            loadBalancePolicy = RoundRobinPolicy()

        read_tuples_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
            request_timeout=config.read_timeout,
            row_factory=cassandra.query.tuple_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        read_pandas_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
            request_timeout=config.read_timeout,
            row_factory=pandas_dataframe_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        read_raw_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
            request_timeout=config.read_timeout,
            row_factory=raw_data_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        # Profile to use with select_concurrent to return pandas data frame.
        read_pandas_multi_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
            request_timeout=config.read_timeout,
            row_factory=pandas_dataframe_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        # Profile to use with select_concurrent to return raw data (columns
        # and rows).
        read_raw_multi_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
            request_timeout=config.read_timeout,
            row_factory=raw_data_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        write_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.write_consistency),
            request_timeout=config.write_timeout,
            load_balancing_policy=loadBalancePolicy,
        )
        default_profile = ExecutionProfile(
            load_balancing_policy=loadBalancePolicy,
        )
        return {
            "read_tuples": read_tuples_profile,
            "read_pandas": read_pandas_profile,
            "read_raw": read_raw_profile,
            "read_pandas_multi": read_pandas_multi_profile,
            "read_raw_multi": read_raw_multi_profile,
            "write": write_profile,
            EXEC_PROFILE_DEFAULT: default_profile,
        }
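
    # Example (illustrative): the profiles returned above are selected per
    # query by name, e.g.
    #
    #     session.execute(query, execution_profile="read_pandas")
    #
    # runs with the configured read consistency and the pandas row factory,
    # while queries without an explicit profile use the default one.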
    def _getSources(
        self,
        region: sphgeom.Region,
        object_ids: Iterable[int] | None,
        mjd_start: float,
        mjd_end: float,
        table_name: ApdbTables,
    ) -> pandas.DataFrame:
        """Return catalog of DiaSource instances given set of DiaObject IDs.

        Parameters
        ----------
        region : `lsst.sphgeom.Region`
            Spherical region.
        object_ids : iterable of `int`, or `None`
            Collection of DiaObject IDs
        mjd_start : `float`
            Lower bound of time interval.
        mjd_end : `float`
            Upper bound of time interval.
        table_name : `ApdbTables`
            Name of the table to query.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Catalog containing DiaSource records. Empty catalog is returned if
            ``object_ids`` is empty.
        """
        object_id_set: Set[int] = set()
        if object_ids is not None:
            object_id_set = set(object_ids)
            if len(object_id_set) == 0:
                return self._make_empty_catalog(table_name)

        sp_where = self._spatial_where(region)
        tables, temporal_where = self._temporal_where(table_name, mjd_start, mjd_end)

        # We need to exclude extra partitioning columns from result.
        column_names = self._schema.apdbColumnNames(table_name)
        what = ",".join(quote_id(column) for column in column_names)

        # Build all queries.
        statements: list[tuple] = []
        for table in tables:
            prefix = f'SELECT {what} from "{self._keyspace}"."{table}"'
            statements += list(self._combine_where(prefix, sp_where, temporal_where))
        _LOG.debug("_getSources %s: #queries: %s", table_name, len(statements))

        with _MON.context_tags({"table": table_name.name}):
            _MON.add_record(
                "select_query_stats", values={"num_sp_part": len(sp_where), "num_queries": len(statements)}
            )
            with self._timer("select_time"):
                catalog = cast(
                    pandas.DataFrame,
                    select_concurrent(
                        self._session, statements, "read_pandas_multi", self.config.read_concurrency
                    ),
                )

        # Filter by given object IDs.
        if len(object_id_set) > 0:
            catalog = cast(pandas.DataFrame, catalog[catalog["diaObjectId"].isin(object_id_set)])

        # Precise filtering on midpointMjdTai.
        catalog = cast(pandas.DataFrame, catalog[catalog["midpointMjdTai"] > mjd_start])

        _LOG.debug("found %d %ss", catalog.shape[0], table_name.name)
        return catalog
    def _storeReplicaChunk(self, replica_chunk: ReplicaChunk, visit_time: astropy.time.Time) -> None:
        # Cassandra timestamp uses milliseconds since epoch.
        timestamp = int(replica_chunk.last_update_time.unix_tai * 1000)

        # Everything goes into a single partition.
        partition = 0

        table_name = self._schema.tableName(ExtraTables.ApdbReplicaChunks)
        query = (
            f'INSERT INTO "{self._keyspace}"."{table_name}" '
            "(partition, apdb_replica_chunk, last_update_time, unique_id) "
            "VALUES (?, ?, ?, ?)"
        )

        self._session.execute(
            self._preparer.prepare(query),
            (partition, replica_chunk.id, timestamp, replica_chunk.unique_id),
            timeout=self.config.write_timeout,
            execution_profile="write",
        )
    def _storeDiaObjects(
        self, objs: pandas.DataFrame, visit_time: astropy.time.Time, replica_chunk: ReplicaChunk | None
    ) -> None:
        """Store catalog of DiaObjects from current visit.

        Parameters
        ----------
        objs : `pandas.DataFrame`
            Catalog with DiaObject records
        visit_time : `astropy.time.Time`
            Time of the current visit.
        replica_chunk : `ReplicaChunk` or `None`
            Replica chunk identifier if replication is configured.
        """
        if len(objs) == 0:
            _LOG.debug("No objects to write to database.")
            return

        visit_time_dt = visit_time.datetime
        extra_columns = dict(lastNonForcedSource=visit_time_dt)
        self._storeObjectsPandas(objs, ApdbTables.DiaObjectLast, extra_columns=extra_columns)

        extra_columns["validityStart"] = visit_time_dt
        time_part: int | None = self._time_partition(visit_time)
        if not self.config.time_partition_tables:
            extra_columns["apdb_time_part"] = time_part
            time_part = None

        # Only store DiaObjects if not doing replication or explicitly
        # configured to always store them.
        if replica_chunk is None or not self.config.use_insert_id_skips_diaobjects:
            self._storeObjectsPandas(
                objs, ApdbTables.DiaObject, extra_columns=extra_columns, time_part=time_part
            )

        if replica_chunk is not None:
            extra_columns = dict(apdb_replica_chunk=replica_chunk.id, validityStart=visit_time_dt)
            self._storeObjectsPandas(objs, ExtraTables.DiaObjectChunks, extra_columns=extra_columns)
    def _storeDiaSources(
        self,
        table_name: ApdbTables,
        sources: pandas.DataFrame,
        visit_time: astropy.time.Time,
        replica_chunk: ReplicaChunk | None,
    ) -> None:
        """Store catalog of DIASources or DIAForcedSources from current visit.

        Parameters
        ----------
        table_name : `ApdbTables`
            Table where to store the data.
        sources : `pandas.DataFrame`
            Catalog containing DiaSource records
        visit_time : `astropy.time.Time`
            Time of the current visit.
        replica_chunk : `ReplicaChunk` or `None`
            Replica chunk identifier if replication is configured.
        """
        time_part: int | None = self._time_partition(visit_time)
        extra_columns: dict[str, Any] = {}
        if not self.config.time_partition_tables:
            extra_columns["apdb_time_part"] = time_part
            time_part = None

        self._storeObjectsPandas(sources, table_name, extra_columns=extra_columns, time_part=time_part)

        if replica_chunk is not None:
            extra_columns = dict(apdb_replica_chunk=replica_chunk.id)
            if table_name is ApdbTables.DiaSource:
                extra_table = ExtraTables.DiaSourceChunks
            else:
                extra_table = ExtraTables.DiaForcedSourceChunks
            self._storeObjectsPandas(sources, extra_table, extra_columns=extra_columns)
    def _storeDiaSourcesPartitions(
        self, sources: pandas.DataFrame, visit_time: astropy.time.Time, replica_chunk: ReplicaChunk | None
    ) -> None:
        """Store mapping of diaSourceId to its partitioning values.

        Parameters
        ----------
        sources : `pandas.DataFrame`
            Catalog containing DiaSource records
        visit_time : `astropy.time.Time`
            Time of the current visit.
        """
        id_map = cast(pandas.DataFrame, sources[["diaSourceId", "apdb_part"]])
        extra_columns = {
            "apdb_time_part": self._time_partition(visit_time),
            "apdb_replica_chunk": replica_chunk.id if replica_chunk is not None else None,
        }

        self._storeObjectsPandas(
            id_map, ExtraTables.DiaSourceToPartition, extra_columns=extra_columns, time_part=None
        )
    def _storeObjectsPandas(
        self,
        records: pandas.DataFrame,
        table_name: ApdbTables | ExtraTables,
        extra_columns: Mapping | None = None,
        time_part: int | None = None,
    ) -> None:
        """Store generic objects.

        Takes Pandas catalog and stores a bunch of records in a table.

        Parameters
        ----------
        records : `pandas.DataFrame`
            Catalog containing object records
        table_name : `ApdbTables`
            Name of the table as defined in APDB schema.
        extra_columns : `dict`, optional
            Mapping (column_name, column_value) which gives fixed values for
            columns in each row, overrides values in ``records`` if matching
            columns exist there.
        time_part : `int`, optional
            If not `None` then insert into a per-partition table.

        Notes
        -----
        If Pandas catalog contains additional columns not defined in table
        schema they are ignored. Catalog does not have to contain all columns
        defined in a table, but partition and clustering keys must be present
        in a catalog or ``extra_columns``.
        """
        # Use extra columns if specified.
        if extra_columns is None:
            extra_columns = {}
        extra_fields = list(extra_columns.keys())

        # Fields that will come from the dataframe.
        df_fields = [column for column in records.columns if column not in extra_fields]

        column_map = self._schema.getColumnMap(table_name)
        # List of columns (as in felis schema).
        fields = [column_map[field].name for field in df_fields if field in column_map]
        fields += extra_fields

        # Check that all partitioning and clustering columns are defined.
        required_columns = self._schema.partitionColumns(table_name) + self._schema.clusteringColumns(
            table_name
        )
        missing_columns = [column for column in required_columns if column not in fields]
        if missing_columns:
            raise ValueError(f"Primary key columns are missing from catalog: {missing_columns}")

        qfields = [quote_id(field) for field in fields]
        qfields_str = ",".join(qfields)

        with self._timer("insert_build_time", tags={"table": table_name.name}):
            table = self._schema.tableName(table_name)
            if time_part is not None:
                table = f"{table}_{time_part}"

            holders = ",".join(["?"] * len(qfields))
            query = f'INSERT INTO "{self._keyspace}"."{table}" ({qfields_str}) VALUES ({holders})'
            statement = self._preparer.prepare(query)
            queries = cassandra.query.BatchStatement()
            for rec in records.itertuples(index=False):
                values = []
                for field in df_fields:
                    if field not in column_map:
                        continue
                    value = getattr(rec, field)
                    if column_map[field].datatype is felis.datamodel.DataType.timestamp:
                        if isinstance(value, pandas.Timestamp):
                            value = literal(value.to_pydatetime())
                        else:
                            # Assume it is seconds since epoch, Cassandra
                            # uses milliseconds.
                            value = int(value * 1000)
                    values.append(literal(value))
                for field in extra_fields:
                    value = extra_columns[field]
                    values.append(literal(value))
                queries.add(statement, values)

        _LOG.debug("%s: will store %d records", self._schema.tableName(table_name), records.shape[0])
        with self._timer("insert_time", tags={"table": table_name.name}):
            self._session.execute(queries, timeout=self.config.write_timeout, execution_profile="write")
1246 """Calculate spatial partition for each record and add it to a
1251 This overrides any existing column in a DataFrame with the same name
1252 (apdb_part). Original DataFrame is not changed, copy of a DataFrame is
1256 apdb_part = np.zeros(df.shape[0], dtype=np.int64)
1257 ra_col, dec_col = self.
config.ra_dec_columns
1258 for i, (ra, dec)
in enumerate(zip(df[ra_col], df[dec_col])):
1263 df[
"apdb_part"] = apdb_part
    def _add_src_part(self, sources: pandas.DataFrame, objs: pandas.DataFrame) -> pandas.DataFrame:
        """Add apdb_part column to DiaSource catalog.

        Notes
        -----
        This method copies apdb_part value from a matching DiaObject record.
        DiaObject catalog needs to have a apdb_part column filled by
        ``_add_obj_part`` method and DiaSource records need to be
        associated to DiaObjects via ``diaObjectId`` column.

        This overrides any existing column in a DataFrame with the same name
        (apdb_part). Original DataFrame is not changed, copy of a DataFrame is
        returned.
        """
        pixel_id_map: dict[int, int] = {
            diaObjectId: apdb_part
            for diaObjectId, apdb_part in zip(objs["diaObjectId"], objs["apdb_part"])
        }
        apdb_part = np.zeros(sources.shape[0], dtype=np.int64)
        ra_col, dec_col = self.config.ra_dec_columns
        for i, (diaObjId, ra, dec) in enumerate(
            zip(sources["diaObjectId"], sources[ra_col], sources[dec_col])
        ):
            if diaObjId == 0:
                # DiaSources associated with SolarSystemObjects do not have an
                # associated DiaObject, so set the partition based on their
                # own ra/dec.
                uv3d = sphgeom.UnitVector3d(sphgeom.LonLat.fromDegrees(ra, dec))
                apdb_part[i] = self._pixelization.pixel(uv3d)
            else:
                apdb_part[i] = pixel_id_map[diaObjId]
        sources = sources.copy()
        sources["apdb_part"] = apdb_part
        return sources
    def _add_fsrc_part(self, sources: pandas.DataFrame, objs: pandas.DataFrame) -> pandas.DataFrame:
        """Add apdb_part column to DiaForcedSource catalog.

        Notes
        -----
        This method copies apdb_part value from a matching DiaObject record.
        DiaObject catalog needs to have a apdb_part column filled by
        ``_add_obj_part`` method and DiaForcedSource records need to be
        associated to DiaObjects via ``diaObjectId`` column.

        This overrides any existing column in a DataFrame with the same name
        (apdb_part). Original DataFrame is not changed, copy of a DataFrame is
        returned.
        """
        pixel_id_map: dict[int, int] = {
            diaObjectId: apdb_part
            for diaObjectId, apdb_part in zip(objs["diaObjectId"], objs["apdb_part"])
        }
        apdb_part = np.zeros(sources.shape[0], dtype=np.int64)
        for i, diaObjId in enumerate(sources["diaObjectId"]):
            apdb_part[i] = pixel_id_map[diaObjId]
        sources = sources.copy()
        sources["apdb_part"] = apdb_part
        return sources
1327 """Calculate time partition number for a given time.
1331 time : `float` or `astropy.time.Time`
1332 Time for which to calculate partition number. Can be float to mean
1333 MJD or `astropy.time.Time`
1335 Epoch time for partition 0.
1337 Number of days per partition.
1342 Partition number for a given time.
1344 if isinstance(time, astropy.time.Time):
1345 mjd = float(time.mjd)
1348 days_since_epoch = mjd - epoch_mjd
1349 partition = int(days_since_epoch) // part_days
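
    # Worked example (illustrative, assuming the default 30-day granularity):
    # the partition zero epoch is 1970-01-01 (MJD 40587), so a time at
    # MJD 59000 maps to partition int(59000 - 40587) // 30 == 18413 // 30
    # == 613.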
1353 """Calculate time partition number for a given time.
1357 time : `float` or `astropy.time.Time`
1358 Time for which to calculate partition number. Can be float to mean
1359 MJD or `astropy.time.Time`
1364 Partition number for a given time.
1366 if isinstance(time, astropy.time.Time):
1367 mjd = float(time.mjd)
1371 partition = int(days_since_epoch) // self.
config.time_partition_days
1375 """Make an empty catalog for a table with a given name.
1379 table_name : `ApdbTables`
1384 catalog : `pandas.DataFrame`
1387 table = self.
_schema.tableSchemas[table_name]
1390 columnDef.name: pandas.Series(dtype=self.
_schema.column_dtype(columnDef.datatype))
1391 for columnDef
in table.columns
1393 return pandas.DataFrame(data)
    def _combine_where(
        self,
        prefix: str,
        where1: list[tuple[str, tuple]],
        where2: list[tuple[str, tuple]],
        suffix: str | None = None,
    ) -> Iterator[tuple[cassandra.query.Statement, tuple]]:
        """Make cartesian product of two parts of WHERE clause into a series
        of statements to execute.

        Parameters
        ----------
        prefix : `str`
            Initial statement prefix that comes before WHERE clause, e.g.
            "SELECT * from Table"
        """
        # If lists are empty use special sentinels.
        if not where1:
            where1 = [("", ())]
        if not where2:
            where2 = [("", ())]

        for expr1, params1 in where1:
            for expr2, params2 in where2:
                full_query = prefix
                wheres = []
                if expr1:
                    wheres.append(expr1)
                if expr2:
                    wheres.append(expr2)
                if wheres:
                    full_query += " WHERE " + " AND ".join(wheres)
                if suffix:
                    full_query += " " + suffix
                params = params1 + params2
                if params:
                    statement = self._preparer.prepare(full_query)
                else:
                    # If there are no params then it is likely that the query
                    # contains multiple values for IN operator, which is not
                    # supported by prepared statements.
                    statement = cassandra.query.SimpleStatement(full_query)
                yield (statement, params)
    def _spatial_where(
        self, region: sphgeom.Region | None, use_ranges: bool = False
    ) -> list[tuple[str, tuple]]:
        """Generate expressions for spatial part of WHERE clause.

        Parameters
        ----------
        region : `sphgeom.Region`
            Spatial region for query results.
        use_ranges : `bool`
            If True then use pixel ranges ("apdb_part >= p1 AND apdb_part <=
            p2") instead of exact list of pixels. Should be set to True for
            large regions covering very many pixels.

        Returns
        -------
        expressions : `list` [ `tuple` ]
            Empty list is returned if ``region`` is `None`, otherwise a list
            of one or more (expression, parameters) tuples
        """
        if region is None:
            return []
        if use_ranges:
            pixel_ranges = self._pixelization.envelope(region)
            expressions: list[tuple[str, tuple]] = []
            for lower, upper in pixel_ranges:
                upper -= 1
                if lower == upper:
                    expressions.append(('"apdb_part" = ?', (lower,)))
                else:
                    expressions.append(('"apdb_part" >= ? AND "apdb_part" <= ?', (lower, upper)))
            return expressions
        else:
            pixels = self._pixelization.pixels(region)
            if self.config.query_per_spatial_part:
                return [('"apdb_part" = ?', (pixel,)) for pixel in pixels]
            else:
                pixels_str = ",".join([str(pix) for pix in pixels])
                return [(f'"apdb_part" IN ({pixels_str})', ())]
    def _temporal_where(
        self,
        table: ApdbTables,
        start_time: float | astropy.time.Time,
        end_time: float | astropy.time.Time,
        query_per_time_part: bool | None = None,
    ) -> tuple[list[str], list[tuple[str, tuple]]]:
        """Generate table names and expressions for temporal part of WHERE
        clauses.

        Parameters
        ----------
        table : `ApdbTables`
            Table to select from.
        start_time : `astropy.time.Time` or `float`
            Starting Datetime or MJD value of the time range.
        end_time : `astropy.time.Time` or `float`
            Ending Datetime or MJD value of the time range.
        query_per_time_part : `bool`, optional
            If None then use ``query_per_time_part`` from configuration.

        Returns
        -------
        tables : `list` [ `str` ]
            List of the table names to query.
        expressions : `list` [ `tuple` ]
            A list of zero or more (expression, parameters) tuples.
        """
        tables: list[str]
        temporal_where: list[tuple[str, tuple]] = []
        table_name = self._schema.tableName(table)
        time_part_start = self._time_partition(start_time)
        time_part_end = self._time_partition(end_time)
        time_parts = list(range(time_part_start, time_part_end + 1))
        if self.config.time_partition_tables:
            tables = [f"{table_name}_{part}" for part in time_parts]
        else:
            tables = [table_name]
            if query_per_time_part is None:
                query_per_time_part = self.config.query_per_time_part
            if query_per_time_part:
                temporal_where = [('"apdb_time_part" = ?', (time_part,)) for time_part in time_parts]
            else:
                time_part_list = ",".join([str(part) for part in time_parts])
                temporal_where = [(f'"apdb_time_part" IN ({time_part_list})', ())]

        return tables, temporal_where
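
    # Example (illustrative): with 30-day partitions, a time range that maps
    # to partitions 613..615 yields either the table names
    # ["DiaSource_613", "DiaSource_614", "DiaSource_615"] (per-partition
    # tables) or a single table name with the WHERE expression
    # '"apdb_time_part" IN (613,614,615)'.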