from __future__ import annotations

__all__ = ["ApdbCassandraConfig", "ApdbCassandra"]

import logging
from typing import Any, cast, Callable, Dict, Iterable, Iterator, List, Mapping, Optional, Set, Tuple, Union

import numpy as np
import pandas

# If cassandra-driver is not available the module can still be imported
# but ApdbCassandra cannot be instantiated.
try:
    import cassandra
    from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
    from cassandra.policies import RoundRobinPolicy, WhiteListRoundRobinPolicy, AddressTranslator
    import cassandra.query
    CASSANDRA_IMPORTED = True
except ImportError:
    CASSANDRA_IMPORTED = False

import lsst.daf.base as dafBase
from lsst import sphgeom
from lsst.pex.config import ChoiceField, Field, ListField
from lsst.utils.iteration import chunk_iterable
from .timer import Timer
from .apdb import Apdb, ApdbConfig
from .apdbSchema import ApdbTables, TableDef
from .apdbCassandraSchema import ApdbCassandraSchema, ExtraTables
from .cassandra_utils import (
    literal,
    pandas_dataframe_factory,
    quote_id,
    raw_data_factory,
    select_concurrent,
)
from .pixelization import Pixelization

_LOG = logging.getLogger(__name__)

class CassandraMissingError(Exception):
    """Exception raised when the cassandra-driver module cannot be imported."""

    def __init__(self) -> None:
        super().__init__("cassandra-driver module cannot be imported")

class ApdbCassandraConfig(ApdbConfig):

    contact_points = ListField(
        dtype=str,
        doc="The list of contact points to try connecting for cluster discovery.",
    )
    private_ips = ListField(
        dtype=str,
        doc="List of internal IP addresses for contact_points.",
    )
    keyspace = Field(
        dtype=str,
        doc="Default keyspace for operations.",
    )
    read_consistency = Field(
        dtype=str,
        doc="Name for consistency level of read operations, default: QUORUM, can be ONE.",
        default="QUORUM",
    )
    write_consistency = Field(
        dtype=str,
        doc="Name for consistency level of write operations, default: QUORUM, can be ONE.",
        default="QUORUM",
    )
    read_timeout = Field(
        dtype=float,
        doc="Timeout in seconds for read operations.",
    )
    write_timeout = Field(
        dtype=float,
        doc="Timeout in seconds for write operations.",
    )
    read_concurrency = Field(
        dtype=int,
        doc="Concurrency level for read operations.",
    )
    protocol_version = Field(
        dtype=int,
        doc="Cassandra protocol version to use, default is V4",
        default=cassandra.ProtocolVersion.V4 if CASSANDRA_IMPORTED else 0,
    )
    dia_object_columns = ListField(
        dtype=str,
        doc="List of columns to read from DiaObject, by default read all columns",
    )
    prefix = Field(
        dtype=str,
        doc="Prefix to add to table names",
    )
    part_pixelization = ChoiceField(
        dtype=str,
        allowed=dict(htm="HTM pixelization", q3c="Q3C pixelization", mq3c="MQ3C pixelization"),
        doc="Pixelization used for partitioning index.",
    )
    part_pix_level = Field(
        dtype=int,
        doc="Pixelization level used for partitioning index.",
    )
    part_pix_max_ranges = Field(
        dtype=int,
        doc="Max number of ranges in pixelization envelope",
    )
    ra_dec_columns = ListField(
        dtype=str,
        default=["ra", "decl"],
        doc="Names of ra/dec columns in DiaObject table",
    )
    timer = Field(
        dtype=bool,
        doc="If True then print/log timing information",
    )
    time_partition_tables = Field(
        dtype=bool,
        doc="Use per-partition tables for sources instead of partitioning by time",
    )
    time_partition_days = Field(
        dtype=int,
        doc="Time partitioning granularity in days, this value must not be changed"
            " after database is initialized",
    )
    time_partition_start = Field(
        dtype=str,
        doc="Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI."
            " This is used only when time_partition_tables is True.",
        default="2018-12-01T00:00:00",
    )
    time_partition_end = Field(
        dtype=str,
        doc="Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI."
            " This is used only when time_partition_tables is True.",
        default="2030-01-01T00:00:00",
    )
    query_per_time_part = Field(
        dtype=bool,
        doc="If True then build separate query for each time partition, otherwise build one single query. "
            "This is only used when time_partition_tables is False in schema config.",
    )
    query_per_spatial_part = Field(
        dtype=bool,
        doc="If True then build one query per spatial partition, otherwise build single query.",
    )
    pandas_delay_conv = Field(
        dtype=bool,
        doc="If True then combine result rows before converting to pandas.",
    )

if CASSANDRA_IMPORTED:

    class _AddressTranslator(AddressTranslator):
        """Translate internal IP address to external.

        Only used for docker-based setup, not viable long-term solution.
        """

        def __init__(self, public_ips: List[str], private_ips: List[str]):
            self._map = dict((k, v) for k, v in zip(private_ips, public_ips))

        def translate(self, private_ip: str) -> str:
            return self._map.get(private_ip, private_ip)

206 """Implementation of APDB database on to of Apache Cassandra.
208 The implementation is configured via standard ``pex_config`` mechanism
209 using `ApdbCassandraConfig` configuration
class. For an example of
210 different configurations check config/ folder.
214 config : `ApdbCassandraConfig`
215 Configuration object.
219 """Start time for partition 0, this should never be changed."""
    def __init__(self, config: ApdbCassandraConfig):

        if not CASSANDRA_IMPORTED:
            raise CassandraMissingError()

        self.config = config

        _LOG.debug("ApdbCassandra Configuration:")
        for key, value in self.config.items():
            _LOG.debug("    %s: %s", key, value)

        self._pixelization = Pixelization(
            config.part_pixelization, config.part_pix_level, config.part_pix_max_ranges
        )

        addressTranslator: Optional[AddressTranslator] = None
        if config.private_ips:
            addressTranslator = _AddressTranslator(config.contact_points, config.private_ips)

        self._keyspace = config.keyspace

        self._cluster = Cluster(execution_profiles=self._makeProfiles(config),
                                contact_points=self.config.contact_points,
                                address_translator=addressTranslator,
                                protocol_version=self.config.protocol_version)
        self._session = self._cluster.connect()
        # Disable result paging
        self._session.default_fetch_size = None

        self._schema = ApdbCassandraSchema(session=self._session,
                                           keyspace=self._keyspace,
                                           schema_file=self.config.schema_file,
                                           schema_name=self.config.schema_name,
                                           prefix=self.config.prefix,
                                           time_partition_tables=self.config.time_partition_tables)
        self._partition_zero_epoch_mjd = self.partition_zero_epoch.get(system=dafBase.DateTime.MJD)

        # Cache of prepared statements
        self._prepared_statements: Dict[str, cassandra.query.PreparedStatement] = {}

    def tableDef(self, table: ApdbTables) -> Optional[TableDef]:
        # docstring is inherited from a base class
        return self._schema.tableSchemas.get(table)

    def makeSchema(self, drop: bool = False) -> None:
        # docstring is inherited from a base class

        if self.config.time_partition_tables:
            # Create per-partition source tables covering the configured
            # [time_partition_start, time_partition_end] interval.
            part_start = dafBase.DateTime(self.config.time_partition_start, dafBase.DateTime.TAI)
            part_end = dafBase.DateTime(self.config.time_partition_end, dafBase.DateTime.TAI)
            part_range = (self._time_partition(part_start), self._time_partition(part_end) + 1)
            self._schema.makeSchema(drop=drop, part_range=part_range)
        else:
            self._schema.makeSchema(drop=drop)

    def getDiaObjects(self, region: sphgeom.Region) -> pandas.DataFrame:
        # docstring is inherited from a base class

        sp_where = self._spatial_where(region)
        _LOG.debug("getDiaObjects: #partitions: %s", len(sp_where))

        table_name = self._schema.tableName(ApdbTables.DiaObjectLast)
        query = f'SELECT * from "{self._keyspace}"."{table_name}"'
        statements: List[Tuple] = []
        for where, params in sp_where:
            full_query = f"{query} WHERE {where}"
            if params:
                statement = self._prep_statement(full_query)
            else:
                # If there are no parameters then there is no point in a
                # prepared statement, a simple statement is enough.
                statement = cassandra.query.SimpleStatement(full_query)
            statements.append((statement, params))
        _LOG.debug("getDiaObjects: #queries: %s", len(statements))

        with Timer('DiaObject select', self.config.timer):
            objects = cast(
                pandas.DataFrame,
                select_concurrent(
                    self._session, statements, "read_pandas_multi", self.config.read_concurrency
                ),
            )

        _LOG.debug("found %s DiaObjects", objects.shape[0])
        return objects

    def getDiaSources(self, region: sphgeom.Region,
                      object_ids: Optional[Iterable[int]],
                      visit_time: dafBase.DateTime) -> Optional[pandas.DataFrame]:
        # docstring is inherited from a base class
        months = self.config.read_sources_months
        if months == 0:
            return None
        mjd_end = visit_time.get(system=dafBase.DateTime.MJD)
        mjd_start = mjd_end - months*30

        return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaSource)

    def getDiaForcedSources(self, region: sphgeom.Region,
                            object_ids: Optional[Iterable[int]],
                            visit_time: dafBase.DateTime) -> Optional[pandas.DataFrame]:
        # docstring is inherited from a base class
        months = self.config.read_forced_sources_months
        if months == 0:
            return None
        mjd_end = visit_time.get(system=dafBase.DateTime.MJD)
        mjd_start = mjd_end - months*30

        return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaForcedSource)

    def getDiaObjectsHistory(self,
                             start_time: dafBase.DateTime,
                             end_time: dafBase.DateTime,
                             region: Optional[sphgeom.Region] = None) -> pandas.DataFrame:
        # docstring is inherited from a base class
        sp_where = self._spatial_where(region, use_ranges=True)
        tables, temporal_where = self._temporal_where(ApdbTables.DiaObject, start_time, end_time, True)

        # Build all queries
        statements: List[Tuple] = []
        for table in tables:
            prefix = f'SELECT * from "{self._keyspace}"."{table}"'
            statements += list(self._combine_where(prefix, sp_where, temporal_where, "ALLOW FILTERING"))
        _LOG.debug("getDiaObjectsHistory: #queries: %s", len(statements))

        # Run all selects in parallel
        with Timer("DiaObject history", self.config.timer):
            catalog = cast(
                pandas.DataFrame,
                select_concurrent(
                    self._session, statements, "read_pandas_multi", self.config.read_concurrency
                ),
            )

        # Precise filtering on validityStart
        validity_start = start_time.toPython()
        validity_end = end_time.toPython()
        catalog = cast(
            pandas.DataFrame,
            catalog[(catalog["validityStart"] >= validity_start) & (catalog["validityStart"] < validity_end)]
        )

        _LOG.debug("found %d DiaObjects", catalog.shape[0])
        return catalog

    def getDiaSourcesHistory(self,
                             start_time: dafBase.DateTime,
                             end_time: dafBase.DateTime,
                             region: Optional[sphgeom.Region] = None) -> pandas.DataFrame:
        # docstring is inherited from a base class
        return self._getSourcesHistory(ApdbTables.DiaSource, start_time, end_time, region)

    def getDiaForcedSourcesHistory(self,
                                   start_time: dafBase.DateTime,
                                   end_time: dafBase.DateTime,
                                   region: Optional[sphgeom.Region] = None) -> pandas.DataFrame:
        # docstring is inherited from a base class
        return self._getSourcesHistory(ApdbTables.DiaForcedSource, start_time, end_time, region)

    def getSSObjects(self) -> pandas.DataFrame:
        # docstring is inherited from a base class
        tableName = self._schema.tableName(ApdbTables.SSObject)
        query = f'SELECT * from "{self._keyspace}"."{tableName}"'

        with Timer('SSObject select', self.config.timer):
            result = self._session.execute(query, execution_profile="read_pandas")
            # With the "read_pandas" profile the rows are already a DataFrame.
            objects = result._current_rows

        _LOG.debug("found %s SSObjects", objects.shape[0])
        return objects

    def store(self,
              visit_time: dafBase.DateTime,
              objects: pandas.DataFrame,
              sources: Optional[pandas.DataFrame] = None,
              forced_sources: Optional[pandas.DataFrame] = None) -> None:
        # docstring is inherited from a base class

        # Fill spatial partition column and store DiaObjects
        objects = self._add_obj_part(objects)
        self._storeDiaObjects(objects, visit_time)

        if sources is not None:
            # Copy apdb_part column from DiaObjects to DiaSources
            sources = self._add_src_part(sources, objects)
            self._storeDiaSources(ApdbTables.DiaSource, sources, visit_time)
            self._storeDiaSourcesPartitions(sources, visit_time)

        if forced_sources is not None:
            forced_sources = self._add_fsrc_part(forced_sources, objects)
            self._storeDiaSources(ApdbTables.DiaForcedSource, forced_sources, visit_time)

    def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
        # docstring is inherited from a base class

        # To update a record we need to know its partitioning values, so we
        # first look those up for every given diaSourceId, 1k IDs per query.
        table_name = self._schema.tableName(ExtraTables.DiaSourceToPartition)
        selects: List[Tuple] = []
        for ids in chunk_iterable(idMap.keys(), 1_000):
            ids_str = ",".join(str(item) for item in ids)
            selects.append((
                (f'SELECT "diaSourceId", "apdb_part", "apdb_time_part" FROM "{self._keyspace}"."{table_name}"'
                 f' WHERE "diaSourceId" IN ({ids_str})'),
                {}
            ))

        result = cast(
            List[Tuple[int, int, int]],
            select_concurrent(self._session, selects, "read_tuples", self.config.read_concurrency)
        )

        # Make mapping from source ID to its partitions.
        id2partitions: Dict[int, Tuple[int, int]] = {}
        for row in result:
            id2partitions[row[0]] = row[1:]

        # Make sure we know partitions for every ID.
        if set(id2partitions) != set(idMap):
            missing = ",".join(str(item) for item in set(idMap) - set(id2partitions))
            raise ValueError(f"Following DiaSource IDs do not exist in the database: {missing}")

        queries = cassandra.query.BatchStatement()
        table_name = self._schema.tableName(ApdbTables.DiaSource)
        for diaSourceId, ssObjectId in idMap.items():
            apdb_part, apdb_time_part = id2partitions[diaSourceId]
            values: Tuple
            if self.config.time_partition_tables:
                query = (
                    f'UPDATE "{self._keyspace}"."{table_name}_{apdb_time_part}"'
                    ' SET "ssObjectId" = ?, "diaObjectId" = NULL'
                    ' WHERE "apdb_part" = ? AND "diaSourceId" = ?'
                )
                values = (ssObjectId, apdb_part, diaSourceId)
            else:
                query = (
                    f'UPDATE "{self._keyspace}"."{table_name}"'
                    ' SET "ssObjectId" = ?, "diaObjectId" = NULL'
                    ' WHERE "apdb_part" = ? AND "apdb_time_part" = ? AND "diaSourceId" = ?'
                )
                values = (ssObjectId, apdb_part, apdb_time_part, diaSourceId)
            queries.add(self._prep_statement(query), values)

        _LOG.debug("%s: will update %d records", table_name, len(idMap))
        with Timer(table_name + ' update', self.config.timer):
            self._session.execute(queries, execution_profile="write")

    def countUnassociatedObjects(self) -> int:
        # docstring is inherited from a base class
        raise NotImplementedError()

    def _makeProfiles(self, config: ApdbCassandraConfig) -> Mapping[Any, ExecutionProfile]:
        """Make all execution profiles used in the code."""

        if config.private_ips:
            loadBalancePolicy = WhiteListRoundRobinPolicy(hosts=config.contact_points)
        else:
            loadBalancePolicy = RoundRobinPolicy()

        pandas_row_factory: Callable
        if not config.pandas_delay_conv:
            pandas_row_factory = pandas_dataframe_factory
        else:
            pandas_row_factory = raw_data_factory

        read_tuples_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
            request_timeout=config.read_timeout,
            row_factory=cassandra.query.tuple_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        read_pandas_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
            request_timeout=config.read_timeout,
            row_factory=pandas_dataframe_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        read_pandas_multi_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
            request_timeout=config.read_timeout,
            row_factory=pandas_row_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        write_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.write_consistency),
            request_timeout=config.write_timeout,
            load_balancing_policy=loadBalancePolicy,
        )
        default_profile = ExecutionProfile(
            load_balancing_policy=loadBalancePolicy,
        )
        return {
            "read_tuples": read_tuples_profile,
            "read_pandas": read_pandas_profile,
            "read_pandas_multi": read_pandas_multi_profile,
            "write": write_profile,
            EXEC_PROFILE_DEFAULT: default_profile,
        }

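    # The profile names returned above are the ones referenced elsewhere in
    # this class, e.g. self._session.execute(query, execution_profile="read_pandas")
    # or select_concurrent(..., "read_pandas_multi", ...); EXEC_PROFILE_DEFAULT
    # covers any statement executed without an explicit profile name.
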
    def _getSources(self, region: sphgeom.Region,
                    object_ids: Optional[Iterable[int]],
                    mjd_start: float,
                    mjd_end: float,
                    table_name: ApdbTables) -> pandas.DataFrame:
        """Return catalog of DiaSource instances given set of DiaObject IDs.

        Parameters
        ----------
        region : `sphgeom.Region`
            Spatial region.
        object_ids : iterable [ `int` ], optional
            Collection of DiaObject IDs
        mjd_start : `float`
            Lower bound of time interval.
        mjd_end : `float`
            Upper bound of time interval.
        table_name : `ApdbTables`
            Name of the table to query.

        Returns
        -------
        catalog : `pandas.DataFrame`, or `None`
            Catalog containing DiaSource records. Empty catalog is returned
            if ``object_ids`` is empty.
        """
        object_id_set: Set[int] = set()
        if object_ids is not None:
            object_id_set = set(object_ids)
            if len(object_id_set) == 0:
                return self._make_empty_catalog(table_name)

        sp_where = self._spatial_where(region)
        tables, temporal_where = self._temporal_where(table_name, mjd_start, mjd_end)

        # Build all queries
        statements: List[Tuple] = []
        for table in tables:
            prefix = f'SELECT * from "{self._keyspace}"."{table}"'
            statements += list(self._combine_where(prefix, sp_where, temporal_where))
        _LOG.debug("_getSources %s: #queries: %s", table_name, len(statements))

        with Timer(table_name.name + ' select', self.config.timer):
            catalog = cast(
                pandas.DataFrame,
                select_concurrent(
                    self._session, statements, "read_pandas_multi", self.config.read_concurrency
                ),
            )

        # Filter by given object IDs
        if len(object_id_set) > 0:
            catalog = cast(pandas.DataFrame, catalog[catalog["diaObjectId"].isin(object_id_set)])

        # Precise filtering on midPointTai
        catalog = cast(pandas.DataFrame, catalog[catalog["midPointTai"] > mjd_start])

        _LOG.debug("found %d %ss", catalog.shape[0], table_name.name)
        return catalog

    def _getSourcesHistory(
        self,
        table: ApdbTables,
        start_time: dafBase.DateTime,
        end_time: dafBase.DateTime,
        region: Optional[sphgeom.Region] = None,
    ) -> pandas.DataFrame:
        """Return catalog of DiaSource instances within a given time interval.

        Parameters
        ----------
        table : `ApdbTables`
            Name of the table to query.
        start_time : `dafBase.DateTime`
            Starting time for DiaSource history search. DiaSource record is
            selected when its ``midPointTai`` falls into an interval between
            ``start_time`` (inclusive) and ``end_time`` (exclusive).
        end_time : `dafBase.DateTime`
            Upper limit on time for DiaSource history search.
        region : `sphgeom.Region`, optional
            Spatial region.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Catalog containing DiaSource records.
        """
        sp_where = self._spatial_where(region, use_ranges=False)
        tables, temporal_where = self._temporal_where(table, start_time, end_time, True)

        # Build all queries
        statements: List[Tuple] = []
        for table_name in tables:
            prefix = f'SELECT * from "{self._keyspace}"."{table_name}"'
            statements += list(self._combine_where(prefix, sp_where, temporal_where, "ALLOW FILTERING"))
        _LOG.debug("_getSourcesHistory: #queries: %s", len(statements))

        # Run all selects in parallel
        with Timer(f"{table.name} history", self.config.timer):
            catalog = cast(
                pandas.DataFrame,
                select_concurrent(
                    self._session, statements, "read_pandas_multi", self.config.read_concurrency
                ),
            )

        # Precise filtering on midPointTai
        period_start = start_time.get(system=dafBase.DateTime.MJD)
        period_end = end_time.get(system=dafBase.DateTime.MJD)
        catalog = cast(
            pandas.DataFrame,
            catalog[(catalog["midPointTai"] >= period_start) & (catalog["midPointTai"] < period_end)]
        )

        _LOG.debug("found %d %ss", catalog.shape[0], table.name)
        return catalog

    def _storeDiaObjects(self, objs: pandas.DataFrame, visit_time: dafBase.DateTime) -> None:
        """Store catalog of DiaObjects from current visit.

        Parameters
        ----------
        objs : `pandas.DataFrame`
            Catalog with DiaObject records
        visit_time : `dafBase.DateTime`
            Time of the current visit.
        """
        visit_time_dt = visit_time.toPython()
        extra_columns = dict(lastNonForcedSource=visit_time_dt)
        self._storeObjectsPandas(objs, ApdbTables.DiaObjectLast, extra_columns=extra_columns)

        extra_columns["validityStart"] = visit_time_dt
        time_part: Optional[int] = self._time_partition(visit_time)
        if not self.config.time_partition_tables:
            extra_columns["apdb_time_part"] = time_part
            time_part = None

        self._storeObjectsPandas(objs, ApdbTables.DiaObject, extra_columns=extra_columns,
                                 time_part=time_part)

    def _storeDiaSources(self, table_name: ApdbTables, sources: pandas.DataFrame,
                         visit_time: dafBase.DateTime) -> None:
        """Store catalog of DIASources or DIAForcedSources from current visit.

        Parameters
        ----------
        table_name : `ApdbTables`
            Table where to store the records.
        sources : `pandas.DataFrame`
            Catalog containing DiaSource records
        visit_time : `dafBase.DateTime`
            Time of the current visit.
        """
        time_part: Optional[int] = self._time_partition(visit_time)
        extra_columns: Dict[str, Any] = {}
        if not self.config.time_partition_tables:
            extra_columns["apdb_time_part"] = time_part
            time_part = None

        self._storeObjectsPandas(sources, table_name, extra_columns=extra_columns, time_part=time_part)

    def _storeDiaSourcesPartitions(self, sources: pandas.DataFrame, visit_time: dafBase.DateTime) -> None:
        """Store mapping of diaSourceId to its partitioning values.

        Parameters
        ----------
        sources : `pandas.DataFrame`
            Catalog containing DiaSource records
        visit_time : `dafBase.DateTime`
            Time of the current visit.
        """
        id_map = cast(pandas.DataFrame, sources[["diaSourceId", "apdb_part"]])
        extra_columns = {
            "apdb_time_part": self._time_partition(visit_time),
        }

        self._storeObjectsPandas(
            id_map, ExtraTables.DiaSourceToPartition, extra_columns=extra_columns, time_part=None
        )

    def _storeObjectsPandas(self, objects: pandas.DataFrame, table_name: Union[ApdbTables, ExtraTables],
                            extra_columns: Optional[Mapping] = None,
                            time_part: Optional[int] = None) -> None:
        """Generic store method.

        Takes catalog of records and stores a bunch of objects in a table.

        Parameters
        ----------
        objects : `pandas.DataFrame`
            Catalog containing object records
        table_name : `ApdbTables`
            Name of the table as defined in APDB schema.
        extra_columns : `dict`, optional
            Mapping (column_name, column_value) which gives column values to add
            to every row, only if column is missing in catalog records.
        time_part : `int`, optional
            If not `None` then insert into a per-partition table.
        """
        # Use extra columns if specified
        if extra_columns is None:
            extra_columns = {}
        extra_fields = list(extra_columns.keys())

        df_fields = [
            column for column in objects.columns if column not in extra_fields
        ]

        column_map = self._schema.getColumnMap(table_name)
        # List of columns (as in the table schema)
        fields = [column_map[field].name for field in df_fields if field in column_map]
        fields += extra_fields

        # Check that all partitioning and clustering columns are defined
        required_columns = self._schema.partitionColumns(table_name) \
            + self._schema.clusteringColumns(table_name)
        missing_columns = [column for column in required_columns if column not in fields]
        if missing_columns:
            raise ValueError(f"Primary key columns are missing from catalog: {missing_columns}")

        qfields = [quote_id(field) for field in fields]
        qfields_str = ','.join(qfields)

        with Timer(table_name.name + ' query build', self.config.timer):

            table = self._schema.tableName(table_name)
            if time_part is not None:
                table = f"{table}_{time_part}"

            holders = ','.join(['?']*len(qfields))
            query = f'INSERT INTO "{self._keyspace}"."{table}" ({qfields_str}) VALUES ({holders})'
            statement = self._prep_statement(query)
            queries = cassandra.query.BatchStatement()
            for rec in objects.itertuples(index=False):
                values = []
                for field in df_fields:
                    if field not in column_map:
                        continue
                    value = getattr(rec, field)
                    if column_map[field].type == "DATETIME":
                        if isinstance(value, pandas.Timestamp):
                            value = literal(value.to_pydatetime())
                        else:
                            # Assume it is in seconds since epoch, Cassandra
                            # expects milliseconds.
                            value = int(value*1000)
                    values.append(literal(value))
                for field in extra_fields:
                    value = extra_columns[field]
                    values.append(literal(value))
                queries.add(statement, values)

        _LOG.debug("%s: will store %d records", self._schema.tableName(table_name), objects.shape[0])
        with Timer(table_name.name + ' insert', self.config.timer):
            self._session.execute(queries, timeout=self.config.write_timeout, execution_profile="write")

    def _add_obj_part(self, df: pandas.DataFrame) -> pandas.DataFrame:
        """Calculate spatial partition for each record and add it to a
        DataFrame.

        Notes
        -----
        This overrides any existing column in a DataFrame with the same name
        (apdb_part). Original DataFrame is not changed, copy of a DataFrame is
        returned.
        """
        # Compute pixel index for every record
        apdb_part = np.zeros(df.shape[0], dtype=np.int64)
        ra_col, dec_col = self.config.ra_dec_columns
        for i, (ra, dec) in enumerate(zip(df[ra_col], df[dec_col])):
            uv3d = sphgeom.UnitVector3d(sphgeom.LonLat.fromDegrees(ra, dec))
            idx = self._pixelization.pixel(uv3d)
            apdb_part[i] = idx
        df = df.copy()
        df["apdb_part"] = apdb_part
        return df

    def _add_src_part(self, sources: pandas.DataFrame, objs: pandas.DataFrame) -> pandas.DataFrame:
        """Add apdb_part column to DiaSource catalog.

        Notes
        -----
        This method copies apdb_part value from a matching DiaObject record.
        DiaObject catalog needs to have an apdb_part column filled by
        ``_add_obj_part`` method and DiaSource records need to be
        associated to DiaObjects via ``diaObjectId`` column.

        This overrides any existing column in a DataFrame with the same name
        (apdb_part). Original DataFrame is not changed, copy of a DataFrame is
        returned.
        """
        pixel_id_map: Dict[int, int] = {
            diaObjectId: apdb_part for diaObjectId, apdb_part
            in zip(objs["diaObjectId"], objs["apdb_part"])
        }
        apdb_part = np.zeros(sources.shape[0], dtype=np.int64)
        ra_col, dec_col = self.config.ra_dec_columns
        for i, (diaObjId, ra, dec) in enumerate(zip(sources["diaObjectId"],
                                                    sources[ra_col], sources[dec_col])):
            if diaObjId == 0:
                # DiaSources not associated with a DiaObject get their
                # partition from their own ra/dec.
                uv3d = sphgeom.UnitVector3d(sphgeom.LonLat.fromDegrees(ra, dec))
                idx = self._pixelization.pixel(uv3d)
                apdb_part[i] = idx
            else:
                apdb_part[i] = pixel_id_map[diaObjId]
        sources = sources.copy()
        sources["apdb_part"] = apdb_part
        return sources

    def _add_fsrc_part(self, sources: pandas.DataFrame, objs: pandas.DataFrame) -> pandas.DataFrame:
        """Add apdb_part column to DiaForcedSource catalog.

        Notes
        -----
        This method copies apdb_part value from a matching DiaObject record.
        DiaObject catalog needs to have an apdb_part column filled by
        ``_add_obj_part`` method and DiaSource records need to be
        associated to DiaObjects via ``diaObjectId`` column.

        This overrides any existing column in a DataFrame with the same name
        (apdb_part). Original DataFrame is not changed, copy of a DataFrame is
        returned.
        """
        pixel_id_map: Dict[int, int] = {
            diaObjectId: apdb_part for diaObjectId, apdb_part
            in zip(objs["diaObjectId"], objs["apdb_part"])
        }
        apdb_part = np.zeros(sources.shape[0], dtype=np.int64)
        for i, diaObjId in enumerate(sources["diaObjectId"]):
            apdb_part[i] = pixel_id_map[diaObjId]
        sources = sources.copy()
        sources["apdb_part"] = apdb_part
        return sources

876 """Calculate time partiton number for a given time.
881 Time
for which to calculate partition number. Can be float to mean
887 Partition number
for a given time.
890 mjd = time.get(system=dafBase.DateTime.MJD)
893 days_since_epoch = mjd - self._partition_zero_epoch_mjd
894 partition =
int(days_since_epoch) // self.config.time_partition_days
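    # Worked example (the numbers are assumptions, not guaranteed defaults):
    # with partition_zero_epoch at MJD 58453.0 and time_partition_days = 30,
    # a visit at MJD 58523.7 gives
    #
    #     days_since_epoch = 58523.7 - 58453.0 = 70.7
    #     partition = int(70.7) // 30 = 70 // 30 = 2
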
    def _make_empty_catalog(self, table_name: ApdbTables) -> pandas.DataFrame:
        """Make an empty catalog for a table with a given name.

        Parameters
        ----------
        table_name : `ApdbTables`
            Name of the table.

        Returns
        -------
        catalog : `pandas.DataFrame`
            An empty catalog.
        """
        table = self._schema.tableSchemas[table_name]

        data = {columnDef.name: pandas.Series(dtype=columnDef.dtype) for columnDef in table.columns}
        return pandas.DataFrame(data)

    def _prep_statement(self, query: str) -> cassandra.query.PreparedStatement:
        """Convert query string into prepared statement."""
        stmt = self._prepared_statements.get(query)
        if stmt is None:
            stmt = self._session.prepare(query)
            self._prepared_statements[query] = stmt
        return stmt

    def _combine_where(
        self,
        prefix: str,
        where1: List[Tuple[str, Tuple]],
        where2: List[Tuple[str, Tuple]],
        suffix: Optional[str] = None,
    ) -> Iterator[Tuple[cassandra.query.Statement, Tuple]]:
        """Make cartesian product of two parts of WHERE clause into a series
        of statements to execute.

        Parameters
        ----------
        prefix : `str`
            Initial statement prefix that comes before WHERE clause, e.g.
            "SELECT * from Table"
        """
        # If lists are empty use special sentinels.
        if not where1:
            where1 = [("", ())]
        if not where2:
            where2 = [("", ())]

        for expr1, params1 in where1:
            for expr2, params2 in where2:
                full_query = prefix
                wheres = []
                if expr1:
                    wheres.append(expr1)
                if expr2:
                    wheres.append(expr2)
                if wheres:
                    full_query += " WHERE " + " AND ".join(wheres)
                if suffix:
                    full_query += " " + suffix
                params = params1 + params2
                if params:
                    statement = self._prep_statement(full_query)
                else:
                    # If there are no parameters then there is no point in a
                    # prepared statement, a simple statement is enough.
                    statement = cassandra.query.SimpleStatement(full_query)
                yield (statement, params)

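    # For illustration: with
    #     where1 = [('"apdb_part" = ?', (1,)), ('"apdb_part" = ?', (2,))]
    #     where2 = [('"apdb_time_part" = ?', (100,))]
    # the generator yields two statements of the form
    #     <prefix> WHERE "apdb_part" = ? AND "apdb_time_part" = ?
    # with parameters (1, 100) and (2, 100) respectively, a sketch of the
    # cartesian product described in the docstring above.
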
    def _spatial_where(
        self, region: Optional[sphgeom.Region], use_ranges: bool = False
    ) -> List[Tuple[str, Tuple]]:
        """Generate expressions for spatial part of WHERE clause.

        Parameters
        ----------
        region : `sphgeom.Region`
            Spatial region for query results.
        use_ranges : `bool`
            If True then use pixel ranges ("apdb_part >= p1 AND apdb_part <=
            p2") instead of exact list of pixels. Should be set to True for
            large regions covering very many pixels.

        Returns
        -------
        expressions : `list` [ `tuple` ]
            Empty list is returned if ``region`` is `None`, otherwise a list
            of one or more (expression, parameters) tuples
        """
        if region is None:
            return []
        if use_ranges:
            pixel_ranges = self._pixelization.envelope(region)
            expressions: List[Tuple[str, Tuple]] = []
            for lower, upper in pixel_ranges:
                upper -= 1
                if lower == upper:
                    expressions.append(('"apdb_part" = ?', (lower, )))
                else:
                    expressions.append(('"apdb_part" >= ? AND "apdb_part" <= ?', (lower, upper)))
            return expressions
        else:
            pixels = self._pixelization.pixels(region)
            if self.config.query_per_spatial_part:
                return [('"apdb_part" = ?', (pixel,)) for pixel in pixels]
            else:
                pixels_str = ",".join([str(pix) for pix in pixels])
                return [(f'"apdb_part" IN ({pixels_str})', ())]

    def _temporal_where(
        self,
        table: ApdbTables,
        start_time: Union[float, dafBase.DateTime],
        end_time: Union[float, dafBase.DateTime],
        query_per_time_part: Optional[bool] = None,
    ) -> Tuple[List[str], List[Tuple[str, Tuple]]]:
        """Generate table names and expressions for temporal part of WHERE
        clauses.

        Parameters
        ----------
        table : `ApdbTables`
            Table to select from.
        start_time : `dafBase.DateTime` or `float`
            Starting Datetime or MJD value of the time range.
        end_time : `dafBase.DateTime` or `float`
            Ending Datetime or MJD value of the time range.
        query_per_time_part : `bool`, optional
            If None then use ``query_per_time_part`` from configuration.

        Returns
        -------
        tables : `list` [ `str` ]
            List of the table names to query.
        expressions : `list` [ `tuple` ]
            A list of zero or more (expression, parameters) tuples.
        """
        tables: List[str]
        temporal_where: List[Tuple[str, Tuple]] = []
        table_name = self._schema.tableName(table)
        time_part_start = self._time_partition(start_time)
        time_part_end = self._time_partition(end_time)
        time_parts = list(range(time_part_start, time_part_end + 1))
        if self.config.time_partition_tables:
            tables = [f"{table_name}_{part}" for part in time_parts]
        else:
            tables = [table_name]
            if query_per_time_part is None:
                query_per_time_part = self.config.query_per_time_part
            if query_per_time_part:
                temporal_where = [
                    ('"apdb_time_part" = ?', (time_part,)) for time_part in time_parts
                ]
            else:
                time_part_list = ",".join([str(part) for part in time_parts])
                temporal_where = [(f'"apdb_time_part" IN ({time_part_list})', ())]

        return tables, temporal_where
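
    # For illustration: if start_time and end_time fall into partitions 11-12
    # then with time_partition_tables=True this returns
    #     (["DiaSource_11", "DiaSource_12"], [])
    # and with time_partition_tables=False (single table, one combined query)
    #     (["DiaSource"], [('"apdb_time_part" IN (11,12)', ())])
    # Table names here assume the DiaSource table with an empty prefix.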