def _convertToDate(dateString):
    """Parse a calendar-date string and return it as a date object

    @param dateString  Text in ``YYYY-MM-DD`` form (the "%Y-%m-%d" format)
    @return The corresponding datetime.date (time-of-day is discarded)

    A string that does not match the format makes ``strptime`` raise
    ValueError, which is propagated to the caller unchanged.
    """
    # strptime only exists on datetime.datetime, so parse to a full
    # timestamp first and then keep just the calendar-date part.
    parsed = datetime.datetime.strptime(dateString, "%Y-%m-%d")
    return parsed.date()
17 """Task that will parse the filename and/or its contents to get the 18 required information to populate the calibration registry.""" 21 """Return a a known calibration dataset type using 22 the observation type in the header keyword OBSTYPE 24 @param filename: Input filename 27 if not md.exists(
"OBSTYPE"):
28 raise RuntimeError(
"Unable to find the required header keyword OBSTYPE in %s, hdu %d" %
29 (filename, self.
config.hdu))
30 obstype = md.getScalar(
"OBSTYPE").
strip().lower()
33 elif "zero" in obstype
or "bias" in obstype:
35 elif "dark" in obstype:
37 elif "fringe" in obstype:
39 elif "sky" in obstype:
44 """Get destination for the file 46 @param butler Data butler 47 @param info File properties, used as dataId for the butler 48 @param filename Input filename 49 @return Destination filename 54 tempinfo = {k: v
for (k, v)
in info.items()
if v
is not None}
56 raw = butler.get(calibType +
"_filename", tempinfo)[0]
65 """Configuration for the CalibsRegisterTask""" 66 tables =
ListField(dtype=str, default=[
"bias",
"dark",
"flat",
"fringe",
"sky"], doc=
"Names of tables")
67 calibDate =
Field(dtype=str, default=
"calibDate", doc=
"Name of column for calibration date")
68 validStart =
Field(dtype=str, default=
"validStart", doc=
"Name of column for validity start")
69 validEnd =
Field(dtype=str, default=
"validEnd", doc=
"Name of column for validity stop")
70 detector =
ListField(dtype=str, default=[
"filter",
"ccd"],
71 doc=
"Columns that identify individual detectors")
72 validityUntilSuperseded =
ListField(dtype=str, default=[
"defect"],
73 doc=
"Tables for which to set validity for a calib from when it is " 74 "taken until it is superseded by the next; validity in other tables " 75 "is calculated by applying the validity range.")
79 """Task that will generate the calibration registry for the Mapper""" 80 ConfigClass = CalibsRegisterConfig
def openRegistry(self, directory, create=False, dryrun=False, name="calibRegistry.sqlite3"):
    """Open the calibration registry and hand back its connection handle

    This delegates straight to RegisterTask.openRegistry; the only
    difference visible here is the default registry file name,
    "calibRegistry.sqlite3".

    @param directory  Passed through to RegisterTask.openRegistry
    @param create  Passed through to RegisterTask.openRegistry
    @param dryrun  Passed through to RegisterTask.openRegistry
    @param name  Registry file name passed through to the base class
    @return Whatever RegisterTask.openRegistry returns
    """
    # Arguments are forwarded positionally, in the same order as received.
    handle = RegisterTask.openRegistry(self, directory, create, dryrun, name)
    return handle
87 """Create the registry tables""" 88 for table
in self.
config.tables:
89 RegisterTask.createTable(self, conn, table=table)
def addRow(self, conn, info, *args, **kwargs):
    """Add a row to the file table, with its validity columns set to None

    The entries of ``info`` named by config.validStart and config.validEnd
    are overwritten with None (``info`` is modified in place) before the
    row is passed to RegisterTask.addRow.

    @param conn  Database connection, forwarded to RegisterTask.addRow
    @param info  File properties for the row; mutated as described above
    @param args  Extra positional arguments forwarded unchanged
    @param kwargs  Extra keyword arguments forwarded unchanged
    """
    # Reset the start column first, then the end column.
    for column in (self.config.validStart, self.config.validEnd):
        info[column] = None
    RegisterTask.addRow(self, conn, info, *args, **kwargs)
98 """Loop over all tables, filters, and ccdnums, 99 and update the validity ranges in the registry. 101 @param conn: Database connection 102 @param validity: Validity range (days) 104 conn.row_factory = sqlite3.Row
105 cursor = conn.cursor()
106 for table
in self.
config.tables:
107 sql =
"SELECT DISTINCT %s FROM %s" % (
", ".join(self.
config.detector), table)
109 rows = cursor.fetchall()
114 """Update the validity ranges among selected rows in the registry. 116 For defects, the products are valid from their start date until 117 they are superseded by subsequent defect data. 118 For other calibration products, the validity ranges are checked and 119 if there are overlaps, a midpoint is used to fix the overlaps, 120 so that the calibration data with whose date is nearest the date 121 of the observation is used. 123 @param conn: Database connection 124 @param table: Name of table to be selected 125 @param detectorData: Values identifying a detector (from columns in self.config.detector) 126 @param validity: Validity range (days) 128 columns =
", ".join([self.
config.calibDate, self.
config.validStart, self.
config.validEnd])
129 sql =
"SELECT id, %s FROM %s" % (columns, table)
130 sql +=
" WHERE " +
" AND ".join(col +
"=?" for col
in self.
config.detector)
131 sql +=
" ORDER BY " + self.
config.calibDate
132 cursor = conn.cursor()
133 cursor.execute(sql, detectorData)
134 rows = cursor.fetchall()
137 valids = collections.OrderedDict([(_convertToDate(row[self.
config.calibDate]), [
None,
None])
for 140 det =
" ".join(
"%s=%s" % (k, v)
for k, v
in zip(self.
config.detector, detectorData))
142 self.
log.
warn(
str(
"Skipped setting the validity overlaps for %s %s: missing calibration dates" %
145 dates =
list(valids.keys())
146 if table
in self.
config.validityUntilSuperseded:
148 for thisDate, nextDate
in zip(dates[:-1], dates[1:]):
149 valids[thisDate][0] = thisDate
150 valids[thisDate][1] = nextDate - datetime.timedelta(1)
151 valids[dates[-1]][0] = dates[-1]
152 valids[dates[-1]][1] = _convertToDate(
"2037-12-31")
156 valids[dd] = [dd - datetime.timedelta(validity), dd + datetime.timedelta(validity)]
159 midpoints = [t1 + (t2 - t1)//2
for t1, t2
in zip(dates[:-1], dates[1:])]
160 for i, (date, midpoint)
in enumerate(zip(dates[:-1], midpoints)):
161 if valids[date][1] > midpoint:
162 nextDate = dates[i + 1]
163 valids[nextDate][0] = midpoint + datetime.timedelta(1)
164 valids[date][1] = midpoint
169 calibDate = _convertToDate(row[self.
config.calibDate])
170 validStart = valids[calibDate][0].isoformat()
171 validEnd = valids[calibDate][1].isoformat()
172 sql =
"UPDATE %s" % table
173 sql +=
" SET %s=?, %s=?" % (self.
config.validStart, self.
config.validEnd)
175 conn.execute(sql, (validStart, validEnd, row[
"id"]))
179 """Argument parser to support ingesting calibration images into the repository""" 182 InputOnlyArgumentParser.__init__(self, *args, **kwargs)
183 self.add_argument(
"-n",
"--dry-run", dest=
"dryrun", action=
"store_true",
184 default=
False, help=
"Don't perform any action?")
185 self.add_argument(
"--mode", choices=[
"move",
"copy",
"link",
"skip"], default=
"move",
186 help=
"Mode of delivering the files to their destination")
187 self.add_argument(
"--create", action=
"store_true", help=
"Create new registry?")
188 self.add_argument(
"--validity", type=int, required=
True, help=
"Calibration validity period (days)")
189 self.add_argument(
"--calibType", type=str, default=
None,
190 choices=[
None,
"bias",
"dark",
"flat",
"fringe",
"sky",
"defect"],
191 help=
"Type of the calibration data to be ingested;" 192 " if omitted, the type is determined from" 193 " the file header information")
194 self.add_argument(
"--ignore-ingested", dest=
"ignoreIngested", action=
"store_true",
195 help=
"Don't register files that have already been registered")
196 self.add_argument(
"files", nargs=
"+", help=
"Names of file")
200 """Configuration for IngestCalibsTask""" 203 allowError =
Field(dtype=bool, default=
False, doc=
"Allow error in ingestion?")
204 clobber =
Field(dtype=bool, default=
False, doc=
"Clobber existing file?")
208 """Task that generates registry for calibration images""" 209 ConfigClass = IngestCalibsConfig
210 ArgumentParser = IngestCalibsArgumentParser
211 _DefaultName =
"ingestCalibs" 214 """Ingest all specified files and add them to the registry""" 215 calibRoot = args.calib
if args.calib
is not None else args.output
217 with self.register.openRegistry(calibRoot, create=args.create, dryrun=args.dryrun)
as registry:
218 for infile
in filenameList:
219 fileInfo, hduInfoList = self.parse.
getInfo(infile)
220 if args.calibType
is None:
221 calibType = self.parse.getCalibType(infile)
223 calibType = args.calibType
224 if calibType
not in self.register.config.tables:
225 self.
log.
warn(
str(
"Skipped adding %s of observation type '%s' to registry " 226 "(must be one of %s)" %
227 (infile, calibType,
", ".join(self.register.config.tables))))
229 if args.mode !=
'skip':
230 outfile = self.parse.getDestination(args.butler, fileInfo, infile)
231 ingested = self.
ingest(infile, outfile, mode=args.mode, dryrun=args.dryrun)
233 self.
log.
warn(
str(
"Failed to ingest %s of observation type '%s'" %
234 (infile, calibType)))
236 if self.register.check(registry, fileInfo, table=calibType):
237 if args.ignoreIngested:
240 self.
log.
warn(
"%s: already ingested: %s" % (infile, fileInfo))
241 for info
in hduInfoList:
242 self.register.addRow(registry, info, dryrun=args.dryrun,
243 create=args.create, table=calibType)
245 self.register.updateValidityRanges(registry, args.validity)
247 self.
log.
info(
"Would update validity ranges here, but dryrun")
def __init__(self, args, kwargs)
def ingest(self, infile, outfile, mode="move", dryrun=False)
def expandFiles(self, fileNameList)
Expand a set of filenames and globs, returning a list of filenames.
def addRow(self, conn, info, args, kwargs)
def getCalibType(self, filename)
def getDestination(self, butler, info, filename)
def openRegistry(self, directory, create=False, dryrun=False, name="calibRegistry.sqlite3")
def createTable(self, conn)
def fixSubsetValidity(self, conn, table, detectorData, validity)
daf::base::PropertyList * list
def updateValidityRanges(self, conn, validity)