22"""Module defining Apdb class and related methods.
25from __future__
import annotations
27__all__ = [
"ApdbSqlConfig",
"ApdbSql"]
31from typing
import Any, Dict, List, Optional, Tuple, cast
37from felis.simple
import Table
39from lsst.sphgeom import HtmPixelization, LonLat, Region, UnitVector3d
40from lsst.utils.iteration
import chunk_iterable
41from sqlalchemy
import func, inspection, sql
43from sqlalchemy.pool
import NullPool
45from .apdb
import Apdb, ApdbConfig, ApdbInsertId, ApdbTableData
46from .apdbSchema
import ApdbTables
47from .apdbSqlSchema
import ApdbSqlSchema, ExtraTables
48from .timer
import Timer
50_LOG = logging.getLogger(__name__)

if pandas.__version__.partition(".")[0] == "1":

    class _ConnectionHackSA2:
        """Terrible hack to workaround Pandas 1 incomplete support for
        SQLAlchemy 2.

        We need to pass a Connection instance to pandas methods, but in SA 2
        the Connection class lost the ``connect`` method which is used by
        Pandas.
        """

        def __init__(self, connection: sqlalchemy.engine.Connection):
            self._connection = connection

        def connect(self, **kwargs: Any) -> Any:
            return self

        @property
        def execution_options(self) -> Callable:
            return self._connection.execution_options

        def __enter__(self) -> sqlalchemy.engine.Connection:
            return self._connection

        def __exit__(self, type_: Any, value: Any, traceback: Any) -> None:
            # Do not close the connection, it is owned by the caller.
            pass

    @inspection._inspects(_ConnectionHackSA2)
    def _connection_insp(conn: _ConnectionHackSA2) -> Inspector:
        return Inspector._construct(Inspector._init_connection, conn._connection)

else:

    def _ConnectionHackSA2(  # noqa: N802
        conn: sqlalchemy.engine.Connectable,
    ) -> sqlalchemy.engine.Connectable:
        # Pandas 2 works with a SQLAlchemy Connection directly, pass through.
        return conn
101 """Change type of the uint64 columns to int64, return copy of data frame."""
102 names = [c[0]
for c
in df.dtypes.items()
if c[1] == np.uint64]
103 return df.astype({name: np.int64
for name
in names})
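
# Example (illustrative, not executed): only uint64 columns are rewritten,
# other dtypes are left untouched.
#
#     df = pandas.DataFrame({"id": np.array([1, 2], dtype=np.uint64), "flux": [1.5, 2.5]})
#     df2 = _coerce_uint64(df)
#     assert df2["id"].dtype == np.int64 and df2["flux"].dtype == np.float64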
107 """Calculate starting point for time-based source search.
112 Time of current visit.
114 Number of months in the sources history.
119 A ``midpointMjdTai`` starting point, MJD time.
123 return visit_time.get(system=dafBase.DateTime.MJD) - months * 30
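
# Example (illustrative, not executed): a 12-month window starts 12 * 30 = 360
# days before the visit, e.g. for a visit at MJD 60000.0 (TAI):
#
#     visit_time = dafBase.DateTime(60000.0, dafBase.DateTime.MJD, dafBase.DateTime.TAI)
#     _make_midpointMjdTai_start(visit_time, 12)  # -> 59640.0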
127 """APDB configuration class for SQL implementation (ApdbSql)."""
129 db_url = Field[str](doc=
"SQLAlchemy database connection URI")
130 isolation_level = ChoiceField[str](
132 "Transaction isolation level, if unset then backend-default value "
133 "is used, except for SQLite backend where we use READ_UNCOMMITTED. "
134 "Some backends may not support every allowed value."
137 "READ_COMMITTED":
"Read committed",
138 "READ_UNCOMMITTED":
"Read uncommitted",
139 "REPEATABLE_READ":
"Repeatable read",
140 "SERIALIZABLE":
"Serializable",
145 connection_pool = Field[bool](
146 doc=
"If False then disable SQLAlchemy connection pool. Do not use connection pool when forking.",
149 connection_timeout = Field[float](
151 "Maximum time to wait time for database lock to be released before exiting. "
152 "Defaults to sqlalchemy defaults if not set."
157 sql_echo = Field[bool](doc=
"If True then pass SQLAlchemy echo option.", default=
False)
158 dia_object_index = ChoiceField[str](
159 doc=
"Indexing mode for DiaObject table",
161 "baseline":
"Index defined in baseline schema",
162 "pix_id_iov":
"(pixelId, objectId, iovStart) PK",
163 "last_object_table":
"Separate DiaObjectLast table",
167 htm_level = Field[int](doc=
"HTM indexing level", default=20)
168 htm_max_ranges = Field[int](doc=
"Max number of ranges in HTM envelope", default=64)
169 htm_index_column = Field[str](
170 default=
"pixelId", doc=
"Name of a HTM index column for DiaObject and DiaSource tables"
172 ra_dec_columns = ListField[str](default=[
"ra",
"dec"], doc=
"Names of ra/dec columns in DiaObject table")
173 dia_object_columns = ListField[str](
174 doc=
"List of columns to read from DiaObject, by default read all columns", default=[]
176 prefix = Field[str](doc=
"Prefix to add to table names and index names", default=
"")
177 namespace = Field[str](
179 "Namespace or schema name for all tables in APDB database. "
180 "Presently only works for PostgreSQL backend. "
181 "If schema with this name does not exist it will be created when "
182 "APDB tables are created."
187 timer = Field[bool](doc=
"If True then print/log timing information", default=
False)
192 raise ValueError(
"ra_dec_columns must have exactly two column names")
196 """Implementation of ApdbTableData that wraps sqlalchemy Result."""
198 def __init__(self, result: sqlalchemy.engine.Result):
200 self._rows: list[tuple] = cast(list[tuple],
list(result.fetchall()))
205 def rows(self) -> Iterable[tuple]:
210 """Implementation of APDB interface based on SQL database.
212 The implementation is configured via standard ``pex_config`` mechanism
213 using `ApdbSqlConfig` configuration
class. For an example of different
214 configurations check ``config/`` folder.
218 config : `ApdbSqlConfig`
219 Configuration object.
222 ConfigClass = ApdbSqlConfig
    def __init__(self, config: ApdbSqlConfig):
        config.validate()
        self.config = config

        _LOG.debug("APDB Configuration:")
        _LOG.debug("    dia_object_index: %s", self.config.dia_object_index)
        _LOG.debug("    read_sources_months: %s", self.config.read_sources_months)
        _LOG.debug("    read_forced_sources_months: %s", self.config.read_forced_sources_months)
        _LOG.debug("    dia_object_columns: %s", self.config.dia_object_columns)
        _LOG.debug("    schema_file: %s", self.config.schema_file)
        _LOG.debug("    extra_schema_file: %s", self.config.extra_schema_file)
        _LOG.debug("    schema prefix: %s", self.config.prefix)

        # Build the SQLAlchemy engine; disable the connection pool if
        # requested (e.g. when the process is going to fork).
        kw: MutableMapping[str, Any] = dict(echo=self.config.sql_echo)
        conn_args: Dict[str, Any] = dict()
        if not self.config.connection_pool:
            kw.update(poolclass=NullPool)
        if self.config.isolation_level is not None:
            kw.update(isolation_level=self.config.isolation_level)
        elif self.config.db_url.startswith("sqlite"):
            # Use READ_UNCOMMITTED as default value for sqlite.
            kw.update(isolation_level="READ_UNCOMMITTED")
        if self.config.connection_timeout is not None:
            if self.config.db_url.startswith("sqlite"):
                conn_args.update(timeout=self.config.connection_timeout)
            elif self.config.db_url.startswith(("postgresql", "mysql")):
                conn_args.update(connect_timeout=self.config.connection_timeout)
        kw.update(connect_args=conn_args)
        self._engine = sqlalchemy.create_engine(self.config.db_url, **kw)

        self._schema = ApdbSqlSchema(
            engine=self._engine,
            dia_object_index=self.config.dia_object_index,
            schema_file=self.config.schema_file,
            schema_name=self.config.schema_name,
            prefix=self.config.prefix,
            namespace=self.config.namespace,
            htm_index_column=self.config.htm_index_column,
            use_insert_id=config.use_insert_id,
        )

        self.pixelator = HtmPixelization(self.config.htm_level)
271 """Returns dictionary with the table names and row counts.
273 Used by ``ap_proto`` to keep track of the size of the database tables.
274 Depending on database technology this could be expensive operation.
279 Dict where key is a table name
and value
is a row count.
282 tables = [ApdbTables.DiaObject, ApdbTables.DiaSource, ApdbTables.DiaForcedSource]
283 if self.
config.dia_object_index ==
"last_object_table":
284 tables.append(ApdbTables.DiaObjectLast)
285 with self.
_engine.begin()
as conn:
287 sa_table = self.
_schema.get_table(table)
288 stmt = sql.select(func.count()).select_from(sa_table)
289 count: int = conn.execute(stmt).scalar_one()
290 res[table.name] = count
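
    # Usage sketch (illustrative), assuming an `ApdbSql` instance ``apdb``:
    #
    #     for name, count in apdb.tableRowCount().items():
    #         print(f"{name}: {count} rows")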

    def tableDef(self, table: ApdbTables) -> Optional[Table]:
        # docstring is inherited from a base class
        return self._schema.tableSchemas.get(table)

    def getDiaObjects(self, region: Region) -> pandas.DataFrame:
        # docstring is inherited from a base class

        # decide what columns we need
        if self.config.dia_object_index == "last_object_table":
            table_enum = ApdbTables.DiaObjectLast
        else:
            table_enum = ApdbTables.DiaObject
        table = self._schema.get_table(table_enum)
        if not self.config.dia_object_columns:
            columns = self._schema.get_apdb_columns(table_enum)
        else:
            columns = [table.c[col] for col in self.config.dia_object_columns]
        query = sql.select(*columns)

        # build selection
        query = query.where(self._filterRegion(table, region))

        # select only the latest versions unless the LAST table is used
        if self.config.dia_object_index != "last_object_table":
            query = query.where(table.c.validityEnd == None)  # noqa: E711

        # execute select
        with Timer("DiaObject select", self.config.timer):
            with self._engine.begin() as conn:
                objects = pandas.read_sql_query(query, conn)
        _LOG.debug("found %s DiaObjects", len(objects))
        return objects

    def getDiaSources(
        self, region: Region, object_ids: Optional[Iterable[int]], visit_time: dafBase.DateTime
    ) -> Optional[pandas.DataFrame]:
        # docstring is inherited from a base class
        if self.config.read_sources_months == 0:
            _LOG.debug("Skip DiaSources fetching")
            return None

        if object_ids is None:
            # region-based select
            return self._getDiaSourcesInRegion(region, visit_time)
        else:
            return self._getDiaSourcesByIDs(list(object_ids), visit_time)

    def getDiaForcedSources(
        self, region: Region, object_ids: Optional[Iterable[int]], visit_time: dafBase.DateTime
    ) -> Optional[pandas.DataFrame]:
        """Return catalog of DiaForcedSource instances from a given region.

        Parameters
        ----------
        region : `lsst.sphgeom.Region`
            Region to search for DIASources.
        object_ids : iterable [ `int` ], optional
            List of DiaObject IDs to further constrain the set of returned
            sources. If the list is empty then an empty catalog is returned
            with a correct schema.
        visit_time : `lsst.daf.base.DateTime`
            Time of the current visit.

        Returns
        -------
        catalog : `pandas.DataFrame`, or `None`
            Catalog containing DiaSource records. `None` is returned if the
            ``read_forced_sources_months`` configuration parameter is set
            to 0.

        Raises
        ------
        NotImplementedError
            Raised if ``object_ids`` is `None`.

        Notes
        -----
        Even though the base class allows `None` to be passed for
        ``object_ids``, this class requires ``object_ids`` to be not-`None`.
        `NotImplementedError` is raised if `None` is passed.

        This method returns a DiaForcedSource catalog for a region with
        additional filtering based on DiaObject IDs. Only a subset of the
        DiaSource history is returned, limited by the
        ``read_forced_sources_months`` config parameter w.r.t. ``visit_time``.
        If ``object_ids`` is empty then an empty catalog is always returned
        with the correct schema (columns/types).
        """
        if self.config.read_forced_sources_months == 0:
            _LOG.debug("Skip DiaForcedSources fetching")
            return None

        if object_ids is None:
            # This implementation does not support region-based selection.
            raise NotImplementedError("Region-based selection is not supported")

        midpointMjdTai_start = _make_midpointMjdTai_start(
            visit_time, self.config.read_forced_sources_months
        )
        _LOG.debug("midpointMjdTai_start = %.6f", midpointMjdTai_start)

        with Timer("DiaForcedSource select", self.config.timer):
            sources = self._getSourcesByIDs(
                ApdbTables.DiaForcedSource, list(object_ids), midpointMjdTai_start
            )

        _LOG.debug("found %s DiaForcedSources", len(sources))
        return sources

    def getInsertIds(self) -> list[ApdbInsertId] | None:
        # docstring is inherited from a base class
        if not self._schema.has_insert_id:
            return None

        table = self._schema.get_table(ExtraTables.DiaInsertId)
        assert table is not None, "has_insert_id=True means it must be defined"
        query = sql.select(table.columns["insert_id"]).order_by(table.columns["insert_time"])
        with Timer("DiaObject insert id select", self.config.timer):
            with self._engine.connect() as conn:
                result = conn.execution_options(stream_results=True, max_row_buffer=10000).execute(query)
                return [ApdbInsertId(row[0]) for row in result]

    def deleteInsertIds(self, ids: Iterable[ApdbInsertId]) -> None:
        # docstring is inherited from a base class
        if not self._schema.has_insert_id:
            raise ValueError("APDB is not configured for history storage")

        table = self._schema.get_table(ExtraTables.DiaInsertId)

        insert_ids = [id.id for id in ids]
        where_clause = table.columns["insert_id"].in_(insert_ids)
        stmt = table.delete().where(where_clause)
        with self._engine.begin() as conn:
            conn.execute(stmt)

    def getDiaObjectsHistory(self, ids: Iterable[ApdbInsertId]) -> ApdbTableData:
        # docstring is inherited from a base class
        return self._get_history(ids, ApdbTables.DiaObject, ExtraTables.DiaObjectInsertId)

    def getDiaSourcesHistory(self, ids: Iterable[ApdbInsertId]) -> ApdbTableData:
        # docstring is inherited from a base class
        return self._get_history(ids, ApdbTables.DiaSource, ExtraTables.DiaSourceInsertId)

    def getDiaForcedSourcesHistory(self, ids: Iterable[ApdbInsertId]) -> ApdbTableData:
        # docstring is inherited from a base class
        return self._get_history(ids, ApdbTables.DiaForcedSource, ExtraTables.DiaForcedSourceInsertId)

    def _get_history(
        self,
        ids: Iterable[ApdbInsertId],
        table_enum: ApdbTables,
        history_table_enum: ExtraTables,
    ) -> ApdbTableData:
        """Common implementation of the history methods."""
        if not self._schema.has_insert_id:
            raise ValueError("APDB is not configured for history retrieval")

        table = self._schema.get_table(table_enum)
        history_table = self._schema.get_table(history_table_enum)

        join = table.join(history_table)
        insert_ids = [id.id for id in ids]
        history_id_column = history_table.columns["insert_id"]
        apdb_columns = self._schema.get_apdb_columns(table_enum)
        where_clause = history_id_column.in_(insert_ids)
        query = sql.select(history_id_column, *apdb_columns).select_from(join).where(where_clause)

        # execute select
        with Timer(f"{table.name} history select", self.config.timer):
            with self._engine.begin() as conn:
                result = conn.execution_options(stream_results=True, max_row_buffer=10000).execute(query)
                return ApdbSqlTableData(result)

    def getSSObjects(self) -> pandas.DataFrame:
        # docstring is inherited from a base class

        columns = self._schema.get_apdb_columns(ApdbTables.SSObject)
        query = sql.select(*columns)

        # execute select
        with Timer("SSObject select", self.config.timer):
            with self._engine.begin() as conn:
                objects = pandas.read_sql_query(query, conn)
        _LOG.debug("found %s SSObjects", len(objects))
        return objects

    def store(
        self,
        visit_time: dafBase.DateTime,
        objects: pandas.DataFrame,
        sources: Optional[pandas.DataFrame] = None,
        forced_sources: Optional[pandas.DataFrame] = None,
    ) -> None:
        # docstring is inherited from a base class

        # We want to run all inserts in one transaction.
        with self._engine.begin() as connection:
            insert_id: ApdbInsertId | None = None
            if self._schema.has_insert_id:
                insert_id = ApdbInsertId.new_insert_id()
                self._storeInsertId(insert_id, visit_time, connection)

            # fill pixelId column for DiaObjects
            objects = self._add_obj_htm_index(objects)
            self._storeDiaObjects(objects, visit_time, insert_id, connection)

            if sources is not None:
                # copy pixelId column from DiaObjects to DiaSources
                sources = self._add_src_htm_index(sources, objects)
                self._storeDiaSources(sources, insert_id, connection)

            if forced_sources is not None:
                self._storeDiaForcedSources(forced_sources, insert_id, connection)
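
    # Usage sketch (illustrative), assuming ``apdb`` and a ``visit_time``:
    # catalogs are plain DataFrames whose columns must match the APDB schema;
    # only a few columns are shown here.
    #
    #     objects = pandas.DataFrame(
    #         {"diaObjectId": [1, 2], "ra": [35.0, 35.1], "dec": [-4.0, -4.1]}
    #     )
    #     apdb.store(visit_time, objects)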

    def storeSSObjects(self, objects: pandas.DataFrame) -> None:
        # docstring is inherited from a base class

        idColumn = "ssObjectId"
        table = self._schema.get_table(ApdbTables.SSObject)

        # everything to be done in single transaction
        with self._engine.begin() as conn:
            # Find record IDs that already exist. Some types like np.int64
            # can cause issues with sqlalchemy, convert them to int.
            ids = sorted(int(oid) for oid in objects[idColumn])

            query = sql.select(table.columns[idColumn]).where(table.columns[idColumn].in_(ids))
            result = conn.execute(query)
            knownIds = set(row.ssObjectId for row in result)

            filter = objects[idColumn].isin(knownIds)
            toUpdate = cast(pandas.DataFrame, objects[filter])
            toInsert = cast(pandas.DataFrame, objects[~filter])

            # insert new records
            if len(toInsert) > 0:
                toInsert.to_sql(
                    table.name, _ConnectionHackSA2(conn), if_exists="append", index=False, schema=table.schema
                )

            # update existing records
            if len(toUpdate) > 0:
                whereKey = f"{idColumn}_param"
                update = table.update().where(table.columns[idColumn] == sql.bindparam(whereKey))
                toUpdate = toUpdate.rename({idColumn: whereKey}, axis="columns")
                values = toUpdate.to_dict("records")
                result = conn.execute(update, values)
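
    # The upsert above hinges on a boolean mask; in isolation the split looks
    # like this (illustrative):
    #
    #     known = objects["ssObjectId"].isin({10, 11})
    #     to_update, to_insert = objects[known], objects[~known]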

    def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
        # docstring is inherited from a base class

        table = self._schema.get_table(ApdbTables.DiaSource)
        query = table.update().where(table.columns["diaSourceId"] == sql.bindparam("srcId"))

        with self._engine.begin() as conn:
            # Need to make sure that every ID exists in the database; update
            # one row at a time so that rowcount can be checked.
            missing_ids: List[int] = []
            for key, value in idMap.items():
                params = dict(srcId=key, diaObjectId=0, ssObjectId=value)
                result = conn.execute(query, params)
                if result.rowcount == 0:
                    missing_ids.append(key)
            if missing_ids:
                missing = ",".join(str(item) for item in missing_ids)
                raise ValueError(f"Following DiaSource IDs do not exist in the database: {missing}")

    def countUnassociatedObjects(self) -> int:
        # docstring is inherited from a base class

        # Retrieve the DiaObject table.
        table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaObject)

        # Construct the sql statement.
        stmt = sql.select(func.count()).select_from(table).where(table.c.nDiaSources == 1)
        stmt = stmt.where(table.c.validityEnd == None)  # noqa: E711

        # Return the count.
        with self._engine.begin() as conn:
            count = conn.execute(stmt).scalar_one()

        return count
588 """Returns catalog of DiaSource instances from given region.
593 Region to search for DIASources.
595 Time of the current visit.
599 catalog : `pandas.DataFrame`
600 Catalog containing DiaSource records.
605 _LOG.debug(
"midpointMjdTai_start = %.6f", midpointMjdTai_start)
607 table = self.
_schema.get_table(ApdbTables.DiaSource)
608 columns = self.
_schema.get_apdb_columns(ApdbTables.DiaSource)
609 query = sql.select(*columns)
612 time_filter = table.columns[
"midpointMjdTai"] > midpointMjdTai_start
613 where = sql.expression.and_(self.
_filterRegion(table, region), time_filter)
614 query = query.where(where)
618 with self.
_engine.begin()
as conn:
619 sources = pandas.read_sql_query(query, conn)
620 _LOG.debug(
"found %s DiaSources", len(sources))
624 """Returns catalog of DiaSource instances given set of DiaObject IDs.
629 Collection of DiaObject IDs
631 Time of the current visit.
635 catalog : `pandas.DataFrame`
636 Catalog contaning DiaSource records.
641 _LOG.debug(
"midpointMjdTai_start = %.6f", midpointMjdTai_start)
644 sources = self.
_getSourcesByIDs(ApdbTables.DiaSource, object_ids, midpointMjdTai_start)
646 _LOG.debug(
"found %s DiaSources", len(sources))

    def _getSourcesByIDs(
        self, table_enum: ApdbTables, object_ids: List[int], midpointMjdTai_start: float
    ) -> pandas.DataFrame:
        """Return catalog of DiaSource or DiaForcedSource instances given a
        set of DiaObject IDs.

        Parameters
        ----------
        table_enum : `ApdbTables`
            Source table to query.
        object_ids : `list` [ `int` ]
            Collection of DiaObject IDs.
        midpointMjdTai_start : `float`
            Earliest midpointMjdTai to retrieve.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Catalog containing DiaSource records. An empty catalog with the
            correct schema (columns/types) is returned when ``object_ids``
            is empty.
        """
        table = self._schema.get_table(table_enum)
        columns = self._schema.get_apdb_columns(table_enum)

        sources: Optional[pandas.DataFrame] = None
        if len(object_ids) <= 0:
            _LOG.debug("ID list is empty, just fetch empty result")
            query = sql.select(*columns).where(sql.literal(False))
            with self._engine.begin() as conn:
                sources = pandas.read_sql_query(query, conn)
        else:
            data_frames: list[pandas.DataFrame] = []
            for ids in chunk_iterable(sorted(object_ids), 1000):
                query = sql.select(*columns)

                # Some types like np.int64 can cause issues with sqlalchemy,
                # convert them to int.
                int_ids = [int(oid) for oid in ids]

                # select by object id and history start time
                query = query.where(
                    sql.expression.and_(
                        table.columns["diaObjectId"].in_(int_ids),
                        table.columns["midpointMjdTai"] > midpointMjdTai_start,
                    )
                )

                # execute select
                with self._engine.begin() as conn:
                    data_frames.append(pandas.read_sql_query(query, conn))

            if len(data_frames) == 1:
                sources = data_frames[0]
            else:
                sources = pandas.concat(data_frames)
        assert sources is not None, "Catalog cannot be None"
        return sources

    def _storeInsertId(
        self, insert_id: ApdbInsertId, visit_time: dafBase.DateTime, connection: sqlalchemy.engine.Connection
    ) -> None:
        dt = visit_time.toPython()

        table = self._schema.get_table(ExtraTables.DiaInsertId)

        stmt = table.insert().values(insert_id=insert_id.id, insert_time=dt)
        connection.execute(stmt)

    def _storeDiaObjects(
        self,
        objs: pandas.DataFrame,
        visit_time: dafBase.DateTime,
        insert_id: ApdbInsertId | None,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        """Store catalog of DiaObjects from current visit.

        Parameters
        ----------
        objs : `pandas.DataFrame`
            Catalog with DiaObject records.
        visit_time : `lsst.daf.base.DateTime`
            Time of the visit.
        insert_id : `ApdbInsertId` or `None`
            Insert identifier for history tracking.
        connection : `sqlalchemy.engine.Connection`
            Connection to use for inserts.
        """
        # Some types like np.int64 can cause issues with sqlalchemy, convert
        # them to int.
        ids = sorted(int(oid) for oid in objs["diaObjectId"])
        _LOG.debug("first object ID: %d", ids[0])

        dt = visit_time.toPython()

        if self.config.dia_object_index == "last_object_table":
            # Insert and replace all records in the LAST table.
            table = self._schema.get_table(ApdbTables.DiaObjectLast)

            # Drop the previous objects (pandas cannot upsert).
            query = table.delete().where(table.columns["diaObjectId"].in_(ids))

            with Timer(table.name + " delete", self.config.timer):
                res = connection.execute(query)
            _LOG.debug("deleted %s objects", res.rowcount)

            # DiaObjectLast is a subset of DiaObject, strip missing columns.
            last_column_names = [column.name for column in table.columns]
            last_objs = objs[last_column_names]
            last_objs = _coerce_uint64(last_objs)

            if "lastNonForcedSource" in last_objs.columns:
                # lastNonForcedSource is defined NOT NULL, fill it with visit
                # time just in case.
                last_objs["lastNonForcedSource"].fillna(dt, inplace=True)
            else:
                extra_column = pandas.Series([dt] * len(objs), name="lastNonForcedSource")
                last_objs.set_index(extra_column.index, inplace=True)
                last_objs = pandas.concat([last_objs, extra_column], axis="columns")

            with Timer("DiaObjectLast insert", self.config.timer):
                last_objs.to_sql(
                    table.name,
                    _ConnectionHackSA2(connection),
                    if_exists="append",
                    index=False,
                    schema=table.schema,
                )
        else:
            # truncate existing validity intervals
            table = self._schema.get_table(ApdbTables.DiaObject)

            update = (
                table.update()
                .values(validityEnd=dt)
                .where(
                    sql.expression.and_(
                        table.columns["diaObjectId"].in_(ids),
                        table.columns["validityEnd"].is_(None),
                    )
                )
            )

            with Timer(table.name + " truncate", self.config.timer):
                res = connection.execute(update)
            _LOG.debug("truncated %s intervals", res.rowcount)

        objs = _coerce_uint64(objs)

        # Fill additional columns that are not part of the input catalog.
        extra_columns: List[pandas.Series] = []
        if "validityStart" in objs.columns:
            objs["validityStart"] = dt
        else:
            extra_columns.append(pandas.Series([dt] * len(objs), name="validityStart"))
        if "validityEnd" in objs.columns:
            objs["validityEnd"] = None
        else:
            extra_columns.append(pandas.Series([None] * len(objs), name="validityEnd"))
        if "lastNonForcedSource" in objs.columns:
            # lastNonForcedSource is defined NOT NULL, fill it with visit time
            # just in case.
            objs["lastNonForcedSource"].fillna(dt, inplace=True)
        else:
            extra_columns.append(pandas.Series([dt] * len(objs), name="lastNonForcedSource"))
        if extra_columns:
            objs.set_index(extra_columns[0].index, inplace=True)
            objs = pandas.concat([objs] + extra_columns, axis="columns")

        # Prepare history records if insert IDs are tracked.
        table = self._schema.get_table(ApdbTables.DiaObject)
        history_data: list[dict] = []
        history_stmt: Any = None
        if insert_id is not None:
            pk_names = [column.name for column in table.primary_key]
            history_data = objs[pk_names].to_dict("records")
            for row in history_data:
                row["insert_id"] = insert_id.id
            history_table = self._schema.get_table(ExtraTables.DiaObjectInsertId)
            history_stmt = history_table.insert()

        # insert new versions
        with Timer("DiaObject insert", self.config.timer):
            objs.to_sql(
                table.name,
                _ConnectionHackSA2(connection),
                if_exists="append",
                index=False,
                schema=table.schema,
            )
            if history_stmt is not None:
                connection.execute(history_stmt, history_data)

    def _storeDiaSources(
        self,
        sources: pandas.DataFrame,
        insert_id: ApdbInsertId | None,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        """Store catalog of DiaSources from current visit.

        Parameters
        ----------
        sources : `pandas.DataFrame`
            Catalog containing DiaSource records.
        insert_id : `ApdbInsertId` or `None`
            Insert identifier for history tracking.
        connection : `sqlalchemy.engine.Connection`
            Connection to use for inserts.
        """
        table = self._schema.get_table(ApdbTables.DiaSource)

        # Prepare history records if insert IDs are tracked.
        history: list[dict] = []
        history_stmt: Any = None
        if insert_id is not None:
            pk_names = [column.name for column in table.primary_key]
            history = sources[pk_names].to_dict("records")
            for row in history:
                row["insert_id"] = insert_id.id
            history_table = self._schema.get_table(ExtraTables.DiaSourceInsertId)
            history_stmt = history_table.insert()

        # everything to be done in single transaction
        with Timer("DiaSource insert", self.config.timer):
            sources = _coerce_uint64(sources)
            sources.to_sql(
                table.name,
                _ConnectionHackSA2(connection),
                if_exists="append",
                index=False,
                schema=table.schema,
            )
            if history_stmt is not None:
                connection.execute(history_stmt, history)

    def _storeDiaForcedSources(
        self,
        sources: pandas.DataFrame,
        insert_id: ApdbInsertId | None,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        """Store a set of DiaForcedSources from current visit.

        Parameters
        ----------
        sources : `pandas.DataFrame`
            Catalog containing DiaForcedSource records.
        insert_id : `ApdbInsertId` or `None`
            Insert identifier for history tracking.
        connection : `sqlalchemy.engine.Connection`
            Connection to use for inserts.
        """
        table = self._schema.get_table(ApdbTables.DiaForcedSource)

        # Prepare history records if insert IDs are tracked.
        history: list[dict] = []
        history_stmt: Any = None
        if insert_id is not None:
            pk_names = [column.name for column in table.primary_key]
            history = sources[pk_names].to_dict("records")
            for row in history:
                row["insert_id"] = insert_id.id
            history_table = self._schema.get_table(ExtraTables.DiaForcedSourceInsertId)
            history_stmt = history_table.insert()

        # everything to be done in single transaction
        with Timer("DiaForcedSource insert", self.config.timer):
            sources = _coerce_uint64(sources)
            sources.to_sql(
                table.name,
                _ConnectionHackSA2(connection),
                if_exists="append",
                index=False,
                schema=table.schema,
            )
            if history_stmt is not None:
                connection.execute(history_stmt, history)
927 """Generate a set of HTM indices covering specified region.
932 Region that needs to be indexed.
936 Sequence of ranges, range is a tuple (minHtmID, maxHtmID).
938 _LOG.debug("region: %s", region)
941 return indices.ranges()
943 def _filterRegion(self, table: sqlalchemy.schema.Table, region: Region) -> sql.ColumnElement:
944 """Make SQLAlchemy expression for selecting records in a region."""
945 htm_index_column = table.columns[self.
config.htm_index_column]
948 for low, upper
in pixel_ranges:
951 exprlist.append(htm_index_column == low)
953 exprlist.append(sql.expression.between(htm_index_column, low, upper))
955 return sql.expression.or_(*exprlist)
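
    # Example (illustrative): for pixel ranges [(512, 513), (520, 524)] the
    # generated predicate is ``pixelId = 512 OR pixelId BETWEEN 520 AND 523``;
    # range ends are exclusive, hence the ``upper -= 1`` adjustment above.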
958 """Calculate HTM index for each record and add it to a DataFrame.
962 This overrides any existing column in a DataFrame
with the same name
963 (pixelId). Original DataFrame
is not changed, copy of a DataFrame
is
967 htm_index = np.zeros(df.shape[0], dtype=np.int64)
968 ra_col, dec_col = self.
config.ra_dec_columns
969 for i, (ra, dec)
in enumerate(zip(df[ra_col], df[dec_col])):
974 df[self.
config.htm_index_column] = htm_index
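
    # Example (illustrative): the level-20 HTM index of a single coordinate
    # can be computed directly with the pixelator:
    #
    #     uv3d = UnitVector3d(LonLat.fromDegrees(35.0, -4.0))
    #     HtmPixelization(20).index(uv3d)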
978 """Add pixelId column to DiaSource catalog.
982 This method copies pixelId value from a matching DiaObject record.
983 DiaObject catalog needs to have a pixelId column filled by
984 ``_add_obj_htm_index`` method
and DiaSource records need to be
985 associated to DiaObjects via ``diaObjectId`` column.
987 This overrides any existing column
in a DataFrame
with the same name
988 (pixelId). Original DataFrame
is not changed, copy of a DataFrame
is
991 pixel_id_map: Dict[int, int] = {
993 for diaObjectId, pixelId
in zip(objs[
"diaObjectId"], objs[self.
config.htm_index_column])
999 htm_index = np.zeros(sources.shape[0], dtype=np.int64)
1000 for i, diaObjId
in enumerate(sources[
"diaObjectId"]):
1001 htm_index[i] = pixel_id_map[diaObjId]
1002 sources = sources.copy()
1003 sources[self.
config.htm_index_column] = htm_index