Loading [MathJax]/extensions/tex2jax.js
LSST Applications g0fba68d861+83433b07ee,g16d25e1f1b+23bc9e47ac,g1ec0fe41b4+3ea9d11450,g1fd858c14a+9be2b0f3b9,g2440f9efcc+8c5ae1fdc5,g35bb328faa+8c5ae1fdc5,g4a4af6cd76+d25431c27e,g4d2262a081+c74e83464e,g53246c7159+8c5ae1fdc5,g55585698de+1e04e59700,g56a49b3a55+92a7603e7a,g60b5630c4e+1e04e59700,g67b6fd64d1+3fc8cb0b9e,g78460c75b0+7e33a9eb6d,g786e29fd12+668abc6043,g8352419a5c+8c5ae1fdc5,g8852436030+60e38ee5ff,g89139ef638+3fc8cb0b9e,g94187f82dc+1e04e59700,g989de1cb63+3fc8cb0b9e,g9d31334357+1e04e59700,g9f33ca652e+0a83e03614,gabe3b4be73+8856018cbb,gabf8522325+977d9fabaf,gb1101e3267+8b4b9c8ed7,gb89ab40317+3fc8cb0b9e,gc0af124501+57ccba3ad1,gcf25f946ba+60e38ee5ff,gd6cbbdb0b4+1cc2750d2e,gd794735e4e+7be992507c,gdb1c4ca869+be65c9c1d7,gde0f65d7ad+c7f52e58fe,ge278dab8ac+6b863515ed,ge410e46f29+3fc8cb0b9e,gf35d7ec915+97dd712d81,gf5e32f922b+8c5ae1fdc5,gf618743f1b+747388abfa,gf67bdafdda+3fc8cb0b9e,w.2025.18
LSST Data Management Base Package
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
apdbCassandra.py
Go to the documentation of this file.
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22from __future__ import annotations
23
24__all__ = ["ApdbCassandra"]
25
26import dataclasses
27import logging
28import random
29import warnings
30from collections.abc import Iterable, Iterator, Mapping, Set
31from typing import TYPE_CHECKING, Any, cast
32
33import numpy as np
34import pandas
35
36# If cassandra-driver is not there the module can still be imported
37# but ApdbCassandra cannot be instantiated.
38try:
39 import cassandra
40 import cassandra.query
41 from cassandra.auth import AuthProvider, PlainTextAuthProvider
42 from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile, Session
43 from cassandra.policies import AddressTranslator, RoundRobinPolicy, WhiteListRoundRobinPolicy
44 from cassandra.query import UNSET_VALUE
45
46 CASSANDRA_IMPORTED = True
47except ImportError:
48 CASSANDRA_IMPORTED = False
49
50import astropy.time
51import felis.datamodel
52from lsst import sphgeom
53from lsst.utils.db_auth import DbAuth, DbAuthNotFoundError
54from lsst.utils.iteration import chunk_iterable
55
56from ..apdb import Apdb, ApdbConfig
57from ..apdbConfigFreezer import ApdbConfigFreezer
58from ..apdbReplica import ApdbTableData, ReplicaChunk
59from ..apdbSchema import ApdbTables
60from ..monitor import MonAgent
61from ..pixelization import Pixelization
62from ..schema_model import Table
63from ..timer import Timer
64from ..versionTuple import IncompatibleVersionError, VersionTuple
65from .apdbCassandraReplica import ApdbCassandraReplica
66from .apdbCassandraSchema import ApdbCassandraSchema, CreateTableOptions, ExtraTables
67from .apdbMetadataCassandra import ApdbMetadataCassandra
68from .cassandra_utils import (
69 PreparedStatementCache,
70 literal,
71 pandas_dataframe_factory,
72 quote_id,
73 raw_data_factory,
74 select_concurrent,
75)
76from .config import ApdbCassandraConfig, ApdbCassandraConnectionConfig
77
78if TYPE_CHECKING:
79 from ..apdbMetadata import ApdbMetadata
80
81_LOG = logging.getLogger(__name__)
82
83_MON = MonAgent(__name__)
84
85VERSION = VersionTuple(0, 1, 1)
86"""Version for the code controlling non-replication tables. This needs to be
87updated following compatibility rules when schema produced by this code
88changes.
89"""
90
91
92def _dump_query(rf: Any) -> None:
93 """Dump cassandra query to debug log."""
94 _LOG.debug("Cassandra query: %s", rf.query)
95
96
97class CassandraMissingError(Exception):
98 def __init__(self) -> None:
99 super().__init__("cassandra-driver module cannot be imported")
100
101
102@dataclasses.dataclass
104 """Collection of information about a specific database."""
105
106 name: str
107 """Keyspace name."""
108
109 permissions: dict[str, set[str]] | None = None
110 """Roles that can access the database and their permissions.
111
112 `None` means that authentication information is not accessible due to
113 system table permissions. If anonymous access is enabled then dictionary
114 will be empty but not `None`.
115 """
116
117
118@dataclasses.dataclass
120 """Versions defined in APDB metadata table."""
121
122 schema_version: VersionTuple
123 """Version of the schema from which database was created."""
124
125 code_version: VersionTuple
126 """Version of ApdbCassandra with which database was created."""
127
128 replica_version: VersionTuple | None
129 """Version of ApdbCassandraReplica with which database was created, None
130 if replication was not configured.
131 """
132
133
134if CASSANDRA_IMPORTED:
135
136 class _AddressTranslator(AddressTranslator):
137 """Translate internal IP address to external.
138
139 Only used for docker-based setup, not viable long-term solution.
140 """
141
142 def __init__(self, public_ips: tuple[str, ...], private_ips: tuple[str, ...]):
143 self._map = dict((k, v) for k, v in zip(private_ips, public_ips))
144
145 def translate(self, private_ip: str) -> str:
146 return self._map.get(private_ip, private_ip)
147
148
150 """Implementation of APDB database on to of Apache Cassandra.
151
152 The implementation is configured via standard ``pex_config`` mechanism
153 using `ApdbCassandraConfig` configuration class. For an example of
154 different configurations check config/ folder.
155
156 Parameters
157 ----------
158 config : `ApdbCassandraConfig`
159 Configuration object.
160 """
161
162 metadataSchemaVersionKey = "version:schema"
163 """Name of the metadata key to store schema version number."""
164
165 metadataCodeVersionKey = "version:ApdbCassandra"
166 """Name of the metadata key to store code version number."""
167
168 metadataReplicaVersionKey = "version:ApdbCassandraReplica"
169 """Name of the metadata key to store replica code version number."""
170
171 metadataConfigKey = "config:apdb-cassandra.json"
172 """Name of the metadata key to store code version number."""
173
174 _frozen_parameters = (
175 "enable_replica",
176 "ra_dec_columns",
177 "replica_skips_diaobjects",
178 "replica_sub_chunk_count",
179 "partitioning.part_pixelization",
180 "partitioning.part_pix_level",
181 "partitioning.time_partition_tables",
182 "partitioning.time_partition_days",
183 )
184 """Names of the config parameters to be frozen in metadata table."""
185
186 partition_zero_epoch = astropy.time.Time(0, format="unix_tai")
187 """Start time for partition 0, this should never be changed."""
188
189 def __init__(self, config: ApdbCassandraConfig):
190 if not CASSANDRA_IMPORTED:
192
193 self._keyspace = config.keyspace
194
195 self._cluster, self._session = self._make_session(config)
196
197 meta_table_name = ApdbTables.metadata.table_name(config.prefix)
199 self._session, meta_table_name, config.keyspace, "read_tuples", "write"
200 )
201
202 # Read frozen config from metadata.
203 with self._timer("read_metadata_config"):
204 config_json = self._metadata.get(self.metadataConfigKey)
205 if config_json is not None:
206 # Update config from metadata.
207 freezer = ApdbConfigFreezer[ApdbCassandraConfig](self._frozen_parameters)
208 self.config = freezer.update(config, config_json)
209 else:
210 self.config = config
211
212 # Read versions stored in database.
214 _LOG.debug("Database versions: %s", self._db_versions)
215
216 # Since replica version 1.1.0 we use finer partitioning for replica
217 # chunk tables.
219 if self.config.enable_replica:
220 assert self._db_versions.replica_version is not None, "Replica version must be defined"
221 self._has_chunk_sub_partitions = ApdbCassandraReplica.hasChunkSubPartitions(
222 self._db_versions.replica_version
223 )
224
226 self.config.partitioning.part_pixelization,
227 self.config.partitioning.part_pix_level,
228 config.partitioning.part_pix_max_ranges,
229 )
230
232 session=self._session,
233 keyspace=self._keyspace,
234 schema_file=self.config.schema_file,
235 schema_name=self.config.schema_name,
236 prefix=self.config.prefix,
237 time_partition_tables=self.config.partitioning.time_partition_tables,
238 enable_replica=self.config.enable_replica,
239 has_chunk_sub_partitions=self._has_chunk_sub_partitions,
240 )
242
243 current_versions = _DbVersions(
244 schema_version=self._schema.schemaVersion(),
245 code_version=self.apdbImplementationVersion(),
246 replica_version=(
247 ApdbCassandraReplica.apdbReplicaImplementationVersion()
248 if self.config.enable_replica
249 else None
250 ),
251 )
252 _LOG.debug("Current versions: %s", current_versions)
253 self._versionCheck(current_versions, self._db_versions)
254
255 # Since replica version 1.1.0 we use finer partitioning for replica
256 # chunk tables.
257 self._has_chunk_sub_partitions = False
258 if self.config.enable_replica:
259 assert self._db_versions.replica_version is not None, "Replica version must be defined"
260 self._has_chunk_sub_partitions = ApdbCassandraReplica.hasChunkSubPartitions(
261 self._db_versions.replica_version
262 )
263
264 # Support for DiaObjectLastToPartition was added at code version 0.1.1
265 # in a backward-compatible way (we only use the table if it is there).
267
268 # Cache for prepared statements
270
271 if _LOG.isEnabledFor(logging.DEBUG):
272 _LOG.debug("ApdbCassandra Configuration: %s", self.config.model_dump())
273
274 def __del__(self) -> None:
275 if hasattr(self, "_cluster"):
276 self._cluster.shutdown()
277
278 def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
279 """Create `Timer` instance given its name."""
280 return Timer(name, _MON, tags=tags)
281
282 @classmethod
283 def _make_session(cls, config: ApdbCassandraConfig) -> tuple[Cluster, Session]:
284 """Make Cassandra session."""
285 addressTranslator: AddressTranslator | None = None
286 if config.connection_config.private_ips:
287 addressTranslator = _AddressTranslator(
288 config.contact_points, config.connection_config.private_ips
289 )
290
291 with Timer("cluster_connect", _MON):
292 cluster = Cluster(
293 execution_profiles=cls._makeProfiles(config),
294 contact_points=config.contact_points,
295 port=config.connection_config.port,
296 address_translator=addressTranslator,
297 protocol_version=config.connection_config.protocol_version,
298 auth_provider=cls._make_auth_provider(config),
299 **config.connection_config.extra_parameters,
300 )
301 session = cluster.connect()
302
303 # Dump queries if debug level is enabled.
304 if _LOG.isEnabledFor(logging.DEBUG):
305 session.add_request_init_listener(_dump_query)
306
307 # Disable result paging
308 session.default_fetch_size = None
309
310 return cluster, session
311
312 @classmethod
313 def _make_auth_provider(cls, config: ApdbCassandraConfig) -> AuthProvider | None:
314 """Make Cassandra authentication provider instance."""
315 try:
316 dbauth = DbAuth()
317 except DbAuthNotFoundError:
318 # Credentials file doesn't exist, use anonymous login.
319 return None
320
321 empty_username = True
322 # Try every contact point in turn.
323 for hostname in config.contact_points:
324 try:
325 username, password = dbauth.getAuth(
326 "cassandra",
327 config.connection_config.username,
328 hostname,
329 config.connection_config.port,
330 config.keyspace,
331 )
332 if not username:
333 # Password without user name, try next hostname, but give
334 # warning later if no better match is found.
335 empty_username = True
336 else:
337 return PlainTextAuthProvider(username=username, password=password)
338 except DbAuthNotFoundError:
339 pass
340
341 if empty_username:
342 _LOG.warning(
343 f"Credentials file ({dbauth.db_auth_path}) provided password but not "
344 "user name, anonymous Cassandra logon will be attempted."
345 )
346
347 return None
348
349 def _readVersions(self, metadata: ApdbMetadataCassandra) -> _DbVersions:
350 """Check schema version compatibility."""
351
352 def _get_version(key: str) -> VersionTuple:
353 """Retrieve version number from given metadata key."""
354 version_str = metadata.get(key)
355 if version_str is None:
356 # Should not happen with existing metadata table.
357 raise RuntimeError(f"Version key {key!r} does not exist in metadata table.")
358 return VersionTuple.fromString(version_str)
359
360 db_schema_version = _get_version(self.metadataSchemaVersionKey)
361 db_code_version = _get_version(self.metadataCodeVersionKey)
362
363 # Check replica code version only if replica is enabled.
364 db_replica_version: VersionTuple | None = None
365 if self.config.enable_replica:
366 db_replica_version = _get_version(self.metadataReplicaVersionKey)
367
368 return _DbVersions(
369 schema_version=db_schema_version, code_version=db_code_version, replica_version=db_replica_version
370 )
371
372 def _versionCheck(self, current_versions: _DbVersions, db_versions: _DbVersions) -> None:
373 """Check schema version compatibility."""
374 if not current_versions.schema_version.checkCompatibility(db_versions.schema_version):
376 f"Configured schema version {current_versions.schema_version} "
377 f"is not compatible with database version {db_versions.schema_version}"
378 )
379 if not current_versions.code_version.checkCompatibility(db_versions.code_version):
381 f"Current code version {current_versions.code_version} "
382 f"is not compatible with database version {db_versions.code_version}"
383 )
384
385 # Check replica code version only if replica is enabled.
386 match current_versions.replica_version, db_versions.replica_version:
387 case None, None:
388 pass
389 case VersionTuple() as current, VersionTuple() as stored:
390 if not current.checkCompatibility(stored):
392 f"Current replication code version {current} "
393 f"is not compatible with database version {stored}"
394 )
395 case _:
397 f"Current replication code version {current_versions.replica_version} "
398 f"is not compatible with database version {db_versions.replica_version}"
399 )
400
401 @classmethod
402 def apdbImplementationVersion(cls) -> VersionTuple:
403 """Return version number for current APDB implementation.
404
405 Returns
406 -------
407 version : `VersionTuple`
408 Version of the code defined in implementation class.
409 """
410 return VERSION
411
412 def tableDef(self, table: ApdbTables) -> Table | None:
413 # docstring is inherited from a base class
414 return self._schema.tableSchemas.get(table)
415
416 @classmethod
418 cls,
419 hosts: tuple[str, ...],
420 keyspace: str,
421 *,
422 schema_file: str | None = None,
423 schema_name: str | None = None,
424 read_sources_months: int | None = None,
425 read_forced_sources_months: int | None = None,
426 enable_replica: bool = False,
427 replica_skips_diaobjects: bool = False,
428 port: int | None = None,
429 username: str | None = None,
430 prefix: str | None = None,
431 part_pixelization: str | None = None,
432 part_pix_level: int | None = None,
433 time_partition_tables: bool = True,
434 time_partition_start: str | None = None,
435 time_partition_end: str | None = None,
436 read_consistency: str | None = None,
437 write_consistency: str | None = None,
438 read_timeout: int | None = None,
439 write_timeout: int | None = None,
440 ra_dec_columns: tuple[str, str] | None = None,
441 replication_factor: int | None = None,
442 drop: bool = False,
443 table_options: CreateTableOptions | None = None,
444 ) -> ApdbCassandraConfig:
445 """Initialize new APDB instance and make configuration object for it.
446
447 Parameters
448 ----------
449 hosts : `tuple` [`str`, ...]
450 List of host names or IP addresses for Cassandra cluster.
451 keyspace : `str`
452 Name of the keyspace for APDB tables.
453 schema_file : `str`, optional
454 Location of (YAML) configuration file with APDB schema. If not
455 specified then default location will be used.
456 schema_name : `str`, optional
457 Name of the schema in YAML configuration file. If not specified
458 then default name will be used.
459 read_sources_months : `int`, optional
460 Number of months of history to read from DiaSource.
461 read_forced_sources_months : `int`, optional
462 Number of months of history to read from DiaForcedSource.
463 enable_replica : `bool`, optional
464 If True, make additional tables used for replication to PPDB.
465 replica_skips_diaobjects : `bool`, optional
466 If `True` then do not fill regular ``DiaObject`` table when
467 ``enable_replica`` is `True`.
468 port : `int`, optional
469 Port number to use for Cassandra connections.
470 username : `str`, optional
471 User name for Cassandra connections.
472 prefix : `str`, optional
473 Optional prefix for all table names.
474 part_pixelization : `str`, optional
475 Name of the MOC pixelization used for partitioning.
476 part_pix_level : `int`, optional
477 Pixelization level.
478 time_partition_tables : `bool`, optional
479 Create per-partition tables.
480 time_partition_start : `str`, optional
481 Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss
482 format, in TAI.
483 time_partition_end : `str`, optional
484 Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss
485 format, in TAI.
486 read_consistency : `str`, optional
487 Name of the consistency level for read operations.
488 write_consistency : `str`, optional
489 Name of the consistency level for write operations.
490 read_timeout : `int`, optional
491 Read timeout in seconds.
492 write_timeout : `int`, optional
493 Write timeout in seconds.
494 ra_dec_columns : `tuple` [`str`, `str`], optional
495 Names of ra/dec columns in DiaObject table.
496 replication_factor : `int`, optional
497 Replication factor used when creating new keyspace, if keyspace
498 already exists its replication factor is not changed.
499 drop : `bool`, optional
500 If `True` then drop existing tables before re-creating the schema.
501 table_options : `CreateTableOptions`, optional
502 Options used when creating Cassandra tables.
503
504 Returns
505 -------
506 config : `ApdbCassandraConfig`
507 Resulting configuration object for a created APDB instance.
508 """
509 # Some non-standard defaults for connection parameters, these can be
510 # changed later in generated config. Check Cassandra driver
511 # documentation for what these parameters do. These parameters are not
512 # used during database initialization, but they will be saved with
513 # generated config.
514 connection_config = ApdbCassandraConnectionConfig(
515 extra_parameters={
516 "idle_heartbeat_interval": 0,
517 "idle_heartbeat_timeout": 30,
518 "control_connection_timeout": 100,
519 },
520 )
521 config = ApdbCassandraConfig(
522 contact_points=hosts,
523 keyspace=keyspace,
524 enable_replica=enable_replica,
525 replica_skips_diaobjects=replica_skips_diaobjects,
526 connection_config=connection_config,
527 )
528 config.partitioning.time_partition_tables = time_partition_tables
529 if schema_file is not None:
530 config.schema_file = schema_file
531 if schema_name is not None:
532 config.schema_name = schema_name
533 if read_sources_months is not None:
534 config.read_sources_months = read_sources_months
535 if read_forced_sources_months is not None:
536 config.read_forced_sources_months = read_forced_sources_months
537 if port is not None:
538 config.connection_config.port = port
539 if username is not None:
540 config.connection_config.username = username
541 if prefix is not None:
542 config.prefix = prefix
543 if part_pixelization is not None:
544 config.partitioning.part_pixelization = part_pixelization
545 if part_pix_level is not None:
546 config.partitioning.part_pix_level = part_pix_level
547 if time_partition_start is not None:
548 config.partitioning.time_partition_start = time_partition_start
549 if time_partition_end is not None:
550 config.partitioning.time_partition_end = time_partition_end
551 if read_consistency is not None:
552 config.connection_config.read_consistency = read_consistency
553 if write_consistency is not None:
554 config.connection_config.write_consistency = write_consistency
555 if read_timeout is not None:
556 config.connection_config.read_timeout = read_timeout
557 if write_timeout is not None:
558 config.connection_config.write_timeout = write_timeout
559 if ra_dec_columns is not None:
560 config.ra_dec_columns = ra_dec_columns
561
562 cls._makeSchema(config, drop=drop, replication_factor=replication_factor, table_options=table_options)
563
564 return config
565
566 @classmethod
567 def list_databases(cls, host: str) -> Iterable[DatabaseInfo]:
568 """Return the list of keyspaces with APDB databases.
569
570 Parameters
571 ----------
572 host : `str`
573 Name of one of the hosts in Cassandra cluster.
574
575 Returns
576 -------
577 databases : `~collections.abc.Iterable` [`DatabaseInfo`]
578 Information about databases that contain APDB instance.
579 """
580 # For DbAuth we need to use database name "*" to try to match any
581 # database.
582 config = ApdbCassandraConfig(contact_points=(host,), keyspace="*")
583 cluster, session = cls._make_session(config)
584
585 with cluster, session:
586 # Get names of all keyspaces containing DiaSource table
587 table_name = ApdbTables.DiaSource.table_name()
588 query = "select keyspace_name from system_schema.tables where table_name = %s ALLOW FILTERING"
589 result = session.execute(query, (table_name,))
590 keyspaces = [row[0] for row in result.all()]
591
592 if not keyspaces:
593 return []
594
595 # Retrieve roles for each keyspace.
596 template = ", ".join(["%s"] * len(keyspaces))
597 query = (
598 "SELECT resource, role, permissions FROM system_auth.role_permissions "
599 f"WHERE resource IN ({template}) ALLOW FILTERING"
600 )
601 resources = [f"data/{keyspace}" for keyspace in keyspaces]
602 try:
603 result = session.execute(query, resources)
604 # If anonymous access is enabled then result will be empty,
605 # set infos to have empty permissions dict in that case.
606 infos = {keyspace: DatabaseInfo(name=keyspace, permissions={}) for keyspace in keyspaces}
607 for row in result:
608 _, _, keyspace = row[0].partition("/")
609 role: str = row[1]
610 role_permissions: set[str] = set(row[2])
611 infos[keyspace].permissions[role] = role_permissions # type: ignore[index]
612 except cassandra.Unauthorized as exc:
613 # Likely that access to role_permissions is not granted for
614 # current user.
615 warnings.warn(
616 f"Authentication information is not accessible to current user - {exc}", stacklevel=2
617 )
618 infos = {keyspace: DatabaseInfo(name=keyspace) for keyspace in keyspaces}
619
620 # Would be nice to get size estimate, but this is not available
621 # via CQL queries.
622 return infos.values()
623
624 @classmethod
625 def delete_database(cls, host: str, keyspace: str, *, timeout: int = 3600) -> None:
626 """Delete APDB database by dropping its keyspace.
627
628 Parameters
629 ----------
630 host : `str`
631 Name of one of the hosts in Cassandra cluster.
632 keyspace : `str`
633 Name of keyspace to delete.
634 timeout : `int`, optional
635 Timeout for delete operation in seconds. Dropping a large keyspace
636 can be a long operation, but this default value of one hour should
637 be sufficient for most or all cases.
638 """
639 # For DbAuth we need to use database name "*" to try to match any
640 # database.
641 config = ApdbCassandraConfig(contact_points=(host,), keyspace="*")
642 cluster, session = cls._make_session(config)
643 with cluster, session:
644 query = f"DROP KEYSPACE {quote_id(keyspace)}"
645 session.execute(query, timeout=timeout)
646
647 def get_replica(self) -> ApdbCassandraReplica:
648 """Return `ApdbReplica` instance for this database."""
649 # Note that this instance has to stay alive while replica exists, so
650 # we pass reference to self.
651 return ApdbCassandraReplica(self, self._schema, self._session)
652
653 @classmethod
655 cls,
656 config: ApdbConfig,
657 *,
658 drop: bool = False,
659 replication_factor: int | None = None,
660 table_options: CreateTableOptions | None = None,
661 ) -> None:
662 # docstring is inherited from a base class
663
664 if not isinstance(config, ApdbCassandraConfig):
665 raise TypeError(f"Unexpected type of configuration object: {type(config)}")
666
667 cluster, session = cls._make_session(config)
668 with cluster, session:
669 schema = ApdbCassandraSchema(
670 session=session,
671 keyspace=config.keyspace,
672 schema_file=config.schema_file,
673 schema_name=config.schema_name,
674 prefix=config.prefix,
675 time_partition_tables=config.partitioning.time_partition_tables,
676 enable_replica=config.enable_replica,
677 )
678
679 # Ask schema to create all tables.
680 if config.partitioning.time_partition_tables:
681 time_partition_start = astropy.time.Time(
682 config.partitioning.time_partition_start, format="isot", scale="tai"
683 )
684 time_partition_end = astropy.time.Time(
685 config.partitioning.time_partition_end, format="isot", scale="tai"
686 )
687 part_epoch = float(cls.partition_zero_epoch.mjd)
688 part_days = config.partitioning.time_partition_days
689 part_range = (
690 cls._time_partition_cls(time_partition_start, part_epoch, part_days),
691 cls._time_partition_cls(time_partition_end, part_epoch, part_days) + 1,
692 )
693 schema.makeSchema(
694 drop=drop,
695 part_range=part_range,
696 replication_factor=replication_factor,
697 table_options=table_options,
698 )
699 else:
700 schema.makeSchema(
701 drop=drop, replication_factor=replication_factor, table_options=table_options
702 )
703
704 meta_table_name = ApdbTables.metadata.table_name(config.prefix)
705 metadata = ApdbMetadataCassandra(
706 session, meta_table_name, config.keyspace, "read_tuples", "write"
707 )
708
709 # Fill version numbers, overrides if they existed before.
710 metadata.set(cls.metadataSchemaVersionKey, str(schema.schemaVersion()), force=True)
711 metadata.set(cls.metadataCodeVersionKey, str(cls.apdbImplementationVersion()), force=True)
712
713 if config.enable_replica:
714 # Only store replica code version if replica is enabled.
715 metadata.set(
717 str(ApdbCassandraReplica.apdbReplicaImplementationVersion()),
718 force=True,
719 )
720
721 # Store frozen part of a configuration in metadata.
722 freezer = ApdbConfigFreezer[ApdbCassandraConfig](cls._frozen_parameters)
723 metadata.set(cls.metadataConfigKey, freezer.to_json(config), force=True)
724
725 def getDiaObjects(self, region: sphgeom.Region) -> pandas.DataFrame:
726 # docstring is inherited from a base class
727
728 sp_where = self._spatial_where(region)
729 _LOG.debug("getDiaObjects: #partitions: %s", len(sp_where))
730
731 # We need to exclude extra partitioning columns from result.
732 column_names = self._schema.apdbColumnNames(ApdbTables.DiaObjectLast)
733 what = ",".join(quote_id(column) for column in column_names)
734
735 table_name = self._schema.tableName(ApdbTables.DiaObjectLast)
736 query = f'SELECT {what} from "{self._keyspace}"."{table_name}"'
737 statements: list[tuple] = []
738 for where, params in sp_where:
739 full_query = f"{query} WHERE {where}"
740 if params:
741 statement = self._preparer.prepare(full_query)
742 else:
743 # If there are no params then it is likely that query has a
744 # bunch of literals rendered already, no point trying to
745 # prepare it because it's not reusable.
746 statement = cassandra.query.SimpleStatement(full_query)
747 statements.append((statement, params))
748 _LOG.debug("getDiaObjects: #queries: %s", len(statements))
749
750 with _MON.context_tags({"table": "DiaObject"}):
751 _MON.add_record(
752 "select_query_stats", values={"num_sp_part": len(sp_where), "num_queries": len(statements)}
753 )
754 with self._timer("select_time") as timer:
755 objects = cast(
756 pandas.DataFrame,
757 select_concurrent(
758 self._session,
759 statements,
760 "read_pandas_multi",
761 self.config.connection_config.read_concurrency,
762 ),
763 )
764 timer.add_values(row_count=len(objects))
765
766 _LOG.debug("found %s DiaObjects", objects.shape[0])
767 return objects
768
770 self, region: sphgeom.Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
771 ) -> pandas.DataFrame | None:
772 # docstring is inherited from a base class
773 months = self.config.read_sources_months
774 if months == 0:
775 return None
776 mjd_end = float(visit_time.mjd)
777 mjd_start = mjd_end - months * 30
778
779 return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaSource)
780
782 self, region: sphgeom.Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
783 ) -> pandas.DataFrame | None:
784 # docstring is inherited from a base class
785 months = self.config.read_forced_sources_months
786 if months == 0:
787 return None
788 mjd_end = float(visit_time.mjd)
789 mjd_start = mjd_end - months * 30
790
791 return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaForcedSource)
792
794 self,
795 visit: int,
796 detector: int,
797 region: sphgeom.Region,
798 visit_time: astropy.time.Time,
799 ) -> bool:
800 # docstring is inherited from a base class
801 # The order of checks corresponds to order in store(), on potential
802 # store failure earlier tables have higher probability containing
803 # stored records. With per-partition tables there will be many tables
804 # in the list, but it is unlikely that we'll use that setup in
805 # production.
806 sp_where = self._spatial_where(region, use_ranges=True)
807 visit_detector_where = ("visit = ? AND detector = ?", (visit, detector))
808
809 # Sources are partitioned on their midPointMjdTai. To avoid precision
810 # issues add some fuzzines to visit time.
811 mjd_start = float(visit_time.mjd) - 1.0 / 24
812 mjd_end = float(visit_time.mjd) + 1.0 / 24
813
814 statements: list[tuple] = []
815 for table_type in ApdbTables.DiaSource, ApdbTables.DiaForcedSource:
816 tables, temporal_where = self._temporal_where(
817 table_type, mjd_start, mjd_end, query_per_time_part=True
818 )
819 for table in tables:
820 prefix = f'SELECT apdb_part FROM "{self._keyspace}"."{table}"'
821 # Needs ALLOW FILTERING as there is no PK constraint.
822 suffix = "PER PARTITION LIMIT 1 LIMIT 1 ALLOW FILTERING"
823 statements += list(
824 self._combine_where(prefix, sp_where, temporal_where, visit_detector_where, suffix)
825 )
826
827 with self._timer("contains_visit_detector_time"):
828 result = cast(
829 list[tuple[int] | None],
830 select_concurrent(
831 self._session,
832 statements,
833 "read_tuples",
834 self.config.connection_config.read_concurrency,
835 ),
836 )
837 return bool(result)
838
839 def getSSObjects(self) -> pandas.DataFrame:
840 # docstring is inherited from a base class
841 tableName = self._schema.tableName(ApdbTables.SSObject)
842 query = f'SELECT * from "{self._keyspace}"."{tableName}"'
843
844 objects = None
845 with self._timer("select_time", tags={"table": "SSObject"}) as timer:
846 result = self._session.execute(query, execution_profile="read_pandas")
847 objects = result._current_rows
848 timer.add_values(row_count=len(objects))
849
850 _LOG.debug("found %s SSObjects", objects.shape[0])
851 return objects
852
853 def store(
854 self,
855 visit_time: astropy.time.Time,
856 objects: pandas.DataFrame,
857 sources: pandas.DataFrame | None = None,
858 forced_sources: pandas.DataFrame | None = None,
859 ) -> None:
860 # docstring is inherited from a base class
861 objects = self._fix_input_timestamps(objects)
862 if sources is not None:
863 sources = self._fix_input_timestamps(sources)
864 if forced_sources is not None:
865 forced_sources = self._fix_input_timestamps(forced_sources)
866
867 replica_chunk: ReplicaChunk | None = None
868 if self._schema.replication_enabled:
869 replica_chunk = ReplicaChunk.make_replica_chunk(visit_time, self.config.replica_chunk_seconds)
870 self._storeReplicaChunk(replica_chunk, visit_time)
871
872 # fill region partition column for DiaObjects
873 objects = self._add_apdb_part(objects)
874 self._storeDiaObjects(objects, visit_time, replica_chunk)
875
876 if sources is not None and len(sources) > 0:
877 # copy apdb_part column from DiaObjects to DiaSources
878 sources = self._add_apdb_part(sources)
879 subchunk = self._storeDiaSources(ApdbTables.DiaSource, sources, replica_chunk)
880 self._storeDiaSourcesPartitions(sources, visit_time, replica_chunk, subchunk)
881
882 if forced_sources is not None and len(forced_sources) > 0:
883 forced_sources = self._add_apdb_part(forced_sources)
884 self._storeDiaSources(ApdbTables.DiaForcedSource, forced_sources, replica_chunk)
885
886 def storeSSObjects(self, objects: pandas.DataFrame) -> None:
887 # docstring is inherited from a base class
888 objects = self._fix_input_timestamps(objects)
889 self._storeObjectsPandas(objects, ApdbTables.SSObject)
890
891 def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
892 # docstring is inherited from a base class
893
894 # To update a record we need to know its exact primary key (including
895 # partition key) so we start by querying for diaSourceId to find the
896 # primary keys.
897
898 table_name = self._schema.tableName(ExtraTables.DiaSourceToPartition)
899 # split it into 1k IDs per query
900 selects: list[tuple] = []
901 for ids in chunk_iterable(idMap.keys(), 1_000):
902 ids_str = ",".join(str(item) for item in ids)
903 selects.append(
904 (
905 (
906 'SELECT "diaSourceId", "apdb_part", "apdb_time_part", "apdb_replica_chunk" '
907 f'FROM "{self._keyspace}"."{table_name}" WHERE "diaSourceId" IN ({ids_str})'
908 ),
909 {},
910 )
911 )
912
913 # No need for DataFrame here, read data as tuples.
914 result = cast(
915 list[tuple[int, int, int, int | None]],
916 select_concurrent(
917 self._session, selects, "read_tuples", self.config.connection_config.read_concurrency
918 ),
919 )
920
921 # Make mapping from source ID to its partition.
922 id2partitions: dict[int, tuple[int, int]] = {}
923 id2chunk_id: dict[int, int] = {}
924 for row in result:
925 id2partitions[row[0]] = row[1:3]
926 if row[3] is not None:
927 id2chunk_id[row[0]] = row[3]
928
929 # make sure we know partitions for each ID
930 if set(id2partitions) != set(idMap):
931 missing = ",".join(str(item) for item in set(idMap) - set(id2partitions))
932 raise ValueError(f"Following DiaSource IDs do not exist in the database: {missing}")
933
934 # Reassign in standard tables
935 queries = cassandra.query.BatchStatement()
936 table_name = self._schema.tableName(ApdbTables.DiaSource)
937 for diaSourceId, ssObjectId in idMap.items():
938 apdb_part, apdb_time_part = id2partitions[diaSourceId]
939 values: tuple
940 if self.config.partitioning.time_partition_tables:
941 query = (
942 f'UPDATE "{self._keyspace}"."{table_name}_{apdb_time_part}"'
943 ' SET "ssObjectId" = ?, "diaObjectId" = NULL'
944 ' WHERE "apdb_part" = ? AND "diaSourceId" = ?'
945 )
946 values = (ssObjectId, apdb_part, diaSourceId)
947 else:
948 query = (
949 f'UPDATE "{self._keyspace}"."{table_name}"'
950 ' SET "ssObjectId" = ?, "diaObjectId" = NULL'
951 ' WHERE "apdb_part" = ? AND "apdb_time_part" = ? AND "diaSourceId" = ?'
952 )
953 values = (ssObjectId, apdb_part, apdb_time_part, diaSourceId)
954 queries.add(self._preparer.prepare(query), values)
955
956 # TODO: (DM-50190) Replication for updated records is not implemented.
957 if id2chunk_id:
958 warnings.warn("Replication of reassigned DiaSource records is not implemented.", stacklevel=2)
959
960 _LOG.debug("%s: will update %d records", table_name, len(idMap))
961 with self._timer("source_reassign_time") as timer:
962 self._session.execute(queries, execution_profile="write")
963 timer.add_values(source_count=len(idMap))
964
965 def dailyJob(self) -> None:
966 # docstring is inherited from a base class
967 pass
968
969 def countUnassociatedObjects(self) -> int:
970 # docstring is inherited from a base class
971
972 # It's too inefficient to implement it for Cassandra in current schema.
973 raise NotImplementedError()
974
975 @property
976 def metadata(self) -> ApdbMetadata:
977 # docstring is inherited from a base class
978 return self._metadata
979
980 @classmethod
981 def _makeProfiles(cls, config: ApdbCassandraConfig) -> Mapping[Any, ExecutionProfile]:
982 """Make all execution profiles used in the code."""
983 if config.connection_config.private_ips:
984 loadBalancePolicy = WhiteListRoundRobinPolicy(hosts=config.contact_points)
985 else:
986 loadBalancePolicy = RoundRobinPolicy()
987
988 read_tuples_profile = ExecutionProfile(
989 consistency_level=getattr(cassandra.ConsistencyLevel, config.connection_config.read_consistency),
990 request_timeout=config.connection_config.read_timeout,
991 row_factory=cassandra.query.tuple_factory,
992 load_balancing_policy=loadBalancePolicy,
993 )
994 read_pandas_profile = ExecutionProfile(
995 consistency_level=getattr(cassandra.ConsistencyLevel, config.connection_config.read_consistency),
996 request_timeout=config.connection_config.read_timeout,
997 row_factory=pandas_dataframe_factory,
998 load_balancing_policy=loadBalancePolicy,
999 )
1000 read_raw_profile = ExecutionProfile(
1001 consistency_level=getattr(cassandra.ConsistencyLevel, config.connection_config.read_consistency),
1002 request_timeout=config.connection_config.read_timeout,
1003 row_factory=raw_data_factory,
1004 load_balancing_policy=loadBalancePolicy,
1005 )
1006 # Profile to use with select_concurrent to return pandas data frame
1007 read_pandas_multi_profile = ExecutionProfile(
1008 consistency_level=getattr(cassandra.ConsistencyLevel, config.connection_config.read_consistency),
1009 request_timeout=config.connection_config.read_timeout,
1010 row_factory=pandas_dataframe_factory,
1011 load_balancing_policy=loadBalancePolicy,
1012 )
1013 # Profile to use with select_concurrent to return raw data (columns and
1014 # rows)
1015 read_raw_multi_profile = ExecutionProfile(
1016 consistency_level=getattr(cassandra.ConsistencyLevel, config.connection_config.read_consistency),
1017 request_timeout=config.connection_config.read_timeout,
1018 row_factory=raw_data_factory,
1019 load_balancing_policy=loadBalancePolicy,
1020 )
1021 write_profile = ExecutionProfile(
1022 consistency_level=getattr(cassandra.ConsistencyLevel, config.connection_config.write_consistency),
1023 request_timeout=config.connection_config.write_timeout,
1024 load_balancing_policy=loadBalancePolicy,
1025 )
1026 # To replace default DCAwareRoundRobinPolicy
1027 default_profile = ExecutionProfile(
1028 load_balancing_policy=loadBalancePolicy,
1029 )
1030 return {
1031 "read_tuples": read_tuples_profile,
1032 "read_pandas": read_pandas_profile,
1033 "read_raw": read_raw_profile,
1034 "read_pandas_multi": read_pandas_multi_profile,
1035 "read_raw_multi": read_raw_multi_profile,
1036 "write": write_profile,
1037 EXEC_PROFILE_DEFAULT: default_profile,
1038 }
1039
1041 self,
1042 region: sphgeom.Region,
1043 object_ids: Iterable[int] | None,
1044 mjd_start: float,
1045 mjd_end: float,
1046 table_name: ApdbTables,
1047 ) -> pandas.DataFrame:
1048 """Return catalog of DiaSource instances given set of DiaObject IDs.
1049
1050 Parameters
1051 ----------
1052 region : `lsst.sphgeom.Region`
1053 Spherical region.
1054 object_ids :
1055 Collection of DiaObject IDs
1056 mjd_start : `float`
1057 Lower bound of time interval.
1058 mjd_end : `float`
1059 Upper bound of time interval.
1060 table_name : `ApdbTables`
1061 Name of the table.
1062
1063 Returns
1064 -------
1065 catalog : `pandas.DataFrame`, or `None`
1066 Catalog containing DiaSource records. Empty catalog is returned if
1067 ``object_ids`` is empty.
1068 """
1069 object_id_set: Set[int] = set()
1070 if object_ids is not None:
1071 object_id_set = set(object_ids)
1072 if len(object_id_set) == 0:
1073 return self._make_empty_catalog(table_name)
1074
1075 sp_where = self._spatial_where(region)
1076 tables, temporal_where = self._temporal_where(table_name, mjd_start, mjd_end)
1077
1078 # We need to exclude extra partitioning columns from result.
1079 column_names = self._schema.apdbColumnNames(table_name)
1080 what = ",".join(quote_id(column) for column in column_names)
1081
1082 # Build all queries
1083 statements: list[tuple] = []
1084 for table in tables:
1085 prefix = f'SELECT {what} from "{self._keyspace}"."{table}"'
1086 statements += list(self._combine_where(prefix, sp_where, temporal_where))
1087 _LOG.debug("_getSources %s: #queries: %s", table_name, len(statements))
1088
1089 with _MON.context_tags({"table": table_name.name}):
1090 _MON.add_record(
1091 "select_query_stats", values={"num_sp_part": len(sp_where), "num_queries": len(statements)}
1092 )
1093 with self._timer("select_time") as timer:
1094 catalog = cast(
1095 pandas.DataFrame,
1096 select_concurrent(
1097 self._session,
1098 statements,
1099 "read_pandas_multi",
1100 self.config.connection_config.read_concurrency,
1101 ),
1102 )
1103 timer.add_values(row_count=len(catalog))
1104
1105 # filter by given object IDs
1106 if len(object_id_set) > 0:
1107 catalog = cast(pandas.DataFrame, catalog[catalog["diaObjectId"].isin(object_id_set)])
1108
1109 # precise filtering on midpointMjdTai
1110 catalog = cast(pandas.DataFrame, catalog[catalog["midpointMjdTai"] > mjd_start])
1111
1112 _LOG.debug("found %d %ss", catalog.shape[0], table_name.name)
1113 return catalog
1114
1115 def _storeReplicaChunk(self, replica_chunk: ReplicaChunk, visit_time: astropy.time.Time) -> None:
1116 # Cassandra timestamp uses milliseconds since epoch
1117 timestamp = int(replica_chunk.last_update_time.unix_tai * 1000)
1118
1119 # everything goes into a single partition
1120 partition = 0
1121
1122 table_name = self._schema.tableName(ExtraTables.ApdbReplicaChunks)
1123
1124 columns = ["partition", "apdb_replica_chunk", "last_update_time", "unique_id"]
1125 values = [partition, replica_chunk.id, timestamp, replica_chunk.unique_id]
1127 columns.append("has_subchunks")
1128 values.append(True)
1129
1130 column_list = ", ".join(columns)
1131 placeholders = ",".join(["%s"] * len(columns))
1132 query = f'INSERT INTO "{self._keyspace}"."{table_name}" ({column_list}) VALUES ({placeholders})'
1133
1134 self._session.execute(
1135 query,
1136 values,
1137 timeout=self.config.connection_config.write_timeout,
1138 execution_profile="write",
1139 )
1140
1141 def _queryDiaObjectLastPartitions(self, ids: Iterable[int]) -> Mapping[int, int]:
1142 """Return existing mapping of diaObjectId to its last partition."""
1143 table_name = self._schema.tableName(ExtraTables.DiaObjectLastToPartition)
1144 queries = []
1145 object_count = 0
1146 for id_chunk in chunk_iterable(ids, 10_000):
1147 id_chunk_list = list(id_chunk)
1148 query = (
1149 f'SELECT "diaObjectId", apdb_part FROM "{self._keyspace}"."{table_name}" '
1150 f'WHERE "diaObjectId" in ({",".join(str(oid) for oid in id_chunk_list)})'
1151 )
1152 queries.append((query, ()))
1153 object_count += len(id_chunk_list)
1154
1155 with self._timer("query_object_last_partitions") as timer:
1156 data = cast(
1157 ApdbTableData,
1158 select_concurrent(
1159 self._session, queries, "read_raw_multi", self.config.connection_config.read_concurrency
1160 ),
1161 )
1162 timer.add_values(object_count=object_count, row_count=len(data.rows()))
1163
1164 if data.column_names() != ["diaObjectId", "apdb_part"]:
1165 raise RuntimeError(f"Unexpected column names in query result: {data.column_names()}")
1166
1167 return {row[0]: row[1] for row in data.rows()}
1168
1169 def _deleteMovingObjects(self, objs: pandas.DataFrame) -> None:
1170 """Objects in DiaObjectsLast can move from one spatial partition to
1171 another. For those objects inserting new version does not replace old
1172 one, so we need to explicitly remove old versions before inserting new
1173 ones.
1174 """
1175 # Extract all object IDs.
1176 new_partitions = {oid: part for oid, part in zip(objs["diaObjectId"], objs["apdb_part"])}
1177 old_partitions = self._queryDiaObjectLastPartitions(objs["diaObjectId"])
1178
1179 moved_oids: dict[int, tuple[int, int]] = {}
1180 for oid, old_part in old_partitions.items():
1181 new_part = new_partitions.get(oid, old_part)
1182 if new_part != old_part:
1183 moved_oids[oid] = (old_part, new_part)
1184 _LOG.debug("DiaObject IDs that moved to new partition: %s", moved_oids)
1185
1186 if moved_oids:
1187 # Delete old records from DiaObjectLast.
1188 table_name = self._schema.tableName(ApdbTables.DiaObjectLast)
1189 query = f'DELETE FROM "{self._keyspace}"."{table_name}" WHERE apdb_part = ? AND "diaObjectId" = ?'
1190 statement = self._preparer.prepare(query)
1191 batch = cassandra.query.BatchStatement()
1192 for oid, (old_part, _) in moved_oids.items():
1193 batch.add(statement, (old_part, oid))
1194 with self._timer("delete_object_last") as timer:
1195 self._session.execute(
1196 batch, timeout=self.config.connection_config.write_timeout, execution_profile="write"
1197 )
1198 timer.add_values(row_count=len(moved_oids))
1199
1200 # Add all new records to the map.
1201 table_name = self._schema.tableName(ExtraTables.DiaObjectLastToPartition)
1202 query = f'INSERT INTO "{self._keyspace}"."{table_name}" ("diaObjectId", apdb_part) VALUES (?,?)'
1203 statement = self._preparer.prepare(query)
1204
1205 batch_size = self._batch_size(ExtraTables.DiaObjectLastToPartition)
1206 batches = []
1207 for chunk in chunk_iterable(new_partitions.items(), batch_size):
1208 batch = cassandra.query.BatchStatement()
1209 for oid, new_part in chunk:
1210 batch.add(statement, (oid, new_part))
1211 batches.append(batch)
1212
1213 with self._timer("update_object_last_partition") as timer:
1214 for batch in batches:
1215 self._session.execute(
1216 batch, timeout=self.config.connection_config.write_timeout, execution_profile="write"
1217 )
1218 timer.add_values(row_count=len(batch))
1219
1221 self, objs: pandas.DataFrame, visit_time: astropy.time.Time, replica_chunk: ReplicaChunk | None
1222 ) -> None:
1223 """Store catalog of DiaObjects from current visit.
1224
1225 Parameters
1226 ----------
1227 objs : `pandas.DataFrame`
1228 Catalog with DiaObject records
1229 visit_time : `astropy.time.Time`
1230 Time of the current visit.
1231 replica_chunk : `ReplicaChunk` or `None`
1232 Replica chunk identifier if replication is configured.
1233 """
1234 if len(objs) == 0:
1235 _LOG.debug("No objects to write to database.")
1236 return
1237
1239 self._deleteMovingObjects(objs)
1240
1241 visit_time_dt = visit_time.datetime
1242 extra_columns = dict(lastNonForcedSource=visit_time_dt)
1243 self._storeObjectsPandas(objs, ApdbTables.DiaObjectLast, extra_columns=extra_columns)
1244
1245 extra_columns["validityStart"] = visit_time_dt
1246 time_part: int | None = self._time_partition(visit_time)
1247 if not self.config.partitioning.time_partition_tables:
1248 extra_columns["apdb_time_part"] = time_part
1249 time_part = None
1250
1251 # Only store DiaObects if not doing replication or explicitly
1252 # configured to always store them.
1253 if replica_chunk is None or not self.config.replica_skips_diaobjects:
1255 objs, ApdbTables.DiaObject, extra_columns=extra_columns, time_part=time_part
1256 )
1257
1258 if replica_chunk is not None:
1259 extra_columns = dict(apdb_replica_chunk=replica_chunk.id, validityStart=visit_time_dt)
1260 table = ExtraTables.DiaObjectChunks
1262 table = ExtraTables.DiaObjectChunks2
1263 # Use a random number for a second part of partitioning key so
1264 # that different clients could wrtite to different partitions.
1265 # This makes it not exactly reproducible.
1266 extra_columns["apdb_replica_subchunk"] = random.randrange(self.config.replica_sub_chunk_count)
1267 self._storeObjectsPandas(objs, table, extra_columns=extra_columns)
1268
1270 self,
1271 table_name: ApdbTables,
1272 sources: pandas.DataFrame,
1273 replica_chunk: ReplicaChunk | None,
1274 ) -> int | None:
1275 """Store catalog of DIASources or DIAForcedSources from current visit.
1276
1277 Parameters
1278 ----------
1279 table_name : `ApdbTables`
1280 Table where to store the data.
1281 sources : `pandas.DataFrame`
1282 Catalog containing DiaSource records
1283 visit_time : `astropy.time.Time`
1284 Time of the current visit.
1285 replica_chunk : `ReplicaChunk` or `None`
1286 Replica chunk identifier if replication is configured.
1287
1288 Returns
1289 -------
1290 subchunk : `int` or `None`
1291 Subchunk number for resulting replica data, `None` if relication is
1292 not enabled ot subchunking is not enabled.
1293 """
1294 # Time partitioning has to be based on midpointMjdTai, not visit_time
1295 # as visit_time is not really a visit time.
1296 tp_sources = sources.copy(deep=False)
1297 tp_sources["apdb_time_part"] = tp_sources["midpointMjdTai"].apply(self._time_partition)
1298 extra_columns: dict[str, Any] = {}
1299 if not self.config.partitioning.time_partition_tables:
1300 self._storeObjectsPandas(tp_sources, table_name)
1301 else:
1302 # Group by time partition
1303 partitions = set(tp_sources["apdb_time_part"])
1304 if len(partitions) == 1:
1305 # Single partition - just save the whole thing.
1306 time_part = partitions.pop()
1307 self._storeObjectsPandas(sources, table_name, time_part=time_part)
1308 else:
1309 # group by time partition.
1310 for time_part, sub_frame in tp_sources.groupby(by="apdb_time_part"):
1311 sub_frame.drop(columns="apdb_time_part", inplace=True)
1312 self._storeObjectsPandas(sub_frame, table_name, time_part=time_part)
1313
1314 subchunk: int | None = None
1315 if replica_chunk is not None:
1316 extra_columns = dict(apdb_replica_chunk=replica_chunk.id)
1318 subchunk = random.randrange(self.config.replica_sub_chunk_count)
1319 extra_columns["apdb_replica_subchunk"] = subchunk
1320 if table_name is ApdbTables.DiaSource:
1321 extra_table = ExtraTables.DiaSourceChunks2
1322 else:
1323 extra_table = ExtraTables.DiaForcedSourceChunks2
1324 else:
1325 if table_name is ApdbTables.DiaSource:
1326 extra_table = ExtraTables.DiaSourceChunks
1327 else:
1328 extra_table = ExtraTables.DiaForcedSourceChunks
1329 self._storeObjectsPandas(sources, extra_table, extra_columns=extra_columns)
1330
1331 return subchunk
1332
1334 self,
1335 sources: pandas.DataFrame,
1336 visit_time: astropy.time.Time,
1337 replica_chunk: ReplicaChunk | None,
1338 subchunk: int | None,
1339 ) -> None:
1340 """Store mapping of diaSourceId to its partitioning values.
1341
1342 Parameters
1343 ----------
1344 sources : `pandas.DataFrame`
1345 Catalog containing DiaSource records
1346 visit_time : `astropy.time.Time`
1347 Time of the current visit.
1348 replica_chunk : `ReplicaChunk` or `None`
1349 Replication chunk, or `None` when replication is disabled.
1350 subchunk : `int` or `None`
1351 Replication sub-chunk, or `None` when replication is disabled or
1352 sub-chunking is not used.
1353 """
1354 id_map = cast(pandas.DataFrame, sources[["diaSourceId", "apdb_part"]])
1355 extra_columns = {
1356 "apdb_time_part": self._time_partition(visit_time),
1357 "apdb_replica_chunk": replica_chunk.id if replica_chunk is not None else None,
1358 }
1360 extra_columns["apdb_replica_subchunk"] = subchunk
1361
1363 id_map, ExtraTables.DiaSourceToPartition, extra_columns=extra_columns, time_part=None
1364 )
1365
1367 self,
1368 records: pandas.DataFrame,
1369 table_name: ApdbTables | ExtraTables,
1370 extra_columns: Mapping | None = None,
1371 time_part: int | None = None,
1372 ) -> None:
1373 """Store generic objects.
1374
1375 Takes Pandas catalog and stores a bunch of records in a table.
1376
1377 Parameters
1378 ----------
1379 records : `pandas.DataFrame`
1380 Catalog containing object records
1381 table_name : `ApdbTables`
1382 Name of the table as defined in APDB schema.
1383 extra_columns : `dict`, optional
1384 Mapping (column_name, column_value) which gives fixed values for
1385 columns in each row, overrides values in ``records`` if matching
1386 columns exist there.
1387 time_part : `int`, optional
1388 If not `None` then insert into a per-partition table.
1389
1390 Notes
1391 -----
1392 If Pandas catalog contains additional columns not defined in table
1393 schema they are ignored. Catalog does not have to contain all columns
1394 defined in a table, but partition and clustering keys must be present
1395 in a catalog or ``extra_columns``.
1396 """
1397 # use extra columns if specified
1398 if extra_columns is None:
1399 extra_columns = {}
1400 extra_fields = list(extra_columns.keys())
1401
1402 # Fields that will come from dataframe.
1403 df_fields = [column for column in records.columns if column not in extra_fields]
1404
1405 column_map = self._schema.getColumnMap(table_name)
1406 # list of columns (as in felis schema)
1407 fields = [column_map[field].name for field in df_fields if field in column_map]
1408 fields += extra_fields
1409
1410 # check that all partitioning and clustering columns are defined
1411 required_columns = self._schema.partitionColumns(table_name) + self._schema.clusteringColumns(
1412 table_name
1413 )
1414 missing_columns = [column for column in required_columns if column not in fields]
1415 if missing_columns:
1416 raise ValueError(f"Primary key columns are missing from catalog: {missing_columns}")
1417
1418 qfields = [quote_id(field) for field in fields]
1419 qfields_str = ",".join(qfields)
1420
1421 batch_size = self._batch_size(table_name)
1422
1423 with self._timer("insert_build_time", tags={"table": table_name.name}):
1424 table = self._schema.tableName(table_name)
1425 if time_part is not None:
1426 table = f"{table}_{time_part}"
1427
1428 holders = ",".join(["?"] * len(qfields))
1429 query = f'INSERT INTO "{self._keyspace}"."{table}" ({qfields_str}) VALUES ({holders})'
1430 statement = self._preparer.prepare(query)
1431 # Cassandra has 64k limit on batch size, normally that should be
1432 # enough but some tests generate too many forced sources.
1433 queries = []
1434 for rec_chunk in chunk_iterable(records.itertuples(index=False), batch_size):
1435 batch = cassandra.query.BatchStatement()
1436 for rec in rec_chunk:
1437 values = []
1438 for field in df_fields:
1439 if field not in column_map:
1440 continue
1441 value = getattr(rec, field)
1442 if column_map[field].datatype is felis.datamodel.DataType.timestamp:
1443 if isinstance(value, pandas.Timestamp):
1444 value = value.to_pydatetime()
1445 elif value is pandas.NaT:
1446 value = None
1447 else:
1448 # Assume it's seconds since epoch, Cassandra
1449 # datetime is in milliseconds
1450 value = int(value * 1000)
1451 value = literal(value)
1452 values.append(UNSET_VALUE if value is None else value)
1453 for field in extra_fields:
1454 value = literal(extra_columns[field])
1455 values.append(UNSET_VALUE if value is None else value)
1456 batch.add(statement, values)
1457 queries.append(batch)
1458
1459 _LOG.debug("%s: will store %d records", self._schema.tableName(table_name), records.shape[0])
1460 with self._timer("insert_time", tags={"table": table_name.name}) as timer:
1461 for batch in queries:
1462 self._session.execute(
1463 batch, timeout=self.config.connection_config.write_timeout, execution_profile="write"
1464 )
1465 timer.add_values(row_count=len(records))
1466
1467 def _add_apdb_part(self, df: pandas.DataFrame) -> pandas.DataFrame:
1468 """Calculate spatial partition for each record and add it to a
1469 DataFrame.
1470
1471 Parameters
1472 ----------
1473 df : `pandas.DataFrame`
1474 DataFrame which has to contain ra/dec columns, names of these
1475 columns are defined by configuration ``ra_dec_columns`` field.
1476
1477 Returns
1478 -------
1479 df : `pandas.DataFrame`
1480 DataFrame with ``apdb_part`` column which contains pixel index
1481 for ra/dec coordinates.
1482
1483 Notes
1484 -----
1485 This overrides any existing column in a DataFrame with the same name
1486 (``apdb_part``). Original DataFrame is not changed, copy of a DataFrame
1487 is returned.
1488 """
1489 # Calculate pixelization index for every record.
1490 apdb_part = np.zeros(df.shape[0], dtype=np.int64)
1491 ra_col, dec_col = self.config.ra_dec_columns
1492 for i, (ra, dec) in enumerate(zip(df[ra_col], df[dec_col])):
1493 uv3d = sphgeom.UnitVector3d(sphgeom.LonLat.fromDegrees(ra, dec))
1494 idx = self._pixelization.pixel(uv3d)
1495 apdb_part[i] = idx
1496 df = df.copy()
1497 df["apdb_part"] = apdb_part
1498 return df
1499
1500 @classmethod
1501 def _time_partition_cls(cls, time: float | astropy.time.Time, epoch_mjd: float, part_days: int) -> int:
1502 """Calculate time partition number for a given time.
1503
1504 Parameters
1505 ----------
1506 time : `float` or `astropy.time.Time`
1507 Time for which to calculate partition number. Can be float to mean
1508 MJD or `astropy.time.Time`
1509 epoch_mjd : `float`
1510 Epoch time for partition 0.
1511 part_days : `int`
1512 Number of days per partition.
1513
1514 Returns
1515 -------
1516 partition : `int`
1517 Partition number for a given time.
1518 """
1519 if isinstance(time, astropy.time.Time):
1520 mjd = float(time.mjd)
1521 else:
1522 mjd = time
1523 days_since_epoch = mjd - epoch_mjd
1524 partition = int(days_since_epoch) // part_days
1525 return partition
1526
1527 def _time_partition(self, time: float | astropy.time.Time) -> int:
1528 """Calculate time partition number for a given time.
1529
1530 Parameters
1531 ----------
1532 time : `float` or `astropy.time.Time`
1533 Time for which to calculate partition number. Can be float to mean
1534 MJD or `astropy.time.Time`
1535
1536 Returns
1537 -------
1538 partition : `int`
1539 Partition number for a given time.
1540 """
1541 if isinstance(time, astropy.time.Time):
1542 mjd = float(time.mjd)
1543 else:
1544 mjd = time
1545 days_since_epoch = mjd - self._partition_zero_epoch_mjd
1546 partition = int(days_since_epoch) // self.config.partitioning.time_partition_days
1547 return partition
1548
1549 def _make_empty_catalog(self, table_name: ApdbTables) -> pandas.DataFrame:
1550 """Make an empty catalog for a table with a given name.
1551
1552 Parameters
1553 ----------
1554 table_name : `ApdbTables`
1555 Name of the table.
1556
1557 Returns
1558 -------
1559 catalog : `pandas.DataFrame`
1560 An empty catalog.
1561 """
1562 table = self._schema.tableSchemas[table_name]
1563
1564 data = {
1565 columnDef.name: pandas.Series(dtype=self._schema.column_dtype(columnDef.datatype))
1566 for columnDef in table.columns
1567 }
1568 return pandas.DataFrame(data)
1569
1571 self,
1572 prefix: str,
1573 where1: list[tuple[str, tuple]],
1574 where2: list[tuple[str, tuple]],
1575 where3: tuple[str, tuple] | None = None,
1576 suffix: str | None = None,
1577 ) -> Iterator[tuple[cassandra.query.Statement, tuple]]:
1578 """Make cartesian product of two parts of WHERE clause into a series
1579 of statements to execute.
1580
1581 Parameters
1582 ----------
1583 prefix : `str`
1584 Initial statement prefix that comes before WHERE clause, e.g.
1585 "SELECT * from Table"
1586 """
1587 # If lists are empty use special sentinels.
1588 if not where1:
1589 where1 = [("", ())]
1590 if not where2:
1591 where2 = [("", ())]
1592
1593 for expr1, params1 in where1:
1594 for expr2, params2 in where2:
1595 full_query = prefix
1596 wheres = []
1597 params = params1 + params2
1598 if expr1:
1599 wheres.append(expr1)
1600 if expr2:
1601 wheres.append(expr2)
1602 if where3:
1603 wheres.append(where3[0])
1604 params += where3[1]
1605 if wheres:
1606 full_query += " WHERE " + " AND ".join(wheres)
1607 if suffix:
1608 full_query += " " + suffix
1609 if params:
1610 statement = self._preparer.prepare(full_query)
1611 else:
1612 # If there are no params then it is likely that query
1613 # has a bunch of literals rendered already, no point
1614 # trying to prepare it.
1615 statement = cassandra.query.SimpleStatement(full_query)
1616 yield (statement, params)
1617
1619 self, region: sphgeom.Region | None, use_ranges: bool = False
1620 ) -> list[tuple[str, tuple]]:
1621 """Generate expressions for spatial part of WHERE clause.
1622
1623 Parameters
1624 ----------
1625 region : `sphgeom.Region`
1626 Spatial region for query results.
1627 use_ranges : `bool`
1628 If True then use pixel ranges ("apdb_part >= p1 AND apdb_part <=
1629 p2") instead of exact list of pixels. Should be set to True for
1630 large regions covering very many pixels.
1631
1632 Returns
1633 -------
1634 expressions : `list` [ `tuple` ]
1635 Empty list is returned if ``region`` is `None`, otherwise a list
1636 of one or more (expression, parameters) tuples
1637 """
1638 if region is None:
1639 return []
1640 if use_ranges:
1641 pixel_ranges = self._pixelization.envelope(region)
1642 expressions: list[tuple[str, tuple]] = []
1643 for lower, upper in pixel_ranges:
1644 upper -= 1
1645 if lower == upper:
1646 expressions.append(('"apdb_part" = ?', (lower,)))
1647 else:
1648 expressions.append(('"apdb_part" >= ? AND "apdb_part" <= ?', (lower, upper)))
1649 return expressions
1650 else:
1651 pixels = self._pixelization.pixels(region)
1652 if self.config.partitioning.query_per_spatial_part:
1653 return [('"apdb_part" = ?', (pixel,)) for pixel in pixels]
1654 else:
1655 pixels_str = ",".join([str(pix) for pix in pixels])
1656 return [(f'"apdb_part" IN ({pixels_str})', ())]
1657
1659 self,
1660 table: ApdbTables,
1661 start_time: float | astropy.time.Time,
1662 end_time: float | astropy.time.Time,
1663 query_per_time_part: bool | None = None,
1664 ) -> tuple[list[str], list[tuple[str, tuple]]]:
1665 """Generate table names and expressions for temporal part of WHERE
1666 clauses.
1667
1668 Parameters
1669 ----------
1670 table : `ApdbTables`
1671 Table to select from.
1672 start_time : `astropy.time.Time` or `float`
1673 Starting Datetime of MJD value of the time range.
1674 end_time : `astropy.time.Time` or `float`
1675 Starting Datetime of MJD value of the time range.
1676 query_per_time_part : `bool`, optional
1677 If None then use ``query_per_time_part`` from configuration.
1678
1679 Returns
1680 -------
1681 tables : `list` [ `str` ]
1682 List of the table names to query.
1683 expressions : `list` [ `tuple` ]
1684 A list of zero or more (expression, parameters) tuples.
1685 """
1686 tables: list[str]
1687 temporal_where: list[tuple[str, tuple]] = []
1688 table_name = self._schema.tableName(table)
1689 time_part_start = self._time_partition(start_time)
1690 time_part_end = self._time_partition(end_time)
1691 time_parts = list(range(time_part_start, time_part_end + 1))
1692 if self.config.partitioning.time_partition_tables:
1693 tables = [f"{table_name}_{part}" for part in time_parts]
1694 else:
1695 tables = [table_name]
1696 if query_per_time_part is None:
1697 query_per_time_part = self.config.partitioning.query_per_time_part
1698 if query_per_time_part:
1699 temporal_where = [('"apdb_time_part" = ?', (time_part,)) for time_part in time_parts]
1700 else:
1701 time_part_list = ",".join([str(part) for part in time_parts])
1702 temporal_where = [(f'"apdb_time_part" IN ({time_part_list})', ())]
1703
1704 return tables, temporal_where
1705
1706 def _fix_input_timestamps(self, df: pandas.DataFrame) -> pandas.DataFrame:
1707 """Update timestamp columns in input DataFrame to be naive datetime
1708 type.
1709
1710 Clients may or may not generate aware timestamps, code in this class
1711 assumes that timestamps are naive, so we convert them to UTC and
1712 drop timezone.
1713 """
1714 # Find all columns with aware timestamps.
1715 columns = [column for column, dtype in df.dtypes.items() if isinstance(dtype, pandas.DatetimeTZDtype)]
1716 for column in columns:
1717 # tz_convert(None) will convert to UTC and drop timezone.
1718 df[column] = df[column].dt.tz_convert(None)
1719 return df
1720
1721 def _batch_size(self, table: ApdbTables | ExtraTables) -> int:
1722 """Calculate batch size based on config parameters."""
1723 # Cassandra limit on number of statements in a batch is 64k.
1724 batch_size = 65_535
1725 if 0 < self.config.batch_statement_limit < batch_size:
1726 batch_size = self.config.batch_statement_limit
1727 if self.config.batch_size_limit > 0:
1728 # The purpose of this limit is to try not to exceed batch size
1729 # threshold which is set on server side. Cassandra wire protocol
1730 # for prepared queries (and batches) only sends column values with
1731 # with an additional 4 bytes per value specifying size. Value is
1732 # not included for NULL or NOT_SET values, but the size is always
1733 # there. There is additional small per-query overhead, which we
1734 # ignore.
1735 row_size = self._schema.table_row_size(table)
1736 row_size += 4 * len(self._schema.getColumnMap(table))
1737 batch_size = min(batch_size, (self.config.batch_size_limit // row_size) + 1)
1738 return batch_size
__init__(self, tuple[str,...] public_ips, tuple[str,...] private_ips)
int _batch_size(self, ApdbTables|ExtraTables table)
int|None _storeDiaSources(self, ApdbTables table_name, pandas.DataFrame sources, ReplicaChunk|None replica_chunk)
pandas.DataFrame _fix_input_timestamps(self, pandas.DataFrame df)
None _storeReplicaChunk(self, ReplicaChunk replica_chunk, astropy.time.Time visit_time)
tuple[Cluster, Session] _make_session(cls, ApdbCassandraConfig config)
None _versionCheck(self, _DbVersions current_versions, _DbVersions db_versions)
None reassignDiaSources(self, Mapping[int, int] idMap)
pandas.DataFrame getDiaObjects(self, sphgeom.Region region)
bool containsVisitDetector(self, int visit, int detector, sphgeom.Region region, astropy.time.Time visit_time)
None storeSSObjects(self, pandas.DataFrame objects)
None _storeDiaObjects(self, pandas.DataFrame objs, astropy.time.Time visit_time, ReplicaChunk|None replica_chunk)
pandas.DataFrame _make_empty_catalog(self, ApdbTables table_name)
AuthProvider|None _make_auth_provider(cls, ApdbCassandraConfig config)
None _makeSchema(cls, ApdbConfig config, *, bool drop=False, int|None replication_factor=None, CreateTableOptions|None table_options=None)
Mapping[Any, ExecutionProfile] _makeProfiles(cls, ApdbCassandraConfig config)
tuple[list[str], list[tuple[str, tuple]]] _temporal_where(self, ApdbTables table, float|astropy.time.Time start_time, float|astropy.time.Time end_time, bool|None query_per_time_part=None)
pandas.DataFrame _add_apdb_part(self, pandas.DataFrame df)
None store(self, astropy.time.Time visit_time, pandas.DataFrame objects, pandas.DataFrame|None sources=None, pandas.DataFrame|None forced_sources=None)
None _storeObjectsPandas(self, pandas.DataFrame records, ApdbTables|ExtraTables table_name, Mapping|None extra_columns=None, int|None time_part=None)
pandas.DataFrame|None getDiaForcedSources(self, sphgeom.Region region, Iterable[int]|None object_ids, astropy.time.Time visit_time)
Mapping[int, int] _queryDiaObjectLastPartitions(self, Iterable[int] ids)
list[tuple[str, tuple]] _spatial_where(self, sphgeom.Region|None region, bool use_ranges=False)
None _storeDiaSourcesPartitions(self, pandas.DataFrame sources, astropy.time.Time visit_time, ReplicaChunk|None replica_chunk, int|None subchunk)
int _time_partition_cls(cls, float|astropy.time.Time time, float epoch_mjd, int part_days)
None _deleteMovingObjects(self, pandas.DataFrame objs)
_DbVersions _readVersions(self, ApdbMetadataCassandra metadata)
pandas.DataFrame _getSources(self, sphgeom.Region region, Iterable[int]|None object_ids, float mjd_start, float mjd_end, ApdbTables table_name)
Iterable[DatabaseInfo] list_databases(cls, str host)
pandas.DataFrame|None getDiaSources(self, sphgeom.Region region, Iterable[int]|None object_ids, astropy.time.Time visit_time)
int _time_partition(self, float|astropy.time.Time time)
None delete_database(cls, str host, str keyspace, *, int timeout=3600)
ApdbCassandraConfig init_database(cls, tuple[str,...] hosts, str keyspace, *, str|None schema_file=None, str|None schema_name=None, int|None read_sources_months=None, int|None read_forced_sources_months=None, bool enable_replica=False, bool replica_skips_diaobjects=False, int|None port=None, str|None username=None, str|None prefix=None, str|None part_pixelization=None, int|None part_pix_level=None, bool time_partition_tables=True, str|None time_partition_start=None, str|None time_partition_end=None, str|None read_consistency=None, str|None write_consistency=None, int|None read_timeout=None, int|None write_timeout=None, tuple[str, str]|None ra_dec_columns=None, int|None replication_factor=None, bool drop=False, CreateTableOptions|None table_options=None)
Timer _timer(self, str name, *, Mapping[str, str|int]|None tags=None)
Table|None tableDef(self, ApdbTables table)
Iterator[tuple[cassandra.query.Statement, tuple]] _combine_where(self, str prefix, list[tuple[str, tuple]] where1, list[tuple[str, tuple]] where2, tuple[str, tuple]|None where3=None, str|None suffix=None)
Region is a minimal interface for 2-dimensional regions on the unit sphere.
Definition Region.h:89
UnitVector3d is a unit vector in ℝ³ with components stored in double precision.