22"""Module defining Apdb class and related methods.
25from __future__
import annotations
27__all__ = [
"ApdbSqlConfig",
"ApdbSql"]
34from contextlib
import closing, suppress
35from typing
import TYPE_CHECKING, Any, cast
41import sqlalchemy.dialects.postgresql
42import sqlalchemy.dialects.sqlite
44from lsst.sphgeom import HtmPixelization, LonLat, Region, UnitVector3d
45from lsst.utils.db_auth
import DbAuth, DbAuthNotFoundError
46from lsst.utils.iteration
import chunk_iterable
47from sqlalchemy
import func, sql
48from sqlalchemy.pool
import NullPool
50from .._auth
import DB_AUTH_ENVVAR, DB_AUTH_PATH
51from ..apdb
import Apdb, ApdbConfig
52from ..apdbConfigFreezer
import ApdbConfigFreezer
53from ..apdbReplica
import ReplicaChunk
54from ..apdbSchema
import ApdbTables
55from ..monitor
import MonAgent
56from ..schema_model
import Table
57from ..timer
import Timer
58from ..versionTuple
import IncompatibleVersionError, VersionTuple
59from .apdbMetadataSql
import ApdbMetadataSql
60from .apdbSqlReplica
import ApdbSqlReplica
61from .apdbSqlSchema
import ApdbSqlSchema, ExtraTables
66 from ..apdbMetadata
import ApdbMetadata
68_LOG = logging.getLogger(__name__)
73"""Version for the code controlling non-replication tables. This needs to be
74updated following compatibility rules when schema produced by this code
80 """Change the type of uint64 columns to int64, and return copy of data
83 names = [c[0]
for c
in df.dtypes.items()
if c[1] == np.uint64]
84 return df.astype({name: np.int64
for name
in names})
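
# A minimal usage sketch of the coercion above; the column name is
# illustrative:
#
#     df = pandas.DataFrame({"diaObjectId": np.array([1, 2], dtype=np.uint64)})
#     df = _coerce_uint64(df)
#     assert df["diaObjectId"].dtype == np.int64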
88 """Calculate starting point for time-based source search.
92 visit_time : `astropy.time.Time`
93 Time of current visit.
95 Number of months in the sources history.
100 A ``midpointMjdTai`` starting point, MJD time.
104 return visit_time.mjd - months * 30
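
# Worked example of the arithmetic above: for a visit at MJD 60000.0 and a
# 12-month history window the search starts at 60000.0 - 12 * 30 = 59640.0:
#
#     start = _make_midpointMjdTai_start(astropy.time.Time(60000.0, format="mjd"), 12)
#     assert start == 59640.0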

def _onSqlite3Connect(
    dbapiConnection: sqlite3.Connection, connectionRecord: sqlalchemy.pool._ConnectionRecord
) -> None:
    # Enable foreign keys on every new connection.
    with closing(dbapiConnection.cursor()) as cursor:
        cursor.execute("PRAGMA foreign_keys=ON;")
116 """APDB configuration class for SQL implementation (ApdbSql)."""
118 db_url = Field[str](doc=
"SQLAlchemy database connection URI")
119 isolation_level = ChoiceField[str](
121 "Transaction isolation level, if unset then backend-default value "
122 "is used, except for SQLite backend where we use READ_UNCOMMITTED. "
123 "Some backends may not support every allowed value."
126 "READ_COMMITTED":
"Read committed",
127 "READ_UNCOMMITTED":
"Read uncommitted",
128 "REPEATABLE_READ":
"Repeatable read",
129 "SERIALIZABLE":
"Serializable",
134 connection_pool = Field[bool](
135 doc=
"If False then disable SQLAlchemy connection pool. Do not use connection pool when forking.",
138 connection_timeout = Field[float](
140 "Maximum time to wait time for database lock to be released before exiting. "
141 "Defaults to sqlalchemy defaults if not set."
146 sql_echo = Field[bool](doc=
"If True then pass SQLAlchemy echo option.", default=
False)
147 dia_object_index = ChoiceField[str](
148 doc=
"Indexing mode for DiaObject table",
150 "baseline":
"Index defined in baseline schema",
151 "pix_id_iov":
"(pixelId, objectId, iovStart) PK",
152 "last_object_table":
"Separate DiaObjectLast table",
156 htm_level = Field[int](doc=
"HTM indexing level", default=20)
157 htm_max_ranges = Field[int](doc=
"Max number of ranges in HTM envelope", default=64)
158 htm_index_column = Field[str](
159 default=
"pixelId", doc=
"Name of a HTM index column for DiaObject and DiaSource tables"
161 ra_dec_columns = ListField[str](default=[
"ra",
"dec"], doc=
"Names of ra/dec columns in DiaObject table")
162 dia_object_columns = ListField[str](
163 doc=
"List of columns to read from DiaObject, by default read all columns", default=[]
165 prefix = Field[str](doc=
"Prefix to add to table names and index names", default=
"")
166 namespace = Field[str](
168 "Namespace or schema name for all tables in APDB database. "
169 "Presently only works for PostgreSQL backend. "
170 "If schema with this name does not exist it will be created when "
171 "APDB tables are created."
176 timer = Field[bool](doc=
"If True then print/log timing information", default=
False)
181 raise ValueError(
"ra_dec_columns must have exactly two column names")
185 """Implementation of APDB interface based on SQL database.
187 The implementation is configured via standard ``pex_config`` mechanism
188 using `ApdbSqlConfig` configuration class. For an example of different
189 configurations check ``config/`` folder.
193 config : `ApdbSqlConfig`
194 Configuration object.
197 ConfigClass = ApdbSqlConfig

    metadataSchemaVersionKey = "version:schema"
    """Name of the metadata key to store schema version number."""

    metadataCodeVersionKey = "version:ApdbSql"
    """Name of the metadata key to store code version number."""

    metadataReplicaVersionKey = "version:ApdbSqlReplica"
    """Name of the metadata key to store replica code version number."""

    metadataConfigKey = "config:apdb-sql.json"
    """Name of the metadata key to store frozen configuration."""

    _frozen_parameters = (
        "use_insert_id",
        "dia_object_index",
        "htm_level",
        "htm_index_column",
        "ra_dec_columns",
    )
    """Names of the config parameters to be frozen in metadata table."""

    def __init__(self, config: ApdbSqlConfig):
        self._engine = self._makeEngine(config, create=False)

        sa_metadata = sqlalchemy.MetaData(schema=config.namespace)
        meta_table_name = ApdbTables.metadata.table_name(prefix=config.prefix)
        meta_table: sqlalchemy.schema.Table | None = None
        with suppress(sqlalchemy.exc.NoSuchTableError):
            meta_table = sqlalchemy.schema.Table(meta_table_name, sa_metadata, autoload_with=self._engine)

        self._metadata = ApdbMetadataSql(self._engine, meta_table)

        # Update configuration from the parameters frozen in metadata, if any.
        config_json = self._metadata.get(self.metadataConfigKey)
        if config_json is not None:
            freezer = ApdbConfigFreezer[ApdbSqlConfig](self._frozen_parameters)
            self.config = freezer.update(config, config_json)
        else:
            self.config = config

        self._schema = ApdbSqlSchema(
            engine=self._engine,
            dia_object_index=self.config.dia_object_index,
            schema_file=self.config.schema_file,
            schema_name=self.config.schema_name,
            prefix=self.config.prefix,
            namespace=self.config.namespace,
            htm_index_column=self.config.htm_index_column,
            enable_replica=self.config.use_insert_id,
        )

        self._versionCheck(self._metadata)

        self.pixelator = HtmPixelization(self.config.htm_level)

        _LOG.debug("APDB Configuration:")
        _LOG.debug("    dia_object_index: %s", self.config.dia_object_index)
        _LOG.debug("    read_sources_months: %s", self.config.read_sources_months)
        _LOG.debug("    read_forced_sources_months: %s", self.config.read_forced_sources_months)
        _LOG.debug("    dia_object_columns: %s", self.config.dia_object_columns)
        _LOG.debug("    schema_file: %s", self.config.schema_file)
        _LOG.debug("    extra_schema_file: %s", self.config.extra_schema_file)
        _LOG.debug("    schema prefix: %s", self.config.prefix)

        self._timer_args: list[MonAgent | logging.Logger] = [_MON]
        if self.config.timer:
            self._timer_args.append(_LOG)

    def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
        """Create `Timer` instance given its name."""
        return Timer(name, *self._timer_args, tags=tags)

    @classmethod
    def _makeEngine(cls, config: ApdbSqlConfig, *, create: bool) -> sqlalchemy.engine.Engine:
        """Make SQLAlchemy engine based on configured parameters.

        Parameters
        ----------
        config : `ApdbSqlConfig`
            Configuration object.
        create : `bool`
            Whether to try to create new database file, only relevant for
            SQLite backend which always creates new files by default.
        """
        # Use NullPool when connection pooling is disabled, e.g. when the
        # process is going to fork and connections must not be shared.
        kw: MutableMapping[str, Any] = dict(echo=config.sql_echo)
        conn_args: dict[str, Any] = dict()
        if not config.connection_pool:
            kw.update(poolclass=NullPool)
        if config.isolation_level is not None:
            kw.update(isolation_level=config.isolation_level)
        elif config.db_url.startswith("sqlite"):
            # Use READ_UNCOMMITTED as default value for sqlite.
            kw.update(isolation_level="READ_UNCOMMITTED")
        if config.connection_timeout is not None:
            if config.db_url.startswith("sqlite"):
                conn_args.update(timeout=config.connection_timeout)
            elif config.db_url.startswith(("postgresql", "mysql")):
                conn_args.update(connect_timeout=config.connection_timeout)
        kw.update(connect_args=conn_args)
        engine = sqlalchemy.create_engine(cls._connection_url(config.db_url, create=create), **kw)

        if engine.dialect.name == "sqlite":
            # Need to enable foreign keys on every new connection.
            sqlalchemy.event.listen(engine, "connect", _onSqlite3Connect)

        return engine

    @classmethod
    def _connection_url(cls, config_url: str, *, create: bool) -> sqlalchemy.engine.URL | str:
        """Generate a complete URL for database with proper credentials.

        Parameters
        ----------
        config_url : `str`
            Database URL as specified in configuration.
        create : `bool`
            Whether to try to create new database file, only relevant for
            SQLite backend which always creates new files by default.

        Returns
        -------
        connection_url : `sqlalchemy.engine.URL` or `str`
            Connection URL including credentials.
        """
        # Allow 3rd party authentication mechanisms by assuming connection
        # string is correct when we cannot recognize (dialect, host, database)
        # matching keys.
        components = urllib.parse.urlparse(config_url)
        if all((components.scheme is not None, components.hostname is not None, components.path is not None)):
            try:
                db_auth = DbAuth(DB_AUTH_PATH, DB_AUTH_ENVVAR)
                config_url = db_auth.getUrl(config_url)
            except DbAuthNotFoundError:
                # Credentials file doesn't exist or no matching credentials,
                # continue with the URL as given.
                pass

        # For SQLite, rewrite the URL so that a missing database file is not
        # silently created, unless creation is explicitly requested.
        if not create:
            config_url = cls._update_sqlite_url(config_url)

        return config_url
350 """If URL refers to sqlite dialect, update it so that the backend does
351 not try to create database file if it does not exist already.
361 Possibly updated connection string.
364 url = sqlalchemy.make_url(url_string)
365 except sqlalchemy.exc.SQLAlchemyError:
370 if url.get_backend_name() ==
"sqlite":
375 database = url.database
376 if database
and not database.startswith((
":",
"file:")):
377 query = dict(url.query, mode=
"rw", uri=
"true")
384 if database.startswith(
"//"):
386 f
"Database URL contains extra leading slashes which will be removed: {url}",
389 database =
"/" + database.lstrip(
"/")
390 url = url.set(database=f
"file:{database}", query=query)
391 url_string = url.render_as_string()
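
    # Worked example of the rewrite above (path illustrative):
    #
    #     "sqlite:///data/apdb.db" -> "sqlite:///file:data/apdb.db?mode=rw&uri=true"
    #
    # so sqlite3 opens an existing file read-write instead of silently
    # creating an empty database at that path.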
396 """Check schema version compatibility."""
398 def _get_version(key: str, default: VersionTuple) -> VersionTuple:
399 """Retrieve version number from given metadata key."""
400 if metadata.table_exists():
401 version_str = metadata.get(key)
402 if version_str
is None:
404 raise RuntimeError(f
"Version key {key!r} does not exist in metadata table.")
405 return VersionTuple.fromString(version_str)
416 if not self.
_schema.schemaVersion().checkCompatibility(db_schema_version):
418 f
"Configured schema version {self._schema.schemaVersion()} "
419 f
"is not compatible with database version {db_schema_version}"
423 f
"Current code version {self.apdbImplementationVersion()} "
424 f
"is not compatible with database version {db_code_version}"
428 if self.
_schema.has_replica_chunks:
430 code_replica_version = ApdbSqlReplica.apdbReplicaImplementationVersion()
431 if not code_replica_version.checkCompatibility(db_replica_version):
433 f
"Current replication code version {code_replica_version} "
434 f
"is not compatible with database version {db_replica_version}"
439 """Return version number for current APDB implementation.
443 version : `VersionTuple`
444 Version of the code defined in implementation class.

    @classmethod
    def init_database(
        cls,
        db_url: str,
        *,
        schema_file: str | None = None,
        schema_name: str | None = None,
        read_sources_months: int | None = None,
        read_forced_sources_months: int | None = None,
        use_insert_id: bool = False,
        connection_timeout: int | None = None,
        dia_object_index: str | None = None,
        htm_level: int | None = None,
        htm_index_column: str | None = None,
        ra_dec_columns: list[str] | None = None,
        prefix: str | None = None,
        namespace: str | None = None,
        drop: bool = False,
    ) -> ApdbSqlConfig:
        """Initialize new APDB instance and make configuration object for it.

        Parameters
        ----------
        db_url : `str`
            SQLAlchemy database URL.
        schema_file : `str`, optional
            Location of (YAML) configuration file with APDB schema. If not
            specified then default location will be used.
        schema_name : `str`, optional
            Name of the schema in YAML configuration file. If not specified
            then default name will be used.
        read_sources_months : `int`, optional
            Number of months of history to read from DiaSource.
        read_forced_sources_months : `int`, optional
            Number of months of history to read from DiaForcedSource.
        use_insert_id : `bool`
            If True, make additional tables used for replication to PPDB.
        connection_timeout : `int`, optional
            Database connection timeout in seconds.
        dia_object_index : `str`, optional
            Indexing mode for DiaObject table.
        htm_level : `int`, optional
            HTM indexing level.
        htm_index_column : `str`, optional
            Name of a HTM index column for DiaObject and DiaSource tables.
        ra_dec_columns : `list` [`str`], optional
            Names of ra/dec columns in DiaObject table.
        prefix : `str`, optional
            Optional prefix for all table names.
        namespace : `str`, optional
            Name of the database schema for all APDB tables. If not specified
            then default schema is used.
        drop : `bool`, optional
            If `True` then drop existing tables before re-creating the schema.

        Returns
        -------
        config : `ApdbSqlConfig`
            Resulting configuration object for a created APDB instance.
        """
        config = ApdbSqlConfig(db_url=db_url, use_insert_id=use_insert_id)
        if schema_file is not None:
            config.schema_file = schema_file
        if schema_name is not None:
            config.schema_name = schema_name
        if read_sources_months is not None:
            config.read_sources_months = read_sources_months
        if read_forced_sources_months is not None:
            config.read_forced_sources_months = read_forced_sources_months
        if connection_timeout is not None:
            config.connection_timeout = connection_timeout
        if dia_object_index is not None:
            config.dia_object_index = dia_object_index
        if htm_level is not None:
            config.htm_level = htm_level
        if htm_index_column is not None:
            config.htm_index_column = htm_index_column
        if ra_dec_columns is not None:
            config.ra_dec_columns = ra_dec_columns
        if prefix is not None:
            config.prefix = prefix
        if namespace is not None:
            config.namespace = namespace

        cls._makeSchema(config, drop=drop)

        return config
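
    # A minimal usage sketch, assuming a local SQLite file (path illustrative):
    #
    #     config = ApdbSql.init_database(db_url="sqlite:///apdb.sqlite3")
    #     apdb = ApdbSql(config)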
541 """Return `ApdbReplica` instance for this database."""
545 """Return dictionary with the table names and row counts.
547 Used by ``ap_proto`` to keep track of the size of the database tables.
548 Depending on database technology this could be expensive operation.
553 Dict where key is a table name and value is a row count.
556 tables = [ApdbTables.DiaObject, ApdbTables.DiaSource, ApdbTables.DiaForcedSource]
557 if self.
config.dia_object_index ==
"last_object_table":
558 tables.append(ApdbTables.DiaObjectLast)
559 with self.
_engine.begin()
as conn:
561 sa_table = self.
_schema.get_table(table)
562 stmt = sql.select(func.count()).select_from(sa_table)
563 count: int = conn.execute(stmt).scalar_one()
564 res[table.name] = count

    def tableDef(self, table: ApdbTables) -> Table | None:
        # docstring is inherited from a base class
        return self._schema.tableSchemas.get(table)

    @classmethod
    def _makeSchema(cls, config: ApdbConfig, drop: bool = False) -> None:
        # docstring is inherited from a base class
        if not isinstance(config, ApdbSqlConfig):
            raise TypeError(f"Unexpected type of configuration object: {type(config)}")

        engine = cls._makeEngine(config, create=True)

        # Ask the schema class to create all tables.
        schema = ApdbSqlSchema(
            engine=engine,
            dia_object_index=config.dia_object_index,
            schema_file=config.schema_file,
            schema_name=config.schema_name,
            prefix=config.prefix,
            namespace=config.namespace,
            htm_index_column=config.htm_index_column,
            enable_replica=config.use_insert_id,
        )
        schema.makeSchema(drop=drop)

        # The metadata table may not exist for old schemas.
        meta_table: sqlalchemy.schema.Table | None = None
        with suppress(ValueError):
            meta_table = schema.get_table(ApdbTables.metadata)

        apdb_meta = ApdbMetadataSql(engine, meta_table)
        if apdb_meta.table_exists():
            # Fill version numbers, overwrite if they are already there.
            apdb_meta.set(cls.metadataSchemaVersionKey, str(schema.schemaVersion()), force=True)
            apdb_meta.set(cls.metadataCodeVersionKey, str(cls.apdbImplementationVersion()), force=True)
            if config.use_insert_id:
                # Store replica code version only if replica is enabled.
                apdb_meta.set(
                    cls.metadataReplicaVersionKey,
                    str(ApdbSqlReplica.apdbReplicaImplementationVersion()),
                    force=True,
                )

            # Store frozen part of the configuration in metadata.
            freezer = ApdbConfigFreezer[ApdbSqlConfig](cls._frozen_parameters)
            apdb_meta.set(cls.metadataConfigKey, freezer.to_json(config), force=True)

    def getDiaObjects(self, region: Region) -> pandas.DataFrame:
        # docstring is inherited from a base class

        # decide what columns we need
        if self.config.dia_object_index == "last_object_table":
            table_enum = ApdbTables.DiaObjectLast
        else:
            table_enum = ApdbTables.DiaObject
        table = self._schema.get_table(table_enum)
        if not self.config.dia_object_columns:
            columns = self._schema.get_apdb_columns(table_enum)
        else:
            columns = [table.c[col] for col in self.config.dia_object_columns]
        query = sql.select(*columns)

        # build selection
        query = query.where(self._filterRegion(table, region))

        # select latest version of objects
        if self.config.dia_object_index != "last_object_table":
            query = query.where(table.c.validityEnd == None)  # noqa: E711

        # execute select
        with self._timer("select_time", tags={"table": "DiaObject"}) as timer:
            with self._engine.begin() as conn:
                objects = pandas.read_sql_query(query, conn)
            timer.add_values(row_count=len(objects))
        _LOG.debug("found %s DiaObjects", len(objects))
        return self._fix_result_timestamps(objects)

    def getDiaSources(
        self, region: Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
    ) -> pandas.DataFrame | None:
        # docstring is inherited from a base class
        if self.config.read_sources_months == 0:
            _LOG.debug("Skip DiaSources fetching")
            return None

        if object_ids is None:
            # region-based select
            sources = self._getDiaSourcesInRegion(region, visit_time)
        else:
            sources = self._getDiaSourcesByIDs(list(object_ids), visit_time)

        return self._fix_result_timestamps(sources)

    def getDiaForcedSources(
        self, region: Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
    ) -> pandas.DataFrame | None:
        # docstring is inherited from a base class
        if self.config.read_forced_sources_months == 0:
            _LOG.debug("Skip DiaForcedSources fetching")
            return None

        if object_ids is None:
            # This implementation does not support region-based selection.
            raise NotImplementedError("Region-based selection is not supported")

        midpointMjdTai_start = _make_midpointMjdTai_start(visit_time, self.config.read_forced_sources_months)
        _LOG.debug("midpointMjdTai_start = %.6f", midpointMjdTai_start)

        with self._timer("select_time", tags={"table": "DiaForcedSource"}) as timer:
            sources = self._getSourcesByIDs(
                ApdbTables.DiaForcedSource, list(object_ids), midpointMjdTai_start
            )
            timer.add_values(row_count=len(sources))

        _LOG.debug("found %s DiaForcedSources", len(sources))
        return self._fix_result_timestamps(sources)

    def containsVisitDetector(self, visit: int, detector: int) -> bool:
        # docstring is inherited from a base class
        src_table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaSource)
        frcsrc_table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaForcedSource)
        # Query should load only one leaf page of the index
        query1 = sql.select(src_table.c.visit).filter_by(visit=visit, detector=detector).limit(1)

        with self._engine.begin() as conn:
            result = conn.execute(query1).scalar_one_or_none()
            if result is not None:
                return True
            # Backup query if an image was processed but had no diaSources
            query2 = sql.select(frcsrc_table.c.visit).filter_by(visit=visit, detector=detector).limit(1)
            result = conn.execute(query2).scalar_one_or_none()
            return result is not None

    def getSSObjects(self) -> pandas.DataFrame:
        # docstring is inherited from a base class
        columns = self._schema.get_apdb_columns(ApdbTables.SSObject)
        query = sql.select(*columns)

        # execute select
        with self._timer("SSObject_select_time", tags={"table": "SSObject"}) as timer:
            with self._engine.begin() as conn:
                objects = pandas.read_sql_query(query, conn)
            timer.add_values(row_count=len(objects))
        _LOG.debug("found %s SSObjects", len(objects))
        return self._fix_result_timestamps(objects)

    def store(
        self,
        visit_time: astropy.time.Time,
        objects: pandas.DataFrame,
        sources: pandas.DataFrame | None = None,
        forced_sources: pandas.DataFrame | None = None,
    ) -> None:
        # docstring is inherited from a base class
        objects = self._fix_input_timestamps(objects)
        if sources is not None:
            sources = self._fix_input_timestamps(sources)
        if forced_sources is not None:
            forced_sources = self._fix_input_timestamps(forced_sources)

        # We want to run all inserts in one transaction.
        with self._engine.begin() as connection:
            replica_chunk: ReplicaChunk | None = None
            if self._schema.has_replica_chunks:
                replica_chunk = ReplicaChunk.make_replica_chunk(visit_time, self.config.replica_chunk_seconds)
                self._storeReplicaChunk(replica_chunk, visit_time, connection)

            # fill pixelId column for DiaObjects
            objects = self._add_spatial_index(objects)
            self._storeDiaObjects(objects, visit_time, replica_chunk, connection)

            if sources is not None:
                # fill pixelId column for DiaSources
                sources = self._add_spatial_index(sources)
                self._storeDiaSources(sources, replica_chunk, connection)

            if forced_sources is not None:
                self._storeDiaForcedSources(forced_sources, replica_chunk, connection)

    def storeSSObjects(self, objects: pandas.DataFrame) -> None:
        # docstring is inherited from a base class
        objects = self._fix_input_timestamps(objects)

        idColumn = "ssObjectId"
        table = self._schema.get_table(ApdbTables.SSObject)

        # everything to be done in single transaction
        with self._engine.begin() as conn:
            # Find record IDs that already exist. Some types like np.int64
            # can cause issues with sqlalchemy, convert them to int.
            ids = sorted(int(oid) for oid in objects[idColumn])

            query = sql.select(table.columns[idColumn]).where(table.columns[idColumn].in_(ids))
            result = conn.execute(query)
            knownIds = set(row.ssObjectId for row in result)

            filter = objects[idColumn].isin(knownIds)
            toUpdate = cast(pandas.DataFrame, objects[filter])
            toInsert = cast(pandas.DataFrame, objects[~filter])

            # insert new records
            if len(toInsert) > 0:
                toInsert.to_sql(table.name, conn, if_exists="append", index=False, schema=table.schema)

            # update existing records
            if len(toUpdate) > 0:
                whereKey = f"{idColumn}_param"
                update = table.update().where(table.columns[idColumn] == sql.bindparam(whereKey))
                toUpdate = toUpdate.rename({idColumn: whereKey}, axis="columns")
                values = toUpdate.to_dict("records")
                result = conn.execute(update, values)
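
    # Design note on storeSSObjects: the toInsert/toUpdate split implements a
    # portable upsert without dialect-specific ON CONFLICT syntax. The UPDATE
    # branch runs as a single executemany(); a sketch of the statement it
    # produces (parameter name per whereKey above, column list abbreviated):
    #
    #     UPDATE "SSObject" SET ... WHERE "ssObjectId" = :ssObjectId_param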

    def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
        # docstring is inherited from a base class
        table = self._schema.get_table(ApdbTables.DiaSource)
        query = table.update().where(table.columns["diaSourceId"] == sql.bindparam("srcId"))

        with self._engine.begin() as conn:
            # Need to make sure that every ID exists in the database;
            # executemany may not report rowcount, so iterate and check which
            # updates are missing.
            missing_ids: list[int] = []
            for key, value in idMap.items():
                params = dict(srcId=key, diaObjectId=0, ssObjectId=value)
                result = conn.execute(query, params)
                if result.rowcount == 0:
                    missing_ids.append(key)
            if missing_ids:
                missing = ",".join(str(item) for item in missing_ids)
                raise ValueError(f"Following DiaSource IDs do not exist in the database: {missing}")

    def countUnassociatedObjects(self) -> int:
        # docstring is inherited from a base class

        # Retrieve the DiaObject table.
        table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaObject)

        # Construct the sql statement.
        stmt = sql.select(func.count()).select_from(table).where(table.c.nDiaSources == 1)
        stmt = stmt.where(table.c.validityEnd == None)  # noqa: E711

        # Return the count.
        with self._engine.begin() as conn:
            count = conn.execute(stmt).scalar_one()

        return count

    @property
    def metadata(self) -> ApdbMetadata:
        # docstring is inherited from a base class
        if self._metadata is None:
            raise RuntimeError("Database schema was not initialized.")
        return self._metadata
833 """Return catalog of DiaSource instances from given region.
837 region : `lsst.sphgeom.Region`
838 Region to search for DIASources.
839 visit_time : `astropy.time.Time`
840 Time of the current visit.
844 catalog : `pandas.DataFrame`
845 Catalog containing DiaSource records.
850 _LOG.debug(
"midpointMjdTai_start = %.6f", midpointMjdTai_start)
852 table = self.
_schema.get_table(ApdbTables.DiaSource)
853 columns = self.
_schema.get_apdb_columns(ApdbTables.DiaSource)
854 query = sql.select(*columns)
857 time_filter = table.columns[
"midpointMjdTai"] > midpointMjdTai_start
858 where = sql.expression.and_(self.
_filterRegion(table, region), time_filter)
859 query = query.where(where)
862 with self.
_timer(
"DiaSource_select_time", tags={
"table":
"DiaSource"})
as timer:
863 with self.
_engine.begin()
as conn:
864 sources = pandas.read_sql_query(query, conn)
865 timer.add_values(row_counts=len(sources))
866 _LOG.debug(
"found %s DiaSources", len(sources))
870 """Return catalog of DiaSource instances given set of DiaObject IDs.
875 Collection of DiaObject IDs
876 visit_time : `astropy.time.Time`
877 Time of the current visit.
881 catalog : `pandas.DataFrame`
882 Catalog contaning DiaSource records.
887 _LOG.debug(
"midpointMjdTai_start = %.6f", midpointMjdTai_start)
889 with self.
_timer(
"select_time", tags={
"table":
"DiaSource"})
as timer:
890 sources = self.
_getSourcesByIDs(ApdbTables.DiaSource, object_ids, midpointMjdTai_start)
891 timer.add_values(row_count=len(sources))
893 _LOG.debug(
"found %s DiaSources", len(sources))

    def _getSourcesByIDs(
        self, table_enum: ApdbTables, object_ids: list[int], midpointMjdTai_start: float
    ) -> pandas.DataFrame:
        """Return catalog of DiaSource or DiaForcedSource instances given set
        of DiaObject IDs.

        Parameters
        ----------
        table_enum : `ApdbTables`
            Source table to query.
        object_ids : `list` [`int`]
            Collection of DiaObject IDs
        midpointMjdTai_start : `float`
            Earliest midpointMjdTai to retrieve.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Catalog containing DiaSource records. An empty catalog is returned
            when ``object_ids`` is empty.
        """
        table = self._schema.get_table(table_enum)
        columns = self._schema.get_apdb_columns(table_enum)

        sources: pandas.DataFrame | None = None
        if len(object_ids) <= 0:
            _LOG.debug("ID list is empty, just fetch empty result")
            query = sql.select(*columns).where(sql.literal(False))
            with self._engine.begin() as conn:
                sources = pandas.read_sql_query(query, conn)
        else:
            data_frames: list[pandas.DataFrame] = []
            for ids in chunk_iterable(sorted(object_ids), 1000):
                query = sql.select(*columns)

                # Some types like np.int64 can cause issues with
                # sqlalchemy, convert them to int.
                int_ids = [int(oid) for oid in ids]

                # select by object id
                query = query.where(
                    sql.expression.and_(
                        table.columns["diaObjectId"].in_(int_ids),
                        table.columns["midpointMjdTai"] > midpointMjdTai_start,
                    )
                )

                # execute select
                with self._engine.begin() as conn:
                    data_frames.append(pandas.read_sql_query(query, conn))

            if len(data_frames) == 1:
                sources = data_frames[0]
            else:
                sources = pandas.concat(data_frames)
        assert sources is not None, "Catalog cannot be None"
        return sources

    def _storeReplicaChunk(
        self,
        replica_chunk: ReplicaChunk,
        visit_time: astropy.time.Time,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        # Convert visit time to an aware UTC datetime (from TAI epoch seconds)
        # so that the database stores a correct value.
        dt = datetime.datetime.fromtimestamp(visit_time.unix_tai, tz=datetime.timezone.utc)

        table = self._schema.get_table(ExtraTables.ApdbReplicaChunks)

        # We need UPSERT which is dialect-specific construct
        values = {"last_update_time": dt, "unique_id": replica_chunk.unique_id}
        row = {"apdb_replica_chunk": replica_chunk.id} | values
        if connection.dialect.name == "sqlite":
            insert_sqlite = sqlalchemy.dialects.sqlite.insert(table)
            insert_sqlite = insert_sqlite.on_conflict_do_update(index_elements=table.primary_key, set_=values)
            connection.execute(insert_sqlite, row)
        elif connection.dialect.name == "postgresql":
            insert_pg = sqlalchemy.dialects.postgresql.dml.insert(table)
            insert_pg = insert_pg.on_conflict_do_update(constraint=table.primary_key, set_=values)
            connection.execute(insert_pg, row)
        else:
            raise TypeError(f"Unsupported dialect {connection.dialect.name} for upsert.")

    def _storeDiaObjects(
        self,
        objs: pandas.DataFrame,
        visit_time: astropy.time.Time,
        replica_chunk: ReplicaChunk | None,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        """Store catalog of DiaObjects from current visit.

        Parameters
        ----------
        objs : `pandas.DataFrame`
            Catalog with DiaObject records.
        visit_time : `astropy.time.Time`
            Time of the visit.
        replica_chunk : `ReplicaChunk` or `None`
            Replica chunk identifier if replication is configured.
        connection : `sqlalchemy.engine.Connection`
            Database connection.
        """
        if len(objs) == 0:
            _LOG.debug("No objects to write to database.")
            return

        # Some types like np.int64 can cause issues with sqlalchemy, convert
        # them to int.
        ids = sorted(int(oid) for oid in objs["diaObjectId"])
        _LOG.debug("first object ID: %d", ids[0])

        dt = visit_time.datetime

        # everything to be done in single transaction
        if self.config.dia_object_index == "last_object_table":
            # Insert and replace all records in LAST table.
            table = self._schema.get_table(ApdbTables.DiaObjectLast)

            # Drop the previous objects (pandas cannot upsert).
            query = table.delete().where(table.columns["diaObjectId"].in_(ids))

            with self._timer("delete_time", tags={"table": table.name}) as timer:
                res = connection.execute(query)
                timer.add_values(row_count=res.rowcount)
            _LOG.debug("deleted %s objects", res.rowcount)

            # DiaObjectLast is a subset of DiaObject, strip missing columns.
            last_column_names = [column.name for column in table.columns]
            last_objs = objs[last_column_names]
            last_objs = _coerce_uint64(last_objs)

            if "lastNonForcedSource" in last_objs.columns:
                # lastNonForcedSource is defined NOT NULL, fill it with visit
                # time just in case.
                last_objs.fillna({"lastNonForcedSource": dt}, inplace=True)
            else:
                extra_column = pandas.Series([dt] * len(objs), name="lastNonForcedSource")
                last_objs.set_index(extra_column.index, inplace=True)
                last_objs = pandas.concat([last_objs, extra_column], axis="columns")

            with self._timer("insert_time", tags={"table": "DiaObjectLast"}) as timer:
                last_objs.to_sql(
                    table.name,
                    connection,
                    if_exists="append",
                    index=False,
                    schema=table.schema,
                )
                timer.add_values(row_count=len(last_objs))
        else:
            # truncate existing validity intervals
            table = self._schema.get_table(ApdbTables.DiaObject)

            update = (
                table.update()
                .values(validityEnd=dt)
                .where(
                    sql.expression.and_(
                        table.columns["diaObjectId"].in_(ids),
                        table.columns["validityEnd"].is_(None),
                    )
                )
            )

            with self._timer("truncate_time", tags={"table": table.name}) as timer:
                res = connection.execute(update)
                timer.add_values(row_count=res.rowcount)
            _LOG.debug("truncated %s intervals", res.rowcount)

        objs = _coerce_uint64(objs)

        # Fill additional columns
        extra_columns: list[pandas.Series] = []
        if "validityStart" in objs.columns:
            objs["validityStart"] = dt
        else:
            extra_columns.append(pandas.Series([dt] * len(objs), name="validityStart"))
        if "validityEnd" in objs.columns:
            objs["validityEnd"] = None
        else:
            extra_columns.append(pandas.Series([None] * len(objs), name="validityEnd"))
        if "lastNonForcedSource" in objs.columns:
            # lastNonForcedSource is defined NOT NULL, fill it with visit time
            # just in case.
            objs.fillna({"lastNonForcedSource": dt}, inplace=True)
        else:
            extra_columns.append(pandas.Series([dt] * len(objs), name="lastNonForcedSource"))
        if extra_columns:
            objs.set_index(extra_columns[0].index, inplace=True)
            objs = pandas.concat([objs] + extra_columns, axis="columns")

        # Everything goes into the DiaObject table.
        table = self._schema.get_table(ApdbTables.DiaObject)

        # collect replica data
        replica_data: list[dict] = []
        replica_stmt: Any = None
        replica_table_name = ""
        if replica_chunk is not None:
            pk_names = [column.name for column in table.primary_key]
            replica_data = objs[pk_names].to_dict("records")
            for row in replica_data:
                row["apdb_replica_chunk"] = replica_chunk.id
            replica_table = self._schema.get_table(ExtraTables.DiaObjectChunks)
            replica_table_name = replica_table.name
            replica_stmt = replica_table.insert()

        # insert new versions
        with self._timer("insert_time", tags={"table": table.name}) as timer:
            objs.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
            timer.add_values(row_count=len(objs))
        if replica_stmt is not None:
            with self._timer("insert_time", tags={"table": replica_table_name}) as timer:
                connection.execute(replica_stmt, replica_data)
                timer.add_values(row_count=len(replica_data))

    def _storeDiaSources(
        self,
        sources: pandas.DataFrame,
        replica_chunk: ReplicaChunk | None,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        """Store catalog of DiaSources from current visit.

        Parameters
        ----------
        sources : `pandas.DataFrame`
            Catalog containing DiaSource records
        replica_chunk : `ReplicaChunk` or `None`
            Replica chunk identifier if replication is configured.
        connection : `sqlalchemy.engine.Connection`
            Database connection.
        """
        table = self._schema.get_table(ApdbTables.DiaSource)

        # collect replica data
        replica_data: list[dict] = []
        replica_stmt: Any = None
        replica_table_name = ""
        if replica_chunk is not None:
            pk_names = [column.name for column in table.primary_key]
            replica_data = sources[pk_names].to_dict("records")
            for row in replica_data:
                row["apdb_replica_chunk"] = replica_chunk.id
            replica_table = self._schema.get_table(ExtraTables.DiaSourceChunks)
            replica_table_name = replica_table.name
            replica_stmt = replica_table.insert()

        # everything to be done in single transaction
        with self._timer("insert_time", tags={"table": table.name}) as timer:
            sources = _coerce_uint64(sources)
            sources.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
            timer.add_values(row_count=len(sources))
        if replica_stmt is not None:
            with self._timer("replica_insert_time", tags={"table": replica_table_name}) as timer:
                connection.execute(replica_stmt, replica_data)
                timer.add_values(row_count=len(replica_data))

    def _storeDiaForcedSources(
        self,
        sources: pandas.DataFrame,
        replica_chunk: ReplicaChunk | None,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        """Store a set of DiaForcedSources from current visit.

        Parameters
        ----------
        sources : `pandas.DataFrame`
            Catalog containing DiaForcedSource records
        replica_chunk : `ReplicaChunk` or `None`
            Replica chunk identifier if replication is configured.
        connection : `sqlalchemy.engine.Connection`
            Database connection.
        """
        table = self._schema.get_table(ApdbTables.DiaForcedSource)

        # collect replica data
        replica_data: list[dict] = []
        replica_stmt: Any = None
        replica_table_name = ""
        if replica_chunk is not None:
            pk_names = [column.name for column in table.primary_key]
            replica_data = sources[pk_names].to_dict("records")
            for row in replica_data:
                row["apdb_replica_chunk"] = replica_chunk.id
            replica_table = self._schema.get_table(ExtraTables.DiaForcedSourceChunks)
            replica_table_name = replica_table.name
            replica_stmt = replica_table.insert()

        # everything to be done in single transaction
        with self._timer("insert_time", tags={"table": table.name}) as timer:
            sources = _coerce_uint64(sources)
            sources.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
            timer.add_values(row_count=len(sources))
        if replica_stmt is not None:
            with self._timer("insert_time", tags={"table": replica_table_name}) as timer:
                connection.execute(replica_stmt, replica_data)
                timer.add_values(row_count=len(replica_data))
1191 """Generate a set of HTM indices covering specified region.
1195 region: `sphgeom.Region`
1196 Region that needs to be indexed.
1200 Sequence of ranges, range is a tuple (minHtmID, maxHtmID).
1202 _LOG.debug(
"region: %s", region)
1205 return indices.ranges()
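
    # Illustrative behaviour of the envelope above: for a small circular
    # region at HTM level 20 the returned RangeSet collapses to a handful of
    # half-open (begin, end) index ranges; htm_max_ranges caps how many ranges
    # are produced, trading selectivity for a shorter WHERE clause.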

    def _filterRegion(self, table: sqlalchemy.schema.Table, region: Region) -> sql.ColumnElement:
        """Make SQLAlchemy expression for selecting records in a region."""
        htm_index_column = table.columns[self.config.htm_index_column]
        exprlist = []
        pixel_ranges = self._htm_indices(region)
        for low, upper in pixel_ranges:
            # Ranges are half-open, convert to inclusive upper bound.
            upper -= 1
            if low == upper:
                exprlist.append(htm_index_column == low)
            else:
                exprlist.append(sql.expression.between(htm_index_column, low, upper))

        return sql.expression.or_(*exprlist)
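
    # Sketch of the expression _filterRegion builds for two pixel ranges
    # (values illustrative):
    #
    #     "pixelId" BETWEEN 123456 AND 123711 OR "pixelId" = 123900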
1222 """Calculate spatial index for each record and add it to a DataFrame.
1226 df : `pandas.DataFrame`
1227 DataFrame which has to contain ra/dec columns, names of these
1228 columns are defined by configuration ``ra_dec_columns`` field.
1232 df : `pandas.DataFrame`
1233 DataFrame with ``pixelId`` column which contains pixel index
1234 for ra/dec coordinates.
1238 This overrides any existing column in a DataFrame with the same name
1239 (pixelId). Original DataFrame is not changed, copy of a DataFrame is
1243 htm_index = np.zeros(df.shape[0], dtype=np.int64)
1244 ra_col, dec_col = self.
config.ra_dec_columns
1245 for i, (ra, dec)
in enumerate(zip(df[ra_col], df[dec_col])):
1250 df[self.
config.htm_index_column] = htm_index
1254 """Update timestamp columns in input DataFrame to be aware datetime
1257 AP pipeline generates naive datetime instances, we want them to be
1258 aware before they go to database. All naive timestamps are assumed to
1259 be in UTC timezone (they should be TAI).
1264 for column, dtype
in df.dtypes.items()
1265 if isinstance(dtype, pandas.DatetimeTZDtype)
and dtype.tz
is not datetime.timezone.utc
1267 for column
in columns:
1268 df[column] = df[column].dt.tz_convert(datetime.timezone.utc)
1271 column
for column, dtype
in df.dtypes.items()
if pandas.api.types.is_datetime64_dtype(dtype)
1273 for column
in columns:
1274 df[column] = df[column].dt.tz_localize(datetime.timezone.utc)
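
    # Behaviour sketch: a naive datetime64 column value such as
    # Timestamp("2024-01-01 00:00:00") becomes
    # Timestamp("2024-01-01 00:00:00+0000", tz="UTC") via tz_localize, while a
    # column already aware in another timezone is tz_convert-ed to UTC.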
1278 """Update timestamp columns to be naive datetime type in returned
1281 AP pipeline code expects DataFrames to contain naive datetime columns,
1282 while Postgres queries return timezone-aware type. This method converts
1283 those columns to naive datetime in UTC timezone.
1286 columns = [column
for column, dtype
in df.dtypes.items()
if isinstance(dtype, pandas.DatetimeTZDtype)]
1287 for column
in columns:
1289 df[column] = df[column].dt.tz_convert(
None)