14 """A class to format a column in an afw.SourceCatalog.
16 A little tricky because a column's values may be composite entities
17 (coordinates, matrixes, etc.).
19 This class is basically a container for a SQL type, a function returning
20 SQL column names, and a function returning the formatted value of a
    def __init__(self, sqlType, columnNameCallable, formatValueCallable):
        """Store the column formatting information."""
        self._sqlType = sqlType
        self._columnNameCallable = columnNameCallable
        self._formatValueCallable = formatValueCallable
30 """Return the SQL type (e.g. BIGINT, DOUBLE) for the column's basic
35 """Return an iterable of the names that should be used for columns in
36 SQL given a SQL-compatible base name derived from the catalog's column
41 """Return a string suitable for inclusion in an INSERT/REPLACE
42 statement (not a CSV file) resulting from formatting the column's
43 value. One value should be provided for each of the column names that
44 had been returned, of course. Values should be separated by commas.
45 This method also handles changing "None" values to SQL NULLs."""
52 """Auxiliary function for formatting a number, handling conversion of NaN
53 and infinities to NULL."""
54 if math.isnan(number)
or math.isinf(number):
56 return fmt % (number,)
59 """Auxiliary function for formatting a list of numbers using a common
60 format, joining the results with commas."""
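
# A minimal illustrative sketch (not part of the original module): a
# formatter for a simple 64-bit integer column. The variable name below is
# hypothetical; real instances live in the columnFormatters dictionary.
_exampleBigIntFormatter = ColumnFormatter(
    "BIGINT",                          # SQL type used in CREATE TABLE
    lambda x: (x,),                    # one SQL column per schema column
    lambda v: _formatNumber("%d", v))  # None is handled by formatValue()
# _exampleBigIntFormatter.getColumnNames("id")  -> ("id",)
# _exampleBigIntFormatter.formatValue(42)       -> "42"
# _formatNumber("%.9g", float("nan"))           -> "NULL"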
63 """Describe how to handle each of the column types. Array and Cov (plain)
64 types are not yet processed."""
65 columnFormatters = dict(
67 lambda v:
"1" if v
else "0"),
80 (v.getRa().asDegrees(), v.getDec().asDegrees()))),
88 lambda x: (x +
"_xx", x +
"_xy", x +
"_yy"),
90 (v.getIxx(), v.getIxy(), v.getIyy()))),
92 lambda x: (x +
"_xx", x +
"_xy", x +
"_yy"),
94 (v.getIxx(), v.getIxy(), v.getIyy()))),
96 lambda x: (x +
"_xx", x +
"_xy", x +
"_yy"),
97 lambda v:
_formatList(
"%.9g", (v[0, 0], v[0, 1], v[1, 1]))),
99 lambda x: (x +
"_xx", x +
"_xy", x +
"_yy"),
100 lambda v:
_formatList(
"%.17g", (v[0, 0], v[0, 1], v[1, 1]))),
102 lambda x: (x +
"_xx_xx", x +
"_xx_xy", x +
"_xx_yy",
103 x +
"_xy_xy", x +
"_xy_yy", x +
"_yy_yy"),
105 (v[0, 0], v[0, 1], v[0, 2], v[1, 1], v[1, 2], v[2, 2]))),
107 lambda x: (x +
"_xx_xx", x +
"_xx_xy", x +
"_xx_yy",
108 x +
"_xy_xy", x +
"_xy_yy", x +
"_yy_yy"),
110 (v[0, 0], v[0, 1], v[0, 2], v[1, 1], v[1, 2], v[2, 2])))
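
# How a composite entry above expands (illustrative; "shape" is a
# hypothetical base name):
#
#     columnFormatters["MomentsF"].getColumnNames("shape")
#         -> ("shape_xx", "shape_xy", "shape_yy")
#
# formatValue() then yields three comma-separated numbers formatted with
# "%.9g", with NaNs and infinities rendered as NULL.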
114 """Configuration for the IngestSourcesTask."""
115 allowReplace = pexConfig.Field(
116 "Allow replacement of existing rows with the same source IDs",
118 maxQueryLen = pexConfig.Field(
119 "Maximum length of a query string."
120 " None means use a non-standard, database-specific way to get"
122 int, optional=
True, default=
None)
123 idColumnName = pexConfig.Field(
124 "Name of unique identifier column",
126 remap = pexConfig.DictField(
127 "Column name remapping. "
128 "key = normal SQL column name, value = desired SQL column name",
129 keytype=str, itemtype=str,
131 default={
"coord_ra":
"ra",
"coord_dec":
"decl"})
132 extraColumns = pexConfig.Field(
133 "Extra column definitions, comma-separated, to put into the"
134 " CREATE TABLE statement if the table is being created",
135 str, optional=
True, default=
"")
140 """Override the target list to add additional run() method
142 return pipeBase.TaskRunner.getTargetList(parsedCmd,
143 dstype=parsedCmd.dstype,
144 tableName=parsedCmd.tableName,
151 """Override the precall to not write schemas, not require writing of
152 configs, and set the task's name appropriately."""
153 self.TaskClass._DefaultName +=
"_" + parsedCmd.dstype
154 task = self.TaskClass(config=self.config, log=self.log)
156 task.writeConfig(parsedCmd.butler, clobber=self.clobberConfig)
159 task.log.warn(
"Could not persist config: %s" % (e,))
164 """Task to ingest a SourceCatalog of arbitrary schema into a database table.
166 This task connects to a database using connection information given
167 through command line arguments or run() parameters. It attempts to use
168 a .mysql.cnf file if present (by not specifying a password) and falls back
169 to using credentials obtained via the DbAuth interface if not.
171 If run from the command line, it will ingest each catalog of Sources
172 specified by a data id and dataset type. A sample command line might look
174 $DATAREL_DIR/bin/ingest/ingestSources.py
176 --host lsst-db.ncsa.illinois.edu
177 --database {user}_S12_sdss_u_s2012prod_{runid}
179 --dstype goodSeeingDiff_src
180 --id run=... camcol=... filter=... field=...
181 As usual for tasks, multiple --id options may be specified, or ranges and
182 lists of values can be specified for data id keys.
184 There are also two methods (ingest() and runFile()) that can be manually
185 called to ingest catalogs, either by passing the catalog explicitly or by
186 passing the name of a FITS file containing the catalog. Both, like run(),
187 require database connection information.
189 The ingestion process creates the destination table in the database if it
190 doesn't exist. The schema is translated from the source catalog's schema.
191 The database table must contain a unique identifier column, named in the
192 idColumnName configuration parameter. The only index provided is a unique
193 one on this id field. (Additional ones can be created later, of course.)
194 Columns can be renamed using the remap configuration parameter. The names
195 to be remapped have been canonicalized (for now by changing any non-word
196 characters to underscores) and may have additional subfield tags appended
197 (such as "_ra" or "_y"). Extra columns (e.g. ones to be filled in later
198 by spatial indexing code) may be added to the table via the extraColumns
199 configuration parameter.
201 Note that "nullable integer" columns are not provided. There is no way to
202 represent these explicitly in the source catalog, and translating 0 to
203 NULL seems to have little value and might be error-prone. (An option
204 could be provided to do this if it turns out to be necessary.)
206 Also note that covariances and moments are assumed to be in pixel space
207 (or something else) and not angular space and so do not need
208 radians-to-degrees conversion.
210 If the table does exist, one row of the input (the first) is checked to
211 see if it already exists in the destination table. If it does, the
212 ingestion fails unless the allowReplace configuration parameter is set to
215 Rows are inserted into the database via INSERT statements. As many rows
216 as possible are packed into each INSERT to maximize throughput. The limit
217 on INSERT statement length is either set by configuration or determined by
218 querying the database (in a MySQL-specific way). This may not be as
219 efficient in its use of the database as converting to CSV and doing a bulk
220 load, but it eliminates the use of (often shared) disk resources. The use
221 of INSERTs (committed once at the end) may not be fully parallelizable
222 (particularly due to the unique id index), but tests seem to indicate that
223 it is at least not much slower to execute many such INSERTs in parallel
224 compared with executing them all sequentially. This remains an area for
227 The columnFormatters dictionary is used to determine how to format each
228 type of column in the source catalog. If new column types are added to
229 afw::table and are used in Source catalogs, they should also be added
230 here. While lambdas are used for the formatting functions for
231 compactness, they can be any callable (and so can handle more complex
232 logic than can be embedded in a lambda -- e.g. checking a column's units
233 to see if it needs to be converted from radians to degrees).
    ConfigClass = IngestSourcesConfig
    _DefaultName = "ingestSources"
    RunnerClass = IngestSourcesTaskRunner
242 """Extend the default argument parser with database-specific
243 arguments and the dataset type for the Sources to be read."""
244 parser = pipeBase.ArgumentParser(name=cls._DefaultName)
245 parser.add_argument(
"-H",
"--host", dest=
"host", required=
True,
246 help=
"Database hostname")
247 parser.add_argument(
"-D",
"--database", dest=
"db", required=
True,
248 help=
"Database name")
249 parser.add_argument(
"-U",
"--user", dest=
"user",
250 help=
"Database username (optional)", default=
None)
251 parser.add_argument(
"-P",
"--port", dest=
"port",
252 help=
"Database port number (optional)", default=3306)
253 parser.add_argument(
"-t",
"--table", dest=
"tableName", required=
True,
254 help=
"Table to ingest into")
255 parser.add_id_argument(
"--id", pipeBase.DatasetArgument(
"dstype"),
256 help=
"Source dataset data id to ingest")
    def runFile(self, fileName, tableName, host, db, port=3306, user=None):
        """Ingest a SourceCatalog specified by a filename."""
        cat = afwTable.SourceCatalog.readFits(fileName)
        self.ingest(cat, tableName, host, db, port, user)
    def run(self, dataRef, dstype, tableName, host, db, port=3306, user=None):
        """Ingest a SourceCatalog specified by a dataref and dataset type."""
        self.ingest(dataRef.get(dstype), tableName, host, db, port, user)
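
    # A manual-invocation sketch (hypothetical file, table, and host names):
    #
    #     task = IngestSourcesTask()
    #     task.runFile("src.fits", "Source", host="localhost",
    #                  db="test_db", user="jdoe")
    #
    # run() does the same for a butler dataRef plus a dataset type name.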
    def ingest(self, cat, tableName, host, db, port=3306, user=None):
        """Ingest a SourceCatalog passed as an object.

        @param cat       (SourceCatalog) Catalog to ingest.
        @param tableName (str) Name of the database table to create.
        @param host      (str) Name of the database host machine.
        @param db        (str) Name of the database to ingest into.
        @param port      (int) Port number on the database host.
        @param user      (str) User name to use for the database."""
        try:
            # Try connecting without a password; MySQLdb will use a
            # .mysql.cnf file if one is present.
            self.db = MySQLdb.connect(host=host, port=port, user=user, db=db)
        except Exception:
            # Fall back to credentials obtained via the DbAuth interface.
            user = dafPersist.DbAuth.username(host, str(port))
            passwd = dafPersist.DbAuth.password(host, str(port))
            self.db = MySQLdb.connect(host=host, port=port,
                                      user=user, passwd=passwd, db=db)
        if self.config.maxQueryLen is None:
            # Non-standard, MySQL-specific way to get the maximum query
            # length from the server.
            self.maxQueryLen = int(self.getSqlScalar("""
                SELECT variable_value
                FROM information_schema.session_variables
                WHERE variable_name = 'max_allowed_packet';"""))
        else:
            self.maxQueryLen = self.config.maxQueryLen
        self._ingest(cat, tableName)
302 """Ingest a SourceCatalog by converting it to one or more (large)
303 INSERT or REPLACE statements, executing those statements, and
304 committing the result."""
306 tableName = self.db.escape_string(self.
tableName)
309 while pos < len(cat):
310 if self.config.allowReplace:
314 sql +=
" INTO `%s` (" % (tableName,)
317 for col
in cat.schema:
318 if col.field.getTypeString()
not in columnFormatters:
320 "Skipping complex column: {name} ({type})".
format(
321 name=col.field.getName(),
322 type=col.field.getTypeString()))
325 formatter = columnFormatters[col.field.getTypeString()]
326 keys.append((col.key, formatter))
331 sql += self.
_columnDef(col, includeTypes=
False)
335 while pos < len(cat):
338 value +=
", ".join([formatter.formatValue(source.get(key))
339 for (key, formatter)
in keys])
341 maxValueLen -= len(value)
347 if pos == initialPos:
349 raise RuntimeError(
"Single row too large to insert")
354 """Execute a SQL query with no expectation of result."""
355 self.log.logdebug(
"executeSql: " + sql)
359 """Execute a SQL query and return a single scalar result."""
360 cur = self.db.cursor()
361 self.log.logdebug(
"getSqlScalar: " + sql)
362 rows = cur.execute(sql)
365 "Wrong number of rows (%d) for scalar query: %s" %
368 self.log.logdebug(
"Result: " + str(row))
372 """Check to make sure a table exists by selecting a row from it. If
373 the row contains the unique id of the first item in the input
374 SourceCatalog, assume that the rest are present as well. If the table
375 does not exist, create it."""
377 sampleId = cat[0][self.config.idColumnName]
381 "SELECT COUNT(*) FROM `%s` WHERE %s = %d;" % (
382 tableName, self.config.idColumnName, sampleId))
383 except RuntimeError, e:
389 elif self.config.allowReplace:
390 self.log.warn(
"Overwriting existing rows")
392 raise RuntimeError(
"Row exists: {name}={id}".
format(
393 name=self.config.idColumnName, id=sampleId))
396 """Create a table. Use column definitions based on the provided table
397 schema, adding in any extra columns specified in the config. The
398 unique id column is given a key."""
399 sql =
"CREATE TABLE IF NOT EXISTS `%s` (" % (tableName,)
400 sql +=
", ".join([self.
_columnDef(col)
for col
in schema
if
401 col.field.getTypeString()
in columnFormatters])
402 if self.config.extraColumns
is not None and self.config.extraColumns !=
"":
403 sql +=
", " + self.config.extraColumns
404 sql +=
", UNIQUE(%s)" % (self.config.idColumnName,)
409 """Return the column definition for a given schema column, which may
410 be composed of multiple database columns (separated by commas). If
411 includeTypes is True (the default), include the SQL type for the
412 column as for a CREATE TABLE statement."""
413 formatter = columnFormatters[col.field.getTypeString()]
415 columnType =
" " + formatter.getSqlType()
if includeTypes
else ""
416 return ", ".join([
"%s%s" % (self.
_remapColumn(columnName), columnType)
417 for columnName
in formatter.getColumnNames(baseName)])
420 """Remap a column name according to the remap dictionary in the
422 if colName
in self.config.remap:
423 return self.config.remap[colName]
427 """Return a SQL-compatible version of the schema column name."""
428 return re.sub(
r'[^\w]',
'_', colName)
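
# An illustrative trace (not part of the module) of how a schema field name
# becomes SQL column names, using the default remap configuration:
#
#     "coord"  --_canonicalizeName-->  "coord"
#              --Coord formatter---->  ("coord_ra", "coord_dec")
#              --_remapColumn------->  ("ra", "decl")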