from __future__ import annotations

__all__ = ["ApdbCassandraConfig", "ApdbCassandra"]

import logging
import uuid
from typing import Any, Dict, Iterable, Iterator, List, Mapping, Optional, Set, Tuple, Union, cast

import numpy as np
import pandas

# If cassandra-driver is not installed the module can still be imported,
# but ApdbCassandra cannot be instantiated.
try:
    import cassandra
    import cassandra.query
    from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile
    from cassandra.policies import AddressTranslator, RoundRobinPolicy, WhiteListRoundRobinPolicy

    CASSANDRA_IMPORTED = True
except ImportError:
    CASSANDRA_IMPORTED = False

import felis.types
import lsst.daf.base as dafBase
from felis.simple import Table
from lsst import sphgeom
from lsst.pex.config import ChoiceField, Field, ListField
from lsst.utils.iteration import chunk_iterable

from .apdb import Apdb, ApdbConfig, ApdbInsertId, ApdbTableData
from .apdbCassandraSchema import ApdbCassandraSchema, ExtraTables
from .apdbSchema import ApdbTables
from .cassandra_utils import (
    ApdbCassandraTableData,
    literal,
    pandas_dataframe_factory,
    quote_id,
    raw_data_factory,
    select_concurrent,
)
from .pixelization import Pixelization
from .timer import Timer

_LOG = logging.getLogger(__name__)


class CassandraMissingError(Exception):
    def __init__(self) -> None:
        super().__init__("cassandra-driver module cannot be imported")


class ApdbCassandraConfig(ApdbConfig):
    contact_points = ListField[str](
        doc="The list of contact points to try connecting for cluster discovery.", default=["127.0.0.1"]
    )
    private_ips = ListField[str](doc="List of internal IP addresses for contact_points.", default=[])
    keyspace = Field[str](doc="Default keyspace for operations.", default="apdb")
    read_consistency = Field[str](
        doc="Name for consistency level of read operations, default: QUORUM, can be ONE.", default="QUORUM"
    )
    write_consistency = Field[str](
        doc="Name for consistency level of write operations, default: QUORUM, can be ONE.", default="QUORUM"
    )
    read_timeout = Field[float](doc="Timeout in seconds for read operations.", default=120.0)
    write_timeout = Field[float](doc="Timeout in seconds for write operations.", default=10.0)
    read_concurrency = Field[int](doc="Concurrency level for read operations.", default=500)
    protocol_version = Field[int](
        doc="Cassandra protocol version to use, default is V4.",
        default=cassandra.ProtocolVersion.V4 if CASSANDRA_IMPORTED else 0,
    )
    dia_object_columns = ListField[str](
        doc="List of columns to read from DiaObject[Last], by default read all columns", default=[]
    )
    prefix = Field[str](doc="Prefix to add to table names", default="")
    part_pixelization = ChoiceField[str](
        allowed=dict(htm="HTM pixelization", q3c="Q3C pixelization", mq3c="MQ3C pixelization"),
        doc="Pixelization used for partitioning index.",
        default="mq3c",
    )
    part_pix_level = Field[int](doc="Pixelization level used for partitioning index.", default=10)
    part_pix_max_ranges = Field[int](doc="Max number of ranges in pixelization envelope", default=64)
    ra_dec_columns = ListField[str](default=["ra", "dec"], doc="Names of ra/dec columns in DiaObject table")
    timer = Field[bool](doc="If True then print/log timing information", default=False)
    time_partition_tables = Field[bool](
        doc="Use per-partition tables for sources instead of partitioning by time", default=True
    )
    time_partition_days = Field[int](
        doc=(
            "Time partitioning granularity in days, this value must not be changed after database is "
            "initialized."
        ),
        default=30,
    )
    time_partition_start = Field[str](
        doc=(
            "Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI. "
            "This is used only when time_partition_tables is True."
        ),
        default="2018-12-01T00:00:00",
    )
    time_partition_end = Field[str](
        doc=(
            "Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI. "
            "This is used only when time_partition_tables is True."
        ),
        default="2030-01-01T00:00:00",
    )
    query_per_time_part = Field[bool](
        doc=(
            "If True then build separate query for each time partition, otherwise build one single query. "
            "This is only used when time_partition_tables is False in schema config."
        ),
        default=False,
    )
    query_per_spatial_part = Field[bool](
        doc="If True then build one query per spatial partition, otherwise build single query.",
        default=False,
    )


if CASSANDRA_IMPORTED:

    class _AddressTranslator(AddressTranslator):
        """Translate internal IP address to external.

        Only used for docker-based setup, not a viable long-term solution.
        """

        def __init__(self, public_ips: List[str], private_ips: List[str]):
            self._map = dict((k, v) for k, v in zip(private_ips, public_ips))

        def translate(self, private_ip: str) -> str:
            return self._map.get(private_ip, private_ip)


def _quote_column(name: str) -> str:
    """Quote column name"""
    if name.islower():
        return name
    return f'"{name}"'
167 """Implementation of APDB database on to of Apache Cassandra.
169 The implementation is configured via standard ``pex_config`` mechanism
170 using `ApdbCassandraConfig` configuration
class. For an example of
171 different configurations check config/ folder.
175 config : `ApdbCassandraConfig`
176 Configuration object.
180 """Start time for partition 0, this should never be changed."""

    def __init__(self, config: ApdbCassandraConfig):
        if not CASSANDRA_IMPORTED:
            raise CassandraMissingError()

        self.config = config

        _LOG.debug("ApdbCassandra Configuration:")
        for key, value in self.config.items():
            _LOG.debug("    %s: %s", key, value)

        self._pixelization = Pixelization(
            config.part_pixelization, config.part_pix_level, config.part_pix_max_ranges
        )

        addressTranslator: Optional[AddressTranslator] = None
        if config.private_ips:
            addressTranslator = _AddressTranslator(list(config.contact_points), list(config.private_ips))

        self._keyspace = config.keyspace

        self._cluster = Cluster(
            execution_profiles=self._makeProfiles(config),
            contact_points=self.config.contact_points,
            address_translator=addressTranslator,
            protocol_version=self.config.protocol_version,
        )
        self._session = self._cluster.connect()
        # Disable result paging.
        self._session.default_fetch_size = None

        self._schema = ApdbCassandraSchema(
            session=self._session,
            keyspace=self._keyspace,
            schema_file=self.config.schema_file,
            schema_name=self.config.schema_name,
            prefix=self.config.prefix,
            time_partition_tables=self.config.time_partition_tables,
            use_insert_id=self.config.use_insert_id,
        )
        self._partition_zero_epoch_mjd = float(self.partition_zero_epoch.get(system=dafBase.DateTime.MJD))

        # Cache for prepared statements.
        self._prepared_statements: Dict[str, cassandra.query.PreparedStatement] = {}

    def tableDef(self, table: ApdbTables) -> Optional[Table]:
        # docstring is inherited from a base class
        return self._schema.tableSchemas.get(table)

    def makeSchema(self, drop: bool = False) -> None:
        # docstring is inherited from a base class
        if self.config.time_partition_tables:
            time_partition_start = dafBase.DateTime(self.config.time_partition_start, dafBase.DateTime.TAI)
            time_partition_end = dafBase.DateTime(self.config.time_partition_end, dafBase.DateTime.TAI)
            part_range = (
                self._time_partition(time_partition_start),
                self._time_partition(time_partition_end) + 1,
            )
            self._schema.makeSchema(drop=drop, part_range=part_range)
        else:
            self._schema.makeSchema(drop=drop)

    def getDiaObjects(self, region: sphgeom.Region) -> pandas.DataFrame:
        # docstring is inherited from a base class
        sp_where = self._spatial_where(region)
        _LOG.debug("getDiaObjects: #partitions: %s", len(sp_where))

        column_names = self._schema.apdbColumnNames(ApdbTables.DiaObjectLast)
        what = ",".join(_quote_column(column) for column in column_names)

        table_name = self._schema.tableName(ApdbTables.DiaObjectLast)
        query = f'SELECT {what} from "{self._keyspace}"."{table_name}"'
        statements: List[Tuple] = []
        for where, params in sp_where:
            full_query = f"{query} WHERE {where}"
            statement = cassandra.query.SimpleStatement(full_query)
            statements.append((statement, params))
        _LOG.debug("getDiaObjects: #queries: %s", len(statements))

        with Timer("DiaObject select", self.config.timer):
            objects = cast(
                pandas.DataFrame,
                select_concurrent(
                    self._session, statements, "read_pandas_multi", self.config.read_concurrency
                ),
            )

        _LOG.debug("found %s DiaObjects", objects.shape[0])
        return objects

    def getDiaSources(
        self, region: sphgeom.Region, object_ids: Optional[Iterable[int]], visit_time: dafBase.DateTime
    ) -> Optional[pandas.DataFrame]:
        # docstring is inherited from a base class
        months = self.config.read_sources_months
        if months == 0:
            return None
        mjd_end = visit_time.get(system=dafBase.DateTime.MJD)
        mjd_start = mjd_end - months * 30

        return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaSource)

    def getDiaForcedSources(
        self, region: sphgeom.Region, object_ids: Optional[Iterable[int]], visit_time: dafBase.DateTime
    ) -> Optional[pandas.DataFrame]:
        # docstring is inherited from a base class
        months = self.config.read_forced_sources_months
        if months == 0:
            return None
        mjd_end = visit_time.get(system=dafBase.DateTime.MJD)
        mjd_start = mjd_end - months * 30

        return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaForcedSource)

    def getInsertIds(self) -> list[ApdbInsertId] | None:
        # docstring is inherited from a base class
        if not self._schema.has_insert_id:
            return None

        # Everything goes into a single partition.
        partition = 0

        table_name = self._schema.tableName(ExtraTables.DiaInsertId)
        query = f'SELECT insert_time, insert_id FROM "{self._keyspace}"."{table_name}" WHERE partition = ?'

        result = self._session.execute(
            self._prep_statement(query),
            (partition,),
            timeout=self.config.read_timeout,
            execution_profile="read_tuples",
        )
        # Order by insert_time.
        rows = sorted(result)
        return [ApdbInsertId(row[1]) for row in rows]

    def deleteInsertIds(self, ids: Iterable[ApdbInsertId]) -> None:
        # docstring is inherited from a base class
        if not self._schema.has_insert_id:
            raise ValueError("APDB is not configured for history storage")

        insert_ids = [id.id for id in ids]
        params = ",".join("?" * len(insert_ids))

        # Everything goes into a single partition.
        partition = 0

        table_name = self._schema.tableName(ExtraTables.DiaInsertId)
        query = (
            f'DELETE FROM "{self._keyspace}"."{table_name}" WHERE partition = ? and insert_id IN ({params})'
        )

        self._session.execute(
            self._prep_statement(query),
            [partition] + insert_ids,
            timeout=self.config.write_timeout,
            execution_profile="write",
        )

    def getDiaObjectsHistory(self, ids: Iterable[ApdbInsertId]) -> ApdbTableData:
        # docstring is inherited from a base class
        return self._get_history(ExtraTables.DiaObjectInsertId, ids)

    def getDiaSourcesHistory(self, ids: Iterable[ApdbInsertId]) -> ApdbTableData:
        # docstring is inherited from a base class
        return self._get_history(ExtraTables.DiaSourceInsertId, ids)

    def getDiaForcedSourcesHistory(self, ids: Iterable[ApdbInsertId]) -> ApdbTableData:
        # docstring is inherited from a base class
        return self._get_history(ExtraTables.DiaForcedSourceInsertId, ids)

    def getSSObjects(self) -> pandas.DataFrame:
        # docstring is inherited from a base class
        tableName = self._schema.tableName(ApdbTables.SSObject)
        query = f'SELECT * from "{self._keyspace}"."{tableName}"'

        result = self._session.execute(query, execution_profile="read_pandas")
        objects = result._current_rows

        _LOG.debug("found %s SSObjects", objects.shape[0])
        return objects

    def store(
        self,
        visit_time: dafBase.DateTime,
        objects: pandas.DataFrame,
        sources: Optional[pandas.DataFrame] = None,
        forced_sources: Optional[pandas.DataFrame] = None,
    ) -> None:
        # docstring is inherited from a base class

        insert_id: ApdbInsertId | None = None
        if self._schema.has_insert_id:
            insert_id = ApdbInsertId.new_insert_id()
            self._storeInsertId(insert_id, visit_time)

        # Fill region partition column for DiaObjects.
        objects = self._add_obj_part(objects)
        self._storeDiaObjects(objects, visit_time, insert_id)

        if sources is not None:
            # Copy apdb_part column from DiaObjects to DiaSources.
            sources = self._add_src_part(sources, objects)
            self._storeDiaSources(ApdbTables.DiaSource, sources, visit_time, insert_id)
            self._storeDiaSourcesPartitions(sources, visit_time, insert_id)

        if forced_sources is not None:
            forced_sources = self._add_fsrc_part(forced_sources, objects)
            self._storeDiaSources(ApdbTables.DiaForcedSource, forced_sources, visit_time, insert_id)

    def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
        # docstring is inherited from a base class

        # To update a record we need to know its exact primary key (including
        # partition key) so we start by querying for diaSourceId to find the
        # primary keys.
        table_name = self._schema.tableName(ExtraTables.DiaSourceToPartition)
        # Split it into 1k IDs per query.
        selects: List[Tuple] = []
        for ids in chunk_iterable(idMap.keys(), 1_000):
            ids_str = ",".join(str(item) for item in ids)
            selects.append(
                (
                    'SELECT "diaSourceId", "apdb_part", "apdb_time_part", "insert_id" '
                    f'FROM "{self._keyspace}"."{table_name}" WHERE "diaSourceId" IN ({ids_str})',
                    {},
                )
            )

        # No need for a DataFrame here, read data as tuples.
        result = cast(
            List[Tuple[int, int, int, uuid.UUID | None]],
            select_concurrent(self._session, selects, "read_tuples", self.config.read_concurrency),
        )

        # Make mapping from source ID to its partition.
        id2partitions: Dict[int, Tuple[int, int]] = {}
        id2insert_id: Dict[int, ApdbInsertId] = {}
        for row in result:
            id2partitions[row[0]] = row[1:3]
            if row[3] is not None:
                id2insert_id[row[0]] = ApdbInsertId(row[3])

        # Make sure we know partitions for each ID.
        if set(id2partitions) != set(idMap):
            missing = ",".join(str(item) for item in set(idMap) - set(id2partitions))
            raise ValueError(f"Following DiaSource IDs do not exist in the database: {missing}")

        # Reassign in the standard DiaSource table(s).
        queries = cassandra.query.BatchStatement()
        table_name = self._schema.tableName(ApdbTables.DiaSource)
        for diaSourceId, ssObjectId in idMap.items():
            apdb_part, apdb_time_part = id2partitions[diaSourceId]
            if self.config.time_partition_tables:
                query = (
                    f'UPDATE "{self._keyspace}"."{table_name}_{apdb_time_part}"'
                    ' SET "ssObjectId" = ?, "diaObjectId" = NULL'
                    ' WHERE "apdb_part" = ? AND "diaSourceId" = ?'
                )
                values = (ssObjectId, apdb_part, diaSourceId)
            else:
                query = (
                    f'UPDATE "{self._keyspace}"."{table_name}"'
                    ' SET "ssObjectId" = ?, "diaObjectId" = NULL'
                    ' WHERE "apdb_part" = ? AND "apdb_time_part" = ? AND "diaSourceId" = ?'
                )
                values = (ssObjectId, apdb_part, apdb_time_part, diaSourceId)
            queries.add(self._prep_statement(query), values)

        # Reassign in the history table, only for insert IDs that still exist.
        if id2insert_id:
            insert_ids = self.getInsertIds() or []
            known_ids = set(insert_ids)
            id2insert_id = {key: value for key, value in id2insert_id.items() if value in known_ids}

            table_name = self._schema.tableName(ExtraTables.DiaSourceInsertId)
            for diaSourceId, ssObjectId in idMap.items():
                if insert_id := id2insert_id.get(diaSourceId):
                    query = (
                        f'UPDATE "{self._keyspace}"."{table_name}" '
                        'SET "ssObjectId" = ?, "diaObjectId" = NULL '
                        'WHERE "insert_id" = ? AND "diaSourceId" = ?'
                    )
                    values = (ssObjectId, insert_id.id, diaSourceId)
                    queries.add(self._prep_statement(query), values)

        _LOG.debug("%s: will update %d records", table_name, len(idMap))
        with Timer(table_name + " update", self.config.timer):
            self._session.execute(queries, execution_profile="write")

    def countUnassociatedObjects(self) -> int:
        # docstring is inherited from a base class
        raise NotImplementedError()

    def _makeProfiles(self, config: ApdbCassandraConfig) -> Mapping[Any, ExecutionProfile]:
        """Make all execution profiles used in the code."""
        if config.private_ips:
            loadBalancePolicy = WhiteListRoundRobinPolicy(hosts=config.contact_points)
        else:
            loadBalancePolicy = RoundRobinPolicy()

        read_tuples_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
            request_timeout=config.read_timeout,
            row_factory=cassandra.query.tuple_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        read_pandas_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
            request_timeout=config.read_timeout,
            row_factory=pandas_dataframe_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        read_raw_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
            request_timeout=config.read_timeout,
            row_factory=raw_data_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        # Profile to use with select_concurrent to return pandas data frame.
        read_pandas_multi_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
            request_timeout=config.read_timeout,
            row_factory=pandas_dataframe_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        # Profile to use with select_concurrent to return raw data (columns and rows).
        read_raw_multi_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
            request_timeout=config.read_timeout,
            row_factory=raw_data_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        write_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.write_consistency),
            request_timeout=config.write_timeout,
            load_balancing_policy=loadBalancePolicy,
        )
        # To replace the default load balancing policy.
        default_profile = ExecutionProfile(
            load_balancing_policy=loadBalancePolicy,
        )
        return {
            "read_tuples": read_tuples_profile,
            "read_pandas": read_pandas_profile,
            "read_raw": read_raw_profile,
            "read_pandas_multi": read_pandas_multi_profile,
            "read_raw_multi": read_raw_multi_profile,
            "write": write_profile,
            EXEC_PROFILE_DEFAULT: default_profile,
        }
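
    # The profile names returned above are referenced by string in the rest of
    # this class, e.g. (illustrative sketch only):
    #
    #     result = self._session.execute(query, execution_profile="read_pandas")
    #     df = result._current_rows  # pandas.DataFrame via pandas_dataframe_factory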

    def _getSources(
        self,
        region: sphgeom.Region,
        object_ids: Optional[Iterable[int]],
        mjd_start: float,
        mjd_end: float,
        table_name: ApdbTables,
    ) -> pandas.DataFrame:
        """Returns catalog of DiaSource instances given set of DiaObject IDs.

        Parameters
        ----------
        region : `lsst.sphgeom.Region`
            Spherical region to search.
        object_ids :
            Collection of DiaObject IDs
        mjd_start : `float`
            Lower bound of time interval.
        mjd_end : `float`
            Upper bound of time interval.
        table_name : `ApdbTables`
            Name of the table to query.

        Returns
        -------
        catalog : `pandas.DataFrame`, or `None`
            Catalog containing DiaSource records. Empty catalog is returned if
            ``object_ids`` is empty.
        """
        object_id_set: Set[int] = set()
        if object_ids is not None:
            object_id_set = set(object_ids)
            if len(object_id_set) == 0:
                return self._make_empty_catalog(table_name)

        sp_where = self._spatial_where(region)
        tables, temporal_where = self._temporal_where(table_name, mjd_start, mjd_end)

        # We need to exclude extra partitioning columns from the result.
        column_names = self._schema.apdbColumnNames(table_name)
        what = ",".join(_quote_column(column) for column in column_names)

        # Build all queries.
        statements: List[Tuple] = []
        for table in tables:
            prefix = f'SELECT {what} from "{self._keyspace}"."{table}"'
            statements += list(self._combine_where(prefix, sp_where, temporal_where))
        _LOG.debug("_getSources %s: #queries: %s", table_name, len(statements))

        with Timer(table_name.name + " select", self.config.timer):
            catalog = cast(
                pandas.DataFrame,
                select_concurrent(
                    self._session, statements, "read_pandas_multi", self.config.read_concurrency
                ),
            )

        # Filter by given object IDs.
        if len(object_id_set) > 0:
            catalog = cast(pandas.DataFrame, catalog[catalog["diaObjectId"].isin(object_id_set)])

        # Precise filtering on midpointMjdTai.
        catalog = cast(pandas.DataFrame, catalog[catalog["midpointMjdTai"] > mjd_start])

        _LOG.debug("found %d %ss", catalog.shape[0], table_name.name)
        return catalog

    def _get_history(self, table: ExtraTables, ids: Iterable[ApdbInsertId]) -> ApdbTableData:
        """Return records from a particular table given set of insert IDs."""
        if not self._schema.has_insert_id:
            raise ValueError("APDB is not configured for history retrieval")

        insert_ids = [id.id for id in ids]
        params = ",".join("?" * len(insert_ids))

        table_name = self._schema.tableName(table)
        # History table schema has only regular APDB columns plus an insert_id
        # column, which is exactly what we need to return, so "*" is fine.
        query = f'SELECT * FROM "{self._keyspace}"."{table_name}" WHERE insert_id IN ({params})'
        statement = self._prep_statement(query)

        with Timer("DiaObject history", self.config.timer):
            result = self._session.execute(statement, insert_ids, execution_profile="read_raw")
            table_data = cast(ApdbCassandraTableData, result._current_rows)
        return table_data

    def _storeInsertId(self, insert_id: ApdbInsertId, visit_time: dafBase.DateTime) -> None:
        # Cassandra timestamps are milliseconds since epoch.
        timestamp = visit_time.nsecs() // 1_000_000

        # Everything goes into a single partition.
        partition = 0

        table_name = self._schema.tableName(ExtraTables.DiaInsertId)
        query = (
            f'INSERT INTO "{self._keyspace}"."{table_name}" (partition, insert_id, insert_time) '
            "VALUES (?, ?, ?)"
        )

        self._session.execute(
            self._prep_statement(query),
            (partition, insert_id.id, timestamp),
            timeout=self.config.write_timeout,
            execution_profile="write",
        )

    def _storeDiaObjects(
        self, objs: pandas.DataFrame, visit_time: dafBase.DateTime, insert_id: ApdbInsertId | None
    ) -> None:
        """Store catalog of DiaObjects from current visit.

        Parameters
        ----------
        objs : `pandas.DataFrame`
            Catalog with DiaObject records
        visit_time : `lsst.daf.base.DateTime`
            Time of the current visit.
        insert_id : `ApdbInsertId` or `None`
            Insert identifier for history, or `None` if history is disabled.
        """
        visit_time_dt = visit_time.toPython()
        extra_columns = dict(lastNonForcedSource=visit_time_dt)
        self._storeObjectsPandas(objs, ApdbTables.DiaObjectLast, extra_columns=extra_columns)

        extra_columns["validityStart"] = visit_time_dt
        time_part: Optional[int] = self._time_partition(visit_time)
        if not self.config.time_partition_tables:
            extra_columns["apdb_time_part"] = time_part
            time_part = None

        self._storeObjectsPandas(objs, ApdbTables.DiaObject, extra_columns=extra_columns, time_part=time_part)

        if insert_id is not None:
            extra_columns = dict(insert_id=insert_id.id, validityStart=visit_time_dt)
            self._storeObjectsPandas(objs, ExtraTables.DiaObjectInsertId, extra_columns=extra_columns)

    def _storeDiaSources(
        self,
        table_name: ApdbTables,
        sources: pandas.DataFrame,
        visit_time: dafBase.DateTime,
        insert_id: ApdbInsertId | None,
    ) -> None:
        """Store catalog of DIASources or DIAForcedSources from current visit.

        Parameters
        ----------
        table_name : `ApdbTables`
            Table where to store the data.
        sources : `pandas.DataFrame`
            Catalog containing DiaSource records
        visit_time : `lsst.daf.base.DateTime`
            Time of the current visit.
        insert_id : `ApdbInsertId` or `None`
            Insert identifier for history, or `None` if history is disabled.
        """
        time_part: Optional[int] = self._time_partition(visit_time)
        extra_columns: dict[str, Any] = {}
        if not self.config.time_partition_tables:
            extra_columns["apdb_time_part"] = time_part
            time_part = None

        self._storeObjectsPandas(sources, table_name, extra_columns=extra_columns, time_part=time_part)

        if insert_id is not None:
            extra_columns = dict(insert_id=insert_id.id)
            if table_name is ApdbTables.DiaSource:
                extra_table = ExtraTables.DiaSourceInsertId
            else:
                extra_table = ExtraTables.DiaForcedSourceInsertId
            self._storeObjectsPandas(sources, extra_table, extra_columns=extra_columns)

    def _storeDiaSourcesPartitions(
        self, sources: pandas.DataFrame, visit_time: dafBase.DateTime, insert_id: ApdbInsertId | None
    ) -> None:
        """Store mapping of diaSourceId to its partitioning values.

        Parameters
        ----------
        sources : `pandas.DataFrame`
            Catalog containing DiaSource records
        visit_time : `lsst.daf.base.DateTime`
            Time of the current visit.
        """
        id_map = cast(pandas.DataFrame, sources[["diaSourceId", "apdb_part"]])
        extra_columns = {
            "apdb_time_part": self._time_partition(visit_time),
            "insert_id": insert_id.id if insert_id is not None else None,
        }

        self._storeObjectsPandas(
            id_map, ExtraTables.DiaSourceToPartition, extra_columns=extra_columns, time_part=None
        )

    def _storeObjectsPandas(
        self,
        records: pandas.DataFrame,
        table_name: Union[ApdbTables, ExtraTables],
        extra_columns: Optional[Mapping] = None,
        time_part: Optional[int] = None,
    ) -> None:
        """Generic store method.

        Takes a Pandas catalog and stores a bunch of records in a table.

        Parameters
        ----------
        records : `pandas.DataFrame`
            Catalog containing object records
        table_name : `ApdbTables`
            Name of the table as defined in APDB schema.
        extra_columns : `dict`, optional
            Mapping (column_name, column_value) which gives fixed values for
            columns in each row, overrides values in ``records`` if matching
            columns exist there.
        time_part : `int`, optional
            If not `None` then insert into a per-partition table.

        Notes
        -----
        If Pandas catalog contains additional columns not defined in table
        schema they are ignored. Catalog does not have to contain all columns
        defined in a table, but partition and clustering keys must be present
        in a catalog or ``extra_columns``.
        """
        # Use extra columns if specified.
        if extra_columns is None:
            extra_columns = {}
        extra_fields = list(extra_columns.keys())

        # Fields that come from the DataFrame itself.
        df_fields = [column for column in records.columns if column not in extra_fields]

        column_map = self._schema.getColumnMap(table_name)
        # List of columns (as in felis schema).
        fields = [column_map[field].name for field in df_fields if field in column_map]
        fields += extra_fields

        # Check that all partitioning and clustering columns are defined.
        required_columns = self._schema.partitionColumns(table_name) + self._schema.clusteringColumns(
            table_name
        )
        missing_columns = [column for column in required_columns if column not in fields]
        if missing_columns:
            raise ValueError(f"Primary key columns are missing from catalog: {missing_columns}")

        qfields = [quote_id(field) for field in fields]
        qfields_str = ",".join(qfields)

        with Timer(table_name.name + " query build", self.config.timer):
            table = self._schema.tableName(table_name)
            if time_part is not None:
                table = f"{table}_{time_part}"

            holders = ",".join(["?"] * len(qfields))
            query = f'INSERT INTO "{self._keyspace}"."{table}" ({qfields_str}) VALUES ({holders})'
            statement = self._prep_statement(query)
            queries = cassandra.query.BatchStatement()
            for rec in records.itertuples(index=False):
                values = []
                for field in df_fields:
                    if field not in column_map:
                        continue
                    value = getattr(rec, field)
                    if column_map[field].datatype is felis.types.Timestamp:
                        if isinstance(value, pandas.Timestamp):
                            value = literal(value.to_pydatetime())
                        else:
                            # Assume it is seconds since epoch, Cassandra
                            # expects milliseconds.
                            value = int(value * 1000)
                    values.append(literal(value))
                for field in extra_fields:
                    value = extra_columns[field]
                    values.append(literal(value))
                queries.add(statement, values)

        _LOG.debug("%s: will store %d records", self._schema.tableName(table_name), records.shape[0])
        with Timer(table_name.name + " insert", self.config.timer):
            self._session.execute(queries, timeout=self.config.write_timeout, execution_profile="write")
843 """Calculate spatial partition for each record and add it to a
848 This overrides any existing column in a DataFrame
with the same name
849 (apdb_part). Original DataFrame
is not changed, copy of a DataFrame
is
853 apdb_part = np.zeros(df.shape[0], dtype=np.int64)
854 ra_col, dec_col = self.
config.ra_dec_columns
855 for i, (ra, dec)
in enumerate(zip(df[ra_col], df[dec_col])):
860 df[
"apdb_part"] = apdb_part

    def _add_src_part(self, sources: pandas.DataFrame, objs: pandas.DataFrame) -> pandas.DataFrame:
        """Add apdb_part column to DiaSource catalog.

        Notes
        -----
        This method copies apdb_part value from a matching DiaObject record.
        DiaObject catalog needs to have a apdb_part column filled by
        ``_add_obj_part`` method and DiaSource records need to be
        associated to DiaObjects via ``diaObjectId`` column.

        This overrides any existing column in a DataFrame with the same name
        (apdb_part). Original DataFrame is not changed, copy of a DataFrame is
        returned.
        """
        pixel_id_map: Dict[int, int] = {
            diaObjectId: apdb_part for diaObjectId, apdb_part in zip(objs["diaObjectId"], objs["apdb_part"])
        }
        apdb_part = np.zeros(sources.shape[0], dtype=np.int64)
        ra_col, dec_col = self.config.ra_dec_columns
        for i, (diaObjId, ra, dec) in enumerate(
            zip(sources["diaObjectId"], sources[ra_col], sources[dec_col])
        ):
            if diaObjId == 0:
                # DiaSources associated with SolarSystemObjects do not have an
                # associated DiaObject, so compute partition from their own
                # ra/dec instead.
                uv3d = sphgeom.UnitVector3d(sphgeom.LonLat.fromDegrees(ra, dec))
                apdb_part[i] = self._pixelization.pixel(uv3d)
            else:
                apdb_part[i] = pixel_id_map[diaObjId]
        sources = sources.copy()
        sources["apdb_part"] = apdb_part
        return sources

    def _add_fsrc_part(self, sources: pandas.DataFrame, objs: pandas.DataFrame) -> pandas.DataFrame:
        """Add apdb_part column to DiaForcedSource catalog.

        Notes
        -----
        This method copies apdb_part value from a matching DiaObject record.
        DiaObject catalog needs to have a apdb_part column filled by
        ``_add_obj_part`` method and DiaSource records need to be
        associated to DiaObjects via ``diaObjectId`` column.

        This overrides any existing column in a DataFrame with the same name
        (apdb_part). Original DataFrame is not changed, copy of a DataFrame is
        returned.
        """
        pixel_id_map: Dict[int, int] = {
            diaObjectId: apdb_part for diaObjectId, apdb_part in zip(objs["diaObjectId"], objs["apdb_part"])
        }
        apdb_part = np.zeros(sources.shape[0], dtype=np.int64)
        for i, diaObjId in enumerate(sources["diaObjectId"]):
            apdb_part[i] = pixel_id_map[diaObjId]
        sources = sources.copy()
        sources["apdb_part"] = apdb_part
        return sources
923 """Calculate time partiton number for a given time.
928 Time
for which to calculate partition number. Can be float to mean
934 Partition number
for a given time.
937 mjd = time.get(system=dafBase.DateTime.MJD)
941 partition = int(days_since_epoch) // self.
config.time_partition_days
945 """Make an empty catalog for a table with a given name.
949 table_name : `ApdbTables`
954 catalog : `pandas.DataFrame`
957 table = self._schema.tableSchemas[table_name]
960 columnDef.name: pandas.Series(dtype=self._schema.column_dtype(columnDef.datatype))
961 for columnDef
in table.columns
963 return pandas.DataFrame(data)
966 """Convert query string into prepared statement."""
967 stmt = self._prepared_statements.get(query)
970 self._prepared_statements[query] = stmt

    def _combine_where(
        self,
        prefix: str,
        where1: List[Tuple[str, Tuple]],
        where2: List[Tuple[str, Tuple]],
        suffix: Optional[str] = None,
    ) -> Iterator[Tuple[cassandra.query.Statement, Tuple]]:
        """Make cartesian product of two parts of WHERE clause into a series
        of statements to execute.

        Parameters
        ----------
        prefix : `str`
            Initial statement prefix that comes before WHERE clause, e.g.
            "SELECT * from Table"
        where1, where2 : `list` [ `tuple` ]
            Lists of (expression, parameters) tuples to combine.
        suffix : `str`, optional
            Optional text appended after the WHERE clause.
        """
        # If lists are empty use special sentinels.
        if not where1:
            where1 = [("", ())]
        if not where2:
            where2 = [("", ())]

        for expr1, params1 in where1:
            for expr2, params2 in where2:
                full_query = prefix
                wheres = []
                if expr1:
                    wheres.append(expr1)
                if expr2:
                    wheres.append(expr2)
                if wheres:
                    full_query += " WHERE " + " AND ".join(wheres)
                if suffix:
                    full_query += " " + suffix
                params = params1 + params2
                if params:
                    statement = self._prep_statement(full_query)
                else:
                    # If there are no params then the query likely has its
                    # literals rendered already, no point preparing it.
                    statement = cassandra.query.SimpleStatement(full_query)
                yield (statement, params)
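
    # Illustrative sketch of what `_combine_where` produces for a prefix of
    # 'SELECT * from "k"."t"', one spatial expression and two temporal
    # expressions (made-up values): two statements of the form
    #
    #     SELECT * from "k"."t" WHERE "apdb_part" IN (1,2,3) AND "apdb_time_part" = ?
    #
    # with parameter tuples () + (630,) and () + (631,) respectively.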

    def _spatial_where(
        self, region: Optional[sphgeom.Region], use_ranges: bool = False
    ) -> List[Tuple[str, Tuple]]:
        """Generate expressions for spatial part of WHERE clause.

        Parameters
        ----------
        region : `sphgeom.Region`
            Spatial region for query results.
        use_ranges : `bool`
            If True then use pixel ranges ("apdb_part >= p1 AND apdb_part <=
            p2") instead of exact list of pixels. Should be set to True for
            large regions covering very many pixels.

        Returns
        -------
        expressions : `list` [ `tuple` ]
            Empty list is returned if ``region`` is `None`, otherwise a list
            of one or more (expression, parameters) tuples
        """
        if region is None:
            return []
        if use_ranges:
            pixel_ranges = self._pixelization.envelope(region)
            expressions: List[Tuple[str, Tuple]] = []
            for lower, upper in pixel_ranges:
                upper -= 1
                if lower == upper:
                    expressions.append(('"apdb_part" = ?', (lower,)))
                else:
                    expressions.append(('"apdb_part" >= ? AND "apdb_part" <= ?', (lower, upper)))
            return expressions
        else:
            pixels = self._pixelization.pixels(region)
            if self.config.query_per_spatial_part:
                return [('"apdb_part" = ?', (pixel,)) for pixel in pixels]
            else:
                pixels_str = ",".join([str(pix) for pix in pixels])
                return [(f'"apdb_part" IN ({pixels_str})', ())]

    def _temporal_where(
        self,
        table: ApdbTables,
        start_time: Union[float, dafBase.DateTime],
        end_time: Union[float, dafBase.DateTime],
        query_per_time_part: Optional[bool] = None,
    ) -> Tuple[List[str], List[Tuple[str, Tuple]]]:
        """Generate table names and expressions for temporal part of WHERE
        clauses.

        Parameters
        ----------
        table : `ApdbTables`
            Table to select from.
        start_time : `dafBase.DateTime` or `float`
            Starting Datetime or MJD value of the time range.
        end_time : `dafBase.DateTime` or `float`
            Ending Datetime or MJD value of the time range.
        query_per_time_part : `bool`, optional
            If None then use ``query_per_time_part`` from configuration.

        Returns
        -------
        tables : `list` [ `str` ]
            List of the table names to query.
        expressions : `list` [ `tuple` ]
            A list of zero or more (expression, parameters) tuples.
        """
        temporal_where: List[Tuple[str, Tuple]] = []
        table_name = self._schema.tableName(table)
        time_part_start = self._time_partition(start_time)
        time_part_end = self._time_partition(end_time)
        time_parts = list(range(time_part_start, time_part_end + 1))
        if self.config.time_partition_tables:
            tables = [f"{table_name}_{part}" for part in time_parts]
        else:
            tables = [table_name]
            if query_per_time_part is None:
                query_per_time_part = self.config.query_per_time_part
            if query_per_time_part:
                temporal_where = [('"apdb_time_part" = ?', (time_part,)) for time_part in time_parts]
            else:
                time_part_list = ",".join([str(part) for part in time_parts])
                temporal_where = [(f'"apdb_time_part" IN ({time_part_list})', ())]

        return tables, temporal_where
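

# Illustrative end-to-end sketch of how this class is typically used; the
# contact points and catalogs below are placeholders, and `makeSchema` only
# needs to run once per keyspace:
#
#     config = ApdbCassandraConfig()
#     config.contact_points = ["127.0.0.1"]
#     apdb = ApdbCassandra(config)
#     apdb.makeSchema(drop=False)
#     objects = apdb.getDiaObjects(region)
#     apdb.store(visit_time, new_objects, new_sources, new_forced_sources)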