22"""Module defining Apdb class and related methods."""
from __future__ import annotations

__all__ = ["ApdbSql"]

import datetime
import logging
import urllib.parse
import warnings
from collections.abc import Iterable, Mapping, MutableMapping
from contextlib import closing, suppress
from typing import TYPE_CHECKING, Any, cast

import astropy.time
import numpy as np
import pandas
import sqlalchemy
import sqlalchemy.dialects.postgresql
import sqlalchemy.dialects.sqlite
from lsst.sphgeom import HtmPixelization, LonLat, Region, UnitVector3d
from lsst.utils.db_auth import DbAuth, DbAuthNotFoundError
from lsst.utils.iteration import chunk_iterable
from sqlalchemy import func, sql
from sqlalchemy.pool import NullPool
from .._auth import DB_AUTH_ENVVAR, DB_AUTH_PATH
from ..apdb import Apdb
from ..apdbConfigFreezer import ApdbConfigFreezer
from ..apdbReplica import ReplicaChunk
from ..apdbSchema import ApdbTables
from ..config import ApdbConfig
from ..monitor import MonAgent
from ..schema_model import Table
from ..timer import Timer
from ..versionTuple import IncompatibleVersionError, VersionTuple
from .apdbMetadataSql import ApdbMetadataSql
from .apdbSqlReplica import ApdbSqlReplica
from .apdbSqlSchema import ApdbSqlSchema, ExtraTables
from .config import ApdbSqlConfig
if TYPE_CHECKING:
    import sqlite3

    from ..apdbMetadata import ApdbMetadata
_LOG = logging.getLogger(__name__)

_MON = MonAgent(__name__)
73"""Version for the code controlling non-replication tables. This needs to be
74updated following compatibility rules when schema produced by this code
80 """Change the type of uint64 columns to int64, and return copy of data
83 names = [c[0]
for c
in df.dtypes.items()
if c[1] == np.uint64]
84 return df.astype({name: np.int64
for name
in names})
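# Illustrative sketch (not part of the original module): most SQL backends
# have no unsigned 64-bit integer type, so uint64 ID columns are downcast to
# int64 before being written, e.g.
#
#     df = pandas.DataFrame({"diaObjectId": np.array([1, 2], dtype=np.uint64)})
#     _coerce_uint64(df).dtypes  # diaObjectId becomes int64
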
88 """Calculate starting point for time-based source search.
92 visit_time : `astropy.time.Time`
93 Time of current visit.
95 Number of months in the sources history.
100 A ``midpointMjdTai`` starting point, MJD time.
104 return float(visit_time.mjd - months * 30)
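# Illustrative sketch (not part of the original module): with a 12-month
# history window the start is 12 * 30 = 360 days before the visit, so a visit
# at MJD 60000.0 yields a starting point of 59640.0:
#
#     _make_midpointMjdTai_start(astropy.time.Time(60000.0, format="mjd"), 12)
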

def _onSqlite3Connect(
    dbapiConnection: sqlite3.Connection, connectionRecord: sqlalchemy.pool._ConnectionRecord
) -> None:
    # Enable foreign key constraint checking on every new SQLite connection.
    with closing(dbapiConnection.cursor()) as cursor:
        cursor.execute("PRAGMA foreign_keys=ON;")

116 """Implementation of APDB interface based on SQL database.
118 The implementation is configured via standard ``pex_config`` mechanism
119 using `ApdbSqlConfig` configuration class. For an example of different
120 configurations check ``config/`` folder.
124 config : `ApdbSqlConfig`
125 Configuration object.
    metadataSchemaVersionKey = "version:schema"
    """Name of the metadata key to store schema version number."""

    metadataCodeVersionKey = "version:ApdbSql"
    """Name of the metadata key to store code version number."""

    metadataReplicaVersionKey = "version:ApdbSqlReplica"
    """Name of the metadata key to store replica code version number."""

    metadataConfigKey = "config:apdb-sql.json"
    """Name of the metadata key to store frozen configuration (JSON)."""

    _frozen_parameters = (
        "pixelization.htm_level",
        "pixelization.htm_index_column",
    )
    """Names of the config parameters to be frozen in metadata table."""

    def __init__(self, config: ApdbSqlConfig):
        self._engine = self._makeEngine(config, create=False)

        sa_metadata = sqlalchemy.MetaData(schema=config.namespace)
        meta_table_name = ApdbTables.metadata.table_name(prefix=config.prefix)
        meta_table: sqlalchemy.schema.Table | None = None
        with suppress(sqlalchemy.exc.NoSuchTableError):
            meta_table = sqlalchemy.schema.Table(meta_table_name, sa_metadata, autoload_with=self._engine)

        self._metadata = ApdbMetadataSql(self._engine, meta_table)

        # Read frozen config from the metadata table and apply it on top of
        # the user-provided configuration.
        freezer = ApdbConfigFreezer[ApdbSqlConfig](self._frozen_parameters)
        config_json = self._metadata.get(self.metadataConfigKey)
        if config_json is not None:
            self.config = freezer.update(config, config_json)
        else:
            self.config = config

        self._schema = ApdbSqlSchema(
            engine=self._engine,
            dia_object_index=self.config.dia_object_index,
            schema_file=self.config.schema_file,
            schema_name=self.config.schema_name,
            prefix=self.config.prefix,
            namespace=self.config.namespace,
            htm_index_column=self.config.pixelization.htm_index_column,
            enable_replica=self.config.enable_replica,
        )

        self._versionCheck(self._metadata)

        self.pixelator = HtmPixelization(self.config.pixelization.htm_level)

        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug("ApdbSql Configuration: %s", self.config.model_dump())

    def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
        """Create `Timer` instance given its name."""
        return Timer(name, _MON, tags=tags)

    @classmethod
    def _makeEngine(cls, config: ApdbSqlConfig, *, create: bool) -> sqlalchemy.engine.Engine:
        """Make SQLALchemy engine based on configured parameters.

        Parameters
        ----------
        config : `ApdbSqlConfig`
            Configuration object.
        create : `bool`
            Whether to try to create new database file, only relevant for
            SQLite backend which always creates new files by default.

        Returns
        -------
        engine : `sqlalchemy.engine.Engine`
            Database engine.
        """
        kw: MutableMapping[str, Any] = dict(config.connection_config.extra_parameters)
        conn_args: dict[str, Any] = dict()
        if not config.connection_config.connection_pool:
            kw.update(poolclass=NullPool)
        if config.connection_config.isolation_level is not None:
            kw.update(isolation_level=config.connection_config.isolation_level)
        elif config.db_url.startswith("sqlite"):
            # Use READ_UNCOMMITTED as default value for sqlite.
            kw.update(isolation_level="READ_UNCOMMITTED")
        if config.connection_config.connection_timeout is not None:
            if config.db_url.startswith("sqlite"):
                conn_args.update(timeout=config.connection_config.connection_timeout)
            elif config.db_url.startswith(("postgresql", "mysql")):
                conn_args.update(connect_timeout=int(config.connection_config.connection_timeout))
        kw.update(connect_args=conn_args)
        engine = sqlalchemy.create_engine(cls._connection_url(config.db_url, create=create), **kw)

        if engine.dialect.name == "sqlite":
            # Enable foreign keys on every new sqlite connection.
            sqlalchemy.event.listen(engine, "connect", _onSqlite3Connect)

        return engine

    @classmethod
    def _connection_url(cls, config_url: str, *, create: bool) -> sqlalchemy.engine.URL | str:
        """Generate a complete URL for database with proper credentials.

        Parameters
        ----------
        config_url : `str`
            Database URL as specified in configuration.
        create : `bool`
            Whether to try to create new database file, only relevant for
            SQLite backend which always creates new files by default.

        Returns
        -------
        connection_url : `sqlalchemy.engine.URL` or `str`
            Connection URL including credentials.
        """
        # Try to resolve credentials via the db-auth mechanism; if no matching
        # credentials are found the URL is used unchanged.
        components = urllib.parse.urlparse(config_url)
        if all((components.scheme is not None, components.hostname is not None, components.path is not None)):
            try:
                db_auth = DbAuth(DB_AUTH_PATH, DB_AUTH_ENVVAR)
                config_url = db_auth.getUrl(config_url)
            except DbAuthNotFoundError:
                # Credentials file does not exist or has no matching
                # credentials, use the URL as-is.
                pass

        # Unless a new database is being created, rewrite SQLite URLs so the
        # backend does not silently create a missing database file.
        if not create:
            config_url = cls._update_sqlite_url(config_url)

        return config_url

268 """If URL refers to sqlite dialect, update it so that the backend does
269 not try to create database file if it does not exist already.
279 Possibly updated connection string.
282 url = sqlalchemy.make_url(url_string)
283 except sqlalchemy.exc.SQLAlchemyError:
288 if url.get_backend_name() ==
"sqlite":
293 database = url.database
294 if database
and not database.startswith((
":",
"file:")):
295 query = dict(url.query, mode=
"rw", uri=
"true")
302 if database.startswith(
"//"):
304 f
"Database URL contains extra leading slashes which will be removed: {url}",
307 database =
"/" + database.lstrip(
"/")
308 url = url.set(database=f
"file:{database}", query=query)
309 url_string = url.render_as_string()
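    # Illustrative sketch (not part of the original module): a plain sqlite
    # URL is rewritten into SQLite URI form so that a missing file raises an
    # error instead of being silently created, e.g.
    #
    #     "sqlite:////data/apdb.db"
    #     -> "sqlite:///file:/data/apdb.db?mode=rw&uri=true"
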
314 """Check schema version compatibility."""
316 def _get_version(key: str, default: VersionTuple) -> VersionTuple:
317 """Retrieve version number from given metadata key."""
318 if metadata.table_exists():
319 version_str = metadata.get(key)
320 if version_str
is None:
322 raise RuntimeError(f
"Version key {key!r} does not exist in metadata table.")
323 return VersionTuple.fromString(version_str)
334 if not self.
_schema.schemaVersion().checkCompatibility(db_schema_version):
336 f
"Configured schema version {self._schema.schemaVersion()} "
337 f
"is not compatible with database version {db_schema_version}"
341 f
"Current code version {self.apdbImplementationVersion()} "
342 f
"is not compatible with database version {db_code_version}"
346 if self.
_schema.has_replica_chunks:
348 code_replica_version = ApdbSqlReplica.apdbReplicaImplementationVersion()
349 if not code_replica_version.checkCompatibility(db_replica_version):
351 f
"Current replication code version {code_replica_version} "
352 f
"is not compatible with database version {db_replica_version}"
357 """Return version number for current APDB implementation.
361 version : `VersionTuple`
362 Version of the code defined in implementation class.
    @classmethod
    def init_database(
        cls,
        db_url: str,
        *,
        schema_file: str | None = None,
        schema_name: str | None = None,
        read_sources_months: int | None = None,
        read_forced_sources_months: int | None = None,
        enable_replica: bool = False,
        connection_timeout: int | None = None,
        dia_object_index: str | None = None,
        htm_level: int | None = None,
        htm_index_column: str | None = None,
        ra_dec_columns: tuple[str, str] | None = None,
        prefix: str | None = None,
        namespace: str | None = None,
        drop: bool = False,
    ) -> ApdbSqlConfig:
        """Initialize new APDB instance and make configuration object for it.

        Parameters
        ----------
        db_url : `str`
            SQLAlchemy database URL.
        schema_file : `str`, optional
            Location of (YAML) configuration file with APDB schema. If not
            specified then default location will be used.
        schema_name : `str`, optional
            Name of the schema in YAML configuration file. If not specified
            then default name will be used.
        read_sources_months : `int`, optional
            Number of months of history to read from DiaSource.
        read_forced_sources_months : `int`, optional
            Number of months of history to read from DiaForcedSource.
        enable_replica : `bool`
            If True, make additional tables used for replication to PPDB.
        connection_timeout : `int`, optional
            Database connection timeout in seconds.
        dia_object_index : `str`, optional
            Indexing mode for DiaObject table.
        htm_level : `int`, optional
            HTM indexing level.
        htm_index_column : `str`, optional
            Name of a HTM index column for DiaObject and DiaSource tables.
        ra_dec_columns : `tuple` [`str`, `str`], optional
            Names of ra/dec columns in DiaObject table.
        prefix : `str`, optional
            Optional prefix for all table names.
        namespace : `str`, optional
            Name of the database schema for all APDB tables. If not specified
            then default schema is used.
        drop : `bool`, optional
            If `True` then drop existing tables before re-creating the schema.

        Returns
        -------
        config : `ApdbSqlConfig`
            Resulting configuration object for a created APDB instance.
        """
        config = ApdbSqlConfig(db_url=db_url, enable_replica=enable_replica)
        if schema_file is not None:
            config.schema_file = schema_file
        if schema_name is not None:
            config.schema_name = schema_name
        if read_sources_months is not None:
            config.read_sources_months = read_sources_months
        if read_forced_sources_months is not None:
            config.read_forced_sources_months = read_forced_sources_months
        if connection_timeout is not None:
            config.connection_config.connection_timeout = connection_timeout
        if dia_object_index is not None:
            config.dia_object_index = dia_object_index
        if htm_level is not None:
            config.pixelization.htm_level = htm_level
        if htm_index_column is not None:
            config.pixelization.htm_index_column = htm_index_column
        if ra_dec_columns is not None:
            config.ra_dec_columns = ra_dec_columns
        if prefix is not None:
            config.prefix = prefix
        if namespace is not None:
            config.namespace = namespace

        cls._makeSchema(config, drop=drop)

        return config

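    # Illustrative sketch (not part of the original module): creating a new
    # APDB in a local SQLite file and opening it later (the file path and HTM
    # level are made-up values):
    #
    #     config = ApdbSql.init_database("sqlite:////tmp/apdb.sqlite3", htm_level=20)
    #     apdb = ApdbSql(config)
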
459 """Return `ApdbReplica` instance for this database."""
463 """Return dictionary with the table names and row counts.
465 Used by ``ap_proto`` to keep track of the size of the database tables.
466 Depending on database technology this could be expensive operation.
471 Dict where key is a table name and value is a row count.
474 tables = [ApdbTables.DiaObject, ApdbTables.DiaSource, ApdbTables.DiaForcedSource]
475 if self.
config.dia_object_index ==
"last_object_table":
476 tables.append(ApdbTables.DiaObjectLast)
477 with self.
_engine.begin()
as conn:
479 sa_table = self.
_schema.get_table(table)
480 stmt = sql.select(func.count()).select_from(sa_table)
481 count: int = conn.execute(stmt).scalar_one()
482 res[table.name] = count
    def tableDef(self, table: ApdbTables) -> Table | None:
        # docstring is inherited from a base class
        return self._schema.tableSchemas.get(table)

    @classmethod
    def _makeSchema(cls, config: ApdbConfig, drop: bool = False) -> None:
        # docstring is inherited from a base class
        if not isinstance(config, ApdbSqlConfig):
            raise TypeError(f"Unexpected type of configuration object: {type(config)}")

        engine = cls._makeEngine(config, create=True)
        schema = ApdbSqlSchema(
            engine=engine,
            dia_object_index=config.dia_object_index,
            schema_file=config.schema_file,
            schema_name=config.schema_name,
            prefix=config.prefix,
            namespace=config.namespace,
            htm_index_column=config.pixelization.htm_index_column,
            enable_replica=config.enable_replica,
        )
        schema.makeSchema(drop=drop)

        # Need metadata table to store a few items in it.
        meta_table: sqlalchemy.schema.Table | None = None
        with suppress(ValueError):
            meta_table = schema.get_table(ApdbTables.metadata)

        apdb_meta = ApdbMetadataSql(engine, meta_table)
        if apdb_meta.table_exists():
            # Store version numbers for the schema and the code; replica code
            # version is stored only when replication is enabled.
            apdb_meta.set(cls.metadataSchemaVersionKey, str(schema.schemaVersion()), force=True)
            apdb_meta.set(cls.metadataCodeVersionKey, str(cls.apdbImplementationVersion()), force=True)
            if config.enable_replica:
                apdb_meta.set(
                    cls.metadataReplicaVersionKey,
                    str(ApdbSqlReplica.apdbReplicaImplementationVersion()),
                    force=True,
                )

    def getDiaObjects(self, region: Region) -> pandas.DataFrame:
        # docstring is inherited from a base class
        if self.config.dia_object_index == "last_object_table":
            table_enum = ApdbTables.DiaObjectLast
        else:
            table_enum = ApdbTables.DiaObject
        table = self._schema.get_table(table_enum)
        if not self.config.dia_object_columns:
            columns = self._schema.get_apdb_columns(table_enum)
        else:
            columns = [table.c[col] for col in self.config.dia_object_columns]
        query = sql.select(*columns)

        # Restrict the selection to the region.
        query = query.where(self._filterRegion(table, region))

        # Select only the latest version of each object; validityEnd is NULL
        # for the latest version ("== None" is the SQLAlchemy idiom for that).
        if self.config.dia_object_index != "last_object_table":
            query = query.where(table.c.validityEnd == None)  # noqa: E711

        # Execute select.
        with self._timer("select_time", tags={"table": "DiaObject"}) as timer:
            with self._engine.begin() as conn:
                objects = pandas.read_sql_query(query, conn)
            timer.add_values(row_count=len(objects))
        _LOG.debug("found %s DiaObjects", len(objects))
        return self._fix_result_timestamps(objects)

    def getDiaSources(
        self, region: Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
    ) -> pandas.DataFrame | None:
        # docstring is inherited from a base class
        if self.config.read_sources_months == 0:
            _LOG.debug("Skip DiaSources fetching")
            return None

        if object_ids is None:
            # Region-based selection.
            return self._getDiaSourcesInRegion(region, visit_time)
        else:
            return self._getDiaSourcesByIDs(list(object_ids), visit_time)

    def getDiaForcedSources(
        self, region: Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
    ) -> pandas.DataFrame | None:
        # docstring is inherited from a base class
        if self.config.read_forced_sources_months == 0:
            _LOG.debug("Skip DiaForcedSources fetching")
            return None

        if object_ids is None:
            # This implementation cannot select forced sources by region.
            raise NotImplementedError("Region-based selection is not supported")

        midpointMjdTai_start = _make_midpointMjdTai_start(visit_time, self.config.read_forced_sources_months)
        _LOG.debug("midpointMjdTai_start = %.6f", midpointMjdTai_start)

        with self._timer("select_time", tags={"table": "DiaForcedSource"}) as timer:
            sources = self._getSourcesByIDs(
                ApdbTables.DiaForcedSource, list(object_ids), midpointMjdTai_start
            )
            timer.add_values(row_count=len(sources))

        _LOG.debug("found %s DiaForcedSources", len(sources))
        return sources

    def containsVisitDetector(self, visit: int, detector: int) -> bool:
        # docstring is inherited from a base class
        src_table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaSource)
        frcsrc_table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaForcedSource)
        # Query should load only one leaf page of the index.
        query1 = sql.select(src_table.c.visit).filter_by(visit=visit, detector=detector).limit(1)

        with self._engine.begin() as conn:
            result = conn.execute(query1).scalar_one_or_none()
            if result is not None:
                return True
            else:
                # Check forced sources.
                query2 = sql.select(frcsrc_table.c.visit).filter_by(visit=visit, detector=detector).limit(1)
                result = conn.execute(query2).scalar_one_or_none()
                return result is not None

    def getSSObjects(self) -> pandas.DataFrame:
        # docstring is inherited from a base class
        columns = self._schema.get_apdb_columns(ApdbTables.SSObject)
        query = sql.select(*columns)

        # Execute select.
        with self._timer("SSObject_select_time", tags={"table": "SSObject"}) as timer:
            with self._engine.begin() as conn:
                objects = pandas.read_sql_query(query, conn)
            timer.add_values(row_count=len(objects))
        _LOG.debug("found %s SSObjects", len(objects))
        return self._fix_result_timestamps(objects)

    def store(
        self,
        visit_time: astropy.time.Time,
        objects: pandas.DataFrame,
        sources: pandas.DataFrame | None = None,
        forced_sources: pandas.DataFrame | None = None,
    ) -> None:
        # docstring is inherited from a base class
        objects = self._fix_input_timestamps(objects)
        if sources is not None:
            sources = self._fix_input_timestamps(sources)
        if forced_sources is not None:
            forced_sources = self._fix_input_timestamps(forced_sources)

        # Everything is written in a single transaction.
        with self._engine.begin() as connection:
            replica_chunk: ReplicaChunk | None = None
            if self._schema.has_replica_chunks:
                replica_chunk = ReplicaChunk.make_replica_chunk(visit_time, self.config.replica_chunk_seconds)
                self._storeReplicaChunk(replica_chunk, visit_time, connection)

            # DiaObjects need a spatial index column before they are stored.
            objects = self._add_spatial_index(objects)
            self._storeDiaObjects(objects, visit_time, replica_chunk, connection)

            if sources is not None:
                self._storeDiaSources(sources, replica_chunk, connection)

            if forced_sources is not None:
                self._storeDiaForcedSources(forced_sources, replica_chunk, connection)

    def storeSSObjects(self, objects: pandas.DataFrame) -> None:
        # docstring is inherited from a base class
        idColumn = "ssObjectId"
        table = self._schema.get_table(ApdbTables.SSObject)

        with self._engine.begin() as conn:
            # Find record IDs that already exist. Some types like np.int64 can
            # cause issues with sqlalchemy, convert them to int.
            ids = sorted(int(oid) for oid in objects[idColumn])

            query = sql.select(table.columns[idColumn]).where(table.columns[idColumn].in_(ids))
            result = conn.execute(query)
            knownIds = set(row.ssObjectId for row in result)

            filter = objects[idColumn].isin(knownIds)
            toUpdate = cast(pandas.DataFrame, objects[filter])
            toInsert = cast(pandas.DataFrame, objects[~filter])

            # Insert new records.
            if len(toInsert) > 0:
                toInsert.to_sql(table.name, conn, if_exists="append", index=False, schema=table.schema)

            # Update existing records.
            if len(toUpdate) > 0:
                whereKey = f"{idColumn}_param"
                update = table.update().where(table.columns[idColumn] == sql.bindparam(whereKey))
                toUpdate = toUpdate.rename({idColumn: whereKey}, axis="columns")
                values = toUpdate.to_dict("records")
                result = conn.execute(update, values)

    def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
        # docstring is inherited from a base class
        table = self._schema.get_table(ApdbTables.DiaSource)
        query = table.update().where(table.columns["diaSourceId"] == sql.bindparam("srcId"))

        with self._engine.begin() as conn:
            # Update one record at a time so that rowcount can be checked and
            # missing IDs reported.
            missing_ids: list[int] = []
            for key, value in idMap.items():
                params = dict(srcId=key, diaObjectId=0, ssObjectId=value)
                result = conn.execute(query, params)
                if result.rowcount == 0:
                    missing_ids.append(key)
            if missing_ids:
                missing = ",".join(str(item) for item in missing_ids)
                raise ValueError(f"Following DiaSource IDs do not exist in the database: {missing}")

    def countUnassociatedObjects(self) -> int:
        # docstring is inherited from a base class
        table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaObject)

        # Count the latest versions of objects with exactly one source.
        stmt = sql.select(func.count()).select_from(table).where(table.c.nDiaSources == 1)
        stmt = stmt.where(table.c.validityEnd == None)  # noqa: E711

        with self._engine.begin() as conn:
            count = conn.execute(stmt).scalar_one()

        return count

    @property
    def metadata(self) -> ApdbMetadata:
        # docstring is inherited from a base class
        if self._metadata is None:
            raise RuntimeError("Database schema was not initialized.")
        return self._metadata

751 """Return catalog of DiaSource instances from given region.
755 region : `lsst.sphgeom.Region`
756 Region to search for DIASources.
757 visit_time : `astropy.time.Time`
758 Time of the current visit.
762 catalog : `pandas.DataFrame`
763 Catalog containing DiaSource records.
768 _LOG.debug(
"midpointMjdTai_start = %.6f", midpointMjdTai_start)
770 table = self.
_schema.get_table(ApdbTables.DiaSource)
771 columns = self.
_schema.get_apdb_columns(ApdbTables.DiaSource)
772 query = sql.select(*columns)
775 time_filter = table.columns[
"midpointMjdTai"] > midpointMjdTai_start
776 where = sql.expression.and_(self.
_filterRegion(table, region), time_filter)
777 query = query.where(where)
780 with self.
_timer(
"DiaSource_select_time", tags={
"table":
"DiaSource"})
as timer:
781 with self.
_engine.begin()
as conn:
782 sources = pandas.read_sql_query(query, conn)
783 timer.add_values(row_counts=len(sources))
784 _LOG.debug(
"found %s DiaSources", len(sources))
788 """Return catalog of DiaSource instances given set of DiaObject IDs.
793 Collection of DiaObject IDs
794 visit_time : `astropy.time.Time`
795 Time of the current visit.
799 catalog : `pandas.DataFrame`
800 Catalog contaning DiaSource records.
805 _LOG.debug(
"midpointMjdTai_start = %.6f", midpointMjdTai_start)
807 with self.
_timer(
"select_time", tags={
"table":
"DiaSource"})
as timer:
808 sources = self.
_getSourcesByIDs(ApdbTables.DiaSource, object_ids, midpointMjdTai_start)
809 timer.add_values(row_count=len(sources))
811 _LOG.debug(
"found %s DiaSources", len(sources))
    def _getSourcesByIDs(
        self, table_enum: ApdbTables, object_ids: list[int], midpointMjdTai_start: float
    ) -> pandas.DataFrame:
        """Return catalog of DiaSource or DiaForcedSource instances given set
        of DiaObject IDs.

        Parameters
        ----------
        table_enum : `ApdbTables`
            Table to query, either DiaSource or DiaForcedSource.
        object_ids :
            Collection of DiaObject IDs
        midpointMjdTai_start : `float`
            Earliest midpointMjdTai to retrieve.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Catalog containing DiaSource records. An empty catalog is returned
            when ``object_ids`` is empty.
        """
        table = self._schema.get_table(table_enum)
        columns = self._schema.get_apdb_columns(table_enum)

        sources: pandas.DataFrame | None = None
        if len(object_ids) <= 0:
            _LOG.debug("ID list is empty, just fetch empty result")
            query = sql.select(*columns).where(sql.literal(False))
            with self._engine.begin() as conn:
                sources = pandas.read_sql_query(query, conn)
        else:
            data_frames: list[pandas.DataFrame] = []
            for ids in chunk_iterable(sorted(object_ids), 1000):
                query = sql.select(*columns)

                # Convert IDs to int to keep sqlalchemy happy.
                int_ids = [int(oid) for oid in ids]

                # Select by object IDs and limit the history window.
                query = query.where(
                    sql.expression.and_(
                        table.columns["diaObjectId"].in_(int_ids),
                        table.columns["midpointMjdTai"] > midpointMjdTai_start,
                    )
                )

                # Execute select.
                with self._engine.begin() as conn:
                    data_frames.append(pandas.read_sql_query(query, conn))

            if len(data_frames) == 1:
                sources = data_frames[0]
            else:
                sources = pandas.concat(data_frames)
        assert sources is not None, "Catalog cannot be None"
        return _coerce_uint64(sources)

    def _storeReplicaChunk(
        self,
        replica_chunk: ReplicaChunk,
        visit_time: astropy.time.Time,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        # Store the chunk timestamp as a timezone-aware UTC datetime.
        dt = datetime.datetime.fromtimestamp(visit_time.unix_tai, tz=datetime.timezone.utc)

        table = self._schema.get_table(ExtraTables.ApdbReplicaChunks)

        # We need UPSERT which is dialect-specific construct.
        values = {"last_update_time": dt, "unique_id": replica_chunk.unique_id}
        row = {"apdb_replica_chunk": replica_chunk.id} | values
        if connection.dialect.name == "sqlite":
            insert_sqlite = sqlalchemy.dialects.sqlite.insert(table)
            insert_sqlite = insert_sqlite.on_conflict_do_update(index_elements=table.primary_key, set_=values)
            connection.execute(insert_sqlite, row)
        elif connection.dialect.name == "postgresql":
            insert_pg = sqlalchemy.dialects.postgresql.dml.insert(table)
            insert_pg = insert_pg.on_conflict_do_update(constraint=table.primary_key, set_=values)
            connection.execute(insert_pg, row)
        else:
            raise TypeError(f"Unsupported dialect {connection.dialect.name} for upsert.")

    def _storeDiaObjects(
        self,
        objs: pandas.DataFrame,
        visit_time: astropy.time.Time,
        replica_chunk: ReplicaChunk | None,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        """Store catalog of DiaObjects from current visit.

        Parameters
        ----------
        objs : `pandas.DataFrame`
            Catalog with DiaObject records.
        visit_time : `astropy.time.Time`
            Time of the visit.
        replica_chunk : `ReplicaChunk` or `None`
            Replica chunk identifier, `None` when replication is disabled.
        connection : `sqlalchemy.engine.Connection`
            Database connection.
        """
        if objs.empty:
            _LOG.debug("No objects to write to database.")
            return

        # Some types like np.int64 can cause issues with sqlalchemy, convert
        # them to int.
        ids = sorted(int(oid) for oid in objs["diaObjectId"])
        _LOG.debug("first object ID: %d", ids[0])

        dt = visit_time.datetime

        if self.config.dia_object_index == "last_object_table":
            # Insert and replace all records in the LAST table.
            table = self._schema.get_table(ApdbTables.DiaObjectLast)

            # Drop the previous objects (pandas cannot upsert).
            query = table.delete().where(table.columns["diaObjectId"].in_(ids))

            with self._timer("delete_time", tags={"table": table.name}) as timer:
                res = connection.execute(query)
                timer.add_values(row_count=res.rowcount)
            _LOG.debug("deleted %s objects", res.rowcount)

            # DiaObjectLast is a subset of the DiaObject columns.
            last_column_names = [column.name for column in table.columns]
            last_objs = objs[last_column_names]

            if "lastNonForcedSource" in last_objs.columns:
                # lastNonForcedSource is defined NOT NULL, fill it with visit
                # time just in case.
                last_objs.fillna({"lastNonForcedSource": dt}, inplace=True)
            else:
                extra_column = pandas.Series([dt] * len(objs), name="lastNonForcedSource")
                last_objs.set_index(extra_column.index, inplace=True)
                last_objs = pandas.concat([last_objs, extra_column], axis="columns")

            with self._timer("insert_time", tags={"table": "DiaObjectLast"}) as timer:
                last_objs.to_sql(
                    table.name, connection, if_exists="append", index=False, schema=table.schema
                )
                timer.add_values(row_count=len(last_objs))
        else:
            # Truncate the validity intervals of existing records.
            table = self._schema.get_table(ApdbTables.DiaObject)

            update = (
                table.update()
                .values(validityEnd=dt)
                .where(
                    sql.expression.and_(
                        table.columns["diaObjectId"].in_(ids),
                        table.columns["validityEnd"].is_(None),
                    )
                )
            )

            with self._timer("truncate_time", tags={"table": table.name}) as timer:
                res = connection.execute(update)
                timer.add_values(row_count=res.rowcount)
            _LOG.debug("truncated %s intervals", res.rowcount)

        # Fill additional columns.
        extra_columns: list[pandas.Series] = []
        if "validityStart" in objs.columns:
            objs["validityStart"] = dt
        else:
            extra_columns.append(pandas.Series([dt] * len(objs), name="validityStart"))
        if "validityEnd" in objs.columns:
            objs["validityEnd"] = None
        else:
            extra_columns.append(pandas.Series([None] * len(objs), name="validityEnd"))
        if "lastNonForcedSource" in objs.columns:
            # lastNonForcedSource is defined NOT NULL, fill it with visit time
            # just in case.
            objs.fillna({"lastNonForcedSource": dt}, inplace=True)
        else:
            extra_columns.append(pandas.Series([dt] * len(objs), name="lastNonForcedSource"))
        if extra_columns:
            objs.set_index(extra_columns[0].index, inplace=True)
            objs = pandas.concat([objs] + extra_columns, axis="columns")

        # Collect replica data before inserting the new versions.
        table = self._schema.get_table(ApdbTables.DiaObject)
        replica_data: list[dict] = []
        replica_stmt: Any = None
        replica_table_name = ""
        if replica_chunk is not None:
            pk_names = [column.name for column in table.primary_key]
            replica_data = objs[pk_names].to_dict("records")
            for row in replica_data:
                row["apdb_replica_chunk"] = replica_chunk.id
            replica_table = self._schema.get_table(ExtraTables.DiaObjectChunks)
            replica_table_name = replica_table.name
            replica_stmt = replica_table.insert()

        # Insert new versions.
        with self._timer("insert_time", tags={"table": table.name}) as timer:
            objs.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
            timer.add_values(row_count=len(objs))
        if replica_stmt is not None:
            with self._timer("insert_time", tags={"table": replica_table_name}) as timer:
                connection.execute(replica_stmt, replica_data)
                timer.add_values(row_count=len(replica_data))

    def _storeDiaSources(
        self,
        sources: pandas.DataFrame,
        replica_chunk: ReplicaChunk | None,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        """Store catalog of DiaSources from current visit.

        Parameters
        ----------
        sources : `pandas.DataFrame`
            Catalog containing DiaSource records
        replica_chunk : `ReplicaChunk` or `None`
            Replica chunk identifier, `None` when replication is disabled.
        connection : `sqlalchemy.engine.Connection`
            Database connection.
        """
        table = self._schema.get_table(ApdbTables.DiaSource)

        # Collect replica data before inserting the sources themselves.
        replica_data: list[dict] = []
        replica_stmt: Any = None
        replica_table_name = ""
        if replica_chunk is not None:
            pk_names = [column.name for column in table.primary_key]
            replica_data = sources[pk_names].to_dict("records")
            for row in replica_data:
                row["apdb_replica_chunk"] = replica_chunk.id
            replica_table = self._schema.get_table(ExtraTables.DiaSourceChunks)
            replica_table_name = replica_table.name
            replica_stmt = replica_table.insert()

        # Everything is done in a single transaction.
        with self._timer("insert_time", tags={"table": table.name}) as timer:
            sources.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
            timer.add_values(row_count=len(sources))
        if replica_stmt is not None:
            with self._timer("replica_insert_time", tags={"table": replica_table_name}) as timer:
                connection.execute(replica_stmt, replica_data)
                timer.add_values(row_count=len(replica_data))

    def _storeDiaForcedSources(
        self,
        sources: pandas.DataFrame,
        replica_chunk: ReplicaChunk | None,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        """Store a set of DiaForcedSources from current visit.

        Parameters
        ----------
        sources : `pandas.DataFrame`
            Catalog containing DiaForcedSource records
        replica_chunk : `ReplicaChunk` or `None`
            Replica chunk identifier, `None` when replication is disabled.
        connection : `sqlalchemy.engine.Connection`
            Database connection.
        """
        table = self._schema.get_table(ApdbTables.DiaForcedSource)

        # Collect replica data before inserting the sources themselves.
        replica_data: list[dict] = []
        replica_stmt: Any = None
        replica_table_name = ""
        if replica_chunk is not None:
            pk_names = [column.name for column in table.primary_key]
            replica_data = sources[pk_names].to_dict("records")
            for row in replica_data:
                row["apdb_replica_chunk"] = replica_chunk.id
            replica_table = self._schema.get_table(ExtraTables.DiaForcedSourceChunks)
            replica_table_name = replica_table.name
            replica_stmt = replica_table.insert()

        # Everything is done in a single transaction.
        with self._timer("insert_time", tags={"table": table.name}) as timer:
            sources.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
            timer.add_values(row_count=len(sources))
        if replica_stmt is not None:
            with self._timer("insert_time", tags={"table": replica_table_name}) as timer:
                connection.execute(replica_stmt, replica_data)
                timer.add_values(row_count=len(replica_data))

1109 """Generate a set of HTM indices covering specified region.
1113 region: `sphgeom.Region`
1114 Region that needs to be indexed.
1118 Sequence of ranges, range is a tuple (minHtmID, maxHtmID).
1120 _LOG.debug(
"region: %s", region)
1121 indices = self.
pixelator.envelope(region, self.
config.pixelization.htm_max_ranges)
1123 return indices.ranges()
    def _filterRegion(self, table: sqlalchemy.schema.Table, region: Region) -> sql.ColumnElement:
        """Make SQLAlchemy expression for selecting records in a region."""
        htm_index_column = table.columns[self.config.pixelization.htm_index_column]
        exprlist = []
        pixel_ranges = self._htm_indices(region)
        for low, upper in pixel_ranges:
            # Ranges are half-open; make the upper bound inclusive.
            upper -= 1
            if low == upper:
                exprlist.append(htm_index_column == low)
            else:
                exprlist.append(sql.expression.between(htm_index_column, low, upper))

        return sql.expression.or_(*exprlist)

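    # Illustrative sketch (not part of the original module): for pixel ranges
    # [(100, 101), (200, 210)] the filter above reduces to
    #
    #     <htm_index_column> = 100 OR <htm_index_column> BETWEEN 200 AND 209
    #
    # where the column name comes from the pixelization configuration.
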
1140 """Calculate spatial index for each record and add it to a DataFrame.
1144 df : `pandas.DataFrame`
1145 DataFrame which has to contain ra/dec columns, names of these
1146 columns are defined by configuration ``ra_dec_columns`` field.
1150 df : `pandas.DataFrame`
1151 DataFrame with ``pixelId`` column which contains pixel index
1152 for ra/dec coordinates.
1156 This overrides any existing column in a DataFrame with the same name
1157 (pixelId). Original DataFrame is not changed, copy of a DataFrame is
1161 htm_index = np.zeros(df.shape[0], dtype=np.int64)
1162 ra_col, dec_col = self.
config.ra_dec_columns
1163 for i, (ra, dec)
in enumerate(zip(df[ra_col], df[dec_col])):
1168 df[self.
config.pixelization.htm_index_column] = htm_index
1172 """Update timestamp columns in input DataFrame to be aware datetime
1175 AP pipeline generates naive datetime instances, we want them to be
1176 aware before they go to database. All naive timestamps are assumed to
1177 be in UTC timezone (they should be TAI).
1182 for column, dtype
in df.dtypes.items()
1183 if isinstance(dtype, pandas.DatetimeTZDtype)
and dtype.tz
is not datetime.timezone.utc
1185 for column
in columns:
1186 df[column] = df[column].dt.tz_convert(datetime.timezone.utc)
1189 column
for column, dtype
in df.dtypes.items()
if pandas.api.types.is_datetime64_dtype(dtype)
1191 for column
in columns:
1192 df[column] = df[column].dt.tz_localize(datetime.timezone.utc)
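    # Illustrative sketch (not part of the original module), with ``apdb``
    # standing for an ApdbSql instance: a naive datetime64[ns] column becomes
    # timezone-aware UTC on input, and the reverse conversion below strips the
    # timezone again for returned catalogs.
    #
    #     df = pandas.DataFrame({"time": pandas.to_datetime(["2024-01-01T00:00:00"])})
    #     df["time"].dtype                               # datetime64[ns]
    #     apdb._fix_input_timestamps(df)["time"].dtype   # datetime64[ns, UTC]
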
1196 """Update timestamp columns to be naive datetime type in returned
1199 AP pipeline code expects DataFrames to contain naive datetime columns,
1200 while Postgres queries return timezone-aware type. This method converts
1201 those columns to naive datetime in UTC timezone.
1204 columns = [column
for column, dtype
in df.dtypes.items()
if isinstance(dtype, pandas.DatetimeTZDtype)]
1205 for column
in columns:
1207 df[column] = df[column].dt.tz_convert(
None)