22"""Module defining Apdb class and related methods.
25from __future__
import annotations
27__all__ = [
"ApdbSqlConfig",
"ApdbSql"]
31from contextlib
import closing, suppress
32from typing
import TYPE_CHECKING, Any, cast
38import sqlalchemy.dialects.postgresql
39import sqlalchemy.dialects.sqlite
41from lsst.sphgeom import HtmPixelization, LonLat, Region, UnitVector3d
42from lsst.utils.iteration
import chunk_iterable
43from sqlalchemy
import func, sql
44from sqlalchemy.pool
import NullPool
46from ..apdb
import Apdb, ApdbConfig
47from ..apdbConfigFreezer
import ApdbConfigFreezer
48from ..apdbReplica
import ReplicaChunk
49from ..apdbSchema
import ApdbTables
50from ..monitor
import MonAgent
51from ..schema_model
import Table
52from ..timer
import Timer
53from ..versionTuple
import IncompatibleVersionError, VersionTuple
54from .apdbMetadataSql
import ApdbMetadataSql
55from .apdbSqlReplica
import ApdbSqlReplica
56from .apdbSqlSchema
import ApdbSqlSchema, ExtraTables
61 from ..apdbMetadata
import ApdbMetadata

_LOG = logging.getLogger(__name__)

_MON = MonAgent(__name__)
68"""Version for the code controlling non-replication tables. This needs to be
69updated following compatibility rules when schema produced by this code
75 """Change the type of uint64 columns to int64, and return copy of data
78 names = [c[0]
for c
in df.dtypes.items()
if c[1] == np.uint64]
79 return df.astype({name: np.int64
for name
in names})
83 """Calculate starting point for time-based source search.
87 visit_time : `astropy.time.Time`
88 Time of current visit.
90 Number of months in the sources history.
95 A ``midpointMjdTai`` starting point, MJD time.
99 return visit_time.mjd - months * 30
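
# Illustrative check of the arithmetic above (values are hypothetical): a visit
# at MJD 60000.0 with a 12-month history window gives
# 60000.0 - 12 * 30 = 59640.0 as the earliest ``midpointMjdTai`` to read.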


def _onSqlite3Connect(
    dbapiConnection: sqlite3.Connection, connectionRecord: sqlalchemy.pool._ConnectionRecord
) -> None:
    with closing(dbapiConnection.cursor()) as cursor:
        cursor.execute("PRAGMA foreign_keys=ON;")
111 """APDB configuration class for SQL implementation (ApdbSql)."""
113 db_url = Field[str](doc=
"SQLAlchemy database connection URI")
114 isolation_level = ChoiceField[str](
116 "Transaction isolation level, if unset then backend-default value "
117 "is used, except for SQLite backend where we use READ_UNCOMMITTED. "
118 "Some backends may not support every allowed value."
121 "READ_COMMITTED":
"Read committed",
122 "READ_UNCOMMITTED":
"Read uncommitted",
123 "REPEATABLE_READ":
"Repeatable read",
124 "SERIALIZABLE":
"Serializable",
129 connection_pool = Field[bool](
130 doc=
"If False then disable SQLAlchemy connection pool. Do not use connection pool when forking.",
133 connection_timeout = Field[float](
135 "Maximum time to wait time for database lock to be released before exiting. "
136 "Defaults to sqlalchemy defaults if not set."
141 sql_echo = Field[bool](doc=
"If True then pass SQLAlchemy echo option.", default=
False)
142 dia_object_index = ChoiceField[str](
143 doc=
"Indexing mode for DiaObject table",
145 "baseline":
"Index defined in baseline schema",
146 "pix_id_iov":
"(pixelId, objectId, iovStart) PK",
147 "last_object_table":
"Separate DiaObjectLast table",
151 htm_level = Field[int](doc=
"HTM indexing level", default=20)
152 htm_max_ranges = Field[int](doc=
"Max number of ranges in HTM envelope", default=64)
153 htm_index_column = Field[str](
154 default=
"pixelId", doc=
"Name of a HTM index column for DiaObject and DiaSource tables"
156 ra_dec_columns = ListField[str](default=[
"ra",
"dec"], doc=
"Names of ra/dec columns in DiaObject table")
157 dia_object_columns = ListField[str](
158 doc=
"List of columns to read from DiaObject, by default read all columns", default=[]
160 prefix = Field[str](doc=
"Prefix to add to table names and index names", default=
"")
161 namespace = Field[str](
163 "Namespace or schema name for all tables in APDB database. "
164 "Presently only works for PostgreSQL backend. "
165 "If schema with this name does not exist it will be created when "
166 "APDB tables are created."
171 timer = Field[bool](doc=
"If True then print/log timing information", default=
False)
176 raise ValueError(
"ra_dec_columns must have exactly two column names")
180 """Implementation of APDB interface based on SQL database.
182 The implementation is configured via standard ``pex_config`` mechanism
183 using `ApdbSqlConfig` configuration class. For an example of different
184 configurations check ``config/`` folder.
188 config : `ApdbSqlConfig`
189 Configuration object.
192 ConfigClass = ApdbSqlConfig
194 metadataSchemaVersionKey =
"version:schema"
195 """Name of the metadata key to store schema version number."""
197 metadataCodeVersionKey =
"version:ApdbSql"
198 """Name of the metadata key to store code version number."""
200 metadataReplicaVersionKey =
"version:ApdbSqlReplica"
201 """Name of the metadata key to store replica code version number."""
203 metadataConfigKey =
"config:apdb-sql.json"
204 """Name of the metadata key to store code version number."""

    _frozen_parameters = (
        "use_insert_id",
        "dia_object_index",
        "htm_level",
        "htm_index_column",
        "ra_dec_columns",
    )
    """Names of the config parameters to be frozen in metadata table."""

    def __init__(self, config: ApdbSqlConfig):
        self._engine = self._makeEngine(config)

        sa_metadata = sqlalchemy.MetaData(schema=config.namespace)
        meta_table_name = ApdbTables.metadata.table_name(prefix=config.prefix)
        meta_table: sqlalchemy.schema.Table | None = None
        with suppress(sqlalchemy.exc.NoSuchTableError):
            meta_table = sqlalchemy.schema.Table(meta_table_name, sa_metadata, autoload_with=self._engine)

        self._metadata = ApdbMetadataSql(self._engine, meta_table)

        freezer = ApdbConfigFreezer[ApdbSqlConfig](self._frozen_parameters)
        config_json = self._metadata.get(self.metadataConfigKey)
        if config_json is not None:
            self.config = freezer.update(config, config_json)
        else:
            self.config = config

        self._schema = ApdbSqlSchema(
            engine=self._engine,
            dia_object_index=self.config.dia_object_index,
            schema_file=self.config.schema_file,
            schema_name=self.config.schema_name,
            prefix=self.config.prefix,
            namespace=self.config.namespace,
            htm_index_column=self.config.htm_index_column,
            enable_replica=self.config.use_insert_id,
        )

        self._versionCheck(self._metadata)

        self.pixelator = HtmPixelization(self.config.htm_level)

        _LOG.debug("APDB Configuration:")
        _LOG.debug("    dia_object_index: %s", self.config.dia_object_index)
        _LOG.debug("    read_sources_months: %s", self.config.read_sources_months)
        _LOG.debug("    read_forced_sources_months: %s", self.config.read_forced_sources_months)
        _LOG.debug("    dia_object_columns: %s", self.config.dia_object_columns)
        _LOG.debug("    schema_file: %s", self.config.schema_file)
        _LOG.debug("    extra_schema_file: %s", self.config.extra_schema_file)
        _LOG.debug("    schema prefix: %s", self.config.prefix)

        self._timer_args: list[MonAgent | logging.Logger] = [_MON]
        if self.config.timer:
            self._timer_args.append(_LOG)

    def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
        """Create `Timer` instance given its name."""
        return Timer(name, *self._timer_args, tags=tags)

    @classmethod
    def _makeEngine(cls, config: ApdbSqlConfig) -> sqlalchemy.engine.Engine:
        """Make SQLAlchemy engine based on configured parameters.

        Parameters
        ----------
        config : `ApdbSqlConfig`
            Configuration object.

        Returns
        -------
        engine : `sqlalchemy.engine.Engine`
            SQLAlchemy engine instance.
        """
        kw: MutableMapping[str, Any] = dict(echo=config.sql_echo)
        conn_args: dict[str, Any] = dict()
        if not config.connection_pool:
            kw.update(poolclass=NullPool)
        if config.isolation_level is not None:
            kw.update(isolation_level=config.isolation_level)
        elif config.db_url.startswith("sqlite"):
            kw.update(isolation_level="READ_UNCOMMITTED")
        if config.connection_timeout is not None:
            if config.db_url.startswith("sqlite"):
                conn_args.update(timeout=config.connection_timeout)
            elif config.db_url.startswith(("postgresql", "mysql")):
                conn_args.update(connect_timeout=config.connection_timeout)
        kw.update(connect_args=conn_args)
        engine = sqlalchemy.create_engine(config.db_url, **kw)

        if engine.dialect.name == "sqlite":
            sqlalchemy.event.listen(engine, "connect", _onSqlite3Connect)

        return engine
304 """Check schema version compatibility."""
306 def _get_version(key: str, default: VersionTuple) -> VersionTuple:
307 """Retrieve version number from given metadata key."""
308 if metadata.table_exists():
309 version_str = metadata.get(key)
310 if version_str
is None:
312 raise RuntimeError(f
"Version key {key!r} does not exist in metadata table.")
313 return VersionTuple.fromString(version_str)
324 if not self.
_schema.schemaVersion().checkCompatibility(db_schema_version):
326 f
"Configured schema version {self._schema.schemaVersion()} "
327 f
"is not compatible with database version {db_schema_version}"
331 f
"Current code version {self.apdbImplementationVersion()} "
332 f
"is not compatible with database version {db_code_version}"
336 if self.
_schema.has_replica_chunks:
338 code_replica_version = ApdbSqlReplica.apdbReplicaImplementationVersion()
339 if not code_replica_version.checkCompatibility(db_replica_version):
341 f
"Current replication code version {code_replica_version} "
342 f
"is not compatible with database version {db_replica_version}"
347 """Return version number for current APDB implementation.
351 version : `VersionTuple`
352 Version of the code defined in implementation class.

    @classmethod
    def init_database(
        cls,
        db_url: str,
        *,
        schema_file: str | None = None,
        schema_name: str | None = None,
        read_sources_months: int | None = None,
        read_forced_sources_months: int | None = None,
        use_insert_id: bool = False,
        connection_timeout: int | None = None,
        dia_object_index: str | None = None,
        htm_level: int | None = None,
        htm_index_column: str | None = None,
        ra_dec_columns: list[str] | None = None,
        prefix: str | None = None,
        namespace: str | None = None,
        drop: bool = False,
    ) -> ApdbSqlConfig:
375 """Initialize new APDB instance and make configuration object for it.
380 SQLAlchemy database URL.
381 schema_file : `str`, optional
382 Location of (YAML) configuration file with APDB schema. If not
383 specified then default location will be used.
384 schema_name : str | None
385 Name of the schema in YAML configuration file. If not specified
386 then default name will be used.
387 read_sources_months : `int`, optional
388 Number of months of history to read from DiaSource.
389 read_forced_sources_months : `int`, optional
390 Number of months of history to read from DiaForcedSource.
391 use_insert_id : `bool`
392 If True, make additional tables used for replication to PPDB.
393 connection_timeout : `int`, optional
394 Database connection timeout in seconds.
395 dia_object_index : `str`, optional
396 Indexing mode for DiaObject table.
397 htm_level : `int`, optional
399 htm_index_column : `str`, optional
400 Name of a HTM index column for DiaObject and DiaSource tables.
401 ra_dec_columns : `list` [`str`], optional
402 Names of ra/dec columns in DiaObject table.
403 prefix : `str`, optional
404 Optional prefix for all table names.
405 namespace : `str`, optional
406 Name of the database schema for all APDB tables. If not specified
407 then default schema is used.
408 drop : `bool`, optional
409 If `True` then drop existing tables before re-creating the schema.
413 config : `ApdbSqlConfig`
414 Resulting configuration object for a created APDB instance.
        config = ApdbSqlConfig(db_url=db_url, use_insert_id=use_insert_id)
        if schema_file is not None:
            config.schema_file = schema_file
        if schema_name is not None:
            config.schema_name = schema_name
        if read_sources_months is not None:
            config.read_sources_months = read_sources_months
        if read_forced_sources_months is not None:
            config.read_forced_sources_months = read_forced_sources_months
        if connection_timeout is not None:
            config.connection_timeout = connection_timeout
        if dia_object_index is not None:
            config.dia_object_index = dia_object_index
        if htm_level is not None:
            config.htm_level = htm_level
        if htm_index_column is not None:
            config.htm_index_column = htm_index_column
        if ra_dec_columns is not None:
            config.ra_dec_columns = ra_dec_columns
        if prefix is not None:
            config.prefix = prefix
        if namespace is not None:
            config.namespace = namespace

        cls._makeSchema(config, drop=drop)

        return config
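
    # Typical call sequence (URL is an example only): initialize the schema once,
    # then construct the instance from the returned configuration.
    #
    #     config = ApdbSql.init_database(db_url="sqlite:///apdb.db", use_insert_id=False)
    #     apdb = ApdbSql(config)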
445 """Return `ApdbReplica` instance for this database."""
449 """Return dictionary with the table names and row counts.
451 Used by ``ap_proto`` to keep track of the size of the database tables.
452 Depending on database technology this could be expensive operation.
457 Dict where key is a table name and value is a row count.
460 tables = [ApdbTables.DiaObject, ApdbTables.DiaSource, ApdbTables.DiaForcedSource]
461 if self.
config.dia_object_index ==
"last_object_table":
462 tables.append(ApdbTables.DiaObjectLast)
463 with self.
_engine.begin()
as conn:
465 sa_table = self.
_schema.get_table(table)
466 stmt = sql.select(func.count()).select_from(sa_table)
467 count: int = conn.execute(stmt).scalar_one()
468 res[table.name] = count

    def tableDef(self, table: ApdbTables) -> Table | None:
        return self._schema.tableSchemas.get(table)

    @classmethod
    def _makeSchema(cls, config: ApdbConfig, drop: bool = False) -> None:
        if not isinstance(config, ApdbSqlConfig):
            raise TypeError(f"Unexpected type of configuration object: {type(config)}")

        engine = cls._makeEngine(config)
        schema = ApdbSqlSchema(
            engine=engine,
            dia_object_index=config.dia_object_index,
            schema_file=config.schema_file,
            schema_name=config.schema_name,
            prefix=config.prefix,
            namespace=config.namespace,
            htm_index_column=config.htm_index_column,
            enable_replica=config.use_insert_id,
        )
        schema.makeSchema(drop=drop)

        meta_table: sqlalchemy.schema.Table | None = None
        with suppress(ValueError):
            meta_table = schema.get_table(ApdbTables.metadata)

        apdb_meta = ApdbMetadataSql(engine, meta_table)
        if apdb_meta.table_exists():
            apdb_meta.set(cls.metadataSchemaVersionKey, str(schema.schemaVersion()))
            apdb_meta.set(cls.metadataCodeVersionKey, str(cls.apdbImplementationVersion()))
            if config.use_insert_id:
                apdb_meta.set(
                    cls.metadataReplicaVersionKey,
                    str(ApdbSqlReplica.apdbReplicaImplementationVersion()),
                )

    def getDiaObjects(self, region: Region) -> pandas.DataFrame:
        if self.config.dia_object_index == "last_object_table":
            table_enum = ApdbTables.DiaObjectLast
        else:
            table_enum = ApdbTables.DiaObject
        table = self._schema.get_table(table_enum)
        if not self.config.dia_object_columns:
            columns = self._schema.get_apdb_columns(table_enum)
        else:
            columns = [table.c[col] for col in self.config.dia_object_columns]
        query = sql.select(*columns)

        query = query.where(self._filterRegion(table, region))

        if self.config.dia_object_index != "last_object_table":
            query = query.where(table.c.validityEnd == None)

        with self._timer("select_time", tags={"table": "DiaObject"}):
            with self._engine.begin() as conn:
                objects = pandas.read_sql_query(query, conn)
        _LOG.debug("found %s DiaObjects", len(objects))
        return objects

    def getDiaSources(
        self, region: Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
    ) -> pandas.DataFrame | None:
        if self.config.read_sources_months == 0:
            _LOG.debug("Skip DiaSources fetching")
            return None

        if object_ids is None:
            return self._getDiaSourcesInRegion(region, visit_time)
        else:
            return self._getDiaSourcesByIDs(list(object_ids), visit_time)

    def getDiaForcedSources(
        self, region: Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
    ) -> pandas.DataFrame | None:
        if self.config.read_forced_sources_months == 0:
            _LOG.debug("Skip DiaForcedSources fetching")
            return None

        if object_ids is None:
            raise NotImplementedError("Region-based selection is not supported")

        midpointMjdTai_start = _make_midpointMjdTai_start(visit_time, self.config.read_forced_sources_months)
        _LOG.debug("midpointMjdTai_start = %.6f", midpointMjdTai_start)

        with self._timer("select_time", tags={"table": "DiaForcedSource"}):
            sources = self._getSourcesByIDs(
                ApdbTables.DiaForcedSource, list(object_ids), midpointMjdTai_start
            )

        _LOG.debug("found %s DiaForcedSources", len(sources))
        return sources

    def containsVisitDetector(self, visit: int, detector: int) -> bool:
        src_table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaSource)
        frcsrc_table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaForcedSource)

        query1 = sql.select(src_table.c.visit).filter_by(visit=visit, detector=detector).limit(1)

        with self._engine.begin() as conn:
            result = conn.execute(query1).scalar_one_or_none()
            if result is not None:
                return True
            query2 = sql.select(frcsrc_table.c.visit).filter_by(visit=visit, detector=detector).limit(1)
            result = conn.execute(query2).scalar_one_or_none()
            return result is not None

    def getSSObjects(self) -> pandas.DataFrame:
        columns = self._schema.get_apdb_columns(ApdbTables.SSObject)
        query = sql.select(*columns)

        with self._timer("SSObject_select_time", tags={"table": "SSObject"}):
            with self._engine.begin() as conn:
                objects = pandas.read_sql_query(query, conn)
        _LOG.debug("found %s SSObjects", len(objects))
        return objects

    def store(
        self,
        visit_time: astropy.time.Time,
        objects: pandas.DataFrame,
        sources: pandas.DataFrame | None = None,
        forced_sources: pandas.DataFrame | None = None,
    ) -> None:
        objects = self._add_obj_htm_index(objects)
        if sources is not None:
            sources = self._add_src_htm_index(sources, objects)

        with self._engine.begin() as connection:
            replica_chunk: ReplicaChunk | None = None
            if self._schema.has_replica_chunks:
                replica_chunk = ReplicaChunk.make_replica_chunk(
                    visit_time, self.config.replica_chunk_seconds
                )
                self._storeReplicaChunk(replica_chunk, visit_time, connection)

            self._storeDiaObjects(objects, visit_time, replica_chunk, connection)

            if sources is not None:
                self._storeDiaSources(sources, replica_chunk, connection)

            if forced_sources is not None:
                self._storeDiaForcedSources(forced_sources, replica_chunk, connection)

    def storeSSObjects(self, objects: pandas.DataFrame) -> None:
        idColumn = "ssObjectId"
        table = self._schema.get_table(ApdbTables.SSObject)

        with self._engine.begin() as conn:
            ids = sorted(int(oid) for oid in objects[idColumn])

            query = sql.select(table.columns[idColumn]).where(table.columns[idColumn].in_(ids))
            result = conn.execute(query)
            knownIds = set(row.ssObjectId for row in result)

            filter = objects[idColumn].isin(knownIds)
            toUpdate = cast(pandas.DataFrame, objects[filter])
            toInsert = cast(pandas.DataFrame, objects[~filter])

            if len(toInsert) > 0:
                toInsert.to_sql(table.name, conn, if_exists="append", index=False, schema=table.schema)

            if len(toUpdate) > 0:
                whereKey = f"{idColumn}_param"
                update = table.update().where(table.columns[idColumn] == sql.bindparam(whereKey))
                toUpdate = toUpdate.rename({idColumn: whereKey}, axis="columns")
                values = toUpdate.to_dict("records")
                result = conn.execute(update, values)

    def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
        table = self._schema.get_table(ApdbTables.DiaSource)
        query = table.update().where(table.columns["diaSourceId"] == sql.bindparam("srcId"))

        with self._engine.begin() as conn:
            missing_ids: list[int] = []
            for key, value in idMap.items():
                params = dict(srcId=key, diaObjectId=0, ssObjectId=value)
                result = conn.execute(query, params)
                if result.rowcount == 0:
                    missing_ids.append(key)
            if missing_ids:
                missing = ",".join(str(item) for item in missing_ids)
                raise ValueError(f"Following DiaSource IDs do not exist in the database: {missing}")

    def countUnassociatedObjects(self) -> int:
        table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaObject)

        stmt = sql.select(func.count()).select_from(table).where(table.c.nDiaSources == 1)
        stmt = stmt.where(table.c.validityEnd == None)

        with self._engine.begin() as conn:
            count = conn.execute(stmt).scalar_one()

        return count

    @property
    def metadata(self) -> ApdbMetadata:
        if self._metadata is None:
            raise RuntimeError("Database schema was not initialized.")
        return self._metadata
728 """Return catalog of DiaSource instances from given region.
732 region : `lsst.sphgeom.Region`
733 Region to search for DIASources.
734 visit_time : `astropy.time.Time`
735 Time of the current visit.
739 catalog : `pandas.DataFrame`
740 Catalog containing DiaSource records.
745 _LOG.debug(
"midpointMjdTai_start = %.6f", midpointMjdTai_start)
747 table = self.
_schema.get_table(ApdbTables.DiaSource)
748 columns = self.
_schema.get_apdb_columns(ApdbTables.DiaSource)
749 query = sql.select(*columns)
752 time_filter = table.columns[
"midpointMjdTai"] > midpointMjdTai_start
753 where = sql.expression.and_(self.
_filterRegion(table, region), time_filter)
754 query = query.where(where)
757 with self.
_timer(
"DiaSource_select_time", tags={
"table":
"DiaSource"}):
758 with self.
_engine.begin()
as conn:
759 sources = pandas.read_sql_query(query, conn)
760 _LOG.debug(
"found %s DiaSources", len(sources))
764 """Return catalog of DiaSource instances given set of DiaObject IDs.
769 Collection of DiaObject IDs
770 visit_time : `astropy.time.Time`
771 Time of the current visit.
775 catalog : `pandas.DataFrame`
776 Catalog contaning DiaSource records.
781 _LOG.debug(
"midpointMjdTai_start = %.6f", midpointMjdTai_start)
783 with self.
_timer(
"select_time", tags={
"table":
"DiaSource"}):
784 sources = self.
_getSourcesByIDs(ApdbTables.DiaSource, object_ids, midpointMjdTai_start)
786 _LOG.debug(
"found %s DiaSources", len(sources))

    def _getSourcesByIDs(
        self, table_enum: ApdbTables, object_ids: list[int], midpointMjdTai_start: float
    ) -> pandas.DataFrame:
        """Return catalog of DiaSource or DiaForcedSource instances given set
        of DiaObject IDs.

        Parameters
        ----------
        table_enum : `ApdbTables`
            Database table to read sources from.
        object_ids :
            Collection of DiaObject IDs
        midpointMjdTai_start : `float`
            Earliest midpointMjdTai to retrieve.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Catalog containing DiaSource records. Empty catalog is returned
            when ``object_ids`` is empty.
        """
        table = self._schema.get_table(table_enum)
        columns = self._schema.get_apdb_columns(table_enum)

        sources: pandas.DataFrame | None = None
        if len(object_ids) <= 0:
            _LOG.debug("ID list is empty, just fetch empty result")
            query = sql.select(*columns).where(sql.literal(False))
            with self._engine.begin() as conn:
                sources = pandas.read_sql_query(query, conn)
        else:
            data_frames: list[pandas.DataFrame] = []
            for ids in chunk_iterable(sorted(object_ids), 1000):
                query = sql.select(*columns)

                int_ids = [int(oid) for oid in ids]

                query = query.where(
                    sql.expression.and_(
                        table.columns["diaObjectId"].in_(int_ids),
                        table.columns["midpointMjdTai"] > midpointMjdTai_start,
                    )
                )

                with self._engine.begin() as conn:
                    data_frames.append(pandas.read_sql_query(query, conn))

            if len(data_frames) == 1:
                sources = data_frames[0]
            else:
                sources = pandas.concat(data_frames)
        assert sources is not None, "Catalog cannot be None"
        return sources

    def _storeReplicaChunk(
        self,
        replica_chunk: ReplicaChunk,
        visit_time: astropy.time.Time,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        dt = visit_time.datetime

        table = self._schema.get_table(ExtraTables.ApdbReplicaChunks)

        values = {"last_update_time": dt, "unique_id": replica_chunk.unique_id}
        row = {"apdb_replica_chunk": replica_chunk.id} | values
        if connection.dialect.name == "sqlite":
            insert_sqlite = sqlalchemy.dialects.sqlite.insert(table)
            insert_sqlite = insert_sqlite.on_conflict_do_update(index_elements=table.primary_key, set_=values)
            connection.execute(insert_sqlite, row)
        elif connection.dialect.name == "postgresql":
            insert_pg = sqlalchemy.dialects.postgresql.dml.insert(table)
            insert_pg = insert_pg.on_conflict_do_update(constraint=table.primary_key, set_=values)
            connection.execute(insert_pg, row)
        else:
            raise TypeError(f"Unsupported dialect {connection.dialect.name} for upsert.")

    def _storeDiaObjects(
        self,
        objs: pandas.DataFrame,
        visit_time: astropy.time.Time,
        replica_chunk: ReplicaChunk | None,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        """Store catalog of DiaObjects from current visit.

        Parameters
        ----------
        objs : `pandas.DataFrame`
            Catalog with DiaObject records.
        visit_time : `astropy.time.Time`
            Time of the visit.
        replica_chunk : `ReplicaChunk`
            Replica chunk identifier, or `None` when replication is disabled.
        connection : `sqlalchemy.engine.Connection`
            Database connection.
        """
        if len(objs) == 0:
            _LOG.debug("No objects to write to database.")
            return

        ids = sorted(int(oid) for oid in objs["diaObjectId"])
        _LOG.debug("first object ID: %d", ids[0])

        dt = visit_time.datetime

        if self.config.dia_object_index == "last_object_table":
            table = self._schema.get_table(ApdbTables.DiaObjectLast)

            query = table.delete().where(table.columns["diaObjectId"].in_(ids))

            with self._timer("delete_time", tags={"table": table.name}):
                res = connection.execute(query)
            _LOG.debug("deleted %s objects", res.rowcount)

            last_column_names = [column.name for column in table.columns]
            last_objs = objs[last_column_names]

            if "lastNonForcedSource" in last_objs.columns:
                last_objs["lastNonForcedSource"].fillna(dt, inplace=True)
            else:
                extra_column = pandas.Series([dt] * len(objs), name="lastNonForcedSource")
                last_objs.set_index(extra_column.index, inplace=True)
                last_objs = pandas.concat([last_objs, extra_column], axis="columns")

            with self._timer("insert_time", tags={"table": "DiaObjectLast"}):
                last_objs.to_sql(
                    table.name, connection, if_exists="append", index=False, schema=table.schema
                )

        table = self._schema.get_table(ApdbTables.DiaObject)

        update = (
            table.update()
            .values(validityEnd=dt)
            .where(
                sql.expression.and_(
                    table.columns["diaObjectId"].in_(ids),
                    table.columns["validityEnd"].is_(None),
                )
            )
        )

        with self._timer("truncate_time", tags={"table": table.name}):
            res = connection.execute(update)
        _LOG.debug("truncated %s intervals", res.rowcount)

        extra_columns: list[pandas.Series] = []
        if "validityStart" in objs.columns:
            objs["validityStart"] = dt
        else:
            extra_columns.append(pandas.Series([dt] * len(objs), name="validityStart"))
        if "validityEnd" in objs.columns:
            objs["validityEnd"] = None
        else:
            extra_columns.append(pandas.Series([None] * len(objs), name="validityEnd"))
        if "lastNonForcedSource" in objs.columns:
            objs["lastNonForcedSource"].fillna(dt, inplace=True)
        else:
            extra_columns.append(pandas.Series([dt] * len(objs), name="lastNonForcedSource"))
        if extra_columns:
            objs.set_index(extra_columns[0].index, inplace=True)
            objs = pandas.concat([objs] + extra_columns, axis="columns")

        table = self._schema.get_table(ApdbTables.DiaObject)
        replica_data: list[dict] = []
        replica_stmt: Any = None
        replica_table_name = ""
        if replica_chunk is not None:
            pk_names = [column.name for column in table.primary_key]
            replica_data = objs[pk_names].to_dict("records")
            for row in replica_data:
                row["apdb_replica_chunk"] = replica_chunk.id
            replica_table = self._schema.get_table(ExtraTables.DiaObjectChunks)
            replica_table_name = replica_table.name
            replica_stmt = replica_table.insert()

        with self._timer("insert_time", tags={"table": table.name}):
            objs.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
        if replica_stmt is not None:
            with self._timer("insert_time", tags={"table": replica_table_name}):
                connection.execute(replica_stmt, replica_data)

    def _storeDiaSources(
        self,
        sources: pandas.DataFrame,
        replica_chunk: ReplicaChunk | None,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        """Store catalog of DiaSources from current visit.

        Parameters
        ----------
        sources : `pandas.DataFrame`
            Catalog containing DiaSource records
        replica_chunk : `ReplicaChunk`
            Replica chunk identifier, or `None` when replication is disabled.
        connection : `sqlalchemy.engine.Connection`
            Database connection.
        """
        table = self._schema.get_table(ApdbTables.DiaSource)

        replica_data: list[dict] = []
        replica_stmt: Any = None
        replica_table_name = ""
        if replica_chunk is not None:
            pk_names = [column.name for column in table.primary_key]
            replica_data = sources[pk_names].to_dict("records")
            for row in replica_data:
                row["apdb_replica_chunk"] = replica_chunk.id
            replica_table = self._schema.get_table(ExtraTables.DiaSourceChunks)
            replica_table_name = replica_table.name
            replica_stmt = replica_table.insert()

        with self._timer("insert_time", tags={"table": table.name}):
            sources.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
        if replica_stmt is not None:
            with self._timer("replica_insert_time", tags={"table": replica_table_name}):
                connection.execute(replica_stmt, replica_data)

    def _storeDiaForcedSources(
        self,
        sources: pandas.DataFrame,
        replica_chunk: ReplicaChunk | None,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        """Store a set of DiaForcedSources from current visit.

        Parameters
        ----------
        sources : `pandas.DataFrame`
            Catalog containing DiaForcedSource records
        replica_chunk : `ReplicaChunk`
            Replica chunk identifier, or `None` when replication is disabled.
        connection : `sqlalchemy.engine.Connection`
            Database connection.
        """
        table = self._schema.get_table(ApdbTables.DiaForcedSource)

        replica_data: list[dict] = []
        replica_stmt: Any = None
        replica_table_name = ""
        if replica_chunk is not None:
            pk_names = [column.name for column in table.primary_key]
            replica_data = sources[pk_names].to_dict("records")
            for row in replica_data:
                row["apdb_replica_chunk"] = replica_chunk.id
            replica_table = self._schema.get_table(ExtraTables.DiaForcedSourceChunks)
            replica_table_name = replica_table.name
            replica_stmt = replica_table.insert()

        with self._timer("insert_time", tags={"table": table.name}):
            sources.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
        if replica_stmt is not None:
            with self._timer("insert_time", tags={"table": replica_table_name}):
                connection.execute(replica_stmt, replica_data)
1072 """Generate a set of HTM indices covering specified region.
1076 region: `sphgeom.Region`
1077 Region that needs to be indexed.
1081 Sequence of ranges, range is a tuple (minHtmID, maxHtmID).
1083 _LOG.debug(
"region: %s", region)
1086 return indices.ranges()
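
    # Example (illustrative numbers only): a small circular region typically
    # collapses to a few (begin, end) ranges such as [(123456, 123460),
    # (123470, 123471)]. ``htm_max_ranges`` caps how many ranges the envelope
    # may return, trading coverage precision for a shorter WHERE clause.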

    def _filterRegion(self, table: sqlalchemy.schema.Table, region: Region) -> sql.ColumnElement:
        """Make SQLAlchemy expression for selecting records in a region."""
        htm_index_column = table.columns[self.config.htm_index_column]
        exprlist = []
        pixel_ranges = self._htm_indices(region)
        for low, upper in pixel_ranges:
            upper -= 1
            if low == upper:
                exprlist.append(htm_index_column == low)
            else:
                exprlist.append(sql.expression.between(htm_index_column, low, upper))

        return sql.expression.or_(*exprlist)
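
    # The resulting filter is a disjunction over the pixel ranges, e.g.
    # "pixelId = 123456 OR pixelId BETWEEN 123470 AND 123480" (the column name
    # comes from ``htm_index_column``), which lets the backend use the HTM
    # index for each range.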
1103 """Calculate HTM index for each record and add it to a DataFrame.
1107 This overrides any existing column in a DataFrame with the same name
1108 (pixelId). Original DataFrame is not changed, copy of a DataFrame is
1112 htm_index = np.zeros(df.shape[0], dtype=np.int64)
1113 ra_col, dec_col = self.
config.ra_dec_columns
1114 for i, (ra, dec)
in enumerate(zip(df[ra_col], df[dec_col])):
1119 df[self.
config.htm_index_column] = htm_index
1123 """Add pixelId column to DiaSource catalog.
1127 This method copies pixelId value from a matching DiaObject record.
1128 DiaObject catalog needs to have a pixelId column filled by
1129 ``_add_obj_htm_index`` method and DiaSource records need to be
1130 associated to DiaObjects via ``diaObjectId`` column.
1132 This overrides any existing column in a DataFrame with the same name
1133 (pixelId). Original DataFrame is not changed, copy of a DataFrame is
1136 pixel_id_map: dict[int, int] = {
1137 diaObjectId: pixelId
1138 for diaObjectId, pixelId
in zip(objs[
"diaObjectId"], objs[self.
config.htm_index_column])
1144 htm_index = np.zeros(sources.shape[0], dtype=np.int64)
1145 for i, diaObjId
in enumerate(sources[
"diaObjectId"]):
1146 htm_index[i] = pixel_id_map[diaObjId]
1147 sources = sources.copy()
1148 sources[self.
config.htm_index_column] = htm_index