LSST Data Management Base Package
apdbCassandra.py
Go to the documentation of this file.
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22from __future__ import annotations
23
24__all__ = ["ApdbCassandra"]
25
26import datetime
27import logging
28import random
29import uuid
30import warnings
31from collections import defaultdict
32from collections.abc import Iterable, Iterator, Mapping, Set
33from typing import TYPE_CHECKING, Any, cast
34
35import numpy as np
36import pandas
37
38# If cassandra-driver is not there the module can still be imported
39# but ApdbCassandra cannot be instantiated.
40try:
41 import cassandra
42 import cassandra.query
43 from cassandra.query import UNSET_VALUE
44
45 CASSANDRA_IMPORTED = True
46except ImportError:
47 CASSANDRA_IMPORTED = False
48
49import astropy.time
50import felis.datamodel
51
52from lsst import sphgeom
53from lsst.utils.iteration import chunk_iterable
54
55from ..apdb import Apdb, ApdbConfig
56from ..apdbConfigFreezer import ApdbConfigFreezer
57from ..apdbReplica import ApdbTableData, ReplicaChunk
58from ..apdbSchema import ApdbSchema, ApdbTables
59from ..monitor import MonAgent
60from ..schema_model import Table
61from ..timer import Timer
62from ..versionTuple import IncompatibleVersionError, VersionTuple
63from .apdbCassandraAdmin import ApdbCassandraAdmin
64from .apdbCassandraReplica import ApdbCassandraReplica
65from .apdbCassandraSchema import ApdbCassandraSchema, CreateTableOptions, ExtraTables
66from .apdbMetadataCassandra import ApdbMetadataCassandra
67from .cassandra_utils import (
68 execute_concurrent,
69 literal,
70 quote_id,
71 select_concurrent,
72)
73from .config import ApdbCassandraConfig, ApdbCassandraConnectionConfig, ApdbCassandraTimePartitionRange
74from .connectionContext import ConnectionContext, DbVersions
75from .exceptions import CassandraMissingError
76from .partitioner import Partitioner
77from .sessionFactory import SessionContext, SessionFactory
78
79if TYPE_CHECKING:
80 from ..apdbMetadata import ApdbMetadata
81 from ..apdbUpdateRecord import ApdbUpdateRecord
82
83_LOG = logging.getLogger(__name__)
84
85_MON = MonAgent(__name__)
86
87VERSION = VersionTuple(1, 2, 0)
88"""Version for the code controlling non-replication tables. This needs to be
89updated following compatibility rules when schema produced by this code
90changes.
91"""
92
93
95 """Implementation of APDB database with Apache Cassandra backend.
96
97 Parameters
98 ----------
99 config : `ApdbCassandraConfig`
100 Configuration object.
101 """
102
103 def __init__(self, config: ApdbCassandraConfig):
104 if not CASSANDRA_IMPORTED:
105 raise CassandraMissingError()
106
107 self._config = config
108 self._keyspace = config.keyspace
109 self._schema = ApdbSchema(config.schema_file, config.schema_name)
110
111 self._session_factory = SessionFactory(config)
112 self._connection_context: ConnectionContext | None = None
113
114 @property
115 def _context(self) -> ConnectionContext:
116 """Establish connection if not established and return context."""
117 if self._connection_context is None:
118 session = self._session_factory.session()
119 self._connection_context = ConnectionContext(session, self._config, self._schema.tableSchemas)
120
121 # Check version compatibility
122 current_versions = DbVersions(
123 schema_version=self._schema.schemaVersion(),
124 code_version=self.apdbImplementationVersion(),
125 replica_version=(
126 ApdbCassandraReplica.apdbReplicaImplementationVersion()
127 if self._connection_context.config.enable_replica
128 else None
129 ),
130 )
131 _LOG.debug("Current versions: %s", current_versions)
132 self._versionCheck(current_versions, self._connection_context.db_versions)
133
134 if _LOG.isEnabledFor(logging.DEBUG):
135 _LOG.debug("ApdbCassandra Configuration: %s", self._connection_context.config.model_dump())
136
137 return self._connection_context
138
139 def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
140 """Create `Timer` instance given its name."""
141 return Timer(name, _MON, tags=tags)
142
143 def _versionCheck(self, current_versions: DbVersions, db_versions: DbVersions) -> None:
144 """Check schema version compatibility."""
145 if not current_versions.schema_version.checkCompatibility(db_versions.schema_version):
147 f"Configured schema version {current_versions.schema_version} "
148 f"is not compatible with database version {db_versions.schema_version}"
149 )
150 if not current_versions.code_version.checkCompatibility(db_versions.code_version):
152 f"Current code version {current_versions.code_version} "
153 f"is not compatible with database version {db_versions.code_version}"
154 )
155
156 # Check replica code version only if replica is enabled.
157 match current_versions.replica_version, db_versions.replica_version:
158 case None, None:
159 pass
160 case VersionTuple() as current, VersionTuple() as stored:
161 if not current.checkCompatibility(stored):
163 f"Current replication code version {current} "
164 f"is not compatible with database version {stored}"
165 )
166 case _:
168 f"Current replication code version {current_versions.replica_version} "
169 f"is not compatible with database version {db_versions.replica_version}"
170 )
171
172 @classmethod
173 def apdbImplementationVersion(cls) -> VersionTuple:
174 """Return version number for current APDB implementation.
175
176 Returns
177 -------
178 version : `VersionTuple`
179 Version of the code defined in implementation class.
180 """
181 return VERSION
182
183 def getConfig(self) -> ApdbCassandraConfig:
184 # docstring is inherited from a base class
185 return self._context.config
186
187 def tableDef(self, table: ApdbTables) -> Table | None:
188 # docstring is inherited from a base class
189 return self._schema.tableSchemas.get(table)
190
191 @classmethod
192 def init_database(
193 cls,
194 hosts: tuple[str, ...],
195 keyspace: str,
196 *,
197 schema_file: str | None = None,
198 schema_name: str | None = None,
199 read_sources_months: int | None = None,
200 read_forced_sources_months: int | None = None,
201 enable_replica: bool = False,
202 replica_skips_diaobjects: bool = False,
203 port: int | None = None,
204 username: str | None = None,
205 prefix: str | None = None,
206 part_pixelization: str | None = None,
207 part_pix_level: int | None = None,
208 time_partition_tables: bool = True,
209 time_partition_start: str | None = None,
210 time_partition_end: str | None = None,
211 read_consistency: str | None = None,
212 write_consistency: str | None = None,
213 read_timeout: int | None = None,
214 write_timeout: int | None = None,
215 ra_dec_columns: tuple[str, str] | None = None,
216 replication_factor: int | None = None,
217 drop: bool = False,
218 table_options: CreateTableOptions | None = None,
219 ) -> ApdbCassandraConfig:
220 """Initialize new APDB instance and make configuration object for it.
221
222 Parameters
223 ----------
224 hosts : `tuple` [`str`, ...]
225 List of host names or IP addresses for Cassandra cluster.
226 keyspace : `str`
227 Name of the keyspace for APDB tables.
228 schema_file : `str`, optional
229 Location of (YAML) configuration file with APDB schema. If not
230 specified then default location will be used.
231 schema_name : `str`, optional
232 Name of the schema in YAML configuration file. If not specified
233 then default name will be used.
234 read_sources_months : `int`, optional
235 Number of months of history to read from DiaSource.
236 read_forced_sources_months : `int`, optional
237 Number of months of history to read from DiaForcedSource.
238 enable_replica : `bool`, optional
239 If True, make additional tables used for replication to PPDB.
240 replica_skips_diaobjects : `bool`, optional
241 If `True` then do not fill regular ``DiaObject`` table when
242 ``enable_replica`` is `True`.
243 port : `int`, optional
244 Port number to use for Cassandra connections.
245 username : `str`, optional
246 User name for Cassandra connections.
247 prefix : `str`, optional
248 Optional prefix for all table names.
249 part_pixelization : `str`, optional
250 Name of the MOC pixelization used for partitioning.
251 part_pix_level : `int`, optional
252 Pixelization level.
253 time_partition_tables : `bool`, optional
254 Create per-partition tables.
255 time_partition_start : `str`, optional
256 Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss
257 format, in TAI.
258 time_partition_end : `str`, optional
259 Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss
260 format, in TAI.
261 read_consistency : `str`, optional
262 Name of the consistency level for read operations.
263 write_consistency : `str`, optional
264 Name of the consistency level for write operations.
265 read_timeout : `int`, optional
266 Read timeout in seconds.
267 write_timeout : `int`, optional
268 Write timeout in seconds.
269 ra_dec_columns : `tuple` [`str`, `str`], optional
270 Names of ra/dec columns in DiaObject table.
271 replication_factor : `int`, optional
272 Replication factor used when creating a new keyspace; if the
273 keyspace already exists, its replication factor is not changed.
274 drop : `bool`, optional
275 If `True` then drop existing tables before re-creating the schema.
276 table_options : `CreateTableOptions`, optional
277 Options used when creating Cassandra tables.
278
279 Returns
280 -------
281 config : `ApdbCassandraConfig`
282 Resulting configuration object for a created APDB instance.
283 """
284 # Some non-standard defaults for connection parameters, these can be
285 # changed later in generated config. Check Cassandra driver
286 # documentation for what these parameters do. These parameters are not
287 # used during database initialization, but they will be saved with
288 # generated config.
289 connection_config = ApdbCassandraConnectionConfig(
290 extra_parameters={
291 "idle_heartbeat_interval": 0,
292 "idle_heartbeat_timeout": 30,
293 "control_connection_timeout": 100,
294 },
295 )
296 config = ApdbCassandraConfig(
297 contact_points=hosts,
298 keyspace=keyspace,
299 enable_replica=enable_replica,
300 replica_skips_diaobjects=replica_skips_diaobjects,
301 connection_config=connection_config,
302 )
303 config.partitioning.time_partition_tables = time_partition_tables
304 if schema_file is not None:
305 config.schema_file = schema_file
306 if schema_name is not None:
307 config.schema_name = schema_name
308 if read_sources_months is not None:
309 config.read_sources_months = read_sources_months
310 if read_forced_sources_months is not None:
311 config.read_forced_sources_months = read_forced_sources_months
312 if port is not None:
313 config.connection_config.port = port
314 if username is not None:
315 config.connection_config.username = username
316 if prefix is not None:
317 config.prefix = prefix
318 if part_pixelization is not None:
319 config.partitioning.part_pixelization = part_pixelization
320 if part_pix_level is not None:
321 config.partitioning.part_pix_level = part_pix_level
322 if time_partition_start is not None:
323 config.partitioning.time_partition_start = time_partition_start
324 if time_partition_end is not None:
325 config.partitioning.time_partition_end = time_partition_end
326 if read_consistency is not None:
327 config.connection_config.read_consistency = read_consistency
328 if write_consistency is not None:
329 config.connection_config.write_consistency = write_consistency
330 if read_timeout is not None:
331 config.connection_config.read_timeout = read_timeout
332 if write_timeout is not None:
333 config.connection_config.write_timeout = write_timeout
334 if ra_dec_columns is not None:
335 config.ra_dec_columns = ra_dec_columns
336
337 cls._makeSchema(config, drop=drop, replication_factor=replication_factor, table_options=table_options)
338
339 return config
340
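# Illustrative sketch (added for documentation; not part of the original
# module): creating a new APDB instance with init_database() and opening it.
# Host names, keyspace, time range, and replication factor are hypothetical,
# and the import path assumes the usual lsst.dax.apdb.cassandra layout.
from lsst.dax.apdb.cassandra.apdbCassandra import ApdbCassandra

new_config = ApdbCassandra.init_database(
    hosts=("cassandra-node-1", "cassandra-node-2"),
    keyspace="apdb_example",
    enable_replica=True,
    time_partition_tables=True,
    time_partition_start="2025-01-01T00:00:00",
    time_partition_end="2027-01-01T00:00:00",
    replication_factor=3,
)
apdb = ApdbCassandra(new_config)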
341 def get_replica(self) -> ApdbCassandraReplica:
342 """Return `ApdbReplica` instance for this database."""
343 # Note that this instance has to stay alive while replica exists, so
344 # we pass reference to self.
345 return ApdbCassandraReplica(self)
346
347 @classmethod
348 def _makeSchema(
349 cls,
350 config: ApdbConfig,
351 *,
352 drop: bool = False,
353 replication_factor: int | None = None,
354 table_options: CreateTableOptions | None = None,
355 ) -> None:
356 # docstring is inherited from a base class
357
358 if not isinstance(config, ApdbCassandraConfig):
359 raise TypeError(f"Unexpected type of configuration object: {type(config)}")
360
361 simple_schema = ApdbSchema(config.schema_file, config.schema_name)
362
363 with SessionContext(config) as session:
364 schema = ApdbCassandraSchema(
365 session=session,
366 keyspace=config.keyspace,
367 table_schemas=simple_schema.tableSchemas,
368 prefix=config.prefix,
369 time_partition_tables=config.partitioning.time_partition_tables,
370 enable_replica=config.enable_replica,
371 replica_skips_diaobjects=config.replica_skips_diaobjects,
372 )
373
374 # Ask schema to create all tables.
375 part_range_config: ApdbCassandraTimePartitionRange | None = None
376 if config.partitioning.time_partition_tables:
377 partitioner = Partitioner(config)
378 time_partition_start = astropy.time.Time(
379 config.partitioning.time_partition_start, format="isot", scale="tai"
380 )
381 time_partition_end = astropy.time.Time(
382 config.partitioning.time_partition_end, format="isot", scale="tai"
383 )
384 part_range_config = ApdbCassandraTimePartitionRange(
385 start=partitioner.time_partition(time_partition_start),
386 end=partitioner.time_partition(time_partition_end),
387 )
388 schema.makeSchema(
389 drop=drop,
390 part_range=part_range_config,
391 replication_factor=replication_factor,
392 table_options=table_options,
393 )
394 else:
395 schema.makeSchema(
396 drop=drop, replication_factor=replication_factor, table_options=table_options
397 )
398
399 meta_table_name = ApdbTables.metadata.table_name(config.prefix)
400 metadata = ApdbMetadataCassandra(
401 session, meta_table_name, config.keyspace, "read_tuples", "write"
402 )
403
404 # Fill version numbers, overriding any that existed before.
405 metadata.set(
406 ConnectionContext.metadataSchemaVersionKey, str(simple_schema.schemaVersion()), force=True
407 )
408 metadata.set(
409 ConnectionContext.metadataCodeVersionKey, str(cls.apdbImplementationVersion()), force=True
410 )
411
412 if config.enable_replica:
413 # Only store replica code version if replica is enabled.
414 metadata.set(
415 ConnectionContext.metadataReplicaVersionKey,
416 str(ApdbCassandraReplica.apdbReplicaImplementationVersion()),
417 force=True,
418 )
419
420 # Store frozen part of a configuration in metadata.
421 freezer = ApdbConfigFreezer[ApdbCassandraConfig](ConnectionContext.frozen_parameters)
422 metadata.set(ConnectionContext.metadataConfigKey, freezer.to_json(config), force=True)
423
424 # Store time partition range.
425 if part_range_config:
426 part_range_config.save_to_meta(metadata)
427
428 def getDiaObjects(self, region: sphgeom.Region) -> pandas.DataFrame:
429 # docstring is inherited from a base class
430 context = self._context
431 config = context.config
432
433 sp_where, num_sp_part = context.partitioner.spatial_where(region, for_prepare=True)
434 _LOG.debug("getDiaObjects: #partitions: %s", len(sp_where))
435
436 # We need to exclude extra partitioning columns from result.
437 column_names = context.schema.apdbColumnNames(ApdbTables.DiaObjectLast)
438 what = ",".join(quote_id(column) for column in column_names)
439
440 table_name = context.schema.tableName(ApdbTables.DiaObjectLast)
441 query = f'SELECT {what} from "{self._keyspace}"."{table_name}"'
442 statements: list[tuple] = []
443 for where, params in sp_where:
444 full_query = f"{query} WHERE {where}"
445 if params:
446 statement = context.preparer.prepare(full_query)
447 else:
448 # If there are no params then it is likely that query has a
449 # bunch of literals rendered already, no point trying to
450 # prepare it because it's not reusable.
451 statement = cassandra.query.SimpleStatement(full_query)
452 statements.append((statement, params))
453 _LOG.debug("getDiaObjects: #queries: %s", len(statements))
454
455 with _MON.context_tags({"table": "DiaObject"}):
456 _MON.add_record(
457 "select_query_stats", values={"num_sp_part": num_sp_part, "num_queries": len(statements)}
458 )
459 with self._timer("select_time") as timer:
460 objects = cast(
461 pandas.DataFrame,
462 select_concurrent(
463 context.session,
464 statements,
465 "read_pandas_multi",
466 config.connection_config.read_concurrency,
467 ),
468 )
469 timer.add_values(row_count=len(objects))
470
471 _LOG.debug("found %s DiaObjects", objects.shape[0])
472 return objects
473
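# Illustrative sketch (added; not part of the original module): querying the
# latest DiaObject versions inside a small circular region. The pointing and
# radius are hypothetical; ``apdb`` is an ApdbCassandra instance as above.
center = sphgeom.UnitVector3d(sphgeom.LonLat.fromDegrees(150.0, 2.5))
search_region = sphgeom.Circle(center, sphgeom.Angle.fromDegrees(0.1))
dia_objects = apdb.getDiaObjects(search_region)
print(f"found {len(dia_objects)} DiaObjects")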
474 def getDiaSources(
475 self,
476 region: sphgeom.Region,
477 object_ids: Iterable[int] | None,
478 visit_time: astropy.time.Time,
479 start_time: astropy.time.Time | None = None,
480 ) -> pandas.DataFrame | None:
481 # docstring is inherited from a base class
482 context = self._context
483 config = context.config
484
485 months = config.read_sources_months
486 if start_time is None and months == 0:
487 return None
488
489 mjd_end = float(visit_time.tai.mjd)
490 if start_time is None:
491 mjd_start = mjd_end - months * 30
492 else:
493 mjd_start = float(start_time.tai.mjd)
494
495 return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaSource)
496
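# Illustrative sketch (added; not part of the original module): reading
# DiaSource history for the same hypothetical region and a few DiaObject IDs.
# The upper edge of the time window is ``visit_time``; the lower edge is
# either ``start_time`` or read_sources_months * 30 days earlier, as coded
# above.
visit_time = astropy.time.Time("2025-06-01T04:00:00", format="isot", scale="tai")
dia_sources = apdb.getDiaSources(search_region, [1, 2, 3], visit_time)
if dia_sources is not None:
    print(dia_sources.shape)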
497 def getDiaForcedSources(
498 self,
499 region: sphgeom.Region,
500 object_ids: Iterable[int] | None,
501 visit_time: astropy.time.Time,
502 start_time: astropy.time.Time | None = None,
503 ) -> pandas.DataFrame | None:
504 # docstring is inherited from a base class
505 context = self._context
506 config = context.config
507
508 months = config.read_forced_sources_months
509 if start_time is None and months == 0:
510 return None
511
512 mjd_end = float(visit_time.tai.mjd)
513 if start_time is None:
514 mjd_start = mjd_end - months * 30
515 else:
516 mjd_start = float(start_time.tai.mjd)
517
518 return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaForcedSource)
519
520 def containsVisitDetector(
521 self,
522 visit: int,
523 detector: int,
524 region: sphgeom.Region,
525 visit_time: astropy.time.Time,
526 ) -> bool:
527 # docstring is inherited from a base class
528 context = self._context
529 config = context.config
530
531 # If ApdbDetectorVisit table exists just check it.
532 if context.has_visit_detector_table:
533 table_name = context.schema.tableName(ExtraTables.ApdbVisitDetector)
534 query = (
535 f'SELECT count(*) FROM "{self._keyspace}"."{table_name}" WHERE visit = %s AND detector = %s'
536 )
537 with self._timer("contains_visit_detector_time"):
538 result = context.session.execute(query, (visit, detector))
539 return bool(result.one()[0])
540
541 # The order of checks corresponds to the order in store(); on a potential
542 # store failure the earlier tables have a higher probability of containing
543 # stored records. With per-partition tables there will be many tables
544 # in the list, but it is unlikely that we'll use that setup in
545 # production.
546 sp_where, _ = context.partitioner.spatial_where(region, use_ranges=True, for_prepare=True)
547 visit_detector_where = ("visit = ? AND detector = ?", (visit, detector))
548
549 # Sources are partitioned on their midPointMjdTai. To avoid precision
550 # issues add some fuzziness to the visit time.
551 mjd_start = float(visit_time.tai.mjd) - 1.0 / 24
552 mjd_end = float(visit_time.tai.mjd) + 1.0 / 24
553
554 statements: list[tuple] = []
555 for table_type in ApdbTables.DiaSource, ApdbTables.DiaForcedSource:
556 tables, temporal_where = context.partitioner.temporal_where(
557 table_type, mjd_start, mjd_end, query_per_time_part=True, for_prepare=True
558 )
559 for table in tables:
560 prefix = f'SELECT apdb_part FROM "{self._keyspace}"."{table}"'
561 # Needs ALLOW FILTERING as there is no PK constraint.
562 suffix = "PER PARTITION LIMIT 1 LIMIT 1 ALLOW FILTERING"
563 statements += list(
564 self._combine_where(prefix, sp_where, temporal_where, visit_detector_where, suffix)
565 )
566
567 with self._timer("contains_visit_detector_time"):
568 result = cast(
569 list[tuple[int] | None],
570 select_concurrent(
571 context.session,
572 statements,
573 "read_tuples",
574 config.connection_config.read_concurrency,
575 ),
576 )
577 return bool(result)
578
579 def getSSObjects(self) -> pandas.DataFrame:
580 # docstring is inherited from a base class
581 context = self._context
582
583 tableName = context.schema.tableName(ApdbTables.SSObject)
584 query = f'SELECT * from "{self._keyspace}"."{tableName}"'
585
586 objects = None
587 with self._timer("select_time", tags={"table": "SSObject"}) as timer:
588 result = context.session.execute(query, execution_profile="read_pandas")
589 objects = result._current_rows
590 timer.add_values(row_count=len(objects))
591
592 _LOG.debug("found %s SSObjects", objects.shape[0])
593 return objects
594
595 def store(
596 self,
597 visit_time: astropy.time.Time,
598 objects: pandas.DataFrame,
599 sources: pandas.DataFrame | None = None,
600 forced_sources: pandas.DataFrame | None = None,
601 ) -> None:
602 # docstring is inherited from a base class
603 context = self._context
604 config = context.config
605
606 if context.has_visit_detector_table:
607 # Store visit/detector in a special table; this has to be done
608 # before all other writes so that if there is a failure at any point
609 # later we still have a record of the attempted write.
610 visit_detector: set[tuple[int, int]] = set()
611 for df in sources, forced_sources:
612 if df is not None and not df.empty:
613 df = df[["visit", "detector"]]
614 for visit, detector in df.itertuples(index=False):
615 visit_detector.add((visit, detector))
616
617 if visit_detector:
618 # Typically there is only one entry, do not bother with
619 # concurrency.
620 table_name = context.schema.tableName(ExtraTables.ApdbVisitDetector)
621 query = f'INSERT INTO "{self._keyspace}"."{table_name}" (visit, detector) VALUES (%s, %s)'
622 for item in visit_detector:
623 context.session.execute(query, item, execution_profile="write")
624
625 objects = self._fix_input_timestamps(objects)
626 if sources is not None:
627 sources = self._fix_input_timestamps(sources)
628 if forced_sources is not None:
629 forced_sources = self._fix_input_timestamps(forced_sources)
630
631 replica_chunk: ReplicaChunk | None = None
632 if context.schema.replication_enabled:
633 replica_chunk = ReplicaChunk.make_replica_chunk(visit_time, config.replica_chunk_seconds)
634 self._storeReplicaChunk(replica_chunk)
635
636 # fill region partition column for DiaObjects
637 objects = self._add_apdb_part(objects)
638 self._storeDiaObjects(objects, visit_time, replica_chunk)
639
640 if sources is not None and len(sources) > 0:
641 # copy apdb_part column from DiaObjects to DiaSources
642 sources = self._add_apdb_part(sources)
643 subchunk = self._storeDiaSources(ApdbTables.DiaSource, sources, replica_chunk)
644 self._storeDiaSourcesPartitions(sources, visit_time, replica_chunk, subchunk)
645
646 if forced_sources is not None and len(forced_sources) > 0:
647 forced_sources = self._add_apdb_part(forced_sources)
648 self._storeDiaSources(ApdbTables.DiaForcedSource, forced_sources, replica_chunk)
649
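# Illustrative sketch (added; not part of the original module): storing the
# products of one visit. The column sets below are a minimal hypothetical
# subset; real catalogs must match the configured felis schema for DiaObject
# and DiaSource.
visit_time = astropy.time.Time.now()
objects = pandas.DataFrame({"diaObjectId": [1], "ra": [150.0], "dec": [2.5]})
sources = pandas.DataFrame(
    {
        "diaSourceId": [10],
        "diaObjectId": [1],
        "ra": [150.0],
        "dec": [2.5],
        "visit": [4321],
        "detector": [42],
        "midpointMjdTai": [visit_time.tai.mjd],
    }
)
apdb.store(visit_time, objects, sources)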
650 def storeSSObjects(self, objects: pandas.DataFrame) -> None:
651 # docstring is inherited from a base class
652 objects = self._fix_input_timestamps(objects)
653 self._storeObjectsPandas(objects, ApdbTables.SSObject)
654
655 def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
656 # docstring is inherited from a base class
657 context = self._context
658 config = context.config
659
660 if self._schema.has_mjd_timestamps:
661 reassign_time_column = "ssObjectReassocTimeMjdTai"
662 reassignTime = float(astropy.time.Time.now().tai.mjd)
663 else:
664 reassign_time_column = "ssObjectReassocTime"
665 # Current time as milliseconds since epoch.
666 reassignTime = int(datetime.datetime.now(tz=datetime.UTC).timestamp() * 1000)
667
668 # To update a record we need to know its exact primary key (including
669 # partition key) so we start by querying for diaSourceId to find the
670 # primary keys.
671
672 table_name = context.schema.tableName(ExtraTables.DiaSourceToPartition)
673 # split it into 1k IDs per query
674 selects: list[tuple] = []
675 for ids in chunk_iterable(idMap.keys(), 1_000):
676 ids_str = ",".join(str(item) for item in ids)
677 selects.append(
678 (
679 (
680 'SELECT "diaSourceId", "apdb_part", "apdb_time_part", "apdb_replica_chunk" '
681 f'FROM "{self._keyspace}"."{table_name}" WHERE "diaSourceId" IN ({ids_str})'
682 ),
683 {},
684 )
685 )
686
687 # No need for DataFrame here, read data as tuples.
688 result = cast(
689 list[tuple[int, int, int, int | None]],
690 select_concurrent(
691 context.session, selects, "read_tuples", config.connection_config.read_concurrency
692 ),
693 )
694
695 # Make mapping from source ID to its partition.
696 id2partitions: dict[int, tuple[int, int]] = {}
697 id2chunk_id: dict[int, int] = {}
698 for row in result:
699 id2partitions[row[0]] = row[1:3]
700 if row[3] is not None:
701 id2chunk_id[row[0]] = row[3]
702
703 # make sure we know partitions for each ID
704 if set(id2partitions) != set(idMap):
705 missing = ",".join(str(item) for item in set(idMap) - set(id2partitions))
706 raise ValueError(f"Following DiaSource IDs do not exist in the database: {missing}")
707
708 # Reassign in standard tables
709 queries: list[tuple[cassandra.query.PreparedStatement, tuple]] = []
710 for diaSourceId, ssObjectId in idMap.items():
711 apdb_part, apdb_time_part = id2partitions[diaSourceId]
712 values: tuple
713 if config.partitioning.time_partition_tables:
714 table_name = context.schema.tableName(ApdbTables.DiaSource, apdb_time_part)
715 query = (
716 f'UPDATE "{self._keyspace}"."{table_name}"'
717 f' SET "ssObjectId" = ?, "diaObjectId" = NULL, "{reassign_time_column}" = ?'
718 ' WHERE "apdb_part" = ? AND "diaSourceId" = ?'
719 )
720 values = (ssObjectId, reassignTime, apdb_part, diaSourceId)
721 else:
722 table_name = context.schema.tableName(ApdbTables.DiaSource)
723 query = (
724 f'UPDATE "{self._keyspace}"."{table_name}"'
725 f' SET "ssObjectId" = ?, "diaObjectId" = NULL, "{reassign_time_column}" = ?'
726 ' WHERE "apdb_part" = ? AND "apdb_time_part" = ? AND "diaSourceId" = ?'
727 )
728 values = (ssObjectId, reassignTime, apdb_part, apdb_time_part, diaSourceId)
729 queries.append((context.preparer.prepare(query), values))
730
731 # TODO: (DM-50190) Replication for updated records is not implemented.
732 if id2chunk_id:
733 warnings.warn("Replication of reassigned DiaSource records is not implemented.", stacklevel=2)
734
735 _LOG.debug("%s: will update %d records", table_name, len(idMap))
736 with self._timer("source_reassign_time") as timer:
737 execute_concurrent(context.session, queries, execution_profile="write")
738 timer.add_values(source_count=len(idMap))
739
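# Illustrative sketch (added; not part of the original module): reassigning
# two hypothetical DiaSources to a single SSObject via the mapping argument.
apdb.reassignDiaSources({3141592653: 2025001, 3141592654: 2025001})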
740 def dailyJob(self) -> None:
741 # docstring is inherited from a base class
742 pass
743
744 def countUnassociatedObjects(self) -> int:
745 # docstring is inherited from a base class
746
747 # It's too inefficient to implement it for Cassandra in current schema.
748 raise NotImplementedError()
749
750 @property
751 def metadata(self) -> ApdbMetadata:
752 # docstring is inherited from a base class
753 context = self._context
754 return context.metadata
755
756 @property
757 def admin(self) -> ApdbCassandraAdmin:
758 # docstring is inherited from a base class
759 return ApdbCassandraAdmin(self)
760
761 def _getSources(
762 self,
763 region: sphgeom.Region,
764 object_ids: Iterable[int] | None,
765 mjd_start: float,
766 mjd_end: float,
767 table_name: ApdbTables,
768 ) -> pandas.DataFrame:
769 """Return catalog of DiaSource instances given set of DiaObject IDs.
770
771 Parameters
772 ----------
773 region : `lsst.sphgeom.Region`
774 Spherical region.
775 object_ids :
776 Collection of DiaObject IDs
777 mjd_start : `float`
778 Lower bound of time interval.
779 mjd_end : `float`
780 Upper bound of time interval.
781 table_name : `ApdbTables`
782 Name of the table.
783
784 Returns
785 -------
786 catalog : `pandas.DataFrame`, or `None`
787 Catalog containing DiaSource records. Empty catalog is returned if
788 ``object_ids`` is empty.
789 """
790 context = self._context
791 config = context.config
792
793 object_id_set: Set[int] = set()
794 if object_ids is not None:
795 object_id_set = set(object_ids)
796 if len(object_id_set) == 0:
797 return self._make_empty_catalog(table_name)
798
799 sp_where, num_sp_part = context.partitioner.spatial_where(region, for_prepare=True)
800 tables, temporal_where = context.partitioner.temporal_where(
801 table_name, mjd_start, mjd_end, for_prepare=True, partitons_range=context.time_partitions_range
802 )
803 if not tables:
804 start = astropy.time.Time(mjd_start, format="mjd", scale="tai")
805 end = astropy.time.Time(mjd_end, format="mjd", scale="tai")
806 warnings.warn(
807 f"Query time range ({start.isot} - {end.isot}) does not overlap database time partitions."
808 )
809
810 # We need to exclude extra partitioning columns from result.
811 column_names = context.schema.apdbColumnNames(table_name)
812 what = ",".join(quote_id(column) for column in column_names)
813
814 # Build all queries
815 statements: list[tuple] = []
816 for table in tables:
817 prefix = f'SELECT {what} from "{self._keyspace}"."{table}"'
818 statements += list(self._combine_where(prefix, sp_where, temporal_where))
819 _LOG.debug("_getSources %s: #queries: %s", table_name, len(statements))
820
821 with _MON.context_tags({"table": table_name.name}):
822 _MON.add_record(
823 "select_query_stats", values={"num_sp_part": num_sp_part, "num_queries": len(statements)}
824 )
825 with self._timer("select_time") as timer:
826 catalog = cast(
827 pandas.DataFrame,
828 select_concurrent(
829 context.session,
830 statements,
831 "read_pandas_multi",
832 config.connection_config.read_concurrency,
833 ),
834 )
835 timer.add_values(row_count_from_db=len(catalog))
836
837 # filter by given object IDs
838 if len(object_id_set) > 0:
839 catalog = cast(pandas.DataFrame, catalog[catalog["diaObjectId"].isin(object_id_set)])
840
841 # precise filtering on midpointMjdTai
842 catalog = cast(pandas.DataFrame, catalog[catalog["midpointMjdTai"] > mjd_start])
843
844 timer.add_values(row_count=len(catalog))
845
846 _LOG.debug("found %d %ss", catalog.shape[0], table_name.name)
847 return catalog
848
849 def _storeReplicaChunk(self, replica_chunk: ReplicaChunk) -> None:
850 context = self._context
851 config = context.config
852
853 # Cassandra timestamp uses milliseconds since epoch
854 timestamp = int(replica_chunk.last_update_time.unix_tai * 1000)
855
856 # everything goes into a single partition
857 partition = 0
858
859 table_name = context.schema.tableName(ExtraTables.ApdbReplicaChunks)
860
861 columns = ["partition", "apdb_replica_chunk", "last_update_time", "unique_id"]
862 values = [partition, replica_chunk.id, timestamp, replica_chunk.unique_id]
863 if context.has_chunk_sub_partitions:
864 columns.append("has_subchunks")
865 values.append(True)
866
867 column_list = ", ".join(columns)
868 placeholders = ",".join(["%s"] * len(columns))
869 query = f'INSERT INTO "{self._keyspace}"."{table_name}" ({column_list}) VALUES ({placeholders})'
870
871 context.session.execute(
872 query,
873 values,
874 timeout=config.connection_config.write_timeout,
875 execution_profile="write",
876 )
877
878 def _queryDiaObjectLastPartitions(self, ids: Iterable[int]) -> Mapping[int, int]:
879 """Return existing mapping of diaObjectId to its last partition."""
880 context = self._context
881 config = context.config
882
883 table_name = context.schema.tableName(ExtraTables.DiaObjectLastToPartition)
884 queries = []
885 object_count = 0
886 for id_chunk in chunk_iterable(ids, 10_000):
887 id_chunk_list = list(id_chunk)
888 query = (
889 f'SELECT "diaObjectId", apdb_part FROM "{self._keyspace}"."{table_name}" '
890 f'WHERE "diaObjectId" in ({",".join(str(oid) for oid in id_chunk_list)})'
891 )
892 queries.append((query, ()))
893 object_count += len(id_chunk_list)
894
895 with self._timer("query_object_last_partitions") as timer:
896 data = cast(
897 ApdbTableData,
898 select_concurrent(
899 context.session,
900 queries,
901 "read_raw_multi",
902 config.connection_config.read_concurrency,
903 ),
904 )
905 timer.add_values(object_count=object_count, row_count=len(data.rows()))
906
907 if data.column_names() != ["diaObjectId", "apdb_part"]:
908 raise RuntimeError(f"Unexpected column names in query result: {data.column_names()}")
909
910 return {row[0]: row[1] for row in data.rows()}
911
912 def _deleteMovingObjects(self, objs: pandas.DataFrame) -> None:
913 """Objects in DiaObjectsLast can move from one spatial partition to
914 another. For those objects inserting new version does not replace old
915 one, so we need to explicitly remove old versions before inserting new
916 ones.
917 """
918 context = self._context
919
920 # Extract all object IDs.
921 new_partitions = dict(zip(objs["diaObjectId"], objs["apdb_part"]))
922 old_partitions = self._queryDiaObjectLastPartitions(objs["diaObjectId"])
923
924 moved_oids: dict[int, tuple[int, int]] = {}
925 for oid, old_part in old_partitions.items():
926 new_part = new_partitions.get(oid, old_part)
927 if new_part != old_part:
928 moved_oids[oid] = (old_part, new_part)
929 _LOG.debug("DiaObject IDs that moved to new partition: %s", moved_oids)
930
931 if moved_oids:
932 # Delete old records from DiaObjectLast.
933 table_name = context.schema.tableName(ApdbTables.DiaObjectLast)
934 query = f'DELETE FROM "{self._keyspace}"."{table_name}" WHERE apdb_part = ? AND "diaObjectId" = ?'
935 statement = context.preparer.prepare(query)
936 queries = []
937 for oid, (old_part, _) in moved_oids.items():
938 queries.append((statement, (old_part, oid)))
939 with self._timer("delete_object_last") as timer:
940 execute_concurrent(context.session, queries, execution_profile="write")
941 timer.add_values(row_count=len(moved_oids))
942
943 # Add all new records to the map.
944 table_name = context.schema.tableName(ExtraTables.DiaObjectLastToPartition)
945 query = f'INSERT INTO "{self._keyspace}"."{table_name}" ("diaObjectId", apdb_part) VALUES (?,?)'
946 statement = context.preparer.prepare(query)
947
948 queries = []
949 for oid, new_part in new_partitions.items():
950 queries.append((statement, (oid, new_part)))
951
952 with self._timer("update_object_last_partition") as timer:
953 execute_concurrent(context.session, queries, execution_profile="write")
954 timer.add_values(row_count=len(queries))
955
956 def _storeDiaObjects(
957 self, objs: pandas.DataFrame, visit_time: astropy.time.Time, replica_chunk: ReplicaChunk | None
958 ) -> None:
959 """Store catalog of DiaObjects from current visit.
960
961 Parameters
962 ----------
963 objs : `pandas.DataFrame`
964 Catalog with DiaObject records
965 visit_time : `astropy.time.Time`
966 Time of the current visit.
967 replica_chunk : `ReplicaChunk` or `None`
968 Replica chunk identifier if replication is configured.
969 """
970 if len(objs) == 0:
971 _LOG.debug("No objects to write to database.")
972 return
973
974 context = self._context
975 config = context.config
976
977 if context.has_dia_object_last_to_partition:
978 self._deleteMovingObjects(objs)
979
980 timestamp: float | datetime.datetime
981 if self._schema.has_mjd_timestamps:
982 validity_start_column = "validityStartMjdTai"
983 timestamp = float(visit_time.tai.mjd)
984 else:
985 validity_start_column = "validityStart"
986 timestamp = visit_time.datetime
987
988 # DiaObjectLast did not have this column in the past.
989 extra_columns: dict[str, Any] = {}
990 if context.schema.check_column(ApdbTables.DiaObjectLast, validity_start_column):
991 extra_columns[validity_start_column] = timestamp
992
993 self._storeObjectsPandas(objs, ApdbTables.DiaObjectLast, extra_columns=extra_columns)
994
995 extra_columns[validity_start_column] = timestamp
996 visit_time_part = context.partitioner.time_partition(visit_time)
997 time_part: int | None = visit_time_part
998 if (time_partitions_range := context.time_partitions_range) is not None:
999 self._check_time_partitions([visit_time_part], time_partitions_range)
1000 if not config.partitioning.time_partition_tables:
1001 extra_columns["apdb_time_part"] = time_part
1002 time_part = None
1003
1004 # Only store DiaObjects if not doing replication or explicitly
1005 # configured to always store them.
1006 if replica_chunk is None or not config.replica_skips_diaobjects:
1007 self._storeObjectsPandas(
1008 objs, ApdbTables.DiaObject, extra_columns=extra_columns, time_part=time_part
1009 )
1010
1011 if replica_chunk is not None:
1012 extra_columns = {"apdb_replica_chunk": replica_chunk.id, validity_start_column: timestamp}
1013 table = ExtraTables.DiaObjectChunks
1014 if context.has_chunk_sub_partitions:
1015 table = ExtraTables.DiaObjectChunks2
1016 # Use a random number for the second part of the partitioning key so
1017 # that different clients can write to different partitions.
1018 # This makes it not exactly reproducible.
1019 extra_columns["apdb_replica_subchunk"] = random.randrange(config.replica_sub_chunk_count)
1020 self._storeObjectsPandas(objs, table, extra_columns=extra_columns)
1021
1022 def _storeDiaSources(
1023 self,
1024 table_name: ApdbTables,
1025 sources: pandas.DataFrame,
1026 replica_chunk: ReplicaChunk | None,
1027 ) -> int | None:
1028 """Store catalog of DIASources or DIAForcedSources from current visit.
1029
1030 Parameters
1031 ----------
1032 table_name : `ApdbTables`
1033 Table where to store the data.
1034 sources : `pandas.DataFrame`
1035 Catalog containing DiaSource records
1038 replica_chunk : `ReplicaChunk` or `None`
1039 Replica chunk identifier if replication is configured.
1040
1041 Returns
1042 -------
1043 subchunk : `int` or `None`
1044 Subchunk number for resulting replica data, `None` if replication is
1045 not enabled or sub-chunking is not enabled.
1046 """
1047 context = self._context
1048 config = context.config
1049
1050 # Time partitioning has to be based on midpointMjdTai, not visit_time
1051 # as visit_time is not really a visit time.
1052 tp_sources = sources.copy(deep=False)
1053 tp_sources["apdb_time_part"] = tp_sources["midpointMjdTai"].apply(context.partitioner.time_partition)
1054 if (time_partitions_range := context.time_partitions_range) is not None:
1055 self._check_time_partitions(tp_sources["apdb_time_part"], time_partitions_range)
1056 extra_columns: dict[str, Any] = {}
1057 if not config.partitioning.time_partition_tables:
1058 self._storeObjectsPandas(tp_sources, table_name)
1059 else:
1060 # Group by time partition
1061 partitions = set(tp_sources["apdb_time_part"])
1062 if len(partitions) == 1:
1063 # Single partition - just save the whole thing.
1064 time_part = partitions.pop()
1065 self._storeObjectsPandas(sources, table_name, time_part=time_part)
1066 else:
1067 # group by time partition.
1068 for time_part, sub_frame in tp_sources.groupby(by="apdb_time_part"):
1069 sub_frame.drop(columns="apdb_time_part", inplace=True)
1070 self._storeObjectsPandas(sub_frame, table_name, time_part=time_part)
1071
1072 subchunk: int | None = None
1073 if replica_chunk is not None:
1074 extra_columns = {"apdb_replica_chunk": replica_chunk.id}
1075 if context.has_chunk_sub_partitions:
1076 subchunk = random.randrange(config.replica_sub_chunk_count)
1077 extra_columns["apdb_replica_subchunk"] = subchunk
1078 if table_name is ApdbTables.DiaSource:
1079 extra_table = ExtraTables.DiaSourceChunks2
1080 else:
1081 extra_table = ExtraTables.DiaForcedSourceChunks2
1082 else:
1083 if table_name is ApdbTables.DiaSource:
1084 extra_table = ExtraTables.DiaSourceChunks
1085 else:
1086 extra_table = ExtraTables.DiaForcedSourceChunks
1087 self._storeObjectsPandas(sources, extra_table, extra_columns=extra_columns)
1088
1089 return subchunk
1090
1091 def _check_time_partitions(
1092 self, partitions: Iterable[int], time_partitions_range: ApdbCassandraTimePartitionRange
1093 ) -> None:
1094 """Check that time partitons for new data actually exist.
1095
1096 Parameters
1097 ----------
1098 partitions : `~collections.abc.Iterable` [`int`]
1099 Time partitions for new data.
1100 time_partitions_range : `ApdbCassandraTimePartitionRange`
1101 Current time partition range.
1102 """
1103 partitions = set(partitions)
1104 min_part = min(partitions)
1105 max_part = max(partitions)
1106 if min_part < time_partitions_range.start or max_part > time_partitions_range.end:
1107 raise ValueError(
1108 "Attempt to store data for time partitions that do not yet exist. "
1109 f"Partitons for new records: {min_part}-{max_part}. "
1110 f"Database partitons: {time_partitions_range.start}-{time_partitions_range.end}."
1111 )
1112 # Make a noise when writing to the last partition.
1113 if max_part == time_partitions_range.end:
1114 warnings.warn(
1115 "Writing into the last temporal partition. Partition range needs to be extended soon.",
1116 stacklevel=3,
1117 )
1118
1119 def _storeDiaSourcesPartitions(
1120 self,
1121 sources: pandas.DataFrame,
1122 visit_time: astropy.time.Time,
1123 replica_chunk: ReplicaChunk | None,
1124 subchunk: int | None,
1125 ) -> None:
1126 """Store mapping of diaSourceId to its partitioning values.
1127
1128 Parameters
1129 ----------
1130 sources : `pandas.DataFrame`
1131 Catalog containing DiaSource records
1132 visit_time : `astropy.time.Time`
1133 Time of the current visit.
1134 replica_chunk : `ReplicaChunk` or `None`
1135 Replication chunk, or `None` when replication is disabled.
1136 subchunk : `int` or `None`
1137 Replication sub-chunk, or `None` when replication is disabled or
1138 sub-chunking is not used.
1139 """
1140 context = self._context
1141
1142 id_map = cast(pandas.DataFrame, sources[["diaSourceId", "apdb_part"]])
1143 extra_columns = {
1144 "apdb_time_part": context.partitioner.time_partition(visit_time),
1145 "apdb_replica_chunk": replica_chunk.id if replica_chunk is not None else None,
1146 }
1147 if context.has_chunk_sub_partitions:
1148 extra_columns["apdb_replica_subchunk"] = subchunk
1149
1150 self._storeObjectsPandas(
1151 id_map, ExtraTables.DiaSourceToPartition, extra_columns=extra_columns, time_part=None
1152 )
1153
1154 def _storeObjectsPandas(
1155 self,
1156 records: pandas.DataFrame,
1157 table_name: ApdbTables | ExtraTables,
1158 extra_columns: Mapping | None = None,
1159 time_part: int | None = None,
1160 ) -> None:
1161 """Store generic objects.
1162
1163 Takes Pandas catalog and stores a bunch of records in a table.
1164
1165 Parameters
1166 ----------
1167 records : `pandas.DataFrame`
1168 Catalog containing object records
1169 table_name : `ApdbTables`
1170 Name of the table as defined in APDB schema.
1171 extra_columns : `dict`, optional
1172 Mapping (column_name, column_value) which gives fixed values for
1173 columns in each row, overrides values in ``records`` if matching
1174 columns exist there.
1175 time_part : `int`, optional
1176 If not `None` then insert into a per-partition table.
1177
1178 Notes
1179 -----
1180 If Pandas catalog contains additional columns not defined in table
1181 schema they are ignored. Catalog does not have to contain all columns
1182 defined in a table, but partition and clustering keys must be present
1183 in a catalog or ``extra_columns``.
1184 """
1185 context = self._context
1186
1187 # use extra columns if specified
1188 if extra_columns is None:
1189 extra_columns = {}
1190 extra_fields = list(extra_columns.keys())
1191
1192 # Fields that will come from dataframe.
1193 df_fields = [column for column in records.columns if column not in extra_fields]
1194
1195 column_map = context.schema.getColumnMap(table_name)
1196 # list of columns (as in felis schema)
1197 fields = [column_map[field].name for field in df_fields if field in column_map]
1198 fields += extra_fields
1199
1200 # check that all partitioning and clustering columns are defined
1201 partition_columns = context.schema.partitionColumns(table_name)
1202 required_columns = partition_columns + context.schema.clusteringColumns(table_name)
1203 missing_columns = [column for column in required_columns if column not in fields]
1204 if missing_columns:
1205 raise ValueError(f"Primary key columns are missing from catalog: {missing_columns}")
1206
1207 qfields = [quote_id(field) for field in fields]
1208 qfields_str = ",".join(qfields)
1209
1210 batch_size = self._batch_size(table_name)
1211
1212 with self._timer("insert_build_time", tags={"table": table_name.name}):
1213 # Multi-partition batches are problematic in general, so we want to
1214 # group records in a batch by their partition key.
1215 values_by_key: dict[tuple, list[list]] = defaultdict(list)
1216 for rec in records.itertuples(index=False):
1217 values = []
1218 partitioning_values: dict[str, Any] = {}
1219 for field in df_fields:
1220 if field not in column_map:
1221 continue
1222 value = getattr(rec, field)
1223 if column_map[field].datatype is felis.datamodel.DataType.timestamp:
1224 if isinstance(value, pandas.Timestamp):
1225 value = value.to_pydatetime()
1226 elif value is pandas.NaT:
1227 value = None
1228 else:
1229 # Assume it's seconds since epoch, Cassandra
1230 # datetime is in milliseconds
1231 value = int(value * 1000)
1232 value = literal(value)
1233 values.append(UNSET_VALUE if value is None else value)
1234 if field in partition_columns:
1235 partitioning_values[field] = value
1236 for field in extra_fields:
1237 value = literal(extra_columns[field])
1238 values.append(UNSET_VALUE if value is None else value)
1239 if field in partition_columns:
1240 partitioning_values[field] = value
1241
1242 key = tuple(partitioning_values[field] for field in partition_columns)
1243 values_by_key[key].append(values)
1244
1245 table = context.schema.tableName(table_name, time_part)
1246
1247 holders = ",".join(["?"] * len(qfields))
1248 query = f'INSERT INTO "{self._keyspace}"."{table}" ({qfields_str}) VALUES ({holders})'
1249 statement = context.preparer.prepare(query)
1250 # Cassandra has 64k limit on batch size, normally that should be
1251 # enough but some tests generate too many forced sources.
1252 queries = []
1253 for key_values in values_by_key.values():
1254 for values_chunk in chunk_iterable(key_values, batch_size):
1255 batch = cassandra.query.BatchStatement()
1256 for row_values in values_chunk:
1257 batch.add(statement, row_values)
1258 queries.append((batch, None))
1259 assert batch.routing_key is not None and batch.keyspace is not None
1260
1261 _LOG.debug("%s: will store %d records", context.schema.tableName(table_name), records.shape[0])
1262 with self._timer("insert_time", tags={"table": table_name.name}) as timer:
1263 execute_concurrent(context.session, queries, execution_profile="write")
1264 timer.add_values(row_count=len(records), num_batches=len(queries))
1265
1266 def _storeUpdateRecords(
1267 self, records: Iterable[ApdbUpdateRecord], chunk: ReplicaChunk, *, store_chunk: bool = False
1268 ) -> None:
1269 """Store ApdbUpdateRecords in the replica table for those records.
1270
1271 Parameters
1272 ----------
1273 records : `list` [`ApdbUpdateRecord`]
1274 Records to store.
1275 chunk : `ReplicaChunk`
1276 Replica chunk for these records.
1277 store_chunk : `bool`
1278 If True then also store replica chunk.
1279
1280 Raises
1281 ------
1282 TypeError
1283 Raised if replication is not enabled for this instance.
1284 """
1285 context = self._context
1286 config = context.config
1287
1288 if not context.schema.replication_enabled:
1289 raise TypeError("Replication is not enabled for this APDB instance.")
1290
1291 if store_chunk:
1292 self._storeReplicaChunk(chunk)
1293
1294 apdb_replica_chunk = chunk.id
1295 # Do not use unique_id from ReplicaChunk as it could be reused in
1296 # multiple calls to this method.
1297 update_unique_id = uuid.uuid4()
1298
1299 rows = []
1300 for record in records:
1301 rows.append(
1302 [
1303 apdb_replica_chunk,
1304 record.update_time_ns,
1305 record.update_order,
1306 update_unique_id,
1307 record.to_json(),
1308 ]
1309 )
1310 columns = [
1311 "apdb_replica_chunk",
1312 "update_time_ns",
1313 "update_order",
1314 "update_unique_id",
1315 "update_payload",
1316 ]
1317 if context.has_chunk_sub_partitions:
1318 subchunk = random.randrange(config.replica_sub_chunk_count)
1319 for row in rows:
1320 row.append(subchunk)
1321 columns.append("apdb_replica_subchunk")
1322
1323 table_name = context.schema.tableName(ExtraTables.ApdbUpdateRecordChunks)
1324 placeholders = ", ".join(["%s"] * len(columns))
1325 columns_str = ", ".join(columns)
1326 query = f'INSERT INTO "{self._keyspace}"."{table_name}" ({columns_str}) VALUES ({placeholders})'
1327 queries = [(query, row) for row in rows]
1328
1329 with self._timer("store_update_record") as timer:
1330 execute_concurrent(context.session, queries, execution_profile="write")
1331 timer.add_values(row_count=len(queries))
1332
1333 def _add_apdb_part(self, df: pandas.DataFrame) -> pandas.DataFrame:
1334 """Calculate spatial partition for each record and add it to a
1335 DataFrame.
1336
1337 Parameters
1338 ----------
1339 df : `pandas.DataFrame`
1340 DataFrame which has to contain ra/dec columns, names of these
1341 columns are defined by configuration ``ra_dec_columns`` field.
1342
1343 Returns
1344 -------
1345 df : `pandas.DataFrame`
1346 DataFrame with ``apdb_part`` column which contains pixel index
1347 for ra/dec coordinates.
1348
1349 Notes
1350 -----
1351 This overrides any existing column in a DataFrame with the same name
1352 (``apdb_part``). Original DataFrame is not changed, copy of a DataFrame
1353 is returned.
1354 """
1355 context = self._context
1356 config = context.config
1357
1358 # Calculate pixelization index for every record.
1359 apdb_part = np.zeros(df.shape[0], dtype=np.int64)
1360 ra_col, dec_col = config.ra_dec_columns
1361 for i, (ra, dec) in enumerate(zip(df[ra_col], df[dec_col])):
1362 uv3d = sphgeom.UnitVector3d(sphgeom.LonLat.fromDegrees(ra, dec))
1363 idx = context.partitioner.pixel(uv3d)
1364 apdb_part[i] = idx
1365 df = df.copy()
1366 df["apdb_part"] = apdb_part
1367 return df
1368
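# Illustrative sketch (added; not part of the original module): how a spatial
# partition index can be computed from ra/dec with lsst.sphgeom. An HTM
# pixelization at level 8 is used here purely as an example; the real level
# and pixelization scheme come from the partitioning configuration.
pixelization = sphgeom.HtmPixelization(8)
uv3d = sphgeom.UnitVector3d(sphgeom.LonLat.fromDegrees(150.0, 2.5))
example_apdb_part = pixelization.index(uv3d)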
1369 def _make_empty_catalog(self, table_name: ApdbTables) -> pandas.DataFrame:
1370 """Make an empty catalog for a table with a given name.
1371
1372 Parameters
1373 ----------
1374 table_name : `ApdbTables`
1375 Name of the table.
1376
1377 Returns
1378 -------
1379 catalog : `pandas.DataFrame`
1380 An empty catalog.
1381 """
1382 table = self._schema.tableSchemas[table_name]
1383
1384 data = {
1385 columnDef.name: pandas.Series(dtype=self._schema.column_dtype(columnDef.datatype))
1386 for columnDef in table.columns
1387 }
1388 return pandas.DataFrame(data)
1389
1390 def _combine_where(
1391 self,
1392 prefix: str,
1393 where1: list[tuple[str, tuple]],
1394 where2: list[tuple[str, tuple]],
1395 where3: tuple[str, tuple] | None = None,
1396 suffix: str | None = None,
1397 ) -> Iterator[tuple[cassandra.query.Statement, tuple]]:
1398 """Make cartesian product of two parts of WHERE clause into a series
1399 of statements to execute.
1400
1401 Parameters
1402 ----------
1403 prefix : `str`
1404 Initial statement prefix that comes before WHERE clause, e.g.
1405 "SELECT * from Table"
1406 """
1407 context = self._context
1408
1409 # If lists are empty use special sentinels.
1410 if not where1:
1411 where1 = [("", ())]
1412 if not where2:
1413 where2 = [("", ())]
1414
1415 for expr1, params1 in where1:
1416 for expr2, params2 in where2:
1417 full_query = prefix
1418 wheres = []
1419 params = params1 + params2
1420 if expr1:
1421 wheres.append(expr1)
1422 if expr2:
1423 wheres.append(expr2)
1424 if where3:
1425 wheres.append(where3[0])
1426 params += where3[1]
1427 if wheres:
1428 full_query += " WHERE " + " AND ".join(wheres)
1429 if suffix:
1430 full_query += " " + suffix
1431 if params:
1432 statement = context.preparer.prepare(full_query)
1433 else:
1434 # If there are no params then it is likely that query
1435 # has a bunch of literals rendered already, no point
1436 # trying to prepare it.
1437 statement = cassandra.query.SimpleStatement(full_query)
1438 yield (statement, params)
1439
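# Illustrative sketch (added; not part of the original module): the cartesian
# combination performed by _combine_where, reduced to plain strings. Two
# spatial fragments and one temporal fragment yield two queries.
spatial = [('"apdb_part" = ?', (1234,)), ('"apdb_part" = ?', (1235,))]
temporal = [('"apdb_time_part" = ?', (800,))]
for expr1, params1 in spatial:
    for expr2, params2 in temporal:
        query = f"SELECT ... WHERE {expr1} AND {expr2}"
        print(query, params1 + params2)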
1440 def _fix_input_timestamps(self, df: pandas.DataFrame) -> pandas.DataFrame:
1441 """Update timestamp columns in input DataFrame to be naive datetime
1442 type.
1443
1444 Clients may or may not generate aware timestamps, code in this class
1445 assumes that timestamps are naive, so we convert them to UTC and
1446 drop timezone.
1447 """
1448 # Find all columns with aware timestamps.
1449 columns = [column for column, dtype in df.dtypes.items() if isinstance(dtype, pandas.DatetimeTZDtype)]
1450 for column in columns:
1451 # tz_convert(None) will convert to UTC and drop timezone.
1452 df[column] = df[column].dt.tz_convert(None)
1453 return df
1454
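# Illustrative sketch (added; not part of the original module): converting an
# aware pandas timestamp column to the naive UTC form expected by this class.
example = pandas.DataFrame({"time": pandas.to_datetime(["2025-06-01T04:00:00+02:00"])})
example["time"] = example["time"].dt.tz_convert(None)  # naive, now 02:00 UTC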
1455 def _batch_size(self, table: ApdbTables | ExtraTables) -> int:
1456 """Calculate batch size based on config parameters."""
1457 context = self._context
1458 config = context.config
1459
1460 # Cassandra limit on number of statements in a batch is 64k.
1461 batch_size = 65_535
1462 if 0 < config.batch_statement_limit < batch_size:
1463 batch_size = config.batch_statement_limit
1464 if config.batch_size_limit > 0:
1465 # The purpose of this limit is to try not to exceed the batch size
1466 # threshold which is set on the server side. The Cassandra wire protocol
1467 # for prepared queries (and batches) only sends column values,
1468 # with an additional 4 bytes per value specifying its size. Value is
1469 # not included for NULL or NOT_SET values, but the size is always
1470 # there. There is additional small per-query overhead, which we
1471 # ignore.
1472 row_size = context.schema.table_row_size(table)
1473 row_size += 4 * len(context.schema.getColumnMap(table))
1474 batch_size = min(batch_size, (config.batch_size_limit // row_size) + 1)
1475 return batch_size
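# Illustrative sketch (added; not part of the original module): the batch-size
# arithmetic above with made-up numbers. For a hypothetical 300-byte row with
# 60 columns and a 100 kB server-side batch threshold:
row_size = 300 + 4 * 60                                # 540 bytes per statement
batch_size = min(65_535, (100_000 // row_size) + 1)    # 186 statements per batch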