from __future__ import annotations
__all__ = ["ApdbCassandraConfig", "ApdbCassandra"]
from datetime import datetime, timedelta
import logging
from typing import cast, Any, Dict, Iterable, List, Mapping, Optional, Set, Tuple, Union

import cbor
import numpy as np
import pandas
try:
    from cassandra.cluster import Cluster
    from cassandra.concurrent import execute_concurrent
    from cassandra.policies import RoundRobinPolicy, WhiteListRoundRobinPolicy, AddressTranslator
    import cassandra.query
    CASSANDRA_IMPORTED = True
except ImportError:
    CASSANDRA_IMPORTED = False
import lsst.daf.base as dafBase
from lsst import sphgeom
from .timer import Timer
from .apdb import Apdb, ApdbConfig
from .apdbSchema import ApdbTables, ColumnDef, TableDef
from .apdbCassandraSchema import ApdbCassandraSchema
_LOG = logging.getLogger(__name__)
        super().__init__("cassandra-driver module cannot be imported")

class ApdbCassandraConfig(ApdbConfig):

        doc="The list of contact points to try connecting for cluster discovery.",
        doc="List of internal IP addresses for contact_points.",
        doc="Default keyspace for operations.",
        doc="Name for consistency level of read operations, default: QUORUM, can be ONE.",
        doc="Name for consistency level of write operations, default: QUORUM, can be ONE.",
        doc="Timeout in seconds for read operations.",
        doc="Timeout in seconds for write operations.",
        doc="Concurrency level for read operations.",
        doc="Cassandra protocol version to use, default is V4",
        default=cassandra.ProtocolVersion.V4 if CASSANDRA_IMPORTED else 0,
        doc="List of columns to read from DiaObject, by default read all columns",
        doc="Prefix to add to table names",
        allowed=dict(htm="HTM pixelization", q3c="Q3C pixelization", mq3c="MQ3C pixelization"),
        doc="Pixelization used for partitioning index.",
        doc="Pixelization level used for partitioning index.",
        default=["ra", "decl"],
        doc="Names of ra/dec columns in DiaObject table",
        doc="If True then print/log timing information",
        doc="Use per-partition tables for sources instead of partitioning by time",
        doc="Time partitioning granularity in days, this value must not be changed"
            " after database is initialized",
        doc="Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI."
            " This is used only when time_partition_tables is True.",
        default="2018-12-01T00:00:00",
        doc="Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss format, in TAI."
            " This is used only when time_partition_tables is True.",
        default="2030-01-01T00:00:00",
        doc="If True then build separate query for each time partition, otherwise build one single query. "
            "This is only used when time_partition_tables is False in schema config.",
        doc="If True then build one query per spatial partition, otherwise build single query.",
        doc="If True then combine result rows before converting to pandas.",
        allowed=dict(none="No field packing", cbor="Pack using CBOR"),
        doc="Packing method for table records.",
        doc="If True use Cassandra prepared statements.",

    """Class that calculates indices of the objects for partitioning.

    Used internally by `ApdbCassandra`.

    Parameters
    ----------
    config : `ApdbCassandraConfig`
    """
    def __init__(self, config: ApdbCassandraConfig):
        pix = config.part_pixelization
        ...
        raise ValueError(f"unknown pixelization: {pix}")
    def pixels(self, region: sphgeom.Region) -> List[int]:
        """Compute set of the pixel indices for given region.

        Parameters
        ----------
        region : `lsst.sphgeom.Region`
        """
        ranges = self.pixelator.envelope(region, 1_000_000)
        indices: List[int] = []
        for lower, upper in ranges:
            indices += list(range(lower, upper))
        return indices
    def pixel(self, direction: sphgeom.UnitVector3d) -> int:
        """Compute the index of the pixel for given direction.

        Parameters
        ----------
        direction : `lsst.sphgeom.UnitVector3d`
        """
        index = self.pixelator.index(direction)
        return index
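

# Illustrative sketch, not part of the original module: how the partitioning
# code above maps a sky position and a region to pixel indices with
# lsst.sphgeom.  The function name, the MQ3C choice, level 10 and the literal
# coordinates are assumptions made only for this example.
def _example_pixelization() -> List[int]:
    pixelator = sphgeom.Mq3cPixelization(10)
    # Single pixel index for one direction (ra=45 deg, dec=-30 deg).
    uv3d = sphgeom.UnitVector3d(sphgeom.LonLat.fromDegrees(45.0, -30.0))
    single = pixelator.index(uv3d)
    # Indices of pixels that may overlap a 1-degree circle (a superset).
    region = sphgeom.Circle(uv3d, sphgeom.Angle.fromDegrees(1.0))
    ranges = pixelator.envelope(region, 1_000_000)
    indices = [idx for lower, upper in ranges for idx in range(lower, upper)]
    return [single] + indices
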

if CASSANDRA_IMPORTED:

        """Translate internal IP address to external.

        Only used for docker-based setup, not viable long-term solution.
        """

        def __init__(self, public_ips: List[str], private_ips: List[str]):
            self._map = dict((k, v) for k, v in zip(private_ips, public_ips))

        def translate(self, private_ip: str) -> str:
            return self._map.get(private_ip, private_ip)

def _rows_to_pandas(colnames: List[str], rows: List[Tuple],
                    packedColumns: List[ColumnDef]) -> pandas.DataFrame:
    """Convert result rows to pandas.

    Unpacks BLOBs that were packed on insert.

    Parameters
    ----------
    colnames : `list` [ `str` ]
        Names of the columns.
    rows : `list` of `tuple`
        Result rows.
    packedColumns : `list` [ `ColumnDef` ]
        Column definitions for packed columns.

    Returns
    -------
    catalog : `pandas.DataFrame`
        DataFrame with the result set.
    """
    try:
        idx = colnames.index("apdb_packed")
    except ValueError:
        # No packed columns in the result, return everything as is.
        return pandas.DataFrame.from_records(rows, columns=colnames)

    df = pandas.DataFrame.from_records(rows, columns=colnames, exclude=["apdb_packed"])

    packed_rows = []
    for row in rows:
        blob = row[idx]
        if blob[:5] == b"cbor:":
            blob = cbor.loads(blob[5:])
        else:
            raise ValueError("Unexpected BLOB format: %r", blob)
        packed_rows.append(blob)

    packed = pandas.DataFrame.from_records(packed_rows, columns=[col.name for col in packedColumns])

    # Convert timestamps stored as integer milliseconds into datetime.
    for col in packedColumns:
        if col.type == "DATETIME":
            packed[col.name] = pandas.to_datetime(packed[col.name], unit="ms", origin="unix")

    return pandas.concat([df, packed], axis=1)
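

# Illustrative sketch, not part of the original module: the packing that
# _rows_to_pandas reverses.  On insert a per-row mapping of packed columns is
# CBOR-encoded and prefixed with b"cbor:"; on read the prefix is stripped and
# the mapping decoded.  The function name and the sample record are
# hypothetical.
def _example_pack_roundtrip() -> Dict[str, Any]:
    record = {"psFlux": 1.5e-29, "midPointTai": 59000.0}
    blob = b"cbor:" + cbor.dumps(record)    # insert-time packing
    assert blob[:5] == b"cbor:"
    return cbor.loads(blob[5:])             # read-time unpacking
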

    """Create pandas DataFrame from Cassandra result set.

    Parameters
    ----------
    packedColumns : `list` [ `ColumnDef` ]
        Column definitions for packed columns.
    """

    def __init__(self, packedColumns: Iterable[ColumnDef]):
        self.packedColumns = packedColumns

    def __call__(self, colnames: List[str], rows: List[Tuple]) -> pandas.DataFrame:
        """Convert result set into output catalog.

        Parameters
        ----------
        colnames : `list` [ `str` ]
            Names of the columns.
        rows : `list` of `tuple`

        Returns
        -------
        catalog : `pandas.DataFrame`
            DataFrame with the result set.
        """
        return _rows_to_pandas(colnames, rows, self.packedColumns)

    """Row factory that makes no conversions.

    Parameters
    ----------
    packedColumns : `list` [ `ColumnDef` ]
        Column definitions for packed columns.
    """

    def __init__(self, packedColumns: Iterable[ColumnDef]):
        self.packedColumns = packedColumns

    def __call__(self, colnames: List[str], rows: List[Tuple]) -> Tuple[List[str], List[Tuple]]:
        """Return parameters without change.

        Parameters
        ----------
        colnames : `list` of `str`
            Names of the columns.
        rows : `list` of `tuple`

        Returns
        -------
        colnames : `list` of `str`
            Names of the columns.
        rows : `list` of `tuple`
        """
        return (colnames, rows)

class ApdbCassandra(Apdb):
    """Implementation of APDB database on top of Apache Cassandra.

    The implementation is configured via standard ``pex_config`` mechanism
    using `ApdbCassandraConfig` configuration class. For an example of
    different configurations check config/ folder.

    Parameters
    ----------
    config : `ApdbCassandraConfig`
        Configuration object.
    """

    """Start time for partition 0, this should never be changed."""

    def __init__(self, config: ApdbCassandraConfig):
        if not CASSANDRA_IMPORTED:
            ...

        self.config = config

        _LOG.debug("ApdbCassandra Configuration:")
        _LOG.debug("    read_consistency: %s", self.config.read_consistency)
        _LOG.debug("    write_consistency: %s", self.config.write_consistency)
        _LOG.debug("    read_sources_months: %s", self.config.read_sources_months)
        _LOG.debug("    read_forced_sources_months: %s", self.config.read_forced_sources_months)
        _LOG.debug("    dia_object_columns: %s", self.config.dia_object_columns)
        _LOG.debug("    schema_file: %s", self.config.schema_file)
        _LOG.debug("    extra_schema_file: %s", self.config.extra_schema_file)
        _LOG.debug("    schema prefix: %s", self.config.prefix)
        _LOG.debug("    part_pixelization: %s", self.config.part_pixelization)
        _LOG.debug("    part_pix_level: %s", self.config.part_pix_level)
        _LOG.debug("    query_per_time_part: %s", self.config.query_per_time_part)
        _LOG.debug("    query_per_spatial_part: %s", self.config.query_per_spatial_part)

        addressTranslator: Optional[AddressTranslator] = None
        if config.private_ips:
            loadBalancePolicy = WhiteListRoundRobinPolicy(hosts=config.contact_points)
        else:
            loadBalancePolicy = RoundRobinPolicy()

        self._read_consistency = getattr(cassandra.ConsistencyLevel, config.read_consistency)
        self._write_consistency = getattr(cassandra.ConsistencyLevel, config.write_consistency)

        self._cluster = Cluster(contact_points=self.config.contact_points,
                                load_balancing_policy=loadBalancePolicy,
                                address_translator=addressTranslator,
                                protocol_version=self.config.protocol_version)
        self._session = self._cluster.connect(keyspace=config.keyspace)
        self._session.row_factory = cassandra.query.named_tuple_factory

        self._schema = ApdbCassandraSchema(schema_file=self.config.schema_file,
                                           extra_schema_file=self.config.extra_schema_file,
                                           prefix=self.config.prefix,
                                           packing=self.config.packing,
                                           time_partition_tables=self.config.time_partition_tables)

    def tableDef(self, table: ApdbTables) -> Optional[TableDef]:
        return self._schema.tableSchemas.get(table)

    def makeSchema(self, drop: bool = False) -> None:
        if self.config.time_partition_tables:
            ...

    def getDiaObjects(self, region: sphgeom.Region) -> pandas.DataFrame:
        packedColumns = self._schema.packedColumns(ApdbTables.DiaObjectLast)
        self._session.default_fetch_size = None

        pixels = self._partitioner.pixels(region)
        _LOG.debug("getDiaObjects: #partitions: %s", len(pixels))
        pixels_str = ",".join([str(pix) for pix in pixels])

        queries: List[Tuple] = []
        query = f'SELECT * from "DiaObjectLast" WHERE "apdb_part" IN ({pixels_str})'
        queries += [(cassandra.query.SimpleStatement(query, consistency_level=self._read_consistency), {})]
        _LOG.debug("getDiaObjects: #queries: %s", len(queries))

        with Timer('DiaObject select', self.config.timer):
            futures = [self._session.execute_async(query, values, timeout=self.config.read_timeout)
                       for query, values in queries]
            dataframes = [future.result()._current_rows for future in futures]

        if len(dataframes) == 1:
            objects = dataframes[0]
        else:
            objects = pandas.concat(dataframes)

        _LOG.debug("found %s DiaObjects", objects.shape[0])
        return objects

    def getDiaSources(self, region: sphgeom.Region,
                      object_ids: Optional[Iterable[int]],
                      visit_time: dafBase.DateTime) -> Optional[pandas.DataFrame]:
        return self._getSources(region, object_ids, visit_time, ApdbTables.DiaSource,
                                self.config.read_sources_months)

    def getDiaForcedSources(self, region: sphgeom.Region,
                            object_ids: Optional[Iterable[int]],
                            visit_time: dafBase.DateTime) -> Optional[pandas.DataFrame]:
        return self._getSources(region, object_ids, visit_time, ApdbTables.DiaForcedSource,
                                self.config.read_forced_sources_months)

    def _getSources(self, region: sphgeom.Region,
                    object_ids: Optional[Iterable[int]],
                    visit_time: dafBase.DateTime,
                    table_name: ApdbTables,
                    months: int) -> Optional[pandas.DataFrame]:
        """Returns catalog of DiaSource instances given set of DiaObject IDs.

        Parameters
        ----------
        region : `lsst.sphgeom.Region`
        object_ids : iterable of `int`, or `None`
            Collection of DiaObject IDs
        visit_time : `lsst.daf.base.DateTime`
            Time of the current visit
        table_name : `ApdbTables`
            Name of the table, either "DiaSource" or "DiaForcedSource"
        months : `int`
            Number of months of history to return, if negative returns whole
            history (Note: negative does not work with table-per-partition
            case).

        Returns
        -------
        catalog : `pandas.DataFrame`, or `None`
            Catalog containing DiaSource records. `None` is returned if
            ``months`` is 0 or when ``object_ids`` is empty.
        """
        object_id_set: Set[int] = set()
        if object_ids is not None:
            object_id_set = set(object_ids)
            if len(object_id_set) == 0:
                ...

        packedColumns = self._schema.packedColumns(table_name)
        if self.config.pandas_delay_conv:
            ...
        self._session.default_fetch_size = None

        pixels = self._partitioner.pixels(region)
        _LOG.debug("_getSources: %s #partitions: %s", table_name.name, len(pixels))

        if self.config.query_per_spatial_part:
            spatial_where = [f'"apdb_part" = {pixel}' for pixel in pixels]
        else:
            pixels_str = ",".join([str(pix) for pix in pixels])
            spatial_where = [f'"apdb_part" IN ({pixels_str})']

        full_name = self._schema.tableName(table_name)
        mjd_now = visit_time.get(system=dafBase.DateTime.MJD)
        mjd_begin = mjd_now - months*30
        time_part_now = self._time_partition(mjd_now)
        time_part_begin = self._time_partition(mjd_begin)
        time_parts = list(range(time_part_begin, time_part_now + 1))

        if self.config.time_partition_tables:
            tables = [f"{full_name}_{part}" for part in time_parts]
            temporal_where = []
        else:
            tables = [full_name]
            if self.config.query_per_time_part:
                temporal_where = [f'"apdb_time_part" = {time_part}' for time_part in time_parts]
            else:
                time_part_list = ",".join([str(part) for part in time_parts])
                temporal_where = [f'"apdb_time_part" IN ({time_part_list})']

        queries: List[str] = []
        for table in tables:
            query = f'SELECT * from "{table}" WHERE '
            for spacial in spatial_where:
                if temporal_where:
                    for temporal in temporal_where:
                        queries.append(query + spacial + " AND " + temporal)
                else:
                    queries.append(query + spacial)
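
        # Illustrative example (values are not from the original code): with
        # query_per_spatial_part=False and query_per_time_part=True one of the
        # generated statements could look like
        #   SELECT * from "DiaSource" WHERE "apdb_part" IN (1234,1235) AND "apdb_time_part" = 651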

        statements: List[Tuple] = [
            (cassandra.query.SimpleStatement(query, consistency_level=self._read_consistency), {})
            for query in queries
        ]
        _LOG.debug("_getSources %s: #queries: %s", table_name, len(statements))

        with Timer(table_name.name + ' select', self.config.timer):
            results = execute_concurrent(self._session, statements, results_generator=True,
                                         concurrency=self.config.read_concurrency)
            if self.config.pandas_delay_conv:
                _LOG.debug("making pandas data frame out of rows/columns")
                columns: Any = None
                rows = []
                for success, result in results:
                    result = result._current_rows
                    if success:
                        if columns is None:
                            columns = result[0]
                        elif columns != result[0]:
                            _LOG.error("different columns returned by queries: %s and %s",
                                       columns, result[0])
                            raise ValueError(
                                f"different columns returned by queries: {columns} and {result[0]}"
                            )
                        rows += result[1]
                    else:
                        _LOG.error("error returned by query: %s", result)
                catalog = _rows_to_pandas(columns, rows, self._schema.packedColumns(table_name))
                _LOG.debug("pandas catalog shape: %s", catalog.shape)
                if len(object_id_set) > 0:
                    catalog = cast(pandas.DataFrame, catalog[catalog["diaObjectId"].isin(object_id_set)])
            else:
                _LOG.debug("making pandas data frame out of set of data frames")
                dataframes = []
                for success, result in results:
                    if success:
                        dataframes.append(result._current_rows)
                    else:
                        _LOG.error("error returned by query: %s", result)
                if len(dataframes) == 1:
                    catalog = dataframes[0]
                else:
                    catalog = pandas.concat(dataframes)
                _LOG.debug("pandas catalog shape: %s", catalog.shape)
                if len(object_id_set) > 0:
                    catalog = cast(pandas.DataFrame, catalog[catalog["diaObjectId"].isin(object_id_set)])

        catalog = cast(pandas.DataFrame, catalog[catalog["midPointTai"] > mjd_begin])

        _LOG.debug("found %d %ss", catalog.shape[0], table_name.name)
        return catalog

    def store(self,
              visit_time: dafBase.DateTime,
              objects: pandas.DataFrame,
              sources: Optional[pandas.DataFrame] = None,
              forced_sources: Optional[pandas.DataFrame] = None) -> None:

        objects = self._add_obj_part(objects)
        self._storeDiaObjects(objects, visit_time)

        if sources is not None:
            sources = self._add_src_part(sources, objects)
            self._storeDiaSources(ApdbTables.DiaSource, sources, visit_time)

        if forced_sources is not None:
            forced_sources = self._add_fsrc_part(forced_sources, objects)
            self._storeDiaSources(ApdbTables.DiaForcedSource, forced_sources, visit_time)

    def _storeDiaObjects(self, objs: pandas.DataFrame, visit_time: dafBase.DateTime) -> None:
        """Store catalog of DiaObjects from current visit.

        Parameters
        ----------
        objs : `pandas.DataFrame`
            Catalog with DiaObject records
        visit_time : `lsst.daf.base.DateTime`
            Time of the current visit.
        """
        visit_time_dt = visit_time.toPython()
        extra_columns = dict(lastNonForcedSource=visit_time_dt)
        self._storeObjectsPandas(objs, ApdbTables.DiaObjectLast, visit_time, extra_columns=extra_columns)

        extra_columns["validityStart"] = visit_time_dt
        time_part: Optional[int] = self._time_partition(visit_time)
        if not self.config.time_partition_tables:
            extra_columns["apdb_time_part"] = time_part
            time_part = None

        self._storeObjectsPandas(objs, ApdbTables.DiaObject, visit_time,
                                 extra_columns=extra_columns, time_part=time_part)

    def _storeDiaSources(self, table_name: ApdbTables, sources: pandas.DataFrame,
                         visit_time: dafBase.DateTime) -> None:
        """Store catalog of DIASources or DIAForcedSources from current visit.

        Parameters
        ----------
        sources : `pandas.DataFrame`
            Catalog containing DiaSource records
        visit_time : `lsst.daf.base.DateTime`
            Time of the current visit.
        """
        time_part: Optional[int] = self._time_partition(visit_time)
        extra_columns = dict()
        if not self.config.time_partition_tables:
            extra_columns["apdb_time_part"] = time_part
            time_part = None

        self._storeObjectsPandas(sources, table_name, visit_time,
                                 extra_columns=extra_columns, time_part=time_part)

    def countUnassociatedObjects(self) -> int:
        raise NotImplementedError()

    def _storeObjectsPandas(self, objects: pandas.DataFrame, table_name: ApdbTables,
                            visit_time: dafBase.DateTime, extra_columns: Optional[Mapping] = None,
                            time_part: Optional[int] = None) -> None:
        """Generic store method.

        Takes catalog of records and stores a bunch of objects in a table.

        Parameters
        ----------
        objects : `pandas.DataFrame`
            Catalog containing object records
        table_name : `ApdbTables`
            Name of the table as defined in APDB schema.
        visit_time : `lsst.daf.base.DateTime`
            Time of the current visit.
        extra_columns : `dict`, optional
            Mapping (column_name, column_value) which gives column values to add
            to every row, only if column is missing in catalog records.
        time_part : `int`, optional
            If not `None` then insert into a per-partition table.
        """
        def qValue(v: Any) -> Any:
            """Transform object into a value for query"""
            if v is None:
                pass
            elif isinstance(v, datetime):
                v = int((v - datetime(1970, 1, 1)) / timedelta(seconds=1))*1000
            elif isinstance(v, (bytes, str)):
                pass
            else:
                if not np.isfinite(v):
                    v = None
            return v

        def quoteId(columnName: str) -> str:
            """Smart quoting for column names.
            Lower-case names are not quoted.
            """
            if not columnName.islower():
                columnName = '"' + columnName + '"'
            return columnName
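
        # For example (illustrative): quoteId("diaObjectId") returns '"diaObjectId"'
        # while quoteId("ra") stays unquoted; qValue(datetime(1970, 1, 2)) returns
        # 86400000, i.e. milliseconds since the Unix epoch.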

        if extra_columns is None:
            extra_columns = {}
        extra_fields = list(extra_columns.keys())

        df_fields = [column for column in objects.columns
                     if column not in extra_fields]

        column_map = self._schema.getColumnMap(table_name)
        fields = [column_map[field].name for field in df_fields if field in column_map]
        fields += extra_fields

        required_columns = self._schema.partitionColumns(table_name) \
            + self._schema.clusteringColumns(table_name)
        missing_columns = [column for column in required_columns if column not in fields]
        if missing_columns:
            raise ValueError(f"Primary key columns are missing from catalog: {missing_columns}")

        blob_columns = set(col.name for col in self._schema.packedColumns(table_name))

        qfields = [quoteId(field) for field in fields if field not in blob_columns]
        if blob_columns:
            qfields += [quoteId("apdb_packed")]
        qfields_str = ','.join(qfields)

        with Timer(table_name.name + ' query build', self.config.timer):

            table = self._schema.tableName(table_name)
            if time_part is not None:
                table = f"{table}_{time_part}"

            prepared: Optional[cassandra.query.PreparedStatement] = None
            if self.config.prepared_statements:
                holders = ','.join(['?']*len(qfields))
                query = f'INSERT INTO "{table}" ({qfields_str}) VALUES ({holders})'
                prepared = self._session.prepare(query)
            queries = cassandra.query.BatchStatement(consistency_level=self._write_consistency)
            for rec in objects.itertuples(index=False):
                values = []
                blob = {}
                for field in df_fields:
                    if field not in column_map:
                        continue
                    value = getattr(rec, field)
                    if column_map[field].type == "DATETIME":
                        if isinstance(value, pandas.Timestamp):
                            value = qValue(value.to_pydatetime())
                        else:
                            # numeric timestamps are converted to milliseconds
                            value = int(value*1000)
                    if field in blob_columns:
                        blob[field] = qValue(value)
                    else:
                        values.append(qValue(value))
                for field in extra_fields:
                    value = extra_columns[field]
                    if field in blob_columns:
                        blob[field] = qValue(value)
                    else:
                        values.append(qValue(value))
                if self.config.packing == "cbor":
                    blob = b"cbor:" + cbor.dumps(blob)
                if blob_columns:
                    values.append(blob)
                holders = ','.join(['%s']*len(values))
                if prepared is not None:
                    queries.add(prepared, values)
                else:
                    query = f'INSERT INTO "{table}" ({qfields_str}) VALUES ({holders})'
                    stmt = cassandra.query.SimpleStatement(query, consistency_level=self._write_consistency)
                    queries.add(stmt, values)

        _LOG.debug("%s: will store %d records", self._schema.tableName(table_name), objects.shape[0])
        with Timer(table_name.name + ' insert', self.config.timer):
            self._session.execute(queries, timeout=self.config.write_timeout)

    def _add_obj_part(self, df: pandas.DataFrame) -> pandas.DataFrame:
        """Calculate spatial partition for each record and add it to a
        DataFrame.

        This overrides any existing column in a DataFrame with the same name
        (apdb_part). Original DataFrame is not changed, copy of a DataFrame is
        returned.
        """
        apdb_part = np.zeros(df.shape[0], dtype=np.int64)
        ra_col, dec_col = self.config.ra_dec_columns
        for i, (ra, dec) in enumerate(zip(df[ra_col], df[dec_col])):
            uv3d = sphgeom.UnitVector3d(sphgeom.LonLat.fromDegrees(ra, dec))
            idx = self._partitioner.pixel(uv3d)
            apdb_part[i] = idx
        df = df.copy()
        df["apdb_part"] = apdb_part
        return df

    def _add_src_part(self, sources: pandas.DataFrame, objs: pandas.DataFrame) -> pandas.DataFrame:
        """Add apdb_part column to DiaSource catalog.

        This method copies apdb_part value from a matching DiaObject record.
        DiaObject catalog needs to have an ``apdb_part`` column filled by
        ``_add_obj_part`` method and DiaSource records need to be
        associated to DiaObjects via ``diaObjectId`` column.

        This overrides any existing column in a DataFrame with the same name
        (apdb_part). Original DataFrame is not changed, copy of a DataFrame is
        returned.
        """
        pixel_id_map: Dict[int, int] = {
            diaObjectId: apdb_part for diaObjectId, apdb_part
            in zip(objs["diaObjectId"], objs["apdb_part"])
        }
        apdb_part = np.zeros(sources.shape[0], dtype=np.int64)
        ra_col, dec_col = self.config.ra_dec_columns
        for i, (diaObjId, ra, dec) in enumerate(zip(sources["diaObjectId"],
                                                    sources[ra_col], sources[dec_col])):
            if diaObjId == 0:
                # source is not associated with a DiaObject, use its own position
                uv3d = sphgeom.UnitVector3d(sphgeom.LonLat.fromDegrees(ra, dec))
                idx = self._partitioner.pixel(uv3d)
                apdb_part[i] = idx
            else:
                apdb_part[i] = pixel_id_map[diaObjId]
        sources = sources.copy()
        sources["apdb_part"] = apdb_part
        return sources

    def _add_fsrc_part(self, sources: pandas.DataFrame, objs: pandas.DataFrame) -> pandas.DataFrame:
        """Add apdb_part column to DiaForcedSource catalog.

        This method copies apdb_part value from a matching DiaObject record.
        DiaObject catalog needs to have an ``apdb_part`` column filled by
        ``_add_obj_part`` method and DiaSource records need to be
        associated to DiaObjects via ``diaObjectId`` column.

        This overrides any existing column in a DataFrame with the same name
        (apdb_part). Original DataFrame is not changed, copy of a DataFrame is
        returned.
        """
        pixel_id_map: Dict[int, int] = {
            diaObjectId: apdb_part for diaObjectId, apdb_part
            in zip(objs["diaObjectId"], objs["apdb_part"])
        }
        apdb_part = np.zeros(sources.shape[0], dtype=np.int64)
        for i, diaObjId in enumerate(sources["diaObjectId"]):
            apdb_part[i] = pixel_id_map[diaObjId]
        sources = sources.copy()
        sources["apdb_part"] = apdb_part
        return sources

    def _time_partition(self, time: Union[float, dafBase.DateTime]) -> int:
        """Calculate time partition number for a given time.

        Parameters
        ----------
        time : `float` or `lsst.daf.base.DateTime`
            Time for which to calculate partition number. Can be float to mean
            MJD or `lsst.daf.base.DateTime`

        Returns
        -------
        partition : `int`
            Partition number for a given time.
        """
        if isinstance(time, dafBase.DateTime):
            mjd = time.get(system=dafBase.DateTime.MJD)
        else:
            mjd = time
        days_since_epoch = mjd - self._partition_zero_epoch_mjd
        partition = int(days_since_epoch) // self.config.time_partition_days
        return partition
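
    # Worked example for the partition arithmetic above (illustrative, not from
    # the original code): with the partition-zero epoch at 2018-12-01 (MJD 58453)
    # and time_partition_days = 30 (an assumed value), MJD 58500 gives
    # days_since_epoch = 47 and partition = 47 // 30 = 1.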

    def _make_empty_catalog(self, table_name: ApdbTables) -> pandas.DataFrame:
        """Make an empty catalog for a table with a given name.

        Parameters
        ----------
        table_name : `ApdbTables`

        Returns
        -------
        catalog : `pandas.DataFrame`
        """
        table = self._schema.tableSchemas[table_name]

        data = {columnDef.name: pandas.Series(dtype=columnDef.dtype)
                for columnDef in table.columns}
        return pandas.DataFrame(data)
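

# Illustrative usage sketch, not part of the original module: how a client
# might configure and query ApdbCassandra.  The contact point, keyspace name,
# search radius and the helper name are hypothetical.
def _example_usage() -> pandas.DataFrame:
    config = ApdbCassandraConfig()
    config.contact_points = ["127.0.0.1"]
    config.keyspace = "apdb"
    apdb = ApdbCassandra(config)
    # DiaObjects whose spatial partitions overlap a 0.5-degree circle.
    center = sphgeom.UnitVector3d(sphgeom.LonLat.fromDegrees(45.0, -30.0))
    region = sphgeom.Circle(center, sphgeom.Angle.fromDegrees(0.5))
    return apdb.getDiaObjects(region)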