26 from fnmatch
import fnmatch
28 from contextlib
import contextmanager
30 from lsst.pex.config
import Config, Field, DictField, ListField, ConfigurableField
38 """Argument parser to support ingesting images into the image repository""" 41 super(IngestArgumentParser, self).
__init__(*args, **kwargs)
42 self.add_argument(
"-n",
"--dry-run", dest=
"dryrun", action=
"store_true", default=
False,
43 help=
"Don't perform any action?")
44 self.add_argument(
"--mode", choices=[
"move",
"copy",
"link",
"skip"], default=
"link",
45 help=
"Mode of delivering the files to their destination")
46 self.add_argument(
"--create", action=
"store_true", help=
"Create new registry (clobber old)?")
47 self.add_argument(
"--ignore-ingested", dest=
"ignoreIngested", action=
"store_true",
48 help=
"Don't register files that have already been registered")
49 self.
add_id_argument(
"--badId",
"raw",
"Data identifier for bad data", doMakeDataRefList=
False)
50 self.add_argument(
"--badFile", nargs=
"*", default=[],
51 help=
"Names of bad files (no path; wildcards allowed)")
52 self.add_argument(
"files", nargs=
"+", help=
"Names of file")
56 """Configuration for ParseTask""" 57 translation = DictField(keytype=str, itemtype=str, default={},
58 doc=
"Translation table for property --> header")
59 translators = DictField(keytype=str, itemtype=str, default={},
60 doc=
"Properties and name of translator method")
61 defaults = DictField(keytype=str, itemtype=str, default={},
62 doc=
"Default values if header is not present")
63 hdu = Field(dtype=int, default=DEFAULT_HDU, doc=
"HDU to read for metadata")
64 extnames = ListField(dtype=str, default=[], doc=
"Extension names to search for")
68 """Task that will parse the filename and/or its contents to get the required information 69 for putting the file in the correct location and populating the registry.""" 70 ConfigClass = ParseConfig
73 """Get information about the image from the filename and its contents 75 Here, we open the image and parse the header, but one could also look at the filename itself 76 and derive information from that, or set values from the configuration. 78 @param filename Name of file to inspect 79 @return File properties; list of file properties for each extension 83 if len(self.
config.extnames) == 0:
85 return phuInfo, [phuInfo]
90 while len(extnames) > 0:
94 except Exception
as e:
95 self.
log.
warn(
"Error reading %s extensions %s: %s" % (filename, extnames, e))
101 hduInfo[
"hdu"] = extnum
102 infoList.append(hduInfo)
103 extnames.discard(ext)
104 return phuInfo, infoList
108 """ Get the name of an extension. 109 @param md: PropertySet (like one obtained from lsst.afw.fits.readMetadata) 110 @return Name of the extension if it exists, None otherwise. 114 ext = md.getScalar(
"EXTNAME")
120 """Attempt to pull the desired information out of the header 122 This is done through two mechanisms: 123 * translation: a property is set directly from the relevant header keyword 124 * translator: a property is set with the result of calling a method 126 The translator methods receive the header metadata and should return the 127 appropriate value, or None if the value cannot be determined. 129 @param md FITS header 130 @param info File properties, to be supplemented 135 for p, h
in self.
config.translation.items():
136 value = md.get(h,
None)
137 if value
is not None:
138 if isinstance(value, str):
139 value = value.strip()
141 elif p
in self.
config.defaults:
142 info[p] = self.
config.defaults[p]
144 self.
log.
warn(
"Unable to find value for %s (derived from %s)" % (p, h))
145 for p, t
in self.
config.translators.items():
146 func = getattr(self, t)
149 except Exception
as e:
150 self.
log.
warn(
"%s failed to translate %s: %s", t, p, e)
152 if value
is not None:
157 """Convert a full DATE-OBS to a mere date 159 Besides being an example of a translator, this is also generally useful. 160 It will only be used if listed as a translator in the configuration. 162 date = md.getScalar(
"DATE-OBS").
strip()
169 """Translate a full filter description into a mere filter name 171 Besides being an example of a translator, this is also generally useful. 172 It will only be used if listed as a translator in the configuration. 174 filterName = md.getScalar(
"FILTER").
strip()
175 filterName = filterName.strip()
176 c = filterName.find(
" ")
178 filterName = filterName[:c]
182 """Get destination for the file 184 @param butler Data butler 185 @param info File properties, used as dataId for the butler 186 @param filename Input filename 187 @return Destination filename 189 raw = butler.get(
"raw_filename", info)[0]
198 """Configuration for the RegisterTask""" 199 table = Field(dtype=str, default=
"raw", doc=
"Name of table")
200 columns = DictField(keytype=str, itemtype=str, doc=
"List of columns for raw table, with their types",
201 itemCheck=
lambda x: x
in (
"text",
"int",
"double"),
202 default={
'object':
'text',
211 unique = ListField(dtype=str, doc=
"List of columns to be declared unique for the table",
212 default=[
"visit",
"ccd"])
213 visit = ListField(dtype=str, default=[
"visit",
"object",
"date",
"filter"],
214 doc=
"List of columns for raw_visit table")
215 ignore = Field(dtype=bool, default=
False, doc=
"Ignore duplicates in the table?")
216 permissions = Field(dtype=int, default=0o664, doc=
"Permissions mode for registry; 0o664 = rw-rw-r--")
220 """Context manager to provide a registry 222 An existing registry is copied, so that it may continue 223 to be used while we add to this new registry. Finally, 224 the new registry is moved into the right place. 227 def __init__(self, registryName, createTableFunc, forceCreateTables, permissions):
228 """Construct a context manager 230 @param registryName: Name of registry file 231 @param createTableFunc: Function to create tables 232 @param forceCreateTables: Force the (re-)creation of tables? 233 @param permissions: Permissions to set on database file 238 updateFile = tempfile.NamedTemporaryFile(prefix=registryName, dir=os.path.dirname(self.
registryName),
242 if os.path.exists(registryName):
244 os.chmod(self.
updateName, os.stat(registryName).st_mode)
245 shutil.copyfile(registryName, self.
updateName)
248 createTableFunc(self.
conn, forceCreateTables=forceCreateTables)
252 """Provide the 'as' value""" 269 """A context manager that doesn't provide any context 271 Useful for dry runs where we don't want to actually do anything real. 276 class RegisterTask(Task):
277 """Task that will generate the registry for the Mapper""" 278 ConfigClass = RegisterConfig
280 typemap = {
'text': str,
'int': int,
'double': float}
282 def openRegistry(self, directory, create=False, dryrun=False, name="registry.sqlite3"):
283 """Open the registry and return the connection handle. 285 @param directory Directory in which the registry file will be placed 286 @param create Clobber any existing registry and create a new one? 287 @param dryrun Don't do anything permanent? 288 @param name Filename of the registry 289 @return Database connection 294 registryName = os.path.join(directory, name)
299 """Create the registry tables 301 One table (typically 'raw') contains information on all files, and the 302 other (typically 'raw_visit') contains information on all visits. 304 @param conn Database connection 305 @param table Name of table to create in database 307 cursor = conn.cursor()
310 cmd =
"SELECT name FROM sqlite_master WHERE type='table' AND name='%s'" % table
312 if cursor.fetchone()
and not forceCreateTables:
313 self.
log.
info(
'Table "%s" exists. Skipping creation' % table)
316 cmd =
"drop table if exists %s" % table
318 cmd =
"drop table if exists %s_visit" % table
321 cmd =
"create table %s (id integer primary key autoincrement, " % table
322 cmd +=
",".join([(
"%s %s" % (col, colType))
for col, colType
in self.
config.columns.items()])
323 if len(self.
config.unique) > 0:
324 cmd +=
", unique(" +
",".join(self.
config.unique) +
")" 328 cmd =
"create table %s_visit (" % table
329 cmd +=
",".join([(
"%s %s" % (col, self.
config.columns[col]))
for col
in self.
config.visit])
330 cmd +=
", unique(" +
",".join(
set(self.
config.visit).intersection(
set(self.
config.unique))) +
")" 336 def check(self, conn, info, table=None):
337 """Check for the presence of a row already 339 Not sure this is required, given the 'ignore' configuration option. 345 cursor = conn.cursor()
346 sql =
"SELECT COUNT(*) FROM %s WHERE " % table
347 sql +=
" AND ".join([
"%s = %s" % (col, self.
placeHolder)
for col
in self.
config.unique])
350 cursor.execute(sql, values)
351 if cursor.fetchone()[0] > 0:
355 def addRow(self, conn, info, dryrun=False, create=False, table=None):
356 """Add a row to the file table (typically 'raw'). 358 @param conn Database connection 359 @param info File properties to add to database 360 @param table Name of table in database 364 sql =
"INSERT INTO %s (%s) SELECT " % (table,
",".join(self.
config.columns))
366 values = [self.
typemap[tt](info[col])
for col, tt
in self.
config.columns.items()]
369 sql +=
" WHERE NOT EXISTS (SELECT 1 FROM %s WHERE " % table
370 sql +=
" AND ".join([
"%s=%s" % (col, self.
placeHolder)
for col
in self.
config.unique])
372 values += [info[col]
for col
in self.
config.unique]
375 print(
"Would execute: '%s' with %s" % (sql,
",".join([str(value)
for value
in values])))
377 conn.cursor().execute(sql, values)
380 """Generate the visits table (typically 'raw_visit') from the 381 file table (typically 'raw'). 383 @param conn Database connection 384 @param table Name of table in database 388 sql =
"INSERT INTO %s_visit SELECT DISTINCT " % table
389 sql +=
",".join(self.
config.visit)
390 sql +=
" FROM %s AS vv1" % table
391 sql +=
" WHERE NOT EXISTS " 392 sql +=
"(SELECT vv2.visit FROM %s_visit AS vv2 WHERE vv1.visit = vv2.visit)" % (table,)
394 print(
"Would execute: %s" % sql)
396 conn.cursor().execute(sql)
400 """Configuration for IngestTask""" 401 parse = ConfigurableField(target=ParseTask, doc=
"File parsing")
402 register = ConfigurableField(target=RegisterTask, doc=
"Registry entry")
403 allowError = Field(dtype=bool, default=
False, doc=
"Allow error in ingestion?")
404 clobber = Field(dtype=bool, default=
False, doc=
"Clobber existing file?")
408 """Task that will ingest images into the data repository""" 409 ConfigClass = IngestConfig
410 ArgumentParser = IngestArgumentParser
411 _DefaultName =
"ingest" 414 super(IngestTask, self).
__init__(*args, **kwargs)
420 """Parse the command-line arguments and run the Task""" 423 args = parser.parse_args(config)
424 task =
cls(config=args.config)
427 def ingest(self, infile, outfile, mode="move", dryrun=False):
428 """Ingest a file into the image repository. 430 @param infile Name of input file 431 @param outfile Name of output file (file in repository) 432 @param mode Mode of ingest (copy/link/move/skip) 433 @param dryrun Only report what would occur? 434 @return Success boolean 439 self.
log.
info(
"Would %s from %s to %s" % (mode, infile, outfile))
442 outdir = os.path.dirname(outfile)
443 if not os.path.isdir(outdir):
448 if not os.path.isdir(outdir):
450 if os.path.lexists(outfile):
454 raise RuntimeError(
"File %s already exists; consider --config clobber=True" % outfile)
458 shutil.copyfile(infile, outfile)
460 os.symlink(os.path.abspath(infile), outfile)
463 shutil.move(infile, outfile)
465 raise AssertionError(
"Unknown mode: %s" % mode)
466 self.
log.
info(
"%s --<%s>--> %s" % (infile, mode, outfile))
467 except Exception
as e:
468 self.
log.
warn(
"Failed to %s %s to %s: %s" % (mode, infile, outfile, e))
469 if not self.
config.allowError:
475 """Return whether the file qualifies as bad 477 We match against the list of bad file patterns. 479 filename = os.path.basename(filename)
482 for badFile
in badFileList:
483 if fnmatch(filename, badFile):
488 """Return whether the file information qualifies as bad 490 We match against the list of bad data identifiers. 494 for badId
in badIdList:
495 if all(info[key] == value
for key, value
in badId.items()):
500 """!Expand a set of filenames and globs, returning a list of filenames 502 @param fileNameList A list of files and glob patterns 504 N.b. globs obey Posix semantics, so a pattern that matches nothing is returned unchanged 507 for globPattern
in fileNameList:
508 files = glob(globPattern)
511 self.
log.
warn(
"%s doesn't match any file" % globPattern)
514 filenameList.extend(files)
519 """!Examine and ingest a single file 521 @param infile: File to process 522 @param args: Parsed command-line arguments 523 @return parsed information from FITS HDUs or None 526 self.
log.
info(
"Skipping declared bad file %s" % infile)
529 fileInfo, hduInfoList = self.parse.
getInfo(infile)
530 except Exception
as e:
531 if not self.
config.allowError:
533 self.
log.
warn(
"Error parsing %s (%s); skipping" % (infile, e))
535 if self.
isBadId(fileInfo, args.badId.idList):
536 self.
log.
info(
"Skipping declared bad file %s: %s" % (infile, fileInfo))
538 if registry
is not None and self.register.check(registry, fileInfo):
539 if args.ignoreIngested:
541 self.
log.
warn(
"%s: already ingested: %s" % (infile, fileInfo))
542 outfile = self.parse.getDestination(args.butler, fileInfo, infile)
543 if not self.
ingest(infile, outfile, mode=args.mode, dryrun=args.dryrun):
548 """Ingest all specified files and add them to the registry""" 551 context = self.register.openRegistry(root, create=args.create, dryrun=args.dryrun)
552 with context
as registry:
553 for infile
in filenameList:
555 hduInfoList = self.
runFile(infile, registry, args)
556 except Exception
as exc:
557 self.
log.
warn(
"Failed to ingest file %s: %s", infile, exc)
559 if hduInfoList
is None:
561 for info
in hduInfoList:
562 self.register.addRow(registry, info, dryrun=args.dryrun, create=args.create)
563 self.register.addVisits(registry, dryrun=args.dryrun)
567 """Can I copy a file? Raise an exception if space constraints are not met. 569 @param fromPath Path from which the file is being copied 570 @param toPath Path to which the file is being copied 572 req = os.stat(fromPath).st_size
573 st = os.statvfs(os.path.dirname(toPath))
574 avail = st.f_bavail * st.f_frsize
576 raise RuntimeError(
"Insufficient space: %d vs %d" % (req, avail))
def ingest(self, infile, outfile, mode="move", dryrun=False)
def makeSubtask(self, name, keyArgs)
def translate_filter(self, md)
def expandFiles(self, fileNameList)
Expand a set of filenames and globs, returning a list of filenames.
def translate_date(self, md)
def __exit__(self, excType, excValue, traceback)
Provides consistent interface for LSST exceptions.
def getInfo(self, filename)
daf::base::PropertySet * set
def getInfoFromMetadata(self, md, info=None)
def getDestination(self, butler, info, filename)
bool all(CoordinateExpr< N > const &expr) noexcept
Return true if all elements are true.
def runFile(self, infile, registry, args)
Examine and ingest a single file.
def isBadFile(self, filename, badFileList)
def __init__(self, registryName, createTableFunc, forceCreateTables, permissions)
def createTable(self, conn, table=None, forceCreateTables=False)
def assertCanCopy(fromPath, toPath)
def check(self, conn, info, table=None)
def __init__(self, args, kwargs)
def addVisits(self, conn, dryrun=False, table=None)
def openRegistry(self, directory, create=False, dryrun=False, name="registry.sqlite3")
def __init__(self, args, kwargs)
def addRow(self, conn, info, dryrun=False, create=False, table=None)
def isBadId(self, info, badIdList)
def add_id_argument(self, name, datasetType, help, level=None, doMakeDataRefList=True, ContainerClass=DataIdContainer)