22"""Module defining Apdb class and related methods."""
from __future__ import annotations

import datetime
import logging
import urllib.parse
import uuid
import warnings
from collections.abc import Iterable, Mapping, MutableMapping
from contextlib import closing
from typing import TYPE_CHECKING, Any, cast

import astropy.time
import numpy as np
import pandas
import sqlalchemy
import sqlalchemy.dialects.postgresql
import sqlalchemy.dialects.sqlite
from sqlalchemy import func, sql
from sqlalchemy.pool import NullPool

from lsst.sphgeom import HtmPixelization, LonLat, Region, UnitVector3d
from lsst.utils.db_auth import DbAuth, DbAuthNotFoundError
from lsst.utils.iteration import chunk_iterable

from ..apdb import Apdb
from ..apdbConfigFreezer import ApdbConfigFreezer
from ..apdbReplica import ReplicaChunk
from ..apdbSchema import ApdbTables
from ..config import ApdbConfig
from ..monitor import MonAgent
from ..schema_model import Table
from ..timer import Timer
from ..versionTuple import IncompatibleVersionError, VersionTuple
from .apdbMetadataSql import ApdbMetadataSql
from .apdbSqlAdmin import ApdbSqlAdmin
from .apdbSqlReplica import ApdbSqlReplica
from .apdbSqlSchema import ApdbSqlSchema, ExtraTables
from .config import ApdbSqlConfig
if TYPE_CHECKING:
    import sqlite3

    from ..apdbMetadata import ApdbMetadata
    from ..apdbUpdateRecord import ApdbUpdateRecord
_LOG = logging.getLogger(__name__)

_MON = MonAgent(__name__)

VERSION = VersionTuple(0, 1, 0)  # Placeholder value; keep in sync with the actual release.
"""Version for the code controlling non-replication tables. This needs to be
updated following compatibility rules when schema produced by this code
changes.
"""
83 """Change the type of uint64 columns to int64, and return copy of data
86 names = [c[0]
for c
in df.dtypes.items()
if c[1] == np.uint64]
87 return df.astype(dict.fromkeys(names, np.int64))
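
# A minimal doctest-style sketch of the coercion above (assumes numpy and
# pandas are installed); database drivers generally cannot store unsigned
# 64-bit values, so uint64 columns are mapped to int64 before insertion:
#
#   >>> df = pandas.DataFrame({"id": np.array([1, 2], dtype=np.uint64)})
#   >>> _coerce_uint64(df).dtypes["id"] == np.int64
#   True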
91 """Calculate starting point for time-based source search.
95 visit_time : `astropy.time.Time`
96 Time of current visit.
98 Number of months in the sources history.
103 A ``midpointMjdTai`` starting point, MJD time.
107 return float(visit_time.tai.mjd) - months * 30
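
# Example of the 30-day-month arithmetic above, with a 12-month window
# (doctest-style; assumes astropy is installed):
#
#   >>> t = astropy.time.Time(60000.0, format="mjd", scale="tai")
#   >>> _make_midpointMjdTai_start(t, 12)
#   59640.0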


def _onSqlite3Connect(
    dbapiConnection: sqlite3.Connection, connectionRecord: sqlalchemy.pool._ConnectionRecord
) -> None:
    # Enable foreign keys on every new SQLite connection.
    with closing(dbapiConnection.cursor()) as cursor:
        cursor.execute("PRAGMA foreign_keys=ON;")
119 """Implementation of APDB interface based on SQL database.
121 The implementation is configured via standard ``pex_config`` mechanism
122 using `ApdbSqlConfig` configuration class. For an example of different
123 configurations check ``config/`` folder.
127 config : `ApdbSqlConfig`
128 Configuration object.
131 metadataSchemaVersionKey =
"version:schema"
132 """Name of the metadata key to store schema version number."""
134 metadataCodeVersionKey =
"version:ApdbSql"
135 """Name of the metadata key to store code version number."""
137 metadataReplicaVersionKey =
"version:ApdbSqlReplica"
138 """Name of the metadata key to store replica code version number."""
140 metadataConfigKey =
"config:apdb-sql.json"
141 """Name of the metadata key to store code version number."""
143 _frozen_parameters = (
146 "pixelization.htm_level",
147 "pixelization.htm_index_column",
150 """Names of the config parameters to be frozen in metadata table."""

    def __init__(self, config: ApdbSqlConfig):
        self._engine = self._makeEngine(config, create=False)

        sa_metadata = sqlalchemy.MetaData(schema=config.namespace)
        meta_table_name = ApdbTables.metadata.table_name(prefix=config.prefix)
        meta_table = sqlalchemy.schema.Table(meta_table_name, sa_metadata, autoload_with=self._engine)
        self._metadata = ApdbMetadataSql(self._engine, meta_table)

        # Read frozen config from metadata and apply the frozen parameters to
        # the in-memory configuration.
        freezer = ApdbConfigFreezer[ApdbSqlConfig](self._frozen_parameters)
        config_json = self._metadata.get(self.metadataConfigKey)
        if config_json is not None:
            self.config = freezer.update(config, config_json)
        else:
            self.config = config

        self._schema = ApdbSqlSchema(
            engine=self._engine,
            dia_object_index=self.config.dia_object_index,
            schema_file=self.config.schema_file,
            schema_name=self.config.schema_name,
            prefix=self.config.prefix,
            namespace=self.config.namespace,
            htm_index_column=self.config.pixelization.htm_index_column,
            enable_replica=self.config.enable_replica,
        )

        self._db_schema_version = self._versionCheck(self._metadata)

        self.pixelator = HtmPixelization(self.config.pixelization.htm_level)

        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug("ApdbSql Configuration: %s", self.config.model_dump())

    def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
        """Create `Timer` instance given its name."""
        return Timer(name, _MON, tags=tags)

    @classmethod
    def _makeEngine(cls, config: ApdbSqlConfig, *, create: bool) -> sqlalchemy.engine.Engine:
        """Make SQLAlchemy engine based on configured parameters.

        Parameters
        ----------
        config : `ApdbSqlConfig`
            Configuration object.
        create : `bool`
            Whether to try to create new database file, only relevant for
            SQLite backend which always creates new files by default.
        """
        kw: MutableMapping[str, Any] = dict(config.connection_config.extra_parameters)
        conn_args: dict[str, Any] = {}
        if not config.connection_config.connection_pool:
            kw.update(poolclass=NullPool)
        if config.connection_config.isolation_level is not None:
            kw.update(isolation_level=config.connection_config.isolation_level)
        elif config.db_url.startswith("sqlite"):
            # Use READ_UNCOMMITTED as default value for sqlite.
            kw.update(isolation_level="READ_UNCOMMITTED")
        if config.connection_config.connection_timeout is not None:
            if config.db_url.startswith("sqlite"):
                conn_args.update(timeout=config.connection_config.connection_timeout)
            elif config.db_url.startswith(("postgresql", "mysql")):
                conn_args.update(connect_timeout=int(config.connection_config.connection_timeout))
        kw.update(connect_args=conn_args)
        engine = sqlalchemy.create_engine(cls._connection_url(config.db_url, create=create), **kw)

        if engine.dialect.name == "sqlite":
            # Enable foreign keys on each new sqlite connection.
            sqlalchemy.event.listen(engine, "connect", _onSqlite3Connect)

        return engine

    @classmethod
    def _connection_url(cls, config_url: str, *, create: bool) -> sqlalchemy.engine.URL | str:
        """Generate a complete URL for database with proper credentials.

        Parameters
        ----------
        config_url : `str`
            Database URL as specified in configuration.
        create : `bool`
            Whether to try to create new database file, only relevant for
            SQLite backend which always creates new files by default.

        Returns
        -------
        connection_url : `sqlalchemy.engine.URL` or `str`
            Connection URL including credentials.
        """
        # Try to resolve credentials via db-auth; a URL that does not parse
        # into (scheme, host, path) is passed through unchanged.
        components = urllib.parse.urlparse(config_url)
        if all((components.scheme is not None, components.hostname is not None, components.path is not None)):
            try:
                db_auth = DbAuth()
                config_url = db_auth.getUrl(config_url)
            except DbAuthNotFoundError:
                # Credentials file doesn't exist or has no matching
                # credentials, use the URL as given.
                pass

        if not create:
            config_url = cls._update_sqlite_url(config_url)

        return config_url
267 """If URL refers to sqlite dialect, update it so that the backend does
268 not try to create database file if it does not exist already.
278 Possibly updated connection string.
281 url = sqlalchemy.make_url(url_string)
282 except sqlalchemy.exc.SQLAlchemyError:
287 if url.get_backend_name() ==
"sqlite":
292 database = url.database
293 if database
and not database.startswith((
":",
"file:")):
294 query = dict(url.query, mode=
"rw", uri=
"true")
301 if database.startswith(
"//"):
303 f
"Database URL contains extra leading slashes which will be removed: {url}",
306 database =
"/" + database.lstrip(
"/")
307 url = url.set(database=f
"file:{database}", query=query)
308 url_string = url.render_as_string()
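
    # Example of the URL rewrite above (doctest-style): a plain sqlite URL
    # gains the uri/mode query parameters so the backend opens the file
    # read-write but refuses to create it.
    #
    #   >>> ApdbSql._update_sqlite_url("sqlite:///data/apdb.db")
    #   'sqlite:///file:data/apdb.db?mode=rw&uri=true'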
313 """Check schema version compatibility and return the database schema
317 def _get_version(key: str) -> VersionTuple:
318 """Retrieve version number from given metadata key."""
319 version_str = metadata.get(key)
320 if version_str
is None:
322 raise RuntimeError(f
"Version key {key!r} does not exist in metadata table.")
323 return VersionTuple.fromString(version_str)
330 if not self.
_schema.schemaVersion().checkCompatibility(db_schema_version):
332 f
"Configured schema version {self._schema.schemaVersion()} "
333 f
"is not compatible with database version {db_schema_version}"
337 f
"Current code version {self.apdbImplementationVersion()} "
338 f
"is not compatible with database version {db_code_version}"
342 if self.
_schema.replication_enabled:
344 code_replica_version = ApdbSqlReplica.apdbReplicaImplementationVersion()
345 if not code_replica_version.checkCompatibility(db_replica_version):
347 f
"Current replication code version {code_replica_version} "
348 f
"is not compatible with database version {db_replica_version}"
351 return db_schema_version
355 """Return version number for current APDB implementation.
359 version : `VersionTuple`
360 Version of the code defined in implementation class.

    @classmethod
    def init_database(
        cls,
        db_url: str,
        *,
        schema_file: str | None = None,
        schema_name: str | None = None,
        read_sources_months: int | None = None,
        read_forced_sources_months: int | None = None,
        enable_replica: bool = False,
        connection_timeout: int | None = None,
        dia_object_index: str | None = None,
        htm_level: int | None = None,
        htm_index_column: str | None = None,
        ra_dec_columns: tuple[str, str] | None = None,
        prefix: str | None = None,
        namespace: str | None = None,
        drop: bool = False,
    ) -> ApdbSqlConfig:
        """Initialize new APDB instance and make configuration object for it.

        Parameters
        ----------
        db_url : `str`
            SQLAlchemy database URL.
        schema_file : `str`, optional
            Location of (YAML) configuration file with APDB schema. If not
            specified then default location will be used.
        schema_name : `str`, optional
            Name of the schema in YAML configuration file. If not specified
            then default name will be used.
        read_sources_months : `int`, optional
            Number of months of history to read from DiaSource.
        read_forced_sources_months : `int`, optional
            Number of months of history to read from DiaForcedSource.
        enable_replica : `bool`
            If True, make additional tables used for replication to PPDB.
        connection_timeout : `int`, optional
            Database connection timeout in seconds.
        dia_object_index : `str`, optional
            Indexing mode for DiaObject table.
        htm_level : `int`, optional
            HTM indexing level.
        htm_index_column : `str`, optional
            Name of a HTM index column for DiaObject and DiaSource tables.
        ra_dec_columns : `tuple` [`str`, `str`], optional
            Names of ra/dec columns in DiaObject table.
        prefix : `str`, optional
            Optional prefix for all table names.
        namespace : `str`, optional
            Name of the database schema for all APDB tables. If not specified
            then default schema is used.
        drop : `bool`, optional
            If `True` then drop existing tables before re-creating the schema.

        Returns
        -------
        config : `ApdbSqlConfig`
            Resulting configuration object for a created APDB instance.
        """
        config = ApdbSqlConfig(db_url=db_url, enable_replica=enable_replica)
        if schema_file is not None:
            config.schema_file = schema_file
        if schema_name is not None:
            config.schema_name = schema_name
        if read_sources_months is not None:
            config.read_sources_months = read_sources_months
        if read_forced_sources_months is not None:
            config.read_forced_sources_months = read_forced_sources_months
        if connection_timeout is not None:
            config.connection_config.connection_timeout = connection_timeout
        if dia_object_index is not None:
            config.dia_object_index = dia_object_index
        if htm_level is not None:
            config.pixelization.htm_level = htm_level
        if htm_index_column is not None:
            config.pixelization.htm_index_column = htm_index_column
        if ra_dec_columns is not None:
            config.ra_dec_columns = ra_dec_columns
        if prefix is not None:
            config.prefix = prefix
        if namespace is not None:
            config.namespace = namespace

        cls._makeSchema(config, drop=drop)

        return config
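
    # Typical bootstrap sketch (illustrative; the file path is assumed):
    # create the schema once with init_database(), then construct ApdbSql
    # from the returned configuration.
    #
    #   config = ApdbSql.init_database(db_url="sqlite:////tmp/apdb.db", htm_level=20)
    #   apdb = ApdbSql(config)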
457 """Return `ApdbReplica` instance for this database."""
461 """Return dictionary with the table names and row counts.
463 Used by ``ap_proto`` to keep track of the size of the database tables.
464 Depending on database technology this could be expensive operation.
469 Dict where key is a table name and value is a row count.
472 tables = [ApdbTables.DiaObject, ApdbTables.DiaSource, ApdbTables.DiaForcedSource]
473 if self.
config.dia_object_index ==
"last_object_table":
474 tables.append(ApdbTables.DiaObjectLast)
475 with self.
_engine.begin()
as conn:
477 sa_table = self.
_schema.get_table(table)
478 stmt = sql.select(func.count()).select_from(sa_table)
479 count: int = conn.execute(stmt).scalar_one()
480 res[table.name] = count

    def tableDef(self, table: ApdbTables) -> Table | None:
        # docstring is inherited from a base class
        return self._schema.tableSchemas.get(table)

    @classmethod
    def _makeSchema(cls, config: ApdbConfig, drop: bool = False) -> None:
        # docstring is inherited from a base class

        if not isinstance(config, ApdbSqlConfig):
            raise TypeError(f"Unexpected type of configuration object: {type(config)}")

        engine = cls._makeEngine(config, create=True)

        # Ask the schema class to create all tables.
        schema = ApdbSqlSchema(
            engine=engine,
            dia_object_index=config.dia_object_index,
            schema_file=config.schema_file,
            schema_name=config.schema_name,
            prefix=config.prefix,
            namespace=config.namespace,
            htm_index_column=config.pixelization.htm_index_column,
            enable_replica=config.enable_replica,
        )
        schema.makeSchema(drop=drop)

        # Need metadata table to store a few items in it.
        meta_table = schema.get_table(ApdbTables.metadata)
        apdb_meta = ApdbMetadataSql(engine, meta_table)

        # Fill version numbers, overwrite if they are already there.
        apdb_meta.set(cls.metadataSchemaVersionKey, str(schema.schemaVersion()), force=True)
        apdb_meta.set(cls.metadataCodeVersionKey, str(cls.apdbImplementationVersion()), force=True)
        if config.enable_replica:
            # Store replica code version only if replica is enabled.
            apdb_meta.set(
                cls.metadataReplicaVersionKey,
                str(ApdbSqlReplica.apdbReplicaImplementationVersion()),
                force=True,
            )

        # Store the frozen part of the configuration in metadata.
        freezer = ApdbConfigFreezer[ApdbSqlConfig](cls._frozen_parameters)
        apdb_meta.set(cls.metadataConfigKey, freezer.to_json(config), force=True)

    def getDiaObjects(self, region: Region) -> pandas.DataFrame:
        # docstring is inherited from a base class

        # Decide what columns we need.
        if self.config.dia_object_index == "last_object_table":
            table_enum = ApdbTables.DiaObjectLast
        else:
            table_enum = ApdbTables.DiaObject
        table = self._schema.get_table(table_enum)
        if not self.config.dia_object_columns:
            columns = self._schema.get_apdb_columns(table_enum)
        else:
            columns = [table.c[col] for col in self.config.dia_object_columns]
        query = sql.select(*columns)

        if self._schema.has_mjd_timestamps:
            validity_end_column = "validityEndMjdTai"
        else:
            validity_end_column = "validityEnd"

        # Build selection.
        query = query.where(self._filterRegion(table, region))

        # Select latest versions of objects.
        if self.config.dia_object_index != "last_object_table":
            query = query.where(table.columns[validity_end_column] == None)  # noqa: E711

        with self._timer("select_time", tags={"table": "DiaObject"}) as timer:
            with self._engine.begin() as conn:
                objects = pandas.read_sql_query(query, conn)
            timer.add_values(row_count=len(objects))
        _LOG.debug("found %s DiaObjects", len(objects))
        return self._fix_result_timestamps(objects)

    def getDiaSources(
        self,
        region: Region,
        object_ids: Iterable[int] | None,
        visit_time: astropy.time.Time,
        start_time: astropy.time.Time | None = None,
    ) -> pandas.DataFrame | None:
        # docstring is inherited from a base class
        if start_time is None and self.config.read_sources_months == 0:
            _LOG.debug("Skip DiaSources fetching")
            return None

        if start_time is None:
            start_time_mjdTai = _make_midpointMjdTai_start(visit_time, self.config.read_sources_months)
        else:
            start_time_mjdTai = float(start_time.tai.mjd)
        _LOG.debug("start_time_mjdTai = %.6f", start_time_mjdTai)

        if object_ids is None:
            # Region-based select.
            return self._getDiaSourcesInRegion(region, start_time_mjdTai)
        else:
            return self._getDiaSourcesByIDs(list(object_ids), start_time_mjdTai)

    def getDiaForcedSources(
        self,
        region: Region,
        object_ids: Iterable[int] | None,
        visit_time: astropy.time.Time,
        start_time: astropy.time.Time | None = None,
    ) -> pandas.DataFrame | None:
        # docstring is inherited from a base class
        if start_time is None and self.config.read_forced_sources_months == 0:
            _LOG.debug("Skip DiaForcedSources fetching")
            return None

        if object_ids is None:
            # This implementation does not support region-based selection.
            raise NotImplementedError("Region-based selection is not supported")

        if start_time is None:
            start_time_mjdTai = _make_midpointMjdTai_start(
                visit_time, self.config.read_forced_sources_months
            )
        else:
            start_time_mjdTai = float(start_time.tai.mjd)
        _LOG.debug("start_time_mjdTai = %.6f", start_time_mjdTai)

        with self._timer("select_time", tags={"table": "DiaForcedSource"}) as timer:
            sources = self._getSourcesByIDs(ApdbTables.DiaForcedSource, list(object_ids), start_time_mjdTai)
            timer.add_values(row_count=len(sources))

        _LOG.debug("found %s DiaForcedSources", len(sources))
        return sources

    def containsVisitDetector(
        self,
        visit: int,
        detector: int,
        region: Region,
        visit_time: astropy.time.Time,
    ) -> bool:
        # docstring is inherited from a base class
        src_table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaSource)
        frcsrc_table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaForcedSource)
        # Query should load only one leaf page of the index.
        query1 = sql.select(src_table.c.visit).filter_by(visit=visit, detector=detector).limit(1)

        with self._engine.begin() as conn:
            result = conn.execute(query1).scalar_one_or_none()
            if result is not None:
                return True
            # Check forced sources in case there are no regular sources.
            query2 = sql.select(frcsrc_table.c.visit).filter_by(visit=visit, detector=detector).limit(1)
            result = conn.execute(query2).scalar_one_or_none()
            return result is not None

    def getSSObjects(self) -> pandas.DataFrame:
        # docstring is inherited from a base class
        columns = self._schema.get_apdb_columns(ApdbTables.SSObject)
        query = sql.select(*columns)

        with self._timer("SSObject_select_time", tags={"table": "SSObject"}) as timer:
            with self._engine.begin() as conn:
                objects = pandas.read_sql_query(query, conn)
            timer.add_values(row_count=len(objects))
        _LOG.debug("found %s SSObjects", len(objects))
        return self._fix_result_timestamps(objects)

    def store(
        self,
        visit_time: astropy.time.Time,
        objects: pandas.DataFrame,
        sources: pandas.DataFrame | None = None,
        forced_sources: pandas.DataFrame | None = None,
    ) -> None:
        # docstring is inherited from a base class
        objects = self._fix_input_timestamps(objects)
        if sources is not None:
            sources = self._fix_input_timestamps(sources)
        if forced_sources is not None:
            forced_sources = self._fix_input_timestamps(forced_sources)

        # We want to run all inserts in one transaction.
        with self._engine.begin() as connection:
            replica_chunk: ReplicaChunk | None = None
            if self._schema.replication_enabled:
                replica_chunk = ReplicaChunk.make_replica_chunk(visit_time, self.config.replica_chunk_seconds)
                self._storeReplicaChunk(replica_chunk, connection)

            # Fill the HTM index column for DiaObjects and store them.
            objects = self._add_spatial_index(objects)
            self._storeDiaObjects(objects, visit_time, replica_chunk, connection)

            if sources is not None:
                # Fill the HTM index column for DiaSources and store them.
                sources = self._add_spatial_index(sources)
                self._storeDiaSources(sources, replica_chunk, connection)

            if forced_sources is not None:
                self._storeDiaForcedSources(forced_sources, replica_chunk, connection)

    def storeSSObjects(self, objects: pandas.DataFrame) -> None:
        # docstring is inherited from a base class
        objects = self._fix_input_timestamps(objects)

        idColumn = "ssObjectId"
        table = self._schema.get_table(ApdbTables.SSObject)

        # Everything to be done in a single transaction.
        with self._engine.begin() as conn:
            # Find record IDs that already exist. Some types like np.int64
            # can cause issues with sqlalchemy, convert them to int.
            ids = sorted(int(oid) for oid in objects[idColumn])

            query = sql.select(table.columns[idColumn]).where(table.columns[idColumn].in_(ids))
            result = conn.execute(query)
            knownIds = {row.ssObjectId for row in result}

            filter = objects[idColumn].isin(knownIds)
            toUpdate = cast(pandas.DataFrame, objects[filter])
            toInsert = cast(pandas.DataFrame, objects[~filter])

            # Insert new records.
            if len(toInsert) > 0:
                toInsert.to_sql(table.name, conn, if_exists="append", index=False, schema=table.schema)

            # Update existing records.
            if len(toUpdate) > 0:
                whereKey = f"{idColumn}_param"
                update = table.update().where(table.columns[idColumn] == sql.bindparam(whereKey))
                toUpdate = toUpdate.rename({idColumn: whereKey}, axis="columns")
                values = toUpdate.to_dict("records")
                result = conn.execute(update, values)
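
    # The insert/update split above compensates for pandas' lack of an upsert:
    # existing rows become an executemany UPDATE driven by a bound parameter.
    # Standalone sketch of the same pattern (hypothetical table ``tbl`` with
    # columns ``id`` and ``value``):
    #
    #   stmt = tbl.update().where(tbl.c.id == sql.bindparam("id_param"))
    #   conn.execute(stmt, [{"id_param": 1, "value": 42}, {"id_param": 2, "value": 43}])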

    def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
        # docstring is inherited from a base class

        timestamp: float | datetime.datetime
        if self._schema.has_mjd_timestamps:
            timestamp_column = "ssObjectReassocTimeMjdTai"
            timestamp = float(astropy.time.Time.now().tai.mjd)
        else:
            timestamp_column = "ssObjectReassocTime"
            timestamp = datetime.datetime.now(tz=datetime.UTC)

        table = self._schema.get_table(ApdbTables.DiaSource)
        query = table.update().where(table.columns["diaSourceId"] == sql.bindparam("srcId"))

        with self._engine.begin() as conn:
            # Update one record at a time and check the row count, so that
            # IDs missing from the database can be reported.
            missing_ids: list[int] = []
            for key, value in idMap.items():
                params = {
                    "srcId": key,
                    "diaObjectId": 0,
                    "ssObjectId": value,
                    timestamp_column: timestamp,
                }
                result = conn.execute(query, params)
                if result.rowcount == 0:
                    missing_ids.append(key)
            if missing_ids:
                missing = ",".join(str(item) for item in missing_ids)
                raise ValueError(f"Following DiaSource IDs do not exist in the database: {missing}")

    def countUnassociatedObjects(self) -> int:
        # docstring is inherited from a base class

        table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaObject)

        if self._schema.has_mjd_timestamps:
            validity_end_column = "validityEndMjdTai"
        else:
            validity_end_column = "validityEnd"

        # Count valid DiaObjects with exactly one source.
        stmt = sql.select(func.count()).select_from(table).where(table.c.nDiaSources == 1)
        stmt = stmt.where(table.columns[validity_end_column] == None)  # noqa: E711

        with self._engine.begin() as conn:
            count = conn.execute(stmt).scalar_one()

        return count
800 """Return catalog of DiaSource instances from given region.
804 region : `lsst.sphgeom.Region`
805 Region to search for DIASources.
806 start_time_mjdTai : `float`
807 Lower bound of time window for the query.
811 catalog : `pandas.DataFrame`
812 Catalog containing DiaSource records.
814 table = self.
_schema.get_table(ApdbTables.DiaSource)
815 columns = self.
_schema.get_apdb_columns(ApdbTables.DiaSource)
816 query = sql.select(*columns)
819 time_filter = table.columns[
"midpointMjdTai"] > start_time_mjdTai
820 where = sql.expression.and_(self.
_filterRegion(table, region), time_filter)
821 query = query.where(where)
824 with self.
_timer(
"DiaSource_select_time", tags={
"table":
"DiaSource"})
as timer:
825 with self.
_engine.begin()
as conn:
826 sources = pandas.read_sql_query(query, conn)
827 timer.add_values(row_counts=len(sources))
828 _LOG.debug(
"found %s DiaSources", len(sources))
832 """Return catalog of DiaSource instances given set of DiaObject IDs.
837 Collection of DiaObject IDs
838 start_time_mjdTai : `float`
839 Lower bound of time window for the query.
843 catalog : `pandas.DataFrame`
844 Catalog containing DiaSource records.
846 with self.
_timer(
"select_time", tags={
"table":
"DiaSource"})
as timer:
847 sources = self.
_getSourcesByIDs(ApdbTables.DiaSource, object_ids, start_time_mjdTai)
848 timer.add_values(row_count=len(sources))
850 _LOG.debug(
"found %s DiaSources", len(sources))

    def _getSourcesByIDs(
        self, table_enum: ApdbTables, object_ids: list[int], midpointMjdTai_start: float
    ) -> pandas.DataFrame:
        """Return catalog of DiaSource or DiaForcedSource instances given set
        of DiaObject IDs.

        Parameters
        ----------
        table_enum : `ApdbTables`
            Source table to query.
        object_ids : `list` [`int`]
            Collection of DiaObject IDs.
        midpointMjdTai_start : `float`
            Earliest midpointMjdTai to retrieve.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Catalog containing DiaSource records. An empty catalog is returned
            when ``object_ids`` is empty.
        """
        table = self._schema.get_table(table_enum)
        columns = self._schema.get_apdb_columns(table_enum)

        sources: pandas.DataFrame | None = None
        if len(object_ids) <= 0:
            _LOG.debug("ID list is empty, just fetch empty result")
            query = sql.select(*columns).where(sql.literal(False))
            with self._engine.begin() as conn:
                sources = pandas.read_sql_query(query, conn)
        else:
            data_frames: list[pandas.DataFrame] = []
            for ids in chunk_iterable(sorted(object_ids), 1000):
                query = sql.select(*columns)

                # Some types like np.int64 can cause issues with sqlalchemy,
                # convert them to int.
                int_ids = [int(oid) for oid in ids]

                # Select by object ID within the time window.
                query = query.where(
                    sql.expression.and_(
                        table.columns["diaObjectId"].in_(int_ids),
                        table.columns["midpointMjdTai"] > midpointMjdTai_start,
                    )
                )

                with self._engine.begin() as conn:
                    data_frames.append(pandas.read_sql_query(query, conn))

            if len(data_frames) == 1:
                sources = data_frames[0]
            else:
                sources = pandas.concat(data_frames)
        assert sources is not None, "Catalog cannot be None"
        return self._fix_result_timestamps(sources)

    def _storeReplicaChunk(
        self,
        replica_chunk: ReplicaChunk,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        # Store the last update time as a timezone-aware datetime so that the
        # database keeps a correct value.
        dt = datetime.datetime.fromtimestamp(replica_chunk.last_update_time.unix_tai, tz=datetime.UTC)

        table = self._schema.get_table(ExtraTables.ApdbReplicaChunks)

        # We need UPSERT which is a dialect-specific construct.
        values = {"last_update_time": dt, "unique_id": replica_chunk.unique_id}
        row = {"apdb_replica_chunk": replica_chunk.id} | values
        if connection.dialect.name == "sqlite":
            insert_sqlite = sqlalchemy.dialects.sqlite.insert(table)
            insert_sqlite = insert_sqlite.on_conflict_do_update(index_elements=table.primary_key, set_=values)
            connection.execute(insert_sqlite, row)
        elif connection.dialect.name == "postgresql":
            insert_pg = sqlalchemy.dialects.postgresql.dml.insert(table)
            insert_pg = insert_pg.on_conflict_do_update(constraint=table.primary_key, set_=values)
            connection.execute(insert_pg, row)
        else:
            raise TypeError(f"Unsupported dialect {connection.dialect.name} for upsert.")

    def _storeDiaObjects(
        self,
        objs: pandas.DataFrame,
        visit_time: astropy.time.Time,
        replica_chunk: ReplicaChunk | None,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        """Store catalog of DiaObjects from current visit.

        Parameters
        ----------
        objs : `pandas.DataFrame`
            Catalog with DiaObject records.
        visit_time : `astropy.time.Time`
            Time of the visit.
        replica_chunk : `ReplicaChunk` or `None`
            Replica chunk identifier if replication is configured.
        connection : `sqlalchemy.engine.Connection`
            Database connection.
        """
        if len(objs) == 0:
            _LOG.debug("No objects to write to database.")
            return

        # Some types like np.int64 can cause issues with sqlalchemy, convert
        # them to int.
        ids = sorted(int(oid) for oid in objs["diaObjectId"])
        _LOG.debug("first object ID: %d", ids[0])

        if self._schema.has_mjd_timestamps:
            validity_start_column = "validityStartMjdTai"
            validity_end_column = "validityEndMjdTai"
            timestamp = float(visit_time.tai.mjd)
        else:
            validity_start_column = "validityStart"
            validity_end_column = "validityEnd"
            timestamp = visit_time.datetime

        if self.config.dia_object_index == "last_object_table":
            # Insert and replace all records in LAST table.
            table = self._schema.get_table(ApdbTables.DiaObjectLast)

            use_validity_start = self._schema.check_column(ApdbTables.DiaObjectLast, validity_start_column)

            # Drop the previous objects (pandas cannot upsert).
            query = table.delete().where(table.columns["diaObjectId"].in_(ids))

            with self._timer("delete_time", tags={"table": table.name}) as timer:
                res = connection.execute(query)
                timer.add_values(row_count=res.rowcount)
            _LOG.debug("deleted %s objects", res.rowcount)

            # DiaObjectLast is a subset of DiaObject, strip missing columns.
            last_column_names = [column.name for column in table.columns]
            if validity_start_column in last_column_names and validity_start_column not in objs.columns:
                last_column_names.remove(validity_start_column)
            last_objs = objs[last_column_names]
            last_objs = _coerce_uint64(last_objs)

            if use_validity_start:
                if validity_start_column in last_objs:
                    last_objs[validity_start_column] = timestamp
                else:
                    extra_column = pandas.Series([timestamp] * len(last_objs), name=validity_start_column)
                    last_objs.set_index(extra_column.index, inplace=True)
                    last_objs = pandas.concat([last_objs, extra_column], axis="columns")

            with self._timer("insert_time", tags={"table": "DiaObjectLast"}) as timer:
                last_objs.to_sql(
                    table.name,
                    connection,
                    if_exists="append",
                    index=False,
                    schema=table.schema,
                )
                timer.add_values(row_count=len(last_objs))
        else:
            # Truncate existing validity intervals.
            table = self._schema.get_table(ApdbTables.DiaObject)

            update = (
                table.update()
                .values(**{validity_end_column: timestamp})
                .where(
                    sql.expression.and_(
                        table.columns["diaObjectId"].in_(ids),
                        table.columns[validity_end_column].is_(None),
                    )
                )
            )

            with self._timer("truncate_time", tags={"table": table.name}) as timer:
                res = connection.execute(update)
                timer.add_values(row_count=res.rowcount)
            _LOG.debug("truncated %s intervals", res.rowcount)

        objs = _coerce_uint64(objs)

        # Fill additional columns.
        extra_columns: list[pandas.Series] = []
        if validity_start_column in objs.columns:
            objs[validity_start_column] = timestamp
        else:
            extra_columns.append(pandas.Series([timestamp] * len(objs), name=validity_start_column))
        if validity_end_column in objs.columns:
            objs[validity_end_column] = None
        else:
            extra_columns.append(pandas.Series([None] * len(objs), name=validity_end_column))
        if extra_columns:
            objs.set_index(extra_columns[0].index, inplace=True)
            objs = pandas.concat([objs] + extra_columns, axis="columns")

        # Prepare replica data if replication is enabled.
        table = self._schema.get_table(ApdbTables.DiaObject)
        replica_data: list[dict] = []
        replica_stmt: Any = None
        replica_table_name = ""
        if replica_chunk is not None:
            pk_names = [column.name for column in table.primary_key]
            replica_data = objs[pk_names].to_dict("records")
            for row in replica_data:
                row["apdb_replica_chunk"] = replica_chunk.id
            replica_table = self._schema.get_table(ExtraTables.DiaObjectChunks)
            replica_table_name = replica_table.name
            replica_stmt = replica_table.insert()

        # Insert new versions.
        with self._timer("insert_time", tags={"table": table.name}) as timer:
            objs.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
            timer.add_values(row_count=len(objs))
        if replica_stmt is not None:
            with self._timer("insert_time", tags={"table": replica_table_name}) as timer:
                connection.execute(replica_stmt, replica_data)
                timer.add_values(row_count=len(replica_data))

    def _storeDiaSources(
        self,
        sources: pandas.DataFrame,
        replica_chunk: ReplicaChunk | None,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        """Store catalog of DiaSources from current visit.

        Parameters
        ----------
        sources : `pandas.DataFrame`
            Catalog containing DiaSource records.
        replica_chunk : `ReplicaChunk` or `None`
            Replica chunk identifier if replication is configured.
        connection : `sqlalchemy.engine.Connection`
            Database connection.
        """
        table = self._schema.get_table(ApdbTables.DiaSource)

        # Prepare replica data if replication is enabled.
        replica_data: list[dict] = []
        replica_stmt: Any = None
        replica_table_name = ""
        if replica_chunk is not None:
            pk_names = [column.name for column in table.primary_key]
            replica_data = sources[pk_names].to_dict("records")
            for row in replica_data:
                row["apdb_replica_chunk"] = replica_chunk.id
            replica_table = self._schema.get_table(ExtraTables.DiaSourceChunks)
            replica_table_name = replica_table.name
            replica_stmt = replica_table.insert()

        # Insert new records.
        with self._timer("insert_time", tags={"table": table.name}) as timer:
            sources = _coerce_uint64(sources)
            sources.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
            timer.add_values(row_count=len(sources))
        if replica_stmt is not None:
            with self._timer("replica_insert_time", tags={"table": replica_table_name}) as timer:
                connection.execute(replica_stmt, replica_data)
                timer.add_values(row_count=len(replica_data))

    def _storeDiaForcedSources(
        self,
        sources: pandas.DataFrame,
        replica_chunk: ReplicaChunk | None,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        """Store a set of DiaForcedSources from current visit.

        Parameters
        ----------
        sources : `pandas.DataFrame`
            Catalog containing DiaForcedSource records.
        replica_chunk : `ReplicaChunk` or `None`
            Replica chunk identifier if replication is configured.
        connection : `sqlalchemy.engine.Connection`
            Database connection.
        """
        table = self._schema.get_table(ApdbTables.DiaForcedSource)

        # Prepare replica data if replication is enabled.
        replica_data: list[dict] = []
        replica_stmt: Any = None
        replica_table_name = ""
        if replica_chunk is not None:
            pk_names = [column.name for column in table.primary_key]
            replica_data = sources[pk_names].to_dict("records")
            for row in replica_data:
                row["apdb_replica_chunk"] = replica_chunk.id
            replica_table = self._schema.get_table(ExtraTables.DiaForcedSourceChunks)
            replica_table_name = replica_table.name
            replica_stmt = replica_table.insert()

        # Insert new records.
        with self._timer("insert_time", tags={"table": table.name}) as timer:
            sources = _coerce_uint64(sources)
            sources.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
            timer.add_values(row_count=len(sources))
        if replica_stmt is not None:
            with self._timer("insert_time", tags={"table": replica_table_name}) as timer:
                connection.execute(replica_stmt, replica_data)
                timer.add_values(row_count=len(replica_data))

    def _storeUpdateRecords(
        self,
        records: Iterable[ApdbUpdateRecord],
        chunk: ReplicaChunk,
        *,
        store_chunk: bool = False,
        connection: sqlalchemy.engine.Connection | None = None,
    ) -> None:
        """Store ApdbUpdateRecords in the replica table for those records.

        Parameters
        ----------
        records : `list` [`ApdbUpdateRecord`]
            Records to store.
        chunk : `ReplicaChunk`
            Replica chunk for these records.
        store_chunk : `bool`
            If True then also store replica chunk.
        connection : `sqlalchemy.engine.Connection`
            SQLAlchemy connection to use; if `None` then a new connection will
            be made. `None` is useful for tests only, regular use will call
            this method in the same transaction that saves other types of
            records.

        Raises
        ------
        TypeError
            Raised if replication is not enabled for this instance.
        """
        if not self._schema.replication_enabled:
            raise TypeError("Replication is not enabled for this APDB instance.")

        apdb_replica_chunk = chunk.id

        # All records stored by a single call share the same unique ID.
        update_unique_id = uuid.uuid4()

        record_dicts = []
        for record in records:
            record_dicts.append(
                {
                    "apdb_replica_chunk": apdb_replica_chunk,
                    "update_time_ns": record.update_time_ns,
                    "update_order": record.update_order,
                    "update_unique_id": update_unique_id,
                    "update_payload": record.to_json(),
                }
            )

        if not record_dicts:
            return

        table = self._schema.get_table(ExtraTables.ApdbUpdateRecordChunks)

        def _do_store(connection: sqlalchemy.engine.Connection) -> None:
            if store_chunk:
                self._storeReplicaChunk(chunk, connection)
            with self._timer("insert_time", tags={"table": table.name}) as timer:
                connection.execute(table.insert(), record_dicts)
                timer.add_values(row_count=len(record_dicts))

        if connection is None:
            with self._engine.begin() as connection:
                _do_store(connection)
        else:
            _do_store(connection)
1221 """Generate a set of HTM indices covering specified region.
1225 region: `sphgeom.Region`
1226 Region that needs to be indexed.
1230 Sequence of ranges, range is a tuple (minHtmID, maxHtmID).
1232 _LOG.debug(
"region: %s", region)
1233 indices = self.
pixelator.envelope(region, self.
config.pixelization.htm_max_ranges)
1235 return indices.ranges()
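
    # Example of the underlying sphgeom calls (doctest-style, assuming
    # lsst.sphgeom is available): envelope() returns a RangeSet whose
    # ranges() are (begin, end) HTM index intervals.
    #
    #   >>> from lsst.sphgeom import Circle
    #   >>> pix = HtmPixelization(8)
    #   >>> center = UnitVector3d(LonLat.fromDegrees(45.0, 45.0))
    #   >>> ranges = pix.envelope(Circle(center, 1e-8), 64).ranges()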

    def _filterRegion(self, table: sqlalchemy.schema.Table, region: Region) -> sql.ColumnElement:
        """Make SQLAlchemy expression for selecting records in a region."""
        htm_index_column = table.columns[self.config.pixelization.htm_index_column]
        exprlist = []
        pixel_ranges = self._htm_indices(region)
        for low, upper in pixel_ranges:
            upper -= 1
            if low == upper:
                exprlist.append(htm_index_column == low)
            else:
                exprlist.append(sql.expression.between(htm_index_column, low, upper))

        return sql.expression.or_(*exprlist)
1252 """Calculate spatial index for each record and add it to a DataFrame.
1256 df : `pandas.DataFrame`
1257 DataFrame which has to contain ra/dec columns, names of these
1258 columns are defined by configuration ``ra_dec_columns`` field.
1262 df : `pandas.DataFrame`
1263 DataFrame with ``pixelId`` column which contains pixel index
1264 for ra/dec coordinates.
1268 This overrides any existing column in a DataFrame with the same name
1269 (pixelId). Original DataFrame is not changed, copy of a DataFrame is
1273 htm_index = np.zeros(df.shape[0], dtype=np.int64)
1274 ra_col, dec_col = self.
config.ra_dec_columns
1275 for i, (ra, dec)
in enumerate(zip(df[ra_col], df[dec_col])):
1280 df[self.
config.pixelization.htm_index_column] = htm_index
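
    # Per-row computation used in the loop above, in isolation: an (ra, dec)
    # pair maps to a single HTM pixel index at the configured level.
    #
    #   >>> pix = HtmPixelization(20)
    #   >>> int(pix.index(UnitVector3d(LonLat.fromDegrees(45.0, 45.0)))) > 0
    #   True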
1284 """Update timestamp columns in input DataFrame to be aware datetime
1287 AP pipeline generates naive datetime instances, we want them to be
1288 aware before they go to database. All naive timestamps are assumed to
1289 be in UTC timezone (they should be TAI).
1294 for column, dtype
in df.dtypes.items()
1295 if isinstance(dtype, pandas.DatetimeTZDtype)
and dtype.tz
is not datetime.UTC
1297 for column
in columns:
1298 df[column] = df[column].dt.tz_convert(datetime.UTC)
1301 column
for column, dtype
in df.dtypes.items()
if pandas.api.types.is_datetime64_dtype(dtype)
1303 for column
in columns:
1304 df[column] = df[column].dt.tz_localize(datetime.UTC)
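
    # Doctest-style sketch of the localization above: a naive timestamp
    # column becomes UTC-aware without shifting its value.
    #
    #   >>> df = pandas.DataFrame({"t": pandas.to_datetime(["2025-01-01T00:00:00"])})
    #   >>> str(df["t"].dt.tz_localize(datetime.UTC).dt.tz)
    #   'UTC'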
1308 """Update timestamp columns to be naive datetime type in returned
1311 AP pipeline code expects DataFrames to contain naive datetime columns,
1312 while Postgres queries return timezone-aware type. This method converts
1313 those columns to naive datetime in UTC timezone.
1316 columns = [column
for column, dtype
in df.dtypes.items()
if isinstance(dtype, pandas.DatetimeTZDtype)]
1317 for column
in columns:
1319 df[column] = df[column].dt.tz_convert(
None)
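
    # Doctest-style sketch of the conversion above: tz_convert(None) shifts
    # an aware timestamp to UTC and drops the timezone.
    #
    #   >>> s = pandas.Series(pandas.to_datetime(["2025-01-01T00:00:00+01:00"]))
    #   >>> s.dt.tz_convert(None).iloc[0]
    #   Timestamp('2024-12-31 23:00:00')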