LSSTApplications  17.0+11,17.0+35,17.0+60,17.0+61,17.0+63,17.0+7,17.0-1-g377950a+35,17.0.1-1-g114240f+2,17.0.1-1-g4d4fbc4+30,17.0.1-1-g55520dc+55,17.0.1-1-g5f4ed7e+59,17.0.1-1-g6dd7d69+22,17.0.1-1-g8de6c91+11,17.0.1-1-gb9095d2+7,17.0.1-1-ge9fec5e+5,17.0.1-1-gf4e0155+63,17.0.1-1-gfc65f5f+56,17.0.1-1-gfc6fb1f+20,17.0.1-10-g87f9f3f+9,17.0.1-12-g112a4bc+3,17.0.1-17-gab9750a3+5,17.0.1-17-gdae4c4a+16,17.0.1-19-g3a24bb2+2,17.0.1-2-g26618f5+35,17.0.1-2-g54f2ebc+9,17.0.1-2-gf403422+1,17.0.1-21-g52a398f+5,17.0.1-26-gd98a1d13,17.0.1-3-g7e86b59+45,17.0.1-3-gb5ca14a,17.0.1-3-gd08d533+46,17.0.1-31-gb0791f330,17.0.1-4-g59d126d+10,17.0.1-5-g3877d06+2,17.0.1-7-g35889ee+7,17.0.1-7-gc7c8782+20,17.0.1-7-gcb7da53+5,17.0.1-9-gc4bbfb2+10,w.2019.24
LSSTDataManagementBasePackage
ingest.py
Go to the documentation of this file.
1 # This file is part of pipe_tasks.
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (https://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <https://www.gnu.org/licenses/>.
21 
22 import os
23 import shutil
24 import tempfile
25 import sqlite3
26 from fnmatch import fnmatch
27 from glob import glob
28 from contextlib import contextmanager
29 
30 from lsst.pex.config import Config, Field, DictField, ListField, ConfigurableField
32 from lsst.afw.fits import readMetadata
33 from lsst.pipe.base import Task, InputOnlyArgumentParser
34 from lsst.afw.fits import DEFAULT_HDU
35 
36 
class IngestArgumentParser(InputOnlyArgumentParser):
    """Argument parser to support ingesting images into the image repository"""

    def __init__(self, *args, **kwargs):
        """Construct the parser and register the ingest-specific command-line arguments.

        All positional and keyword arguments are forwarded unchanged to the base-class
        constructor.
        """
        super(IngestArgumentParser, self).__init__(*args, **kwargs)
        self.add_argument("-n", "--dry-run", dest="dryrun", action="store_true", default=False,
                          help="Don't perform any action?")
        self.add_argument("--mode", choices=["move", "copy", "link", "skip"], default="link",
                          help="Mode of delivering the files to their destination")
        self.add_argument("--create", action="store_true", help="Create new registry (clobber old)?")
        self.add_argument("--ignore-ingested", dest="ignoreIngested", action="store_true",
                          help="Don't register files that have already been registered")
        # Bad data IDs are only matched against parsed file properties, so no data
        # references need to be built for them.
        self.add_id_argument("--badId", "raw", "Data identifier for bad data", doMakeDataRefList=False)
        self.add_argument("--badFile", nargs="*", default=[],
                          help="Names of bad files (no path; wildcards allowed)")
        self.add_argument("files", nargs="+", help="Names of files")
53 
54 
56  """Configuration for ParseTask"""
57  translation = DictField(keytype=str, itemtype=str, default={},
58  doc="Translation table for property --> header")
59  translators = DictField(keytype=str, itemtype=str, default={},
60  doc="Properties and name of translator method")
61  defaults = DictField(keytype=str, itemtype=str, default={},
62  doc="Default values if header is not present")
63  hdu = Field(dtype=int, default=DEFAULT_HDU, doc="HDU to read for metadata")
64  extnames = ListField(dtype=str, default=[], doc="Extension names to search for")
65 
66 
67 class ParseTask(Task):
68  """Task that will parse the filename and/or its contents to get the required information
69  for putting the file in the correct location and populating the registry."""
70  ConfigClass = ParseConfig
71 
72  def getInfo(self, filename):
73  """Get information about the image from the filename and its contents
74 
75  Here, we open the image and parse the header, but one could also look at the filename itself
76  and derive information from that, or set values from the configuration.
77 
78  @param filename Name of file to inspect
79  @return File properties; list of file properties for each extension
80  """
81  md = readMetadata(filename, self.config.hdu)
82  phuInfo = self.getInfoFromMetadata(md)
83  if len(self.config.extnames) == 0:
84  # No extensions to worry about
85  return phuInfo, [phuInfo]
86  # Look in the provided extensions
87  extnames = set(self.config.extnames)
88  extnum = 0
89  infoList = []
90  while len(extnames) > 0:
91  extnum += 1
92  try:
93  md = readMetadata(filename, extnum)
94  except Exception as e:
95  self.log.warn("Error reading %s extensions %s: %s" % (filename, extnames, e))
96  break
97  ext = self.getExtensionName(md)
98  if ext in extnames:
99  hduInfo = self.getInfoFromMetadata(md, info=phuInfo.copy())
100  # We need the HDU number when registering MEF files.
101  hduInfo["hdu"] = extnum
102  infoList.append(hduInfo)
103  extnames.discard(ext)
104  return phuInfo, infoList
105 
106  @staticmethod
108  """ Get the name of an extension.
109  @param md: PropertySet like one obtained from lsst.afw.fits.readMetadata)
110  @return Name of the extension if it exists. None otherwise.
111  """
112  try:
113  # This returns a tuple
114  ext = md.getScalar("EXTNAME")
115  return ext[1]
117  return None
118 
119  def getInfoFromMetadata(self, md, info=None):
120  """Attempt to pull the desired information out of the header
121 
122  This is done through two mechanisms:
123  * translation: a property is set directly from the relevant header keyword
124  * translator: a property is set with the result of calling a method
125 
126  The translator methods receive the header metadata and should return the
127  appropriate value, or None if the value cannot be determined.
128 
129  @param md FITS header
130  @param info File properties, to be supplemented
131  @return info
132  """
133  if info is None:
134  info = {}
135  for p, h in self.config.translation.items():
136  if md.exists(h):
137  value = md.getScalar(h)
138  if isinstance(value, str):
139  value = value.strip()
140  info[p] = value
141  elif p in self.config.defaults:
142  info[p] = self.config.defaults[p]
143  else:
144  self.log.warn("Unable to find value for %s (derived from %s)" % (p, h))
145  for p, t in self.config.translators.items():
146  func = getattr(self, t)
147  try:
148  value = func(md)
149  except Exception as e:
150  self.log.warn("%s failed to translate %s: %s", t, p, e)
151  value = None
152  if value is not None:
153  info[p] = value
154  return info
155 
156  def translate_date(self, md):
157  """Convert a full DATE-OBS to a mere date
158 
159  Besides being an example of a translator, this is also generally useful.
160  It will only be used if listed as a translator in the configuration.
161  """
162  date = md.getScalar("DATE-OBS").strip()
163  c = date.find("T")
164  if c > 0:
165  date = date[:c]
166  return date
167 
168  def translate_filter(self, md):
169  """Translate a full filter description into a mere filter name
170 
171  Besides being an example of a translator, this is also generally useful.
172  It will only be used if listed as a translator in the configuration.
173  """
174  filterName = md.getScalar("FILTER").strip()
175  filterName = filterName.strip()
176  c = filterName.find(" ")
177  if c > 0:
178  filterName = filterName[:c]
179  return filterName
180 
181  def getDestination(self, butler, info, filename):
182  """Get destination for the file
183 
184  @param butler Data butler
185  @param info File properties, used as dataId for the butler
186  @param filename Input filename
187  @return Destination filename
188  """
189  raw = butler.get("raw_filename", info)[0]
190  # Ensure filename is devoid of cfitsio directions about HDUs
191  c = raw.find("[")
192  if c > 0:
193  raw = raw[:c]
194  return raw
195 
196 
class RegisterConfig(Config):
    """Configuration for the RegisterTask"""
    # Name of the per-file table; a companion "<table>_visit" table is also created.
    table = Field(dtype=str, default="raw", doc="Name of table")
    # Column name --> SQL type; itemCheck restricts types to text, int and double.
    columns = DictField(keytype=str, itemtype=str, doc="List of columns for raw table, with their types",
                        itemCheck=lambda x: x in ("text", "int", "double"),
                        default={'object': 'text',
                                 'visit': 'int',
                                 'ccd': 'int',
                                 'filter': 'text',
                                 'date': 'text',
                                 'taiObs': 'text',
                                 'expTime': 'double',
                                 },
                        )
    # Columns forming the table's unique key; also used to detect already-registered files.
    unique = ListField(dtype=str, doc="List of columns to be declared unique for the table",
                       default=["visit", "ccd"])
    # Columns copied into the visit table; each must also be a key of `columns`.
    visit = ListField(dtype=str, default=["visit", "object", "date", "filter"],
                      doc="List of columns for raw_visit table")
    # When True, rows duplicating an existing unique key are skipped instead of inserted.
    ignore = Field(dtype=bool, default=False, doc="Ignore duplicates in the table?")
    permissions = Field(dtype=int, default=0o664, doc="Permissions mode for registry; 0o664 = rw-rw-r--")
class RegistryContext(object):
    """Context manager to provide a registry

    An existing registry is copied, so that it may continue
    to be used while we add to this new registry. Finally,
    the new registry is moved into the right place.
    """

    def __init__(self, registryName, createTableFunc, forceCreateTables, permissions):
        """Construct a context manager

        @param registryName: Name of registry file
        @param createTableFunc: Function to create tables; called with the sqlite3 connection
        @param forceCreateTables: Start from an empty database, discarding the contents of any
                                  existing registry, and (re-)create the tables?
        @param permissions: Permissions to set on database file
        """
        self.registryName = registryName
        self.permissions = permissions

        # The working copy lives next to the final registry so the closing replace stays on
        # one filesystem. Only the basename may serve as the prefix: the full path would be
        # joined onto `dir` again, which breaks relative names ("repo/repo/registry...").
        updateFile = tempfile.NamedTemporaryFile(prefix=os.path.basename(registryName),
                                                 dir=os.path.dirname(registryName), delete=False)
        self.updateName = updateFile.name
        # Only the name is needed; close the handle rather than leaking it.
        updateFile.close()

        haveTable = False
        # When the tables are to be re-created, the old contents must not be copied: creating
        # tables that already exist would fail.
        if os.path.exists(registryName) and not forceCreateTables:
            assertCanCopy(registryName, self.updateName)
            os.chmod(self.updateName, os.stat(registryName).st_mode)
            shutil.copyfile(registryName, self.updateName)
            haveTable = True

        self.conn = sqlite3.connect(self.updateName)
        if not haveTable or forceCreateTables:
            createTableFunc(self.conn)
        os.chmod(self.updateName, self.permissions)

    def __enter__(self):
        """Provide the 'as' value: the connection to the working copy"""
        return self.conn

    def __exit__(self, excType, excValue, traceback):
        """Commit and close; install the working copy only if the block succeeded.

        On failure the existing registry is left untouched.

        @return False, so exceptions raised inside the with-block propagate
        """
        self.conn.commit()
        self.conn.close()
        if excType is None:
            # os.replace overwrites any existing registry in one step, so (on POSIX) there is
            # no moment at which the registry file is missing.
            os.replace(self.updateName, self.registryName)
            os.chmod(self.registryName, self.permissions)
        return False  # Don't suppress any exceptions
268 
269 
@contextmanager
def fakeContext():
    """A context manager that doesn't provide any context

    Useful for dry runs where we don't want to actually do anything real.
    The ``as`` target is bound to None.
    """
    yield
277 
278 
class RegisterTask(Task):
    """Task that will generate the registry for the Mapper"""
    ConfigClass = RegisterConfig
    placeHolder = '?'  # Placeholder for parameter substitution; this value suitable for sqlite3
    typemap = {'text': str, 'int': int, 'double': float}  # Mapping database type --> python type

    def openRegistry(self, directory, create=False, dryrun=False, name="registry.sqlite3"):
        """Open the registry and return a context manager providing the connection handle.

        @param directory Directory in which the registry file will be placed
        @param create Clobber any existing registry and create a new one?
        @param dryrun Don't do anything permanent?
        @param name Filename of the registry
        @return Context manager whose 'as' value is the database connection (None for dry runs)
        """
        if dryrun:
            return fakeContext()

        registryName = os.path.join(directory, name)
        context = RegistryContext(registryName, self.createTable, create, self.config.permissions)
        return context

    def createTable(self, conn, table=None):
        """Create the registry tables

        One table (typically 'raw') contains information on all files, and the
        other (typically 'raw_visit') contains information on all visits.

        @param conn Database connection
        @param table Name of table to create in database
        """
        if table is None:
            table = self.config.table
        cmd = "create table %s (id integer primary key autoincrement, " % table
        cmd += ",".join([("%s %s" % (col, colType)) for col, colType in self.config.columns.items()])
        if len(self.config.unique) > 0:
            cmd += ", unique(" + ",".join(self.config.unique) + ")"
        cmd += ")"
        conn.cursor().execute(cmd)

        cmd = "create table %s_visit (" % table
        cmd += ",".join([("%s %s" % (col, self.config.columns[col])) for col in self.config.visit])
        # Keep config.visit order (a set made the constraint vary between runs) and omit the
        # clause when no visit column is unique: an empty "unique()" is an SQL syntax error.
        visitUnique = [col for col in self.config.visit if col in self.config.unique]
        if visitUnique:
            cmd += ", unique(" + ",".join(visitUnique) + ")"
        cmd += ")"
        conn.cursor().execute(cmd)

        conn.commit()

    def check(self, conn, info, table=None):
        """Check for the presence of a row already

        Not sure this is required, given the 'ignore' configuration option.

        @param conn Database connection
        @param info File properties; must contain every column in config.unique
        @param table Name of table in database
        @return True if a row with the same config.unique values exists; always False when
                config.ignore is set or config.unique is empty
        """
        if table is None:
            table = self.config.table
        if self.config.ignore or len(self.config.unique) == 0:
            return False  # Our entry could already be there, but we don't care
        cursor = conn.cursor()
        sql = "SELECT COUNT(*) FROM %s WHERE " % table
        sql += " AND ".join(["%s = %s" % (col, self.placeHolder) for col in self.config.unique])
        values = [self.typemap[self.config.columns[col]](info[col]) for col in self.config.unique]

        cursor.execute(sql, values)
        if cursor.fetchone()[0] > 0:
            return True
        return False

    def addRow(self, conn, info, dryrun=False, create=False, table=None):
        """Add a row to the file table (typically 'raw').

        @param conn Database connection
        @param info File properties to add to database; must contain every configured column
        @param dryrun Only print the SQL that would be executed?
        @param create Not used by this method
        @param table Name of table in database
        """
        if table is None:
            table = self.config.table
        sql = "INSERT INTO %s (%s) SELECT " % (table, ",".join(self.config.columns))
        sql += ",".join([self.placeHolder] * len(self.config.columns))
        values = [self.typemap[tt](info[col]) for col, tt in self.config.columns.items()]

        # Without unique columns there is nothing to compare against, and the clause below
        # would otherwise end in an empty "WHERE )".
        if self.config.ignore and len(self.config.unique) > 0:
            sql += " WHERE NOT EXISTS (SELECT 1 FROM %s WHERE " % table
            sql += " AND ".join(["%s=%s" % (col, self.placeHolder) for col in self.config.unique])
            sql += ")"
            # Convert like the inserted values (and like check()) so the duplicate test
            # compares the same Python values that would be stored.
            values += [self.typemap[self.config.columns[col]](info[col]) for col in self.config.unique]

        if dryrun:
            print("Would execute: '%s' with %s" % (sql, ",".join([str(value) for value in values])))
        else:
            conn.cursor().execute(sql, values)

    def addVisits(self, conn, dryrun=False, table=None):
        """Generate the visits table (typically 'raw_visit') from the
        file table (typically 'raw').

        Distinct combinations of the config.visit columns are inserted for every
        visit number not yet present in the visit table.

        @param conn Database connection
        @param dryrun Only print the SQL that would be executed?
        @param table Name of table in database
        """
        if table is None:
            table = self.config.table
        sql = "INSERT INTO %s_visit SELECT DISTINCT " % table
        sql += ",".join(self.config.visit)
        sql += " FROM %s AS vv1" % table
        sql += " WHERE NOT EXISTS "
        sql += "(SELECT vv2.visit FROM %s_visit AS vv2 WHERE vv1.visit = vv2.visit)" % (table,)
        if dryrun:
            print("Would execute: %s" % sql)
        else:
            conn.cursor().execute(sql)
388 
389 
class IngestConfig(Config):
    """Configuration for IngestTask"""
    parse = ConfigurableField(target=ParseTask, doc="File parsing")
    register = ConfigurableField(target=RegisterTask, doc="Registry entry")
    # When True, failures to parse or transfer a file are logged and the file skipped.
    allowError = Field(dtype=bool, default=False, doc="Allow error in ingestion?")
    # When True, an existing destination file is removed instead of aborting that file.
    clobber = Field(dtype=bool, default=False, doc="Clobber existing file?")
396 
397 
class IngestTask(Task):
    """Task that will ingest images into the data repository"""
    ConfigClass = IngestConfig
    ArgumentParser = IngestArgumentParser
    _DefaultName = "ingest"

    def __init__(self, *args, **kwargs):
        super(IngestTask, self).__init__(*args, **kwargs)
        self.makeSubtask("parse")
        self.makeSubtask("register")

    @classmethod
    def parseAndRun(cls):
        """Parse the command-line arguments and run the Task"""
        config = cls.ConfigClass()
        parser = cls.ArgumentParser(name=cls._DefaultName)
        args = parser.parse_args(config)
        task = cls(config=args.config)
        task.run(args)

    def ingest(self, infile, outfile, mode="move", dryrun=False):
        """Ingest a file into the image repository.

        @param infile Name of input file
        @param outfile Name of output file (file in repository)
        @param mode Mode of ingest (copy/link/move/skip)
        @param dryrun Only report what would occur?
        @return Success boolean; False only when the transfer failed and config.allowError
                is set (otherwise the error is re-raised)
        """
        if mode == "skip":
            return True
        if dryrun:
            self.log.info("Would %s from %s to %s" % (mode, infile, outfile))
            return True
        try:
            outdir = os.path.dirname(outfile)
            # A bare filename has no directory to create; exist_ok tolerates another
            # process creating the directory concurrently.
            if outdir:
                os.makedirs(outdir, exist_ok=True)
            if os.path.lexists(outfile):
                if self.config.clobber:
                    os.unlink(outfile)
                else:
                    raise RuntimeError("File %s already exists; consider --config clobber=True" % outfile)

            if mode == "copy":
                assertCanCopy(infile, outfile)
                shutil.copyfile(infile, outfile)
            elif mode == "link":
                os.symlink(os.path.abspath(infile), outfile)
            elif mode == "move":
                assertCanCopy(infile, outfile)
                # Unlike os.rename, shutil.move falls back to copy-and-delete when the
                # destination is on a different filesystem.
                shutil.move(infile, outfile)
            else:
                raise AssertionError("Unknown mode: %s" % mode)
            self.log.info("%s --<%s>--> %s" % (infile, mode, outfile))
        except Exception as e:
            self.log.warn("Failed to %s %s to %s: %s" % (mode, infile, outfile, e))
            if not self.config.allowError:
                raise
            return False
        return True

    def isBadFile(self, filename, badFileList):
        """Return whether the file qualifies as bad

        We match the file's basename against the list of bad file (fnmatch) patterns.
        """
        filename = os.path.basename(filename)
        if not badFileList:
            return False
        for badFile in badFileList:
            if fnmatch(filename, badFile):
                return True
        return False

    def isBadId(self, info, badIdList):
        """Return whether the file information qualifies as bad

        We match against the list of bad data identifiers: a file is bad when every
        key/value pair of some bad identifier equals the corresponding entry of info.
        """
        if not badIdList:
            return False
        for badId in badIdList:
            if all(info[key] == value for key, value in badId.items()):
                return True
        return False

    def expandFiles(self, fileNameList):
        """!Expand a set of filenames and globs, returning a list of filenames

        @param fileNameList A list of files and glob patterns
        @return List of matching filenames, in the order the patterns were given

        N.b. a pattern that matches nothing is reported with a warning and skipped
        (it is not passed through unchanged as a POSIX shell would do).
        """
        filenameList = []
        for globPattern in fileNameList:
            files = glob(globPattern)

            if not files:
                self.log.warn("%s doesn't match any file" % globPattern)
                continue

            filenameList.extend(files)

        return filenameList

    def runFile(self, infile, registry, args):
        """!Examine and ingest a single file

        @param infile: File to process
        @param registry: Database connection, or None for dry runs
        @param args: Parsed command-line arguments
        @return parsed information from FITS HDUs or None
        """
        if self.isBadFile(infile, args.badFile):
            self.log.info("Skipping declared bad file %s" % infile)
            return None
        try:
            fileInfo, hduInfoList = self.parse.getInfo(infile)
        except Exception as e:
            if not self.config.allowError:
                raise
            self.log.warn("Error parsing %s (%s); skipping" % (infile, e))
            return None
        if self.isBadId(fileInfo, args.badId.idList):
            self.log.info("Skipping declared bad file %s: %s" % (infile, fileInfo))
            return None
        if registry is not None and self.register.check(registry, fileInfo):
            if args.ignoreIngested:
                return None
            self.log.warn("%s: already ingested: %s" % (infile, fileInfo))
        outfile = self.parse.getDestination(args.butler, fileInfo, infile)
        if not self.ingest(infile, outfile, mode=args.mode, dryrun=args.dryrun):
            return None
        return hduInfoList

    def run(self, args):
        """Ingest all specified files and add them to the registry

        Errors from examining or transferring a single file are logged and that file is
        skipped. Errors from adding rows or visits are not caught here and propagate out
        of the registry context.
        """
        filenameList = self.expandFiles(args.files)
        root = args.input
        context = self.register.openRegistry(root, create=args.create, dryrun=args.dryrun)
        with context as registry:
            for infile in filenameList:
                try:
                    hduInfoList = self.runFile(infile, registry, args)
                except Exception as exc:
                    self.log.warn("Failed to ingest file %s: %s", infile, exc)
                    continue
                if hduInfoList is None:
                    continue
                for info in hduInfoList:
                    self.register.addRow(registry, info, dryrun=args.dryrun, create=args.create)
            self.register.addVisits(registry, dryrun=args.dryrun)
555 
556 
def assertCanCopy(fromPath, toPath):
    """Can I copy a file? Raise an exception if space constraints are not met.

    @param fromPath Path from which the file is being copied
    @param toPath Path to which the file is being copied (need not exist yet)
    @throws RuntimeError if the filesystem holding toPath reports fewer available bytes
            (f_bavail * f_frsize) than the size of fromPath
    """
    req = os.stat(fromPath).st_size
    # A bare filename has an empty dirname, and statvfs("") fails; use the current directory.
    st = os.statvfs(os.path.dirname(toPath) or os.curdir)
    avail = st.f_bavail * st.f_frsize
    if avail < req:
        raise RuntimeError("Insufficient space: %d vs %d" % (req, avail))
def ingest(self, infile, outfile, mode="move", dryrun=False)
Definition: ingest.py:418
def makeSubtask(self, name, keyArgs)
Definition: task.py:275
def translate_filter(self, md)
Definition: ingest.py:168
def createTable(self, conn, table=None)
Definition: ingest.py:301
def expandFiles(self, fileNameList)
Expand a set of filenames and globs, returning a list of filenames.
Definition: ingest.py:490
def translate_date(self, md)
Definition: ingest.py:156
def __exit__(self, excType, excValue, traceback)
Definition: ingest.py:258
Provides consistent interface for LSST exceptions.
Definition: Exception.h:107
def getInfo(self, filename)
Definition: ingest.py:72
daf::base::PropertySet * set
Definition: fits.cc:884
def getInfoFromMetadata(self, md, info=None)
Definition: ingest.py:119
def getDestination(self, butler, info, filename)
Definition: ingest.py:181
bool all(CoordinateExpr< N > const &expr) noexcept
Return true if all elements are true.
def runFile(self, infile, registry, args)
Examine and ingest a single file.
Definition: ingest.py:509
def isBadFile(self, filename, badFileList)
Definition: ingest.py:465
def __init__(self, registryName, createTableFunc, forceCreateTables, permissions)
Definition: ingest.py:227
def assertCanCopy(fromPath, toPath)
Definition: ingest.py:557
def check(self, conn, info, table=None)
Definition: ingest.py:327
def __init__(self, *args, **kwargs)
Definition: ingest.py:40
def addVisits(self, conn, dryrun=False, table=None)
Definition: ingest.py:370
def openRegistry(self, directory, create=False, dryrun=False, name="registry.sqlite3")
Definition: ingest.py:285
def __init__(self, *args, **kwargs)
Definition: ingest.py:404
def addRow(self, conn, info, dryrun=False, create=False, table=None)
Definition: ingest.py:346
def isBadId(self, info, badIdList)
Definition: ingest.py:478
def add_id_argument(self, name, datasetType, help, level=None, doMakeDataRefList=True, ContainerClass=DataIdContainer)
bool strip
Definition: fits.cc:883