22"""Module defining Apdb class and related methods."""
24from __future__
import annotations
33from contextlib
import closing
34from typing
import TYPE_CHECKING, Any, cast
40import sqlalchemy.dialects.postgresql
41import sqlalchemy.dialects.sqlite
42from sqlalchemy
import func, sql
43from sqlalchemy.pool
import NullPool
45from lsst.sphgeom import HtmPixelization, LonLat, Region, UnitVector3d
46from lsst.utils.db_auth
import DbAuth, DbAuthNotFoundError
47from lsst.utils.iteration
import chunk_iterable
49from ..apdb
import Apdb
50from ..apdbConfigFreezer
import ApdbConfigFreezer
51from ..apdbReplica
import ReplicaChunk
52from ..apdbSchema
import ApdbTables
53from ..config
import ApdbConfig
54from ..monitor
import MonAgent
55from ..schema_model
import Table
56from ..timer
import Timer
57from ..versionTuple
import IncompatibleVersionError, VersionTuple
58from .apdbMetadataSql
import ApdbMetadataSql
59from .apdbSqlAdmin
import ApdbSqlAdmin
60from .apdbSqlReplica
import ApdbSqlReplica
61from .apdbSqlSchema
import ApdbSqlSchema, ExtraTables
62from .config
import ApdbSqlConfig
67 from ..apdbMetadata
import ApdbMetadata
69_LOG = logging.getLogger(__name__)
74"""Version for the code controlling non-replication tables. This needs to be
75updated following compatibility rules when schema produced by this code
81 """Change the type of uint64 columns to int64, and return copy of data
84 names = [c[0]
for c
in df.dtypes.items()
if c[1] == np.uint64]
85 return df.astype(dict.fromkeys(names, np.int64))
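# Illustrative sketch (not part of the original module): uint64 columns are
# downcast because SQL BIGINT columns and most drivers expect signed 64-bit
# integers.
#
#     df = pandas.DataFrame({"diaObjectId": np.array([1, 2], dtype=np.uint64)})
#     _coerce_uint64(df).dtypes["diaObjectId"]  # -> dtype('int64')
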
89 """Calculate starting point for time-based source search.
93 visit_time : `astropy.time.Time`
94 Time of current visit.
96 Number of months in the sources history.
101 A ``midpointMjdTai`` starting point, MJD time.
105 return float(visit_time.mjd - months * 30)
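# Worked example (illustrative only): with a visit at MJD 60000.0 and a
# 12-month history window the search starts 12 * 30 = 360 days earlier.
#
#     start = _make_midpointMjdTai_start(astropy.time.Time(60000.0, format="mjd"), 12)
#     # start == 59640.0
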
def _onSqlite3Connect(
    dbapiConnection: sqlite3.Connection, connectionRecord: sqlalchemy.pool._ConnectionRecord
) -> None:
    # Enable foreign key checks, which SQLite leaves off by default.
    with closing(dbapiConnection.cursor()) as cursor:
        cursor.execute("PRAGMA foreign_keys=ON;")

117 """Implementation of APDB interface based on SQL database.
119 The implementation is configured via standard ``pex_config`` mechanism
120 using `ApdbSqlConfig` configuration class. For an example of different
121 configurations check ``config/`` folder.
125 config : `ApdbSqlConfig`
126 Configuration object.
129 metadataSchemaVersionKey =
"version:schema"
130 """Name of the metadata key to store schema version number."""
132 metadataCodeVersionKey =
"version:ApdbSql"
133 """Name of the metadata key to store code version number."""
135 metadataReplicaVersionKey =
"version:ApdbSqlReplica"
136 """Name of the metadata key to store replica code version number."""
138 metadataConfigKey =
"config:apdb-sql.json"
139 """Name of the metadata key to store code version number."""
141 _frozen_parameters = (
144 "pixelization.htm_level",
145 "pixelization.htm_index_column",
148 """Names of the config parameters to be frozen in metadata table."""
    def __init__(self, config: ApdbSqlConfig):
        self._engine = self._makeEngine(config, create=False)

        sa_metadata = sqlalchemy.MetaData(schema=config.namespace)
        meta_table_name = ApdbTables.metadata.table_name(prefix=config.prefix)
        meta_table = sqlalchemy.schema.Table(meta_table_name, sa_metadata, autoload_with=self._engine)
        self._metadata = ApdbMetadataSql(self._engine, meta_table)

        # Read frozen config from metadata and update the one given to us.
        freezer = ApdbConfigFreezer[ApdbSqlConfig](self._frozen_parameters)
        config_json = self._metadata.get(self.metadataConfigKey)
        if config_json is not None:
            self.config = freezer.update(config, config_json)
        else:
            self.config = config

        self._schema = ApdbSqlSchema(
            engine=self._engine,
            dia_object_index=self.config.dia_object_index,
            schema_file=self.config.schema_file,
            schema_name=self.config.schema_name,
            prefix=self.config.prefix,
            namespace=self.config.namespace,
            htm_index_column=self.config.pixelization.htm_index_column,
            enable_replica=self.config.enable_replica,
        )

        self._db_schema_version = self._versionCheck(self._metadata)

        self.pixelator = HtmPixelization(self.config.pixelization.htm_level)

        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug("ApdbSql Configuration: %s", self.config.model_dump())

    def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
        """Create `Timer` instance given its name."""
        return Timer(name, _MON, tags=tags)

    @classmethod
    def _makeEngine(cls, config: ApdbSqlConfig, *, create: bool) -> sqlalchemy.engine.Engine:
        """Make SQLAlchemy engine based on configured parameters.

        Parameters
        ----------
        config : `ApdbSqlConfig`
            Configuration object.
        create : `bool`
            Whether to try to create new database file, only relevant for
            SQLite backend which always creates new files by default.
        """
        kw: MutableMapping[str, Any] = dict(config.connection_config.extra_parameters)
        conn_args: dict[str, Any] = {}
        if not config.connection_config.connection_pool:
            kw.update(poolclass=NullPool)
        if config.connection_config.isolation_level is not None:
            kw.update(isolation_level=config.connection_config.isolation_level)
        elif config.db_url.startswith("sqlite"):
            # Use READ_UNCOMMITTED as default isolation level for sqlite.
            kw.update(isolation_level="READ_UNCOMMITTED")
        if config.connection_config.connection_timeout is not None:
            if config.db_url.startswith("sqlite"):
                conn_args.update(timeout=config.connection_config.connection_timeout)
            elif config.db_url.startswith(("postgresql", "mysql")):
                conn_args.update(connect_timeout=int(config.connection_config.connection_timeout))
        kw.update(connect_args=conn_args)
        engine = sqlalchemy.create_engine(cls._connection_url(config.db_url, create=create), **kw)

        if engine.dialect.name == "sqlite":
            # Need to enable foreign keys on every new connection.
            sqlalchemy.event.listen(engine, "connect", _onSqlite3Connect)

        return engine

    @classmethod
    def _connection_url(cls, config_url: str, *, create: bool) -> sqlalchemy.engine.URL | str:
        """Generate a complete URL for database with proper credentials.

        Parameters
        ----------
        config_url : `str`
            Database URL as specified in configuration.
        create : `bool`
            Whether to try to create new database file, only relevant for
            SQLite backend which always creates new files by default.

        Returns
        -------
        connection_url : `sqlalchemy.engine.URL` or `str`
            Connection URL including credentials.
        """
        # Check for credentials in db-auth configuration.
        components = urllib.parse.urlparse(config_url)
        if all((components.scheme is not None, components.hostname is not None, components.path is not None)):
            try:
                db_auth = DbAuth(DB_AUTH_PATH, DB_AUTH_ENVVAR)
                config_url = db_auth.getUrl(config_url)
            except DbAuthNotFoundError:
                # Credentials file does not exist or has no matching entry,
                # use the URL as given.
                pass

        # SQLite silently creates an empty database file when it does not
        # exist; update the URL to prevent that unless creation is requested.
        if not create:
            config_url = cls._update_sqlite_url(config_url)

        return config_url

265 """If URL refers to sqlite dialect, update it so that the backend does
266 not try to create database file if it does not exist already.
276 Possibly updated connection string.
279 url = sqlalchemy.make_url(url_string)
280 except sqlalchemy.exc.SQLAlchemyError:
285 if url.get_backend_name() ==
"sqlite":
290 database = url.database
291 if database
and not database.startswith((
":",
"file:")):
292 query = dict(url.query, mode=
"rw", uri=
"true")
299 if database.startswith(
"//"):
301 f
"Database URL contains extra leading slashes which will be removed: {url}",
304 database =
"/" + database.lstrip(
"/")
305 url = url.set(database=f
"file:{database}", query=query)
306 url_string = url.render_as_string()
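    # Example of the rewrite (illustrative only, path is hypothetical and query
    # parameter order may differ): a plain SQLite URL gains a "file:" URI with
    # mode=rw so a missing file becomes an error instead of being created.
    #
    #     ApdbSql._update_sqlite_url("sqlite:///data/apdb.db")
    #     # -> "sqlite:///file:data/apdb.db?mode=rw&uri=true"
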
311 """Check schema version compatibility and return the database schema
315 def _get_version(key: str) -> VersionTuple:
316 """Retrieve version number from given metadata key."""
317 version_str = metadata.get(key)
318 if version_str
is None:
320 raise RuntimeError(f
"Version key {key!r} does not exist in metadata table.")
321 return VersionTuple.fromString(version_str)
328 if not self.
_schema.schemaVersion().checkCompatibility(db_schema_version):
330 f
"Configured schema version {self._schema.schemaVersion()} "
331 f
"is not compatible with database version {db_schema_version}"
335 f
"Current code version {self.apdbImplementationVersion()} "
336 f
"is not compatible with database version {db_code_version}"
340 if self.
_schema.replication_enabled:
342 code_replica_version = ApdbSqlReplica.apdbReplicaImplementationVersion()
343 if not code_replica_version.checkCompatibility(db_replica_version):
345 f
"Current replication code version {code_replica_version} "
346 f
"is not compatible with database version {db_replica_version}"
349 return db_schema_version
353 """Return version number for current APDB implementation.
357 version : `VersionTuple`
358 Version of the code defined in implementation class.
    @classmethod
    def init_database(
        cls,
        db_url: str,
        *,
        schema_file: str | None = None,
        schema_name: str | None = None,
        read_sources_months: int | None = None,
        read_forced_sources_months: int | None = None,
        enable_replica: bool = False,
        connection_timeout: int | None = None,
        dia_object_index: str | None = None,
        htm_level: int | None = None,
        htm_index_column: str | None = None,
        ra_dec_columns: tuple[str, str] | None = None,
        prefix: str | None = None,
        namespace: str | None = None,
        drop: bool = False,
    ) -> ApdbSqlConfig:
381 """Initialize new APDB instance and make configuration object for it.
386 SQLAlchemy database URL.
387 schema_file : `str`, optional
388 Location of (YAML) configuration file with APDB schema. If not
389 specified then default location will be used.
390 schema_name : str | None
391 Name of the schema in YAML configuration file. If not specified
392 then default name will be used.
393 read_sources_months : `int`, optional
394 Number of months of history to read from DiaSource.
395 read_forced_sources_months : `int`, optional
396 Number of months of history to read from DiaForcedSource.
397 enable_replica : `bool`
398 If True, make additional tables used for replication to PPDB.
399 connection_timeout : `int`, optional
400 Database connection timeout in seconds.
401 dia_object_index : `str`, optional
402 Indexing mode for DiaObject table.
403 htm_level : `int`, optional
405 htm_index_column : `str`, optional
406 Name of a HTM index column for DiaObject and DiaSource tables.
407 ra_dec_columns : `tuple` [`str`, `str`], optional
408 Names of ra/dec columns in DiaObject table.
409 prefix : `str`, optional
410 Optional prefix for all table names.
411 namespace : `str`, optional
412 Name of the database schema for all APDB tables. If not specified
413 then default schema is used.
414 drop : `bool`, optional
415 If `True` then drop existing tables before re-creating the schema.
419 config : `ApdbSqlConfig`
420 Resulting configuration object for a created APDB instance.
        config = ApdbSqlConfig(db_url=db_url, enable_replica=enable_replica)
        if schema_file is not None:
            config.schema_file = schema_file
        if schema_name is not None:
            config.schema_name = schema_name
        if read_sources_months is not None:
            config.read_sources_months = read_sources_months
        if read_forced_sources_months is not None:
            config.read_forced_sources_months = read_forced_sources_months
        if connection_timeout is not None:
            config.connection_config.connection_timeout = connection_timeout
        if dia_object_index is not None:
            config.dia_object_index = dia_object_index
        if htm_level is not None:
            config.pixelization.htm_level = htm_level
        if htm_index_column is not None:
            config.pixelization.htm_index_column = htm_index_column
        if ra_dec_columns is not None:
            config.ra_dec_columns = ra_dec_columns
        if prefix is not None:
            config.prefix = prefix
        if namespace is not None:
            config.namespace = namespace

        cls._makeSchema(config, drop=drop)

        return config

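    # Usage sketch (illustrative only, the file path is hypothetical): create
    # the schema in a new SQLite file, then instantiate the APDB on top of it.
    #
    #     config = ApdbSql.init_database(db_url="sqlite:////tmp/apdb.db", htm_level=20)
    #     apdb = ApdbSql(config)
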
455 """Return `ApdbReplica` instance for this database."""
459 """Return dictionary with the table names and row counts.
461 Used by ``ap_proto`` to keep track of the size of the database tables.
462 Depending on database technology this could be expensive operation.
467 Dict where key is a table name and value is a row count.
470 tables = [ApdbTables.DiaObject, ApdbTables.DiaSource, ApdbTables.DiaForcedSource]
471 if self.
config.dia_object_index ==
"last_object_table":
472 tables.append(ApdbTables.DiaObjectLast)
473 with self.
_engine.begin()
as conn:
475 sa_table = self.
_schema.get_table(table)
476 stmt = sql.select(func.count()).select_from(sa_table)
477 count: int = conn.execute(stmt).scalar_one()
478 res[table.name] = count
    def tableDef(self, table: ApdbTables) -> Table | None:
        # docstring is inherited from a base class
        return self._schema.tableSchemas.get(table)

    @classmethod
    def _makeSchema(cls, config: ApdbConfig, drop: bool = False) -> None:
        # docstring is inherited from a base class

        if not isinstance(config, ApdbSqlConfig):
            raise TypeError(f"Unexpected type of configuration object: {type(config)}")

        engine = cls._makeEngine(config, create=True)

        # Ask the schema class to create all tables.
        schema = ApdbSqlSchema(
            engine=engine,
            dia_object_index=config.dia_object_index,
            schema_file=config.schema_file,
            schema_name=config.schema_name,
            prefix=config.prefix,
            namespace=config.namespace,
            htm_index_column=config.pixelization.htm_index_column,
            enable_replica=config.enable_replica,
        )
        schema.makeSchema(drop=drop)

        # Store version numbers in the metadata table.
        meta_table = schema.get_table(ApdbTables.metadata)
        apdb_meta = ApdbMetadataSql(engine, meta_table)
        apdb_meta.set(cls.metadataSchemaVersionKey, str(schema.schemaVersion()), force=True)
        apdb_meta.set(cls.metadataCodeVersionKey, str(cls.apdbImplementationVersion()), force=True)
        if config.enable_replica:
            apdb_meta.set(
                cls.metadataReplicaVersionKey,
                str(ApdbSqlReplica.apdbReplicaImplementationVersion()),
                force=True,
            )

    def getDiaObjects(self, region: Region) -> pandas.DataFrame:
        # docstring is inherited from a base class

        # Decide what columns we need.
        if self.config.dia_object_index == "last_object_table":
            table_enum = ApdbTables.DiaObjectLast
        else:
            table_enum = ApdbTables.DiaObject
        table = self._schema.get_table(table_enum)
        if not self.config.dia_object_columns:
            columns = self._schema.get_apdb_columns(table_enum)
        else:
            columns = [table.c[col] for col in self.config.dia_object_columns]
        query = sql.select(*columns)

        if self._schema.has_mjd_timestamps:
            validity_end_column = "validityEndMjdTai"
        else:
            validity_end_column = "validityEnd"

        # Build selection.
        query = query.where(self._filterRegion(table, region))

        # Select latest version of objects.
        if self.config.dia_object_index != "last_object_table":
            query = query.where(table.columns[validity_end_column] == None)  # noqa: E711

        # Execute select.
        with self._timer("select_time", tags={"table": "DiaObject"}) as timer:
            with self._engine.begin() as conn:
                objects = pandas.read_sql_query(query, conn)
            timer.add_values(row_count=len(objects))
        _LOG.debug("found %s DiaObjects", len(objects))
        return self._fix_result_timestamps(objects)

    def getDiaSources(
        self, region: Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
    ) -> pandas.DataFrame | None:
        # docstring is inherited from a base class
        if self.config.read_sources_months == 0:
            _LOG.debug("Skip DiaSources fetching")
            return None

        if object_ids is None:
            # Region-based select.
            return self._getDiaSourcesInRegion(region, visit_time)
        else:
            return self._getDiaSourcesByIDs(list(object_ids), visit_time)

    def getDiaForcedSources(
        self, region: Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
    ) -> pandas.DataFrame | None:
        # docstring is inherited from a base class
        if self.config.read_forced_sources_months == 0:
            _LOG.debug("Skip DiaForcedSources fetching")
            return None

        if object_ids is None:
            # This implementation does not support region-based selection.
            raise NotImplementedError("Region-based selection is not supported")

        midpointMjdTai_start = _make_midpointMjdTai_start(visit_time, self.config.read_forced_sources_months)
        _LOG.debug("midpointMjdTai_start = %.6f", midpointMjdTai_start)

        with self._timer("select_time", tags={"table": "DiaForcedSource"}) as timer:
            sources = self._getSourcesByIDs(
                ApdbTables.DiaForcedSource, list(object_ids), midpointMjdTai_start
            )
            timer.add_values(row_count=len(sources))

        _LOG.debug("found %s DiaForcedSources", len(sources))
        return sources

    def containsVisitDetector(
        self,
        visit: int,
        detector: int,
        region: Region,
        visit_time: astropy.time.Time,
    ) -> bool:
        # docstring is inherited from a base class
        src_table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaSource)
        frcsrc_table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaForcedSource)
        # Query should load only one leaf page of the index.
        query1 = sql.select(src_table.c.visit).filter_by(visit=visit, detector=detector).limit(1)

        with self._engine.begin() as conn:
            result = conn.execute(query1).scalar_one_or_none()
            if result is not None:
                return True
            # Check forced sources.
            query2 = sql.select(frcsrc_table.c.visit).filter_by(visit=visit, detector=detector).limit(1)
            result = conn.execute(query2).scalar_one_or_none()
            return result is not None

    def getSSObjects(self) -> pandas.DataFrame:
        # docstring is inherited from a base class
        columns = self._schema.get_apdb_columns(ApdbTables.SSObject)
        query = sql.select(*columns)

        # Execute select.
        with self._timer("SSObject_select_time", tags={"table": "SSObject"}) as timer:
            with self._engine.begin() as conn:
                objects = pandas.read_sql_query(query, conn)
            timer.add_values(row_count=len(objects))
        _LOG.debug("found %s SSObjects", len(objects))
        return self._fix_result_timestamps(objects)

    def store(
        self,
        visit_time: astropy.time.Time,
        objects: pandas.DataFrame,
        sources: pandas.DataFrame | None = None,
        forced_sources: pandas.DataFrame | None = None,
    ) -> None:
        # docstring is inherited from a base class
        objects = self._fix_input_timestamps(objects)
        if sources is not None:
            sources = self._fix_input_timestamps(sources)
        if forced_sources is not None:
            forced_sources = self._fix_input_timestamps(forced_sources)

        # We want to run all inserts in one transaction.
        with self._engine.begin() as connection:
            replica_chunk: ReplicaChunk | None = None
            if self._schema.replication_enabled:
                replica_chunk = ReplicaChunk.make_replica_chunk(visit_time, self.config.replica_chunk_seconds)
                self._storeReplicaChunk(replica_chunk, visit_time, connection)

            # Fill pixel index column for DiaObjects.
            objects = self._add_spatial_index(objects)
            self._storeDiaObjects(objects, visit_time, replica_chunk, connection)

            if sources is not None:
                sources = self._add_spatial_index(sources)
                self._storeDiaSources(sources, replica_chunk, connection)

            if forced_sources is not None:
                self._storeDiaForcedSources(forced_sources, replica_chunk, connection)

    def storeSSObjects(self, objects: pandas.DataFrame) -> None:
        # docstring is inherited from a base class
        objects = self._fix_input_timestamps(objects)

        idColumn = "ssObjectId"
        table = self._schema.get_table(ApdbTables.SSObject)

        with self._engine.begin() as conn:
            # Find record IDs that already exist. Some types like np.int64
            # can cause issues with sqlalchemy, convert them to int.
            ids = sorted(int(oid) for oid in objects[idColumn])

            query = sql.select(table.columns[idColumn]).where(table.columns[idColumn].in_(ids))
            result = conn.execute(query)
            knownIds = {row.ssObjectId for row in result}

            filter = objects[idColumn].isin(knownIds)
            toUpdate = cast(pandas.DataFrame, objects[filter])
            toInsert = cast(pandas.DataFrame, objects[~filter])

            # Insert new records.
            if len(toInsert) > 0:
                toInsert.to_sql(table.name, conn, if_exists="append", index=False, schema=table.schema)

            # Update existing records.
            if len(toUpdate) > 0:
                whereKey = f"{idColumn}_param"
                update = table.update().where(table.columns[idColumn] == sql.bindparam(whereKey))
                toUpdate = toUpdate.rename({idColumn: whereKey}, axis="columns")
                values = toUpdate.to_dict("records")
                result = conn.execute(update, values)

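    # The update branch above relies on SQLAlchemy "executemany": binding the
    # ID to a named parameter and passing a list of row dicts issues one UPDATE
    # per record. A minimal standalone sketch of the same pattern (table and
    # column names here are hypothetical):
    #
    #     stmt = my_table.update().where(my_table.c.id == sql.bindparam("id_param"))
    #     conn.execute(stmt, [{"id_param": 1, "flux": 0.5}, {"id_param": 2, "flux": 0.7}])
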
    def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
        # docstring is inherited from a base class
        reassignTime = datetime.datetime.now(tz=datetime.UTC)

        table = self._schema.get_table(ApdbTables.DiaSource)
        query = table.update().where(table.columns["diaSourceId"] == sql.bindparam("srcId"))

        with self._engine.begin() as conn:
            # Need to make sure that every ID exists in the database, update
            # one by one and check rowcount for missing records.
            missing_ids: list[int] = []
            for key, value in idMap.items():
                params = {
                    "srcId": key,
                    "diaObjectId": 0,
                    "ssObjectId": value,
                    "ssObjectReassocTime": reassignTime,
                }
                result = conn.execute(query, params)
                if result.rowcount == 0:
                    missing_ids.append(key)
            if missing_ids:
                missing = ",".join(str(item) for item in missing_ids)
                raise ValueError(f"Following DiaSource IDs do not exist in the database: {missing}")

    def countUnassociatedObjects(self) -> int:
        # docstring is inherited from a base class
        table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaObject)

        if self._schema.has_mjd_timestamps:
            validity_end_column = "validityEndMjdTai"
        else:
            validity_end_column = "validityEnd"

        # Construct the sql statement.
        stmt = sql.select(func.count()).select_from(table).where(table.c.nDiaSources == 1)
        stmt = stmt.where(table.columns[validity_end_column] == None)  # noqa: E711

        # Return the count.
        with self._engine.begin() as conn:
            count = conn.execute(stmt).scalar_one()

        return count

774 """Return catalog of DiaSource instances from given region.
778 region : `lsst.sphgeom.Region`
779 Region to search for DIASources.
780 visit_time : `astropy.time.Time`
781 Time of the current visit.
785 catalog : `pandas.DataFrame`
786 Catalog containing DiaSource records.
791 _LOG.debug(
"midpointMjdTai_start = %.6f", midpointMjdTai_start)
793 table = self.
_schema.get_table(ApdbTables.DiaSource)
794 columns = self.
_schema.get_apdb_columns(ApdbTables.DiaSource)
795 query = sql.select(*columns)
798 time_filter = table.columns[
"midpointMjdTai"] > midpointMjdTai_start
799 where = sql.expression.and_(self.
_filterRegion(table, region), time_filter)
800 query = query.where(where)
803 with self.
_timer(
"DiaSource_select_time", tags={
"table":
"DiaSource"})
as timer:
804 with self.
_engine.begin()
as conn:
805 sources = pandas.read_sql_query(query, conn)
806 timer.add_values(row_counts=len(sources))
807 _LOG.debug(
"found %s DiaSources", len(sources))
811 """Return catalog of DiaSource instances given set of DiaObject IDs.
816 Collection of DiaObject IDs
817 visit_time : `astropy.time.Time`
818 Time of the current visit.
822 catalog : `pandas.DataFrame`
823 Catalog contaning DiaSource records.
828 _LOG.debug(
"midpointMjdTai_start = %.6f", midpointMjdTai_start)
830 with self.
_timer(
"select_time", tags={
"table":
"DiaSource"})
as timer:
831 sources = self.
_getSourcesByIDs(ApdbTables.DiaSource, object_ids, midpointMjdTai_start)
832 timer.add_values(row_count=len(sources))
834 _LOG.debug(
"found %s DiaSources", len(sources))
    def _getSourcesByIDs(
        self, table_enum: ApdbTables, object_ids: list[int], midpointMjdTai_start: float
    ) -> pandas.DataFrame:
        """Return catalog of DiaSource or DiaForcedSource instances given set
        of DiaObject IDs.

        Parameters
        ----------
        table_enum : `ApdbTables`
            Source table to query.
        object_ids : `list` [`int`]
            Collection of DiaObject IDs.
        midpointMjdTai_start : `float`
            Earliest midpointMjdTai to retrieve.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Catalog containing DiaSource records. Empty catalog is returned
            when ``object_ids`` is empty.
        """
        table = self._schema.get_table(table_enum)
        columns = self._schema.get_apdb_columns(table_enum)

        sources: pandas.DataFrame | None = None
        if len(object_ids) <= 0:
            _LOG.debug("ID list is empty, just fetch empty result")
            query = sql.select(*columns).where(sql.literal(False))
            with self._engine.begin() as conn:
                sources = pandas.read_sql_query(query, conn)
        else:
            data_frames: list[pandas.DataFrame] = []
            for ids in chunk_iterable(sorted(object_ids), 1000):
                query = sql.select(*columns)

                # Some types like np.int64 can cause issues with sqlalchemy,
                # convert them to int.
                int_ids = [int(oid) for oid in ids]

                # Select by object ID and time.
                query = query.where(
                    sql.expression.and_(
                        table.columns["diaObjectId"].in_(int_ids),
                        table.columns["midpointMjdTai"] > midpointMjdTai_start,
                    )
                )

                # Execute select.
                with self._engine.begin() as conn:
                    data_frames.append(pandas.read_sql_query(query, conn))

            if len(data_frames) == 1:
                sources = data_frames[0]
            else:
                sources = pandas.concat(data_frames)
        assert sources is not None, "Catalog cannot be None"
        return self._fix_result_timestamps(sources)

    def _storeReplicaChunk(
        self,
        replica_chunk: ReplicaChunk,
        visit_time: astropy.time.Time,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        # Timestamp for the replica chunk row, timezone-aware UTC datetime.
        dt = datetime.datetime.fromtimestamp(visit_time.unix_tai, tz=datetime.UTC)

        table = self._schema.get_table(ExtraTables.ApdbReplicaChunks)

        # We need UPSERT which is a dialect-specific construct.
        values = {"last_update_time": dt, "unique_id": replica_chunk.unique_id}
        row = {"apdb_replica_chunk": replica_chunk.id} | values
        if connection.dialect.name == "sqlite":
            insert_sqlite = sqlalchemy.dialects.sqlite.insert(table)
            insert_sqlite = insert_sqlite.on_conflict_do_update(index_elements=table.primary_key, set_=values)
            connection.execute(insert_sqlite, row)
        elif connection.dialect.name == "postgresql":
            insert_pg = sqlalchemy.dialects.postgresql.dml.insert(table)
            insert_pg = insert_pg.on_conflict_do_update(constraint=table.primary_key, set_=values)
            connection.execute(insert_pg, row)
        else:
            raise TypeError(f"Unsupported dialect {connection.dialect.name} for upsert.")

    def _storeDiaObjects(
        self,
        objs: pandas.DataFrame,
        visit_time: astropy.time.Time,
        replica_chunk: ReplicaChunk | None,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        """Store catalog of DiaObjects from current visit.

        Parameters
        ----------
        objs : `pandas.DataFrame`
            Catalog with DiaObject records.
        visit_time : `astropy.time.Time`
            Time of the visit.
        replica_chunk : `ReplicaChunk`
            Replica chunk identifier, or `None` when replication is disabled.
        """
        if len(objs) == 0:
            _LOG.debug("No objects to write to database.")
            return

        # Some types like np.int64 can cause issues with sqlalchemy, convert
        # them to int.
        ids = sorted(int(oid) for oid in objs["diaObjectId"])
        _LOG.debug("first object ID: %d", ids[0])

        if self._schema.has_mjd_timestamps:
            validity_start_column = "validityStartMjdTai"
            validity_end_column = "validityEndMjdTai"
            timestamp = float(visit_time.tai.mjd)
        else:
            validity_start_column = "validityStart"
            validity_end_column = "validityEnd"
            timestamp = visit_time.datetime
        if self.config.dia_object_index == "last_object_table":
            # Insert and replace all records in the LAST table.
            table = self._schema.get_table(ApdbTables.DiaObjectLast)

            # Drop the previous versions of these objects, pandas cannot upsert.
            query = table.delete().where(table.columns["diaObjectId"].in_(ids))

            with self._timer("delete_time", tags={"table": table.name}) as timer:
                res = connection.execute(query)
                timer.add_values(row_count=res.rowcount)
            _LOG.debug("deleted %s objects", res.rowcount)

            # DiaObjectLast is a subset of DiaObject, strip missing columns.
            last_column_names = [column.name for column in table.columns]
            last_objs = objs[last_column_names]
            last_objs = _coerce_uint64(last_objs)

            with self._timer("insert_time", tags={"table": "DiaObjectLast"}) as timer:
                last_objs.to_sql(
                    table.name,
                    connection,
                    if_exists="append",
                    index=False,
                    schema=table.schema,
                )
                timer.add_values(row_count=len(last_objs))
        else:
            # Truncate existing validity intervals for the updated objects.
            table = self._schema.get_table(ApdbTables.DiaObject)

            update = (
                table.update()
                .values(**{validity_end_column: timestamp})
                .where(
                    sql.expression.and_(
                        table.columns["diaObjectId"].in_(ids),
                        table.columns[validity_end_column].is_(None),
                    )
                )
            )

            with self._timer("truncate_time", tags={"table": table.name}) as timer:
                res = connection.execute(update)
                timer.add_values(row_count=res.rowcount)
            _LOG.debug("truncated %s intervals", res.rowcount)
        # Fill validity columns for the new versions of the objects.
        extra_columns: list[pandas.Series] = []
        if validity_start_column in objs.columns:
            objs[validity_start_column] = timestamp
        else:
            extra_columns.append(pandas.Series([timestamp] * len(objs), name=validity_start_column))
        if validity_end_column in objs.columns:
            objs[validity_end_column] = None
        else:
            extra_columns.append(pandas.Series([None] * len(objs), name=validity_end_column))
        if extra_columns:
            objs.set_index(extra_columns[0].index, inplace=True)
            objs = pandas.concat([objs] + extra_columns, axis="columns")
        # Prepare replica info for the new object versions.
        table = self._schema.get_table(ApdbTables.DiaObject)
        replica_data: list[dict] = []
        replica_stmt: Any = None
        replica_table_name = ""
        if replica_chunk is not None:
            pk_names = [column.name for column in table.primary_key]
            replica_data = objs[pk_names].to_dict("records")
            for row in replica_data:
                row["apdb_replica_chunk"] = replica_chunk.id
            replica_table = self._schema.get_table(ExtraTables.DiaObjectChunks)
            replica_table_name = replica_table.name
            replica_stmt = replica_table.insert()

        # Insert new versions into DiaObject, then into the replica table.
        with self._timer("insert_time", tags={"table": table.name}) as timer:
            objs.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
            timer.add_values(row_count=len(objs))
        if replica_stmt is not None:
            with self._timer("insert_time", tags={"table": replica_table_name}) as timer:
                connection.execute(replica_stmt, replica_data)
                timer.add_values(row_count=len(replica_data))

    def _storeDiaSources(
        self,
        sources: pandas.DataFrame,
        replica_chunk: ReplicaChunk | None,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        """Store catalog of DiaSources from current visit.

        Parameters
        ----------
        sources : `pandas.DataFrame`
            Catalog containing DiaSource records.
        """
        table = self._schema.get_table(ApdbTables.DiaSource)

        # Prepare replica info.
        replica_data: list[dict] = []
        replica_stmt: Any = None
        replica_table_name = ""
        if replica_chunk is not None:
            pk_names = [column.name for column in table.primary_key]
            replica_data = sources[pk_names].to_dict("records")
            for row in replica_data:
                row["apdb_replica_chunk"] = replica_chunk.id
            replica_table = self._schema.get_table(ExtraTables.DiaSourceChunks)
            replica_table_name = replica_table.name
            replica_stmt = replica_table.insert()

        # Everything runs in the transaction owned by the caller.
        with self._timer("insert_time", tags={"table": table.name}) as timer:
            sources.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
            timer.add_values(row_count=len(sources))
        if replica_stmt is not None:
            with self._timer("replica_insert_time", tags={"table": replica_table_name}) as timer:
                connection.execute(replica_stmt, replica_data)
                timer.add_values(row_count=len(replica_data))

    def _storeDiaForcedSources(
        self,
        sources: pandas.DataFrame,
        replica_chunk: ReplicaChunk | None,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        """Store a set of DiaForcedSources from current visit.

        Parameters
        ----------
        sources : `pandas.DataFrame`
            Catalog containing DiaForcedSource records.
        """
        table = self._schema.get_table(ApdbTables.DiaForcedSource)

        # Prepare replica info.
        replica_data: list[dict] = []
        replica_stmt: Any = None
        replica_table_name = ""
        if replica_chunk is not None:
            pk_names = [column.name for column in table.primary_key]
            replica_data = sources[pk_names].to_dict("records")
            for row in replica_data:
                row["apdb_replica_chunk"] = replica_chunk.id
            replica_table = self._schema.get_table(ExtraTables.DiaForcedSourceChunks)
            replica_table_name = replica_table.name
            replica_stmt = replica_table.insert()

        # Everything runs in the transaction owned by the caller.
        with self._timer("insert_time", tags={"table": table.name}) as timer:
            sources.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
            timer.add_values(row_count=len(sources))
        if replica_stmt is not None:
            with self._timer("insert_time", tags={"table": replica_table_name}) as timer:
                connection.execute(replica_stmt, replica_data)
                timer.add_values(row_count=len(replica_data))

1122 """Generate a set of HTM indices covering specified region.
1126 region: `sphgeom.Region`
1127 Region that needs to be indexed.
1131 Sequence of ranges, range is a tuple (minHtmID, maxHtmID).
1133 _LOG.debug(
"region: %s", region)
1134 indices = self.
pixelator.envelope(region, self.
config.pixelization.htm_max_ranges)
1136 return indices.ranges()
    def _filterRegion(self, table: sqlalchemy.schema.Table, region: Region) -> sql.ColumnElement:
        """Make SQLAlchemy expression for selecting records in a region."""
        htm_index_column = table.columns[self.config.pixelization.htm_index_column]
        exprlist = []
        pixel_ranges = self._htm_indices(region)
        for low, upper in pixel_ranges:
            # Ranges are half-open, BETWEEN is inclusive.
            upper -= 1
            if low == upper:
                exprlist.append(htm_index_column == low)
            else:
                exprlist.append(sql.expression.between(htm_index_column, low, upper))

        return sql.expression.or_(*exprlist)

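    # Example of the resulting filter (illustrative; the column name depends on
    # the configured htm_index_column): for pixel ranges [(100, 104), (200, 201)]
    # the expression is equivalent to
    #
    #     (pixelId BETWEEN 100 AND 103) OR (pixelId = 200)
    #
    # because the envelope ranges are half-open while BETWEEN is inclusive.
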
1153 """Calculate spatial index for each record and add it to a DataFrame.
1157 df : `pandas.DataFrame`
1158 DataFrame which has to contain ra/dec columns, names of these
1159 columns are defined by configuration ``ra_dec_columns`` field.
1163 df : `pandas.DataFrame`
1164 DataFrame with ``pixelId`` column which contains pixel index
1165 for ra/dec coordinates.
1169 This overrides any existing column in a DataFrame with the same name
1170 (pixelId). Original DataFrame is not changed, copy of a DataFrame is
1174 htm_index = np.zeros(df.shape[0], dtype=np.int64)
1175 ra_col, dec_col = self.
config.ra_dec_columns
1176 for i, (ra, dec)
in enumerate(zip(df[ra_col], df[dec_col])):
1181 df[self.
config.pixelization.htm_index_column] = htm_index
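    # Single-point sketch of the index computation above (level 20 is an
    # assumption, the real level comes from the pixelization configuration):
    #
    #     pix = HtmPixelization(20)
    #     pix.index(UnitVector3d(LonLat.fromDegrees(35.0, -4.5)))  # -> HTM ID as int
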
1185 """Update timestamp columns in input DataFrame to be aware datetime
1188 AP pipeline generates naive datetime instances, we want them to be
1189 aware before they go to database. All naive timestamps are assumed to
1190 be in UTC timezone (they should be TAI).
1195 for column, dtype
in df.dtypes.items()
1196 if isinstance(dtype, pandas.DatetimeTZDtype)
and dtype.tz
is not datetime.UTC
1198 for column
in columns:
1199 df[column] = df[column].dt.tz_convert(datetime.UTC)
1202 column
for column, dtype
in df.dtypes.items()
if pandas.api.types.is_datetime64_dtype(dtype)
1204 for column
in columns:
1205 df[column] = df[column].dt.tz_localize(datetime.UTC)
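    # Behavior sketch (illustrative only): a naive pandas datetime column
    # becomes timezone-aware UTC, which is what the database layer expects.
    #
    #     s = pandas.Series(pandas.to_datetime(["2025-01-01T00:00:00"]))
    #     s.dt.tz_localize(datetime.UTC).dtype  # -> datetime64[ns, UTC]
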
1209 """Update timestamp columns to be naive datetime type in returned
1212 AP pipeline code expects DataFrames to contain naive datetime columns,
1213 while Postgres queries return timezone-aware type. This method converts
1214 those columns to naive datetime in UTC timezone.
1217 columns = [column
for column, dtype
in df.dtypes.items()
if isinstance(dtype, pandas.DatetimeTZDtype)]
1218 for column
in columns:
1220 df[column] = df[column].dt.tz_convert(
None)