22"""Module defining Apdb class and related methods."""
24from __future__
import annotations
33from contextlib
import closing
34from typing
import TYPE_CHECKING, Any, cast
40import sqlalchemy.dialects.postgresql
41import sqlalchemy.dialects.sqlite
42from lsst.sphgeom import HtmPixelization, LonLat, Region, UnitVector3d
43from lsst.utils.db_auth
import DbAuth, DbAuthNotFoundError
44from lsst.utils.iteration
import chunk_iterable
45from sqlalchemy
import func, sql
46from sqlalchemy.pool
import NullPool
48from ..apdb
import Apdb
49from ..apdbConfigFreezer
import ApdbConfigFreezer
50from ..apdbReplica
import ReplicaChunk
51from ..apdbSchema
import ApdbTables
52from ..config
import ApdbConfig
53from ..monitor
import MonAgent
54from ..schema_model
import Table
55from ..timer
import Timer
56from ..versionTuple
import IncompatibleVersionError, VersionTuple
57from .apdbMetadataSql
import ApdbMetadataSql
58from .apdbSqlReplica
import ApdbSqlReplica
59from .apdbSqlSchema
import ApdbSqlSchema, ExtraTables
60from .config
import ApdbSqlConfig
65 from ..apdbMetadata
import ApdbMetadata
67_LOG = logging.getLogger(__name__)
72"""Version for the code controlling non-replication tables. This needs to be
73updated following compatibility rules when schema produced by this code
79 """Change the type of uint64 columns to int64, and return copy of data
82 names = [c[0]
for c
in df.dtypes.items()
if c[1] == np.uint64]
83 return df.astype({name: np.int64
for name
in names})
87 """Calculate starting point for time-based source search.
91 visit_time : `astropy.time.Time`
92 Time of current visit.
94 Number of months in the sources history.
99 A ``midpointMjdTai`` starting point, MJD time.
103 return float(visit_time.mjd - months * 30)
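# Illustration only (not part of the original module): for a visit at
# MJD 60000.0 and a 12-month history window the search start is
# 60000.0 - 12 * 30 = 59640.0:
#
#     import astropy.time
#     visit_time = astropy.time.Time(60000.0, format="mjd", scale="tai")
#     _make_midpointMjdTai_start(visit_time, 12)  # -> 59640.0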
def _onSqlite3Connect(
    dbapiConnection: sqlite3.Connection, connectionRecord: sqlalchemy.pool._ConnectionRecord
) -> None:
    # Enable foreign key checks on every new SQLite connection.
    with closing(dbapiConnection.cursor()) as cursor:
        cursor.execute("PRAGMA foreign_keys=ON;")
115 """Implementation of APDB interface based on SQL database.
117 The implementation is configured via standard ``pex_config`` mechanism
118 using `ApdbSqlConfig` configuration class. For an example of different
119 configurations check ``config/`` folder.
123 config : `ApdbSqlConfig`
124 Configuration object.
127 metadataSchemaVersionKey =
"version:schema"
128 """Name of the metadata key to store schema version number."""
130 metadataCodeVersionKey =
"version:ApdbSql"
131 """Name of the metadata key to store code version number."""
133 metadataReplicaVersionKey =
"version:ApdbSqlReplica"
134 """Name of the metadata key to store replica code version number."""
136 metadataConfigKey =
"config:apdb-sql.json"
137 """Name of the metadata key to store code version number."""
139 _frozen_parameters = (
142 "pixelization.htm_level",
143 "pixelization.htm_index_column",
146 """Names of the config parameters to be frozen in metadata table."""
    def __init__(self, config: ApdbSqlConfig):
        sa_metadata = sqlalchemy.MetaData(schema=config.namespace)
        meta_table_name = ApdbTables.metadata.table_name(prefix=config.prefix)
        meta_table = sqlalchemy.schema.Table(meta_table_name, sa_metadata, autoload_with=self._engine)

        if config_json is not None:
            self.config = freezer.update(config, config_json)

            dia_object_index=self.config.dia_object_index,
            schema_file=self.config.schema_file,
            schema_name=self.config.schema_name,
            prefix=self.config.prefix,
            namespace=self.config.namespace,
            htm_index_column=self.config.pixelization.htm_index_column,
            enable_replica=self.config.enable_replica,

        if _LOG.isEnabledFor(logging.DEBUG):
            _LOG.debug("ApdbSql Configuration: %s", self.config.model_dump())
    def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
        """Create `Timer` instance given its name."""
        return Timer(name, _MON, tags=tags)
…
    @classmethod
    def _makeEngine(cls, config: ApdbSqlConfig, *, create: bool) -> sqlalchemy.engine.Engine:
        """Make SQLAlchemy engine based on configured parameters.

        Parameters
        ----------
        config : `ApdbSqlConfig`
            Configuration object.
        create : `bool`
            Whether to try to create a new database file; only relevant for
            the SQLite backend, which always creates new files by default.
        """
        kw: MutableMapping[str, Any] = dict(config.connection_config.extra_parameters)
        conn_args: dict[str, Any] = dict()
        if not config.connection_config.connection_pool:
            kw.update(poolclass=NullPool)
        if config.connection_config.isolation_level is not None:
            kw.update(isolation_level=config.connection_config.isolation_level)
        elif config.db_url.startswith("sqlite"):
            kw.update(isolation_level="READ_UNCOMMITTED")
        if config.connection_config.connection_timeout is not None:
            if config.db_url.startswith("sqlite"):
                conn_args.update(timeout=config.connection_config.connection_timeout)
            elif config.db_url.startswith(("postgresql", "mysql")):
                conn_args.update(connect_timeout=int(config.connection_config.connection_timeout))
        kw.update(connect_args=conn_args)
        engine = sqlalchemy.create_engine(cls._connection_url(config.db_url, create=create), **kw)

        if engine.dialect.name == "sqlite":
            sqlalchemy.event.listen(engine, "connect", _onSqlite3Connect)
…
    @classmethod
    def _connection_url(cls, config_url: str, *, create: bool) -> sqlalchemy.engine.URL | str:
        """Generate a complete URL for database with proper credentials.

        Parameters
        ----------
        config_url : `str`
            Database URL as specified in configuration.
        create : `bool`
            Whether to try to create a new database file; only relevant for
            the SQLite backend, which always creates new files by default.

        Returns
        -------
        connection_url : `sqlalchemy.engine.URL` or `str`
            Connection URL including credentials.
        """
        components = urllib.parse.urlparse(config_url)
        if all(
            (components.scheme is not None, components.hostname is not None, components.path is not None)
        ):
            try:
                config_url = db_auth.getUrl(config_url)
            except DbAuthNotFoundError:
…
263 """If URL refers to sqlite dialect, update it so that the backend does
264 not try to create database file if it does not exist already.
274 Possibly updated connection string.
277 url = sqlalchemy.make_url(url_string)
278 except sqlalchemy.exc.SQLAlchemyError:
283 if url.get_backend_name() ==
"sqlite":
288 database = url.database
289 if database
and not database.startswith((
":",
"file:")):
290 query = dict(url.query, mode=
"rw", uri=
"true")
297 if database.startswith(
"//"):
299 f
"Database URL contains extra leading slashes which will be removed: {url}",
302 database =
"/" + database.lstrip(
"/")
303 url = url.set(database=f
"file:{database}", query=query)
304 url_string = url.render_as_string()
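        # Example of the rewrite this method performs (illustrative, assuming a
        # relative SQLite path):
        #
        #     "sqlite:///data/apdb.db"
        #         -> "sqlite:///file:data/apdb.db?mode=rw&uri=true"
        #
        # With ``uri=true`` and ``mode=rw`` the sqlite3 driver opens an existing
        # file read-write but refuses to create a missing one.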
309 """Check schema version compatibility."""
311 def _get_version(key: str) -> VersionTuple:
312 """Retrieve version number from given metadata key."""
313 version_str = metadata.get(key)
314 if version_str
is None:
316 raise RuntimeError(f
"Version key {key!r} does not exist in metadata table.")
317 return VersionTuple.fromString(version_str)
324 if not self.
_schema.schemaVersion().checkCompatibility(db_schema_version):
326 f
"Configured schema version {self._schema.schemaVersion()} "
327 f
"is not compatible with database version {db_schema_version}"
331 f
"Current code version {self.apdbImplementationVersion()} "
332 f
"is not compatible with database version {db_code_version}"
336 if self.
_schema.replication_enabled:
338 code_replica_version = ApdbSqlReplica.apdbReplicaImplementationVersion()
339 if not code_replica_version.checkCompatibility(db_replica_version):
341 f
"Current replication code version {code_replica_version} "
342 f
"is not compatible with database version {db_replica_version}"
347 """Return version number for current APDB implementation.
351 version : `VersionTuple`
352 Version of the code defined in implementation class.
    @classmethod
    def init_database(
        cls,
        db_url: str,
        *,
        schema_file: str | None = None,
        schema_name: str | None = None,
        read_sources_months: int | None = None,
        read_forced_sources_months: int | None = None,
        enable_replica: bool = False,
        connection_timeout: int | None = None,
        dia_object_index: str | None = None,
        htm_level: int | None = None,
        htm_index_column: str | None = None,
        ra_dec_columns: tuple[str, str] | None = None,
        prefix: str | None = None,
        namespace: str | None = None,
        drop: bool = False,
    ) -> ApdbSqlConfig:
        """Initialize new APDB instance and make configuration object for it.

        Parameters
        ----------
        db_url : `str`
            SQLAlchemy database URL.
        schema_file : `str`, optional
            Location of (YAML) configuration file with APDB schema. If not
            specified then default location will be used.
        schema_name : `str`, optional
            Name of the schema in YAML configuration file. If not specified
            then default name will be used.
        read_sources_months : `int`, optional
            Number of months of history to read from DiaSource.
        read_forced_sources_months : `int`, optional
            Number of months of history to read from DiaForcedSource.
        enable_replica : `bool`
            If True, make additional tables used for replication to PPDB.
        connection_timeout : `int`, optional
            Database connection timeout in seconds.
        dia_object_index : `str`, optional
            Indexing mode for DiaObject table.
        htm_level : `int`, optional
            HTM indexing level.
        htm_index_column : `str`, optional
            Name of a HTM index column for DiaObject and DiaSource tables.
        ra_dec_columns : `tuple` [`str`, `str`], optional
            Names of ra/dec columns in DiaObject table.
        prefix : `str`, optional
            Optional prefix for all table names.
        namespace : `str`, optional
            Name of the database schema for all APDB tables. If not specified
            then default schema is used.
        drop : `bool`, optional
            If `True` then drop existing tables before re-creating the schema.

        Returns
        -------
        config : `ApdbSqlConfig`
            Resulting configuration object for a created APDB instance.
        """
        config = ApdbSqlConfig(db_url=db_url, enable_replica=enable_replica)
        if schema_file is not None:
            config.schema_file = schema_file
        if schema_name is not None:
            config.schema_name = schema_name
        if read_sources_months is not None:
            config.read_sources_months = read_sources_months
        if read_forced_sources_months is not None:
            config.read_forced_sources_months = read_forced_sources_months
        if connection_timeout is not None:
            config.connection_config.connection_timeout = connection_timeout
        if dia_object_index is not None:
            config.dia_object_index = dia_object_index
        if htm_level is not None:
            config.pixelization.htm_level = htm_level
        if htm_index_column is not None:
            config.pixelization.htm_index_column = htm_index_column
        if ra_dec_columns is not None:
            config.ra_dec_columns = ra_dec_columns
        if prefix is not None:
            config.prefix = prefix
        if namespace is not None:
            config.namespace = namespace
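        # A minimal usage sketch (hypothetical values, not from this module):
        #
        #     config = ApdbSql.init_database("sqlite:///apdb.db", htm_level=20)
        #     apdb = ApdbSql(config)
        #
        # ``init_database`` creates the schema and returns the configuration
        # object that ``ApdbSql`` is later constructed from.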
449 """Return `ApdbReplica` instance for this database."""
453 """Return dictionary with the table names and row counts.
455 Used by ``ap_proto`` to keep track of the size of the database tables.
456 Depending on database technology this could be expensive operation.
461 Dict where key is a table name and value is a row count.
464 tables = [ApdbTables.DiaObject, ApdbTables.DiaSource, ApdbTables.DiaForcedSource]
465 if self.
config.dia_object_index ==
"last_object_table":
466 tables.append(ApdbTables.DiaObjectLast)
467 with self.
_engine.begin()
as conn:
469 sa_table = self.
_schema.get_table(table)
470 stmt = sql.select(func.count()).select_from(sa_table)
471 count: int = conn.execute(stmt).scalar_one()
472 res[table.name] = count
    def tableDef(self, table: ApdbTables) -> Table | None:
        return self._schema.tableSchemas.get(table)
…
    @classmethod
    def _makeSchema(cls, config: ApdbConfig, drop: bool = False) -> None:
        if not isinstance(config, ApdbSqlConfig):
            raise TypeError(f"Unexpected type of configuration object: {type(config)}")

            dia_object_index=config.dia_object_index,
            schema_file=config.schema_file,
            schema_name=config.schema_name,
            prefix=config.prefix,
            namespace=config.namespace,
            htm_index_column=config.pixelization.htm_index_column,
            enable_replica=config.enable_replica,

        schema.makeSchema(drop=drop)

        meta_table = schema.get_table(ApdbTables.metadata)

        if config.enable_replica:
                str(ApdbSqlReplica.apdbReplicaImplementationVersion()),
…
    def getDiaObjects(self, region: Region) -> pandas.DataFrame:
        if self.config.dia_object_index == "last_object_table":
            table_enum = ApdbTables.DiaObjectLast
        else:
            table_enum = ApdbTables.DiaObject
        table = self._schema.get_table(table_enum)
        if not self.config.dia_object_columns:
            columns = self._schema.get_apdb_columns(table_enum)
        else:
            columns = [table.c[col] for col in self.config.dia_object_columns]
        query = sql.select(*columns)

        if self.config.dia_object_index != "last_object_table":
            query = query.where(table.c.validityEnd == None)  # noqa: E711

        with self._timer("select_time", tags={"table": "DiaObject"}) as timer:
            with self._engine.begin() as conn:
                objects = pandas.read_sql_query(query, conn)
            timer.add_values(row_count=len(objects))

        _LOG.debug("found %s DiaObjects", len(objects))
    def getDiaSources(
        self, region: Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
    ) -> pandas.DataFrame | None:
        if self.config.read_sources_months == 0:
            _LOG.debug("Skip DiaSources fetching")
            return None

        if object_ids is None:
    def getDiaForcedSources(
        self, region: Region, object_ids: Iterable[int] | None, visit_time: astropy.time.Time
    ) -> pandas.DataFrame | None:
        if self.config.read_forced_sources_months == 0:
            _LOG.debug("Skip DiaForcedSources fetching")
            return None

        if object_ids is None:
            raise NotImplementedError("Region-based selection is not supported")

        _LOG.debug("midpointMjdTai_start = %.6f", midpointMjdTai_start)

        with self._timer("select_time", tags={"table": "DiaForcedSource"}) as timer:
            sources = self._getSourcesByIDs(
                ApdbTables.DiaForcedSource, list(object_ids), midpointMjdTai_start
            )
            timer.add_values(row_count=len(sources))

        _LOG.debug("found %s DiaForcedSources", len(sources))
    def containsVisitDetector(
        self,
        visit: int,
        detector: int,
        region: Region,
        visit_time: astropy.time.Time,
    ) -> bool:
        src_table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaSource)
        frcsrc_table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaForcedSource)

        query1 = sql.select(src_table.c.visit).filter_by(visit=visit, detector=detector).limit(1)

        with self._engine.begin() as conn:
            result = conn.execute(query1).scalar_one_or_none()
            if result is not None:
                return True

            query2 = sql.select(frcsrc_table.c.visit).filter_by(visit=visit, detector=detector).limit(1)
            result = conn.execute(query2).scalar_one_or_none()
            return result is not None
    def getSSObjects(self) -> pandas.DataFrame:
        columns = self._schema.get_apdb_columns(ApdbTables.SSObject)
        query = sql.select(*columns)

        with self._timer("SSObject_select_time", tags={"table": "SSObject"}) as timer:
            with self._engine.begin() as conn:
                objects = pandas.read_sql_query(query, conn)
            timer.add_values(row_count=len(objects))

        _LOG.debug("found %s SSObjects", len(objects))
    def store(
        self,
        visit_time: astropy.time.Time,
        objects: pandas.DataFrame,
        sources: pandas.DataFrame | None = None,
        forced_sources: pandas.DataFrame | None = None,
    ) -> None:
        if sources is not None:

        if forced_sources is not None:

        with self._engine.begin() as connection:
            replica_chunk: ReplicaChunk | None = None
            if self._schema.replication_enabled:
                replica_chunk = ReplicaChunk.make_replica_chunk(visit_time, self.config.replica_chunk_seconds)

            if sources is not None:

            if forced_sources is not None:
    def storeSSObjects(self, objects: pandas.DataFrame) -> None:
        idColumn = "ssObjectId"
        table = self._schema.get_table(ApdbTables.SSObject)

        with self._engine.begin() as conn:
            # Find IDs that are already present in the table; numpy integer
            # types are converted to plain int to keep SQLAlchemy happy.
            ids = sorted(int(oid) for oid in objects[idColumn])
            query = sql.select(table.columns[idColumn], table.columns[idColumn].in_(ids))
            result = conn.execute(query)
            knownIds = set(row.ssObjectId for row in result)

            filter = objects[idColumn].isin(knownIds)
            toUpdate = cast(pandas.DataFrame, objects[filter])
            toInsert = cast(pandas.DataFrame, objects[~filter])

            # Insert new records.
            if len(toInsert) > 0:
                toInsert.to_sql(table.name, conn, if_exists="append", index=False, schema=table.schema)

            # Update existing records.
            if len(toUpdate) > 0:
                whereKey = f"{idColumn}_param"
                update = table.update().where(table.columns[idColumn] == sql.bindparam(whereKey))
                toUpdate = toUpdate.rename({idColumn: whereKey}, axis="columns")
                values = toUpdate.to_dict("records")
                result = conn.execute(update, values)
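            # Note on the update branch above: renaming the ID column to the
            # bound-parameter name and passing ``to_dict("records")`` makes
            # SQLAlchemy run the UPDATE in executemany style, one parameter set
            # per row, roughly (illustrative values):
            #
            #     UPDATE "SSObject" SET ... WHERE "ssObjectId" = :ssObjectId_param
            #     [{"ssObjectId_param": 1, ...}, {"ssObjectId_param": 2, ...}]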
    def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
        table = self._schema.get_table(ApdbTables.DiaSource)
        query = table.update().where(table.columns["diaSourceId"] == sql.bindparam("srcId"))

        with self._engine.begin() as conn:
            # Update one row at a time so that rowcount can tell us which
            # source IDs are missing from the database.
            missing_ids: list[int] = []
            for key, value in idMap.items():
                params = dict(srcId=key, diaObjectId=0, ssObjectId=value)
                result = conn.execute(query, params)
                if result.rowcount == 0:
                    missing_ids.append(key)
            if missing_ids:
                missing = ",".join(str(item) for item in missing_ids)
                raise ValueError(f"Following DiaSource IDs do not exist in the database: {missing}")
    def countUnassociatedObjects(self) -> int:
        table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaObject)

        stmt = sql.select(func.count()).select_from(table).where(table.c.nDiaSources == 1)
        stmt = stmt.where(table.c.validityEnd == None)  # noqa: E711

        with self._engine.begin() as conn:
            count = conn.execute(stmt).scalar_one()

        return count
742 """Return catalog of DiaSource instances from given region.
746 region : `lsst.sphgeom.Region`
747 Region to search for DIASources.
748 visit_time : `astropy.time.Time`
749 Time of the current visit.
753 catalog : `pandas.DataFrame`
754 Catalog containing DiaSource records.
759 _LOG.debug(
"midpointMjdTai_start = %.6f", midpointMjdTai_start)
761 table = self.
_schema.get_table(ApdbTables.DiaSource)
762 columns = self.
_schema.get_apdb_columns(ApdbTables.DiaSource)
763 query = sql.select(*columns)
766 time_filter = table.columns[
"midpointMjdTai"] > midpointMjdTai_start
767 where = sql.expression.and_(self.
_filterRegion(table, region), time_filter)
768 query = query.where(where)
771 with self.
_timer(
"DiaSource_select_time", tags={
"table":
"DiaSource"})
as timer:
772 with self.
_engine.begin()
as conn:
773 sources = pandas.read_sql_query(query, conn)
774 timer.add_values(row_counts=len(sources))
775 _LOG.debug(
"found %s DiaSources", len(sources))
779 """Return catalog of DiaSource instances given set of DiaObject IDs.
784 Collection of DiaObject IDs
785 visit_time : `astropy.time.Time`
786 Time of the current visit.
790 catalog : `pandas.DataFrame`
791 Catalog contaning DiaSource records.
796 _LOG.debug(
"midpointMjdTai_start = %.6f", midpointMjdTai_start)
798 with self.
_timer(
"select_time", tags={
"table":
"DiaSource"})
as timer:
799 sources = self.
_getSourcesByIDs(ApdbTables.DiaSource, object_ids, midpointMjdTai_start)
800 timer.add_values(row_count=len(sources))
802 _LOG.debug(
"found %s DiaSources", len(sources))
    def _getSourcesByIDs(
        self, table_enum: ApdbTables, object_ids: list[int], midpointMjdTai_start: float
    ) -> pandas.DataFrame:
        """Return catalog of DiaSource or DiaForcedSource instances given set
        of DiaObject IDs.

        Parameters
        ----------
        table_enum : `ApdbTables`
            Table to query.
        object_ids : `list` [`int`]
            Collection of DiaObject IDs.
        midpointMjdTai_start : `float`
            Earliest midpointMjdTai to retrieve.

        Returns
        -------
        catalog : `pandas.DataFrame`
            Catalog containing DiaSource records. `None` is returned if
            ``read_sources_months`` configuration parameter is set to 0 or
            when ``object_ids`` is empty.
        """
        table = self._schema.get_table(table_enum)
        columns = self._schema.get_apdb_columns(table_enum)

        sources: pandas.DataFrame | None = None
        if len(object_ids) <= 0:
            _LOG.debug("ID list is empty, just fetch empty result")
            query = sql.select(*columns).where(sql.literal(False))
            with self._engine.begin() as conn:
                sources = pandas.read_sql_query(query, conn)
        else:
            data_frames: list[pandas.DataFrame] = []
            for ids in chunk_iterable(sorted(object_ids), 1000):
                query = sql.select(*columns)

                # Convert the chunk of IDs to plain ints for the IN clause.
                int_ids = [int(oid) for oid in ids]

                query = query.where(
                    sql.expression.and_(
                        table.columns["diaObjectId"].in_(int_ids),
                        table.columns["midpointMjdTai"] > midpointMjdTai_start,
                    )
                )

                with self._engine.begin() as conn:
                    data_frames.append(pandas.read_sql_query(query, conn))

            if len(data_frames) == 1:
                sources = data_frames[0]
            else:
                sources = pandas.concat(data_frames)
        assert sources is not None, "Catalog cannot be None"
    def _storeReplicaChunk(
        self,
        replica_chunk: ReplicaChunk,
        visit_time: astropy.time.Time,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        dt = datetime.datetime.fromtimestamp(visit_time.unix_tai, tz=datetime.timezone.utc)

        table = self._schema.get_table(ExtraTables.ApdbReplicaChunks)

        # The UPSERT syntax is dialect-specific.
        values = {"last_update_time": dt, "unique_id": replica_chunk.unique_id}
        row = {"apdb_replica_chunk": replica_chunk.id} | values
        if connection.dialect.name == "sqlite":
            insert_sqlite = sqlalchemy.dialects.sqlite.insert(table)
            insert_sqlite = insert_sqlite.on_conflict_do_update(index_elements=table.primary_key, set_=values)
            connection.execute(insert_sqlite, row)
        elif connection.dialect.name == "postgresql":
            insert_pg = sqlalchemy.dialects.postgresql.dml.insert(table)
            insert_pg = insert_pg.on_conflict_do_update(constraint=table.primary_key, set_=values)
            connection.execute(insert_pg, row)
        else:
            raise TypeError(f"Unsupported dialect {connection.dialect.name} for upsert.")
    def _storeDiaObjects(
        self,
        objs: pandas.DataFrame,
        visit_time: astropy.time.Time,
        replica_chunk: ReplicaChunk | None,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        """Store catalog of DiaObjects from current visit.

        Parameters
        ----------
        objs : `pandas.DataFrame`
            Catalog with DiaObject records.
        visit_time : `astropy.time.Time`
        replica_chunk : `ReplicaChunk`
        """
        if len(objs) == 0:
            _LOG.debug("No objects to write to database.")
            return

        ids = sorted(int(oid) for oid in objs["diaObjectId"])
        _LOG.debug("first object ID: %d", ids[0])

        dt = visit_time.datetime

        if self.config.dia_object_index == "last_object_table":
            # Replace the records in the DiaObjectLast table.
            table = self._schema.get_table(ApdbTables.DiaObjectLast)

            query = table.delete().where(table.columns["diaObjectId"].in_(ids))

            with self._timer("delete_time", tags={"table": table.name}) as timer:
                res = connection.execute(query)
                timer.add_values(row_count=res.rowcount)
            _LOG.debug("deleted %s objects", res.rowcount)

            last_column_names = [column.name for column in table.columns]
            last_objs = objs[last_column_names]

            if "lastNonForcedSource" in last_objs.columns:
                last_objs.fillna({"lastNonForcedSource": dt}, inplace=True)
            else:
                extra_column = pandas.Series([dt] * len(objs), name="lastNonForcedSource")
                last_objs.set_index(extra_column.index, inplace=True)
                last_objs = pandas.concat([last_objs, extra_column], axis="columns")

            with self._timer("insert_time", tags={"table": "DiaObjectLast"}) as timer:
                timer.add_values(row_count=len(last_objs))
        table = self._schema.get_table(ApdbTables.DiaObject)

        # Close the validity interval of the previous object versions.
        update = (
            table.update()
            .values(validityEnd=dt)
            .where(
                sql.expression.and_(
                    table.columns["diaObjectId"].in_(ids),
                    table.columns["validityEnd"].is_(None),
                )
            )
        )

        with self._timer("truncate_time", tags={"table": table.name}) as timer:
            res = connection.execute(update)
            timer.add_values(row_count=res.rowcount)
        _LOG.debug("truncated %s intervals", res.rowcount)

        # Fill validity and lastNonForcedSource columns if the input catalog
        # does not have them.
        extra_columns: list[pandas.Series] = []
        if "validityStart" in objs.columns:
            objs["validityStart"] = dt
        else:
            extra_columns.append(pandas.Series([dt] * len(objs), name="validityStart"))
        if "validityEnd" in objs.columns:
            objs["validityEnd"] = None
        else:
            extra_columns.append(pandas.Series([None] * len(objs), name="validityEnd"))
        if "lastNonForcedSource" in objs.columns:
            objs.fillna({"lastNonForcedSource": dt}, inplace=True)
        else:
            extra_columns.append(pandas.Series([dt] * len(objs), name="lastNonForcedSource"))
        if extra_columns:
            objs.set_index(extra_columns[0].index, inplace=True)
            objs = pandas.concat([objs] + extra_columns, axis="columns")
        table = self._schema.get_table(ApdbTables.DiaObject)
        replica_data: list[dict] = []
        replica_stmt: Any = None
        replica_table_name = ""
        if replica_chunk is not None:
            pk_names = [column.name for column in table.primary_key]
            replica_data = objs[pk_names].to_dict("records")
            for row in replica_data:
                row["apdb_replica_chunk"] = replica_chunk.id
            replica_table = self._schema.get_table(ExtraTables.DiaObjectChunks)
            replica_table_name = replica_table.name
            replica_stmt = replica_table.insert()

        with self._timer("insert_time", tags={"table": table.name}) as timer:
            objs.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
            timer.add_values(row_count=len(objs))
        if replica_stmt is not None:
            with self._timer("insert_time", tags={"table": replica_table_name}) as timer:
                connection.execute(replica_stmt, replica_data)
                timer.add_values(row_count=len(replica_data))
    def _storeDiaSources(
        self,
        sources: pandas.DataFrame,
        replica_chunk: ReplicaChunk | None,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        """Store catalog of DiaSources from current visit.

        Parameters
        ----------
        sources : `pandas.DataFrame`
            Catalog containing DiaSource records.
        """
        table = self._schema.get_table(ApdbTables.DiaSource)

        replica_data: list[dict] = []
        replica_stmt: Any = None
        replica_table_name = ""
        if replica_chunk is not None:
            pk_names = [column.name for column in table.primary_key]
            replica_data = sources[pk_names].to_dict("records")
            for row in replica_data:
                row["apdb_replica_chunk"] = replica_chunk.id
            replica_table = self._schema.get_table(ExtraTables.DiaSourceChunks)
            replica_table_name = replica_table.name
            replica_stmt = replica_table.insert()

        with self._timer("insert_time", tags={"table": table.name}) as timer:
            sources.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
            timer.add_values(row_count=len(sources))
        if replica_stmt is not None:
            with self._timer("replica_insert_time", tags={"table": replica_table_name}) as timer:
                connection.execute(replica_stmt, replica_data)
                timer.add_values(row_count=len(replica_data))
    def _storeDiaForcedSources(
        self,
        sources: pandas.DataFrame,
        replica_chunk: ReplicaChunk | None,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        """Store a set of DiaForcedSources from current visit.

        Parameters
        ----------
        sources : `pandas.DataFrame`
            Catalog containing DiaForcedSource records.
        """
        table = self._schema.get_table(ApdbTables.DiaForcedSource)

        replica_data: list[dict] = []
        replica_stmt: Any = None
        replica_table_name = ""
        if replica_chunk is not None:
            pk_names = [column.name for column in table.primary_key]
            replica_data = sources[pk_names].to_dict("records")
            for row in replica_data:
                row["apdb_replica_chunk"] = replica_chunk.id
            replica_table = self._schema.get_table(ExtraTables.DiaForcedSourceChunks)
            replica_table_name = replica_table.name
            replica_stmt = replica_table.insert()

        with self._timer("insert_time", tags={"table": table.name}) as timer:
            sources.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
            timer.add_values(row_count=len(sources))
        if replica_stmt is not None:
            with self._timer("insert_time", tags={"table": replica_table_name}) as timer:
                connection.execute(replica_stmt, replica_data)
                timer.add_values(row_count=len(replica_data))
1100 """Generate a set of HTM indices covering specified region.
1104 region: `sphgeom.Region`
1105 Region that needs to be indexed.
1109 Sequence of ranges, range is a tuple (minHtmID, maxHtmID).
1111 _LOG.debug(
"region: %s", region)
1112 indices = self.
pixelator.envelope(region, self.
config.pixelization.htm_max_ranges)
1114 return indices.ranges()
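        # Illustration with assumed values (``Circle`` and ``Angle`` are not
        # imported by this module): a small circular region is covered by a
        # handful of index ranges at the chosen HTM level, e.g.
        #
        #     from lsst.sphgeom import Angle, Circle
        #     center = UnitVector3d(LonLat.fromDegrees(10.0, -5.0))
        #     region = Circle(center, Angle.fromDegrees(0.1))
        #     HtmPixelization(20).envelope(region, 64).ranges()
        #     # -> [(min1, max1), (min2, max2), ...]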
    def _filterRegion(self, table: sqlalchemy.schema.Table, region: Region) -> sql.ColumnElement:
        """Make SQLAlchemy expression for selecting records in a region."""
        htm_index_column = table.columns[self.config.pixelization.htm_index_column]
        exprlist = []
        pixel_ranges = self._htm_indices(region)
        for low, upper in pixel_ranges:
                exprlist.append(htm_index_column == low)
                exprlist.append(sql.expression.between(htm_index_column, low, upper))

        return sql.expression.or_(*exprlist)
…
1131 """Calculate spatial index for each record and add it to a DataFrame.
1135 df : `pandas.DataFrame`
1136 DataFrame which has to contain ra/dec columns, names of these
1137 columns are defined by configuration ``ra_dec_columns`` field.
1141 df : `pandas.DataFrame`
1142 DataFrame with ``pixelId`` column which contains pixel index
1143 for ra/dec coordinates.
1147 This overrides any existing column in a DataFrame with the same name
1148 (pixelId). Original DataFrame is not changed, copy of a DataFrame is
1152 htm_index = np.zeros(df.shape[0], dtype=np.int64)
1153 ra_col, dec_col = self.
config.ra_dec_columns
1154 for i, (ra, dec)
in enumerate(zip(df[ra_col], df[dec_col])):
1159 df[self.
config.pixelization.htm_index_column] = htm_index
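        # The per-row computation elided above boils down to something like
        # (a sketch, assuming a level-20 pixelization):
        #
        #     pixelator = HtmPixelization(20)
        #     htm_index[i] = pixelator.index(UnitVector3d(LonLat.fromDegrees(ra, dec)))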
1163 """Update timestamp columns in input DataFrame to be aware datetime
1166 AP pipeline generates naive datetime instances, we want them to be
1167 aware before they go to database. All naive timestamps are assumed to
1168 be in UTC timezone (they should be TAI).
1173 for column, dtype
in df.dtypes.items()
1174 if isinstance(dtype, pandas.DatetimeTZDtype)
and dtype.tz
is not datetime.timezone.utc
1176 for column
in columns:
1177 df[column] = df[column].dt.tz_convert(datetime.timezone.utc)
1180 column
for column, dtype
in df.dtypes.items()
if pandas.api.types.is_datetime64_dtype(dtype)
1182 for column
in columns:
1183 df[column] = df[column].dt.tz_localize(datetime.timezone.utc)
1187 """Update timestamp columns to be naive datetime type in returned
1190 AP pipeline code expects DataFrames to contain naive datetime columns,
1191 while Postgres queries return timezone-aware type. This method converts
1192 those columns to naive datetime in UTC timezone.
1195 columns = [column
for column, dtype
in df.dtypes.items()
if isinstance(dtype, pandas.DatetimeTZDtype)]
1196 for column
in columns:
1198 df[column] = df[column].dt.tz_convert(
None)
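        # A small pandas illustration of the two conversions used here and in
        # ``_fix_input_timestamps`` (hypothetical data):
        #
        #     s = pandas.Series(pandas.to_datetime(["2025-01-01T00:00:00"]))
        #     aware = s.dt.tz_localize(datetime.timezone.utc)   # naive -> aware UTC
        #     naive = aware.dt.tz_convert(None)                 # aware -> naive UTC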
# Outline of the ApdbSql class members, as listed in the original summary:
#
#   metadataSchemaVersionKey : str
#   metadataCodeVersionKey : str
#   metadataReplicaVersionKey : str
#   _engine : sqlalchemy.engine.Engine
#
#   __init__(self, config: ApdbSqlConfig)
#   init_database(cls, db_url: str, *, schema_file: str | None = None,
#       schema_name: str | None = None, read_sources_months: int | None = None,
#       read_forced_sources_months: int | None = None, enable_replica: bool = False,
#       connection_timeout: int | None = None, dia_object_index: str | None = None,
#       htm_level: int | None = None, htm_index_column: str | None = None,
#       ra_dec_columns: tuple[str, str] | None = None, prefix: str | None = None,
#       namespace: str | None = None, drop: bool = False) -> ApdbSqlConfig
#   apdbImplementationVersion(cls) -> VersionTuple
#   metadata(self) -> ApdbMetadata
#   get_replica(self) -> ApdbSqlReplica
#   tableRowCount(self) -> dict[str, int]
#   tableDef(self, table: ApdbTables) -> Table | None
#   getDiaObjects(self, region: Region) -> pandas.DataFrame
#   getDiaSources(self, region: Region, object_ids: Iterable[int] | None,
#       visit_time: astropy.time.Time) -> pandas.DataFrame | None
#   getDiaForcedSources(self, region: Region, object_ids: Iterable[int] | None,
#       visit_time: astropy.time.Time) -> pandas.DataFrame | None
#   getSSObjects(self) -> pandas.DataFrame
#   containsVisitDetector(self, visit: int, detector: int, region: Region,
#       visit_time: astropy.time.Time) -> bool
#   store(self, visit_time: astropy.time.Time, objects: pandas.DataFrame,
#       sources: pandas.DataFrame | None = None,
#       forced_sources: pandas.DataFrame | None = None) -> None
#   storeSSObjects(self, objects: pandas.DataFrame) -> None
#   reassignDiaSources(self, idMap: Mapping[int, int]) -> None
#   countUnassociatedObjects(self) -> int
#   _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer
#   _makeEngine(cls, config: ApdbSqlConfig, *, create: bool) -> sqlalchemy.engine.Engine
#   _connection_url(cls, config_url: str, *, create: bool) -> sqlalchemy.engine.URL | str
#   _update_sqlite_url(cls, url_string: str) -> str
#   _versionCheck(self, metadata: ApdbMetadataSql) -> None
#   _makeSchema(cls, config: ApdbConfig, drop: bool = False) -> None
#   _getDiaSourcesInRegion(self, region: Region, visit_time: astropy.time.Time) -> pandas.DataFrame
#   _getDiaSourcesByIDs(self, object_ids: list[int], visit_time: astropy.time.Time) -> pandas.DataFrame
#   _getSourcesByIDs(self, table_enum: ApdbTables, object_ids: list[int],
#       midpointMjdTai_start: float) -> pandas.DataFrame
#   _storeReplicaChunk(self, replica_chunk: ReplicaChunk, visit_time: astropy.time.Time,
#       connection: sqlalchemy.engine.Connection) -> None
#   _storeDiaObjects(self, objs: pandas.DataFrame, visit_time: astropy.time.Time,
#       replica_chunk: ReplicaChunk | None, connection: sqlalchemy.engine.Connection) -> None
#   _storeDiaSources(self, sources: pandas.DataFrame, replica_chunk: ReplicaChunk | None,
#       connection: sqlalchemy.engine.Connection) -> None
#   _storeDiaForcedSources(self, sources: pandas.DataFrame, replica_chunk: ReplicaChunk | None,
#       connection: sqlalchemy.engine.Connection) -> None
#   _htm_indices(self, region: Region) -> list[tuple[int, int]]
#   _filterRegion(self, table: sqlalchemy.schema.Table, region: Region) -> sql.ColumnElement
#   _add_spatial_index(self, df: pandas.DataFrame) -> pandas.DataFrame
#   _fix_input_timestamps(self, df: pandas.DataFrame) -> pandas.DataFrame
#   _fix_result_timestamps(self, df: pandas.DataFrame) -> pandas.DataFrame
# Module-level helper functions:
#
#   _make_midpointMjdTai_start(visit_time: astropy.time.Time, months: int) -> float
#   _onSqlite3Connect(dbapiConnection: sqlite3.Connection,
#       connectionRecord: sqlalchemy.pool._ConnectionRecord) -> None
#   _coerce_uint64(df: pandas.DataFrame) -> pandas.DataFrame