LSSTApplications  19.0.0,19.0.0+1,19.0.0+10,19.0.0+13,19.0.0+3,19.0.0+5,19.0.0+9,tickets.DM-22703-ga158cbef15,w.2019.51
LSSTDataManagementBasePackage
Public Member Functions | Public Attributes | List of all members
lsst.dax.ppdb.ppdb.Ppdb Class Reference
Inheritance diagram for lsst.dax.ppdb.ppdb.Ppdb:

Public Member Functions

def __init__ (self, config, afw_schemas=None)
 
def lastVisit (self)
 
def saveVisit (self, visitId, visitTime)
 
def tableRowCount (self)
 
def getDiaObjects (self, pixel_ranges, return_pandas=False)
 
def getDiaSourcesInRegion (self, pixel_ranges, dt, return_pandas=False)
 
def getDiaSources (self, object_ids, dt, return_pandas=False)
 
def getDiaForcedSources (self, object_ids, dt, return_pandas=False)
 
def storeDiaObjects (self, objs, dt)
 
def storeDiaSources (self, sources)
 
def storeDiaForcedSources (self, sources)
 
def countUnassociatedObjects (self)
 
def isVisitProcessed (self, visitInfo)
 
def dailyJob (self)
 
def makeSchema (self, drop=False, mysql_engine='InnoDB', oracle_tablespace=None, oracle_iot=False)
 

Public Attributes

 config
 

Detailed Description

Interface to L1 database, hides all database access details.

The implementation is configured via standard ``pex_config`` mechanism
using `PpdbConfig` configuration class. For an example of different
configurations check config/ folder.

Parameters
----------
config : `PpdbConfig`
afw_schemas : `dict`, optional
    Dictionary with table name for a key and `afw.table.Schema`
    for a value. Columns in schema will be added to standard
    PPDB schema.

Definition at line 210 of file ppdb.py.

Constructor & Destructor Documentation

◆ __init__()

def lsst.dax.ppdb.ppdb.Ppdb.__init__ (   self,
  config,
  afw_schemas = None 
)

Definition at line 226 of file ppdb.py.

226  def __init__(self, config, afw_schemas=None):
227 
228  self.config = config
229 
230  # logging.getLogger('sqlalchemy').setLevel(logging.INFO)
231  _LOG.debug("PPDB Configuration:")
232  _LOG.debug(" dia_object_index: %s", self.config.dia_object_index)
233  _LOG.debug(" dia_object_nightly: %s", self.config.dia_object_nightly)
234  _LOG.debug(" read_sources_months: %s", self.config.read_sources_months)
235  _LOG.debug(" read_forced_sources_months: %s", self.config.read_forced_sources_months)
236  _LOG.debug(" dia_object_columns: %s", self.config.dia_object_columns)
237  _LOG.debug(" object_last_replace: %s", self.config.object_last_replace)
238  _LOG.debug(" schema_file: %s", self.config.schema_file)
239  _LOG.debug(" extra_schema_file: %s", self.config.extra_schema_file)
240  _LOG.debug(" column_map: %s", self.config.column_map)
241  _LOG.debug(" schema prefix: %s", self.config.prefix)
242 
243  # engine is reused between multiple processes, make sure that we don't
244  # share connections by disabling pool (by using NullPool class)
245  kw = dict(echo=self.config.sql_echo)
246  conn_args = dict()
247  if not self.config.connection_pool:
248  kw.update(poolclass=NullPool)
249  if self.config.isolation_level is not None:
250  kw.update(isolation_level=self.config.isolation_level)
251  if self.config.connection_timeout is not None:
252  if self.config.db_url.startswith("sqlite"):
253  conn_args.update(timeout=self.config.connection_timeout)
254  elif self.config.db_url.startswith(("postgresql", "mysql")):
255  conn_args.update(connect_timeout=self.config.connection_timeout)
256  kw.update(connect_args=conn_args)
257  self._engine = sqlalchemy.create_engine(self.config.db_url, **kw)
258 
259  self._schema = ppdbSchema.PpdbSchema(engine=self._engine,
260  dia_object_index=self.config.dia_object_index,
261  dia_object_nightly=self.config.dia_object_nightly,
262  schema_file=self.config.schema_file,
263  extra_schema_file=self.config.extra_schema_file,
264  column_map=self.config.column_map,
265  afw_schemas=afw_schemas,
266  prefix=self.config.prefix)
267 
def __init__(self, minimum, dataRange, Q)

Member Function Documentation

◆ countUnassociatedObjects()

def lsst.dax.ppdb.ppdb.Ppdb.countUnassociatedObjects (   self)
Return the number of DiaObjects that have only one DiaSource associated
with them.

Used as part of ap_verify metrics.

Returns
-------
count : `int`
    Number of DiaObjects with exactly one associated DiaSource.

Definition at line 779 of file ppdb.py.

779  def countUnassociatedObjects(self):
780  """Return the number of DiaObjects that have only one DiaSource associated
781  with them.
782 
783  Used as part of ap_verify metrics.
784 
785  Returns
786  -------
787  count : `int`
788  Number of DiaObjects with exactly one associated DiaSource.
789  """
790  # Retrieve the DiaObject table.
791  table = self._schema.objects
792 
793  # Construct the sql statement.
794  stmt = sql.select([func.count()]).select_from(table).where(table.c.nDiaSources == 1)
795  stmt = stmt.where(table.c.validityEnd == None) # noqa: E711
796 
797  # Return the count.
798  count = self._engine.scalar(stmt)
799 
800  return count
801 

◆ dailyJob()

def lsst.dax.ppdb.ppdb.Ppdb.dailyJob (   self)
Implement daily activities like cleanup/vacuum.

What should be done during daily cleanup is determined by
configuration/schema.

Definition at line 827 of file ppdb.py.

827  def dailyJob(self):
828  """Implement daily activities like cleanup/vacuum.
829 
830  What should be done during daily cleanup is determined by
831  configuration/schema.
832  """
833 
834  # move data from DiaObjectNightly into DiaObject
835  if self.config.dia_object_nightly:
836  with _ansi_session(self._engine) as conn:
837  query = 'INSERT INTO "' + self._schema.objects.name + '" '
838  query += 'SELECT * FROM "' + self._schema.objects_nightly.name + '"'
839  with Timer('DiaObjectNightly copy', self.config.timer):
840  conn.execute(sql.text(query))
841 
842  query = 'DELETE FROM "' + self._schema.objects_nightly.name + '"'
843  with Timer('DiaObjectNightly delete', self.config.timer):
844  conn.execute(sql.text(query))
845 
846  if self._engine.name == 'postgresql':
847 
848  # do VACUUM on all tables
849  _LOG.info("Running VACUUM on all tables")
850  connection = self._engine.raw_connection()
851  ISOLATION_LEVEL_AUTOCOMMIT = 0
852  connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
853  cursor = connection.cursor()
854  cursor.execute("VACUUM ANALYSE")
855 

◆ getDiaForcedSources()

def lsst.dax.ppdb.ppdb.Ppdb.getDiaForcedSources (   self,
  object_ids,
  dt,
  return_pandas = False 
)
Returns catalog of DiaForcedSource instances matching given
DiaObjects.

This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by
the schema of L1 database table. Re-mapping of the column names may
be done for some columns (based on column map passed to constructor)
but types or units are not changed.

Parameters
----------
object_ids :
    Collection of DiaObject IDs
dt : `datetime.datetime`
    Time of the current visit
return_pandas : `bool`
    Return a `pandas.DataFrame` instead of
    `lsst.afw.table.SourceCatalog`.

Returns
-------
catalog : `lsst.afw.table.SourceCatalog` or `None`
    Catalog containing DiaForcedSource records. `None` is returned if
    ``read_sources_months`` configuration parameter is set to 0 or
    when ``object_ids`` is empty.

Definition at line 545 of file ppdb.py.

545  def getDiaForcedSources(self, object_ids, dt, return_pandas=False):
546  """Returns catalog of DiaForcedSource instances matching given
547  DiaObjects.
548 
549  This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by
550  the schema of L1 database table. Re-mapping of the column names may
551  be done for some columns (based on column map passed to constructor)
552  but types or units are not changed.
553 
554  Parameters
555  ----------
556  object_ids :
557  Collection of DiaObject IDs
558  dt : `datetime.datetime`
559  Time of the current visit
560  return_pandas : `bool`
561  Return a `pandas.DataFrame` instead of
562  `lsst.afw.table.SourceCatalog`.
563 
564  Returns
565  -------
566  catalog : `lsst.afw.table.SourceCatalog` or `None`
 567  Catalog containing DiaForcedSource records. `None` is returned if
568  ``read_sources_months`` configuration parameter is set to 0 or
569  when ``object_ids`` is empty.
570  """
571 
572  if self.config.read_forced_sources_months == 0:
573  _LOG.info("Skip DiaForceSources fetching")
574  return None
575 
576  if not object_ids:
577  _LOG.info("Skip DiaForceSources fetching - no Objects")
578  # this should create a catalog, but the list of columns may be empty
579  return None
580 
581  table = self._schema.forcedSources
582  sources = None
583 
584  with Timer('DiaForcedSource select', self.config.timer):
585  with _ansi_session(self._engine) as conn:
586  for ids in _split(sorted(object_ids), 1000):
587 
588  query = 'SELECT * FROM "' + table.name + '" WHERE '
589 
590  # select by object id
591  ids = ",".join(str(id) for id in ids)
592  query += '"diaObjectId" IN (' + ids + ') '
593 
594  # execute select
595  if return_pandas:
596  df = pandas.read_sql_query(sql.text(query), conn)
597  if sources is None:
598  sources = df
599  else:
600  sources = sources.append(df)
601  else:
602  res = conn.execute(sql.text(query))
603  sources = self._convertResult(res, "DiaForcedSource", sources)
604 
605  _LOG.debug("found %s DiaForcedSources", len(sources))
606  return sources
607 

◆ getDiaObjects()

def lsst.dax.ppdb.ppdb.Ppdb.getDiaObjects (   self,
  pixel_ranges,
  return_pandas = False 
)
Returns catalog of DiaObject instances from given region.

Objects are searched based on pixelization index and region is
determined by the set of indices. There is no assumption on a
particular type of index, client is responsible for consistency
when calculating pixelization indices.

This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by
the schema of PPDB table. Re-mapping of the column names is done for
some columns (based on column map passed to constructor) but types
or units are not changed.

Returns only the last version of each DiaObject.

Parameters
----------
pixel_ranges : `list` of `tuple`
    Sequence of ranges, range is a tuple (minPixelID, maxPixelID).
    This defines set of pixel indices to be included in result.
return_pandas : `bool`
    Return a `pandas.DataFrame` instead of
    `lsst.afw.table.SourceCatalog`.

Returns
-------
catalog : `lsst.afw.table.SourceCatalog` or `pandas.DataFrame`
    Catalog containing DiaObject records.

Definition at line 343 of file ppdb.py.

343  def getDiaObjects(self, pixel_ranges, return_pandas=False):
344  """Returns catalog of DiaObject instances from given region.
345 
346  Objects are searched based on pixelization index and region is
347  determined by the set of indices. There is no assumption on a
348  particular type of index, client is responsible for consistency
349  when calculating pixelization indices.
350 
351  This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by
352  the schema of PPDB table. Re-mapping of the column names is done for
353  some columns (based on column map passed to constructor) but types
354  or units are not changed.
355 
356  Returns only the last version of each DiaObject.
357 
358  Parameters
359  ----------
360  pixel_ranges : `list` of `tuple`
361  Sequence of ranges, range is a tuple (minPixelID, maxPixelID).
362  This defines set of pixel indices to be included in result.
363  return_pandas : `bool`
364  Return a `pandas.DataFrame` instead of
365  `lsst.afw.table.SourceCatalog`.
366 
367  Returns
368  -------
369  catalog : `lsst.afw.table.SourceCatalog` or `pandas.DataFrame`
370  Catalog containing DiaObject records.
371  """
372 
373  # decide what columns we need
374  if self.config.dia_object_index == 'last_object_table':
375  table = self._schema.objects_last
376  else:
377  table = self._schema.objects
378  if not self.config.dia_object_columns:
379  query = table.select()
380  else:
381  columns = [table.c[col] for col in self.config.dia_object_columns]
382  query = sql.select(columns)
383 
384  if self.config.diaobject_index_hint:
385  val = self.config.diaobject_index_hint
386  query = query.with_hint(table, 'index_rs_asc(%(name)s "{}")'.format(val))
387  if self.config.dynamic_sampling_hint > 0:
388  val = self.config.dynamic_sampling_hint
389  query = query.with_hint(table, 'dynamic_sampling(%(name)s {})'.format(val))
390  if self.config.cardinality_hint > 0:
391  val = self.config.cardinality_hint
392  query = query.with_hint(table, 'FIRST_ROWS_1 cardinality(%(name)s {})'.format(val))
393 
394  # build selection
395  exprlist = []
396  for low, upper in pixel_ranges:
397  upper -= 1
398  if low == upper:
399  exprlist.append(table.c.pixelId == low)
400  else:
401  exprlist.append(sql.expression.between(table.c.pixelId, low, upper))
402  query = query.where(sql.expression.or_(*exprlist))
403 
404  # select latest version of objects
405  if self.config.dia_object_index != 'last_object_table':
406  query = query.where(table.c.validityEnd == None) # noqa: E711
407 
408  _LOG.debug("query: %s", query)
409 
410  if self.config.explain:
411  # run the same query with explain
412  self._explain(query, self._engine)
413 
414  # execute select
415  with Timer('DiaObject select', self.config.timer):
416  with self._engine.begin() as conn:
417  if return_pandas:
418  objects = pandas.read_sql_query(query, conn)
419  else:
420  res = conn.execute(query)
421  objects = self._convertResult(res, "DiaObject")
422  _LOG.debug("found %s DiaObjects", len(objects))
423  return objects
424 
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
Definition: history.py:174

◆ getDiaSources()

def lsst.dax.ppdb.ppdb.Ppdb.getDiaSources (   self,
  object_ids,
  dt,
  return_pandas = False 
)
Returns catalog of DiaSource instances given set of DiaObject IDs.

This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by
the schema of PPDB table. Re-mapping of the column names is done for
some columns (based on column map passed to constructor) but types or
units are not changed.

Parameters
----------
object_ids :
    Collection of DiaObject IDs
dt : `datetime.datetime`
    Time of the current visit
return_pandas : `bool`
    Return a `pandas.DataFrame` instead of
    `lsst.afw.table.SourceCatalog`.


Returns
-------
catalog : `lsst.afw.table.SourceCatalog`, `pandas.DataFrame`, or `None`
    Catalog containing DiaSource records. `None` is returned if
    ``read_sources_months`` configuration parameter is set to 0 or
    when ``object_ids`` is empty.

Definition at line 484 of file ppdb.py.

484  def getDiaSources(self, object_ids, dt, return_pandas=False):
485  """Returns catalog of DiaSource instances given set of DiaObject IDs.
486 
487  This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by
488  the schema of PPDB table. Re-mapping of the column names is done for
489  some columns (based on column map passed to constructor) but types or
490  units are not changed.
491 
492  Parameters
493  ----------
494  object_ids :
495  Collection of DiaObject IDs
496  dt : `datetime.datetime`
497  Time of the current visit
498  return_pandas : `bool`
499  Return a `pandas.DataFrame` instead of
500  `lsst.afw.table.SourceCatalog`.
501 
502 
503  Returns
504  -------
505  catalog : `lsst.afw.table.SourceCatalog`, `pandas.DataFrame`, or `None`
 506  Catalog containing DiaSource records. `None` is returned if
507  ``read_sources_months`` configuration parameter is set to 0 or
508  when ``object_ids`` is empty.
509  """
510 
511  if self.config.read_sources_months == 0:
512  _LOG.info("Skip DiaSources fetching")
513  return None
514 
515  if len(object_ids) <= 0:
516  _LOG.info("Skip DiaSources fetching - no Objects")
517  # this should create a catalog, but the list of columns may be empty
518  return None
519 
520  table = self._schema.sources
521  sources = None
522  with Timer('DiaSource select', self.config.timer):
523  with _ansi_session(self._engine) as conn:
524  for ids in _split(sorted(object_ids), 1000):
525  query = 'SELECT * FROM "' + table.name + '" WHERE '
526 
527  # select by object id
528  ids = ",".join(str(id) for id in ids)
529  query += '"diaObjectId" IN (' + ids + ') '
530 
531  # execute select
532  if return_pandas:
533  df = pandas.read_sql_query(sql.text(query), conn)
534  if sources is None:
535  sources = df
536  else:
537  sources = sources.append(df)
538  else:
539  res = conn.execute(sql.text(query))
540  sources = self._convertResult(res, "DiaSource", sources)
541 
542  _LOG.debug("found %s DiaSources", len(sources))
543  return sources
544 

◆ getDiaSourcesInRegion()

def lsst.dax.ppdb.ppdb.Ppdb.getDiaSourcesInRegion (   self,
  pixel_ranges,
  dt,
  return_pandas = False 
)
Returns catalog of DiaSource instances from given region.

Sources are searched based on pixelization index and region is
determined by the set of indices. There is no assumption on a
particular type of index, client is responsible for consistency
when calculating pixelization indices.

This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by
the schema of PPDB table. Re-mapping of the column names is done for
some columns (based on column map passed to constructor) but types or
units are not changed.

Parameters
----------
pixel_ranges : `list` of `tuple`
    Sequence of ranges, range is a tuple (minPixelID, maxPixelID).
    This defines set of pixel indices to be included in result.
dt : `datetime.datetime`
    Time of the current visit
return_pandas : `bool`
    Return a `pandas.DataFrame` instead of
    `lsst.afw.table.SourceCatalog`.

Returns
-------
catalog : `lsst.afw.table.SourceCatalog`, `pandas.DataFrame`, or `None`
    Catalog containing DiaSource records. `None` is returned if
    ``read_sources_months`` configuration parameter is set to 0.

Definition at line 425 of file ppdb.py.

425  def getDiaSourcesInRegion(self, pixel_ranges, dt, return_pandas=False):
426  """Returns catalog of DiaSource instances from given region.
427 
428  Sources are searched based on pixelization index and region is
429  determined by the set of indices. There is no assumption on a
430  particular type of index, client is responsible for consistency
431  when calculating pixelization indices.
432 
433  This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by
434  the schema of PPDB table. Re-mapping of the column names is done for
435  some columns (based on column map passed to constructor) but types or
436  units are not changed.
437 
438  Parameters
439  ----------
440  pixel_ranges : `list` of `tuple`
441  Sequence of ranges, range is a tuple (minPixelID, maxPixelID).
442  This defines set of pixel indices to be included in result.
443  dt : `datetime.datetime`
444  Time of the current visit
445  return_pandas : `bool`
446  Return a `pandas.DataFrame` instead of
447  `lsst.afw.table.SourceCatalog`.
448 
449  Returns
450  -------
451  catalog : `lsst.afw.table.SourceCatalog`, `pandas.DataFrame`, or `None`
452  Catalog containing DiaSource records. `None` is returned if
453  ``read_sources_months`` configuration parameter is set to 0.
454  """
455 
456  if self.config.read_sources_months == 0:
457  _LOG.info("Skip DiaSources fetching")
458  return None
459 
460  table = self._schema.sources
461  query = table.select()
462 
463  # build selection
464  exprlist = []
465  for low, upper in pixel_ranges:
466  upper -= 1
467  if low == upper:
468  exprlist.append(table.c.pixelId == low)
469  else:
470  exprlist.append(sql.expression.between(table.c.pixelId, low, upper))
471  query = query.where(sql.expression.or_(*exprlist))
472 
473  # execute select
474  with Timer('DiaSource select', self.config.timer):
475  with _ansi_session(self._engine) as conn:
476  if return_pandas:
477  sources = pandas.read_sql_query(query, conn)
478  else:
479  res = conn.execute(query)
480  sources = self._convertResult(res, "DiaSource")
481  _LOG.debug("found %s DiaSources", len(sources))
482  return sources
483 

◆ isVisitProcessed()

def lsst.dax.ppdb.ppdb.Ppdb.isVisitProcessed (   self,
  visitInfo 
)
Test whether data from an image has been loaded into the database.

Used as part of ap_verify metrics.

Parameters
----------
visitInfo : `lsst.afw.image.VisitInfo`
    The metadata for the image of interest.

Returns
-------
isProcessed : `bool`
    `True` if the data are present, `False` otherwise.

Definition at line 802 of file ppdb.py.

802  def isVisitProcessed(self, visitInfo):
803  """Test whether data from an image has been loaded into the database.
804 
805  Used as part of ap_verify metrics.
806 
807  Parameters
808  ----------
809  visitInfo : `lsst.afw.image.VisitInfo`
810  The metadata for the image of interest.
811 
812  Returns
813  -------
814  isProcessed : `bool`
815  `True` if the data are present, `False` otherwise.
816  """
817  id = visitInfo.getExposureId()
818  table = self._schema.sources
819  idField = table.c.ccdVisitId
820 
821  # Hopefully faster than SELECT DISTINCT
822  query = sql.select([idField]).select_from(table) \
823  .where(idField == id).limit(1)
824 
825  return self._engine.scalar(query) is not None
826 

◆ lastVisit()

def lsst.dax.ppdb.ppdb.Ppdb.lastVisit (   self)
Returns last visit information or `None` if visits table is empty.

Visits table is used by ap_proto to track visit information, it is
not a part of the regular PPDB schema.

Returns
-------
visit : `Visit` or `None`
    Last stored visit info or `None` if there was nothing stored yet.

Definition at line 268 of file ppdb.py.

268  def lastVisit(self):
269  """Returns last visit information or `None` if visits table is empty.
270 
271  Visits table is used by ap_proto to track visit information, it is
272  not a part of the regular PPDB schema.
273 
274  Returns
275  -------
276  visit : `Visit` or `None`
277  Last stored visit info or `None` if there was nothing stored yet.
278  """
279 
280  with self._engine.begin() as conn:
281 
282  stmnt = sql.select([sql.func.max(self._schema.visits.c.visitId),
283  sql.func.max(self._schema.visits.c.visitTime)])
284  res = conn.execute(stmnt)
285  row = res.fetchone()
286  if row[0] is None:
287  return None
288 
289  visitId = row[0]
290  visitTime = row[1]
291  _LOG.info("lastVisit: visitId: %s visitTime: %s (%s)", visitId,
292  visitTime, type(visitTime))
293 
294  # get max IDs from corresponding tables
295  stmnt = sql.select([sql.func.max(self._schema.objects.c.diaObjectId)])
296  lastObjectId = conn.scalar(stmnt)
297  stmnt = sql.select([sql.func.max(self._schema.sources.c.diaSourceId)])
298  lastSourceId = conn.scalar(stmnt)
299 
300  return Visit(visitId=visitId, visitTime=visitTime,
301  lastObjectId=lastObjectId, lastSourceId=lastSourceId)
302 
table::Key< int > type
Definition: Detector.cc:163

◆ makeSchema()

def lsst.dax.ppdb.ppdb.Ppdb.makeSchema (   self,
  drop = False,
  mysql_engine = 'InnoDB',
  oracle_tablespace = None,
  oracle_iot = False 
)
Create or re-create all tables.

Parameters
----------
drop : `bool`
    If True then drop tables before creating new ones.
mysql_engine : `str`, optional
    Name of the MySQL engine to use for new tables.
oracle_tablespace : `str`, optional
    Name of Oracle tablespace.
oracle_iot : `bool`, optional
    Make Index-organized DiaObjectLast table.

Definition at line 856 of file ppdb.py.

856  def makeSchema(self, drop=False, mysql_engine='InnoDB', oracle_tablespace=None, oracle_iot=False):
857  """Create or re-create all tables.
858 
859  Parameters
860  ----------
861  drop : `bool`
862  If True then drop tables before creating new ones.
863  mysql_engine : `str`, optional
864  Name of the MySQL engine to use for new tables.
865  oracle_tablespace : `str`, optional
866  Name of Oracle tablespace.
867  oracle_iot : `bool`, optional
868  Make Index-organized DiaObjectLast table.
869  """
870  self._schema.makeSchema(drop=drop, mysql_engine=mysql_engine,
871  oracle_tablespace=oracle_tablespace,
872  oracle_iot=oracle_iot)
873 

◆ saveVisit()

def lsst.dax.ppdb.ppdb.Ppdb.saveVisit (   self,
  visitId,
  visitTime 
)
Store visit information.

This method is only used by ``ap_proto`` script from ``l1dbproto``
and is not intended for production pipelines.

Parameters
----------
visitId : `int`
    Visit identifier
visitTime : `datetime.datetime`
    Visit timestamp.

Definition at line 303 of file ppdb.py.

303  def saveVisit(self, visitId, visitTime):
304  """Store visit information.
305 
306  This method is only used by ``ap_proto`` script from ``l1dbproto``
307  and is not intended for production pipelines.
308 
309  Parameters
310  ----------
311  visitId : `int`
312  Visit identifier
313  visitTime : `datetime.datetime`
314  Visit timestamp.
315  """
316 
317  ins = self._schema.visits.insert().values(visitId=visitId,
318  visitTime=visitTime)
319  self._engine.execute(ins)
320 

◆ storeDiaForcedSources()

def lsst.dax.ppdb.ppdb.Ppdb.storeDiaForcedSources (   self,
  sources 
)
Store a set of DIAForcedSources from current visit.

This method takes :doc:`/modules/lsst.afw.table/index` catalog, its schema must be
compatible with the schema of L1 database table:

  - column names must correspond to database table columns
  - some column names may be re-mapped based on column map passed to
    constructor
  - types and units of the columns must match database definitions,
    no unit conversion is performed presently
  - columns that have default values in database schema can be
    omitted from afw schema

Parameters
----------
sources : `lsst.afw.table.BaseCatalog` or `pandas.DataFrame`
    Catalog containing DiaForcedSource records

Definition at line 748 of file ppdb.py.

748  def storeDiaForcedSources(self, sources):
749  """Store a set of DIAForcedSources from current visit.
750 
 751  This method takes :doc:`/modules/lsst.afw.table/index` catalog, its schema must be
752  compatible with the schema of L1 database table:
753 
754  - column names must correspond to database table columns
 755  - some column names may be re-mapped based on column map passed to
756  constructor
757  - types and units of the columns must match database definitions,
758  no unit conversion is performed presently
759  - columns that have default values in database schema can be
760  omitted from afw schema
761 
762  Parameters
763  ----------
764  sources : `lsst.afw.table.BaseCatalog` or `pandas.DataFrame`
765  Catalog containing DiaForcedSource records
766  """
767 
768  # everything to be done in single transaction
769  with _ansi_session(self._engine) as conn:
770 
771  if isinstance(sources, pandas.DataFrame):
772  with Timer("DiaForcedSource insert", self.config.timer):
773  sources.to_sql("DiaForcedSource", conn, if_exists='append',
774  index=False)
775  else:
776  table = self._schema.forcedSources
777  self._storeObjectsAfw(sources, conn, table, "DiaForcedSource")
778 

◆ storeDiaObjects()

def lsst.dax.ppdb.ppdb.Ppdb.storeDiaObjects (   self,
  objs,
  dt 
)
Store catalog of DiaObjects from current visit.

This method takes :doc:`/modules/lsst.afw.table/index` catalog, its schema must be
compatible with the schema of PPDB table:

  - column names must correspond to database table columns
  - some column names are re-mapped based on column map passed to
    constructor
  - types and units of the columns must match database definitions,
    no unit conversion is performed presently
  - columns that have default values in database schema can be
    omitted from afw schema
  - this method knows how to fill interval-related columns
    (validityStart, validityEnd) they do not need to appear in
    afw schema

Parameters
----------
objs : `lsst.afw.table.BaseCatalog` or `pandas.DataFrame`
    Catalog with DiaObject records
dt : `datetime.datetime`
    Time of the visit

Definition at line 608 of file ppdb.py.

608  def storeDiaObjects(self, objs, dt):
609  """Store catalog of DiaObjects from current visit.
610 
 611  This method takes :doc:`/modules/lsst.afw.table/index` catalog, its schema must be
612  compatible with the schema of PPDB table:
613 
614  - column names must correspond to database table columns
 615  - some column names are re-mapped based on column map passed to
616  constructor
617  - types and units of the columns must match database definitions,
618  no unit conversion is performed presently
619  - columns that have default values in database schema can be
620  omitted from afw schema
621  - this method knows how to fill interval-related columns
622  (validityStart, validityEnd) they do not need to appear in
623  afw schema
624 
625  Parameters
626  ----------
627  objs : `lsst.afw.table.BaseCatalog` or `pandas.DataFrame`
628  Catalog with DiaObject records
629  dt : `datetime.datetime`
630  Time of the visit
631  """
632 
633  if isinstance(objs, pandas.DataFrame):
634  ids = sorted(objs['diaObjectId'])
635  else:
636  ids = sorted([obj['id'] for obj in objs])
637  _LOG.debug("first object ID: %d", ids[0])
638 
639  # NOTE: workaround for sqlite, need this here to avoid
640  # "database is locked" error.
641  table = self._schema.objects
642 
643  # everything to be done in single transaction
644  with _ansi_session(self._engine) as conn:
645 
646  ids = ",".join(str(id) for id in ids)
647 
648  if self.config.dia_object_index == 'last_object_table':
649 
650  # insert and replace all records in LAST table, mysql and postgres have
651  # non-standard features (handled in _storeObjectsAfw)
652  table = self._schema.objects_last
653  do_replace = self.config.object_last_replace
654  # If the input data is of type Pandas, we drop the previous
655  # objects regardless of the do_replace setting due to how
656  # Pandas inserts objects.
657  if not do_replace or isinstance(objs, pandas.DataFrame):
658  query = 'DELETE FROM "' + table.name + '" '
659  query += 'WHERE "diaObjectId" IN (' + ids + ') '
660 
661  if self.config.explain:
662  # run the same query with explain
663  self._explain(query, conn)
664 
665  with Timer(table.name + ' delete', self.config.timer):
666  res = conn.execute(sql.text(query))
667  _LOG.debug("deleted %s objects", res.rowcount)
668 
669  extra_columns = dict(lastNonForcedSource=dt)
670  if isinstance(objs, pandas.DataFrame):
671  with Timer("DiaObjectLast insert", self.config.timer):
672  for col, data in extra_columns.items():
673  objs[col] = data
674  objs.to_sql("DiaObjectLast", conn, if_exists='append',
675  index=False)
676  else:
677  self._storeObjectsAfw(objs, conn, table, "DiaObjectLast",
678  replace=do_replace,
679  extra_columns=extra_columns)
680 
681  else:
682 
683  # truncate existing validity intervals
684  table = self._schema.objects
685  query = 'UPDATE "' + table.name + '" '
686  query += "SET \"validityEnd\" = '" + str(dt) + "' "
687  query += 'WHERE "diaObjectId" IN (' + ids + ') '
688  query += 'AND "validityEnd" IS NULL'
689 
690  # _LOG.debug("query: %s", query)
691 
692  if self.config.explain:
693  # run the same query with explain
694  self._explain(query, conn)
695 
696  with Timer(table.name + ' truncate', self.config.timer):
697  res = conn.execute(sql.text(query))
698  _LOG.debug("truncated %s intervals", res.rowcount)
699 
700  # insert new versions
701  if self.config.dia_object_nightly:
702  table = self._schema.objects_nightly
703  else:
704  table = self._schema.objects
705  extra_columns = dict(lastNonForcedSource=dt, validityStart=dt,
706  validityEnd=None)
707  if isinstance(objs, pandas.DataFrame):
708  with Timer("DiaObject insert", self.config.timer):
709  for col, data in extra_columns.items():
710  objs[col] = data
711  objs.to_sql("DiaObject", conn, if_exists='append',
712  index=False)
713  else:
714  self._storeObjectsAfw(objs, conn, table, "DiaObject",
715  extra_columns=extra_columns)
716 

◆ storeDiaSources()

def lsst.dax.ppdb.ppdb.Ppdb.storeDiaSources (   self,
  sources 
)
Store catalog of DIASources from current visit.

This method takes :doc:`/modules/lsst.afw.table/index` catalog, its schema must be
compatible with the schema of L1 database table:

  - column names must correspond to database table columns
  - some column names may be re-mapped based on column map passed to
    constructor
  - types and units of the columns must match database definitions,
    no unit conversion is performed presently
  - columns that have default values in database schema can be
    omitted from afw schema

Parameters
----------
sources : `lsst.afw.table.BaseCatalog` or `pandas.DataFrame`
    Catalog containing DiaSource records

Definition at line 717 of file ppdb.py.

717  def storeDiaSources(self, sources):
718  """Store catalog of DIASources from current visit.
719 
 720  This method takes :doc:`/modules/lsst.afw.table/index` catalog, its schema must be
721  compatible with the schema of L1 database table:
722 
723  - column names must correspond to database table columns
 724  - some column names may be re-mapped based on column map passed to
725  constructor
726  - types and units of the columns must match database definitions,
727  no unit conversion is performed presently
728  - columns that have default values in database schema can be
729  omitted from afw schema
730 
731  Parameters
732  ----------
733  sources : `lsst.afw.table.BaseCatalog` or `pandas.DataFrame`
734  Catalog containing DiaSource records
735  """
736 
737  # everything to be done in single transaction
738  with _ansi_session(self._engine) as conn:
739 
740  if isinstance(sources, pandas.DataFrame):
741  with Timer("DiaSource insert", self.config.timer):
742  sources.to_sql("DiaSource", conn, if_exists='append',
743  index=False)
744  else:
745  table = self._schema.sources
746  self._storeObjectsAfw(sources, conn, table, "DiaSource")
747 

◆ tableRowCount()

def lsst.dax.ppdb.ppdb.Ppdb.tableRowCount (   self)
Returns dictionary with the table names and row counts.

Used by ``ap_proto`` to keep track of the size of the database tables.
Depending on database technology this could be expensive operation.

Returns
-------
row_counts : `dict`
    Dict where key is a table name and value is a row count.

Definition at line 321 of file ppdb.py.

321  def tableRowCount(self):
322  """Returns dictionary with the table names and row counts.
323 
324  Used by ``ap_proto`` to keep track of the size of the database tables.
325  Depending on database technology this could be expensive operation.
326 
327  Returns
328  -------
329  row_counts : `dict`
330  Dict where key is a table name and value is a row count.
331  """
332  res = {}
333  tables = [self._schema.objects, self._schema.sources, self._schema.forcedSources]
334  if self.config.dia_object_index == 'last_object_table':
335  tables.append(self._schema.objects_last)
336  for table in tables:
337  stmt = sql.select([func.count()]).select_from(table)
338  count = self._engine.scalar(stmt)
339  res[table.name] = count
340 
341  return res
342 

Member Data Documentation

◆ config

lsst.dax.ppdb.ppdb.Ppdb.config

Definition at line 228 of file ppdb.py.


The documentation for this class was generated from the following file: