apdbCassandra.py
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22from __future__ import annotations
23
24__all__ = ["ApdbCassandra"]
25
26import dataclasses
27import logging
28import warnings
29from collections.abc import Iterable, Iterator, Mapping, Set
30from typing import TYPE_CHECKING, Any, cast
31
32import numpy as np
33import pandas
34
35# If cassandra-driver is not there the module can still be imported
36# but ApdbCassandra cannot be instantiated.
37try:
38 import cassandra
39 import cassandra.query
40 from cassandra.auth import AuthProvider, PlainTextAuthProvider
41 from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile, Session
42 from cassandra.policies import AddressTranslator, RoundRobinPolicy, WhiteListRoundRobinPolicy
43 from cassandra.query import UNSET_VALUE
44
45 CASSANDRA_IMPORTED = True
46except ImportError:
47 CASSANDRA_IMPORTED = False
48
49import astropy.time
50import felis.datamodel
51from lsst import sphgeom
52from lsst.utils.db_auth import DbAuth, DbAuthNotFoundError
53from lsst.utils.iteration import chunk_iterable
54
55from .._auth import DB_AUTH_ENVVAR, DB_AUTH_PATH
56from ..apdb import Apdb, ApdbConfig
57from ..apdbConfigFreezer import ApdbConfigFreezer
58from ..apdbReplica import ApdbTableData, ReplicaChunk
59from ..apdbSchema import ApdbTables
60from ..monitor import MonAgent
61from ..pixelization import Pixelization
62from ..schema_model import Table
63from ..timer import Timer
64from ..versionTuple import IncompatibleVersionError, VersionTuple
65from .apdbCassandraReplica import ApdbCassandraReplica
66from .apdbCassandraSchema import ApdbCassandraSchema, CreateTableOptions, ExtraTables
67from .apdbMetadataCassandra import ApdbMetadataCassandra
68from .cassandra_utils import (
69 PreparedStatementCache,
70 literal,
71 pandas_dataframe_factory,
72 quote_id,
73 raw_data_factory,
74 select_concurrent,
75)
76from .config import ApdbCassandraConfig, ApdbCassandraConnectionConfig
77
78if TYPE_CHECKING:
79 from ..apdbMetadata import ApdbMetadata
80
81_LOG = logging.getLogger(__name__)
82
83_MON = MonAgent(__name__)
84
85VERSION = VersionTuple(0, 1, 1)
86"""Version for the code controlling non-replication tables. This needs to be
87updated following compatibility rules when schema produced by this code
88changes.
89"""
90
91
92def _dump_query(rf: Any) -> None:
93 """Dump cassandra query to debug log."""
94 _LOG.debug("Cassandra query: %s", rf.query)
95
96
97class CassandraMissingError(Exception):
98 def __init__(self) -> None:
99 super().__init__("cassandra-driver module cannot be imported")
100
101
102@dataclasses.dataclass
103class DatabaseInfo:
104    """Collection of information about a specific database."""
105
106 name: str
107 """Keyspace name."""
108
109 permissions: dict[str, set[str]] | None = None
110 """Roles that can access the database and their permissions.
111
112 `None` means that authentication information is not accessible due to
113 system table permissions. If anonymous access is enabled then dictionary
114 will be empty but not `None`.
115 """
116
117
118@dataclasses.dataclass
119class _DbVersions:
120    """Versions defined in APDB metadata table."""
121
122 schema_version: VersionTuple
123 """Version of the schema from which database was created."""
124
125 code_version: VersionTuple
126 """Version of ApdbCassandra with which database was created."""
127
128 replica_version: VersionTuple | None
129 """Version of ApdbCassandraReplica with which database was created, None
130 if replication was not configured.
131 """
132
133
134if CASSANDRA_IMPORTED:
135
136 class _AddressTranslator(AddressTranslator):
137 """Translate internal IP address to external.
138
139        Only used for docker-based setup, not a viable long-term solution.
140 """
141
142 def __init__(self, public_ips: tuple[str, ...], private_ips: tuple[str, ...]):
143            self._map = dict(zip(private_ips, public_ips))
144
145 def translate(self, private_ip: str) -> str:
146 return self._map.get(private_ip, private_ip)
147
148
149class ApdbCassandra(Apdb):
150    """Implementation of APDB database on top of Apache Cassandra.
151
152 The implementation is configured via standard ``pex_config`` mechanism
153    using the `ApdbCassandraConfig` configuration class. For examples of
154    different configurations check the ``config/`` folder.
155
156 Parameters
157 ----------
158 config : `ApdbCassandraConfig`
159 Configuration object.
160 """
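    # Illustrative usage sketch; the host and keyspace names are hypothetical
    # placeholders, and ``region`` is an `lsst.sphgeom.Region` supplied by the
    # caller:
    #
    #     config = ApdbCassandraConfig(contact_points=("cassandra-host",), keyspace="apdb")
    #     apdb = ApdbCassandra(config)
    #     dia_objects = apdb.getDiaObjects(region)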
161
162 metadataSchemaVersionKey = "version:schema"
163 """Name of the metadata key to store schema version number."""
164
165 metadataCodeVersionKey = "version:ApdbCassandra"
166 """Name of the metadata key to store code version number."""
167
168 metadataReplicaVersionKey = "version:ApdbCassandraReplica"
169 """Name of the metadata key to store replica code version number."""
170
171 metadataConfigKey = "config:apdb-cassandra.json"
172    """Name of the metadata key to store the frozen configuration (JSON)."""
173
174 _frozen_parameters = (
175 "enable_replica",
176 "ra_dec_columns",
177 "replica_skips_diaobjects",
178 "partitioning.part_pixelization",
179 "partitioning.part_pix_level",
180 "partitioning.time_partition_tables",
181 "partitioning.time_partition_days",
182 )
183 """Names of the config parameters to be frozen in metadata table."""
184
185 partition_zero_epoch = astropy.time.Time(0, format="unix_tai")
186    """Start time for partition 0; this should never be changed."""
187
188 def __init__(self, config: ApdbCassandraConfig):
189 if not CASSANDRA_IMPORTED:
190            raise CassandraMissingError()
191
192 self._keyspace = config.keyspace
193
194 self._cluster, self._session = self._make_session(config)
195
196 meta_table_name = ApdbTables.metadata.table_name(config.prefix)
197        self._metadata = ApdbMetadataCassandra(
198            self._session, meta_table_name, config.keyspace, "read_tuples", "write"
199 )
200
201 # Read frozen config from metadata.
202 with self._timer("read_metadata_config"):
203 config_json = self._metadata.get(self.metadataConfigKey)
204 if config_json is not None:
205 # Update config from metadata.
206 freezer = ApdbConfigFreezer[ApdbCassandraConfig](self._frozen_parameters)
207 self.config = freezer.update(config, config_json)
208 else:
209 self.config = config
210
211        self._pixelization = Pixelization(
212            self.config.partitioning.part_pixelization,
213 self.config.partitioning.part_pix_level,
214 config.partitioning.part_pix_max_ranges,
215 )
216
217        self._schema = ApdbCassandraSchema(
218            session=self._session,
219 keyspace=self._keyspace,
220 schema_file=self.config.schema_file,
221 schema_name=self.config.schema_name,
222 prefix=self.config.prefix,
223 time_partition_tables=self.config.partitioning.time_partition_tables,
224 enable_replica=self.config.enable_replica,
225 )
227
228 self._db_versions: _DbVersions | None = None
229 if self._metadata.table_exists():
230 with self._timer("version_check"):
231 self._db_versions = self._versionCheck(self._metadata)
232
233 # Support for DiaObjectLastToPartition was added at code version 0.1.1
234 # in a backward-compatible way (we only use the table if it is there).
235 if self._db_versions:
237 else:
239
240 # Cache for prepared statements
241        self._preparer = PreparedStatementCache(self._session)
242
243 if _LOG.isEnabledFor(logging.DEBUG):
244 _LOG.debug("ApdbCassandra Configuration: %s", self.config.model_dump())
245
246 def __del__(self) -> None:
247 if hasattr(self, "_cluster"):
248 self._cluster.shutdown()
249
250 def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
251 """Create `Timer` instance given its name."""
252 return Timer(name, _MON, tags=tags)
253
254 @classmethod
255 def _make_session(cls, config: ApdbCassandraConfig) -> tuple[Cluster, Session]:
256 """Make Cassandra session."""
257 addressTranslator: AddressTranslator | None = None
258 if config.connection_config.private_ips:
259 addressTranslator = _AddressTranslator(
260 config.contact_points, config.connection_config.private_ips
261 )
262
263 with Timer("cluster_connect", _MON):
264 cluster = Cluster(
265 execution_profiles=cls._makeProfiles(config),
266 contact_points=config.contact_points,
267 port=config.connection_config.port,
268 address_translator=addressTranslator,
269 protocol_version=config.connection_config.protocol_version,
270 auth_provider=cls._make_auth_provider(config),
271 **config.connection_config.extra_parameters,
272 )
273 session = cluster.connect()
274
275 # Dump queries if debug level is enabled.
276 if _LOG.isEnabledFor(logging.DEBUG):
277 session.add_request_init_listener(_dump_query)
278
279 # Disable result paging
280 session.default_fetch_size = None
281
282 return cluster, session
283
284 @classmethod
285 def _make_auth_provider(cls, config: ApdbCassandraConfig) -> AuthProvider | None:
286 """Make Cassandra authentication provider instance."""
287 try:
288 dbauth = DbAuth(DB_AUTH_PATH, DB_AUTH_ENVVAR)
289 except DbAuthNotFoundError:
290 # Credentials file doesn't exist, use anonymous login.
291 return None
292
293        empty_username = False
294 # Try every contact point in turn.
295 for hostname in config.contact_points:
296 try:
297 username, password = dbauth.getAuth(
298 "cassandra",
299 config.connection_config.username,
300 hostname,
301 config.connection_config.port,
302 config.keyspace,
303 )
304 if not username:
305 # Password without user name, try next hostname, but give
306 # warning later if no better match is found.
307 empty_username = True
308 else:
309 return PlainTextAuthProvider(username=username, password=password)
310 except DbAuthNotFoundError:
311 pass
312
313 if empty_username:
314 _LOG.warning(
315 f"Credentials file ({DB_AUTH_PATH} or ${DB_AUTH_ENVVAR}) provided password but not "
316 f"user name, anonymous Cassandra logon will be attempted."
317 )
318
319 return None
320
321 def _versionCheck(self, metadata: ApdbMetadataCassandra) -> _DbVersions:
322 """Check schema version compatibility."""
323
324 def _get_version(key: str, default: VersionTuple) -> VersionTuple:
325 """Retrieve version number from given metadata key."""
326 if metadata.table_exists():
327 version_str = metadata.get(key)
328 if version_str is None:
329 # Should not happen with existing metadata table.
330 raise RuntimeError(f"Version key {key!r} does not exist in metadata table.")
331 return VersionTuple.fromString(version_str)
332 return default
333
334 # For old databases where metadata table does not exist we assume that
335 # version of both code and schema is 0.1.0.
336 initial_version = VersionTuple(0, 1, 0)
337 db_schema_version = _get_version(self.metadataSchemaVersionKey, initial_version)
338 db_code_version = _get_version(self.metadataCodeVersionKey, initial_version)
339
340 # For now there is no way to make read-only APDB instances, assume that
341 # any access can do updates.
342 if not self._schema.schemaVersion().checkCompatibility(db_schema_version):
343            raise IncompatibleVersionError(
344                f"Configured schema version {self._schema.schemaVersion()} "
345 f"is not compatible with database version {db_schema_version}"
346 )
347 if not self.apdbImplementationVersion().checkCompatibility(db_code_version):
348            raise IncompatibleVersionError(
349                f"Current code version {self.apdbImplementationVersion()} "
350 f"is not compatible with database version {db_code_version}"
351 )
352
353 # Check replica code version only if replica is enabled.
354 db_replica_version: VersionTuple | None = None
355 if self._schema.has_replica_chunks:
356 db_replica_version = _get_version(self.metadataReplicaVersionKey, initial_version)
357 code_replica_version = ApdbCassandraReplica.apdbReplicaImplementationVersion()
358 if not code_replica_version.checkCompatibility(db_replica_version):
359                raise IncompatibleVersionError(
360                    f"Current replication code version {code_replica_version} "
361 f"is not compatible with database version {db_replica_version}"
362 )
363
364 return _DbVersions(
365 schema_version=db_schema_version, code_version=db_code_version, replica_version=db_replica_version
366 )
367
368 @classmethod
369 def apdbImplementationVersion(cls) -> VersionTuple:
370 """Return version number for current APDB implementation.
371
372 Returns
373 -------
374 version : `VersionTuple`
375 Version of the code defined in implementation class.
376 """
377 return VERSION
378
379 def tableDef(self, table: ApdbTables) -> Table | None:
380 # docstring is inherited from a base class
381 return self._schema.tableSchemas.get(table)
382
383 @classmethod
384    def init_database(
385        cls,
386 hosts: tuple[str, ...],
387 keyspace: str,
388 *,
389 schema_file: str | None = None,
390 schema_name: str | None = None,
391 read_sources_months: int | None = None,
392 read_forced_sources_months: int | None = None,
393 enable_replica: bool = False,
394 replica_skips_diaobjects: bool = False,
395 port: int | None = None,
396 username: str | None = None,
397 prefix: str | None = None,
398 part_pixelization: str | None = None,
399 part_pix_level: int | None = None,
400 time_partition_tables: bool = True,
401 time_partition_start: str | None = None,
402 time_partition_end: str | None = None,
403 read_consistency: str | None = None,
404 write_consistency: str | None = None,
405 read_timeout: int | None = None,
406 write_timeout: int | None = None,
407 ra_dec_columns: tuple[str, str] | None = None,
408 replication_factor: int | None = None,
409 drop: bool = False,
410 table_options: CreateTableOptions | None = None,
411 ) -> ApdbCassandraConfig:
412 """Initialize new APDB instance and make configuration object for it.
413
414 Parameters
415 ----------
416 hosts : `tuple` [`str`, ...]
417 List of host names or IP addresses for Cassandra cluster.
418 keyspace : `str`
419 Name of the keyspace for APDB tables.
420 schema_file : `str`, optional
421 Location of (YAML) configuration file with APDB schema. If not
422 specified then default location will be used.
423 schema_name : `str`, optional
424 Name of the schema in YAML configuration file. If not specified
425 then default name will be used.
426 read_sources_months : `int`, optional
427 Number of months of history to read from DiaSource.
428 read_forced_sources_months : `int`, optional
429 Number of months of history to read from DiaForcedSource.
430 enable_replica : `bool`, optional
431 If True, make additional tables used for replication to PPDB.
432 replica_skips_diaobjects : `bool`, optional
433 If `True` then do not fill regular ``DiaObject`` table when
434 ``enable_replica`` is `True`.
435 port : `int`, optional
436 Port number to use for Cassandra connections.
437 username : `str`, optional
438 User name for Cassandra connections.
439 prefix : `str`, optional
440 Optional prefix for all table names.
441 part_pixelization : `str`, optional
442 Name of the MOC pixelization used for partitioning.
443 part_pix_level : `int`, optional
444 Pixelization level.
445 time_partition_tables : `bool`, optional
446 Create per-partition tables.
447 time_partition_start : `str`, optional
448 Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss
449 format, in TAI.
450 time_partition_end : `str`, optional
451 Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss
452 format, in TAI.
453 read_consistency : `str`, optional
454 Name of the consistency level for read operations.
455 write_consistency : `str`, optional
456 Name of the consistency level for write operations.
457 read_timeout : `int`, optional
458 Read timeout in seconds.
459 write_timeout : `int`, optional
460 Write timeout in seconds.
461 ra_dec_columns : `tuple` [`str`, `str`], optional
462 Names of ra/dec columns in DiaObject table.
463 replication_factor : `int`, optional
464            Replication factor used when creating a new keyspace; if the
465            keyspace already exists its replication factor is not changed.
466 drop : `bool`, optional
467 If `True` then drop existing tables before re-creating the schema.
468 table_options : `CreateTableOptions`, optional
469 Options used when creating Cassandra tables.
470
471 Returns
472 -------
473 config : `ApdbCassandraConfig`
474 Resulting configuration object for a created APDB instance.
475 """
476 # Some non-standard defaults for connection parameters, these can be
477 # changed later in generated config. Check Cassandra driver
478 # documentation for what these parameters do. These parameters are not
479 # used during database initialization, but they will be saved with
480 # generated config.
481 connection_config = ApdbCassandraConnectionConfig(
482 extra_parameters={
483 "idle_heartbeat_interval": 0,
484 "idle_heartbeat_timeout": 30,
485 "control_connection_timeout": 100,
486 },
487 )
488 config = ApdbCassandraConfig(
489 contact_points=hosts,
490 keyspace=keyspace,
491 enable_replica=enable_replica,
492 replica_skips_diaobjects=replica_skips_diaobjects,
493 connection_config=connection_config,
494 )
495 config.partitioning.time_partition_tables = time_partition_tables
496 if schema_file is not None:
497 config.schema_file = schema_file
498 if schema_name is not None:
499 config.schema_name = schema_name
500 if read_sources_months is not None:
501 config.read_sources_months = read_sources_months
502 if read_forced_sources_months is not None:
503 config.read_forced_sources_months = read_forced_sources_months
504 if port is not None:
505 config.connection_config.port = port
506 if username is not None:
507 config.connection_config.username = username
508 if prefix is not None:
509 config.prefix = prefix
510 if part_pixelization is not None:
511 config.partitioning.part_pixelization = part_pixelization
512 if part_pix_level is not None:
513 config.partitioning.part_pix_level = part_pix_level
514 if time_partition_start is not None:
515 config.partitioning.time_partition_start = time_partition_start
516 if time_partition_end is not None:
517 config.partitioning.time_partition_end = time_partition_end
518 if read_consistency is not None:
519 config.connection_config.read_consistency = read_consistency
520 if write_consistency is not None:
521 config.connection_config.write_consistency = write_consistency
522 if read_timeout is not None:
523 config.connection_config.read_timeout = read_timeout
524 if write_timeout is not None:
525 config.connection_config.write_timeout = write_timeout
526 if ra_dec_columns is not None:
527 config.ra_dec_columns = ra_dec_columns
528
529 cls._makeSchema(config, drop=drop, replication_factor=replication_factor, table_options=table_options)
530
531 return config
532
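    # Minimal call sketch for the database-initialization classmethod above
    # (hypothetical host and keyspace names; all other options keep their
    # defaults):
    #
    #     config = ApdbCassandra.init_database(
    #         hosts=("cassandra-host",), keyspace="apdb", enable_replica=True
    #     )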
533 @classmethod
534 def list_databases(cls, host: str) -> Iterable[DatabaseInfo]:
535 """Return the list of keyspaces with APDB databases.
536
537 Parameters
538 ----------
539 host : `str`
540 Name of one of the hosts in Cassandra cluster.
541
542 Returns
543 -------
544 databases : `~collections.abc.Iterable` [`DatabaseInfo`]
545 Information about databases that contain APDB instance.
546 """
547 # For DbAuth we need to use database name "*" to try to match any
548 # database.
549 config = ApdbCassandraConfig(contact_points=(host,), keyspace="*")
550 cluster, session = cls._make_session(config)
551
552 with cluster, session:
553 # Get names of all keyspaces containing DiaSource table
554 table_name = ApdbTables.DiaSource.table_name()
555 query = "select keyspace_name from system_schema.tables where table_name = %s ALLOW FILTERING"
556 result = session.execute(query, (table_name,))
557 keyspaces = [row[0] for row in result.all()]
558
559 if not keyspaces:
560 return []
561
562 # Retrieve roles for each keyspace.
563 template = ", ".join(["%s"] * len(keyspaces))
564 query = (
565 "SELECT resource, role, permissions FROM system_auth.role_permissions "
566 f"WHERE resource IN ({template}) ALLOW FILTERING"
567 )
568 resources = [f"data/{keyspace}" for keyspace in keyspaces]
569 try:
570 result = session.execute(query, resources)
571 # If anonymous access is enabled then result will be empty,
572 # set infos to have empty permissions dict in that case.
573 infos = {keyspace: DatabaseInfo(name=keyspace, permissions={}) for keyspace in keyspaces}
574 for row in result:
575 _, _, keyspace = row[0].partition("/")
576 role: str = row[1]
577 role_permissions: set[str] = set(row[2])
578 infos[keyspace].permissions[role] = role_permissions # type: ignore[index]
579 except cassandra.Unauthorized as exc:
580 # Likely that access to role_permissions is not granted for
581 # current user.
582 warnings.warn(
583 f"Authentication information is not accessible to current user - {exc}", stacklevel=2
584 )
585 infos = {keyspace: DatabaseInfo(name=keyspace) for keyspace in keyspaces}
586
587 # Would be nice to get size estimate, but this is not available
588 # via CQL queries.
589 return infos.values()
590
591 @classmethod
592 def delete_database(cls, host: str, keyspace: str, *, timeout: int = 3600) -> None:
593 """Delete APDB database by dropping its keyspace.
594
595 Parameters
596 ----------
597 host : `str`
598 Name of one of the hosts in Cassandra cluster.
599 keyspace : `str`
600 Name of keyspace to delete.
601 timeout : `int`, optional
602 Timeout for delete operation in seconds. Dropping a large keyspace
603 can be a long operation, but this default value of one hour should
604 be sufficient for most or all cases.
605 """
606 # For DbAuth we need to use database name "*" to try to match any
607 # database.
608 config = ApdbCassandraConfig(contact_points=(host,), keyspace="*")
609 cluster, session = cls._make_session(config)
610 with cluster, session:
611 query = f"DROP KEYSPACE {quote_id(keyspace)}"
612 session.execute(query, timeout=timeout)
613
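    # Illustrative use of the two administrative helpers above (hypothetical
    # host and keyspace names):
    #
    #     for info in ApdbCassandra.list_databases("cassandra-host"):
    #         print(info.name, info.permissions)
    #     ApdbCassandra.delete_database("cassandra-host", "old_keyspace")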
614 def get_replica(self) -> ApdbCassandraReplica:
615 """Return `ApdbReplica` instance for this database."""
616 # Note that this instance has to stay alive while replica exists, so
617 # we pass reference to self.
618 return ApdbCassandraReplica(self, self._schema, self._session)
619
620 @classmethod
621    def _makeSchema(
622        cls,
623 config: ApdbConfig,
624 *,
625 drop: bool = False,
626 replication_factor: int | None = None,
627 table_options: CreateTableOptions | None = None,
628 ) -> None:
629 # docstring is inherited from a base class
630
631 if not isinstance(config, ApdbCassandraConfig):
632 raise TypeError(f"Unexpected type of configuration object: {type(config)}")
633
634 cluster, session = cls._make_session(config)
635 with cluster, session:
636 schema = ApdbCassandraSchema(
637 session=session,
638 keyspace=config.keyspace,
639 schema_file=config.schema_file,
640 schema_name=config.schema_name,
641 prefix=config.prefix,
642 time_partition_tables=config.partitioning.time_partition_tables,
643 enable_replica=config.enable_replica,
644 )
645
646 # Ask schema to create all tables.
647 if config.partitioning.time_partition_tables:
648 time_partition_start = astropy.time.Time(
649 config.partitioning.time_partition_start, format="isot", scale="tai"
650 )
651 time_partition_end = astropy.time.Time(
652 config.partitioning.time_partition_end, format="isot", scale="tai"
653 )
654 part_epoch = float(cls.partition_zero_epoch.mjd)
655 part_days = config.partitioning.time_partition_days
656 part_range = (
657 cls._time_partition_cls(time_partition_start, part_epoch, part_days),
658 cls._time_partition_cls(time_partition_end, part_epoch, part_days) + 1,
659 )
660 schema.makeSchema(
661 drop=drop,
662 part_range=part_range,
663 replication_factor=replication_factor,
664 table_options=table_options,
665 )
666 else:
667 schema.makeSchema(
668 drop=drop, replication_factor=replication_factor, table_options=table_options
669 )
670
671 meta_table_name = ApdbTables.metadata.table_name(config.prefix)
672 metadata = ApdbMetadataCassandra(
673 session, meta_table_name, config.keyspace, "read_tuples", "write"
674 )
675
676            # Fill version numbers, overriding values that may already exist.
677 if metadata.table_exists():
678 metadata.set(cls.metadataSchemaVersionKey, str(schema.schemaVersion()), force=True)
679 metadata.set(cls.metadataCodeVersionKey, str(cls.apdbImplementationVersion()), force=True)
680
681 if config.enable_replica:
682 # Only store replica code version if replica is enabled.
683 metadata.set(
684                        cls.metadataReplicaVersionKey,
685                        str(ApdbCassandraReplica.apdbReplicaImplementationVersion()),
686 force=True,
687 )
688
689 # Store frozen part of a configuration in metadata.
690 freezer = ApdbConfigFreezer[ApdbCassandraConfig](cls._frozen_parameters)
691 metadata.set(cls.metadataConfigKey, freezer.to_json(config), force=True)
692
693 def getDiaObjects(self, region: sphgeom.Region) -> pandas.DataFrame:
694 # docstring is inherited from a base class
695
696 sp_where = self._spatial_where(region)
697 _LOG.debug("getDiaObjects: #partitions: %s", len(sp_where))
698
699 # We need to exclude extra partitioning columns from result.
700 column_names = self._schema.apdbColumnNames(ApdbTables.DiaObjectLast)
701 what = ",".join(quote_id(column) for column in column_names)
702
703 table_name = self._schema.tableName(ApdbTables.DiaObjectLast)
704 query = f'SELECT {what} from "{self._keyspace}"."{table_name}"'
705 statements: list[tuple] = []
706 for where, params in sp_where:
707 full_query = f"{query} WHERE {where}"
708 if params:
709 statement = self._preparer.prepare(full_query)
710 else:
711 # If there are no params then it is likely that query has a
712 # bunch of literals rendered already, no point trying to
713 # prepare it because it's not reusable.
714 statement = cassandra.query.SimpleStatement(full_query)
715 statements.append((statement, params))
716 _LOG.debug("getDiaObjects: #queries: %s", len(statements))
717
718 with _MON.context_tags({"table": "DiaObject"}):
719 _MON.add_record(
720 "select_query_stats", values={"num_sp_part": len(sp_where), "num_queries": len(statements)}
721 )
722 with self._timer("select_time") as timer:
723 objects = cast(
724 pandas.DataFrame,
725 select_concurrent(
726 self._session,
727 statements,
728 "read_pandas_multi",
729 self.config.connection_config.read_concurrency,
730 ),
731 )
732 timer.add_values(row_count=len(objects))
733
734 _LOG.debug("found %s DiaObjects", objects.shape[0])
735 return objects
736
737    def getDiaSources(
738        self, region: sphgeom.Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
739 ) -> pandas.DataFrame | None:
740 # docstring is inherited from a base class
741 months = self.config.read_sources_months
742 if months == 0:
743 return None
744 mjd_end = float(visit_time.mjd)
745 mjd_start = mjd_end - months * 30
746
747 return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaSource)
748
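    # The read window above approximates a month as 30 days: for example,
    # read_sources_months = 12 selects sources with midpointMjdTai within
    # 360 days before the visit time.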
749    def getDiaForcedSources(
750        self, region: sphgeom.Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
751 ) -> pandas.DataFrame | None:
752 # docstring is inherited from a base class
753 months = self.config.read_forced_sources_months
754 if months == 0:
755 return None
756 mjd_end = float(visit_time.mjd)
757 mjd_start = mjd_end - months * 30
758
759 return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaForcedSource)
760
761 def containsVisitDetector(self, visit: int, detector: int) -> bool:
762 # docstring is inherited from a base class
763        # The order of checks corresponds to the order in store(); on a
764        # potential store failure, earlier tables have a higher probability
765        # of containing stored records. With per-partition tables there will
766        # be many tables in the list, but it is unlikely that we'll use that
767        # setup in production.
768 existing_tables = self._schema.existing_tables(ApdbTables.DiaSource, ApdbTables.DiaForcedSource)
769 tables_to_check = existing_tables[ApdbTables.DiaSource][:]
770 if self.config.enable_replica:
771 tables_to_check.append(self._schema.tableName(ExtraTables.DiaSourceChunks))
772 tables_to_check.extend(existing_tables[ApdbTables.DiaForcedSource])
773 if self.config.enable_replica:
774 tables_to_check.append(self._schema.tableName(ExtraTables.DiaForcedSourceChunks))
775
776 # I do not want to run concurrent queries as they are all full-scan
777        # queries, so we run them one by one.
778 for table_name in tables_to_check:
779 # Try to find a single record with given visit/detector. This is a
780 # full scan query so ALLOW FILTERING is needed. It will probably
781 # guess PER PARTITION LIMIT itself, but let's help it.
782 query = (
783 f'SELECT * from "{self._keyspace}"."{table_name}" '
784 "WHERE visit = ? AND detector = ? "
785 "PER PARTITION LIMIT 1 LIMIT 1 ALLOW FILTERING"
786 )
787 with self._timer("contains_visit_detector_time", tags={"table": table_name}) as timer:
788 result = self._session.execute(self._preparer.prepare(query), (visit, detector))
789 found = result.one() is not None
790 timer.add_values(found=int(found))
791 if found:
792 # There is a result.
793 return True
794 return False
795
796 def getSSObjects(self) -> pandas.DataFrame:
797 # docstring is inherited from a base class
798 tableName = self._schema.tableName(ApdbTables.SSObject)
799 query = f'SELECT * from "{self._keyspace}"."{tableName}"'
800
801 objects = None
802 with self._timer("select_time", tags={"table": "SSObject"}) as timer:
803 result = self._session.execute(query, execution_profile="read_pandas")
804 objects = result._current_rows
805 timer.add_values(row_count=len(objects))
806
807 _LOG.debug("found %s SSObjects", objects.shape[0])
808 return objects
809
810 def store(
811 self,
812 visit_time: astropy.time.Time,
813 objects: pandas.DataFrame,
814 sources: pandas.DataFrame | None = None,
815 forced_sources: pandas.DataFrame | None = None,
816 ) -> None:
817 # docstring is inherited from a base class
818 objects = self._fix_input_timestamps(objects)
819 if sources is not None:
820 sources = self._fix_input_timestamps(sources)
821 if forced_sources is not None:
822 forced_sources = self._fix_input_timestamps(forced_sources)
823
824 replica_chunk: ReplicaChunk | None = None
825 if self._schema.has_replica_chunks:
826 replica_chunk = ReplicaChunk.make_replica_chunk(visit_time, self.config.replica_chunk_seconds)
827 self._storeReplicaChunk(replica_chunk, visit_time)
828
829 # fill region partition column for DiaObjects
830 objects = self._add_apdb_part(objects)
831 self._storeDiaObjects(objects, visit_time, replica_chunk)
832
833 if sources is not None:
834 # copy apdb_part column from DiaObjects to DiaSources
835 sources = self._add_apdb_part(sources)
836 self._storeDiaSources(ApdbTables.DiaSource, sources, replica_chunk)
837 self._storeDiaSourcesPartitions(sources, visit_time, replica_chunk)
838
839 if forced_sources is not None:
840 forced_sources = self._add_apdb_part(forced_sources)
841 self._storeDiaSources(ApdbTables.DiaForcedSource, forced_sources, replica_chunk)
842
843 def storeSSObjects(self, objects: pandas.DataFrame) -> None:
844 # docstring is inherited from a base class
845 objects = self._fix_input_timestamps(objects)
846 self._storeObjectsPandas(objects, ApdbTables.SSObject)
847
848 def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
849 # docstring is inherited from a base class
850
851 # To update a record we need to know its exact primary key (including
852 # partition key) so we start by querying for diaSourceId to find the
853 # primary keys.
854
855 table_name = self._schema.tableName(ExtraTables.DiaSourceToPartition)
856 # split it into 1k IDs per query
857 selects: list[tuple] = []
858 for ids in chunk_iterable(idMap.keys(), 1_000):
859 ids_str = ",".join(str(item) for item in ids)
860 selects.append(
861 (
862 (
863 'SELECT "diaSourceId", "apdb_part", "apdb_time_part", "apdb_replica_chunk" '
864 f'FROM "{self._keyspace}"."{table_name}" WHERE "diaSourceId" IN ({ids_str})'
865 ),
866 {},
867 )
868 )
869
870 # No need for DataFrame here, read data as tuples.
871 result = cast(
872 list[tuple[int, int, int, int | None]],
873 select_concurrent(
874 self._session, selects, "read_tuples", self.config.connection_config.read_concurrency
875 ),
876 )
877
878 # Make mapping from source ID to its partition.
879 id2partitions: dict[int, tuple[int, int]] = {}
880 id2chunk_id: dict[int, int] = {}
881 for row in result:
882 id2partitions[row[0]] = row[1:3]
883 if row[3] is not None:
884 id2chunk_id[row[0]] = row[3]
885
886 # make sure we know partitions for each ID
887 if set(id2partitions) != set(idMap):
888 missing = ",".join(str(item) for item in set(idMap) - set(id2partitions))
889 raise ValueError(f"Following DiaSource IDs do not exist in the database: {missing}")
890
891 # Reassign in standard tables
892 queries = cassandra.query.BatchStatement()
893 table_name = self._schema.tableName(ApdbTables.DiaSource)
894 for diaSourceId, ssObjectId in idMap.items():
895 apdb_part, apdb_time_part = id2partitions[diaSourceId]
896 values: tuple
897 if self.config.partitioning.time_partition_tables:
898 query = (
899 f'UPDATE "{self._keyspace}"."{table_name}_{apdb_time_part}"'
900 ' SET "ssObjectId" = ?, "diaObjectId" = NULL'
901 ' WHERE "apdb_part" = ? AND "diaSourceId" = ?'
902 )
903 values = (ssObjectId, apdb_part, diaSourceId)
904 else:
905 query = (
906 f'UPDATE "{self._keyspace}"."{table_name}"'
907 ' SET "ssObjectId" = ?, "diaObjectId" = NULL'
908 ' WHERE "apdb_part" = ? AND "apdb_time_part" = ? AND "diaSourceId" = ?'
909 )
910 values = (ssObjectId, apdb_part, apdb_time_part, diaSourceId)
911 queries.add(self._preparer.prepare(query), values)
912
913 # Reassign in replica tables, only if replication is enabled
914 if id2chunk_id:
915 # Filter out chunks that have been deleted already. There is a
916 # potential race with concurrent removal of chunks, but it
917 # should be handled by WHERE in UPDATE.
918 known_ids = set()
919 if replica_chunks := self.get_replica().getReplicaChunks():
920 known_ids = set(replica_chunk.id for replica_chunk in replica_chunks)
921 id2chunk_id = {key: value for key, value in id2chunk_id.items() if value in known_ids}
922 if id2chunk_id:
923 table_name = self._schema.tableName(ExtraTables.DiaSourceChunks)
924 for diaSourceId, ssObjectId in idMap.items():
925 if replica_chunk := id2chunk_id.get(diaSourceId):
926 query = (
927 f'UPDATE "{self._keyspace}"."{table_name}" '
928 ' SET "ssObjectId" = ?, "diaObjectId" = NULL '
929 'WHERE "apdb_replica_chunk" = ? AND "diaSourceId" = ?'
930 )
931 values = (ssObjectId, replica_chunk, diaSourceId)
932 queries.add(self._preparer.prepare(query), values)
933
934 _LOG.debug("%s: will update %d records", table_name, len(idMap))
935 with self._timer("source_reassign_time") as timer:
936 self._session.execute(queries, execution_profile="write")
937 timer.add_values(source_count=len(idMap))
938
939 def dailyJob(self) -> None:
940 # docstring is inherited from a base class
941 pass
942
943 def countUnassociatedObjects(self) -> int:
944 # docstring is inherited from a base class
945
946 # It's too inefficient to implement it for Cassandra in current schema.
947 raise NotImplementedError()
948
949 @property
950 def metadata(self) -> ApdbMetadata:
951 # docstring is inherited from a base class
952 if self._metadata is None:
953 raise RuntimeError("Database schema was not initialized.")
954 return self._metadata
955
956 @classmethod
957 def _makeProfiles(cls, config: ApdbCassandraConfig) -> Mapping[Any, ExecutionProfile]:
958 """Make all execution profiles used in the code."""
959 if config.connection_config.private_ips:
960 loadBalancePolicy = WhiteListRoundRobinPolicy(hosts=config.contact_points)
961 else:
962 loadBalancePolicy = RoundRobinPolicy()
963
964 read_tuples_profile = ExecutionProfile(
965 consistency_level=getattr(cassandra.ConsistencyLevel, config.connection_config.read_consistency),
966 request_timeout=config.connection_config.read_timeout,
967 row_factory=cassandra.query.tuple_factory,
968 load_balancing_policy=loadBalancePolicy,
969 )
970 read_pandas_profile = ExecutionProfile(
971 consistency_level=getattr(cassandra.ConsistencyLevel, config.connection_config.read_consistency),
972 request_timeout=config.connection_config.read_timeout,
973 row_factory=pandas_dataframe_factory,
974 load_balancing_policy=loadBalancePolicy,
975 )
976 read_raw_profile = ExecutionProfile(
977 consistency_level=getattr(cassandra.ConsistencyLevel, config.connection_config.read_consistency),
978 request_timeout=config.connection_config.read_timeout,
979 row_factory=raw_data_factory,
980 load_balancing_policy=loadBalancePolicy,
981 )
982 # Profile to use with select_concurrent to return pandas data frame
983 read_pandas_multi_profile = ExecutionProfile(
984 consistency_level=getattr(cassandra.ConsistencyLevel, config.connection_config.read_consistency),
985 request_timeout=config.connection_config.read_timeout,
986 row_factory=pandas_dataframe_factory,
987 load_balancing_policy=loadBalancePolicy,
988 )
989 # Profile to use with select_concurrent to return raw data (columns and
990 # rows)
991 read_raw_multi_profile = ExecutionProfile(
992 consistency_level=getattr(cassandra.ConsistencyLevel, config.connection_config.read_consistency),
993 request_timeout=config.connection_config.read_timeout,
994 row_factory=raw_data_factory,
995 load_balancing_policy=loadBalancePolicy,
996 )
997 write_profile = ExecutionProfile(
998 consistency_level=getattr(cassandra.ConsistencyLevel, config.connection_config.write_consistency),
999 request_timeout=config.connection_config.write_timeout,
1000 load_balancing_policy=loadBalancePolicy,
1001 )
1002 # To replace default DCAwareRoundRobinPolicy
1003 default_profile = ExecutionProfile(
1004 load_balancing_policy=loadBalancePolicy,
1005 )
1006 return {
1007 "read_tuples": read_tuples_profile,
1008 "read_pandas": read_pandas_profile,
1009 "read_raw": read_raw_profile,
1010 "read_pandas_multi": read_pandas_multi_profile,
1011 "read_raw_multi": read_raw_multi_profile,
1012 "write": write_profile,
1013 EXEC_PROFILE_DEFAULT: default_profile,
1014 }
1015
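    # The profile names in the mapping above are referenced by the
    # ``execution_profile`` argument of ``Session.execute`` (e.g.
    # "read_pandas" in getSSObjects, "write" in the store methods) and by
    # ``select_concurrent`` (e.g. "read_pandas_multi", "read_raw_multi").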
1016    def _getSources(
1017        self,
1018 region: sphgeom.Region,
1019 object_ids: Iterable[int] | None,
1020 mjd_start: float,
1021 mjd_end: float,
1022 table_name: ApdbTables,
1023 ) -> pandas.DataFrame:
1024 """Return catalog of DiaSource instances given set of DiaObject IDs.
1025
1026 Parameters
1027 ----------
1028 region : `lsst.sphgeom.Region`
1029 Spherical region.
1030        object_ids : `~collections.abc.Iterable` [`int`] or `None`
1031            Collection of DiaObject IDs.
1032 mjd_start : `float`
1033 Lower bound of time interval.
1034 mjd_end : `float`
1035 Upper bound of time interval.
1036 table_name : `ApdbTables`
1037 Name of the table.
1038
1039 Returns
1040 -------
1041        catalog : `pandas.DataFrame`
1042 Catalog containing DiaSource records. Empty catalog is returned if
1043 ``object_ids`` is empty.
1044 """
1045 object_id_set: Set[int] = set()
1046 if object_ids is not None:
1047 object_id_set = set(object_ids)
1048 if len(object_id_set) == 0:
1049 return self._make_empty_catalog(table_name)
1050
1051 sp_where = self._spatial_where(region)
1052 tables, temporal_where = self._temporal_where(table_name, mjd_start, mjd_end)
1053
1054 # We need to exclude extra partitioning columns from result.
1055 column_names = self._schema.apdbColumnNames(table_name)
1056 what = ",".join(quote_id(column) for column in column_names)
1057
1058 # Build all queries
1059 statements: list[tuple] = []
1060 for table in tables:
1061 prefix = f'SELECT {what} from "{self._keyspace}"."{table}"'
1062 statements += list(self._combine_where(prefix, sp_where, temporal_where))
1063 _LOG.debug("_getSources %s: #queries: %s", table_name, len(statements))
1064
1065 with _MON.context_tags({"table": table_name.name}):
1066 _MON.add_record(
1067 "select_query_stats", values={"num_sp_part": len(sp_where), "num_queries": len(statements)}
1068 )
1069 with self._timer("select_time") as timer:
1070 catalog = cast(
1071 pandas.DataFrame,
1072 select_concurrent(
1073 self._session,
1074 statements,
1075 "read_pandas_multi",
1076 self.config.connection_config.read_concurrency,
1077 ),
1078 )
1079 timer.add_values(row_count=len(catalog))
1080
1081 # filter by given object IDs
1082 if len(object_id_set) > 0:
1083 catalog = cast(pandas.DataFrame, catalog[catalog["diaObjectId"].isin(object_id_set)])
1084
1085 # precise filtering on midpointMjdTai
1086 catalog = cast(pandas.DataFrame, catalog[catalog["midpointMjdTai"] > mjd_start])
1087
1088 _LOG.debug("found %d %ss", catalog.shape[0], table_name.name)
1089 return catalog
1090
1091 def _storeReplicaChunk(self, replica_chunk: ReplicaChunk, visit_time: astropy.time.Time) -> None:
1092 # Cassandra timestamp uses milliseconds since epoch
1093 timestamp = int(replica_chunk.last_update_time.unix_tai * 1000)
1094
1095 # everything goes into a single partition
1096 partition = 0
1097
1098 table_name = self._schema.tableName(ExtraTables.ApdbReplicaChunks)
1099 query = (
1100 f'INSERT INTO "{self._keyspace}"."{table_name}" '
1101 "(partition, apdb_replica_chunk, last_update_time, unique_id) "
1102 "VALUES (?, ?, ?, ?)"
1103 )
1104
1105 self._session.execute(
1106 self._preparer.prepare(query),
1107 (partition, replica_chunk.id, timestamp, replica_chunk.unique_id),
1108 timeout=self.config.connection_config.write_timeout,
1109 execution_profile="write",
1110 )
1111
1112 def _queryDiaObjectLastPartitions(self, ids: Iterable[int]) -> Mapping[int, int]:
1113 """Return existing mapping of diaObjectId to its last partition."""
1114 table_name = self._schema.tableName(ExtraTables.DiaObjectLastToPartition)
1115 queries = []
1116 object_count = 0
1117 for id_chunk in chunk_iterable(ids, 10_000):
1118 id_chunk_list = list(id_chunk)
1119 query = (
1120 f'SELECT "diaObjectId", apdb_part FROM "{self._keyspace}"."{table_name}" '
1121 f'WHERE "diaObjectId" in ({",".join(str(oid) for oid in id_chunk_list)})'
1122 )
1123 queries.append((query, ()))
1124 object_count += len(id_chunk_list)
1125
1126 with self._timer("query_object_last_partitions") as timer:
1127 data = cast(
1128 ApdbTableData,
1129 select_concurrent(
1130 self._session, queries, "read_raw_multi", self.config.connection_config.read_concurrency
1131 ),
1132 )
1133 timer.add_values(object_count=object_count, row_count=len(data.rows()))
1134
1135 if data.column_names() != ["diaObjectId", "apdb_part"]:
1136 raise RuntimeError(f"Unexpected column names in query result: {data.column_names()}")
1137
1138 return {row[0]: row[1] for row in data.rows()}
1139
1140 def _deleteMovingObjects(self, objs: pandas.DataFrame) -> None:
1141        """Objects in DiaObjectLast can move from one spatial partition to
1142        another. For those objects, inserting a new version does not replace
1143        the old one, so we need to explicitly remove old versions before
1144        inserting new ones.
1145 """
1146 # Extract all object IDs.
1147 new_partitions = {oid: part for oid, part in zip(objs["diaObjectId"], objs["apdb_part"])}
1148 old_partitions = self._queryDiaObjectLastPartitions(objs["diaObjectId"])
1149
1150 moved_oids: dict[int, tuple[int, int]] = {}
1151 for oid, old_part in old_partitions.items():
1152 new_part = new_partitions.get(oid, old_part)
1153 if new_part != old_part:
1154 moved_oids[oid] = (old_part, new_part)
1155 _LOG.debug("DiaObject IDs that moved to new partition: %s", moved_oids)
1156
1157 if moved_oids:
1158 # Delete old records from DiaObjectLast.
1159 table_name = self._schema.tableName(ApdbTables.DiaObjectLast)
1160 query = f'DELETE FROM "{self._keyspace}"."{table_name}" WHERE apdb_part = ? AND "diaObjectId" = ?'
1161 statement = self._preparer.prepare(query)
1162 batch = cassandra.query.BatchStatement()
1163 for oid, (old_part, _) in moved_oids.items():
1164 batch.add(statement, (old_part, oid))
1165 with self._timer("delete_object_last") as timer:
1166 self._session.execute(
1167 batch, timeout=self.config.connection_config.write_timeout, execution_profile="write"
1168 )
1169 timer.add_values(row_count=len(moved_oids))
1170
1171 # Add all new records to the map.
1172 table_name = self._schema.tableName(ExtraTables.DiaObjectLastToPartition)
1173 query = f'INSERT INTO "{self._keyspace}"."{table_name}" ("diaObjectId", apdb_part) VALUES (?,?)'
1174 statement = self._preparer.prepare(query)
1175
1176 batch_size = self._batch_size(ExtraTables.DiaObjectLastToPartition)
1177 batches = []
1178 for chunk in chunk_iterable(new_partitions.items(), batch_size):
1179 batch = cassandra.query.BatchStatement()
1180 for oid, new_part in chunk:
1181 batch.add(statement, (oid, new_part))
1182 batches.append(batch)
1183
1184 with self._timer("update_object_last_partition") as timer:
1185 for batch in batches:
1186 self._session.execute(
1187 batch, timeout=self.config.connection_config.write_timeout, execution_profile="write"
1188 )
1189 timer.add_values(row_count=len(batch))
1190
1191    def _storeDiaObjects(
1192        self, objs: pandas.DataFrame, visit_time: astropy.time.Time, replica_chunk: ReplicaChunk | None
1193 ) -> None:
1194 """Store catalog of DiaObjects from current visit.
1195
1196 Parameters
1197 ----------
1198 objs : `pandas.DataFrame`
1199 Catalog with DiaObject records
1200 visit_time : `astropy.time.Time`
1201 Time of the current visit.
1202 replica_chunk : `ReplicaChunk` or `None`
1203 Replica chunk identifier if replication is configured.
1204 """
1205 if len(objs) == 0:
1206 _LOG.debug("No objects to write to database.")
1207 return
1208
1210 self._deleteMovingObjects(objs)
1211
1212 visit_time_dt = visit_time.datetime
1213 extra_columns = dict(lastNonForcedSource=visit_time_dt)
1214 self._storeObjectsPandas(objs, ApdbTables.DiaObjectLast, extra_columns=extra_columns)
1215
1216 extra_columns["validityStart"] = visit_time_dt
1217 time_part: int | None = self._time_partition(visit_time)
1218 if not self.config.partitioning.time_partition_tables:
1219 extra_columns["apdb_time_part"] = time_part
1220 time_part = None
1221
1222        # Only store DiaObjects if not doing replication or explicitly
1223 # configured to always store them.
1224 if replica_chunk is None or not self.config.replica_skips_diaobjects:
1225            self._storeObjectsPandas(
1226                objs, ApdbTables.DiaObject, extra_columns=extra_columns, time_part=time_part
1227 )
1228
1229 if replica_chunk is not None:
1230 extra_columns = dict(apdb_replica_chunk=replica_chunk.id, validityStart=visit_time_dt)
1231 self._storeObjectsPandas(objs, ExtraTables.DiaObjectChunks, extra_columns=extra_columns)
1232
1233    def _storeDiaSources(
1234        self,
1235 table_name: ApdbTables,
1236 sources: pandas.DataFrame,
1237 replica_chunk: ReplicaChunk | None,
1238 ) -> None:
1239 """Store catalog of DIASources or DIAForcedSources from current visit.
1240
1241 Parameters
1242 ----------
1243 table_name : `ApdbTables`
1244 Table where to store the data.
1245 sources : `pandas.DataFrame`
1246            Catalog containing DiaSource records.
1249 replica_chunk : `ReplicaChunk` or `None`
1250 Replica chunk identifier if replication is configured.
1251 """
1252 # Time partitioning has to be based on midpointMjdTai, not visit_time
1253 # as visit_time is not really a visit time.
1254 tp_sources = sources.copy(deep=False)
1255 tp_sources["apdb_time_part"] = tp_sources["midpointMjdTai"].apply(self._time_partition)
1256 extra_columns: dict[str, Any] = {}
1257 if not self.config.partitioning.time_partition_tables:
1258 self._storeObjectsPandas(tp_sources, table_name)
1259 else:
1260 # Group by time partition
1261 partitions = set(tp_sources["apdb_time_part"])
1262 if len(partitions) == 1:
1263 # Single partition - just save the whole thing.
1264 time_part = partitions.pop()
1265 self._storeObjectsPandas(sources, table_name, time_part=time_part)
1266 else:
1267 # group by time partition.
1268 for time_part, sub_frame in tp_sources.groupby(by="apdb_time_part"):
1269 sub_frame.drop(columns="apdb_time_part", inplace=True)
1270 self._storeObjectsPandas(sub_frame, table_name, time_part=time_part)
1271
1272 if replica_chunk is not None:
1273 extra_columns = dict(apdb_replica_chunk=replica_chunk.id)
1274 if table_name is ApdbTables.DiaSource:
1275 extra_table = ExtraTables.DiaSourceChunks
1276 else:
1277 extra_table = ExtraTables.DiaForcedSourceChunks
1278 self._storeObjectsPandas(sources, extra_table, extra_columns=extra_columns)
1279
1280    def _storeDiaSourcesPartitions(
1281        self, sources: pandas.DataFrame, visit_time: astropy.time.Time, replica_chunk: ReplicaChunk | None
1282 ) -> None:
1283 """Store mapping of diaSourceId to its partitioning values.
1284
1285 Parameters
1286 ----------
1287 sources : `pandas.DataFrame`
1288            Catalog containing DiaSource records.
1289        visit_time : `astropy.time.Time`
1290            Time of the current visit.
        replica_chunk : `ReplicaChunk` or `None`
            Replica chunk identifier if replication is configured.
1291        """
1292 id_map = cast(pandas.DataFrame, sources[["diaSourceId", "apdb_part"]])
1293 extra_columns = {
1294 "apdb_time_part": self._time_partition(visit_time),
1295 "apdb_replica_chunk": replica_chunk.id if replica_chunk is not None else None,
1296 }
1297
1298        self._storeObjectsPandas(
1299            id_map, ExtraTables.DiaSourceToPartition, extra_columns=extra_columns, time_part=None
1300 )
1301
1302    def _storeObjectsPandas(
1303        self,
1304 records: pandas.DataFrame,
1305 table_name: ApdbTables | ExtraTables,
1306 extra_columns: Mapping | None = None,
1307 time_part: int | None = None,
1308 ) -> None:
1309 """Store generic objects.
1310
1311 Takes Pandas catalog and stores a bunch of records in a table.
1312
1313 Parameters
1314 ----------
1315 records : `pandas.DataFrame`
1316            Catalog containing object records.
1317        table_name : `ApdbTables` or `ExtraTables`
1318 Name of the table as defined in APDB schema.
1319 extra_columns : `dict`, optional
1320 Mapping (column_name, column_value) which gives fixed values for
1321 columns in each row, overrides values in ``records`` if matching
1322 columns exist there.
1323 time_part : `int`, optional
1324 If not `None` then insert into a per-partition table.
1325
1326 Notes
1327 -----
1328 If Pandas catalog contains additional columns not defined in table
1329 schema they are ignored. Catalog does not have to contain all columns
1330 defined in a table, but partition and clustering keys must be present
1331 in a catalog or ``extra_columns``.
1332 """
1333 # use extra columns if specified
1334 if extra_columns is None:
1335 extra_columns = {}
1336 extra_fields = list(extra_columns.keys())
1337
1338 # Fields that will come from dataframe.
1339 df_fields = [column for column in records.columns if column not in extra_fields]
1340
1341 column_map = self._schema.getColumnMap(table_name)
1342 # list of columns (as in felis schema)
1343 fields = [column_map[field].name for field in df_fields if field in column_map]
1344 fields += extra_fields
1345
1346 # check that all partitioning and clustering columns are defined
1347 required_columns = self._schema.partitionColumns(table_name) + self._schema.clusteringColumns(
1348 table_name
1349 )
1350 missing_columns = [column for column in required_columns if column not in fields]
1351 if missing_columns:
1352 raise ValueError(f"Primary key columns are missing from catalog: {missing_columns}")
1353
1354 qfields = [quote_id(field) for field in fields]
1355 qfields_str = ",".join(qfields)
1356
1357 batch_size = self._batch_size(table_name)
1358
1359 with self._timer("insert_build_time", tags={"table": table_name.name}):
1360 table = self._schema.tableName(table_name)
1361 if time_part is not None:
1362 table = f"{table}_{time_part}"
1363
1364 holders = ",".join(["?"] * len(qfields))
1365 query = f'INSERT INTO "{self._keyspace}"."{table}" ({qfields_str}) VALUES ({holders})'
1366 statement = self._preparer.prepare(query)
1367 # Cassandra has 64k limit on batch size, normally that should be
1368 # enough but some tests generate too many forced sources.
1369 queries = []
1370 for rec_chunk in chunk_iterable(records.itertuples(index=False), batch_size):
1371 batch = cassandra.query.BatchStatement()
1372 for rec in rec_chunk:
1373 values = []
1374 for field in df_fields:
1375 if field not in column_map:
1376 continue
1377 value = getattr(rec, field)
1378 if column_map[field].datatype is felis.datamodel.DataType.timestamp:
1379 if isinstance(value, pandas.Timestamp):
1380 value = value.to_pydatetime()
1381 elif value is pandas.NaT:
1382 value = None
1383 else:
1384 # Assume it's seconds since epoch, Cassandra
1385 # datetime is in milliseconds
1386 value = int(value * 1000)
1387 value = literal(value)
1388 values.append(UNSET_VALUE if value is None else value)
1389 for field in extra_fields:
1390 value = literal(extra_columns[field])
1391 values.append(UNSET_VALUE if value is None else value)
1392 batch.add(statement, values)
1393 queries.append(batch)
1394
1395 _LOG.debug("%s: will store %d records", self._schema.tableName(table_name), records.shape[0])
1396 with self._timer("insert_time", tags={"table": table_name.name}) as timer:
1397 for batch in queries:
1398 self._session.execute(
1399 batch, timeout=self.config.connection_config.write_timeout, execution_profile="write"
1400 )
1401 timer.add_values(row_count=len(records))
1402
1403 def _add_apdb_part(self, df: pandas.DataFrame) -> pandas.DataFrame:
1404 """Calculate spatial partition for each record and add it to a
1405 DataFrame.
1406
1407 Parameters
1408 ----------
1409 df : `pandas.DataFrame`
1410 DataFrame which has to contain ra/dec columns, names of these
1411 columns are defined by configuration ``ra_dec_columns`` field.
1412
1413 Returns
1414 -------
1415 df : `pandas.DataFrame`
1416 DataFrame with ``apdb_part`` column which contains pixel index
1417 for ra/dec coordinates.
1418
1419 Notes
1420 -----
1421 This overrides any existing column in a DataFrame with the same name
1422        (``apdb_part``). The original DataFrame is not changed; a modified
1423        copy is returned.
1424 """
1425 # Calculate pixelization index for every record.
1426 apdb_part = np.zeros(df.shape[0], dtype=np.int64)
1427 ra_col, dec_col = self.config.ra_dec_columns
1428 for i, (ra, dec) in enumerate(zip(df[ra_col], df[dec_col])):
1429 uv3d = sphgeom.UnitVector3d(sphgeom.LonLat.fromDegrees(ra, dec))
1430 idx = self._pixelization.pixel(uv3d)
1431 apdb_part[i] = idx
1432 df = df.copy()
1433 df["apdb_part"] = apdb_part
1434 return df
1435
1436 @classmethod
1437 def _time_partition_cls(cls, time: float | astropy.time.Time, epoch_mjd: float, part_days: int) -> int:
1438 """Calculate time partition number for a given time.
1439
1440 Parameters
1441 ----------
1442 time : `float` or `astropy.time.Time`
1443 Time for which to calculate partition number. Can be float to mean
1444 MJD or `astropy.time.Time`
1445 epoch_mjd : `float`
1446 Epoch time for partition 0.
1447 part_days : `int`
1448 Number of days per partition.
1449
1450 Returns
1451 -------
1452 partition : `int`
1453 Partition number for a given time.
1454 """
1455 if isinstance(time, astropy.time.Time):
1456 mjd = float(time.mjd)
1457 else:
1458 mjd = time
1459 days_since_epoch = mjd - epoch_mjd
1460 partition = int(days_since_epoch) // part_days
1461 return partition
1462
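    # Worked example of the partition arithmetic (illustrative values):
    # with epoch_mjd = 40587.0 (MJD of the unix_tai partition-zero epoch)
    # and part_days = 30, a time of MJD 60000.0 gives
    # 60000.0 - 40587.0 = 19413.0 days, i.e. partition int(19413.0) // 30 = 647.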
1463 def _time_partition(self, time: float | astropy.time.Time) -> int:
1464 """Calculate time partition number for a given time.
1465
1466 Parameters
1467 ----------
1468 time : `float` or `astropy.time.Time`
1469 Time for which to calculate partition number. Can be float to mean
1470 MJD or `astropy.time.Time`
1471
1472 Returns
1473 -------
1474 partition : `int`
1475 Partition number for a given time.
1476 """
1477 if isinstance(time, astropy.time.Time):
1478 mjd = float(time.mjd)
1479 else:
1480 mjd = time
1481 days_since_epoch = mjd - self._partition_zero_epoch_mjd
1482 partition = int(days_since_epoch) // self.config.partitioning.time_partition_days
1483 return partition
1484
1485 def _make_empty_catalog(self, table_name: ApdbTables) -> pandas.DataFrame:
1486 """Make an empty catalog for a table with a given name.
1487
1488 Parameters
1489 ----------
1490 table_name : `ApdbTables`
1491 Name of the table.
1492
1493 Returns
1494 -------
1495 catalog : `pandas.DataFrame`
1496 An empty catalog.
1497 """
1498 table = self._schema.tableSchemas[table_name]
1499
1500 data = {
1501 columnDef.name: pandas.Series(dtype=self._schema.column_dtype(columnDef.datatype))
1502 for columnDef in table.columns
1503 }
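# Building each column as an empty Series with an explicit dtype keeps the
# schema column types even for a zero-row catalog; a bare empty DataFrame
# would type every column as "object".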
1504 return pandas.DataFrame(data)
1505
1506 def _combine_where(
1507 self,
1508 prefix: str,
1509 where1: list[tuple[str, tuple]],
1510 where2: list[tuple[str, tuple]],
1511 suffix: str | None = None,
1512 ) -> Iterator[tuple[cassandra.query.Statement, tuple]]:
1513 """Make cartesian product of two parts of WHERE clause into a series
1514 of statements to execute.
1515
1516 Parameters
1517 ----------
1518 prefix : `str`
1519 Initial statement prefix that comes before the WHERE clause, e.g.
1520 "SELECT * from Table".
1521 """
1522 # If lists are empty use special sentinels.
1523 if not where1:
1524 where1 = [("", ())]
1525 if not where2:
1526 where2 = [("", ())]
1527
1528 for expr1, params1 in where1:
1529 for expr2, params2 in where2:
1530 full_query = prefix
1531 wheres = []
1532 if expr1:
1533 wheres.append(expr1)
1534 if expr2:
1535 wheres.append(expr2)
1536 if wheres:
1537 full_query += " WHERE " + " AND ".join(wheres)
1538 if suffix:
1539 full_query += " " + suffix
1540 params = params1 + params2
1541 if params:
1542 statement = self._preparer.prepare(full_query)
1543 else:
1544 # If there are no params then the query likely has all of its
1545 # literals rendered already, so there is no point in trying to
1546 # prepare it.
1547 statement = cassandra.query.SimpleStatement(full_query)
1548 yield (statement, params)
1549
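# Illustrative example for _combine_where() above (hypothetical table and
# values): with prefix='SELECT * from "DiaSource_610"',
# where1=[('"apdb_part" = ?', (1234,))] and
# where2=[('"apdb_time_part" = ?', (610,))], it yields a single prepared
# statement
#   SELECT * from "DiaSource_610" WHERE "apdb_part" = ? AND "apdb_time_part" = ?
# with parameters (1234, 610); multiple entries in either list produce the
# cartesian product of statements.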
1550 def _spatial_where(
1551 self, region: sphgeom.Region | None, use_ranges: bool = False
1552 ) -> list[tuple[str, tuple]]:
1553 """Generate expressions for spatial part of WHERE clause.
1554
1555 Parameters
1556 ----------
1557 region : `sphgeom.Region` or `None`
1558 Spatial region for query results.
1559 use_ranges : `bool`
1560 If `True` then use pixel ranges ("apdb_part >= p1 AND apdb_part <=
1561 p2") instead of an exact list of pixels. Should be set to `True`
1562 for large regions covering very many pixels.
1563
1564 Returns
1565 -------
1566 expressions : `list` [ `tuple` ]
1567 An empty list is returned if ``region`` is `None`, otherwise a list
1568 of one or more (expression, parameters) tuples.
1569 """
1570 if region is None:
1571 return []
1572 if use_ranges:
1573 pixel_ranges = self._pixelization.envelope(region)
1574 expressions: list[tuple[str, tuple]] = []
1575 for lower, upper in pixel_ranges:
1576 upper -= 1
1577 if lower == upper:
1578 expressions.append(('"apdb_part" = ?', (lower,)))
1579 else:
1580 expressions.append(('"apdb_part" >= ? AND "apdb_part" <= ?', (lower, upper)))
1581 return expressions
1582 else:
1583 pixels = self._pixelization.pixels(region)
1584 if self.config.partitioning.query_per_spatial_part:
1585 return [('"apdb_part" = ?', (pixel,)) for pixel in pixels]
1586 else:
1587 pixels_str = ",".join([str(pix) for pix in pixels])
1588 return [(f'"apdb_part" IN ({pixels_str})', ())]
1589
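# Illustrative example for _spatial_where() above (hypothetical pixel
# indices): if the region maps to pixels [10, 11, 12] and
# query_per_spatial_part is False, it returns
# [('"apdb_part" IN (10,11,12)', ())]; with query_per_spatial_part set to True
# it returns three separate ('"apdb_part" = ?', (pixel,)) tuples instead.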
1590 def _temporal_where(
1591 self,
1592 table: ApdbTables,
1593 start_time: float | astropy.time.Time,
1594 end_time: float | astropy.time.Time,
1595 query_per_time_part: bool | None = None,
1596 ) -> tuple[list[str], list[tuple[str, tuple]]]:
1597 """Generate table names and expressions for temporal part of WHERE
1598 clauses.
1599
1600 Parameters
1601 ----------
1602 table : `ApdbTables`
1603 Table to select from.
1604 start_time : `astropy.time.Time` or `float`
1605 Starting datetime or MJD value of the time range.
1606 end_time : `astropy.time.Time` or `float`
1607 Ending datetime or MJD value of the time range.
1608 query_per_time_part : `bool`, optional
1609 If `None` then use ``query_per_time_part`` from configuration.
1610
1611 Returns
1612 -------
1613 tables : `list` [ `str` ]
1614 List of the table names to query.
1615 expressions : `list` [ `tuple` ]
1616 A list of zero or more (expression, parameters) tuples.
1617 """
1618 tables: list[str]
1619 temporal_where: list[tuple[str, tuple]] = []
1620 table_name = self._schema.tableName(table)
1621 time_part_start = self._time_partition(start_time)
1622 time_part_end = self._time_partition(end_time)
1623 time_parts = list(range(time_part_start, time_part_end + 1))
1624 if self.config.partitioning.time_partition_tables:
1625 tables = [f"{table_name}_{part}" for part in time_parts]
1626 else:
1627 tables = [table_name]
1628 if query_per_time_part is None:
1629 query_per_time_part = self.config.partitioning.query_per_time_part
1630 if query_per_time_part:
1631 temporal_where = [('"apdb_time_part" = ?', (time_part,)) for time_part in time_parts]
1632 else:
1633 time_part_list = ",".join([str(part) for part in time_parts])
1634 temporal_where = [(f'"apdb_time_part" IN ({time_part_list})', ())]
1635
1636 return tables, temporal_where
1637
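# Illustrative example for _temporal_where() above (hypothetical partition
# numbers): if the query time range spans time partitions 610 and 611, then
# with time_partition_tables enabled it returns per-partition table names such
# as ["DiaSource_610", "DiaSource_611"] with no temporal WHERE clause;
# otherwise it returns the single table name together with either
# '"apdb_time_part" IN (610,611)' or one '"apdb_time_part" = ?' clause per
# partition, depending on query_per_time_part.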
1638 def _fix_input_timestamps(self, df: pandas.DataFrame) -> pandas.DataFrame:
1639 """Update timestamp columns in input DataFrame to be naive datetime
1640 type.
1641
1642 Clients may or may not generate timezone-aware timestamps. The code
1643 in this class assumes naive timestamps, so aware values are converted
1644 to UTC and their timezone is dropped.
1645 """
1646 # Find all columns with aware timestamps.
1647 columns = [column for column, dtype in df.dtypes.items() if isinstance(dtype, pandas.DatetimeTZDtype)]
1648 for column in columns:
1649 # tz_convert(None) will convert to UTC and drop timezone.
1650 df[column] = df[column].dt.tz_convert(None)
1651 return df
1652
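# Illustrative example for _fix_input_timestamps() above: a timezone-aware
# value such as 2025-01-01 12:00:00+02:00 becomes the naive UTC value
# 2025-01-01 10:00:00 after tz_convert(None).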
1653 def _batch_size(self, table: ApdbTables | ExtraTables) -> int:
1654 """Calculate batch size based on config parameters."""
1655 # Cassandra limit on number of statements in a batch is 64k.
1656 batch_size = 65_535
1657 if 0 < self.config.batch_statement_limit < batch_size:
1658 batch_size = self.config.batch_statement_limit
1659 if self.config.batch_size_limit > 0:
1660 # The purpose of this limit is to try not to exceed the batch size
1661 # threshold which is set on the server side. The Cassandra wire
1662 # protocol for prepared queries (and batches) only sends column
1663 # values, with an additional 4 bytes per value specifying its size.
1664 # The value is not included for NULL or NOT_SET values, but the
1665 # size field is always there. There is an additional small
1666 # per-query overhead, which we ignore.
1667 row_size = self._schema.table_row_size(table)
1668 row_size += 4 * len(self._schema.getColumnMap(table))
1669 batch_size = min(batch_size, (self.config.batch_size_limit // row_size) + 1)
1670 return batch_size
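# Worked example for _batch_size() above (hypothetical numbers): if
# table_row_size() reports 500 bytes for a table with 25 columns, then
# row_size = 500 + 4 * 25 = 600; with batch_size_limit = 60000 the size-based
# cap is 60000 // 600 + 1 = 101 statements, further capped by
# batch_statement_limit and the 65,535 protocol limit.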