apdbCassandra.py
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22from __future__ import annotations
23
24__all__ = ["ApdbCassandraConfig", "ApdbCassandra"]
25
26import dataclasses
27import json
28import logging
29from collections.abc import Iterable, Iterator, Mapping, Set
30from typing import TYPE_CHECKING, Any, cast
31
32import numpy as np
33import pandas
34
35# If cassandra-driver is not there the module can still be imported
36# but ApdbCassandra cannot be instantiated.
37try:
38 import cassandra
39 import cassandra.query
40 from cassandra.auth import AuthProvider, PlainTextAuthProvider
41 from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile, Session
42 from cassandra.policies import AddressTranslator, RoundRobinPolicy, WhiteListRoundRobinPolicy
43
44 CASSANDRA_IMPORTED = True
45except ImportError:
46 CASSANDRA_IMPORTED = False
47
48import astropy.time
49import felis.datamodel
50from lsst import sphgeom
51from lsst.pex.config import ChoiceField, Field, ListField
52from lsst.utils.db_auth import DbAuth, DbAuthNotFoundError
53from lsst.utils.iteration import chunk_iterable
54
55from ..apdb import Apdb, ApdbConfig
56from ..apdbConfigFreezer import ApdbConfigFreezer
57from ..apdbReplica import ReplicaChunk
58from ..apdbSchema import ApdbTables
59from ..pixelization import Pixelization
60from ..schema_model import Table
61from ..timer import Timer
62from ..versionTuple import IncompatibleVersionError, VersionTuple
63from .apdbCassandraReplica import ApdbCassandraReplica
64from .apdbCassandraSchema import ApdbCassandraSchema, ExtraTables
65from .apdbMetadataCassandra import ApdbMetadataCassandra
66from .cassandra_utils import (
67 PreparedStatementCache,
68 literal,
69 pandas_dataframe_factory,
70 quote_id,
71 raw_data_factory,
72 select_concurrent,
73)
74
75if TYPE_CHECKING:
76 from ..apdbMetadata import ApdbMetadata
77
78_LOG = logging.getLogger(__name__)
79
80VERSION = VersionTuple(0, 1, 0)
81"""Version for the code controlling non-replication tables. This needs to be
82updated following compatibility rules when schema produced by this code
83changes.
84"""
85
86# Copied from daf_butler.
87DB_AUTH_ENVVAR = "LSST_DB_AUTH"
88"""Default name of the environmental variable that will be used to locate DB
89credentials configuration file. """
90
91DB_AUTH_PATH = "~/.lsst/db-auth.yaml"
92"""Default path at which it is expected that DB credentials are found."""
93
94
95class CassandraMissingError(Exception):
96 def __init__(self) -> None:
97 super().__init__("cassandra-driver module cannot be imported")
98
99
101 """Configuration class for Cassandra-based APDB implementation."""
102
103 contact_points = ListField[str](
104 doc="The list of contact points to try connecting for cluster discovery.", default=["127.0.0.1"]
105 )
106 private_ips = ListField[str](doc="List of internal IP addresses for contact_points.", default=[])
107 port = Field[int](doc="Port number to connect to.", default=9042)
108 keyspace = Field[str](doc="Default keyspace for operations.", default="apdb")
109 username = Field[str](
110 doc=f"Cassandra user name, if empty then {DB_AUTH_PATH} has to provide it with password.",
111 default="",
112 )
113 read_consistency = Field[str](
114 doc="Name for consistency level of read operations, default: QUORUM, can be ONE.", default="QUORUM"
115 )
116 write_consistency = Field[str](
117 doc="Name for consistency level of write operations, default: QUORUM, can be ONE.", default="QUORUM"
118 )
119 read_timeout = Field[float](doc="Timeout in seconds for read operations.", default=120.0)
120 write_timeout = Field[float](doc="Timeout in seconds for write operations.", default=10.0)
121 remove_timeout = Field[float](doc="Timeout in seconds for remove operations.", default=600.0)
122 read_concurrency = Field[int](doc="Concurrency level for read operations.", default=500)
123 protocol_version = Field[int](
124 doc="Cassandra protocol version to use, default is V4",
125 default=cassandra.ProtocolVersion.V4 if CASSANDRA_IMPORTED else 0,
126 )
127 dia_object_columns = ListField[str](
128 doc="List of columns to read from DiaObject[Last], by default read all columns", default=[]
129 )
130 prefix = Field[str](doc="Prefix to add to table names", default="")
131 part_pixelization = ChoiceField[str](
132 allowed=dict(htm="HTM pixelization", q3c="Q3C pixelization", mq3c="MQ3C pixelization"),
133 doc="Pixelization used for partitioning index.",
134 default="mq3c",
135 )
136 part_pix_level = Field[int](doc="Pixelization level used for partitioning index.", default=10)
137 part_pix_max_ranges = Field[int](doc="Max number of ranges in pixelization envelope", default=64)
138 ra_dec_columns = ListField[str](default=["ra", "dec"], doc="Names of ra/dec columns in DiaObject table")
139 timer = Field[bool](doc="If True then print/log timing information", default=False)
140 time_partition_tables = Field[bool](
141 doc="Use per-partition tables for sources instead of partitioning by time", default=False
142 )
143 time_partition_days = Field[int](
144 doc=(
145 "Time partitioning granularity in days, this value must not be changed after database is "
146 "initialized"
147 ),
148 default=30,
149 )
150 time_partition_start = Field[str](
151 doc=(
152 "Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI. "
153 "This is used only when time_partition_tables is True."
154 ),
155 default="2018-12-01T00:00:00",
156 )
157 time_partition_end = Field[str](
158 doc=(
159 "Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI. "
160 "This is used only when time_partition_tables is True."
161 ),
162 default="2030-01-01T00:00:00",
163 )
164 query_per_time_part = Field[bool](
165 default=False,
166 doc=(
167 "If True then build separate query for each time partition, otherwise build one single query. "
168 "This is only used when time_partition_tables is False in schema config."
169 ),
170 )
171 query_per_spatial_part = Field[bool](
172 default=False,
173 doc="If True then build one query per spatial partition, otherwise build single query.",
174 )
175 use_insert_id_skips_diaobjects = Field[bool](
176 default=False,
177 doc=(
178 "If True then do not store DiaObjects when use_insert_id is True "
179 "(DiaObjectsChunks has the same data)."
180 ),
181 )
182
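
# ---------------------------------------------------------------------------
# Illustrative sketch (editor addition, not part of the original source): how
# a configuration like the one above is typically built and adjusted through
# the standard pex_config mechanism.  The host addresses and keyspace name are
# placeholders, and the helper name _example_build_config is hypothetical.
def _example_build_config() -> ApdbCassandraConfig:
    config = ApdbCassandraConfig()
    config.contact_points = ["10.0.0.1", "10.0.0.2"]  # placeholder hosts
    config.keyspace = "apdb_dev"  # placeholder keyspace
    config.read_consistency = "ONE"  # relax the default QUORUM, e.g. for tests
    config.time_partition_tables = True
    config.validate()  # standard pex_config validation
    return config
# ---------------------------------------------------------------------------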
183
184@dataclasses.dataclass
186 """Part of the configuration that is saved in metadata table and read back.
187
188 The attributes are a subset of attributes in `ApdbCassandraConfig` class.
189
190 Parameters
191 ----------
192 config : `ApdbCassandraConfig`
193 Configuration used to copy initial values of attributes.
194 """
195
196 use_insert_id: bool
197 part_pixelization: str
198 part_pix_level: int
199 ra_dec_columns: list[str]
200 time_partition_tables: bool
201 time_partition_days: int
202 use_insert_id_skips_diaobjects: bool
203
204 def __init__(self, config: ApdbCassandraConfig):
205 self.use_insert_id = config.use_insert_id
206 self.part_pixelization = config.part_pixelization
207 self.part_pix_level = config.part_pix_level
208 self.ra_dec_columns = list(config.ra_dec_columns)
209 self.time_partition_tables = config.time_partition_tables
210 self.time_partition_days = config.time_partition_days
211 self.use_insert_id_skips_diaobjects = config.use_insert_id_skips_diaobjects
212
213 def to_json(self) -> str:
214 """Convert this instance to JSON representation."""
215 return json.dumps(dataclasses.asdict(self))
216
217 def update(self, json_str: str) -> None:
218 """Update attribute values from a JSON string.
219
220 Parameters
221 ----------
222 json_str : str
223 String containing JSON representation of configuration.
224 """
225 data = json.loads(json_str)
226 if not isinstance(data, dict):
227 raise TypeError(f"JSON string must be convertible to object: {json_str!r}")
228 allowed_names = {field.name for field in dataclasses.fields(self)}
229 for key, value in data.items():
230 if key not in allowed_names:
231 raise ValueError(f"JSON object contains unknown key: {key}")
232 setattr(self, key, value)
233
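
# ---------------------------------------------------------------------------
# Worked sketch (editor addition, not part of the original source): the
# freeze/thaw round trip performed by to_json()/update() above, reduced to
# plain json calls.  The attribute values shown are hypothetical.
def _example_config_round_trip() -> None:
    frozen = {"part_pix_level": 10, "time_partition_days": 30}
    json_str = json.dumps(frozen)  # what gets written to the metadata table
    restored = json.loads(json_str)  # what update() receives and applies
    assert restored == frozen
# ---------------------------------------------------------------------------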
234
235if CASSANDRA_IMPORTED:
236
237 class _AddressTranslator(AddressTranslator):
238 """Translate internal IP address to external.
239
240 Only used for docker-based setups; not a viable long-term solution.
241 """
242
243 def __init__(self, public_ips: list[str], private_ips: list[str]):
244 self._map = dict((k, v) for k, v in zip(private_ips, public_ips))
245
246 def translate(self, private_ip: str) -> str:
247 return self._map.get(private_ip, private_ip)
248
249
251 """Implementation of APDB database on to of Apache Cassandra.
252
253 The implementation is configured via standard ``pex_config`` mechanism
254 using the `ApdbCassandraConfig` configuration class. For examples of
255 different configurations check the config/ folder.
256
257 Parameters
258 ----------
259 config : `ApdbCassandraConfig`
260 Configuration object.
261 """
262
263 metadataSchemaVersionKey = "version:schema"
264 """Name of the metadata key to store schema version number."""
265
266 metadataCodeVersionKey = "version:ApdbCassandra"
267 """Name of the metadata key to store code version number."""
268
269 metadataReplicaVersionKey = "version:ApdbCassandraReplica"
270 """Name of the metadata key to store replica code version number."""
271
272 metadataConfigKey = "config:apdb-cassandra.json"
273 """Name of the metadata key to store code version number."""
274
275 _frozen_parameters = (
276 "use_insert_id",
277 "part_pixelization",
278 "part_pix_level",
279 "ra_dec_columns",
280 "time_partition_tables",
281 "time_partition_days",
282 "use_insert_id_skips_diaobjects",
283 )
284 """Names of the config parameters to be frozen in metadata table."""
285
286 partition_zero_epoch = astropy.time.Time(0, format="unix_tai")
287 """Start time for partition 0, this should never be changed."""
288
289 def __init__(self, config: ApdbCassandraConfig):
290 if not CASSANDRA_IMPORTED:
291 raise CassandraMissingError()
292
293 self._keyspace = config.keyspace
294
295 self._cluster, self._session = self._make_session(config)
296
297 meta_table_name = ApdbTables.metadata.table_name(config.prefix)
298 self._metadata = ApdbMetadataCassandra(
299 self._session, meta_table_name, config.keyspace, "read_tuples", "write"
300 )
301
302 # Read frozen config from metadata.
303 config_json = self._metadata.get(self.metadataConfigKey)
304 if config_json is not None:
305 # Update config from metadata.
306 freezer = ApdbConfigFreezer[ApdbCassandraConfig](self._frozen_parameters)
307 self.config = freezer.update(config, config_json)
308 else:
309 self.config = config
310 self.config.validate()
311
312 self._pixelization = Pixelization(
313 self.config.part_pixelization,
314 self.config.part_pix_level,
315 config.part_pix_max_ranges,
316 )
317
318 self._schema = ApdbCassandraSchema(
319 session=self._session,
320 keyspace=self._keyspace,
321 schema_file=self.config.schema_file,
322 schema_name=self.config.schema_name,
323 prefix=self.config.prefix,
324 time_partition_tables=self.config.time_partition_tables,
325 enable_replica=self.config.use_insert_id,
326 )
328
329 if self._metadata.table_exists():
330 self._versionCheck(self._metadata)
331
332 # Cache for prepared statements
333 self._preparer = PreparedStatementCache(self._session)
334
335 _LOG.debug("ApdbCassandra Configuration:")
336 for key, value in self.config.items():
337 _LOG.debug(" %s: %s", key, value)
338
339 def __del__(self) -> None:
340 if hasattr(self, "_cluster"):
341 self._cluster.shutdown()
342
343 @classmethod
344 def _make_session(cls, config: ApdbCassandraConfig) -> tuple[Cluster, Session]:
345 """Make Cassandra session."""
346 addressTranslator: AddressTranslator | None = None
347 if config.private_ips:
348 addressTranslator = _AddressTranslator(list(config.contact_points), list(config.private_ips))
349
350 cluster = Cluster(
351 execution_profiles=cls._makeProfiles(config),
352 contact_points=config.contact_points,
353 port=config.port,
354 address_translator=addressTranslator,
355 protocol_version=config.protocol_version,
356 auth_provider=cls._make_auth_provider(config),
357 )
358 session = cluster.connect()
359 # Disable result paging
360 session.default_fetch_size = None
361
362 return cluster, session
363
364 @classmethod
365 def _make_auth_provider(cls, config: ApdbCassandraConfig) -> AuthProvider | None:
366 """Make Cassandra authentication provider instance."""
367 try:
368 dbauth = DbAuth(DB_AUTH_PATH, DB_AUTH_ENVVAR)
369 except DbAuthNotFoundError:
370 # Credentials file doesn't exist, use anonymous login.
371 return None
372
373 empty_username = True
374 # Try every contact point in turn.
375 for hostname in config.contact_points:
376 try:
377 username, password = dbauth.getAuth(
378 "cassandra", config.username, hostname, config.port, config.keyspace
379 )
380 if not username:
381 # Password without user name, try next hostname, but give
382 # warning later if no better match is found.
383 empty_username = True
384 else:
385 return PlainTextAuthProvider(username=username, password=password)
386 except DbAuthNotFoundError:
387 pass
388
389 if empty_username:
390 _LOG.warning(
391 f"Credentials file ({DB_AUTH_PATH} or ${DB_AUTH_ENVVAR}) provided password but not "
392 f"user name, anonymous Cassandra logon will be attempted."
393 )
394
395 return None
396
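    # -----------------------------------------------------------------------
    # Editor's note (not part of the original source): under the usual DbAuth
    # conventions a matching entry in ~/.lsst/db-auth.yaml would look roughly
    # like the hypothetical snippet below, with "cassandra" as the scheme and
    # the keyspace as the path; the exact matching rules are defined by
    # lsst.utils.db_auth.DbAuth.
    #
    #     - url: cassandra://cassandra1.example.org:9042/apdb
    #       username: apdb_user
    #       password: "not-a-real-password"
    # -----------------------------------------------------------------------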
397 def _versionCheck(self, metadata: ApdbMetadataCassandra) -> None:
398 """Check schema version compatibility."""
399
400 def _get_version(key: str, default: VersionTuple) -> VersionTuple:
401 """Retrieve version number from given metadata key."""
402 if metadata.table_exists():
403 version_str = metadata.get(key)
404 if version_str is None:
405 # Should not happen with existing metadata table.
406 raise RuntimeError(f"Version key {key!r} does not exist in metadata table.")
407 return VersionTuple.fromString(version_str)
408 return default
409
410 # For old databases where metadata table does not exist we assume that
411 # version of both code and schema is 0.1.0.
412 initial_version = VersionTuple(0, 1, 0)
413 db_schema_version = _get_version(self.metadataSchemaVersionKey, initial_version)
414 db_code_version = _get_version(self.metadataCodeVersionKey, initial_version)
415
416 # For now there is no way to make read-only APDB instances, assume that
417 # any access can do updates.
418 if not self._schema.schemaVersion().checkCompatibility(db_schema_version, True):
419 raise IncompatibleVersionError(
420 f"Configured schema version {self._schema.schemaVersion()} "
421 f"is not compatible with database version {db_schema_version}"
422 )
423 if not self.apdbImplementationVersion().checkCompatibility(db_code_version, True):
424 raise IncompatibleVersionError(
425 f"Current code version {self.apdbImplementationVersion()} "
426 f"is not compatible with database version {db_code_version}"
427 )
428
429 # Check replica code version only if replica is enabled.
430 if self._schema.has_replica_chunks:
431 db_replica_version = _get_version(self.metadataReplicaVersionKey, initial_version)
432 code_replica_version = ApdbCassandraReplica.apdbReplicaImplementationVersion()
433 if not code_replica_version.checkCompatibility(db_replica_version, True):
434 raise IncompatibleVersionError(
435 f"Current replication code version {code_replica_version} "
436 f"is not compatible with database version {db_replica_version}"
437 )
438
439 @classmethod
440 def apdbImplementationVersion(cls) -> VersionTuple:
441 # Docstring inherited from base class.
442 return VERSION
443
444 def apdbSchemaVersion(self) -> VersionTuple:
445 # Docstring inherited from base class.
446 return self._schema.schemaVersion()
447
448 def tableDef(self, table: ApdbTables) -> Table | None:
449 # docstring is inherited from a base class
450 return self._schema.tableSchemas.get(table)
451
452 @classmethod
453 def init_database(
454 cls,
455 hosts: list[str],
456 keyspace: str,
457 *,
458 schema_file: str | None = None,
459 schema_name: str | None = None,
460 read_sources_months: int | None = None,
461 read_forced_sources_months: int | None = None,
462 use_insert_id: bool = False,
463 replica_skips_diaobjects: bool = False,
464 port: int | None = None,
465 username: str | None = None,
466 prefix: str | None = None,
467 part_pixelization: str | None = None,
468 part_pix_level: int | None = None,
469 time_partition_tables: bool = True,
470 time_partition_start: str | None = None,
471 time_partition_end: str | None = None,
472 read_consistency: str | None = None,
473 write_consistency: str | None = None,
474 read_timeout: int | None = None,
475 write_timeout: int | None = None,
476 ra_dec_columns: list[str] | None = None,
477 replication_factor: int | None = None,
478 drop: bool = False,
479 ) -> ApdbCassandraConfig:
480 """Initialize new APDB instance and make configuration object for it.
481
482 Parameters
483 ----------
484 hosts : `list` [`str`]
485 List of host names or IP addresses for Cassandra cluster.
486 keyspace : `str`
487 Name of the keyspace for APDB tables.
488 schema_file : `str`, optional
489 Location of (YAML) configuration file with APDB schema. If not
490 specified then default location will be used.
491 schema_name : `str`, optional
492 Name of the schema in YAML configuration file. If not specified
493 then default name will be used.
494 read_sources_months : `int`, optional
495 Number of months of history to read from DiaSource.
496 read_forced_sources_months : `int`, optional
497 Number of months of history to read from DiaForcedSource.
498 use_insert_id : `bool`, optional
499 If True, make additional tables used for replication to PPDB.
500 replica_skips_diaobjects : `bool`, optional
501 If `True` then do not fill regular ``DiaObject`` table when
502 ``use_insert_id`` is `True`.
503 port : `int`, optional
504 Port number to use for Cassandra connections.
505 username : `str`, optional
506 User name for Cassandra connections.
507 prefix : `str`, optional
508 Optional prefix for all table names.
509 part_pixelization : `str`, optional
510 Name of the MOC pixelization used for partitioning.
511 part_pix_level : `int`, optional
512 Pixelization level.
513 time_partition_tables : `bool`, optional
514 Create per-partition tables.
515 time_partition_start : `str`, optional
516 Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss
517 format, in TAI.
518 time_partition_end : `str`, optional
519 Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss
520 format, in TAI.
521 read_consistency : `str`, optional
522 Name of the consistency level for read operations.
523 write_consistency : `str`, optional
524 Name of the consistency level for write operations.
525 read_timeout : `int`, optional
526 Read timeout in seconds.
527 write_timeout : `int`, optional
528 Write timeout in seconds.
529 ra_dec_columns : `list` [`str`], optional
530 Names of ra/dec columns in DiaObject table.
531 replication_factor : `int`, optional
532 Replication factor used when creating new keyspace, if keyspace
533 already exists its replication factor is not changed.
534 drop : `bool`, optional
535 If `True` then drop existing tables before re-creating the schema.
536
537 Returns
538 -------
539 config : `ApdbCassandraConfig`
540 Resulting configuration object for a created APDB instance.
541 """
542 config = ApdbCassandraConfig(
543 contact_points=hosts,
544 keyspace=keyspace,
545 use_insert_id=use_insert_id,
546 use_insert_id_skips_diaobjects=replica_skips_diaobjects,
547 time_partition_tables=time_partition_tables,
548 )
549 if schema_file is not None:
550 config.schema_file = schema_file
551 if schema_name is not None:
552 config.schema_name = schema_name
553 if read_sources_months is not None:
554 config.read_sources_months = read_sources_months
555 if read_forced_sources_months is not None:
556 config.read_forced_sources_months = read_forced_sources_months
557 if port is not None:
558 config.port = port
559 if username is not None:
560 config.username = username
561 if prefix is not None:
562 config.prefix = prefix
563 if part_pixelization is not None:
564 config.part_pixelization = part_pixelization
565 if part_pix_level is not None:
566 config.part_pix_level = part_pix_level
567 if time_partition_start is not None:
568 config.time_partition_start = time_partition_start
569 if time_partition_end is not None:
570 config.time_partition_end = time_partition_end
571 if read_consistency is not None:
572 config.read_consistency = read_consistency
573 if write_consistency is not None:
574 config.write_consistency = write_consistency
575 if read_timeout is not None:
576 config.read_timeout = read_timeout
577 if write_timeout is not None:
578 config.write_timeout = write_timeout
579 if ra_dec_columns is not None:
580 config.ra_dec_columns = ra_dec_columns
581
582 cls._makeSchema(config, drop=drop, replication_factor=replication_factor)
583
584 return config
585
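    # -----------------------------------------------------------------------
    # Illustrative sketch (editor addition, not part of the original source):
    # typical use of init_database() followed by instantiation.  Host names,
    # keyspace and replication factor are placeholders, and a reachable
    # Cassandra cluster is required for this to actually run.
    #
    #     config = ApdbCassandra.init_database(
    #         hosts=["cassandra1.example.org", "cassandra2.example.org"],
    #         keyspace="apdb",
    #         use_insert_id=True,
    #         replication_factor=3,
    #     )
    #     config.save("apdb-cassandra.py")  # reusable pex_config file
    #     apdb = ApdbCassandra(config)
    # -----------------------------------------------------------------------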
586 def get_replica(self) -> ApdbCassandraReplica:
587 """Return `ApdbReplica` instance for this database."""
588 # Note that this instance has to stay alive while replica exists, so
589 # we pass reference to self.
590 return ApdbCassandraReplica(self, self._schema, self._session)
591
592 @classmethod
593 def _makeSchema(
594 cls, config: ApdbConfig, *, drop: bool = False, replication_factor: int | None = None
595 ) -> None:
596 # docstring is inherited from a base class
597
598 if not isinstance(config, ApdbCassandraConfig):
599 raise TypeError(f"Unexpected type of configuration object: {type(config)}")
600
601 cluster, session = cls._make_session(config)
602
603 schema = ApdbCassandraSchema(
604 session=session,
605 keyspace=config.keyspace,
606 schema_file=config.schema_file,
607 schema_name=config.schema_name,
608 prefix=config.prefix,
609 time_partition_tables=config.time_partition_tables,
610 enable_replica=config.use_insert_id,
611 )
612
613 # Ask schema to create all tables.
614 if config.time_partition_tables:
615 time_partition_start = astropy.time.Time(config.time_partition_start, format="isot", scale="tai")
616 time_partition_end = astropy.time.Time(config.time_partition_end, format="isot", scale="tai")
617 part_epoch = float(cls.partition_zero_epoch.mjd)
618 part_days = config.time_partition_days
619 part_range = (
620 cls._time_partition_cls(time_partition_start, part_epoch, part_days),
621 cls._time_partition_cls(time_partition_end, part_epoch, part_days) + 1,
622 )
623 schema.makeSchema(drop=drop, part_range=part_range, replication_factor=replication_factor)
624 else:
625 schema.makeSchema(drop=drop, replication_factor=replication_factor)
626
627 meta_table_name = ApdbTables.metadata.table_name(config.prefix)
628 metadata = ApdbMetadataCassandra(session, meta_table_name, config.keyspace, "read_tuples", "write")
629
630 # Fill version numbers, overrides if they existed before.
631 if metadata.table_exists():
632 metadata.set(cls.metadataSchemaVersionKey, str(schema.schemaVersion()), force=True)
633 metadata.set(cls.metadataCodeVersionKey, str(cls.apdbImplementationVersion()), force=True)
634
635 if config.use_insert_id:
636 # Only store replica code version if replica is enabled.
637 metadata.set(
638 cls.metadataReplicaVersionKey,
639 str(ApdbCassandraReplica.apdbReplicaImplementationVersion()),
640 force=True,
641 )
642
643 # Store frozen part of a configuration in metadata.
644 freezer = ApdbConfigFreezer[ApdbCassandraConfig](cls._frozen_parameters)
645 metadata.set(cls.metadataConfigKey, freezer.to_json(config), force=True)
646
647 cluster.shutdown()
648
649 def getDiaObjects(self, region: sphgeom.Region) -> pandas.DataFrame:
650 # docstring is inherited from a base class
651
652 sp_where = self._spatial_where(region)
653 _LOG.debug("getDiaObjects: #partitions: %s", len(sp_where))
654
655 # We need to exclude extra partitioning columns from result.
656 column_names = self._schema.apdbColumnNames(ApdbTables.DiaObjectLast)
657 what = ",".join(quote_id(column) for column in column_names)
658
659 table_name = self._schema.tableName(ApdbTables.DiaObjectLast)
660 query = f'SELECT {what} from "{self._keyspace}"."{table_name}"'
661 statements: list[tuple] = []
662 for where, params in sp_where:
663 full_query = f"{query} WHERE {where}"
664 if params:
665 statement = self._preparer.prepare(full_query)
666 else:
667 # If there are no params then it is likely that query has a
668 # bunch of literals rendered already, no point trying to
669 # prepare it because it's not reusable.
670 statement = cassandra.query.SimpleStatement(full_query)
671 statements.append((statement, params))
672 _LOG.debug("getDiaObjects: #queries: %s", len(statements))
673
674 with Timer("DiaObject select", self.config.timer):
675 objects = cast(
676 pandas.DataFrame,
677 select_concurrent(
678 self._session, statements, "read_pandas_multi", self.config.read_concurrency
679 ),
680 )
681
682 _LOG.debug("found %s DiaObjects", objects.shape[0])
683 return objects
684
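    # -----------------------------------------------------------------------
    # Illustrative sketch (editor addition, not part of the original source):
    # querying the latest DiaObjects inside a small circular region.  The
    # pointing and radius are placeholders and ``apdb`` stands for an
    # already-constructed ApdbCassandra instance.
    #
    #     center = sphgeom.UnitVector3d(sphgeom.LonLat.fromDegrees(35.0, -4.5))
    #     region = sphgeom.Circle(center, sphgeom.Angle.fromDegrees(1.0))
    #     objects = apdb.getDiaObjects(region)  # pandas.DataFrame
    # -----------------------------------------------------------------------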
685 def getDiaSources(
686 self, region: sphgeom.Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
687 ) -> pandas.DataFrame | None:
688 # docstring is inherited from a base class
689 months = self.config.read_sources_months
690 if months == 0:
691 return None
692 mjd_end = visit_time.mjd
693 mjd_start = mjd_end - months * 30
694
695 return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaSource)
696
697 def getDiaForcedSources(
698 self, region: sphgeom.Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
699 ) -> pandas.DataFrame | None:
700 # docstring is inherited from a base class
701 months = self.config.read_forced_sources_months
702 if months == 0:
703 return None
704 mjd_end = visit_time.mjd
705 mjd_start = mjd_end - months * 30
706
707 return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaForcedSource)
708
709 def containsVisitDetector(self, visit: int, detector: int) -> bool:
710 # docstring is inherited from a base class
711 raise NotImplementedError()
712
713 def getSSObjects(self) -> pandas.DataFrame:
714 # docstring is inherited from a base class
715 tableName = self._schema.tableName(ApdbTables.SSObject)
716 query = f'SELECT * from "{self._keyspace}"."{tableName}"'
717
718 objects = None
719 with Timer("SSObject select", self.config.timer):
720 result = self._session.execute(query, execution_profile="read_pandas")
721 objects = result._current_rows
722
723 _LOG.debug("found %s DiaObjects", objects.shape[0])
724 return objects
725
726 def store(
727 self,
728 visit_time: astropy.time.Time,
729 objects: pandas.DataFrame,
730 sources: pandas.DataFrame | None = None,
731 forced_sources: pandas.DataFrame | None = None,
732 ) -> None:
733 # docstring is inherited from a base class
734
735 replica_chunk: ReplicaChunk | None = None
736 if self._schema.has_replica_chunks:
737 replica_chunk = ReplicaChunk.make_replica_chunk(visit_time, self.config.replica_chunk_seconds)
738 self._storeReplicaChunk(replica_chunk, visit_time)
739
740 # fill region partition column for DiaObjects
741 objects = self._add_obj_part(objects)
742 self._storeDiaObjects(objects, visit_time, replica_chunk)
743
744 if sources is not None:
745 # copy apdb_part column from DiaObjects to DiaSources
746 sources = self._add_src_part(sources, objects)
747 self._storeDiaSources(ApdbTables.DiaSource, sources, visit_time, replica_chunk)
748 self._storeDiaSourcesPartitions(sources, visit_time, replica_chunk)
749
750 if forced_sources is not None:
751 forced_sources = self._add_fsrc_part(forced_sources, objects)
752 self._storeDiaSources(ApdbTables.DiaForcedSource, forced_sources, visit_time, replica_chunk)
753
754 def storeSSObjects(self, objects: pandas.DataFrame) -> None:
755 # docstring is inherited from a base class
756 self._storeObjectsPandas(objects, ApdbTables.SSObject)
757
758 def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
759 # docstring is inherited from a base class
760
761 # To update a record we need to know its exact primary key (including
762 # partition key) so we start by querying for diaSourceId to find the
763 # primary keys.
764
765 table_name = self._schema.tableName(ExtraTables.DiaSourceToPartition)
766 # split it into 1k IDs per query
767 selects: list[tuple] = []
768 for ids in chunk_iterable(idMap.keys(), 1_000):
769 ids_str = ",".join(str(item) for item in ids)
770 selects.append(
771 (
772 (
773 'SELECT "diaSourceId", "apdb_part", "apdb_time_part", "apdb_replica_chunk" '
774 f'FROM "{self._keyspace}"."{table_name}" WHERE "diaSourceId" IN ({ids_str})'
775 ),
776 {},
777 )
778 )
779
780 # No need for DataFrame here, read data as tuples.
781 result = cast(
782 list[tuple[int, int, int, int | None]],
783 select_concurrent(self._session, selects, "read_tuples", self.config.read_concurrency),
784 )
785
786 # Make mapping from source ID to its partition.
787 id2partitions: dict[int, tuple[int, int]] = {}
788 id2chunk_id: dict[int, int] = {}
789 for row in result:
790 id2partitions[row[0]] = row[1:3]
791 if row[3] is not None:
792 id2chunk_id[row[0]] = row[3]
793
794 # make sure we know partitions for each ID
795 if set(id2partitions) != set(idMap):
796 missing = ",".join(str(item) for item in set(idMap) - set(id2partitions))
797 raise ValueError(f"Following DiaSource IDs do not exist in the database: {missing}")
798
799 # Reassign in standard tables
800 queries = cassandra.query.BatchStatement()
801 table_name = self._schema.tableName(ApdbTables.DiaSource)
802 for diaSourceId, ssObjectId in idMap.items():
803 apdb_part, apdb_time_part = id2partitions[diaSourceId]
804 values: tuple
805 if self.config.time_partition_tables:
806 query = (
807 f'UPDATE "{self._keyspace}"."{table_name}_{apdb_time_part}"'
808 ' SET "ssObjectId" = ?, "diaObjectId" = NULL'
809 ' WHERE "apdb_part" = ? AND "diaSourceId" = ?'
810 )
811 values = (ssObjectId, apdb_part, diaSourceId)
812 else:
813 query = (
814 f'UPDATE "{self._keyspace}"."{table_name}"'
815 ' SET "ssObjectId" = ?, "diaObjectId" = NULL'
816 ' WHERE "apdb_part" = ? AND "apdb_time_part" = ? AND "diaSourceId" = ?'
817 )
818 values = (ssObjectId, apdb_part, apdb_time_part, diaSourceId)
819 queries.add(self._preparer.prepare(query), values)
820
821 # Reassign in replica tables, only if replication is enabled
822 if id2chunk_id:
823 # Filter out chunks that have been deleted already. There is a
824 # potential race with concurrent removal of chunks, but it
825 # should be handled by WHERE in UPDATE.
826 known_ids = set()
827 if replica_chunks := self.get_replica().getReplicaChunks():
828 known_ids = set(replica_chunk.id for replica_chunk in replica_chunks)
829 id2chunk_id = {key: value for key, value in id2chunk_id.items() if value in known_ids}
830 if id2chunk_id:
831 table_name = self._schema.tableName(ExtraTables.DiaSourceChunks)
832 for diaSourceId, ssObjectId in idMap.items():
833 if replica_chunk := id2chunk_id.get(diaSourceId):
834 query = (
835 f'UPDATE "{self._keyspace}"."{table_name}" '
836 ' SET "ssObjectId" = ?, "diaObjectId" = NULL '
837 'WHERE "apdb_replica_chunk" = ? AND "diaSourceId" = ?'
838 )
839 values = (ssObjectId, replica_chunk, diaSourceId)
840 queries.add(self._preparer.prepare(query), values)
841
842 _LOG.debug("%s: will update %d records", table_name, len(idMap))
843 with Timer(table_name + " update", self.config.timer):
844 self._session.execute(queries, execution_profile="write")
845
846 def dailyJob(self) -> None:
847 # docstring is inherited from a base class
848 pass
849
850 def countUnassociatedObjects(self) -> int:
851 # docstring is inherited from a base class
852
853 # It's too inefficient to implement it for Cassandra in current schema.
854 raise NotImplementedError()
855
856 @property
857 def metadata(self) -> ApdbMetadata:
858 # docstring is inherited from a base class
859 if self._metadata is None:
860 raise RuntimeError("Database schema was not initialized.")
861 return self._metadata
862
863 @classmethod
864 def _makeProfiles(cls, config: ApdbCassandraConfig) -> Mapping[Any, ExecutionProfile]:
865 """Make all execution profiles used in the code."""
866 if config.private_ips:
867 loadBalancePolicy = WhiteListRoundRobinPolicy(hosts=config.contact_points)
868 else:
869 loadBalancePolicy = RoundRobinPolicy()
870
871 read_tuples_profile = ExecutionProfile(
872 consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
873 request_timeout=config.read_timeout,
874 row_factory=cassandra.query.tuple_factory,
875 load_balancing_policy=loadBalancePolicy,
876 )
877 read_pandas_profile = ExecutionProfile(
878 consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
879 request_timeout=config.read_timeout,
880 row_factory=pandas_dataframe_factory,
881 load_balancing_policy=loadBalancePolicy,
882 )
883 read_raw_profile = ExecutionProfile(
884 consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
885 request_timeout=config.read_timeout,
886 row_factory=raw_data_factory,
887 load_balancing_policy=loadBalancePolicy,
888 )
889 # Profile to use with select_concurrent to return pandas data frame
890 read_pandas_multi_profile = ExecutionProfile(
891 consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
892 request_timeout=config.read_timeout,
893 row_factory=pandas_dataframe_factory,
894 load_balancing_policy=loadBalancePolicy,
895 )
896 # Profile to use with select_concurrent to return raw data (columns and
897 # rows)
898 read_raw_multi_profile = ExecutionProfile(
899 consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
900 request_timeout=config.read_timeout,
901 row_factory=raw_data_factory,
902 load_balancing_policy=loadBalancePolicy,
903 )
904 write_profile = ExecutionProfile(
905 consistency_level=getattr(cassandra.ConsistencyLevel, config.write_consistency),
906 request_timeout=config.write_timeout,
907 load_balancing_policy=loadBalancePolicy,
908 )
909 # To replace default DCAwareRoundRobinPolicy
910 default_profile = ExecutionProfile(
911 load_balancing_policy=loadBalancePolicy,
912 )
913 return {
914 "read_tuples": read_tuples_profile,
915 "read_pandas": read_pandas_profile,
916 "read_raw": read_raw_profile,
917 "read_pandas_multi": read_pandas_multi_profile,
918 "read_raw_multi": read_raw_multi_profile,
919 "write": write_profile,
920 EXEC_PROFILE_DEFAULT: default_profile,
921 }
922
923 def _getSources(
924 self,
925 region: sphgeom.Region,
926 object_ids: Iterable[int] | None,
927 mjd_start: float,
928 mjd_end: float,
929 table_name: ApdbTables,
930 ) -> pandas.DataFrame:
931 """Return catalog of DiaSource instances given set of DiaObject IDs.
932
933 Parameters
934 ----------
935 region : `lsst.sphgeom.Region`
936 Spherical region.
937 object_ids :
938 Collection of DiaObject IDs
939 mjd_start : `float`
940 Lower bound of time interval.
941 mjd_end : `float`
942 Upper bound of time interval.
943 table_name : `ApdbTables`
944 Name of the table.
945
946 Returns
947 -------
948 catalog : `pandas.DataFrame`
949 Catalog containing DiaSource records. Empty catalog is returned if
950 ``object_ids`` is empty.
951 """
952 object_id_set: Set[int] = set()
953 if object_ids is not None:
954 object_id_set = set(object_ids)
955 if len(object_id_set) == 0:
956 return self._make_empty_catalog(table_name)
957
958 sp_where = self._spatial_where(region)
959 tables, temporal_where = self._temporal_where(table_name, mjd_start, mjd_end)
960
961 # We need to exclude extra partitioning columns from result.
962 column_names = self._schema.apdbColumnNames(table_name)
963 what = ",".join(quote_id(column) for column in column_names)
964
965 # Build all queries
966 statements: list[tuple] = []
967 for table in tables:
968 prefix = f'SELECT {what} from "{self._keyspace}"."{table}"'
969 statements += list(self._combine_where(prefix, sp_where, temporal_where))
970 _LOG.debug("_getSources %s: #queries: %s", table_name, len(statements))
971
972 with Timer(table_name.name + " select", self.config.timer):
973 catalog = cast(
974 pandas.DataFrame,
975 select_concurrent(
976 self._session, statements, "read_pandas_multi", self.config.read_concurrency
977 ),
978 )
979
980 # filter by given object IDs
981 if len(object_id_set) > 0:
982 catalog = cast(pandas.DataFrame, catalog[catalog["diaObjectId"].isin(object_id_set)])
983
984 # precise filtering on midpointMjdTai
985 catalog = cast(pandas.DataFrame, catalog[catalog["midpointMjdTai"] > mjd_start])
986
987 _LOG.debug("found %d %ss", catalog.shape[0], table_name.name)
988 return catalog
989
990 def _storeReplicaChunk(self, replica_chunk: ReplicaChunk, visit_time: astropy.time.Time) -> None:
991 # Cassandra timestamp uses milliseconds since epoch
992 timestamp = int(replica_chunk.last_update_time.unix_tai * 1000)
993
994 # everything goes into a single partition
995 partition = 0
996
997 table_name = self._schema.tableName(ExtraTables.ApdbReplicaChunks)
998 query = (
999 f'INSERT INTO "{self._keyspace}"."{table_name}" '
1000 "(partition, apdb_replica_chunk, last_update_time, unique_id) "
1001 "VALUES (?, ?, ?, ?)"
1002 )
1003
1004 self._session.execute(
1005 self._preparer.prepare(query),
1006 (partition, replica_chunk.id, timestamp, replica_chunk.unique_id),
1007 timeout=self.config.write_timeout,
1008 execution_profile="write",
1009 )
1010
1011 def _storeDiaObjects(
1012 self, objs: pandas.DataFrame, visit_time: astropy.time.Time, replica_chunk: ReplicaChunk | None
1013 ) -> None:
1014 """Store catalog of DiaObjects from current visit.
1015
1016 Parameters
1017 ----------
1018 objs : `pandas.DataFrame`
1019 Catalog with DiaObject records
1020 visit_time : `astropy.time.Time`
1021 Time of the current visit.
1022 replica_chunk : `ReplicaChunk` or `None`
1023 Replica chunk identifier if replication is configured.
1024 """
1025 if len(objs) == 0:
1026 _LOG.debug("No objects to write to database.")
1027 return
1028
1029 visit_time_dt = visit_time.datetime
1030 extra_columns = dict(lastNonForcedSource=visit_time_dt)
1031 self._storeObjectsPandas(objs, ApdbTables.DiaObjectLast, extra_columns=extra_columns)
1032
1033 extra_columns["validityStart"] = visit_time_dt
1034 time_part: int | None = self._time_partition(visit_time)
1035 if not self.config.time_partition_tables:
1036 extra_columns["apdb_time_part"] = time_part
1037 time_part = None
1038
1039 # Only store DiaObjects if not doing replication or explicitly
1040 # configured to always store them.
1041 if replica_chunk is None or not self.config.use_insert_id_skips_diaobjects:
1042 self._storeObjectsPandas(
1043 objs, ApdbTables.DiaObject, extra_columns=extra_columns, time_part=time_part
1044 )
1045
1046 if replica_chunk is not None:
1047 extra_columns = dict(apdb_replica_chunk=replica_chunk.id, validityStart=visit_time_dt)
1048 self._storeObjectsPandas(objs, ExtraTables.DiaObjectChunks, extra_columns=extra_columns)
1049
1050 def _storeDiaSources(
1051 self,
1052 table_name: ApdbTables,
1053 sources: pandas.DataFrame,
1054 visit_time: astropy.time.Time,
1055 replica_chunk: ReplicaChunk | None,
1056 ) -> None:
1057 """Store catalog of DIASources or DIAForcedSources from current visit.
1058
1059 Parameters
1060 ----------
1061 table_name : `ApdbTables`
1062 Table where to store the data.
1063 sources : `pandas.DataFrame`
1064 Catalog containing DiaSource records
1065 visit_time : `astropy.time.Time`
1066 Time of the current visit.
1067 replica_chunk : `ReplicaChunk` or `None`
1068 Replica chunk identifier if replication is configured.
1069 """
1070 time_part: int | None = self._time_partition(visit_time)
1071 extra_columns: dict[str, Any] = {}
1072 if not self.config.time_partition_tables:
1073 extra_columns["apdb_time_part"] = time_part
1074 time_part = None
1075
1076 self._storeObjectsPandas(sources, table_name, extra_columns=extra_columns, time_part=time_part)
1077
1078 if replica_chunk is not None:
1079 extra_columns = dict(apdb_replica_chunk=replica_chunk.id)
1080 if table_name is ApdbTables.DiaSource:
1081 extra_table = ExtraTables.DiaSourceChunks
1082 else:
1083 extra_table = ExtraTables.DiaForcedSourceChunks
1084 self._storeObjectsPandas(sources, extra_table, extra_columns=extra_columns)
1085
1086 def _storeDiaSourcesPartitions(
1087 self, sources: pandas.DataFrame, visit_time: astropy.time.Time, replica_chunk: ReplicaChunk | None
1088 ) -> None:
1089 """Store mapping of diaSourceId to its partitioning values.
1090
1091 Parameters
1092 ----------
1093 sources : `pandas.DataFrame`
1094 Catalog containing DiaSource records
1095 visit_time : `astropy.time.Time`
1096 Time of the current visit.
1097 """
1098 id_map = cast(pandas.DataFrame, sources[["diaSourceId", "apdb_part"]])
1099 extra_columns = {
1100 "apdb_time_part": self._time_partition(visit_time),
1101 "apdb_replica_chunk": replica_chunk.id if replica_chunk is not None else None,
1102 }
1103
1104 self._storeObjectsPandas(
1105 id_map, ExtraTables.DiaSourceToPartition, extra_columns=extra_columns, time_part=None
1106 )
1107
1108 def _storeObjectsPandas(
1109 self,
1110 records: pandas.DataFrame,
1111 table_name: ApdbTables | ExtraTables,
1112 extra_columns: Mapping | None = None,
1113 time_part: int | None = None,
1114 ) -> None:
1115 """Store generic objects.
1116
1117 Takes Pandas catalog and stores a bunch of records in a table.
1118
1119 Parameters
1120 ----------
1121 records : `pandas.DataFrame`
1122 Catalog containing object records
1123 table_name : `ApdbTables`
1124 Name of the table as defined in APDB schema.
1125 extra_columns : `dict`, optional
1126 Mapping (column_name, column_value) which gives fixed values for
1127 columns in each row, overrides values in ``records`` if matching
1128 columns exist there.
1129 time_part : `int`, optional
1130 If not `None` then insert into a per-partition table.
1131
1132 Notes
1133 -----
1134 If Pandas catalog contains additional columns not defined in table
1135 schema they are ignored. Catalog does not have to contain all columns
1136 defined in a table, but partition and clustering keys must be present
1137 in a catalog or ``extra_columns``.
1138 """
1139 # use extra columns if specified
1140 if extra_columns is None:
1141 extra_columns = {}
1142 extra_fields = list(extra_columns.keys())
1143
1144 # Fields that will come from dataframe.
1145 df_fields = [column for column in records.columns if column not in extra_fields]
1146
1147 column_map = self._schema.getColumnMap(table_name)
1148 # list of columns (as in felis schema)
1149 fields = [column_map[field].name for field in df_fields if field in column_map]
1150 fields += extra_fields
1151
1152 # check that all partitioning and clustering columns are defined
1153 required_columns = self._schema.partitionColumns(table_name) + self._schema.clusteringColumns(
1154 table_name
1155 )
1156 missing_columns = [column for column in required_columns if column not in fields]
1157 if missing_columns:
1158 raise ValueError(f"Primary key columns are missing from catalog: {missing_columns}")
1159
1160 qfields = [quote_id(field) for field in fields]
1161 qfields_str = ",".join(qfields)
1162
1163 with Timer(table_name.name + " query build", self.config.timer):
1164 table = self._schema.tableName(table_name)
1165 if time_part is not None:
1166 table = f"{table}_{time_part}"
1167
1168 holders = ",".join(["?"] * len(qfields))
1169 query = f'INSERT INTO "{self._keyspace}"."{table}" ({qfields_str}) VALUES ({holders})'
1170 statement = self._preparer.prepare(query)
1171 queries = cassandra.query.BatchStatement()
1172 for rec in records.itertuples(index=False):
1173 values = []
1174 for field in df_fields:
1175 if field not in column_map:
1176 continue
1177 value = getattr(rec, field)
1178 if column_map[field].datatype is felis.datamodel.DataType.timestamp:
1179 if isinstance(value, pandas.Timestamp):
1180 value = literal(value.to_pydatetime())
1181 else:
1182 # Assume it's seconds since epoch, Cassandra
1183 # datetime is in milliseconds
1184 value = int(value * 1000)
1185 values.append(literal(value))
1186 for field in extra_fields:
1187 value = extra_columns[field]
1188 values.append(literal(value))
1189 queries.add(statement, values)
1190
1191 _LOG.debug("%s: will store %d records", self._schema.tableName(table_name), records.shape[0])
1192 with Timer(table_name.name + " insert", self.config.timer):
1193 self._session.execute(queries, timeout=self.config.write_timeout, execution_profile="write")
1194
1195 def _add_obj_part(self, df: pandas.DataFrame) -> pandas.DataFrame:
1196 """Calculate spatial partition for each record and add it to a
1197 DataFrame.
1198
1199 Notes
1200 -----
1201 This overrides any existing column in a DataFrame with the same name
1202 (apdb_part). Original DataFrame is not changed, copy of a DataFrame is
1203 returned.
1204 """
1205 # calculate HTM index for every DiaObject
1206 apdb_part = np.zeros(df.shape[0], dtype=np.int64)
1207 ra_col, dec_col = self.config.ra_dec_columns
1208 for i, (ra, dec) in enumerate(zip(df[ra_col], df[dec_col])):
1209 uv3d = sphgeom.UnitVector3d(sphgeom.LonLat.fromDegrees(ra, dec))
1210 idx = self._pixelization.pixel(uv3d)
1211 apdb_part[i] = idx
1212 df = df.copy()
1213 df["apdb_part"] = apdb_part
1214 return df
1215
1216 def _add_src_part(self, sources: pandas.DataFrame, objs: pandas.DataFrame) -> pandas.DataFrame:
1217 """Add apdb_part column to DiaSource catalog.
1218
1219 Notes
1220 -----
1221 This method copies apdb_part value from a matching DiaObject record.
1222 DiaObject catalog needs to have an apdb_part column filled by
1223 ``_add_obj_part`` method and DiaSource records need to be
1224 associated to DiaObjects via ``diaObjectId`` column.
1225
1226 This overrides any existing column in a DataFrame with the same name
1227 (apdb_part). Original DataFrame is not changed, copy of a DataFrame is
1228 returned.
1229 """
1230 pixel_id_map: dict[int, int] = {
1231 diaObjectId: apdb_part for diaObjectId, apdb_part in zip(objs["diaObjectId"], objs["apdb_part"])
1232 }
1233 apdb_part = np.zeros(sources.shape[0], dtype=np.int64)
1234 ra_col, dec_col = self.config.ra_dec_columns
1235 for i, (diaObjId, ra, dec) in enumerate(
1236 zip(sources["diaObjectId"], sources[ra_col], sources[dec_col])
1237 ):
1238 if diaObjId == 0:
1239 # DiaSources associated with SolarSystemObjects do not have an
1240 # associated DiaObject hence we skip them and set partition
1241 # based on its own ra/dec
1242 uv3d = sphgeom.UnitVector3d(sphgeom.LonLat.fromDegrees(ra, dec))
1243 idx = self._pixelization.pixel(uv3d)
1244 apdb_part[i] = idx
1245 else:
1246 apdb_part[i] = pixel_id_map[diaObjId]
1247 sources = sources.copy()
1248 sources["apdb_part"] = apdb_part
1249 return sources
1250
1251 def _add_fsrc_part(self, sources: pandas.DataFrame, objs: pandas.DataFrame) -> pandas.DataFrame:
1252 """Add apdb_part column to DiaForcedSource catalog.
1253
1254 Notes
1255 -----
1256 This method copies apdb_part value from a matching DiaObject record.
1257 DiaObject catalog needs to have an apdb_part column filled by
1258 ``_add_obj_part`` method and DiaForcedSource records need to be
1259 associated to DiaObjects via ``diaObjectId`` column.
1260
1261 This overrides any existing column in a DataFrame with the same name
1262 (apdb_part). Original DataFrame is not changed, copy of a DataFrame is
1263 returned.
1264 """
1265 pixel_id_map: dict[int, int] = {
1266 diaObjectId: apdb_part for diaObjectId, apdb_part in zip(objs["diaObjectId"], objs["apdb_part"])
1267 }
1268 apdb_part = np.zeros(sources.shape[0], dtype=np.int64)
1269 for i, diaObjId in enumerate(sources["diaObjectId"]):
1270 apdb_part[i] = pixel_id_map[diaObjId]
1271 sources = sources.copy()
1272 sources["apdb_part"] = apdb_part
1273 return sources
1274
1275 @classmethod
1276 def _time_partition_cls(cls, time: float | astropy.time.Time, epoch_mjd: float, part_days: int) -> int:
1277 """Calculate time partition number for a given time.
1278
1279 Parameters
1280 ----------
1281 time : `float` or `astropy.time.Time`
1282 Time for which to calculate partition number. Can be float to mean
1283 MJD or `astropy.time.Time`
1284 epoch_mjd : `float`
1285 Epoch time for partition 0.
1286 part_days : `int`
1287 Number of days per partition.
1288
1289 Returns
1290 -------
1291 partition : `int`
1292 Partition number for a given time.
1293 """
1294 if isinstance(time, astropy.time.Time):
1295 mjd = float(time.mjd)
1296 else:
1297 mjd = time
1298 days_since_epoch = mjd - epoch_mjd
1299 partition = int(days_since_epoch) // part_days
1300 return partition
1301
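    # -----------------------------------------------------------------------
    # Worked example (editor addition, not part of the original source) for
    # the partition arithmetic above, using a hypothetical epoch of MJD
    # 58300.0 and 30-day partitions:
    #
    #     _time_partition_cls(58345.3, 58300.0, 30) -> int(45.3) // 30 = 1
    #     _time_partition_cls(58260.0, 58300.0, 30) -> int(-40.0) // 30 = -2
    #
    # i.e. times before the epoch map to negative partition numbers because
    # of floor division.
    # -----------------------------------------------------------------------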
1302 def _time_partition(self, time: float | astropy.time.Time) -> int:
1303 """Calculate time partition number for a given time.
1304
1305 Parameters
1306 ----------
1307 time : `float` or `astropy.time.Time`
1308 Time for which to calculate partition number. Can be float to mean
1309 MJD or `astropy.time.Time`
1310
1311 Returns
1312 -------
1313 partition : `int`
1314 Partition number for a given time.
1315 """
1316 if isinstance(time, astropy.time.Time):
1317 mjd = float(time.mjd)
1318 else:
1319 mjd = time
1320 days_since_epoch = mjd - self._partition_zero_epoch_mjd
1321 partition = int(days_since_epoch) // self.config.time_partition_days
1322 return partition
1323
1324 def _make_empty_catalog(self, table_name: ApdbTables) -> pandas.DataFrame:
1325 """Make an empty catalog for a table with a given name.
1326
1327 Parameters
1328 ----------
1329 table_name : `ApdbTables`
1330 Name of the table.
1331
1332 Returns
1333 -------
1334 catalog : `pandas.DataFrame`
1335 An empty catalog.
1336 """
1337 table = self._schema.tableSchemas[table_name]
1338
1339 data = {
1340 columnDef.name: pandas.Series(dtype=self._schema.column_dtype(columnDef.datatype))
1341 for columnDef in table.columns
1342 }
1343 return pandas.DataFrame(data)
1344
1345 def _combine_where(
1346 self,
1347 prefix: str,
1348 where1: list[tuple[str, tuple]],
1349 where2: list[tuple[str, tuple]],
1350 suffix: str | None = None,
1351 ) -> Iterator[tuple[cassandra.query.Statement, tuple]]:
1352 """Make cartesian product of two parts of WHERE clause into a series
1353 of statements to execute.
1354
1355 Parameters
1356 ----------
1357 prefix : `str`
1358 Initial statement prefix that comes before WHERE clause, e.g.
1359 "SELECT * from Table"
1360 """
1361 # If lists are empty use special sentinels.
1362 if not where1:
1363 where1 = [("", ())]
1364 if not where2:
1365 where2 = [("", ())]
1366
1367 for expr1, params1 in where1:
1368 for expr2, params2 in where2:
1369 full_query = prefix
1370 wheres = []
1371 if expr1:
1372 wheres.append(expr1)
1373 if expr2:
1374 wheres.append(expr2)
1375 if wheres:
1376 full_query += " WHERE " + " AND ".join(wheres)
1377 if suffix:
1378 full_query += " " + suffix
1379 params = params1 + params2
1380 if params:
1381 statement = self._preparer.prepare(full_query)
1382 else:
1383 # If there are no params then it is likely that query
1384 # has a bunch of literals rendered already, no point
1385 # trying to prepare it.
1386 statement = cassandra.query.SimpleStatement(full_query)
1387 yield (statement, params)
1388
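    # -----------------------------------------------------------------------
    # Worked example (editor addition, not part of the original source): with
    #
    #     where1 = [('"apdb_part" = ?', (1219,)), ('"apdb_part" = ?', (1220,))]
    #     where2 = [('"apdb_time_part" = ?', (600,))]
    #
    # the generator above yields two prepared statements of the form
    # ``<prefix> WHERE "apdb_part" = ? AND "apdb_time_part" = ?`` with
    # parameter tuples (1219, 600) and (1220, 600).  Pixel and partition
    # numbers are placeholders.
    # -----------------------------------------------------------------------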
1389 def _spatial_where(
1390 self, region: sphgeom.Region | None, use_ranges: bool = False
1391 ) -> list[tuple[str, tuple]]:
1392 """Generate expressions for spatial part of WHERE clause.
1393
1394 Parameters
1395 ----------
1396 region : `sphgeom.Region`
1397 Spatial region for query results.
1398 use_ranges : `bool`
1399 If True then use pixel ranges ("apdb_part >= p1 AND apdb_part <=
1400 p2") instead of exact list of pixels. Should be set to True for
1401 large regions covering very many pixels.
1402
1403 Returns
1404 -------
1405 expressions : `list` [ `tuple` ]
1406 Empty list is returned if ``region`` is `None`, otherwise a list
1407 of one or more (expression, parameters) tuples
1408 """
1409 if region is None:
1410 return []
1411 if use_ranges:
1412 pixel_ranges = self._pixelization.envelope(region)
1413 expressions: list[tuple[str, tuple]] = []
1414 for lower, upper in pixel_ranges:
1415 upper -= 1
1416 if lower == upper:
1417 expressions.append(('"apdb_part" = ?', (lower,)))
1418 else:
1419 expressions.append(('"apdb_part" >= ? AND "apdb_part" <= ?', (lower, upper)))
1420 return expressions
1421 else:
1422 pixels = self._pixelization.pixels(region)
1423 if self.config.query_per_spatial_part:
1424 return [('"apdb_part" = ?', (pixel,)) for pixel in pixels]
1425 else:
1426 pixels_str = ",".join([str(pix) for pix in pixels])
1427 return [(f'"apdb_part" IN ({pixels_str})', ())]
1428
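    # -----------------------------------------------------------------------
    # Worked example (editor addition, not part of the original source): for a
    # hypothetical region covering pixels 1219, 1220 and 1222, the method
    # above returns, with the default query_per_spatial_part=False,
    #
    #     [('"apdb_part" IN (1219,1220,1222)', ())]
    #
    # and with use_ranges=True (half-open envelope ranges [1219, 1221) and
    # [1222, 1223)),
    #
    #     [('"apdb_part" >= ? AND "apdb_part" <= ?', (1219, 1220)),
    #      ('"apdb_part" = ?', (1222,))]
    # -----------------------------------------------------------------------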
1429 def _temporal_where(
1430 self,
1431 table: ApdbTables,
1432 start_time: float | astropy.time.Time,
1433 end_time: float | astropy.time.Time,
1434 query_per_time_part: bool | None = None,
1435 ) -> tuple[list[str], list[tuple[str, tuple]]]:
1436 """Generate table names and expressions for temporal part of WHERE
1437 clauses.
1438
1439 Parameters
1440 ----------
1441 table : `ApdbTables`
1442 Table to select from.
1443 start_time : `astropy.time.Time` or `float`
1444 Starting Datetime or MJD value of the time range.
1445 end_time : `astropy.time.Time` or `float`
1446 Ending Datetime or MJD value of the time range.
1447 query_per_time_part : `bool`, optional
1448 If None then use ``query_per_time_part`` from configuration.
1449
1450 Returns
1451 -------
1452 tables : `list` [ `str` ]
1453 List of the table names to query.
1454 expressions : `list` [ `tuple` ]
1455 A list of zero or more (expression, parameters) tuples.
1456 """
1457 tables: list[str]
1458 temporal_where: list[tuple[str, tuple]] = []
1459 table_name = self._schema.tableName(table)
1460 time_part_start = self._time_partition(start_time)
1461 time_part_end = self._time_partition(end_time)
1462 time_parts = list(range(time_part_start, time_part_end + 1))
1463 if self.config.time_partition_tables:
1464 tables = [f"{table_name}_{part}" for part in time_parts]
1465 else:
1466 tables = [table_name]
1467 if query_per_time_part is None:
1468 query_per_time_part = self.config.query_per_time_part
1469 if query_per_time_part:
1470 temporal_where = [('"apdb_time_part" = ?', (time_part,)) for time_part in time_parts]
1471 else:
1472 time_part_list = ",".join([str(part) for part in time_parts])
1473 temporal_where = [(f'"apdb_time_part" IN ({time_part_list})', ())]
1474
1475 return tables, temporal_where