from __future__ import annotations

__all__ = ["ApdbCassandra"]

from collections import defaultdict
from typing import TYPE_CHECKING, Any, cast
try:
    import cassandra.query
    from cassandra.query import UNSET_VALUE

    CASSANDRA_IMPORTED = True
except ImportError:
    CASSANDRA_IMPORTED = False
from lsst import sphgeom
from lsst.utils.iteration import chunk_iterable

from ..apdb import Apdb, ApdbConfig
from ..apdbConfigFreezer import ApdbConfigFreezer
from ..apdbReplica import ApdbTableData, ReplicaChunk
from ..apdbSchema import ApdbSchema, ApdbTables
from ..monitor import MonAgent
from ..schema_model import Table
from ..timer import Timer
from ..versionTuple import IncompatibleVersionError, VersionTuple
from .apdbCassandraAdmin import ApdbCassandraAdmin
from .apdbCassandraReplica import ApdbCassandraReplica
from .apdbCassandraSchema import ApdbCassandraSchema, CreateTableOptions, ExtraTables
from .apdbMetadataCassandra import ApdbMetadataCassandra
from .cassandra_utils import (
from .config import ApdbCassandraConfig, ApdbCassandraConnectionConfig, ApdbCassandraTimePartitionRange
from .connectionContext import ConnectionContext, DbVersions
from .exceptions import CassandraMissingError
from .partitioner import Partitioner
from .sessionFactory import SessionContext, SessionFactory
if TYPE_CHECKING:
    from ..apdbMetadata import ApdbMetadata
    from ..apdbUpdateRecord import ApdbUpdateRecord
_LOG = logging.getLogger(__name__)

"""Version for the code controlling non-replication tables. This needs to be
updated following compatibility rules when schema produced by this code
95 """Implementation of APDB database with Apache Cassandra backend.
99 config : `ApdbCassandraConfig`
100 Configuration object.
        if not CASSANDRA_IMPORTED:

        """Establish connection if not established and return context."""
                schema_version=self._schema.schemaVersion(),
                    ApdbCassandraReplica.apdbReplicaImplementationVersion()
            _LOG.debug("Current versions: %s", current_versions)
            if _LOG.isEnabledFor(logging.DEBUG):
                _LOG.debug("ApdbCassandra Configuration: %s", self._connection_context.config.model_dump())

    def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
        """Create `Timer` instance given its name."""
        return Timer(name, _MON, tags=tags)
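    # Usage sketch for the timer helper (mirrors how the query methods below
    # use it; tag and value names here are illustrative only):
    #
    #     with self._timer("select_time", tags={"table": "DiaObject"}) as timer:
    #         ...  # run the queries
    #         timer.add_values(row_count=row_count)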
    def _versionCheck(self, current_versions: DbVersions, db_versions: DbVersions) -> None:
        """Check schema version compatibility."""
        if not current_versions.schema_version.checkCompatibility(db_versions.schema_version):
            raise IncompatibleVersionError(
                f"Configured schema version {current_versions.schema_version} "
                f"is not compatible with database version {db_versions.schema_version}"
            )
        if not current_versions.code_version.checkCompatibility(db_versions.code_version):
            raise IncompatibleVersionError(
                f"Current code version {current_versions.code_version} "
                f"is not compatible with database version {db_versions.code_version}"
            )

        match current_versions.replica_version, db_versions.replica_version:
                if not current.checkCompatibility(stored):
                    raise IncompatibleVersionError(
                        f"Current replication code version {current} "
                        f"is not compatible with database version {stored}"
                    )
                raise IncompatibleVersionError(
                    f"Current replication code version {current_versions.replica_version} "
                    f"is not compatible with database version {db_versions.replica_version}"
                )
174 """Return version number for current APDB implementation.
178 version : `VersionTuple`
179 Version of the code defined in implementation class.
187 def tableDef(self, table: ApdbTables) -> Table |
None:
189 return self.
_schema.tableSchemas.get(table)
        hosts: tuple[str, ...],
        keyspace: str,
        *,
        schema_file: str | None = None,
        schema_name: str | None = None,
        read_sources_months: int | None = None,
        read_forced_sources_months: int | None = None,
        enable_replica: bool = False,
        replica_skips_diaobjects: bool = False,
        port: int | None = None,
        username: str | None = None,
        prefix: str | None = None,
        part_pixelization: str | None = None,
        part_pix_level: int | None = None,
        time_partition_tables: bool = True,
        time_partition_start: str | None = None,
        time_partition_end: str | None = None,
        read_consistency: str | None = None,
        write_consistency: str | None = None,
        read_timeout: int | None = None,
        write_timeout: int | None = None,
        ra_dec_columns: tuple[str, str] | None = None,
        replication_factor: int | None = None,
        drop: bool = False,
        table_options: CreateTableOptions | None = None,
    ) -> ApdbCassandraConfig:
220 """Initialize new APDB instance and make configuration object for it.
224 hosts : `tuple` [`str`, ...]
225 List of host names or IP addresses for Cassandra cluster.
227 Name of the keyspace for APDB tables.
228 schema_file : `str`, optional
229 Location of (YAML) configuration file with APDB schema. If not
230 specified then default location will be used.
231 schema_name : `str`, optional
232 Name of the schema in YAML configuration file. If not specified
233 then default name will be used.
234 read_sources_months : `int`, optional
235 Number of months of history to read from DiaSource.
236 read_forced_sources_months : `int`, optional
237 Number of months of history to read from DiaForcedSource.
238 enable_replica : `bool`, optional
239 If True, make additional tables used for replication to PPDB.
240 replica_skips_diaobjects : `bool`, optional
241 If `True` then do not fill regular ``DiaObject`` table when
242 ``enable_replica`` is `True`.
243 port : `int`, optional
244 Port number to use for Cassandra connections.
245 username : `str`, optional
246 User name for Cassandra connections.
247 prefix : `str`, optional
248 Optional prefix for all table names.
249 part_pixelization : `str`, optional
250 Name of the MOC pixelization used for partitioning.
251 part_pix_level : `int`, optional
253 time_partition_tables : `bool`, optional
254 Create per-partition tables.
255 time_partition_start : `str`, optional
256 Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss
258 time_partition_end : `str`, optional
259 Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss
261 read_consistency : `str`, optional
262 Name of the consistency level for read operations.
263 write_consistency : `str`, optional
264 Name of the consistency level for write operations.
265 read_timeout : `int`, optional
266 Read timeout in seconds.
267 write_timeout : `int`, optional
268 Write timeout in seconds.
269 ra_dec_columns : `tuple` [`str`, `str`], optional
270 Names of ra/dec columns in DiaObject table.
271 replication_factor : `int`, optional
272 Replication factor used when creating new keyspace, if keyspace
273 already exists its replication factor is not changed.
274 drop : `bool`, optional
275 If `True` then drop existing tables before re-creating the schema.
276 table_options : `CreateTableOptions`, optional
277 Options used when creating Cassandra tables.
281 config : `ApdbCassandraConfig`
282 Resulting configuration object for a created APDB instance.
291 "idle_heartbeat_interval": 0,
292 "idle_heartbeat_timeout": 30,
293 "control_connection_timeout": 100,
297 contact_points=hosts,
299 enable_replica=enable_replica,
300 replica_skips_diaobjects=replica_skips_diaobjects,
301 connection_config=connection_config,
303 config.partitioning.time_partition_tables = time_partition_tables
304 if schema_file
is not None:
305 config.schema_file = schema_file
306 if schema_name
is not None:
307 config.schema_name = schema_name
308 if read_sources_months
is not None:
309 config.read_sources_months = read_sources_months
310 if read_forced_sources_months
is not None:
311 config.read_forced_sources_months = read_forced_sources_months
313 config.connection_config.port = port
314 if username
is not None:
315 config.connection_config.username = username
316 if prefix
is not None:
317 config.prefix = prefix
318 if part_pixelization
is not None:
319 config.partitioning.part_pixelization = part_pixelization
320 if part_pix_level
is not None:
321 config.partitioning.part_pix_level = part_pix_level
322 if time_partition_start
is not None:
323 config.partitioning.time_partition_start = time_partition_start
324 if time_partition_end
is not None:
325 config.partitioning.time_partition_end = time_partition_end
326 if read_consistency
is not None:
327 config.connection_config.read_consistency = read_consistency
328 if write_consistency
is not None:
329 config.connection_config.write_consistency = write_consistency
330 if read_timeout
is not None:
331 config.connection_config.read_timeout = read_timeout
332 if write_timeout
is not None:
333 config.connection_config.write_timeout = write_timeout
334 if ra_dec_columns
is not None:
335 config.ra_dec_columns = ra_dec_columns
337 cls.
_makeSchema(config, drop=drop, replication_factor=replication_factor, table_options=table_options)
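        # Illustrative call (a sketch only; host names, keyspace, and option
        # values are hypothetical):
        #
        #     config = ApdbCassandra.init_database(
        #         hosts=("cassandra-host1", "cassandra-host2"),
        #         keyspace="apdb",
        #         enable_replica=True,
        #         replication_factor=3,
        #     )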
342 """Return `ApdbReplica` instance for this database."""
353 replication_factor: int |
None =
None,
354 table_options: CreateTableOptions |
None =
None,
358 if not isinstance(config, ApdbCassandraConfig):
359 raise TypeError(f
"Unexpected type of configuration object: {type(config)}")
361 simple_schema =
ApdbSchema(config.schema_file, config.schema_name)
366 keyspace=config.keyspace,
367 table_schemas=simple_schema.tableSchemas,
368 prefix=config.prefix,
369 time_partition_tables=config.partitioning.time_partition_tables,
370 enable_replica=config.enable_replica,
371 replica_skips_diaobjects=config.replica_skips_diaobjects,
375 part_range_config: ApdbCassandraTimePartitionRange |
None =
None
376 if config.partitioning.time_partition_tables:
378 time_partition_start = astropy.time.Time(
379 config.partitioning.time_partition_start, format=
"isot", scale=
"tai"
381 time_partition_end = astropy.time.Time(
382 config.partitioning.time_partition_end, format=
"isot", scale=
"tai"
385 start=partitioner.time_partition(time_partition_start),
386 end=partitioner.time_partition(time_partition_end),
390 part_range=part_range_config,
391 replication_factor=replication_factor,
392 table_options=table_options,
396 drop=drop, replication_factor=replication_factor, table_options=table_options
399 meta_table_name = ApdbTables.metadata.table_name(config.prefix)
401 session, meta_table_name, config.keyspace,
"read_tuples",
"write"
406 ConnectionContext.metadataSchemaVersionKey, str(simple_schema.schemaVersion()), force=
True
412 if config.enable_replica:
415 ConnectionContext.metadataReplicaVersionKey,
416 str(ApdbCassandraReplica.apdbReplicaImplementationVersion()),
421 freezer = ApdbConfigFreezer[ApdbCassandraConfig](ConnectionContext.frozen_parameters)
422 metadata.set(ConnectionContext.metadataConfigKey, freezer.to_json(config), force=
True)
425 if part_range_config:
426 part_range_config.save_to_meta(metadata)
        config = context.config

        sp_where, num_sp_part = context.partitioner.spatial_where(region, for_prepare=True)
        _LOG.debug("getDiaObjects: #partitions: %s", len(sp_where))

        column_names = context.schema.apdbColumnNames(ApdbTables.DiaObjectLast)
        what = ",".join(quote_id(column) for column in column_names)

        table_name = context.schema.tableName(ApdbTables.DiaObjectLast)
        query = f'SELECT {what} from "{self._keyspace}"."{table_name}"'
        statements: list[tuple] = []
        for where, params in sp_where:
            full_query = f"{query} WHERE {where}"
                statement = context.preparer.prepare(full_query)
                statement = cassandra.query.SimpleStatement(full_query)
            statements.append((statement, params))
        _LOG.debug("getDiaObjects: #queries: %s", len(statements))

        with _MON.context_tags({"table": "DiaObject"}):
                "select_query_stats", values={"num_sp_part": num_sp_part, "num_queries": len(statements)}
            with self._timer("select_time") as timer:
                    config.connection_config.read_concurrency,
                timer.add_values(row_count=len(objects))

        _LOG.debug("found %s DiaObjects", objects.shape[0])
        region: sphgeom.Region,
        object_ids: Iterable[int] | None,
        visit_time: astropy.time.Time,
        start_time: astropy.time.Time | None = None,
    ) -> pandas.DataFrame | None:
        config = context.config

        months = config.read_sources_months
        if start_time is None and months == 0:

        mjd_end = float(visit_time.tai.mjd)
        if start_time is None:
            mjd_start = mjd_end - months * 30
            mjd_start = float(start_time.tai.mjd)

        return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaSource)

        region: sphgeom.Region,
        object_ids: Iterable[int] | None,
        visit_time: astropy.time.Time,
        start_time: astropy.time.Time | None = None,
    ) -> pandas.DataFrame | None:
        config = context.config

        months = config.read_forced_sources_months
        if start_time is None and months == 0:

        mjd_end = float(visit_time.tai.mjd)
        if start_time is None:
            mjd_start = mjd_end - months * 30
            mjd_start = float(start_time.tai.mjd)

        return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaForcedSource)
        visit_time: astropy.time.Time,
        config = context.config

        if context.has_visit_detector_table:
            table_name = context.schema.tableName(ExtraTables.ApdbVisitDetector)
                f'SELECT count(*) FROM "{self._keyspace}"."{table_name}" WHERE visit = %s AND detector = %s'
            with self._timer("contains_visit_detector_time"):
                result = context.session.execute(query, (visit, detector))
                return bool(result.one()[0])

        sp_where, _ = context.partitioner.spatial_where(region, use_ranges=True, for_prepare=True)
        visit_detector_where = ("visit = ? AND detector = ?", (visit, detector))

        mjd_start = float(visit_time.tai.mjd) - 1.0 / 24
        mjd_end = float(visit_time.tai.mjd) + 1.0 / 24

        statements: list[tuple] = []
        for table_type in ApdbTables.DiaSource, ApdbTables.DiaForcedSource:
            tables, temporal_where = context.partitioner.temporal_where(
                table_type, mjd_start, mjd_end, query_per_time_part=True, for_prepare=True
                prefix = f'SELECT apdb_part FROM "{self._keyspace}"."{table}"'
                suffix = "PER PARTITION LIMIT 1 LIMIT 1 ALLOW FILTERING"
                    self._combine_where(prefix, sp_where, temporal_where, visit_detector_where, suffix)

        with self._timer("contains_visit_detector_time"):
                list[tuple[int] | None],
                config.connection_config.read_concurrency,
        tableName = context.schema.tableName(ApdbTables.SSObject)
        query = f'SELECT * from "{self._keyspace}"."{tableName}"'

        with self._timer("select_time", tags={"table": "SSObject"}) as timer:
            result = context.session.execute(query, execution_profile="read_pandas")
            objects = result._current_rows
            timer.add_values(row_count=len(objects))

        _LOG.debug("found %s SSObjects", objects.shape[0])
        visit_time: astropy.time.Time,
        objects: pandas.DataFrame,
        sources: pandas.DataFrame | None = None,
        forced_sources: pandas.DataFrame | None = None,
        config = context.config

        if context.has_visit_detector_table:
            visit_detector: set[tuple[int, int]] = set()
            for df in sources, forced_sources:
                if df is not None and not df.empty:
                    df = df[["visit", "detector"]]
                    for visit, detector in df.itertuples(index=False):
                        visit_detector.add((visit, detector))

            table_name = context.schema.tableName(ExtraTables.ApdbVisitDetector)
            query = f'INSERT INTO "{self._keyspace}"."{table_name}" (visit, detector) VALUES (%s, %s)'
            for item in visit_detector:
                context.session.execute(query, item, execution_profile="write")

        if sources is not None:
        if forced_sources is not None:

        replica_chunk: ReplicaChunk | None = None
        if context.schema.replication_enabled:
            replica_chunk = ReplicaChunk.make_replica_chunk(visit_time, config.replica_chunk_seconds)

        if sources is not None and len(sources) > 0:
            subchunk = self._storeDiaSources(ApdbTables.DiaSource, sources, replica_chunk)

        if forced_sources is not None and len(forced_sources) > 0:
            self._storeDiaSources(ApdbTables.DiaForcedSource, forced_sources, replica_chunk)
        config = context.config

        if self._schema.has_mjd_timestamps:
            reassign_time_column = "ssObjectReassocTimeMjdTai"
            reassignTime = float(astropy.time.Time.now().tai.mjd)
            reassign_time_column = "ssObjectReassocTime"
            reassignTime = int(datetime.datetime.now(tz=datetime.UTC).timestamp() * 1000)

        table_name = context.schema.tableName(ExtraTables.DiaSourceToPartition)

        selects: list[tuple] = []
        for ids in chunk_iterable(idMap.keys(), 1_000):
            ids_str = ",".join(str(item) for item in ids)
                'SELECT "diaSourceId", "apdb_part", "apdb_time_part", "apdb_replica_chunk" '
                f'FROM "{self._keyspace}"."{table_name}" WHERE "diaSourceId" IN ({ids_str})'

            list[tuple[int, int, int, int | None]],
                context.session, selects, "read_tuples", config.connection_config.read_concurrency

        id2partitions: dict[int, tuple[int, int]] = {}
        id2chunk_id: dict[int, int] = {}
            id2partitions[row[0]] = row[1:3]
            if row[3] is not None:
                id2chunk_id[row[0]] = row[3]

        if set(id2partitions) != set(idMap):
            missing = ",".join(str(item) for item in set(idMap) - set(id2partitions))
            raise ValueError(f"Following DiaSource IDs do not exist in the database: {missing}")

        queries: list[tuple[cassandra.query.PreparedStatement, tuple]] = []
        for diaSourceId, ssObjectId in idMap.items():
            apdb_part, apdb_time_part = id2partitions[diaSourceId]
            if config.partitioning.time_partition_tables:
                table_name = context.schema.tableName(ApdbTables.DiaSource, apdb_time_part)
                    f'UPDATE "{self._keyspace}"."{table_name}"'
                    f' SET "ssObjectId" = ?, "diaObjectId" = NULL, "{reassign_time_column}" = ?'
                    ' WHERE "apdb_part" = ? AND "diaSourceId" = ?'
                values = (ssObjectId, reassignTime, apdb_part, diaSourceId)
                table_name = context.schema.tableName(ApdbTables.DiaSource)
                    f'UPDATE "{self._keyspace}"."{table_name}"'
                    f' SET "ssObjectId" = ?, "diaObjectId" = NULL, "{reassign_time_column}" = ?'
                    ' WHERE "apdb_part" = ? AND "apdb_time_part" = ? AND "diaSourceId" = ?'
                values = (ssObjectId, reassignTime, apdb_part, apdb_time_part, diaSourceId)
            queries.append((context.preparer.prepare(query), values))

            warnings.warn("Replication of reassigned DiaSource records is not implemented.", stacklevel=2)

        _LOG.debug("%s: will update %d records", table_name, len(idMap))
        with self._timer("source_reassign_time") as timer:
            execute_concurrent(context.session, queries, execution_profile="write")
            timer.add_values(source_count=len(idMap))
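        # Usage sketch (identifier values are hypothetical): reassign two
        # DiaSources to one SSObject by passing a diaSourceId -> ssObjectId map:
        #
        #     apdb.reassignDiaSources({387374831: 2001, 387374832: 2001})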
        raise NotImplementedError()

        return context.metadata

    def admin(self) -> ApdbCassandraAdmin:

        region: sphgeom.Region,
        object_ids: Iterable[int] | None,
        table_name: ApdbTables,
    ) -> pandas.DataFrame:
        """Return catalog of DiaSource instances given set of DiaObject IDs.

        region : `lsst.sphgeom.Region`
            Collection of DiaObject IDs
            Lower bound of time interval.
            Upper bound of time interval.
        table_name : `ApdbTables`

        catalog : `pandas.DataFrame`, or `None`
            Catalog containing DiaSource records. Empty catalog is returned if
            ``object_ids`` is empty.
        config = context.config

        object_id_set: Set[int] = set()
        if object_ids is not None:
            object_id_set = set(object_ids)
            if len(object_id_set) == 0:

        sp_where, num_sp_part = context.partitioner.spatial_where(region, for_prepare=True)
        tables, temporal_where = context.partitioner.temporal_where(
            table_name, mjd_start, mjd_end, for_prepare=True, partitons_range=context.time_partitions_range
            start = astropy.time.Time(mjd_start, format="mjd", scale="tai")
            end = astropy.time.Time(mjd_end, format="mjd", scale="tai")
                f"Query time range ({start.isot} - {end.isot}) does not overlap database time partitions."

        column_names = context.schema.apdbColumnNames(table_name)
        what = ",".join(quote_id(column) for column in column_names)

        statements: list[tuple] = []
            prefix = f'SELECT {what} from "{self._keyspace}"."{table}"'
            statements += list(self._combine_where(prefix, sp_where, temporal_where))
        _LOG.debug("_getSources %s: #queries: %s", table_name, len(statements))

        with _MON.context_tags({"table": table_name.name}):
                "select_query_stats", values={"num_sp_part": num_sp_part, "num_queries": len(statements)}
            with self._timer("select_time") as timer:
                    config.connection_config.read_concurrency,
                timer.add_values(row_count_from_db=len(catalog))

            if len(object_id_set) > 0:
                catalog = cast(pandas.DataFrame, catalog[catalog["diaObjectId"].isin(object_id_set)])
            catalog = cast(pandas.DataFrame, catalog[catalog["midpointMjdTai"] > mjd_start])
            timer.add_values(row_count=len(catalog))

        _LOG.debug("found %d %ss", catalog.shape[0], table_name.name)
        config = context.config

        timestamp = int(replica_chunk.last_update_time.unix_tai * 1000)

        table_name = context.schema.tableName(ExtraTables.ApdbReplicaChunks)

        columns = ["partition", "apdb_replica_chunk", "last_update_time", "unique_id"]
        values = [partition, replica_chunk.id, timestamp, replica_chunk.unique_id]
        if context.has_chunk_sub_partitions:
            columns.append("has_subchunks")

        column_list = ", ".join(columns)
        placeholders = ",".join(["%s"] * len(columns))
        query = f'INSERT INTO "{self._keyspace}"."{table_name}" ({column_list}) VALUES ({placeholders})'

        context.session.execute(
            timeout=config.connection_config.write_timeout,
            execution_profile="write",
879 """Return existing mapping of diaObjectId to its last partition."""
881 config = context.config
883 table_name = context.schema.tableName(ExtraTables.DiaObjectLastToPartition)
886 for id_chunk
in chunk_iterable(ids, 10_000):
887 id_chunk_list = list(id_chunk)
889 f
'SELECT "diaObjectId", apdb_part FROM "{self._keyspace}"."{table_name}" '
890 f
'WHERE "diaObjectId" in ({",".join(str(oid) for oid in id_chunk_list)})'
892 queries.append((query, ()))
893 object_count += len(id_chunk_list)
895 with self.
_timer(
"query_object_last_partitions")
as timer:
902 config.connection_config.read_concurrency,
905 timer.add_values(object_count=object_count, row_count=len(data.rows()))
907 if data.column_names() != [
"diaObjectId",
"apdb_part"]:
908 raise RuntimeError(f
"Unexpected column names in query result: {data.column_names()}")
910 return {row[0]: row[1]
for row
in data.rows()}
913 """Objects in DiaObjectsLast can move from one spatial partition to
914 another. For those objects inserting new version does not replace old
915 one, so we need to explicitly remove old versions before inserting new
921 new_partitions = dict(zip(objs[
"diaObjectId"], objs[
"apdb_part"]))
924 moved_oids: dict[int, tuple[int, int]] = {}
925 for oid, old_part
in old_partitions.items():
926 new_part = new_partitions.get(oid, old_part)
927 if new_part != old_part:
928 moved_oids[oid] = (old_part, new_part)
929 _LOG.debug(
"DiaObject IDs that moved to new partition: %s", moved_oids)
933 table_name = context.schema.tableName(ApdbTables.DiaObjectLast)
934 query = f
'DELETE FROM "{self._keyspace}"."{table_name}" WHERE apdb_part = ? AND "diaObjectId" = ?'
935 statement = context.preparer.prepare(query)
937 for oid, (old_part, _)
in moved_oids.items():
938 queries.append((statement, (old_part, oid)))
939 with self.
_timer(
"delete_object_last")
as timer:
940 execute_concurrent(context.session, queries, execution_profile=
"write")
941 timer.add_values(row_count=len(moved_oids))
944 table_name = context.schema.tableName(ExtraTables.DiaObjectLastToPartition)
945 query = f
'INSERT INTO "{self._keyspace}"."{table_name}" ("diaObjectId", apdb_part) VALUES (?,?)'
946 statement = context.preparer.prepare(query)
949 for oid, new_part
in new_partitions.items():
950 queries.append((statement, (oid, new_part)))
952 with self.
_timer(
"update_object_last_partition")
as timer:
953 execute_concurrent(context.session, queries, execution_profile=
"write")
954 timer.add_values(row_count=len(queries))
        self, objs: pandas.DataFrame, visit_time: astropy.time.Time, replica_chunk: ReplicaChunk | None
        """Store catalog of DiaObjects from current visit.

        objs : `pandas.DataFrame`
            Catalog with DiaObject records
        visit_time : `astropy.time.Time`
            Time of the current visit.
        replica_chunk : `ReplicaChunk` or `None`
            Replica chunk identifier if replication is configured.

            _LOG.debug("No objects to write to database.")

        config = context.config

        if context.has_dia_object_last_to_partition:

        timestamp: float | datetime.datetime
        if self._schema.has_mjd_timestamps:
            validity_start_column = "validityStartMjdTai"
            timestamp = float(visit_time.tai.mjd)
            validity_start_column = "validityStart"
            timestamp = visit_time.datetime

        extra_columns: dict[str, Any] = {}
        if context.schema.check_column(ApdbTables.DiaObjectLast, validity_start_column):
            extra_columns[validity_start_column] = timestamp

        extra_columns[validity_start_column] = timestamp
        visit_time_part = context.partitioner.time_partition(visit_time)
        time_part: int | None = visit_time_part
        if (time_partitions_range := context.time_partitions_range) is not None:
        if not config.partitioning.time_partition_tables:
            extra_columns["apdb_time_part"] = time_part

        if replica_chunk is None or not config.replica_skips_diaobjects:
                objs, ApdbTables.DiaObject, extra_columns=extra_columns, time_part=time_part

        if replica_chunk is not None:
            extra_columns = {"apdb_replica_chunk": replica_chunk.id, validity_start_column: timestamp}
            table = ExtraTables.DiaObjectChunks
            if context.has_chunk_sub_partitions:
                table = ExtraTables.DiaObjectChunks2
                extra_columns["apdb_replica_subchunk"] = random.randrange(config.replica_sub_chunk_count)
        table_name: ApdbTables,
        sources: pandas.DataFrame,
        replica_chunk: ReplicaChunk | None,
        """Store catalog of DIASources or DIAForcedSources from current visit.

        table_name : `ApdbTables`
            Table where to store the data.
        sources : `pandas.DataFrame`
            Catalog containing DiaSource records
        visit_time : `astropy.time.Time`
            Time of the current visit.
        replica_chunk : `ReplicaChunk` or `None`
            Replica chunk identifier if replication is configured.

        subchunk : `int` or `None`
            Subchunk number for resulting replica data, `None` if replication
            is not enabled or subchunking is not enabled.
        config = context.config

        tp_sources = sources.copy(deep=False)
        tp_sources["apdb_time_part"] = tp_sources["midpointMjdTai"].apply(context.partitioner.time_partition)
        if (time_partitions_range := context.time_partitions_range) is not None:

        extra_columns: dict[str, Any] = {}
        if not config.partitioning.time_partition_tables:
            partitions = set(tp_sources["apdb_time_part"])
            if len(partitions) == 1:
                time_part = partitions.pop()
            for time_part, sub_frame in tp_sources.groupby(by="apdb_time_part"):
                sub_frame.drop(columns="apdb_time_part", inplace=True)

        subchunk: int | None = None
        if replica_chunk is not None:
            extra_columns = {"apdb_replica_chunk": replica_chunk.id}
            if context.has_chunk_sub_partitions:
                subchunk = random.randrange(config.replica_sub_chunk_count)
                extra_columns["apdb_replica_subchunk"] = subchunk
                if table_name is ApdbTables.DiaSource:
                    extra_table = ExtraTables.DiaSourceChunks2
                    extra_table = ExtraTables.DiaForcedSourceChunks2
                if table_name is ApdbTables.DiaSource:
                    extra_table = ExtraTables.DiaSourceChunks
                    extra_table = ExtraTables.DiaForcedSourceChunks
        self, partitions: Iterable[int], time_partitions_range: ApdbCassandraTimePartitionRange
        """Check that time partitions for new data actually exist.

        partitions : `~collections.abc.Iterable` [`int`]
            Time partitions for new data.
        time_partitions_range : `ApdbCassandraTimePartitionRange`
            Current time partition range.
        partitions = set(partitions)
        min_part = min(partitions)
        max_part = max(partitions)
        if min_part < time_partitions_range.start or max_part > time_partitions_range.end:
                "Attempt to store data for time partitions that do not yet exist. "
                f"Partitions for new records: {min_part}-{max_part}. "
                f"Database partitions: {time_partitions_range.start}-{time_partitions_range.end}."
        if max_part == time_partitions_range.end:
                "Writing into the last temporal partition. Partition range needs to be extended soon.",
        sources: pandas.DataFrame,
        visit_time: astropy.time.Time,
        replica_chunk: ReplicaChunk | None,
        subchunk: int | None,
        """Store mapping of diaSourceId to its partitioning values.

        sources : `pandas.DataFrame`
            Catalog containing DiaSource records
        visit_time : `astropy.time.Time`
            Time of the current visit.
        replica_chunk : `ReplicaChunk` or `None`
            Replication chunk, or `None` when replication is disabled.
        subchunk : `int` or `None`
            Replication sub-chunk, or `None` when replication is disabled or
            sub-chunking is not used.
        id_map = cast(pandas.DataFrame, sources[["diaSourceId", "apdb_part"]])
            "apdb_time_part": context.partitioner.time_partition(visit_time),
            "apdb_replica_chunk": replica_chunk.id if replica_chunk is not None else None,
        if context.has_chunk_sub_partitions:
            extra_columns["apdb_replica_subchunk"] = subchunk

            id_map, ExtraTables.DiaSourceToPartition, extra_columns=extra_columns, time_part=None

        records: pandas.DataFrame,
        table_name: ApdbTables | ExtraTables,
        extra_columns: Mapping | None = None,
        time_part: int | None = None,
1161 """Store generic objects.
1163 Takes Pandas catalog and stores a bunch of records in a table.
1167 records : `pandas.DataFrame`
1168 Catalog containing object records
1169 table_name : `ApdbTables`
1170 Name of the table as defined in APDB schema.
1171 extra_columns : `dict`, optional
1172 Mapping (column_name, column_value) which gives fixed values for
1173 columns in each row, overrides values in ``records`` if matching
1174 columns exist there.
1175 time_part : `int`, optional
1176 If not `None` then insert into a per-partition table.
1180 If Pandas catalog contains additional columns not defined in table
1181 schema they are ignored. Catalog does not have to contain all columns
1182 defined in a table, but partition and clustering keys must be present
1183 in a catalog or ``extra_columns``.
1188 if extra_columns
is None:
1190 extra_fields = list(extra_columns.keys())
1193 df_fields = [column
for column
in records.columns
if column
not in extra_fields]
1195 column_map = context.schema.getColumnMap(table_name)
1197 fields = [column_map[field].name
for field
in df_fields
if field
in column_map]
1198 fields += extra_fields
1201 partition_columns = context.schema.partitionColumns(table_name)
1202 required_columns = partition_columns + context.schema.clusteringColumns(table_name)
1203 missing_columns = [column
for column
in required_columns
if column
not in fields]
1205 raise ValueError(f
"Primary key columns are missing from catalog: {missing_columns}")
1207 qfields = [quote_id(field)
for field
in fields]
1208 qfields_str =
",".join(qfields)
        with self._timer("insert_build_time", tags={"table": table_name.name}):
            values_by_key: dict[tuple, list[list]] = defaultdict(list)
            for rec in records.itertuples(index=False):
                partitioning_values: dict[str, Any] = {}
                for field in df_fields:
                    if field not in column_map:
                    value = getattr(rec, field)
                    if column_map[field].datatype is felis.datamodel.DataType.timestamp:
                        if isinstance(value, pandas.Timestamp):
                            value = value.to_pydatetime()
                        elif value is pandas.NaT:

                            value = int(value * 1000)
                    value = literal(value)
                    values.append(UNSET_VALUE if value is None else value)
                    if field in partition_columns:
                        partitioning_values[field] = value
                for field in extra_fields:
                    value = literal(extra_columns[field])
                    values.append(UNSET_VALUE if value is None else value)
                    if field in partition_columns:
                        partitioning_values[field] = value

                key = tuple(partitioning_values[field] for field in partition_columns)
                values_by_key[key].append(values)

        table = context.schema.tableName(table_name, time_part)

        holders = ",".join(["?"] * len(qfields))
        query = f'INSERT INTO "{self._keyspace}"."{table}" ({qfields_str}) VALUES ({holders})'
        statement = context.preparer.prepare(query)
        for key_values in values_by_key.values():
            for values_chunk in chunk_iterable(key_values, batch_size):
                batch = cassandra.query.BatchStatement()
                for row_values in values_chunk:
                    batch.add(statement, row_values)
                queries.append((batch, None))
                assert batch.routing_key is not None and batch.keyspace is not None

        _LOG.debug("%s: will store %d records", context.schema.tableName(table_name), records.shape[0])
        with self._timer("insert_time", tags={"table": table_name.name}) as timer:
            execute_concurrent(context.session, queries, execution_profile="write")
            timer.add_values(row_count=len(records), num_batches=len(queries))
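        # The grouping above keeps every BatchStatement single-partition, which
        # is what Cassandra expects from efficient batches. A minimal sketch of
        # the same idea with hypothetical rows and a single partition-key column:
        #
        #     values_by_key = defaultdict(list)
        #     for row in rows:                 # rows: list of per-column value lists
        #         values_by_key[(row[0],)].append(row)
        #     for key_rows in values_by_key.values():
        #         for chunk in chunk_iterable(key_rows, batch_size):
        #             ...  # one BatchStatement per chunk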
        self, records: Iterable[ApdbUpdateRecord], chunk: ReplicaChunk, *, store_chunk: bool = False
        """Store ApdbUpdateRecords in the replica table for those records.

        records : `list` [`ApdbUpdateRecord`]
        chunk : `ReplicaChunk`
            Replica chunk for these records.
        store_chunk : `bool`
            If True then also store replica chunk.

            Raised if replication is not enabled for this instance.
        config = context.config

        if not context.schema.replication_enabled:
            raise TypeError("Replication is not enabled for this APDB instance.")

        apdb_replica_chunk = chunk.id
        update_unique_id = uuid.uuid4()

        for record in records:
                record.update_time_ns,
                record.update_order,
            "apdb_replica_chunk",
        if context.has_chunk_sub_partitions:
            subchunk = random.randrange(config.replica_sub_chunk_count)
                row.append(subchunk)
            columns.append("apdb_replica_subchunk")

        table_name = context.schema.tableName(ExtraTables.ApdbUpdateRecordChunks)
        placeholders = ", ".join(["%s"] * len(columns))
        columns_str = ", ".join(columns)
        query = f'INSERT INTO "{self._keyspace}"."{table_name}" ({columns_str}) VALUES ({placeholders})'
        queries = [(query, row) for row in rows]

        with self._timer("store_update_record") as timer:
            execute_concurrent(context.session, queries, execution_profile="write")
            timer.add_values(row_count=len(queries))
1334 """Calculate spatial partition for each record and add it to a
1339 df : `pandas.DataFrame`
1340 DataFrame which has to contain ra/dec columns, names of these
1341 columns are defined by configuration ``ra_dec_columns`` field.
1345 df : `pandas.DataFrame`
1346 DataFrame with ``apdb_part`` column which contains pixel index
1347 for ra/dec coordinates.
1351 This overrides any existing column in a DataFrame with the same name
1352 (``apdb_part``). Original DataFrame is not changed, copy of a DataFrame
1356 config = context.config
1359 apdb_part = np.zeros(df.shape[0], dtype=np.int64)
1360 ra_col, dec_col = config.ra_dec_columns
1361 for i, (ra, dec)
in enumerate(zip(df[ra_col], df[dec_col])):
1363 idx = context.partitioner.pixel(uv3d)
1366 df[
"apdb_part"] = apdb_part
1370 """Make an empty catalog for a table with a given name.
1374 table_name : `ApdbTables`
1379 catalog : `pandas.DataFrame`
1382 table = self.
_schema.tableSchemas[table_name]
1385 columnDef.name: pandas.Series(dtype=self.
_schema.column_dtype(columnDef.datatype))
1386 for columnDef
in table.columns
1388 return pandas.DataFrame(data)
        where1: list[tuple[str, tuple]],
        where2: list[tuple[str, tuple]],
        where3: tuple[str, tuple] | None = None,
        suffix: str | None = None,
    ) -> Iterator[tuple[cassandra.query.Statement, tuple]]:
        """Make cartesian product of two parts of WHERE clause into a series
        of statements to execute.

            Initial statement prefix that comes before WHERE clause, e.g.
            "SELECT * from Table"
        for expr1, params1 in where1:
            for expr2, params2 in where2:
                params = params1 + params2
                    wheres.append(expr1)
                    wheres.append(expr2)
                    wheres.append(where3[0])
                    full_query += " WHERE " + " AND ".join(wheres)
                    full_query += " " + suffix
                    statement = context.preparer.prepare(full_query)
                    statement = cassandra.query.SimpleStatement(full_query)
                yield (statement, params)
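        # Sketch of the cartesian combination (hypothetical clauses): with
        #     where1 = [("apdb_part = ?", (1,)), ("apdb_part = ?", (2,))]
        #     where2 = [("apdb_time_part = ?", (700,))]
        # the generator yields one statement per pair, e.g.
        #     "<prefix> WHERE apdb_part = ? AND apdb_time_part = ?" with params (1, 700)
        #     "<prefix> WHERE apdb_part = ? AND apdb_time_part = ?" with params (2, 700)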
1441 """Update timestamp columns in input DataFrame to be naive datetime
1444 Clients may or may not generate aware timestamps, code in this class
1445 assumes that timestamps are naive, so we convert them to UTC and
1449 columns = [column
for column, dtype
in df.dtypes.items()
if isinstance(dtype, pandas.DatetimeTZDtype)]
1450 for column
in columns:
1452 df[column] = df[column].dt.tz_convert(
None)
1456 """Calculate batch size based on config parameters."""
1458 config = context.config
1462 if 0 < config.batch_statement_limit < batch_size:
1463 batch_size = config.batch_statement_limit
1464 if config.batch_size_limit > 0:
1472 row_size = context.schema.table_row_size(table)
1473 row_size += 4 * len(context.schema.getColumnMap(table))
1474 batch_size = min(batch_size, (config.batch_size_limit // row_size) + 1)
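        # Worked example (hypothetical numbers): with batch_size_limit = 65536
        # and an estimated row_size of 600 bytes, the size-based cap is
        # 65536 // 600 + 1 = 110 statements per batch; the smaller of this and
        # batch_statement_limit wins.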
int _batch_size(self, ApdbTables|ExtraTables table)
int|None _storeDiaSources(self, ApdbTables table_name, pandas.DataFrame sources, ReplicaChunk|None replica_chunk)
pandas.DataFrame _fix_input_timestamps(self, pandas.DataFrame df)
None _versionCheck(self, DbVersions current_versions, DbVersions db_versions)
pandas.DataFrame|None getDiaForcedSources(self, sphgeom.Region region, Iterable[int]|None object_ids, astropy.time.Time visit_time, astropy.time.Time|None start_time=None)
None reassignDiaSources(self, Mapping[int, int] idMap)
pandas.DataFrame getDiaObjects(self, sphgeom.Region region)
bool containsVisitDetector(self, int visit, int detector, sphgeom.Region region, astropy.time.Time visit_time)
None storeSSObjects(self, pandas.DataFrame objects)
None _storeDiaObjects(self, pandas.DataFrame objs, astropy.time.Time visit_time, ReplicaChunk|None replica_chunk)
pandas.DataFrame _make_empty_catalog(self, ApdbTables table_name)
None _makeSchema(cls, ApdbConfig config, *, bool drop=False, int|None replication_factor=None, CreateTableOptions|None table_options=None)
None _check_time_partitions(self, Iterable[int] partitions, ApdbCassandraTimePartitionRange time_partitions_range)
__init__(self, ApdbCassandraConfig config)
ApdbMetadata metadata(self)
pandas.DataFrame _add_apdb_part(self, pandas.DataFrame df)
VersionTuple apdbImplementationVersion(cls)
None store(self, astropy.time.Time visit_time, pandas.DataFrame objects, pandas.DataFrame|None sources=None, pandas.DataFrame|None forced_sources=None)
None _storeObjectsPandas(self, pandas.DataFrame records, ApdbTables|ExtraTables table_name, Mapping|None extra_columns=None, int|None time_part=None)
Mapping[int, int] _queryDiaObjectLastPartitions(self, Iterable[int] ids)
ApdbCassandraReplica get_replica(self)
pandas.DataFrame|None getDiaSources(self, sphgeom.Region region, Iterable[int]|None object_ids, astropy.time.Time visit_time, astropy.time.Time|None start_time=None)
pandas.DataFrame getSSObjects(self)
None _storeDiaSourcesPartitions(self, pandas.DataFrame sources, astropy.time.Time visit_time, ReplicaChunk|None replica_chunk, int|None subchunk)
None _deleteMovingObjects(self, pandas.DataFrame objs)
None _storeReplicaChunk(self, ReplicaChunk replica_chunk)
None _storeUpdateRecords(self, Iterable[ApdbUpdateRecord] records, ReplicaChunk chunk, *, bool store_chunk=False)
pandas.DataFrame _getSources(self, sphgeom.Region region, Iterable[int]|None object_ids, float mjd_start, float mjd_end, ApdbTables table_name)
ApdbCassandraConfig init_database(cls, tuple[str,...] hosts, str keyspace, *, str|None schema_file=None, str|None schema_name=None, int|None read_sources_months=None, int|None read_forced_sources_months=None, bool enable_replica=False, bool replica_skips_diaobjects=False, int|None port=None, str|None username=None, str|None prefix=None, str|None part_pixelization=None, int|None part_pix_level=None, bool time_partition_tables=True, str|None time_partition_start=None, str|None time_partition_end=None, str|None read_consistency=None, str|None write_consistency=None, int|None read_timeout=None, int|None write_timeout=None, tuple[str, str]|None ra_dec_columns=None, int|None replication_factor=None, bool drop=False, CreateTableOptions|None table_options=None)
int countUnassociatedObjects(self)
Timer _timer(self, str name, *, Mapping[str, str|int]|None tags=None)
Table|None tableDef(self, ApdbTables table)
ConnectionContext|None _connection_context
ConnectionContext _context(self)
ApdbCassandraAdmin admin(self)
Iterator[tuple[cassandra.query.Statement, tuple]] _combine_where(self, str prefix, list[tuple[str, tuple]] where1, list[tuple[str, tuple]] where2, tuple[str, tuple]|None where3=None, str|None suffix=None)
ApdbCassandraConfig getConfig(self)
Region is a minimal interface for 2-dimensional regions on the unit sphere.
UnitVector3d is a unit vector in ℝ³ with components stored in double precision.