apdb.py
1 # This file is part of dax_apdb.
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (http://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 
22 """Module defining Apdb class and related methods.
23 """
24 
25 __all__ = ["ApdbConfig", "Apdb", "Visit"]
26 
27 from collections import namedtuple
28 from contextlib import contextmanager
29 from datetime import datetime
30 import logging
31 import numpy as np
32 import os
33 import pandas
34 
35 import lsst.geom as geom
36 import lsst.afw.table as afwTable
37 import lsst.pex.config as pexConfig
38 from lsst.pex.config import Field, ChoiceField, ListField
39 from lsst.utils import getPackageDir
40 import sqlalchemy
41 from sqlalchemy import (func, sql)
42 from sqlalchemy.pool import NullPool
43 from . import timer, apdbSchema
44 
45 
46 _LOG = logging.getLogger(__name__.partition(".")[2]) # strip leading "lsst."
47 
48 
49 class Timer(object):
50  """Timer class defining context manager which tracks execution timing.
51 
52  Typical use:
53 
54  with Timer("timer_name"):
55  do_something
56 
57  On exit from block it will print elapsed time.
58 
59  See also :py:mod:`timer` module.
60  """
61  def __init__(self, name, do_logging=True, log_before_cursor_execute=False):
62  self._log_before_cursor_execute = log_before_cursor_execute
63  self._do_logging = do_logging
64  self._timer1 = timer.Timer(name)
65  self._timer2 = timer.Timer(name + " (before/after cursor)")
66 
67  def __enter__(self):
68  """
69  Enter context, start timer
70  """
71 # event.listen(engine.Engine, "before_cursor_execute", self._start_timer)
72 # event.listen(engine.Engine, "after_cursor_execute", self._stop_timer)
73  self._timer1.start()
74  return self
75 
76  def __exit__(self, exc_type, exc_val, exc_tb):
77  """
78  Exit context, stop and dump timer
79  """
80  if exc_type is None:
81  self._timer1.stop()
82  if self._do_logging:
83  self._timer1.dump()
84 # event.remove(engine.Engine, "before_cursor_execute", self._start_timer)
85 # event.remove(engine.Engine, "after_cursor_execute", self._stop_timer)
86  return False
87 
88  def _start_timer(self, conn, cursor, statement, parameters, context, executemany):
89  """Start counting"""
90  if self._log_before_cursor_execute:
91  _LOG.info("before_cursor_execute")
92  self._timer2.start()
93 
94  def _stop_timer(self, conn, cursor, statement, parameters, context, executemany):
95  """Stop counting"""
96  self._timer2.stop()
97  if self._do_logging:
98  self._timer2.dump()
99 
100 
101 def _split(seq, nItems):
102  """Split a sequence into smaller sequences"""
103  seq = list(seq)
104  while seq:
105  yield seq[:nItems]
106  del seq[:nItems]
107 
108 
109 # Information about single visit
110 Visit = namedtuple('Visit', 'visitId visitTime lastObjectId lastSourceId')
111 
112 
113 @contextmanager
114 def _ansi_session(engine):
115  """Returns a connection, makes sure that ANSI mode is set for MySQL
116  """
117  with engine.begin() as conn:
118  if engine.name == 'mysql':
119  conn.execute(sql.text("SET SESSION SQL_MODE = 'ANSI'"))
120  yield conn
121  return
122 
123 
124 def _data_file_name(basename):
125  """Return path name of a data file.
126  """
127  return os.path.join(getPackageDir("dax_apdb"), "data", basename)
128 
129 
130 class ApdbConfig(pexConfig.Config):
131 
132  db_url = Field(dtype=str, doc="SQLAlchemy database connection URI")
133  isolation_level = ChoiceField(dtype=str,
134  doc="Transaction isolation level",
135  allowed={"READ_COMMITTED": "Read committed",
136  "READ_UNCOMMITTED": "Read uncommitted",
137  "REPEATABLE_READ": "Repeatable read",
138  "SERIALIZABLE": "Serializable"},
139  default="READ_COMMITTED",
140  optional=True)
141  connection_pool = Field(dtype=bool,
142  doc=("If False then disable SQLAlchemy connection pool. "
143  "Do not use connection pool when forking."),
144  default=True)
145  connection_timeout = Field(dtype=float,
146  doc="Maximum time to wait for a database lock to be released before "
147  "exiting. Defaults to SQLAlchemy defaults if not set.",
148  default=None,
149  optional=True)
150  sql_echo = Field(dtype=bool,
151  doc="If True then pass SQLAlchemy echo option.",
152  default=False)
153  dia_object_index = ChoiceField(dtype=str,
154  doc="Indexing mode for DiaObject table",
155  allowed={'baseline': "Index defined in baseline schema",
156  'pix_id_iov': "(pixelId, objectId, iovStart) PK",
157  'last_object_table': "Separate DiaObjectLast table"},
158  default='baseline')
159  dia_object_nightly = Field(dtype=bool,
160  doc="Use separate nightly table for DiaObject",
161  default=False)
162  read_sources_months = Field(dtype=int,
163  doc="Number of months of history to read from DiaSource",
164  default=12)
165  read_forced_sources_months = Field(dtype=int,
166  doc="Number of months of history to read from DiaForcedSource",
167  default=12)
168  dia_object_columns = ListField(dtype=str,
169  doc="List of columns to read from DiaObject, by default read all columns",
170  default=[])
171  object_last_replace = Field(dtype=bool,
172  doc="If True (default) then use \"upsert\" for DiaObjectsLast table",
173  default=True)
174  schema_file = Field(dtype=str,
175  doc="Location of (YAML) configuration file with standard schema",
176  default=_data_file_name("apdb-schema.yaml"))
177  extra_schema_file = Field(dtype=str,
178  doc="Location of (YAML) configuration file with extra schema",
179  default=_data_file_name("apdb-schema-extra.yaml"))
180  column_map = Field(dtype=str,
181  doc="Location of (YAML) configuration file with column mapping",
182  default=_data_file_name("apdb-afw-map.yaml"))
183  prefix = Field(dtype=str,
184  doc="Prefix to add to table names and index names",
185  default="")
186  explain = Field(dtype=bool,
187  doc="If True then run EXPLAIN SQL command on each executed query",
188  default=False)
189  timer = Field(dtype=bool,
190  doc="If True then print/log timing information",
191  default=False)
192  diaobject_index_hint = Field(dtype=str,
193  doc="Name of the index to use with Oracle index hint",
194  default=None,
195  optional=True)
196  dynamic_sampling_hint = Field(dtype=int,
197  doc="If non-zero then use dynamic_sampling hint",
198  default=0)
199  cardinality_hint = Field(dtype=int,
200  doc="If non-zero then use cardinality hint",
201  default=0)
202 
203  def validate(self):
204  super().validate()
205  if self.isolation_level == "READ_COMMITTED" and self.db_url.startswith("sqlite"):
206  raise ValueError("Attempting to run Apdb with SQLite and isolation level 'READ_COMMITTED'. "
207  "Use 'READ_UNCOMMITTED' instead.")
208 
209 
210 class Apdb(object):
211  """Interface to L1 database, hides all database access details.
212 
213  The implementation is configured via the standard ``pex_config`` mechanism
214  using the `ApdbConfig` configuration class. For examples of different
215  configurations, check the config/ folder.
216 
217  Parameters
218  ----------
219  config : `ApdbConfig`
220  afw_schemas : `dict`, optional
221  Dictionary with table name for a key and `afw.table.Schema`
222  for a value. Columns in schema will be added to standard
223  APDB schema.
224  """
225 
226  def __init__(self, config, afw_schemas=None):
227 
228  self.config = config
229 
230  # logging.getLogger('sqlalchemy').setLevel(logging.INFO)
231  _LOG.debug("APDB Configuration:")
232  _LOG.debug(" dia_object_index: %s", self.config.dia_object_index)
233  _LOG.debug(" dia_object_nightly: %s", self.config.dia_object_nightly)
234  _LOG.debug(" read_sources_months: %s", self.config.read_sources_months)
235  _LOG.debug(" read_forced_sources_months: %s", self.config.read_forced_sources_months)
236  _LOG.debug(" dia_object_columns: %s", self.config.dia_object_columns)
237  _LOG.debug(" object_last_replace: %s", self.config.object_last_replace)
238  _LOG.debug(" schema_file: %s", self.config.schema_file)
239  _LOG.debug(" extra_schema_file: %s", self.config.extra_schema_file)
240  _LOG.debug(" column_map: %s", self.config.column_map)
241  _LOG.debug(" schema prefix: %s", self.config.prefix)
242 
243  # engine is reused between multiple processes, make sure that we don't
244  # share connections by disabling pool (by using NullPool class)
245  kw = dict(echo=self.config.sql_echo)
246  conn_args = dict()
247  if not self.config.connection_pool:
248  kw.update(poolclass=NullPool)
249  if self.config.isolation_level is not None:
250  kw.update(isolation_level=self.config.isolation_level)
251  if self.config.connection_timeout is not None:
252  if self.config.db_url.startswith("sqlite"):
253  conn_args.update(timeout=self.config.connection_timeout)
254  elif self.config.db_url.startswith(("postgresql", "mysql")):
255  conn_args.update(connect_timeout=self.config.connection_timeout)
256  kw.update(connect_args=conn_args)
257  self._engine = sqlalchemy.create_engine(self.config.db_url, **kw)
258 
259  self._schema = apdbSchema.ApdbSchema(engine=self._engine,
260  dia_object_index=self.config.dia_object_index,
261  dia_object_nightly=self.config.dia_object_nightly,
262  schema_file=self.config.schema_file,
263  extra_schema_file=self.config.extra_schema_file,
264  column_map=self.config.column_map,
265  afw_schemas=afw_schemas,
266  prefix=self.config.prefix)
267 
268  def lastVisit(self):
269  """Returns last visit information or `None` if visits table is empty.
270 
271  Visits table is used by ap_proto to track visit information, it is
272  not a part of the regular APDB schema.
273 
274  Returns
275  -------
276  visit : `Visit` or `None`
277  Last stored visit info or `None` if there was nothing stored yet.
278  """
279 
280  with self._engine.begin() as conn:
281 
282  stmnt = sql.select([sql.func.max(self._schema.visits.c.visitId),
283  sql.func.max(self._schema.visits.c.visitTime)])
284  res = conn.execute(stmnt)
285  row = res.fetchone()
286  if row[0] is None:
287  return None
288 
289  visitId = row[0]
290  visitTime = row[1]
291  _LOG.info("lastVisit: visitId: %s visitTime: %s (%s)", visitId,
292  visitTime, type(visitTime))
293 
294  # get max IDs from corresponding tables
295  stmnt = sql.select([sql.func.max(self._schema.objects.c.diaObjectId)])
296  lastObjectId = conn.scalar(stmnt)
297  stmnt = sql.select([sql.func.max(self._schema.sources.c.diaSourceId)])
298  lastSourceId = conn.scalar(stmnt)
299 
300  return Visit(visitId=visitId, visitTime=visitTime,
301  lastObjectId=lastObjectId, lastSourceId=lastSourceId)
302 
303  def saveVisit(self, visitId, visitTime):
304  """Store visit information.
305 
306  This method is only used by ``ap_proto`` script from ``l1dbproto``
307  and is not intended for production pipelines.
308 
309  Parameters
310  ----------
311  visitId : `int`
312  Visit identifier
313  visitTime : `datetime.datetime`
314  Visit timestamp.
315  """
316 
317  ins = self._schema.visits.insert().values(visitId=visitId,
318  visitTime=visitTime)
319  self._engine.execute(ins)
320 
321  def tableRowCount(self):
322  """Returns dictionary with the table names and row counts.
323 
324  Used by ``ap_proto`` to keep track of the size of the database tables.
325  Depending on the database technology this could be an expensive operation.
326 
327  Returns
328  -------
329  row_counts : `dict`
330  Dict where key is a table name and value is a row count.
331  """
332  res = {}
333  tables = [self._schema.objects, self._schema.sources, self._schema.forcedSources]
334  if self.config.dia_object_index == 'last_object_table':
335  tables.append(self._schema.objects_last)
336  for table in tables:
337  stmt = sql.select([func.count()]).select_from(table)
338  count = self._engine.scalar(stmt)
339  res[table.name] = count
340 
341  return res
342 
343  def getDiaObjects(self, pixel_ranges, return_pandas=False):
344  """Returns catalog of DiaObject instances from given region.
345 
346  Objects are searched based on pixelization index and region is
347  determined by the set of indices. There is no assumption on a
348  particular type of index, client is responsible for consistency
349  when calculating pixelization indices.
350 
351  This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by
352  the schema of APDB table. Re-mapping of the column names is done for
353  some columns (based on column map passed to constructor) but types
354  or units are not changed.
355 
356  Returns only the last version of each DiaObject.
357 
358  Parameters
359  ----------
360  pixel_ranges : `list` of `tuple`
361  Sequence of ranges, range is a tuple (minPixelID, maxPixelID).
362  This defines set of pixel indices to be included in result.
363  return_pandas : `bool`
364  Return a `pandas.DataFrame` instead of
365  `lsst.afw.table.SourceCatalog`.
366 
367  Returns
368  -------
369  catalog : `lsst.afw.table.SourceCatalog` or `pandas.DataFrame`
370  Catalog containing DiaObject records.
371  """
372 
373  # decide what columns we need
374  if self.config.dia_object_index == 'last_object_table':
375  table = self._schema.objects_last
376  else:
377  table = self._schema.objects
378  if not self.config.dia_object_columns:
379  query = table.select()
380  else:
381  columns = [table.c[col] for col in self.config.dia_object_columns]
382  query = sql.select(columns)
383 
384  if self.config.diaobject_index_hint:
385  val = self.config.diaobject_index_hint
386  query = query.with_hint(table, 'index_rs_asc(%(name)s "{}")'.format(val))
387  if self.config.dynamic_sampling_hint > 0:
388  val = self.config.dynamic_sampling_hint
389  query = query.with_hint(table, 'dynamic_sampling(%(name)s {})'.format(val))
390  if self.config.cardinality_hint > 0:
391  val = self.config.cardinality_hint
392  query = query.with_hint(table, 'FIRST_ROWS_1 cardinality(%(name)s {})'.format(val))
393 
394  # build selection
395  exprlist = []
396  for low, upper in pixel_ranges:
397  upper -= 1
398  if low == upper:
399  exprlist.append(table.c.pixelId == low)
400  else:
401  exprlist.append(sql.expression.between(table.c.pixelId, low, upper))
402  query = query.where(sql.expression.or_(*exprlist))
403 
404  # select latest version of objects
405  if self.config.dia_object_index != 'last_object_table':
406  query = query.where(table.c.validityEnd == None) # noqa: E711
407 
408  _LOG.debug("query: %s", query)
409 
410  if self.config.explain:
411  # run the same query with explain
412  self._explain(query, self._engine)
413 
414  # execute select
415  with Timer('DiaObject select', self.config.timer):
416  with self._engine.begin() as conn:
417  if return_pandas:
418  objects = pandas.read_sql_query(query, conn)
419  else:
420  res = conn.execute(query)
421  objects = self._convertResult(res, "DiaObject")
422  _LOG.debug("found %s DiaObjects", len(objects))
423  return objects
424 
425  def getDiaSourcesInRegion(self, pixel_ranges, dt, return_pandas=False):
426  """Returns catalog of DiaSource instances from given region.
427 
428  Sources are searched based on pixelization index and region is
429  determined by the set of indices. There is no assumption on a
430  particular type of index, client is responsible for consistency
431  when calculating pixelization indices.
432 
433  This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by
434  the schema of APDB table. Re-mapping of the column names is done for
435  some columns (based on column map passed to constructor) but types or
436  units are not changed.
437 
438  Parameters
439  ----------
440  pixel_ranges : `list` of `tuple`
441  Sequence of ranges, range is a tuple (minPixelID, maxPixelID).
442  This defines set of pixel indices to be included in result.
443  dt : `datetime.datetime`
444  Time of the current visit
445  return_pandas : `bool`
446  Return a `pandas.DataFrame` instead of
447  `lsst.afw.table.SourceCatalog`.
448 
449  Returns
450  -------
451  catalog : `lsst.afw.table.SourceCatalog`, `pandas.DataFrame`, or `None`
452  Catalog containing DiaSource records. `None` is returned if
453  ``read_sources_months`` configuration parameter is set to 0.
454  """
455 
456  if self.config.read_sources_months == 0:
457  _LOG.info("Skip DiaSources fetching")
458  return None
459 
460  table = self._schema.sources
461  query = table.select()
462 
463  # build selection
464  exprlist = []
465  for low, upper in pixel_ranges:
466  upper -= 1
467  if low == upper:
468  exprlist.append(table.c.pixelId == low)
469  else:
470  exprlist.append(sql.expression.between(table.c.pixelId, low, upper))
471  query = query.where(sql.expression.or_(*exprlist))
472 
473  # execute select
474  with Timer('DiaSource select', self.config.timer):
475  with _ansi_session(self._engine) as conn:
476  if return_pandas:
477  sources = pandas.read_sql_query(query, conn)
478  else:
479  res = conn.execute(query)
480  sources = self._convertResult(res, "DiaSource")
481  _LOG.debug("found %s DiaSources", len(sources))
482  return sources
483 
484  def getDiaSources(self, object_ids, dt, return_pandas=False):
485  """Returns catalog of DiaSource instances given set of DiaObject IDs.
486 
487  This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by
488  the schema of APDB table. Re-mapping of the column names is done for
489  some columns (based on column map passed to constructor) but types or
490  units are not changed.
491 
492  Parameters
493  ----------
494  object_ids :
495  Collection of DiaObject IDs
496  dt : `datetime.datetime`
497  Time of the current visit
498  return_pandas : `bool`
499  Return a `pandas.DataFrame` instead of
500  `lsst.afw.table.SourceCatalog`.
501 
502 
503  Returns
504  -------
505  catalog : `lsst.afw.table.SourceCatalog`, `pandas.DataFrame`, or `None`
506  Catalog containing DiaSource records. `None` is returned if
507  ``read_sources_months`` configuration parameter is set to 0 or
508  when ``object_ids`` is empty.
509  """
510 
511  if self.config.read_sources_months == 0:
512  _LOG.info("Skip DiaSources fetching")
513  return None
514 
515  if len(object_ids) <= 0:
516  _LOG.info("Skip DiaSources fetching - no Objects")
517  # this should create a catalog, but the list of columns may be empty
518  return None
519 
520  table = self._schema.sources
521  sources = None
522  with Timer('DiaSource select', self.config.timer):
523  with _ansi_session(self._engine) as conn:
524  for ids in _split(sorted(object_ids), 1000):
525  query = 'SELECT * FROM "' + table.name + '" WHERE '
526 
527  # select by object id
528  ids = ",".join(str(id) for id in ids)
529  query += '"diaObjectId" IN (' + ids + ') '
530 
531  # execute select
532  if return_pandas:
533  df = pandas.read_sql_query(sql.text(query), conn)
534  if sources is None:
535  sources = df
536  else:
537  sources = sources.append(df)
538  else:
539  res = conn.execute(sql.text(query))
540  sources = self._convertResult(res, "DiaSource", sources)
541 
542  _LOG.debug("found %s DiaSources", len(sources))
543  return sources
544 
545  def getDiaForcedSources(self, object_ids, dt, return_pandas=False):
546  """Returns catalog of DiaForcedSource instances matching given
547  DiaObjects.
548 
549  This method returns :doc:`/modules/lsst.afw.table/index` catalog with schema determined by
550  the schema of L1 database table. Re-mapping of the column names may
551  be done for some columns (based on column map passed to constructor)
552  but types or units are not changed.
553 
554  Parameters
555  ----------
556  object_ids :
557  Collection of DiaObject IDs
558  dt : `datetime.datetime`
559  Time of the current visit
560  return_pandas : `bool`
561  Return a `pandas.DataFrame` instead of
562  `lsst.afw.table.SourceCatalog`.
563 
564  Returns
565  -------
566  catalog : `lsst.afw.table.SourceCatalog` or `None`
567  Catalog containing DiaForcedSource records. `None` is returned if
568  ``read_sources_months`` configuration parameter is set to 0 or
569  when ``object_ids`` is empty.
570  """
571 
572  if self.config.read_forced_sources_months == 0:
573  _LOG.info("Skip DiaForcedSources fetching")
574  return None
575 
576  if len(object_ids) <= 0:
577  _LOG.info("Skip DiaForcedSources fetching - no Objects")
578  # this should create a catalog, but the list of columns may be empty
579  return None
580 
581  table = self._schema.forcedSources
582  sources = None
583 
584  with Timer('DiaForcedSource select', self.config.timer):
585  with _ansi_session(self._engine) as conn:
586  for ids in _split(sorted(object_ids), 1000):
587 
588  query = 'SELECT * FROM "' + table.name + '" WHERE '
589 
590  # select by object id
591  ids = ",".join(str(id) for id in ids)
592  query += '"diaObjectId" IN (' + ids + ') '
593 
594  # execute select
595  if return_pandas:
596  df = pandas.read_sql_query(sql.text(query), conn)
597  if sources is None:
598  sources = df
599  else:
600  sources = sources.append(df)
601  else:
602  res = conn.execute(sql.text(query))
603  sources = self._convertResult(res, "DiaForcedSource", sources)
604 
605  _LOG.debug("found %s DiaForcedSources", len(sources))
606  return sources
607 
608  def storeDiaObjects(self, objs, dt):
609  """Store catalog of DiaObjects from current visit.
610 
611  This method takes a :doc:`/modules/lsst.afw.table/index` catalog; its schema must be
612  compatible with the schema of APDB table:
613 
614  - column names must correspond to database table columns
615  - some columns names are re-mapped based on column map passed to
616  constructor
617  - types and units of the columns must match database definitions,
618  no unit conversion is performed presently
619  - columns that have default values in database schema can be
620  omitted from afw schema
621  - this method knows how to fill interval-related columns
622  (validityStart, validityEnd); they do not need to appear in
623  afw schema
624 
625  Parameters
626  ----------
627  objs : `lsst.afw.table.BaseCatalog` or `pandas.DataFrame`
628  Catalog with DiaObject records
629  dt : `datetime.datetime`
630  Time of the visit
631  """
632 
633  if isinstance(objs, pandas.DataFrame):
634  ids = sorted(objs['diaObjectId'])
635  else:
636  ids = sorted([obj['id'] for obj in objs])
637  _LOG.debug("first object ID: %d", ids[0])
638 
639  # NOTE: workaround for sqlite, need this here to avoid
640  # "database is locked" error.
641  table = self._schema.objects
642 
643  # everything to be done in single transaction
644  with _ansi_session(self._engine) as conn:
645 
646  ids = ",".join(str(id) for id in ids)
647 
648  if self.config.dia_object_index == 'last_object_table':
649 
650  # insert and replace all records in LAST table, mysql and postgres have
651  # non-standard features (handled in _storeObjectsAfw)
652  table = self._schema.objects_last
653  do_replace = self.config.object_last_replace
654  # If the input data is of type Pandas, we drop the previous
655  # objects regardless of the do_replace setting due to how
656  # Pandas inserts objects.
657  if not do_replace or isinstance(objs, pandas.DataFrame):
658  query = 'DELETE FROM "' + table.name + '" '
659  query += 'WHERE "diaObjectId" IN (' + ids + ') '
660 
661  if self.config.explain:
662  # run the same query with explain
663  self._explain(query, conn)
664 
665  with Timer(table.name + ' delete', self.config.timer):
666  res = conn.execute(sql.text(query))
667  _LOG.debug("deleted %s objects", res.rowcount)
668 
669  extra_columns = dict(lastNonForcedSource=dt)
670  if isinstance(objs, pandas.DataFrame):
671  with Timer("DiaObjectLast insert", self.config.timer):
672  for col, data in extra_columns.items():
673  objs[col] = data
674  objs.to_sql("DiaObjectLast", conn, if_exists='append',
675  index=False)
676  else:
677  self._storeObjectsAfw(objs, conn, table, "DiaObjectLast",
678  replace=do_replace,
679  extra_columns=extra_columns)
680 
681  else:
682 
683  # truncate existing validity intervals
684  table = self._schema.objects
685  query = 'UPDATE "' + table.name + '" '
686  query += "SET \"validityEnd\" = '" + str(dt) + "' "
687  query += 'WHERE "diaObjectId" IN (' + ids + ') '
688  query += 'AND "validityEnd" IS NULL'
689 
690  # _LOG.debug("query: %s", query)
691 
692  if self.config.explain:
693  # run the same query with explain
694  self._explain(query, conn)
695 
696  with Timer(table.name + ' truncate', self.config.timer):
697  res = conn.execute(sql.text(query))
698  _LOG.debug("truncated %s intervals", res.rowcount)
699 
700  # insert new versions
701  if self.config.dia_object_nightly:
702  table = self._schema.objects_nightly
703  else:
704  table = self._schema.objects
705  extra_columns = dict(lastNonForcedSource=dt, validityStart=dt,
706  validityEnd=None)
707  if isinstance(objs, pandas.DataFrame):
708  with Timer("DiaObject insert", self.config.timer):
709  for col, data in extra_columns.items():
710  objs[col] = data
711  objs.to_sql("DiaObject", conn, if_exists='append',
712  index=False)
713  else:
714  self._storeObjectsAfw(objs, conn, table, "DiaObject",
715  extra_columns=extra_columns)
716 
717  def storeDiaSources(self, sources):
718  """Store catalog of DIASources from current visit.
719 
720  This method takes a :doc:`/modules/lsst.afw.table/index` catalog; its schema must be
721  compatible with the schema of L1 database table:
722 
723  - column names must correspond to database table columns
724  - some columns names may be re-mapped based on column map passed to
725  constructor
726  - types and units of the columns must match database definitions,
727  no unit conversion is performed presently
728  - columns that have default values in database schema can be
729  omitted from afw schema
730 
731  Parameters
732  ----------
733  sources : `lsst.afw.table.BaseCatalog` or `pandas.DataFrame`
734  Catalog containing DiaSource records
735  """
736 
737  # everything to be done in single transaction
738  with _ansi_session(self._engine) as conn:
739 
740  if isinstance(sources, pandas.DataFrame):
741  with Timer("DiaSource insert", self.config.timer):
742  sources.to_sql("DiaSource", conn, if_exists='append',
743  index=False)
744  else:
745  table = self._schema.sources
746  self._storeObjectsAfw(sources, conn, table, "DiaSource")
747 
748  def storeDiaForcedSources(self, sources):
749  """Store a set of DIAForcedSources from current visit.
750 
751  This method takes a :doc:`/modules/lsst.afw.table/index` catalog; its schema must be
752  compatible with the schema of L1 database table:
753 
754  - column names must correspond to database table columns
755  - some columns names may be re-mapped based on column map passed to
756  constructor
757  - types and units of the columns must match database definitions,
758  no unit conversion is performed presently
759  - columns that have default values in database schema can be
760  omitted from afw schema
761 
762  Parameters
763  ----------
764  sources : `lsst.afw.table.BaseCatalog` or `pandas.DataFrame`
765  Catalog containing DiaForcedSource records
766  """
767 
768  # everything to be done in single transaction
769  with _ansi_session(self._engine) as conn:
770 
771  if isinstance(sources, pandas.DataFrame):
772  with Timer("DiaForcedSource insert", self.config.timer):
773  sources.to_sql("DiaForcedSource", conn, if_exists='append',
774  index=False)
775  else:
776  table = self._schema.forcedSources
777  self._storeObjectsAfw(sources, conn, table, "DiaForcedSource")
778 
779  def countUnassociatedObjects(self):
780  """Return the number of DiaObjects that have only one DiaSource associated
781  with them.
782 
783  Used as part of ap_verify metrics.
784 
785  Returns
786  -------
787  count : `int`
788  Number of DiaObjects with exactly one associated DiaSource.
789  """
790  # Retrieve the DiaObject table.
791  table = self._schema.objects
792 
793  # Construct the sql statement.
794  stmt = sql.select([func.count()]).select_from(table).where(table.c.nDiaSources == 1)
795  stmt = stmt.where(table.c.validityEnd == None) # noqa: E711
796 
797  # Return the count.
798  count = self._engine.scalar(stmt)
799 
800  return count
801 
802  def isVisitProcessed(self, visitInfo):
803  """Test whether data from an image has been loaded into the database.
804 
805  Used as part of ap_verify metrics.
806 
807  Parameters
808  ----------
809  visitInfo : `lsst.afw.image.VisitInfo`
810  The metadata for the image of interest.
811 
812  Returns
813  -------
814  isProcessed : `bool`
815  `True` if the data are present, `False` otherwise.
816  """
817  id = visitInfo.getExposureId()
818  table = self._schema.sources
819  idField = table.c.ccdVisitId
820 
821  # Hopefully faster than SELECT DISTINCT
822  query = sql.select([idField]).select_from(table) \
823  .where(idField == id).limit(1)
824 
825  return self._engine.scalar(query) is not None
826 
827  def dailyJob(self):
828  """Implement daily activities like cleanup/vacuum.
829 
830  What should be done during daily cleanup is determined by
831  configuration/schema.
832  """
833 
834  # move data from DiaObjectNightly into DiaObject
835  if self.config.dia_object_nightly:
836  with _ansi_session(self._engine) as conn:
837  query = 'INSERT INTO "' + self._schema.objects.name + '" '
838  query += 'SELECT * FROM "' + self._schema.objects_nightly.name + '"'
839  with Timer('DiaObjectNightly copy', self.config.timer):
840  conn.execute(sql.text(query))
841 
842  query = 'DELETE FROM "' + self._schema.objects_nightly.name + '"'
843  with Timer('DiaObjectNightly delete', self.config.timer):
844  conn.execute(sql.text(query))
845 
846  if self._engine.name == 'postgresql':
847 
848  # do VACUUM on all tables
849  _LOG.info("Running VACUUM on all tables")
850  connection = self._engine.raw_connection()
851  ISOLATION_LEVEL_AUTOCOMMIT = 0
852  connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
853  cursor = connection.cursor()
854  cursor.execute("VACUUM ANALYSE")
855 
856  def makeSchema(self, drop=False, mysql_engine='InnoDB', oracle_tablespace=None, oracle_iot=False):
857  """Create or re-create all tables.
858 
859  Parameters
860  ----------
861  drop : `bool`
862  If True then drop tables before creating new ones.
863  mysql_engine : `str`, optional
864  Name of the MySQL engine to use for new tables.
865  oracle_tablespace : `str`, optional
866  Name of Oracle tablespace.
867  oracle_iot : `bool`, optional
868  Make Index-organized DiaObjectLast table.
869  """
870  self._schema.makeSchema(drop=drop, mysql_engine=mysql_engine,
871  oracle_tablespace=oracle_tablespace,
872  oracle_iot=oracle_iot)
873 
874  def _explain(self, query, conn):
875  """Run the query with explain
876  """
877 
878  _LOG.info("explain for query: %s...", query[:64])
879 
880  if conn.engine.name == 'mysql':
881  query = "EXPLAIN EXTENDED " + query
882  else:
883  query = "EXPLAIN " + query
884 
885  res = conn.execute(sql.text(query))
886  if res.returns_rows:
887  _LOG.info("explain: %s", res.keys())
888  for row in res:
889  _LOG.info("explain: %s", row)
890  else:
891  _LOG.info("EXPLAIN returned nothing")
892 
893  def _storeObjectsAfw(self, objects, conn, table, schema_table_name,
894  replace=False, extra_columns=None):
895  """Generic store method.
896 
897  Takes catalog of records and stores a bunch of objects in a table.
898 
899  Parameters
900  ----------
901  objects : `lsst.afw.table.BaseCatalog`
902  Catalog containing object records
903  conn :
904  Database connection
905  table : `sqlalchemy.Table`
906  Database table
907  schema_table_name : `str`
908  Name of the table to be used for finding table schema.
909  replace : `boolean`
910  If `True` then use replace instead of INSERT (should be more efficient)
911  extra_columns : `dict`, optional
912  Mapping (column_name, column_value) which gives column values to add
913  to every row, only if column is missing in catalog records.
914  """
915 
916  def quoteValue(v):
917  """Quote and escape values"""
918  if v is None:
919  v = "NULL"
920  elif isinstance(v, datetime):
921  v = "'" + str(v) + "'"
922  elif isinstance(v, str):
923  # we don't expect nasty stuff in strings
924  v = "'" + v + "'"
925  elif isinstance(v, geom.Angle):
926  v = v.asDegrees()
927  if np.isfinite(v):
928  v = str(v)
929  else:
930  v = "NULL"
931  else:
932  if np.isfinite(v):
933  v = str(v)
934  else:
935  v = "NULL"
936  return v
937 
938  def quoteId(columnName):
939  """Smart quoting for column names.
940  Lower-case names are not quoted.
941  """
942  if not columnName.islower():
943  columnName = '"' + columnName + '"'
944  return columnName
945 
946  if conn.engine.name == "oracle":
947  return self._storeObjectsAfwOracle(objects, conn, table,
948  schema_table_name, replace,
949  extra_columns)
950 
951  schema = objects.getSchema()
952  # use extra columns if specified
953  extra_fields = list((extra_columns or {}).keys())
954 
955  afw_fields = [field.getName() for key, field in schema
956  if field.getName() not in extra_fields]
957 
958  column_map = self._schema.getAfwColumns(schema_table_name)
959  # list of columns (as in cat schema)
960  fields = [column_map[field].name for field in afw_fields if field in column_map]
961 
962  if replace and conn.engine.name in ('mysql', 'sqlite'):
963  query = 'REPLACE INTO '
964  else:
965  query = 'INSERT INTO '
966  qfields = [quoteId(field) for field in fields + extra_fields]
967  query += quoteId(table.name) + ' (' + ','.join(qfields) + ') ' + 'VALUES '
968 
969  values = []
970  for rec in objects:
971  row = []
972  for field in afw_fields:
973  if field not in column_map:
974  continue
975  value = rec[field]
976  if column_map[field].type == "DATETIME" and \
977  np.isfinite(value):
978  # convert seconds into datetime
979  value = datetime.utcfromtimestamp(value)
980  row.append(quoteValue(value))
981  for field in extra_fields:
982  row.append(quoteValue(extra_columns[field]))
983  values.append('(' + ','.join(row) + ')')
984 
985  if self.config.explain:
986  # run the same query with explain, only give it one row of data
987  self._explain(query + values[0], conn)
988 
989  query += ','.join(values)
990 
991  if replace and conn.engine.name == 'postgresql':
992  # This depends on that "replace" can only be true for DiaObjectLast table
993  pks = ('pixelId', 'diaObjectId')
994  query += " ON CONFLICT (\"{}\", \"{}\") DO UPDATE SET ".format(*pks)
995  fields = [column_map[field].name for field in afw_fields if field in column_map]
996  fields = ['"{0}" = EXCLUDED."{0}"'.format(field)
997  for field in fields if field not in pks]
998  query += ', '.join(fields)
999 
1000  # _LOG.debug("query: %s", query)
1001  _LOG.info("%s: will store %d records", table.name, len(objects))
1002  with Timer(table.name + ' insert', self.config.timer):
1003  res = conn.execute(sql.text(query))
1004  _LOG.debug("inserted %s intervals", res.rowcount)
1005 
1006  def _storeObjectsAfwOracle(self, objects, conn, table, schema_table_name,
1007  replace=False, extra_columns=None):
1008  """Store method for Oracle.
1009 
1010  Takes catalog of records and stores a bunch of objects in a table.
1011 
1012  Parameters
1013  ----------
1014  objects : `lsst.afw.table.BaseCatalog`
1015  Catalog containing object records
1016  conn :
1017  Database connection
1018  table : `sqlalchemy.Table`
1019  Database table
1020  schema_table_name : `str`
1021  Name of the table to be used for finding table schema.
1022  replace : `boolean`
1023  If `True` then use replace instead of INSERT (should be more efficient)
1024  extra_columns : `dict`, optional
1025  Mapping (column_name, column_value) which gives column values to add
1026  to every row, only if column is missing in catalog records.
1027  """
1028 
1029  def quoteId(columnName):
1030  """Smart quoting for column names.
1031  Lower-case names are not quoted (Oracle backend needs them unquoted).
1032  """
1033  if not columnName.islower():
1034  columnName = '"' + columnName + '"'
1035  return columnName
1036 
1037  schema = objects.getSchema()
1038 
1039  # extra columns are always used as overrides
1040  extra_fields = list((extra_columns or {}).keys())
1041 
1042  afw_fields = [field.getName() for key, field in schema
1043  if field.getName() not in extra_fields]
1044  # _LOG.info("afw_fields: %s", afw_fields)
1045 
1046  column_map = self._schema.getAfwColumns(schema_table_name)
1047  # _LOG.info("column_map: %s", column_map)
1048 
1049  # list of columns (as in cat schema)
1050  fields = [column_map[field].name for field in afw_fields
1051  if field in column_map]
1052  # _LOG.info("fields: %s", fields)
1053 
1054  qfields = [quoteId(field) for field in fields + extra_fields]
1055 
1056  if not replace:
1057  vals = [":col{}".format(i) for i in range(len(fields))]
1058  vals += [":extcol{}".format(i) for i in range(len(extra_fields))]
1059  query = 'INSERT INTO ' + quoteId(table.name)
1060  query += ' (' + ','.join(qfields) + ') VALUES'
1061  query += ' (' + ','.join(vals) + ')'
1062  else:
1063  qvals = [":col{} {}".format(i, quoteId(field)) for i, field in enumerate(fields)]
1064  qvals += [":extcol{} {}".format(i, quoteId(field)) for i, field in enumerate(extra_fields)]
1065  pks = ('pixelId', 'diaObjectId')
1066  onexpr = ["SRC.{col} = DST.{col}".format(col=quoteId(col)) for col in pks]
1067  setexpr = ["DST.{col} = SRC.{col}".format(col=quoteId(col))
1068  for col in fields + extra_fields if col not in pks]
1069  vals = ["SRC.{col}".format(col=quoteId(col)) for col in fields + extra_fields]
1070  query = "MERGE INTO {} DST ".format(quoteId(table.name))
1071  query += "USING (SELECT {} FROM DUAL) SRC ".format(", ".join(qvals))
1072  query += "ON ({}) ".format(" AND ".join(onexpr))
1073  query += "WHEN MATCHED THEN UPDATE SET {} ".format(" ,".join(setexpr))
1074  query += "WHEN NOT MATCHED THEN INSERT "
1075  query += "({}) VALUES ({})".format(','.join(qfields), ','.join(vals))
1076  # _LOG.info("query: %s", query)
1077 
1078  values = []
1079  for rec in objects:
1080  row = {}
1081  col = 0
1082  for field in afw_fields:
1083  if field not in column_map:
1084  continue
1085  value = rec[field]
1086  if column_map[field].type == "DATETIME" and not np.isnan(value):
1087  # convert seconds into datetime
1088  value = datetime.utcfromtimestamp(value)
1089  elif isinstance(value, geom.Angle):
1090  value = str(value.asDegrees())
1091  elif not np.isfinite(value):
1092  value = None
1093  row["col{}".format(col)] = value
1094  col += 1
1095  for i, field in enumerate(extra_fields):
1096  row["extcol{}".format(i)] = extra_columns[field]
1097  values.append(row)
1098 
1099  # _LOG.debug("query: %s", query)
1100  _LOG.info("%s: will store %d records", table.name, len(objects))
1101  with Timer(table.name + ' insert', self.config.timer):
1102  res = conn.execute(sql.text(query), values)
1103  _LOG.debug("inserted %s intervals", res.rowcount)
1104 
1105  def _convertResult(self, res, table_name, catalog=None):
1106  """Convert result set into output catalog.
1107 
1108  Parameters
1109  ----------
1110  res : `sqlalchemy.ResultProxy`
1111  SQLAlchemy result set returned by query.
1112  table_name : `str`
1113  Name of the table.
1114  catalog : `lsst.afw.table.BaseCatalog`
1115  If not None then extend existing catalog
1116 
1117  Returns
1118  -------
1119  catalog : `lsst.afw.table.SourceCatalog`
1120  If ``catalog`` is None then new instance is returned, otherwise
1121  ``catalog`` is updated and returned.
1122  """
1123  # make catalog schema
1124  columns = res.keys()
1125  schema, col_map = self._schema.getAfwSchema(table_name, columns)
1126  if catalog is None:
1127  _LOG.debug("_convertResult: schema: %s", schema)
1128  _LOG.debug("_convertResult: col_map: %s", col_map)
1129  catalog = afwTable.SourceCatalog(schema)
1130 
1131  # fill catalog
1132  for row in res:
1133  record = catalog.addNew()
1134  for col, value in row.items():
1135  # some columns may exist in database but not included in afw schema
1136  col = col_map.get(col)
1137  if col is not None:
1138  if isinstance(value, datetime):
1139  # convert datetime to number of seconds
1140  value = int((value - datetime.utcfromtimestamp(0)).total_seconds())
1141  elif col.getTypeString() == 'Angle' and value is not None:
1142  value = value * geom.degrees
1143  if value is not None:
1144  record.set(col, value)
1145 
1146  return catalog
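
The listing ends here. As an illustration only (it is not part of apdb.py), the sketch below shows one plausible way client code might drive the Apdb class: build an ApdbConfig, create the schema, then read and store catalogs for a visit. The import path, database URL, visit number, and pixel-range values are assumptions made for the example; the pixel ranges must come from whatever pixelization scheme the client actually uses.

# Hypothetical usage sketch -- not part of apdb.py. Import path and all
# concrete values (database URL, visit number, pixel ranges) are assumptions.
from datetime import datetime

from lsst.dax.apdb.apdb import Apdb, ApdbConfig

config = ApdbConfig()
config.db_url = "sqlite:///apdb.db"          # any SQLAlchemy connection URI
config.isolation_level = "READ_UNCOMMITTED"  # READ_COMMITTED is rejected for SQLite
config.dia_object_index = "baseline"
config.read_sources_months = 12

apdb = Apdb(config)
apdb.makeSchema(drop=True)                   # create empty tables

# Pixel ranges are (minPixelID, maxPixelID) tuples; the client is responsible
# for computing them with its own pixelization scheme (values here are made up).
pixel_ranges = [(10000, 10016), (10032, 10033)]
visit_time = datetime.utcnow()

objects = apdb.getDiaObjects(pixel_ranges, return_pandas=True)
if len(objects) > 0:
    object_ids = objects["diaObjectId"].tolist()
    sources = apdb.getDiaSources(object_ids, visit_time, return_pandas=True)
    # ... associate new detections, update `objects` and `sources` ...
    apdb.storeDiaObjects(objects, visit_time)
    apdb.storeDiaSources(sources)

apdb.saveVisit(visitId=1, visitTime=visit_time)
print(apdb.tableRowCount())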