LSST Data Management Base Package
apdbCassandra.py
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22from __future__ import annotations
23
24__all__ = ["ApdbCassandra"]
25
26import datetime
27import logging
28import random
29import warnings
30from collections import defaultdict
31from collections.abc import Iterable, Iterator, Mapping, Set
32from typing import TYPE_CHECKING, Any, cast
33
34import numpy as np
35import pandas
36
37# If cassandra-driver is not installed the module can still be imported,
38# but ApdbCassandra cannot be instantiated.
39try:
40 import cassandra
41 import cassandra.query
42 from cassandra.query import UNSET_VALUE
43
44 CASSANDRA_IMPORTED = True
45except ImportError:
46 CASSANDRA_IMPORTED = False
47
48import astropy.time
49import felis.datamodel
50
51from lsst import sphgeom
52from lsst.utils.iteration import chunk_iterable
53
54from ..apdb import Apdb, ApdbConfig
55from ..apdbConfigFreezer import ApdbConfigFreezer
56from ..apdbReplica import ApdbTableData, ReplicaChunk
57from ..apdbSchema import ApdbSchema, ApdbTables
58from ..monitor import MonAgent
59from ..schema_model import Table
60from ..timer import Timer
61from ..versionTuple import IncompatibleVersionError, VersionTuple
62from .apdbCassandraAdmin import ApdbCassandraAdmin
63from .apdbCassandraReplica import ApdbCassandraReplica
64from .apdbCassandraSchema import ApdbCassandraSchema, CreateTableOptions, ExtraTables
65from .apdbMetadataCassandra import ApdbMetadataCassandra
66from .cassandra_utils import (
67 execute_concurrent,
68 literal,
69 quote_id,
70 select_concurrent,
71)
72from .config import ApdbCassandraConfig, ApdbCassandraConnectionConfig, ApdbCassandraTimePartitionRange
73from .connectionContext import ConnectionContext, DbVersions
74from .exceptions import CassandraMissingError
75from .partitioner import Partitioner
76from .sessionFactory import SessionContext, SessionFactory
77
78if TYPE_CHECKING:
79 from ..apdbMetadata import ApdbMetadata
80
81_LOG = logging.getLogger(__name__)
82
83_MON = MonAgent(__name__)
84
85VERSION = VersionTuple(1, 1, 0)
86"""Version for the code controlling non-replication tables. This needs to be
87updated following compatibility rules when schema produced by this code
88changes.
89"""
90
91
92class ApdbCassandra(Apdb):
93 """Implementation of APDB database with Apache Cassandra backend.
94
95 Parameters
96 ----------
97 config : `ApdbCassandraConfig`
98 Configuration object.
99 """
100
101 def __init__(self, config: ApdbCassandraConfig):
102 if not CASSANDRA_IMPORTED:
103 raise CassandraMissingError()
104
105 self._config = config
106 self._keyspace = config.keyspace
107 self._schema = ApdbSchema(config.schema_file, config.schema_name)
108
109 self._session_factory = SessionFactory(config)
110 self._connection_context: ConnectionContext | None = None
111
112 @property
113 def _context(self) -> ConnectionContext:
114 """Establish connection if not established and return context."""
115 if self._connection_context is None:
116 session = self._session_factory.session()
117 self._connection_context = ConnectionContext(session, self._config, self._schema.tableSchemas)
118
119 # Check version compatibility
120 current_versions = DbVersions(
121 schema_version=self._schema.schemaVersion(),
122 code_version=self.apdbImplementationVersion(),
123 replica_version=(
124 ApdbCassandraReplica.apdbReplicaImplementationVersion()
125 if self._connection_context.config.enable_replica
126 else None
127 ),
128 )
129 _LOG.debug("Current versions: %s", current_versions)
130 self._versionCheck(current_versions, self._connection_context.db_versions)
131
132 if _LOG.isEnabledFor(logging.DEBUG):
133 _LOG.debug("ApdbCassandra Configuration: %s", self._connection_context.config.model_dump())
134
135 return self._connection_context
136
137 def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
138 """Create `Timer` instance given its name."""
139 return Timer(name, _MON, tags=tags)
140
141 def _versionCheck(self, current_versions: DbVersions, db_versions: DbVersions) -> None:
142 """Check schema version compatibility."""
143 if not current_versions.schema_version.checkCompatibility(db_versions.schema_version):
144 raise IncompatibleVersionError(
145 f"Configured schema version {current_versions.schema_version} "
146 f"is not compatible with database version {db_versions.schema_version}"
147 )
148 if not current_versions.code_version.checkCompatibility(db_versions.code_version):
149 raise IncompatibleVersionError(
150 f"Current code version {current_versions.code_version} "
151 f"is not compatible with database version {db_versions.code_version}"
152 )
153
154 # Check replica code version only if replica is enabled.
155 match current_versions.replica_version, db_versions.replica_version:
156 case None, None:
157 pass
158 case VersionTuple() as current, VersionTuple() as stored:
159 if not current.checkCompatibility(stored):
160 raise IncompatibleVersionError(
161 f"Current replication code version {current} "
162 f"is not compatible with database version {stored}"
163 )
164 case _:
165 raise IncompatibleVersionError(
166 f"Current replication code version {current_versions.replica_version} "
167 f"is not compatible with database version {db_versions.replica_version}"
168 )
169
170 @classmethod
171 def apdbImplementationVersion(cls) -> VersionTuple:
172 """Return version number for current APDB implementation.
173
174 Returns
175 -------
176 version : `VersionTuple`
177 Version of the code defined in implementation class.
178 """
179 return VERSION
180
181 def getConfig(self) -> ApdbCassandraConfig:
182 # docstring is inherited from a base class
183 return self._context.config
184
185 def tableDef(self, table: ApdbTables) -> Table | None:
186 # docstring is inherited from a base class
187 return self._schema.tableSchemas.get(table)
188
189 @classmethod
190 def init_database(
191 cls,
192 hosts: tuple[str, ...],
193 keyspace: str,
194 *,
195 schema_file: str | None = None,
196 schema_name: str | None = None,
197 read_sources_months: int | None = None,
198 read_forced_sources_months: int | None = None,
199 enable_replica: bool = False,
200 replica_skips_diaobjects: bool = False,
201 port: int | None = None,
202 username: str | None = None,
203 prefix: str | None = None,
204 part_pixelization: str | None = None,
205 part_pix_level: int | None = None,
206 time_partition_tables: bool = True,
207 time_partition_start: str | None = None,
208 time_partition_end: str | None = None,
209 read_consistency: str | None = None,
210 write_consistency: str | None = None,
211 read_timeout: int | None = None,
212 write_timeout: int | None = None,
213 ra_dec_columns: tuple[str, str] | None = None,
214 replication_factor: int | None = None,
215 drop: bool = False,
216 table_options: CreateTableOptions | None = None,
217 ) -> ApdbCassandraConfig:
218 """Initialize new APDB instance and make configuration object for it.
219
220 Parameters
221 ----------
222 hosts : `tuple` [`str`, ...]
223 List of host names or IP addresses for Cassandra cluster.
224 keyspace : `str`
225 Name of the keyspace for APDB tables.
226 schema_file : `str`, optional
227 Location of (YAML) configuration file with APDB schema. If not
228 specified then default location will be used.
229 schema_name : `str`, optional
230 Name of the schema in YAML configuration file. If not specified
231 then default name will be used.
232 read_sources_months : `int`, optional
233 Number of months of history to read from DiaSource.
234 read_forced_sources_months : `int`, optional
235 Number of months of history to read from DiaForcedSource.
236 enable_replica : `bool`, optional
237 If True, make additional tables used for replication to PPDB.
238 replica_skips_diaobjects : `bool`, optional
239 If `True` then do not fill regular ``DiaObject`` table when
240 ``enable_replica`` is `True`.
241 port : `int`, optional
242 Port number to use for Cassandra connections.
243 username : `str`, optional
244 User name for Cassandra connections.
245 prefix : `str`, optional
246 Optional prefix for all table names.
247 part_pixelization : `str`, optional
248 Name of the MOC pixelization used for partitioning.
249 part_pix_level : `int`, optional
250 Pixelization level.
251 time_partition_tables : `bool`, optional
252 Create per-partition tables.
253 time_partition_start : `str`, optional
254 Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss
255 format, in TAI.
256 time_partition_end : `str`, optional
257 Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss
258 format, in TAI.
259 read_consistency : `str`, optional
260 Name of the consistency level for read operations.
261 write_consistency : `str`, optional
262 Name of the consistency level for write operations.
263 read_timeout : `int`, optional
264 Read timeout in seconds.
265 write_timeout : `int`, optional
266 Write timeout in seconds.
267 ra_dec_columns : `tuple` [`str`, `str`], optional
268 Names of ra/dec columns in DiaObject table.
269 replication_factor : `int`, optional
270 Replication factor used when creating a new keyspace; if the keyspace
271 already exists, its replication factor is not changed.
272 drop : `bool`, optional
273 If `True` then drop existing tables before re-creating the schema.
274 table_options : `CreateTableOptions`, optional
275 Options used when creating Cassandra tables.
276
277 Returns
278 -------
279 config : `ApdbCassandraConfig`
280 Resulting configuration object for a created APDB instance.
281 """
282 # Some non-standard defaults for connection parameters; these can be
283 # changed later in the generated config. Check the Cassandra driver
284 # documentation for what these parameters do. These parameters are not
285 # used during database initialization, but they will be saved with the
286 # generated config.
287 connection_config = ApdbCassandraConnectionConfig(
288 extra_parameters={
289 "idle_heartbeat_interval": 0,
290 "idle_heartbeat_timeout": 30,
291 "control_connection_timeout": 100,
292 },
293 )
294 config = ApdbCassandraConfig(
295 contact_points=hosts,
296 keyspace=keyspace,
297 enable_replica=enable_replica,
298 replica_skips_diaobjects=replica_skips_diaobjects,
299 connection_config=connection_config,
300 )
301 config.partitioning.time_partition_tables = time_partition_tables
302 if schema_file is not None:
303 config.schema_file = schema_file
304 if schema_name is not None:
305 config.schema_name = schema_name
306 if read_sources_months is not None:
307 config.read_sources_months = read_sources_months
308 if read_forced_sources_months is not None:
309 config.read_forced_sources_months = read_forced_sources_months
310 if port is not None:
311 config.connection_config.port = port
312 if username is not None:
313 config.connection_config.username = username
314 if prefix is not None:
315 config.prefix = prefix
316 if part_pixelization is not None:
317 config.partitioning.part_pixelization = part_pixelization
318 if part_pix_level is not None:
319 config.partitioning.part_pix_level = part_pix_level
320 if time_partition_start is not None:
321 config.partitioning.time_partition_start = time_partition_start
322 if time_partition_end is not None:
323 config.partitioning.time_partition_end = time_partition_end
324 if read_consistency is not None:
325 config.connection_config.read_consistency = read_consistency
326 if write_consistency is not None:
327 config.connection_config.write_consistency = write_consistency
328 if read_timeout is not None:
329 config.connection_config.read_timeout = read_timeout
330 if write_timeout is not None:
331 config.connection_config.write_timeout = write_timeout
332 if ra_dec_columns is not None:
333 config.ra_dec_columns = ra_dec_columns
334
335 cls._makeSchema(config, drop=drop, replication_factor=replication_factor, table_options=table_options)
336
337 return config
338
339 def get_replica(self) -> ApdbCassandraReplica:
340 """Return `ApdbReplica` instance for this database."""
341 # Note that this instance has to stay alive while replica exists, so
342 # we pass reference to self.
343 return ApdbCassandraReplica(self)
344
345 @classmethod
346 def _makeSchema(
347 cls,
348 config: ApdbConfig,
349 *,
350 drop: bool = False,
351 replication_factor: int | None = None,
352 table_options: CreateTableOptions | None = None,
353 ) -> None:
354 # docstring is inherited from a base class
355
356 if not isinstance(config, ApdbCassandraConfig):
357 raise TypeError(f"Unexpected type of configuration object: {type(config)}")
358
359 simple_schema = ApdbSchema(config.schema_file, config.schema_name)
360
361 with SessionContext(config) as session:
362 schema = ApdbCassandraSchema(
363 session=session,
364 keyspace=config.keyspace,
365 table_schemas=simple_schema.tableSchemas,
366 prefix=config.prefix,
367 time_partition_tables=config.partitioning.time_partition_tables,
368 enable_replica=config.enable_replica,
369 replica_skips_diaobjects=config.replica_skips_diaobjects,
370 )
371
372 # Ask schema to create all tables.
373 part_range_config: ApdbCassandraTimePartitionRange | None = None
374 if config.partitioning.time_partition_tables:
375 partitioner = Partitioner(config)
376 time_partition_start = astropy.time.Time(
377 config.partitioning.time_partition_start, format="isot", scale="tai"
378 )
379 time_partition_end = astropy.time.Time(
380 config.partitioning.time_partition_end, format="isot", scale="tai"
381 )
382 part_range_config = ApdbCassandraTimePartitionRange(
383 start=partitioner.time_partition(time_partition_start),
384 end=partitioner.time_partition(time_partition_end),
385 )
386 schema.makeSchema(
387 drop=drop,
388 part_range=part_range_config,
389 replication_factor=replication_factor,
390 table_options=table_options,
391 )
392 else:
393 schema.makeSchema(
394 drop=drop, replication_factor=replication_factor, table_options=table_options
395 )
396
397 meta_table_name = ApdbTables.metadata.table_name(config.prefix)
398 metadata = ApdbMetadataCassandra(
399 session, meta_table_name, config.keyspace, "read_tuples", "write"
400 )
401
402 # Fill version numbers, overriding them if they existed before.
403 metadata.set(
404 ConnectionContext.metadataSchemaVersionKey, str(simple_schema.schemaVersion()), force=True
405 )
406 metadata.set(
407 ConnectionContext.metadataCodeVersionKey, str(cls.apdbImplementationVersion()), force=True
408 )
409
410 if config.enable_replica:
411 # Only store replica code version if replica is enabled.
412 metadata.set(
413 ConnectionContext.metadataReplicaVersionKey,
414 str(ApdbCassandraReplica.apdbReplicaImplementationVersion()),
415 force=True,
416 )
417
418 # Store frozen part of a configuration in metadata.
419 freezer = ApdbConfigFreezer[ApdbCassandraConfig](ConnectionContext.frozen_parameters)
420 metadata.set(ConnectionContext.metadataConfigKey, freezer.to_json(config), force=True)
421
422 # Store time partition range.
423 if part_range_config:
424 part_range_config.save_to_meta(metadata)
425
426 def getDiaObjects(self, region: sphgeom.Region) -> pandas.DataFrame:
427 # docstring is inherited from a base class
428 context = self._context
429 config = context.config
430
431 sp_where, num_sp_part = context.partitioner.spatial_where(region, for_prepare=True)
432 _LOG.debug("getDiaObjects: #partitions: %s", len(sp_where))
433
434 # We need to exclude extra partitioning columns from result.
435 column_names = context.schema.apdbColumnNames(ApdbTables.DiaObjectLast)
436 what = ",".join(quote_id(column) for column in column_names)
437
438 table_name = context.schema.tableName(ApdbTables.DiaObjectLast)
439 query = f'SELECT {what} from "{self._keyspace}"."{table_name}"'
440 statements: list[tuple] = []
441 for where, params in sp_where:
442 full_query = f"{query} WHERE {where}"
443 if params:
444 statement = context.preparer.prepare(full_query)
445 else:
446 # If there are no params then it is likely that query has a
447 # bunch of literals rendered already, no point trying to
448 # prepare it because it's not reusable.
449 statement = cassandra.query.SimpleStatement(full_query)
450 statements.append((statement, params))
451 _LOG.debug("getDiaObjects: #queries: %s", len(statements))
452
453 with _MON.context_tags({"table": "DiaObject"}):
454 _MON.add_record(
455 "select_query_stats", values={"num_sp_part": num_sp_part, "num_queries": len(statements)}
456 )
457 with self._timer("select_time") as timer:
458 objects = cast(
459 pandas.DataFrame,
460 select_concurrent(
461 context.session,
462 statements,
463 "read_pandas_multi",
464 config.connection_config.read_concurrency,
465 ),
466 )
467 timer.add_values(row_count=len(objects))
468
469 _LOG.debug("found %s DiaObjects", objects.shape[0])
470 return objects
471
472 def getDiaSources(
473 self, region: sphgeom.Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
474 ) -> pandas.DataFrame | None:
475 # docstring is inherited from a base class
476 context = self._context
477 config = context.config
478
479 months = config.read_sources_months
480 if months == 0:
481 return None
482 mjd_end = float(visit_time.mjd)
483 mjd_start = mjd_end - months * 30
484
485 return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaSource)
486
487 def getDiaForcedSources(
488 self, region: sphgeom.Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
489 ) -> pandas.DataFrame | None:
490 # docstring is inherited from a base class
491 context = self._context
492 config = context.config
493
494 months = config.read_forced_sources_months
495 if months == 0:
496 return None
497 mjd_end = float(visit_time.mjd)
498 mjd_start = mjd_end - months * 30
499
500 return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaForcedSource)
501
502 def containsVisitDetector(
503 self,
504 visit: int,
505 detector: int,
506 region: sphgeom.Region,
507 visit_time: astropy.time.Time,
508 ) -> bool:
509 # docstring is inherited from a base class
510 context = self._context
511 config = context.config
512
513 # If the ApdbVisitDetector table exists, just check it.
514 if context.has_visit_detector_table:
515 table_name = context.schema.tableName(ExtraTables.ApdbVisitDetector)
516 query = (
517 f'SELECT count(*) FROM "{self._keyspace}"."{table_name}" WHERE visit = %s AND detector = %s'
518 )
519 with self._timer("contains_visit_detector_time"):
520 result = context.session.execute(query, (visit, detector))
521 return bool(result.one()[0])
522
523 # The order of checks corresponds to the order in store(); on a potential
524 # store failure, earlier tables have a higher probability of containing
525 # stored records. With per-partition tables there will be many tables
526 # in the list, but it is unlikely that we'll use that setup in
527 # production.
528 sp_where, _ = context.partitioner.spatial_where(region, use_ranges=True, for_prepare=True)
529 visit_detector_where = ("visit = ? AND detector = ?", (visit, detector))
530
531 # Sources are partitioned on their midPointMjdTai. To avoid precision
532 # issues add some fuzziness to the visit time.
533 mjd_start = float(visit_time.mjd) - 1.0 / 24
534 mjd_end = float(visit_time.mjd) + 1.0 / 24
535
536 statements: list[tuple] = []
537 for table_type in ApdbTables.DiaSource, ApdbTables.DiaForcedSource:
538 tables, temporal_where = context.partitioner.temporal_where(
539 table_type, mjd_start, mjd_end, query_per_time_part=True, for_prepare=True
540 )
541 for table in tables:
542 prefix = f'SELECT apdb_part FROM "{self._keyspace}"."{table}"'
543 # Needs ALLOW FILTERING as there is no PK constraint.
544 suffix = "PER PARTITION LIMIT 1 LIMIT 1 ALLOW FILTERING"
545 statements += list(
546 self._combine_where(prefix, sp_where, temporal_where, visit_detector_where, suffix)
547 )
548
549 with self._timer("contains_visit_detector_time"):
550 result = cast(
551 list[tuple[int] | None],
552 select_concurrent(
553 context.session,
554 statements,
555 "read_tuples",
556 config.connection_config.read_concurrency,
557 ),
558 )
559 return bool(result)
560
561 def getSSObjects(self) -> pandas.DataFrame:
562 # docstring is inherited from a base class
563 context = self._context
564
565 tableName = context.schema.tableName(ApdbTables.SSObject)
566 query = f'SELECT * from "{self._keyspace}"."{tableName}"'
567
568 objects = None
569 with self._timer("select_time", tags={"table": "SSObject"}) as timer:
570 result = context.session.execute(query, execution_profile="read_pandas")
571 objects = result._current_rows
572 timer.add_values(row_count=len(objects))
573
574 _LOG.debug("found %s SSObjects", objects.shape[0])
575 return objects
576
577 def store(
578 self,
579 visit_time: astropy.time.Time,
580 objects: pandas.DataFrame,
581 sources: pandas.DataFrame | None = None,
582 forced_sources: pandas.DataFrame | None = None,
583 ) -> None:
584 # docstring is inherited from a base class
585 context = self._context
586 config = context.config
587
588 if context.has_visit_detector_table:
589 # Store visit/detector in a special table; this has to be done
590 # before all other writes so that if there is a failure at any point
591 # later we still have a record of the attempted write.
592 visit_detector: set[tuple[int, int]] = set()
593 for df in sources, forced_sources:
594 if df is not None and not df.empty:
595 df = df[["visit", "detector"]]
596 for visit, detector in df.itertuples(index=False):
597 visit_detector.add((visit, detector))
598
599 if visit_detector:
600 # Typically there is only one entry, do not bother with
601 # concurrency.
602 table_name = context.schema.tableName(ExtraTables.ApdbVisitDetector)
603 query = f'INSERT INTO "{self._keyspace}"."{table_name}" (visit, detector) VALUES (%s, %s)'
604 for item in visit_detector:
605 context.session.execute(query, item, execution_profile="write")
606
607 objects = self._fix_input_timestamps(objects)
608 if sources is not None:
609 sources = self._fix_input_timestamps(sources)
610 if forced_sources is not None:
611 forced_sources = self._fix_input_timestamps(forced_sources)
612
613 replica_chunk: ReplicaChunk | None = None
614 if context.schema.replication_enabled:
615 replica_chunk = ReplicaChunk.make_replica_chunk(visit_time, config.replica_chunk_seconds)
616 self._storeReplicaChunk(replica_chunk, visit_time)
617
618 # fill region partition column for DiaObjects
619 objects = self._add_apdb_part(objects)
620 self._storeDiaObjects(objects, visit_time, replica_chunk)
621
622 if sources is not None and len(sources) > 0:
623 # copy apdb_part column from DiaObjects to DiaSources
624 sources = self._add_apdb_part(sources)
625 subchunk = self._storeDiaSources(ApdbTables.DiaSource, sources, replica_chunk)
626 self._storeDiaSourcesPartitions(sources, visit_time, replica_chunk, subchunk)
627
628 if forced_sources is not None and len(forced_sources) > 0:
629 forced_sources = self._add_apdb_part(forced_sources)
630 self._storeDiaSources(ApdbTables.DiaForcedSource, forced_sources, replica_chunk)
631
632 def storeSSObjects(self, objects: pandas.DataFrame) -> None:
633 # docstring is inherited from a base class
634 objects = self._fix_input_timestamps(objects)
635 self._storeObjectsPandas(objects, ApdbTables.SSObject)
636
637 def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
638 # docstring is inherited from a base class
639 context = self._context
640 config = context.config
641
642 if self._schema.has_mjd_timestamps:
643 reassign_time_column = "ssObjectReassocTimeMjdTai"
644 reassignTime = astropy.time.Time.now().tai.mjd
645 else:
646 reassign_time_column = "ssObjectReassocTime"
647 # Current time as milliseconds since epoch.
648 reassignTime = int(datetime.datetime.now(tz=datetime.UTC).timestamp() * 1000)
649
650 # To update a record we need to know its exact primary key (including
651 # partition key) so we start by querying for diaSourceId to find the
652 # primary keys.
653
654 table_name = context.schema.tableName(ExtraTables.DiaSourceToPartition)
655 # split it into 1k IDs per query
656 selects: list[tuple] = []
657 for ids in chunk_iterable(idMap.keys(), 1_000):
658 ids_str = ",".join(str(item) for item in ids)
659 selects.append(
660 (
661 (
662 'SELECT "diaSourceId", "apdb_part", "apdb_time_part", "apdb_replica_chunk" '
663 f'FROM "{self._keyspace}"."{table_name}" WHERE "diaSourceId" IN ({ids_str})'
664 ),
665 {},
666 )
667 )
668
669 # No need for DataFrame here, read data as tuples.
670 result = cast(
671 list[tuple[int, int, int, int | None]],
672 select_concurrent(
673 context.session, selects, "read_tuples", config.connection_config.read_concurrency
674 ),
675 )
676
677 # Make mapping from source ID to its partition.
678 id2partitions: dict[int, tuple[int, int]] = {}
679 id2chunk_id: dict[int, int] = {}
680 for row in result:
681 id2partitions[row[0]] = row[1:3]
682 if row[3] is not None:
683 id2chunk_id[row[0]] = row[3]
684
685 # make sure we know partitions for each ID
686 if set(id2partitions) != set(idMap):
687 missing = ",".join(str(item) for item in set(idMap) - set(id2partitions))
688 raise ValueError(f"Following DiaSource IDs do not exist in the database: {missing}")
689
690 # Reassign in standard tables
691 queries: list[tuple[cassandra.query.PreparedStatement, tuple]] = []
692 for diaSourceId, ssObjectId in idMap.items():
693 apdb_part, apdb_time_part = id2partitions[diaSourceId]
694 values: tuple
695 if config.partitioning.time_partition_tables:
696 table_name = context.schema.tableName(ApdbTables.DiaSource, apdb_time_part)
697 query = (
698 f'UPDATE "{self._keyspace}"."{table_name}"'
699 f' SET "ssObjectId" = ?, "diaObjectId" = NULL, "{reassign_time_column}" = ?'
700 ' WHERE "apdb_part" = ? AND "diaSourceId" = ?'
701 )
702 values = (ssObjectId, reassignTime, apdb_part, diaSourceId)
703 else:
704 table_name = context.schema.tableName(ApdbTables.DiaSource)
705 query = (
706 f'UPDATE "{self._keyspace}"."{table_name}"'
707 f' SET "ssObjectId" = ?, "diaObjectId" = NULL, "{reassign_time_column}" = ?'
708 ' WHERE "apdb_part" = ? AND "apdb_time_part" = ? AND "diaSourceId" = ?'
709 )
710 values = (ssObjectId, reassignTime, apdb_part, apdb_time_part, diaSourceId)
711 queries.append((context.preparer.prepare(query), values))
712
713 # TODO: (DM-50190) Replication for updated records is not implemented.
714 if id2chunk_id:
715 warnings.warn("Replication of reassigned DiaSource records is not implemented.", stacklevel=2)
716
717 _LOG.debug("%s: will update %d records", table_name, len(idMap))
718 with self._timer("source_reassign_time") as timer:
719 execute_concurrent(context.session, queries, execution_profile="write")
720 timer.add_values(source_count=len(idMap))
721
722 def dailyJob(self) -> None:
723 # docstring is inherited from a base class
724 pass
725
726 def countUnassociatedObjects(self) -> int:
727 # docstring is inherited from a base class
728
729 # It's too inefficient to implement it for Cassandra in the current schema.
730 raise NotImplementedError()
731
732 @property
733 def metadata(self) -> ApdbMetadata:
734 # docstring is inherited from a base class
735 context = self._context
736 return context.metadata
737
738 @property
739 def admin(self) -> ApdbCassandraAdmin:
740 # docstring is inherited from a base class
741 return ApdbCassandraAdmin(self)
742
743 def _getSources(
744 self,
745 region: sphgeom.Region,
746 object_ids: Iterable[int] | None,
747 mjd_start: float,
748 mjd_end: float,
749 table_name: ApdbTables,
750 ) -> pandas.DataFrame:
751 """Return catalog of DiaSource instances given set of DiaObject IDs.
752
753 Parameters
754 ----------
755 region : `lsst.sphgeom.Region`
756 Spherical region.
757 object_ids : `~collections.abc.Iterable` [`int`] or `None`
758 Collection of DiaObject IDs.
759 mjd_start : `float`
760 Lower bound of time interval.
761 mjd_end : `float`
762 Upper bound of time interval.
763 table_name : `ApdbTables`
764 Name of the table.
765
766 Returns
767 -------
768 catalog : `pandas.DataFrame`
769 Catalog containing DiaSource records. An empty catalog is returned if
770 ``object_ids`` is empty.
771 """
772 context = self._context
773 config = context.config
774
775 object_id_set: Set[int] = set()
776 if object_ids is not None:
777 object_id_set = set(object_ids)
778 if len(object_id_set) == 0:
779 return self._make_empty_catalog(table_name)
780
781 sp_where, num_sp_part = context.partitioner.spatial_where(region, for_prepare=True)
782 tables, temporal_where = context.partitioner.temporal_where(
783 table_name, mjd_start, mjd_end, for_prepare=True, partitons_range=context.time_partitions_range
784 )
785 if not tables:
786 start = astropy.time.Time(mjd_start, format="mjd", scale="tai")
787 end = astropy.time.Time(mjd_end, format="mjd", scale="tai")
788 warnings.warn(
789 f"Query time range ({start.isot} - {end.isot}) does not overlap database time partitions."
790 )
791
792 # We need to exclude extra partitioning columns from result.
793 column_names = context.schema.apdbColumnNames(table_name)
794 what = ",".join(quote_id(column) for column in column_names)
795
796 # Build all queries
797 statements: list[tuple] = []
798 for table in tables:
799 prefix = f'SELECT {what} from "{self._keyspace}"."{table}"'
800 statements += list(self._combine_where(prefix, sp_where, temporal_where))
801 _LOG.debug("_getSources %s: #queries: %s", table_name, len(statements))
802
803 with _MON.context_tags({"table": table_name.name}):
804 _MON.add_record(
805 "select_query_stats", values={"num_sp_part": num_sp_part, "num_queries": len(statements)}
806 )
807 with self._timer("select_time") as timer:
808 catalog = cast(
809 pandas.DataFrame,
810 select_concurrent(
811 context.session,
812 statements,
813 "read_pandas_multi",
814 config.connection_config.read_concurrency,
815 ),
816 )
817 timer.add_values(row_count_from_db=len(catalog))
818
819 # filter by given object IDs
820 if len(object_id_set) > 0:
821 catalog = cast(pandas.DataFrame, catalog[catalog["diaObjectId"].isin(object_id_set)])
822
823 # precise filtering on midpointMjdTai
824 catalog = cast(pandas.DataFrame, catalog[catalog["midpointMjdTai"] > mjd_start])
825
826 timer.add_values(row_count=len(catalog))
827
828 _LOG.debug("found %d %ss", catalog.shape[0], table_name.name)
829 return catalog
830
831 def _storeReplicaChunk(self, replica_chunk: ReplicaChunk, visit_time: astropy.time.Time) -> None:
832 context = self._context
833 config = context.config
834
835 # Cassandra timestamp uses milliseconds since epoch
836 timestamp = int(replica_chunk.last_update_time.unix_tai * 1000)
837
838 # everything goes into a single partition
839 partition = 0
840
841 table_name = context.schema.tableName(ExtraTables.ApdbReplicaChunks)
842
843 columns = ["partition", "apdb_replica_chunk", "last_update_time", "unique_id"]
844 values = [partition, replica_chunk.id, timestamp, replica_chunk.unique_id]
845 if context.has_chunk_sub_partitions:
846 columns.append("has_subchunks")
847 values.append(True)
848
849 column_list = ", ".join(columns)
850 placeholders = ",".join(["%s"] * len(columns))
851 query = f'INSERT INTO "{self._keyspace}"."{table_name}" ({column_list}) VALUES ({placeholders})'
852
853 context.session.execute(
854 query,
855 values,
856 timeout=config.connection_config.write_timeout,
857 execution_profile="write",
858 )
859
860 def _queryDiaObjectLastPartitions(self, ids: Iterable[int]) -> Mapping[int, int]:
861 """Return existing mapping of diaObjectId to its last partition."""
862 context = self._context
863 config = context.config
864
865 table_name = context.schema.tableName(ExtraTables.DiaObjectLastToPartition)
866 queries = []
867 object_count = 0
868 for id_chunk in chunk_iterable(ids, 10_000):
869 id_chunk_list = list(id_chunk)
870 query = (
871 f'SELECT "diaObjectId", apdb_part FROM "{self._keyspace}"."{table_name}" '
872 f'WHERE "diaObjectId" in ({",".join(str(oid) for oid in id_chunk_list)})'
873 )
874 queries.append((query, ()))
875 object_count += len(id_chunk_list)
876
877 with self._timer("query_object_last_partitions") as timer:
878 data = cast(
879 ApdbTableData,
880 select_concurrent(
881 context.session,
882 queries,
883 "read_raw_multi",
884 config.connection_config.read_concurrency,
885 ),
886 )
887 timer.add_values(object_count=object_count, row_count=len(data.rows()))
888
889 if data.column_names() != ["diaObjectId", "apdb_part"]:
890 raise RuntimeError(f"Unexpected column names in query result: {data.column_names()}")
891
892 return {row[0]: row[1] for row in data.rows()}
893
894 def _deleteMovingObjects(self, objs: pandas.DataFrame) -> None:
895 """Objects in DiaObjectLast can move from one spatial partition to
896 another. For those objects, inserting a new version does not replace the
897 old one, so we need to explicitly remove old versions before inserting new
898 ones.
899 """
900 context = self._context
901
902 # Extract all object IDs.
903 new_partitions = dict(zip(objs["diaObjectId"], objs["apdb_part"]))
904 old_partitions = self._queryDiaObjectLastPartitions(objs["diaObjectId"])
905
906 moved_oids: dict[int, tuple[int, int]] = {}
907 for oid, old_part in old_partitions.items():
908 new_part = new_partitions.get(oid, old_part)
909 if new_part != old_part:
910 moved_oids[oid] = (old_part, new_part)
911 _LOG.debug("DiaObject IDs that moved to new partition: %s", moved_oids)
912
913 if moved_oids:
914 # Delete old records from DiaObjectLast.
915 table_name = context.schema.tableName(ApdbTables.DiaObjectLast)
916 query = f'DELETE FROM "{self._keyspace}"."{table_name}" WHERE apdb_part = ? AND "diaObjectId" = ?'
917 statement = context.preparer.prepare(query)
918 queries = []
919 for oid, (old_part, _) in moved_oids.items():
920 queries.append((statement, (old_part, oid)))
921 with self._timer("delete_object_last") as timer:
922 execute_concurrent(context.session, queries, execution_profile="write")
923 timer.add_values(row_count=len(moved_oids))
924
925 # Add all new records to the map.
926 table_name = context.schema.tableName(ExtraTables.DiaObjectLastToPartition)
927 query = f'INSERT INTO "{self._keyspace}"."{table_name}" ("diaObjectId", apdb_part) VALUES (?,?)'
928 statement = context.preparer.prepare(query)
929
930 queries = []
931 for oid, new_part in new_partitions.items():
932 queries.append((statement, (oid, new_part)))
933
934 with self._timer("update_object_last_partition") as timer:
935 execute_concurrent(context.session, queries, execution_profile="write")
936 timer.add_values(row_count=len(queries))
937
938 def _storeDiaObjects(
939 self, objs: pandas.DataFrame, visit_time: astropy.time.Time, replica_chunk: ReplicaChunk | None
940 ) -> None:
941 """Store catalog of DiaObjects from current visit.
942
943 Parameters
944 ----------
945 objs : `pandas.DataFrame`
946 Catalog with DiaObject records
947 visit_time : `astropy.time.Time`
948 Time of the current visit.
949 replica_chunk : `ReplicaChunk` or `None`
950 Replica chunk identifier if replication is configured.
951 """
952 if len(objs) == 0:
953 _LOG.debug("No objects to write to database.")
954 return
955
956 context = self._context
957 config = context.config
958
959 if context.has_dia_object_last_to_partition:
960 self._deleteMovingObjects(objs)
961
962 if self._schema.has_mjd_timestamps:
963 validity_start_column = "validityStartMjdTai"
964 timestamp = visit_time.tai.mjd
965 else:
966 validity_start_column = "validityStart"
967 timestamp = visit_time.datetime
968
969 self._storeObjectsPandas(objs, ApdbTables.DiaObjectLast)
970
971 extra_columns = {validity_start_column: timestamp}
972 visit_time_part = context.partitioner.time_partition(visit_time)
973 time_part: int | None = visit_time_part
974 if (time_partitions_range := context.time_partitions_range) is not None:
975 self._check_time_partitions([visit_time_part], time_partitions_range)
976 if not config.partitioning.time_partition_tables:
977 extra_columns["apdb_time_part"] = time_part
978 time_part = None
979
980 # Only store DiaObjects if not doing replication or explicitly
981 # configured to always store them.
982 if replica_chunk is None or not config.replica_skips_diaobjects:
983 self._storeObjectsPandas(
984 objs, ApdbTables.DiaObject, extra_columns=extra_columns, time_part=time_part
985 )
986
987 if replica_chunk is not None:
988 extra_columns = {"apdb_replica_chunk": replica_chunk.id, validity_start_column: timestamp}
989 table = ExtraTables.DiaObjectChunks
990 if context.has_chunk_sub_partitions:
991 table = ExtraTables.DiaObjectChunks2
992 # Use a random number for a second part of partitioning key so
993 # that different clients can write to different partitions.
994 # This makes it not exactly reproducible.
995 extra_columns["apdb_replica_subchunk"] = random.randrange(config.replica_sub_chunk_count)
996 self._storeObjectsPandas(objs, table, extra_columns=extra_columns)
997
998 def _storeDiaSources(
999 self,
1000 table_name: ApdbTables,
1001 sources: pandas.DataFrame,
1002 replica_chunk: ReplicaChunk | None,
1003 ) -> int | None:
1004 """Store catalog of DIASources or DIAForcedSources from current visit.
1005
1006 Parameters
1007 ----------
1008 table_name : `ApdbTables`
1009 Table where to store the data.
1010 sources : `pandas.DataFrame`
1011 Catalog containing DiaSource records
1012 visit_time : `astropy.time.Time`
1013 Time of the current visit.
1014 replica_chunk : `ReplicaChunk` or `None`
1015 Replica chunk identifier if replication is configured.
1016
1017 Returns
1018 -------
1019 subchunk : `int` or `None`
1020 Subchunk number for the resulting replica data, `None` if replication
1021 or subchunking is not enabled.
1022 """
1023 context = self._context
1024 config = context.config
1025
1026 # Time partitioning has to be based on midpointMjdTai, not visit_time
1027 # as visit_time is not really a visit time.
1028 tp_sources = sources.copy(deep=False)
1029 tp_sources["apdb_time_part"] = tp_sources["midpointMjdTai"].apply(context.partitioner.time_partition)
1030 if (time_partitions_range := context.time_partitions_range) is not None:
1031 self._check_time_partitions(tp_sources["apdb_time_part"], time_partitions_range)
1032 extra_columns: dict[str, Any] = {}
1033 if not config.partitioning.time_partition_tables:
1034 self._storeObjectsPandas(tp_sources, table_name)
1035 else:
1036 # Group by time partition
1037 partitions = set(tp_sources["apdb_time_part"])
1038 if len(partitions) == 1:
1039 # Single partition - just save the whole thing.
1040 time_part = partitions.pop()
1041 self._storeObjectsPandas(sources, table_name, time_part=time_part)
1042 else:
1043 # group by time partition.
1044 for time_part, sub_frame in tp_sources.groupby(by="apdb_time_part"):
1045 sub_frame.drop(columns="apdb_time_part", inplace=True)
1046 self._storeObjectsPandas(sub_frame, table_name, time_part=time_part)
1047
1048 subchunk: int | None = None
1049 if replica_chunk is not None:
1050 extra_columns = {"apdb_replica_chunk": replica_chunk.id}
1051 if context.has_chunk_sub_partitions:
1052 subchunk = random.randrange(config.replica_sub_chunk_count)
1053 extra_columns["apdb_replica_subchunk"] = subchunk
1054 if table_name is ApdbTables.DiaSource:
1055 extra_table = ExtraTables.DiaSourceChunks2
1056 else:
1057 extra_table = ExtraTables.DiaForcedSourceChunks2
1058 else:
1059 if table_name is ApdbTables.DiaSource:
1060 extra_table = ExtraTables.DiaSourceChunks
1061 else:
1062 extra_table = ExtraTables.DiaForcedSourceChunks
1063 self._storeObjectsPandas(sources, extra_table, extra_columns=extra_columns)
1064
1065 return subchunk
1066
1067 def _check_time_partitions(
1068 self, partitions: Iterable[int], time_partitions_range: ApdbCassandraTimePartitionRange
1069 ) -> None:
1070 """Check that time partitions for new data actually exist.
1071
1072 Parameters
1073 ----------
1074 partitions : `~collections.abc.Iterable` [`int`]
1075 Time partitions for new data.
1076 time_partitions_range : `ApdbCassandraTimePartitionRange`
1077 Current time partition range.
1078 """
1079 partitions = set(partitions)
1080 min_part = min(partitions)
1081 max_part = max(partitions)
1082 if min_part < time_partitions_range.start or max_part > time_partitions_range.end:
1083 raise ValueError(
1084 "Attempt to store data for time partitions that do not yet exist. "
1085 f"Partitions for new records: {min_part}-{max_part}. "
1086 f"Database partitions: {time_partitions_range.start}-{time_partitions_range.end}."
1087 )
1088 # Warn when writing to the last partition.
1089 if max_part == time_partitions_range.end:
1090 warnings.warn(
1091 "Writing into the last temporal partition. Partition range needs to be extended soon.",
1092 stacklevel=3,
1093 )
1094
1095 def _storeDiaSourcesPartitions(
1096 self,
1097 sources: pandas.DataFrame,
1098 visit_time: astropy.time.Time,
1099 replica_chunk: ReplicaChunk | None,
1100 subchunk: int | None,
1101 ) -> None:
1102 """Store mapping of diaSourceId to its partitioning values.
1103
1104 Parameters
1105 ----------
1106 sources : `pandas.DataFrame`
1107 Catalog containing DiaSource records
1108 visit_time : `astropy.time.Time`
1109 Time of the current visit.
1110 replica_chunk : `ReplicaChunk` or `None`
1111 Replication chunk, or `None` when replication is disabled.
1112 subchunk : `int` or `None`
1113 Replication sub-chunk, or `None` when replication is disabled or
1114 sub-chunking is not used.
1115 """
1116 context = self._context
1117
1118 id_map = cast(pandas.DataFrame, sources[["diaSourceId", "apdb_part"]])
1119 extra_columns = {
1120 "apdb_time_part": context.partitioner.time_partition(visit_time),
1121 "apdb_replica_chunk": replica_chunk.id if replica_chunk is not None else None,
1122 }
1123 if context.has_chunk_sub_partitions:
1124 extra_columns["apdb_replica_subchunk"] = subchunk
1125
1126 self._storeObjectsPandas(
1127 id_map, ExtraTables.DiaSourceToPartition, extra_columns=extra_columns, time_part=None
1128 )
1129
1130 def _storeObjectsPandas(
1131 self,
1132 records: pandas.DataFrame,
1133 table_name: ApdbTables | ExtraTables,
1134 extra_columns: Mapping | None = None,
1135 time_part: int | None = None,
1136 ) -> None:
1137 """Store generic objects.
1138
1139 Takes a Pandas catalog and stores its records in a table.
1140
1141 Parameters
1142 ----------
1143 records : `pandas.DataFrame`
1144 Catalog containing object records
1145 table_name : `ApdbTables`
1146 Name of the table as defined in APDB schema.
1147 extra_columns : `dict`, optional
1148 Mapping (column_name, column_value) which gives fixed values for
1149 columns in each row, overrides values in ``records`` if matching
1150 columns exist there.
1151 time_part : `int`, optional
1152 If not `None` then insert into a per-partition table.
1153
1154 Notes
1155 -----
1156 If the Pandas catalog contains additional columns not defined in the
1157 table schema, they are ignored. The catalog does not have to contain all
1158 columns defined in a table, but partition and clustering keys must be
1159 present either in the catalog or in ``extra_columns``.
1160 """
1161 context = self._context
1162
1163 # use extra columns if specified
1164 if extra_columns is None:
1165 extra_columns = {}
1166 extra_fields = list(extra_columns.keys())
1167
1168 # Fields that will come from dataframe.
1169 df_fields = [column for column in records.columns if column not in extra_fields]
1170
1171 column_map = context.schema.getColumnMap(table_name)
1172 # list of columns (as in felis schema)
1173 fields = [column_map[field].name for field in df_fields if field in column_map]
1174 fields += extra_fields
1175
1176 # check that all partitioning and clustering columns are defined
1177 partition_columns = context.schema.partitionColumns(table_name)
1178 required_columns = partition_columns + context.schema.clusteringColumns(table_name)
1179 missing_columns = [column for column in required_columns if column not in fields]
1180 if missing_columns:
1181 raise ValueError(f"Primary key columns are missing from catalog: {missing_columns}")
1182
1183 qfields = [quote_id(field) for field in fields]
1184 qfields_str = ",".join(qfields)
1185
1186 batch_size = self._batch_size(table_name)
1187
1188 with self._timer("insert_build_time", tags={"table": table_name.name}):
1189 # Multi-partition batches are problematic in general, so we want to
1190 # group records in a batch by their partition key.
1191 values_by_key: dict[tuple, list[list]] = defaultdict(list)
1192 for rec in records.itertuples(index=False):
1193 values = []
1194 partitioning_values: dict[str, Any] = {}
1195 for field in df_fields:
1196 if field not in column_map:
1197 continue
1198 value = getattr(rec, field)
1199 if column_map[field].datatype is felis.datamodel.DataType.timestamp:
1200 if isinstance(value, pandas.Timestamp):
1201 value = value.to_pydatetime()
1202 elif value is pandas.NaT:
1203 value = None
1204 else:
1205 # Assume it's seconds since epoch, Cassandra
1206 # datetime is in milliseconds
1207 value = int(value * 1000)
1208 value = literal(value)
1209 values.append(UNSET_VALUE if value is None else value)
1210 if field in partition_columns:
1211 partitioning_values[field] = value
1212 for field in extra_fields:
1213 value = literal(extra_columns[field])
1214 values.append(UNSET_VALUE if value is None else value)
1215 if field in partition_columns:
1216 partitioning_values[field] = value
1217
1218 key = tuple(partitioning_values[field] for field in partition_columns)
1219 values_by_key[key].append(values)
1220
1221 table = context.schema.tableName(table_name, time_part)
1222
1223 holders = ",".join(["?"] * len(qfields))
1224 query = f'INSERT INTO "{self._keyspace}"."{table}" ({qfields_str}) VALUES ({holders})'
1225 statement = context.preparer.prepare(query)
1226 # Cassandra has a 64k limit on batch size; normally that should be
1227 # enough, but some tests generate too many forced sources.
1228 queries = []
1229 for key_values in values_by_key.values():
1230 for values_chunk in chunk_iterable(key_values, batch_size):
1231 batch = cassandra.query.BatchStatement()
1232 for row_values in values_chunk:
1233 batch.add(statement, row_values)
1234 queries.append((batch, None))
1235 assert batch.routing_key is not None and batch.keyspace is not None
1236
1237 _LOG.debug("%s: will store %d records", context.schema.tableName(table_name), records.shape[0])
1238 with self._timer("insert_time", tags={"table": table_name.name}) as timer:
1239 execute_concurrent(context.session, queries, execution_profile="write")
1240 timer.add_values(row_count=len(records), num_batches=len(queries))
1241
1242 def _add_apdb_part(self, df: pandas.DataFrame) -> pandas.DataFrame:
1243 """Calculate spatial partition for each record and add it to a
1244 DataFrame.
1245
1246 Parameters
1247 ----------
1248 df : `pandas.DataFrame`
1249 DataFrame which has to contain ra/dec columns; the names of these
1250 columns are defined by the configuration ``ra_dec_columns`` field.
1251
1252 Returns
1253 -------
1254 df : `pandas.DataFrame`
1255 DataFrame with ``apdb_part`` column which contains pixel index
1256 for ra/dec coordinates.
1257
1258 Notes
1259 -----
1260 This overrides any existing column in a DataFrame with the same name
1261 (``apdb_part``). The original DataFrame is not changed; a copy of the
1262 DataFrame is returned.
1263 """
1264 context = self._context
1265 config = context.config
1266
1267 # Calculate pixelization index for every record.
1268 apdb_part = np.zeros(df.shape[0], dtype=np.int64)
1269 ra_col, dec_col = config.ra_dec_columns
1270 for i, (ra, dec) in enumerate(zip(df[ra_col], df[dec_col])):
1271 uv3d = sphgeom.UnitVector3d(sphgeom.LonLat.fromDegrees(ra, dec))
1272 idx = context.partitioner.pixel(uv3d)
1273 apdb_part[i] = idx
1274 df = df.copy()
1275 df["apdb_part"] = apdb_part
1276 return df
1277
1278 def _make_empty_catalog(self, table_name: ApdbTables) -> pandas.DataFrame:
1279 """Make an empty catalog for a table with a given name.
1280
1281 Parameters
1282 ----------
1283 table_name : `ApdbTables`
1284 Name of the table.
1285
1286 Returns
1287 -------
1288 catalog : `pandas.DataFrame`
1289 An empty catalog.
1290 """
1291 table = self._schema.tableSchemas[table_name]
1292
1293 data = {
1294 columnDef.name: pandas.Series(dtype=self._schema.column_dtype(columnDef.datatype))
1295 for columnDef in table.columns
1296 }
1297 return pandas.DataFrame(data)
1298
1299 def _combine_where(
1300 self,
1301 prefix: str,
1302 where1: list[tuple[str, tuple]],
1303 where2: list[tuple[str, tuple]],
1304 where3: tuple[str, tuple] | None = None,
1305 suffix: str | None = None,
1306 ) -> Iterator[tuple[cassandra.query.Statement, tuple]]:
1307 """Make cartesian product of two parts of WHERE clause into a series
1308 of statements to execute.
1309
1310 Parameters
1311 ----------
1312 prefix : `str`
1313 Initial statement prefix that comes before WHERE clause, e.g.
1314 "SELECT * from Table"
1315 """
1316 context = self._context
1317
1318 # If lists are empty use special sentinels.
1319 if not where1:
1320 where1 = [("", ())]
1321 if not where2:
1322 where2 = [("", ())]
1323
1324 for expr1, params1 in where1:
1325 for expr2, params2 in where2:
1326 full_query = prefix
1327 wheres = []
1328 params = params1 + params2
1329 if expr1:
1330 wheres.append(expr1)
1331 if expr2:
1332 wheres.append(expr2)
1333 if where3:
1334 wheres.append(where3[0])
1335 params += where3[1]
1336 if wheres:
1337 full_query += " WHERE " + " AND ".join(wheres)
1338 if suffix:
1339 full_query += " " + suffix
1340 if params:
1341 statement = context.preparer.prepare(full_query)
1342 else:
1343 # If there are no params then it is likely that query
1344 # has a bunch of literals rendered already, no point
1345 # trying to prepare it.
1346 statement = cassandra.query.SimpleStatement(full_query)
1347 yield (statement, params)
1348
1349 def _fix_input_timestamps(self, df: pandas.DataFrame) -> pandas.DataFrame:
1350 """Update timestamp columns in input DataFrame to be naive datetime
1351 type.
1352
1353 Clients may or may not generate timezone-aware timestamps; code in this
1354 class assumes that timestamps are naive, so we convert them to UTC and
1355 drop the timezone.
1356 """
1357 # Find all columns with aware timestamps.
1358 columns = [column for column, dtype in df.dtypes.items() if isinstance(dtype, pandas.DatetimeTZDtype)]
1359 for column in columns:
1360 # tz_convert(None) will convert to UTC and drop timezone.
1361 df[column] = df[column].dt.tz_convert(None)
1362 return df
1363
1364 def _batch_size(self, table: ApdbTables | ExtraTables) -> int:
1365 """Calculate batch size based on config parameters."""
1366 context = self._context
1367 config = context.config
1368
1369 # Cassandra limit on number of statements in a batch is 64k.
1370 batch_size = 65_535
1371 if 0 < config.batch_statement_limit < batch_size:
1372 batch_size = config.batch_statement_limit
1373 if config.batch_size_limit > 0:
1374 # The purpose of this limit is to try not to exceed batch size
1375 # threshold which is set on server side. Cassandra wire protocol
1376 # for prepared queries (and batches) only sends column values, with
1377 # an additional 4 bytes per value specifying size. Value is
1378 # not included for NULL or NOT_SET values, but the size is always
1379 # there. There is additional small per-query overhead, which we
1380 # ignore.
1381 row_size = context.schema.table_row_size(table)
1382 row_size += 4 * len(context.schema.getColumnMap(table))
1383 batch_size = min(batch_size, (config.batch_size_limit // row_size) + 1)
1384 return batch_size