LSST Applications  21.0.0-172-gfb10e10a+18fedfabac,22.0.0+297cba6710,22.0.0+80564b0ff1,22.0.0+8d77f4f51a,22.0.0+a28f4c53b1,22.0.0+dcf3732eb2,22.0.1-1-g7d6de66+2a20fdde0d,22.0.1-1-g8e32f31+297cba6710,22.0.1-1-geca5380+7fa3b7d9b6,22.0.1-12-g44dc1dc+2a20fdde0d,22.0.1-15-g6a90155+515f58c32b,22.0.1-16-g9282f48+790f5f2caa,22.0.1-2-g92698f7+dcf3732eb2,22.0.1-2-ga9b0f51+7fa3b7d9b6,22.0.1-2-gd1925c9+bf4f0e694f,22.0.1-24-g1ad7a390+a9625a72a8,22.0.1-25-g5bf6245+3ad8ecd50b,22.0.1-25-gb120d7b+8b5510f75f,22.0.1-27-g97737f7+2a20fdde0d,22.0.1-32-gf62ce7b1+aa4237961e,22.0.1-4-g0b3f228+2a20fdde0d,22.0.1-4-g243d05b+871c1b8305,22.0.1-4-g3a563be+32dcf1063f,22.0.1-4-g44f2e3d+9e4ab0f4fa,22.0.1-42-gca6935d93+ba5e5ca3eb,22.0.1-5-g15c806e+85460ae5f3,22.0.1-5-g58711c4+611d128589,22.0.1-5-g75bb458+99c117b92f,22.0.1-6-g1c63a23+7fa3b7d9b6,22.0.1-6-g50866e6+84ff5a128b,22.0.1-6-g8d3140d+720564cf76,22.0.1-6-gd805d02+cc5644f571,22.0.1-8-ge5750ce+85460ae5f3,master-g6e05de7fdc+babf819c66,master-g99da0e417a+8d77f4f51a,w.2021.48
LSST Data Management Base Package
Public Member Functions | Public Attributes | Static Public Attributes | List of all members
lsst.dax.apdb.apdbSql.ApdbSql Class Reference
Inheritance diagram for lsst.dax.apdb.apdbSql.ApdbSql:
lsst.dax.apdb.apdb.Apdb

Public Member Functions

def __init__ (self, ApdbSqlConfig config)
 
Dict[str, int] tableRowCount (self)
 
Optional[TableDef] tableDef (self, ApdbTables table)
 
None makeSchema (self, bool drop=False)
 
pandas.DataFrame getDiaObjects (self, Region region)
 
Optional[pandas.DataFrame] getDiaSources (self, Region region, Optional[Iterable[int]] object_ids, dafBase.DateTime visit_time)
 
Optional[pandas.DataFrame] getDiaForcedSources (self, Region region, Optional[Iterable[int]] object_ids, dafBase.DateTime visit_time)
 
None store (self, dafBase.DateTime visit_time, pandas.DataFrame objects, Optional[pandas.DataFrame] sources=None, Optional[pandas.DataFrame] forced_sources=None)
 
None dailyJob (self)
 
int countUnassociatedObjects (self)
 
ConfigurableField makeField (cls, str doc)
 

Public Attributes

 config
 
 pixelator
 

Static Public Attributes

 ConfigClass = ApdbSqlConfig
 

Detailed Description

Implementation of APDB interface based on SQL database.

The implementation is configured via the standard ``pex_config`` mechanism
using the `ApdbSqlConfig` configuration class. For examples of different
configurations check the ``config/`` folder.

Parameters
----------
config : `ApdbSqlConfig`
    Configuration object.

Definition at line 197 of file apdbSql.py.

Constructor & Destructor Documentation

◆ __init__()

def lsst.dax.apdb.apdbSql.ApdbSql.__init__ (   self,
ApdbSqlConfig  config 
)

Definition at line 212 of file apdbSql.py.

212  def __init__(self, config: ApdbSqlConfig):
213 
214  self.config = config
215 
216  _LOG.debug("APDB Configuration:")
217  _LOG.debug(" dia_object_index: %s", self.config.dia_object_index)
218  _LOG.debug(" read_sources_months: %s", self.config.read_sources_months)
219  _LOG.debug(" read_forced_sources_months: %s", self.config.read_forced_sources_months)
220  _LOG.debug(" dia_object_columns: %s", self.config.dia_object_columns)
221  _LOG.debug(" object_last_replace: %s", self.config.object_last_replace)
222  _LOG.debug(" schema_file: %s", self.config.schema_file)
223  _LOG.debug(" extra_schema_file: %s", self.config.extra_schema_file)
224  _LOG.debug(" schema prefix: %s", self.config.prefix)
225 
226  # engine is reused between multiple processes, make sure that we don't
227  # share connections by disabling pool (by using NullPool class)
228  kw = dict(echo=self.config.sql_echo)
229  conn_args: Dict[str, Any] = dict()
230  if not self.config.connection_pool:
231  kw.update(poolclass=NullPool)
232  if self.config.isolation_level is not None:
233  kw.update(isolation_level=self.config.isolation_level)
234  elif self.config.db_url.startswith("sqlite"):
235  # Use READ_UNCOMMITTED as default value for sqlite.
236  kw.update(isolation_level="READ_UNCOMMITTED")
237  if self.config.connection_timeout is not None:
238  if self.config.db_url.startswith("sqlite"):
239  conn_args.update(timeout=self.config.connection_timeout)
240  elif self.config.db_url.startswith(("postgresql", "mysql")):
241  conn_args.update(connect_timeout=self.config.connection_timeout)
242  kw.update(connect_args=conn_args)
243  self._engine = sqlalchemy.create_engine(self.config.db_url, **kw)
244 
245  self._schema = ApdbSqlSchema(engine=self._engine,
246  dia_object_index=self.config.dia_object_index,
247  schema_file=self.config.schema_file,
248  extra_schema_file=self.config.extra_schema_file,
249  prefix=self.config.prefix,
250  htm_index_column=self.config.htm_index_column)
251 
252  self.pixelator = HtmPixelization(self.config.htm_level)
253 

Member Function Documentation

◆ countUnassociatedObjects()

int lsst.dax.apdb.apdbSql.ApdbSql.countUnassociatedObjects (   self)
Return the number of DiaObjects that have only one DiaSource
associated with them.

Used as part of ap_verify metrics.

Returns
-------
count : `int`
    Number of DiaObjects with exactly one associated DiaSource.

Notes
-----
This method can be very inefficient or slow in some implementations.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 435 of file apdbSql.py.

435  def countUnassociatedObjects(self) -> int:
436  # docstring is inherited from a base class
437 
438  # Retrieve the DiaObject table.
439  table: sqlalchemy.schema.Table = self._schema.objects
440 
441  # Construct the sql statement.
442  stmt = sql.select([func.count()]).select_from(table).where(table.c.nDiaSources == 1)
443  stmt = stmt.where(table.c.validityEnd == None) # noqa: E711
444 
445  # Return the count.
446  with self._engine.begin() as conn:
447  count = conn.scalar(stmt)
448 
449  return count
450 

◆ dailyJob()

None lsst.dax.apdb.apdbSql.ApdbSql.dailyJob (   self)
Implement daily activities like cleanup/vacuum.

What should be done during daily activities is determined by
specific implementation.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 422 of file apdbSql.py.

422  def dailyJob(self) -> None:
423  # docstring is inherited from a base class
424 
425  if self._engine.name == 'postgresql':
426 
427  # do VACUUM on all tables
428  _LOG.info("Running VACUUM on all tables")
429  connection = self._engine.raw_connection()
430  ISOLATION_LEVEL_AUTOCOMMIT = 0
431  connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
432  cursor = connection.cursor()
433  cursor.execute("VACUUM ANALYSE")
434 

◆ getDiaForcedSources()

Optional[pandas.DataFrame] lsst.dax.apdb.apdbSql.ApdbSql.getDiaForcedSources (   self,
Region  region,
Optional[Iterable[int]]  object_ids,
dafBase.DateTime  visit_time 
)
Return catalog of DiaForcedSource instances from a given region.

Parameters
----------
region : `lsst.sphgeom.Region`
    Region to search for DIAForcedSources.
object_ids : iterable [ `int` ], optional
    List of DiaObject IDs to further constrain the set of returned
    sources. If list is empty then empty catalog is returned with a
    correct schema.
visit_time : `lsst.daf.base.DateTime`
    Time of the current visit.

Returns
-------
catalog : `pandas.DataFrame`, or `None`
    Catalog containing DiaForcedSource records. `None` is returned if
    the ``read_forced_sources_months`` configuration parameter is set to 0.

Raises
------
NotImplementedError
    Raised if ``object_ids`` is `None`.

Notes
-----
Even though base class allows `None` to be passed for ``object_ids``,
this class requires ``object_ids`` to be not-`None`.
`NotImplementedError` is raised if `None` is passed.

This method returns a DiaForcedSource catalog for a region with additional
filtering based on DiaObject IDs. Only a subset of the DiaForcedSource
history is returned, limited by the ``read_forced_sources_months`` config
parameter, w.r.t. ``visit_time``. If ``object_ids`` is empty then an empty
catalog is always returned with a correct schema (columns/types).

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 343 of file apdbSql.py.

345  visit_time: dafBase.DateTime) -> Optional[pandas.DataFrame]:
346  """Return catalog of DiaForcedSource instances from a given region.
347 
348  Parameters
349  ----------
350  region : `lsst.sphgeom.Region`
351  Region to search for DIASources.
352  object_ids : iterable [ `int` ], optional
353  List of DiaObject IDs to further constrain the set of returned
354  sources. If list is empty then empty catalog is returned with a
355  correct schema.
356  visit_time : `lsst.daf.base.DateTime`
357  Time of the current visit.
358 
359  Returns
360  -------
361  catalog : `pandas.DataFrame`, or `None`
362  Catalog containing DiaSource records. `None` is returned if
363  ``read_sources_months`` configuration parameter is set to 0.
364 
365  Raises
366  ------
367  NotImplementedError
368  Raised if ``object_ids`` is `None`.
369 
370  Notes
371  -----
372  Even though base class allows `None` to be passed for ``object_ids``,
373  this class requires ``object_ids`` to be not-`None`.
374  `NotImplementedError` is raised if `None` is passed.
375 
376  This method returns DiaForcedSource catalog for a region with additional
377  filtering based on DiaObject IDs. Only a subset of DiaSource history
378  is returned limited by ``read_forced_sources_months`` config parameter,
379  w.r.t. ``visit_time``. If ``object_ids`` is empty then an empty catalog
380  is always returned with a correct schema (columns/types).
381  """
382 
383  if self.config.read_forced_sources_months == 0:
384  _LOG.debug("Skip DiaForceSources fetching")
385  return None
386 
387  if object_ids is None:
388  # This implementation does not support region-based selection.
389  raise NotImplementedError("Region-based selection is not supported")
390 
391  # TODO: DateTime.MJD must be consistent with code in ap_association,
392  # alternatively we can fill midPointTai ourselves in store()
393  midPointTai_start = _make_midPointTai_start(visit_time, self.config.read_forced_sources_months)
394  _LOG.debug("midPointTai_start = %.6f", midPointTai_start)
395 
396  table: sqlalchemy.schema.Table = self._schema.forcedSources
397  with Timer('DiaForcedSource select', self.config.timer):
398  sources = self._getSourcesByIDs(table, list(object_ids), midPointTai_start)
399 
400  _LOG.debug("found %s DiaForcedSources", len(sources))
401  return sources
402 
Class for handling dates/times, including MJD, UTC, and TAI.
Definition: DateTime.h:64
daf::base::PropertyList * list
Definition: fits.cc:913

◆ getDiaObjects()

pandas.DataFrame lsst.dax.apdb.apdbSql.ApdbSql.getDiaObjects (   self,
Region  region 
)
Returns catalog of DiaObject instances from a given region.

This method returns only the last version of each DiaObject. Some
records in a returned catalog may be outside the specified region; it
is up to a client to ignore those records or clean up the catalog before
further use.

Parameters
----------
region : `lsst.sphgeom.Region`
    Region to search for DIAObjects.

Returns
-------
catalog : `pandas.DataFrame`
    Catalog containing DiaObject records for a region that may be a
    superset of the specified region.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 285 of file apdbSql.py.

285  def getDiaObjects(self, region: Region) -> pandas.DataFrame:
286  # docstring is inherited from a base class
287 
288  # decide what columns we need
289  table: sqlalchemy.schema.Table
290  if self.config.dia_object_index == 'last_object_table':
291  table = self._schema.objects_last
292  else:
293  table = self._schema.objects
294  if not self.config.dia_object_columns:
295  query = table.select()
296  else:
297  columns = [table.c[col] for col in self.config.dia_object_columns]
298  query = sql.select(columns)
299 
300  # build selection
301  htm_index_column = table.columns[self.config.htm_index_column]
302  exprlist = []
303  pixel_ranges = self._htm_indices(region)
304  for low, upper in pixel_ranges:
305  upper -= 1
306  if low == upper:
307  exprlist.append(htm_index_column == low)
308  else:
309  exprlist.append(sql.expression.between(htm_index_column, low, upper))
310  query = query.where(sql.expression.or_(*exprlist))
311 
312  # select latest version of objects
313  if self.config.dia_object_index != 'last_object_table':
314  query = query.where(table.c.validityEnd == None) # noqa: E711
315 
316  _LOG.debug("query: %s", query)
317 
318  if self.config.explain:
319  # run the same query with explain
320  self._explain(query, self._engine)
321 
322  # execute select
323  with Timer('DiaObject select', self.config.timer):
324  with self._engine.begin() as conn:
325  objects = pandas.read_sql_query(query, conn)
326  _LOG.debug("found %s DiaObjects", len(objects))
327  return objects
328 

◆ getDiaSources()

Optional[pandas.DataFrame] lsst.dax.apdb.apdbSql.ApdbSql.getDiaSources (   self,
Region  region,
Optional[Iterable[int]]  object_ids,
dafBase.DateTime  visit_time 
)
Return catalog of DiaSource instances from a given region.

Parameters
----------
region : `lsst.sphgeom.Region`
    Region to search for DIASources.
object_ids : iterable [ `int` ], optional
    List of DiaObject IDs to further constrain the set of returned
    sources. If `None` then returned sources are not constrained. If
    list is empty then empty catalog is returned with a correct
    schema.
visit_time : `lsst.daf.base.DateTime`
    Time of the current visit.

Returns
-------
catalog : `pandas.DataFrame`, or `None`
    Catalog containing DiaSource records. `None` is returned if
    ``read_sources_months`` configuration parameter is set to 0.

Notes
-----
This method returns DiaSource catalog for a region with additional
filtering based on DiaObject IDs. Only a subset of DiaSource history
is returned limited by ``read_sources_months`` config parameter, w.r.t.
``visit_time``. If ``object_ids`` is empty then an empty catalog is
always returned with the correct schema (columns/types). If
``object_ids`` is `None` then no filtering is performed and some of the
returned records may be outside the specified region.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 329 of file apdbSql.py.

331  visit_time: dafBase.DateTime) -> Optional[pandas.DataFrame]:
332  # docstring is inherited from a base class
333  if self.config.read_sources_months == 0:
334  _LOG.debug("Skip DiaSources fetching")
335  return None
336 
337  if object_ids is None:
338  # region-based select
339  return self._getDiaSourcesInRegion(region, visit_time)
340  else:
341  return self._getDiaSourcesByIDs(list(object_ids), visit_time)
342 

◆ makeField()

ConfigurableField lsst.dax.apdb.apdb.Apdb.makeField (   cls,
str  doc 
)
inherited
Make a `~lsst.pex.config.ConfigurableField` for Apdb.

Parameters
----------
doc : `str`
    Help text for the field.

Returns
-------
configurableField : `lsst.pex.config.ConfigurableField`
    A `~lsst.pex.config.ConfigurableField` for Apdb.

Definition at line 268 of file apdb.py.

268  def makeField(cls, doc: str) -> ConfigurableField:
269  """Make a `~lsst.pex.config.ConfigurableField` for Apdb.
270 
271  Parameters
272  ----------
273  doc : `str`
274  Help text for the field.
275 
276  Returns
277  -------
278  configurableField : `lsst.pex.config.ConfigurableField`
279  A `~lsst.pex.config.ConfigurableField` for Apdb.
280  """
281  return ConfigurableField(doc=doc, target=cls)

◆ makeSchema()

None lsst.dax.apdb.apdbSql.ApdbSql.makeSchema (   self,
bool   drop = False 
)
Create or re-create whole database schema.

Parameters
----------
drop : `bool`
    If True then drop all tables before creating new ones.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 281 of file apdbSql.py.

281  def makeSchema(self, drop: bool = False) -> None:
282  # docstring is inherited from a base class
283  self._schema.makeSchema(drop=drop)
284 

◆ store()

None lsst.dax.apdb.apdbSql.ApdbSql.store (   self,
dafBase.DateTime  visit_time,
pandas.DataFrame  objects,
Optional[pandas.DataFrame]   sources = None,
Optional[pandas.DataFrame]   forced_sources = None 
)
Store all three types of catalogs in the database.

Parameters
----------
visit_time : `lsst.daf.base.DateTime`
    Time of the visit.
objects : `pandas.DataFrame`
    Catalog with DiaObject records.
sources : `pandas.DataFrame`, optional
    Catalog with DiaSource records.
forced_sources : `pandas.DataFrame`, optional
    Catalog with DiaForcedSource records.

Notes
-----
This method takes DataFrame catalogs; their schema must be
compatible with the schema of the APDB table:

  - column names must correspond to database table columns
  - types and units of the columns must match database definitions,
    no unit conversion is performed presently
  - columns that have default values in database schema can be
    omitted from catalog
  - this method knows how to fill interval-related columns of DiaObject
    (validityStart, validityEnd), so they do not need to appear in a
    catalog
  - source catalogs have ``diaObjectId`` column associating sources
    with objects

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 403 of file apdbSql.py.

407  forced_sources: Optional[pandas.DataFrame] = None) -> None:
408  # docstring is inherited from a base class
409 
410  # fill pixelId column for DiaObjects
411  objects = self._add_obj_htm_index(objects)
412  self._storeDiaObjects(objects, visit_time)
413 
414  if sources is not None:
415  # copy pixelId column from DiaObjects to DiaSources
416  sources = self._add_src_htm_index(sources, objects)
417  self._storeDiaSources(sources)
418 
419  if forced_sources is not None:
420  self._storeDiaForcedSources(forced_sources)
421 

◆ tableDef()

Optional[TableDef] lsst.dax.apdb.apdbSql.ApdbSql.tableDef (   self,
ApdbTables  table 
)
Return table schema definition for a given table.

Parameters
----------
table : `ApdbTables`
    One of the known APDB tables.

Returns
-------
tableSchema : `TableDef` or `None`
    Table schema description, `None` is returned if table is not
    defined by this implementation.

Reimplemented from lsst.dax.apdb.apdb.Apdb.

Definition at line 277 of file apdbSql.py.

277  def tableDef(self, table: ApdbTables) -> Optional[TableDef]:
278  # docstring is inherited from a base class
279  return self._schema.tableSchemas.get(table)
280 

◆ tableRowCount()

Dict[str, int] lsst.dax.apdb.apdbSql.ApdbSql.tableRowCount (   self)
Returns dictionary with the table names and row counts.

Used by ``ap_proto`` to keep track of the size of the database tables.
Depending on database technology this could be an expensive operation.

Returns
-------
row_counts : `dict`
    Dict where key is a table name and value is a row count.

Definition at line 254 of file apdbSql.py.

254  def tableRowCount(self) -> Dict[str, int]:
255  """Returns dictionary with the table names and row counts.
256 
257  Used by ``ap_proto`` to keep track of the size of the database tables.
258  Depending on database technology this could be expensive operation.
259 
260  Returns
261  -------
262  row_counts : `dict`
263  Dict where key is a table name and value is a row count.
264  """
265  res = {}
266  tables: List[sqlalchemy.schema.Table] = [
267  self._schema.objects, self._schema.sources, self._schema.forcedSources]
268  if self.config.dia_object_index == 'last_object_table':
269  tables.append(self._schema.objects_last)
270  for table in tables:
271  stmt = sql.select([func.count()]).select_from(table)
272  count = self._engine.scalar(stmt)
273  res[table.name] = count
274 
275  return res
276 

Member Data Documentation

◆ config

lsst.dax.apdb.apdbSql.ApdbSql.config

Definition at line 214 of file apdbSql.py.

◆ ConfigClass

lsst.dax.apdb.apdbSql.ApdbSql.ConfigClass = ApdbSqlConfig
static

Definition at line 210 of file apdbSql.py.

◆ pixelator

lsst.dax.apdb.apdbSql.ApdbSql.pixelator

Definition at line 252 of file apdbSql.py.


The documentation for this class was generated from the following file: