# This file is part of dax_ppdb.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Module defining Ppdb class and related methods.
"""

__all__ = ["PpdbConfig", "Ppdb", "Visit"]

from collections import namedtuple
from contextlib import contextmanager
from datetime import datetime
import logging
import numpy as np
import os

import lsst.geom as geom
import lsst.afw.table as afwTable
import lsst.pex.config as pexConfig
from lsst.pex.config import Field, ChoiceField, ListField
from lsst.utils import getPackageDir
import sqlalchemy
from sqlalchemy import (func, sql)
from sqlalchemy.pool import NullPool
from . import timer, ppdbSchema


_LOG = logging.getLogger(__name__.partition(".")[2])  # strip leading "lsst."


class Timer(object):
    """Timer class defining context manager which tracks execution timing.

    Typical use:

        with Timer("timer_name"):
            do_something

    On exit from the block it will print elapsed time.

    See also :py:mod:`timer` module.
    """
    def __init__(self, name, do_logging=True, log_before_cursor_execute=False):
        self._log_before_cursor_execute = log_before_cursor_execute
        self._do_logging = do_logging
        self._timer1 = timer.Timer(name)
        self._timer2 = timer.Timer(name + " (before/after cursor)")

    def __enter__(self):
        """
        Enter context, start timer
        """
#        event.listen(engine.Engine, "before_cursor_execute", self._start_timer)
#        event.listen(engine.Engine, "after_cursor_execute", self._stop_timer)
        self._timer1.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Exit context, stop and dump timer
        """
        if exc_type is None:
            self._timer1.stop()
            if self._do_logging:
                self._timer1.dump()
#        event.remove(engine.Engine, "before_cursor_execute", self._start_timer)
#        event.remove(engine.Engine, "after_cursor_execute", self._stop_timer)
        return False

    def _start_timer(self, conn, cursor, statement, parameters, context, executemany):
        """Start counting"""
        if self._log_before_cursor_execute:
            _LOG.info("before_cursor_execute")
        self._timer2.start()

    def _stop_timer(self, conn, cursor, statement, parameters, context, executemany):
        """Stop counting"""
        self._timer2.stop()
        if self._do_logging:
            self._timer2.dump()

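# Example (a minimal sketch): time a block of code; the elapsed time is
# printed/logged when the block exits without an exception, matching the
# usage of Timer elsewhere in this module.
#
#     with Timer("DiaObject select"):
#         res = conn.execute(query)
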
def _split(seq, nItems):
    """Split a sequence into smaller sequences"""
    seq = list(seq)
    while seq:
        yield seq[:nItems]
        del seq[:nItems]

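# Example (illustrative): batch a sequence of IDs into chunks of three.
#
#     >>> list(_split(range(7), 3))
#     [[0, 1, 2], [3, 4, 5], [6]]
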

# Information about single visit
Visit = namedtuple('Visit', 'visitId visitTime lastObjectId lastSourceId')


@contextmanager
def _ansi_session(engine):
    """Returns a connection, makes sure that ANSI mode is set for MySQL
    """
    with engine.begin() as conn:
        if engine.name == 'mysql':
            conn.execute(sql.text("SET SESSION SQL_MODE = 'ANSI'"))
        yield conn
    return


def _data_file_name(basename):
    """Return path name of a data file.
    """
    return os.path.join(getPackageDir("dax_ppdb"), "data", basename)


class PpdbConfig(pexConfig.Config):

    db_url = Field(dtype=str, doc="SQLAlchemy database connection URI")
    isolation_level = ChoiceField(dtype=str,
                                  doc="Transaction isolation level",
                                  allowed={"READ_COMMITTED": "Read committed",
                                           "READ_UNCOMMITTED": "Read uncommitted",
                                           "REPEATABLE_READ": "Repeatable read",
                                           "SERIALIZABLE": "Serializable"},
                                  default="READ_COMMITTED")
    connection_pool = Field(dtype=bool,
                            doc=("If False then disable SQLAlchemy connection pool. "
                                 "Do not use connection pool when forking."),
                            default=True)
    connection_timeout = Field(dtype=float,
                               doc="Maximum time to wait for a database lock to be released before "
                                   "exiting. Defaults to the SQLAlchemy default if not set.",
                               default=None)
    sql_echo = Field(dtype=bool,
                     doc="If True then pass SQLAlchemy echo option.",
                     default=False)
    dia_object_index = ChoiceField(dtype=str,
                                   doc="Indexing mode for DiaObject table",
                                   allowed={'baseline': "Index defined in baseline schema",
                                            'pix_id_iov': "(pixelId, objectId, iovStart) PK",
                                            'last_object_table': "Separate DiaObjectLast table"},
                                   default='baseline')
    dia_object_nightly = Field(dtype=bool,
                               doc="Use separate nightly table for DiaObject",
                               default=False)
    read_sources_months = Field(dtype=int,
                                doc="Number of months of history to read from DiaSource",
                                default=12)
    read_forced_sources_months = Field(dtype=int,
                                       doc="Number of months of history to read from DiaForcedSource",
                                       default=12)
    dia_object_columns = ListField(dtype=str,
                                   doc="List of columns to read from DiaObject, by default read all columns",
                                   default=[])
    object_last_replace = Field(dtype=bool,
                                doc="If True (default) then use \"upsert\" for DiaObjectsLast table",
                                default=True)
    schema_file = Field(dtype=str,
                        doc="Location of (YAML) configuration file with standard schema",
                        default=_data_file_name("ppdb-schema.yaml"))
    extra_schema_file = Field(dtype=str,
                              doc="Location of (YAML) configuration file with extra schema",
                              default=_data_file_name("ppdb-schema-extra.yaml"))
    column_map = Field(dtype=str,
                       doc="Location of (YAML) configuration file with column mapping",
                       default=_data_file_name("ppdb-afw-map.yaml"))
    prefix = Field(dtype=str,
                   doc="Prefix to add to table names and index names",
                   default="")
    explain = Field(dtype=bool,
                    doc="If True then run EXPLAIN SQL command on each executed query",
                    default=False)
    timer = Field(dtype=bool,
                  doc="If True then print/log timing information",
                  default=False)
    diaobject_index_hint = Field(dtype=str,
                                 doc="Name of the index to use with Oracle index hint",
                                 default=None)
    dynamic_sampling_hint = Field(dtype=int,
                                  doc="If non-zero then use dynamic_sampling hint",
                                  default=0)
    cardinality_hint = Field(dtype=int,
                             doc="If non-zero then use cardinality hint",
                             default=0)

    def validate(self):
        if self.isolation_level == "READ_COMMITTED" and self.db_url.startswith("sqlite"):
            raise ValueError("Attempting to run Ppdb with SQLITE and isolation level 'READ_COMMITTED.' "
                             "Use 'READ_UNCOMMITTED' instead.")

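# Example (a minimal sketch, assuming an in-memory SQLite database; any
# SQLAlchemy URL works).  SQLite requires the READ_UNCOMMITTED isolation
# level, see PpdbConfig.validate():
#
#     config = PpdbConfig()
#     config.db_url = "sqlite://"
#     config.isolation_level = "READ_UNCOMMITTED"
#     ppdb = Ppdb(config)
#     ppdb.makeSchema()
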

class Ppdb(object):
    """Interface to L1 database, hides all database access details.

    The implementation is configured via the standard ``pex_config``
    mechanism using the `PpdbConfig` configuration class. For examples of
    different configurations check the config/ folder.

    Parameters
    ----------
    config : `PpdbConfig`
    afw_schemas : `dict`, optional
        Dictionary with table name for a key and `afw.table.Schema`
        for a value. Columns in schema will be added to standard
        PPDB schema.
    """

    def __init__(self, config, afw_schemas=None):

        self.config = config

        # logging.getLogger('sqlalchemy').setLevel(logging.INFO)
        _LOG.info("PPDB Configuration:")
        _LOG.info("    dia_object_index: %s", self.config.dia_object_index)
        _LOG.info("    dia_object_nightly: %s", self.config.dia_object_nightly)
        _LOG.info("    read_sources_months: %s", self.config.read_sources_months)
        _LOG.info("    read_forced_sources_months: %s", self.config.read_forced_sources_months)
        _LOG.info("    dia_object_columns: %s", self.config.dia_object_columns)
        _LOG.info("    object_last_replace: %s", self.config.object_last_replace)
        _LOG.info("    schema_file: %s", self.config.schema_file)
        _LOG.info("    extra_schema_file: %s", self.config.extra_schema_file)
        _LOG.info("    column_map: %s", self.config.column_map)
        _LOG.info("    schema prefix: %s", self.config.prefix)

        # If the engine is re-used between multiple processes, make sure that
        # we don't share connections by disabling the pool (using NullPool).
        kw = dict(echo=self.config.sql_echo)
        conn_args = dict()
        if not self.config.connection_pool:
            kw.update(poolclass=NullPool)
        if self.config.isolation_level is not None:
            kw.update(isolation_level=self.config.isolation_level)
        if self.config.connection_timeout is not None:
            if self.config.db_url.startswith("sqlite"):
                conn_args.update(timeout=self.config.connection_timeout)
            elif self.config.db_url.startswith(("postgresql", "mysql")):
                conn_args.update(connect_timeout=self.config.connection_timeout)
        kw.update(connect_args=conn_args)
        self._engine = sqlalchemy.create_engine(self.config.db_url, **kw)

        self._schema = ppdbSchema.PpdbSchema(engine=self._engine,
                                             dia_object_index=self.config.dia_object_index,
                                             dia_object_nightly=self.config.dia_object_nightly,
                                             schema_file=self.config.schema_file,
                                             extra_schema_file=self.config.extra_schema_file,
                                             column_map=self.config.column_map,
                                             afw_schemas=afw_schemas,
                                             prefix=self.config.prefix)

    def lastVisit(self):
        """Returns last visit information or `None` if visits table is empty.

        Visits table is used by ap_proto to track visit information, it is
        not a part of the regular PPDB schema.

        Returns
        -------
        visit : `Visit` or `None`
            Last stored visit info or `None` if there was nothing stored yet.
        """

        with self._engine.begin() as conn:

            stmnt = sql.select([sql.func.max(self._schema.visits.c.visitId),
                                sql.func.max(self._schema.visits.c.visitTime)])
            res = conn.execute(stmnt)
            row = res.fetchone()
            if row[0] is None:
                return None

            visitId = row[0]
            visitTime = row[1]
            _LOG.info("lastVisit: visitId: %s visitTime: %s (%s)", visitId,
                      visitTime, type(visitTime))

            # get max IDs from corresponding tables
            stmnt = sql.select([sql.func.max(self._schema.objects.c.diaObjectId)])
            lastObjectId = conn.scalar(stmnt)
            stmnt = sql.select([sql.func.max(self._schema.sources.c.diaSourceId)])
            lastSourceId = conn.scalar(stmnt)

            return Visit(visitId=visitId, visitTime=visitTime,
                         lastObjectId=lastObjectId, lastSourceId=lastSourceId)

    def saveVisit(self, visitId, visitTime):
        """Store visit information.

        This method is only used by ``ap_proto`` script from ``l1dbproto``
        and is not intended for production pipelines.

        Parameters
        ----------
        visitId : `int`
            Visit identifier
        visitTime : `datetime.datetime`
            Visit timestamp.
        """

        ins = self._schema.visits.insert().values(visitId=visitId,
                                                  visitTime=visitTime)
        self._engine.execute(ins)

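    # Example (a minimal sketch): record a visit and read it back.
    #
    #     from datetime import datetime
    #     ppdb.saveVisit(1, datetime.utcnow())
    #     visit = ppdb.lastVisit()   # Visit(visitId=1, visitTime=..., ...)
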
    def tableRowCount(self):
        """Returns dictionary with the table names and row counts.

        Used by ``ap_proto`` to keep track of the size of the database tables.
        Depending on database technology this could be an expensive operation.

        Returns
        -------
        row_counts : `dict`
            Dict where key is a table name and value is a row count.
        """
        res = {}
        tables = [self._schema.objects, self._schema.sources, self._schema.forcedSources]
        if self.config.dia_object_index == 'last_object_table':
            tables.append(self._schema.objects_last)
        for table in tables:
            stmt = sql.select([func.count()]).select_from(table)
            count = self._engine.scalar(stmt)
            res[table.name] = count

        return res

    def getDiaObjects(self, pixel_ranges):
        """Returns catalog of DiaObject instances from given region.

        Objects are searched based on pixelization index and region is
        determined by the set of indices. There is no assumption on a
        particular type of index, the client is responsible for consistency
        when calculating pixelization indices.

        This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by
        the schema of PPDB table. Re-mapping of the column names is done for
        some columns (based on column map passed to constructor) but types
        or units are not changed.

        Returns only the last version of each DiaObject.

        Parameters
        ----------
        pixel_ranges : `list` of `tuple`
            Sequence of ranges, range is a tuple (minPixelID, maxPixelID).
            This defines set of pixel indices to be included in result.

        Returns
        -------
        catalog : `lsst.afw.table.SourceCatalog`
            Catalog containing DiaObject records.
        """

        # decide what columns we need
        if self.config.dia_object_index == 'last_object_table':
            table = self._schema.objects_last
        else:
            table = self._schema.objects
        if not self.config.dia_object_columns:
            query = table.select()
        else:
            columns = [table.c[col] for col in self.config.dia_object_columns]
            query = sql.select(columns)

        if self.config.diaobject_index_hint:
            val = self.config.diaobject_index_hint
            query = query.with_hint(table, 'index_rs_asc(%(name)s "{}")'.format(val))
        if self.config.dynamic_sampling_hint > 0:
            val = self.config.dynamic_sampling_hint
            query = query.with_hint(table, 'dynamic_sampling(%(name)s {})'.format(val))
        if self.config.cardinality_hint > 0:
            val = self.config.cardinality_hint
            query = query.with_hint(table, 'FIRST_ROWS_1 cardinality(%(name)s {})'.format(val))

        # build selection
        exprlist = []
        for low, upper in pixel_ranges:
            upper -= 1
            if low == upper:
                exprlist.append(table.c.pixelId == low)
            else:
                exprlist.append(sql.expression.between(table.c.pixelId, low, upper))
        query = query.where(sql.expression.or_(*exprlist))

        # select latest version of objects
        if self.config.dia_object_index != 'last_object_table':
            query = query.where(table.c.validityEnd == None)  # noqa: E711

        _LOG.debug("query: %s", query)

        if self.config.explain:
            # run the same query with explain
            self._explain(query, self._engine)

        # execute select
        with Timer('DiaObject select', self.config.timer):
            with self._engine.begin() as conn:
                res = conn.execute(query)
                objects = self._convertResult(res, "DiaObject")
        _LOG.debug("found %s DiaObjects", len(objects))
        return objects

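    # Example (illustrative): fetch the latest DiaObjects whose pixelId falls
    # in [10000, 10010) or is exactly 20000.  Ranges are half-open, so a
    # single index is expressed as (pixelId, pixelId + 1).
    #
    #     objects = ppdb.getDiaObjects([(10000, 10010), (20000, 20001)])
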
    def getDiaSourcesInRegion(self, pixel_ranges, dt):
        """Returns catalog of DiaSource instances from given region.

        Sources are searched based on pixelization index and region is
        determined by the set of indices. There is no assumption on a
        particular type of index, the client is responsible for consistency
        when calculating pixelization indices.

        This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by
        the schema of PPDB table. Re-mapping of the column names is done for
        some columns (based on column map passed to constructor) but types or
        units are not changed.

        Parameters
        ----------
        pixel_ranges : `list` of `tuple`
            Sequence of ranges, range is a tuple (minPixelID, maxPixelID).
            This defines set of pixel indices to be included in result.
        dt : `datetime.datetime`
            Time of the current visit

        Returns
        -------
        catalog : `lsst.afw.table.SourceCatalog` or `None`
            Catalog containing DiaSource records. `None` is returned if
            ``read_sources_months`` configuration parameter is set to 0.
        """

        if self.config.read_sources_months == 0:
            _LOG.info("Skip DiaSources fetching")
            return None

        table = self._schema.sources
        query = table.select()

        # build selection
        exprlist = []
        for low, upper in pixel_ranges:
            upper -= 1
            if low == upper:
                exprlist.append(table.c.pixelId == low)
            else:
                exprlist.append(sql.expression.between(table.c.pixelId, low, upper))
        query = query.where(sql.expression.or_(*exprlist))

        # execute select
        with Timer('DiaSource select', self.config.timer):
            with _ansi_session(self._engine) as conn:
                res = conn.execute(query)
                sources = self._convertResult(res, "DiaSource")
        _LOG.debug("found %s DiaSources", len(sources))
        return sources

    def getDiaSources(self, object_ids, dt):
        """Returns catalog of DiaSource instances given set of DiaObject IDs.

        This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by
        the schema of PPDB table. Re-mapping of the column names is done for
        some columns (based on column map passed to constructor) but types or
        units are not changed.

        Parameters
        ----------
        object_ids :
            Collection of DiaObject IDs
        dt : `datetime.datetime`
            Time of the current visit

        Returns
        -------
        catalog : `lsst.afw.table.SourceCatalog` or `None`
            Catalog containing DiaSource records. `None` is returned if
            ``read_sources_months`` configuration parameter is set to 0 or
            when ``object_ids`` is empty.
        """

        if self.config.read_sources_months == 0:
            _LOG.info("Skip DiaSources fetching")
            return None

        if len(object_ids) <= 0:
            _LOG.info("Skip DiaSources fetching - no Objects")
            # this should create a catalog, but the list of columns may be empty
            return None

        table = self._schema.sources
        sources = None
        with Timer('DiaSource select', self.config.timer):
            with _ansi_session(self._engine) as conn:
                for ids in _split(sorted(object_ids), 1000):
                    query = 'SELECT * FROM "' + table.name + '" WHERE '

                    # select by object id
                    ids = ",".join(str(id) for id in ids)
                    query += '"diaObjectId" IN (' + ids + ') '

                    # execute select
                    res = conn.execute(sql.text(query))
                    sources = self._convertResult(res, "DiaSource", sources)

        _LOG.debug("found %s DiaSources", len(sources))
        return sources

    def getDiaForcedSources(self, object_ids, dt):
        """Returns catalog of DiaForcedSource instances matching given
        DiaObjects.

        This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by
        the schema of L1 database table. Re-mapping of the column names may
        be done for some columns (based on column map passed to constructor)
        but types or units are not changed.

        Parameters
        ----------
        object_ids :
            Collection of DiaObject IDs
        dt : `datetime.datetime`
            Time of the current visit

        Returns
        -------
        catalog : `lsst.afw.table.SourceCatalog` or `None`
            Catalog containing DiaForcedSource records. `None` is returned if
            ``read_sources_months`` configuration parameter is set to 0 or
            when ``object_ids`` is empty.
        """

        if self.config.read_forced_sources_months == 0:
            _LOG.info("Skip DiaForcedSources fetching")
            return None

        if not object_ids:
            _LOG.info("Skip DiaForcedSources fetching - no Objects")
            # this should create a catalog, but the list of columns may be empty
            return None

        table = self._schema.forcedSources
        sources = None

        with Timer('DiaForcedSource select', self.config.timer):
            with _ansi_session(self._engine) as conn:
                for ids in _split(sorted(object_ids), 1000):

                    query = 'SELECT * FROM "' + table.name + '" WHERE '

                    # select by object id
                    ids = ",".join(str(id) for id in ids)
                    query += '"diaObjectId" IN (' + ids + ') '

                    # execute select
                    res = conn.execute(sql.text(query))
                    sources = self._convertResult(res, "DiaForcedSource", sources)

        _LOG.debug("found %s DiaForcedSources", len(sources))
        return sources

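    # Example (illustrative): fetch the history for a set of DiaObjects found
    # in a region; ``dt`` is the current visit time and object IDs come from
    # the catalog's ``id`` field (as in storeDiaObjects below).
    #
    #     ids = [obj["id"] for obj in objects]
    #     sources = ppdb.getDiaSources(ids, dt)
    #     forced = ppdb.getDiaForcedSources(ids, dt)
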
    def storeDiaObjects(self, objs, dt):
        """Store catalog of DiaObjects from current visit.

        This method takes :doc:`/modules/lsst.afw.table/index` catalog, its schema must be
        compatible with the schema of PPDB table:

        - column names must correspond to database table columns
        - some column names are re-mapped based on column map passed to
          constructor
        - types and units of the columns must match database definitions,
          no unit conversion is performed presently
        - columns that have default values in database schema can be
          omitted from afw schema
        - this method knows how to fill interval-related columns
          (validityStart, validityEnd); they do not need to appear in
          afw schema

        Parameters
        ----------
        objs : `lsst.afw.table.BaseCatalog`
            Catalog with DiaObject records
        dt : `datetime.datetime`
            Time of the visit
        """

        ids = sorted([obj['id'] for obj in objs])
        _LOG.debug("first object ID: %d", ids[0])

        # NOTE: workaround for sqlite, need this here to avoid
        # "database is locked" error.
        table = self._schema.objects

        # everything to be done in single transaction
        with _ansi_session(self._engine) as conn:

            ids = ",".join(str(id) for id in ids)

            if self.config.dia_object_index == 'last_object_table':

                # insert and replace all records in LAST table, mysql and postgres have
                # non-standard features (handled in _storeObjectsAfw)
                table = self._schema.objects_last
                do_replace = self.config.object_last_replace
                if not do_replace:
                    query = 'DELETE FROM "' + table.name + '" '
                    query += 'WHERE "diaObjectId" IN (' + ids + ') '

                    if self.config.explain:
                        # run the same query with explain
                        self._explain(query, conn)

                    with Timer(table.name + ' delete', self.config.timer):
                        res = conn.execute(sql.text(query))
                    _LOG.debug("deleted %s objects", res.rowcount)

                extra_columns = dict(lastNonForcedSource=dt)
                self._storeObjectsAfw(objs, conn, table, "DiaObjectLast",
                                      replace=do_replace,
                                      extra_columns=extra_columns)

            else:

                # truncate existing validity intervals
                table = self._schema.objects
                query = 'UPDATE "' + table.name + '" '
                query += "SET \"validityEnd\" = '" + str(dt) + "' "
                query += 'WHERE "diaObjectId" IN (' + ids + ') '
                query += 'AND "validityEnd" IS NULL'

                # _LOG.debug("query: %s", query)

                if self.config.explain:
                    # run the same query with explain
                    self._explain(query, conn)

                with Timer(table.name + ' truncate', self.config.timer):
                    res = conn.execute(sql.text(query))
                _LOG.debug("truncated %s intervals", res.rowcount)

            # insert new versions
            if self.config.dia_object_nightly:
                table = self._schema.objects_nightly
            else:
                table = self._schema.objects
            extra_columns = dict(lastNonForcedSource=dt, validityStart=dt,
                                 validityEnd=None)
            self._storeObjectsAfw(objs, conn, table, "DiaObject",
                                  extra_columns=extra_columns)

    def storeDiaSources(self, sources):
        """Store catalog of DIASources from current visit.

        This method takes :doc:`/modules/lsst.afw.table/index` catalog, its schema must be
        compatible with the schema of L1 database table:

        - column names must correspond to database table columns
        - some column names may be re-mapped based on column map passed to
          constructor
        - types and units of the columns must match database definitions,
          no unit conversion is performed presently
        - columns that have default values in database schema can be
          omitted from afw schema

        Parameters
        ----------
        sources : `lsst.afw.table.BaseCatalog`
            Catalog containing DiaSource records
        """

        # everything to be done in single transaction
        with _ansi_session(self._engine) as conn:

            table = self._schema.sources
            self._storeObjectsAfw(sources, conn, table, "DiaSource")

    def storeDiaForcedSources(self, sources):
        """Store a set of DIAForcedSources from current visit.

        This method takes :doc:`/modules/lsst.afw.table/index` catalog, its schema must be
        compatible with the schema of L1 database table:

        - column names must correspond to database table columns
        - some column names may be re-mapped based on column map passed to
          constructor
        - types and units of the columns must match database definitions,
          no unit conversion is performed presently
        - columns that have default values in database schema can be
          omitted from afw schema

        Parameters
        ----------
        sources : `lsst.afw.table.BaseCatalog`
            Catalog containing DiaForcedSource records
        """

        # everything to be done in single transaction
        with _ansi_session(self._engine) as conn:

            table = self._schema.forcedSources
            self._storeObjectsAfw(sources, conn, table, "DiaForcedSource")

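    # Example (illustrative): per-visit store sequence as exercised by
    # ``ap_proto``; all three arguments are afw catalogs with
    # PPDB-compatible schemas.
    #
    #     ppdb.storeDiaObjects(objects, visit_time)
    #     ppdb.storeDiaSources(sources)
    #     ppdb.storeDiaForcedSources(forced_sources)
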
    def dailyJob(self):
        """Implement daily activities like cleanup/vacuum.

        What should be done during daily cleanup is determined by
        configuration/schema.
        """

        # move data from DiaObjectNightly into DiaObject
        if self.config.dia_object_nightly:
            with _ansi_session(self._engine) as conn:
                query = 'INSERT INTO "' + self._schema.objects.name + '" '
                query += 'SELECT * FROM "' + self._schema.objects_nightly.name + '"'
                with Timer('DiaObjectNightly copy', self.config.timer):
                    conn.execute(sql.text(query))

                query = 'DELETE FROM "' + self._schema.objects_nightly.name + '"'
                with Timer('DiaObjectNightly delete', self.config.timer):
                    conn.execute(sql.text(query))

        if self._engine.name == 'postgresql':

            # do VACUUM on all tables
            _LOG.info("Running VACUUM on all tables")
            connection = self._engine.raw_connection()
            ISOLATION_LEVEL_AUTOCOMMIT = 0
            connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
            cursor = connection.cursor()
            cursor.execute("VACUUM ANALYSE")

    def makeSchema(self, drop=False, mysql_engine='InnoDB', oracle_tablespace=None, oracle_iot=False):
        """Create or re-create all tables.

        Parameters
        ----------
        drop : `bool`
            If True then drop tables before creating new ones.
        mysql_engine : `str`, optional
            Name of the MySQL engine to use for new tables.
        oracle_tablespace : `str`, optional
            Name of Oracle tablespace.
        oracle_iot : `bool`, optional
            Make Index-organized DiaObjectLast table.
        """
        self._schema.makeSchema(drop=drop, mysql_engine=mysql_engine,
                                oracle_tablespace=oracle_tablespace,
                                oracle_iot=oracle_iot)

    def _explain(self, query, conn):
        """Run the query with explain
        """

        _LOG.info("explain for query: %s...", query[:64])

        if conn.engine.name == 'mysql':
            query = "EXPLAIN EXTENDED " + query
        else:
            query = "EXPLAIN " + query

        res = conn.execute(sql.text(query))
        if res.returns_rows:
            _LOG.info("explain: %s", res.keys())
            for row in res:
                _LOG.info("explain: %s", row)
        else:
            _LOG.info("EXPLAIN returned nothing")

    def _storeObjectsAfw(self, objects, conn, table, schema_table_name,
                         replace=False, extra_columns=None):
        """Generic store method.

        Takes catalog of records and stores a bunch of objects in a table.

        Parameters
        ----------
        objects : `lsst.afw.table.BaseCatalog`
            Catalog containing object records
        conn :
            Database connection
        table : `sqlalchemy.Table`
            Database table
        schema_table_name : `str`
            Name of the table to be used for finding table schema.
        replace : `boolean`
            If `True` then use replace instead of INSERT (should be more efficient)
        extra_columns : `dict`, optional
            Mapping (column_name, column_value) which gives column values to add
            to every row, only if column is missing in catalog records.
        """

        def quoteValue(v):
            """Quote and escape values"""
            if v is None:
                v = "NULL"
            elif isinstance(v, datetime):
                v = "'" + str(v) + "'"
            elif isinstance(v, str):
                # we don't expect nasty stuff in strings
                v = "'" + v + "'"
            elif isinstance(v, geom.Angle):
                v = str(v.asDegrees())
            else:
                if not np.isfinite(v):
                    v = "NULL"
                else:
                    v = str(v)
            return v

        def quoteId(columnName):
            """Smart quoting for column names.
            Lower-case names are not quoted.
            """
            if not columnName.islower():
                columnName = '"' + columnName + '"'
            return columnName

        if conn.engine.name == "oracle":
            return self._storeObjectsAfwOracle(objects, conn, table,
                                               schema_table_name, replace,
                                               extra_columns)

        schema = objects.getSchema()
        afw_fields = [field.getName() for key, field in schema]

        column_map = self._schema.getAfwColumns(schema_table_name)

        # list of columns (as in cat schema)
        fields = [column_map[field].name for field in afw_fields if field in column_map]

        # use extra columns that are not in fields already
        extra_fields = (extra_columns or {}).keys()
        extra_fields = [field for field in extra_fields if field not in fields]

        if replace and conn.engine.name in ('mysql', 'sqlite'):
            query = 'REPLACE INTO '
        else:
            query = 'INSERT INTO '
        qfields = [quoteId(field) for field in fields + extra_fields]
        query += quoteId(table.name) + ' (' + ','.join(qfields) + ') ' + 'VALUES '

        values = []
        for rec in objects:
            row = []
            for field in afw_fields:
                if field not in column_map:
                    continue
                value = rec[field]
                if column_map[field].type == "DATETIME" and \
                        not np.isnan(value):
                    # convert seconds into datetime
                    value = datetime.utcfromtimestamp(value)
                row.append(quoteValue(value))
            for field in extra_fields:
                row.append(quoteValue(extra_columns[field]))
            values.append('(' + ','.join(row) + ')')

        if self.config.explain:
            # run the same query with explain, only give it one row of data
            self._explain(query + values[0], conn)

        query += ','.join(values)

        if replace and conn.engine.name == 'postgresql':
            # This depends on the fact that "replace" can only be true for
            # the DiaObjectLast table.
            pks = ('pixelId', 'diaObjectId')
            query += " ON CONFLICT (\"{}\", \"{}\") DO UPDATE SET ".format(*pks)
            fields = [column_map[field].name for field in afw_fields
                      if field in column_map]
            fields = ['"{0}" = EXCLUDED."{0}"'.format(field)
                      for field in fields if field not in pks]
            query += ', '.join(fields)

        # _LOG.debug("query: %s", query)
        _LOG.info("%s: will store %d records", table.name, len(objects))
        with Timer(table.name + ' insert', self.config.timer):
            res = conn.execute(sql.text(query))
        _LOG.debug("inserted %s records", res.rowcount)

    def _storeObjectsAfwOracle(self, objects, conn, table, schema_table_name,
                               replace=False, extra_columns=None):
        """Store method for Oracle.

        Takes catalog of records and stores a bunch of objects in a table.

        Parameters
        ----------
        objects : `lsst.afw.table.BaseCatalog`
            Catalog containing object records
        conn :
            Database connection
        table : `sqlalchemy.Table`
            Database table
        schema_table_name : `str`
            Name of the table to be used for finding table schema.
        replace : `boolean`
            If `True` then use replace instead of INSERT (should be more efficient)
        extra_columns : `dict`, optional
            Mapping (column_name, column_value) which gives column values to add
            to every row, only if column is missing in catalog records.
        """

        def quoteId(columnName):
            """Smart quoting for column names.
            Lower-case names are not quoted (Oracle backend needs them unquoted).
            """
            if not columnName.islower():
                columnName = '"' + columnName + '"'
            return columnName

        schema = objects.getSchema()
        afw_fields = [field.getName() for key, field in schema]
        # _LOG.info("afw_fields: %s", afw_fields)

        column_map = self._schema.getAfwColumns(schema_table_name)
        # _LOG.info("column_map: %s", column_map)

        # list of columns (as in cat schema)
        fields = [column_map[field].name for field in afw_fields
                  if field in column_map]
        # _LOG.info("fields: %s", fields)

        # use extra columns that are not in fields already
        extra_fields = (extra_columns or {}).keys()
        extra_fields = [field for field in extra_fields if field not in fields]

        qfields = [quoteId(field) for field in fields + extra_fields]

        if not replace:
            vals = [":col{}".format(i) for i in range(len(fields))]
            vals += [":extcol{}".format(i) for i in range(len(extra_fields))]
            query = 'INSERT INTO ' + quoteId(table.name)
            query += ' (' + ','.join(qfields) + ') VALUES'
            query += ' (' + ','.join(vals) + ')'
        else:
            qvals = [":col{} {}".format(i, quoteId(field)) for i, field in enumerate(fields)]
            qvals += [":extcol{} {}".format(i, quoteId(field)) for i, field in enumerate(extra_fields)]
            pks = ('pixelId', 'diaObjectId')
            onexpr = ["SRC.{col} = DST.{col}".format(col=quoteId(col)) for col in pks]
            setexpr = ["DST.{col} = SRC.{col}".format(col=quoteId(col))
                       for col in fields + extra_fields if col not in pks]
            vals = ["SRC.{col}".format(col=quoteId(col)) for col in fields + extra_fields]
            query = "MERGE INTO {} DST ".format(quoteId(table.name))
            query += "USING (SELECT {} FROM DUAL) SRC ".format(", ".join(qvals))
            query += "ON ({}) ".format(" AND ".join(onexpr))
            query += "WHEN MATCHED THEN UPDATE SET {} ".format(" ,".join(setexpr))
            query += "WHEN NOT MATCHED THEN INSERT "
            query += "({}) VALUES ({})".format(','.join(qfields), ','.join(vals))
        # _LOG.info("query: %s", query)

        values = []
        for rec in objects:
            row = {}
            col = 0
            for field in afw_fields:
                if field not in column_map:
                    continue
                value = rec[field]
                if column_map[field].type == "DATETIME" and not np.isnan(value):
                    # convert seconds into datetime
                    value = datetime.utcfromtimestamp(value)
                elif isinstance(value, geom.Angle):
                    value = str(value.asDegrees())
                elif not np.isfinite(value):
                    value = None
                row["col{}".format(col)] = value
                col += 1
            for i, field in enumerate(extra_fields):
                row["extcol{}".format(i)] = extra_columns[field]
            values.append(row)

        # _LOG.debug("query: %s", query)
        _LOG.info("%s: will store %d records", table.name, len(objects))
        with Timer(table.name + ' insert', self.config.timer):
            res = conn.execute(sql.text(query), values)
        _LOG.debug("inserted %s records", res.rowcount)

    def _convertResult(self, res, table_name, catalog=None):
        """Convert result set into output catalog.

        Parameters
        ----------
        res : `sqlalchemy.ResultProxy`
            SQLAlchemy result set returned by query.
        table_name : `str`
            Name of the table.
        catalog : `lsst.afw.table.BaseCatalog`
            If not None then extend existing catalog

        Returns
        -------
        catalog : `lsst.afw.table.SourceCatalog`
            If ``catalog`` is None then new instance is returned, otherwise
            ``catalog`` is updated and returned.
        """
        # make catalog schema
        columns = res.keys()
        schema, col_map = self._schema.getAfwSchema(table_name, columns)
        if catalog is None:
            _LOG.debug("_convertResult: schema: %s", schema)
            _LOG.debug("_convertResult: col_map: %s", col_map)
            catalog = afwTable.SourceCatalog(schema)

        # fill catalog
        for row in res:
            record = catalog.addNew()
            for col, value in row.items():
                # some columns may exist in database but not included in afw schema
                col = col_map.get(col)
                if col is not None:
                    if isinstance(value, datetime):
                        # convert datetime to number of seconds
                        value = int((value - datetime.utcfromtimestamp(0)).total_seconds())
                    elif col.getTypeString() == 'Angle' and value is not None:
                        value = value * geom.degrees
                    if value is not None:
                        record.set(col, value)

        return catalog
Definition: fits.cc:833