LSST Applications g180d380827+6621f76652,g2079a07aa2+86d27d4dc4,g2305ad1205+f5a9e323a1,g2bbee38e9b+c6a8a0fb72,g337abbeb29+c6a8a0fb72,g33d1c0ed96+c6a8a0fb72,g3a166c0a6a+c6a8a0fb72,g3ddfee87b4+9a10e1fe7b,g48712c4677+c9a099281a,g487adcacf7+f2e03ea30b,g50ff169b8f+96c6868917,g52b1c1532d+585e252eca,g591dd9f2cf+aead732c78,g64a986408d+eddffb812c,g858d7b2824+eddffb812c,g864b0138d7+aa38e45daa,g974c55ee3d+f37bf00e57,g99cad8db69+119519a52d,g9c22b2923f+e2510deafe,g9ddcbc5298+9a081db1e4,ga1e77700b3+03d07e1c1f,gb0e22166c9+60f28cb32d,gb23b769143+eddffb812c,gba4ed39666+c2a2e4ac27,gbb8dafda3b+27317ec8e9,gbd998247f1+585e252eca,gc120e1dc64+5817c176a8,gc28159a63d+c6a8a0fb72,gc3e9b769f7+6707aea8b4,gcf0d15dbbd+9a10e1fe7b,gdaeeff99f8+f9a426f77a,ge6526c86ff+6a2e01d432,ge79ae78c31+c6a8a0fb72,gee10cc3b42+585e252eca,gff1a9f87cc+eddffb812c,v27.0.0.rc1
LSST Data Management Base Package
Loading...
Searching...
No Matches
Public Member Functions | Public Attributes | Static Public Attributes | Protected Member Functions | Protected Attributes | Static Protected Attributes | List of all members
lsst.dax.apdb.sql.apdbSql.ApdbSql Class Reference
Inheritance diagram for lsst.dax.apdb.sql.apdbSql.ApdbSql:
lsst.dax.apdb.apdb.Apdb

Public Member Functions

 __init__ (self, ApdbSqlConfig config)
 
VersionTuple apdbImplementationVersion (cls)
 
ApdbSqlConfig init_database (cls, str db_url, *, str|None schema_file=None, str|None schema_name=None, int|None read_sources_months=None, int|None read_forced_sources_months=None, bool use_insert_id=False, int|None connection_timeout=None, str|None dia_object_index=None, int|None htm_level=None, str|None htm_index_column=None, list[str]|None ra_dec_columns=None, str|None prefix=None, str|None namespace=None, bool drop=False)
 
VersionTuple apdbSchemaVersion (self)
 
ApdbSqlReplica get_replica (self)
 
dict[str, int] tableRowCount (self)
 
Table|None tableDef (self, ApdbTables table)
 
pandas.DataFrame getDiaObjects (self, Region region)
 
pandas.DataFrame|None getDiaSources (self, Region region, Iterable[int]|None object_ids, astropy.time.Time visit_time)
 
pandas.DataFrame|None getDiaForcedSources (self, Region region, Iterable[int]|None object_ids, astropy.time.Time visit_time)
 
bool containsVisitDetector (self, int visit, int detector)
 
bool containsCcdVisit (self, int ccdVisitId)
 
pandas.DataFrame getSSObjects (self)
 
None store (self, astropy.time.Time visit_time, pandas.DataFrame objects, pandas.DataFrame|None sources=None, pandas.DataFrame|None forced_sources=None)
 
None storeSSObjects (self, pandas.DataFrame objects)
 
None reassignDiaSources (self, Mapping[int, int] idMap)
 
None dailyJob (self)
 
int countUnassociatedObjects (self)
 
ApdbMetadata metadata (self)
 

Public Attributes

 config
 
 pixelator
 
 metadataSchemaVersionKey
 
 metadataCodeVersionKey
 
 metadataReplicaVersionKey
 
 metadataConfigKey
 

Static Public Attributes

 ConfigClass = ApdbSqlConfig
 
str metadataSchemaVersionKey = "version:schema"
 
str metadataCodeVersionKey = "version:ApdbSql"
 
str metadataReplicaVersionKey = "version:ApdbSqlReplica"
 
str metadataConfigKey = "config:apdb-sql.json"
 

Protected Member Functions

Timer _timer (self, str name, *, Mapping[str, str|int]|None tags=None)
 
sqlalchemy.engine.Engine _makeEngine (cls, ApdbSqlConfig config)
 
None _versionCheck (self, ApdbMetadataSql metadata)
 
None _makeSchema (cls, ApdbConfig config, bool drop=False)
 
pandas.DataFrame _getDiaSourcesInRegion (self, Region region, astropy.time.Time visit_time)
 
pandas.DataFrame _getDiaSourcesByIDs (self, list[int] object_ids, astropy.time.Time visit_time)
 
pandas.DataFrame _getSourcesByIDs (self, ApdbTables table_enum, list[int] object_ids, float midpointMjdTai_start)
 
None _storeReplicaChunk (self, ReplicaChunk replica_chunk, astropy.time.Time visit_time, sqlalchemy.engine.Connection connection)
 
None _storeDiaObjects (self, pandas.DataFrame objs, astropy.time.Time visit_time, ReplicaChunk|None replica_chunk, sqlalchemy.engine.Connection connection)
 
None _storeDiaSources (self, pandas.DataFrame sources, ReplicaChunk|None replica_chunk, sqlalchemy.engine.Connection connection)
 
None _storeDiaForcedSources (self, pandas.DataFrame sources, ReplicaChunk|None replica_chunk, sqlalchemy.engine.Connection connection)
 
list[tuple[int, int]] _htm_indices (self, Region region)
 
sql.ColumnElement _filterRegion (self, sqlalchemy.schema.Table table, Region region)
 
pandas.DataFrame _add_obj_htm_index (self, pandas.DataFrame df)
 
pandas.DataFrame _add_src_htm_index (self, pandas.DataFrame sources, pandas.DataFrame objs)
 

Protected Attributes

 _engine
 
 _metadata
 
 _schema
 
 _timer_args
 

Static Protected Attributes

tuple _frozen_parameters
 

Detailed Description

Implementation of APDB interface based on SQL database.

The implementation is configured via standard ``pex_config`` mechanism
using `ApdbSqlConfig` configuration class. For an example of different
configurations check ``config/`` folder.

Parameters
----------
config : `ApdbSqlConfig`
    Configuration object.

Definition at line 179 of file apdbSql.py.

Constructor & Destructor Documentation

◆ __init__()

lsst.dax.apdb.sql.apdbSql.ApdbSql.__init__ ( self,
ApdbSqlConfig config )

Definition at line 215 of file apdbSql.py.

215 def __init__(self, config: ApdbSqlConfig):
216 self._engine = self._makeEngine(config)
217
218 sa_metadata = sqlalchemy.MetaData(schema=config.namespace)
219 meta_table_name = ApdbTables.metadata.table_name(prefix=config.prefix)
220 meta_table: sqlalchemy.schema.Table | None = None
221 with suppress(sqlalchemy.exc.NoSuchTableError):
222 meta_table = sqlalchemy.schema.Table(meta_table_name, sa_metadata, autoload_with=self._engine)
223
224 self._metadata = ApdbMetadataSql(self._engine, meta_table)
225
226 # Read frozen config from metadata.
227 config_json = self._metadata.get(self.metadataConfigKey)
228 if config_json is not None:
229 # Update config from metadata.
230 freezer = ApdbConfigFreezer[ApdbSqlConfig](self._frozen_parameters)
231 self.config = freezer.update(config, config_json)
232 else:
233 self.config = config
234 self.config.validate()
235
236 self._schema = ApdbSqlSchema(
237 engine=self._engine,
238 dia_object_index=self.config.dia_object_index,
239 schema_file=self.config.schema_file,
240 schema_name=self.config.schema_name,
241 prefix=self.config.prefix,
242 namespace=self.config.namespace,
243 htm_index_column=self.config.htm_index_column,
244 enable_replica=self.config.use_insert_id,
245 )
246
247 if self._metadata.table_exists():
248 self._versionCheck(self._metadata)
249
250 self.pixelator = HtmPixelization(self.config.htm_level)
251
252 _LOG.debug("APDB Configuration:")
253 _LOG.debug(" dia_object_index: %s", self.config.dia_object_index)
254 _LOG.debug(" read_sources_months: %s", self.config.read_sources_months)
255 _LOG.debug(" read_forced_sources_months: %s", self.config.read_forced_sources_months)
256 _LOG.debug(" dia_object_columns: %s", self.config.dia_object_columns)
257 _LOG.debug(" schema_file: %s", self.config.schema_file)
258 _LOG.debug(" extra_schema_file: %s", self.config.extra_schema_file)
259 _LOG.debug(" schema prefix: %s", self.config.prefix)
260
261 self._timer_args: list[MonAgent | logging.Logger] = [_MON]
262 if self.config.timer:
263 self._timer_args.append(_LOG)
264

Member Function Documentation

◆ _add_obj_htm_index()

pandas.DataFrame lsst.dax.apdb.sql.apdbSql.ApdbSql._add_obj_htm_index ( self,
pandas.DataFrame df )
protected
Calculate HTM index for each record and add it to a DataFrame.

Notes
-----
This overrides any existing column in a DataFrame with the same name
(pixelId). Original DataFrame is not changed, copy of a DataFrame is
returned.

Definition at line 1121 of file apdbSql.py.

1121 def _add_obj_htm_index(self, df: pandas.DataFrame) -> pandas.DataFrame:
1122 """Calculate HTM index for each record and add it to a DataFrame.
1123
1124 Notes
1125 -----
1126 This overrides any existing column in a DataFrame with the same name
1127 (pixelId). Original DataFrame is not changed, copy of a DataFrame is
1128 returned.
1129 """
1130 # calculate HTM index for every DiaObject
1131 htm_index = np.zeros(df.shape[0], dtype=np.int64)
1132 ra_col, dec_col = self.config.ra_dec_columns
1133 for i, (ra, dec) in enumerate(zip(df[ra_col], df[dec_col])):
1134 uv3d = UnitVector3d(LonLat.fromDegrees(ra, dec))
1135 idx = self.pixelator.index(uv3d)
1136 htm_index[i] = idx
1137 df = df.copy()
1138 df[self.config.htm_index_column] = htm_index
1139 return df
1140

◆ _add_src_htm_index()

pandas.DataFrame lsst.dax.apdb.sql.apdbSql.ApdbSql._add_src_htm_index ( self,
pandas.DataFrame sources,
pandas.DataFrame objs )
protected
Add pixelId column to DiaSource catalog.

Notes
-----
This method copies pixelId value from a matching DiaObject record.
DiaObject catalog needs to have a pixelId column filled by
``_add_obj_htm_index`` method and DiaSource records need to be
associated to DiaObjects via ``diaObjectId`` column.

This overrides any existing column in a DataFrame with the same name
(pixelId). Original DataFrame is not changed, copy of a DataFrame is
returned.

Definition at line 1141 of file apdbSql.py.

1141 def _add_src_htm_index(self, sources: pandas.DataFrame, objs: pandas.DataFrame) -> pandas.DataFrame:
1142 """Add pixelId column to DiaSource catalog.
1143
1144 Notes
1145 -----
1146 This method copies pixelId value from a matching DiaObject record.
1147 DiaObject catalog needs to have a pixelId column filled by
1148 ``_add_obj_htm_index`` method and DiaSource records need to be
1149 associated to DiaObjects via ``diaObjectId`` column.
1150
1151 This overrides any existing column in a DataFrame with the same name
1152 (pixelId). Original DataFrame is not changed, copy of a DataFrame is
1153 returned.
1154 """
1155 pixel_id_map: dict[int, int] = {
1156 diaObjectId: pixelId
1157 for diaObjectId, pixelId in zip(objs["diaObjectId"], objs[self.config.htm_index_column])
1158 }
1159 # DiaSources associated with SolarSystemObjects do not have an
1160 # associated DiaObject hence we skip them and set their htmIndex
1161 # value to 0.
1162 pixel_id_map[0] = 0
1163 htm_index = np.zeros(sources.shape[0], dtype=np.int64)
1164 for i, diaObjId in enumerate(sources["diaObjectId"]):
1165 htm_index[i] = pixel_id_map[diaObjId]
1166 sources = sources.copy()
1167 sources[self.config.htm_index_column] = htm_index
1168 return sources

◆ _filterRegion()

sql.ColumnElement lsst.dax.apdb.sql.apdbSql.ApdbSql._filterRegion ( self,
sqlalchemy.schema.Table table,
Region region )
protected
Make SQLAlchemy expression for selecting records in a region.

Definition at line 1107 of file apdbSql.py.

1107 def _filterRegion(self, table: sqlalchemy.schema.Table, region: Region) -> sql.ColumnElement:
1108 """Make SQLAlchemy expression for selecting records in a region."""
1109 htm_index_column = table.columns[self.config.htm_index_column]
1110 exprlist = []
1111 pixel_ranges = self._htm_indices(region)
1112 for low, upper in pixel_ranges:
1113 upper -= 1
1114 if low == upper:
1115 exprlist.append(htm_index_column == low)
1116 else:
1117 exprlist.append(sql.expression.between(htm_index_column, low, upper))
1118
1119 return sql.expression.or_(*exprlist)
1120

◆ _getDiaSourcesByIDs()

pandas.DataFrame lsst.dax.apdb.sql.apdbSql.ApdbSql._getDiaSourcesByIDs ( self,
list[int] object_ids,
astropy.time.Time visit_time )
protected
Return catalog of DiaSource instances given set of DiaObject IDs.

Parameters
----------
object_ids :
    Collection of DiaObject IDs
visit_time : `astropy.time.Time`
    Time of the current visit.

Returns
-------
catalog : `pandas.DataFrame`
    Catalog containing DiaSource records.

Definition at line 782 of file apdbSql.py.

782 def _getDiaSourcesByIDs(self, object_ids: list[int], visit_time: astropy.time.Time) -> pandas.DataFrame:
783 """Return catalog of DiaSource instances given set of DiaObject IDs.
784
785 Parameters
786 ----------
787 object_ids :
788 Collection of DiaObject IDs
789 visit_time : `astropy.time.Time`
790 Time of the current visit.
791
792 Returns
793 -------
794 catalog : `pandas.DataFrame`
795 Catalog containing DiaSource records.
796 """
797 # TODO: DateTime.MJD must be consistent with code in ap_association,
798 # alternatively we can fill midpointMjdTai ourselves in store()
799 midpointMjdTai_start = _make_midpointMjdTai_start(visit_time, self.config.read_sources_months)
800 _LOG.debug("midpointMjdTai_start = %.6f", midpointMjdTai_start)
801
802 with self._timer("select_time", tags={"table": "DiaSource"}):
803 sources = self._getSourcesByIDs(ApdbTables.DiaSource, object_ids, midpointMjdTai_start)
804
805 _LOG.debug("found %s DiaSources", len(sources))
806 return sources
807

◆ _getDiaSourcesInRegion()

pandas.DataFrame lsst.dax.apdb.sql.apdbSql.ApdbSql._getDiaSourcesInRegion ( self,
Region region,
astropy.time.Time visit_time )
protected
Return catalog of DiaSource instances from given region.

Parameters
----------
region : `lsst.sphgeom.Region`
    Region to search for DIASources.
visit_time : `astropy.time.Time`
    Time of the current visit.

Returns
-------
catalog : `pandas.DataFrame`
    Catalog containing DiaSource records.

Definition at line 746 of file apdbSql.py.

746 def _getDiaSourcesInRegion(self, region: Region, visit_time: astropy.time.Time) -> pandas.DataFrame:
747 """Return catalog of DiaSource instances from given region.
748
749 Parameters
750 ----------
751 region : `lsst.sphgeom.Region`
752 Region to search for DIASources.
753 visit_time : `astropy.time.Time`
754 Time of the current visit.
755
756 Returns
757 -------
758 catalog : `pandas.DataFrame`
759 Catalog containing DiaSource records.
760 """
761 # TODO: DateTime.MJD must be consistent with code in ap_association,
762 # alternatively we can fill midpointMjdTai ourselves in store()
763 midpointMjdTai_start = _make_midpointMjdTai_start(visit_time, self.config.read_sources_months)
764 _LOG.debug("midpointMjdTai_start = %.6f", midpointMjdTai_start)
765
766 table = self._schema.get_table(ApdbTables.DiaSource)
767 columns = self._schema.get_apdb_columns(ApdbTables.DiaSource)
768 query = sql.select(*columns)
769
770 # build selection
771 time_filter = table.columns["midpointMjdTai"] > midpointMjdTai_start
772 where = sql.expression.and_(self._filterRegion(table, region), time_filter)
773 query = query.where(where)
774
775 # execute select
776 with self._timer("DiaSource_select_time", tags={"table": "DiaSource"}):
777 with self._engine.begin() as conn:
778 sources = pandas.read_sql_query(query, conn)
779 _LOG.debug("found %s DiaSources", len(sources))
780 return sources
781

◆ _getSourcesByIDs()

pandas.DataFrame lsst.dax.apdb.sql.apdbSql.ApdbSql._getSourcesByIDs ( self,
ApdbTables table_enum,
list[int] object_ids,
float midpointMjdTai_start )
protected
Return catalog of DiaSource or DiaForcedSource instances given set
of DiaObject IDs.

Parameters
----------
table : `sqlalchemy.schema.Table`
    Database table.
object_ids :
    Collection of DiaObject IDs
midpointMjdTai_start : `float`
    Earliest midpointMjdTai to retrieve.

Returns
-------
catalog : `pandas.DataFrame`
    Catalog containing DiaSource records. `None` is returned if
    ``read_sources_months`` configuration parameter is set to 0 or
    when ``object_ids`` is empty.

Definition at line 808 of file apdbSql.py.

810 ) -> pandas.DataFrame:
811 """Return catalog of DiaSource or DiaForcedSource instances given set
812 of DiaObject IDs.
813
814 Parameters
815 ----------
816 table : `sqlalchemy.schema.Table`
817 Database table.
818 object_ids :
819 Collection of DiaObject IDs
820 midpointMjdTai_start : `float`
821 Earliest midpointMjdTai to retrieve.
822
823 Returns
824 -------
825 catalog : `pandas.DataFrame`
826 Catalog containing DiaSource records. `None` is returned if
827 ``read_sources_months`` configuration parameter is set to 0 or
828 when ``object_ids`` is empty.
829 """
830 table = self._schema.get_table(table_enum)
831 columns = self._schema.get_apdb_columns(table_enum)
832
833 sources: pandas.DataFrame | None = None
834 if len(object_ids) <= 0:
835 _LOG.debug("ID list is empty, just fetch empty result")
836 query = sql.select(*columns).where(sql.literal(False))
837 with self._engine.begin() as conn:
838 sources = pandas.read_sql_query(query, conn)
839 else:
840 data_frames: list[pandas.DataFrame] = []
841 for ids in chunk_iterable(sorted(object_ids), 1000):
842 query = sql.select(*columns)
843
844 # Some types like np.int64 can cause issues with
845 # sqlalchemy, convert them to int.
846 int_ids = [int(oid) for oid in ids]
847
848 # select by object id
849 query = query.where(
850 sql.expression.and_(
851 table.columns["diaObjectId"].in_(int_ids),
852 table.columns["midpointMjdTai"] > midpointMjdTai_start,
853 )
854 )
855
856 # execute select
857 with self._engine.begin() as conn:
858 data_frames.append(pandas.read_sql_query(query, conn))
859
860 if len(data_frames) == 1:
861 sources = data_frames[0]
862 else:
863 sources = pandas.concat(data_frames)
864 assert sources is not None, "Catalog cannot be None"
865 return sources
866

◆ _htm_indices()

list[tuple[int, int]] lsst.dax.apdb.sql.apdbSql.ApdbSql._htm_indices ( self,
Region region )
protected
Generate a set of HTM indices covering specified region.

Parameters
----------
region: `sphgeom.Region`
    Region that needs to be indexed.

Returns
-------
Sequence of ranges, range is a tuple (minHtmID, maxHtmID).

Definition at line 1090 of file apdbSql.py.

1090 def _htm_indices(self, region: Region) -> list[tuple[int, int]]:
1091 """Generate a set of HTM indices covering specified region.
1092
1093 Parameters
1094 ----------
1095 region: `sphgeom.Region`
1096 Region that needs to be indexed.
1097
1098 Returns
1099 -------
1100 Sequence of ranges, range is a tuple (minHtmID, maxHtmID).
1101 """
1102 _LOG.debug("region: %s", region)
1103 indices = self.pixelator.envelope(region, self.config.htm_max_ranges)
1104
1105 return indices.ranges()
1106

◆ _makeEngine()

sqlalchemy.engine.Engine lsst.dax.apdb.sql.apdbSql.ApdbSql._makeEngine ( cls,
ApdbSqlConfig config )
protected
Make SQLAlchemy engine based on configured parameters.

Parameters
----------
config : `ApdbSqlConfig`
    Configuration object.

Definition at line 270 of file apdbSql.py.

270 def _makeEngine(cls, config: ApdbSqlConfig) -> sqlalchemy.engine.Engine:
271 """Make SQLAlchemy engine based on configured parameters.
272
273 Parameters
274 ----------
275 config : `ApdbSqlConfig`
276 Configuration object.
277 """
278 # engine is reused between multiple processes, make sure that we don't
279 # share connections by disabling pool (by using NullPool class)
280 kw: MutableMapping[str, Any] = dict(echo=config.sql_echo)
281 conn_args: dict[str, Any] = dict()
282 if not config.connection_pool:
283 kw.update(poolclass=NullPool)
284 if config.isolation_level is not None:
285 kw.update(isolation_level=config.isolation_level)
286 elif config.db_url.startswith("sqlite"): # type: ignore
287 # Use READ_UNCOMMITTED as default value for sqlite.
288 kw.update(isolation_level="READ_UNCOMMITTED")
289 if config.connection_timeout is not None:
290 if config.db_url.startswith("sqlite"):
291 conn_args.update(timeout=config.connection_timeout)
292 elif config.db_url.startswith(("postgresql", "mysql")):
293 conn_args.update(connect_timeout=config.connection_timeout)
294 kw.update(connect_args=conn_args)
295 engine = sqlalchemy.create_engine(config.db_url, **kw)
296
297 if engine.dialect.name == "sqlite":
298 # Need to enable foreign keys on every new connection.
299 sqlalchemy.event.listen(engine, "connect", _onSqlite3Connect)
300
301 return engine
302

◆ _makeSchema()

None lsst.dax.apdb.sql.apdbSql.ApdbSql._makeSchema ( cls,
ApdbConfig config,
bool drop = False )
protected

Definition at line 475 of file apdbSql.py.

475 def _makeSchema(cls, config: ApdbConfig, drop: bool = False) -> None:
476 # docstring is inherited from a base class
477
478 if not isinstance(config, ApdbSqlConfig):
479 raise TypeError(f"Unexpected type of configuration object: {type(config)}")
480
481 engine = cls._makeEngine(config)
482
483 # Ask schema class to create all tables.
484 schema = ApdbSqlSchema(
485 engine=engine,
486 dia_object_index=config.dia_object_index,
487 schema_file=config.schema_file,
488 schema_name=config.schema_name,
489 prefix=config.prefix,
490 namespace=config.namespace,
491 htm_index_column=config.htm_index_column,
492 enable_replica=config.use_insert_id,
493 )
494 schema.makeSchema(drop=drop)
495
496 # Need metadata table to store few items in it, if table exists.
497 meta_table: sqlalchemy.schema.Table | None = None
498 with suppress(ValueError):
499 meta_table = schema.get_table(ApdbTables.metadata)
500
501 apdb_meta = ApdbMetadataSql(engine, meta_table)
502 if apdb_meta.table_exists():
503 # Fill version numbers, overwrite if they are already there.
504 apdb_meta.set(cls.metadataSchemaVersionKey, str(schema.schemaVersion()), force=True)
505 apdb_meta.set(cls.metadataCodeVersionKey, str(cls.apdbImplementationVersion()), force=True)
506 if config.use_insert_id:
507 # Only store replica code version if replica is enabled.
508 apdb_meta.set(
509 cls.metadataReplicaVersionKey,
510 str(ApdbSqlReplica.apdbReplicaImplementationVersion()),
511 force=True,
512 )
513
514 # Store frozen part of a configuration in metadata.
515 freezer = ApdbConfigFreezer[ApdbSqlConfig](cls._frozen_parameters)
516 apdb_meta.set(cls.metadataConfigKey, freezer.to_json(config), force=True)
517

◆ _storeDiaForcedSources()

None lsst.dax.apdb.sql.apdbSql.ApdbSql._storeDiaForcedSources ( self,
pandas.DataFrame sources,
ReplicaChunk | None replica_chunk,
sqlalchemy.engine.Connection connection )
protected
Store a set of DiaForcedSources from current visit.

Parameters
----------
sources : `pandas.DataFrame`
    Catalog containing DiaForcedSource records

Definition at line 1054 of file apdbSql.py.

1059 ) -> None:
1060 """Store a set of DiaForcedSources from current visit.
1061
1062 Parameters
1063 ----------
1064 sources : `pandas.DataFrame`
1065 Catalog containing DiaForcedSource records
1066 """
1067 table = self._schema.get_table(ApdbTables.DiaForcedSource)
1068
1069 # Insert replica data
1070 replica_data: list[dict] = []
1071 replica_stmt: Any = None
1072 replica_table_name = ""
1073 if replica_chunk is not None:
1074 pk_names = [column.name for column in table.primary_key]
1075 replica_data = sources[pk_names].to_dict("records")
1076 for row in replica_data:
1077 row["apdb_replica_chunk"] = replica_chunk.id
1078 replica_table = self._schema.get_table(ExtraTables.DiaForcedSourceChunks)
1079 replica_table_name = replica_table.name
1080 replica_stmt = replica_table.insert()
1081
1082 # everything to be done in single transaction
1083 with self._timer("insert_time", tags={"table": table.name}):
1084 sources = _coerce_uint64(sources)
1085 sources.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
1086 if replica_stmt is not None:
1087 with self._timer("insert_time", tags={"table": replica_table_name}):
1088 connection.execute(replica_stmt, replica_data)
1089

◆ _storeDiaObjects()

None lsst.dax.apdb.sql.apdbSql.ApdbSql._storeDiaObjects ( self,
pandas.DataFrame objs,
astropy.time.Time visit_time,
ReplicaChunk | None replica_chunk,
sqlalchemy.engine.Connection connection )
protected
Store catalog of DiaObjects from current visit.

Parameters
----------
objs : `pandas.DataFrame`
    Catalog with DiaObject records.
visit_time : `astropy.time.Time`
    Time of the visit.
replica_chunk : `ReplicaChunk`
    Insert identifier.

Definition at line 891 of file apdbSql.py.

897 ) -> None:
898 """Store catalog of DiaObjects from current visit.
899
900 Parameters
901 ----------
902 objs : `pandas.DataFrame`
903 Catalog with DiaObject records.
904 visit_time : `astropy.time.Time`
905 Time of the visit.
906 replica_chunk : `ReplicaChunk`
907 Insert identifier.
908 """
909 if len(objs) == 0:
910 _LOG.debug("No objects to write to database.")
911 return
912
913 # Some types like np.int64 can cause issues with sqlalchemy, convert
914 # them to int.
915 ids = sorted(int(oid) for oid in objs["diaObjectId"])
916 _LOG.debug("first object ID: %d", ids[0])
917
918 # TODO: Need to verify that we are using correct scale here for
919 # DATETIME representation (see DM-31996).
920 dt = visit_time.datetime
921
922 # everything to be done in single transaction
923 if self.config.dia_object_index == "last_object_table":
924 # Insert and replace all records in LAST table.
925 table = self._schema.get_table(ApdbTables.DiaObjectLast)
926
927 # Drop the previous objects (pandas cannot upsert).
928 query = table.delete().where(table.columns["diaObjectId"].in_(ids))
929
930 with self._timer("delete_time", tags={"table": table.name}):
931 res = connection.execute(query)
932 _LOG.debug("deleted %s objects", res.rowcount)
933
934 # DiaObjectLast is a subset of DiaObject, strip missing columns
935 last_column_names = [column.name for column in table.columns]
936 last_objs = objs[last_column_names]
937 last_objs = _coerce_uint64(last_objs)
938
939 if "lastNonForcedSource" in last_objs.columns:
940 # lastNonForcedSource is defined NOT NULL, fill it with visit
941 # time just in case.
942 last_objs["lastNonForcedSource"].fillna(dt, inplace=True)
943 else:
944 extra_column = pandas.Series([dt] * len(objs), name="lastNonForcedSource")
945 last_objs.set_index(extra_column.index, inplace=True)
946 last_objs = pandas.concat([last_objs, extra_column], axis="columns")
947
948 with self._timer("insert_time", tags={"table": "DiaObjectLast"}):
949 last_objs.to_sql(
950 table.name,
951 connection,
952 if_exists="append",
953 index=False,
954 schema=table.schema,
955 )
956 else:
957 # truncate existing validity intervals
958 table = self._schema.get_table(ApdbTables.DiaObject)
959
960 update = (
961 table.update()
962 .values(validityEnd=dt)
963 .where(
964 sql.expression.and_(
965 table.columns["diaObjectId"].in_(ids),
966 table.columns["validityEnd"].is_(None),
967 )
968 )
969 )
970
971 with self._timer("truncate_time", tags={"table": table.name}):
972 res = connection.execute(update)
973 _LOG.debug("truncated %s intervals", res.rowcount)
974
975 objs = _coerce_uint64(objs)
976
977 # Fill additional columns
978 extra_columns: list[pandas.Series] = []
979 if "validityStart" in objs.columns:
980 objs["validityStart"] = dt
981 else:
982 extra_columns.append(pandas.Series([dt] * len(objs), name="validityStart"))
983 if "validityEnd" in objs.columns:
984 objs["validityEnd"] = None
985 else:
986 extra_columns.append(pandas.Series([None] * len(objs), name="validityEnd"))
987 if "lastNonForcedSource" in objs.columns:
988 # lastNonForcedSource is defined NOT NULL, fill it with visit time
989 # just in case.
990 objs["lastNonForcedSource"].fillna(dt, inplace=True)
991 else:
992 extra_columns.append(pandas.Series([dt] * len(objs), name="lastNonForcedSource"))
993 if extra_columns:
994 objs.set_index(extra_columns[0].index, inplace=True)
995 objs = pandas.concat([objs] + extra_columns, axis="columns")
996
997 # Insert replica data
998 table = self._schema.get_table(ApdbTables.DiaObject)
999 replica_data: list[dict] = []
1000 replica_stmt: Any = None
1001 replica_table_name = ""
1002 if replica_chunk is not None:
1003 pk_names = [column.name for column in table.primary_key]
1004 replica_data = objs[pk_names].to_dict("records")
1005 for row in replica_data:
1006 row["apdb_replica_chunk"] = replica_chunk.id
1007 replica_table = self._schema.get_table(ExtraTables.DiaObjectChunks)
1008 replica_table_name = replica_table.name
1009 replica_stmt = replica_table.insert()
1010
1011 # insert new versions
1012 with self._timer("insert_time", tags={"table": table.name}):
1013 objs.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
1014 if replica_stmt is not None:
1015 with self._timer("insert_time", tags={"table": replica_table_name}):
1016 connection.execute(replica_stmt, replica_data)
1017

◆ _storeDiaSources()

None lsst.dax.apdb.sql.apdbSql.ApdbSql._storeDiaSources ( self,
pandas.DataFrame sources,
ReplicaChunk | None replica_chunk,
sqlalchemy.engine.Connection connection )
protected
Store catalog of DiaSources from current visit.

Parameters
----------
sources : `pandas.DataFrame`
    Catalog containing DiaSource records

Definition at line 1018 of file apdbSql.py.

1023 ) -> None:
1024 """Store catalog of DiaSources from current visit.
1025
1026 Parameters
1027 ----------
1028 sources : `pandas.DataFrame`
1029 Catalog containing DiaSource records
1030 """
1031 table = self._schema.get_table(ApdbTables.DiaSource)
1032
1033 # Insert replica data
1034 replica_data: list[dict] = []
1035 replica_stmt: Any = None
1036 replica_table_name = ""
1037 if replica_chunk is not None:
1038 pk_names = [column.name for column in table.primary_key]
1039 replica_data = sources[pk_names].to_dict("records")
1040 for row in replica_data:
1041 row["apdb_replica_chunk"] = replica_chunk.id
1042 replica_table = self._schema.get_table(ExtraTables.DiaSourceChunks)
1043 replica_table_name = replica_table.name
1044 replica_stmt = replica_table.insert()
1045
1046 # everything to be done in single transaction
1047 with self._timer("insert_time", tags={"table": table.name}):
1048 sources = _coerce_uint64(sources)
1049 sources.to_sql(table.name, connection, if_exists="append", index=False, schema=table.schema)
1050 if replica_stmt is not None:
1051 with self._timer("replica_insert_time", tags={"table": replica_table_name}):
1052 connection.execute(replica_stmt, replica_data)
1053

◆ _storeReplicaChunk()

None lsst.dax.apdb.sql.apdbSql.ApdbSql._storeReplicaChunk ( self,
ReplicaChunk replica_chunk,
astropy.time.Time visit_time,
sqlalchemy.engine.Connection connection )
protected

Definition at line 867 of file apdbSql.py.

872 ) -> None:
873 dt = visit_time.datetime
874
875 table = self._schema.get_table(ExtraTables.ApdbReplicaChunks)
876
877 # We need UPSERT which is dialect-specific construct
878 values = {"last_update_time": dt, "unique_id": replica_chunk.unique_id}
879 row = {"apdb_replica_chunk": replica_chunk.id} | values
880 if connection.dialect.name == "sqlite":
881 insert_sqlite = sqlalchemy.dialects.sqlite.insert(table)
882 insert_sqlite = insert_sqlite.on_conflict_do_update(index_elements=table.primary_key, set_=values)
883 connection.execute(insert_sqlite, row)
884 elif connection.dialect.name == "postgresql":
885 insert_pg = sqlalchemy.dialects.postgresql.dml.insert(table)
886 insert_pg = insert_pg.on_conflict_do_update(constraint=table.primary_key, set_=values)
887 connection.execute(insert_pg, row)
888 else:
889 raise TypeError(f"Unsupported dialect {connection.dialect.name} for upsert.")
890

◆ _timer()

Timer lsst.dax.apdb.sql.apdbSql.ApdbSql._timer ( self,
str name,
*, Mapping[str, str | int] | None tags = None )
protected
Create `Timer` instance given its name.

Definition at line 265 of file apdbSql.py.

265 def _timer(self, name: str, *, tags: Mapping[str, str | int] | None = None) -> Timer:
266 """Create `Timer` instance given its name."""
267 return Timer(name, *self._timer_args, tags=tags)
268

◆ _versionCheck()

None lsst.dax.apdb.sql.apdbSql.ApdbSql._versionCheck ( self,
ApdbMetadataSql metadata )
protected
Check schema version compatibility.

Definition at line 303 of file apdbSql.py.

303 def _versionCheck(self, metadata: ApdbMetadataSql) -> None:
304 """Check schema version compatibility."""
305
306 def _get_version(key: str, default: VersionTuple) -> VersionTuple:
307 """Retrieve version number from given metadata key."""
308 if metadata.table_exists():
309 version_str = metadata.get(key)
310 if version_str is None:
311 # Should not happen with existing metadata table.
312 raise RuntimeError(f"Version key {key!r} does not exist in metadata table.")
313 return VersionTuple.fromString(version_str)
314 return default
315
316 # For old databases where metadata table does not exist we assume that
317 # version of both code and schema is 0.1.0.
318 initial_version = VersionTuple(0, 1, 0)
319 db_schema_version = _get_version(self.metadataSchemaVersionKey, initial_version)
320 db_code_version = _get_version(self.metadataCodeVersionKey, initial_version)
321
322 # For now there is no way to make read-only APDB instances, assume that
323 # any access can do updates.
324 if not self._schema.schemaVersion().checkCompatibility(db_schema_version, True):
325 raise IncompatibleVersionError(
326 f"Configured schema version {self._schema.schemaVersion()} "
327 f"is not compatible with database version {db_schema_version}"
328 )
329 if not self.apdbImplementationVersion().checkCompatibility(db_code_version, True):
330 raise IncompatibleVersionError(
331 f"Current code version {self.apdbImplementationVersion()} "
332 f"is not compatible with database version {db_code_version}"
333 )
334
335 # Check replica code version only if replica is enabled.
336 if self._schema.has_replica_chunks:
337 db_replica_version = _get_version(self.metadataReplicaVersionKey, initial_version)
338 code_replica_version = ApdbSqlReplica.apdbReplicaImplementationVersion()
339 if not code_replica_version.checkCompatibility(db_replica_version, True):
340 raise IncompatibleVersionError(
341 f"Current replication code version {code_replica_version} "
342 f"is not compatible with database version {db_replica_version}"
343 )
344

◆ apdbImplementationVersion()

VersionTuple lsst.dax.apdb.sql.apdbSql.ApdbSql.apdbImplementationVersion ( cls)
Return version number for current APDB implementation.

Returns
-------
version : `VersionTuple`
    Version of the code defined in implementation class.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 346 of file apdbSql.py.

346 def apdbImplementationVersion(cls) -> VersionTuple:
347 # Docstring inherited from base class.
348 return VERSION
349

◆ apdbSchemaVersion()

VersionTuple lsst.dax.apdb.sql.apdbSql.ApdbSql.apdbSchemaVersion ( self)
Return schema version number as defined in config file.

Returns
-------
version : `VersionTuple`
    Version of the schema defined in schema config file.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 438 of file apdbSql.py.

438 def apdbSchemaVersion(self) -> VersionTuple:
439 # Docstring inherited from base class.
440 return self._schema.schemaVersion()
441

◆ containsCcdVisit()

bool lsst.dax.apdb.sql.apdbSql.ApdbSql.containsCcdVisit ( self,
int ccdVisitId )
Test whether data for a given visit-detector is present in the APDB.

This method is a placeholder until `Apdb.containsVisitDetector` can
be implemented.

Parameters
----------
ccdVisitId : `int`
    The packed ID of the visit-detector to search for.

Returns
-------
present : `bool`
    `True` if some DiaSource records exist for the specified
    observation, `False` otherwise.

Definition at line 592 of file apdbSql.py.

592 def containsCcdVisit(self, ccdVisitId: int) -> bool:
593 """Test whether data for a given visit-detector is present in the APDB.
594
595 This method is a placeholder until `Apdb.containsVisitDetector` can
596 be implemented.
597
598 Parameters
599 ----------
600 ccdVisitId : `int`
601 The packed ID of the visit-detector to search for.
602
603 Returns
604 -------
605 present : `bool`
606 `True` if some DiaSource records exist for the specified
607 observation, `False` otherwise.
608 """
609 # TODO: remove this method in favor of containsVisitDetector on either
610 # DM-41671 or a ticket that removes ccdVisitId from these tables
611 src_table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaSource)
612 frcsrc_table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaForcedSource)
613 # Query should load only one leaf page of the index
614 query1 = sql.select(src_table.c.ccdVisitId).filter_by(ccdVisitId=ccdVisitId).limit(1)
615 # Backup query in case an image was processed but had no diaSources
616 query2 = sql.select(frcsrc_table.c.ccdVisitId).filter_by(ccdVisitId=ccdVisitId).limit(1)
617
618 with self._engine.begin() as conn:
619 result = conn.execute(query1).scalar_one_or_none()
620 if result is not None:
621 return True
622 else:
623 result = conn.execute(query2).scalar_one_or_none()
624 return result is not None
625

◆ containsVisitDetector()

bool lsst.dax.apdb.sql.apdbSql.ApdbSql.containsVisitDetector ( self,
int visit,
int detector )
Test whether data for a given visit-detector is present in the APDB.

Parameters
----------
visit, detector : `int`
    The ID of the visit-detector to search for.

Returns
-------
present : `bool`
    `True` if some DiaObject, DiaSource, or DiaForcedSource records
    exist for the specified observation, `False` otherwise.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 588 of file apdbSql.py.

588 def containsVisitDetector(self, visit: int, detector: int) -> bool:
589 # docstring is inherited from a base class
590 raise NotImplementedError()
591

◆ countUnassociatedObjects()

int lsst.dax.apdb.sql.apdbSql.ApdbSql.countUnassociatedObjects ( self)
Return the number of DiaObjects that have only one DiaSource
associated with them.

Used as part of ap_verify metrics.

Returns
-------
count : `int`
    Number of DiaObjects with exactly one associated DiaSource.

Notes
-----
This method can be very inefficient or slow in some implementations.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 723 of file apdbSql.py.

723 def countUnassociatedObjects(self) -> int:
724 # docstring is inherited from a base class
725
726 # Retrieve the DiaObject table.
727 table: sqlalchemy.schema.Table = self._schema.get_table(ApdbTables.DiaObject)
728
729 # Construct the sql statement.
730 stmt = sql.select(func.count()).select_from(table).where(table.c.nDiaSources == 1)
731 stmt = stmt.where(table.c.validityEnd == None) # noqa: E711
732
733 # Return the count.
734 with self._engine.begin() as conn:
735 count = conn.execute(stmt).scalar_one()
736
737 return count
738

◆ dailyJob()

None lsst.dax.apdb.sql.apdbSql.ApdbSql.dailyJob ( self)
Implement daily activities like cleanup/vacuum.

What should be done during daily activities is determined by
specific implementation.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 719 of file apdbSql.py.

719 def dailyJob(self) -> None:
720 # docstring is inherited from a base class
721 pass
722

◆ get_replica()

ApdbSqlReplica lsst.dax.apdb.sql.apdbSql.ApdbSql.get_replica ( self)
Return `ApdbReplica` instance for this database.

Definition at line 442 of file apdbSql.py.

442 def get_replica(self) -> ApdbSqlReplica:
443 """Return `ApdbReplica` instance for this database."""
444 return ApdbSqlReplica(self._schema, self._engine)
445

◆ getDiaForcedSources()

pandas.DataFrame | None lsst.dax.apdb.sql.apdbSql.ApdbSql.getDiaForcedSources ( self,
Region region,
Iterable[int] | None object_ids,
astropy.time.Time visit_time )
Return catalog of DiaForcedSource instances from a given region.

Parameters
----------
region : `lsst.sphgeom.Region`
    Region to search for DIASources.
object_ids : iterable [ `int` ], optional
    List of DiaObject IDs to further constrain the set of returned
    sources. If the list is empty then an empty catalog is returned with
    a correct schema. If `None` then the returned sources are not
    constrained. Some implementations may not support the latter case.
visit_time : `astropy.time.Time`
    Time of the current visit.

Returns
-------
catalog : `pandas.DataFrame`, or `None`
    Catalog containing DiaSource records. `None` is returned if
    ``read_forced_sources_months`` configuration parameter is set to 0.

Raises
------
NotImplementedError
    May be raised by some implementations if ``object_ids`` is `None`.

Notes
-----
This method returns DiaForcedSource catalog for a region with
additional filtering based on DiaObject IDs. Only a subset of DiaSource
history is returned limited by ``read_forced_sources_months`` config
parameter, w.r.t. ``visit_time``. If ``object_ids`` is empty then an
empty catalog is always returned with the correct schema
(columns/types). If ``object_ids`` is `None` then no filtering is
performed and some of the returned records may be outside the specified
region.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 563 of file apdbSql.py.

565 ) -> pandas.DataFrame | None:
566 # docstring is inherited from a base class
567 if self.config.read_forced_sources_months == 0:
568 _LOG.debug("Skip DiaForceSources fetching")
569 return None
570
571 if object_ids is None:
572 # This implementation does not support region-based selection.
573 raise NotImplementedError("Region-based selection is not supported")
574
575 # TODO: DateTime.MJD must be consistent with code in ap_association,
576 # alternatively we can fill midpointMjdTai ourselves in store()
577 midpointMjdTai_start = _make_midpointMjdTai_start(visit_time, self.config.read_forced_sources_months)
578 _LOG.debug("midpointMjdTai_start = %.6f", midpointMjdTai_start)
579
580 with self._timer("select_time", tags={"table": "DiaForcedSource"}):
581 sources = self._getSourcesByIDs(
582 ApdbTables.DiaForcedSource, list(object_ids), midpointMjdTai_start
583 )
584
585 _LOG.debug("found %s DiaForcedSources", len(sources))
586 return sources
587

◆ getDiaObjects()

pandas.DataFrame lsst.dax.apdb.sql.apdbSql.ApdbSql.getDiaObjects ( self,
Region region )
Return catalog of DiaObject instances from a given region.

This method returns only the last version of each DiaObject. Some
records in a returned catalog may be outside the specified region; it
is up to a client to ignore those records or clean up the catalog
before further use.

Parameters
----------
region : `lsst.sphgeom.Region`
    Region to search for DIAObjects.

Returns
-------
catalog : `pandas.DataFrame`
    Catalog containing DiaObject records for a region that may be a
    superset of the specified region.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 518 of file apdbSql.py.

518 def getDiaObjects(self, region: Region) -> pandas.DataFrame:
519 # docstring is inherited from a base class
520
521 # decide what columns we need
522 if self.config.dia_object_index == "last_object_table":
523 table_enum = ApdbTables.DiaObjectLast
524 else:
525 table_enum = ApdbTables.DiaObject
526 table = self._schema.get_table(table_enum)
527 if not self.config.dia_object_columns:
528 columns = self._schema.get_apdb_columns(table_enum)
529 else:
530 columns = [table.c[col] for col in self.config.dia_object_columns]
531 query = sql.select(*columns)
532
533 # build selection
534 query = query.where(self._filterRegion(table, region))
535
536 # select latest version of objects
537 if self.config.dia_object_index != "last_object_table":
538 query = query.where(table.c.validityEnd == None) # noqa: E711
539
540 # _LOG.debug("query: %s", query)
541
542 # execute select
543 with self._timer("select_time", tags={"table": "DiaObject"}):
544 with self._engine.begin() as conn:
545 objects = pandas.read_sql_query(query, conn)
546 _LOG.debug("found %s DiaObjects", len(objects))
547 return objects
548

◆ getDiaSources()

pandas.DataFrame | None lsst.dax.apdb.sql.apdbSql.ApdbSql.getDiaSources ( self,
Region region,
Iterable[int] | None object_ids,
astropy.time.Time visit_time )
Return catalog of DiaSource instances from a given region.

Parameters
----------
region : `lsst.sphgeom.Region`
    Region to search for DIASources.
object_ids : iterable [ `int` ], optional
    List of DiaObject IDs to further constrain the set of returned
    sources. If `None` then the returned sources are not constrained.
    If the list is empty then an empty catalog is returned with a
    correct schema.
visit_time : `astropy.time.Time`
    Time of the current visit.

Returns
-------
catalog : `pandas.DataFrame`, or `None`
    Catalog containing DiaSource records. `None` is returned if
    ``read_sources_months`` configuration parameter is set to 0.

Notes
-----
This method returns DiaSource catalog for a region with additional
filtering based on DiaObject IDs. Only a subset of DiaSource history
is returned limited by ``read_sources_months`` config parameter, w.r.t.
``visit_time``. If ``object_ids`` is empty then an empty catalog is
always returned with the correct schema (columns/types). If
``object_ids`` is `None` then no filtering is performed and some of the
returned records may be outside the specified region.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 549 of file apdbSql.py.

551 ) -> pandas.DataFrame | None:
552 # docstring is inherited from a base class
553 if self.config.read_sources_months == 0:
554 _LOG.debug("Skip DiaSources fetching")
555 return None
556
557 if object_ids is None:
558 # region-based select
559 return self._getDiaSourcesInRegion(region, visit_time)
560 else:
561 return self._getDiaSourcesByIDs(list(object_ids), visit_time)
562

◆ getSSObjects()

pandas.DataFrame lsst.dax.apdb.sql.apdbSql.ApdbSql.getSSObjects ( self)
Return catalog of SSObject instances.

Returns
-------
catalog : `pandas.DataFrame`
    Catalog containing SSObject records, all existing records are
    returned.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 626 of file apdbSql.py.

626 def getSSObjects(self) -> pandas.DataFrame:
627 # docstring is inherited from a base class
628
629 columns = self._schema.get_apdb_columns(ApdbTables.SSObject)
630 query = sql.select(*columns)
631
632 # execute select
633 with self._timer("SSObject_select_time", tags={"table": "SSObject"}):
634 with self._engine.begin() as conn:
635 objects = pandas.read_sql_query(query, conn)
636 _LOG.debug("found %s SSObjects", len(objects))
637 return objects
638

◆ init_database()

ApdbSqlConfig lsst.dax.apdb.sql.apdbSql.ApdbSql.init_database ( cls,
str db_url,
*str | None schema_file = None,
str | None schema_name = None,
int | None read_sources_months = None,
int | None read_forced_sources_months = None,
bool use_insert_id = False,
int | None connection_timeout = None,
str | None dia_object_index = None,
int | None htm_level = None,
str | None htm_index_column = None,
list[str] | None ra_dec_columns = None,
str | None prefix = None,
str | None namespace = None,
bool drop = False )
Initialize new APDB instance and make configuration object for it.

Parameters
----------
db_url : `str`
    SQLAlchemy database URL.
schema_file : `str`, optional
    Location of (YAML) configuration file with APDB schema. If not
    specified then default location will be used.
schema_name : `str`, optional
    Name of the schema in YAML configuration file. If not specified
    then default name will be used.
read_sources_months : `int`, optional
    Number of months of history to read from DiaSource.
read_forced_sources_months : `int`, optional
    Number of months of history to read from DiaForcedSource.
use_insert_id : `bool`
    If True, make additional tables used for replication to PPDB.
connection_timeout : `int`, optional
    Database connection timeout in seconds.
dia_object_index : `str`, optional
    Indexing mode for DiaObject table.
htm_level : `int`, optional
    HTM indexing level.
htm_index_column : `str`, optional
    Name of a HTM index column for DiaObject and DiaSource tables.
ra_dec_columns : `list` [`str`], optional
    Names of ra/dec columns in DiaObject table.
prefix : `str`, optional
    Optional prefix for all table names.
namespace : `str`, optional
    Name of the database schema for all APDB tables. If not specified
    then default schema is used.
drop : `bool`, optional
    If `True` then drop existing tables before re-creating the schema.

Returns
-------
config : `ApdbSqlConfig`
    Resulting configuration object for a created APDB instance.

Definition at line 351 of file apdbSql.py.

368 ) -> ApdbSqlConfig:
369 """Initialize new APDB instance and make configuration object for it.
370
371 Parameters
372 ----------
373 db_url : `str`
374 SQLAlchemy database URL.
375 schema_file : `str`, optional
376 Location of (YAML) configuration file with APDB schema. If not
377 specified then default location will be used.
378 schema_name : str | None
379 Name of the schema in YAML configuration file. If not specified
380 then default name will be used.
381 read_sources_months : `int`, optional
382 Number of months of history to read from DiaSource.
383 read_forced_sources_months : `int`, optional
384 Number of months of history to read from DiaForcedSource.
385 use_insert_id : `bool`
386 If True, make additional tables used for replication to PPDB.
387 connection_timeout : `int`, optional
388 Database connection timeout in seconds.
389 dia_object_index : `str`, optional
390 Indexing mode for DiaObject table.
391 htm_level : `int`, optional
392 HTM indexing level.
393 htm_index_column : `str`, optional
394 Name of a HTM index column for DiaObject and DiaSource tables.
395 ra_dec_columns : `list` [`str`], optional
396 Names of ra/dec columns in DiaObject table.
397 prefix : `str`, optional
398 Optional prefix for all table names.
399 namespace : `str`, optional
400 Name of the database schema for all APDB tables. If not specified
401 then default schema is used.
402 drop : `bool`, optional
403 If `True` then drop existing tables before re-creating the schema.
404
405 Returns
406 -------
407 config : `ApdbSqlConfig`
408 Resulting configuration object for a created APDB instance.
409 """
410 config = ApdbSqlConfig(db_url=db_url, use_insert_id=use_insert_id)
411 if schema_file is not None:
412 config.schema_file = schema_file
413 if schema_name is not None:
414 config.schema_name = schema_name
415 if read_sources_months is not None:
416 config.read_sources_months = read_sources_months
417 if read_forced_sources_months is not None:
418 config.read_forced_sources_months = read_forced_sources_months
419 if connection_timeout is not None:
420 config.connection_timeout = connection_timeout
421 if dia_object_index is not None:
422 config.dia_object_index = dia_object_index
423 if htm_level is not None:
424 config.htm_level = htm_level
425 if htm_index_column is not None:
426 config.htm_index_column = htm_index_column
427 if ra_dec_columns is not None:
428 config.ra_dec_columns = ra_dec_columns
429 if prefix is not None:
430 config.prefix = prefix
431 if namespace is not None:
432 config.namespace = namespace
433
434 cls._makeSchema(config, drop=drop)
435
436 return config
437

◆ metadata()

ApdbMetadata lsst.dax.apdb.sql.apdbSql.ApdbSql.metadata ( self)
Object controlling access to APDB metadata (`ApdbMetadata`).

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 740 of file apdbSql.py.

740 def metadata(self) -> ApdbMetadata:
741 # docstring is inherited from a base class
742 if self._metadata is None:
743 raise RuntimeError("Database schema was not initialized.")
744 return self._metadata
745

◆ reassignDiaSources()

None lsst.dax.apdb.sql.apdbSql.ApdbSql.reassignDiaSources ( self,
Mapping[int, int] idMap )
Associate DiaSources with SSObjects, disassociating them
from DiaObjects.

Parameters
----------
idMap : `Mapping`
    Maps DiaSource IDs to their new SSObject IDs.

Raises
------
ValueError
    Raised if DiaSource ID does not exist in the database.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 699 of file apdbSql.py.

699 def reassignDiaSources(self, idMap: Mapping[int, int]) -> None:
700 # docstring is inherited from a base class
701
702 table = self._schema.get_table(ApdbTables.DiaSource)
703 query = table.update().where(table.columns["diaSourceId"] == sql.bindparam("srcId"))
704
705 with self._engine.begin() as conn:
706 # Need to make sure that every ID exists in the database, but
707 # executemany may not support rowcount, so iterate and check what
708 # is missing.
709 missing_ids: list[int] = []
710 for key, value in idMap.items():
711 params = dict(srcId=key, diaObjectId=0, ssObjectId=value)
712 result = conn.execute(query, params)
713 if result.rowcount == 0:
714 missing_ids.append(key)
715 if missing_ids:
716 missing = ",".join(str(item) for item in missing_ids)
717 raise ValueError(f"Following DiaSource IDs do not exist in the database: {missing}")
718

◆ store()

None lsst.dax.apdb.sql.apdbSql.ApdbSql.store ( self,
astropy.time.Time visit_time,
pandas.DataFrame objects,
pandas.DataFrame | None sources = None,
pandas.DataFrame | None forced_sources = None )
Store all three types of catalogs in the database.

Parameters
----------
visit_time : `astropy.time.Time`
    Time of the visit.
objects : `pandas.DataFrame`
    Catalog with DiaObject records.
sources : `pandas.DataFrame`, optional
    Catalog with DiaSource records.
forced_sources : `pandas.DataFrame`, optional
    Catalog with DiaForcedSource records.

Notes
-----
This method takes DataFrame catalogs; their schema must be
compatible with the schema of the APDB table:

  - column names must correspond to database table columns
  - types and units of the columns must match database definitions,
    no unit conversion is performed presently
  - columns that have default values in database schema can be
    omitted from catalog
  - this method knows how to fill interval-related columns of DiaObject
    (validityStart, validityEnd); they do not need to appear in a
    catalog
  - source catalogs have ``diaObjectId`` column associating sources
    with objects

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 639 of file apdbSql.py.

645 ) -> None:
646 # docstring is inherited from a base class
647
648 # We want to run all inserts in one transaction.
649 with self._engine.begin() as connection:
650 replica_chunk: ReplicaChunk | None = None
651 if self._schema.has_replica_chunks:
652 replica_chunk = ReplicaChunk.make_replica_chunk(visit_time, self.config.replica_chunk_seconds)
653 self._storeReplicaChunk(replica_chunk, visit_time, connection)
654
655 # fill pixelId column for DiaObjects
656 objects = self._add_obj_htm_index(objects)
657 self._storeDiaObjects(objects, visit_time, replica_chunk, connection)
658
659 if sources is not None:
660 # copy pixelId column from DiaObjects to DiaSources
661 sources = self._add_src_htm_index(sources, objects)
662 self._storeDiaSources(sources, replica_chunk, connection)
663
664 if forced_sources is not None:
665 self._storeDiaForcedSources(forced_sources, replica_chunk, connection)
666

◆ storeSSObjects()

None lsst.dax.apdb.sql.apdbSql.ApdbSql.storeSSObjects ( self,
pandas.DataFrame objects )
Store or update SSObject catalog.

Parameters
----------
objects : `pandas.DataFrame`
    Catalog with SSObject records.

Notes
-----
If SSObjects with matching IDs already exist in the database, their
records will be updated with the information from provided records.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 667 of file apdbSql.py.

667 def storeSSObjects(self, objects: pandas.DataFrame) -> None:
668 # docstring is inherited from a base class
669
670 idColumn = "ssObjectId"
671 table = self._schema.get_table(ApdbTables.SSObject)
672
673 # everything to be done in single transaction
674 with self._engine.begin() as conn:
675 # Find record IDs that already exist. Some types like np.int64 can
676 # cause issues with sqlalchemy, convert them to int.
677 ids = sorted(int(oid) for oid in objects[idColumn])
678
679 query = sql.select(table.columns[idColumn], table.columns[idColumn].in_(ids))
680 result = conn.execute(query)
681 knownIds = set(row.ssObjectId for row in result)
682
683 filter = objects[idColumn].isin(knownIds)
684 toUpdate = cast(pandas.DataFrame, objects[filter])
685 toInsert = cast(pandas.DataFrame, objects[~filter])
686
687 # insert new records
688 if len(toInsert) > 0:
689 toInsert.to_sql(table.name, conn, if_exists="append", index=False, schema=table.schema)
690
691 # update existing records
692 if len(toUpdate) > 0:
693 whereKey = f"{idColumn}_param"
694 update = table.update().where(table.columns[idColumn] == sql.bindparam(whereKey))
695 toUpdate = toUpdate.rename({idColumn: whereKey}, axis="columns")
696 values = toUpdate.to_dict("records")
697 result = conn.execute(update, values)
698
daf::base::PropertySet * set
Definition fits.cc:931

◆ tableDef()

Table | None lsst.dax.apdb.sql.apdbSql.ApdbSql.tableDef ( self,
ApdbTables table )
Return table schema definition for a given table.

Parameters
----------
table : `ApdbTables`
    One of the known APDB tables.

Returns
-------
tableSchema : `.schema_model.Table` or `None`
    Table schema description, `None` is returned if table is not
    defined by this implementation.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 470 of file apdbSql.py.

470 def tableDef(self, table: ApdbTables) -> Table | None:
471 # docstring is inherited from a base class
472 return self._schema.tableSchemas.get(table)
473

◆ tableRowCount()

dict[str, int] lsst.dax.apdb.sql.apdbSql.ApdbSql.tableRowCount ( self)
Return dictionary with the table names and row counts.

Used by ``ap_proto`` to keep track of the size of the database tables.
Depending on database technology this could be expensive operation.

Returns
-------
row_counts : `dict`
    Dict where key is a table name and value is a row count.

Definition at line 446 of file apdbSql.py.

446 def tableRowCount(self) -> dict[str, int]:
447 """Return dictionary with the table names and row counts.
448
449 Used by ``ap_proto`` to keep track of the size of the database tables.
450 Depending on database technology this could be expensive operation.
451
452 Returns
453 -------
454 row_counts : `dict`
455 Dict where key is a table name and value is a row count.
456 """
457 res = {}
458 tables = [ApdbTables.DiaObject, ApdbTables.DiaSource, ApdbTables.DiaForcedSource]
459 if self.config.dia_object_index == "last_object_table":
460 tables.append(ApdbTables.DiaObjectLast)
461 with self._engine.begin() as conn:
462 for table in tables:
463 sa_table = self._schema.get_table(table)
464 stmt = sql.select(func.count()).select_from(sa_table)
465 count: int = conn.execute(stmt).scalar_one()
466 res[table.name] = count
467
468 return res
469

Member Data Documentation

◆ _engine

lsst.dax.apdb.sql.apdbSql.ApdbSql._engine
protected

Definition at line 216 of file apdbSql.py.

◆ _frozen_parameters

tuple lsst.dax.apdb.sql.apdbSql.ApdbSql._frozen_parameters
staticprotected
Initial value:
= (
"use_insert_id",
"dia_object_index",
"htm_level",
"htm_index_column",
"ra_dec_columns",
)

Definition at line 206 of file apdbSql.py.

◆ _metadata

lsst.dax.apdb.sql.apdbSql.ApdbSql._metadata
protected

Definition at line 224 of file apdbSql.py.

◆ _schema

lsst.dax.apdb.sql.apdbSql.ApdbSql._schema
protected

Definition at line 236 of file apdbSql.py.

◆ _timer_args

lsst.dax.apdb.sql.apdbSql.ApdbSql._timer_args
protected

Definition at line 267 of file apdbSql.py.

◆ config

lsst.dax.apdb.sql.apdbSql.ApdbSql.config

Definition at line 231 of file apdbSql.py.

◆ ConfigClass

lsst.dax.apdb.sql.apdbSql.ApdbSql.ConfigClass = ApdbSqlConfig
static

Definition at line 192 of file apdbSql.py.

◆ metadataCodeVersionKey [1/2]

str lsst.dax.apdb.sql.apdbSql.ApdbSql.metadataCodeVersionKey = "version:ApdbSql"
static

Definition at line 197 of file apdbSql.py.

◆ metadataCodeVersionKey [2/2]

lsst.dax.apdb.sql.apdbSql.ApdbSql.metadataCodeVersionKey

Definition at line 505 of file apdbSql.py.

◆ metadataConfigKey [1/2]

str lsst.dax.apdb.sql.apdbSql.ApdbSql.metadataConfigKey = "config:apdb-sql.json"
static

Definition at line 203 of file apdbSql.py.

◆ metadataConfigKey [2/2]

lsst.dax.apdb.sql.apdbSql.ApdbSql.metadataConfigKey

Definition at line 516 of file apdbSql.py.

◆ metadataReplicaVersionKey [1/2]

str lsst.dax.apdb.sql.apdbSql.ApdbSql.metadataReplicaVersionKey = "version:ApdbSqlReplica"
static

Definition at line 200 of file apdbSql.py.

◆ metadataReplicaVersionKey [2/2]

lsst.dax.apdb.sql.apdbSql.ApdbSql.metadataReplicaVersionKey

Definition at line 509 of file apdbSql.py.

◆ metadataSchemaVersionKey [1/2]

str lsst.dax.apdb.sql.apdbSql.ApdbSql.metadataSchemaVersionKey = "version:schema"
static

Definition at line 194 of file apdbSql.py.

◆ metadataSchemaVersionKey [2/2]

lsst.dax.apdb.sql.apdbSql.ApdbSql.metadataSchemaVersionKey

Definition at line 504 of file apdbSql.py.

◆ pixelator

lsst.dax.apdb.sql.apdbSql.ApdbSql.pixelator

Definition at line 250 of file apdbSql.py.


The documentation for this class was generated from the following file: