LSSTApplications  17.0+11,17.0+35,17.0+60,17.0+61,17.0+63,17.0+7,17.0-1-g377950a+35,17.0.1-1-g114240f+2,17.0.1-1-g4d4fbc4+30,17.0.1-1-g55520dc+55,17.0.1-1-g5f4ed7e+59,17.0.1-1-g6dd7d69+22,17.0.1-1-g8de6c91+11,17.0.1-1-gb9095d2+7,17.0.1-1-ge9fec5e+5,17.0.1-1-gf4e0155+63,17.0.1-1-gfc65f5f+56,17.0.1-1-gfc6fb1f+20,17.0.1-10-g87f9f3f+9,17.0.1-12-g112a4bc+3,17.0.1-17-gab9750a3+5,17.0.1-17-gdae4c4a+16,17.0.1-19-g3a24bb2+2,17.0.1-2-g26618f5+35,17.0.1-2-g54f2ebc+9,17.0.1-2-gf403422+1,17.0.1-21-g52a398f+5,17.0.1-26-gd98a1d13,17.0.1-3-g7e86b59+45,17.0.1-3-gb5ca14a,17.0.1-3-gd08d533+46,17.0.1-31-gb0791f330,17.0.1-4-g59d126d+10,17.0.1-5-g3877d06+2,17.0.1-7-g35889ee+7,17.0.1-7-gc7c8782+20,17.0.1-7-gcb7da53+5,17.0.1-9-gc4bbfb2+10,w.2019.24
LSSTDataManagementBasePackage
ingestCalibs.py
Go to the documentation of this file.
1 import collections
2 import datetime
3 import sqlite3
4 
5 from lsst.afw.fits import readMetadata
6 from lsst.pex.config import Config, Field, ListField, ConfigurableField
7 from lsst.pipe.base import InputOnlyArgumentParser
8 from lsst.pipe.tasks.ingest import RegisterTask, ParseTask, RegisterConfig, IngestTask
9 
10 
def _convertToDate(dateString):
    """Parse a ``YYYY-MM-DD`` string and return it as a `datetime.date`"""
    parsed = datetime.datetime.strptime(dateString, "%Y-%m-%d")
    return parsed.date()
14 
15 
17  """Task that will parse the filename and/or its contents to get the
18  required information to populate the calibration registry."""
19 
def getCalibType(self, filename):
    """Return a known calibration dataset type using
    the observation type in the header keyword OBSTYPE

    The OBSTYPE value is stripped and lower-cased, then matched by
    substring against the known calibration types; the first match wins.
    A value matching none of them is returned as-is (stripped and
    lower-cased).

    @param filename: Input filename
    @return Calibration dataset type name
    @throw RuntimeError if the header read from self.config.hdu has no OBSTYPE
    """
    header = readMetadata(filename, self.config.hdu)
    if not header.exists("OBSTYPE"):
        raise RuntimeError("Unable to find the required header keyword OBSTYPE in %s, hdu %d" %
                           (filename, self.config.hdu))
    observed = header.getScalar("OBSTYPE").strip().lower()
    # Ordered (substrings, dataset type) pairs.  Order matters: the first
    # entry with a matching substring decides the result.
    knownTypes = (
        (("flat",), "flat"),
        (("zero", "bias"), "bias"),
        (("dark",), "dark"),
        (("fringe",), "fringe"),
        (("sky",), "sky"),
        (("illumcor",), "illumcor"),
    )
    for substrings, calibType in knownTypes:
        if any(token in observed for token in substrings):
            return calibType
    return observed
44 
def getDestination(self, butler, info, filename):
    """Look up where the butler wants this calibration file stored

    @param butler Data butler
    @param info File properties, used as dataId for the butler
    @param filename Input filename
    @return Destination filename
    """
    # Entries whose value is None are stripped from the dataId (added as
    # part of DM-5466).  The Butler should handle this behind-the-scenes in
    # the future; please reference DM-9873 and delete this comment once it
    # is resolved.
    dataId = {key: value for key, value in info.items() if value is not None}
    datasetType = self.getCalibType(filename) + "_filename"
    destination = butler.get(datasetType, dataId)[0]
    # Drop any trailing cfitsio HDU specifier such as "[1]"; a bracket in
    # the very first position is left alone.
    bracket = destination.find("[")
    return destination[:bracket] if bracket > 0 else destination
64 
65 
67  """Configuration for the CalibsRegisterTask"""
68  tables = ListField(dtype=str, default=["bias", "dark", "flat", "fringe", "sky"], doc="Names of tables")
69  calibDate = Field(dtype=str, default="calibDate", doc="Name of column for calibration date")
70  validStart = Field(dtype=str, default="validStart", doc="Name of column for validity start")
71  validEnd = Field(dtype=str, default="validEnd", doc="Name of column for validity stop")
72  detector = ListField(dtype=str, default=["filter", "ccd"],
73  doc="Columns that identify individual detectors")
74  validityUntilSuperseded = ListField(dtype=str, default=["defect"],
75  doc="Tables for which to set validity for a calib from when it is "
76  "taken until it is superseded by the next; validity in other tables "
77  "is calculated by applying the validity range.")
78 
79 
81  """Task that will generate the calibration registry for the Mapper"""
82  ConfigClass = CalibsRegisterConfig
83 
def openRegistry(self, directory, create=False, dryrun=False, name="calibRegistry.sqlite3"):
    """Open the calibration registry and return the connection handle

    Delegates to RegisterTask.openRegistry; the only difference visible
    here is the default registry file name.
    """
    handle = RegisterTask.openRegistry(self, directory, create, dryrun, name)
    return handle
87 
def createTable(self, conn):
    """Create one registry table per entry in self.config.tables

    @param conn: Database connection
    """
    for tableName in self.config.tables:
        RegisterTask.createTable(self, conn, table=tableName)
92 
def addRow(self, conn, info, *args, **kwargs):
    """Add a row to the registry with its validity range left unset

    The validity start/end entries of ``info`` are set to None (``info`` is
    modified in place) before the row is handed to RegisterTask.addRow.
    """
    for column in (self.config.validStart, self.config.validEnd):
        info[column] = None
    RegisterTask.addRow(self, conn, info, *args, **kwargs)
98 
def updateValidityRanges(self, conn, validity):
    """Update the validity ranges in the registry for every table and
    every distinct combination of the detector-identifying columns.

    Note that this sets ``conn.row_factory`` to `sqlite3.Row`, so rows are
    addressable by column name from here on.

    @param conn: Database connection
    @param validity: Validity range (days)
    """
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()
    for tableName in self.config.tables:
        detectorColumns = ", ".join(self.config.detector)
        cursor.execute("SELECT DISTINCT %s FROM %s" % (detectorColumns, tableName))
        for detectorRow in cursor.fetchall():
            self.fixSubsetValidity(conn, tableName, detectorRow, validity)
114 
def fixSubsetValidity(self, conn, table, detectorData, validity):
    """Update the validity ranges among selected rows in the registry.

    For tables listed in self.config.validityUntilSuperseded, each product
    is valid from its calibration date until the day before the next
    product's date; the latest product is valid until 2037-12-31.
    For other calibration products, each product is first given a range of
    ``validity`` days either side of its calibration date.  Where the
    ranges of consecutive products overlap, they are split at the midpoint
    between the two calibration dates, so that the calibration whose date
    is nearest the date of the observation is used.

    Nothing is done if no rows match ``detectorData``.  If any calibration
    date cannot be converted, a warning is logged and no row is updated.

    Rows are read by column name, so ``conn`` must produce name-indexable
    rows (updateValidityRanges sets ``conn.row_factory = sqlite3.Row``).

    @param conn: Database connection
    @param table: Name of table to be selected
    @param detectorData: Values identifying a detector (from columns in self.config.detector)
    @param validity: Validity range (days)
    """
    config = self.config
    columns = ", ".join([config.calibDate, config.validStart, config.validEnd])
    sql = "SELECT id, %s FROM %s" % (columns, table)
    sql += " WHERE " + " AND ".join(col + "=?" for col in config.detector)
    sql += " ORDER BY " + config.calibDate
    cursor = conn.cursor()
    cursor.execute(sql, detectorData)
    rows = cursor.fetchall()
    if not rows:
        # Nothing matched (e.g. a detector value of NULL, which "=" never
        # matches).  There is nothing to update, and the "until superseded"
        # branch below would otherwise fail on dates[-1].
        return

    try:
        valids = collections.OrderedDict([(_convertToDate(row[config.calibDate]), [None, None])
                                          for row in rows])
    except Exception:
        det = " ".join("%s=%s" % (k, v) for k, v in zip(config.detector, detectorData))
        # Sqlite returns unicode strings, which cannot be passed through SWIG.
        self.log.warn(str("Skipped setting the validity overlaps for %s %s: missing calibration dates" %
                          (table, det)))
        return

    # Distinct calibration dates, in ascending order (rows are ORDER BY calibDate).
    dates = list(valids.keys())
    oneDay = datetime.timedelta(1)
    if table in config.validityUntilSuperseded:
        # A calib is valid until it is superseded
        for thisDate, nextDate in zip(dates[:-1], dates[1:]):
            valids[thisDate][0] = thisDate
            valids[thisDate][1] = nextDate - oneDay
        valids[dates[-1]][0] = dates[-1]
        valids[dates[-1]][1] = _convertToDate("2037-12-31")  # End of UNIX time
    else:
        # A calib is valid within the validity range (in days) specified.
        halfWidth = datetime.timedelta(validity)
        for dd in dates:
            valids[dd] = [dd - halfWidth, dd + halfWidth]
        # Fix the dates so that they do not overlap, which can cause the butler to find a
        # non-unique calib.
        midpoints = [t1 + (t2 - t1)//2 for t1, t2 in zip(dates[:-1], dates[1:])]
        for i, (date, midpoint) in enumerate(zip(dates[:-1], midpoints)):
            if valids[date][1] > midpoint:
                nextDate = dates[i + 1]
                valids[nextDate][0] = midpoint + oneDay
                valids[date][1] = midpoint

    # Update the validity data in the registry; rows sharing a calibration
    # date receive the same range.
    updateSql = "UPDATE %s SET %s=?, %s=? WHERE id=?" % (table, config.validStart, config.validEnd)
    for row in rows:
        validStart, validEnd = valids[_convertToDate(row[config.calibDate])]
        conn.execute(updateSql, (validStart.isoformat(), validEnd.isoformat(), row["id"]))
178 
179 
181  """Argument parser to support ingesting calibration images into the repository"""
182 
def __init__(self, *args, **kwargs):
    """Set up the parser and register the calibration-ingest arguments"""
    InputOnlyArgumentParser.__init__(self, *args, **kwargs)
    # (flags, keyword options) for each argument, in registration order.
    argumentSpecs = (
        (("-n", "--dry-run"),
         dict(dest="dryrun", action="store_true", default=False, help="Don't perform any action?")),
        (("--mode",),
         dict(choices=["move", "copy", "link", "skip"], default="move",
              help="Mode of delivering the files to their destination")),
        (("--create",),
         dict(action="store_true", help="Create new registry?")),
        (("--validity",),
         dict(type=int, required=True, help="Calibration validity period (days)")),
        (("--calibType",),
         dict(type=str, default=None,
              choices=[None, "bias", "dark", "flat", "fringe", "sky", "defect"],
              help="Type of the calibration data to be ingested;"
                   " if omitted, the type is determined from"
                   " the file header information")),
        (("--ignore-ingested",),
         dict(dest="ignoreIngested", action="store_true",
              help="Don't register files that have already been registered")),
        (("files",),
         dict(nargs="+", help="Names of file")),
    )
    for flags, options in argumentSpecs:
        self.add_argument(*flags, **options)
199 
200 
202  """Configuration for IngestCalibsTask"""
203  parse = ConfigurableField(target=CalibsParseTask, doc="File parsing")
204  register = ConfigurableField(target=CalibsRegisterTask, doc="Registry entry")
205  allowError = Field(dtype=bool, default=False, doc="Allow error in ingestion?")
206  clobber = Field(dtype=bool, default=False, doc="Clobber existing file?")
207 
208 
210  """Task that generates registry for calibration images"""
211  ConfigClass = IngestCalibsConfig
212  ArgumentParser = IngestCalibsArgumentParser
213  _DefaultName = "ingestCalibs"
214 
def run(self, args):
    """Ingest all specified files and add them to the registry

    For each input file: work out its calibration type (from
    ``args.calibType`` if given, otherwise from the file itself), skip it
    with a warning if that type is not one of the registry tables, deliver
    the file unless the mode is 'skip', then add one registry row per HDU.
    Once all files are processed the validity ranges are updated, unless
    this is a dry run.

    @param args Parsed command-line arguments
    """
    calibRoot = args.output if args.calib is None else args.calib
    inputFiles = self.expandFiles(args.files)
    with self.register.openRegistry(calibRoot, create=args.create, dryrun=args.dryrun) as registry:
        for infile in inputFiles:
            fileInfo, hduInfoList = self.parse.getInfo(infile)
            calibType = args.calibType
            if calibType is None:
                calibType = self.parse.getCalibType(infile)

            knownTables = self.register.config.tables
            if calibType not in knownTables:
                self.log.warn(str("Skipped adding %s of observation type '%s' to registry "
                                  "(must be one of %s)" %
                                  (infile, calibType, ", ".join(knownTables))))
                continue

            if args.mode != 'skip':
                outfile = self.parse.getDestination(args.butler, fileInfo, infile)
                if not self.ingest(infile, outfile, mode=args.mode, dryrun=args.dryrun):
                    self.log.warn(str("Failed to ingest %s of observation type '%s'" %
                                      (infile, calibType)))
                    continue

            alreadyRegistered = self.register.check(registry, fileInfo, table=calibType)
            if alreadyRegistered and args.ignoreIngested:
                continue
            if alreadyRegistered:
                self.log.warn("%s: already ingested: %s" % (infile, fileInfo))

            for hduInfo in hduInfoList:
                self.register.addRow(registry, hduInfo, dryrun=args.dryrun,
                                     create=args.create, table=calibType)

        if args.dryrun:
            self.log.info("Would update validity ranges here, but dryrun")
        else:
            self.register.updateValidityRanges(registry, args.validity)
def ingest(self, infile, outfile, mode="move", dryrun=False)
Definition: ingest.py:418
def expandFiles(self, fileNameList)
Expand a set of filenames and globs, returning a list of filenames.
Definition: ingest.py:490
def addRow(self, conn, info, args, kwargs)
Definition: ingestCalibs.py:93
def getDestination(self, butler, info, filename)
Definition: ingestCalibs.py:45
def openRegistry(self, directory, create=False, dryrun=False, name="calibRegistry.sqlite3")
Definition: ingestCalibs.py:84
def fixSubsetValidity(self, conn, table, detectorData, validity)
daf::base::PropertyList * list
Definition: fits.cc:885
def updateValidityRanges(self, conn, validity)
Definition: ingestCalibs.py:99
bool strip
Definition: fits.cc:883