# This file is part of dax_ppdb.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Module defining Ppdb class and related methods.
"""

__all__ = ["PpdbConfig", "Ppdb", "Visit"]

from collections import namedtuple
from contextlib import contextmanager
from datetime import datetime
import logging
import numpy as np
import os
import pandas

import lsst.geom as geom
import lsst.afw.table as afwTable
import lsst.pex.config as pexConfig
from lsst.pex.config import Field, ChoiceField, ListField
from lsst.utils import getPackageDir
import sqlalchemy
from sqlalchemy import (func, sql)
from sqlalchemy.pool import NullPool
from . import timer, ppdbSchema


_LOG = logging.getLogger(__name__.partition(".")[2])  # strip leading "lsst."

class Timer(object):
    """Timer class defining context manager which tracks execution timing.

    Typical use:

        with Timer("timer_name"):
            do_something

    On exit from the block it will print elapsed time.

    See also :py:mod:`timer` module.
    """
    def __init__(self, name, do_logging=True, log_before_cursor_execute=False):
        self._log_before_cursor_execute = log_before_cursor_execute
        self._do_logging = do_logging
        self._timer1 = timer.Timer(name)
        self._timer2 = timer.Timer(name + " (before/after cursor)")

    def __enter__(self):
        """
        Enter context, start timer
        """
#        event.listen(engine.Engine, "before_cursor_execute", self._start_timer)
#        event.listen(engine.Engine, "after_cursor_execute", self._stop_timer)
        self._timer1.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Exit context, stop and dump timer
        """
        if exc_type is None:
            self._timer1.stop()
            if self._do_logging:
                self._timer1.dump()
#        event.remove(engine.Engine, "before_cursor_execute", self._start_timer)
#        event.remove(engine.Engine, "after_cursor_execute", self._stop_timer)
        return False

    def _start_timer(self, conn, cursor, statement, parameters, context, executemany):
        """Start counting"""
        if self._log_before_cursor_execute:
            _LOG.info("before_cursor_execute")
        self._timer2.start()

    def _stop_timer(self, conn, cursor, statement, parameters, context, executemany):
        """Stop counting"""
        self._timer2.stop()
        if self._do_logging:
            self._timer2.dump()

def _split(seq, nItems):
    """Split a sequence into smaller sequences.
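
    For example, splitting a five-element range into chunks of two:

    >>> list(_split(range(5), 2))
    [[0, 1], [2, 3], [4]]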
    """
    seq = list(seq)
    while seq:
        yield seq[:nItems]
        del seq[:nItems]


# Information about single visit
Visit = namedtuple('Visit', 'visitId visitTime lastObjectId lastSourceId')


@contextmanager
def _ansi_session(engine):
    """Returns a connection, makes sure that ANSI mode is set for MySQL
    """
    with engine.begin() as conn:
        if engine.name == 'mysql':
            conn.execute(sql.text("SET SESSION SQL_MODE = 'ANSI'"))
        yield conn
    return

def _data_file_name(basename):
    """Return path name of a data file.
    """
    return os.path.join(getPackageDir("dax_ppdb"), "data", basename)


class PpdbConfig(pexConfig.Config):

    db_url = Field(dtype=str, doc="SQLAlchemy database connection URI")
    isolation_level = ChoiceField(dtype=str,
                                  doc="Transaction isolation level",
                                  allowed={"READ_COMMITTED": "Read committed",
                                           "READ_UNCOMMITTED": "Read uncommitted",
                                           "REPEATABLE_READ": "Repeatable read",
                                           "SERIALIZABLE": "Serializable"},
                                  default="READ_COMMITTED",
                                  optional=True)
    connection_pool = Field(dtype=bool,
                            doc=("If False then disable SQLAlchemy connection pool. "
                                 "Do not use connection pool when forking."),
                            default=True)
    connection_timeout = Field(dtype=float,
                               doc="Maximum time to wait for a database lock to be released before "
                                   "exiting. Defaults to SQLAlchemy defaults if not set.",
                               default=None,
                               optional=True)
    sql_echo = Field(dtype=bool,
                     doc="If True then pass SQLAlchemy echo option.",
                     default=False)
    dia_object_index = ChoiceField(dtype=str,
                                   doc="Indexing mode for DiaObject table",
                                   allowed={'baseline': "Index defined in baseline schema",
                                            'pix_id_iov': "(pixelId, objectId, iovStart) PK",
                                            'last_object_table': "Separate DiaObjectLast table"},
                                   default='baseline')
    dia_object_nightly = Field(dtype=bool,
                               doc="Use separate nightly table for DiaObject",
                               default=False)
    read_sources_months = Field(dtype=int,
                                doc="Number of months of history to read from DiaSource",
                                default=12)
    read_forced_sources_months = Field(dtype=int,
                                       doc="Number of months of history to read from DiaForcedSource",
                                       default=12)
    dia_object_columns = ListField(dtype=str,
                                   doc="List of columns to read from DiaObject, by default read all columns",
                                   default=[])
    object_last_replace = Field(dtype=bool,
                                doc="If True (default) then use \"upsert\" for DiaObjectsLast table",
                                default=True)
    schema_file = Field(dtype=str,
                        doc="Location of (YAML) configuration file with standard schema",
                        default=_data_file_name("ppdb-schema.yaml"))
    extra_schema_file = Field(dtype=str,
                              doc="Location of (YAML) configuration file with extra schema",
                              default=_data_file_name("ppdb-schema-extra.yaml"))
    column_map = Field(dtype=str,
                       doc="Location of (YAML) configuration file with column mapping",
                       default=_data_file_name("ppdb-afw-map.yaml"))
    prefix = Field(dtype=str,
                   doc="Prefix to add to table names and index names",
                   default="")
    explain = Field(dtype=bool,
                    doc="If True then run EXPLAIN SQL command on each executed query",
                    default=False)
    timer = Field(dtype=bool,
                  doc="If True then print/log timing information",
                  default=False)
    diaobject_index_hint = Field(dtype=str,
                                 doc="Name of the index to use with Oracle index hint",
                                 default=None,
                                 optional=True)
    dynamic_sampling_hint = Field(dtype=int,
                                  doc="If non-zero then use dynamic_sampling hint",
                                  default=0)
    cardinality_hint = Field(dtype=int,
                             doc="If non-zero then use cardinality hint",
                             default=0)

    def validate(self):
        super().validate()
        if self.isolation_level == "READ_COMMITTED" and self.db_url.startswith("sqlite"):
            raise ValueError("Attempting to run Ppdb with SQLITE and isolation level 'READ_COMMITTED.' "
                             "Use 'READ_UNCOMMITTED' instead.")


class Ppdb(object):
    """Interface to L1 database, hides all database access details.

    The implementation is configured via the standard ``pex_config``
    mechanism using the `PpdbConfig` configuration class. For examples of
    different configurations check the config/ folder.

    Parameters
    ----------
    config : `PpdbConfig`
    afw_schemas : `dict`, optional
        Dictionary with table name for a key and `afw.table.Schema`
        for a value. Columns in schema will be added to standard
        PPDB schema.
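
    Examples
    --------
    A minimal usage sketch; the SQLite URL is hypothetical, and
    ``READ_UNCOMMITTED`` is used because SQLite does not work with the
    default ``READ_COMMITTED`` level (see `PpdbConfig.validate`)::

        config = PpdbConfig()
        config.db_url = "sqlite:///ppdb.sqlite3"
        config.isolation_level = "READ_UNCOMMITTED"
        ppdb = Ppdb(config)
        ppdb.makeSchema(drop=True)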
    """

    def __init__(self, config, afw_schemas=None):

        self.config = config

        # logging.getLogger('sqlalchemy').setLevel(logging.INFO)
        _LOG.debug("PPDB Configuration:")
        _LOG.debug("    dia_object_index: %s", self.config.dia_object_index)
        _LOG.debug("    dia_object_nightly: %s", self.config.dia_object_nightly)
        _LOG.debug("    read_sources_months: %s", self.config.read_sources_months)
        _LOG.debug("    read_forced_sources_months: %s", self.config.read_forced_sources_months)
        _LOG.debug("    dia_object_columns: %s", self.config.dia_object_columns)
        _LOG.debug("    object_last_replace: %s", self.config.object_last_replace)
        _LOG.debug("    schema_file: %s", self.config.schema_file)
        _LOG.debug("    extra_schema_file: %s", self.config.extra_schema_file)
        _LOG.debug("    column_map: %s", self.config.column_map)
        _LOG.debug("    schema prefix: %s", self.config.prefix)

        # engine is reused between multiple processes, make sure that we don't
        # share connections by disabling pool (by using NullPool class)
        kw = dict(echo=self.config.sql_echo)
        conn_args = dict()
        if not self.config.connection_pool:
            kw.update(poolclass=NullPool)
        if self.config.isolation_level is not None:
            kw.update(isolation_level=self.config.isolation_level)
        if self.config.connection_timeout is not None:
            if self.config.db_url.startswith("sqlite"):
                conn_args.update(timeout=self.config.connection_timeout)
            elif self.config.db_url.startswith(("postgresql", "mysql")):
                conn_args.update(connect_timeout=self.config.connection_timeout)
        kw.update(connect_args=conn_args)
        self._engine = sqlalchemy.create_engine(self.config.db_url, **kw)

        self._schema = ppdbSchema.PpdbSchema(engine=self._engine,
                                             dia_object_index=self.config.dia_object_index,
                                             dia_object_nightly=self.config.dia_object_nightly,
                                             schema_file=self.config.schema_file,
                                             extra_schema_file=self.config.extra_schema_file,
                                             column_map=self.config.column_map,
                                             afw_schemas=afw_schemas,
                                             prefix=self.config.prefix)

    def lastVisit(self):
        """Returns last visit information or `None` if visits table is empty.

        Visits table is used by ap_proto to track visit information, it is
        not a part of the regular PPDB schema.

        Returns
        -------
        visit : `Visit` or `None`
            Last stored visit info or `None` if there was nothing stored yet.
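
        Examples
        --------
        A sketch of the round trip through the visits table; the visit ID
        value is arbitrary::

            ppdb.saveVisit(42, datetime.now())
            visit = ppdb.lastVisit()
            if visit is not None:
                print(visit.visitId, visit.lastObjectId, visit.lastSourceId)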
        """

        with self._engine.begin() as conn:

            stmnt = sql.select([sql.func.max(self._schema.visits.c.visitId),
                                sql.func.max(self._schema.visits.c.visitTime)])
            res = conn.execute(stmnt)
            row = res.fetchone()
            if row[0] is None:
                return None

            visitId = row[0]
            visitTime = row[1]
            _LOG.info("lastVisit: visitId: %s visitTime: %s (%s)", visitId,
                      visitTime, type(visitTime))

            # get max IDs from corresponding tables
            stmnt = sql.select([sql.func.max(self._schema.objects.c.diaObjectId)])
            lastObjectId = conn.scalar(stmnt)
            stmnt = sql.select([sql.func.max(self._schema.sources.c.diaSourceId)])
            lastSourceId = conn.scalar(stmnt)

            return Visit(visitId=visitId, visitTime=visitTime,
                         lastObjectId=lastObjectId, lastSourceId=lastSourceId)

    def saveVisit(self, visitId, visitTime):
        """Store visit information.

        This method is only used by ``ap_proto`` script from ``l1dbproto``
        and is not intended for production pipelines.

        Parameters
        ----------
        visitId : `int`
            Visit identifier
        visitTime : `datetime.datetime`
            Visit timestamp.
        """

        ins = self._schema.visits.insert().values(visitId=visitId,
                                                  visitTime=visitTime)
        self._engine.execute(ins)

    def tableRowCount(self):
        """Returns dictionary with the table names and row counts.

        Used by ``ap_proto`` to keep track of the size of the database tables.
        Depending on database technology this could be an expensive operation.

        Returns
        -------
        row_counts : `dict`
            Dict where key is a table name and value is a row count.
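
        Examples
        --------
        A short sketch; table names in the result depend on the configured
        schema::

            for name, count in ppdb.tableRowCount().items():
                print(name, count)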
        """
        res = {}
        tables = [self._schema.objects, self._schema.sources, self._schema.forcedSources]
        if self.config.dia_object_index == 'last_object_table':
            tables.append(self._schema.objects_last)
        for table in tables:
            stmt = sql.select([func.count()]).select_from(table)
            count = self._engine.scalar(stmt)
            res[table.name] = count

        return res

    def getDiaObjects(self, pixel_ranges, return_pandas=False):
        """Returns catalog of DiaObject instances from given region.

        Objects are searched based on pixelization index; the region is
        determined by the set of indices. There is no assumption on a
        particular type of index, the client is responsible for consistency
        when calculating pixelization indices.

        This method returns an :doc:`/modules/lsst.afw.table/index` catalog
        with schema determined by the schema of the PPDB table. Re-mapping of
        column names is done for some columns (based on the column map passed
        to the constructor) but types or units are not changed.

        Returns only the last version of each DiaObject.

        Parameters
        ----------
        pixel_ranges : `list` of `tuple`
            Sequence of ranges, range is a tuple (minPixelID, maxPixelID).
            This defines set of pixel indices to be included in result.
        return_pandas : `bool`
            Return a `pandas.DataFrame` instead of
            `lsst.afw.table.SourceCatalog`.

        Returns
        -------
        catalog : `lsst.afw.table.SourceCatalog` or `pandas.DataFrame`
            Catalog containing DiaObject records.
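
        Examples
        --------
        A sketch of a region query; the pixel range values are hypothetical
        and must come from the same pixelization scheme that was used when
        the objects were stored::

            pixel_ranges = [(10000, 10010), (10020, 10021)]
            objects = ppdb.getDiaObjects(pixel_ranges, return_pandas=True)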
        """

        # decide what columns we need
        if self.config.dia_object_index == 'last_object_table':
            table = self._schema.objects_last
        else:
            table = self._schema.objects
        if not self.config.dia_object_columns:
            query = table.select()
        else:
            columns = [table.c[col] for col in self.config.dia_object_columns]
            query = sql.select(columns)

        if self.config.diaobject_index_hint:
            val = self.config.diaobject_index_hint
            query = query.with_hint(table, 'index_rs_asc(%(name)s "{}")'.format(val))
        if self.config.dynamic_sampling_hint > 0:
            val = self.config.dynamic_sampling_hint
            query = query.with_hint(table, 'dynamic_sampling(%(name)s {})'.format(val))
        if self.config.cardinality_hint > 0:
            val = self.config.cardinality_hint
            query = query.with_hint(table, 'FIRST_ROWS_1 cardinality(%(name)s {})'.format(val))

        # build selection
        exprlist = []
        for low, upper in pixel_ranges:
            upper -= 1
            if low == upper:
                exprlist.append(table.c.pixelId == low)
            else:
                exprlist.append(sql.expression.between(table.c.pixelId, low, upper))
        query = query.where(sql.expression.or_(*exprlist))

        # select latest version of objects
        if self.config.dia_object_index != 'last_object_table':
            query = query.where(table.c.validityEnd == None)  # noqa: E711

        _LOG.debug("query: %s", query)

        if self.config.explain:
            # run the same query with explain
            self._explain(query, self._engine)

        # execute select
        with Timer('DiaObject select', self.config.timer):
            with self._engine.begin() as conn:
                if return_pandas:
                    objects = pandas.read_sql_query(query, conn)
                else:
                    res = conn.execute(query)
                    objects = self._convertResult(res, "DiaObject")
        _LOG.debug("found %s DiaObjects", len(objects))
        return objects

    def getDiaSourcesInRegion(self, pixel_ranges, dt, return_pandas=False):
        """Returns catalog of DiaSource instances from given region.

        Sources are searched based on pixelization index; the region is
        determined by the set of indices. There is no assumption on a
        particular type of index, the client is responsible for consistency
        when calculating pixelization indices.

        This method returns an :doc:`/modules/lsst.afw.table/index` catalog
        with schema determined by the schema of the PPDB table. Re-mapping of
        column names is done for some columns (based on the column map passed
        to the constructor) but types or units are not changed.

        Parameters
        ----------
        pixel_ranges : `list` of `tuple`
            Sequence of ranges, range is a tuple (minPixelID, maxPixelID).
            This defines set of pixel indices to be included in result.
        dt : `datetime.datetime`
            Time of the current visit
        return_pandas : `bool`
            Return a `pandas.DataFrame` instead of
            `lsst.afw.table.SourceCatalog`.

        Returns
        -------
        catalog : `lsst.afw.table.SourceCatalog`, `pandas.DataFrame`, or `None`
            Catalog containing DiaSource records. `None` is returned if
            ``read_sources_months`` configuration parameter is set to 0.
        """

        if self.config.read_sources_months == 0:
            _LOG.info("Skip DiaSources fetching")
            return None

        table = self._schema.sources
        query = table.select()

        # build selection
        exprlist = []
        for low, upper in pixel_ranges:
            upper -= 1
            if low == upper:
                exprlist.append(table.c.pixelId == low)
            else:
                exprlist.append(sql.expression.between(table.c.pixelId, low, upper))
        query = query.where(sql.expression.or_(*exprlist))

        # execute select
        with Timer('DiaSource select', self.config.timer):
            with _ansi_session(self._engine) as conn:
                if return_pandas:
                    sources = pandas.read_sql_query(query, conn)
                else:
                    res = conn.execute(query)
                    sources = self._convertResult(res, "DiaSource")
        _LOG.debug("found %s DiaSources", len(sources))
        return sources

    def getDiaSources(self, object_ids, dt, return_pandas=False):
        """Returns catalog of DiaSource instances given set of DiaObject IDs.

        This method returns an :doc:`/modules/lsst.afw.table/index` catalog
        with schema determined by the schema of the PPDB table. Re-mapping of
        column names is done for some columns (based on the column map passed
        to the constructor) but types or units are not changed.

        Parameters
        ----------
        object_ids :
            Collection of DiaObject IDs
        dt : `datetime.datetime`
            Time of the current visit
        return_pandas : `bool`
            Return a `pandas.DataFrame` instead of
            `lsst.afw.table.SourceCatalog`.

        Returns
        -------
        catalog : `lsst.afw.table.SourceCatalog`, `pandas.DataFrame`, or `None`
            Catalog containing DiaSource records. `None` is returned if
            ``read_sources_months`` configuration parameter is set to 0 or
            when ``object_ids`` is empty.
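
        Examples
        --------
        A sketch, assuming ``objects`` is an afw catalog returned by
        `getDiaObjects` (its ``id`` field holds the object identifier, as
        used by `storeDiaObjects`)::

            ids = [rec["id"] for rec in objects]
            sources = ppdb.getDiaSources(ids, datetime.now())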
        """

        if self.config.read_sources_months == 0:
            _LOG.info("Skip DiaSources fetching")
            return None

        if len(object_ids) <= 0:
            _LOG.info("Skip DiaSources fetching - no Objects")
            # this should create a catalog, but the list of columns may be empty
            return None

        table = self._schema.sources
        sources = None
        with Timer('DiaSource select', self.config.timer):
            with _ansi_session(self._engine) as conn:
                for ids in _split(sorted(object_ids), 1000):
                    query = 'SELECT * FROM "' + table.name + '" WHERE '

                    # select by object id
                    ids = ",".join(str(id) for id in ids)
                    query += '"diaObjectId" IN (' + ids + ') '

                    # execute select
                    if return_pandas:
                        df = pandas.read_sql_query(sql.text(query), conn)
                        if sources is None:
                            sources = df
                        else:
                            sources = sources.append(df)
                    else:
                        res = conn.execute(sql.text(query))
                        sources = self._convertResult(res, "DiaSource", sources)

        _LOG.debug("found %s DiaSources", len(sources))
        return sources

    def getDiaForcedSources(self, object_ids, dt, return_pandas=False):
        """Returns catalog of DiaForcedSource instances matching given
        DiaObjects.

        This method returns an :doc:`/modules/lsst.afw.table/index` catalog
        with schema determined by the schema of the L1 database table.
        Re-mapping of column names may be done for some columns (based on the
        column map passed to the constructor) but types or units are not
        changed.

        Parameters
        ----------
        object_ids :
            Collection of DiaObject IDs
        dt : `datetime.datetime`
            Time of the current visit
        return_pandas : `bool`
            Return a `pandas.DataFrame` instead of
            `lsst.afw.table.SourceCatalog`.

        Returns
        -------
        catalog : `lsst.afw.table.SourceCatalog` or `None`
            Catalog containing DiaForcedSource records. `None` is returned if
            ``read_forced_sources_months`` configuration parameter is set to 0
            or when ``object_ids`` is empty.
        """

        if self.config.read_forced_sources_months == 0:
            _LOG.info("Skip DiaForcedSources fetching")
            return None

        if not object_ids:
            _LOG.info("Skip DiaForcedSources fetching - no Objects")
            # this should create a catalog, but the list of columns may be empty
            return None

        table = self._schema.forcedSources
        sources = None

        with Timer('DiaForcedSource select', self.config.timer):
            with _ansi_session(self._engine) as conn:
                for ids in _split(sorted(object_ids), 1000):

                    query = 'SELECT * FROM "' + table.name + '" WHERE '

                    # select by object id
                    ids = ",".join(str(id) for id in ids)
                    query += '"diaObjectId" IN (' + ids + ') '

                    # execute select
                    if return_pandas:
                        df = pandas.read_sql_query(sql.text(query), conn)
                        if sources is None:
                            sources = df
                        else:
                            sources = sources.append(df)
                    else:
                        res = conn.execute(sql.text(query))
                        sources = self._convertResult(res, "DiaForcedSource", sources)

        _LOG.debug("found %s DiaForcedSources", len(sources))
        return sources

    def storeDiaObjects(self, objs, dt):
        """Store catalog of DiaObjects from current visit.

        This method takes an :doc:`/modules/lsst.afw.table/index` catalog;
        its schema must be compatible with the schema of the PPDB table:

          - column names must correspond to database table columns
          - some column names are re-mapped based on the column map passed
            to the constructor
          - types and units of the columns must match database definitions,
            no unit conversion is performed presently
          - columns that have default values in the database schema can be
            omitted from the afw schema
          - this method knows how to fill interval-related columns
            (validityStart, validityEnd), they do not need to appear in
            the afw schema

        Parameters
        ----------
        objs : `lsst.afw.table.BaseCatalog` or `pandas.DataFrame`
            Catalog with DiaObject records
        dt : `datetime.datetime`
            Time of the visit
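
        Examples
        --------
        A sketch of the `pandas.DataFrame` path; the column set shown is a
        hypothetical minimal subset, the full column set comes from the
        configured schema::

            objs = pandas.DataFrame({"diaObjectId": [1, 2],
                                     "pixelId": [10000, 10002],
                                     "ra": [35.6, 35.7],
                                     "decl": [-4.2, -4.3]})
            ppdb.storeDiaObjects(objs, datetime.now())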
        """

        if isinstance(objs, pandas.DataFrame):
            ids = sorted(objs['diaObjectId'])
        else:
            ids = sorted([obj['id'] for obj in objs])
        _LOG.debug("first object ID: %d", ids[0])

        # NOTE: workaround for sqlite, need this here to avoid
        # "database is locked" error.
        table = self._schema.objects

        # everything to be done in single transaction
        with _ansi_session(self._engine) as conn:

            ids = ",".join(str(id) for id in ids)

            if self.config.dia_object_index == 'last_object_table':

                # insert and replace all records in LAST table, mysql and postgres have
                # non-standard features (handled in _storeObjectsAfw)
                table = self._schema.objects_last
                do_replace = self.config.object_last_replace
                # If the input data is of type Pandas, we drop the previous
                # objects regardless of the do_replace setting due to how
                # Pandas inserts objects.
                if not do_replace or isinstance(objs, pandas.DataFrame):
                    query = 'DELETE FROM "' + table.name + '" '
                    query += 'WHERE "diaObjectId" IN (' + ids + ') '

                    if self.config.explain:
                        # run the same query with explain
                        self._explain(query, conn)

                    with Timer(table.name + ' delete', self.config.timer):
                        res = conn.execute(sql.text(query))
                    _LOG.debug("deleted %s objects", res.rowcount)

                extra_columns = dict(lastNonForcedSource=dt)
                if isinstance(objs, pandas.DataFrame):
                    with Timer("DiaObjectLast insert", self.config.timer):
                        for col, data in extra_columns.items():
                            objs[col] = data
                        objs.to_sql("DiaObjectLast", conn, if_exists='append',
                                    index=False)
                else:
                    self._storeObjectsAfw(objs, conn, table, "DiaObjectLast",
                                          replace=do_replace,
                                          extra_columns=extra_columns)

            else:

                # truncate existing validity intervals
                table = self._schema.objects
                query = 'UPDATE "' + table.name + '" '
                query += "SET \"validityEnd\" = '" + str(dt) + "' "
                query += 'WHERE "diaObjectId" IN (' + ids + ') '
                query += 'AND "validityEnd" IS NULL'

                # _LOG.debug("query: %s", query)

                if self.config.explain:
                    # run the same query with explain
                    self._explain(query, conn)

                with Timer(table.name + ' truncate', self.config.timer):
                    res = conn.execute(sql.text(query))
                _LOG.debug("truncated %s intervals", res.rowcount)

            # insert new versions
            if self.config.dia_object_nightly:
                table = self._schema.objects_nightly
            else:
                table = self._schema.objects
            extra_columns = dict(lastNonForcedSource=dt, validityStart=dt,
                                 validityEnd=None)
            if isinstance(objs, pandas.DataFrame):
                with Timer("DiaObject insert", self.config.timer):
                    for col, data in extra_columns.items():
                        objs[col] = data
                    objs.to_sql("DiaObject", conn, if_exists='append',
                                index=False)
            else:
                self._storeObjectsAfw(objs, conn, table, "DiaObject",
                                      extra_columns=extra_columns)

    def storeDiaSources(self, sources):
        """Store catalog of DIASources from current visit.

        This method takes an :doc:`/modules/lsst.afw.table/index` catalog;
        its schema must be compatible with the schema of the L1 database
        table:

          - column names must correspond to database table columns
          - some column names may be re-mapped based on the column map passed
            to the constructor
          - types and units of the columns must match database definitions,
            no unit conversion is performed presently
          - columns that have default values in the database schema can be
            omitted from the afw schema

        Parameters
        ----------
        sources : `lsst.afw.table.BaseCatalog` or `pandas.DataFrame`
            Catalog containing DiaSource records
        """

        # everything to be done in single transaction
        with _ansi_session(self._engine) as conn:

            if isinstance(sources, pandas.DataFrame):
                with Timer("DiaSource insert", self.config.timer):
                    sources.to_sql("DiaSource", conn, if_exists='append',
                                   index=False)
            else:
                table = self._schema.sources
                self._storeObjectsAfw(sources, conn, table, "DiaSource")

    def storeDiaForcedSources(self, sources):
        """Store a set of DIAForcedSources from current visit.

        This method takes an :doc:`/modules/lsst.afw.table/index` catalog;
        its schema must be compatible with the schema of the L1 database
        table:

          - column names must correspond to database table columns
          - some column names may be re-mapped based on the column map passed
            to the constructor
          - types and units of the columns must match database definitions,
            no unit conversion is performed presently
          - columns that have default values in the database schema can be
            omitted from the afw schema

        Parameters
        ----------
        sources : `lsst.afw.table.BaseCatalog` or `pandas.DataFrame`
            Catalog containing DiaForcedSource records
        """

        # everything to be done in single transaction
        with _ansi_session(self._engine) as conn:

            if isinstance(sources, pandas.DataFrame):
                with Timer("DiaForcedSource insert", self.config.timer):
                    sources.to_sql("DiaForcedSource", conn, if_exists='append',
                                   index=False)
            else:
                table = self._schema.forcedSources
                self._storeObjectsAfw(sources, conn, table, "DiaForcedSource")

    def dailyJob(self):
        """Implement daily activities like cleanup/vacuum.

        What should be done during daily cleanup is determined by
        configuration/schema.
        """

        # move data from DiaObjectNightly into DiaObject
        if self.config.dia_object_nightly:
            with _ansi_session(self._engine) as conn:
                query = 'INSERT INTO "' + self._schema.objects.name + '" '
                query += 'SELECT * FROM "' + self._schema.objects_nightly.name + '"'
                with Timer('DiaObjectNightly copy', self.config.timer):
                    conn.execute(sql.text(query))

                query = 'DELETE FROM "' + self._schema.objects_nightly.name + '"'
                with Timer('DiaObjectNightly delete', self.config.timer):
                    conn.execute(sql.text(query))

        if self._engine.name == 'postgresql':

            # do VACUUM on all tables
            _LOG.info("Running VACUUM on all tables")
            connection = self._engine.raw_connection()
            ISOLATION_LEVEL_AUTOCOMMIT = 0
            connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
            cursor = connection.cursor()
            cursor.execute("VACUUM ANALYSE")

    def makeSchema(self, drop=False, mysql_engine='InnoDB', oracle_tablespace=None, oracle_iot=False):
        """Create or re-create all tables.

        Parameters
        ----------
        drop : `bool`
            If True then drop tables before creating new ones.
        mysql_engine : `str`, optional
            Name of the MySQL engine to use for new tables.
        oracle_tablespace : `str`, optional
            Name of Oracle tablespace.
        oracle_iot : `bool`, optional
            Make Index-organized DiaObjectLast table.
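
        Examples
        --------
        A sketch of re-initializing a development database; ``drop=True``
        destroys any existing tables and their data::

            ppdb.makeSchema(drop=True)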
        """
        self._schema.makeSchema(drop=drop, mysql_engine=mysql_engine,
                                oracle_tablespace=oracle_tablespace,
                                oracle_iot=oracle_iot)

    def _explain(self, query, conn):
        """Run the query with explain
        """

        _LOG.info("explain for query: %s...", query[:64])

        if conn.engine.name == 'mysql':
            query = "EXPLAIN EXTENDED " + query
        else:
            query = "EXPLAIN " + query

        res = conn.execute(sql.text(query))
        if res.returns_rows:
            _LOG.info("explain: %s", res.keys())
            for row in res:
                _LOG.info("explain: %s", row)
        else:
            _LOG.info("EXPLAIN returned nothing")

    def _storeObjectsAfw(self, objects, conn, table, schema_table_name,
                         replace=False, extra_columns=None):
        """Generic store method.

        Takes catalog of records and stores a bunch of objects in a table.

        Parameters
        ----------
        objects : `lsst.afw.table.BaseCatalog`
            Catalog containing object records
        conn :
            Database connection
        table : `sqlalchemy.Table`
            Database table
        schema_table_name : `str`
            Name of the table to be used for finding table schema.
        replace : `boolean`
            If `True` then use replace instead of INSERT (should be more efficient)
        extra_columns : `dict`, optional
            Mapping (column_name, column_value) which gives column values to add
            to every row, only if column is missing in catalog records.
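
        Examples
        --------
        The generated statement has roughly this shape (hypothetical column
        names and values, which are inlined as literals by ``quoteValue``)::

            INSERT INTO "DiaObject" ("diaObjectId","pixelId") VALUES (1,100),(2,101)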
        """

        def quoteValue(v):
            """Quote and escape values"""
            if v is None:
                v = "NULL"
            elif isinstance(v, datetime):
                v = "'" + str(v) + "'"
            elif isinstance(v, str):
                # we don't expect nasty stuff in strings
                v = "'" + v + "'"
            elif isinstance(v, geom.Angle):
                v = v.asDegrees()
                if np.isfinite(v):
                    v = str(v)
                else:
                    v = "NULL"
            else:
                if np.isfinite(v):
                    v = str(v)
                else:
                    v = "NULL"
            return v

        def quoteId(columnName):
            """Smart quoting for column names.
            Lower-case names are not quoted.
            """
            if not columnName.islower():
                columnName = '"' + columnName + '"'
            return columnName

        if conn.engine.name == "oracle":
            return self._storeObjectsAfwOracle(objects, conn, table,
                                               schema_table_name, replace,
                                               extra_columns)

        schema = objects.getSchema()
        # use extra columns if specified
        extra_fields = list((extra_columns or {}).keys())

        afw_fields = [field.getName() for key, field in schema
                      if field.getName() not in extra_fields]

        column_map = self._schema.getAfwColumns(schema_table_name)
        # list of columns (as in cat schema)
        fields = [column_map[field].name for field in afw_fields if field in column_map]

        if replace and conn.engine.name in ('mysql', 'sqlite'):
            query = 'REPLACE INTO '
        else:
            query = 'INSERT INTO '
        qfields = [quoteId(field) for field in fields + extra_fields]
        query += quoteId(table.name) + ' (' + ','.join(qfields) + ') ' + 'VALUES '

        values = []
        for rec in objects:
            row = []
            for field in afw_fields:
                if field not in column_map:
                    continue
                value = rec[field]
                if column_map[field].type == "DATETIME" and \
                        np.isfinite(value):
                    # convert seconds into datetime
                    value = datetime.utcfromtimestamp(value)
                row.append(quoteValue(value))
            for field in extra_fields:
                row.append(quoteValue(extra_columns[field]))
            values.append('(' + ','.join(row) + ')')

        if self.config.explain:
            # run the same query with explain, only give it one row of data
            self._explain(query + values[0], conn)

        query += ','.join(values)

        if replace and conn.engine.name == 'postgresql':
            # This depends on that "replace" can only be true for DiaObjectLast table
            pks = ('pixelId', 'diaObjectId')
            query += " ON CONFLICT (\"{}\", \"{}\") DO UPDATE SET ".format(*pks)
            fields = [column_map[field].name for field in afw_fields if field in column_map]
            fields = ['"{0}" = EXCLUDED."{0}"'.format(field)
                      for field in fields if field not in pks]
            query += ', '.join(fields)

        # _LOG.debug("query: %s", query)
        _LOG.info("%s: will store %d records", table.name, len(objects))
        with Timer(table.name + ' insert', self.config.timer):
            res = conn.execute(sql.text(query))
        _LOG.debug("inserted %s intervals", res.rowcount)

    def _storeObjectsAfwOracle(self, objects, conn, table, schema_table_name,
                               replace=False, extra_columns=None):
        """Store method for Oracle.

        Takes catalog of records and stores a bunch of objects in a table.

        Parameters
        ----------
        objects : `lsst.afw.table.BaseCatalog`
            Catalog containing object records
        conn :
            Database connection
        table : `sqlalchemy.Table`
            Database table
        schema_table_name : `str`
            Name of the table to be used for finding table schema.
        replace : `boolean`
            If `True` then use replace instead of INSERT (should be more efficient)
        extra_columns : `dict`, optional
            Mapping (column_name, column_value) which gives column values to add
            to every row, only if column is missing in catalog records.
        """

        def quoteId(columnName):
            """Smart quoting for column names.
            Lower-case names are not quoted (Oracle backend needs them unquoted).
            """
            if not columnName.islower():
                columnName = '"' + columnName + '"'
            return columnName

        schema = objects.getSchema()

        # use extra columns as overrides always
        extra_fields = list((extra_columns or {}).keys())

        afw_fields = [field.getName() for key, field in schema
                      if field.getName() not in extra_fields]
        # _LOG.info("afw_fields: %s", afw_fields)

        column_map = self._schema.getAfwColumns(schema_table_name)
        # _LOG.info("column_map: %s", column_map)

        # list of columns (as in cat schema)
        fields = [column_map[field].name for field in afw_fields
                  if field in column_map]
        # _LOG.info("fields: %s", fields)

        qfields = [quoteId(field) for field in fields + extra_fields]

        if not replace:
            vals = [":col{}".format(i) for i in range(len(fields))]
            vals += [":extcol{}".format(i) for i in range(len(extra_fields))]
            query = 'INSERT INTO ' + quoteId(table.name)
            query += ' (' + ','.join(qfields) + ') VALUES'
            query += ' (' + ','.join(vals) + ')'
        else:
            qvals = [":col{} {}".format(i, quoteId(field)) for i, field in enumerate(fields)]
            qvals += [":extcol{} {}".format(i, quoteId(field)) for i, field in enumerate(extra_fields)]
            pks = ('pixelId', 'diaObjectId')
            onexpr = ["SRC.{col} = DST.{col}".format(col=quoteId(col)) for col in pks]
            setexpr = ["DST.{col} = SRC.{col}".format(col=quoteId(col))
                       for col in fields + extra_fields if col not in pks]
            vals = ["SRC.{col}".format(col=quoteId(col)) for col in fields + extra_fields]
            query = "MERGE INTO {} DST ".format(quoteId(table.name))
            query += "USING (SELECT {} FROM DUAL) SRC ".format(", ".join(qvals))
            query += "ON ({}) ".format(" AND ".join(onexpr))
            query += "WHEN MATCHED THEN UPDATE SET {} ".format(" ,".join(setexpr))
            query += "WHEN NOT MATCHED THEN INSERT "
            query += "({}) VALUES ({})".format(','.join(qfields), ','.join(vals))
        # _LOG.info("query: %s", query)

        values = []
        for rec in objects:
            row = {}
            col = 0
            for field in afw_fields:
                if field not in column_map:
                    continue
                value = rec[field]
                if column_map[field].type == "DATETIME" and not np.isnan(value):
                    # convert seconds into datetime
                    value = datetime.utcfromtimestamp(value)
                elif isinstance(value, geom.Angle):
                    value = str(value.asDegrees())
                elif not np.isfinite(value):
                    value = None
                row["col{}".format(col)] = value
                col += 1
            for i, field in enumerate(extra_fields):
                row["extcol{}".format(i)] = extra_columns[field]
            values.append(row)

        # _LOG.debug("query: %s", query)
        _LOG.info("%s: will store %d records", table.name, len(objects))
        with Timer(table.name + ' insert', self.config.timer):
            res = conn.execute(sql.text(query), values)
        _LOG.debug("inserted %s intervals", res.rowcount)

    def _convertResult(self, res, table_name, catalog=None):
        """Convert result set into output catalog.

        Parameters
        ----------
        res : `sqlalchemy.ResultProxy`
            SQLAlchemy result set returned by query.
        table_name : `str`
            Name of the table.
        catalog : `lsst.afw.table.BaseCatalog`
            If not None then extend existing catalog

        Returns
        -------
        catalog : `lsst.afw.table.SourceCatalog`
            If ``catalog`` is None then new instance is returned, otherwise
            ``catalog`` is updated and returned.
        """
        # make catalog schema
        columns = res.keys()
        schema, col_map = self._schema.getAfwSchema(table_name, columns)
        if catalog is None:
            _LOG.debug("_convertResult: schema: %s", schema)
            _LOG.debug("_convertResult: col_map: %s", col_map)
            catalog = afwTable.SourceCatalog(schema)

        # fill catalog
        for row in res:
            record = catalog.addNew()
            for col, value in row.items():
                # some columns may exist in database but not included in afw schema
                col = col_map.get(col)
                if col is not None:
                    if isinstance(value, datetime):
                        # convert datetime to number of seconds
                        value = int((value - datetime.utcfromtimestamp(0)).total_seconds())
                    elif col.getTypeString() == 'Angle' and value is not None:
                        value = value*geom.degrees
                    if value is not None:
                        record.set(col, value)

        return catalog