LSSTApplications  1.1.2+25,10.0+13,10.0+132,10.0+133,10.0+224,10.0+41,10.0+8,10.0-1-g0f53050+14,10.0-1-g4b7b172+19,10.0-1-g61a5bae+98,10.0-1-g7408a83+3,10.0-1-gc1e0f5a+19,10.0-1-gdb4482e+14,10.0-11-g3947115+2,10.0-12-g8719d8b+2,10.0-15-ga3f480f+1,10.0-2-g4f67435,10.0-2-gcb4bc6c+26,10.0-28-gf7f57a9+1,10.0-3-g1bbe32c+14,10.0-3-g5b46d21,10.0-4-g027f45f+5,10.0-4-g86f66b5+2,10.0-4-gc4fccf3+24,10.0-40-g4349866+2,10.0-5-g766159b,10.0-5-gca2295e+25,10.0-6-g462a451+1
LSSTDataManagementBasePackage
schema.py
Go to the documentation of this file.
1 #
2 # LSST Data Management System
3 # Copyright 2012 LSST Corporation.
4 #
5 # This product includes software developed by the
6 # LSST Project (http://www.lsst.org/).
7 #
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the LSST License Statement and
19 # the GNU General Public License along with this program. If not,
20 # see <http://www.lsstcorp.org/LegalNotices/>.
21 #
22 import re
23 
24 import lsst.pex.config
26 import lsst.ap.cluster
27 import lsst.ap.utils
28 
29 __all__ = ['makeMysqlCsvConfig',
30  'DbMappingConfig',
31  'genericTableSql',
32  'sourceTableSql',
33  'objectTableSql',
34  'coaddSourceTableSql',
35  ]
36 
37 _dbType = {
38  'I': 'INTEGER',
39  'L': 'BIGINT',
40  'F': 'FLOAT',
41  'D': 'DOUBLE',
42  'Angle': 'DOUBLE',
43  'Flag': 'BIT(1)',
44 }
45 
46 def _pk(cols):
47  if not isinstance(cols, basestring):
48  cols = ', '.join(colName)
49  return str.format('PRIMARY KEY ({0})', cols)
50 
51 def _k(cols):
52  if not isinstance(cols, basestring):
53  idx = '_'.join(cols)
54  cols = ', '.join(cols)
55  else:
56  idx = cols
57  return str.format('KEY IDX_{0} ({0})', idx, cols)
58 
def makeMysqlCsvConfig():
    """Return the lsst.ap.utils.CsvConfig to use when writing out CSV files
    that must be loaded into MySQL.

    The settings match the LOAD DATA statements generated by
    genericTableSql(): comma-delimited, backslash-escaped, unquoted fields,
    with non-finite floating point values written as NULL.
    """
    # Fix: the `def` line was missing above this docstring, leaving the
    # module syntactically broken; restored (the name is declared in
    # __all__ and referenced by genericTableSql's documentation).
    cfg = lsst.ap.utils.CsvConfig()
    cfg.quoting = 'QUOTE_NONE' # C++ tables cannot contain strings (yet)
    cfg.delimiter = ','
    cfg.escapeChar = '\\'
    cfg.quoteChar = ''
    cfg.skipInitialSpace = False
    cfg.doubleQuote = False
    cfg.standardEscapes = True
    cfg.trailingDelimiter = False
    cfg.nonfiniteAsNull = True
    return cfg
75 
76 
class DbMappingConfig(lsst.pex.config.Config):
    """Parameters controlling how pipeline output tables are mapped to
    canonical MySQL database tables.
    """
    # Options for converting the C++ source table to CSV.
    sourceConversion = lsst.pex.config.ConfigField(
        dtype = lsst.ap.utils.CsvConversionConfig,
        doc = "C++ source table to CSV file conversion parameters.")

    # Options for converting the C++ source cluster (object) table to CSV.
    objectConversion = lsst.pex.config.ConfigField(
        dtype = lsst.ap.utils.CsvConversionConfig,
        doc = "C++ source cluster table to CSV file conversion parameters.")

    # True: Source/Object are VIEWs over the Run* tables.
    # False: they are materialized with INSERT ... SELECT.
    asView = lsst.pex.config.Field(dtype=bool, default=True,
        doc="Create canonical Source/Object tables as VIEWs over the "
            "run-specific tables? False means they are materialized.")
89 
90 
def genericTableSql(schema, csvConversionConfig, indexedFields):
    """Return a pair of SQL template strings (createStmt, loadStmt).

    createStmt : a format string for the CREATE TABLE statement corresponding
                 to the given schema and desired indexes. To generate valid
                 SQL, tableName must be supplied, e.g.:

                 str.format(create, tableName='MyTable')

    loadStmt : a format string for the corresponding LOAD DATA statement.
               To generate valid SQL, tableName and fileName must be supplied,
               e.g.:

               str.format(load, tableName='MyTable', fileName='MyTable.csv')

    Note that the generated LOAD statement will never REPLACE data, and assumes
    that CSV files conform to the format returned by makeMysqlCsvConfig().

    @param schema              lsst.afw.table.Schema describing the C++ table
                               to map to a MySQL database table.
    @param csvConversionConfig lsst.ap.utils.CsvConversionConfig describing
                               the C++ table to CSV file conversion options.
    @param indexedFields       List or set of C++ field names to create
                               indexes on
    """
    dbkeys = []    # PRIMARY KEY / KEY clauses for CREATE TABLE
    coldefs = []   # column definitions for CREATE TABLE
    columns = []   # column (or @variable) list for LOAD DATA
    setexprs = []  # SET expressions for LOAD DATA

    def _append(dbcol, dbty, suffixes):
        # Emit one nullable column per suffix for a composite field.
        for suffix in suffixes:
            coldefs.append(str.format('{}{} {} NULL', dbcol, suffix, dbty))
            columns.append(dbcol + suffix)

    for item in schema.asList():
        name = item.field.getName()
        # replace all non-word characters with underscore
        dbcol = re.sub('[\W]','_',name)
        ty = item.key.getTypeString()
        if ty in _dbType:
            # Scalar field with a direct SQL type mapping.
            dbty = _dbType[ty]
            constraint = 'NULL'
            if name == 'id':
                dbkeys.append(_pk(dbcol))
                constraint = 'NOT NULL'
            elif ty == 'Flag':
                if not csvConversionConfig.flagsAsBits:
                    continue # we will deal with flags later
                constraint = 'NOT NULL'
            elif ty == 'I' or ty == 'L':
                if name in csvConversionConfig.nullableIntegers:
                    constraint = 'NULL'
                else:
                    constraint = 'NOT NULL'
            else:
                constraint = 'NULL'
            coldefs.append(' '.join([dbcol, dbty, constraint]))
            if ty == 'Flag':
                # BIT(1) columns cannot be loaded directly from text;
                # route through a user variable and CAST in a SET clause.
                columns.append('@' + dbcol)
                setexprs.append(str.format('{0} = CAST(@{0} AS UNSIGNED)', dbcol))
            else:
                columns.append(dbcol)
            if name != 'id' and name in indexedFields:
                dbkeys.append(_k(dbcol))
        elif ty == 'Coord':
            if name == "coord":
                # the Coord slot field: expand to ra/decl plus a derived
                # HTM ID column computed at load time via scisql.
                coldefs.append('coord_ra DOUBLE NULL')
                coldefs.append('coord_decl DOUBLE NULL')
                coldefs.append('coord_htmId20 BIGINT NULL')
                columns.append('coord_ra')
                columns.append('coord_decl')
                setexprs.append('coord_htmId20 = scisql_s2HtmId(coord_ra, coord_decl, 20)')
                dbkeys.append(_k('coord_htmId20'))
                dbkeys.append(_k('coord_decl'))
            else:
                _append(dbcol, 'DOUBLE', ['_ra', '_decl'])
        elif ty == 'ArrayF' or ty == 'ArrayD':
            # One column per array element, 1-based.
            dbty = _dbType[ty[-1]]
            for i in xrange(1, item.key.getSize() + 1):
                coldefs.append(str.format('{}_{} {} NULL', dbcol, i, dbty))
                columns.append(str.format('{}_{}', dbcol, i))
        elif ty == 'PointI' or ty == 'PointF' or ty == 'PointD':
            dbty = _dbType[ty[-1]]
            _append(dbcol, dbty, ['_x', '_y'])
        elif ty == 'MomentsF' or ty == 'MomentsD':
            dbty = _dbType[ty[-1]]
            _append(dbcol, dbty, ['_Ixx', '_Iyy', '_Ixy'])
        elif ty == 'CovF' or ty == 'CovD':
            # Symmetric covariance matrix: upper triangle only, 1-based.
            dbty = _dbType[ty[-1]]
            sz = item.key.getSize()
            for i in xrange(1, sz + 1):
                for j in xrange(i, sz + 1):
                    coldefs.append(str.format('{}_{}_{} {} NULL', dbcol, i, j, dbty))
                    columns.append(str.format('{}_{}_{}', dbcol, i, j))
        elif ty == 'CovPointF' or ty == 'CovPointD':
            dbty = _dbType[ty[-1]]
            # strip a trailing '.err' so columns are named after the base field
            if name.endswith('.err'):
                dbcol = dbcol[:-4]
            if item.field.getUnits() == 'rad^2':
                # HACK: this is a coordinate covariance matrix
                _append(dbcol, dbty, ['_raVar', '_radeclCov',
                                      '_declVar'])
                continue
            _append(dbcol, dbty, ['_xVar', '_xyCov',
                                  '_yVar'])
        elif ty == 'CovMomentsF' or ty == 'CovMomentsD':
            dbty = _dbType[ty[-1]]
            if name.endswith('.err'):
                dbcol = dbcol[:-4]
            _append(dbcol, dbty, ['_IxxVar', '_IxxIyyCov', '_IxxIxyCov',
                                  '_IyyVar', '_IyyIxyCov',
                                  '_IxyVar'])
        else:
            raise RuntimeError(ty + ' is not a recognized AFW field type string!')
    if not csvConversionConfig.flagsAsBits:
        # add BIGINT flag columns; 63 flags are packed per column.
        # NOTE: '/' is Python 2 integer (floor) division here — this is
        # ceil(count / 63); use '//' if this module is ported to Python 3.
        n = (schema.getFlagFieldCount() + 62) / 63
        for i in xrange(1, n + 1):
            coldefs.append(str.format('runFlags{} BIGINT NOT NULL', i))
            columns.append(str.format('runFlags{}', i))
        # add BIGINT flag columns for canonical flags in canonical order
        n = (len(csvConversionConfig.canonicalFlags) + 62) / 63
        for i in xrange(1, n + 1):
            coldefs.append(str.format('flags{} BIGINT NOT NULL', i))
            columns.append(str.format('flags{}', i))
    # Finally, create schema SQL and LOAD DATA templates
    createStmt = 'CREATE TABLE IF NOT EXISTS {tableName} (\n\t'
    createStmt += ',\n\t'.join(coldefs + dbkeys)
    createStmt += '\n) ENGINE=MyISAM;\n'

    loadStmt = ("LOAD DATA LOCAL INFILE '{fileName}'\n"
                "\tINTO TABLE {tableName}\n"
                "\tFIELDS TERMINATED BY ','\n"
                "(\n\t")
    loadStmt += ',\n\t'.join(columns)
    if len(setexprs) == 0:
        loadStmt += '\n);'
    else:
        loadStmt += '\n) SET\n\t'
        loadStmt += ',\n\t'.join(setexprs)
        loadStmt += ';'
    return createStmt, loadStmt
234 
235 
236 def _sourceIndexes(sourceProcessingConfig):
237  """Return the list of C++ Source field names to create indexes on.
238 
239  @param sourceProcessingConfig lsst.ap.cluster.SourceProcessingConfig;
240  describes source processing performed by
241  SourceAssoc.
242  """
243  indexes = set()
244  indexes.add("parent")
245  if sourceProcessingConfig.exposurePrefix:
246  indexes.add(sourceProcessingConfig.exposurePrefix + ".id")
247  if not sourceProcessingConfig.multiBand:
248  indexes.add(sourceProcessingConfig.exposurePrefix + ".filter.id")
249  if sourceProcessingConfig.clusterPrefix:
250  indexes.add(sourceProcessingConfig.clusterPrefix + ".id")
251  return indexes
252 
253 
254 # mappings from run-specific table column names to canonical Source columns
255 _sourceMappings = [
256  ("id", "sourceId"), # from minimal schema, no prefix
257  ("parent", "parentSourceId"), # from minimal schema, no prefix
258  ("{exposurePrefix}_id", "scienceCcdExposureId"),
259  ("{exposurePrefix}_filter_id", "filterId"),
260  ("{clusterPrefix}_id", "objectId"),
261  ("{clusterPrefix}_coord_ra", "objectRa"),
262  ("{clusterPrefix}_coord_decl", "objectDecl"),
263  ("coord_ra", "ra"), # from minimal schema, no prefix
264  ("coord_decl", "decl"), # from minimal schema, no prefix
265  ("coord_raVar", "raVar"), # from source association, no prefix
266  ("coord_declVar", "declVar"), # from source association, no prefix
267  ("coord_radeclCov", "radeclCov"), # from source association, no prefix
268  ("coord_htmId20", "htmId20"), # from ingest, no prefix
269  ("{measPrefix}{centroid}_x", "x"),
270  ("{measPrefix}{centroid}_y", "y"),
271  ("{measPrefix}{centroid}_xVar", "xVar"),
272  ("{measPrefix}{centroid}_yVar", "yVar"),
273  ("{measPrefix}{centroid}_xyCov", "xyCov"),
274  ("{exposurePrefix}_time_mid", "timeMid"),
275  ("{exposurePrefix}_time", "expTime"),
276  ("{measPrefix}{psfFlux}", "psfFlux"),
277  ("{measPrefix}{psfFlux}_err", "psfFluxSigma"),
278  ("{measPrefix}{apFlux}", "apFlux"),
279  ("{measPrefix}{apFlux}_err", "apFluxSigma"),
280  ("{measPrefix}{modelFlux}", "modelFlux"),
281  ("{measPrefix}{modelFlux}_err", "modelFluxSigma"),
282  ("{measPrefix}{instFlux}", "instFlux"),
283  ("{measPrefix}{instFlux}_err", "instFluxSigma"),
284  ("aperturecorrection", "apCorrection"), # from measurement, no prefix (!?)
285  ("aperturecorrection_err", "apCorrectionSigma"), # from measurement, no prefix (!?)
286  ("{measPrefix}{shape}_centroid_x", "shapeIx"),
287  ("{measPrefix}{shape}_centroid_y", "shapeIy"),
288  ("{measPrefix}{shape}_centroid_xVar", "shapeIxVar"),
289  ("{measPrefix}{shape}_centroid_yVar", "shapeIyVar"),
290  ("{measPrefix}{shape}_centroid_xyCov", "shapeIxIyCov"),
291  ("{measPrefix}{shape}_Ixx", "shapeIxx"),
292  ("{measPrefix}{shape}_Iyy", "shapeIyy"),
293  ("{measPrefix}{shape}_Ixy", "shapeIxy"),
294  ("{measPrefix}{shape}_IxxVar", "shapeIxxVar"),
295  ("{measPrefix}{shape}_IyyVar", "shapeIyyVar"),
296  ("{measPrefix}{shape}_IxyVar", "shapeIxyVar"),
297  ("{measPrefix}{shape}_IxxIyyCov", "shapeIxxIyyCov"),
298  ("{measPrefix}{shape}_IxxIxyCov", "shapeIxxIxyCov"),
299  ("{measPrefix}{shape}_IxxIxyCov", "shapeIyyIxyCov"),
300  ("{measPrefix}classification_extendedness", "extendedness"),
301  ("flags_negative", "flagNegative"), # from detection, no prefix
302  ("{measPrefix}flags_badcentroid", "flagBadMeasCentroid"),
303  ("{measPrefix}flags_pixel_edge", "flagPixEdge"),
304  ("{measPrefix}flags_pixel_interpolated_any", "flagPixInterpAny"),
305  ("{measPrefix}flags_pixel_interpolated_center", "flagPixInterpCen"),
306  ("{measPrefix}flags_pixel_saturated_any", "flagPixSaturAny"),
307  ("{measPrefix}flags_pixel_saturated_center", "flagPixSaturCen"),
308  ("{measPrefix}{psfFlux}_flags", "flagBadPsfFlux"),
309  ("{measPrefix}{apFlux}_flags", "flagBadApFlux"),
310  ("{measPrefix}{modelFlux}_flags", "flagBadModelFlux"),
311  ("{measPrefix}{instFlux}_flags", "flagBadInstFlux"),
312  ("{measPrefix}{centroid}_flags", "flagBadCentroid"),
313  ("{measPrefix}{shape}_flags", "flagBadShape"),
314 ]
315 
316 def _colToField(col):
317  """Turn a database column name back into a C++ table field name"""
318  i = col.rfind('_')
319  if i == -1:
320  return col
321  field = col[:i].replace('_', '.')
322  suffix = col[i+1:]
323  if suffix in ['x', 'y', 'Ixx', 'Iyy', 'Ixy', 'ra', 'decl']:
324  return field
325  if suffix.endswith('Var') or suffix.endswith('Cov'):
326  return field + '.err'
327  # TODO: generic Cov<T> or Array<T> fields not supported yet,
328  # but so far no such fields are mapped to canonical
329  # Source/Object table columns.
330  return col.replace('_', '.')
331 
332 def _getMappingKw(slots, sourceProcessingConfig, measPrefix=None):
333  """Return substitution parameters for mapping table entries.
334  """
335  kw = dict()
336  kw['measPrefix'] = (measPrefix or '').replace('.', '_')
337  kw['exposurePrefix'] = sourceProcessingConfig.exposurePrefix.replace('.', '_')
338  kw['clusterPrefix'] = sourceProcessingConfig.clusterPrefix.replace('.', '_')
339  kw['centroid'] = slots.centroid.replace('.', '_') if slots.centroid else '__X__'
340  kw['shape'] = slots.shape.replace('.', '_') if slots.shape else '__X__'
341  kw['psfFlux'] = slots.psfFlux.replace('.', '_') if slots.psfFlux else '__X__'
342  kw['apFlux'] = slots.apFlux.replace('.', '_') if slots.apFlux else '__X__'
343  kw['modelFlux'] = slots.modelFlux.replace('.', '_') if slots.modelFlux else '__X__'
344  kw['instFlux'] = slots.instFlux.replace('.', '_') if slots.instFlux else '__X__'
345  return kw
346 
def sourceTableSql(schema, dbMappingConfig, sourceAssocConfig):
    """Return a tuple of SQL statements (createStmt, loadStmt, sourceStmt)
    for the Source table.

    createStmt : CREATE TABLE statement for the RunSource table, which
                 includes all fields from the run-specific
                 lsst.afw.table.Schema for source tables output by the
                 pipelines.

    loadStmt : LOAD DATA statement for the RunSource table. This is
               a format string; to generate valid SQL a fileName must
               be supplied, e.g.:

               loadStmt.format(fileName='source.csv')

    sourceStmt : Map the RunSource table to the canonical Source schema.
                 This will either create a VIEW, or INSERT into the
                 materialized equivalent.

    @param schema            lsst.afw.table.Schema for sources
    @param dbMappingConfig   lsst.datarel.DbMappingConfig
    @param sourceAssocConfig lsst.ap.tasks.sourceAssoc.SourceAssocConfig
    """
    # Generate SQL for run specific table
    createStmt, loadStmt = genericTableSql(
        schema,
        dbMappingConfig.sourceConversion,
        _sourceIndexes(sourceAssocConfig.sourceProcessing))
    # build substitution parameters for mapping table
    kw = _getMappingKw(
        sourceAssocConfig.measSlots,
        sourceAssocConfig.sourceProcessing,
        sourceAssocConfig.measPrefix)
    # build selection/output column lists
    selcols = []
    outcols = []
    for runFmt, srcCol in _sourceMappings:
        runCol = runFmt.format(**kw)
        field = _colToField(runCol)
        isFlag = srcCol.startswith('flag')
        if isFlag and not dbMappingConfig.sourceConversion.flagsAsBits:
            continue
        # A canonical column is backed by the run column when the schema
        # has the field (coord_htmId20 is synthesized at load time);
        # otherwise flags default to b'0' and everything else to NULL.
        if field in schema or runCol == 'coord_htmId20':
            selcols.append(runCol)
        elif isFlag:
            selcols.append("b'0'")
        else:
            selcols.append('NULL')
        outcols.append(srcCol)
    if not dbMappingConfig.sourceConversion.flagsAsBits:
        # Deal with canonical flags packed into BIGINTs
        # NOTE: '/' is Python 2 floor division (ceil of count/63).
        n = (len(dbMappingConfig.sourceConversion.canonicalFlags) + 62) / 63
        if n == 1:
            # NOTE(review): genericTableSql names the packed column
            # 'flags1' even when n == 1; selecting 'flags' here looks
            # inconsistent with that — confirm against actual run tables.
            selcols.append('flags')
            outcols.append('flags')
        else:
            for i in xrange(1, n + 1):
                c = 'flags{}'.format(i)
                selcols.append(c)
                outcols.append(c)
    if dbMappingConfig.asView:
        # Replace the official version of Source with an equivalent VIEW
        sourceStmt = 'CREATE OR REPLACE VIEW Source AS SELECT\n\t'
        sourceStmt += ',\n\t'.join(a + ' AS ' + b for a,b in zip(selcols, outcols))
        sourceStmt += '\nFROM RunSource;'
    else:
        # Use the definition of Source from cat (i.e. the one used by the
        # schema browser for documentation purposes). This should cause
        # ingest to fail if this code and the canonical schema are not in sync.
        sourceStmt = 'INSERT INTO Source (\n\t'
        sourceStmt += ',\n\t'.join(outcols)
        sourceStmt += ')\nSELECT\n\t'
        sourceStmt += ',\n\t'.join(selcols)
        sourceStmt += '\nFROM RunSource;\n'
    return (createStmt.format(tableName='RunSource'),
            loadStmt.format(tableName='RunSource', fileName='{fileName}'),
            sourceStmt)
424 
# mappings from run-specific RunObject column names to canonical Object
# columns (filter-independent part; per-filter columns are in
# _filterMappings).
_objectMappings = [
    ("id", "objectId"),
    ("coord_ra", "ra"),
    ("coord_decl", "decl"),
    ("coord_raVar", "raVar"),
    ("coord_declVar", "declVar"),
    ("coord_radeclCov", "radeclCov"),
    ("coord_htmId20", "htmId20"),
    ("coord_weightedmean_ra", "wmRa"),
    ("coord_weightedmean_decl", "wmDecl"),
    ("coord_weightedmean_raVar", "wmRaVar"),
    ("coord_weightedmean_declVar", "wmDeclVar"),
    ("coord_weightedmean_radeclCov", "wmRadeclCov"),
    ("obs_count", "obsCount"),
    ("obs_time_min", "obsTimeMin"),
    ("obs_time_max", "obsTimeMax"),
    ("obs_time_mean", "obsTimeMean"),
    ("flag_noise", "flagNoise"),
]
444 
# mappings from run-specific RunObject column names to per-filter canonical
# Object columns; both sides are format strings taking a {filter} key (the
# run side additionally takes the _getMappingKw() keys).
_filterMappings = [
    ("{filter}_obs_count", "{filter}ObsCount"),
    ("{filter}_obs_time_min", "{filter}ObsTimeMin"),
    ("{filter}_obs_time_max", "{filter}ObsTimeMax"),
    ("{filter}_{measPrefix}{psfFlux}", "{filter}PsfFlux"),
    ("{filter}_{measPrefix}{psfFlux}_err", "{filter}PsfFluxSigma"),
    ("{filter}_{measPrefix}{psfFlux}_count", "{filter}PsfFluxCount"),
    ("{filter}_{measPrefix}{apFlux}", "{filter}ApFlux"),
    ("{filter}_{measPrefix}{apFlux}_err", "{filter}ApFluxSigma"),
    ("{filter}_{measPrefix}{apFlux}_count", "{filter}ApFluxCount"),
    ("{filter}_{measPrefix}{modelFlux}", "{filter}ModelFlux"),
    ("{filter}_{measPrefix}{modelFlux}_err", "{filter}ModelFluxSigma"),
    ("{filter}_{measPrefix}{modelFlux}_count", "{filter}ModelFluxCount"),
    ("{filter}_{measPrefix}{instFlux}", "{filter}InstFlux"),
    ("{filter}_{measPrefix}{instFlux}_err", "{filter}InstFluxSigma"),
    ("{filter}_{measPrefix}{instFlux}_count", "{filter}InstFluxCount"),
    ("{filter}_{measPrefix}{shape}_Ixx", "{filter}ShapeIxx"),
    ("{filter}_{measPrefix}{shape}_Iyy", "{filter}ShapeIyy"),
    ("{filter}_{measPrefix}{shape}_Ixy", "{filter}ShapeIxy"),
    ("{filter}_{measPrefix}{shape}_IxxVar", "{filter}ShapeIxxVar"),
    ("{filter}_{measPrefix}{shape}_IyyVar", "{filter}ShapeIyyVar"),
    ("{filter}_{measPrefix}{shape}_IxyVar", "{filter}ShapeIxyVar"),
    ("{filter}_{measPrefix}{shape}_IxxIyyCov", "{filter}ShapeIxxIyyCov"),
    ("{filter}_{measPrefix}{shape}_IxxIxyCov", "{filter}ShapeIxxIxyCov"),
    ("{filter}_{measPrefix}{shape}_IyyIxyCov", "{filter}ShapeIyyIxyCov"),
    ("{filter}_{measPrefix}{shape}_count", "{filter}ShapeCount"),
]
472 
def objectTableSql(schema, dbMappingConfig, sourceAssocConfig, filters):
    """Return a tuple of SQL statements (createStmt, loadStmt, objectStmt)
    for the Object table.

    createStmt : CREATE TABLE statement for the RunObject table, which
                 includes all fields from the run-specific
                 lsst.afw.table.Schema for source cluster tables output
                 by the SourceAssoc pipeline.

    loadStmt : LOAD DATA statement for the RunObject table. This is
               a format string; to generate valid SQL a fileName must
               be supplied, e.g.:

               loadStmt.format(fileName='object.csv')

    objectStmt : Map the RunObject table to the canonical Object schema.
                 This will either create a VIEW, or INSERT into its
                 materialized equivalent.

    @param schema            lsst.afw.table.Schema for objects (source clusters)
    @param dbMappingConfig   lsst.datarel.DbMappingConfig
    @param sourceAssocConfig lsst.ap.tasks.sourceAssoc.SourceAssocConfig
    @param filters           Iterable over the filter names included in the
                             canonical Object table.
    """
    # Generate SQL for run specific table (objects get no extra indexes)
    createStmt, loadStmt = genericTableSql(
        schema, dbMappingConfig.objectConversion, set())
    # build substitution parameters for mapping table
    kw = _getMappingKw(
        sourceAssocConfig.measSlots,
        sourceAssocConfig.sourceProcessing,
        sourceAssocConfig.measPrefix)
    # build selection/output column lists
    selcols = []
    outcols = []
    for runFmt, objCol in _objectMappings:
        runCol = runFmt.format(**kw)
        field = _colToField(runCol)
        isFlag = objCol.startswith('flag')
        if isFlag and not dbMappingConfig.objectConversion.flagsAsBits:
            continue
        # Missing fields default to b'0' (flags) or NULL (everything else);
        # coord_htmId20 is synthesized at load time.
        if field in schema or runCol == 'coord_htmId20':
            selcols.append(runCol)
        elif isFlag:
            selcols.append("b'0'")
        else:
            selcols.append('NULL')
        outcols.append(objCol)
    # per-filter columns (NOTE: `filter` shadows the builtin of the same name
    # for the duration of the loop)
    for filter in filters:
        kw['filter'] = filter
        for runFmt, objFmt in _filterMappings:
            runCol = runFmt.format(**kw)
            objCol = objFmt.format(filter=filter)
            field = _colToField(runCol)
            isFlag = objCol.startswith('flag')
            if isFlag and not dbMappingConfig.objectConversion.flagsAsBits:
                continue
            if field in schema:
                selcols.append(runCol)
            elif isFlag:
                selcols.append("b'0'")
            else:
                selcols.append('NULL')
            outcols.append(objCol)
    if not dbMappingConfig.objectConversion.flagsAsBits:
        # Deal with canonical flags packed into BIGINTs
        # NOTE: '/' is Python 2 floor division (ceil of count/63).
        n = (len(dbMappingConfig.objectConversion.canonicalFlags) + 62) / 63
        if n == 1:
            # NOTE(review): genericTableSql names the packed column 'flags1'
            # even when n == 1; 'flags' here looks inconsistent — confirm.
            selcols.append('flags')
            outcols.append('flags')
        else:
            for i in xrange(1, n + 1):
                c = 'flags{}'.format(i)
                selcols.append(c)
                outcols.append(c)
    if dbMappingConfig.asView:
        # Replace the official version of Object with an equivalent VIEW
        objectStmt = 'CREATE OR REPLACE VIEW Object AS SELECT\n\t'
        objectStmt += ',\n\t'.join(a + ' AS ' + b for a,b in zip(selcols, outcols))
        objectStmt += '\nFROM RunObject;'
    else:
        # Use the definition of Object from cat (i.e. the one used by the
        # schema browser for documentation purposes). This should cause
        # ingest to fail if this code and the canonical schema are not in sync.
        objectStmt = 'INSERT INTO Object (\n\t'
        objectStmt += ',\n\t'.join(outcols)
        objectStmt += ')\nSELECT\n\t'
        objectStmt += ',\n\t'.join(selcols)
        objectStmt += '\nFROM RunObject;'
    return (createStmt.format(tableName='RunObject'),
            loadStmt.format(tableName='RunObject', fileName='{fileName}'),
            objectStmt)
566 
567 
568 _coaddSourceMappings = [
569  ("id", "{coaddName}SourceId"), # from minimal schema, no prefix
570  ("parent", "parent{CoaddName}SourceId"), # from minimal schema, no prefix
571  ("{exposurePrefix}_id", "{coaddName}CoaddId"),
572  ("{exposurePrefix}_filter_id", "filterId"),
573  ("coord_ra", "ra"), # from minimal schema, no prefix
574  ("coord_decl", "decl"), # from minimal schema, no prefix
575  ("coord_raVar", "raVar"), # from source association, no prefix
576  ("coord_declVar", "declVar"), # from source association, no prefix
577  ("coord_radeclCov", "radeclCov"), # from source association, no prefix
578  ("coord_htmId20", "htmId20"), # from ingest, no prefix
579  ("{measPrefix}{centroid}_x", "x"),
580  ("{measPrefix}{centroid}_y", "y"),
581  ("{measPrefix}{centroid}_xVar", "xVar"),
582  ("{measPrefix}{centroid}_yVar", "yVar"),
583  ("{measPrefix}{centroid}_xyCov", "xyCov"),
584  ("{measPrefix}{psfFlux}", "psfFlux"),
585  ("{measPrefix}{psfFlux}_err", "psfFluxSigma"),
586  ("{measPrefix}{apFlux}", "apFlux"),
587  ("{measPrefix}{apFlux}_err", "apFluxSigma"),
588  ("{measPrefix}{modelFlux}", "modelFlux"),
589  ("{measPrefix}{modelFlux}_err", "modelFluxSigma"),
590  ("{measPrefix}{instFlux}", "instFlux"),
591  ("{measPrefix}{instFlux}_err", "instFluxSigma"),
592  ("aperturecorrection", "apCorrection"), # from measurement, no prefix (!?)
593  ("aperturecorrection_err", "apCorrectionSigma"), # from measurement, no prefix (!?)
594  ("{measPrefix}{shape}_centroid_x", "shapeIx"),
595  ("{measPrefix}{shape}_centroid_y", "shapeIy"),
596  ("{measPrefix}{shape}_centroid_xVar", "shapeIxVar"),
597  ("{measPrefix}{shape}_centroid_yVar", "shapeIyVar"),
598  ("{measPrefix}{shape}_centroid_xyCov", "shapeIxIyCov"),
599  ("{measPrefix}{shape}_Ixx", "shapeIxx"),
600  ("{measPrefix}{shape}_Iyy", "shapeIyy"),
601  ("{measPrefix}{shape}_Ixy", "shapeIxy"),
602  ("{measPrefix}{shape}_IxxVar", "shapeIxxVar"),
603  ("{measPrefix}{shape}_IyyVar", "shapeIyyVar"),
604  ("{measPrefix}{shape}_IxyVar", "shapeIxyVar"),
605  ("{measPrefix}{shape}_IxxIyyCov", "shapeIxxIyyCov"),
606  ("{measPrefix}{shape}_IxxIxyCov", "shapeIxxIxyCov"),
607  ("{measPrefix}{shape}_IxxIxyCov", "shapeIyyIxyCov"),
608  ("{measPrefix}classification_extendedness", "extendedness"),
609  ("flags_negative", "flagNegative"), # from detection, no prefix
610  ("{measPrefix}flags_badcentroid", "flagBadMeasCentroid"),
611  ("{measPrefix}flags_pixel_edge", "flagPixEdge"),
612  ("{measPrefix}flags_pixel_interpolated_any", "flagPixInterpAny"),
613  ("{measPrefix}flags_pixel_interpolated_center", "flagPixInterpCen"),
614  ("{measPrefix}flags_pixel_saturated_any", "flagPixSaturAny"),
615  ("{measPrefix}flags_pixel_saturated_center", "flagPixSaturCen"),
616  ("{measPrefix}{psfFlux}_flags", "flagBadPsfFlux"),
617  ("{measPrefix}{apFlux}_flags", "flagBadApFlux"),
618  ("{measPrefix}{modelFlux}_flags", "flagBadModelFlux"),
619  ("{measPrefix}{instFlux}_flags", "flagBadInstFlux"),
620  ("{measPrefix}{centroid}_flags", "flagBadCentroid"),
621  ("{measPrefix}{shape}_flags", "flagBadShape"),
622 ]
623 
def coaddSourceTableSql(coaddName,
                        schema,
                        sourceConversionConfig,
                        asView,
                        sourceProcessingConfig,
                        slotConfig,
                        measPrefix):
    """Return a tuple of SQL statements (createStmt, loadStmt, sourceStmt)
    for a coadd source table. The canonical table name is obtained by
    capitalizing the first letter of coaddName and appending 'Source'. The
    run specific table name is derived from the former by prepending 'Run'.

    createStmt : CREATE TABLE statement for the Run<CoaddName>Source table,
                 which includes all fields from the run-specific
                 lsst.afw.table.Schema for source tables output by the
                 pipelines.

    loadStmt : LOAD DATA statement for the Run<CoaddName>Source table.
               This is a format string; to generate valid SQL a fileName
               must be supplied, e.g.:

               loadStmt.format(fileName='source.csv')

    sourceStmt : Map the Run<CoaddName>Source table to the canonical
                 <CoaddName>Source schema. This will either create a VIEW,
                 or INSERT into the materialized equivalent.

    @param coaddName
        Coadd name (camel-case), e.g. 'deep' or 'goodSeeing'.
    @param schema
        lsst.afw.table.Schema for coadd-sources.
    @param sourceConversionConfig
        lsst.ap.utils.CsvConversionConfig - parameters used for
        C++ to CSV conversion.
    @param asView
        True if the canonical table should be constructed as a VIEW on
        top of the run-specific table.
    @param sourceProcessingConfig
        lsst.ap.cluster.SourceProcessingConfig - parameters used to
        denormalize the C++ schema produced by the pipeline.
    @param slotConfig
        lsst.meas.algorithms.SlotConfig - pipeline slot mappings.
    @param measPrefix
        Prefix for measurement field names.
    """
    # Generate SQL for run specific table
    createStmt, loadStmt = genericTableSql(
        schema,
        sourceConversionConfig,
        _sourceIndexes(sourceProcessingConfig))
    # build substitution parameters for mapping table
    kw = _getMappingKw(
        slotConfig,
        sourceProcessingConfig,
        measPrefix)
    # build selection/output column lists
    selcols = []
    outcols = []
    CoaddName = coaddName[0].upper() + coaddName[1:]
    for runFmt, srcCol in _coaddSourceMappings:
        runCol = runFmt.format(**kw)
        srcCol = srcCol.format(coaddName=coaddName, CoaddName=CoaddName)
        if sourceProcessingConfig.multiBand and srcCol == 'filterId':
            continue # multi-band source has no filterId
        field = _colToField(runCol)
        isFlag = srcCol.startswith('flag')
        if isFlag and not sourceConversionConfig.flagsAsBits:
            continue
        # Missing fields default to b'0' (flags) or NULL (everything else);
        # coord_htmId20 is synthesized at load time.
        if field in schema or runCol == 'coord_htmId20':
            selcols.append(runCol)
        elif isFlag:
            selcols.append("b'0'")
        else:
            selcols.append('NULL')
        outcols.append(srcCol)
    if not sourceConversionConfig.flagsAsBits:
        # Deal with canonical flags packed into BIGINTs
        # NOTE: '/' is Python 2 floor division (ceil of count/63).
        n = (len(sourceConversionConfig.canonicalFlags) + 62) / 63
        if n == 1:
            # NOTE(review): genericTableSql names the packed column 'flags1'
            # even when n == 1; 'flags' here looks inconsistent — confirm.
            selcols.append('flags')
            outcols.append('flags')
        else:
            for i in xrange(1, n + 1):
                c = 'flags{}'.format(i)
                selcols.append(c)
                outcols.append(c)
    tableName = CoaddName + 'Source'
    runTableName = 'Run' + tableName
    if asView:
        # Replace the official version of <CoaddName>Source with an equivalent VIEW
        sourceStmt = 'CREATE OR REPLACE VIEW {} AS SELECT\n\t'.format(tableName)
        sourceStmt += ',\n\t'.join(a + ' AS ' + b for a,b in zip(selcols, outcols))
        sourceStmt += '\nFROM {};'.format(runTableName)
    else:
        # Use the definition of Source from cat (i.e. the one used by the
        # schema browser for documentation purposes). This should cause
        # ingest to fail if this code and the canonical schema are not in sync.
        sourceStmt = 'INSERT INTO {} (\n\t'.format(tableName)
        sourceStmt += ',\n\t'.join(outcols)
        sourceStmt += ')\nSELECT\n\t'
        sourceStmt += ',\n\t'.join(selcols)
        sourceStmt += '\nFROM {};\n'.format(runTableName)
    return (createStmt.format(tableName=runTableName),
            loadStmt.format(tableName=runTableName, fileName='{fileName}'),
            sourceStmt)
729 
# Mappings from run-specific table column names (left, a format template
# expanded via _getMappingKw substitutions) to canonical ForcedSource column
# names (right, expanded with coaddName/CoaddName). Order determines the
# column order of the generated VIEW / INSERT ... SELECT statements.
_forcedSourceMappings = [
    ("id", "{coaddName}ForcedSourceId"), # from minimal schema, no prefix
    ("{exposurePrefix}_id", "scienceCcdExposureId"),
    ("{exposurePrefix}_filter_id", "filterId"),
    ("{exposurePrefix}_time_mid", "timeMid"),
    ("{exposurePrefix}_time", "expTime"),
    ("objectId", "{coaddName}SourceId"),
    ("coord_ra", "ra"), # from minimal schema, no prefix
    ("coord_decl", "decl"), # from minimal schema, no prefix
    ("coord_raVar", "raVar"), # from source association, no prefix
    ("coord_declVar", "declVar"), # from source association, no prefix
    ("coord_radeclCov", "radeclCov"), # from source association, no prefix
    ("coord_htmId20", "htmId20"), # from ingest, no prefix
    ("{measPrefix}{centroid}_x", "x"),
    ("{measPrefix}{centroid}_y", "y"),
    ("{measPrefix}{centroid}_xVar", "xVar"),
    ("{measPrefix}{centroid}_yVar", "yVar"),
    ("{measPrefix}{centroid}_xyCov", "xyCov"),
    ("{measPrefix}{psfFlux}", "psfFlux"),
    ("{measPrefix}{psfFlux}_err", "psfFluxSigma"),
    ("{measPrefix}{apFlux}", "apFlux"),
    ("{measPrefix}{apFlux}_err", "apFluxSigma"),
    ("{measPrefix}{modelFlux}", "modelFlux"),
    ("{measPrefix}{modelFlux}_err", "modelFluxSigma"),
    ("{measPrefix}{instFlux}", "instFlux"),
    ("{measPrefix}{instFlux}_err", "instFluxSigma"),
    ("aperturecorrection", "apCorrection"), # from measurement, no prefix (!?)
    ("aperturecorrection_err", "apCorrectionSigma"), # from measurement, no prefix (!?)
    ("{measPrefix}{shape}_centroid_x", "shapeIx"),
    ("{measPrefix}{shape}_centroid_y", "shapeIy"),
    ("{measPrefix}{shape}_centroid_xVar", "shapeIxVar"),
    ("{measPrefix}{shape}_centroid_yVar", "shapeIyVar"),
    ("{measPrefix}{shape}_centroid_xyCov", "shapeIxIyCov"),
    ("{measPrefix}{shape}_Ixx", "shapeIxx"),
    ("{measPrefix}{shape}_Iyy", "shapeIyy"),
    ("{measPrefix}{shape}_Ixy", "shapeIxy"),
    ("{measPrefix}{shape}_IxxVar", "shapeIxxVar"),
    ("{measPrefix}{shape}_IyyVar", "shapeIyyVar"),
    ("{measPrefix}{shape}_IxyVar", "shapeIxyVar"),
    ("{measPrefix}{shape}_IxxIyyCov", "shapeIxxIyyCov"),
    ("{measPrefix}{shape}_IxxIxyCov", "shapeIxxIxyCov"),
    # BUGFIX: was a duplicate of the IxxIxyCov template above, which caused
    # shapeIyyIxyCov to be selected from the Ixx-Ixy covariance column.
    ("{measPrefix}{shape}_IyyIxyCov", "shapeIyyIxyCov"),
    ("{measPrefix}classification_extendedness", "extendedness"),
    ("flags_negative", "flagNegative"), # from detection, no prefix
    ("{measPrefix}flags_badcentroid", "flagBadMeasCentroid"),
    ("{measPrefix}flags_pixel_edge", "flagPixEdge"),
    ("{measPrefix}flags_pixel_interpolated_any", "flagPixInterpAny"),
    ("{measPrefix}flags_pixel_interpolated_center", "flagPixInterpCen"),
    ("{measPrefix}flags_pixel_saturated_any", "flagPixSaturAny"),
    ("{measPrefix}flags_pixel_saturated_center", "flagPixSaturCen"),
    ("{measPrefix}{psfFlux}_flags", "flagBadPsfFlux"),
    ("{measPrefix}{apFlux}_flags", "flagBadApFlux"),
    ("{measPrefix}{modelFlux}_flags", "flagBadModelFlux"),
    ("{measPrefix}{instFlux}_flags", "flagBadInstFlux"),
    ("{measPrefix}{centroid}_flags", "flagBadCentroid"),
    ("{measPrefix}{shape}_flags", "flagBadShape"),
]
788 
def forcedSourceTableSql(coaddName,
                         schema,
                         sourceConversionConfig,
                         asView,
                         sourceProcessingConfig,
                         slotConfig,
                         measPrefix):
    """Build SQL for a forced source table and return the 3-tuple
    (createStmt, loadStmt, sourceStmt). The canonical table name is
    coaddName with its first letter capitalized followed by 'ForcedSource';
    the run-specific table name is the canonical name prefixed with 'Run'.

    createStmt  : CREATE TABLE statement for Run<CoaddName>ForcedSource,
                  containing every field of the run-specific
                  lsst.afw.table.Schema produced by the pipelines.

    loadStmt    : LOAD DATA statement for Run<CoaddName>ForcedSource.
                  This is a format string; substitute fileName to obtain
                  valid SQL, e.g. loadStmt.format(fileName='source.csv').

    sourceStmt  : Statement mapping Run<CoaddName>ForcedSource to the
                  canonical <CoaddName>ForcedSource schema - either a
                  CREATE OR REPLACE VIEW, or an INSERT ... SELECT into
                  the materialized equivalent.

    @param coaddName
           Coadd name (camel-case), e.g. 'deep' or 'goodSeeing'.
    @param schema
           lsst.afw.table.Schema for forced sources.
    @param sourceConversionConfig
           lsst.ap.utils.CsvConversionConfig - parameters used for
           C++ to CSV conversion.
    @param asView
           True if the canonical table should be constructed as a VIEW on
           top of the run-specific table.
    @param sourceProcessingConfig
           lsst.ap.cluster.SourceProcessingConfig - parameters used to
           denormalize the C++ schema produced by the pipeline.
    @param slotConfig
           lsst.meas.algorithms.SlotConfig - pipeline slot mappings.
    @param measPrefix
           Prefix for measurement field names.
    """
    # SQL for the run-specific table (DDL + LOAD DATA template).
    createStmt, loadStmt = genericTableSql(
        schema,
        sourceConversionConfig,
        _sourceIndexes(sourceProcessingConfig))
    # NOTE(review): defaulting clusterPrefix mutates the caller's config in
    # place - presumably required by _getMappingKw; confirm before changing.
    if sourceProcessingConfig.clusterPrefix is None:
        sourceProcessingConfig.clusterPrefix = ""
    mappingKw = _getMappingKw(
        slotConfig,
        sourceProcessingConfig,
        measPrefix)
    CoaddName = coaddName[0].upper() + coaddName[1:]
    # Walk the canonical mapping table, collecting the SELECT expression and
    # output column for each canonical column that applies to this run.
    selectCols = []
    outputCols = []
    for runTemplate, canonicalTemplate in _forcedSourceMappings:
        runCol = runTemplate.format(**mappingKw)
        canonicalCol = canonicalTemplate.format(
            coaddName=coaddName, CoaddName=CoaddName)
        if sourceProcessingConfig.multiBand and canonicalCol == 'filterId':
            # multi-band source has no filterId
            continue
        fieldName = _colToField(runCol)
        isFlag = canonicalCol.startswith('flag')
        if isFlag and not sourceConversionConfig.flagsAsBits:
            # individual flag columns only exist when flags are stored as bits
            continue
        if fieldName in schema or runCol == 'coord_htmId20':
            selectCols.append(runCol)
        else:
            # Field absent from the run schema: flags default to a zero bit,
            # everything else to NULL.
            selectCols.append("b'0'" if isFlag else 'NULL')
        outputCols.append(canonicalCol)
    if not sourceConversionConfig.flagsAsBits:
        # Canonical flags are packed 63 to a BIGINT; pass the packed
        # column(s) through unchanged.
        numFlagCols = (len(sourceConversionConfig.canonicalFlags) + 62) // 63
        if numFlagCols == 1:
            packedCols = ['flags']
        else:
            packedCols = ['flags{}'.format(i)
                          for i in range(1, numFlagCols + 1)]
        selectCols.extend(packedCols)
        outputCols.extend(packedCols)
    tableName = CoaddName + 'ForcedSource'
    runTableName = 'Run' + tableName
    if asView:
        # Replace the official version of <CoaddName>ForcedSource with an equivalent VIEW
        aliased = ('{} AS {}'.format(sel, out)
                   for sel, out in zip(selectCols, outputCols))
        sourceStmt = ('CREATE OR REPLACE VIEW {} AS SELECT\n\t'.format(tableName)
                      + ',\n\t'.join(aliased)
                      + '\nFROM {};'.format(runTableName))
    else:
        # Use the definition of Source from cat (i.e. the one used by the
        # schema browser for documentation purposes). This should cause
        # ingest to fail if this code and the canonical schema are not in sync.
        sourceStmt = 'INSERT INTO {} (\n\t{})\nSELECT\n\t{}\nFROM {};\n'.format(
            tableName,
            ',\n\t'.join(outputCols),
            ',\n\t'.join(selectCols),
            runTableName)
    return (createStmt.format(tableName=runTableName),
            loadStmt.format(tableName=runTableName, fileName='{fileName}'),
            sourceStmt)
def makeMysqlCsvConfig
Definition: schema.py:60
def forcedSourceTableSql
Definition: schema.py:795
def coaddSourceTableSql
Definition: schema.py:630