LSSTApplications  16.0-10-g9d3e444,16.0-11-g09ed895+3,16.0-11-g12e47bd+4,16.0-11-g9bb73b2+10,16.0-12-g5c924a4+10,16.0-15-g7af1f30,16.0-15-gdd5ca33+2,16.0-16-gf0259e2+1,16.0-17-g31abd91+11,16.0-17-g5cf0468+3,16.0-18-g51a54b3+3,16.0-18-ga4d4bcb+5,16.0-18-gcf94535+2,16.0-19-g9d290d5+2,16.0-2-g0febb12+22,16.0-2-g9d5294e+73,16.0-2-ga8830df+7,16.0-21-g3d035912+2,16.0-26-g8e79609,16.0-28-gfc9ea6c+9,16.0-29-ge8801f9+4,16.0-3-ge00e371+38,16.0-4-g18f3627+17,16.0-4-g5f3a788+21,16.0-4-ga3eb747+11,16.0-4-gabf74b7+33,16.0-4-gb13d127+7,16.0-5-g27fb78a+11,16.0-5-g6a53317+38,16.0-5-gb3f8a4b+91,16.0-51-gbbe9c988+3,16.0-6-g9321be7+5,16.0-6-gcbc7b31+47,16.0-6-gf49912c+33,16.0-75-gbf7a9a820,16.0-8-g21fd5fe+34,16.0-8-g3a9f023+24,16.0-8-gc11f1cf,16.0-9-gf3bc169+2,16.0-9-gf5c1f43+12,master-gc237143d49,w.2019.02
LSSTDataManagementBasePackage
ingest.py
Go to the documentation of this file.
1 import os
2 import shutil
3 import tempfile
4 import sqlite3
5 from fnmatch import fnmatch
6 from glob import glob
7 from contextlib import contextmanager
8 
9 from lsst.pex.config import Config, Field, DictField, ListField, ConfigurableField
11 from lsst.afw.fits import readMetadata
12 from lsst.pipe.base import Task, InputOnlyArgumentParser
13 from lsst.afw.fits import DEFAULT_HDU
14 
15 
class IngestArgumentParser(InputOnlyArgumentParser):
    """Argument parser to support ingesting images into the image repository

    NOTE(review): the ``class`` statement itself is missing from this listing.
    The class name is taken from the ``super()`` call below; the base class is
    presumed to be the imported InputOnlyArgumentParser -- confirm.
    """

    def __init__(self, *args, **kwargs):
        """Build the parser and register the ingest-specific options

        All positional and keyword arguments are handed to the parent
        constructor unchanged.
        """
        super(IngestArgumentParser, self).__init__(*args, **kwargs)
        self.add_argument("-n", "--dry-run", dest="dryrun", action="store_true", default=False,
                          help="Don't perform any action?")
        self.add_argument("--mode", choices=["move", "copy", "link", "skip"], default="link",
                          help="Mode of delivering the files to their destination")
        self.add_argument("--create", action="store_true", help="Create new registry (clobber old)?")
        self.add_argument("--ignore-ingested", dest="ignoreIngested", action="store_true",
                          help="Don't register files that have already been registered")
        self.add_id_argument("--badId", "raw", "Data identifier for bad data", doMakeDataRefList=False)
        self.add_argument("--badFile", nargs="*", default=[],
                          help="Names of bad files (no path; wildcards allowed)")
        self.add_argument("files", nargs="+", help="Names of file")
32 
33 
class ParseConfig(Config):
    """Configuration for ParseTask

    NOTE(review): the ``class`` statement is missing from this listing; the
    name comes from ``ConfigClass = ParseConfig`` in ParseTask and the
    ``Config`` base is presumed from the imports -- confirm.
    """
    # Property name --> header keyword copied verbatim into the file properties
    translation = DictField(keytype=str, itemtype=str, default={},
                            doc="Translation table for property --> header")
    # Property name --> name of a ParseTask method that computes the value
    translators = DictField(keytype=str, itemtype=str, default={},
                            doc="Properties and name of translator method")
    # Fallback values used when a translated header keyword is absent
    defaults = DictField(keytype=str, itemtype=str, default={},
                         doc="Default values if header is not present")
    hdu = Field(dtype=int, default=DEFAULT_HDU, doc="HDU to read for metadata")
    extnames = ListField(dtype=str, default=[], doc="Extension names to search for")
44 
45 
class ParseTask(Task):
    """Task that will parse the filename and/or its contents to get the required information
    for putting the file in the correct location and populating the registry."""
    ConfigClass = ParseConfig

    def getInfo(self, filename):
        """Get information about the image from the filename and its contents

        Here, we open the image and parse the header, but one could also look at the filename itself
        and derive information from that, or set values from the configuration.

        @param filename    Name of file to inspect
        @return File properties; list of file properties for each extension
        """
        md = readMetadata(filename, self.config.hdu)
        phuInfo = self.getInfoFromMetadata(md)
        if len(self.config.extnames) == 0:
            # No extensions to worry about
            return phuInfo, [phuInfo]
        # Look in the provided extensions, walking the HDUs in order until
        # every requested extension has been seen or reading fails.
        extnames = set(self.config.extnames)
        extnum = 0
        infoList = []
        while len(extnames) > 0:
            extnum += 1
            try:
                md = readMetadata(filename, extnum)
            except Exception as e:
                self.log.warn("Error reading %s extensions %s: %s", filename, extnames, e)
                break
            ext = self.getExtensionName(md)
            if ext in extnames:
                # Start from a copy of the primary-header properties so each
                # extension gets its own independent dict.
                hduInfo = self.getInfoFromMetadata(md, info=phuInfo.copy())
                # We need the HDU number when registering MEF files.
                hduInfo["hdu"] = extnum
                infoList.append(hduInfo)
                extnames.discard(ext)
        return phuInfo, infoList

    @staticmethod
    def getExtensionName(md):
        """Get the name of an extension.

        @param md  PropertySet like one obtained from lsst.afw.fits.readMetadata
        @return Name of the extension if it exists.  None otherwise.
        """
        # NOTE(review): the ``def`` and ``except`` lines of this method are
        # missing from the listing this was recovered from.  The signature is
        # taken from the call in getInfo; the exception type actually caught
        # is not visible, so ``Exception`` is a conservative stand-in --
        # confirm against the upstream file.
        try:
            # This returns a tuple
            ext = md.getScalar("EXTNAME")
            return ext[1]
        except Exception:
            return None

    def getInfoFromMetadata(self, md, info=None):
        """Attempt to pull the desired information out of the header

        This is done through two mechanisms:
        * translation: a property is set directly from the relevant header keyword
        * translator: a property is set with the result of calling a method

        The translator methods receive the header metadata and should return the
        appropriate value, or None if the value cannot be determined.

        @param md      FITS header
        @param info    File properties, to be supplemented
        @return info
        """
        if info is None:
            info = {}
        for p, h in self.config.translation.items():
            if md.exists(h):
                value = md.getScalar(h)
                if isinstance(value, str):
                    value = value.strip()
                info[p] = value
            elif p in self.config.defaults:
                info[p] = self.config.defaults[p]
            else:
                self.log.warn("Unable to find value for %s (derived from %s)", p, h)
        for p, t in self.config.translators.items():
            func = getattr(self, t)
            try:
                value = func(md)
            except Exception as e:
                # A failing translator only costs us that one property
                self.log.warn("%s failed to translate %s: %s", t, p, e)
                value = None
            if value is not None:
                info[p] = value
        return info

    def translate_date(self, md):
        """Convert a full DATE-OBS to a mere date

        Besides being an example of a translator, this is also generally useful.
        It will only be used if listed as a translator in the configuration.

        @param md  FITS header
        @return The part of DATE-OBS before the first "T", or the whole
                (stripped) value if there is none
        """
        date = md.getScalar("DATE-OBS").strip()
        c = date.find("T")
        if c > 0:
            date = date[:c]
        return date

    def translate_filter(self, md):
        """Translate a full filter description into a mere filter name

        Besides being an example of a translator, this is also generally useful.
        It will only be used if listed as a translator in the configuration.

        @param md  FITS header
        @return The first blank-delimited word of the FILTER value
        """
        filterName = md.getScalar("FILTER").strip()
        c = filterName.find(" ")
        if c > 0:
            filterName = filterName[:c]
        return filterName

    def getDestination(self, butler, info, filename):
        """Get destination for the file

        @param butler      Data butler
        @param info        File properties, used as dataId for the butler
        @param filename    Input filename
        @return Destination filename
        """
        raw = butler.get("raw_filename", info)[0]
        # Ensure filename is devoid of cfitsio directions about HDUs
        c = raw.find("[")
        if c > 0:
            raw = raw[:c]
        return raw
174 
175 
class RegisterConfig(Config):
    """Configuration for the RegisterTask

    NOTE(review): the ``class`` statement is missing from this listing; the
    name comes from ``ConfigClass = RegisterConfig`` in RegisterTask and the
    ``Config`` base is presumed from the imports -- confirm.
    """
    table = Field(dtype=str, default="raw", doc="Name of table")
    # Column name --> database type; the type must be one RegisterTask.typemap knows
    columns = DictField(keytype=str, itemtype=str, doc="List of columns for raw table, with their types",
                        itemCheck=lambda x: x in ("text", "int", "double"),
                        default={'object': 'text',
                                 'visit': 'int',
                                 'ccd': 'int',
                                 'filter': 'text',
                                 'date': 'text',
                                 'taiObs': 'text',
                                 'expTime': 'double',
                                 },
                        )
    unique = ListField(dtype=str, doc="List of columns to be declared unique for the table",
                       default=["visit", "ccd"])
    visit = ListField(dtype=str, default=["visit", "object", "date", "filter"],
                      doc="List of columns for raw_visit table")
    ignore = Field(dtype=bool, default=False, doc="Ignore duplicates in the table?")
    permissions = Field(dtype=int, default=0o664, doc="Permissions mode for registry; 0o664 = rw-rw-r--")
196 
197 
class RegistryContext(object):
    """Context manager to provide a registry

    An existing registry is copied, so that it may continue
    to be used while we add to this new registry.  Finally,
    the new registry is moved into the right place.

    NOTE(review): the ``class`` statement is missing from this listing; the
    name is taken from the call in RegisterTask.openRegistry -- confirm.
    """

    def __init__(self, registryName, createTableFunc, forceCreateTables, permissions):
        """Construct a context manager

        @param registryName: Name of registry file
        @param createTableFunc: Function to create tables
        @param forceCreateTables: Force the (re-)creation of tables?
        @param permissions: Permissions to set on database file
        """
        self.registryName = registryName
        self.permissions = permissions

        # The scratch copy lives next to the registry.  Only the base name may
        # be used as the prefix: tempfile joins ``dir`` and ``prefix``, so
        # passing the full path duplicates the directory for relative paths.
        updateFile = tempfile.NamedTemporaryFile(prefix=os.path.basename(registryName),
                                                 dir=os.path.dirname(registryName),
                                                 delete=False)
        self.updateName = updateFile.name
        # We only wanted a unique name; sqlite3 opens the file itself.
        updateFile.close()

        haveTable = False
        if os.path.exists(registryName):
            assertCanCopy(registryName, self.updateName)
            os.chmod(self.updateName, os.stat(registryName).st_mode)
            shutil.copyfile(registryName, self.updateName)
            haveTable = True

        self.conn = sqlite3.connect(self.updateName)
        if not haveTable or forceCreateTables:
            createTableFunc(self.conn)
        os.chmod(self.updateName, self.permissions)

    def __enter__(self):
        """Provide the 'as' value"""
        return self.conn

    def __exit__(self, excType, excValue, traceback):
        """Commit and close the scratch database, then install or discard it

        The scratch file replaces the registry only when the ``with`` body
        finished without an exception; otherwise it is removed.
        """
        self.conn.commit()
        self.conn.close()
        if excType is None:
            # NOTE(review): the listing this was recovered from skips one
            # source line at this point (its embedded numbering jumps from 240
            # to 242); whatever statement it held is not reproduced here --
            # confirm against the upstream file.
            if os.path.exists(self.registryName):
                os.unlink(self.registryName)
            os.rename(self.updateName, self.registryName)
            os.chmod(self.registryName, self.permissions)
        else:
            # The update failed: do not leave an orphaned scratch copy behind.
            # Best effort only, so the original exception is what propagates.
            try:
                os.unlink(self.updateName)
            except OSError:
                pass
        return False  # Don't suppress any exceptions
247 
248 
@contextmanager
def fakeContext():
    """A context manager that doesn't provide any context

    Useful for dry runs where we don't want to actually do anything real.
    The 'as' value is None.
    """
    # NOTE(review): the ``def`` line is missing from the listing this was
    # recovered from; the name is taken from the call in
    # RegisterTask.openRegistry -- confirm.
    yield
256 
257 
class RegisterTask(Task):
    """Task that will generate the registry for the Mapper"""
    ConfigClass = RegisterConfig
    placeHolder = '?'  # Placeholder for parameter substitution; this value suitable for sqlite3
    typemap = {'text': str, 'int': int, 'double': float}  # Mapping database type --> python type

    def openRegistry(self, directory, create=False, dryrun=False, name="registry.sqlite3"):
        """Open the registry and return the connection handle.

        @param directory  Directory in which the registry file will be placed
        @param create  Clobber any existing registry and create a new one?
        @param dryrun  Don't do anything permanent?
        @param name    Filename of the registry
        @return Context manager whose 'as' value is the database connection
                (or a context providing nothing for a dry run)
        """
        if dryrun:
            return fakeContext()

        registryName = os.path.join(directory, name)
        return RegistryContext(registryName, self.createTable, create, self.config.permissions)

    def createTable(self, conn, table=None):
        """Create the registry tables

        One table (typically 'raw') contains information on all files, and the
        other (typically 'raw_visit') contains information on all visits.

        @param conn    Database connection
        @param table   Name of table to create in database
        """
        if table is None:
            table = self.config.table
        cmd = "create table %s (id integer primary key autoincrement, " % table
        cmd += ",".join([("%s %s" % (col, colType)) for col, colType in self.config.columns.items()])
        if len(self.config.unique) > 0:
            cmd += ", unique(" + ",".join(self.config.unique) + ")"
        cmd += ")"
        conn.cursor().execute(cmd)

        cmd = "create table %s_visit (" % table
        cmd += ",".join([("%s %s" % (col, self.config.columns[col])) for col in self.config.visit])
        # Only the unique columns that the visit table actually has can be
        # constrained there.  An empty "unique()" clause is invalid SQL, so
        # the clause is omitted when there is no overlap.
        uniqueCols = set(self.config.unique)
        visitUnique = [col for col in self.config.visit if col in uniqueCols]
        if visitUnique:
            cmd += ", unique(" + ",".join(visitUnique) + ")"
        cmd += ")"
        conn.cursor().execute(cmd)

        conn.commit()

    def check(self, conn, info, table=None):
        """Check for the presence of a row already

        Not sure this is required, given the 'ignore' configuration option.

        @param conn    Database connection
        @param info    File properties; must hold every 'unique' column
        @param table   Name of table in database
        @return True if a row matching the 'unique' columns exists; always
                False when duplicates are ignored or nothing is unique
        """
        if table is None:
            table = self.config.table
        if self.config.ignore or len(self.config.unique) == 0:
            return False  # Our entry could already be there, but we don't care
        cursor = conn.cursor()
        sql = "SELECT COUNT(*) FROM %s WHERE " % table
        sql += " AND ".join(["%s = %s" % (col, self.placeHolder) for col in self.config.unique])
        values = [self.typemap[self.config.columns[col]](info[col]) for col in self.config.unique]

        cursor.execute(sql, values)
        return cursor.fetchone()[0] > 0

    def addRow(self, conn, info, dryrun=False, create=False, table=None):
        """Add a row to the file table (typically 'raw').

        @param conn    Database connection
        @param info    File properties to add to database
        @param dryrun  Only print the statement instead of executing it?
        @param create  Unused here; accepted for interface compatibility
        @param table   Name of table in database
        """
        if table is None:
            table = self.config.table
        columns = self.config.columns
        sql = "INSERT INTO %s (%s) SELECT " % (table, ",".join(columns))
        sql += ",".join([self.placeHolder] * len(columns))
        values = [self.typemap[tt](info[col]) for col, tt in columns.items()]

        # Skip the duplicate guard when nothing is declared unique: an empty
        # WHERE clause would be a syntax error.
        if self.config.ignore and len(self.config.unique) > 0:
            sql += " WHERE NOT EXISTS (SELECT 1 FROM %s WHERE " % table
            sql += " AND ".join(["%s=%s" % (col, self.placeHolder) for col in self.config.unique])
            sql += ")"
            # Convert exactly as check() does so both compare like with like
            values += [self.typemap[columns[col]](info[col]) for col in self.config.unique]

        if dryrun:
            print("Would execute: '%s' with %s" % (sql, ",".join([str(value) for value in values])))
        else:
            conn.cursor().execute(sql, values)

    def addVisits(self, conn, dryrun=False, table=None):
        """Generate the visits table (typically 'raw_visits') from the
        file table (typically 'raw').

        Only visits not yet present in the visits table are inserted.

        @param conn    Database connection
        @param dryrun  Only print the statement instead of executing it?
        @param table   Name of table in database
        """
        if table is None:
            table = self.config.table
        sql = "INSERT INTO %s_visit SELECT DISTINCT " % table
        sql += ",".join(self.config.visit)
        sql += " FROM %s AS vv1" % table
        sql += " WHERE NOT EXISTS "
        sql += "(SELECT vv2.visit FROM %s_visit AS vv2 WHERE vv1.visit = vv2.visit)" % (table,)
        if dryrun:
            print("Would execute: %s" % sql)
        else:
            conn.cursor().execute(sql)
367 
368 
class IngestConfig(Config):
    """Configuration for IngestTask

    NOTE(review): the ``class`` statement is missing from this listing; the
    name comes from ``ConfigClass = IngestConfig`` in IngestTask and the
    ``Config`` base is presumed from the imports -- confirm.
    """
    parse = ConfigurableField(target=ParseTask, doc="File parsing")
    register = ConfigurableField(target=RegisterTask, doc="Registry entry")
    # When set, a failure on one file is logged and that file is skipped
    allowError = Field(dtype=bool, default=False, doc="Allow error in ingestion?")
    # When set, an existing destination file is removed before ingesting
    clobber = Field(dtype=bool, default=False, doc="Clobber existing file?")
375 
376 
class IngestTask(Task):
    """Task that will ingest images into the data repository

    NOTE(review): the ``class`` statement is missing from this listing.  The
    name is taken from the ``super()`` call below; the ``Task`` base is
    presumed from the imports and the use of ``makeSubtask`` -- confirm.
    """
    ConfigClass = IngestConfig
    ArgumentParser = IngestArgumentParser
    _DefaultName = "ingest"

    def __init__(self, *args, **kwargs):
        """Construct the task and its 'parse' and 'register' subtasks"""
        super(IngestTask, self).__init__(*args, **kwargs)
        self.makeSubtask("parse")
        self.makeSubtask("register")

    @classmethod
    def parseAndRun(cls):
        """Parse the command-line arguments and run the Task"""
        config = cls.ConfigClass()
        parser = cls.ArgumentParser(name=cls._DefaultName)
        args = parser.parse_args(config)
        task = cls(config=args.config)
        task.run(args)

    def ingest(self, infile, outfile, mode="move", dryrun=False):
        """Ingest a file into the image repository.

        @param infile  Name of input file
        @param outfile  Name of output file (file in repository)
        @param mode  Mode of ingest (copy/link/move/skip)
        @param dryrun  Only report what would occur?
        @return Success boolean
        """
        if mode == "skip":
            return True
        if dryrun:
            self.log.info("Would %s from %s to %s", mode, infile, outfile)
            return True
        try:
            outdir = os.path.dirname(outfile)
            # An empty dirname means the current directory: nothing to create
            if outdir and not os.path.isdir(outdir):
                try:
                    os.makedirs(outdir)
                except OSError:
                    # Silently ignore mkdir failures due to race conditions
                    if not os.path.isdir(outdir):
                        raise
            if os.path.lexists(outfile):
                if self.config.clobber:
                    os.unlink(outfile)
                else:
                    raise RuntimeError("File %s already exists; consider --config clobber=True" % outfile)

            if mode == "copy":
                assertCanCopy(infile, outfile)
                shutil.copyfile(infile, outfile)
            elif mode == "link":
                os.symlink(os.path.abspath(infile), outfile)
            elif mode == "move":
                assertCanCopy(infile, outfile)
                # shutil.move renames when it can and falls back to
                # copy-and-delete when the destination is on another
                # filesystem, where a plain os.rename fails.
                shutil.move(infile, outfile)
            else:
                raise AssertionError("Unknown mode: %s" % mode)
            self.log.info("%s --<%s>--> %s", infile, mode, outfile)
        except Exception as e:
            self.log.warn("Failed to %s %s to %s: %s", mode, infile, outfile, e)
            if not self.config.allowError:
                raise
            return False
        return True

    def isBadFile(self, filename, badFileList):
        """Return whether the file qualifies as bad

        We match the base name of the file against the list of bad file patterns.

        @param filename     Name of file (any directory part is ignored)
        @param badFileList  fnmatch-style patterns; may be empty or None
        """
        filename = os.path.basename(filename)
        if not badFileList:
            return False
        for badFile in badFileList:
            if fnmatch(filename, badFile):
                return True
        return False

    def isBadId(self, info, badIdList):
        """Return whether the file information qualifies as bad

        We match against the list of bad data identifiers: an identifier
        matches when every one of its key/value pairs equals the corresponding
        entry of the file information.

        @param info       File properties
        @param badIdList  Data identifiers (dicts); may be empty or None
        """
        if not badIdList:
            return False
        for badId in badIdList:
            if all(info[key] == value for key, value in badId.items()):
                return True
        return False

    def expandFiles(self, fileNameList):
        """!Expand a set of filenames and globs, returning a list of filenames

        @param fileNameList A list of files and glob patterns

        N.b. unlike Posix shell semantics, a pattern that matches nothing is not
        returned unchanged: it is reported with a warning and dropped.
        """
        filenameList = []
        for globPattern in fileNameList:
            files = glob(globPattern)

            if not files:
                self.log.warn("%s doesn't match any file", globPattern)
                continue

            filenameList.extend(files)

        return filenameList

    def runFile(self, infile, registry, args):
        """!Examine and ingest a single file

        @param infile: File to process
        @param registry: Registry database connection, or None
        @param args: Parsed command-line arguments
        @return parsed information from FITS HDUs or None
        """
        if self.isBadFile(infile, args.badFile):
            self.log.info("Skipping declared bad file %s", infile)
            return None
        try:
            fileInfo, hduInfoList = self.parse.getInfo(infile)
        except Exception as e:
            if not self.config.allowError:
                raise
            self.log.warn("Error parsing %s (%s); skipping", infile, e)
            return None
        if self.isBadId(fileInfo, args.badId.idList):
            self.log.info("Skipping declared bad file %s: %s", infile, fileInfo)
            return None
        if registry is not None and self.register.check(registry, fileInfo):
            if args.ignoreIngested:
                return None
            # Without --ignore-ingested we only warn and carry on ingesting
            self.log.warn("%s: already ingested: %s", infile, fileInfo)
        outfile = self.parse.getDestination(args.butler, fileInfo, infile)
        if not self.ingest(infile, outfile, mode=args.mode, dryrun=args.dryrun):
            return None
        return hduInfoList

    def run(self, args):
        """Ingest all specified files and add them to the registry"""
        filenameList = self.expandFiles(args.files)
        root = args.input
        context = self.register.openRegistry(root, create=args.create, dryrun=args.dryrun)
        with context as registry:
            for infile in filenameList:
                try:
                    hduInfoList = self.runFile(infile, registry, args)
                except Exception as exc:
                    # One bad file must not stop the rest of the batch
                    self.log.warn("Failed to ingest file %s: %s", infile, exc)
                    continue
                if hduInfoList is None:
                    continue
                for info in hduInfoList:
                    self.register.addRow(registry, info, dryrun=args.dryrun, create=args.create)
            self.register.addVisits(registry, dryrun=args.dryrun)
534 
535 
def assertCanCopy(fromPath, toPath):
    """Can I copy a file?  Raise an exception if space constraints not met.

    @param fromPath    Path from which the file is being copied
    @param toPath      Path to which the file is being copied
    @throw RuntimeError if the destination filesystem reports less available
           space than the size of the source file
    """
    req = os.stat(fromPath).st_size
    # A bare file name has an empty dirname, which statvfs rejects; it means
    # the current directory.
    st = os.statvfs(os.path.dirname(toPath) or ".")
    avail = st.f_bavail * st.f_frsize
    if avail < req:
        raise RuntimeError("Insufficient space: %d vs %d" % (req, avail))
def ingest(self, infile, outfile, mode="move", dryrun=False)
Definition: ingest.py:397
def makeSubtask(self, name, keyArgs)
Definition: task.py:275
def translate_filter(self, md)
Definition: ingest.py:147
def createTable(self, conn, table=None)
Definition: ingest.py:280
def expandFiles(self, fileNameList)
Expand a set of filenames and globs, returning a list of filenames.
Definition: ingest.py:469
def __exit__(self, excType, excValue, traceback)
Definition: ingest.py:237
Provides consistent interface for LSST exceptions.
Definition: Exception.h:107
def getInfo(self, filename)
Definition: ingest.py:51
daf::base::PropertySet * set
Definition: fits.cc:832
def getInfoFromMetadata(self, md, info=None)
Definition: ingest.py:98
def getDestination(self, butler, info, filename)
Definition: ingest.py:160
bool all(CoordinateExpr< N > const &expr) noexcept
Return true if all elements are true.
def runFile(self, infile, registry, args)
Examine and ingest a single file.
Definition: ingest.py:488
def isBadFile(self, filename, badFileList)
Definition: ingest.py:444
def __init__(self, registryName, createTableFunc, forceCreateTables, permissions)
Definition: ingest.py:206
def assertCanCopy(fromPath, toPath)
Definition: ingest.py:536
def check(self, conn, info, table=None)
Definition: ingest.py:306
def __init__(self, args, kwargs)
Definition: ingest.py:19
def addVisits(self, conn, dryrun=False, table=None)
Definition: ingest.py:349
def openRegistry(self, directory, create=False, dryrun=False, name="registry.sqlite3")
Definition: ingest.py:264
def __init__(self, args, kwargs)
Definition: ingest.py:383
def addRow(self, conn, info, dryrun=False, create=False, table=None)
Definition: ingest.py:325
def isBadId(self, info, badIdList)
Definition: ingest.py:457
def add_id_argument(self, name, datasetType, help, level=None, doMakeDataRefList=True, ContainerClass=DataIdContainer)
bool strip
Definition: fits.cc:831