5 from fnmatch
import fnmatch
7 from contextlib
import contextmanager
9 from lsst.pex.config import Config, Field, DictField, ListField, ConfigurableField
17 """Argument parser to support ingesting images into the image repository""" 20 super(IngestArgumentParser, self).
__init__(*args, **kwargs)
21 self.add_argument(
"-n",
"--dry-run", dest=
"dryrun", action=
"store_true", default=
False,
22 help=
"Don't perform any action?")
23 self.add_argument(
"--mode", choices=[
"move",
"copy",
"link",
"skip"], default=
"link",
24 help=
"Mode of delivering the files to their destination")
25 self.add_argument(
"--create", action=
"store_true", help=
"Create new registry (clobber old)?")
26 self.add_argument(
"--ignore-ingested", dest=
"ignoreIngested", action=
"store_true",
27 help=
"Don't register files that have already been registered")
28 self.
add_id_argument(
"--badId",
"raw",
"Data identifier for bad data", doMakeDataRefList=
False)
29 self.add_argument(
"--badFile", nargs=
"*", default=[],
30 help=
"Names of bad files (no path; wildcards allowed)")
31 self.add_argument(
"files", nargs=
"+", help=
"Names of file")
35 """Configuration for ParseTask""" 36 translation =
DictField(keytype=str, itemtype=str, default={},
37 doc=
"Translation table for property --> header")
38 translators =
DictField(keytype=str, itemtype=str, default={},
39 doc=
"Properties and name of translator method")
40 defaults =
DictField(keytype=str, itemtype=str, default={},
41 doc=
"Default values if header is not present")
42 hdu =
Field(dtype=int, default=DEFAULT_HDU, doc=
"HDU to read for metadata")
43 extnames =
ListField(dtype=str, default=[], doc=
"Extension names to search for")
47 """Task that will parse the filename and/or its contents to get the required information 48 for putting the file in the correct location and populating the registry.""" 49 ConfigClass = ParseConfig
52 """Get information about the image from the filename and its contents 54 Here, we open the image and parse the header, but one could also look at the filename itself 55 and derive information from that, or set values from the configuration. 57 @param filename Name of file to inspect 58 @return File properties; list of file properties for each extension 62 if len(self.
config.extnames) == 0:
64 return phuInfo, [phuInfo]
69 while len(extnames) > 0:
73 except Exception
as e:
74 self.
log.
warn(
"Error reading %s extensions %s: %s" % (filename, extnames, e))
80 hduInfo[
"hdu"] = extnum
81 infoList.append(hduInfo)
83 return phuInfo, infoList
87 """ Get the name of an extension. 88 @param md: PropertySet (like one obtained from lsst.afw.fits.readMetadata) 89 @return Name of the extension if it exists, None otherwise. 93 ext = md.getScalar(
"EXTNAME")
99 """Attempt to pull the desired information out of the header 101 This is done through two mechanisms: 102 * translation: a property is set directly from the relevant header keyword 103 * translator: a property is set with the result of calling a method 105 The translator methods receive the header metadata and should return the 106 appropriate value, or None if the value cannot be determined. 108 @param md FITS header 109 @param info File properties, to be supplemented 114 for p, h
in self.
config.translation.items():
116 value = md.getScalar(h)
117 if isinstance(value, str):
118 value = value.strip()
120 elif p
in self.
config.defaults:
121 info[p] = self.
config.defaults[p]
123 self.
log.
warn(
"Unable to find value for %s (derived from %s)" % (p, h))
124 for p, t
in self.
config.translators.items():
125 func = getattr(self, t)
128 except Exception
as e:
129 self.
log.
warn(
"%s failed to translate %s: %s", t, p, e)
131 if value
is not None:
136 """Convert a full DATE-OBS to a mere date 138 Besides being an example of a translator, this is also generally useful. 139 It will only be used if listed as a translator in the configuration. 141 date = md.getScalar(
"DATE-OBS").
strip()
148 """Translate a full filter description into a mere filter name 150 Besides being an example of a translator, this is also generally useful. 151 It will only be used if listed as a translator in the configuration. 153 filterName = md.getScalar(
"FILTER").
strip()
154 filterName = filterName.strip()
155 c = filterName.find(
" ")
157 filterName = filterName[:c]
161 """Get destination for the file 163 @param butler Data butler 164 @param info File properties, used as dataId for the butler 165 @param filename Input filename 166 @return Destination filename 168 raw = butler.get(
"raw_filename", info)[0]
177 """Configuration for the RegisterTask""" 178 table =
Field(dtype=str, default=
"raw", doc=
"Name of table")
179 columns =
DictField(keytype=str, itemtype=str, doc=
"List of columns for raw table, with their types",
180 itemCheck=
lambda x: x
in (
"text",
"int",
"double"),
181 default={
'object':
'text',
190 unique =
ListField(dtype=str, doc=
"List of columns to be declared unique for the table",
191 default=[
"visit",
"ccd"])
192 visit =
ListField(dtype=str, default=[
"visit",
"object",
"date",
"filter"],
193 doc=
"List of columns for raw_visit table")
194 ignore =
Field(dtype=bool, default=
False, doc=
"Ignore duplicates in the table?")
195 permissions =
Field(dtype=int, default=0o664, doc=
"Permissions mode for registry; 0o664 = rw-rw-r--")
199 """Context manager to provide a registry 201 An existing registry is copied, so that it may continue 202 to be used while we add to this new registry. Finally, 203 the new registry is moved into the right place. 206 def __init__(self, registryName, createTableFunc, forceCreateTables, permissions):
207 """Construct a context manager 209 @param registryName: Name of registry file 210 @param createTableFunc: Function to create tables 211 @param forceCreateTables: Force the (re-)creation of tables? 212 @param permissions: Permissions to set on database file 217 updateFile = tempfile.NamedTemporaryFile(prefix=registryName, dir=os.path.dirname(self.
registryName),
222 if os.path.exists(registryName):
224 os.chmod(self.
updateName, os.stat(registryName).st_mode)
225 shutil.copyfile(registryName, self.
updateName)
229 if not haveTable
or forceCreateTables:
230 createTableFunc(self.
conn)
234 """Provide the 'as' value""" 251 """A context manager that doesn't provide any context 253 Useful for dry runs where we don't want to actually do anything real. 258 class RegisterTask(Task):
259 """Task that will generate the registry for the Mapper""" 260 ConfigClass = RegisterConfig
262 typemap = {
'text': str,
'int': int,
'double': float}
264 def openRegistry(self, directory, create=False, dryrun=False, name="registry.sqlite3"):
265 """Open the registry and return the connection handle. 267 @param directory Directory in which the registry file will be placed 268 @param create Clobber any existing registry and create a new one? 269 @param dryrun Don't do anything permanent? 270 @param name Filename of the registry 271 @return Database connection 276 registryName = os.path.join(directory, name)
281 """Create the registry tables 283 One table (typically 'raw') contains information on all files, and the 284 other (typically 'raw_visit') contains information on all visits. 286 @param conn Database connection 287 @param table Name of table to create in database 291 cmd =
"create table %s (id integer primary key autoincrement, " % table
292 cmd +=
",".join([(
"%s %s" % (col, colType))
for col, colType
in self.
config.columns.items()])
293 if len(self.
config.unique) > 0:
294 cmd +=
", unique(" +
",".join(self.
config.unique) +
")" 296 conn.cursor().execute(cmd)
298 cmd =
"create table %s_visit (" % table
299 cmd +=
",".join([(
"%s %s" % (col, self.
config.columns[col]))
for col
in self.
config.visit])
300 cmd +=
", unique(" +
",".join(
set(self.
config.visit).intersection(
set(self.
config.unique))) +
")" 302 conn.cursor().execute(cmd)
306 def check(self, conn, info, table=None):
307 """Check for the presence of a row already 309 Not sure this is required, given the 'ignore' configuration option. 315 cursor = conn.cursor()
316 sql =
"SELECT COUNT(*) FROM %s WHERE " % table
317 sql +=
" AND ".join([
"%s = %s" % (col, self.
placeHolder)
for col
in self.
config.unique])
320 cursor.execute(sql, values)
321 if cursor.fetchone()[0] > 0:
325 def addRow(self, conn, info, dryrun=False, create=False, table=None):
326 """Add a row to the file table (typically 'raw'). 328 @param conn Database connection 329 @param info File properties to add to database 330 @param table Name of table in database 334 sql =
"INSERT INTO %s (%s) SELECT " % (table,
",".join(self.
config.columns))
336 values = [self.
typemap[tt](info[col])
for col, tt
in self.
config.columns.items()]
339 sql +=
" WHERE NOT EXISTS (SELECT 1 FROM %s WHERE " % table
340 sql +=
" AND ".join([
"%s=%s" % (col, self.
placeHolder)
for col
in self.
config.unique])
342 values += [info[col]
for col
in self.
config.unique]
345 print(
"Would execute: '%s' with %s" % (sql,
",".join([
str(value)
for value
in values])))
347 conn.cursor().execute(sql, values)
350 """Generate the visits table (typically 'raw_visit') from the 351 file table (typically 'raw'). 353 @param conn Database connection 354 @param table Name of table in database 358 sql =
"INSERT INTO %s_visit SELECT DISTINCT " % table
359 sql +=
",".join(self.
config.visit)
360 sql +=
" FROM %s AS vv1" % table
361 sql +=
" WHERE NOT EXISTS " 362 sql +=
"(SELECT vv2.visit FROM %s_visit AS vv2 WHERE vv1.visit = vv2.visit)" % (table,)
364 print(
"Would execute: %s" % sql)
366 conn.cursor().execute(sql)
370 """Configuration for IngestTask""" 373 allowError =
Field(dtype=bool, default=
False, doc=
"Allow error in ingestion?")
374 clobber =
Field(dtype=bool, default=
False, doc=
"Clobber existing file?")
378 """Task that will ingest images into the data repository""" 379 ConfigClass = IngestConfig
380 ArgumentParser = IngestArgumentParser
381 _DefaultName =
"ingest" 384 super(IngestTask, self).
__init__(*args, **kwargs)
390 """Parse the command-line arguments and run the Task""" 393 args = parser.parse_args(config)
394 task =
cls(config=args.config)
397 def ingest(self, infile, outfile, mode="move", dryrun=False):
398 """Ingest a file into the image repository. 400 @param infile Name of input file 401 @param outfile Name of output file (file in repository) 402 @param mode Mode of ingest (copy/link/move/skip) 403 @param dryrun Only report what would occur? 404 @return Success boolean 409 self.
log.
info(
"Would %s from %s to %s" % (mode, infile, outfile))
412 outdir = os.path.dirname(outfile)
413 if not os.path.isdir(outdir):
418 if not os.path.isdir(outdir):
420 if os.path.lexists(outfile):
424 raise RuntimeError(
"File %s already exists; consider --config clobber=True" % outfile)
428 shutil.copyfile(infile, outfile)
430 os.symlink(os.path.abspath(infile), outfile)
433 os.rename(infile, outfile)
435 raise AssertionError(
"Unknown mode: %s" % mode)
436 self.
log.
info(
"%s --<%s>--> %s" % (infile, mode, outfile))
437 except Exception
as e:
438 self.
log.
warn(
"Failed to %s %s to %s: %s" % (mode, infile, outfile, e))
439 if not self.
config.allowError:
445 """Return whether the file qualifies as bad 447 We match against the list of bad file patterns. 449 filename = os.path.basename(filename)
452 for badFile
in badFileList:
453 if fnmatch(filename, badFile):
458 """Return whether the file information qualifies as bad 460 We match against the list of bad data identifiers. 464 for badId
in badIdList:
465 if all(info[key] == value
for key, value
in badId.items()):
470 """!Expand a set of filenames and globs, returning a list of filenames 472 @param fileNameList A list of files and glob patterns 474 N.b. globs obey Posix semantics, so a pattern that matches nothing is returned unchanged 477 for globPattern
in fileNameList:
478 files = glob(globPattern)
481 self.
log.
warn(
"%s doesn't match any file" % globPattern)
484 filenameList.extend(files)
489 """!Examine and ingest a single file 491 @param infile: File to process 492 @param args: Parsed command-line arguments 493 @return parsed information from FITS HDUs or None 496 self.
log.
info(
"Skipping declared bad file %s" % infile)
499 fileInfo, hduInfoList = self.parse.
getInfo(infile)
500 except Exception
as e:
501 if not self.
config.allowError:
503 self.
log.
warn(
"Error parsing %s (%s); skipping" % (infile, e))
505 if self.
isBadId(fileInfo, args.badId.idList):
506 self.
log.
info(
"Skipping declared bad file %s: %s" % (infile, fileInfo))
508 if registry
is not None and self.register.check(registry, fileInfo):
509 if args.ignoreIngested:
511 self.
log.
warn(
"%s: already ingested: %s" % (infile, fileInfo))
512 outfile = self.parse.getDestination(args.butler, fileInfo, infile)
513 if not self.
ingest(infile, outfile, mode=args.mode, dryrun=args.dryrun):
518 """Ingest all specified files and add them to the registry""" 521 context = self.register.openRegistry(root, create=args.create, dryrun=args.dryrun)
522 with context
as registry:
523 for infile
in filenameList:
525 hduInfoList = self.
runFile(infile, registry, args)
526 except Exception
as exc:
527 self.
log.
warn(
"Failed to ingest file %s: %s", infile, exc)
529 if hduInfoList
is None:
531 for info
in hduInfoList:
532 self.register.addRow(registry, info, dryrun=args.dryrun, create=args.create)
533 self.register.addVisits(registry, dryrun=args.dryrun)
537 """Can I copy a file? Raise an exception if space constraints are not met. 539 @param fromPath Path from which the file is being copied 540 @param toPath Path to which the file is being copied 542 req = os.stat(fromPath).st_size
543 st = os.statvfs(os.path.dirname(toPath))
544 avail = st.f_bavail * st.f_frsize
546 raise RuntimeError(
"Insufficient space: %d vs %d" % (req, avail))
def ingest(self, infile, outfile, mode="move", dryrun=False)
def makeSubtask(self, name, keyArgs)
def translate_filter(self, md)
def createTable(self, conn, table=None)
def expandFiles(self, fileNameList)
Expand a set of filenames and globs, returning a list of filenames.
def translate_date(self, md)
def __exit__(self, excType, excValue, traceback)
Provides consistent interface for LSST exceptions.
def getInfo(self, filename)
daf::base::PropertySet * set
def getInfoFromMetadata(self, md, info=None)
def getDestination(self, butler, info, filename)
bool all(CoordinateExpr< N > const &expr) noexcept
Return true if all elements are true.
def runFile(self, infile, registry, args)
Examine and ingest a single file.
def isBadFile(self, filename, badFileList)
def __init__(self, registryName, createTableFunc, forceCreateTables, permissions)
def assertCanCopy(fromPath, toPath)
def check(self, conn, info, table=None)
def __init__(self, args, kwargs)
def addVisits(self, conn, dryrun=False, table=None)
def openRegistry(self, directory, create=False, dryrun=False, name="registry.sqlite3")
def __init__(self, args, kwargs)
def addRow(self, conn, info, dryrun=False, create=False, table=None)
def isBadId(self, info, badIdList)
def add_id_argument(self, name, datasetType, help, level=None, doMakeDataRefList=True, ContainerClass=DataIdContainer)