LSSTApplications  16.0-10-g9d3e444,16.0-11-g09ed895+3,16.0-11-g12e47bd+4,16.0-11-g9bb73b2+10,16.0-12-g5c924a4+10,16.0-15-g7af1f30,16.0-15-gdd5ca33+2,16.0-16-gf0259e2+1,16.0-17-g31abd91+11,16.0-17-g5cf0468+3,16.0-18-g51a54b3+3,16.0-18-ga4d4bcb+5,16.0-18-gcf94535+2,16.0-19-g9d290d5+2,16.0-2-g0febb12+22,16.0-2-g9d5294e+73,16.0-2-ga8830df+7,16.0-21-g3d035912+2,16.0-26-g8e79609,16.0-28-gfc9ea6c+9,16.0-29-ge8801f9+4,16.0-3-ge00e371+38,16.0-4-g18f3627+17,16.0-4-g5f3a788+21,16.0-4-ga3eb747+11,16.0-4-gabf74b7+33,16.0-4-gb13d127+7,16.0-5-g27fb78a+11,16.0-5-g6a53317+38,16.0-5-gb3f8a4b+91,16.0-51-gbbe9c988+3,16.0-6-g9321be7+5,16.0-6-gcbc7b31+47,16.0-6-gf49912c+33,16.0-75-gbf7a9a820,16.0-8-g21fd5fe+34,16.0-8-g3a9f023+24,16.0-8-gc11f1cf,16.0-9-gf3bc169+2,16.0-9-gf5c1f43+12,master-gc237143d49,w.2019.02
LSSTDataManagementBasePackage
ingestCalibs.py
Go to the documentation of this file.
1 import collections
2 import datetime
3 import sqlite3
4 
5 from lsst.afw.fits import readMetadata
6 from lsst.pex.config import Config, Field, ListField, ConfigurableField
7 from lsst.pipe.base import InputOnlyArgumentParser
8 from lsst.pipe.tasks.ingest import RegisterTask, ParseTask, RegisterConfig, IngestTask
9 
10 
def _convertToDate(dateString):
    """Parse a calendar date string into a date object.

    @param dateString: Date written as "%Y-%m-%d", e.g. "2019-01-31"
    @return The corresponding datetime.date
    """
    parsed = datetime.datetime.strptime(dateString, "%Y-%m-%d")
    return parsed.date()
14 
15 
# NOTE(review): the class statement that owns this docstring is missing from
# this listing; elsewhere in the file the class is referred to as
# CalibsParseTask (presumably derived from the imported ParseTask -- confirm).
"""Task that parses a calibration file's name and/or contents to obtain
the information required to populate the calibration registry."""
19 
def getCalibType(self, filename):
    """Return a known calibration dataset type, based on the
    observation type in the header keyword OBSTYPE

    The OBSTYPE value is read from HDU self.config.hdu, stripped and
    lower-cased, then matched by substring against the known types.

    @param filename: Input filename
    @return "flat", "bias", "dark", "fringe" or "sky" when the observation
            type contains a matching keyword ("zero" also maps to "bias");
            otherwise the stripped, lower-cased OBSTYPE value itself
    @throw RuntimeError if the header has no OBSTYPE keyword
    """
    hdu = self.config.hdu
    header = readMetadata(filename, hdu)
    if not header.exists("OBSTYPE"):
        raise RuntimeError("Unable to find the required header keyword OBSTYPE in %s, hdu %d" %
                           (filename, hdu))
    obstype = header.getScalar("OBSTYPE").strip().lower()
    # The first matching entry wins, so the order below is the precedence
    # of the dataset types.
    knownTypes = (
        ("flat", ("flat",)),
        ("bias", ("zero", "bias")),
        ("dark", ("dark",)),
        ("fringe", ("fringe",)),
        ("sky", ("sky",)),
    )
    for calibType, keywords in knownTypes:
        if any(word in obstype for word in keywords):
            return calibType
    return obstype
42 
def getDestination(self, butler, info, filename):
    """Work out where the file should be stored

    The butler is asked for the "<calibType>_filename" dataset, where the
    calibration type comes from self.getCalibType(filename); the first
    entry of the result is used.

    @param butler Data butler
    @param info File properties, used as dataId for the butler
    @param filename Input filename
    @return Destination filename, without any trailing "[...]" part
    """
    # 'dataId' (originally 'tempinfo') was added as part of DM-5466 to strip
    # Nones from info. The Butler should handle this behind-the-scenes in
    # the future. Please reference DM-9873 and delete this comment once it
    # is resolved.
    dataId = {key: value for key, value in info.items() if value is not None}
    datasetType = self.getCalibType(filename) + "_filename"
    destination = butler.get(datasetType, dataId)[0]
    # Ensure the filename is devoid of cfitsio directions about HDUs. A "["
    # in the very first position is deliberately left alone (index 0 is not
    # "> 0").
    bracket = destination.find("[")
    if bracket > 0:
        return destination[:bracket]
    return destination
62 
63 
# NOTE(review): the class statement that owns these fields is missing from
# this listing; the file refers to the class as CalibsRegisterConfig
# (presumably derived from the imported RegisterConfig -- confirm).
"""Configuration for the CalibsRegisterTask"""
# Registry tables handled by the register task.
tables = ListField(
    dtype=str,
    default=["bias", "dark", "flat", "fringe", "sky"],
    doc="Names of tables",
)
# Column names used when reading and writing validity information.
calibDate = Field(
    dtype=str,
    default="calibDate",
    doc="Name of column for calibration date",
)
validStart = Field(
    dtype=str,
    default="validStart",
    doc="Name of column for validity start",
)
validEnd = Field(
    dtype=str,
    default="validEnd",
    doc="Name of column for validity stop",
)
# Columns whose combined values select the rows of one detector.
detector = ListField(
    dtype=str,
    default=["filter", "ccd"],
    doc="Columns that identify individual detectors",
)
# NOTE(review): "defect" is not in the default `tables` list above;
# presumably it is added by instrument-specific overrides -- confirm.
validityUntilSuperseded = ListField(
    dtype=str,
    default=["defect"],
    doc="Tables for which to set validity for a calib from when it is "
        "taken until it is superseded by the next; validity in other tables "
        "is calculated by applying the validity range.",
)
76 
77 
# NOTE(review): the class statement that owns these lines is missing from
# this listing; the file refers to the class as CalibsRegisterTask, and its
# methods delegate explicitly to RegisterTask.
"""Task that builds the calibration registry used by the Mapper"""
# Configuration class used by this task.
ConfigClass = CalibsRegisterConfig
81 
def openRegistry(self, directory, create=False, dryrun=False, name="calibRegistry.sqlite3"):
    """Open the calibration registry and return the connection handle

    Identical to RegisterTask.openRegistry except that the registry file
    name defaults to "calibRegistry.sqlite3".

    @param directory Directory forwarded to RegisterTask.openRegistry
    @param create Forwarded unchanged to RegisterTask.openRegistry
    @param dryrun Forwarded unchanged to RegisterTask.openRegistry
    @param name Registry file name
    @return Whatever RegisterTask.openRegistry returns
    """
    handle = RegisterTask.openRegistry(self, directory, create, dryrun, name)
    return handle
85 
def createTable(self, conn):
    """Create one registry table per entry in self.config.tables

    @param conn Database connection
    """
    for tableName in self.config.tables:
        RegisterTask.createTable(self, conn, table=tableName)
90 
def addRow(self, conn, info, *args, **kwargs):
    """Add a row to the file table

    The validity start and end columns are written as None here (this
    modifies `info` in place); fixSubsetValidity fills them in later.

    @param conn Database connection
    @param info File properties to add to the registry
    @param args, kwargs Passed through to RegisterTask.addRow
    """
    for column in (self.config.validStart, self.config.validEnd):
        info[column] = None
    RegisterTask.addRow(self, conn, info, *args, **kwargs)
96 
def updateValidityRanges(self, conn, validity):
    """Update the validity ranges in the registry for every distinct
    detector (combination of the self.config.detector columns) in every
    table listed in self.config.tables.

    Note that this sets conn.row_factory to sqlite3.Row.

    @param conn: Database connection
    @param validity: Validity range (days)
    """
    # fixSubsetValidity looks columns up by name, which needs sqlite3.Row.
    conn.row_factory = sqlite3.Row
    detectorColumns = ", ".join(self.config.detector)
    cursor = conn.cursor()
    for tableName in self.config.tables:
        cursor.execute("SELECT DISTINCT %s FROM %s" % (detectorColumns, tableName))
        # Fetch everything up front; the registry is updated while looping.
        for detectorRow in cursor.fetchall():
            self.fixSubsetValidity(conn, tableName, detectorRow, validity)
112 
def fixSubsetValidity(self, conn, table, detectorData, validity):
    """Update the validity ranges among selected rows in the registry.

    For tables listed in self.config.validityUntilSuperseded (defects by
    default), a product is valid from its own date until the day before
    the next product's date; the last one is valid until 2037-12-31.
    For other calibration products, each product is valid for `validity`
    days either side of its date. Where neighbouring ranges would share
    any day, they are split at the midpoint between the two dates, so
    that the calibration whose date is nearest the date of the observation
    is used. Ranges are inclusive at both ends.

    Rows are read by column name, so `conn` must produce sqlite3.Row rows
    (updateValidityRanges sets this up). Rows sharing a calibration date
    receive the same validity range.

    @param conn: Database connection
    @param table: Name of table to be selected
    @param detectorData: Values identifying a detector (from columns in self.config.detector)
    @param validity: Validity range (days)
    """
    config = self.config
    columns = ", ".join([config.calibDate, config.validStart, config.validEnd])
    sql = "SELECT id, %s FROM %s" % (columns, table)
    sql += " WHERE " + " AND ".join(col + "=?" for col in config.detector)
    sql += " ORDER BY " + config.calibDate
    cursor = conn.cursor()
    cursor.execute(sql, detectorData)
    rows = cursor.fetchall()
    if not rows:
        # Nothing matched (e.g. a NULL detector value never satisfies
        # "col=?"); without this guard the "until superseded" branch below
        # would index an empty list.
        return

    try:
        valids = collections.OrderedDict(
            (_convertToDate(row[config.calibDate]), [None, None]) for row in rows)
    except Exception:
        # Deliberately best-effort: a missing or malformed date skips this
        # detector instead of aborting the whole update.
        det = " ".join("%s=%s" % (k, v) for k, v in zip(config.detector, detectorData))
        # Sqlite returns unicode strings, which cannot be passed through SWIG.
        self.log.warn(str("Skipped setting the validity overlaps for %s %s: missing calibration dates" %
                          (table, det)))
        return
    dates = list(valids.keys())
    oneDay = datetime.timedelta(1)
    if table in config.validityUntilSuperseded:
        # A calib is valid until it is superseded
        for thisDate, nextDate in zip(dates[:-1], dates[1:]):
            valids[thisDate][0] = thisDate
            valids[thisDate][1] = nextDate - oneDay
        valids[dates[-1]][0] = dates[-1]
        valids[dates[-1]][1] = _convertToDate("2037-12-31")  # End of UNIX time
    else:
        # A calib is valid within the validity range (in days) specified.
        window = datetime.timedelta(validity)
        for dd in dates:
            valids[dd] = [dd - window, dd + window]
        # Fix the dates so that they do not overlap, which can cause the butler to find a
        # non-unique calib.
        for thisDate, nextDate in zip(dates[:-1], dates[1:]):
            midpoint = thisDate + (nextDate - thisDate)//2
            # ">=" rather than ">": when this range ends exactly on the
            # midpoint the next range may start on that same day, which is
            # still a one-day overlap of two inclusive ranges.
            if valids[thisDate][1] >= midpoint:
                valids[nextDate][0] = midpoint + oneDay
                valids[thisDate][1] = midpoint
    # Update the validity data in the registry
    sql = "UPDATE %s" % table
    sql += " SET %s=?, %s=?" % (config.validStart, config.validEnd)
    sql += " WHERE id=?"
    for row in rows:
        validStart, validEnd = valids[_convertToDate(row[config.calibDate])]
        conn.execute(sql, (validStart.isoformat(), validEnd.isoformat(), row["id"]))
176 
177 
# NOTE(review): the class statement that owns this docstring is missing from
# this listing; the file refers to the class as IngestCalibsArgumentParser,
# and its __init__ delegates to InputOnlyArgumentParser.
"""Argument parser for ingesting calibration images into the repository"""
180 
def __init__(self, *args, **kwargs):
    """Construct the parser and register the calibration-ingest options

    @param args, kwargs Passed through to InputOnlyArgumentParser.__init__
    """
    InputOnlyArgumentParser.__init__(self, *args, **kwargs)
    deliveryModes = ["move", "copy", "link", "skip"]
    # None is listed so that the default (type taken from the header) is a
    # valid choice.
    calibTypes = [None, "bias", "dark", "flat", "fringe", "sky", "defect"]
    self.add_argument(
        "-n", "--dry-run",
        dest="dryrun",
        action="store_true",
        default=False,
        help="Don't perform any action?",
    )
    self.add_argument(
        "--mode",
        choices=deliveryModes,
        default="move",
        help="Mode of delivering the files to their destination",
    )
    self.add_argument(
        "--create",
        action="store_true",
        help="Create new registry?",
    )
    self.add_argument(
        "--validity",
        type=int,
        required=True,
        help="Calibration validity period (days)",
    )
    self.add_argument(
        "--calibType",
        type=str,
        default=None,
        choices=calibTypes,
        help="Type of the calibration data to be ingested;"
             " if omitted, the type is determined from"
             " the file header information",
    )
    self.add_argument(
        "--ignore-ingested",
        dest="ignoreIngested",
        action="store_true",
        help="Don't register files that have already been registered",
    )
    self.add_argument(
        "files",
        nargs="+",
        help="Names of file",
    )
197 
198 
# NOTE(review): the class statement that owns these fields is missing from
# this listing; the file refers to the class as IngestCalibsConfig
# (presumably derived from the imported Config -- confirm).
"""Configuration for IngestCalibsTask"""
# Sub-tasks: one parses the input files, the other maintains the registry.
parse = ConfigurableField(
    target=CalibsParseTask,
    doc="File parsing",
)
register = ConfigurableField(
    target=CalibsRegisterTask,
    doc="Registry entry",
)
# NOTE(review): neither flag below is read by the code visible in this
# listing; presumably they are consumed by the base ingest task -- confirm.
allowError = Field(
    dtype=bool,
    default=False,
    doc="Allow error in ingestion?",
)
clobber = Field(
    dtype=bool,
    default=False,
    doc="Clobber existing file?",
)
205 
206 
# NOTE(review): the class statement that owns these lines is missing from
# this listing; the configuration docstring names the task IngestCalibsTask
# (presumably derived from the imported IngestTask -- confirm).
"""Task that ingests calibration images and generates their registry"""
# Configuration and command-line parser classes used by this task.
ConfigClass = IngestCalibsConfig
ArgumentParser = IngestCalibsArgumentParser
_DefaultName = "ingestCalibs"
212 
def run(self, args):
    """Ingest all specified files and add them to the registry

    For each input file the calibration type is taken from args.calibType,
    or from the file itself when that is None. Files whose type has no
    registry table are skipped with a warning. Unless args.mode is 'skip',
    the file is delivered to its destination first, and a failed delivery
    skips the file with a warning. One registry row is added per HDU.
    Validity ranges are updated once after all files, unless this is a dry
    run.

    @param args Parsed command-line arguments; the attributes read are
                calib, output, files, create, dryrun, calibType, mode,
                butler, ignoreIngested and validity
    """
    calibRoot = args.output if args.calib is None else args.calib
    filenameList = self.expandFiles(args.files)
    register = self.register
    with register.openRegistry(calibRoot, create=args.create, dryrun=args.dryrun) as registry:
        for infile in filenameList:
            fileInfo, hduInfoList = self.parse.getInfo(infile)
            calibType = self.parse.getCalibType(infile) if args.calibType is None else args.calibType
            if calibType not in register.config.tables:
                self.log.warn(str("Skipped adding %s of observation type '%s' to registry "
                                  "(must be one of %s)" %
                                  (infile, calibType, ", ".join(register.config.tables))))
                continue
            if args.mode != 'skip':
                outfile = self.parse.getDestination(args.butler, fileInfo, infile)
                if not self.ingest(infile, outfile, mode=args.mode, dryrun=args.dryrun):
                    self.log.warn(str("Failed to ingest %s of observation type '%s'" %
                                      (infile, calibType)))
                    continue
            if register.check(registry, fileInfo, table=calibType):
                if args.ignoreIngested:
                    continue
                # Already registered, but not told to ignore it: warn and
                # still try to add the rows below.
                self.log.warn("%s: already ingested: %s" % (infile, fileInfo))
            for hduInfo in hduInfoList:
                register.addRow(registry, hduInfo, dryrun=args.dryrun,
                                create=args.create, table=calibType)
        if args.dryrun:
            self.log.info("Would update validity ranges here, but dryrun")
        else:
            register.updateValidityRanges(registry, args.validity)
def ingest(self, infile, outfile, mode="move", dryrun=False)
Definition: ingest.py:397
def expandFiles(self, fileNameList)
Expand a set of filenames and globs, returning a list of filenames.
Definition: ingest.py:469
def addRow(self, conn, info, *args, **kwargs)
Definition: ingestCalibs.py:91
def getDestination(self, butler, info, filename)
Definition: ingestCalibs.py:43
def openRegistry(self, directory, create=False, dryrun=False, name="calibRegistry.sqlite3")
Definition: ingestCalibs.py:82
def fixSubsetValidity(self, conn, table, detectorData, validity)
daf::base::PropertyList * list
Definition: fits.cc:833
def updateValidityRanges(self, conn, validity)
Definition: ingestCalibs.py:97
bool strip
Definition: fits.cc:831