# This file is part of dax_ppdb.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Module defining Ppdb class and related methods.
"""

__all__ = ["PpdbConfig", "Ppdb", "Visit"]

from collections import namedtuple
from contextlib import contextmanager
from datetime import datetime
import logging
import numpy as np
import os

import lsst.geom as geom
import lsst.afw.table as afwTable
import lsst.pex.config as pexConfig
from lsst.pex.config import Field, ChoiceField, ListField
from lsst.utils import getPackageDir
import sqlalchemy
from sqlalchemy import (func, sql)
from sqlalchemy.pool import NullPool
from . import timer, ppdbSchema


_LOG = logging.getLogger(__name__.partition(".")[2])  # strip leading "lsst."

class Timer(object):
    """Timer class defining context manager which tracks execution timing.

    Typical use:

        with Timer("timer_name"):
            do_something

    On exit from the block it will print elapsed time.

    See also :py:mod:`timer` module.
    """
    def __init__(self, name, do_logging=True, log_before_cursor_execute=False):
        self._log_before_cursor_execute = log_before_cursor_execute
        self._do_logging = do_logging
        self._timer1 = timer.Timer(name)
        self._timer2 = timer.Timer(name + " (before/after cursor)")

    def __enter__(self):
        """
        Enter context, start timer
        """
        # event.listen(engine.Engine, "before_cursor_execute", self._start_timer)
        # event.listen(engine.Engine, "after_cursor_execute", self._stop_timer)
        self._timer1.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Exit context, stop and dump timer
        """
        if exc_type is None:
            self._timer1.stop()
            if self._do_logging:
                self._timer1.dump()
        # event.remove(engine.Engine, "before_cursor_execute", self._start_timer)
        # event.remove(engine.Engine, "after_cursor_execute", self._stop_timer)
        return False

    def _start_timer(self, conn, cursor, statement, parameters, context, executemany):
        """Start counting"""
        if self._log_before_cursor_execute:
            _LOG.info("before_cursor_execute")
        self._timer2.start()

    def _stop_timer(self, conn, cursor, statement, parameters, context, executemany):
        """Stop counting"""
        self._timer2.stop()
        if self._do_logging:
            self._timer2.dump()


def _split(seq, nItems):
    """Split a sequence into smaller sequences"""
    seq = list(seq)
    while seq:
        yield seq[:nItems]
        del seq[:nItems]

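# Illustrative sketch (not executed on import): ``_split`` chunks a sequence
# so that the SQL ``IN (...)`` lists generated further below stay reasonably
# small. The values here are made up for the example.
#
#     for chunk in _split(range(7), 3):
#         print(chunk)    # [0, 1, 2], then [3, 4, 5], then [6]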

# Information about a single visit
Visit = namedtuple('Visit', 'visitId visitTime lastObjectId lastSourceId')


@contextmanager
def _ansi_session(engine):
    """Returns a connection, makes sure that ANSI mode is set for MySQL
    """
    with engine.begin() as conn:
        if engine.name == 'mysql':
            conn.execute(sql.text("SET SESSION SQL_MODE = 'ANSI'"))
        yield conn
    return

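# Illustrative sketch (not executed on import): everything inside the block
# runs in a single transaction, with ANSI quoting enabled on MySQL backends.
# ``engine`` stands for any SQLAlchemy engine, e.g. ``self._engine`` below.
#
#     with _ansi_session(engine) as conn:
#         conn.execute(sql.text('SELECT COUNT(*) FROM "DiaObject"'))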

def _data_file_name(basename):
    """Return path name of a data file.
    """
    return os.path.join(getPackageDir("dax_ppdb"), "data", basename)


class PpdbConfig(pexConfig.Config):

    db_url = Field(dtype=str, doc="SQLAlchemy database connection URI")
    isolation_level = ChoiceField(dtype=str,
                                  doc="Transaction isolation level",
                                  allowed={"READ_COMMITTED": "Read committed",
                                           "READ_UNCOMMITTED": "Read uncommitted",
                                           "REPEATABLE_READ": "Repeatable read",
                                           "SERIALIZABLE": "Serializable"},
                                  default="READ_COMMITTED")
    connection_pool = Field(dtype=bool,
                            doc=("If False then disable SQLAlchemy connection pool. "
                                 "Do not use connection pool when forking."),
                            default=True)
    connection_timeout = Field(dtype=float,
                               doc="Maximum time to wait for a database lock to be released "
                                   "before exiting. Defaults to SQLAlchemy defaults if not set.",
                               default=None)
    sql_echo = Field(dtype=bool,
                     doc="If True then pass SQLAlchemy echo option.",
                     default=False)
    dia_object_index = ChoiceField(dtype=str,
                                   doc="Indexing mode for DiaObject table",
                                   allowed={'baseline': "Index defined in baseline schema",
                                            'pix_id_iov': "(pixelId, objectId, iovStart) PK",
                                            'last_object_table': "Separate DiaObjectLast table"},
                                   default='baseline')
    dia_object_nightly = Field(dtype=bool,
                               doc="Use separate nightly table for DiaObject",
                               default=False)
    read_sources_months = Field(dtype=int,
                                doc="Number of months of history to read from DiaSource",
                                default=12)
    read_forced_sources_months = Field(dtype=int,
                                       doc="Number of months of history to read from DiaForcedSource",
                                       default=12)
    dia_object_columns = ListField(dtype=str,
                                   doc="List of columns to read from DiaObject, by default read all columns",
                                   default=[])
    object_last_replace = Field(dtype=bool,
                                doc="If True (default) then use \"upsert\" for DiaObjectsLast table",
                                default=True)
    schema_file = Field(dtype=str,
                        doc="Location of (YAML) configuration file with standard schema",
                        default=_data_file_name("ppdb-schema.yaml"))
    extra_schema_file = Field(dtype=str,
                              doc="Location of (YAML) configuration file with extra schema",
                              default=_data_file_name("ppdb-schema-extra.yaml"))
    column_map = Field(dtype=str,
                       doc="Location of (YAML) configuration file with column mapping",
                       default=_data_file_name("ppdb-afw-map.yaml"))
    prefix = Field(dtype=str,
                   doc="Prefix to add to table names and index names",
                   default="")
    explain = Field(dtype=bool,
                    doc="If True then run EXPLAIN SQL command on each executed query",
                    default=False)
    timer = Field(dtype=bool,
                  doc="If True then print/log timing information",
                  default=False)
    diaobject_index_hint = Field(dtype=str,
                                 doc="Name of the index to use with Oracle index hint",
                                 default=None)
    dynamic_sampling_hint = Field(dtype=int,
                                  doc="If non-zero then use dynamic_sampling hint",
                                  default=0)
    cardinality_hint = Field(dtype=int,
                             doc="If non-zero then use cardinality hint",
                             default=0)

    def validate(self):
        if self.isolation_level == "READ_COMMITTED" and self.db_url.startswith("sqlite"):
            raise ValueError("Attempting to run Ppdb with SQLITE and isolation level "
                             "'READ_COMMITTED'. Use 'READ_UNCOMMITTED' instead.")


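# A minimal configuration sketch (the database URL is hypothetical). Note
# that validate() above rejects sqlite together with the default
# "READ_COMMITTED" isolation level, hence "READ_UNCOMMITTED" here.
#
#     config = PpdbConfig()
#     config.db_url = "sqlite:///ppdb.sqlite3"      # made-up location
#     config.isolation_level = "READ_UNCOMMITTED"
#     config.connection_pool = False                # e.g. when forking
#     config.validate()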
class Ppdb(object):
    """Interface to L1 database, hides all database access details.

    The implementation is configured via the standard ``pex_config``
    mechanism using the `PpdbConfig` configuration class. For examples of
    different configurations check the ``config/`` folder.

    Parameters
    ----------
    config : `PpdbConfig`
    afw_schemas : `dict`, optional
        Dictionary with table name for a key and `afw.table.Schema`
        for a value. Columns in schema will be added to standard
        PPDB schema.
    """

    def __init__(self, config, afw_schemas=None):

        self.config = config

        # logging.getLogger('sqlalchemy').setLevel(logging.INFO)
        _LOG.info("PPDB Configuration:")
        _LOG.info("    dia_object_index: %s", self.config.dia_object_index)
        _LOG.info("    dia_object_nightly: %s", self.config.dia_object_nightly)
        _LOG.info("    read_sources_months: %s", self.config.read_sources_months)
        _LOG.info("    read_forced_sources_months: %s", self.config.read_forced_sources_months)
        _LOG.info("    dia_object_columns: %s", self.config.dia_object_columns)
        _LOG.info("    object_last_replace: %s", self.config.object_last_replace)
        _LOG.info("    schema_file: %s", self.config.schema_file)
        _LOG.info("    extra_schema_file: %s", self.config.extra_schema_file)
        _LOG.info("    column_map: %s", self.config.column_map)
        _LOG.info("    schema prefix: %s", self.config.prefix)

        # engine is reused between multiple processes, make sure that we don't
        # share connections by disabling pool (by using NullPool class)
        kw = dict(echo=self.config.sql_echo)
        conn_args = dict()
        if not self.config.connection_pool:
            kw.update(poolclass=NullPool)
        if self.config.isolation_level is not None:
            kw.update(isolation_level=self.config.isolation_level)
        if self.config.connection_timeout is not None:
            if self.config.db_url.startswith("sqlite"):
                conn_args.update(timeout=self.config.connection_timeout)
            elif self.config.db_url.startswith(("postgresql", "mysql")):
                conn_args.update(connect_timeout=self.config.connection_timeout)
        kw.update(connect_args=conn_args)
        self._engine = sqlalchemy.create_engine(self.config.db_url, **kw)

        self._schema = ppdbSchema.PpdbSchema(engine=self._engine,
                                             dia_object_index=self.config.dia_object_index,
                                             dia_object_nightly=self.config.dia_object_nightly,
                                             schema_file=self.config.schema_file,
                                             extra_schema_file=self.config.extra_schema_file,
                                             column_map=self.config.column_map,
                                             afw_schemas=afw_schemas,
                                             prefix=self.config.prefix)

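    # Typical construction, assuming ``config`` was built as sketched after
    # PpdbConfig above; makeSchema() (defined below) creates the tables:
    #
    #     ppdb = Ppdb(config)
    #     ppdb.makeSchema(drop=True)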
    def lastVisit(self):
        """Returns last visit information or `None` if visits table is empty.

        Visits table is used by ap_proto to track visit information; it is
        not a part of the regular PPDB schema.

        Returns
        -------
        visit : `Visit` or `None`
            Last stored visit info or `None` if there was nothing stored yet.
        """

        with self._engine.begin() as conn:

            stmnt = sql.select([sql.func.max(self._schema.visits.c.visitId),
                                sql.func.max(self._schema.visits.c.visitTime)])
            res = conn.execute(stmnt)
            row = res.fetchone()
            if row[0] is None:
                return None

            visitId = row[0]
            visitTime = row[1]
            _LOG.info("lastVisit: visitId: %s visitTime: %s (%s)", visitId,
                      visitTime, type(visitTime))

            # get max IDs from corresponding tables
            stmnt = sql.select([sql.func.max(self._schema.objects.c.diaObjectId)])
            lastObjectId = conn.scalar(stmnt)
            stmnt = sql.select([sql.func.max(self._schema.sources.c.diaSourceId)])
            lastSourceId = conn.scalar(stmnt)

            return Visit(visitId=visitId, visitTime=visitTime,
                         lastObjectId=lastObjectId, lastSourceId=lastSourceId)

    def saveVisit(self, visitId, visitTime):
        """Store visit information.

        This method is only used by ``ap_proto`` script from ``l1dbproto``
        and is not intended for production pipelines.

        Parameters
        ----------
        visitId : `int`
            Visit identifier
        visitTime : `datetime.datetime`
            Visit timestamp.
        """

        ins = self._schema.visits.insert().values(visitId=visitId,
                                                  visitTime=visitTime)
        self._engine.execute(ins)

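    # Round-trip sketch for the two visit-tracking methods (the ID and the
    # timestamp are invented for illustration):
    #
    #     ppdb.saveVisit(visitId=42, visitTime=datetime(2020, 1, 1, 12, 0))
    #     visit = ppdb.lastVisit()
    #     # visit.visitId == 42; lastObjectId/lastSourceId are the current
    #     # maxima of the DiaObject/DiaSource ID columns (possibly None)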
    def tableRowCount(self):
        """Returns dictionary with the table names and row counts.

        Used by ``ap_proto`` to keep track of the size of the database tables.
        Depending on database technology this could be an expensive operation.

        Returns
        -------
        row_counts : `dict`
            Dict where key is a table name and value is a row count.
        """
        res = {}
        tables = [self._schema.objects, self._schema.sources, self._schema.forcedSources]
        if self.config.dia_object_index == 'last_object_table':
            tables.append(self._schema.objects_last)
        for table in tables:
            stmt = sql.select([func.count()]).select_from(table)
            count = self._engine.scalar(stmt)
            res[table.name] = count

        return res

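    # Usage sketch: log the size of every PPDB table after a run.
    #
    #     for name, count in ppdb.tableRowCount().items():
    #         _LOG.info("%s: %s rows", name, count)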
    def getDiaObjects(self, pixel_ranges):
        """Returns catalog of DiaObject instances from given region.

        Objects are searched based on pixelization index and region is
        determined by the set of indices. There is no assumption on a
        particular type of index, the client is responsible for consistency
        when calculating pixelization indices.

        This method returns :doc:`/modules/lsst.afw.table/index` catalog
        with schema determined by the schema of the PPDB table. Re-mapping of
        the column names is done for some columns (based on column map passed
        to constructor) but types or units are not changed.

        Returns only the last version of each DiaObject.

        Parameters
        ----------
        pixel_ranges : `list` of `tuple`
            Sequence of ranges, range is a tuple (minPixelID, maxPixelID).
            This defines set of pixel indices to be included in result.

        Returns
        -------
        catalog : `lsst.afw.table.SourceCatalog`
            Catalog containing DiaObject records.
        """

        # decide what columns we need
        if self.config.dia_object_index == 'last_object_table':
            table = self._schema.objects_last
        else:
            table = self._schema.objects
        if not self.config.dia_object_columns:
            query = table.select()
        else:
            columns = [table.c[col] for col in self.config.dia_object_columns]
            query = sql.select(columns)

        if self.config.diaobject_index_hint:
            val = self.config.diaobject_index_hint
            query = query.with_hint(table, 'index_rs_asc(%(name)s "{}")'.format(val))
        if self.config.dynamic_sampling_hint > 0:
            val = self.config.dynamic_sampling_hint
            query = query.with_hint(table, 'dynamic_sampling(%(name)s {})'.format(val))
        if self.config.cardinality_hint > 0:
            val = self.config.cardinality_hint
            query = query.with_hint(table, 'FIRST_ROWS_1 cardinality(%(name)s {})'.format(val))

        # build selection
        exprlist = []
        for low, upper in pixel_ranges:
            upper -= 1
            if low == upper:
                exprlist.append(table.c.pixelId == low)
            else:
                exprlist.append(sql.expression.between(table.c.pixelId, low, upper))
        query = query.where(sql.expression.or_(*exprlist))

        # select latest version of objects
        if self.config.dia_object_index != 'last_object_table':
            query = query.where(table.c.validityEnd == None)  # noqa: E711

        _LOG.debug("query: %s", query)

        if self.config.explain:
            # run the same query with explain
            self._explain(query, self._engine)

        # execute select
        with Timer('DiaObject select', self.config.timer):
            with self._engine.begin() as conn:
                res = conn.execute(query)
                objects = self._convertResult(res, "DiaObject")
        _LOG.debug("found %s DiaObjects", len(objects))
        return objects

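    # Range semantics sketch: each (minPixelID, maxPixelID) tuple is treated
    # as half-open, hence the ``upper -= 1`` above. The index values here are
    # invented for illustration:
    #
    #     objects = ppdb.getDiaObjects([(1000, 1004), (2000, 2001)])
    #     # selects pixelId BETWEEN 1000 AND 1003, plus pixelId == 2000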
    def getDiaSourcesInRegion(self, pixel_ranges, dt):
        """Returns catalog of DiaSource instances from given region.

        Sources are searched based on pixelization index and region is
        determined by the set of indices. There is no assumption on a
        particular type of index, the client is responsible for consistency
        when calculating pixelization indices.

        This method returns :doc:`/modules/lsst.afw.table/index` catalog
        with schema determined by the schema of the PPDB table. Re-mapping of
        the column names is done for some columns (based on column map passed
        to constructor) but types or units are not changed.

        Parameters
        ----------
        pixel_ranges : `list` of `tuple`
            Sequence of ranges, range is a tuple (minPixelID, maxPixelID).
            This defines set of pixel indices to be included in result.
        dt : `datetime.datetime`
            Time of the current visit

        Returns
        -------
        catalog : `lsst.afw.table.SourceCatalog` or `None`
            Catalog containing DiaSource records. `None` is returned if
            ``read_sources_months`` configuration parameter is set to 0.
        """

        if self.config.read_sources_months == 0:
            _LOG.info("Skip DiaSources fetching")
            return None

        table = self._schema.sources
        query = table.select()

        # build selection
        exprlist = []
        for low, upper in pixel_ranges:
            upper -= 1
            if low == upper:
                exprlist.append(table.c.pixelId == low)
            else:
                exprlist.append(sql.expression.between(table.c.pixelId, low, upper))
        query = query.where(sql.expression.or_(*exprlist))

        # execute select
        with Timer('DiaSource select', self.config.timer):
            with _ansi_session(self._engine) as conn:
                res = conn.execute(query)
                sources = self._convertResult(res, "DiaSource")
        _LOG.debug("found %s DiaSources", len(sources))
        return sources

    def getDiaSources(self, object_ids, dt):
        """Returns catalog of DiaSource instances given set of DiaObject IDs.

        This method returns :doc:`/modules/lsst.afw.table/index` catalog
        with schema determined by the schema of the PPDB table. Re-mapping of
        the column names is done for some columns (based on column map passed
        to constructor) but types or units are not changed.

        Parameters
        ----------
        object_ids :
            Collection of DiaObject IDs
        dt : `datetime.datetime`
            Time of the current visit

        Returns
        -------
        catalog : `lsst.afw.table.SourceCatalog` or `None`
            Catalog containing DiaSource records. `None` is returned if
            ``read_sources_months`` configuration parameter is set to 0 or
            when ``object_ids`` is empty.
        """

        if self.config.read_sources_months == 0:
            _LOG.info("Skip DiaSources fetching")
            return None

        if len(object_ids) <= 0:
            _LOG.info("Skip DiaSources fetching - no Objects")
            # this should create a catalog, but the list of columns may be empty
            return None

        table = self._schema.sources
        sources = None
        with Timer('DiaSource select', self.config.timer):
            with _ansi_session(self._engine) as conn:
                for ids in _split(sorted(object_ids), 1000):
                    query = 'SELECT * FROM "' + table.name + '" WHERE '

                    # select by object id
                    ids = ",".join(str(id) for id in ids)
                    query += '"diaObjectId" IN (' + ids + ') '

                    # execute select
                    res = conn.execute(sql.text(query))
                    sources = self._convertResult(res, "DiaSource", sources)

        _LOG.debug("found %s DiaSources", len(sources))
        return sources

    def getDiaForcedSources(self, object_ids, dt):
        """Returns catalog of DiaForcedSource instances matching given
        DiaObjects.

        This method returns :doc:`/modules/lsst.afw.table/index` catalog
        with schema determined by the schema of the L1 database table.
        Re-mapping of the column names may be done for some columns (based on
        column map passed to constructor) but types or units are not changed.

        Parameters
        ----------
        object_ids :
            Collection of DiaObject IDs
        dt : `datetime.datetime`
            Time of the current visit

        Returns
        -------
        catalog : `lsst.afw.table.SourceCatalog` or `None`
            Catalog containing DiaForcedSource records. `None` is returned if
            ``read_sources_months`` configuration parameter is set to 0 or
            when ``object_ids`` is empty.
        """

        if self.config.read_forced_sources_months == 0:
            _LOG.info("Skip DiaForcedSources fetching")
            return None

        if not object_ids:
            _LOG.info("Skip DiaForcedSources fetching - no Objects")
            # this should create a catalog, but the list of columns may be empty
            return None

        table = self._schema.forcedSources
        sources = None

        with Timer('DiaForcedSource select', self.config.timer):
            with _ansi_session(self._engine) as conn:
                for ids in _split(sorted(object_ids), 1000):

                    query = 'SELECT * FROM "' + table.name + '" WHERE '

                    # select by object id
                    ids = ",".join(str(id) for id in ids)
                    query += '"diaObjectId" IN (' + ids + ') '

                    # execute select
                    res = conn.execute(sql.text(query))
                    sources = self._convertResult(res, "DiaForcedSource", sources)

        _LOG.debug("found %s DiaForcedSources", len(sources))
        return sources

    def storeDiaObjects(self, objs, dt):
        """Store catalog of DiaObjects from current visit.

        This method takes :doc:`/modules/lsst.afw.table/index` catalog, its
        schema must be compatible with the schema of the PPDB table:

          - column names must correspond to database table columns
          - some column names are re-mapped based on column map passed to
            constructor
          - types and units of the columns must match database definitions,
            no unit conversion is performed presently
          - columns that have default values in database schema can be
            omitted from afw schema
          - this method knows how to fill interval-related columns
            (validityStart, validityEnd); they do not need to appear in
            the afw schema

        Parameters
        ----------
        objs : `lsst.afw.table.BaseCatalog`
            Catalog with DiaObject records
        dt : `datetime.datetime`
            Time of the visit
        """

        ids = sorted([obj['id'] for obj in objs])
        _LOG.debug("first object ID: %d", ids[0])

        # NOTE: workaround for sqlite, need this here to avoid
        # "database is locked" error.
        table = self._schema.objects

        # everything to be done in single transaction
        with _ansi_session(self._engine) as conn:

            ids = ",".join(str(id) for id in ids)

            if self.config.dia_object_index == 'last_object_table':

                # insert and replace all records in LAST table, mysql and postgres have
                # non-standard features (handled in _storeObjectsAfw)
                table = self._schema.objects_last
                do_replace = self.config.object_last_replace
                if not do_replace:
                    query = 'DELETE FROM "' + table.name + '" '
                    query += 'WHERE "diaObjectId" IN (' + ids + ') '

                    if self.config.explain:
                        # run the same query with explain
                        self._explain(query, conn)

                    with Timer(table.name + ' delete', self.config.timer):
                        res = conn.execute(sql.text(query))
                    _LOG.debug("deleted %s objects", res.rowcount)

                extra_columns = dict(lastNonForcedSource=dt)
                self._storeObjectsAfw(objs, conn, table, "DiaObjectLast",
                                      replace=do_replace,
                                      extra_columns=extra_columns)

            else:

                # truncate existing validity intervals
                table = self._schema.objects
                query = 'UPDATE "' + table.name + '" '
                query += "SET \"validityEnd\" = '" + str(dt) + "' "
                query += 'WHERE "diaObjectId" IN (' + ids + ') '
                query += 'AND "validityEnd" IS NULL'

                # _LOG.debug("query: %s", query)

                if self.config.explain:
                    # run the same query with explain
                    self._explain(query, conn)

                with Timer(table.name + ' truncate', self.config.timer):
                    res = conn.execute(sql.text(query))
                _LOG.debug("truncated %s intervals", res.rowcount)

            # insert new versions
            if self.config.dia_object_nightly:
                table = self._schema.objects_nightly
            else:
                table = self._schema.objects
            extra_columns = dict(lastNonForcedSource=dt, validityStart=dt,
                                 validityEnd=None)
            self._storeObjectsAfw(objs, conn, table, "DiaObject",
                                  extra_columns=extra_columns)

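    # Store-flow sketch for the default (non-"last_object_table") mode: the
    # UPDATE above closes open validity intervals for the incoming IDs, then
    # new versions are inserted with validityStart == dt. ``catalog`` is a
    # hypothetical afw catalog matching the DiaObject schema:
    #
    #     ppdb.storeDiaObjects(catalog, datetime.utcnow())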
    def storeDiaSources(self, sources):
        """Store catalog of DIASources from current visit.

        This method takes :doc:`/modules/lsst.afw.table/index` catalog, its
        schema must be compatible with the schema of the L1 database table:

          - column names must correspond to database table columns
          - some column names may be re-mapped based on column map passed to
            constructor
          - types and units of the columns must match database definitions,
            no unit conversion is performed presently
          - columns that have default values in database schema can be
            omitted from afw schema

        Parameters
        ----------
        sources : `lsst.afw.table.BaseCatalog`
            Catalog containing DiaSource records
        """

        # everything to be done in single transaction
        with _ansi_session(self._engine) as conn:

            table = self._schema.sources
            self._storeObjectsAfw(sources, conn, table, "DiaSource")

    def storeDiaForcedSources(self, sources):
        """Store a set of DIAForcedSources from current visit.

        This method takes :doc:`/modules/lsst.afw.table/index` catalog, its
        schema must be compatible with the schema of the L1 database table:

          - column names must correspond to database table columns
          - some column names may be re-mapped based on column map passed to
            constructor
          - types and units of the columns must match database definitions,
            no unit conversion is performed presently
          - columns that have default values in database schema can be
            omitted from afw schema

        Parameters
        ----------
        sources : `lsst.afw.table.BaseCatalog`
            Catalog containing DiaForcedSource records
        """

        # everything to be done in single transaction
        with _ansi_session(self._engine) as conn:

            table = self._schema.forcedSources
            self._storeObjectsAfw(sources, conn, table, "DiaForcedSource")

    def dailyJob(self):
        """Implement daily activities like cleanup/vacuum.

        What should be done during daily cleanup is determined by
        configuration/schema.
        """

        # move data from DiaObjectNightly into DiaObject
        if self.config.dia_object_nightly:
            with _ansi_session(self._engine) as conn:
                query = 'INSERT INTO "' + self._schema.objects.name + '" '
                query += 'SELECT * FROM "' + self._schema.objects_nightly.name + '"'
                with Timer('DiaObjectNightly copy', self.config.timer):
                    conn.execute(sql.text(query))

                query = 'DELETE FROM "' + self._schema.objects_nightly.name + '"'
                with Timer('DiaObjectNightly delete', self.config.timer):
                    conn.execute(sql.text(query))

        if self._engine.name == 'postgresql':

            # do VACUUM on all tables
            _LOG.info("Running VACUUM on all tables")
            connection = self._engine.raw_connection()
            # same value as psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT;
            # VACUUM cannot run inside a transaction block
            ISOLATION_LEVEL_AUTOCOMMIT = 0
            connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
            cursor = connection.cursor()
            cursor.execute("VACUUM ANALYSE")

    def makeSchema(self, drop=False, mysql_engine='InnoDB', oracle_tablespace=None, oracle_iot=False):
        """Create or re-create all tables.

        Parameters
        ----------
        drop : `bool`
            If True then drop tables before creating new ones.
        mysql_engine : `str`, optional
            Name of the MySQL engine to use for new tables.
        oracle_tablespace : `str`, optional
            Name of Oracle tablespace.
        oracle_iot : `bool`, optional
            Make Index-organized DiaObjectLast table.
        """
        self._schema.makeSchema(drop=drop, mysql_engine=mysql_engine,
                                oracle_tablespace=oracle_tablespace,
                                oracle_iot=oracle_iot)

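    # Usage sketch; options other than ``drop`` only matter for the
    # corresponding database backends:
    #
    #     ppdb.makeSchema(drop=True)                  # start from empty tables
    #     ppdb.makeSchema(mysql_engine='MyISAM')      # hypothetical MySQL tweak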
    def _explain(self, query, conn):
        """Run the query with explain
        """

        _LOG.info("explain for query: %s...", query[:64])

        if conn.engine.name == 'mysql':
            query = "EXPLAIN EXTENDED " + query
        else:
            query = "EXPLAIN " + query

        res = conn.execute(sql.text(query))
        if res.returns_rows:
            _LOG.info("explain: %s", res.keys())
            for row in res:
                _LOG.info("explain: %s", row)
        else:
            _LOG.info("EXPLAIN returned nothing")

    def _storeObjectsAfw(self, objects, conn, table, schema_table_name,
                         replace=False, extra_columns=None):
        """Generic store method.

        Takes catalog of records and stores a bunch of objects in a table.

        Parameters
        ----------
        objects : `lsst.afw.table.BaseCatalog`
            Catalog containing object records
        conn :
            Database connection
        table : `sqlalchemy.Table`
            Database table
        schema_table_name : `str`
            Name of the table to be used for finding table schema.
        replace : `boolean`
            If `True` then use replace instead of INSERT (should be more efficient)
        extra_columns : `dict`, optional
            Mapping (column_name, column_value) which gives column values to add
            to every row, only if column is missing in catalog records.
        """

        def quoteValue(v):
            """Quote and escape values"""
            if v is None:
                v = "NULL"
            elif isinstance(v, datetime):
                v = "'" + str(v) + "'"
            elif isinstance(v, str):
                # we don't expect nasty stuff in strings
                v = "'" + v + "'"
            elif isinstance(v, geom.Angle):
                v = v.asDegrees()
                if np.isfinite(v):
                    v = str(v)
                else:
                    v = "NULL"
            else:
                if np.isfinite(v):
                    v = str(v)
                else:
                    v = "NULL"
            return v

        def quoteId(columnName):
            """Smart quoting for column names.
            Lower-case names are not quoted.
            """
            if not columnName.islower():
                columnName = '"' + columnName + '"'
            return columnName

        if conn.engine.name == "oracle":
            return self._storeObjectsAfwOracle(objects, conn, table,
                                               schema_table_name, replace,
                                               extra_columns)

        schema = objects.getSchema()
        afw_fields = [field.getName() for key, field in schema]

        column_map = self._schema.getAfwColumns(schema_table_name)

        # list of columns (as in cat schema)
        fields = [column_map[field].name for field in afw_fields if field in column_map]

        # use extra columns that are not in fields already
        extra_fields = (extra_columns or {}).keys()
        extra_fields = [field for field in extra_fields if field not in fields]

        if replace and conn.engine.name in ('mysql', 'sqlite'):
            query = 'REPLACE INTO '
        else:
            query = 'INSERT INTO '
        qfields = [quoteId(field) for field in fields + extra_fields]
        query += quoteId(table.name) + ' (' + ','.join(qfields) + ') ' + 'VALUES '

        values = []
        for rec in objects:
            row = []
            for field in afw_fields:
                if field not in column_map:
                    continue
                value = rec[field]
                if column_map[field].type == "DATETIME" and \
                        np.isfinite(value):
                    # convert seconds into datetime
                    value = datetime.utcfromtimestamp(value)
                row.append(quoteValue(value))
            for field in extra_fields:
                row.append(quoteValue(extra_columns[field]))
            values.append('(' + ','.join(row) + ')')

        if self.config.explain:
            # run the same query with explain, only give it one row of data
            self._explain(query + values[0], conn)

        query += ','.join(values)

        if replace and conn.engine.name == 'postgresql':
            # This depends on the fact that "replace" can only be true
            # for the DiaObjectLast table
            pks = ('pixelId', 'diaObjectId')
            query += " ON CONFLICT (\"{}\", \"{}\") DO UPDATE SET ".format(*pks)
            fields = [column_map[field].name for field in afw_fields]
            fields = ['"{0}" = EXCLUDED."{0}"'.format(field)
                      for field in fields if field not in pks]
            query += ', '.join(fields)

        # _LOG.debug("query: %s", query)
        _LOG.info("%s: will store %d records", table.name, len(objects))
        with Timer(table.name + ' insert', self.config.timer):
            res = conn.execute(sql.text(query))
        _LOG.debug("inserted %s intervals", res.rowcount)

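    # For reference, on PostgreSQL with replace=True the code above emits an
    # upsert of roughly this shape (column list abbreviated; the real columns
    # come from the afw schema):
    #
    #     INSERT INTO "DiaObjectLast" ("pixelId","diaObjectId",...) VALUES ...
    #     ON CONFLICT ("pixelId", "diaObjectId") DO UPDATE
    #         SET "ra" = EXCLUDED."ra", ...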
    def _storeObjectsAfwOracle(self, objects, conn, table, schema_table_name,
                               replace=False, extra_columns=None):
        """Store method for Oracle.

        Takes catalog of records and stores a bunch of objects in a table.

        Parameters
        ----------
        objects : `lsst.afw.table.BaseCatalog`
            Catalog containing object records
        conn :
            Database connection
        table : `sqlalchemy.Table`
            Database table
        schema_table_name : `str`
            Name of the table to be used for finding table schema.
        replace : `boolean`
            If `True` then use replace instead of INSERT (should be more efficient)
        extra_columns : `dict`, optional
            Mapping (column_name, column_value) which gives column values to add
            to every row, only if column is missing in catalog records.
        """

        def quoteId(columnName):
            """Smart quoting for column names.
            Lower-case names are not quoted (Oracle backend needs them unquoted).
            """
            if not columnName.islower():
                columnName = '"' + columnName + '"'
            return columnName

        schema = objects.getSchema()
        afw_fields = [field.getName() for key, field in schema]
        # _LOG.info("afw_fields: %s", afw_fields)

        column_map = self._schema.getAfwColumns(schema_table_name)
        # _LOG.info("column_map: %s", column_map)

        # list of columns (as in cat schema)
        fields = [column_map[field].name for field in afw_fields
                  if field in column_map]
        # _LOG.info("fields: %s", fields)

        # use extra columns that are not in fields already
        extra_fields = (extra_columns or {}).keys()
        extra_fields = [field for field in extra_fields if field not in fields]

        qfields = [quoteId(field) for field in fields + extra_fields]

        if not replace:
            vals = [":col{}".format(i) for i in range(len(fields))]
            vals += [":extcol{}".format(i) for i in range(len(extra_fields))]
            query = 'INSERT INTO ' + quoteId(table.name)
            query += ' (' + ','.join(qfields) + ') VALUES'
            query += ' (' + ','.join(vals) + ')'
        else:
            qvals = [":col{} {}".format(i, quoteId(field)) for i, field in enumerate(fields)]
            qvals += [":extcol{} {}".format(i, quoteId(field)) for i, field in enumerate(extra_fields)]
            pks = ('pixelId', 'diaObjectId')
            onexpr = ["SRC.{col} = DST.{col}".format(col=quoteId(col)) for col in pks]
            setexpr = ["DST.{col} = SRC.{col}".format(col=quoteId(col))
                       for col in fields + extra_fields if col not in pks]
            vals = ["SRC.{col}".format(col=quoteId(col)) for col in fields + extra_fields]
            query = "MERGE INTO {} DST ".format(quoteId(table.name))
            query += "USING (SELECT {} FROM DUAL) SRC ".format(", ".join(qvals))
            query += "ON ({}) ".format(" AND ".join(onexpr))
            query += "WHEN MATCHED THEN UPDATE SET {} ".format(", ".join(setexpr))
            query += "WHEN NOT MATCHED THEN INSERT "
            query += "({}) VALUES ({})".format(','.join(qfields), ','.join(vals))
        # _LOG.info("query: %s", query)

        values = []
        for rec in objects:
            row = {}
            col = 0
            for field in afw_fields:
                if field not in column_map:
                    continue
                value = rec[field]
                if column_map[field].type == "DATETIME" and not np.isnan(value):
                    # convert seconds into datetime
                    value = datetime.utcfromtimestamp(value)
                elif isinstance(value, geom.Angle):
                    value = str(value.asDegrees())
                elif not np.isfinite(value):
                    value = None
                row["col{}".format(col)] = value
                col += 1
            for i, field in enumerate(extra_fields):
                row["extcol{}".format(i)] = extra_columns[field]
            values.append(row)

        # _LOG.debug("query: %s", query)
        _LOG.info("%s: will store %d records", table.name, len(objects))
        with Timer(table.name + ' insert', self.config.timer):
            res = conn.execute(sql.text(query), values)
        _LOG.debug("inserted %s intervals", res.rowcount)

    def _convertResult(self, res, table_name, catalog=None):
        """Convert result set into output catalog.

        Parameters
        ----------
        res : `sqlalchemy.ResultProxy`
            SQLAlchemy result set returned by query.
        table_name : `str`
            Name of the table.
        catalog : `lsst.afw.table.BaseCatalog`, optional
            If not None then extend existing catalog.

        Returns
        -------
        catalog : `lsst.afw.table.SourceCatalog`
            If ``catalog`` is None then new instance is returned, otherwise
            ``catalog`` is updated and returned.
        """
        # make catalog schema
        columns = res.keys()
        schema, col_map = self._schema.getAfwSchema(table_name, columns)
        if catalog is None:
            _LOG.debug("_convertResult: schema: %s", schema)
            _LOG.debug("_convertResult: col_map: %s", col_map)
            catalog = afwTable.SourceCatalog(schema)

        # fill catalog
        for row in res:
            record = catalog.addNew()
            for col, value in row.items():
                # some columns may exist in database but not included in afw schema
                col = col_map.get(col)
                if col is not None:
                    if isinstance(value, datetime):
                        # convert datetime to number of seconds
                        value = int((value - datetime.utcfromtimestamp(0)).total_seconds())
                    elif col.getTypeString() == 'Angle' and value is not None:
                        value = value * geom.degrees
                    if value is not None:
                        record.set(col, value)

        return catalog
Definition: fits.cc:833