26 from fnmatch
import fnmatch
28 from contextlib
import contextmanager
30 from astro_metadata_translator
import fix_header
31 from lsst.pex.config import Config, Field, DictField, ListField, ConfigurableField
38 """Argument parser to support ingesting images into the image repository"""
41 super(IngestArgumentParser, self).
__init__(*args, **kwargs)
42 self.add_argument(
"-n",
"--dry-run", dest=
"dryrun", action=
"store_true", default=
False,
43 help=
"Don't perform any action?")
44 self.add_argument(
"--mode", choices=[
"move",
"copy",
"link",
"skip"], default=
"link",
45 help=
"Mode of delivering the files to their destination")
46 self.add_argument(
"--create", action=
"store_true", help=
"Create new registry (clobber old)?")
47 self.add_argument(
"--ignore-ingested", dest=
"ignoreIngested", action=
"store_true",
48 help=
"Don't register files that have already been registered")
49 self.add_id_argument(
"--badId",
"raw",
"Data identifier for bad data", doMakeDataRefList=
False)
50 self.add_argument(
"--badFile", nargs=
"*", default=[],
51 help=
"Names of bad files (no path; wildcards allowed)")
52 self.add_argument(
"files", nargs=
"+", help=
"Names of file")
56 """Configuration for ParseTask"""
57 translation =
DictField(keytype=str, itemtype=str, default={},
58 doc=
"Translation table for property --> header")
59 translators =
DictField(keytype=str, itemtype=str, default={},
60 doc=
"Properties and name of translator method")
61 defaults =
DictField(keytype=str, itemtype=str, default={},
62 doc=
"Default values if header is not present")
63 hdu =
Field(dtype=int, default=DEFAULT_HDU, doc=
"HDU to read for metadata")
64 extnames =
ListField(dtype=str, default=[], doc=
"Extension names to search for")
68 """Task that will parse the filename and/or its contents to get the required information
69 for putting the file in the correct location and populating the registry."""
70 ConfigClass = ParseConfig
71 translator_class =
None
72 """Metadata translation support (astro_metadata_translator.MetadataTranslator).
76 The default of `None` will attempt to guess the correct translator,
77 but specifying one (e.g., in obs-package specific ingest) will be faster.
81 """Get information about the image from the filename and its contents
83 Here, we open the image and parse the header, but one could also look at the filename itself
84 and derive information from that, or set values from the configuration.
89 Name of file to inspect
96 List of file properties for each extension
101 if len(self.config.extnames) == 0:
103 return phuInfo, [phuInfo]
105 extnames =
set(self.config.extnames)
108 while len(extnames) > 0:
113 except Exception
as e:
114 self.log.
warning(
"Error reading %s extensions %s: %s", filename, extnames, e)
120 hduInfo[
"hdu"] = extnum
121 infoList.append(hduInfo)
122 extnames.discard(ext)
123 return phuInfo, infoList
127 """ Get the name of a FITS extension.
131 md : `lsst.daf.base.PropertySet`
132 FITS header metadata.
136 result : `str` or `None`
137 The string from the EXTNAME header card if it exists. None otherwise.
146 """Attempt to pull the desired information out of the header
148 This is done through two mechanisms:
149 * translation: a property is set directly from the relevant header keyword
150 * translator: a property is set with the result of calling a method
152 The translator methods receive the header metadata and should return the
153 appropriate value, or None if the value cannot be determined.
155 @param md FITS header
156 @param info File properties, to be supplemented
161 for p, h
in self.config.translation.items():
162 value = md.get(h,
None)
163 if value
is not None:
164 if isinstance(value, str):
165 value = value.strip()
167 elif p
in self.config.defaults:
168 info[p] = self.config.defaults[p]
170 self.log.
warning(
"Unable to find value for %s (derived from %s)", p, h)
171 for p, t
in self.config.translators.items():
172 func = getattr(self, t)
175 except Exception
as e:
176 self.log.
warning(
"%s failed to translate %s: %s", t, p, e)
178 if value
is not None:
183 """Convert a full DATE-OBS to a mere date
185 Besides being an example of a translator, this is also generally useful.
186 It will only be used if listed as a translator in the configuration.
188 date = md.getScalar(
"DATE-OBS").
strip()
195 """Translate a full filter description into a mere filter name
197 Besides being an example of a translator, this is also generally useful.
198 It will only be used if listed as a translator in the configuration.
200 filterName = md.getScalar(
"FILTER").
strip()
201 filterName = filterName.strip()
202 c = filterName.find(
" ")
204 filterName = filterName[:c]
208 """Get destination for the file
210 @param butler Data butler
211 @param info File properties, used as dataId for the butler
212 @param filename Input filename
213 @return Destination filename
215 raw = butler.get(
"raw_filename", info)[0]
224 """Configuration for the RegisterTask"""
225 table =
Field(dtype=str, default=
"raw", doc=
"Name of table")
226 columns =
DictField(keytype=str, itemtype=str, doc=
"List of columns for raw table, with their types",
227 itemCheck=
lambda x: x
in (
"text",
"int",
"double"),
228 default={
'object':
'text',
237 unique =
ListField(dtype=str, doc=
"List of columns to be declared unique for the table",
238 default=[
"visit",
"ccd"])
239 visit =
ListField(dtype=str, default=[
"visit",
"object",
"date",
"filter"],
240 doc=
"List of columns for raw_visit table")
241 ignore =
Field(dtype=bool, default=
False, doc=
"Ignore duplicates in the table?")
242 permissions =
Field(dtype=int, default=0o664, doc=
"Permissions mode for registry; 0o664 = rw-rw-r--")
246 """Context manager to provide a registry
249 def __init__(self, registryName, createTableFunc, forceCreateTables, permissions):
250 """Construct a context manager
252 @param registryName: Name of registry file
253 @param createTableFunc: Function to create tables
254 @param forceCreateTables: Force the (re-)creation of tables?
255 @param permissions: Permissions to set on database file
257 self.
connconn = sqlite3.connect(registryName)
258 os.chmod(registryName, permissions)
259 createTableFunc(self.
connconn, forceCreateTables=forceCreateTables)
262 """Provide the 'as' value"""
266 self.
connconn.commit()
267 self.
connconn.close()
273 """A context manager that doesn't provide any context
275 Useful for dry runs where we don't want to actually do anything real.
280 class RegisterTask(Task):
281 """Task that will generate the registry for the Mapper"""
282 ConfigClass = RegisterConfig
284 typemap = {
'text': str,
'int': int,
'double': float}
286 def openRegistry(self, directory, create=False, dryrun=False, name="registry.sqlite3"):
287 """Open the registry and return the connection handle.
289 @param directory Directory in which the registry file will be placed
290 @param create Clobber any existing registry and create a new one?
291 @param dryrun Don't do anything permanent?
292 @param name Filename of the registry
293 @return Database connection
298 registryName = os.path.join(directory, name)
303 """Create the registry tables
305 One table (typically 'raw') contains information on all files, and the
306 other (typically 'raw_visit') contains information on all visits.
308 @param conn Database connection
309 @param table Name of table to create in database
311 cursor = conn.cursor()
313 table = self.config.table
314 cmd =
"SELECT name FROM sqlite_master WHERE type='table' AND name='%s'" % table
316 if cursor.fetchone()
and not forceCreateTables:
317 self.log.
info(
'Table "%s" exists. Skipping creation', table)
320 cmd =
"drop table if exists %s" % table
322 cmd =
"drop table if exists %s_visit" % table
325 cmd =
"create table %s (id integer primary key autoincrement, " % table
326 cmd +=
",".join([(
"%s %s" % (col, colType))
for col, colType
in self.config.columns.items()])
327 if len(self.config.unique) > 0:
328 cmd +=
", unique(" +
",".join(self.config.unique) +
")"
332 cmd =
"create table %s_visit (" % table
333 cmd +=
",".join([(
"%s %s" % (col, self.config.columns[col]))
for col
in self.config.visit])
334 cmd +=
", unique(" +
",".join(
set(self.config.visit).intersection(
set(self.config.unique))) +
")"
340 def check(self, conn, info, table=None):
341 """Check for the presence of a row already
343 Not sure this is required, given the 'ignore' configuration option.
346 table = self.config.table
347 if self.config.ignore
or len(self.config.unique) == 0:
349 cursor = conn.cursor()
350 sql =
"SELECT COUNT(*) FROM %s WHERE " % table
351 sql +=
" AND ".join([
"%s = %s" % (col, self.
placeHolderplaceHolder)
for col
in self.config.unique])
352 values = [self.
typemaptypemap[self.config.columns[col]](info[col])
for col
in self.config.unique]
354 cursor.execute(sql, values)
355 if cursor.fetchone()[0] > 0:
359 def addRow(self, conn, info, dryrun=False, create=False, table=None):
360 """Add a row to the file table (typically 'raw').
362 @param conn Database connection
363 @param info File properties to add to database
364 @param table Name of table in database
368 table = self.config.table
370 if self.config.ignore:
371 ignoreClause =
" OR IGNORE"
372 sql =
"INSERT%s INTO %s (%s) VALUES (" % (ignoreClause, table,
",".join(self.config.columns))
373 sql +=
",".join([self.
placeHolderplaceHolder] * len(self.config.columns)) +
")"
374 values = [self.
typemaptypemap[tt](info[col])
for col, tt
in self.config.columns.items()]
377 print(
"Would execute: '%s' with %s" % (sql,
",".join([str(value)
for value
in values])))
379 conn.cursor().execute(sql, values)
381 sql =
"INSERT OR IGNORE INTO %s_visit VALUES (" % table
382 sql +=
",".join([self.
placeHolderplaceHolder] * len(self.config.visit)) +
")"
383 values = [self.
typemaptypemap[self.config.columns[col]](info[col])
for col
in self.config.visit]
386 print(
"Would execute: '%s' with %s" % (sql,
",".join([str(value)
for value
in values])))
388 conn.cursor().execute(sql, values)
392 """Configuration for IngestTask"""
395 allowError =
Field(dtype=bool, default=
False, doc=
"Allow error in ingestion?")
396 clobber =
Field(dtype=bool, default=
False, doc=
"Clobber existing file?")
407 """Task that will ingest images into the data repository"""
408 ConfigClass = IngestConfig
409 ArgumentParser = IngestArgumentParser
410 _DefaultName =
"ingest"
413 super(IngestTask, self).
__init__(*args, **kwargs)
414 self.makeSubtask(
"parse")
415 self.makeSubtask(
"register")
419 """Parse the command-line arguments and return them along with a Task
423 args = parser.parse_args(config)
424 task = cls(config=args.config)
429 """Parse the command-line arguments and run the Task."""
430 task, args = cls.
_parse_parse()
434 def prepareTask(cls, root=None, dryrun=False, mode="move", create=False,
435 ignoreIngested=False):
436 """Prepare for running the task repeatedly with `ingestFiles`.
438 Saves the parsed arguments, including the Butler and log, as a
439 private instance variable.
443 root : `str`, optional
444 Repository root pathname. If None, run the Task using the
445 command line arguments, ignoring all other arguments below.
446 dryrun : `bool`, optional
447 If True, don't perform any action; log what would have happened.
448 mode : `str`, optional
449 How files are delivered to their destination. Default is "move",
450 unlike the command-line default of "link".
451 create : `bool`, optional
452 If True, create a new registry, clobbering any old one present.
453 ignoreIngested : `bool`, optional
454 If True, do not complain if the file is already present in the
455 registry (and do nothing else).
460 If `root` was provided, the IngestTask instance
462 sys.argv = [
"IngestTask"]
463 sys.argv.append(root)
465 sys.argv.append(
"--dry-run")
466 sys.argv.append(
"--mode")
467 sys.argv.append(mode)
469 sys.argv.append(
"--create")
471 sys.argv.append(
"--ignore-ingested")
472 sys.argv.append(
"__fakefile__")
474 task, args = cls.
_parse_parse()
478 def ingest(self, infile, outfile, mode="move", dryrun=False):
479 """Ingest a file into the image repository.
481 @param infile Name of input file
482 @param outfile Name of output file (file in repository)
483 @param mode Mode of ingest (copy/link/move/skip)
484 @param dryrun Only report what would occur?
485 @return Success boolean
490 self.log.
info(
"Would %s from %s to %s", mode, infile, outfile)
493 outdir = os.path.dirname(outfile)
494 if not os.path.isdir(outdir):
497 except OSError
as exc:
499 if not os.path.isdir(outdir):
500 raise RuntimeError(f
"Failed to create directory {outdir}")
from exc
501 if os.path.lexists(outfile):
502 if self.config.clobber:
505 raise RuntimeError(
"File %s already exists; consider --config clobber=True" % outfile)
509 shutil.copyfile(infile, outfile)
511 if os.path.exists(outfile):
512 if os.path.samefile(infile, outfile):
513 self.log.
debug(
"Already linked %s to %s: ignoring", infile, outfile)
515 self.log.
warning(
"%s already has a file at the target location (%s): ignoring "
516 "(set clobber=True to overwrite)", infile, outfile)
518 os.symlink(os.path.abspath(infile), outfile)
521 shutil.move(infile, outfile)
523 raise AssertionError(
"Unknown mode: %s" % mode)
524 self.log.
info(
"%s --<%s>--> %s", infile, mode, outfile)
525 except Exception
as e:
526 self.log.
warning(
"Failed to %s %s to %s: %s", mode, infile, outfile, e)
527 if not self.config.allowError:
528 raise RuntimeError(f
"Failed to {mode} {infile} to {outfile}")
from e
533 """Return whether the file qualifies as bad
535 We match against the list of bad file patterns.
537 filename = os.path.basename(filename)
540 for badFile
in badFileList:
541 if fnmatch(filename, badFile):
546 """Return whether the file information qualifies as bad
548 We match against the list of bad data identifiers.
552 for badId
in badIdList:
553 if all(info[key] == value
for key, value
in badId.items()):
558 """!Expand a set of filenames and globs, returning a list of filenames
560 @param fileNameList A list of files and glob patterns
562 N.b. globs obey Posix semantics, so a pattern that matches nothing is returned unchanged
565 for globPattern
in fileNameList:
566 files = glob(globPattern)
569 self.log.
warning(
"%s doesn't match any file", globPattern)
572 filenameList.extend(files)
576 def runFile(self, infile, registry, args, pos):
577 """!Examine and ingest a single file
579 @param infile: File to process
580 @param registry: Registry into which to insert Butler metadata, or None
581 @param args: Parsed command-line arguments
582 @param pos: Position number of this file in the input list
583 @return parsed information from FITS HDUs if registry is None; or None
585 if self.
isBadFileisBadFile(infile, args.badFile):
586 self.log.
info(
"Skipping declared bad file %s", infile)
589 fileInfo, hduInfoList = self.parse.
getInfo(infile)
590 except Exception
as e:
591 if not self.config.allowError:
592 raise RuntimeError(f
"Error parsing {infile}")
from e
593 self.log.
warning(
"Error parsing %s (%s); skipping", infile, e)
595 if self.
isBadIdisBadId(fileInfo, args.badId.idList):
596 self.log.
info(
"Skipping declared bad file %s: %s", infile, fileInfo)
598 if registry
is not None and self.register.check(registry, fileInfo):
599 if args.ignoreIngested:
601 self.log.
warning(
"%s: already ingested: %s", infile, fileInfo)
602 outfile = self.parse.getDestination(args.butler, fileInfo, infile)
603 if not self.
ingestingest(infile, outfile, mode=args.mode, dryrun=args.dryrun):
605 if hduInfoList
is None:
609 for info
in hduInfoList:
611 self.register.addRow(registry, info, dryrun=args.dryrun, create=args.create)
612 except Exception
as exc:
613 raise IngestError(f
"Failed to register file {infile}", infile, pos)
from exc
617 """Ingest all specified files and add them to the registry"""
618 filenameList = self.
expandFilesexpandFiles(args.files)
620 context = self.register.openRegistry(root, create=args.create, dryrun=args.dryrun)
621 with context
as registry:
622 for pos
in range(len(filenameList)):
623 infile = filenameList[pos]
625 self.
runFilerunFile(infile, registry, args, pos)
626 except Exception
as exc:
627 self.log.
warning(
"Failed to ingest file %s: %s", infile, exc)
628 if not self.config.allowError:
629 raise IngestError(f
"Failed to ingest file {infile}", infile, pos)
from exc
633 """Ingest specified file or list of files and add them to the registry.
635 This method can only be called if `prepareTask` was used.
639 fileList : `str` or `list` [`str`]
640 Pathname or list of pathnames of files to ingest.
642 if not hasattr(self,
"_args"):
643 raise RuntimeError(
"Task not created with prepareTask")
644 if isinstance(fileList, str):
645 fileList = [fileList]
646 self._args.files = fileList
647 self.
runrun(self._args)
651 """Can I copy a file? Raise an exception if space constraints are not met.
653 @param fromPath Path from which the file is being copied
654 @param toPath Path to which the file is being copied
656 req = os.stat(fromPath).st_size
657 st = os.statvfs(os.path.dirname(toPath))
658 avail = st.f_bavail * st.f_frsize
660 raise RuntimeError(
"Insufficient space: %d vs %d" % (req, avail))
def __init__(self, *args, **kwargs)
def __init__(self, message, pathname, position)
def expandFiles(self, fileNameList)
Expand a set of filenames and globs, returning a list of filenames.
def runFile(self, infile, registry, args, pos)
Examine and ingest a single file.
def ingestFiles(self, fileList)
def __init__(self, *args, **kwargs)
def prepareTask(cls, root=None, dryrun=False, mode="move", create=False, ignoreIngested=False)
def isBadFile(self, filename, badFileList)
def isBadId(self, info, badIdList)
def ingest(self, infile, outfile, mode="move", dryrun=False)
def getInfoFromMetadata(self, md, info=None)
def translate_date(self, md)
def getDestination(self, butler, info, filename)
def getInfo(self, filename)
def translate_filter(self, md)
def openRegistry(self, directory, create=False, dryrun=False, name="registry.sqlite3")
def check(self, conn, info, table=None)
def addRow(self, conn, info, dryrun=False, create=False, table=None)
def createTable(self, conn, table=None, forceCreateTables=False)
def __exit__(self, excType, excValue, traceback)
def __init__(self, registryName, createTableFunc, forceCreateTables, permissions)
daf::base::PropertySet * set
std::shared_ptr< daf::base::PropertyList > readMetadata(std::string const &fileName, int hdu=DEFAULT_HDU, bool strip=false)
Read FITS header.
bool all(CoordinateExpr< N > const &expr) noexcept
Return true if all elements are true.
def assertCanCopy(fromPath, toPath)