LSST Applications  21.0.0-147-g0e635eb1+1acddb5be5,22.0.0+052faf71bd,22.0.0+1ea9a8b2b2,22.0.0+6312710a6c,22.0.0+729191ecac,22.0.0+7589c3a021,22.0.0+9f079a9461,22.0.1-1-g7d6de66+b8044ec9de,22.0.1-1-g87000a6+536b1ee016,22.0.1-1-g8e32f31+6312710a6c,22.0.1-10-gd060f87+016f7cdc03,22.0.1-12-g9c3108e+df145f6f68,22.0.1-16-g314fa6d+c825727ab8,22.0.1-19-g93a5c75+d23f2fb6d8,22.0.1-19-gb93eaa13+aab3ef7709,22.0.1-2-g8ef0a89+b8044ec9de,22.0.1-2-g92698f7+9f079a9461,22.0.1-2-ga9b0f51+052faf71bd,22.0.1-2-gac51dbf+052faf71bd,22.0.1-2-gb66926d+6312710a6c,22.0.1-2-gcb770ba+09e3807989,22.0.1-20-g32debb5+b8044ec9de,22.0.1-23-gc2439a9a+fb0756638e,22.0.1-3-g496fd5d+09117f784f,22.0.1-3-g59f966b+1e6ba2c031,22.0.1-3-g849a1b8+f8b568069f,22.0.1-3-gaaec9c0+c5c846a8b1,22.0.1-32-g5ddfab5d3+60ce4897b0,22.0.1-4-g037fbe1+64e601228d,22.0.1-4-g8623105+b8044ec9de,22.0.1-5-g096abc9+d18c45d440,22.0.1-5-g15c806e+57f5c03693,22.0.1-7-gba73697+57f5c03693,master-g6e05de7fdc+c1283a92b8,master-g72cdda8301+729191ecac,w.2021.39
LSST Data Management Base Package
Public Member Functions | Public Attributes | List of all members
lsst.dax.apdb.apdb.Apdb Class Reference
Inheritance diagram for lsst.dax.apdb.apdb.Apdb:

Public Member Functions

def __init__ (self, config, afw_schemas=None)
 
def lastVisit (self)
 
def saveVisit (self, visitId, visitTime)
 
def tableRowCount (self)
 
def getDiaObjects (self, pixel_ranges, return_pandas=False)
 
def getDiaSourcesInRegion (self, pixel_ranges, dt, return_pandas=False)
 
def getDiaSources (self, object_ids, dt, return_pandas=False)
 
def getDiaForcedSources (self, object_ids, dt, return_pandas=False)
 
def storeDiaObjects (self, objs, dt)
 
def storeDiaSources (self, sources)
 
def storeDiaForcedSources (self, sources)
 
def countUnassociatedObjects (self)
 
def isVisitProcessed (self, visitInfo)
 
def dailyJob (self)
 
def makeSchema (self, drop=False, mysql_engine='InnoDB', oracle_tablespace=None, oracle_iot=False)
 

Public Attributes

 config
 

Detailed Description

Interface to L1 database, hides all database access details.

The implementation is configured via standard ``pex_config`` mechanism
using `ApdbConfig` configuration class. For an example of different
configurations check config/ folder.

Parameters
----------
config : `ApdbConfig`
afw_schemas : `dict`, optional
    Dictionary with table name for a key and `afw.table.Schema`
    for a value. Columns in schema will be added to standard
    APDB schema.

Definition at line 216 of file apdb.py.

Constructor & Destructor Documentation

◆ __init__()

def lsst.dax.apdb.apdb.Apdb.__init__ (   self,
  config,
  afw_schemas = None 
)

Definition at line 232 of file apdb.py.

def __init__(self, config, afw_schemas=None):
    """Initialize APDB interface from configuration.

    Parameters
    ----------
    config : `ApdbConfig`
        Configuration object defining database location and behavior.
    afw_schemas : `dict`, optional
        Dictionary with table name for a key and `afw.table.Schema`
        for a value. Columns in schema will be added to standard
        APDB schema.
    """
    self.config = config

    # logging.getLogger('sqlalchemy').setLevel(logging.INFO)
    # Dump the configuration knobs that affect schema/query behavior.
    _LOG.debug("APDB Configuration:")
    _LOG.debug(" dia_object_index: %s", self.config.dia_object_index)
    _LOG.debug(" dia_object_nightly: %s", self.config.dia_object_nightly)
    _LOG.debug(" read_sources_months: %s", self.config.read_sources_months)
    _LOG.debug(" read_forced_sources_months: %s", self.config.read_forced_sources_months)
    _LOG.debug(" dia_object_columns: %s", self.config.dia_object_columns)
    _LOG.debug(" object_last_replace: %s", self.config.object_last_replace)
    _LOG.debug(" schema_file: %s", self.config.schema_file)
    _LOG.debug(" extra_schema_file: %s", self.config.extra_schema_file)
    _LOG.debug(" column_map: %s", self.config.column_map)
    _LOG.debug(" schema prefix: %s", self.config.prefix)

    # engine is reused between multiple processes, make sure that we don't
    # share connections by disabling pool (by using NullPool class)
    kw = dict(echo=self.config.sql_echo)
    conn_args = dict()
    if not self.config.connection_pool:
        kw.update(poolclass=NullPool)
    if self.config.isolation_level is not None:
        kw.update(isolation_level=self.config.isolation_level)
    if self.config.connection_timeout is not None:
        # Timeout keyword differs per DBAPI driver: sqlite3 uses "timeout",
        # postgres/mysql drivers use "connect_timeout".
        if self.config.db_url.startswith("sqlite"):
            conn_args.update(timeout=self.config.connection_timeout)
        elif self.config.db_url.startswith(("postgresql", "mysql")):
            conn_args.update(connect_timeout=self.config.connection_timeout)
    kw.update(connect_args=conn_args)
    self._engine = sqlalchemy.create_engine(self.config.db_url, **kw)

    # Schema helper owns table definitions and column re-mapping.
    self._schema = apdbSchema.ApdbSchema(engine=self._engine,
                                         dia_object_index=self.config.dia_object_index,
                                         dia_object_nightly=self.config.dia_object_nightly,
                                         schema_file=self.config.schema_file,
                                         extra_schema_file=self.config.extra_schema_file,
                                         column_map=self.config.column_map,
                                         afw_schemas=afw_schemas,
                                         prefix=self.config.prefix)

Member Function Documentation

◆ countUnassociatedObjects()

def lsst.dax.apdb.apdb.Apdb.countUnassociatedObjects (   self)
Return the number of DiaObjects that have only one DiaSource associated
with them.

Used as part of ap_verify metrics.

Returns
-------
count : `int`
    Number of DiaObjects with exactly one associated DiaSource.

Definition at line 789 of file apdb.py.

def countUnassociatedObjects(self):
    """Return the number of DiaObjects that have only one DiaSource associated
    with them.

    Used as part of ap_verify metrics.

    Returns
    -------
    count : `int`
        Number of DiaObjects with exactly one associated DiaSource.
    """
    objects = self._schema.objects

    # Count only "current" object versions (open validity interval)
    # that have exactly one associated source.
    statement = (
        sql.select([func.count()])
        .select_from(objects)
        .where(objects.c.nDiaSources == 1)
        .where(objects.c.validityEnd == None)  # noqa: E711
    )

    return self._engine.scalar(statement)

◆ dailyJob()

def lsst.dax.apdb.apdb.Apdb.dailyJob (   self)
Implement daily activities like cleanup/vacuum.

What should be done during daily cleanup is determined by
configuration/schema.

Definition at line 837 of file apdb.py.

def dailyJob(self):
    """Implement daily activities like cleanup/vacuum.

    What should be done during daily cleanup is determined by
    configuration/schema.
    """

    # move data from DiaObjectNightly into DiaObject
    if self.config.dia_object_nightly:
        with _ansi_session(self._engine) as conn:
            # Copy then delete in the same transaction so the nightly
            # table is drained atomically.
            query = 'INSERT INTO "' + self._schema.objects.name + '" '
            query += 'SELECT * FROM "' + self._schema.objects_nightly.name + '"'
            with Timer('DiaObjectNightly copy', self.config.timer):
                conn.execute(sql.text(query))

            query = 'DELETE FROM "' + self._schema.objects_nightly.name + '"'
            with Timer('DiaObjectNightly delete', self.config.timer):
                conn.execute(sql.text(query))

    if self._engine.name == 'postgresql':

        # do VACUUM on all tables
        _LOG.info("Running VACUUM on all tables")
        # VACUUM cannot run inside a transaction block, so use a raw
        # DBAPI connection switched to autocommit; 0 matches psycopg2's
        # ISOLATION_LEVEL_AUTOCOMMIT constant -- TODO confirm driver.
        connection = self._engine.raw_connection()
        ISOLATION_LEVEL_AUTOCOMMIT = 0
        connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        cursor = connection.cursor()
        cursor.execute("VACUUM ANALYSE")

◆ getDiaForcedSources()

def lsst.dax.apdb.apdb.Apdb.getDiaForcedSources (   self,
  object_ids,
  dt,
  return_pandas = False 
)
Returns catalog of DiaForcedSource instances matching given
DiaObjects.

This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by
the schema of L1 database table. Re-mapping of the column names may
be done for some columns (based on column map passed to constructor)
but types or units are not changed.

Parameters
----------
object_ids :
    Collection of DiaObject IDs
dt : `datetime.datetime`
    Time of the current visit
return_pandas : `bool`
    Return a `pandas.DataFrame` instead of
    `lsst.afw.table.SourceCatalog`.

Returns
-------
catalog : `lsst.afw.table.SourceCatalog` or `None`
    Catalog containing DiaForcedSource records. `None` is returned if
    ``read_sources_months`` configuration parameter is set to 0 or
    when ``object_ids`` is empty.

Definition at line 551 of file apdb.py.

def getDiaForcedSources(self, object_ids, dt, return_pandas=False):
    """Returns catalog of DiaForcedSource instances matching given
    DiaObjects.

    This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by
    the schema of L1 database table. Re-mapping of the column names may
    be done for some columns (based on column map passed to constructor)
    but types or units are not changed.

    Parameters
    ----------
    object_ids :
        Collection of DiaObject IDs
    dt : `datetime.datetime`
        Time of the current visit
    return_pandas : `bool`
        Return a `pandas.DataFrame` instead of
        `lsst.afw.table.SourceCatalog`.

    Returns
    -------
    catalog : `lsst.afw.table.SourceCatalog` or `None`
        Catalog containing DiaForcedSource records. `None` is returned if
        ``read_sources_months`` configuration parameter is set to 0 or
        when ``object_ids`` is empty.
    """
    # NOTE(review): ``dt`` is currently unused in this method.

    if self.config.read_forced_sources_months == 0:
        _LOG.info("Skip DiaForcedSources fetching")
        return None

    if len(object_ids) == 0:
        _LOG.info("Skip DiaForcedSources fetching - no Objects")
        # this should create a catalog, but the list of columns may be empty
        return None

    table = self._schema.forcedSources
    sources = None

    with Timer('DiaForcedSource select', self.config.timer):
        with _ansi_session(self._engine) as conn:
            # Query in chunks to keep the IN() list bounded.
            for ids in _split(sorted(object_ids), 1000):

                query = 'SELECT * FROM "' + table.name + '" WHERE '

                # select by object id; IDs are integers so str() is safe here
                ids = ",".join(str(id) for id in ids)
                query += '"diaObjectId" IN (' + ids + ') '

                # execute select
                if return_pandas:
                    df = pandas.read_sql_query(sql.text(query), conn)
                    if sources is None:
                        sources = df
                    else:
                        # DataFrame.append() was removed in pandas 2.0;
                        # concat() is the supported equivalent.
                        sources = pandas.concat([sources, df])
                else:
                    res = conn.execute(sql.text(query))
                    sources = self._convertResult(res, "DiaForcedSource", sources)

    _LOG.debug("found %s DiaForcedSources", len(sources))
    return sources

◆ getDiaObjects()

def lsst.dax.apdb.apdb.Apdb.getDiaObjects (   self,
  pixel_ranges,
  return_pandas = False 
)
Returns catalog of DiaObject instances from given region.

Objects are searched based on pixelization index and region is
determined by the set of indices. There is no assumption on a
particular type of index, client is responsible for consistency
when calculating pixelization indices.

This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by
the schema of APDB table. Re-mapping of the column names is done for
some columns (based on column map passed to constructor) but types
or units are not changed.

Returns only the last version of each DiaObject.

Parameters
----------
pixel_ranges : `list` of `tuple`
    Sequence of ranges, range is a tuple (minPixelID, maxPixelID).
    This defines set of pixel indices to be included in result.
return_pandas : `bool`
    Return a `pandas.DataFrame` instead of
    `lsst.afw.table.SourceCatalog`.

Returns
-------
catalog : `lsst.afw.table.SourceCatalog` or `pandas.DataFrame`
    Catalog containing DiaObject records.

Definition at line 349 of file apdb.py.

def getDiaObjects(self, pixel_ranges, return_pandas=False):
    """Returns catalog of DiaObject instances from given region.

    Objects are searched based on pixelization index and region is
    determined by the set of indices. There is no assumption on a
    particular type of index, client is responsible for consistency
    when calculating pixelization indices.

    This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by
    the schema of APDB table. Re-mapping of the column names is done for
    some columns (based on column map passed to constructor) but types
    or units are not changed.

    Returns only the last version of each DiaObject.

    Parameters
    ----------
    pixel_ranges : `list` of `tuple`
        Sequence of ranges, range is a tuple (minPixelID, maxPixelID).
        This defines set of pixel indices to be included in result.
    return_pandas : `bool`
        Return a `pandas.DataFrame` instead of
        `lsst.afw.table.SourceCatalog`.

    Returns
    -------
    catalog : `lsst.afw.table.SourceCatalog` or `pandas.DataFrame`
        Catalog containing DiaObject records.
    """

    # decide what columns we need
    if self.config.dia_object_index == 'last_object_table':
        table = self._schema.objects_last
    else:
        table = self._schema.objects
    if not self.config.dia_object_columns:
        query = table.select()
    else:
        columns = [table.c[col] for col in self.config.dia_object_columns]
        query = sql.select(columns)

    # Optional optimizer hints; the hint names (index_rs_asc,
    # dynamic_sampling, FIRST_ROWS_1) suggest Oracle -- TODO confirm.
    if self.config.diaobject_index_hint:
        val = self.config.diaobject_index_hint
        query = query.with_hint(table, 'index_rs_asc(%(name)s "{}")'.format(val))
    if self.config.dynamic_sampling_hint > 0:
        val = self.config.dynamic_sampling_hint
        query = query.with_hint(table, 'dynamic_sampling(%(name)s {})'.format(val))
    if self.config.cardinality_hint > 0:
        val = self.config.cardinality_hint
        query = query.with_hint(table, 'FIRST_ROWS_1 cardinality(%(name)s {})'.format(val))

    # build selection
    exprlist = []
    for low, upper in pixel_ranges:
        # ranges appear to be half-open [low, upper); BETWEEN is
        # inclusive, hence the decrement -- TODO confirm against callers
        upper -= 1
        if low == upper:
            exprlist.append(table.c.pixelId == low)
        else:
            exprlist.append(sql.expression.between(table.c.pixelId, low, upper))
    query = query.where(sql.expression.or_(*exprlist))

    # select latest version of objects; when using the separate LAST
    # table the table itself only holds current versions
    if self.config.dia_object_index != 'last_object_table':
        query = query.where(table.c.validityEnd == None)  # noqa: E711

    _LOG.debug("query: %s", query)

    if self.config.explain:
        # run the same query with explain
        self._explain(query, self._engine)

    # execute select
    with Timer('DiaObject select', self.config.timer):
        with self._engine.begin() as conn:
            if return_pandas:
                objects = pandas.read_sql_query(query, conn)
            else:
                res = conn.execute(query)
                objects = self._convertResult(res, "DiaObject")
    _LOG.debug("found %s DiaObjects", len(objects))
    return objects
430 
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
Definition: history.py:174

◆ getDiaSources()

def lsst.dax.apdb.apdb.Apdb.getDiaSources (   self,
  object_ids,
  dt,
  return_pandas = False 
)
Returns catalog of DiaSource instances given set of DiaObject IDs.

This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by
the schema of APDB table. Re-mapping of the column names is done for
some columns (based on column map passed to constructor) but types or
units are not changed.

Parameters
----------
object_ids :
    Collection of DiaObject IDs
dt : `datetime.datetime`
    Time of the current visit
return_pandas : `bool`
    Return a `pandas.DataFrame` instead of
    `lsst.afw.table.SourceCatalog`.


Returns
-------
catalog : `lsst.afw.table.SourceCatalog`, `pandas.DataFrame`, or `None`
    Catalog containing DiaSource records. `None` is returned if
    ``read_sources_months`` configuration parameter is set to 0 or
    when ``object_ids`` is empty.

Definition at line 490 of file apdb.py.

def getDiaSources(self, object_ids, dt, return_pandas=False):
    """Returns catalog of DiaSource instances given set of DiaObject IDs.

    This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by
    the schema of APDB table. Re-mapping of the column names is done for
    some columns (based on column map passed to constructor) but types or
    units are not changed.

    Parameters
    ----------
    object_ids :
        Collection of DiaObject IDs
    dt : `datetime.datetime`
        Time of the current visit
    return_pandas : `bool`
        Return a `pandas.DataFrame` instead of
        `lsst.afw.table.SourceCatalog`.


    Returns
    -------
    catalog : `lsst.afw.table.SourceCatalog`, `pandas.DataFrame`, or `None`
        Catalog containing DiaSource records. `None` is returned if
        ``read_sources_months`` configuration parameter is set to 0 or
        when ``object_ids`` is empty.
    """
    # NOTE(review): ``dt`` is currently unused in this method.

    if self.config.read_sources_months == 0:
        _LOG.info("Skip DiaSources fetching")
        return None

    if len(object_ids) == 0:
        _LOG.info("Skip DiaSources fetching - no Objects")
        # this should create a catalog, but the list of columns may be empty
        return None

    table = self._schema.sources
    sources = None
    with Timer('DiaSource select', self.config.timer):
        with _ansi_session(self._engine) as conn:
            # Query in chunks to keep the IN() list bounded.
            for ids in _split(sorted(object_ids), 1000):
                query = 'SELECT * FROM "' + table.name + '" WHERE '

                # select by object id; IDs are integers so str() is safe here
                ids = ",".join(str(id) for id in ids)
                query += '"diaObjectId" IN (' + ids + ') '

                # execute select
                if return_pandas:
                    df = pandas.read_sql_query(sql.text(query), conn)
                    if sources is None:
                        sources = df
                    else:
                        # DataFrame.append() was removed in pandas 2.0;
                        # concat() is the supported equivalent.
                        sources = pandas.concat([sources, df])
                else:
                    res = conn.execute(sql.text(query))
                    sources = self._convertResult(res, "DiaSource", sources)

    _LOG.debug("found %s DiaSources", len(sources))
    return sources
550 

◆ getDiaSourcesInRegion()

def lsst.dax.apdb.apdb.Apdb.getDiaSourcesInRegion (   self,
  pixel_ranges,
  dt,
  return_pandas = False 
)
Returns catalog of DiaSource instances from given region.

Sources are searched based on pixelization index and region is
determined by the set of indices. There is no assumption on a
particular type of index, client is responsible for consistency
when calculating pixelization indices.

This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by
the schema of APDB table. Re-mapping of the column names is done for
some columns (based on column map passed to constructor) but types or
units are not changed.

Parameters
----------
pixel_ranges : `list` of `tuple`
    Sequence of ranges, range is a tuple (minPixelID, maxPixelID).
    This defines set of pixel indices to be included in result.
dt : `datetime.datetime`
    Time of the current visit
return_pandas : `bool`
    Return a `pandas.DataFrame` instead of
    `lsst.afw.table.SourceCatalog`.

Returns
-------
catalog : `lsst.afw.table.SourceCatalog`, `pandas.DataFrame`, or `None`
    Catalog containing DiaSource records. `None` is returned if
    ``read_sources_months`` configuration parameter is set to 0.

Definition at line 431 of file apdb.py.

def getDiaSourcesInRegion(self, pixel_ranges, dt, return_pandas=False):
    """Returns catalog of DiaSource instances from given region.

    Sources are searched based on pixelization index and region is
    determined by the set of indices. There is no assumption on a
    particular type of index, client is responsible for consistency
    when calculating pixelization indices.

    This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by
    the schema of APDB table. Re-mapping of the column names is done for
    some columns (based on column map passed to constructor) but types or
    units are not changed.

    Parameters
    ----------
    pixel_ranges : `list` of `tuple`
        Sequence of ranges, range is a tuple (minPixelID, maxPixelID).
        This defines set of pixel indices to be included in result.
    dt : `datetime.datetime`
        Time of the current visit
    return_pandas : `bool`
        Return a `pandas.DataFrame` instead of
        `lsst.afw.table.SourceCatalog`.

    Returns
    -------
    catalog : `lsst.afw.table.SourceCatalog`, `pandas.DataFrame`, or `None`
        Catalog containing DiaSource records. `None` is returned if
        ``read_sources_months`` configuration parameter is set to 0.
    """
    # NOTE(review): ``dt`` is currently unused in this method.

    if self.config.read_sources_months == 0:
        _LOG.info("Skip DiaSources fetching")
        return None

    table = self._schema.sources
    query = table.select()

    # build selection
    exprlist = []
    for low, upper in pixel_ranges:
        # ranges appear to be half-open [low, upper); BETWEEN is
        # inclusive, hence the decrement -- TODO confirm against callers
        upper -= 1
        if low == upper:
            exprlist.append(table.c.pixelId == low)
        else:
            exprlist.append(sql.expression.between(table.c.pixelId, low, upper))
    query = query.where(sql.expression.or_(*exprlist))

    # execute select
    with Timer('DiaSource select', self.config.timer):
        with _ansi_session(self._engine) as conn:
            if return_pandas:
                sources = pandas.read_sql_query(query, conn)
            else:
                res = conn.execute(query)
                sources = self._convertResult(res, "DiaSource")
    _LOG.debug("found %s DiaSources", len(sources))
    return sources
489 

◆ isVisitProcessed()

def lsst.dax.apdb.apdb.Apdb.isVisitProcessed (   self,
  visitInfo 
)
Test whether data from an image has been loaded into the database.

Used as part of ap_verify metrics.

Parameters
----------
visitInfo : `lsst.afw.image.VisitInfo`
    The metadata for the image of interest.

Returns
-------
isProcessed : `bool`
    `True` if the data are present, `False` otherwise.

Definition at line 812 of file apdb.py.

def isVisitProcessed(self, visitInfo):
    """Test whether data from an image has been loaded into the database.

    Used as part of ap_verify metrics.

    Parameters
    ----------
    visitInfo : `lsst.afw.image.VisitInfo`
        The metadata for the image of interest.

    Returns
    -------
    isProcessed : `bool`
        `True` if the data are present, `False` otherwise.
    """
    # Local renamed from ``id`` to avoid shadowing the builtin.
    exposure_id = visitInfo.getExposureId()
    table = self._schema.sources
    idField = table.c.ccdVisitId

    # Hopefully faster than SELECT DISTINCT
    query = sql.select([idField]).select_from(table) \
        .where(idField == exposure_id).limit(1)

    return self._engine.scalar(query) is not None
836 

◆ lastVisit()

def lsst.dax.apdb.apdb.Apdb.lastVisit (   self)
Returns last visit information or `None` if visits table is empty.

Visits table is used by ap_proto to track visit information, it is
not a part of the regular APDB schema.

Returns
-------
visit : `Visit` or `None`
    Last stored visit info or `None` if there was nothing stored yet.

Definition at line 274 of file apdb.py.

def lastVisit(self):
    """Returns last visit information or `None` if visits table is empty.

    Visits table is used by ap_proto to track visit information, it is
    not a part of the regular APDB schema.

    Returns
    -------
    visit : `Visit` or `None`
        Last stored visit info or `None` if there was nothing stored yet.
    """

    with self._engine.begin() as conn:

        visits = self._schema.visits
        query = sql.select([sql.func.max(visits.c.visitId),
                            sql.func.max(visits.c.visitTime)])
        visitId, visitTime = conn.execute(query).fetchone()
        # MAX() over an empty table yields NULL.
        if visitId is None:
            return None

        _LOG.info("lastVisit: visitId: %s visitTime: %s (%s)", visitId,
                  visitTime, type(visitTime))

        # get max IDs from corresponding tables
        lastObjectId = conn.scalar(
            sql.select([sql.func.max(self._schema.objects.c.diaObjectId)]))
        lastSourceId = conn.scalar(
            sql.select([sql.func.max(self._schema.sources.c.diaSourceId)]))

        return Visit(visitId=visitId, visitTime=visitTime,
                     lastObjectId=lastObjectId, lastSourceId=lastSourceId)
table::Key< int > type
Definition: Detector.cc:163

◆ makeSchema()

def lsst.dax.apdb.apdb.Apdb.makeSchema (   self,
  drop = False,
  mysql_engine = 'InnoDB',
  oracle_tablespace = None,
  oracle_iot = False 
)
Create or re-create all tables.

Parameters
----------
drop : `bool`
    If True then drop tables before creating new ones.
mysql_engine : `str`, optional
    Name of the MySQL engine to use for new tables.
oracle_tablespace : `str`, optional
    Name of Oracle tablespace.
oracle_iot : `bool`, optional
    Make Index-organized DiaObjectLast table.

Definition at line 866 of file apdb.py.

def makeSchema(self, drop=False, mysql_engine='InnoDB', oracle_tablespace=None, oracle_iot=False):
    """Create or re-create all tables.

    Parameters
    ----------
    drop : `bool`
        If True then drop tables before creating new ones.
    mysql_engine : `str`, optional
        Name of the MySQL engine to use for new tables.
    oracle_tablespace : `str`, optional
        Name of Oracle tablespace.
    oracle_iot : `bool`, optional
        Make Index-organized DiaObjectLast table.
    """
    # Schema management is fully delegated to the schema helper.
    options = dict(drop=drop, mysql_engine=mysql_engine,
                   oracle_tablespace=oracle_tablespace,
                   oracle_iot=oracle_iot)
    self._schema.makeSchema(**options)
883 

◆ saveVisit()

def lsst.dax.apdb.apdb.Apdb.saveVisit (   self,
  visitId,
  visitTime 
)
Store visit information.

This method is only used by ``ap_proto`` script from ``l1dbproto``
and is not intended for production pipelines.

Parameters
----------
visitId : `int`
    Visit identifier
visitTime : `datetime.datetime`
    Visit timestamp.

Definition at line 309 of file apdb.py.

def saveVisit(self, visitId, visitTime):
    """Store visit information.

    This method is only used by ``ap_proto`` script from ``l1dbproto``
    and is not intended for production pipelines.

    Parameters
    ----------
    visitId : `int`
        Visit identifier
    visitTime : `datetime.datetime`
        Visit timestamp.
    """

    record = dict(visitId=visitId, visitTime=visitTime)
    insert = self._schema.visits.insert().values(**record)
    self._engine.execute(insert)
326 

◆ storeDiaForcedSources()

def lsst.dax.apdb.apdb.Apdb.storeDiaForcedSources (   self,
  sources 
)
Store a set of DIAForcedSources from current visit.

This method takes :doc:`/modules/lsst.afw.table/index` catalog, its schema must be
compatible with the schema of L1 database table:

  - column names must correspond to database table columns
  - some columns names may be re-mapped based on column map passed to
    constructor
  - types and units of the columns must match database definitions,
    no unit conversion is performed presently
  - columns that have default values in database schema can be
    omitted from afw schema

Parameters
----------
sources : `lsst.afw.table.BaseCatalog` or `pandas.DataFrame`
    Catalog containing DiaForcedSource records

Definition at line 757 of file apdb.py.

def storeDiaForcedSources(self, sources):
    """Store a set of DIAForcedSources from current visit.

    This method takes :doc:`/modules/lsst.afw.table/index` catalog, its schema must be
    compatible with the schema of L1 database table:

      - column names must correspond to database table columns
      - some column names may be re-mapped based on column map passed to
        constructor
      - types and units of the columns must match database definitions,
        no unit conversion is performed presently
      - columns that have default values in database schema can be
        omitted from afw schema

    Parameters
    ----------
    sources : `lsst.afw.table.BaseCatalog` or `pandas.DataFrame`
        Catalog containing DiaForcedSource records
    """

    # everything to be done in single transaction
    with _ansi_session(self._engine) as conn:

        if not isinstance(sources, pandas.DataFrame):
            # afw catalog path: delegate to the generic storage helper
            self._storeObjectsAfw(sources, conn, self._schema.forcedSources,
                                  "DiaForcedSource")
        else:
            with Timer("DiaForcedSource insert", self.config.timer):
                frame = _coerce_uint64(sources)
                frame.to_sql("DiaForcedSource", conn, if_exists='append',
                             index=False)
788 

◆ storeDiaObjects()

def lsst.dax.apdb.apdb.Apdb.storeDiaObjects (   self,
  objs,
  dt 
)
Store catalog of DiaObjects from current visit.

This method takes :doc:`/modules/lsst.afw.table/index` catalog, its schema must be
compatible with the schema of APDB table:

  - column names must correspond to database table columns
  - some columns names are re-mapped based on column map passed to
    constructor
  - types and units of the columns must match database definitions,
    no unit conversion is performed presently
  - columns that have default values in database schema can be
    omitted from afw schema
  - this method knows how to fill interval-related columns
    (validityStart, validityEnd) they do not need to appear in
    afw schema

Parameters
----------
objs : `lsst.afw.table.BaseCatalog` or `pandas.DataFrame`
    Catalog with DiaObject records
dt : `datetime.datetime`
    Time of the visit

Definition at line 614 of file apdb.py.

def storeDiaObjects(self, objs, dt):
    """Store catalog of DiaObjects from current visit.

    This method takes :doc:`/modules/lsst.afw.table/index` catalog, its schema must be
    compatible with the schema of APDB table:

      - column names must correspond to database table columns
      - some column names are re-mapped based on column map passed to
        constructor
      - types and units of the columns must match database definitions,
        no unit conversion is performed presently
      - columns that have default values in database schema can be
        omitted from afw schema
      - this method knows how to fill interval-related columns
        (validityStart, validityEnd) they do not need to appear in
        afw schema

    Parameters
    ----------
    objs : `lsst.afw.table.BaseCatalog` or `pandas.DataFrame`
        Catalog with DiaObject records
    dt : `datetime.datetime`
        Time of the visit
    """

    if isinstance(objs, pandas.DataFrame):
        ids = sorted(objs['diaObjectId'])
    else:
        ids = sorted([obj['id'] for obj in objs])
    _LOG.debug("first object ID: %d", ids[0])

    # NOTE: workaround for sqlite, need this here to avoid
    # "database is locked" error.
    table = self._schema.objects

    # everything to be done in single transaction
    with _ansi_session(self._engine) as conn:

        # IDs are integers so embedding str() values in SQL is safe here
        ids = ",".join(str(id) for id in ids)

        if self.config.dia_object_index == 'last_object_table':

            # insert and replace all records in LAST table, mysql and postgres have
            # non-standard features (handled in _storeObjectsAfw)
            table = self._schema.objects_last
            do_replace = self.config.object_last_replace
            # If the input data is of type Pandas, we drop the previous
            # objects regardless of the do_replace setting due to how
            # Pandas inserts objects.
            if not do_replace or isinstance(objs, pandas.DataFrame):
                query = 'DELETE FROM "' + table.name + '" '
                query += 'WHERE "diaObjectId" IN (' + ids + ') '

                if self.config.explain:
                    # run the same query with explain
                    self._explain(query, conn)

                with Timer(table.name + ' delete', self.config.timer):
                    res = conn.execute(sql.text(query))
                _LOG.debug("deleted %s objects", res.rowcount)

            extra_columns = dict(lastNonForcedSource=dt)
            if isinstance(objs, pandas.DataFrame):
                with Timer("DiaObjectLast insert", self.config.timer):
                    objs = _coerce_uint64(objs)
                    for col, data in extra_columns.items():
                        objs[col] = data
                    objs.to_sql("DiaObjectLast", conn, if_exists='append',
                                index=False)
            else:
                self._storeObjectsAfw(objs, conn, table, "DiaObjectLast",
                                      replace=do_replace,
                                      extra_columns=extra_columns)

        else:

            # truncate existing validity intervals: close the current
            # version of each incoming object at time ``dt``
            table = self._schema.objects
            query = 'UPDATE "' + table.name + '" '
            query += "SET \"validityEnd\" = '" + str(dt) + "' "
            query += 'WHERE "diaObjectId" IN (' + ids + ') '
            query += 'AND "validityEnd" IS NULL'

            # _LOG.debug("query: %s", query)

            if self.config.explain:
                # run the same query with explain
                self._explain(query, conn)

            with Timer(table.name + ' truncate', self.config.timer):
                res = conn.execute(sql.text(query))
            _LOG.debug("truncated %s intervals", res.rowcount)

            # insert new versions; nightly table is used as a staging
            # area when configured (drained later by dailyJob)
            if self.config.dia_object_nightly:
                table = self._schema.objects_nightly
            else:
                table = self._schema.objects
            extra_columns = dict(lastNonForcedSource=dt, validityStart=dt,
                                 validityEnd=None)
            if isinstance(objs, pandas.DataFrame):
                with Timer("DiaObject insert", self.config.timer):
                    objs = _coerce_uint64(objs)
                    for col, data in extra_columns.items():
                        objs[col] = data
                    objs.to_sql("DiaObject", conn, if_exists='append',
                                index=False)
            else:
                self._storeObjectsAfw(objs, conn, table, "DiaObject",
                                      extra_columns=extra_columns)
724 

◆ storeDiaSources()

def lsst.dax.apdb.apdb.Apdb.storeDiaSources (   self,
  sources 
)
Store catalog of DIASources from current visit.

This method takes an :doc:`/modules/lsst.afw.table/index` catalog; its schema must be
compatible with the schema of the L1 database table:

  - column names must correspond to database table columns
  - some columns names may be re-mapped based on column map passed to
    constructor
  - types and units of the columns must match database definitions,
    no unit conversion is performed presently
  - columns that have default values in database schema can be
    omitted from afw schema

Parameters
----------
sources : `lsst.afw.table.BaseCatalog` or `pandas.DataFrame`
    Catalog containing DiaSource records

Definition at line 725 of file apdb.py.

def storeDiaSources(self, sources):
    """Store the catalog of DIASources from the current visit.

    This method takes an :doc:`/modules/lsst.afw.table/index` catalog;
    its schema must be compatible with the schema of the L1 database
    table:

      - column names must correspond to database table columns
      - some column names may be re-mapped based on the column map
        passed to the constructor
      - types and units of the columns must match database definitions,
        no unit conversion is performed presently
      - columns that have default values in the database schema can be
        omitted from the afw schema

    Parameters
    ----------
    sources : `lsst.afw.table.BaseCatalog` or `pandas.DataFrame`
        Catalog containing DiaSource records
    """
    # All inserts are performed inside a single transaction.
    with _ansi_session(self._engine) as conn:

        if not isinstance(sources, pandas.DataFrame):
            # afw catalog path: delegate to the schema-aware helper.
            self._storeObjectsAfw(sources, conn, self._schema.sources,
                                  "DiaSource")
        else:
            with Timer("DiaSource insert", self.config.timer):
                # Run the uint64 coercion helper before insertion
                # (presumably works around SQL driver limits on
                # unsigned 64-bit columns — confirm in _coerce_uint64).
                coerced = _coerce_uint64(sources)
                coerced.to_sql("DiaSource", conn, if_exists='append',
                               index=False)
756 

◆ tableRowCount()

def lsst.dax.apdb.apdb.Apdb.tableRowCount (   self)
Returns dictionary with the table names and row counts.

Used by ``ap_proto`` to keep track of the size of the database tables.
Depending on database technology this could be an expensive operation.

Returns
-------
row_counts : `dict`
    Dict where key is a table name and value is a row count.

Definition at line 327 of file apdb.py.

def tableRowCount(self):
    """Return a dictionary with the table names and row counts.

    Used by ``ap_proto`` to keep track of the size of the database
    tables. Depending on database technology this could be an
    expensive operation.

    Returns
    -------
    row_counts : `dict`
        Dict where key is a table name and value is a row count.
    """
    counted_tables = [self._schema.objects, self._schema.sources,
                      self._schema.forcedSources]
    # The "last objects" table exists only for this index configuration.
    if self.config.dia_object_index == 'last_object_table':
        counted_tables.append(self._schema.objects_last)

    row_counts = {}
    for tbl in counted_tables:
        count_stmt = sql.select([func.count()]).select_from(tbl)
        row_counts[tbl.name] = self._engine.scalar(count_stmt)

    return row_counts
348 

Member Data Documentation

◆ config

lsst.dax.apdb.apdb.Apdb.config

Definition at line 234 of file apdb.py.


The documentation for this class was generated from the following file: