ingestSourcesTask.py
import MySQLdb
import math
import re
import sys
import traceback

import lsst.afw.table as afwTable
import lsst.daf.base as dafBase
import lsst.daf.persistence as dafPersist
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase

class ColumnFormatter(object):
    """A class to format a column in an afw.SourceCatalog.

    A little tricky because a column's values may be composite entities
    (coordinates, matrices, etc.).

    This class is basically a container for a SQL type, a function returning
    SQL column names, and a function returning the formatted value of a
    column."""

    def __init__(self, sqlType, columnNameCallable, formatValueCallable):
        """Store the column formatting information."""
        self.sqlType = sqlType
        self.columnNameCallable = columnNameCallable
        self.formatValueCallable = formatValueCallable

    def getSqlType(self):
        """Return the SQL type (e.g. BIGINT, DOUBLE) for the column's basic
        values."""
        return self.sqlType

    def getColumnNames(self, baseName):
        """Return an iterable of the names that should be used for columns
        in SQL, given a SQL-compatible base name derived from the catalog's
        column name."""
        return self.columnNameCallable(baseName)

    def formatValue(self, value):
        """Return a string suitable for inclusion in an INSERT/REPLACE
        statement (not a CSV file) resulting from formatting the column's
        value. One value is produced for each of the column names returned
        by getColumnNames(), separated by commas. This method also converts
        None values to SQL NULLs."""
        if value is None:
            return "NULL"
        return self.formatValueCallable(value)


def _formatNumber(fmt, number):
    """Auxiliary function for formatting a number, handling conversion of
    NaN and infinities to NULL."""
    if math.isnan(number) or math.isinf(number):
        return "NULL"
    return fmt % (number,)


def _formatList(fmt, values):
    """Auxiliary function for formatting a sequence of numbers using a
    common format, joining the results with commas."""
    return ", ".join([_formatNumber(fmt, x) for x in values])

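# A quick illustration of the helpers above (not part of the original
# module): non-finite values become SQL NULL, finite values use the given
# printf-style format.
#
#   _formatNumber("%.9g", float("nan"))  ->  "NULL"
#   _formatNumber("%.9g", 1.25)          ->  "1.25"
#   _formatList("%d", (3, 4, 5))         ->  "3, 4, 5"
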
63 """Describe how to handle each of the column types. Array and Cov (plain)
64 types are not yet processed."""
65 columnFormatters = dict(
66  Flag = ColumnFormatter("BIT", lambda x: (x,),
67  lambda v: "1" if v else "0"),
68  I = ColumnFormatter("INT", lambda x: (x,),
69  lambda v: str(v)),
70  L = ColumnFormatter("BIGINT", lambda x: (x,),
71  lambda v: str(v)),
72  F = ColumnFormatter("FLOAT", lambda x: (x,),
73  lambda v: _formatNumber("%.9g", v)),
74  D = ColumnFormatter("DOUBLE", lambda x: (x,),
75  lambda v: _formatNumber("%.17g", v)),
76  Angle = ColumnFormatter("DOUBLE", lambda x: (x,),
77  lambda v: _formatNumber("%.17g", v.asDegrees())),
78  Coord = ColumnFormatter("DOUBLE", lambda x: (x + "_ra", x + "_dec"),
79  lambda v: _formatList("%.17g",
80  (v.getRa().asDegrees(), v.getDec().asDegrees()))),
81  PointI = ColumnFormatter("INT", lambda x: (x + "_x", x + "_y"),
82  lambda v: _formatList("%d", (v[0], v[1]))),
83  PointF = ColumnFormatter("FLOAT", lambda x: (x + "_x", x + "_y"),
84  lambda v: _formatList("%.9g", (v[0], v[1]))),
85  PointD = ColumnFormatter("DOUBLE", lambda x: (x + "_x", x + "_y"),
86  lambda v: _formatList("%.17g", (v[0], v[1]))),
87  MomentsF = ColumnFormatter("FLOAT",
88  lambda x: (x + "_xx", x + "_xy", x + "_yy"),
89  lambda v: _formatList("%.9g",
90  (v.getIxx(), v.getIxy(), v.getIyy()))),
91  MomentsD = ColumnFormatter("DOUBLE",
92  lambda x: (x + "_xx", x + "_xy", x + "_yy"),
93  lambda v: _formatList("%.17g",
94  (v.getIxx(), v.getIxy(), v.getIyy()))),
95  CovPointF = ColumnFormatter("FLOAT",
96  lambda x: (x + "_xx", x + "_xy", x + "_yy"),
97  lambda v: _formatList("%.9g", (v[0, 0], v[0, 1], v[1, 1]))),
98  CovPointD = ColumnFormatter("DOUBLE",
99  lambda x: (x + "_xx", x + "_xy", x + "_yy"),
100  lambda v: _formatList("%.17g", (v[0, 0], v[0, 1], v[1, 1]))),
101  CovMomentsF = ColumnFormatter("FLOAT",
102  lambda x: (x + "_xx_xx", x + "_xx_xy", x + "_xx_yy",
103  x + "_xy_xy", x + "_xy_yy", x + "_yy_yy"),
104  lambda v: _formatList("%.9g",
105  (v[0, 0], v[0, 1], v[0, 2], v[1, 1], v[1, 2], v[2, 2]))),
106  CovMomentsD = ColumnFormatter("DOUBLE",
107  lambda x: (x + "_xx_xx", x + "_xx_xy", x + "_xx_yy",
108  x + "_xy_xy", x + "_xy_yy", x + "_yy_yy"),
109  lambda v: _formatList("%.17g",
110  (v[0, 0], v[0, 1], v[0, 2], v[1, 1], v[1, 2], v[2, 2])))
111  )
112 
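# Sketch of how a new column type would be registered, as described in the
# IngestSourcesTask docstring below. The "ArrayF" key and the fixed
# three-element layout are assumptions for illustration, not the actual
# afw::table API:
#
#   columnFormatters["ArrayF"] = ColumnFormatter(
#       "FLOAT",
#       lambda x: tuple("%s_%d" % (x, i) for i in range(3)),
#       lambda v: _formatList("%.9g", (v[0], v[1], v[2])))
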
class IngestSourcesConfig(pexConfig.Config):
    """Configuration for the IngestSourcesTask."""
    allowReplace = pexConfig.Field(
        "Allow replacement of existing rows with the same source IDs",
        bool, default=False)
    maxQueryLen = pexConfig.Field(
        "Maximum length of a query string."
        " None means use a non-standard, database-specific way to get"
        " the maximum.",
        int, optional=True, default=None)
    idColumnName = pexConfig.Field(
        "Name of unique identifier column",
        str, default="id")
    remap = pexConfig.DictField(
        "Column name remapping. "
        "key = normal SQL column name, value = desired SQL column name",
        keytype=str, itemtype=str,
        optional=True,
        default={"coord_ra": "ra", "coord_dec": "decl"})
    extraColumns = pexConfig.Field(
        "Extra column definitions, comma-separated, to put into the"
        " CREATE TABLE statement if the table is being created",
        str, optional=True, default="")

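# A config override file for this task might set these fields as follows
# (a hedged sketch; the values are illustrative only):
#
#   config.allowReplace = True
#   config.remap = {"coord_ra": "ra", "coord_dec": "decl"}
#   config.extraColumns = "htmId20 BIGINT, chunkId INT"
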
class IngestSourcesTaskRunner(pipeBase.TaskRunner):
    @staticmethod
    def getTargetList(parsedCmd):
        """Override the target list to add additional run() method
        parameters."""
        return pipeBase.TaskRunner.getTargetList(parsedCmd,
                                                 dstype=parsedCmd.dstype,
                                                 tableName=parsedCmd.tableName,
                                                 host=parsedCmd.host,
                                                 db=parsedCmd.db,
                                                 port=parsedCmd.port,
                                                 user=parsedCmd.user)

    def precall(self, parsedCmd):
        """Override precall so that schemas are not written, config
        persistence is not required, and the task's name is set
        appropriately."""
        self.TaskClass._DefaultName += "_" + parsedCmd.dstype
        task = self.TaskClass(config=self.config, log=self.log)
        try:
            task.writeConfig(parsedCmd.butler, clobber=self.clobberConfig)
        except Exception as e:
            # There is often no mapping for the config dataset; in any case,
            # just skip persisting it.
            task.log.warn("Could not persist config: %s" % (e,))
        return True


class IngestSourcesTask(pipeBase.CmdLineTask):
    """Task to ingest a SourceCatalog of arbitrary schema into a database
    table.

    This task connects to a database using connection information given
    through command line arguments or run() parameters. It attempts to use
    a .my.cnf file if present (by not specifying a password) and falls back
    to credentials obtained via the DbAuth interface if that fails.

    If run from the command line, it will ingest each catalog of Sources
    specified by a data id and dataset type. A sample command line might
    look like:

        $DATAREL_DIR/bin/ingest/ingestSources.py
            {repository path}
            --host lsst-db.ncsa.illinois.edu
            --database {user}_S12_sdss_u_s2012prod_{runid}
            --table DiaSources
            --dstype goodSeeingDiff_src
            --id run=... camcol=... filter=... field=...

    As usual for tasks, multiple --id options may be specified, and ranges
    and lists of values may be given for data id keys.

    There are also two methods (ingest() and runFile()) that can be called
    manually to ingest catalogs, either by passing the catalog explicitly or
    by passing the name of a FITS file containing the catalog. Both, like
    run(), require database connection information.

    The ingestion process creates the destination table in the database if
    it doesn't exist, translating the schema from the source catalog's
    schema. The database table must contain a unique identifier column,
    named by the idColumnName configuration parameter. The only index
    provided is a unique one on this id field. (Additional ones can be
    created later, of course.) Columns can be renamed using the remap
    configuration parameter. The names to be remapped have already been
    canonicalized (for now, by changing any non-word characters to
    underscores) and may have additional subfield tags appended (such as
    "_ra" or "_y"). Extra columns (e.g. ones to be filled in later by
    spatial indexing code) may be added to the table via the extraColumns
    configuration parameter.

    Note that "nullable integer" columns are not provided. There is no way
    to represent these explicitly in the source catalog, and translating 0
    to NULL seems to have little value and might be error-prone. (An option
    could be provided to do this if it turns out to be necessary.)

    Also note that covariances and moments are assumed to be in pixel space
    (or something else) rather than angular space, and so do not need
    radians-to-degrees conversion.

    If the table does exist, one row of the input (the first) is checked to
    see if it already exists in the destination table. If it does, the
    ingestion fails unless the allowReplace configuration parameter is set
    to True.

    Rows are inserted into the database via INSERT statements. As many rows
    as possible are packed into each INSERT to maximize throughput. The
    limit on INSERT statement length is either set by configuration or
    determined by querying the database (in a MySQL-specific way). This may
    not be as efficient in its use of the database as converting to CSV and
    doing a bulk load, but it avoids the use of (often shared) disk
    resources. The use of INSERTs (committed once at the end) may not be
    fully parallelizable (particularly due to the unique id index), but
    tests seem to indicate that executing many such INSERTs in parallel is
    at least not much slower than executing them all sequentially. This
    remains an area for future optimization.

    The columnFormatters dictionary is used to determine how to format each
    type of column in the source catalog. If new column types are added to
    afw::table and are used in Source catalogs, they should also be added
    there. While lambdas are used for the formatting functions for
    compactness, they can be any callable (and so can handle more complex
    logic than can be embedded in a lambda -- e.g. checking a column's units
    to see if it needs to be converted from radians to degrees).
    """

    ConfigClass = IngestSourcesConfig
    _DefaultName = "ingestSources"
    RunnerClass = IngestSourcesTaskRunner

    @classmethod
    def _makeArgumentParser(cls):
        """Extend the default argument parser with database-specific
        arguments and the dataset type for the Sources to be read."""
        parser = pipeBase.ArgumentParser(name=cls._DefaultName)
        parser.add_argument("-H", "--host", dest="host", required=True,
                            help="Database hostname")
        parser.add_argument("-D", "--database", dest="db", required=True,
                            help="Database name")
        parser.add_argument("-U", "--user", dest="user",
                            help="Database username (optional)", default=None)
        parser.add_argument("-P", "--port", dest="port",
                            help="Database port number (optional)",
                            default=3306)
        parser.add_argument("-t", "--table", dest="tableName", required=True,
                            help="Table to ingest into")
        # Use DatasetArgument to require that the dataset type be specified
        # on the command line.
        parser.add_id_argument("--id", pipeBase.DatasetArgument("dstype"),
                               help="Source dataset data id to ingest")
        return parser

    def runFile(self, fileName, tableName, host, db, port=3306, user=None):
        """Ingest a SourceCatalog specified by a filename."""
        cat = afwTable.SourceCatalog.readFits(fileName)
        self.ingest(cat, tableName, host, db, port, user)

    def run(self, dataRef, dstype, tableName, host, db, port=3306, user=None):
        """Ingest a SourceCatalog specified by a dataref and dataset type."""
        self.ingest(dataRef.get(dstype), tableName, host, db, port, user)

    @pipeBase.timeMethod
    def ingest(self, cat, tableName, host, db, port=3306, user=None):
        """Ingest a SourceCatalog passed as an object.

        @param cat       (SourceCatalog) Catalog to ingest.
        @param tableName (str) Name of the database table to create.
        @param host      (str) Name of the database host machine.
        @param db        (str) Name of the database to ingest into.
        @param port      (int) Port number on the database host.
        @param user      (str) User name to use for the database."""

        try:
            # See if we can connect without a password (e.g. via .my.cnf).
            self.db = MySQLdb.connect(host=host, port=port, user=user, db=db)
        except Exception:
            # Fall back to credentials obtained via DbAuth.
            user = dafPersist.DbAuth.username(host, str(port))
            passwd = dafPersist.DbAuth.password(host, str(port))
            self.db = MySQLdb.connect(host=host, port=port,
                                      user=user, passwd=passwd, db=db)
        self.tableName = tableName

        # Determine the maximum query length (MySQL-specific) if not
        # configured.
        if self.config.maxQueryLen is None:
            self.maxQueryLen = int(self._getSqlScalar("""
                SELECT variable_value
                FROM information_schema.session_variables
                WHERE variable_name = 'max_allowed_packet';"""))
        else:
            self.maxQueryLen = self.config.maxQueryLen

        # Ingest the catalog by converting it to one or more (large) INSERT
        # or REPLACE statements, executing those statements, and committing
        # the result.
        tableName = self.db.escape_string(self.tableName)
        self._checkTable(tableName, cat)
        pos = 0
        while pos < len(cat):
            if self.config.allowReplace:
                sql = "REPLACE"
            else:
                sql = "INSERT"
            sql += " INTO `%s` (" % (tableName,)
            keys = []
            firstCol = True
            for col in cat.schema:
                if col.field.getTypeString() not in columnFormatters:
                    self.log.warn(
                        "Skipping complex column: {name} ({type})".format(
                            name=col.field.getName(),
                            type=col.field.getTypeString()))
                    continue
                formatter = columnFormatters[col.field.getTypeString()]
                keys.append((col.key, formatter))
                if firstCol:
                    firstCol = False
                else:
                    sql += ", "
                sql += self._columnDef(col, includeTypes=False)
            sql += ") VALUES "
            initialPos = pos
            maxValueLen = self.maxQueryLen - len(sql)
            while pos < len(cat):
                source = cat[pos]
                value = "("
                value += ", ".join([formatter.formatValue(source.get(key))
                                    for (key, formatter) in keys])
                value += "), "
                maxValueLen -= len(value)
                if maxValueLen < 0:
                    break
                else:
                    sql += value
                    pos += 1
            if pos == initialPos:
                # We have made no progress, so a single row must be too
                # large to fit in one statement.
                raise RuntimeError("Single row too large to insert")
            self._executeSql(sql[:-2] + ";")
        self.db.commit()

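    # For a catalog whose only ingestible columns are an "id" (type L) and a
    # "coord" (type Coord, remapped per the default config), each generated
    # statement has roughly this shape (values are illustrative, not
    # captured from a real run):
    #
    #   INSERT INTO `Sources` (id, ra, decl) VALUES
    #   (1234, 214.871, 52.812), (1235, 214.882, 52.807), ... ;
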
    def _executeSql(self, sql):
        """Execute a SQL query with no expectation of a result."""
        self.log.logdebug("executeSql: " + sql)
        self.db.query(sql)

    def _getSqlScalar(self, sql):
        """Execute a SQL query and return a single scalar result."""
        cur = self.db.cursor()
        self.log.logdebug("getSqlScalar: " + sql)
        rows = cur.execute(sql)
        if rows != 1:
            raise RuntimeError(
                "Wrong number of rows (%d) for scalar query: %s" %
                (rows, sql))
        row = cur.fetchone()
        self.log.logdebug("Result: " + str(row))
        return row[0]

    def _checkTable(self, tableName, cat):
        """Check that a table exists by selecting a row from it. If the row
        contains the unique id of the first item in the input SourceCatalog,
        assume that the rest are present as well. If the table does not
        exist, create it."""

        sampleId = cat[0][self.config.idColumnName]
        count = 0
        try:
            count = self._getSqlScalar(
                "SELECT COUNT(*) FROM `%s` WHERE %s = %d;" % (
                    tableName, self.config.idColumnName, sampleId))
        except RuntimeError:
            raise
        except Exception:
            # The table most likely does not exist yet; create it below.
            pass
        if count == 0:
            self._createTable(tableName, cat.schema)
        elif self.config.allowReplace:
            self.log.warn("Overwriting existing rows")
        else:
            raise RuntimeError("Row exists: {name}={id}".format(
                name=self.config.idColumnName, id=sampleId))

    def _createTable(self, tableName, schema):
        """Create a table using column definitions based on the provided
        table schema, adding in any extra columns specified in the config.
        The unique id column is given a key."""
        sql = "CREATE TABLE IF NOT EXISTS `%s` (" % (tableName,)
        sql += ", ".join([self._columnDef(col) for col in schema if
                          col.field.getTypeString() in columnFormatters])
        if self.config.extraColumns is not None and self.config.extraColumns != "":
            sql += ", " + self.config.extraColumns
        sql += ", UNIQUE(%s)" % (self.config.idColumnName,)
        sql += ");"
        self._executeSql(sql)

    def _columnDef(self, col, includeTypes=True):
        """Return the column definition for a given schema column, which may
        be composed of multiple database columns (separated by commas). If
        includeTypes is True (the default), include the SQL type for each
        column, as for a CREATE TABLE statement."""
        formatter = columnFormatters[col.field.getTypeString()]
        baseName = self._canonicalizeName(col.field.getName())
        columnType = " " + formatter.getSqlType() if includeTypes else ""
        return ", ".join(["%s%s" % (self._remapColumn(columnName), columnType)
                          for columnName in formatter.getColumnNames(baseName)])

    def _remapColumn(self, colName):
        """Remap a column name according to the remap dictionary in the
        config."""
        if colName in self.config.remap:
            return self.config.remap[colName]
        return colName

    def _canonicalizeName(self, colName):
        """Return a SQL-compatible version of the schema column name."""
        return re.sub(r'[^\w]', '_', colName)
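

# Programmatic use (a hedged sketch; the host, database, and file names are
# placeholders, not real services): ingest a catalog previously written to
# a FITS file.
#
#   task = IngestSourcesTask()
#   task.runFile("src.fits", tableName="Sources",
#                host="lsst-db.example.edu", db="mydb", user="alice")
#
# If the initial connection without a password fails, ingest() falls back
# to credentials supplied by the DbAuth interface.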