LSSTApplications  17.0+42,17.0+72,17.0+8,18.0.0,18.0.0+1,18.0.0+2,18.0.0+4,18.0.0+5,18.0.0+7,18.0.0+9,18.0.0-1-g0001055+1,18.0.0-1-g1349e88+1,18.0.0-1-g2505f39,18.0.0-1-g5e4b7ea+1,18.0.0-1-g85f8cd4+1,18.0.0-1-g8d41266+4,18.0.0-1-g9a6769a+1,18.0.0-1-ge10677a,18.0.0-1-gf8bf62c+1,18.0.0-2-g000ad9a+1,18.0.0-2-g0ee56d7+4,18.0.0-2-g31c43f9,18.0.0-2-g9c63283,18.0.0-2-gdf0b915+1,18.0.0-2-gf03bb23,18.0.0-3-g230d21f,18.0.0-3-gb4677f3+7,18.0.0-4-g329782e+1,18.0.0-5-ga38416e7+6,18.0.0-5-gba33251+1,18.0.0-6-gc37ddf8+1,18.0.0-8-g01e196b65,18.0.0-8-g24ce6f0f+1,18.0.0-8-g697fdba,w.2019.29
LSSTDataManagementBasePackage
ingest.py
Go to the documentation of this file.
1 # This file is part of pipe_tasks.
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (https://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <https://www.gnu.org/licenses/>.
21 
22 import os
23 import shutil
24 import tempfile
25 import sqlite3
26 from fnmatch import fnmatch
27 from glob import glob
28 from contextlib import contextmanager
29 
30 from lsst.pex.config import Config, Field, DictField, ListField, ConfigurableField
32 from lsst.afw.fits import readMetadata
33 from lsst.pipe.base import Task, InputOnlyArgumentParser
34 from lsst.afw.fits import DEFAULT_HDU
35 
36 
class IngestArgumentParser(InputOnlyArgumentParser):
    """Argument parser to support ingesting images into the image repository.

    Adds options controlling how files are delivered (``--mode``), whether the
    registry is recreated (``--create``), which already-registered or bad inputs
    are skipped (``--ignore-ingested``, ``--badId``, ``--badFile``), and the
    positional list of files to ingest.
    """

    def __init__(self, *args, **kwargs):
        super(IngestArgumentParser, self).__init__(*args, **kwargs)
        self.add_argument("-n", "--dry-run", dest="dryrun", action="store_true", default=False,
                          help="Don't perform any action?")
        self.add_argument("--mode", choices=["move", "copy", "link", "skip"], default="link",
                          help="Mode of delivering the files to their destination")
        self.add_argument("--create", action="store_true", help="Create new registry (clobber old)?")
        self.add_argument("--ignore-ingested", dest="ignoreIngested", action="store_true",
                          help="Don't register files that have already been registered")
        self.add_id_argument("--badId", "raw", "Data identifier for bad data", doMakeDataRefList=False)
        self.add_argument("--badFile", nargs="*", default=[],
                          help="Names of bad files (no path; wildcards allowed)")
        # Entries are expanded with glob by IngestTask.expandFiles, so patterns are accepted.
        self.add_argument("files", nargs="+", help="Names of files to ingest (glob patterns allowed)")
53 
54 
class ParseConfig(Config):
    """Configuration for ParseTask.

    Controls which header keywords and translator methods are used to derive
    file properties, and which HDUs are inspected.
    """
    # Output property name --> header keyword it is read from directly.
    translation = DictField(
        doc="Translation table for property --> header",
        keytype=str,
        itemtype=str,
        default={},
    )
    # Output property name --> name of a ParseTask method called with the header.
    translators = DictField(
        doc="Properties and name of translator method",
        keytype=str,
        itemtype=str,
        default={},
    )
    # Fallback value for a `translation` property whose keyword is absent.
    defaults = DictField(
        doc="Default values if header is not present",
        keytype=str,
        itemtype=str,
        default={},
    )
    # HDU whose header supplies the file-level properties.
    hdu = Field(doc="HDU to read for metadata", dtype=int, default=DEFAULT_HDU)
    # EXTNAME values to look for; when empty only the HDU above is read.
    extnames = ListField(doc="Extension names to search for", dtype=str, default=[])
65 
66 
class ParseTask(Task):
    """Task that will parse the filename and/or its contents to get the required information
    for putting the file in the correct location and populating the registry."""
    ConfigClass = ParseConfig

    def getInfo(self, filename):
        """Get information about the image from the filename and its contents

        Here, we open the image and parse the header, but one could also look at the filename itself
        and derive information from that, or set values from the configuration.

        The header of ``config.hdu`` provides the file-level properties. If
        ``config.extnames`` is non-empty, HDUs 1, 2, ... are read in turn until every
        requested EXTNAME has been found or an HDU cannot be read. Each matching
        extension yields a copy of the file-level properties supplemented by that
        extension's header, plus its HDU number under the key ``"hdu"``.

        @param filename Name of file to inspect
        @return File properties; list of file properties for each extension
        """
        md = readMetadata(filename, self.config.hdu)
        phuInfo = self.getInfoFromMetadata(md)
        if not self.config.extnames:
            # No extensions to worry about
            return phuInfo, [phuInfo]
        # Look in the provided extensions
        extnames = set(self.config.extnames)
        extnum = 0
        infoList = []
        while extnames:
            extnum += 1
            try:
                md = readMetadata(filename, extnum)
            except Exception as e:
                # Presumably also reached when extnum runs past the last HDU; stop searching.
                self.log.warn("Error reading %s extensions %s: %s", filename, extnames, e)
                break
            ext = self.getExtensionName(md)
            if ext in extnames:
                hduInfo = self.getInfoFromMetadata(md, info=phuInfo.copy())
                # We need the HDU number when registering MEF files.
                hduInfo["hdu"] = extnum
                infoList.append(hduInfo)
                extnames.discard(ext)
        return phuInfo, infoList

    @staticmethod
    def getExtensionName(md):
        """Get the name of an extension.

        @param md PropertySet like one obtained from lsst.afw.fits.readMetadata
        @return Value of the EXTNAME keyword (whitespace-stripped if it is a string)
                if it exists, None otherwise.
        """
        if not md.exists("EXTNAME"):
            return None
        # getScalar returns the value itself (e.g. "CCD1"), not a tuple; indexing it
        # would return a single character and no extension would ever match.
        ext = md.getScalar("EXTNAME")
        if isinstance(ext, str):
            ext = ext.strip()
        return ext

    def getInfoFromMetadata(self, md, info=None):
        """Attempt to pull the desired information out of the header

        This is done through two mechanisms:
        * translation: a property is set directly from the relevant header keyword
        * translator: a property is set with the result of calling a method

        The translator methods receive the header metadata and should return the
        appropriate value, or None if the value cannot be determined.

        Translations are applied first, so a non-None translator result overrides a
        translation for the same property. A translator that raises is logged and
        treated as returning None.

        @param md FITS header
        @param info File properties, to be supplemented (modified in place)
        @return info
        """
        if info is None:
            info = {}
        for p, h in self.config.translation.items():
            if md.exists(h):
                value = md.getScalar(h)
                if isinstance(value, str):
                    value = value.strip()
                info[p] = value
            elif p in self.config.defaults:
                info[p] = self.config.defaults[p]
            else:
                self.log.warn("Unable to find value for %s (derived from %s)", p, h)
        for p, t in self.config.translators.items():
            func = getattr(self, t)
            try:
                value = func(md)
            except Exception as e:
                self.log.warn("%s failed to translate %s: %s", t, p, e)
                value = None
            if value is not None:
                info[p] = value
        return info

    def translate_date(self, md):
        """Convert a full DATE-OBS to a mere date

        Everything from the first "T" onward (the time part) is dropped.

        Besides being an example of a translator, this is also generally useful.
        It will only be used if listed as a translator in the configuration.
        """
        date = md.getScalar("DATE-OBS").strip()
        c = date.find("T")
        if c > 0:
            date = date[:c]
        return date

    def translate_filter(self, md):
        """Translate a full filter description into a mere filter name

        The FILTER value is stripped and truncated at its first space.

        Besides being an example of a translator, this is also generally useful.
        It will only be used if listed as a translator in the configuration.
        """
        filterName = md.getScalar("FILTER").strip()
        c = filterName.find(" ")
        if c > 0:
            filterName = filterName[:c]
        return filterName

    def getDestination(self, butler, info, filename):
        """Get destination for the file

        @param butler Data butler
        @param info File properties, used as dataId for the butler
        @param filename Input filename
        @return Destination filename, with any trailing cfitsio HDU selector ("[...]") removed
        """
        raw = butler.get("raw_filename", info)[0]
        # Ensure filename is devoid of cfitsio directions about HDUs
        c = raw.find("[")
        if c > 0:
            raw = raw[:c]
        return raw
195 
196 
class RegisterConfig(Config):
    """Configuration for RegisterTask.

    Describes the schema of the file table (``table``) and of its companion
    ``<table>_visit`` table, both of which RegisterTask creates and fills.
    """
    # Name of the file table; the visit table is "<table>_visit".
    table = Field(doc="Name of table", dtype=str, default="raw")
    # Column name --> SQL type of the file table (text, int or double only).
    columns = DictField(
        doc="List of columns for raw table, with their types",
        keytype=str,
        itemtype=str,
        itemCheck=lambda colType: colType in ("text", "int", "double"),
        default={
            'object': 'text',
            'visit': 'int',
            'ccd': 'int',
            'filter': 'text',
            'date': 'text',
            'taiObs': 'text',
            'expTime': 'double',
        },
    )
    # Columns forming the file table's UNIQUE constraint; those also listed in
    # `visit` form the visit table's constraint.
    unique = ListField(
        doc="List of columns to be declared unique for the table",
        dtype=str,
        default=["visit", "ccd"],
    )
    # Columns copied (distinct) from the file table into the visit table.
    visit = ListField(
        doc="List of columns for raw_visit table",
        dtype=str,
        default=["visit", "object", "date", "filter"],
    )
    # If set, inserts that duplicate an existing row in the unique columns are skipped.
    ignore = Field(doc="Ignore duplicates in the table?", dtype=bool, default=False)
    # Mode applied with os.chmod to the registry file.
    permissions = Field(doc="Permissions mode for registry; 0o664 = rw-rw-r--", dtype=int, default=0o664)
217 
218 
class RegistryContext:
    """Context manager to provide a registry

    An existing registry is copied, so that it may continue
    to be used while we add to this new registry. Finally,
    the new registry is moved into the right place.

    If construction or the ``with`` body raises, the original registry is left
    untouched and the temporary working copy is deleted.
    """

    def __init__(self, registryName, createTableFunc, forceCreateTables, permissions):
        """Construct a context manager

        @param registryName: Name of registry file
        @param createTableFunc: Function to create tables; called as
            ``createTableFunc(conn, forceCreateTables=forceCreateTables)``
        @param forceCreateTables: Force the (re-)creation of tables?
        @param permissions: Permissions to set on database file
        """
        self.registryName = registryName
        self.permissions = permissions
        self.conn = None

        # The working copy is created in the registry's own directory so the final
        # os.replace is a same-filesystem rename. Only the basename is used as prefix:
        # tempfile joins dir and prefix, so a relative registryName would otherwise
        # yield a path with the directory repeated. The handle is closed at once
        # (delete=False keeps the file); sqlite reopens it by name.
        with tempfile.NamedTemporaryFile(prefix=os.path.basename(registryName),
                                         dir=os.path.dirname(registryName),
                                         delete=False) as updateFile:
            self.updateName = updateFile.name

        try:
            if os.path.exists(registryName):
                assertCanCopy(registryName, self.updateName)
                os.chmod(self.updateName, os.stat(registryName).st_mode)
                shutil.copyfile(registryName, self.updateName)

            self.conn = sqlite3.connect(self.updateName)
            createTableFunc(self.conn, forceCreateTables=forceCreateTables)
            os.chmod(self.updateName, self.permissions)
        except BaseException:
            self._discard()
            raise

    def _discard(self):
        """Close the working connection (if open) and delete the working copy."""
        if self.conn is not None:
            self.conn.close()
            self.conn = None
        if os.path.exists(self.updateName):
            os.unlink(self.updateName)

    def __enter__(self):
        """Provide the 'as' value: the connection to the working copy"""
        return self.conn

    def __exit__(self, excType, excValue, traceback):
        """Install the working copy as the registry, or discard it if the body raised.

        @return False, so that any exception propagates
        """
        if excType is not None:
            self._discard()
            return False  # Don't suppress any exceptions
        try:
            self.conn.commit()
        except BaseException:
            self._discard()
            raise
        self.conn.close()
        self.conn = None
        # The working copy is in the registry's directory (see __init__), so this is a
        # same-filesystem rename needing no extra space; os.replace swaps the file in
        # one step instead of leaving a window with no registry at all.
        os.replace(self.updateName, self.registryName)
        os.chmod(self.registryName, self.permissions)
        return False  # Don't suppress any exceptions
265 
266 
@contextmanager
def fakeContext():
    """Context manager that sets nothing up and tears nothing down.

    The ``as`` target is None. RegisterTask.openRegistry returns this for dry
    runs so that no registry file is touched.
    """
    nothing = None
    yield nothing
274 
275 
class RegisterTask(Task):
    """Task that will generate the registry for the Mapper"""
    ConfigClass = RegisterConfig
    placeHolder = '?'  # Placeholder for parameter substitution; this value suitable for sqlite3
    typemap = {'text': str, 'int': int, 'double': float}  # Mapping database type --> python type

    def openRegistry(self, directory, create=False, dryrun=False, name="registry.sqlite3"):
        """Open the registry and return a context manager providing its connection handle.

        @param directory Directory in which the registry file will be placed
        @param create Clobber any existing registry and create a new one?
        @param dryrun Don't do anything permanent?
        @param name Filename of the registry
        @return Context manager whose 'as' value is the database connection
                (None for a dry run)
        """
        if dryrun:
            return fakeContext()

        registryName = os.path.join(directory, name)
        context = RegistryContext(registryName, self.createTable, create, self.config.permissions)
        return context

    def createTable(self, conn, table=None, forceCreateTables=False):
        """Create the registry tables

        One table (typically 'raw') contains information on all files, and the
        other (typically 'raw_visit') contains information on all visits.

        @param conn Database connection
        @param table Name of table to create in database
        @param forceCreateTables Drop and re-create the tables even if they already exist?
        """
        cursor = conn.cursor()
        if table is None:
            table = self.config.table
        # The table name is a value here, so bind it rather than splicing it into the SQL.
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=%s" % self.placeHolder,
                       (table,))
        if cursor.fetchone() and not forceCreateTables:  # Assume if we get an answer the table exists
            self.log.info('Table "%s" exists. Skipping creation', table)
            return
        cursor.execute("drop table if exists %s" % table)
        cursor.execute("drop table if exists %s_visit" % table)

        columnDefs = ["%s %s" % (col, colType) for col, colType in self.config.columns.items()]
        if self.config.unique:
            columnDefs.append("unique(%s)" % ",".join(self.config.unique))
        cursor.execute("create table %s (id integer primary key autoincrement, %s)" %
                       (table, ",".join(columnDefs)))

        visitDefs = ["%s %s" % (col, self.config.columns[col]) for col in self.config.visit]
        # Keep config order (set iteration order would vary between runs) and omit the
        # clause when no visit column is unique: "unique()" is an SQL syntax error.
        visitUnique = [col for col in self.config.visit if col in self.config.unique]
        if visitUnique:
            visitDefs.append("unique(%s)" % ",".join(visitUnique))
        cursor.execute("create table %s_visit (%s)" % (table, ",".join(visitDefs)))

        conn.commit()

    def check(self, conn, info, table=None):
        """Check for the presence of a row already

        Not sure this is required, given the 'ignore' configuration option.

        @param conn Database connection
        @param info File properties
        @param table Name of table in database
        @return True if a row with the same values in all ``config.unique`` columns
                exists; always False when ``config.ignore`` is set or no column is unique
        """
        if table is None:
            table = self.config.table
        if self.config.ignore or len(self.config.unique) == 0:
            return False  # Our entry could already be there, but we don't care
        cursor = conn.cursor()
        sql = "SELECT COUNT(*) FROM %s WHERE " % table
        sql += " AND ".join(["%s = %s" % (col, self.placeHolder) for col in self.config.unique])
        values = [self.typemap[self.config.columns[col]](info[col]) for col in self.config.unique]

        cursor.execute(sql, values)
        if cursor.fetchone()[0] > 0:
            return True
        return False

    def addRow(self, conn, info, dryrun=False, create=False, table=None):
        """Add a row to the file table (typically 'raw').

        With ``config.ignore`` set (and at least one unique column), the row is
        skipped if a row with the same unique-column values already exists.

        @param conn Database connection
        @param info File properties to add to database
        @param dryrun Only print the SQL that would be executed?
        @param create Unused; retained for interface compatibility
        @param table Name of table in database
        """
        if table is None:
            table = self.config.table
        columns = self.config.columns
        sql = "INSERT INTO %s (%s) SELECT " % (table, ",".join(columns))
        sql += ",".join([self.placeHolder] * len(columns))
        values = [self.typemap[tt](info[col]) for col, tt in columns.items()]

        # With no unique columns there is nothing to deduplicate on, and the clause
        # below would be the invalid "WHERE )".
        if self.config.ignore and self.config.unique:
            sql += " WHERE NOT EXISTS (SELECT 1 FROM %s WHERE " % table
            sql += " AND ".join(["%s=%s" % (col, self.placeHolder) for col in self.config.unique])
            sql += ")"
            # Convert as for the inserted values (and as check() does) so the duplicate
            # test compares like with like.
            values += [self.typemap[columns[col]](info[col]) for col in self.config.unique]

        if dryrun:
            print("Would execute: '%s' with %s" % (sql, ",".join([str(value) for value in values])))
        else:
            conn.cursor().execute(sql, values)

    def addVisits(self, conn, dryrun=False, table=None):
        """Generate the visits table (typically 'raw_visit') from the
        file table (typically 'raw').

        Only visits not already present in the visits table are added.

        @param conn Database connection
        @param dryrun Only print the SQL that would be executed?
        @param table Name of table in database
        """
        if table is None:
            table = self.config.table
        sql = "INSERT INTO %s_visit SELECT DISTINCT " % table
        sql += ",".join(self.config.visit)
        sql += " FROM %s AS vv1" % table
        sql += " WHERE NOT EXISTS "
        sql += "(SELECT vv2.visit FROM %s_visit AS vv2 WHERE vv1.visit = vv2.visit)" % (table,)
        if dryrun:
            print("Would execute: %s" % sql)
        else:
            conn.cursor().execute(sql)
397 
398 
class IngestConfig(Config):
    """Configuration for IngestTask.

    Selects the subtasks that parse input files and register them, and controls
    how ingestion failures and pre-existing destination files are handled.
    """
    # Subtask extracting properties from each input file.
    parse = ConfigurableField(doc="File parsing", target=ParseTask)
    # Subtask maintaining the registry database.
    register = ConfigurableField(doc="Registry entry", target=RegisterTask)
    # If set, parse/delivery failures of a file are logged instead of re-raised.
    allowError = Field(doc="Allow error in ingestion?", dtype=bool, default=False)
    # If set, an existing destination file is removed before delivery.
    clobber = Field(doc="Clobber existing file?", dtype=bool, default=False)
405 
406 
class IngestTask(Task):
    """Task that will ingest images into the data repository"""
    ConfigClass = IngestConfig
    ArgumentParser = IngestArgumentParser
    _DefaultName = "ingest"

    def __init__(self, *args, **kwargs):
        super(IngestTask, self).__init__(*args, **kwargs)
        self.makeSubtask("parse")
        self.makeSubtask("register")

    @classmethod
    def parseAndRun(cls):
        """Parse the command-line arguments and run the Task"""
        config = cls.ConfigClass()
        parser = cls.ArgumentParser(name=cls._DefaultName)
        args = parser.parse_args(config)
        task = cls(config=args.config)
        task.run(args)

    def ingest(self, infile, outfile, mode="move", dryrun=False):
        """Ingest a file into the image repository.

        An existing ``outfile`` is replaced only if ``config.clobber`` is set.
        Failures are logged, then re-raised unless ``config.allowError`` is set.

        @param infile Name of input file
        @param outfile Name of output file (file in repository)
        @param mode Mode of ingest (copy/link/move/skip)
        @param dryrun Only report what would occur?
        @return Success boolean
        """
        if mode == "skip":
            return True
        if dryrun:
            self.log.info("Would %s from %s to %s", mode, infile, outfile)
            return True
        try:
            outdir = os.path.dirname(outfile)
            # A bare filename has no directory to create. exist_ok tolerates a
            # concurrent ingest creating the same directory (race condition).
            if outdir:
                os.makedirs(outdir, exist_ok=True)
            if os.path.lexists(outfile):
                if self.config.clobber:
                    os.unlink(outfile)
                else:
                    raise RuntimeError("File %s already exists; consider --config clobber=True" % outfile)

            if mode == "copy":
                assertCanCopy(infile, outfile)
                shutil.copyfile(infile, outfile)
            elif mode == "link":
                os.symlink(os.path.abspath(infile), outfile)
            elif mode == "move":
                assertCanCopy(infile, outfile)
                shutil.move(infile, outfile)
            else:
                raise AssertionError("Unknown mode: %s" % mode)
            self.log.info("%s --<%s>--> %s", infile, mode, outfile)
        except Exception as e:
            self.log.warn("Failed to %s %s to %s: %s", mode, infile, outfile, e)
            if not self.config.allowError:
                raise
            return False
        return True

    def isBadFile(self, filename, badFileList):
        """Return whether the file qualifies as bad

        The file's basename is matched against the list of bad file patterns
        (shell-style wildcards, via fnmatch).
        """
        if not badFileList:
            return False
        basename = os.path.basename(filename)
        return any(fnmatch(basename, pattern) for pattern in badFileList)

    def isBadId(self, info, badIdList):
        """Return whether the file information qualifies as bad

        A file is bad if every key/value of any one bad data identifier matches
        its information.
        """
        if not badIdList:
            return False
        return any(all(info[key] == value for key, value in badId.items()) for badId in badIdList)

    def expandFiles(self, fileNameList):
        """!Expand a set of filenames and globs, returning a list of filenames

        @param fileNameList A list of files and glob patterns
        @return List of matching filenames, grouped by pattern in the order given

        N.b. a pattern that matches nothing (including a plain filename that does not
        exist) is dropped with a warning rather than returned unchanged.
        """
        filenameList = []
        for globPattern in fileNameList:
            files = glob(globPattern)

            if not files:
                self.log.warn("%s doesn't match any file", globPattern)
                continue

            filenameList.extend(files)

        return filenameList

    def runFile(self, infile, registry, args):
        """!Examine and ingest a single file

        @param infile: File to process
        @param registry: Registry database connection, or None (dry run)
        @param args: Parsed command-line arguments
        @return parsed information from FITS HDUs or None
        """
        if self.isBadFile(infile, args.badFile):
            self.log.info("Skipping declared bad file %s", infile)
            return None
        try:
            fileInfo, hduInfoList = self.parse.getInfo(infile)
        except Exception as e:
            if not self.config.allowError:
                raise
            self.log.warn("Error parsing %s (%s); skipping", infile, e)
            return None
        if self.isBadId(fileInfo, args.badId.idList):
            self.log.info("Skipping declared bad file %s: %s", infile, fileInfo)
            return None
        if registry is not None and self.register.check(registry, fileInfo):
            if args.ignoreIngested:
                return None
            # Already registered: warn, but still deliver the file.
            self.log.warn("%s: already ingested: %s", infile, fileInfo)
        outfile = self.parse.getDestination(args.butler, fileInfo, infile)
        if not self.ingest(infile, outfile, mode=args.mode, dryrun=args.dryrun):
            return None
        return hduInfoList

    def run(self, args):
        """Ingest all specified files and add them to the registry

        A failure on one file is logged and that file is skipped; the visit table
        is updated once all files have been processed.
        """
        filenameList = self.expandFiles(args.files)
        root = args.input
        context = self.register.openRegistry(root, create=args.create, dryrun=args.dryrun)
        with context as registry:
            for infile in filenameList:
                try:
                    hduInfoList = self.runFile(infile, registry, args)
                except Exception as exc:
                    self.log.warn("Failed to ingest file %s: %s", infile, exc)
                    continue
                if hduInfoList is None:
                    continue
                for info in hduInfoList:
                    self.register.addRow(registry, info, dryrun=args.dryrun, create=args.create)
            self.register.addVisits(registry, dryrun=args.dryrun)
564 
565 
def assertCanCopy(fromPath, toPath):
    """Can I copy a file? Raise an exception if space constraints are not met.

    @param fromPath Path from which the file is being copied
    @param toPath Path to which the file is being copied
    @raise RuntimeError if the filesystem holding toPath has fewer bytes available
           (f_bavail * f_frsize) than the size of fromPath
    """
    req = os.stat(fromPath).st_size
    # A bare filename has an empty dirname, which os.statvfs rejects; such a file
    # lives in the current directory.
    st = os.statvfs(os.path.dirname(toPath) or os.curdir)
    avail = st.f_bavail * st.f_frsize
    if avail < req:
        raise RuntimeError("Insufficient space: %d vs %d" % (req, avail))
def ingest(self, infile, outfile, mode="move", dryrun=False)
Definition: ingest.py:427
def makeSubtask(self, name, keyArgs)
Definition: task.py:275
def translate_filter(self, md)
Definition: ingest.py:168
def expandFiles(self, fileNameList)
Expand a set of filenames and globs, returning a list of filenames.
Definition: ingest.py:499
def translate_date(self, md)
Definition: ingest.py:156
def __exit__(self, excType, excValue, traceback)
Definition: ingest.py:255
Provides consistent interface for LSST exceptions.
Definition: Exception.h:107
def getInfo(self, filename)
Definition: ingest.py:72
daf::base::PropertySet * set
Definition: fits.cc:884
def getInfoFromMetadata(self, md, info=None)
Definition: ingest.py:119
def getDestination(self, butler, info, filename)
Definition: ingest.py:181
bool all(CoordinateExpr< N > const &expr) noexcept
Return true if all elements are true.
def runFile(self, infile, registry, args)
Examine and ingest a single file.
Definition: ingest.py:518
def isBadFile(self, filename, badFileList)
Definition: ingest.py:474
def __init__(self, registryName, createTableFunc, forceCreateTables, permissions)
Definition: ingest.py:227
def createTable(self, conn, table=None, forceCreateTables=False)
Definition: ingest.py:298
def assertCanCopy(fromPath, toPath)
Definition: ingest.py:566
def check(self, conn, info, table=None)
Definition: ingest.py:336
def __init__(self, args, kwargs)
Definition: ingest.py:40
def addVisits(self, conn, dryrun=False, table=None)
Definition: ingest.py:379
def openRegistry(self, directory, create=False, dryrun=False, name="registry.sqlite3")
Definition: ingest.py:282
def __init__(self, args, kwargs)
Definition: ingest.py:413
def addRow(self, conn, info, dryrun=False, create=False, table=None)
Definition: ingest.py:355
def isBadId(self, info, badIdList)
Definition: ingest.py:487
def add_id_argument(self, name, datasetType, help, level=None, doMakeDataRefList=True, ContainerClass=DataIdContainer)
bool strip
Definition: fits.cc:883