26 from fnmatch
import fnmatch
28 from contextlib
import contextmanager
30 from lsst.pex.config import Config, Field, DictField, ListField, ConfigurableField
37 """Argument parser to support ingesting images into the image repository"""
40 super(IngestArgumentParser, self).
__init__(*args, **kwargs)
41 self.add_argument(
"-n",
"--dry-run", dest=
"dryrun", action=
"store_true", default=
False,
42 help=
"Don't perform any action?")
43 self.add_argument(
"--mode", choices=[
"move",
"copy",
"link",
"skip"], default=
"link",
44 help=
"Mode of delivering the files to their destination")
45 self.add_argument(
"--create", action=
"store_true", help=
"Create new registry (clobber old)?")
46 self.add_argument(
"--ignore-ingested", dest=
"ignoreIngested", action=
"store_true",
47 help=
"Don't register files that have already been registered")
48 self.
add_id_argument(
"--badId",
"raw",
"Data identifier for bad data", doMakeDataRefList=
False)
49 self.add_argument(
"--badFile", nargs=
"*", default=[],
50 help=
"Names of bad files (no path; wildcards allowed)")
51 self.add_argument(
"files", nargs=
"+", help=
"Names of file")
55 """Configuration for ParseTask"""
56 translation =
DictField(keytype=str, itemtype=str, default={},
57 doc=
"Translation table for property --> header")
58 translators =
DictField(keytype=str, itemtype=str, default={},
59 doc=
"Properties and name of translator method")
60 defaults =
DictField(keytype=str, itemtype=str, default={},
61 doc=
"Default values if header is not present")
62 hdu =
Field(dtype=int, default=DEFAULT_HDU, doc=
"HDU to read for metadata")
63 extnames =
ListField(dtype=str, default=[], doc=
"Extension names to search for")
67 """Task that will parse the filename and/or its contents to get the required information
68 for putting the file in the correct location and populating the registry."""
69 ConfigClass = ParseConfig
72 """Get information about the image from the filename and its contents
74 Here, we open the image and parse the header, but one could also look at the filename itself
75 and derive information from that, or set values from the configuration.
77 @param filename Name of file to inspect
78 @return File properties; list of file properties for each extension
82 if len(self.
config.extnames) == 0:
84 return phuInfo, [phuInfo]
89 while len(extnames) > 0:
93 except Exception
as e:
94 self.
log.
warn(
"Error reading %s extensions %s: %s" % (filename, extnames, e))
100 hduInfo[
"hdu"] = extnum
101 infoList.append(hduInfo)
102 extnames.discard(ext)
103 return phuInfo, infoList
107 """ Get the name of a FITS extension.
111 md : `lsst.daf.base.PropertySet`
112 FITS header metadata.
116 result : `str` or `None`
117 The string from the EXTNAME header card if it exists. None otherwise.
126 """Attempt to pull the desired information out of the header
128 This is done through two mechanisms:
129 * translation: a property is set directly from the relevant header keyword
130 * translator: a property is set with the result of calling a method
132 The translator methods receive the header metadata and should return the
133 appropriate value, or None if the value cannot be determined.
135 @param md FITS header
136 @param info File properties, to be supplemented
141 for p, h
in self.
config.translation.items():
142 value = md.get(h,
None)
143 if value
is not None:
144 if isinstance(value, str):
145 value = value.strip()
147 elif p
in self.
config.defaults:
148 info[p] = self.
config.defaults[p]
150 self.
log.
warn(
"Unable to find value for %s (derived from %s)" % (p, h))
151 for p, t
in self.
config.translators.items():
152 func = getattr(self, t)
155 except Exception
as e:
156 self.
log.
warn(
"%s failed to translate %s: %s", t, p, e)
158 if value
is not None:
163 """Convert a full DATE-OBS to a mere date
165 Besides being an example of a translator, this is also generally useful.
166 It will only be used if listed as a translator in the configuration.
168 date = md.getScalar(
"DATE-OBS").
strip()
175 """Translate a full filter description into a mere filter name
177 Besides being an example of a translator, this is also generally useful.
178 It will only be used if listed as a translator in the configuration.
180 filterName = md.getScalar(
"FILTER").
strip()
181 filterName = filterName.strip()
182 c = filterName.find(
" ")
184 filterName = filterName[:c]
188 """Get destination for the file
190 @param butler Data butler
191 @param info File properties, used as dataId for the butler
192 @param filename Input filename
193 @return Destination filename
195 raw = butler.get(
"raw_filename", info)[0]
204 """Configuration for the RegisterTask"""
205 table =
Field(dtype=str, default=
"raw", doc=
"Name of table")
206 columns =
DictField(keytype=str, itemtype=str, doc=
"List of columns for raw table, with their types",
207 itemCheck=
lambda x: x
in (
"text",
"int",
"double"),
208 default={
'object':
'text',
217 unique =
ListField(dtype=str, doc=
"List of columns to be declared unique for the table",
218 default=[
"visit",
"ccd"])
219 visit =
ListField(dtype=str, default=[
"visit",
"object",
"date",
"filter"],
220 doc=
"List of columns for raw_visit table")
221 ignore =
Field(dtype=bool, default=
False, doc=
"Ignore duplicates in the table?")
222 permissions =
Field(dtype=int, default=0o664, doc=
"Permissions mode for registry; 0o664 = rw-rw-r--")
226 """Context manager to provide a registry
229 def __init__(self, registryName, createTableFunc, forceCreateTables, permissions):
230 """Construct a context manager
232 @param registryName: Name of registry file
233 @param createTableFunc: Function to create tables
234 @param forceCreateTables: Force the (re-)creation of tables?
235 @param permissions: Permissions to set on database file
237 self.
conn = sqlite3.connect(registryName)
238 os.chmod(registryName, permissions)
239 createTableFunc(self.
conn, forceCreateTables=forceCreateTables)
242 """Provide the 'as' value"""
253 """A context manager that doesn't provide any context
255 Useful for dry runs where we don't want to actually do anything real.
260 class RegisterTask(Task):
261 """Task that will generate the registry for the Mapper"""
262 ConfigClass = RegisterConfig
264 typemap = {
'text': str,
'int': int,
'double': float}
266 def openRegistry(self, directory, create=False, dryrun=False, name="registry.sqlite3"):
267 """Open the registry and return the connection handle.
269 @param directory Directory in which the registry file will be placed
270 @param create Clobber any existing registry and create a new one?
271 @param dryrun Don't do anything permanent?
272 @param name Filename of the registry
273 @return Database connection
278 registryName = os.path.join(directory, name)
283 """Create the registry tables
285 One table (typically 'raw') contains information on all files, and the
286 other (typically 'raw_visit') contains information on all visits.
288 @param conn Database connection
289 @param table Name of table to create in database
291 cursor = conn.cursor()
294 cmd =
"SELECT name FROM sqlite_master WHERE type='table' AND name='%s'" % table
296 if cursor.fetchone()
and not forceCreateTables:
297 self.
log.
info(
'Table "%s" exists. Skipping creation' % table)
300 cmd =
"drop table if exists %s" % table
302 cmd =
"drop table if exists %s_visit" % table
305 cmd =
"create table %s (id integer primary key autoincrement, " % table
306 cmd +=
",".join([(
"%s %s" % (col, colType))
for col, colType
in self.
config.columns.items()])
307 if len(self.
config.unique) > 0:
308 cmd +=
", unique(" +
",".join(self.
config.unique) +
")"
312 cmd =
"create table %s_visit (" % table
313 cmd +=
",".join([(
"%s %s" % (col, self.
config.columns[col]))
for col
in self.
config.visit])
314 cmd +=
", unique(" +
",".join(
set(self.
config.visit).intersection(
set(self.
config.unique))) +
")"
320 def check(self, conn, info, table=None):
321 """Check for the presence of a row already
323 Not sure this is required, given the 'ignore' configuration option.
329 cursor = conn.cursor()
330 sql =
"SELECT COUNT(*) FROM %s WHERE " % table
331 sql +=
" AND ".join([
"%s = %s" % (col, self.
placeHolder)
for col
in self.
config.unique])
334 cursor.execute(sql, values)
335 if cursor.fetchone()[0] > 0:
339 def addRow(self, conn, info, dryrun=False, create=False, table=None):
340 """Add a row to the file table (typically 'raw').
342 @param conn Database connection
343 @param info File properties to add to database
344 @param table Name of table in database
351 ignoreClause =
" OR IGNORE"
352 sql =
"INSERT%s INTO %s (%s) VALUES (" % (ignoreClause, table,
",".join(self.
config.columns))
354 values = [self.
typemap[tt](info[col])
for col, tt
in self.
config.columns.items()]
357 print(
"Would execute: '%s' with %s" % (sql,
",".join([str(value)
for value
in values])))
359 conn.cursor().execute(sql, values)
361 sql =
"INSERT OR IGNORE INTO %s_visit VALUES (" % table
366 print(
"Would execute: '%s' with %s" % (sql,
",".join([str(value)
for value
in values])))
368 conn.cursor().execute(sql, values)
372 """Configuration for IngestTask"""
375 allowError =
Field(dtype=bool, default=
False, doc=
"Allow error in ingestion?")
376 clobber =
Field(dtype=bool, default=
False, doc=
"Clobber existing file?")
387 """Task that will ingest images into the data repository"""
388 ConfigClass = IngestConfig
389 ArgumentParser = IngestArgumentParser
390 _DefaultName =
"ingest"
393 super(IngestTask, self).
__init__(*args, **kwargs)
399 """Parse the command-line arguments and return them along with a Task
403 args = parser.parse_args(config)
404 task =
cls(config=args.config)
409 """Parse the command-line arguments and run the Task."""
414 def prepareTask(cls, root=None, dryrun=False, mode="move", create=False,
415 ignoreIngested=False):
416 """Prepare for running the task repeatedly with `ingestFiles`.
418 Saves the parsed arguments, including the Butler and log, as a
419 private instance variable.
423 root : `str`, optional
424 Repository root pathname. If None, run the Task using the
425 command line arguments, ignoring all other arguments below.
426 dryrun : `bool`, optional
427 If True, don't perform any action; log what would have happened.
428 mode : `str`, optional
429 How files are delivered to their destination. Default is "move",
430 unlike the command-line default of "link".
431 create : `bool`, optional
432 If True, create a new registry, clobbering any old one present.
433 ignoreIngested : `bool`, optional
434 If True, do not complain if the file is already present in the
435 registry (and do nothing else).
440 If `root` was provided, the IngestTask instance
442 sys.argv = [
"IngestTask"]
443 sys.argv.append(root)
445 sys.argv.append(
"--dry-run")
446 sys.argv.append(
"--mode")
447 sys.argv.append(mode)
449 sys.argv.append(
"--create")
451 sys.argv.append(
"--ignore-ingested")
452 sys.argv.append(
"__fakefile__")
458 def ingest(self, infile, outfile, mode="move", dryrun=False):
459 """Ingest a file into the image repository.
461 @param infile Name of input file
462 @param outfile Name of output file (file in repository)
463 @param mode Mode of ingest (copy/link/move/skip)
464 @param dryrun Only report what would occur?
465 @return Success boolean
470 self.
log.
info(
"Would %s from %s to %s" % (mode, infile, outfile))
473 outdir = os.path.dirname(outfile)
474 if not os.path.isdir(outdir):
477 except OSError
as exc:
479 if not os.path.isdir(outdir):
480 raise RuntimeError(f
"Failed to create directory {outdir}")
from exc
481 if os.path.lexists(outfile):
485 raise RuntimeError(
"File %s already exists; consider --config clobber=True" % outfile)
489 shutil.copyfile(infile, outfile)
491 if os.path.exists(outfile):
492 if os.path.samefile(infile, outfile):
493 self.
log.
debug(
"Already linked %s to %s: ignoring" % (infile, outfile))
495 self.
log.
warn(
"%s already has a file at the target location (%s): ignoring "
496 "(set clobber=True to overwrite)" % (infile, outfile))
498 os.symlink(os.path.abspath(infile), outfile)
501 shutil.move(infile, outfile)
503 raise AssertionError(
"Unknown mode: %s" % mode)
504 self.
log.
info(
"%s --<%s>--> %s" % (infile, mode, outfile))
505 except Exception
as e:
506 self.
log.
warn(
"Failed to %s %s to %s: %s" % (mode, infile, outfile, e))
507 if not self.
config.allowError:
508 raise RuntimeError(f
"Failed to {mode} {infile} to {outfile}")
from e
513 """Return whether the file qualifies as bad
515 We match against the list of bad file patterns.
517 filename = os.path.basename(filename)
520 for badFile
in badFileList:
521 if fnmatch(filename, badFile):
526 """Return whether the file information qualifies as bad
528 We match against the list of bad data identifiers.
532 for badId
in badIdList:
533 if all(info[key] == value
for key, value
in badId.items()):
538 """!Expand a set of filenames and globs, returning a list of filenames
540 @param fileNameList A list of files and glob patterns
542 N.B. globs obey POSIX semantics, so a pattern that matches nothing is returned unchanged
545 for globPattern
in fileNameList:
546 files = glob(globPattern)
549 self.
log.
warn(
"%s doesn't match any file" % globPattern)
552 filenameList.extend(files)
556 def runFile(self, infile, registry, args, pos):
557 """!Examine and ingest a single file
559 @param infile: File to process
560 @param registry: Registry into which to insert Butler metadata, or None
561 @param args: Parsed command-line arguments
562 @param pos: Position number of this file in the input list
563 @return parsed information from FITS HDUs if registry is None; or None
566 self.
log.
info(
"Skipping declared bad file %s" % infile)
569 fileInfo, hduInfoList = self.parse.
getInfo(infile)
570 except Exception
as e:
571 if not self.
config.allowError:
572 raise RuntimeError(f
"Error parsing {infile}")
from e
573 self.
log.
warn(
"Error parsing %s (%s); skipping" % (infile, e))
575 if self.
isBadId(fileInfo, args.badId.idList):
576 self.
log.
info(
"Skipping declared bad file %s: %s" % (infile, fileInfo))
578 if registry
is not None and self.register.check(registry, fileInfo):
579 if args.ignoreIngested:
581 self.
log.
warn(
"%s: already ingested: %s" % (infile, fileInfo))
582 outfile = self.parse.getDestination(args.butler, fileInfo, infile)
583 if not self.
ingest(infile, outfile, mode=args.mode, dryrun=args.dryrun):
585 if hduInfoList
is None:
589 for info
in hduInfoList:
591 self.register.addRow(registry, info, dryrun=args.dryrun, create=args.create)
592 except Exception
as exc:
593 raise IngestError(f
"Failed to register file {infile}", infile, pos)
from exc
597 """Ingest all specified files and add them to the registry"""
600 context = self.register.openRegistry(root, create=args.create, dryrun=args.dryrun)
601 with context
as registry:
602 for pos
in range(len(filenameList)):
603 infile = filenameList[pos]
605 self.
runFile(infile, registry, args, pos)
606 except Exception
as exc:
607 self.
log.
warn(
"Failed to ingest file %s: %s", infile, exc)
608 if not self.
config.allowError:
609 raise IngestError(f
"Failed to ingest file {infile}", infile, pos)
from exc
613 """Ingest specified file or list of files and add them to the registry.
615 This method can only be called if `prepareTask` was used.
619 fileList : `str` or `list` [`str`]
620 Pathname or list of pathnames of files to ingest.
622 if not hasattr(self,
"_args"):
623 raise RuntimeError(
"Task not created with prepareTask")
624 if isinstance(fileList, str):
625 fileList = [fileList]
626 self._args.files = fileList
631 """Can I copy a file? Raise an exception if space constraints not met.
633 @param fromPath Path from which the file is being copied
634 @param toPath Path to which the file is being copied
636 req = os.stat(fromPath).st_size
637 st = os.statvfs(os.path.dirname(toPath))
638 avail = st.f_bavail * st.f_frsize
640 raise RuntimeError(
"Insufficient space: %d vs %d" % (req, avail))