8 import sqlite
as sqlite3
9 from fnmatch
import fnmatch
12 from lsst.pex.config import Config, Field, DictField, ListField, ConfigurableField
18 """Argument parser to support ingesting images into the image repository"""
20 super(IngestArgumentParser, self).
__init__(*args, **kwargs)
21 self.add_argument(
"-n",
"--dry-run", dest=
"dryrun", action=
"store_true", default=
False,
22 help=
"Don't perform any action?")
23 self.add_argument(
"--mode", choices=[
"move",
"copy",
"link",
"skip"], default=
"link",
24 help=
"Mode of delivering the files to their destination")
25 self.add_argument(
"--create", action=
"store_true", help=
"Create new registry (clobber old)?")
26 self.add_id_argument(
"--badId",
"raw",
"Data identifier for bad data", doMakeDataRefList=
False)
27 self.add_argument(
"--badFile", nargs=
"*", default=[],
28 help=
"Names of bad files (no path; wildcards allowed)")
29 self.add_argument(
"files", nargs=
"+", help=
"Names of file")
32 """Configuration for ParseTask"""
33 translation = DictField(keytype=str, itemtype=str, default={},
34 doc=
"Translation table for property --> header")
35 translators = DictField(keytype=str, itemtype=str, default={},
36 doc=
"Properties and name of translator method")
37 defaults = DictField(keytype=str, itemtype=str, default={},
38 doc=
"Default values if header is not present")
39 hdu = Field(dtype=int, default=0, doc=
"HDU to read for metadata")
40 extnames = ListField(dtype=str, default=[], doc=
"Extension names to search for")
43 """Task that will parse the filename and/or its contents to get the required information
44 for putting the file in the correct location and populating the registry."""
45 ConfigClass = ParseConfig
48 """Get information about the image from the filename and its contents
50 Here, we open the image and parse the header, but one could also look at the filename itself
51 and derive information from that, or set values from the configuration.
53 @param filename Name of file to inspect
54 @return File properties; list of file properties for each extension
58 if len(self.config.extnames) == 0:
60 return phuInfo, [phuInfo]
62 extnames = set(self.config.extnames)
65 while len(extnames) > 0:
70 self.log.warn(
"Error reading %s extensions %s" % (filename, extnames))
76 return phuInfo, infoList
80 """ Get the name of an extension.
81 @param md: PropertySet (like one obtained from afwImage.readMetadata)
82 @return Name of the extension if it exists. None otherwise.
86 ext = md.get(
"EXTNAME")
92 """Attempt to pull the desired information out of the header
94 This is done through two mechanisms:
95 * translation: a property is set directly from the relevant header keyword
96 * translator: a property is set with the result of calling a method
98 The translator methods receive the header metadata and should return the
99 appropriate value, or None if the value cannot be determined.
101 @param md FITS header
102 @param info File properties, to be supplemented
105 for p, h
in self.config.translation.iteritems():
108 if isinstance(value, basestring):
109 value = value.strip()
111 elif p
in self.config.defaults:
112 info[p] = self.config.defaults[p]
114 self.log.warn(
"Unable to find value for %s (derived from %s)" % (p, h))
115 for p, t
in self.config.translators.iteritems():
116 func = getattr(self, t)
121 if value
is not None:
126 """Convert a full DATE-OBS to a mere date
128 Besides being an example of a translator, this is also generally useful.
129 It will only be used if listed as a translator in the configuration.
131 date = md.get(
"DATE-OBS").strip()
138 """Translate a full filter description into a mere filter name
140 Besides being an example of a translator, this is also generally useful.
141 It will only be used if listed as a translator in the configuration.
143 filterName = md.get(
"FILTER").strip()
144 filterName = filterName.strip()
145 c = filterName.find(
" ")
147 filterName = filterName[:c]
151 """Get destination for the file
153 @param butler Data butler
154 @param info File properties, used as dataId for the butler
155 @param filename Input filename
156 @return Destination filename
158 raw = butler.get(
"raw_filename", info)[0]
166 """Configuration for the RegisterTask"""
167 table = Field(dtype=str, default=
"raw", doc=
"Name of table")
168 columns = DictField(keytype=str, itemtype=str, doc=
"List of columns for raw table, with their types",
169 itemCheck=
lambda x: x
in (
"text",
"int",
"double"),
170 default={
'object':
'text',
179 unique = ListField(dtype=str, doc=
"List of columns to be declared unique for the table",
180 default=[
"visit",
"ccd"])
181 visit = ListField(dtype=str, default=[
"visit",
"object",
"date",
"filter"],
182 doc=
"List of columns for raw_visit table")
183 ignore = Field(dtype=bool, default=
False, doc=
"Ignore duplicates in the table?")
184 permissions = Field(dtype=int, default=0664, doc=
"Permissions mode for registry")
187 """Context manager to provide a registry
189 An existing registry is copied, so that it may continue
190 to be used while we add to this new registry. Finally,
191 the new registry is moved into the right place.
193 def __init__(self, registryName, createTableFunc, forceCreateTables, permissions):
194 """Construct a context manager
196 @param registryName: Name of registry file
197 @param createTableFunc: Function to create tables
202 updateFile = tempfile.NamedTemporaryFile(prefix=registryName, dir=os.path.dirname(self.
registryName),
207 if os.path.exists(registryName):
209 os.chmod(self.
updateName, os.stat(registryName).st_mode)
210 shutil.copyfile(registryName, self.
updateName)
214 if not haveTable
or forceCreateTables:
215 createTableFunc(self.
conn)
219 """Provide the 'as' value"""
234 """Task that will generate the registry for the Mapper"""
235 ConfigClass = RegisterConfig
238 """Open the registry and return the connection handle.
240 @param butler Data butler, from which the registry file is determined
241 @param create Clobber any existing registry and create a new one?
242 @param dryrun Don't do anything permanent?
243 @return Database connection
246 from contextlib
import contextmanager
251 registryName = os.path.join(butler.mapper.root,
"registry.sqlite3")
252 context =
RegistryContext(registryName, self.createTable, create, self.config.permissions)
256 """Create the registry tables
258 One table (typically 'raw') contains information on all files, and the
259 other (typically 'raw_visit') contains information on all visits.
261 @param conn Database connection
263 cmd =
"create table %s (id integer primary key autoincrement, " % self.config.table
264 cmd +=
",".join([(
"%s %s" % (col, colType))
for col,colType
in self.config.columns.items()])
265 if len(self.config.unique) > 0:
266 cmd +=
", unique(" +
",".join(self.config.unique) +
")"
270 cmd =
"create table %s_visit (" % self.config.table
271 cmd +=
",".join([(
"%s %s" % (col, self.config.columns[col]))
for col
in self.config.visit])
272 cmd +=
", unique(" +
",".join(set(self.config.visit).intersection(set(self.config.unique))) +
")"
279 """Check for the presence of a row already
281 Not sure this is required, given the 'ignore' configuration option.
283 if self.config.ignore
or len(self.config.unique) == 0:
285 cursor = conn.cursor()
286 sql =
"SELECT COUNT(*) FROM %s WHERE " % self.config.table
287 sql +=
" AND ".join([
"%s=?" % col
for col
in self.config.unique])
288 values = [info[col]
for col
in self.config.unique]
290 cursor.execute(sql, values)
291 if cursor.fetchone()[0] > 0:
295 def addRow(self, conn, info, dryrun=False, create=False):
296 """Add a row to the file table (typically 'raw').
298 @param conn Database connection
299 @param info File properties to add to database
302 if self.config.ignore:
304 sql +=
" INTO %s VALUES (NULL" % self.config.table
305 sql +=
", ?" * len(self.config.columns)
307 values = [info[col]
for col
in self.config.columns]
309 print "Would execute: '%s' with %s" % (sql,
",".join([str(value)
for value
in values]))
311 conn.execute(sql, values)
314 """Generate the visits table (typically 'raw_visit') from the
315 file table (typically 'raw').
317 @param conn Database connection
319 sql =
"INSERT OR IGNORE INTO %s_visit SELECT DISTINCT " % self.config.table
320 sql +=
",".join(self.config.visit)
321 sql +=
" FROM %s" % self.config.table
323 print "Would execute: %s" % sql
329 """Configuration for IngestTask"""
330 parse = ConfigurableField(target=ParseTask, doc=
"File parsing")
331 register = ConfigurableField(target=RegisterTask, doc=
"Registry entry")
332 allowError = Field(dtype=bool, default=
False, doc=
"Allow error in ingestion?")
333 clobber = Field(dtype=bool, default=
False, doc=
"Clobber existing file?")
336 """Task that will ingest images into the data repository"""
337 ConfigClass = IngestConfig
338 ArgumentParser = IngestArgumentParser
339 _DefaultName =
"ingest"
342 super(IngestTask, self).
__init__(*args, **kwargs)
343 self.makeSubtask(
"parse")
344 self.makeSubtask(
"register")
348 """Parse the command-line arguments and run the Task"""
349 config = cls.ConfigClass()
350 parser = cls.ArgumentParser(
"ingest")
351 args = parser.parse_args(config)
352 task = cls(config=args.config)
355 def ingest(self, infile, outfile, mode="move", dryrun=False):
356 """Ingest a file into the image repository.
358 @param infile Name of input file
359 @param outfile Name of output file (file in repository)
360 @param mode Mode of ingest (copy/link/move/skip)
361 @param dryrun Only report what would occur?
362 @return Success boolean
367 self.log.info(
"Would %s from %s to %s" % (mode, infile, outfile))
370 outdir = os.path.dirname(outfile)
371 if not os.path.isdir(outdir):
376 if not os.path.isdir(outdir):
378 if self.config.clobber
and os.path.lexists(outfile):
382 shutil.copyfile(infile, outfile)
384 os.symlink(os.path.abspath(infile), outfile)
387 os.rename(infile, outfile)
389 raise AssertionError(
"Unknown mode: %s" % mode)
390 print "%s --<%s>--> %s" % (infile, mode, outfile)
392 self.log.warn(
"Failed to %s %s to %s: %s" % (mode, infile, outfile, e))
393 if not self.config.allowError:
399 """Return whether the file qualifies as bad
401 We match against the list of bad file patterns.
403 filename = os.path.basename(filename)
406 for badFile
in badFileList:
407 if fnmatch(filename, badFile):
412 """Return whether the file information qualifies as bad
414 We match against the list of bad data identifiers.
418 for badId
in badIdList:
419 if all(info[key] == value
for key, value
in badId.iteritems()):
424 """Ingest all specified files and add them to the registry"""
425 filenameList =
sum([glob(filename)
for filename
in args.files], [])
426 context = self.register.openRegistry(args.butler, create=args.create, dryrun=args.dryrun)
427 with context
as registry:
428 for infile
in filenameList:
430 self.log.info(
"Skipping declared bad file %s" % infile)
432 fileInfo, hduInfoList = self.parse.getInfo(infile)
433 if self.
isBadId(fileInfo, args.badId.idList):
434 self.log.info(
"Skipping declared bad file %s: %s" % (infile, fileInfo))
436 if self.register.check(registry, fileInfo):
437 self.log.warn(
"%s: already ingested: %s" % (infile, fileInfo))
438 outfile = self.parse.getDestination(args.butler, fileInfo, infile)
439 ingested = self.
ingest(infile, outfile, mode=args.mode, dryrun=args.dryrun)
442 for info
in hduInfoList:
443 self.register.addRow(registry, info, dryrun=args.dryrun, create=args.create)
444 self.register.addVisits(registry, dryrun=args.dryrun)
447 """Can I copy a file? Raise an exception if space constraints are not met.
449 @param fromPath Path from which the file is being copied
450 @param toPath Path to which the file is being copied
452 req = os.stat(fromPath).st_size
453 st = os.statvfs(os.path.dirname(toPath))
454 avail = st.f_bavail * st.f_frsize
456 raise RuntimeError(
"Insufficient space: %d vs %d" % (req, avail))
bool all(CoordinateExpr< N > const &expr)
Return true if all elements are true.
boost::enable_if< typename ExpressionTraits< Scalar >::IsScalar, Scalar >::type sum(Scalar const &scalar)
boost::shared_ptr< daf::base::PropertySet > readMetadata(std::string const &fileName, int hdu=0, bool strip=false)
Return the metadata (header entries) from a FITS file.