from __future__ import annotations

__all__ = ["ApdbCassandraConfig", "ApdbCassandra"]

import logging
from datetime import datetime, timedelta
from typing import cast, Any, Dict, Iterable, List, Mapping, Optional, Set, Tuple, Union

import numpy as np
import pandas

try:
    import cassandra
    import cassandra.query
    from cassandra.cluster import Cluster
    from cassandra.concurrent import execute_concurrent
    from cassandra.policies import RoundRobinPolicy, WhiteListRoundRobinPolicy, AddressTranslator
    CASSANDRA_IMPORTED = True
except ImportError:
    CASSANDRA_IMPORTED = False

import cbor
import lsst.daf.base as dafBase
from lsst import sphgeom
from .timer import Timer
from .apdb import Apdb, ApdbConfig
from .apdbSchema import ApdbTables, ColumnDef, TableDef
from .apdbCassandraSchema import ApdbCassandraSchema

_LOG = logging.getLogger(__name__)
 
class CassandraMissingError(Exception):
    def __init__(self) -> None:
        super().__init__("cassandra-driver module cannot be imported")
 
class ApdbCassandraConfig(ApdbConfig):

        doc="The list of contact points to try connecting for cluster discovery.",

        doc="List of internal IP addresses for contact_points.",

        doc="Default keyspace for operations.",

        doc="Name for consistency level of read operations, default: QUORUM, can be ONE.",

        doc="Name for consistency level of write operations, default: QUORUM, can be ONE.",

        doc="Timeout in seconds for read operations.",

        doc="Timeout in seconds for write operations.",

        doc="Concurrency level for read operations.",

        doc="Cassandra protocol version to use, default is V4",
        default=cassandra.ProtocolVersion.V4 if CASSANDRA_IMPORTED else 0,

        doc="List of columns to read from DiaObject, by default read all columns",

        doc="Prefix to add to table names",

        allowed=dict(htm="HTM pixelization", q3c="Q3C pixelization", mq3c="MQ3C pixelization"),
        doc="Pixelization used for partitioning index.",

        doc="Pixelization level used for partitioning index.",

        default=["ra", "decl"],
        doc="Names of ra/dec columns in DiaObject table",

        doc="If True then print/log timing information",

        doc="Use per-partition tables for sources instead of partitioning by time",

        doc="Time partitioning granularity in days, this value must not be changed"
            " after database is initialized",

        doc="Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI."
            " This is used only when time_partition_tables is True.",
        default="2018-12-01T00:00:00",

        doc="Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI."
            " This is used only when time_partition_tables is True.",
        default="2030-01-01T00:00:00",

        doc="If True then build separate query for each time partition, otherwise build one single query. "
            "This is only used when time_partition_tables is False in schema config.",

        doc="If True then build one query per spatial partition, otherwise build single query.",

        doc="If True then combine result rows before converting to pandas.",

        allowed=dict(none="No field packing", cbor="Pack using CBOR"),
        doc="Packing method for table records.",

        doc="If True use Cassandra prepared statements.",
  197     """Class that calculates indices of the objects for partitioning. 
  199     Used internally by `ApdbCassandra` 
  203     config : `ApdbCassandraConfig` 
  206         pix = config.part_pixelization
 
  214             raise ValueError(f
"unknown pixelization: {pix}")
 
    def pixels(self, region: sphgeom.Region) -> List[int]:
        """Compute set of the pixel indices for given region.

        Parameters
        ----------
        region : `lsst.sphgeom.Region`
        """
        ranges = self.pixelator.envelope(region, 1_000_000)
        indices = []
        for lower, upper in ranges:
            indices += list(range(lower, upper))
        return indices
 
    def pixel(self, direction: sphgeom.UnitVector3d) -> int:
        """Compute the index of the pixel for given direction.

        Parameters
        ----------
        direction : `lsst.sphgeom.UnitVector3d`
        """
        index = self.pixelator.index(direction)
        return index
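
For orientation, a standalone sketch of the underlying ``lsst.sphgeom`` calls that the two methods above wrap (the pixelization class and level are chosen arbitrarily here; in APDB they come from ``part_pixelization`` and ``part_pix_level``):

from lsst import sphgeom

pixelator = sphgeom.Mq3cPixelization(10)  # illustrative level

# index for a single direction, as in pixel()
uv3d = sphgeom.UnitVector3d(sphgeom.LonLat.fromDegrees(45.0, -30.0))
part = pixelator.index(uv3d)

# pixel ranges covering a region, expanded into indices as in pixels()
region = sphgeom.Circle(uv3d, sphgeom.Angle.fromDegrees(0.5))
ranges = pixelator.envelope(region, 1_000_000)
indices = [idx for lower, upper in ranges for idx in range(lower, upper)]
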
 
if CASSANDRA_IMPORTED:

    class _AddressTranslator(AddressTranslator):
        """Translate internal IP address to external.

        Only used for docker-based setup, not viable long-term solution.
        """
        def __init__(self, public_ips: List[str], private_ips: List[str]):
            self._map = dict((k, v) for k, v in zip(private_ips, public_ips))

        def translate(self, private_ip: str) -> str:
            return self._map.get(private_ip, private_ip)
 
def _rows_to_pandas(colnames: List[str], rows: List[Tuple],
                    packedColumns: List[ColumnDef]) -> pandas.DataFrame:
    """Convert result rows to pandas.

    Unpacks BLOBs that were packed on insert.

    Parameters
    ----------
    colnames : `list` [ `str` ]
        Names of the columns.
    rows : `list` of `tuple`
    packedColumns : `list` [ `ColumnDef` ]
        Column definitions for packed columns.

    Returns
    -------
    catalog : `pandas.DataFrame`
        DataFrame with the result set.
    """
    try:
        idx = colnames.index("apdb_packed")
    except ValueError:
        # no packed columns in the result, convert it as-is
        return pandas.DataFrame.from_records(rows, columns=colnames)

    df = pandas.DataFrame.from_records(rows, columns=colnames, exclude=["apdb_packed"])

    # unpack the BLOB column into a list of dicts
    packed_rows = []
    for row in rows:
        blob = row[idx]
        if blob[:5] == b"cbor:":
            blob = cbor.loads(blob[5:])
        else:
            raise ValueError("Unexpected BLOB format: %r", blob)
        packed_rows.append(blob)

    packed = pandas.DataFrame.from_records(packed_rows, columns=[col.name for col in packedColumns])

    # DATETIME columns are stored as milliseconds, convert them back
    for col in packedColumns:
        if col.type == "DATETIME":
            packed[col.name] = pandas.to_datetime(packed[col.name], unit="ms", origin="unix")

    return pandas.concat([df, packed], axis=1)
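
For reference, a minimal sketch of the packing convention that this function undoes: on insert the packed columns are serialized into a single BLOB with a ``b"cbor:"`` prefix (see ``_storeObjectsPandas`` below). The column names here are made up for illustration:

import cbor

blob = b"cbor:" + cbor.dumps({"psFlux": 1.5e-29, "psFluxErr": 2.0e-30})
assert blob[:5] == b"cbor:"
record = cbor.loads(blob[5:])  # {'psFlux': 1.5e-29, 'psFluxErr': 2e-30}
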
 
  306     """Create pandas DataFrame from Cassandra result set. 
  310     packedColumns : `list` [ `ColumnDef` ] 
  311         Column definitions for packed columns. 
  313     def __init__(self, packedColumns: Iterable[ColumnDef]):
 
  316     def __call__(self, colnames: List[str], rows: List[Tuple]) -> pandas.DataFrame:
 
  317         """Convert result set into output catalog. 
  321         colname : `list` [ `str` ] 
  322             Names of the columns. 
  323         rows : `list` of `tuple` 
  328         catalog : `pandas.DataFrame` 
  329             DataFrame with the result set. 
  331         return _rows_to_pandas(colnames, rows, self.
packedColumnspackedColumns)
 
  335     """Row factory that makes no conversions. 
  339     packedColumns : `list` [ `ColumnDef` ] 
  340         Column definitions for packed columns. 
  342     def __init__(self, packedColumns: Iterable[ColumnDef]):
 
  345     def __call__(self, colnames: List[str], rows: List[Tuple]) -> Tuple[List[str], List[Tuple]]:
 
  346         """Return parameters without change. 
  350         colname : `list` of `str` 
  351             Names of the columns. 
  352         rows : `list` of `tuple` 
  357         colname : `list` of `str` 
  358             Names of the columns. 
  359         rows : `list` of `tuple` 
  362         return (colnames, rows)
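
For context, cassandra-driver calls ``Session.row_factory`` with the column names and raw rows of every result page, which is why both factories above are callables taking ``(colnames, rows)``. A minimal sketch of plugging one in (contact point and keyspace are placeholders):

from cassandra.cluster import Cluster

cluster = Cluster(contact_points=["127.0.0.1"])
session = cluster.connect("apdb")
# Any callable with the (colnames, rows) signature can be assigned here,
# e.g. an instance of one of the factory classes above.
session.row_factory = lambda colnames, rows: (colnames, rows)
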
 
  366     """Implementation of APDB database on to of Apache Cassandra. 
  368     The implementation is configured via standard ``pex_config`` mechanism 
  369     using `ApdbCassandraConfig` configuration class. For an example of 
  370     different configurations check config/ folder. 
  374     config : `ApdbCassandraConfig` 
  375         Configuration object. 
  379     """Start time for partition 0, this should never be changed.""" 
    def __init__(self, config: ApdbCassandraConfig):

        if not CASSANDRA_IMPORTED:
            raise CassandraMissingError()

        self.config = config

        _LOG.debug("ApdbCassandra Configuration:")
        _LOG.debug("    read_consistency: %s", self.config.read_consistency)
        _LOG.debug("    write_consistency: %s", self.config.write_consistency)
        _LOG.debug("    read_sources_months: %s", self.config.read_sources_months)
        _LOG.debug("    read_forced_sources_months: %s", self.config.read_forced_sources_months)
        _LOG.debug("    dia_object_columns: %s", self.config.dia_object_columns)
        _LOG.debug("    schema_file: %s", self.config.schema_file)
        _LOG.debug("    extra_schema_file: %s", self.config.extra_schema_file)
        _LOG.debug("    schema prefix: %s", self.config.prefix)
        _LOG.debug("    part_pixelization: %s", self.config.part_pixelization)
        _LOG.debug("    part_pix_level: %s", self.config.part_pix_level)
        _LOG.debug("    query_per_time_part: %s", self.config.query_per_time_part)
        _LOG.debug("    query_per_spatial_part: %s", self.config.query_per_spatial_part)

        addressTranslator: Optional[AddressTranslator] = None
        if config.private_ips:
            loadBalancePolicy = WhiteListRoundRobinPolicy(hosts=config.contact_points)
            addressTranslator = _AddressTranslator(config.contact_points, config.private_ips)
        else:
            loadBalancePolicy = RoundRobinPolicy()

        self._read_consistency = getattr(cassandra.ConsistencyLevel, config.read_consistency)
        self._write_consistency = getattr(cassandra.ConsistencyLevel, config.write_consistency)

        self._cluster = Cluster(contact_points=self.config.contact_points,
                                load_balancing_policy=loadBalancePolicy,
                                address_translator=addressTranslator,
                                protocol_version=self.config.protocol_version)
        self._session = self._cluster.connect(keyspace=config.keyspace)
        self._session.row_factory = cassandra.query.named_tuple_factory

        self._schema = ApdbCassandraSchema(session=self._session,
                                           schema_file=self.config.schema_file,
                                           extra_schema_file=self.config.extra_schema_file,
                                           prefix=self.config.prefix,
                                           packing=self.config.packing,
                                           time_partition_tables=self.config.time_partition_tables)
 
    def tableDef(self, table: ApdbTables) -> Optional[TableDef]:
        return self._schema.tableSchemas.get(table)
 
    def makeSchema(self, drop: bool = False) -> None:
        if self.config.time_partition_tables:
 
    def getDiaObjects(self, region: sphgeom.Region) -> pandas.DataFrame:
        packedColumns = self._schema.packedColumns(ApdbTables.DiaObjectLast)
        self._session.row_factory = _PandasRowFactory(packedColumns)
        self._session.default_fetch_size = None

        pixels = self._partitioner.pixels(region)
        _LOG.debug("getDiaObjects: #partitions: %s", len(pixels))
        pixels_str = ",".join([str(pix) for pix in pixels])

        queries: List[Tuple] = []
        query = f'SELECT * from "DiaObjectLast" WHERE "apdb_part" IN ({pixels_str})'
        queries += [(cassandra.query.SimpleStatement(query, consistency_level=self._read_consistency), {})]
        _LOG.debug("getDiaObjects: #queries: %s", len(queries))

        with Timer('DiaObject select', self.config.timer):
            futures = [self._session.execute_async(query, values, timeout=self.config.read_timeout)
                       for query, values in queries]
            dataframes = [future.result()._current_rows for future in futures]
            if len(dataframes) == 1:
                objects = dataframes[0]
            else:
                objects = pandas.concat(dataframes)

        _LOG.debug("found %s DiaObjects", objects.shape[0])
        return objects
 
    def getDiaSources(self, region: sphgeom.Region,
                      object_ids: Optional[Iterable[int]],
                      visit_time: dafBase.DateTime) -> Optional[pandas.DataFrame]:
        return self._getSources(region, object_ids, visit_time, ApdbTables.DiaSource,
                                self.config.read_sources_months)

    def getDiaForcedSources(self, region: sphgeom.Region,
                            object_ids: Optional[Iterable[int]],
                            visit_time: dafBase.DateTime) -> Optional[pandas.DataFrame]:
        return self._getSources(region, object_ids, visit_time, ApdbTables.DiaForcedSource,
                                self.config.read_forced_sources_months)
 
    def _getSources(self, region: sphgeom.Region,
                    object_ids: Optional[Iterable[int]],
                    visit_time: dafBase.DateTime,
                    table_name: ApdbTables,
                    months: int) -> Optional[pandas.DataFrame]:
        """Returns catalog of DiaSource instances given set of DiaObject IDs.

        Parameters
        ----------
        region : `lsst.sphgeom.Region`
        object_ids : iterable of `int`, optional
            Collection of DiaObject IDs
        visit_time : `lsst.daf.base.DateTime`
            Time of the current visit
        table_name : `ApdbTables`
            Name of the table, either "DiaSource" or "DiaForcedSource"
        months : `int`
            Number of months of history to return, if negative returns whole
            history (Note: negative does not work with table-per-partition
            tables).

        Returns
        -------
        catalog : `pandas.DataFrame`, or `None`
            Catalog containing DiaSource records. `None` is returned if
            ``months`` is 0 or when ``object_ids`` is empty.
        """
        object_id_set: Set[int] = set()
        if object_ids is not None:
            object_id_set = set(object_ids)
            if len(object_id_set) == 0:
                return self._make_empty_catalog(table_name)

        packedColumns = self._schema.packedColumns(table_name)
        if self.config.pandas_delay_conv:
            self._session.row_factory = _RawRowFactory(packedColumns)
        else:
            self._session.row_factory = _PandasRowFactory(packedColumns)
        self._session.default_fetch_size = None

        pixels = self._partitioner.pixels(region)
        _LOG.debug("_getSources: %s #partitions: %s", table_name.name, len(pixels))

        if self.config.query_per_spatial_part:
            spatial_where = [f'"apdb_part" = {pixel}' for pixel in pixels]
        else:
            pixels_str = ",".join([str(pix) for pix in pixels])
            spatial_where = [f'"apdb_part" IN ({pixels_str})']

        full_name = self._schema.tableName(table_name)

        mjd_now = visit_time.get(system=dafBase.DateTime.MJD)
        mjd_begin = mjd_now - months*30
        time_part_now = self._time_partition(mjd_now)
        time_part_begin = self._time_partition(mjd_begin)
        time_parts = list(range(time_part_begin, time_part_now + 1))

        temporal_where: List[str] = []
        if self.config.time_partition_tables:
            tables = [f"{full_name}_{part}" for part in time_parts]
        else:
            tables = [full_name]
            if self.config.query_per_time_part:
                temporal_where = [f'"apdb_time_part" = {time_part}' for time_part in time_parts]
            else:
                time_part_list = ",".join([str(part) for part in time_parts])
                temporal_where = [f'"apdb_time_part" IN ({time_part_list})']
 
        queries: List[str] = []
        for table in tables:
            query = f'SELECT * from "{table}" WHERE '
            for spacial in spatial_where:
                if temporal_where:
                    for temporal in temporal_where:
                        queries.append(query + spacial + " AND " + temporal)
                else:
                    queries.append(query + spacial)

        statements: List[Tuple] = [
            (cassandra.query.SimpleStatement(query, consistency_level=self._read_consistency), {})
            for query in queries
        ]
        _LOG.debug("_getSources %s: #queries: %s", table_name, len(statements))
 
        with Timer(table_name.name + ' select', self.config.timer):
            results = execute_concurrent(self._session, statements, results_generator=True,
                                         concurrency=self.config.read_concurrency)
            if self.config.pandas_delay_conv:
                _LOG.debug("making pandas data frame out of rows/columns")
                columns: Any = None
                rows: List[Tuple] = []
                for success, result in results:
                    result = result._current_rows
                    if success:
                        if columns is None:
                            columns = result[0]
                        elif columns != result[0]:
                            _LOG.error("different columns returned by queries: %s and %s",
                                       columns, result[0])
                            raise ValueError(
                                f"different columns returned by queries: {columns} and {result[0]}"
                            )
                        rows += result[1]
                    else:
                        _LOG.error("error returned by query: %s", result)
                catalog = _rows_to_pandas(columns, rows, self._schema.packedColumns(table_name))
                _LOG.debug("pandas catalog shape: %s", catalog.shape)
                if len(object_id_set) > 0:
                    catalog = cast(pandas.DataFrame, catalog[catalog["diaObjectId"].isin(object_id_set)])
            else:
                _LOG.debug("making pandas data frame out of set of data frames")
                dataframes = []
                for success, result in results:
                    if success:
                        dataframes.append(result._current_rows)
                    else:
                        _LOG.error("error returned by query: %s", result)
                if len(dataframes) == 1:
                    catalog = dataframes[0]
                else:
                    catalog = pandas.concat(dataframes)
                _LOG.debug("pandas catalog shape: %s", catalog.shape)
                if len(object_id_set) > 0:
                    catalog = cast(pandas.DataFrame, catalog[catalog["diaObjectId"].isin(object_id_set)])

        catalog = cast(pandas.DataFrame, catalog[catalog["midPointTai"] > mjd_begin])
        _LOG.debug("found %d %ss", catalog.shape[0], table_name.name)
        return catalog
 
    def store(self,
              visit_time: dafBase.DateTime,
              objects: pandas.DataFrame,
              sources: Optional[pandas.DataFrame] = None,
              forced_sources: Optional[pandas.DataFrame] = None) -> None:
        objects = self._add_obj_part(objects)
        self._storeDiaObjects(objects, visit_time)

        if sources is not None:
            sources = self._add_src_part(sources, objects)
            self._storeDiaSources(ApdbTables.DiaSource, sources, visit_time)

        if forced_sources is not None:
            forced_sources = self._add_fsrc_part(forced_sources, objects)
            self._storeDiaSources(ApdbTables.DiaForcedSource, forced_sources, visit_time)
 
    def _storeDiaObjects(self, objs: pandas.DataFrame, visit_time: dafBase.DateTime) -> None:
        """Store catalog of DiaObjects from current visit.

        Parameters
        ----------
        objs : `pandas.DataFrame`
            Catalog with DiaObject records
        visit_time : `lsst.daf.base.DateTime`
            Time of the current visit.
        """
        visit_time_dt = visit_time.toPython()
        extra_columns = dict(lastNonForcedSource=visit_time_dt)
        self._storeObjectsPandas(objs, ApdbTables.DiaObjectLast, visit_time, extra_columns=extra_columns)

        extra_columns["validityStart"] = visit_time_dt
        time_part: Optional[int] = self._time_partition(visit_time)
        if not self.config.time_partition_tables:
            extra_columns["apdb_time_part"] = time_part
            time_part = None

        self._storeObjectsPandas(objs, ApdbTables.DiaObject, visit_time,
                                 extra_columns=extra_columns, time_part=time_part)
 
    def _storeDiaSources(self, table_name: ApdbTables, sources: pandas.DataFrame,
                         visit_time: dafBase.DateTime) -> None:
        """Store catalog of DIASources or DIAForcedSources from current visit.

        Parameters
        ----------
        sources : `pandas.DataFrame`
            Catalog containing DiaSource records
        visit_time : `lsst.daf.base.DateTime`
            Time of the current visit.
        """
        time_part: Optional[int] = self._time_partition(visit_time)
        extra_columns = {}
        if not self.config.time_partition_tables:
            extra_columns["apdb_time_part"] = time_part
            time_part = None

        self._storeObjectsPandas(sources, table_name, visit_time,
                                 extra_columns=extra_columns, time_part=time_part)
 
    def countUnassociatedObjects(self) -> int:
        raise NotImplementedError()
 
    def _storeObjectsPandas(self, objects: pandas.DataFrame, table_name: ApdbTables,
                            visit_time: dafBase.DateTime, extra_columns: Optional[Mapping] = None,
                            time_part: Optional[int] = None) -> None:
        """Generic store method.

        Takes catalog of records and stores a bunch of objects in a table.

        Parameters
        ----------
        objects : `pandas.DataFrame`
            Catalog containing object records
        table_name : `ApdbTables`
            Name of the table as defined in APDB schema.
        visit_time : `lsst.daf.base.DateTime`
            Time of the current visit.
        extra_columns : `dict`, optional
            Mapping (column_name, column_value) which gives column values to add
            to every row, only if column is missing in catalog records.
        time_part : `int`, optional
            If not `None` then insert into a per-partition table.
        """
        def qValue(v: Any) -> Any:
            """Transform object into a value for query."""
            if v is None:
                pass
            elif isinstance(v, datetime):
                # Cassandra timestamps are milliseconds since the UNIX epoch
                v = int((v - datetime(1970, 1, 1)) / timedelta(seconds=1))*1000
            elif isinstance(v, (bytes, str)):
                pass
            else:
                try:
                    if not np.isfinite(v):
                        v = None
                except TypeError:
                    pass
            return v

        def quoteId(columnName: str) -> str:
            """Smart quoting for column names.
            Lower-case names are not quoted.
            """
            if not columnName.islower():
                columnName = '"' + columnName + '"'
            return columnName

        if extra_columns is None:
            extra_columns = {}
        extra_fields = list(extra_columns.keys())

        df_fields = [column for column in objects.columns
                     if column not in extra_fields]

        column_map = self._schema.getColumnMap(table_name)
        # list of columns (as in table schema)
        fields = [column_map[field].name for field in df_fields if field in column_map]
        fields += extra_fields

        # check that all partitioning and clustering columns are defined
        required_columns = self._schema.partitionColumns(table_name) \
            + self._schema.clusteringColumns(table_name)
        missing_columns = [column for column in required_columns if column not in fields]
        if missing_columns:
            raise ValueError(f"Primary key columns are missing from catalog: {missing_columns}")

        blob_columns = set(col.name for col in self._schema.packedColumns(table_name))

        qfields = [quoteId(field) for field in fields if field not in blob_columns]
        if blob_columns:
            qfields += [quoteId("apdb_packed")]
        qfields_str = ','.join(qfields)

        with Timer(table_name.name + ' query build', self.config.timer):

            table = self._schema.tableName(table_name)
            if time_part is not None:
                table = f"{table}_{time_part}"

            prepared: Optional[cassandra.query.PreparedStatement] = None
            if self.config.prepared_statements:
                holders = ','.join(['?']*len(qfields))
                query = f'INSERT INTO "{table}" ({qfields_str}) VALUES ({holders})'
                prepared = self._session.prepare(query)
            queries = cassandra.query.BatchStatement(consistency_level=self._write_consistency)
            for rec in objects.itertuples(index=False):
                values = []
                blob = {}
                for field in df_fields:
                    if field not in column_map:
                        continue
                    value = getattr(rec, field)
                    if column_map[field].type == "DATETIME":
                        if isinstance(value, pandas.Timestamp):
                            value = qValue(value.to_pydatetime())
                        else:
                            # assume it is seconds, convert to milliseconds
                            value = int(value*1000)
                    if field in blob_columns:
                        blob[field] = qValue(value)
                    else:
                        values.append(qValue(value))
                for field in extra_fields:
                    value = extra_columns[field]
                    if field in blob_columns:
                        blob[field] = qValue(value)
                    else:
                        values.append(qValue(value))
                if blob_columns:
                    if self.config.packing == "cbor":
                        blob = b"cbor:" + cbor.dumps(blob)
                    values.append(blob)
                holders = ','.join(['%s']*len(values))
                if prepared is not None:
                    stmt = prepared
                else:
                    query = f'INSERT INTO "{table}" ({qfields_str}) VALUES ({holders})'
                    stmt = cassandra.query.SimpleStatement(query, consistency_level=self._write_consistency)
                queries.add(stmt, values)

        _LOG.debug("%s: will store %d records", self._schema.tableName(table_name), objects.shape[0])
        with Timer(table_name.name + ' insert', self.config.timer):
            self._session.execute(queries, timeout=self.config.write_timeout)
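
A small check of the DATETIME convention used in this method: ``qValue`` stores datetimes as integer milliseconds since the UNIX epoch, and ``_rows_to_pandas`` converts them back with ``pandas.to_datetime(..., unit="ms")``. A standalone sketch of that round trip:

from datetime import datetime, timedelta
import pandas

dt = datetime(2021, 3, 15, 12, 0, 0)
millis = int((dt - datetime(1970, 1, 1)) / timedelta(seconds=1)) * 1000
assert pandas.to_datetime(millis, unit="ms", origin="unix") == pandas.Timestamp(dt)
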
 
    def _add_obj_part(self, df: pandas.DataFrame) -> pandas.DataFrame:
        """Calculate spatial partition for each record and add it to a
        DataFrame.

        Notes
        -----
        This overrides any existing column in a DataFrame with the same name
        (apdb_part). Original DataFrame is not changed, copy of a DataFrame is
        returned.
        """
        apdb_part = np.zeros(df.shape[0], dtype=np.int64)
        ra_col, dec_col = self.config.ra_dec_columns
        for i, (ra, dec) in enumerate(zip(df[ra_col], df[dec_col])):
            uv3d = sphgeom.UnitVector3d(sphgeom.LonLat.fromDegrees(ra, dec))
            idx = self._partitioner.pixel(uv3d)
            apdb_part[i] = idx
        df = df.copy()
        df["apdb_part"] = apdb_part
        return df
 
    def _add_src_part(self, sources: pandas.DataFrame, objs: pandas.DataFrame) -> pandas.DataFrame:
        """Add apdb_part column to DiaSource catalog.

        Notes
        -----
        This method copies apdb_part value from a matching DiaObject record.
        DiaObject catalog needs to have an apdb_part column filled by
        ``_add_obj_part`` method and DiaSource records need to be
        associated to DiaObjects via ``diaObjectId`` column.

        This overrides any existing column in a DataFrame with the same name
        (apdb_part). Original DataFrame is not changed, copy of a DataFrame is
        returned.
        """
        pixel_id_map: Dict[int, int] = {
            diaObjectId: apdb_part for diaObjectId, apdb_part
            in zip(objs["diaObjectId"], objs["apdb_part"])
        }
        apdb_part = np.zeros(sources.shape[0], dtype=np.int64)
        ra_col, dec_col = self.config.ra_dec_columns
        for i, (diaObjId, ra, dec) in enumerate(zip(sources["diaObjectId"],
                                                    sources[ra_col], sources[dec_col])):
            if diaObjId == 0:
                # no associated DiaObject, compute partition from the source
                # position itself
                uv3d = sphgeom.UnitVector3d(sphgeom.LonLat.fromDegrees(ra, dec))
                idx = self._partitioner.pixel(uv3d)
                apdb_part[i] = idx
            else:
                apdb_part[i] = pixel_id_map[diaObjId]
        sources = sources.copy()
        sources["apdb_part"] = apdb_part
        return sources
 
    def _add_fsrc_part(self, sources: pandas.DataFrame, objs: pandas.DataFrame) -> pandas.DataFrame:
        """Add apdb_part column to DiaForcedSource catalog.

        Notes
        -----
        This method copies apdb_part value from a matching DiaObject record.
        DiaObject catalog needs to have an apdb_part column filled by
        ``_add_obj_part`` method and DiaSource records need to be
        associated to DiaObjects via ``diaObjectId`` column.

        This overrides any existing column in a DataFrame with the same name
        (apdb_part). Original DataFrame is not changed, copy of a DataFrame is
        returned.
        """
        pixel_id_map: Dict[int, int] = {
            diaObjectId: apdb_part for diaObjectId, apdb_part
            in zip(objs["diaObjectId"], objs["apdb_part"])
        }
        apdb_part = np.zeros(sources.shape[0], dtype=np.int64)
        for i, diaObjId in enumerate(sources["diaObjectId"]):
            apdb_part[i] = pixel_id_map[diaObjId]
        sources = sources.copy()
        sources["apdb_part"] = apdb_part
        return sources
 
  920         """Calculate time partiton number for a given time. 
  924         time : `float` or `lsst.daf.base.DateTime` 
  925             Time for which to calculate partition number. Can be float to mean 
  926             MJD or `lsst.daf.base.DateTime` 
  931             Partition number for a given time. 
  934             mjd = time.get(system=dafBase.DateTime.MJD)
 
  937         days_since_epoch = mjd - self._partition_zero_epoch_mjd
 
  938         partition = int(days_since_epoch) // self.config.time_partition_days
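
A worked example of the arithmetic above as a standalone function (the epoch MJD and the 30-day granularity are illustrative values, not taken from this file):

def time_partition(mjd: float, epoch_mjd: float, partition_days: int) -> int:
    """Partition number for a given MJD."""
    days_since_epoch = mjd - epoch_mjd
    return int(days_since_epoch) // partition_days

# 45 days after the epoch with 30-day partitions falls into partition 1
assert time_partition(60000.0 + 45.0, 60000.0, 30) == 1
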
 
    def _make_empty_catalog(self, table_name: ApdbTables) -> pandas.DataFrame:
        """Make an empty catalog for a table with a given name.

        Parameters
        ----------
        table_name : `ApdbTables`
            Name of the table.

        Returns
        -------
        catalog : `pandas.DataFrame`
            An empty catalog.
        """
        table = self._schema.tableSchemas[table_name]
        data = {columnDef.name: pandas.Series(dtype=columnDef.dtype) for columnDef in table.columns}
        return pandas.DataFrame(data)
 