apdbCassandra.py
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22from __future__ import annotations
23
24__all__ = ["ApdbCassandraConfig", "ApdbCassandra"]
25
26import dataclasses
27import logging
28import warnings
29from collections.abc import Iterable, Iterator, Mapping, Set
30from typing import TYPE_CHECKING, Any, cast
31
32import numpy as np
33import pandas
34
35# If cassandra-driver is not there the module can still be imported
36# but ApdbCassandra cannot be instantiated.
37try:
38 import cassandra
39 import cassandra.query
40 from cassandra.auth import AuthProvider, PlainTextAuthProvider
41 from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile, Session
42 from cassandra.policies import AddressTranslator, RoundRobinPolicy, WhiteListRoundRobinPolicy
43 from cassandra.query import UNSET_VALUE
44
45 CASSANDRA_IMPORTED = True
46except ImportError:
47 CASSANDRA_IMPORTED = False
48
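# Illustrative sketch of how this flag is meant to be consumed (it mirrors what
# ApdbCassandra.__init__ does below); the guard lets the module import cleanly
# without the optional cassandra-driver dependency:
#
#     if not CASSANDRA_IMPORTED:
#         raise CassandraMissingError()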
49import astropy.time
50import felis.datamodel
51from lsst import sphgeom
52from lsst.pex.config import ChoiceField, Field, ListField
53from lsst.utils.db_auth import DbAuth, DbAuthNotFoundError
54from lsst.utils.iteration import chunk_iterable
55
56from .._auth import DB_AUTH_ENVVAR, DB_AUTH_PATH
57from ..apdb import Apdb, ApdbConfig
58from ..apdbConfigFreezer import ApdbConfigFreezer
59from ..apdbReplica import ApdbTableData, ReplicaChunk
60from ..apdbSchema import ApdbTables
61from ..monitor import MonAgent
62from ..pixelization import Pixelization
63from ..schema_model import Table
64from ..timer import Timer
65from ..versionTuple import IncompatibleVersionError, VersionTuple
66from .apdbCassandraReplica import ApdbCassandraReplica
67from .apdbCassandraSchema import ApdbCassandraSchema, CreateTableOptions, ExtraTables
68from .apdbMetadataCassandra import ApdbMetadataCassandra
69from .cassandra_utils import (
70 PreparedStatementCache,
71 literal,
72 pandas_dataframe_factory,
73 quote_id,
74 raw_data_factory,
75 select_concurrent,
76)
77
78if TYPE_CHECKING:
79 from ..apdbMetadata import ApdbMetadata
80
81_LOG = logging.getLogger(__name__)
82
83_MON = MonAgent(__name__)
84
85VERSION = VersionTuple(0, 1, 1)
86"""Version for the code controlling non-replication tables. This needs to be
87updated following compatibility rules when schema produced by this code
88changes.
89"""
90
91
92def _dump_query(rf: Any) -> None:
93 """Dump cassandra query to debug log."""
94 _LOG.debug("Cassandra query: %s", rf.query)
95
96
97class CassandraMissingError(Exception):
98 def __init__(self) -> None:
99 super().__init__("cassandra-driver module cannot be imported")
100
101
103 """Configuration class for Cassandra-based APDB implementation."""
104
105 contact_points = ListField[str](
106 doc="The list of contact points to try connecting for cluster discovery.", default=["127.0.0.1"]
107 )
108 private_ips = ListField[str](doc="List of internal IP addresses for contact_points.", default=[])
109 port = Field[int](doc="Port number to connect to.", default=9042)
110 keyspace = Field[str](doc="Default keyspace for operations.", default="apdb")
111 username = Field[str](
112 doc=f"Cassandra user name, if empty then {DB_AUTH_PATH} has to provide it with password.",
113 default="",
114 )
115 read_consistency = Field[str](
116 doc="Name for consistency level of read operations, default: QUORUM, can be ONE.", default="QUORUM"
117 )
118 write_consistency = Field[str](
119 doc="Name for consistency level of write operations, default: QUORUM, can be ONE.", default="QUORUM"
120 )
121 read_timeout = Field[float](doc="Timeout in seconds for read operations.", default=120.0)
122 write_timeout = Field[float](doc="Timeout in seconds for write operations.", default=60.0)
123 remove_timeout = Field[float](doc="Timeout in seconds for remove operations.", default=600.0)
124 read_concurrency = Field[int](doc="Concurrency level for read operations.", default=500)
125 protocol_version = Field[int](
126 doc="Cassandra protocol version to use, default is V4",
127 default=cassandra.ProtocolVersion.V4 if CASSANDRA_IMPORTED else 0,
128 )
129 dia_object_columns = ListField[str](
130 doc="List of columns to read from DiaObject[Last], by default read all columns", default=[]
131 )
132 prefix = Field[str](doc="Prefix to add to table names", default="")
133 part_pixelization = ChoiceField[str](
134 allowed=dict(htm="HTM pixelization", q3c="Q3C pixelization", mq3c="MQ3C pixelization"),
135 doc="Pixelization used for partitioning index.",
136 default="mq3c",
137 )
138 part_pix_level = Field[int](doc="Pixelization level used for partitioning index.", default=11)
139 part_pix_max_ranges = Field[int](doc="Max number of ranges in pixelization envelope", default=128)
140 ra_dec_columns = ListField[str](default=["ra", "dec"], doc="Names of ra/dec columns in DiaObject table")
141 timer = Field[bool](doc="If True then print/log timing information", default=False)
142 time_partition_tables = Field[bool](
143 doc="Use per-partition tables for sources instead of partitioning by time", default=False
144 )
145 time_partition_days = Field[int](
146 doc=(
147 "Time partitioning granularity in days, this value must not be changed after database is "
148 "initialized"
149 ),
150 default=30,
151 )
152 time_partition_start = Field[str](
153 doc=(
154 "Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI. "
155 "This is used only when time_partition_tables is True."
156 ),
157 default="2018-12-01T00:00:00",
158 )
159 time_partition_end = Field[str](
160 doc=(
161 "Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI. "
162 "This is used only when time_partition_tables is True."
163 ),
164 default="2030-01-01T00:00:00",
165 )
166 query_per_time_part = Field[bool](
167 default=False,
168 doc=(
169 "If True then build separate query for each time partition, otherwise build one single query. "
170 "This is only used when time_partition_tables is False in schema config."
171 ),
172 )
173 query_per_spatial_part = Field[bool](
174 default=False,
175 doc="If True then build one query per spatial partition, otherwise build single query.",
176 )
177 use_insert_id_skips_diaobjects = Field[bool](
178 default=False,
179 doc=(
180 "If True then do not store DiaObjects when use_insert_id is True "
181 "(DiaObjectsChunks has the same data)."
182 ),
183 )
184 idle_heartbeat_interval = Field[int](
185 doc=(
186 "Interval, in seconds, on which to heartbeat idle connections. "
187 "Zero (default) disables heartbeats."
188 ),
189 default=0,
190 )
191 idle_heartbeat_timeout = Field[int](
192 doc="Timeout, in seconds, on which the heartbeat wait for idle connection responses.",
193 default=30,
194 )
195
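# A minimal configuration sketch (illustrative only; host and keyspace names are
# hypothetical, field names are the ones defined in ApdbCassandraConfig above):
#
#     config = ApdbCassandraConfig()
#     config.contact_points = ["cassandra01.example.org", "cassandra02.example.org"]
#     config.keyspace = "apdb_dev"
#     config.read_consistency = "ONE"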
196
197@dataclasses.dataclass
199 """Collection of information about a specific database."""
200
201 name: str
202 """Keyspace name."""
203
204 permissions: dict[str, set[str]] | None = None
205 """Roles that can access the database and their permissions.
206
207 `None` means that authentication information is not accessible due to
208 system table permissions. If anonymous access is enabled then dictionary
209 will be empty but not `None`.
210 """
211
212
213@dataclasses.dataclass
215 """Versions defined in APDB metadata table."""
216
217 schema_version: VersionTuple
218 """Version of the schema from which database was created."""
219
220 code_version: VersionTuple
221 """Version of ApdbCassandra with which database was created."""
222
223 replica_version: VersionTuple | None
224 """Version of ApdbCassandraReplica with which database was created, None
225 if replication was not configured.
226 """
227
228
229if CASSANDRA_IMPORTED:
230
231 class _AddressTranslator(AddressTranslator):
232 """Translate internal IP address to external.
233
234 Only used for docker-based setup, not viable long-term solution.
235 """
236
237 def __init__(self, public_ips: list[str], private_ips: list[str]):
238 self._map = dict((k, v) for k, v in zip(private_ips, public_ips))
239
240 def translate(self, private_ip: str) -> str:
241 return self._map.get(private_ip, private_ip)
242
243
245 """Implementation of APDB database on to of Apache Cassandra.
246
247 The implementation is configured via the standard ``pex_config`` mechanism
248 using the `ApdbCassandraConfig` configuration class. For examples of
249 different configurations, check the config/ folder.
250
251 Parameters
252 ----------
253 config : `ApdbCassandraConfig`
254 Configuration object.
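
    Examples
    --------
    A minimal usage sketch, assuming ``config`` is an existing
    `ApdbCassandraConfig` (e.g. one returned by `init_database`) and
    ``region`` is an `lsst.sphgeom.Region`::

        apdb = ApdbCassandra(config)
        objects = apdb.getDiaObjects(region)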
255 """
256
257 metadataSchemaVersionKey = "version:schema"
258 """Name of the metadata key to store schema version number."""
259
260 metadataCodeVersionKey = "version:ApdbCassandra"
261 """Name of the metadata key to store code version number."""
262
263 metadataReplicaVersionKey = "version:ApdbCassandraReplica"
264 """Name of the metadata key to store replica code version number."""
265
266 metadataConfigKey = "config:apdb-cassandra.json"
267 """Name of the metadata key to store code version number."""
268
269 _frozen_parameters = (
270 "use_insert_id",
271 "part_pixelization",
272 "part_pix_level",
273 "ra_dec_columns",
274 "time_partition_tables",
275 "time_partition_days",
276 "use_insert_id_skips_diaobjects",
277 )
278 """Names of the config parameters to be frozen in metadata table."""
279
280 partition_zero_epoch = astropy.time.Time(0, format="unix_tai")
281 """Start time for partition 0, this should never be changed."""
282
283 def __init__(self, config: ApdbCassandraConfig):
284 if not CASSANDRA_IMPORTED:
285 raise CassandraMissingError()
286
287 self._timer_args: list[MonAgent | logging.Logger] = [_MON]
288 if config.timer:
289 self._timer_args.append(_LOG)
290
291 self._keyspace = config.keyspace
292
293 self._cluster, self._session = self._make_session(config)
294
295 meta_table_name = ApdbTables.metadata.table_name(config.prefix)
296 self._metadata = ApdbMetadataCassandra(
297 self._session, meta_table_name, config.keyspace, "read_tuples", "write"
298 )
299
300 # Read frozen config from metadata.
301 with self._timer("read_metadata_config"):
302 config_json = self._metadata.get(self.metadataConfigKey)
303 if config_json is not None:
304 # Update config from metadata.
305 freezer = ApdbConfigFreezer[ApdbCassandraConfig](self._frozen_parameters)
306 self.config = freezer.update(config, config_json)
307 else:
308 self.config = config
309 self.config.validate()
310
311 self._pixelization = Pixelization(
312 self.config.part_pixelization,
313 self.config.part_pix_level,
314 config.part_pix_max_ranges,
315 )
316
317 self._schema = ApdbCassandraSchema(
318 session=self._session,
319 keyspace=self._keyspace,
320 schema_file=self.config.schema_file,
321 schema_name=self.config.schema_name,
322 prefix=self.config.prefix,
323 time_partition_tables=self.config.time_partition_tables,
324 enable_replica=self.config.use_insert_id,
325 )
326 self._partition_zero_epoch_mjd = float(self.partition_zero_epoch.mjd)
327
328 self._db_versions: _DbVersions | None = None
329 if self._metadata.table_exists():
330 with self._timer("version_check"):
331 self._db_versions = self._versionCheck(self._metadata)
332
333 # Support for DiaObjectLastToPartition was added at code version 0.1.1
334 # in a backward-compatible way (we only use the table if it is there).
335 if self._db_versions:
337 else:
339
340 # Cache for prepared statements
341 self._preparer = PreparedStatementCache(self._session)
342
343 _LOG.debug("ApdbCassandra Configuration:")
344 for key, value in self.config.items():
345 _LOG.debug(" %s: %s", key, value)
346
347 def __del__(self) -> None:
348 if hasattr(self, "_cluster"):
349 self._cluster.shutdown()
350
351 def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
352 """Create `Timer` instance given its name."""
353 return Timer(name, *self._timer_args, tags=tags)
354
355 @classmethod
356 def _make_session(cls, config: ApdbCassandraConfig) -> tuple[Cluster, Session]:
357 """Make Cassandra session."""
358 addressTranslator: AddressTranslator | None = None
359 if config.private_ips:
360 addressTranslator = _AddressTranslator(list(config.contact_points), list(config.private_ips))
361
362 timer_args: list[MonAgent | logging.Logger] = [_MON]
363 if config.timer:
364 timer_args.append(_LOG)
365 with Timer("cluster_connect", *timer_args):
366 # TODO: control_connection_timeout is hardcoded for now as I do not
367 # want to extend config class with too many options. I'll move it
368 # to config when we switch from pex_config to other format.
369 cluster = Cluster(
370 execution_profiles=cls._makeProfiles(config),
371 contact_points=config.contact_points,
372 port=config.port,
373 address_translator=addressTranslator,
374 protocol_version=config.protocol_version,
375 auth_provider=cls._make_auth_provider(config),
376 control_connection_timeout=100,
377 idle_heartbeat_interval=config.idle_heartbeat_interval,
378 idle_heartbeat_timeout=config.idle_heartbeat_timeout,
379 )
380 session = cluster.connect()
381
382 # Dump queries if debug level is enabled.
383 if _LOG.isEnabledFor(logging.DEBUG):
384 session.add_request_init_listener(_dump_query)
385
386 # Disable result paging
387 session.default_fetch_size = None
388
389 return cluster, session
390
391 @classmethod
392 def _make_auth_provider(cls, config: ApdbCassandraConfig) -> AuthProvider | None:
393 """Make Cassandra authentication provider instance."""
394 try:
395 dbauth = DbAuth(DB_AUTH_PATH, DB_AUTH_ENVVAR)
396 except DbAuthNotFoundError:
397 # Credentials file doesn't exist, use anonymous login.
398 return None
399
400 empty_username = True
401 # Try every contact point in turn.
402 for hostname in config.contact_points:
403 try:
404 username, password = dbauth.getAuth(
405 "cassandra", config.username, hostname, config.port, config.keyspace
406 )
407 if not username:
408 # Password without user name, try next hostname, but give
409 # warning later if no better match is found.
410 empty_username = True
411 else:
412 return PlainTextAuthProvider(username=username, password=password)
413 except DbAuthNotFoundError:
414 pass
415
416 if empty_username:
417 _LOG.warning(
418 f"Credentials file ({DB_AUTH_PATH} or ${DB_AUTH_ENVVAR}) provided password but not "
419 f"user name, anonymous Cassandra logon will be attempted."
420 )
421
422 return None
423
424 def _versionCheck(self, metadata: ApdbMetadataCassandra) -> _DbVersions:
425 """Check schema version compatibility."""
426
427 def _get_version(key: str, default: VersionTuple) -> VersionTuple:
428 """Retrieve version number from given metadata key."""
429 if metadata.table_exists():
430 version_str = metadata.get(key)
431 if version_str is None:
432 # Should not happen with existing metadata table.
433 raise RuntimeError(f"Version key {key!r} does not exist in metadata table.")
434 return VersionTuple.fromString(version_str)
435 return default
436
437 # For old databases where metadata table does not exist we assume that
438 # version of both code and schema is 0.1.0.
439 initial_version = VersionTuple(0, 1, 0)
440 db_schema_version = _get_version(self.metadataSchemaVersionKey, initial_version)
441 db_code_version = _get_version(self.metadataCodeVersionKey, initial_version)
442
443 # For now there is no way to make read-only APDB instances, assume that
444 # any access can do updates.
445 if not self._schema.schemaVersion().checkCompatibility(db_schema_version):
447 f"Configured schema version {self._schema.schemaVersion()} "
448 f"is not compatible with database version {db_schema_version}"
449 )
450 if not self.apdbImplementationVersion().checkCompatibility(db_code_version):
452 f"Current code version {self.apdbImplementationVersion()} "
453 f"is not compatible with database version {db_code_version}"
454 )
455
456 # Check replica code version only if replica is enabled.
457 db_replica_version: VersionTuple | None = None
458 if self._schema.has_replica_chunks:
459 db_replica_version = _get_version(self.metadataReplicaVersionKey, initial_version)
460 code_replica_version = ApdbCassandraReplica.apdbReplicaImplementationVersion()
461 if not code_replica_version.checkCompatibility(db_replica_version):
463 f"Current replication code version {code_replica_version} "
464 f"is not compatible with database version {db_replica_version}"
465 )
466
467 return _DbVersions(
468 schema_version=db_schema_version, code_version=db_code_version, replica_version=db_replica_version
469 )
470
471 @classmethod
472 def apdbImplementationVersion(cls) -> VersionTuple:
473 """Return version number for current APDB implementation.
474
475 Returns
476 -------
477 version : `VersionTuple`
478 Version of the code defined in implementation class.
479 """
480 return VERSION
481
482 def tableDef(self, table: ApdbTables) -> Table | None:
483 # docstring is inherited from a base class
484 return self._schema.tableSchemas.get(table)
485
486 @classmethod
487 def init_database(
488 cls,
489 hosts: list[str],
490 keyspace: str,
491 *,
492 schema_file: str | None = None,
493 schema_name: str | None = None,
494 read_sources_months: int | None = None,
495 read_forced_sources_months: int | None = None,
496 use_insert_id: bool = False,
497 use_insert_id_skips_diaobjects: bool = False,
498 port: int | None = None,
499 username: str | None = None,
500 prefix: str | None = None,
501 part_pixelization: str | None = None,
502 part_pix_level: int | None = None,
503 time_partition_tables: bool = True,
504 time_partition_start: str | None = None,
505 time_partition_end: str | None = None,
506 read_consistency: str | None = None,
507 write_consistency: str | None = None,
508 read_timeout: int | None = None,
509 write_timeout: int | None = None,
510 ra_dec_columns: list[str] | None = None,
511 replication_factor: int | None = None,
512 drop: bool = False,
513 table_options: CreateTableOptions | None = None,
514 ) -> ApdbCassandraConfig:
515 """Initialize new APDB instance and make configuration object for it.
516
517 Parameters
518 ----------
519 hosts : `list` [`str`]
520 List of host names or IP addresses for Cassandra cluster.
521 keyspace : `str`
522 Name of the keyspace for APDB tables.
523 schema_file : `str`, optional
524 Location of (YAML) configuration file with APDB schema. If not
525 specified then default location will be used.
526 schema_name : `str`, optional
527 Name of the schema in YAML configuration file. If not specified
528 then default name will be used.
529 read_sources_months : `int`, optional
530 Number of months of history to read from DiaSource.
531 read_forced_sources_months : `int`, optional
532 Number of months of history to read from DiaForcedSource.
533 use_insert_id : `bool`, optional
534 If True, make additional tables used for replication to PPDB.
535 use_insert_id_skips_diaobjects : `bool`, optional
536 If `True` then do not fill regular ``DiaObject`` table when
537 ``use_insert_id`` is `True`.
538 port : `int`, optional
539 Port number to use for Cassandra connections.
540 username : `str`, optional
541 User name for Cassandra connections.
542 prefix : `str`, optional
543 Optional prefix for all table names.
544 part_pixelization : `str`, optional
545 Name of the MOC pixelization used for partitioning.
546 part_pix_level : `int`, optional
547 Pixelization level.
548 time_partition_tables : `bool`, optional
549 Create per-partition tables.
550 time_partition_start : `str`, optional
551 Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss
552 format, in TAI.
553 time_partition_end : `str`, optional
554 Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss
555 format, in TAI.
556 read_consistency : `str`, optional
557 Name of the consistency level for read operations.
558 write_consistency : `str`, optional
559 Name of the consistency level for write operations.
560 read_timeout : `int`, optional
561 Read timeout in seconds.
562 write_timeout : `int`, optional
563 Write timeout in seconds.
564 ra_dec_columns : `list` [`str`], optional
565 Names of ra/dec columns in DiaObject table.
566 replication_factor : `int`, optional
567 Replication factor used when creating new keyspace, if keyspace
568 already exists its replication factor is not changed.
569 drop : `bool`, optional
570 If `True` then drop existing tables before re-creating the schema.
571 table_options : `CreateTableOptions`, optional
572 Options used when creating Cassandra tables.
573
574 Returns
575 -------
576 config : `ApdbCassandraConfig`
577 Resulting configuration object for a created APDB instance.
578 """
579 config = ApdbCassandraConfig(
580 contact_points=hosts,
581 keyspace=keyspace,
582 use_insert_id=use_insert_id,
583 use_insert_id_skips_diaobjects=use_insert_id_skips_diaobjects,
584 time_partition_tables=time_partition_tables,
585 )
586 if schema_file is not None:
587 config.schema_file = schema_file
588 if schema_name is not None:
589 config.schema_name = schema_name
590 if read_sources_months is not None:
591 config.read_sources_months = read_sources_months
592 if read_forced_sources_months is not None:
593 config.read_forced_sources_months = read_forced_sources_months
594 if port is not None:
595 config.port = port
596 if username is not None:
597 config.username = username
598 if prefix is not None:
599 config.prefix = prefix
600 if part_pixelization is not None:
601 config.part_pixelization = part_pixelization
602 if part_pix_level is not None:
603 config.part_pix_level = part_pix_level
604 if time_partition_start is not None:
605 config.time_partition_start = time_partition_start
606 if time_partition_end is not None:
607 config.time_partition_end = time_partition_end
608 if read_consistency is not None:
609 config.read_consistency = read_consistency
610 if write_consistency is not None:
611 config.write_consistency = write_consistency
612 if read_timeout is not None:
613 config.read_timeout = read_timeout
614 if write_timeout is not None:
615 config.write_timeout = write_timeout
616 if ra_dec_columns is not None:
617 config.ra_dec_columns = ra_dec_columns
618
619 cls._makeSchema(config, drop=drop, replication_factor=replication_factor, table_options=table_options)
620
621 return config
622
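    # Illustrative usage sketch for init_database (host, keyspace and
    # replication factor values are hypothetical):
    #
    #     config = ApdbCassandra.init_database(
    #         hosts=["cassandra01.example.org"],
    #         keyspace="apdb_dev",
    #         replication_factor=3,
    #     )
    #     apdb = ApdbCassandra(config)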
623 @classmethod
624 def list_databases(cls, host: str) -> Iterable[DatabaseInfo]:
625 """Return the list of keyspaces with APDB databases.
626
627 Parameters
628 ----------
629 host : `str`
630 Name of one of the hosts in Cassandra cluster.
631
632 Returns
633 -------
634 databases : `~collections.abc.Iterable` [`DatabaseInfo`]
635 Information about databases that contain APDB instance.
636 """
637 # For DbAuth we need to use database name "*" to try to match any
638 # database.
639 config = ApdbCassandraConfig(contact_points=[host], keyspace="*")
640 cluster, session = cls._make_session(config)
641
642 with cluster, session:
643 # Get names of all keyspaces containing DiaSource table
644 table_name = ApdbTables.DiaSource.table_name()
645 query = "select keyspace_name from system_schema.tables where table_name = %s ALLOW FILTERING"
646 result = session.execute(query, (table_name,))
647 keyspaces = [row[0] for row in result.all()]
648
649 if not keyspaces:
650 return []
651
652 # Retrieve roles for each keyspace.
653 template = ", ".join(["%s"] * len(keyspaces))
654 query = (
655 "SELECT resource, role, permissions FROM system_auth.role_permissions "
656 f"WHERE resource IN ({template}) ALLOW FILTERING"
657 )
658 resources = [f"data/{keyspace}" for keyspace in keyspaces]
659 try:
660 result = session.execute(query, resources)
661 # If anonymous access is enabled then result will be empty,
662 # set infos to have empty permissions dict in that case.
663 infos = {keyspace: DatabaseInfo(name=keyspace, permissions={}) for keyspace in keyspaces}
664 for row in result:
665 _, _, keyspace = row[0].partition("/")
666 role: str = row[1]
667 role_permissions: set[str] = set(row[2])
668 infos[keyspace].permissions[role] = role_permissions # type: ignore[index]
669 except cassandra.Unauthorized as exc:
670 # Likely that access to role_permissions is not granted for
671 # current user.
672 warnings.warn(
673 f"Authentication information is not accessible to current user - {exc}", stacklevel=2
674 )
675 infos = {keyspace: DatabaseInfo(name=keyspace) for keyspace in keyspaces}
676
677 # Would be nice to get size estimate, but this is not available
678 # via CQL queries.
679 return infos.values()
680
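    # Illustrative usage sketch for list_databases (host name is hypothetical):
    #
    #     for info in ApdbCassandra.list_databases("cassandra01.example.org"):
    #         print(info.name, info.permissions)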
681 @classmethod
682 def delete_database(cls, host: str, keyspace: str, *, timeout: int = 3600) -> None:
683 """Delete APDB database by dropping its keyspace.
684
685 Parameters
686 ----------
687 host : `str`
688 Name of one of the hosts in Cassandra cluster.
689 keyspace : `str`
690 Name of keyspace to delete.
691 timeout : `int`, optional
692 Timeout for delete operation in seconds. Dropping a large keyspace
693 can be a long operation, but this default value of one hour should
694 be sufficient for most or all cases.
695 """
696 # For DbAuth we need to use database name "*" to try to match any
697 # database.
698 config = ApdbCassandraConfig(contact_points=[host], keyspace="*")
699 cluster, session = cls._make_session(config)
700 with cluster, session:
701 query = f"DROP KEYSPACE {quote_id(keyspace)}"
702 session.execute(query, timeout=timeout)
703
704 def get_replica(self) -> ApdbCassandraReplica:
705 """Return `ApdbReplica` instance for this database."""
706 # Note that this instance has to stay alive while replica exists, so
707 # we pass reference to self.
708 return ApdbCassandraReplica(self, self._schema, self._session)
709
710 @classmethod
711 def _makeSchema(
712 cls,
713 config: ApdbConfig,
714 *,
715 drop: bool = False,
716 replication_factor: int | None = None,
717 table_options: CreateTableOptions | None = None,
718 ) -> None:
719 # docstring is inherited from a base class
720
721 if not isinstance(config, ApdbCassandraConfig):
722 raise TypeError(f"Unexpected type of configuration object: {type(config)}")
723
724 cluster, session = cls._make_session(config)
725 with cluster, session:
726 schema = ApdbCassandraSchema(
727 session=session,
728 keyspace=config.keyspace,
729 schema_file=config.schema_file,
730 schema_name=config.schema_name,
731 prefix=config.prefix,
732 time_partition_tables=config.time_partition_tables,
733 enable_replica=config.use_insert_id,
734 )
735
736 # Ask schema to create all tables.
737 if config.time_partition_tables:
738 time_partition_start = astropy.time.Time(
739 config.time_partition_start, format="isot", scale="tai"
740 )
741 time_partition_end = astropy.time.Time(config.time_partition_end, format="isot", scale="tai")
742 part_epoch = float(cls.partition_zero_epoch.mjd)
743 part_days = config.time_partition_days
744 part_range = (
745 cls._time_partition_cls(time_partition_start, part_epoch, part_days),
746 cls._time_partition_cls(time_partition_end, part_epoch, part_days) + 1,
747 )
748 schema.makeSchema(
749 drop=drop,
750 part_range=part_range,
751 replication_factor=replication_factor,
752 table_options=table_options,
753 )
754 else:
755 schema.makeSchema(
756 drop=drop, replication_factor=replication_factor, table_options=table_options
757 )
758
759 meta_table_name = ApdbTables.metadata.table_name(config.prefix)
760 metadata = ApdbMetadataCassandra(
761 session, meta_table_name, config.keyspace, "read_tuples", "write"
762 )
763
764 # Fill version numbers, overrides if they existed before.
765 if metadata.table_exists():
766 metadata.set(cls.metadataSchemaVersionKey, str(schema.schemaVersion()), force=True)
767 metadata.set(cls.metadataCodeVersionKey, str(cls.apdbImplementationVersion()), force=True)
768
769 if config.use_insert_id:
770 # Only store replica code version if replica is enabled.
771 metadata.set(
772 cls.metadataReplicaVersionKey,
773 str(ApdbCassandraReplica.apdbReplicaImplementationVersion()),
774 force=True,
775 )
776
777 # Store frozen part of a configuration in metadata.
778 freezer = ApdbConfigFreezer[ApdbCassandraConfig](cls._frozen_parameters)
779 metadata.set(cls.metadataConfigKey, freezer.to_json(config), force=True)
780
781 def getDiaObjects(self, region: sphgeom.Region) -> pandas.DataFrame:
782 # docstring is inherited from a base class
783
784 sp_where = self._spatial_where(region)
785 _LOG.debug("getDiaObjects: #partitions: %s", len(sp_where))
786
787 # We need to exclude extra partitioning columns from result.
788 column_names = self._schema.apdbColumnNames(ApdbTables.DiaObjectLast)
789 what = ",".join(quote_id(column) for column in column_names)
790
791 table_name = self._schema.tableName(ApdbTables.DiaObjectLast)
792 query = f'SELECT {what} from "{self._keyspace}"."{table_name}"'
793 statements: list[tuple] = []
794 for where, params in sp_where:
795 full_query = f"{query} WHERE {where}"
796 if params:
797 statement = self._preparer.prepare(full_query)
798 else:
799 # If there are no params then it is likely that query has a
800 # bunch of literals rendered already, no point trying to
801 # prepare it because it's not reusable.
802 statement = cassandra.query.SimpleStatement(full_query)
803 statements.append((statement, params))
804 _LOG.debug("getDiaObjects: #queries: %s", len(statements))
805
806 with _MON.context_tags({"table": "DiaObject"}):
807 _MON.add_record(
808 "select_query_stats", values={"num_sp_part": len(sp_where), "num_queries": len(statements)}
809 )
810 with self._timer("select_time") as timer:
811 objects = cast(
812 pandas.DataFrame,
813 select_concurrent(
814 self._session, statements, "read_pandas_multi", self.config.read_concurrency
815 ),
816 )
817 timer.add_values(row_count=len(objects))
818
819 _LOG.debug("found %s DiaObjects", objects.shape[0])
820 return objects
821
822 def getDiaSources(
823 self, region: sphgeom.Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
824 ) -> pandas.DataFrame | None:
825 # docstring is inherited from a base class
826 months = self.config.read_sources_months
827 if months == 0:
828 return None
829 mjd_end = visit_time.mjd
830 mjd_start = mjd_end - months * 30
831
832 return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaSource)
833
834 def getDiaForcedSources(
835 self, region: sphgeom.Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
836 ) -> pandas.DataFrame | None:
837 # docstring is inherited from a base class
838 months = self.config.read_forced_sources_months
839 if months == 0:
840 return None
841 mjd_end = visit_time.mjd
842 mjd_start = mjd_end - months * 30
843
844 return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaForcedSource)
845
846 def containsVisitDetector(self, visit: int, detector: int) -> bool:
847 # docstring is inherited from a base class
848 # The order of checks corresponds to order in store(), on potential
849 # store failure earlier tables have higher probability containing
850 # stored records. With per-partition tables there will be many tables
851 # in the list, but it is unlikely that we'll use that setup in
852 # production.
853 existing_tables = self._schema.existing_tables(ApdbTables.DiaSource, ApdbTables.DiaForcedSource)
854 tables_to_check = existing_tables[ApdbTables.DiaSource][:]
855 if self.config.use_insert_id:
856 tables_to_check.append(self._schema.tableName(ExtraTables.DiaSourceChunks))
857 tables_to_check.extend(existing_tables[ApdbTables.DiaForcedSource])
858 if self.config.use_insert_id:
859 tables_to_check.append(self._schema.tableName(ExtraTables.DiaForcedSourceChunks))
860
861 # I do not want to run concurrent queries as they are all full-scan
862 # queries, so we do one by one.
863 for table_name in tables_to_check:
864 # Try to find a single record with given visit/detector. This is a
865 # full scan query so ALLOW FILTERING is needed. It will probably
866 # guess PER PARTITION LIMIT itself, but let's help it.
867 query = (
868 f'SELECT * from "{self._keyspace}"."{table_name}" '
869 "WHERE visit = ? AND detector = ? "
870 "PER PARTITION LIMIT 1 LIMIT 1 ALLOW FILTERING"
871 )
872 with self._timer("contains_visit_detector_time", tags={"table": table_name}) as timer:
873 result = self._session.execute(self._preparer.prepare(query), (visit, detector))
874 found = result.one() is not None
875 timer.add_values(found=int(found))
876 if found:
877 # There is a result.
878 return True
879 return False
880
881 def getSSObjects(self) -> pandas.DataFrame:
882 # docstring is inherited from a base class
883 tableName = self._schema.tableName(ApdbTables.SSObject)
884 query = f'SELECT * from "{self._keyspace}"."{tableName}"'
885
886 objects = None
887 with self._timer("select_time", tags={"table": "SSObject"}) as timer:
888 result = self._session.execute(query, execution_profile="read_pandas")
889 objects = result._current_rows
890 timer.add_values(row_count=len(objects))
891
892 _LOG.debug("found %s SSObjects", objects.shape[0])
893 return objects
894
895 def store(
896 self,
897 visit_time: astropy.time.Time,
898 objects: pandas.DataFrame,
899 sources: pandas.DataFrame | None = None,
900 forced_sources: pandas.DataFrame | None = None,
901 ) -> None:
902 # docstring is inherited from a base class
903 objects = self._fix_input_timestamps(objects)
904 if sources is not None:
905 sources = self._fix_input_timestamps(sources)
906 if forced_sources is not None:
907 forced_sources = self._fix_input_timestamps(forced_sources)
908
909 replica_chunk: ReplicaChunk | None = None
910 if self._schema.has_replica_chunks:
911 replica_chunk = ReplicaChunk.make_replica_chunk(visit_time, self.config.replica_chunk_seconds)
912 self._storeReplicaChunk(replica_chunk, visit_time)
913
914 # fill region partition column for DiaObjects
915 objects = self._add_apdb_part(objects)
916 self._storeDiaObjects(objects, visit_time, replica_chunk)
917
918 if sources is not None:
919 # copy apdb_part column from DiaObjects to DiaSources
920 sources = self._add_apdb_part(sources)
921 self._storeDiaSources(ApdbTables.DiaSource, sources, replica_chunk)
922 self._storeDiaSourcesPartitions(sources, visit_time, replica_chunk)
923
924 if forced_sources is not None:
925 forced_sources = self._add_apdb_part(forced_sources)
926 self._storeDiaSources(ApdbTables.DiaForcedSource, forced_sources, replica_chunk)
927
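    # Illustrative usage sketch for store(); the DataFrames are assumed to
    # follow the APDB schema column names (including the configured ra/dec
    # columns for DiaObjects):
    #
    #     apdb.store(visit_time, objects, sources=sources, forced_sources=forced_sources)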
928 def storeSSObjects(self, objects: pandas.DataFrame) -> None:
929 # docstring is inherited from a base class
930 objects = self._fix_input_timestamps(objects)
931 self._storeObjectsPandas(objects, ApdbTables.SSObject)
932
933 def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
934 # docstring is inherited from a base class
935
936 # To update a record we need to know its exact primary key (including
937 # partition key) so we start by querying for diaSourceId to find the
938 # primary keys.
939
940 table_name = self._schema.tableName(ExtraTables.DiaSourceToPartition)
941 # split it into 1k IDs per query
942 selects: list[tuple] = []
943 for ids in chunk_iterable(idMap.keys(), 1_000):
944 ids_str = ",".join(str(item) for item in ids)
945 selects.append(
946 (
947 (
948 'SELECT "diaSourceId", "apdb_part", "apdb_time_part", "apdb_replica_chunk" '
949 f'FROM "{self._keyspace}"."{table_name}" WHERE "diaSourceId" IN ({ids_str})'
950 ),
951 {},
952 )
953 )
954
955 # No need for DataFrame here, read data as tuples.
956 result = cast(
957 list[tuple[int, int, int, int | None]],
958 select_concurrent(self._session, selects, "read_tuples", self.config.read_concurrency),
959 )
960
961 # Make mapping from source ID to its partition.
962 id2partitions: dict[int, tuple[int, int]] = {}
963 id2chunk_id: dict[int, int] = {}
964 for row in result:
965 id2partitions[row[0]] = row[1:3]
966 if row[3] is not None:
967 id2chunk_id[row[0]] = row[3]
968
969 # make sure we know partitions for each ID
970 if set(id2partitions) != set(idMap):
971 missing = ",".join(str(item) for item in set(idMap) - set(id2partitions))
972 raise ValueError(f"Following DiaSource IDs do not exist in the database: {missing}")
973
974 # Reassign in standard tables
975 queries = cassandra.query.BatchStatement()
976 table_name = self._schema.tableName(ApdbTables.DiaSource)
977 for diaSourceId, ssObjectId in idMap.items():
978 apdb_part, apdb_time_part = id2partitions[diaSourceId]
979 values: tuple
980 if self.config.time_partition_tables:
981 query = (
982 f'UPDATE "{self._keyspace}"."{table_name}_{apdb_time_part}"'
983 ' SET "ssObjectId" = ?, "diaObjectId" = NULL'
984 ' WHERE "apdb_part" = ? AND "diaSourceId" = ?'
985 )
986 values = (ssObjectId, apdb_part, diaSourceId)
987 else:
988 query = (
989 f'UPDATE "{self._keyspace}"."{table_name}"'
990 ' SET "ssObjectId" = ?, "diaObjectId" = NULL'
991 ' WHERE "apdb_part" = ? AND "apdb_time_part" = ? AND "diaSourceId" = ?'
992 )
993 values = (ssObjectId, apdb_part, apdb_time_part, diaSourceId)
994 queries.add(self._preparer.prepare(query), values)
995
996 # Reassign in replica tables, only if replication is enabled
997 if id2chunk_id:
998 # Filter out chunks that have been deleted already. There is a
999 # potential race with concurrent removal of chunks, but it
1000 # should be handled by WHERE in UPDATE.
1001 known_ids = set()
1002 if replica_chunks := self.get_replica().getReplicaChunks():
1003 known_ids = set(replica_chunk.id for replica_chunk in replica_chunks)
1004 id2chunk_id = {key: value for key, value in id2chunk_id.items() if value in known_ids}
1005 if id2chunk_id:
1006 table_name = self._schema.tableName(ExtraTables.DiaSourceChunks)
1007 for diaSourceId, ssObjectId in idMap.items():
1008 if replica_chunk := id2chunk_id.get(diaSourceId):
1009 query = (
1010 f'UPDATE "{self._keyspace}"."{table_name}" '
1011 ' SET "ssObjectId" = ?, "diaObjectId" = NULL '
1012 'WHERE "apdb_replica_chunk" = ? AND "diaSourceId" = ?'
1013 )
1014 values = (ssObjectId, replica_chunk, diaSourceId)
1015 queries.add(self._preparer.prepare(query), values)
1016
1017 _LOG.debug("%s: will update %d records", table_name, len(idMap))
1018 with self._timer("source_reassign_time") as timer:
1019 self._session.execute(queries, execution_profile="write")
1020 timer.add_values(source_count=len(idMap))
1021
1022 def dailyJob(self) -> None:
1023 # docstring is inherited from a base class
1024 pass
1025
1026 def countUnassociatedObjects(self) -> int:
1027 # docstring is inherited from a base class
1028
1029 # It's too inefficient to implement it for Cassandra in current schema.
1030 raise NotImplementedError()
1031
1032 @property
1033 def metadata(self) -> ApdbMetadata:
1034 # docstring is inherited from a base class
1035 if self._metadata is None:
1036 raise RuntimeError("Database schema was not initialized.")
1037 return self._metadata
1038
1039 @classmethod
1040 def _makeProfiles(cls, config: ApdbCassandraConfig) -> Mapping[Any, ExecutionProfile]:
1041 """Make all execution profiles used in the code."""
1042 if config.private_ips:
1043 loadBalancePolicy = WhiteListRoundRobinPolicy(hosts=config.contact_points)
1044 else:
1045 loadBalancePolicy = RoundRobinPolicy()
1046
1047 read_tuples_profile = ExecutionProfile(
1048 consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
1049 request_timeout=config.read_timeout,
1050 row_factory=cassandra.query.tuple_factory,
1051 load_balancing_policy=loadBalancePolicy,
1052 )
1053 read_pandas_profile = ExecutionProfile(
1054 consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
1055 request_timeout=config.read_timeout,
1056 row_factory=pandas_dataframe_factory,
1057 load_balancing_policy=loadBalancePolicy,
1058 )
1059 read_raw_profile = ExecutionProfile(
1060 consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
1061 request_timeout=config.read_timeout,
1062 row_factory=raw_data_factory,
1063 load_balancing_policy=loadBalancePolicy,
1064 )
1065 # Profile to use with select_concurrent to return pandas data frame
1066 read_pandas_multi_profile = ExecutionProfile(
1067 consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
1068 request_timeout=config.read_timeout,
1069 row_factory=pandas_dataframe_factory,
1070 load_balancing_policy=loadBalancePolicy,
1071 )
1072 # Profile to use with select_concurrent to return raw data (columns and
1073 # rows)
1074 read_raw_multi_profile = ExecutionProfile(
1075 consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
1076 request_timeout=config.read_timeout,
1077 row_factory=raw_data_factory,
1078 load_balancing_policy=loadBalancePolicy,
1079 )
1080 write_profile = ExecutionProfile(
1081 consistency_level=getattr(cassandra.ConsistencyLevel, config.write_consistency),
1082 request_timeout=config.write_timeout,
1083 load_balancing_policy=loadBalancePolicy,
1084 )
1085 # To replace default DCAwareRoundRobinPolicy
1086 default_profile = ExecutionProfile(
1087 load_balancing_policy=loadBalancePolicy,
1088 )
1089 return {
1090 "read_tuples": read_tuples_profile,
1091 "read_pandas": read_pandas_profile,
1092 "read_raw": read_raw_profile,
1093 "read_pandas_multi": read_pandas_multi_profile,
1094 "read_raw_multi": read_raw_multi_profile,
1095 "write": write_profile,
1096 EXEC_PROFILE_DEFAULT: default_profile,
1097 }
1098
1099 def _getSources(
1100 self,
1101 region: sphgeom.Region,
1102 object_ids: Iterable[int] | None,
1103 mjd_start: float,
1104 mjd_end: float,
1105 table_name: ApdbTables,
1106 ) -> pandas.DataFrame:
1107 """Return catalog of DiaSource instances given set of DiaObject IDs.
1108
1109 Parameters
1110 ----------
1111 region : `lsst.sphgeom.Region`
1112 Spherical region.
1113 object_ids : `~collections.abc.Iterable` [`int`] or `None`
1114 Collection of DiaObject IDs to select sources for; `None` means no ID filtering.
1115 mjd_start : `float`
1116 Lower bound of time interval.
1117 mjd_end : `float`
1118 Upper bound of time interval.
1119 table_name : `ApdbTables`
1120 Name of the table.
1121
1122 Returns
1123 -------
1124 catalog : `pandas.DataFrame`
1125 Catalog containing DiaSource records. Empty catalog is returned if
1126 ``object_ids`` is empty.
1127 """
1128 object_id_set: Set[int] = set()
1129 if object_ids is not None:
1130 object_id_set = set(object_ids)
1131 if len(object_id_set) == 0:
1132 return self._make_empty_catalog(table_name)
1133
1134 sp_where = self._spatial_where(region)
1135 tables, temporal_where = self._temporal_where(table_name, mjd_start, mjd_end)
1136
1137 # We need to exclude extra partitioning columns from result.
1138 column_names = self._schema.apdbColumnNames(table_name)
1139 what = ",".join(quote_id(column) for column in column_names)
1140
1141 # Build all queries
1142 statements: list[tuple] = []
1143 for table in tables:
1144 prefix = f'SELECT {what} from "{self._keyspace}"."{table}"'
1145 statements += list(self._combine_where(prefix, sp_where, temporal_where))
1146 _LOG.debug("_getSources %s: #queries: %s", table_name, len(statements))
1147
1148 with _MON.context_tags({"table": table_name.name}):
1149 _MON.add_record(
1150 "select_query_stats", values={"num_sp_part": len(sp_where), "num_queries": len(statements)}
1151 )
1152 with self._timer("select_time") as timer:
1153 catalog = cast(
1154 pandas.DataFrame,
1155 select_concurrent(
1156 self._session, statements, "read_pandas_multi", self.config.read_concurrency
1157 ),
1158 )
1159 timer.add_values(row_count=len(catalog))
1160
1161 # filter by given object IDs
1162 if len(object_id_set) > 0:
1163 catalog = cast(pandas.DataFrame, catalog[catalog["diaObjectId"].isin(object_id_set)])
1164
1165 # precise filtering on midpointMjdTai
1166 catalog = cast(pandas.DataFrame, catalog[catalog["midpointMjdTai"] > mjd_start])
1167
1168 _LOG.debug("found %d %ss", catalog.shape[0], table_name.name)
1169 return catalog
1170
1171 def _storeReplicaChunk(self, replica_chunk: ReplicaChunk, visit_time: astropy.time.Time) -> None:
1172 # Cassandra timestamp uses milliseconds since epoch
1173 timestamp = int(replica_chunk.last_update_time.unix_tai * 1000)
1174
1175 # everything goes into a single partition
1176 partition = 0
1177
1178 table_name = self._schema.tableName(ExtraTables.ApdbReplicaChunks)
1179 query = (
1180 f'INSERT INTO "{self._keyspace}"."{table_name}" '
1181 "(partition, apdb_replica_chunk, last_update_time, unique_id) "
1182 "VALUES (?, ?, ?, ?)"
1183 )
1184
1185 self._session.execute(
1186 self._preparer.prepare(query),
1187 (partition, replica_chunk.id, timestamp, replica_chunk.unique_id),
1188 timeout=self.config.write_timeout,
1189 execution_profile="write",
1190 )
1191
1192 def _queryDiaObjectLastPartitions(self, ids: Iterable[int]) -> Mapping[int, int]:
1193 """Return existing mapping of diaObjectId to its last partition."""
1194 table_name = self._schema.tableName(ExtraTables.DiaObjectLastToPartition)
1195 queries = []
1196 object_count = 0
1197 for id_chunk in chunk_iterable(ids, 10_000):
1198 id_chunk_list = list(id_chunk)
1199 query = (
1200 f'SELECT "diaObjectId", apdb_part FROM "{self._keyspace}"."{table_name}" '
1201 f'WHERE "diaObjectId" in ({",".join(str(oid) for oid in id_chunk_list)})'
1202 )
1203 queries.append((query, ()))
1204 object_count += len(id_chunk_list)
1205
1206 with self._timer("query_object_last_partitions") as timer:
1207 data = cast(
1208 ApdbTableData,
1209 select_concurrent(self._session, queries, "read_raw_multi", self.config.read_concurrency),
1210 )
1211 timer.add_values(object_count=object_count, row_count=len(data.rows()))
1212
1213 if data.column_names() != ["diaObjectId", "apdb_part"]:
1214 raise RuntimeError(f"Unexpected column names in query result: {data.column_names()}")
1215
1216 return {row[0]: row[1] for row in data.rows()}
1217
1218 def _deleteMovingObjects(self, objs: pandas.DataFrame) -> None:
1219 """Objects in DiaObjectsLast can move from one spatial partition to
1220 another. For those objects inserting new version does not replace old
1221 one, so we need to explicitly remove old versions before inserting new
1222 ones.
1223 """
1224 # Extract all object IDs.
1225 new_partitions = {oid: part for oid, part in zip(objs["diaObjectId"], objs["apdb_part"])}
1226 old_partitions = self._queryDiaObjectLastPartitions(objs["diaObjectId"])
1227
1228 moved_oids: dict[int, tuple[int, int]] = {}
1229 for oid, old_part in old_partitions.items():
1230 new_part = new_partitions.get(oid, old_part)
1231 if new_part != old_part:
1232 moved_oids[oid] = (old_part, new_part)
1233 _LOG.debug("DiaObject IDs that moved to new partition: %s", moved_oids)
1234
1235 if moved_oids:
1236 # Delete old records from DiaObjectLast.
1237 table_name = self._schema.tableName(ApdbTables.DiaObjectLast)
1238 query = f'DELETE FROM "{self._keyspace}"."{table_name}" WHERE apdb_part = ? AND "diaObjectId" = ?'
1239 statement = self._preparer.prepare(query)
1240 batch = cassandra.query.BatchStatement()
1241 for oid, (old_part, _) in moved_oids.items():
1242 batch.add(statement, (old_part, oid))
1243 with self._timer("delete_object_last") as timer:
1244 self._session.execute(batch, timeout=self.config.write_timeout, execution_profile="write")
1245 timer.add_values(row_count=len(moved_oids))
1246
1247 # Add all new records to the map.
1248 table_name = self._schema.tableName(ExtraTables.DiaObjectLastToPartition)
1249 query = f'INSERT INTO "{self._keyspace}"."{table_name}" ("diaObjectId", apdb_part) VALUES (?,?)'
1250 statement = self._preparer.prepare(query)
1251 batch = cassandra.query.BatchStatement()
1252 for oid, new_part in new_partitions.items():
1253 batch.add(statement, (oid, new_part))
1254
1255 with self._timer("update_object_last_partition") as timer:
1256 self._session.execute(batch, timeout=self.config.write_timeout, execution_profile="write")
1257 timer.add_values(row_count=len(batch))
1258
1259 def _storeDiaObjects(
1260 self, objs: pandas.DataFrame, visit_time: astropy.time.Time, replica_chunk: ReplicaChunk | None
1261 ) -> None:
1262 """Store catalog of DiaObjects from current visit.
1263
1264 Parameters
1265 ----------
1266 objs : `pandas.DataFrame`
1267 Catalog with DiaObject records
1268 visit_time : `astropy.time.Time`
1269 Time of the current visit.
1270 replica_chunk : `ReplicaChunk` or `None`
1271 Replica chunk identifier if replication is configured.
1272 """
1273 if len(objs) == 0:
1274 _LOG.debug("No objects to write to database.")
1275 return
1276
1278 self._deleteMovingObjects(objs)
1279
1280 visit_time_dt = visit_time.datetime
1281 extra_columns = dict(lastNonForcedSource=visit_time_dt)
1282 self._storeObjectsPandas(objs, ApdbTables.DiaObjectLast, extra_columns=extra_columns)
1283
1284 extra_columns["validityStart"] = visit_time_dt
1285 time_part: int | None = self._time_partition(visit_time)
1286 if not self.config.time_partition_tables:
1287 extra_columns["apdb_time_part"] = time_part
1288 time_part = None
1289
1290 # Only store DiaObects if not doing replication or explicitly
1291 # configured to always store them.
1292 if replica_chunk is None or not self.config.use_insert_id_skips_diaobjects:
1293 self._storeObjectsPandas(
1294 objs, ApdbTables.DiaObject, extra_columns=extra_columns, time_part=time_part
1295 )
1296
1297 if replica_chunk is not None:
1298 extra_columns = dict(apdb_replica_chunk=replica_chunk.id, validityStart=visit_time_dt)
1299 self._storeObjectsPandas(objs, ExtraTables.DiaObjectChunks, extra_columns=extra_columns)
1300
1301 def _storeDiaSources(
1302 self,
1303 table_name: ApdbTables,
1304 sources: pandas.DataFrame,
1305 replica_chunk: ReplicaChunk | None,
1306 ) -> None:
1307 """Store catalog of DIASources or DIAForcedSources from current visit.
1308
1309 Parameters
1310 ----------
1311 table_name : `ApdbTables`
1312 Table where to store the data.
1313 sources : `pandas.DataFrame`
1314 Catalog containing DiaSource records
1317 replica_chunk : `ReplicaChunk` or `None`
1318 Replica chunk identifier if replication is configured.
1319 """
1320 # Time partitioning has to be based on midpointMjdTai, not visit_time
1321 # as visit_time is not really a visit time.
1322 tp_sources = sources.copy(deep=False)
1323 tp_sources["apdb_time_part"] = tp_sources["midpointMjdTai"].apply(self._time_partition)
1324 extra_columns: dict[str, Any] = {}
1325 if not self.config.time_partition_tables:
1326 self._storeObjectsPandas(tp_sources, table_name)
1327 else:
1328 # Group by time partition
1329 partitions = set(tp_sources["apdb_time_part"])
1330 if len(partitions) == 1:
1331 # Single partition - just save the whole thing.
1332 time_part = partitions.pop()
1333 self._storeObjectsPandas(sources, table_name, time_part=time_part)
1334 else:
1335 # group by time partition.
1336 for time_part, sub_frame in tp_sources.groupby(by="apdb_time_part"):
1337 sub_frame.drop(columns="apdb_time_part", inplace=True)
1338 self._storeObjectsPandas(sub_frame, table_name, time_part=time_part)
1339
1340 if replica_chunk is not None:
1341 extra_columns = dict(apdb_replica_chunk=replica_chunk.id)
1342 if table_name is ApdbTables.DiaSource:
1343 extra_table = ExtraTables.DiaSourceChunks
1344 else:
1345 extra_table = ExtraTables.DiaForcedSourceChunks
1346 self._storeObjectsPandas(sources, extra_table, extra_columns=extra_columns)
1347
1347 def _storeDiaSourcesPartitions(
1348 self, sources: pandas.DataFrame, visit_time: astropy.time.Time, replica_chunk: ReplicaChunk | None
1350 ) -> None:
1351 """Store mapping of diaSourceId to its partitioning values.
1352
1353 Parameters
1354 ----------
1355 sources : `pandas.DataFrame`
1356 Catalog containing DiaSource records
1357 visit_time : `astropy.time.Time`
1358 Time of the current visit.
1359 """
1360 id_map = cast(pandas.DataFrame, sources[["diaSourceId", "apdb_part"]])
1361 extra_columns = {
1362 "apdb_time_part": self._time_partition(visit_time),
1363 "apdb_replica_chunk": replica_chunk.id if replica_chunk is not None else None,
1364 }
1365
1366 self._storeObjectsPandas(
1367 id_map, ExtraTables.DiaSourceToPartition, extra_columns=extra_columns, time_part=None
1368 )
1369
1370 def _storeObjectsPandas(
1371 self,
1372 records: pandas.DataFrame,
1373 table_name: ApdbTables | ExtraTables,
1374 extra_columns: Mapping | None = None,
1375 time_part: int | None = None,
1376 ) -> None:
1377 """Store generic objects.
1378
1379 Takes Pandas catalog and stores a bunch of records in a table.
1380
1381 Parameters
1382 ----------
1383 records : `pandas.DataFrame`
1384 Catalog containing object records
1385 table_name : `ApdbTables` or `ExtraTables`
1386 Name of the table as defined in APDB schema.
1387 extra_columns : `dict`, optional
1388 Mapping (column_name, column_value) which gives fixed values for
1389 columns in each row, overrides values in ``records`` if matching
1390 columns exist there.
1391 time_part : `int`, optional
1392 If not `None` then insert into a per-partition table.
1393
1394 Notes
1395 -----
1396 If Pandas catalog contains additional columns not defined in table
1397 schema they are ignored. Catalog does not have to contain all columns
1398 defined in a table, but partition and clustering keys must be present
1399 in a catalog or ``extra_columns``.
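
        For example, `_storeDiaObjects` above uses ``extra_columns`` to stamp
        every stored row with the same visit-derived value::

            extra_columns = dict(lastNonForcedSource=visit_time_dt)
            self._storeObjectsPandas(objs, ApdbTables.DiaObjectLast, extra_columns=extra_columns)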
1400 """
1401 # use extra columns if specified
1402 if extra_columns is None:
1403 extra_columns = {}
1404 extra_fields = list(extra_columns.keys())
1405
1406 # Fields that will come from dataframe.
1407 df_fields = [column for column in records.columns if column not in extra_fields]
1408
1409 column_map = self._schema.getColumnMap(table_name)
1410 # list of columns (as in felis schema)
1411 fields = [column_map[field].name for field in df_fields if field in column_map]
1412 fields += extra_fields
1413
1414 # check that all partitioning and clustering columns are defined
1415 required_columns = self._schema.partitionColumns(table_name) + self._schema.clusteringColumns(
1416 table_name
1417 )
1418 missing_columns = [column for column in required_columns if column not in fields]
1419 if missing_columns:
1420 raise ValueError(f"Primary key columns are missing from catalog: {missing_columns}")
1421
1422 qfields = [quote_id(field) for field in fields]
1423 qfields_str = ",".join(qfields)
1424
1425 with self._timer("insert_build_time", tags={"table": table_name.name}):
1426 table = self._schema.tableName(table_name)
1427 if time_part is not None:
1428 table = f"{table}_{time_part}"
1429
1430 holders = ",".join(["?"] * len(qfields))
1431 query = f'INSERT INTO "{self._keyspace}"."{table}" ({qfields_str}) VALUES ({holders})'
1432 statement = self._preparer.prepare(query)
1433 # Cassandra has 64k limit on batch size, normally that should be
1434 # enough but some tests generate too many forced sources.
1435 queries = []
1436 for rec_chunk in chunk_iterable(records.itertuples(index=False), 50_000_000):
1437 batch = cassandra.query.BatchStatement()
1438 for rec in rec_chunk:
1439 values = []
1440 for field in df_fields:
1441 if field not in column_map:
1442 continue
1443 value = getattr(rec, field)
1444 if column_map[field].datatype is felis.datamodel.DataType.timestamp:
1445 if isinstance(value, pandas.Timestamp):
1446 value = value.to_pydatetime()
1447 elif value is pandas.NaT:
1448 value = None
1449 else:
1450 # Assume it's seconds since epoch, Cassandra
1451 # datetime is in milliseconds
1452 value = int(value * 1000)
1453 value = literal(value)
1454 values.append(UNSET_VALUE if value is None else value)
1455 for field in extra_fields:
1456 value = literal(extra_columns[field])
1457 values.append(UNSET_VALUE if value is None else value)
1458 batch.add(statement, values)
1459 queries.append(batch)
1460
1461 _LOG.debug("%s: will store %d records", self._schema.tableName(table_name), records.shape[0])
1462 with self._timer("insert_time", tags={"table": table_name.name}) as timer:
1463 for batch in queries:
1464 self._session.execute(batch, timeout=self.config.write_timeout, execution_profile="write")
1465 timer.add_values(row_count=len(records))
1466
1467 def _add_apdb_part(self, df: pandas.DataFrame) -> pandas.DataFrame:
1468 """Calculate spatial partition for each record and add it to a
1469 DataFrame.
1470
1471 Parameters
1472 ----------
1473 df : `pandas.DataFrame`
1474 DataFrame which has to contain ra/dec columns; the names of these
1475 columns are defined by the ``ra_dec_columns`` configuration field.
1476
1477 Returns
1478 -------
1479 df : `pandas.DataFrame`
1480 DataFrame with ``apdb_part`` column which contains pixel index
1481 for ra/dec coordinates.
1482
1483 Notes
1484 -----
1485 This overrides any existing ``apdb_part`` column in the DataFrame.
1486 The original DataFrame is not changed; a modified copy is
1487 returned.
1488 """
1489 # Calculate pixelization index for every record.
1490 apdb_part = np.zeros(df.shape[0], dtype=np.int64)
1491 ra_col, dec_col = self.config.ra_dec_columns
1492 for i, (ra, dec) in enumerate(zip(df[ra_col], df[dec_col])):
1493 uv3d = sphgeom.UnitVector3d(sphgeom.LonLat.fromDegrees(ra, dec))
1494 idx = self._pixelization.pixel(uv3d)
1495 apdb_part[i] = idx
1496 df = df.copy()
1497 df["apdb_part"] = apdb_part
1498 return df
1499
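# --- Editorial sketch (standalone, not part of this file) ---------------
# The copy-and-assign pattern of _add_apdb_part with a stand-in
# pixelization: the "pixel index" here is just a coarse RA bin, purely
# for illustration, where the real code uses lsst.sphgeom via Pixelization.
import numpy as np
import pandas

def _sketch_add_partition(df: pandas.DataFrame) -> pandas.DataFrame:
    apdb_part = np.asarray(df["ra"] // 10.0, dtype=np.int64)  # stand-in pixel index
    df = df.copy()  # the caller's DataFrame is left untouched
    df["apdb_part"] = apdb_part
    return df
# -------------------------------------------------------------------------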
1500 @classmethod
1501 def _time_partition_cls(cls, time: float | astropy.time.Time, epoch_mjd: float, part_days: int) -> int:
1502 """Calculate time partition number for a given time.
1503
1504 Parameters
1505 ----------
1506 time : `float` or `astropy.time.Time`
1507 Time for which to calculate the partition number, either an MJD
1508 value as `float` or an `astropy.time.Time`.
1509 epoch_mjd : `float`
1510 Epoch time for partition 0.
1511 part_days : `int`
1512 Number of days per partition.
1513
1514 Returns
1515 -------
1516 partition : `int`
1517 Partition number for a given time.
1518 """
1519 if isinstance(time, astropy.time.Time):
1520 mjd = float(time.mjd)
1521 else:
1522 mjd = time
1523 days_since_epoch = mjd - epoch_mjd
1524 partition = int(days_since_epoch) // part_days
1525 return partition
1526
1527 def _time_partition(self, time: float | astropy.time.Time) -> int:
1528 """Calculate time partition number for a given time.
1529
1530 Parameters
1531 ----------
1532 time : `float` or `astropy.time.Time`
1533 Time for which to calculate the partition number, either an MJD
1534 value as `float` or an `astropy.time.Time`.
1535
1536 Returns
1537 -------
1538 partition : `int`
1539 Partition number for a given time.
1540 """
1541 if isinstance(time, astropy.time.Time):
1542 mjd = float(time.mjd)
1543 else:
1544 mjd = time
1545 days_since_epoch = mjd - self._partition_zero_epoch_mjd
1546 partition = int(days_since_epoch) // self.config.time_partition_days
1547 return partition
1548
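# --- Editorial sketch (standalone, not part of this file) ---------------
# Worked example of the partition arithmetic above, with hypothetical
# epoch and partition length (epoch at MJD 58580.0, 30-day partitions):
#     int(60676.3 - 58580.0) // 30 == 2096 // 30 == 69
def _sketch_time_partition(mjd: float, epoch_mjd: float = 58580.0, part_days: int = 30) -> int:
    return int(mjd - epoch_mjd) // part_days

assert _sketch_time_partition(60676.3) == 69
# -------------------------------------------------------------------------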
1549 def _make_empty_catalog(self, table_name: ApdbTables) -> pandas.DataFrame:
1550 """Make an empty catalog for a table with a given name.
1551
1552 Parameters
1553 ----------
1554 table_name : `ApdbTables`
1555 Name of the table.
1556
1557 Returns
1558 -------
1559 catalog : `pandas.DataFrame`
1560 An empty catalog.
1561 """
1562 table = self._schema.tableSchemas[table_name]
1563
1564 data = {
1565 columnDef.name: pandas.Series(dtype=self._schema.column_dtype(columnDef.datatype))
1566 for columnDef in table.columns
1567 }
1568 return pandas.DataFrame(data)
1569
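# --- Editorial sketch (standalone, not part of this file) ---------------
# Building an empty but correctly typed DataFrame from a column->dtype
# mapping, as _make_empty_catalog does from the APDB schema; the columns
# and dtypes here are hypothetical.
import pandas

def _sketch_empty_catalog() -> pandas.DataFrame:
    dtypes = {"diaObjectId": "int64", "ra": "float64", "dec": "float64"}
    return pandas.DataFrame({name: pandas.Series(dtype=dtype) for name, dtype in dtypes.items()})

assert _sketch_empty_catalog().empty
# -------------------------------------------------------------------------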
1570 def _combine_where(
1571 self,
1572 prefix: str,
1573 where1: list[tuple[str, tuple]],
1574 where2: list[tuple[str, tuple]],
1575 suffix: str | None = None,
1576 ) -> Iterator[tuple[cassandra.query.Statement, tuple]]:
1577 """Make cartesian product of two parts of WHERE clause into a series
1578 of statements to execute.
1579
1580 Parameters
1581 ----------
1582 prefix : `str`
1583 Initial statement prefix that comes before WHERE clause, e.g.
1584 "SELECT * from Table".
where1 : `list` [ `tuple` ]
First set of (expression, parameters) tuples to combine.
where2 : `list` [ `tuple` ]
Second set of (expression, parameters) tuples to combine.
suffix : `str`, optional
Optional text appended to the statement after the WHERE clause.
1585 """
1586 # If lists are empty use special sentinels.
1587 if not where1:
1588 where1 = [("", ())]
1589 if not where2:
1590 where2 = [("", ())]
1591
1592 for expr1, params1 in where1:
1593 for expr2, params2 in where2:
1594 full_query = prefix
1595 wheres = []
1596 if expr1:
1597 wheres.append(expr1)
1598 if expr2:
1599 wheres.append(expr2)
1600 if wheres:
1601 full_query += " WHERE " + " AND ".join(wheres)
1602 if suffix:
1603 full_query += " " + suffix
1604 params = params1 + params2
1605 if params:
1606 statement = self._preparer.prepare(full_query)
1607 else:
1608 # If there are no params then it is likely that query
1609 # has a bunch of literals rendered already, no point
1610 # trying to prepare it.
1611 statement = cassandra.query.SimpleStatement(full_query)
1612 yield (statement, params)
1613
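# --- Editorial sketch (standalone, not part of this file) ---------------
# The cartesian-product combination performed by _combine_where, rendered
# as plain query strings so that no Cassandra driver is required.  Two
# spatial expressions and one temporal expression yield two statements.
def _sketch_combine_where(prefix, where1, where2):
    where1 = where1 or [("", ())]
    where2 = where2 or [("", ())]
    for expr1, params1 in where1:
        for expr2, params2 in where2:
            wheres = [expr for expr in (expr1, expr2) if expr]
            query = prefix + (" WHERE " + " AND ".join(wheres) if wheres else "")
            yield query, params1 + params2

statements = list(
    _sketch_combine_where(
        "SELECT * FROM t",
        [('"apdb_part" = ?', (1,)), ('"apdb_part" = ?', (2,))],
        [('"apdb_time_part" = ?', (705,))],
    )
)
assert len(statements) == 2
# -------------------------------------------------------------------------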
1614 def _spatial_where(
1615 self, region: sphgeom.Region | None, use_ranges: bool = False
1616 ) -> list[tuple[str, tuple]]:
1617 """Generate expressions for spatial part of WHERE clause.
1618
1619 Parameters
1620 ----------
1621 region : `sphgeom.Region` or `None`
1622 Spatial region for query results.
1623 use_ranges : `bool`
1624 If True then use pixel ranges ("apdb_part >= p1 AND apdb_part <=
1625 p2") instead of exact list of pixels. Should be set to True for
1626 large regions covering very many pixels.
1627
1628 Returns
1629 -------
1630 expressions : `list` [ `tuple` ]
1631 An empty list is returned if ``region`` is `None`; otherwise a
1632 list of one or more (expression, parameters) tuples.
1633 """
1634 if region is None:
1635 return []
1636 if use_ranges:
1637 pixel_ranges = self._pixelization.envelope(region)
1638 expressions: list[tuple[str, tuple]] = []
1639 for lower, upper in pixel_ranges:
1640 upper -= 1
1641 if lower == upper:
1642 expressions.append(('"apdb_part" = ?', (lower,)))
1643 else:
1644 expressions.append(('"apdb_part" >= ? AND "apdb_part" <= ?', (lower, upper)))
1645 return expressions
1646 else:
1647 pixels = self._pixelization.pixels(region)
1648 if self.config.query_per_spatial_part:
1649 return [('"apdb_part" = ?', (pixel,)) for pixel in pixels]
1650 else:
1651 pixels_str = ",".join([str(pix) for pix in pixels])
1652 return [(f'"apdb_part" IN ({pixels_str})', ())]
1653
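# --- Editorial sketch (standalone, not part of this file) ---------------
# The two expression shapes produced by _spatial_where for a hypothetical
# set of pixel indices: one range expression with bound parameters versus
# a single IN-list with the pixel values rendered as literals.
def _sketch_spatial_where(pixels: list[int], use_ranges: bool = False) -> list[tuple[str, tuple]]:
    if use_ranges:
        # Pretend the envelope collapsed into one contiguous range.
        return [('"apdb_part" >= ? AND "apdb_part" <= ?', (min(pixels), max(pixels)))]
    pixels_str = ",".join(str(pix) for pix in pixels)
    return [(f'"apdb_part" IN ({pixels_str})', ())]

assert _sketch_spatial_where([10, 11, 12]) == [('"apdb_part" IN (10,11,12)', ())]
# -------------------------------------------------------------------------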
1654 def _temporal_where(
1655 self,
1656 table: ApdbTables,
1657 start_time: float | astropy.time.Time,
1658 end_time: float | astropy.time.Time,
1659 query_per_time_part: bool | None = None,
1660 ) -> tuple[list[str], list[tuple[str, tuple]]]:
1661 """Generate table names and expressions for temporal part of WHERE
1662 clauses.
1663
1664 Parameters
1665 ----------
1666 table : `ApdbTables`
1667 Table to select from.
1668 start_time : `astropy.time.Time` or `float`
1669 Starting datetime or MJD value of the time range.
1670 end_time : `astropy.time.Time` or `float`
1671 Ending datetime or MJD value of the time range.
1672 query_per_time_part : `bool`, optional
1673 If `None` then use ``query_per_time_part`` from configuration.
1674
1675 Returns
1676 -------
1677 tables : `list` [ `str` ]
1678 List of the table names to query.
1679 expressions : `list` [ `tuple` ]
1680 A list of zero or more (expression, parameters) tuples.
1681 """
1682 tables: list[str]
1683 temporal_where: list[tuple[str, tuple]] = []
1684 table_name = self._schema.tableName(table)
1685 time_part_start = self._time_partition(start_time)
1686 time_part_end = self._time_partition(end_time)
1687 time_parts = list(range(time_part_start, time_part_end + 1))
1688 if self.config.time_partition_tables:
1689 tables = [f"{table_name}_{part}" for part in time_parts]
1690 else:
1691 tables = [table_name]
1692 if query_per_time_part is None:
1693 query_per_time_part = self.config.query_per_time_part
1694 if query_per_time_part:
1695 temporal_where = [('"apdb_time_part" = ?', (time_part,)) for time_part in time_parts]
1696 else:
1697 time_part_list = ",".join([str(part) for part in time_parts])
1698 temporal_where = [(f'"apdb_time_part" IN ({time_part_list})', ())]
1699
1700 return tables, temporal_where
1701
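# --- Editorial sketch (standalone, not part of this file) ---------------
# Enumerating the time partitions spanned by a query range and deriving
# per-partition table names, mirroring _temporal_where; the epoch,
# partition length, and table prefix are hypothetical.
def _sketch_temporal_tables(start_mjd: float, end_mjd: float,
                            epoch_mjd: float = 58580.0, part_days: int = 30) -> list[str]:
    start_part = int(start_mjd - epoch_mjd) // part_days
    end_part = int(end_mjd - epoch_mjd) // part_days
    return [f"DiaSource_{part}" for part in range(start_part, end_part + 1)]

assert _sketch_temporal_tables(60640.0, 60676.0) == ["DiaSource_68", "DiaSource_69"]
# -------------------------------------------------------------------------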
1702 def _fix_input_timestamps(self, df: pandas.DataFrame) -> pandas.DataFrame:
1703 """Update timestamp columns in input DataFrame to be naive datetime
1704 type.
1705
1706 Clients may or may not generate timezone-aware timestamps. Code in
1707 this class assumes that timestamps are naive, so we convert them to
1708 UTC and drop the timezone.
1709 """
1710 # Find all columns with aware timestamps.
1711 columns = [column for column, dtype in df.dtypes.items() if isinstance(dtype, pandas.DatetimeTZDtype)]
1712 for column in columns:
1713 # tz_convert(None) will convert to UTC and drop timezone.
1714 df[column] = df[column].dt.tz_convert(None)
1715 return df
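# --- Editorial sketch (standalone, not part of this file) ---------------
# The timezone normalization applied by _fix_input_timestamps: an aware
# timestamp column is converted to UTC and its timezone dropped.  The
# column name below is hypothetical.
import pandas

df = pandas.DataFrame(
    {"time_processed": pandas.to_datetime(["2024-11-01T12:00:00+02:00"], utc=True)}
)
assert isinstance(df["time_processed"].dtype, pandas.DatetimeTZDtype)
df["time_processed"] = df["time_processed"].dt.tz_convert(None)  # now naive UTC
assert str(df["time_processed"].iloc[0]) == "2024-11-01 10:00:00"
# -------------------------------------------------------------------------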