11 def _convertToDate(dateString):
12 """Convert a string into a date object""" 13 return datetime.datetime.strptime(dateString,
"%Y-%m-%d").date()
17 """Task that will parse the filename and/or its contents to get the 18 required information to populate the calibration registry.""" 21 """Return a known calibration dataset type using 22 the observation type in the header keyword OBSTYPE 24 @param filename: Input filename 27 if not md.exists(
"OBSTYPE"):
28 raise RuntimeError(
"Unable to find the required header keyword OBSTYPE in %s, hdu %d" %
29 (filename, self.
config.hdu))
30 obstype = md.getScalar(
"OBSTYPE").
strip().lower()
33 elif "zero" in obstype
or "bias" in obstype:
35 elif "dark" in obstype:
37 elif "fringe" in obstype:
39 elif "sky" in obstype:
41 elif "illumcor" in obstype:
46 """Get destination for the file 48 @param butler Data butler 49 @param info File properties, used as dataId for the butler 50 @param filename Input filename 51 @return Destination filename 56 tempinfo = {k: v
for (k, v)
in info.items()
if v
is not None}
58 raw = butler.get(calibType +
"_filename", tempinfo)[0]
67 """Configuration for the CalibsRegisterTask""" 68 tables =
ListField(dtype=str, default=[
"bias",
"dark",
"flat",
"fringe",
"sky"], doc=
"Names of tables")
69 calibDate =
Field(dtype=str, default=
"calibDate", doc=
"Name of column for calibration date")
70 validStart =
Field(dtype=str, default=
"validStart", doc=
"Name of column for validity start")
71 validEnd =
Field(dtype=str, default=
"validEnd", doc=
"Name of column for validity stop")
72 detector =
ListField(dtype=str, default=[
"filter",
"ccd"],
73 doc=
"Columns that identify individual detectors")
74 validityUntilSuperseded =
ListField(dtype=str, default=[
"defect"],
75 doc=
"Tables for which to set validity for a calib from when it is " 76 "taken until it is superseded by the next; validity in other tables " 77 "is calculated by applying the validity range.")
81 """Task that will generate the calibration registry for the Mapper""" 82 ConfigClass = CalibsRegisterConfig
def openRegistry(self, directory, create=False, dryrun=False, name="calibRegistry.sqlite3"):
    """Open the registry and return the connection handle.

    This is a pure delegation to RegisterTask.openRegistry: every argument
    is forwarded positionally and unchanged. The file name defaults to
    "calibRegistry.sqlite3" here.

    @param directory  Forwarded to RegisterTask.openRegistry
    @param create  Forwarded to RegisterTask.openRegistry
    @param dryrun  Forwarded to RegisterTask.openRegistry
    @param name  Registry file name forwarded to RegisterTask.openRegistry
    @return Whatever RegisterTask.openRegistry returns
    """
    handle = RegisterTask.openRegistry(self, directory, create, dryrun, name)
    return handle
89 """Create the registry tables""" 90 for table
in self.
config.tables:
91 RegisterTask.createTable(self, conn, table=table)
def addRow(self, conn, info, *args, **kwargs):
    """Add a row to the file table, with both validity columns set to None.

    The entries of ``info`` keyed by the configured validity-start and
    validity-end column names are overwritten with None (``info`` is
    modified in place) before the row is handed to RegisterTask.addRow.

    @param conn  Forwarded to RegisterTask.addRow
    @param info  Mapping of column name to value for the new row
    @param args  Extra positional arguments forwarded to RegisterTask.addRow
    @param kwargs  Extra keyword arguments forwarded to RegisterTask.addRow
    """
    # Start column first, then end column, matching the original order.
    for column in (self.config.validStart, self.config.validEnd):
        info[column] = None
    RegisterTask.addRow(self, conn, info, *args, **kwargs)
100 """Loop over all tables, filters, and ccdnums, 101 and update the validity ranges in the registry. 103 @param conn: Database connection 104 @param validity: Validity range (days) 106 conn.row_factory = sqlite3.Row
107 cursor = conn.cursor()
108 for table
in self.
config.tables:
109 sql =
"SELECT DISTINCT %s FROM %s" % (
", ".join(self.
config.detector), table)
111 rows = cursor.fetchall()
116 """Update the validity ranges among selected rows in the registry. 118 For defects, the products are valid from their start date until 119 they are superseded by subsequent defect data. 120 For other calibration products, the validity ranges are checked and 121 if there are overlaps, a midpoint is used to fix the overlaps, 122 so that the calibration data whose date is nearest the date 123 of the observation is used. 125 @param conn: Database connection 126 @param table: Name of table to be selected 127 @param detectorData: Values identifying a detector (from columns in self.config.detector) 128 @param validity: Validity range (days) 130 columns =
", ".join([self.
config.calibDate, self.
config.validStart, self.
config.validEnd])
131 sql =
"SELECT id, %s FROM %s" % (columns, table)
132 sql +=
" WHERE " +
" AND ".join(col +
"=?" for col
in self.
config.detector)
133 sql +=
" ORDER BY " + self.
config.calibDate
134 cursor = conn.cursor()
135 cursor.execute(sql, detectorData)
136 rows = cursor.fetchall()
139 valids = collections.OrderedDict([(_convertToDate(row[self.
config.calibDate]), [
None,
None])
for 142 det =
" ".join(
"%s=%s" % (k, v)
for k, v
in zip(self.
config.detector, detectorData))
144 self.
log.
warn(
str(
"Skipped setting the validity overlaps for %s %s: missing calibration dates" %
147 dates =
list(valids.keys())
148 if table
in self.
config.validityUntilSuperseded:
150 for thisDate, nextDate
in zip(dates[:-1], dates[1:]):
151 valids[thisDate][0] = thisDate
152 valids[thisDate][1] = nextDate - datetime.timedelta(1)
153 valids[dates[-1]][0] = dates[-1]
154 valids[dates[-1]][1] = _convertToDate(
"2037-12-31")
158 valids[dd] = [dd - datetime.timedelta(validity), dd + datetime.timedelta(validity)]
161 midpoints = [t1 + (t2 - t1)//2
for t1, t2
in zip(dates[:-1], dates[1:])]
162 for i, (date, midpoint)
in enumerate(zip(dates[:-1], midpoints)):
163 if valids[date][1] > midpoint:
164 nextDate = dates[i + 1]
165 valids[nextDate][0] = midpoint + datetime.timedelta(1)
166 valids[date][1] = midpoint
171 calibDate = _convertToDate(row[self.
config.calibDate])
172 validStart = valids[calibDate][0].isoformat()
173 validEnd = valids[calibDate][1].isoformat()
174 sql =
"UPDATE %s" % table
175 sql +=
" SET %s=?, %s=?" % (self.
config.validStart, self.
config.validEnd)
177 conn.execute(sql, (validStart, validEnd, row[
"id"]))
181 """Argument parser to support ingesting calibration images into the repository""" 184 InputOnlyArgumentParser.__init__(self, *args, **kwargs)
185 self.add_argument(
"-n",
"--dry-run", dest=
"dryrun", action=
"store_true",
186 default=
False, help=
"Don't perform any action?")
187 self.add_argument(
"--mode", choices=[
"move",
"copy",
"link",
"skip"], default=
"move",
188 help=
"Mode of delivering the files to their destination")
189 self.add_argument(
"--create", action=
"store_true", help=
"Create new registry?")
190 self.add_argument(
"--validity", type=int, required=
True, help=
"Calibration validity period (days)")
191 self.add_argument(
"--calibType", type=str, default=
None,
192 choices=[
None,
"bias",
"dark",
"flat",
"fringe",
"sky",
"defect"],
193 help=
"Type of the calibration data to be ingested;" 194 " if omitted, the type is determined from" 195 " the file header information")
196 self.add_argument(
"--ignore-ingested", dest=
"ignoreIngested", action=
"store_true",
197 help=
"Don't register files that have already been registered")
198 self.add_argument(
"files", nargs=
"+", help=
"Names of file")
202 """Configuration for IngestCalibsTask""" 205 allowError =
Field(dtype=bool, default=
False, doc=
"Allow error in ingestion?")
206 clobber =
Field(dtype=bool, default=
False, doc=
"Clobber existing file?")
210 """Task that generates registry for calibration images""" 211 ConfigClass = IngestCalibsConfig
212 ArgumentParser = IngestCalibsArgumentParser
213 _DefaultName =
"ingestCalibs" 216 """Ingest all specified files and add them to the registry""" 217 calibRoot = args.calib
if args.calib
is not None else args.output
219 with self.register.openRegistry(calibRoot, create=args.create, dryrun=args.dryrun)
as registry:
220 for infile
in filenameList:
221 fileInfo, hduInfoList = self.parse.
getInfo(infile)
222 if args.calibType
is None:
223 calibType = self.parse.getCalibType(infile)
225 calibType = args.calibType
226 if calibType
not in self.register.config.tables:
227 self.
log.
warn(
str(
"Skipped adding %s of observation type '%s' to registry " 228 "(must be one of %s)" %
229 (infile, calibType,
", ".join(self.register.config.tables))))
231 if args.mode !=
'skip':
232 outfile = self.parse.getDestination(args.butler, fileInfo, infile)
233 ingested = self.
ingest(infile, outfile, mode=args.mode, dryrun=args.dryrun)
235 self.
log.
warn(
str(
"Failed to ingest %s of observation type '%s'" %
236 (infile, calibType)))
238 if self.register.check(registry, fileInfo, table=calibType):
239 if args.ignoreIngested:
242 self.
log.
warn(
"%s: already ingested: %s" % (infile, fileInfo))
243 for info
in hduInfoList:
244 self.register.addRow(registry, info, dryrun=args.dryrun,
245 create=args.create, table=calibType)
247 self.register.updateValidityRanges(registry, args.validity)
249 self.
log.
info(
"Would update validity ranges here, but dryrun")
def __init__(self, args, kwargs)
def ingest(self, infile, outfile, mode="move", dryrun=False)
def expandFiles(self, fileNameList)
Expand a set of filenames and globs, returning a list of filenames.
def addRow(self, conn, info, args, kwargs)
def getCalibType(self, filename)
def getDestination(self, butler, info, filename)
def openRegistry(self, directory, create=False, dryrun=False, name="calibRegistry.sqlite3")
def createTable(self, conn)
def fixSubsetValidity(self, conn, table, detectorData, validity)
daf::base::PropertyList * list
def updateValidityRanges(self, conn, validity)