from __future__ import annotations

__all__ = ["ApdbCassandra"]
import datetime
import logging
import random
import warnings
from collections import defaultdict
from collections.abc import Iterable, Iterator, Mapping, Set
from typing import TYPE_CHECKING, Any, cast

import astropy.time
import felis.datamodel
import numpy as np
import pandas

try:
    import cassandra.query
    from cassandra.query import UNSET_VALUE

    CASSANDRA_IMPORTED = True
except ImportError:
    CASSANDRA_IMPORTED = False
from lsst import sphgeom
from lsst.utils.iteration import chunk_iterable

from ..apdb import Apdb, ApdbConfig
from ..apdbConfigFreezer import ApdbConfigFreezer
from ..apdbReplica import ApdbTableData, ReplicaChunk
from ..apdbSchema import ApdbSchema, ApdbTables
from ..monitor import MonAgent
from ..schema_model import Table
from ..timer import Timer
from ..versionTuple import IncompatibleVersionError, VersionTuple
from .apdbCassandraAdmin import ApdbCassandraAdmin
from .apdbCassandraReplica import ApdbCassandraReplica
from .apdbCassandraSchema import ApdbCassandraSchema, CreateTableOptions, ExtraTables
from .apdbMetadataCassandra import ApdbMetadataCassandra
from .cassandra_utils import (
    execute_concurrent,
    literal,
    quote_id,
    select_concurrent,
)
from .config import ApdbCassandraConfig, ApdbCassandraConnectionConfig, ApdbCassandraTimePartitionRange
from .connectionContext import ConnectionContext, DbVersions
from .exceptions import CassandraMissingError
from .partitioner import Partitioner
from .sessionFactory import SessionContext, SessionFactory

if TYPE_CHECKING:
    from ..apdbMetadata import ApdbMetadata
_LOG = logging.getLogger(__name__)

"""Version for the code controlling non-replication tables. This needs to be
updated following compatibility rules when the schema produced by this code
changes.
"""
class ApdbCassandra(Apdb):
    """Implementation of APDB database with Apache Cassandra backend.

    Parameters
    ----------
    config : `ApdbCassandraConfig`
        Configuration object.
    """

    def __init__(self, config: ApdbCassandraConfig):
        if not CASSANDRA_IMPORTED:
            raise CassandraMissingError()
        """Establish connection if not established and return context."""
            schema_version=self._schema.schemaVersion(),
            ApdbCassandraReplica.apdbReplicaImplementationVersion()
        _LOG.debug("Current versions: %s", current_versions)

        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug(
                "ApdbCassandra Configuration: %s", self._connection_context.config.model_dump()
            )
    def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
        """Create `Timer` instance given its name."""
        return Timer(name, _MON, tags=tags)
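    # Example usage of `_timer`, mirroring how it is used by the query methods
    # below (the tag value is illustrative):
    #
    #     with self._timer("select_time", tags={"table": "DiaObject"}) as timer:
    #         ...
    #         timer.add_values(row_count=len(objects))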
    def _versionCheck(self, current_versions: DbVersions, db_versions: DbVersions) -> None:
        """Check schema version compatibility."""
        if not current_versions.schema_version.checkCompatibility(db_versions.schema_version):
            raise IncompatibleVersionError(
                f"Configured schema version {current_versions.schema_version} "
                f"is not compatible with database version {db_versions.schema_version}"
            )
        if not current_versions.code_version.checkCompatibility(db_versions.code_version):
            raise IncompatibleVersionError(
                f"Current code version {current_versions.code_version} "
                f"is not compatible with database version {db_versions.code_version}"
            )

        match current_versions.replica_version, db_versions.replica_version:
            case None, None:
                pass
            case VersionTuple() as current, VersionTuple() as stored:
                if not current.checkCompatibility(stored):
                    raise IncompatibleVersionError(
                        f"Current replication code version {current} "
                        f"is not compatible with database version {stored}"
                    )
            case _:
                raise IncompatibleVersionError(
                    f"Current replication code version {current_versions.replica_version} "
                    f"is not compatible with database version {db_versions.replica_version}"
                )
    @classmethod
    def apdbImplementationVersion(cls) -> VersionTuple:
        """Return version number for current APDB implementation.

        Returns
        -------
        version : `VersionTuple`
            Version of the code defined in implementation class.
        """
    def tableDef(self, table: ApdbTables) -> Table | None:
        return self._schema.tableSchemas.get(table)
    @classmethod
    def init_database(
        cls,
        hosts: tuple[str, ...],
        keyspace: str,
        *,
        schema_file: str | None = None,
        schema_name: str | None = None,
        read_sources_months: int | None = None,
        read_forced_sources_months: int | None = None,
        enable_replica: bool = False,
        replica_skips_diaobjects: bool = False,
        port: int | None = None,
        username: str | None = None,
        prefix: str | None = None,
        part_pixelization: str | None = None,
        part_pix_level: int | None = None,
        time_partition_tables: bool = True,
        time_partition_start: str | None = None,
        time_partition_end: str | None = None,
        read_consistency: str | None = None,
        write_consistency: str | None = None,
        read_timeout: int | None = None,
        write_timeout: int | None = None,
        ra_dec_columns: tuple[str, str] | None = None,
        replication_factor: int | None = None,
        drop: bool = False,
        table_options: CreateTableOptions | None = None,
    ) -> ApdbCassandraConfig:
        """Initialize new APDB instance and make configuration object for it.

        Parameters
        ----------
        hosts : `tuple` [`str`, ...]
            List of host names or IP addresses for Cassandra cluster.
        keyspace : `str`
            Name of the keyspace for APDB tables.
        schema_file : `str`, optional
            Location of (YAML) configuration file with APDB schema. If not
            specified then default location will be used.
        schema_name : `str`, optional
            Name of the schema in YAML configuration file. If not specified
            then default name will be used.
        read_sources_months : `int`, optional
            Number of months of history to read from DiaSource.
        read_forced_sources_months : `int`, optional
            Number of months of history to read from DiaForcedSource.
        enable_replica : `bool`, optional
            If True, make additional tables used for replication to PPDB.
        replica_skips_diaobjects : `bool`, optional
            If `True` then do not fill regular ``DiaObject`` table when
            ``enable_replica`` is `True`.
        port : `int`, optional
            Port number to use for Cassandra connections.
        username : `str`, optional
            User name for Cassandra connections.
        prefix : `str`, optional
            Optional prefix for all table names.
        part_pixelization : `str`, optional
            Name of the MOC pixelization used for partitioning.
        part_pix_level : `int`, optional
            Pixelization level used for partitioning.
        time_partition_tables : `bool`, optional
            Create per-partition tables.
        time_partition_start : `str`, optional
            Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss
            format (TAI scale).
        time_partition_end : `str`, optional
            Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss
            format (TAI scale).
        read_consistency : `str`, optional
            Name of the consistency level for read operations.
        write_consistency : `str`, optional
            Name of the consistency level for write operations.
        read_timeout : `int`, optional
            Read timeout in seconds.
        write_timeout : `int`, optional
            Write timeout in seconds.
        ra_dec_columns : `tuple` [`str`, `str`], optional
            Names of ra/dec columns in DiaObject table.
        replication_factor : `int`, optional
            Replication factor used when creating new keyspace, if keyspace
            already exists its replication factor is not changed.
        drop : `bool`, optional
            If `True` then drop existing tables before re-creating the schema.
        table_options : `CreateTableOptions`, optional
            Options used when creating Cassandra tables.

        Returns
        -------
        config : `ApdbCassandraConfig`
            Resulting configuration object for a created APDB instance.
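
        Examples
        --------
        A minimal usage sketch; the host names and keyspace below are
        hypothetical placeholders, not defaults::

            config = ApdbCassandra.init_database(
                hosts=("cassandra01.example.org", "cassandra02.example.org"),
                keyspace="apdb",
                enable_replica=True,
            )
            apdb = ApdbCassandra(config)
        """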
        connection_config = ApdbCassandraConnectionConfig(
            # Driver-level connection options (field name assumed from context).
            extra_parameters={
                "idle_heartbeat_interval": 0,
                "idle_heartbeat_timeout": 30,
                "control_connection_timeout": 100,
            },
        )
        config = ApdbCassandraConfig(
            contact_points=hosts,
            keyspace=keyspace,
            enable_replica=enable_replica,
            replica_skips_diaobjects=replica_skips_diaobjects,
            connection_config=connection_config,
        )
        config.partitioning.time_partition_tables = time_partition_tables
        if schema_file is not None:
            config.schema_file = schema_file
        if schema_name is not None:
            config.schema_name = schema_name
        if read_sources_months is not None:
            config.read_sources_months = read_sources_months
        if read_forced_sources_months is not None:
            config.read_forced_sources_months = read_forced_sources_months
        if port is not None:
            config.connection_config.port = port
        if username is not None:
            config.connection_config.username = username
        if prefix is not None:
            config.prefix = prefix
        if part_pixelization is not None:
            config.partitioning.part_pixelization = part_pixelization
        if part_pix_level is not None:
            config.partitioning.part_pix_level = part_pix_level
        if time_partition_start is not None:
            config.partitioning.time_partition_start = time_partition_start
        if time_partition_end is not None:
            config.partitioning.time_partition_end = time_partition_end
        if read_consistency is not None:
            config.connection_config.read_consistency = read_consistency
        if write_consistency is not None:
            config.connection_config.write_consistency = write_consistency
        if read_timeout is not None:
            config.connection_config.read_timeout = read_timeout
        if write_timeout is not None:
            config.connection_config.write_timeout = write_timeout
        if ra_dec_columns is not None:
            config.ra_dec_columns = ra_dec_columns
        cls._makeSchema(config, drop=drop, replication_factor=replication_factor, table_options=table_options)
    def get_replica(self) -> ApdbCassandraReplica:
        """Return `ApdbReplica` instance for this database."""
    @classmethod
    def _makeSchema(
        cls,
        config: ApdbConfig,
        *,
        drop: bool = False,
        replication_factor: int | None = None,
        table_options: CreateTableOptions | None = None,
    ) -> None:
        if not isinstance(config, ApdbCassandraConfig):
            raise TypeError(f"Unexpected type of configuration object: {type(config)}")
        simple_schema = ApdbSchema(config.schema_file, config.schema_name)

        schema = ApdbCassandraSchema(
            keyspace=config.keyspace,
            table_schemas=simple_schema.tableSchemas,
            prefix=config.prefix,
            time_partition_tables=config.partitioning.time_partition_tables,
            enable_replica=config.enable_replica,
            replica_skips_diaobjects=config.replica_skips_diaobjects,
        )
        part_range_config: ApdbCassandraTimePartitionRange | None = None
        if config.partitioning.time_partition_tables:
            partitioner = Partitioner(config)
            time_partition_start = astropy.time.Time(
                config.partitioning.time_partition_start, format="isot", scale="tai"
            )
            time_partition_end = astropy.time.Time(
                config.partitioning.time_partition_end, format="isot", scale="tai"
            )
            part_range_config = ApdbCassandraTimePartitionRange(
                start=partitioner.time_partition(time_partition_start),
                end=partitioner.time_partition(time_partition_end),
            )
            schema.makeSchema(
                drop=drop,
                part_range=part_range_config,
                replication_factor=replication_factor,
                table_options=table_options,
            )
        else:
            schema.makeSchema(
                drop=drop, replication_factor=replication_factor, table_options=table_options
            )
        meta_table_name = ApdbTables.metadata.table_name(config.prefix)
        metadata = ApdbMetadataCassandra(
            session, meta_table_name, config.keyspace, "read_tuples", "write"
        )

        # Fill version numbers, overwrite if they are already there.
        metadata.set(
            ConnectionContext.metadataSchemaVersionKey, str(simple_schema.schemaVersion()), force=True
        )
        if config.enable_replica:
            metadata.set(
                ConnectionContext.metadataReplicaVersionKey,
                str(ApdbCassandraReplica.apdbReplicaImplementationVersion()),
                force=True,
            )

        # Store frozen part of the configuration in metadata.
        freezer = ApdbConfigFreezer[ApdbCassandraConfig](ConnectionContext.frozen_parameters)
        metadata.set(ConnectionContext.metadataConfigKey, freezer.to_json(config), force=True)

        if part_range_config:
            part_range_config.save_to_meta(metadata)
    def getDiaObjects(self, region: sphgeom.Region) -> pandas.DataFrame:
        context = self._context
        config = context.config

        sp_where, num_sp_part = context.partitioner.spatial_where(region, for_prepare=True)
        _LOG.debug("getDiaObjects: #partitions: %s", len(sp_where))

        # Exclude extra partitioning columns from the result.
        column_names = context.schema.apdbColumnNames(ApdbTables.DiaObjectLast)
        what = ",".join(quote_id(column) for column in column_names)

        table_name = context.schema.tableName(ApdbTables.DiaObjectLast)
        query = f'SELECT {what} from "{self._keyspace}"."{table_name}"'
        statements: list[tuple] = []
        for where, params in sp_where:
            full_query = f"{query} WHERE {where}"
            if params:
                statement = context.preparer.prepare(full_query)
            else:
                statement = cassandra.query.SimpleStatement(full_query)
            statements.append((statement, params))
        _LOG.debug("getDiaObjects: #queries: %s", len(statements))

        with _MON.context_tags({"table": "DiaObject"}):
                "select_query_stats", values={"num_sp_part": num_sp_part, "num_queries": len(statements)}
            with self._timer("select_time") as timer:
                    config.connection_config.read_concurrency,
                timer.add_values(row_count=len(objects))

        _LOG.debug("found %s DiaObjects", objects.shape[0])
        return objects
    def getDiaSources(
        self, region: sphgeom.Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
    ) -> pandas.DataFrame | None:
        context = self._context
        config = context.config

        months = config.read_sources_months
        # History depth is configured in months; a month is approximated as 30 days.
        mjd_end = float(visit_time.mjd)
        mjd_start = mjd_end - months * 30

        return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaSource)
    def getDiaForcedSources(
        self, region: sphgeom.Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
    ) -> pandas.DataFrame | None:
        context = self._context
        config = context.config

        months = config.read_forced_sources_months
        mjd_end = float(visit_time.mjd)
        mjd_start = mjd_end - months * 30

        return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaForcedSource)
    def containsVisitDetector(
        self,
        visit: int,
        detector: int,
        region: sphgeom.Region,
        visit_time: astropy.time.Time,
    ) -> bool:
        context = self._context
        config = context.config

        if context.has_visit_detector_table:
            table_name = context.schema.tableName(ExtraTables.ApdbVisitDetector)
            query = (
                f'SELECT count(*) FROM "{self._keyspace}"."{table_name}" WHERE visit = %s AND detector = %s'
            )
            with self._timer("contains_visit_detector_time"):
                result = context.session.execute(query, (visit, detector))
                return bool(result.one()[0])

        # Fall back to scanning the source tables for this visit/detector.
        sp_where, _ = context.partitioner.spatial_where(region, use_ranges=True, for_prepare=True)
        visit_detector_where = ("visit = ? AND detector = ?", (visit, detector))

        # Check sources within one hour of the visit time.
        mjd_start = float(visit_time.mjd) - 1.0 / 24
        mjd_end = float(visit_time.mjd) + 1.0 / 24

        statements: list[tuple] = []
        for table_type in ApdbTables.DiaSource, ApdbTables.DiaForcedSource:
            tables, temporal_where = context.partitioner.temporal_where(
                table_type, mjd_start, mjd_end, query_per_time_part=True, for_prepare=True
            )
            for table in tables:
                prefix = f'SELECT apdb_part FROM "{self._keyspace}"."{table}"'
                # Needs ALLOW FILTERING because of the non-key visit/detector restriction.
                suffix = "PER PARTITION LIMIT 1 LIMIT 1 ALLOW FILTERING"
                statements.extend(
                    self._combine_where(prefix, sp_where, temporal_where, visit_detector_where, suffix)
                )

        with self._timer("contains_visit_detector_time"):
                list[tuple[int] | None],
                config.connection_config.read_concurrency,
    def getSSObjects(self) -> pandas.DataFrame:
        context = self._context

        tableName = context.schema.tableName(ApdbTables.SSObject)
        query = f'SELECT * from "{self._keyspace}"."{tableName}"'

        with self._timer("select_time", tags={"table": "SSObject"}) as timer:
            result = context.session.execute(query, execution_profile="read_pandas")
            objects = result._current_rows
            timer.add_values(row_count=len(objects))

        _LOG.debug("found %s SSObjects", objects.shape[0])
        return objects
    def store(
        self,
        visit_time: astropy.time.Time,
        objects: pandas.DataFrame,
        sources: pandas.DataFrame | None = None,
        forced_sources: pandas.DataFrame | None = None,
    ) -> None:
        context = self._context
        config = context.config

        if context.has_visit_detector_table:
            # Record all visit/detector combinations seen in this call.
            visit_detector: set[tuple[int, int]] = set()
            for df in sources, forced_sources:
                if df is not None and not df.empty:
                    df = df[["visit", "detector"]]
                    for visit, detector in df.itertuples(index=False):
                        visit_detector.add((visit, detector))

            table_name = context.schema.tableName(ExtraTables.ApdbVisitDetector)
            query = f'INSERT INTO "{self._keyspace}"."{table_name}" (visit, detector) VALUES (%s, %s)'
            for item in visit_detector:
                context.session.execute(query, item, execution_profile="write")

        if sources is not None:
            sources = self._fix_input_timestamps(sources)
        if forced_sources is not None:
            forced_sources = self._fix_input_timestamps(forced_sources)

        replica_chunk: ReplicaChunk | None = None
        if context.schema.replication_enabled:
            replica_chunk = ReplicaChunk.make_replica_chunk(visit_time, config.replica_chunk_seconds)

        if sources is not None and len(sources) > 0:
            subchunk = self._storeDiaSources(ApdbTables.DiaSource, sources, replica_chunk)

        if forced_sources is not None and len(forced_sources) > 0:
            self._storeDiaSources(ApdbTables.DiaForcedSource, forced_sources, replica_chunk)
    def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
        context = self._context
        config = context.config

        if self._schema.has_mjd_timestamps:
            reassign_time_column = "ssObjectReassocTimeMjdTai"
            reassignTime = astropy.time.Time.now().tai.mjd
        else:
            reassign_time_column = "ssObjectReassocTime"
            # Cassandra timestamps are milliseconds since epoch.
            reassignTime = int(datetime.datetime.now(tz=datetime.UTC).timestamp() * 1000)

        # Lookup partitioning information for each source ID.
        table_name = context.schema.tableName(ExtraTables.DiaSourceToPartition)
        selects: list[tuple] = []
        for ids in chunk_iterable(idMap.keys(), 1_000):
            ids_str = ",".join(str(item) for item in ids)
            query = (
                'SELECT "diaSourceId", "apdb_part", "apdb_time_part", "apdb_replica_chunk" '
                f'FROM "{self._keyspace}"."{table_name}" WHERE "diaSourceId" IN ({ids_str})'
            )
            selects.append((query, ()))

        result = cast(
            list[tuple[int, int, int, int | None]],
            select_concurrent(
                context.session, selects, "read_tuples", config.connection_config.read_concurrency
            ),
        )

        # Map each source ID to its partitioning values and replica chunk.
        id2partitions: dict[int, tuple[int, int]] = {}
        id2chunk_id: dict[int, int] = {}
        for row in result:
            id2partitions[row[0]] = row[1:3]
            if row[3] is not None:
                id2chunk_id[row[0]] = row[3]

        # All source IDs must be known.
        if set(id2partitions) != set(idMap):
            missing = ",".join(str(item) for item in set(idMap) - set(id2partitions))
            raise ValueError(f"Following DiaSource IDs do not exist in the database: {missing}")

        queries: list[tuple[cassandra.query.PreparedStatement, tuple]] = []
        for diaSourceId, ssObjectId in idMap.items():
            apdb_part, apdb_time_part = id2partitions[diaSourceId]
            if config.partitioning.time_partition_tables:
                table_name = context.schema.tableName(ApdbTables.DiaSource, apdb_time_part)
                query = (
                    f'UPDATE "{self._keyspace}"."{table_name}"'
                    f' SET "ssObjectId" = ?, "diaObjectId" = NULL, "{reassign_time_column}" = ?'
                    ' WHERE "apdb_part" = ? AND "diaSourceId" = ?'
                )
                values = (ssObjectId, reassignTime, apdb_part, diaSourceId)
            else:
                table_name = context.schema.tableName(ApdbTables.DiaSource)
                query = (
                    f'UPDATE "{self._keyspace}"."{table_name}"'
                    f' SET "ssObjectId" = ?, "diaObjectId" = NULL, "{reassign_time_column}" = ?'
                    ' WHERE "apdb_part" = ? AND "apdb_time_part" = ? AND "diaSourceId" = ?'
                )
                values = (ssObjectId, reassignTime, apdb_part, apdb_time_part, diaSourceId)
            queries.append((context.preparer.prepare(query), values))

        warnings.warn("Replication of reassigned DiaSource records is not implemented.", stacklevel=2)

        _LOG.debug("%s: will update %d records", table_name, len(idMap))
        with self._timer("source_reassign_time") as timer:
            execute_concurrent(context.session, queries, execution_profile="write")
            timer.add_values(source_count=len(idMap))
    def countUnassociatedObjects(self) -> int:
        raise NotImplementedError()
        return context.metadata
    def admin(self) -> ApdbCassandraAdmin:
    def _getSources(
        self,
        region: sphgeom.Region,
        object_ids: Iterable[int] | None,
        mjd_start: float,
        mjd_end: float,
        table_name: ApdbTables,
    ) -> pandas.DataFrame:
        """Return catalog of DiaSource instances given set of DiaObject IDs.

        Parameters
        ----------
        region : `lsst.sphgeom.Region`
            Region to query.
        object_ids : `~collections.abc.Iterable` [`int`] or `None`
            Collection of DiaObject IDs
        mjd_start : `float`
            Lower bound of time interval.
        mjd_end : `float`
            Upper bound of time interval.
        table_name : `ApdbTables`
            Name of the table to query.

        Returns
        -------
        catalog : `pandas.DataFrame`, or `None`
            Catalog containing DiaSource records. Empty catalog is returned if
            ``object_ids`` is empty.
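
        Examples
        --------
        This is a private helper; `getDiaSources` calls it with a time window
        derived from the visit time (sketch, assuming a 12-month history)::

            mjd_end = float(visit_time.mjd)
            mjd_start = mjd_end - 12 * 30
            catalog = self._getSources(
                region, object_ids, mjd_start, mjd_end, ApdbTables.DiaSource
            )
        """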
        context = self._context
        config = context.config

        object_id_set: Set[int] = set()
        if object_ids is not None:
            object_id_set = set(object_ids)
            if len(object_id_set) == 0:
                return self._make_empty_catalog(table_name)

        sp_where, num_sp_part = context.partitioner.spatial_where(region, for_prepare=True)
        tables, temporal_where = context.partitioner.temporal_where(
            table_name, mjd_start, mjd_end, for_prepare=True, partitons_range=context.time_partitions_range
        )
        start = astropy.time.Time(mjd_start, format="mjd", scale="tai")
        end = astropy.time.Time(mjd_end, format="mjd", scale="tai")
            f"Query time range ({start.isot} - {end.isot}) does not overlap database time partitions."

        # Exclude extra partitioning columns from the result.
        column_names = context.schema.apdbColumnNames(table_name)
        what = ",".join(quote_id(column) for column in column_names)

        # Build all queries.
        statements: list[tuple] = []
        for table in tables:
            prefix = f'SELECT {what} from "{self._keyspace}"."{table}"'
            statements += list(self._combine_where(prefix, sp_where, temporal_where))
        _LOG.debug("_getSources %s: #queries: %s", table_name, len(statements))

        with _MON.context_tags({"table": table_name.name}):
                "select_query_stats", values={"num_sp_part": num_sp_part, "num_queries": len(statements)}
            with self._timer("select_time") as timer:
                    config.connection_config.read_concurrency,
                timer.add_values(row_count_from_db=len(catalog))

                # Filter by requested object IDs and by exact time range.
                if len(object_id_set) > 0:
                    catalog = cast(pandas.DataFrame, catalog[catalog["diaObjectId"].isin(object_id_set)])
                catalog = cast(pandas.DataFrame, catalog[catalog["midpointMjdTai"] > mjd_start])
                timer.add_values(row_count=len(catalog))

        _LOG.debug("found %d %ss", catalog.shape[0], table_name.name)
        return catalog
    def _storeReplicaChunk(self, replica_chunk: ReplicaChunk, visit_time: astropy.time.Time) -> None:
        context = self._context
        config = context.config

        # Cassandra timestamps are milliseconds since epoch.
        timestamp = int(replica_chunk.last_update_time.unix_tai * 1000)

        table_name = context.schema.tableName(ExtraTables.ApdbReplicaChunks)

        columns = ["partition", "apdb_replica_chunk", "last_update_time", "unique_id"]
        values = [partition, replica_chunk.id, timestamp, replica_chunk.unique_id]
        if context.has_chunk_sub_partitions:
            columns.append("has_subchunks")
            values.append(True)

        column_list = ", ".join(columns)
        placeholders = ",".join(["%s"] * len(columns))
        query = f'INSERT INTO "{self._keyspace}"."{table_name}" ({column_list}) VALUES ({placeholders})'

        context.session.execute(
            query,
            values,
            timeout=config.connection_config.write_timeout,
            execution_profile="write",
        )
    def _queryDiaObjectLastPartitions(self, ids: Iterable[int]) -> Mapping[int, int]:
        """Return existing mapping of diaObjectId to its last partition."""
        context = self._context
        config = context.config

        table_name = context.schema.tableName(ExtraTables.DiaObjectLastToPartition)
        queries = []
        object_count = 0
        for id_chunk in chunk_iterable(ids, 10_000):
            id_chunk_list = list(id_chunk)
            query = (
                f'SELECT "diaObjectId", apdb_part FROM "{self._keyspace}"."{table_name}" '
                f'WHERE "diaObjectId" in ({",".join(str(oid) for oid in id_chunk_list)})'
            )
            queries.append((query, ()))
            object_count += len(id_chunk_list)

        with self._timer("query_object_last_partitions") as timer:
                config.connection_config.read_concurrency,
            timer.add_values(object_count=object_count, row_count=len(data.rows()))

        if data.column_names() != ["diaObjectId", "apdb_part"]:
            raise RuntimeError(f"Unexpected column names in query result: {data.column_names()}")

        return {row[0]: row[1] for row in data.rows()}
    def _deleteMovingObjects(self, objs: pandas.DataFrame) -> None:
        """Objects in DiaObjectsLast can move from one spatial partition to
        another. For those objects inserting new version does not replace old
        one, so we need to explicitly remove old versions before inserting new
        ones.
        """
        context = self._context

        # Compare existing and new partitions for each object.
        old_partitions = self._queryDiaObjectLastPartitions(objs["diaObjectId"])
        new_partitions = dict(zip(objs["diaObjectId"], objs["apdb_part"]))

        moved_oids: dict[int, tuple[int, int]] = {}
        for oid, old_part in old_partitions.items():
            new_part = new_partitions.get(oid, old_part)
            if new_part != old_part:
                moved_oids[oid] = (old_part, new_part)
        _LOG.debug("DiaObject IDs that moved to new partition: %s", moved_oids)

        # Delete old versions of the moved objects.
        table_name = context.schema.tableName(ApdbTables.DiaObjectLast)
        query = f'DELETE FROM "{self._keyspace}"."{table_name}" WHERE apdb_part = ? AND "diaObjectId" = ?'
        statement = context.preparer.prepare(query)
        queries = []
        for oid, (old_part, _) in moved_oids.items():
            queries.append((statement, (old_part, oid)))
        with self._timer("delete_object_last") as timer:
            execute_concurrent(context.session, queries, execution_profile="write")
            timer.add_values(row_count=len(moved_oids))

        # Update the partition mapping for all new objects.
        table_name = context.schema.tableName(ExtraTables.DiaObjectLastToPartition)
        query = f'INSERT INTO "{self._keyspace}"."{table_name}" ("diaObjectId", apdb_part) VALUES (?,?)'
        statement = context.preparer.prepare(query)
        queries = []
        for oid, new_part in new_partitions.items():
            queries.append((statement, (oid, new_part)))

        with self._timer("update_object_last_partition") as timer:
            execute_concurrent(context.session, queries, execution_profile="write")
            timer.add_values(row_count=len(queries))
    def _storeDiaObjects(
        self, objs: pandas.DataFrame, visit_time: astropy.time.Time, replica_chunk: ReplicaChunk | None
    ) -> None:
        """Store catalog of DiaObjects from current visit.

        Parameters
        ----------
        objs : `pandas.DataFrame`
            Catalog with DiaObject records
        visit_time : `astropy.time.Time`
            Time of the current visit.
        replica_chunk : `ReplicaChunk` or `None`
            Replica chunk identifier if replication is configured.
        """
        if len(objs) == 0:
            _LOG.debug("No objects to write to database.")
            return

        context = self._context
        config = context.config

        if context.has_dia_object_last_to_partition:
            self._deleteMovingObjects(objs)

        if self._schema.has_mjd_timestamps:
            validity_start_column = "validityStartMjdTai"
            timestamp = visit_time.tai.mjd
        else:
            validity_start_column = "validityStart"
            timestamp = visit_time.datetime

        extra_columns = {validity_start_column: timestamp}
        visit_time_part = context.partitioner.time_partition(visit_time)
        time_part: int | None = visit_time_part
        if (time_partitions_range := context.time_partitions_range) is not None:
            self._check_time_partitions([visit_time_part], time_partitions_range)
        if not config.partitioning.time_partition_tables:
            extra_columns["apdb_time_part"] = time_part

        if replica_chunk is None or not config.replica_skips_diaobjects:
            self._storeObjectsPandas(
                objs, ApdbTables.DiaObject, extra_columns=extra_columns, time_part=time_part
            )

        if replica_chunk is not None:
            extra_columns = {"apdb_replica_chunk": replica_chunk.id, validity_start_column: timestamp}
            table = ExtraTables.DiaObjectChunks
            if context.has_chunk_sub_partitions:
                table = ExtraTables.DiaObjectChunks2
                extra_columns["apdb_replica_subchunk"] = random.randrange(config.replica_sub_chunk_count)
    def _storeDiaSources(
        self,
        table_name: ApdbTables,
        sources: pandas.DataFrame,
        replica_chunk: ReplicaChunk | None,
    ) -> int | None:
        """Store catalog of DIASources or DIAForcedSources from current visit.

        Parameters
        ----------
        table_name : `ApdbTables`
            Table where to store the data.
        sources : `pandas.DataFrame`
            Catalog containing DiaSource records
        visit_time : `astropy.time.Time`
            Time of the current visit.
        replica_chunk : `ReplicaChunk` or `None`
            Replica chunk identifier if replication is configured.

        Returns
        -------
        subchunk : `int` or `None`
            Subchunk number for resulting replica data, `None` if replication
            is not enabled or subchunking is not enabled.
        """
        context = self._context
        config = context.config

        # Time partition is calculated from the source midpoint time.
        tp_sources = sources.copy(deep=False)
        tp_sources["apdb_time_part"] = tp_sources["midpointMjdTai"].apply(context.partitioner.time_partition)
        if (time_partitions_range := context.time_partitions_range) is not None:
            self._check_time_partitions(tp_sources["apdb_time_part"], time_partitions_range)

        extra_columns: dict[str, Any] = {}
        if not config.partitioning.time_partition_tables:
            partitions = set(tp_sources["apdb_time_part"])
            if len(partitions) == 1:
                # Single time partition, the whole catalog can be stored at once.
                time_part = partitions.pop()
            else:
                # Multiple time partitions, group records by partition value.
                for time_part, sub_frame in tp_sources.groupby(by="apdb_time_part"):
                    sub_frame.drop(columns="apdb_time_part", inplace=True)

        subchunk: int | None = None
        if replica_chunk is not None:
            extra_columns = {"apdb_replica_chunk": replica_chunk.id}
            if context.has_chunk_sub_partitions:
                subchunk = random.randrange(config.replica_sub_chunk_count)
                extra_columns["apdb_replica_subchunk"] = subchunk
                if table_name is ApdbTables.DiaSource:
                    extra_table = ExtraTables.DiaSourceChunks2
                else:
                    extra_table = ExtraTables.DiaForcedSourceChunks2
            else:
                if table_name is ApdbTables.DiaSource:
                    extra_table = ExtraTables.DiaSourceChunks
                else:
                    extra_table = ExtraTables.DiaForcedSourceChunks

        return subchunk
    def _check_time_partitions(
        self, partitions: Iterable[int], time_partitions_range: ApdbCassandraTimePartitionRange
    ) -> None:
        """Check that time partitions for new data actually exist.

        Parameters
        ----------
        partitions : `~collections.abc.Iterable` [`int`]
            Time partitions for new data.
        time_partitions_range : `ApdbCassandraTimePartitionRange`
            Current time partition range.
        """
        partitions = set(partitions)
        min_part = min(partitions)
        max_part = max(partitions)
        if min_part < time_partitions_range.start or max_part > time_partitions_range.end:
            raise ValueError(
                "Attempt to store data for time partitions that do not yet exist. "
                f"Partitions for new records: {min_part}-{max_part}. "
                f"Database partitions: {time_partitions_range.start}-{time_partitions_range.end}."
            )
        if max_part == time_partitions_range.end:
            warnings.warn(
                "Writing into the last temporal partition. Partition range needs to be extended soon.",
            )
    def _storeDiaSourcesPartitions(
        self,
        sources: pandas.DataFrame,
        visit_time: astropy.time.Time,
        replica_chunk: ReplicaChunk | None,
        subchunk: int | None,
    ) -> None:
        """Store mapping of diaSourceId to its partitioning values.

        Parameters
        ----------
        sources : `pandas.DataFrame`
            Catalog containing DiaSource records
        visit_time : `astropy.time.Time`
            Time of the current visit.
        replica_chunk : `ReplicaChunk` or `None`
            Replication chunk, or `None` when replication is disabled.
        subchunk : `int` or `None`
            Replication sub-chunk, or `None` when replication is disabled or
            sub-chunking is not used.
        """
        context = self._context

        id_map = cast(pandas.DataFrame, sources[["diaSourceId", "apdb_part"]])
        extra_columns = {
            "apdb_time_part": context.partitioner.time_partition(visit_time),
            "apdb_replica_chunk": replica_chunk.id if replica_chunk is not None else None,
        }
        if context.has_chunk_sub_partitions:
            extra_columns["apdb_replica_subchunk"] = subchunk

        self._storeObjectsPandas(
            id_map, ExtraTables.DiaSourceToPartition, extra_columns=extra_columns, time_part=None
        )
    def _storeObjectsPandas(
        self,
        records: pandas.DataFrame,
        table_name: ApdbTables | ExtraTables,
        extra_columns: Mapping | None = None,
        time_part: int | None = None,
    ) -> None:
        """Store generic objects.

        Takes Pandas catalog and stores a bunch of records in a table.

        Parameters
        ----------
        records : `pandas.DataFrame`
            Catalog containing object records
        table_name : `ApdbTables`
            Name of the table as defined in APDB schema.
        extra_columns : `dict`, optional
            Mapping (column_name, column_value) which gives fixed values for
            columns in each row, overrides values in ``records`` if matching
            columns exist there.
        time_part : `int`, optional
            If not `None` then insert into a per-partition table.

        Notes
        -----
        If Pandas catalog contains additional columns not defined in table
        schema they are ignored. Catalog does not have to contain all columns
        defined in a table, but partition and clustering keys must be present
        in a catalog or ``extra_columns``.
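
        Examples
        --------
        A sketch of a typical call, modelled on how DiaObject records are
        stored elsewhere in this class (the extra column value is
        illustrative)::

            extra_columns = {"validityStart": visit_time.datetime}
            self._storeObjectsPandas(
                objs, ApdbTables.DiaObject, extra_columns=extra_columns, time_part=time_part
            )
        """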
        context = self._context

        if extra_columns is None:
            extra_columns = {}
        extra_fields = list(extra_columns.keys())

        # Column names in the catalog that are not overridden by extra_columns.
        df_fields = [column for column in records.columns if column not in extra_fields]

        column_map = context.schema.getColumnMap(table_name)
        # List of columns as defined in the table schema.
        fields = [column_map[field].name for field in df_fields if field in column_map]
        fields += extra_fields

        # All partitioning and clustering columns must be present.
        partition_columns = context.schema.partitionColumns(table_name)
        required_columns = partition_columns + context.schema.clusteringColumns(table_name)
        missing_columns = [column for column in required_columns if column not in fields]
        if missing_columns:
            raise ValueError(f"Primary key columns are missing from catalog: {missing_columns}")

        qfields = [quote_id(field) for field in fields]
        qfields_str = ",".join(qfields)

        with self._timer("insert_build_time", tags={"table": table_name.name}):
            # Group records by their partitioning key so that each batch only
            # touches a single partition.
            values_by_key: dict[tuple, list[list]] = defaultdict(list)
            for rec in records.itertuples(index=False):
                values: list[Any] = []
                partitioning_values: dict[str, Any] = {}
                for field in df_fields:
                    if field not in column_map:
                        continue
                    value = getattr(rec, field)
                    if column_map[field].datatype is felis.datamodel.DataType.timestamp:
                        if isinstance(value, pandas.Timestamp):
                            value = value.to_pydatetime()
                        elif value is pandas.NaT:
                            value = None
                        else:
                            # Assume numeric value is seconds since epoch;
                            # Cassandra uses milliseconds.
                            value = int(value * 1000)
                    value = literal(value)
                    values.append(UNSET_VALUE if value is None else value)
                    if field in partition_columns:
                        partitioning_values[field] = value
                for field in extra_fields:
                    value = literal(extra_columns[field])
                    values.append(UNSET_VALUE if value is None else value)
                    if field in partition_columns:
                        partitioning_values[field] = value

                key = tuple(partitioning_values[field] for field in partition_columns)
                values_by_key[key].append(values)

        table = context.schema.tableName(table_name, time_part)

        holders = ",".join(["?"] * len(qfields))
        query = f'INSERT INTO "{self._keyspace}"."{table}" ({qfields_str}) VALUES ({holders})'
        statement = context.preparer.prepare(query)

        queries = []
        batch_size = self._batch_size(table_name)
        for key_values in values_by_key.values():
            for values_chunk in chunk_iterable(key_values, batch_size):
                batch = cassandra.query.BatchStatement()
                for row_values in values_chunk:
                    batch.add(statement, row_values)
                queries.append((batch, None))
                assert batch.routing_key is not None and batch.keyspace is not None

        _LOG.debug("%s: will store %d records", context.schema.tableName(table_name), records.shape[0])
        with self._timer("insert_time", tags={"table": table_name.name}) as timer:
            execute_concurrent(context.session, queries, execution_profile="write")
            timer.add_values(row_count=len(records), num_batches=len(queries))
    def _add_apdb_part(self, df: pandas.DataFrame) -> pandas.DataFrame:
        """Calculate spatial partition for each record and add it to a
        DataFrame column.

        Parameters
        ----------
        df : `pandas.DataFrame`
            DataFrame which has to contain ra/dec columns, names of these
            columns are defined by configuration ``ra_dec_columns`` field.

        Returns
        -------
        df : `pandas.DataFrame`
            DataFrame with ``apdb_part`` column which contains pixel index
            for ra/dec coordinates.

        Notes
        -----
        This overrides any existing column in a DataFrame with the same name
        (``apdb_part``). Original DataFrame is not changed, copy of a DataFrame
        is returned.
        """
        context = self._context
        config = context.config

        apdb_part = np.zeros(df.shape[0], dtype=np.int64)
        ra_col, dec_col = config.ra_dec_columns
        for i, (ra, dec) in enumerate(zip(df[ra_col], df[dec_col])):
            uv3d = sphgeom.UnitVector3d(sphgeom.LonLat.fromDegrees(ra, dec))
            idx = context.partitioner.pixel(uv3d)
            apdb_part[i] = idx
        df = df.copy()
        df["apdb_part"] = apdb_part
        return df
    def _make_empty_catalog(self, table_name: ApdbTables) -> pandas.DataFrame:
        """Make an empty catalog for a table with a given name.

        Parameters
        ----------
        table_name : `ApdbTables`
            Name of the table.

        Returns
        -------
        catalog : `pandas.DataFrame`
            An empty catalog.
        """
        table = self._schema.tableSchemas[table_name]

        data = {
            columnDef.name: pandas.Series(dtype=self._schema.column_dtype(columnDef.datatype))
            for columnDef in table.columns
        }
        return pandas.DataFrame(data)
    def _combine_where(
        self,
        prefix: str,
        where1: list[tuple[str, tuple]],
        where2: list[tuple[str, tuple]],
        where3: tuple[str, tuple] | None = None,
        suffix: str | None = None,
    ) -> Iterator[tuple[cassandra.query.Statement, tuple]]:
        """Make cartesian product of two parts of WHERE clause into a series
        of statements to execute.

        Parameters
        ----------
        prefix : `str`
            Initial statement prefix that comes before WHERE clause, e.g.
            "SELECT * from Table"
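
        Examples
        --------
        Every entry of ``where1`` is combined with every entry of ``where2``,
        so two spatial clauses and three temporal clauses produce six
        statements (sketch with placeholder clauses)::

            where1 = [('"apdb_part" = ?', (1,)), ('"apdb_part" = ?', (2,))]
            where2 = [('"apdb_time_part" = ?', (n,)) for n in (10, 11, 12)]
            statements = list(
                self._combine_where("SELECT * from Table", where1, where2)
            )
            # len(statements) == 6, each query ends with
            # 'WHERE "apdb_part" = ? AND "apdb_time_part" = ?'
        """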
        context = self._context

        for expr1, params1 in where1:
            for expr2, params2 in where2:
                full_query = prefix
                wheres = []
                params = params1 + params2
                if expr1:
                    wheres.append(expr1)
                if expr2:
                    wheres.append(expr2)
                if where3:
                    wheres.append(where3[0])
                    params += where3[1]
                if wheres:
                    full_query += " WHERE " + " AND ".join(wheres)
                if suffix:
                    full_query += " " + suffix
                if params:
                    statement = context.preparer.prepare(full_query)
                else:
                    statement = cassandra.query.SimpleStatement(full_query)
                yield (statement, params)
    def _fix_input_timestamps(self, df: pandas.DataFrame) -> pandas.DataFrame:
        """Update timestamp columns in input DataFrame to be naive datetime
        type.

        Clients may or may not generate aware timestamps, code in this class
        assumes that timestamps are naive, so we convert them to UTC and
        drop timezone information.
        """
        columns = [
            column for column, dtype in df.dtypes.items() if isinstance(dtype, pandas.DatetimeTZDtype)
        ]
        for column in columns:
            # tz_convert(None) converts to UTC and removes timezone info.
            df[column] = df[column].dt.tz_convert(None)
        return df
    def _batch_size(self, table: ApdbTables | ExtraTables) -> int:
        """Calculate batch size based on config parameters."""
        context = self._context
        config = context.config

        if 0 < config.batch_statement_limit < batch_size:
            batch_size = config.batch_statement_limit
        if config.batch_size_limit > 0:
            # Estimate the size of a single row and cap the batch so that its
            # total size stays below the configured limit.
            row_size = context.schema.table_row_size(table)
            row_size += 4 * len(context.schema.getColumnMap(table))
            batch_size = min(batch_size, (config.batch_size_limit // row_size) + 1)
        return batch_size
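    # Worked example for the size-based cap above (numbers are hypothetical,
    # not defaults): with batch_size_limit = 1_000_000 bytes and an estimated
    # row_size of 2_000 bytes the batch is capped at 1_000_000 // 2_000 + 1
    # = 501 rows; a positive batch_statement_limit is applied before that.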