26 from fnmatch
import fnmatch
28 from contextlib
import contextmanager
30 from lsst.pex.config import Config, Field, DictField, ListField, ConfigurableField
38 """Argument parser to support ingesting images into the image repository""" 41 super(IngestArgumentParser, self).
__init__(*args, **kwargs)
42 self.add_argument(
"-n",
"--dry-run", dest=
"dryrun", action=
"store_true", default=
False,
43 help=
"Don't perform any action?")
44 self.add_argument(
"--mode", choices=[
"move",
"copy",
"link",
"skip"], default=
"link",
45 help=
"Mode of delivering the files to their destination")
46 self.add_argument(
"--create", action=
"store_true", help=
"Create new registry (clobber old)?")
47 self.add_argument(
"--ignore-ingested", dest=
"ignoreIngested", action=
"store_true",
48 help=
"Don't register files that have already been registered")
49 self.
add_id_argument(
"--badId",
"raw",
"Data identifier for bad data", doMakeDataRefList=
False)
50 self.add_argument(
"--badFile", nargs=
"*", default=[],
51 help=
"Names of bad files (no path; wildcards allowed)")
52 self.add_argument(
"files", nargs=
"+", help=
"Names of file")
56 """Configuration for ParseTask""" 57 translation =
DictField(keytype=str, itemtype=str, default={},
58 doc=
"Translation table for property --> header")
59 translators =
DictField(keytype=str, itemtype=str, default={},
60 doc=
"Properties and name of translator method")
61 defaults =
DictField(keytype=str, itemtype=str, default={},
62 doc=
"Default values if header is not present")
63 hdu =
Field(dtype=int, default=DEFAULT_HDU, doc=
"HDU to read for metadata")
64 extnames =
ListField(dtype=str, default=[], doc=
"Extension names to search for")
68 """Task that will parse the filename and/or its contents to get the required information 69 for putting the file in the correct location and populating the registry.""" 70 ConfigClass = ParseConfig
73 """Get information about the image from the filename and its contents 75 Here, we open the image and parse the header, but one could also look at the filename itself 76 and derive information from that, or set values from the configuration. 78 @param filename Name of file to inspect 79 @return File properties; list of file properties for each extension 83 if len(self.
config.extnames) == 0:
85 return phuInfo, [phuInfo]
90 while len(extnames) > 0:
94 except Exception
as e:
95 self.
log.
warn(
"Error reading %s extensions %s: %s" % (filename, extnames, e))
101 hduInfo[
"hdu"] = extnum
102 infoList.append(hduInfo)
103 extnames.discard(ext)
104 return phuInfo, infoList
108 """ Get the name of an extension. 109 @param md: PropertySet like one obtained from lsst.afw.fits.readMetadata 110 @return Name of the extension if it exists, None otherwise. 114 ext = md.getScalar(
"EXTNAME")
120 """Attempt to pull the desired information out of the header 122 This is done through two mechanisms: 123 * translation: a property is set directly from the relevant header keyword 124 * translator: a property is set with the result of calling a method 126 The translator methods receive the header metadata and should return the 127 appropriate value, or None if the value cannot be determined. 129 @param md FITS header 130 @param info File properties, to be supplemented 135 for p, h
in self.
config.translation.items():
137 value = md.getScalar(h)
138 if isinstance(value, str):
139 value = value.strip()
141 elif p
in self.
config.defaults:
142 info[p] = self.
config.defaults[p]
144 self.
log.
warn(
"Unable to find value for %s (derived from %s)" % (p, h))
145 for p, t
in self.
config.translators.items():
146 func = getattr(self, t)
149 except Exception
as e:
150 self.
log.
warn(
"%s failed to translate %s: %s", t, p, e)
152 if value
is not None:
157 """Convert a full DATE-OBS to a mere date 159 Besides being an example of a translator, this is also generally useful. 160 It will only be used if listed as a translator in the configuration. 162 date = md.getScalar(
"DATE-OBS").
strip()
169 """Translate a full filter description into a mere filter name 171 Besides being an example of a translator, this is also generally useful. 172 It will only be used if listed as a translator in the configuration. 174 filterName = md.getScalar(
"FILTER").
strip()
175 filterName = filterName.strip()
176 c = filterName.find(
" ")
178 filterName = filterName[:c]
182 """Get destination for the file 184 @param butler Data butler 185 @param info File properties, used as dataId for the butler 186 @param filename Input filename 187 @return Destination filename 189 raw = butler.get(
"raw_filename", info)[0]
198 """Configuration for the RegisterTask""" 199 table =
Field(dtype=str, default=
"raw", doc=
"Name of table")
200 columns =
DictField(keytype=str, itemtype=str, doc=
"List of columns for raw table, with their types",
201 itemCheck=
lambda x: x
in (
"text",
"int",
"double"),
202 default={
'object':
'text',
211 unique =
ListField(dtype=str, doc=
"List of columns to be declared unique for the table",
212 default=[
"visit",
"ccd"])
213 visit =
ListField(dtype=str, default=[
"visit",
"object",
"date",
"filter"],
214 doc=
"List of columns for raw_visit table")
215 ignore =
Field(dtype=bool, default=
False, doc=
"Ignore duplicates in the table?")
216 permissions =
Field(dtype=int, default=0o664, doc=
"Permissions mode for registry; 0o664 = rw-rw-r--")
220 """Context manager to provide a registry 222 An existing registry is copied, so that it may continue 223 to be used while we add to this new registry. Finally, 224 the new registry is moved into the right place. 227 def __init__(self, registryName, createTableFunc, forceCreateTables, permissions):
228 """Construct a context manager 230 @param registryName: Name of registry file 231 @param createTableFunc: Function to create tables 232 @param forceCreateTables: Force the (re-)creation of tables? 233 @param permissions: Permissions to set on database file 238 updateFile = tempfile.NamedTemporaryFile(prefix=registryName, dir=os.path.dirname(self.
registryName),
243 if os.path.exists(registryName):
245 os.chmod(self.
updateName, os.stat(registryName).st_mode)
246 shutil.copyfile(registryName, self.
updateName)
250 if not haveTable
or forceCreateTables:
251 createTableFunc(self.
conn)
255 """Provide the 'as' value""" 272 """A context manager that doesn't provide any context 274 Useful for dry runs where we don't want to actually do anything real. 279 class RegisterTask(Task):
280 """Task that will generate the registry for the Mapper""" 281 ConfigClass = RegisterConfig
283 typemap = {
'text': str,
'int': int,
'double': float}
285 def openRegistry(self, directory, create=False, dryrun=False, name="registry.sqlite3"):
286 """Open the registry and return the connection handle. 288 @param directory Directory in which the registry file will be placed 289 @param create Clobber any existing registry and create a new one? 290 @param dryrun Don't do anything permanent? 291 @param name Filename of the registry 292 @return Database connection 297 registryName = os.path.join(directory, name)
302 """Create the registry tables 304 One table (typically 'raw') contains information on all files, and the 305 other (typically 'raw_visit') contains information on all visits. 307 @param conn Database connection 308 @param table Name of table to create in database 312 cmd =
"create table %s (id integer primary key autoincrement, " % table
313 cmd +=
",".join([(
"%s %s" % (col, colType))
for col, colType
in self.
config.columns.items()])
314 if len(self.
config.unique) > 0:
315 cmd +=
", unique(" +
",".join(self.
config.unique) +
")" 317 conn.cursor().execute(cmd)
319 cmd =
"create table %s_visit (" % table
320 cmd +=
",".join([(
"%s %s" % (col, self.
config.columns[col]))
for col
in self.
config.visit])
321 cmd +=
", unique(" +
",".join(
set(self.
config.visit).intersection(
set(self.
config.unique))) +
")" 323 conn.cursor().execute(cmd)
327 def check(self, conn, info, table=None):
328 """Check for the presence of a row already 330 Not sure this is required, given the 'ignore' configuration option. 336 cursor = conn.cursor()
337 sql =
"SELECT COUNT(*) FROM %s WHERE " % table
338 sql +=
" AND ".join([
"%s = %s" % (col, self.
placeHolder)
for col
in self.
config.unique])
341 cursor.execute(sql, values)
342 if cursor.fetchone()[0] > 0:
346 def addRow(self, conn, info, dryrun=False, create=False, table=None):
347 """Add a row to the file table (typically 'raw'). 349 @param conn Database connection 350 @param info File properties to add to database 351 @param table Name of table in database 355 sql =
"INSERT INTO %s (%s) SELECT " % (table,
",".join(self.
config.columns))
357 values = [self.
typemap[tt](info[col])
for col, tt
in self.
config.columns.items()]
360 sql +=
" WHERE NOT EXISTS (SELECT 1 FROM %s WHERE " % table
361 sql +=
" AND ".join([
"%s=%s" % (col, self.
placeHolder)
for col
in self.
config.unique])
363 values += [info[col]
for col
in self.
config.unique]
366 print(
"Would execute: '%s' with %s" % (sql,
",".join([
str(value)
for value
in values])))
368 conn.cursor().execute(sql, values)
371 """Generate the visits table (typically 'raw_visit') from the 372 file table (typically 'raw'). 374 @param conn Database connection 375 @param table Name of table in database 379 sql =
"INSERT INTO %s_visit SELECT DISTINCT " % table
380 sql +=
",".join(self.
config.visit)
381 sql +=
" FROM %s AS vv1" % table
382 sql +=
" WHERE NOT EXISTS " 383 sql +=
"(SELECT vv2.visit FROM %s_visit AS vv2 WHERE vv1.visit = vv2.visit)" % (table,)
385 print(
"Would execute: %s" % sql)
387 conn.cursor().execute(sql)
391 """Configuration for IngestTask""" 394 allowError =
Field(dtype=bool, default=
False, doc=
"Allow error in ingestion?")
395 clobber =
Field(dtype=bool, default=
False, doc=
"Clobber existing file?")
399 """Task that will ingest images into the data repository""" 400 ConfigClass = IngestConfig
401 ArgumentParser = IngestArgumentParser
402 _DefaultName =
"ingest" 405 super(IngestTask, self).
__init__(*args, **kwargs)
411 """Parse the command-line arguments and run the Task""" 414 args = parser.parse_args(config)
415 task =
cls(config=args.config)
418 def ingest(self, infile, outfile, mode="move", dryrun=False):
419 """Ingest a file into the image repository. 421 @param infile Name of input file 422 @param outfile Name of output file (file in repository) 423 @param mode Mode of ingest (copy/link/move/skip) 424 @param dryrun Only report what would occur? 425 @return Success boolean 430 self.
log.
info(
"Would %s from %s to %s" % (mode, infile, outfile))
433 outdir = os.path.dirname(outfile)
434 if not os.path.isdir(outdir):
439 if not os.path.isdir(outdir):
441 if os.path.lexists(outfile):
445 raise RuntimeError(
"File %s already exists; consider --config clobber=True" % outfile)
449 shutil.copyfile(infile, outfile)
451 os.symlink(os.path.abspath(infile), outfile)
454 os.rename(infile, outfile)
456 raise AssertionError(
"Unknown mode: %s" % mode)
457 self.
log.
info(
"%s --<%s>--> %s" % (infile, mode, outfile))
458 except Exception
as e:
459 self.
log.
warn(
"Failed to %s %s to %s: %s" % (mode, infile, outfile, e))
460 if not self.
config.allowError:
466 """Return whether the file qualifies as bad 468 We match against the list of bad file patterns. 470 filename = os.path.basename(filename)
473 for badFile
in badFileList:
474 if fnmatch(filename, badFile):
479 """Return whether the file information qualifies as bad 481 We match against the list of bad data identifiers. 485 for badId
in badIdList:
486 if all(info[key] == value
for key, value
in badId.items()):
491 """!Expand a set of filenames and globs, returning a list of filenames 493 @param fileNameList A list of files and glob patterns 495 N.b. globs obey Posix semantics, so a pattern that matches nothing is returned unchanged 498 for globPattern
in fileNameList:
499 files = glob(globPattern)
502 self.
log.
warn(
"%s doesn't match any file" % globPattern)
505 filenameList.extend(files)
510 """!Examine and ingest a single file 512 @param infile: File to process 513 @param args: Parsed command-line arguments 514 @return parsed information from FITS HDUs or None 517 self.
log.
info(
"Skipping declared bad file %s" % infile)
520 fileInfo, hduInfoList = self.parse.
getInfo(infile)
521 except Exception
as e:
522 if not self.
config.allowError:
524 self.
log.
warn(
"Error parsing %s (%s); skipping" % (infile, e))
526 if self.
isBadId(fileInfo, args.badId.idList):
527 self.
log.
info(
"Skipping declared bad file %s: %s" % (infile, fileInfo))
529 if registry
is not None and self.register.check(registry, fileInfo):
530 if args.ignoreIngested:
532 self.
log.
warn(
"%s: already ingested: %s" % (infile, fileInfo))
533 outfile = self.parse.getDestination(args.butler, fileInfo, infile)
534 if not self.
ingest(infile, outfile, mode=args.mode, dryrun=args.dryrun):
539 """Ingest all specified files and add them to the registry""" 542 context = self.register.openRegistry(root, create=args.create, dryrun=args.dryrun)
543 with context
as registry:
544 for infile
in filenameList:
546 hduInfoList = self.
runFile(infile, registry, args)
547 except Exception
as exc:
548 self.
log.
warn(
"Failed to ingest file %s: %s", infile, exc)
550 if hduInfoList
is None:
552 for info
in hduInfoList:
553 self.register.addRow(registry, info, dryrun=args.dryrun, create=args.create)
554 self.register.addVisits(registry, dryrun=args.dryrun)
558 """Can I copy a file? Raise an exception if space constraints are not met. 560 @param fromPath Path from which the file is being copied 561 @param toPath Path to which the file is being copied 563 req = os.stat(fromPath).st_size
564 st = os.statvfs(os.path.dirname(toPath))
565 avail = st.f_bavail * st.f_frsize
567 raise RuntimeError(
"Insufficient space: %d vs %d" % (req, avail))
def ingest(self, infile, outfile, mode="move", dryrun=False)
def makeSubtask(self, name, keyArgs)
def translate_filter(self, md)
def createTable(self, conn, table=None)
def expandFiles(self, fileNameList)
Expand a set of filenames and globs, returning a list of filenames.
def translate_date(self, md)
def __exit__(self, excType, excValue, traceback)
Provides consistent interface for LSST exceptions.
def getInfo(self, filename)
daf::base::PropertySet * set
def getInfoFromMetadata(self, md, info=None)
def getDestination(self, butler, info, filename)
bool all(CoordinateExpr< N > const &expr) noexcept
Return true if all elements are true.
def runFile(self, infile, registry, args)
Examine and ingest a single file.
def isBadFile(self, filename, badFileList)
def __init__(self, registryName, createTableFunc, forceCreateTables, permissions)
def assertCanCopy(fromPath, toPath)
def check(self, conn, info, table=None)
def __init__(self, args, kwargs)
def addVisits(self, conn, dryrun=False, table=None)
def openRegistry(self, directory, create=False, dryrun=False, name="registry.sqlite3")
def __init__(self, args, kwargs)
def addRow(self, conn, info, dryrun=False, create=False, table=None)
def isBadId(self, info, badIdList)
def add_id_argument(self, name, datasetType, help, level=None, doMakeDataRefList=True, ContainerClass=DataIdContainer)