LSSTApplications  11.0-13-gbb96280,12.1+18,12.1+7,12.1-1-g14f38d3+72,12.1-1-g16c0db7+5,12.1-1-g5961e7a+84,12.1-1-ge22e12b+23,12.1-11-g06625e2+4,12.1-11-g0d7f63b+4,12.1-19-gd507bfc,12.1-2-g7dda0ab+38,12.1-2-gc0bc6ab+81,12.1-21-g6ffe579+2,12.1-21-gbdb6c2a+4,12.1-24-g941c398+5,12.1-3-g57f6835+7,12.1-3-gf0736f3,12.1-37-g3ddd237,12.1-4-gf46015e+5,12.1-5-g06c326c+20,12.1-5-g648ee80+3,12.1-5-gc2189d7+4,12.1-6-ga608fc0+1,12.1-7-g3349e2a+5,12.1-7-gfd75620+9,12.1-9-g577b946+5,12.1-9-gc4df26a+10
LSSTDataManagementBasePackage
ingestCalibs.py
Go to the documentation of this file.
1 from builtins import zip
2 import collections
3 import datetime
4 import itertools
5 import sqlite3
6 from glob import glob
7 import lsst.afw.image as afwImage
8 from lsst.pex.config import Config, Field, ListField, ConfigurableField
9 from lsst.pipe.base import InputOnlyArgumentParser
10 from lsst.pipe.tasks.ingest import RegisterTask, ParseTask, RegisterConfig, IngestTask
11 
12 
13 def _convertToDate(dateString):
14  """Convert a string into a date object"""
15  return datetime.datetime.strptime(dateString, "%Y-%m-%d").date()
16 
17 
19  """Task that will parse the filename and/or its contents to get the
20  required information to populate the calibration registry."""
21 
22  def getCalibType(self, filename):
23  """Return a a known calibration dataset type using
24  the observation type in the header keyword OBSTYPE
25 
26  @param filename: Input filename
27  """
28  md = afwImage.readMetadata(filename, self.config.hdu)
29  if not md.exists("OBSTYPE"):
30  raise RuntimeError("Unable to find the required header keyword OBSTYPE")
31  obstype = md.get("OBSTYPE").strip().lower()
32  if "flat" in obstype:
33  obstype = "flat"
34  elif "zero" in obstype or "bias" in obstype:
35  obstype = "bias"
36  elif "dark" in obstype:
37  obstype = "dark"
38  elif "fringe" in obstype:
39  obstype = "fringe"
40  return obstype
41 
42 
44  """Configuration for the CalibsRegisterTask"""
45  tables = ListField(dtype=str, default=["bias", "dark", "flat", "fringe"], doc="Names of tables")
46  calibDate = Field(dtype=str, default="calibDate", doc="Name of column for calibration date")
47  validStart = Field(dtype=str, default="validStart", doc="Name of column for validity start")
48  validEnd = Field(dtype=str, default="validEnd", doc="Name of column for validity stop")
49  detector = ListField(dtype=str, default=["filter", "ccd"],
50  doc="Columns that identify individual detectors")
51  validityUntilSuperseded = ListField(dtype=str, default=["defect"],
52  doc="Tables for which to set validity for a calib from when it is "
53  "taken until it is superseded by the next; validity in other tables "
54  "is calculated by applying the validity range.")
55 
56 
58  """Task that will generate the calibration registry for the Mapper"""
59  ConfigClass = CalibsRegisterConfig
60 
61  def openRegistry(self, directory, create=False, dryrun=False, name="calibRegistry.sqlite3"):
62  """Open the registry and return the connection handle"""
63  return RegisterTask.openRegistry(self, directory, create, dryrun, name)
64 
65  def createTable(self, conn):
66  """Create the registry tables"""
67  for table in self.config.tables:
68  RegisterTask.createTable(self, conn, table=table)
69 
70  def addRow(self, conn, info, *args, **kwargs):
71  """Add a row to the file table"""
72  info[self.config.validStart] = None
73  info[self.config.validEnd] = None
74  RegisterTask.addRow(self, conn, info, *args, **kwargs)
75 
76  def updateValidityRanges(self, conn, validity):
77  """Loop over all tables, filters, and ccdnums,
78  and update the validity ranges in the registry.
79 
80  @param conn: Database connection
81  @param validity: Validity range (days)
82  """
83  conn.row_factory = sqlite3.Row
84  cursor = conn.cursor()
85  for table in self.config.tables:
86  sql = "SELECT DISTINCT %s FROM %s" % (", ".join(self.config.detector), table)
87  cursor.execute(sql)
88  rows = cursor.fetchall()
89  for row in rows:
90  self.fixSubsetValidity(conn, table, row, validity)
91 
92  def fixSubsetValidity(self, conn, table, detectorData, validity):
93  """Update the validity ranges among selected rows in the registry.
94 
95  For defects, the products are valid from their start date until
96  they are superseded by subsequent defect data.
97  For other calibration products, the validity ranges are checked and
98  if there are overlaps, a midpoint is used to fix the overlaps,
99  so that the calibration data with whose date is nearest the date
100  of the observation is used.
101 
102  @param conn: Database connection
103  @param table: Name of table to be selected
104  @param detectorData: Values identifying a detector (from columns in self.config.detector)
105  @param validity: Validity range (days)
106  """
107  columns = ", ".join([self.config.calibDate, self.config.validStart, self.config.validEnd])
108  sql = "SELECT id, %s FROM %s" % (columns, table)
109  sql += " WHERE " + " AND ".join(col + "=?" for col in self.config.detector)
110  sql += " ORDER BY " + self.config.calibDate
111  cursor = conn.cursor()
112  cursor.execute(sql, detectorData)
113  rows = cursor.fetchall()
114 
115  try:
116  valids = collections.OrderedDict([(_convertToDate(row[self.config.calibDate]), [None, None]) for
117  row in rows])
118  except Exception as e:
119  det = " ".join("%s=%s" % (k, v) for k, v in zip(self.config.detector, detectorData))
120  # Sqlite returns unicode strings, which cannot be passed through SWIG.
121  self.log.warn(str("Skipped setting the validity overlaps for %s %s: missing calibration dates" %
122  (table, det)))
123  return
124  dates = list(valids.keys())
125  if table in self.config.validityUntilSuperseded:
126  # A calib is valid until it is superseded
127  for thisDate, nextDate in zip(dates[:-1], dates[1:]):
128  valids[thisDate][0] = thisDate
129  valids[thisDate][1] = nextDate - datetime.timedelta(1)
130  valids[dates[-1]][0] = dates[-1]
131  valids[dates[-1]][1] = _convertToDate("2037-12-31") # End of UNIX time
132  else:
133  # A calib is valid within the validity range (in days) specified.
134  for dd in dates:
135  valids[dd] = [dd - datetime.timedelta(validity), dd + datetime.timedelta(validity)]
136  # Fix the dates so that they do not overlap, which can cause the butler to find a
137  # non-unique calib.
138  midpoints = [t1 + (t2 - t1)//2 for t1, t2 in zip(dates[:-1], dates[1:])]
139  for i, (date, midpoint) in enumerate(zip(dates[:-1], midpoints)):
140  if valids[date][1] > midpoint:
141  nextDate = dates[i + 1]
142  valids[nextDate][0] = midpoint + datetime.timedelta(1)
143  valids[date][1] = midpoint
144  del midpoints
145  del dates
146  # Update the validity data in the registry
147  for row in rows:
148  calibDate = _convertToDate(row[self.config.calibDate])
149  validStart = valids[calibDate][0].isoformat()
150  validEnd = valids[calibDate][1].isoformat()
151  sql = "UPDATE %s" % table
152  sql += " SET %s=?, %s=?" % (self.config.validStart, self.config.validEnd)
153  sql += " WHERE id=?"
154  conn.execute(sql, (validStart, validEnd, row["id"]))
155 
156 
157 class IngestCalibsArgumentParser(InputOnlyArgumentParser):
158  """Argument parser to support ingesting calibration images into the repository"""
159 
160  def __init__(self, *args, **kwargs):
161  InputOnlyArgumentParser.__init__(self, *args, **kwargs)
162  self.add_argument("-n", "--dry-run", dest="dryrun", action="store_true",
163  default=False, help="Don't perform any action?")
164  self.add_argument("--create", action="store_true", help="Create new registry?")
165  self.add_argument("--validity", type=int, required=True, help="Calibration validity period (days)")
166  self.add_argument("--calibType", type=str, default=None,
167  choices=[None, "bias", "dark", "flat", "fringe", "defect"],
168  help="Type of the calibration data to be ingested;" +
169  " if omitted, the type is determined from" +
170  " the file header information")
171  self.add_argument("files", nargs="+", help="Names of file")
172 
173 
174 class IngestCalibsConfig(Config):
175  """Configuration for IngestCalibsTask"""
176  parse = ConfigurableField(target=CalibsParseTask, doc="File parsing")
177  register = ConfigurableField(target=CalibsRegisterTask, doc="Registry entry")
178 
179 
181  """Task that generates registry for calibration images"""
182  ConfigClass = IngestCalibsConfig
183  ArgumentParser = IngestCalibsArgumentParser
184  _DefaultName = "ingestCalibs"
185 
186  def run(self, args):
187  """Ingest all specified files and add them to the registry"""
188  calibRoot = args.calib if args.calib is not None else "."
189  filenameList = sum([glob(filename) for filename in args.files], [])
190  with self.register.openRegistry(calibRoot, create=args.create, dryrun=args.dryrun) as registry:
191  for infile in filenameList:
192  fileInfo, hduInfoList = self.parse.getInfo(infile)
193  if args.calibType is None:
194  calibType = self.parse.getCalibType(infile)
195  else:
196  calibType = args.calibType
197  if calibType not in self.register.config.tables:
198  self.log.warn(str("Skipped adding %s of observation type '%s' to registry" %
199  (infile, calibType)))
200  continue
201  for info in hduInfoList:
202  self.register.addRow(registry, info, dryrun=args.dryrun,
203  create=args.create, table=calibType)
204  if not args.dryrun:
205  self.register.updateValidityRanges(registry, args.validity)
206  else:
207  self.log.info("Would update validity ranges here, but dryrun")
boost::shared_ptr< daf::base::PropertyList > readMetadata(std::string const &fileName, int hdu=0, bool strip=false)
Return the metadata (header entries) from a FITS file.
Definition: Utils.h:62