LSSTApplications  10.0+286,10.0+36,10.0+46,10.0-2-g4f67435,10.1+152,10.1+37,11.0,11.0+1,11.0-1-g47edd16,11.0-1-g60db491,11.0-1-g7418c06,11.0-2-g04d2804,11.0-2-g68503cd,11.0-2-g818369d,11.0-2-gb8b8ce7
LSSTDataManagementBasePackage
ingest.py
Go to the documentation of this file.
1 import os
2 import shutil
3 import tempfile
4 try:
5  import sqlite3
6 except ImportError:
7  # try external pysqlite package; deprecated
8  import sqlite as sqlite3
9 from fnmatch import fnmatch
10 from glob import glob
11 
12 from lsst.pex.config import Config, Field, DictField, ListField, ConfigurableField
14 from lsst.pipe.base import Task, Struct, ArgumentParser
15 import lsst.afw.image as afwImage
16 
class IngestArgumentParser(ArgumentParser):
    """Command-line parser for the ingest task.

    Adds the options that control how input files are delivered into the
    repository, whether the registry is recreated, and which inputs are skipped.
    """

    def __init__(self, *args, **kwargs):
        super(IngestArgumentParser, self).__init__(*args, **kwargs)
        addArg = self.add_argument

        # Behaviour switches
        addArg("-n", "--dry-run", dest="dryrun", action="store_true", default=False,
               help="Don't perform any action?")
        deliveryModes = ["move", "copy", "link", "skip"]
        addArg("--mode", choices=deliveryModes, default="link",
               help="Mode of delivering the files to their destination")
        addArg("--create", action="store_true", help="Create new registry (clobber old)?")

        # Ways of declaring inputs that must not be ingested
        self.add_id_argument("--badId", "raw", "Data identifier for bad data", doMakeDataRefList=False)
        addArg("--badFile", nargs="*", default=[],
               help="Names of bad files (no path; wildcards allowed)")

        # Positional: one or more file names or glob patterns
        addArg("files", nargs="+", help="Names of file")
30 
class ParseConfig(Config):
    """Configuration for ParseTask.

    Describes how registry properties are derived from a file's FITS headers
    and which HDUs are inspected.
    """
    # property name -> header keyword whose value is used directly
    translation = DictField(doc="Translation table for property --> header",
                            keytype=str, itemtype=str, default={})
    # property name -> name of a ParseTask method that computes the value from the header
    translators = DictField(doc="Properties and name of translator method",
                            keytype=str, itemtype=str, default={})
    # property name -> fallback used when the translation keyword is absent
    defaults = DictField(doc="Default values if header is not present",
                         keytype=str, itemtype=str, default={})
    hdu = Field(doc="HDU to read for metadata", dtype=int, default=0)
    extnames = ListField(doc="Extension names to search for", dtype=str, default=[])
41 
class ParseTask(Task):
    """Task that will parse the filename and/or its contents to get the required information
    for putting the file in the correct location and populating the registry."""
    ConfigClass = ParseConfig

    def getInfo(self, filename):
        """Get information about the image from the filename and its contents

        Here, we open the image and parse the header, but one could also look at the filename itself
        and derive information from that, or set values from the configuration.

        @param filename Name of file to inspect
        @return File properties (from the HDU selected by config.hdu); list of file properties,
            one per requested extension (just the former when config.extnames is empty)
        """
        md = afwImage.readMetadata(filename, self.config.hdu)
        phuInfo = self.getInfoFromMetadata(md)
        if not self.config.extnames:
            # No extensions to worry about
            return phuInfo, [phuInfo]
        # Look in the provided extensions until every requested name has been seen
        extnames = set(self.config.extnames)
        extnum = 1
        infoList = []
        while extnames:
            # NOTE(review): the first extension read is HDU 2; this presumably relies on
            # afwImage.readMetadata numbering HDUs from 1 (1 == primary) -- confirm.
            extnum += 1
            try:
                md = afwImage.readMetadata(filename, extnum)
            except Exception:
                # Presumably we ran past the last HDU; give up on the remaining names
                self.log.warn("Error reading %s extensions %s" % (filename, extnames))
                break
            ext = self.getExtensionName(md)
            if ext in extnames:
                # Extension values supplement a copy of the primary-header values
                infoList.append(self.getInfoFromMetadata(md, info=phuInfo.copy()))
                extnames.discard(ext)
        return phuInfo, infoList

    @staticmethod
    def getExtensionName(md):
        """ Get the name of an extension.
        @param md: PropertySet like one obtained from afwImage.readMetadata)
        @return Name of the extension if it exists. None otherwise.
        """
        try:
            # This returns a tuple
            ext = md.get("EXTNAME")
            return ext[1]
        except Exception:
            # No usable EXTNAME keyword.
            # NOTE(review): narrow this to the exception md.get raises for a missing key.
            return None

    def getInfoFromMetadata(self, md, info=None):
        """Attempt to pull the desired information out of the header

        This is done through two mechanisms:
        * translation: a property is set directly from the relevant header keyword
        * translator: a property is set with the result of calling a method

        The translator methods receive the header metadata and should return the
        appropriate value, or None if the value cannot be determined.

        @param md FITS header
        @param info File properties, to be supplemented (updated in place); a new dict if None
        @return info
        """
        if info is None:
            # A fresh dict per call: a shared default would carry values from one file into the next
            info = {}
        for p, h in self.config.translation.items():
            if md.exists(h):
                value = md.get(h)
                if isinstance(value, basestring):
                    value = value.strip()
                info[p] = value
            elif p in self.config.defaults:
                info[p] = self.config.defaults[p]
            else:
                self.log.warn("Unable to find value for %s (derived from %s)" % (p, h))
        for p, t in self.config.translators.items():
            func = getattr(self, t)
            try:
                value = func(md)
            except Exception:
                # A translator that cannot handle this header contributes no value
                value = None
            if value is not None:
                info[p] = value
        return info

    def translate_date(self, md):
        """Convert a full DATE-OBS to a mere date

        Besides being an example of a translator, this is also generally useful.
        It will only be used if listed as a translator in the configuration.
        """
        date = md.get("DATE-OBS").strip()
        # Drop the time part of an ISO-8601 'dateTtime' value
        c = date.find("T")
        if c > 0:
            date = date[:c]
        return date

    def translate_filter(self, md):
        """Translate a full filter description into a mere filter name

        Besides being an example of a translator, this is also generally useful.
        It will only be used if listed as a translator in the configuration.
        """
        filterName = md.get("FILTER").strip()
        # Keep only the first whitespace-delimited word
        c = filterName.find(" ")
        if c > 0:
            filterName = filterName[:c]
        return filterName

    def getDestination(self, butler, info, filename):
        """Get destination for the file

        @param butler Data butler
        @param info File properties, used as dataId for the butler
        @param filename Input filename
        @return Destination filename
        """
        raw = butler.get("raw_filename", info)[0]
        # Ensure filename is devoid of cfitsio directions about HDUs
        c = raw.find("[")
        if c > 0:
            raw = raw[:c]
        return raw
164 
class RegisterConfig(Config):
    """Settings for RegisterTask: layout of the registry tables and insertion policy."""
    table = Field(doc="Name of table", dtype=str, default="raw")
    # column name -> SQLite type; only "text", "int" and "double" are accepted
    columns = DictField(
        doc="List of columns for raw table, with their types",
        keytype=str,
        itemtype=str,
        itemCheck=lambda colType: colType in ("text", "int", "double"),
        default={
            'object': 'text',
            'visit': 'int',
            'ccd': 'int',
            'filter': 'text',
            'date': 'text',
            'taiObs': 'text',
            'expTime': 'double',
        },
    )
    unique = ListField(doc="List of columns to be declared unique for the table", dtype=str,
                       default=["visit", "ccd"])
    # Columns (each must also appear in 'columns') of the <table>_visit table
    visit = ListField(doc="List of columns for raw_visit table", dtype=str,
                      default=["visit", "object", "date", "filter"])
    ignore = Field(doc="Ignore duplicates in the table?", dtype=bool, default=False)
    # 0o664 == rw-rw-r--
    permissions = Field(doc="Permissions mode for registry", dtype=int, default=0o664)
185 
class RegistryContext(object):
    """Context manager to provide a registry

    An existing registry is copied, so that it may continue
    to be used while we add to this new registry. Finally,
    the new registry is moved into the right place.
    If the managed block raises, the working copy is discarded and the
    original registry is left untouched.
    """
    def __init__(self, registryName, createTableFunc, forceCreateTables, permissions):
        """Construct a context manager

        @param registryName: Name of registry file
        @param createTableFunc: Function to create tables; called with the sqlite3 connection
        @param forceCreateTables: Start from an empty registry (clobbering any existing one on
            success) and create the tables?
        @param permissions: Permissions mode applied to the registry file
        """
        self.registryName = registryName
        self.permissions = permissions

        # Work on a file next to the registry so the final rename stays within one directory.
        # Only the basename may serve as the prefix: the directory is already supplied via 'dir',
        # and a path-bearing prefix gets joined onto it again for relative registry names.
        updateFile = tempfile.NamedTemporaryFile(prefix=os.path.basename(registryName),
                                                 dir=os.path.dirname(registryName), delete=False)
        self.updateName = updateFile.name
        # Only the name is needed; don't leak the open handle
        updateFile.close()

        try:
            haveTable = False
            # When forcing creation we clobber the old registry, so don't start from a copy of it
            # (re-creating its tables would fail because they already exist).
            if os.path.exists(registryName) and not forceCreateTables:
                assertCanCopy(registryName, self.updateName)
                os.chmod(self.updateName, os.stat(registryName).st_mode)
                shutil.copyfile(registryName, self.updateName)
                haveTable = True

            self.conn = sqlite3.connect(self.updateName)
            if not haveTable:
                createTableFunc(self.conn)
            os.chmod(self.updateName, self.permissions)
        except Exception:
            # Don't leave a half-built working copy in the repository
            self._discard()
            raise

    def _discard(self):
        """Close the connection (if open) and delete the working copy of the registry"""
        conn = getattr(self, "conn", None)
        if conn is not None:
            conn.close()
        if os.path.exists(self.updateName):
            os.unlink(self.updateName)

    def __enter__(self):
        """Provide the 'as' value: the sqlite3 connection to the working copy"""
        return self.conn

    def __exit__(self, excType, excValue, traceback):
        """Install the updated registry, or discard it if the managed block raised

        @return False, so that any exception propagates
        """
        if excType is None:
            self.conn.commit()
            self.conn.close()
            if os.path.exists(self.registryName):
                os.unlink(self.registryName)
            os.rename(self.updateName, self.registryName)
            os.chmod(self.registryName, self.permissions)
        else:
            # The working copy may be inconsistent; keep the original registry instead
            self._discard()
        return False  # Don't suppress any exceptions
232 
class RegisterTask(Task):
    """Task that will generate the registry for the Mapper"""
    ConfigClass = RegisterConfig

    def openRegistry(self, butler, create=False, dryrun=False):
        """Open the registry and return a context manager providing the connection handle.

        @param butler Data butler, from which the registry file is determined
        @param create Clobber any existing registry and create a new one?
        @param dryrun Don't do anything permanent?
        @return Context manager whose 'as' value is the database connection (None for a dry run)
        """
        if dryrun:
            from contextlib import contextmanager

            @contextmanager
            def fakeContext():
                yield
            # Return an instance: the bare generator function has no __enter__/__exit__
            return fakeContext()
        registryName = os.path.join(butler.mapper.root, "registry.sqlite3")
        context = RegistryContext(registryName, self.createTable, create, self.config.permissions)
        return context

    def createTable(self, conn):
        """Create the registry tables

        One table (typically 'raw') contains information on all files, and the
        other (typically 'raw_visit') contains information on all visits.

        @param conn Database connection
        """
        cfg = self.config
        columnDefs = ",".join(["%s %s" % (col, colType) for col, colType in cfg.columns.items()])
        cmd = "create table %s (id integer primary key autoincrement, %s" % (cfg.table, columnDefs)
        if cfg.unique:
            cmd += ", unique(" + ",".join(cfg.unique) + ")"
        cmd += ")"
        conn.execute(cmd)

        visitDefs = ",".join(["%s %s" % (col, cfg.columns[col]) for col in cfg.visit])
        cmd = "create table %s_visit (%s" % (cfg.table, visitDefs)
        # Only unique columns that are also visit columns can constrain the visit table. Keep the
        # visit-list order so the SQL is deterministic, and omit an empty (invalid) unique() clause.
        visitUnique = [col for col in cfg.visit if col in cfg.unique]
        if visitUnique:
            cmd += ", unique(" + ",".join(visitUnique) + ")"
        cmd += ")"
        conn.execute(cmd)

        conn.commit()

    def check(self, conn, info):
        """Check for the presence of a row already

        Not sure this is required, given the 'ignore' configuration option.

        @param conn Database connection (None during a dry run, when there is nothing to check)
        @param info File properties
        @return True if a row with the same values in the 'unique' columns already exists
        """
        if conn is None or self.config.ignore or not self.config.unique:
            return False  # Our entry could already be there, but we don't care (or can't tell)
        sql = "SELECT COUNT(*) FROM %s WHERE " % self.config.table
        sql += " AND ".join(["%s=?" % col for col in self.config.unique])
        values = [info[col] for col in self.config.unique]
        cursor = conn.cursor()
        try:
            cursor.execute(sql, values)
            return cursor.fetchone()[0] > 0
        finally:
            cursor.close()

    def addRow(self, conn, info, dryrun=False, create=False):
        """Add a row to the file table (typically 'raw').

        @param conn Database connection
        @param info File properties to add to database; must supply every configured column
        @param dryrun Only print the SQL that would be executed?
        @param create Unused; accepted for interface compatibility
        """
        columns = list(self.config.columns)
        sql = "INSERT"
        if self.config.ignore:
            sql += " OR IGNORE"
        # Name the columns explicitly instead of relying on the table's column order matching this
        # process's dict iteration order; 'id' is omitted so that it autoincrements.
        sql += " INTO %s (%s) VALUES (%s)" % (self.config.table, ",".join(columns),
                                             ",".join(["?"] * len(columns)))
        values = [info[col] for col in columns]
        if dryrun:
            print("Would execute: '%s' with %s" % (sql, ",".join([str(value) for value in values])))
        else:
            conn.execute(sql, values)

    def addVisits(self, conn, dryrun=False):
        """Generate the visits table (typically 'raw_visits') from the
        file table (typically 'raw').

        @param conn Database connection
        @param dryrun Only print the SQL that would be executed?
        """
        sql = "INSERT OR IGNORE INTO %s_visit SELECT DISTINCT " % self.config.table
        sql += ",".join(self.config.visit)
        sql += " FROM %s" % self.config.table
        if dryrun:
            print("Would execute: %s" % sql)
        else:
            conn.execute(sql)
326 
327 
class IngestConfig(Config):
    """Settings for IngestTask and its parse/register subtasks."""
    parse = ConfigurableField(doc="File parsing", target=ParseTask)
    register = ConfigurableField(doc="Registry entry", target=RegisterTask)
    # When True, a failed file delivery is logged and that file skipped instead of aborting
    allowError = Field(doc="Allow error in ingestion?", dtype=bool, default=False)
    # When True, an existing destination file (or link) is removed before delivery
    clobber = Field(doc="Clobber existing file?", dtype=bool, default=False)
334 
class IngestTask(Task):
    """Task that will ingest images into the data repository"""
    ConfigClass = IngestConfig
    ArgumentParser = IngestArgumentParser
    _DefaultName = "ingest"

    def __init__(self, *args, **kwargs):
        super(IngestTask, self).__init__(*args, **kwargs)
        self.makeSubtask("parse")
        self.makeSubtask("register")

    @classmethod
    def parseAndRun(cls):
        """Parse the command-line arguments and run the Task"""
        config = cls.ConfigClass()
        parser = cls.ArgumentParser("ingest")
        args = parser.parse_args(config)
        task = cls(config=args.config)
        task.run(args)

    def ingest(self, infile, outfile, mode="move", dryrun=False):
        """Ingest a file into the image repository.

        @param infile Name of input file
        @param outfile Name of output file (file in repository)
        @param mode Mode of ingest (copy/link/move/skip)
        @param dryrun Only report what would occur?
        @return Success boolean (False only when the transfer failed and config.allowError is set)
        """
        if mode == "skip":
            return True
        if dryrun:
            self.log.info("Would %s from %s to %s" % (mode, infile, outfile))
            return True
        try:
            outdir = os.path.dirname(outfile)
            if outdir and not os.path.isdir(outdir):
                try:
                    os.makedirs(outdir)
                except OSError:
                    # Silently ignore mkdir failures due to race conditions
                    if not os.path.isdir(outdir):
                        raise
            if self.config.clobber and os.path.lexists(outfile):
                os.unlink(outfile)
            if mode == "copy":
                assertCanCopy(infile, outfile)
                shutil.copyfile(infile, outfile)
            elif mode == "link":
                os.symlink(os.path.abspath(infile), outfile)
            elif mode == "move":
                assertCanCopy(infile, outfile)
                # Renames when possible; falls back to copy+delete across filesystems
                shutil.move(infile, outfile)
            else:
                raise AssertionError("Unknown mode: %s" % mode)
            print("%s --<%s>--> %s" % (infile, mode, outfile))
        except Exception as e:
            self.log.warn("Failed to %s %s to %s: %s" % (mode, infile, outfile, e))
            if not self.config.allowError:
                raise
            return False
        return True

    def isBadFile(self, filename, badFileList):
        """Return whether the file qualifies as bad

        We match the file's basename against the list of bad file (wildcard) patterns.
        """
        if not badFileList:
            return False
        basename = os.path.basename(filename)
        return any(fnmatch(basename, badFile) for badFile in badFileList)

    def isBadId(self, info, badIdList):
        """Return whether the file information qualifies as bad

        We match against the list of bad data identifiers; a file is bad when every
        key/value of some bad identifier is present in its properties.
        """
        if not badIdList:
            return False
        for badId in badIdList:
            # A key missing from the file's properties cannot match (rather than raising KeyError)
            if all(key in info and info[key] == value for key, value in badId.items()):
                return True
        return False

    def run(self, args):
        """Ingest all specified files and add them to the registry

        @param args Parsed command-line arguments (see IngestArgumentParser)
        """
        filenameList = []
        for pattern in args.files:
            matches = glob(pattern)
            if not matches:
                # Don't silently drop a mistyped name or a pattern that matches nothing
                self.log.warn("No files match %s" % pattern)
            filenameList.extend(matches)
        context = self.register.openRegistry(args.butler, create=args.create, dryrun=args.dryrun)
        with context as registry:
            for infile in filenameList:
                if self.isBadFile(infile, args.badFile):
                    self.log.info("Skipping declared bad file %s" % infile)
                    continue
                fileInfo, hduInfoList = self.parse.getInfo(infile)
                if self.isBadId(fileInfo, args.badId.idList):
                    self.log.info("Skipping declared bad file %s: %s" % (infile, fileInfo))
                    continue
                if self.register.check(registry, fileInfo):
                    # Only a warning: the file is still delivered and registered below
                    self.log.warn("%s: already ingested: %s" % (infile, fileInfo))
                outfile = self.parse.getDestination(args.butler, fileInfo, infile)
                ingested = self.ingest(infile, outfile, mode=args.mode, dryrun=args.dryrun)
                if not ingested:
                    continue
                # One registry row per property set returned by the parser
                for info in hduInfoList:
                    self.register.addRow(registry, info, dryrun=args.dryrun, create=args.create)
            # Populate the per-visit table from the file table
            self.register.addVisits(registry, dryrun=args.dryrun)
445 
def assertCanCopy(fromPath, toPath):
    """Can I copy a file? Raise an exception if space constraints are not met.

    @param fromPath Path from which the file is being copied
    @param toPath Path to which the file is being copied
    @raise RuntimeError if the filesystem holding toPath has less space available than fromPath's size
    """
    req = os.stat(fromPath).st_size
    # A bare filename lives in the current directory; os.statvfs("") would fail
    destDir = os.path.dirname(toPath) or os.curdir
    st = os.statvfs(destDir)
    # Blocks available to unprivileged processes times the fragment size
    avail = st.f_bavail * st.f_frsize
    if avail < req:
        raise RuntimeError("Insufficient space: %d vs %d" % (req, avail))
bool all(CoordinateExpr< N > const &expr)
Return true if all elements are true.
boost::enable_if< typename ExpressionTraits< Scalar >::IsScalar, Scalar >::type sum(Scalar const &scalar)
Definition: operators.h:1250
boost::shared_ptr< daf::base::PropertySet > readMetadata(std::string const &fileName, int hdu=0, bool strip=false)
Return the metadata (header entries) from a FITS file.