from __future__ import annotations

__all__ = ["ApdbCassandraConfig", "ApdbCassandra"]

import logging
from typing import Any, Callable, Dict, Iterable, Iterator, List, Mapping, Optional, Set, Tuple, Union, cast

import numpy as np
import pandas

# The cassandra-driver package is optional; remember whether it could be imported.
try:
    import cassandra.query
    from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile
    from cassandra.policies import AddressTranslator, RoundRobinPolicy, WhiteListRoundRobinPolicy

    CASSANDRA_IMPORTED = True
except ImportError:
    CASSANDRA_IMPORTED = False
import felis.types
import lsst.daf.base as dafBase
from felis.simple import Table
from lsst import sphgeom
from lsst.pex.config import ChoiceField, Field, ListField
from lsst.utils.iteration import chunk_iterable

from .apdb import Apdb, ApdbConfig
from .apdbCassandraSchema import ApdbCassandraSchema, ExtraTables
from .apdbSchema import ApdbTables
from .cassandra_utils import (
    literal,
    pandas_dataframe_factory,
    quote_id,
    raw_data_factory,
    select_concurrent,
)
from .pixelization import Pixelization
from .timer import Timer

_LOG = logging.getLogger(__name__)
class CassandraMissingError(Exception):
    """Exception raised when the cassandra-driver module cannot be imported."""

    def __init__(self) -> None:
        super().__init__("cassandra-driver module cannot be imported")
class ApdbCassandraConfig(ApdbConfig):
    """Configuration class for Cassandra-based APDB implementation."""

    contact_points = ListField[str](
        doc="The list of contact points to try connecting for cluster discovery.",
    )
    private_ips = ListField[str](
        doc="List of internal IP addresses for contact_points.",
    )
    keyspace = Field[str](
        doc="Default keyspace for operations.",
    )
    read_consistency = Field[str](
        doc="Name for consistency level of read operations, default: QUORUM, can be ONE.",
    )
    write_consistency = Field[str](
        doc="Name for consistency level of write operations, default: QUORUM, can be ONE.",
    )
    read_timeout = Field[float](
        doc="Timeout in seconds for read operations.",
    )
    write_timeout = Field[float](
        doc="Timeout in seconds for write operations.",
    )
    read_concurrency = Field[int](
        doc="Concurrency level for read operations.",
    )
    protocol_version = Field[int](
        doc="Cassandra protocol version to use, default is V4.",
        default=cassandra.ProtocolVersion.V4 if CASSANDRA_IMPORTED else 0,
    )
    dia_object_columns = ListField[str](
        doc="List of columns to read from DiaObject; by default read all columns.",
    )
    prefix = Field[str](
        doc="Prefix to add to table names.",
    )
    part_pixelization = ChoiceField[str](
        allowed=dict(htm="HTM pixelization", q3c="Q3C pixelization", mq3c="MQ3C pixelization"),
        doc="Pixelization used for partitioning index.",
    )
    part_pix_level = Field[int](
        doc="Pixelization level used for partitioning index.",
    )
    part_pix_max_ranges = Field[int](
        doc="Max number of ranges in pixelization envelope.",
    )
    ra_dec_columns = ListField[str](
        default=["ra", "decl"],
        doc="Names of ra/dec columns in DiaObject table.",
    )
    timer = Field[bool](
        doc="If True then print/log timing information.",
    )
    time_partition_tables = Field[bool](
        doc="Use per-partition tables for sources instead of partitioning by time.",
    )
    time_partition_days = Field[int](
        doc="Time partitioning granularity in days, this value must not be changed"
        " after database is initialized.",
    )
    time_partition_start = Field[str](
        doc="Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI."
        " This is used only when time_partition_tables is True.",
        default="2018-12-01T00:00:00",
    )
    time_partition_end = Field[str](
        doc="Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI."
        " This is used only when time_partition_tables is True.",
        default="2030-01-01T00:00:00",
    )
    query_per_time_part = Field[bool](
        doc="If True then build separate query for each time partition, otherwise build one single query. "
        "This is only used when time_partition_tables is False in schema config.",
    )
    query_per_spatial_part = Field[bool](
        doc="If True then build one query per spatial partition, otherwise build single query.",
    )
    pandas_delay_conv = Field[bool](
        doc="If True then combine result rows before converting to pandas.",
    )
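
# A minimal configuration sketch (illustrative only; the host names and field
# values below are assumptions, not defaults defined in this module):
#
#     config = ApdbCassandraConfig()
#     config.contact_points = ["cassandra-1.example.org", "cassandra-2.example.org"]
#     config.keyspace = "apdb"
#     config.part_pixelization = "mq3c"
#     config.part_pix_level = 10
#     config.time_partition_tables = True
#     apdb = ApdbCassandra(config)
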
if CASSANDRA_IMPORTED:

    class _AddressTranslator(AddressTranslator):
        """Translate internal IP address to external.

        Only used for docker-based setup, not a viable long-term solution.
        """

        def __init__(self, public_ips: List[str], private_ips: List[str]):
            self._map = dict((k, v) for k, v in zip(private_ips, public_ips))

        def translate(self, private_ip: str) -> str:
            return self._map.get(private_ip, private_ip)
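
    # Usage sketch for the translator above (the IP addresses are made-up
    # examples, not values from this module):
    #
    #     translator = _AddressTranslator(public_ips=["203.0.113.10"], private_ips=["10.0.0.5"])
    #     translator.translate("10.0.0.5")   # -> "203.0.113.10"
    #     translator.translate("10.0.0.99")  # -> "10.0.0.99" (unknown addresses pass through)
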
class ApdbCassandra(Apdb):
    """Implementation of APDB database on top of Apache Cassandra.

    The implementation is configured via the standard ``pex_config`` mechanism
    using the `ApdbCassandraConfig` configuration class. For examples of
    different configurations check the config/ folder.

    Parameters
    ----------
    config : `ApdbCassandraConfig`
        Configuration object.
    """

    # Class attribute holding the start time for partition 0 is elided in this
    # excerpt; it should never be changed once a database has been initialized.

    def __init__(self, config: ApdbCassandraConfig):
        if not CASSANDRA_IMPORTED:
            raise CassandraMissingError()

        self.config = config
        self._keyspace = config.keyspace

        _LOG.debug("ApdbCassandra Configuration:")
        for key, value in config.items():
            _LOG.debug("    %s: %s", key, value)

        self._pixelization = Pixelization(
            config.part_pixelization, config.part_pix_level, config.part_pix_max_ranges
        )

        addressTranslator: Optional[AddressTranslator] = None
        if config.private_ips:
            addressTranslator = _AddressTranslator(config.contact_points, config.private_ips)

        self._cluster = Cluster(
            execution_profiles=self._makeProfiles(config),
            contact_points=self.config.contact_points,
            address_translator=addressTranslator,
            protocol_version=self.config.protocol_version,
        )
        self._session = self._cluster.connect(keyspace=config.keyspace)
        # Disable result paging.
        self._session.default_fetch_size = None

        self._schema = ApdbCassandraSchema(
            session=self._session,
            schema_file=self.config.schema_file,
            schema_name=self.config.schema_name,
            prefix=self.config.prefix,
            time_partition_tables=self.config.time_partition_tables,
        )

        self._prepared_statements: Dict[str, cassandra.query.PreparedStatement] = {}

    def tableDef(self, table: ApdbTables) -> Optional[Table]:
        return self._schema.tableSchemas.get(table)

    def makeSchema(self, drop: bool = False) -> None:
        if self.config.time_partition_tables:
            ...  # per-time-partition table creation (elided in this excerpt)
        else:
            ...  # single-table creation (elided in this excerpt)

    def getDiaObjects(self, region: sphgeom.Region) -> pandas.DataFrame:
        sp_where = self._spatial_where(region)
        _LOG.debug("getDiaObjects: #partitions: %s", len(sp_where))

        table_name = self._schema.tableName(ApdbTables.DiaObjectLast)
        query = f'SELECT * from "{self._keyspace}"."{table_name}"'
        statements: List[Tuple] = []
        for where, params in sp_where:
            full_query = f"{query} WHERE {where}"
            statement = cassandra.query.SimpleStatement(full_query)
            statements.append((statement, params))
        _LOG.debug("getDiaObjects: #queries: %s", len(statements))

        objects = select_concurrent(
            self._session, statements, "read_pandas_multi", self.config.read_concurrency
        )

        _LOG.debug("found %s DiaObjects", objects.shape[0])
        return objects

    def getDiaSources(self, region: sphgeom.Region, object_ids: Optional[Iterable[int]],
                      visit_time: dafBase.DateTime) -> Optional[pandas.DataFrame]:
        months = self.config.read_sources_months
        if months == 0:
            return None
        mjd_end = visit_time.get(system=dafBase.DateTime.MJD)
        mjd_start = mjd_end - months * 30
        return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaSource)

    def getDiaForcedSources(self, region: sphgeom.Region, object_ids: Optional[Iterable[int]],
                            visit_time: dafBase.DateTime) -> Optional[pandas.DataFrame]:
        months = self.config.read_forced_sources_months
        if months == 0:
            return None
        mjd_end = visit_time.get(system=dafBase.DateTime.MJD)
        mjd_start = mjd_end - months * 30
        return self._getSources(region, object_ids, mjd_start, mjd_end, ApdbTables.DiaForcedSource)

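    # Worked example of the time window above (values are illustrative): with
    # read_sources_months=12 and a visit at MJD 59000.0,
    #
    #     mjd_end = 59000.0
    #     mjd_start = 59000.0 - 12 * 30   # 58640.0
    #
    # so DiaSources with midPointTai in (58640.0, 59000.0] are candidates for
    # the per-partition queries built by _getSources().
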
    def getDiaObjectsHistory(self, start_time: dafBase.DateTime, end_time: dafBase.DateTime,
                             region: Optional[sphgeom.Region] = None) -> pandas.DataFrame:
        sp_where = self._spatial_where(region, use_ranges=False)
        tables, temporal_where = self._temporal_where(ApdbTables.DiaObject, start_time, end_time, True)

        statements: List[Tuple] = []
        for table in tables:
            prefix = f'SELECT * from "{self._keyspace}"."{table}"'
            statements += list(self._combine_where(prefix, sp_where, temporal_where, "ALLOW FILTERING"))
        _LOG.debug("getDiaObjectsHistory: #queries: %s", len(statements))

        with Timer("DiaObject history", self.config.timer):
            catalog = cast(
                pandas.DataFrame,
                select_concurrent(
                    self._session, statements, "read_pandas_multi", self.config.read_concurrency
                ),
            )

        # Filter records by validity interval.
        validity_start = start_time.toPython()
        validity_end = end_time.toPython()
        catalog = cast(
            pandas.DataFrame,
            catalog[(catalog["validityStart"] >= validity_start) & (catalog["validityStart"] < validity_end)],
        )

        _LOG.debug("found %d DiaObjects", catalog.shape[0])
        return catalog

    def getDiaSourcesHistory(self, start_time: dafBase.DateTime, end_time: dafBase.DateTime,
                             region: Optional[sphgeom.Region] = None) -> pandas.DataFrame:
        return self._getSourcesHistory(ApdbTables.DiaSource, start_time, end_time, region)

    def getDiaForcedSourcesHistory(self, start_time: dafBase.DateTime, end_time: dafBase.DateTime,
                                   region: Optional[sphgeom.Region] = None) -> pandas.DataFrame:
        return self._getSourcesHistory(ApdbTables.DiaForcedSource, start_time, end_time, region)

    def getSSObjects(self) -> pandas.DataFrame:
        tableName = self._schema.tableName(ApdbTables.SSObject)
        query = f'SELECT * from "{self._keyspace}"."{tableName}"'

        result = self._session.execute(query, execution_profile="read_pandas")
        objects = result._current_rows

        _LOG.debug("found %s SSObjects", objects.shape[0])
        return objects

    def store(self,
              visit_time: dafBase.DateTime,
              objects: pandas.DataFrame,
              sources: Optional[pandas.DataFrame] = None,
              forced_sources: Optional[pandas.DataFrame] = None) -> None:
        # DiaObject partitioning and storage is elided in this excerpt.
        ...

        if sources is not None:
            ...  # DiaSource partitioning and storage (elided in this excerpt)

        if forced_sources is not None:
            forced_sources = self._add_fsrc_part(forced_sources, objects)
            self._storeDiaSources(ApdbTables.DiaForcedSource, forced_sources, visit_time)

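    # Usage sketch for store() (the catalogs and visit time are assumed to be
    # prepared by the caller; values are illustrative):
    #
    #     visit_time = dafBase.DateTime("2021-06-01T06:00:00", dafBase.DateTime.TAI)
    #     apdb.store(visit_time, objects)                               # DiaObjects only
    #     apdb.store(visit_time, objects, sources, forced_sources)      # with (forced) sources
    #
    # DiaObjects are always stored; source catalogs are optional and stored
    # only when the corresponding argument is not None.
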
    def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
        # To update a record we need its full primary key, so first query the
        # mapping table for the partitioning values of each DiaSource.
        table_name = self._schema.tableName(ExtraTables.DiaSourceToPartition)
        # Split IDs into chunks to keep the IN() lists reasonably short.
        selects: List[Tuple] = []
        for ids in chunk_iterable(idMap.keys(), 1_000):
            ids_str = ",".join(str(item) for item in ids)
            selects.append(
                (
                    f'SELECT "diaSourceId", "apdb_part", "apdb_time_part" FROM "{self._keyspace}"."{table_name}"'
                    f' WHERE "diaSourceId" IN ({ids_str})',
                    (),
                )
            )

        # Query partitioning values for all IDs.
        result = cast(
            List[Tuple[int, int, int]],
            select_concurrent(self._session, selects, "read_tuples", self.config.read_concurrency),
        )
        id2partitions: Dict[int, Tuple[int, int]] = {}
        for row in result:
            id2partitions[row[0]] = row[1:]

        # Check that we have a partition for every requested ID.
        if set(id2partitions) != set(idMap):
            missing = ",".join(str(item) for item in set(idMap) - set(id2partitions))
            raise ValueError(f"Following DiaSource IDs do not exist in the database: {missing}")

        queries = cassandra.query.BatchStatement()
        table_name = self._schema.tableName(ApdbTables.DiaSource)
        for diaSourceId, ssObjectId in idMap.items():
            apdb_part, apdb_time_part = id2partitions[diaSourceId]
            if self.config.time_partition_tables:
                query = (
                    f'UPDATE "{self._keyspace}"."{table_name}_{apdb_time_part}"'
                    ' SET "ssObjectId" = ?, "diaObjectId" = NULL'
                    ' WHERE "apdb_part" = ? AND "diaSourceId" = ?'
                )
                values = (ssObjectId, apdb_part, diaSourceId)
            else:
                query = (
                    f'UPDATE "{self._keyspace}"."{table_name}"'
                    ' SET "ssObjectId" = ?, "diaObjectId" = NULL'
                    ' WHERE "apdb_part" = ? AND "apdb_time_part" = ? AND "diaSourceId" = ?'
                )
                values = (ssObjectId, apdb_part, apdb_time_part, diaSourceId)
            queries.add(self._prep_statement(query), values)

        _LOG.debug("%s: will update %d records", table_name, len(idMap))
        with Timer(table_name + ' update', self.config.timer):
            self._session.execute(queries, execution_profile="write")

    def countUnassociatedObjects(self) -> int:
        raise NotImplementedError()

    def _makeProfiles(self, config: ApdbCassandraConfig) -> Mapping[Any, ExecutionProfile]:
        """Make all execution profiles used in the code."""

        if config.private_ips:
            loadBalancePolicy = WhiteListRoundRobinPolicy(hosts=config.contact_points)
        else:
            loadBalancePolicy = RoundRobinPolicy()

        pandas_row_factory: Callable
        if not config.pandas_delay_conv:
            pandas_row_factory = pandas_dataframe_factory
        else:
            pandas_row_factory = raw_data_factory

        read_tuples_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
            request_timeout=config.read_timeout,
            row_factory=cassandra.query.tuple_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        read_pandas_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
            request_timeout=config.read_timeout,
            row_factory=pandas_dataframe_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        read_pandas_multi_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.read_consistency),
            request_timeout=config.read_timeout,
            row_factory=pandas_row_factory,
            load_balancing_policy=loadBalancePolicy,
        )
        write_profile = ExecutionProfile(
            consistency_level=getattr(cassandra.ConsistencyLevel, config.write_consistency),
            request_timeout=config.write_timeout,
            load_balancing_policy=loadBalancePolicy,
        )
        default_profile = ExecutionProfile(
            load_balancing_policy=loadBalancePolicy,
        )
        return {
            "read_tuples": read_tuples_profile,
            "read_pandas": read_pandas_profile,
            "read_pandas_multi": read_pandas_multi_profile,
            "write": write_profile,
            EXEC_PROFILE_DEFAULT: default_profile,
        }

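    # The profile names returned above are referenced by name when executing
    # queries elsewhere in this class, e.g. (illustrative call):
    #
    #     self._session.execute(query, execution_profile="read_pandas")
    #
    # "read_pandas" returns a single DataFrame, "read_pandas_multi" is used with
    # select_concurrent() for many statements, "read_tuples" returns plain
    # tuples, and "write" applies the write consistency/timeout settings.
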
    def _getSources(self, region: sphgeom.Region,
                    object_ids: Optional[Iterable[int]],
                    mjd_start: float, mjd_end: float,
                    table_name: ApdbTables) -> pandas.DataFrame:
        """Return catalog of DiaSource instances given a set of DiaObject IDs.

        Parameters
        ----------
        region : `sphgeom.Region`
            Spherical region to query.
        object_ids : `Iterable` [`int`], optional
            Collection of DiaObject IDs.
        mjd_start : `float`
            Lower bound of time interval.
        mjd_end : `float`
            Upper bound of time interval.
        table_name : `ApdbTables`
            Name of the table to query.

        Returns
        -------
        catalog : `pandas.DataFrame`, or `None`
            Catalog containing DiaSource records. An empty catalog is returned
            if ``object_ids`` is empty.
        """
        object_id_set: Set[int] = set()
        if object_ids is not None:
            object_id_set = set(object_ids)
            if len(object_id_set) == 0:
                return self._make_empty_catalog(table_name)

        sp_where = self._spatial_where(region)
        tables, temporal_where = self._temporal_where(table_name, mjd_start, mjd_end)

        # Build all queries.
        statements: List[Tuple] = []
        for table in tables:
            prefix = f'SELECT * from "{self._keyspace}"."{table}"'
            statements += list(self._combine_where(prefix, sp_where, temporal_where))
        _LOG.debug("_getSources %s: #queries: %s", table_name, len(statements))

        with Timer(table_name.name + ' select', self.config.timer):
            catalog = cast(
                pandas.DataFrame,
                select_concurrent(
                    self._session, statements, "read_pandas_multi", self.config.read_concurrency
                ),
            )

        # Filter by the given DiaObject IDs.
        if len(object_id_set) > 0:
            catalog = cast(pandas.DataFrame, catalog[catalog["diaObjectId"].isin(object_id_set)])

        # Precise filtering on midPointTai.
        catalog = cast(pandas.DataFrame, catalog[catalog["midPointTai"] > mjd_start])

        _LOG.debug("found %d %ss", catalog.shape[0], table_name.name)
        return catalog

    def _getSourcesHistory(
        self,
        table: ApdbTables,
        start_time: dafBase.DateTime,
        end_time: dafBase.DateTime,
        region: Optional[sphgeom.Region] = None,
    ) -> pandas.DataFrame:
        """Return catalog of DiaSource history records.

        Parameters
        ----------
        table : `ApdbTables`
            Table to query.
        start_time : `dafBase.DateTime`
            Starting time for DiaSource history search. DiaSource record is
            selected when its ``midPointTai`` falls into an interval between
            ``start_time`` (inclusive) and ``end_time`` (exclusive).
        end_time : `dafBase.DateTime`
            Upper limit on time for DiaSource history search.
        region : `sphgeom.Region`, optional
            Spherical region to restrict the search to.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Catalog containing DiaSource records.
        """
        sp_where = self._spatial_where(region, use_ranges=False)
        tables, temporal_where = self._temporal_where(table, start_time, end_time, True)

        # Build all queries.
        statements: List[Tuple] = []
        for table_name in tables:
            prefix = f'SELECT * from "{self._keyspace}"."{table_name}"'
            statements += list(self._combine_where(prefix, sp_where, temporal_where, "ALLOW FILTERING"))
        _LOG.debug("_getSourcesHistory: #queries: %s", len(statements))

        with Timer(f"{table.name} history", self.config.timer):
            catalog = cast(
                pandas.DataFrame,
                select_concurrent(
                    self._session, statements, "read_pandas_multi", self.config.read_concurrency
                ),
            )

        # Filter records by the requested time interval.
        period_start = start_time.get(system=dafBase.DateTime.MJD)
        period_end = end_time.get(system=dafBase.DateTime.MJD)
        catalog = cast(
            pandas.DataFrame,
            catalog[(catalog["midPointTai"] >= period_start) & (catalog["midPointTai"] < period_end)],
        )

        _LOG.debug("found %d %ss", catalog.shape[0], table.name)
        return catalog

    def _storeDiaObjects(self, objs: pandas.DataFrame, visit_time: dafBase.DateTime) -> None:
        """Store catalog of DiaObjects from current visit.

        Parameters
        ----------
        objs : `pandas.DataFrame`
            Catalog with DiaObject records.
        visit_time : `dafBase.DateTime`
            Time of the current visit.
        """
        visit_time_dt = visit_time.toPython()
        extra_columns = dict(lastNonForcedSource=visit_time_dt)
        self._storeObjectsPandas(objs, ApdbTables.DiaObjectLast, extra_columns=extra_columns)

        extra_columns["validityStart"] = visit_time_dt
        time_part: Optional[int] = self._time_partition(visit_time)
        if not self.config.time_partition_tables:
            # Partition is stored in a regular column instead of selecting a
            # per-partition table.
            extra_columns["apdb_time_part"] = time_part
            time_part = None

        self._storeObjectsPandas(objs, ApdbTables.DiaObject, extra_columns=extra_columns, time_part=time_part)

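    # Illustrative sketch of how the call above maps onto the generic
    # _storeObjectsPandas() defined below (keyspace and column names are
    # hypothetical): for a DiaObject catalog with columns ("diaObjectId", "ra",
    # "decl") plus the extra_columns filled above, the prepared statement has
    # the form
    #
    #     INSERT INTO "apdb"."DiaObject_652" ("diaObjectId","ra","decl",...)
    #     VALUES (?,?,?,...)
    #
    # when time_part=652 selects a per-partition table, while with
    # time_part=None the plain "DiaObject" table is used and "apdb_time_part"
    # is added as a regular column instead.
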
    def _storeDiaSources(self, table_name: ApdbTables, sources: pandas.DataFrame,
                         visit_time: dafBase.DateTime) -> None:
        """Store catalog of DIASources or DIAForcedSources from current visit.

        Parameters
        ----------
        sources : `pandas.DataFrame`
            Catalog containing DiaSource records.
        visit_time : `dafBase.DateTime`
            Time of the current visit.
        """
        time_part: Optional[int] = self._time_partition(visit_time)
        extra_columns = {}
        if not self.config.time_partition_tables:
            extra_columns["apdb_time_part"] = time_part
            time_part = None

        self._storeObjectsPandas(sources, table_name, extra_columns=extra_columns, time_part=time_part)

    def _storeDiaSourcesPartitions(self, sources: pandas.DataFrame, visit_time: dafBase.DateTime) -> None:
        """Store mapping of diaSourceId to its partitioning values.

        Parameters
        ----------
        sources : `pandas.DataFrame`
            Catalog containing DiaSource records.
        visit_time : `dafBase.DateTime`
            Time of the current visit.
        """
        id_map = cast(pandas.DataFrame, sources[["diaSourceId", "apdb_part"]])
        extra_columns = {
            "apdb_time_part": self._time_partition(visit_time),
        }

        self._storeObjectsPandas(
            id_map, ExtraTables.DiaSourceToPartition, extra_columns=extra_columns, time_part=None
        )

    def _storeObjectsPandas(self, objects: pandas.DataFrame, table_name: Union[ApdbTables, ExtraTables],
                            extra_columns: Optional[Mapping] = None,
                            time_part: Optional[int] = None) -> None:
        """Generic store method.

        Takes a catalog of records and stores them in a table.

        Parameters
        ----------
        objects : `pandas.DataFrame`
            Catalog containing object records.
        table_name : `ApdbTables`
            Name of the table as defined in APDB schema.
        extra_columns : `dict`, optional
            Mapping (column_name, column_value) which gives column values to add
            to every row, only if column is missing in catalog records.
        time_part : `int`, optional
            If not `None` then insert into a per-partition table.
        """
        if extra_columns is None:
            extra_columns = {}
        extra_fields = list(extra_columns.keys())

        df_fields = [
            column for column in objects.columns if column not in extra_fields
        ]

        column_map = self._schema.getColumnMap(table_name)
        # List of columns as defined in the felis schema.
        fields = [column_map[field].name for field in df_fields if field in column_map]
        fields += extra_fields

        # Check that all partitioning and clustering columns are present.
        required_columns = self._schema.partitionColumns(table_name) \
            + self._schema.clusteringColumns(table_name)
        missing_columns = [column for column in required_columns if column not in fields]
        if missing_columns:
            raise ValueError(f"Primary key columns are missing from catalog: {missing_columns}")

        qfields = [quote_id(field) for field in fields]
        qfields_str = ','.join(qfields)

        with Timer(table_name.name + ' query build', self.config.timer):
            table = self._schema.tableName(table_name)
            if time_part is not None:
                table = f"{table}_{time_part}"

            holders = ','.join(['?'] * len(qfields))
            query = f'INSERT INTO "{self._keyspace}"."{table}" ({qfields_str}) VALUES ({holders})'
            statement = self._prep_statement(query)
            queries = cassandra.query.BatchStatement()
            for rec in objects.itertuples(index=False):
                values = []
                for field in df_fields:
                    if field not in column_map:
                        continue
                    value = getattr(rec, field)
                    if column_map[field].datatype is felis.types.Timestamp:
                        if isinstance(value, pandas.Timestamp):
                            value = literal(value.to_pydatetime())
                        else:
                            # Assume seconds since epoch; Cassandra timestamps
                            # are in milliseconds.
                            value = int(value * 1000)
                    values.append(literal(value))
                for field in extra_fields:
                    value = extra_columns[field]
                    values.append(literal(value))
                queries.add(statement, values)

        _LOG.debug("%s: will store %d records", self._schema.tableName(table_name), objects.shape[0])
        with Timer(table_name.name + ' insert', self.config.timer):
            self._session.execute(queries, timeout=self.config.write_timeout, execution_profile="write")

    def _add_obj_part(self, df: pandas.DataFrame) -> pandas.DataFrame:
        """Calculate spatial partition for each record and add it to a
        DataFrame.

        Notes
        -----
        This overrides any existing column in a DataFrame with the same name
        (apdb_part). Original DataFrame is not changed, a copy of a DataFrame
        is returned.
        """
        apdb_part = np.zeros(df.shape[0], dtype=np.int64)
        ra_col, dec_col = self.config.ra_dec_columns
        for i, (ra, dec) in enumerate(zip(df[ra_col], df[dec_col])):
            uv3d = sphgeom.UnitVector3d(sphgeom.LonLat.fromDegrees(ra, dec))
            idx = self._pixelization.pixel(uv3d)
            apdb_part[i] = idx
        df = df.copy()
        df["apdb_part"] = apdb_part
        return df

    def _add_src_part(self, sources: pandas.DataFrame, objs: pandas.DataFrame) -> pandas.DataFrame:
        """Add apdb_part column to DiaSource catalog.

        Notes
        -----
        This method copies the apdb_part value from a matching DiaObject
        record. DiaObject catalog needs to have an apdb_part column filled by
        the ``_add_obj_part`` method, and DiaSource records need to be
        associated to DiaObjects via the ``diaObjectId`` column.

        This overrides any existing column in a DataFrame with the same name
        (apdb_part). Original DataFrame is not changed, a copy of a DataFrame
        is returned.
        """
        pixel_id_map: Dict[int, int] = {
            diaObjectId: apdb_part for diaObjectId, apdb_part
            in zip(objs["diaObjectId"], objs["apdb_part"])
        }
        apdb_part = np.zeros(sources.shape[0], dtype=np.int64)
        ra_col, dec_col = self.config.ra_dec_columns
        for i, (diaObjId, ra, dec) in enumerate(zip(sources["diaObjectId"],
                                                    sources[ra_col], sources[dec_col])):
            if diaObjId == 0:
                # DiaSource not associated with a DiaObject; compute partition
                # from its own coordinates.
                uv3d = sphgeom.UnitVector3d(sphgeom.LonLat.fromDegrees(ra, dec))
                idx = self._pixelization.pixel(uv3d)
                apdb_part[i] = idx
            else:
                apdb_part[i] = pixel_id_map[diaObjId]
        sources = sources.copy()
        sources["apdb_part"] = apdb_part
        return sources

    def _add_fsrc_part(self, sources: pandas.DataFrame, objs: pandas.DataFrame) -> pandas.DataFrame:
        """Add apdb_part column to DiaForcedSource catalog.

        Notes
        -----
        This method copies the apdb_part value from a matching DiaObject
        record. DiaObject catalog needs to have an apdb_part column filled by
        the ``_add_obj_part`` method, and DiaForcedSource records need to be
        associated to DiaObjects via the ``diaObjectId`` column.

        This overrides any existing column in a DataFrame with the same name
        (apdb_part). Original DataFrame is not changed, a copy of a DataFrame
        is returned.
        """
        pixel_id_map: Dict[int, int] = {
            diaObjectId: apdb_part for diaObjectId, apdb_part
            in zip(objs["diaObjectId"], objs["apdb_part"])
        }
        apdb_part = np.zeros(sources.shape[0], dtype=np.int64)
        for i, diaObjId in enumerate(sources["diaObjectId"]):
            apdb_part[i] = pixel_id_map[diaObjId]
        sources = sources.copy()
        sources["apdb_part"] = apdb_part
        return sources

    def _time_partition(self, time: Union[float, dafBase.DateTime]) -> int:
        """Calculate time partition number for a given time.

        ``time`` can be a `float` meaning MJD or a `dafBase.DateTime`; the
        returned value is the partition number for that time.
        """
        if isinstance(time, dafBase.DateTime):
            mjd = time.get(system=dafBase.DateTime.MJD)
        else:
            mjd = time
        days_since_epoch = mjd - self._partition_zero_epoch_mjd
        partition = int(days_since_epoch) // self.config.time_partition_days
        return partition

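    # Worked example of the arithmetic above (numbers are illustrative, not
    # defaults from this module): with time_partition_days=30 and a
    # partition-zero epoch of MJD 58849.0, a visit at MJD 59000.0 gives
    #
    #     days_since_epoch = 59000.0 - 58849.0   # 151.0
    #     partition = int(151.0) // 30           # -> 5
    #
    # so its rows go either to a per-partition table named "<table>_5" or get
    # "apdb_time_part" = 5 as a regular column, depending on configuration.
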
    def _make_empty_catalog(self, table_name: ApdbTables) -> pandas.DataFrame:
        """Make an empty catalog for a table with a given name.

        Returns an empty `pandas.DataFrame` whose columns match the schema of
        ``table_name``.
        """
        table = self._schema.tableSchemas[table_name]
        data = {
            columnDef.name: pandas.Series(dtype=self._schema.column_dtype(columnDef.datatype))
            for columnDef in table.columns
        }
        return pandas.DataFrame(data)

    def _prep_statement(self, query: str) -> cassandra.query.PreparedStatement:
        """Convert query string into prepared statement, caching the result."""
        stmt = self._prepared_statements.get(query)
        if stmt is None:
            stmt = self._session.prepare(query)
            self._prepared_statements[query] = stmt
        return stmt

    def _combine_where(
        self,
        prefix: str,
        where1: List[Tuple[str, Tuple]],
        where2: List[Tuple[str, Tuple]],
        suffix: Optional[str] = None,
    ) -> Iterator[Tuple[cassandra.query.Statement, Tuple]]:
        """Make cartesian product of two parts of WHERE clause into a series
        of statements to execute.

        Parameters
        ----------
        prefix : `str`
            Initial statement prefix that comes before WHERE clause, e.g.
            "SELECT * from Table"
        """
        # If either list is empty use a no-op placeholder so that the
        # cartesian product is not empty.
        if not where1:
            where1 = [("", ())]
        if not where2:
            where2 = [("", ())]
        for expr1, params1 in where1:
            for expr2, params2 in where2:
                full_query = prefix
                wheres = []
                if expr1:
                    wheres.append(expr1)
                if expr2:
                    wheres.append(expr2)
                if wheres:
                    full_query += " WHERE " + " AND ".join(wheres)
                if suffix:
                    full_query += " " + suffix
                params = params1 + params2
                if params:
                    statement = self._prep_statement(full_query)
                else:
                    # There is no point in preparing a statement that has no
                    # parameters.
                    statement = cassandra.query.SimpleStatement(full_query)
                yield (statement, params)

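    # Example of the cartesian product produced above (pixel and partition
    # numbers are made-up): with
    #
    #     where1 = [('"apdb_part" = ?', (1234,)), ('"apdb_part" = ?', (1235,))]
    #     where2 = [('"apdb_time_part" = ?', (650,))]
    #
    # the generator yields two statements of the form
    #
    #     <prefix> WHERE "apdb_part" = ? AND "apdb_time_part" = ?
    #
    # with parameter tuples (1234, 650) and (1235, 650).
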
    def _spatial_where(
        self, region: Optional[sphgeom.Region], use_ranges: bool = False
    ) -> List[Tuple[str, Tuple]]:
        """Generate expressions for spatial part of WHERE clause.

        Parameters
        ----------
        region : `sphgeom.Region`, optional
            Spatial region for query results.
        use_ranges : `bool`
            If True then use pixel ranges ("apdb_part >= p1 AND apdb_part <=
            p2") instead of exact list of pixels. Should be set to True for
            large regions covering very many pixels.

        Returns
        -------
        expressions : `list` [ `tuple` ]
            Empty list is returned if ``region`` is `None`, otherwise a list
            of one or more (expression, parameters) tuples.
        """
        if region is None:
            return []
        if use_ranges:
            pixel_ranges = self._pixelization.envelope(region)
            expressions: List[Tuple[str, Tuple]] = []
            for lower, upper in pixel_ranges:
                upper -= 1
                if lower == upper:
                    expressions.append(('"apdb_part" = ?', (lower, )))
                else:
                    expressions.append(('"apdb_part" >= ? AND "apdb_part" <= ?', (lower, upper)))
            return expressions
        else:
            pixels = self._pixelization.pixels(region)
            if self.config.query_per_spatial_part:
                return [('"apdb_part" = ?', (pixel,)) for pixel in pixels]
            else:
                pixels_str = ",".join([str(pix) for pix in pixels])
                return [(f'"apdb_part" IN ({pixels_str})', ())]

    def _temporal_where(
        self,
        table: ApdbTables,
        start_time: Union[float, dafBase.DateTime],
        end_time: Union[float, dafBase.DateTime],
        query_per_time_part: Optional[bool] = None,
    ) -> Tuple[List[str], List[Tuple[str, Tuple]]]:
        """Generate table names and expressions for temporal part of WHERE
        clauses.

        Parameters
        ----------
        table : `ApdbTables`
            Table to select from.
        start_time : `dafBase.DateTime` or `float`
            Starting Datetime or MJD value of the time range.
        end_time : `dafBase.DateTime` or `float`
            Ending Datetime or MJD value of the time range.
        query_per_time_part : `bool`, optional
            If `None` then use ``query_per_time_part`` from configuration.

        Returns
        -------
        tables : `list` [ `str` ]
            List of the table names to query.
        expressions : `list` [ `tuple` ]
            A list of zero or more (expression, parameters) tuples.
        """
        temporal_where: List[Tuple[str, Tuple]] = []
        table_name = self._schema.tableName(table)
        time_part_start = self._time_partition(start_time)
        time_part_end = self._time_partition(end_time)
        time_parts = list(range(time_part_start, time_part_end + 1))
        if self.config.time_partition_tables:
            tables = [f"{table_name}_{part}" for part in time_parts]
        else:
            tables = [table_name]
            if query_per_time_part is None:
                query_per_time_part = self.config.query_per_time_part
            if query_per_time_part:
                temporal_where = [
                    ('"apdb_time_part" = ?', (time_part,)) for time_part in time_parts
                ]
            else:
                time_part_list = ",".join([str(part) for part in time_parts])
                temporal_where = [(f'"apdb_time_part" IN ({time_part_list})', ())]

        return tables, temporal_where
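
    # Example outputs (partition numbers and table names are illustrative) for a
    # time range spanning partitions 650..652 of the DiaSource table:
    #
    #     time_partition_tables=True:
    #         tables = ["DiaSource_650", "DiaSource_651", "DiaSource_652"]
    #         temporal_where = []
    #     time_partition_tables=False, query_per_time_part=False:
    #         tables = ["DiaSource"]
    #         temporal_where = [('"apdb_time_part" IN (650,651,652)', ())]
    #     time_partition_tables=False, query_per_time_part=True:
    #         tables = ["DiaSource"]
    #         temporal_where = [('"apdb_time_part" = ?', (650,)),
    #                           ('"apdb_time_part" = ?', (651,)),
    #                           ('"apdb_time_part" = ?', (652,))]
    #
    # These expressions are paired with the spatial '"apdb_part" ...' expressions
    # from _spatial_where() by _combine_where() to form the final queries.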