LSSTApplications  19.0.0-10-g920eed2,19.0.0-11-g48a0200+2,19.0.0-18-gfc4e62b+13,19.0.0-2-g3b2f90d+2,19.0.0-2-gd671419+5,19.0.0-20-g5a5a17ab+11,19.0.0-21-g2644856+13,19.0.0-23-g84eeccb+1,19.0.0-24-g878c510+1,19.0.0-25-g6c8df7140,19.0.0-25-gb330496+1,19.0.0-3-g2b32d65+5,19.0.0-3-g8227491+12,19.0.0-3-g9c54d0d+12,19.0.0-3-gca68e65+8,19.0.0-3-gcfc5f51+5,19.0.0-3-ge110943+11,19.0.0-3-ge74d124,19.0.0-3-gfe04aa6+13,19.0.0-30-g9c3fd16+1,19.0.0-4-g06f5963+5,19.0.0-4-g3d16501+13,19.0.0-4-g4a9c019+5,19.0.0-4-g5a8b323,19.0.0-4-g66397f0+1,19.0.0-4-g8278b9b+1,19.0.0-4-g8557e14,19.0.0-4-g8964aba+13,19.0.0-4-ge404a01+12,19.0.0-5-g40f3a5a,19.0.0-5-g4db63b3,19.0.0-5-gfb03ce7+13,19.0.0-6-gbaebbfb+12,19.0.0-61-gec4c6e08+1,19.0.0-7-g039c0b5+11,19.0.0-7-gbea9075+4,19.0.0-7-gc567de5+13,19.0.0-71-g41c0270,19.0.0-9-g2f02add+1,19.0.0-9-g463f923+12,w.2020.22
LSSTDataManagementBasePackage
ingestCalibs.py
Go to the documentation of this file.
1 import collections
2 import datetime
3 import sqlite3
4 from dateutil import parser
5 
6 from lsst.afw.fits import readMetadata
7 from lsst.pex.config import Config, Field, ListField, ConfigurableField
8 from lsst.pipe.base import InputOnlyArgumentParser
9 from lsst.pipe.tasks.ingest import RegisterTask, ParseTask, RegisterConfig, IngestTask
10 
11 
def _convertToDate(dateString):
    """Parse a date/time string and keep only its calendar date.

    Parsing is delegated to ``dateutil.parser.parse``; any time-of-day part
    of the parsed value is discarded. Parser exceptions propagate.

    @param dateString: String to parse
    @return datetime.date
    """
    parsedDateTime = parser.parse(dateString)
    return parsedDateTime.date()
15 
16 
18  """Task that will parse the filename and/or its contents to get the
19  required information to populate the calibration registry."""
20 
def getCalibType(self, filename):
    """Return a known calibration dataset type derived from the OBSTYPE header keyword.

    The OBSTYPE value is stripped and lower-cased, then scanned for known
    substrings in a fixed order; the first match determines the type.
    If nothing matches, the normalized OBSTYPE value itself is returned.

    @param filename: Input filename
    @return calibration type string
    @raise RuntimeError if OBSTYPE is absent from the configured HDU
    """
    md = readMetadata(filename, self.config.hdu)
    if not md.exists("OBSTYPE"):
        raise RuntimeError("Unable to find the required header keyword OBSTYPE in %s, hdu %d" %
                           (filename, self.config.hdu))
    rawType = md.getScalar("OBSTYPE").strip().lower()
    # (substring, calibration type) pairs. Order matters: the first match wins,
    # e.g. an OBSTYPE containing both "flat" and "sky" is classified as "flat".
    knownTypes = (
        ("flat", "flat"),
        ("zero", "bias"),
        ("bias", "bias"),
        ("dark", "dark"),
        ("fringe", "fringe"),
        ("sky", "sky"),
        ("illumcor", "illumcor"),
        ("defects", "defects"),
        ("qe_curve", "qe_curve"),
        ("linearizer", "linearizer"),
    )
    return next((calibType for key, calibType in knownTypes if key in rawType), rawType)
51 
def getDestination(self, butler, info, filename):
    """Ask the butler where a calibration file should be delivered.

    @param butler Data butler
    @param info File properties, used as dataId for the butler
    @param filename Input filename
    @return Destination filename, with any cfitsio HDU selector removed
    """
    # 'tempinfo' was added as part of DM-5466 to strip Nones from info.
    # The Butler should handle this behind-the-scenes in the future.
    # Please reference DM-9873 and delete this comment once it is resolved.
    dataId = {key: value for key, value in info.items() if value is not None}
    datasetType = self.getCalibType(filename) + "_filename"
    destination = butler.get(datasetType, dataId)[0]
    # Ensure filename is devoid of cfitsio directions about HDUs, e.g. "file.fits[1]".
    # A "[" in the very first position is deliberately left untouched.
    bracket = destination.find("[")
    return destination[:bracket] if bracket > 0 else destination
71 
72 
74  """Configuration for the CalibsRegisterTask"""
# Registry table names; CalibsRegisterTask.createTable creates one table per entry.
tables = ListField(
    doc="Names of tables",
    dtype=str,
    default=["bias", "dark", "flat", "fringe", "sky", "defects", "qe_curve", "linearizer"],
)
# Column names read and written by the validity-range bookkeeping.
calibDate = Field(doc="Name of column for calibration date", dtype=str, default="calibDate")
validStart = Field(doc="Name of column for validity start", dtype=str, default="validStart")
validEnd = Field(doc="Name of column for validity stop", dtype=str, default="validEnd")
# Columns whose combined values select one group of rows when fixing validity ranges.
detector = ListField(
    doc="Columns that identify individual detectors",
    dtype=str,
    default=["filter", "ccd"],
)
validityUntilSuperseded = ListField(
    doc=("Tables for which to set validity for a calib from when it is "
         "taken until it is superseded by the next; validity in other tables "
         "is calculated by applying the validity range."),
    dtype=str,
    default=["defects", "qe_curve", "linearizer"],
)
86 
87 
89  """Task that will generate the calibration registry for the Mapper"""
90  ConfigClass = CalibsRegisterConfig
91 
def openRegistry(self, directory, create=False, dryrun=False, name="calibRegistry.sqlite3"):
    """Open the calibration registry and return the connection handle.

    Delegates to RegisterTask.openRegistry, using ``calibRegistry.sqlite3``
    as the default registry file name.
    """
    parentOpen = RegisterTask.openRegistry
    return parentOpen(self, directory, create, dryrun, name)
95 
def createTable(self, conn, forceCreateTables=False):
    """Create one registry table per entry in ``self.config.tables``.

    Each table is created by RegisterTask.createTable; ``forceCreateTables``
    is passed through unchanged.
    """
    createOne = RegisterTask.createTable
    for tableName in self.config.tables:
        createOne(self, conn, table=tableName, forceCreateTables=forceCreateTables)
100 
def addRow(self, conn, info, *args, **kwargs):
    """Add a row to the file table with empty validity columns.

    ``info`` is modified in place: the validity start/end columns are set to
    None before delegating to RegisterTask.addRow. The real values are
    written later by the validity-range update.
    """
    for column in (self.config.validStart, self.config.validEnd):
        info[column] = None
    RegisterTask.addRow(self, conn, info, *args, **kwargs)
106 
def updateValidityRanges(self, conn, validity, tables=None):
    """Recompute validity ranges for every distinct detector in each table.

    The connection's row factory is switched to ``sqlite3.Row`` (this persists
    on ``conn``). Then fixSubsetValidity is called for every distinct
    combination of the ``self.config.detector`` columns.

    @param conn: Database connection
    @param validity: Validity range (days)
    @param tables: Tables to process; defaults to ``self.config.tables``
    """
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()
    detectorColumns = ", ".join(self.config.detector)
    for table in (self.config.tables if tables is None else tables):
        cursor.execute("SELECT DISTINCT %s FROM %s" % (detectorColumns, table))
        for detectorRow in cursor.fetchall():
            self.fixSubsetValidity(conn, table, detectorRow, validity)
124 
def fixSubsetValidity(self, conn, table, detectorData, validity):
    """Update the validity ranges among selected rows in the registry.

    For tables listed in ``self.config.validityUntilSuperseded``, a product is
    valid from its calibDate until the day before the next product's calibDate.
    The last product is valid until 2037-12-31.

    For other tables, each product is valid for ``validity`` days on either side
    of its calibDate. Where neighbouring ranges would overlap, the boundary is
    moved to the midpoint between the two dates. This selects the calib whose
    date is nearest the observation, and no day is covered by two calibs.
    Ranges are inclusive at both ends.

    @param conn: Database connection; rows are accessed by column name, so it
                 must use ``sqlite3.Row`` as row factory (updateValidityRanges
                 sets this)
    @param table: Name of table to be selected
    @param detectorData: Values identifying a detector (from columns in self.config.detector)
    @param validity: Validity range (days)
    """
    columns = ", ".join([self.config.calibDate, self.config.validStart, self.config.validEnd])
    sql = "SELECT id, %s FROM %s" % (columns, table)
    # "IS" behaves like "=" for non-NULL values but also matches NULL to NULL.
    # Detectors whose identifying value is NULL (e.g. no filter) are therefore
    # still selected; with "=" such rows were silently never updated.
    sql += " WHERE " + " AND ".join(col + " IS ?" for col in self.config.detector)
    sql += " ORDER BY " + self.config.calibDate
    cursor = conn.cursor()
    cursor.execute(sql, detectorData)
    rows = cursor.fetchall()
    if not rows:
        # Nothing to update (and dates[-1] below would fail on an empty list).
        return

    try:
        rowDates = [_convertToDate(row[self.config.calibDate]) for row in rows]
    except Exception:
        det = " ".join("%s=%s" % (k, v) for k, v in zip(self.config.detector, detectorData))
        # Sqlite returns unicode strings, which cannot be passed through SWIG.
        self.log.warn(str("Skipped setting the validity overlaps for %s %s: missing calibration dates" %
                          (table, det)))
        return
    # Distinct calibration dates in query order. This is ascending, assuming the
    # calibDate strings sort chronologically (e.g. ISO format).
    dates = list(collections.OrderedDict.fromkeys(rowDates))
    oneDay = datetime.timedelta(1)
    valids = {}
    if table in self.config.validityUntilSuperseded:
        # A calib is valid until it is superseded
        for thisDate, nextDate in zip(dates[:-1], dates[1:]):
            valids[thisDate] = [thisDate, nextDate - oneDay]
        valids[dates[-1]] = [dates[-1], datetime.date(2037, 12, 31)]  # End of UNIX time
    else:
        # A calib is valid within the validity range (in days) specified.
        window = datetime.timedelta(validity)
        for dd in dates:
            valids[dd] = [dd - window, dd + window]
        # Fix the dates so that they do not overlap, which can cause the butler to find a
        # non-unique calib. Two inclusive ranges overlap when one ends on or after the day
        # the next starts. Testing "end > midpoint" missed the case where both ranges met
        # exactly on the midpoint day (even date gap, validity == gap/2).
        for thisDate, nextDate in zip(dates[:-1], dates[1:]):
            if valids[thisDate][1] >= valids[nextDate][0]:
                midpoint = thisDate + (nextDate - thisDate)//2
                valids[thisDate][1] = midpoint
                valids[nextDate][0] = midpoint + oneDay

    # Update the validity data in the registry
    sql = "UPDATE %s SET %s=?, %s=? WHERE id=?" % (table, self.config.validStart, self.config.validEnd)
    for row, calibDate in zip(rows, rowDates):
        validStart, validEnd = valids[calibDate]
        conn.execute(sql, (validStart.isoformat(), validEnd.isoformat(), row["id"]))
188 
189 
191  """Argument parser to support ingesting calibration images into the repository"""
192 
def __init__(self, *args, **kwargs):
    """Build the parser and register the calibration-ingestion options.

    All positional and keyword arguments are forwarded unchanged to
    InputOnlyArgumentParser.
    """
    InputOnlyArgumentParser.__init__(self, *args, **kwargs)
    addArg = self.add_argument
    addArg("-n", "--dry-run", dest="dryrun", action="store_true", default=False,
           help="Don't perform any action?")
    addArg("--mode", choices=["move", "copy", "link", "skip"], default="move",
           help="Mode of delivering the files to their destination")
    addArg("--create", action="store_true", help="Create new registry?")
    addArg("--validity", type=int, required=True, help="Calibration validity period (days)")
    addArg("--ignore-ingested", dest="ignoreIngested", action="store_true",
           help="Don't register files that have already been registered")
    addArg("files", nargs="+", help="Names of file")
204 
205 
class IngestCalibsConfig(Config):
    """Configuration for IngestCalibsTask.

    Holds the parsing and registry sub-tasks, plus flags for error tolerance
    and overwriting existing files.
    """

    # Sub-task that extracts calibration metadata from each input file.
    parse = ConfigurableField(doc="File parsing", target=CalibsParseTask)
    # Sub-task that writes entries into the calibration registry.
    register = ConfigurableField(doc="Registry entry", target=CalibsRegisterTask)
    allowError = Field(doc="Allow error in ingestion?", dtype=bool, default=False)
    clobber = Field(doc="Clobber existing file?", dtype=bool, default=False)
212 
213 
215  """Task that generates registry for calibration images"""
216  ConfigClass = IngestCalibsConfig
217  ArgumentParser = IngestCalibsArgumentParser
218  _DefaultName = "ingestCalibs"
219 
220  def run(self, args):
221  """Ingest all specified files and add them to the registry"""
222  calibRoot = args.calib if args.calib is not None else args.output
223  filenameList = self.expandFiles(args.files)
224  with self.register.openRegistry(calibRoot, create=args.create, dryrun=args.dryrun) as registry:
225  calibTypes = set()
226  for infile in filenameList:
227  fileInfo, hduInfoList = self.parse.getInfo(infile)
228  calibType = self.parse.getCalibType(infile)
229  if calibType not in self.register.config.tables:
230  self.log.warn(str("Skipped adding %s of observation type '%s' to registry "
231  "(must be one of %s)" %
232  (infile, calibType, ", ".join(self.register.config.tables))))
233  continue
234  calibTypes.add(calibType)
235  if args.mode != 'skip':
236  outfile = self.parse.getDestination(args.butler, fileInfo, infile)
237  ingested = self.ingest(infile, outfile, mode=args.mode, dryrun=args.dryrun)
238  if not ingested:
239  self.log.warn(str("Failed to ingest %s of observation type '%s'" %
240  (infile, calibType)))
241  continue
242  if self.register.check(registry, fileInfo, table=calibType):
243  if args.ignoreIngested:
244  continue
245 
246  self.log.warn("%s: already ingested: %s" % (infile, fileInfo))
247  for info in hduInfoList:
248  self.register.addRow(registry, info, dryrun=args.dryrun,
249  create=args.create, table=calibType)
250  if not args.dryrun:
251  self.register.updateValidityRanges(registry, args.validity, tables=calibTypes)
252  else:
253  self.log.info("Would update validity ranges here, but dryrun")
lsst.pipe.tasks.ingestCalibs.IngestCalibsTask
Definition: ingestCalibs.py:214
lsst::log.log.logContinued.warn
def warn(fmt, *args)
Definition: logContinued.py:202
lsst::log.log.logContinued.info
def info(fmt, *args)
Definition: logContinued.py:198
lsst.pipe.tasks.ingestCalibs.CalibsRegisterTask
Definition: ingestCalibs.py:88
lsst.pipe.tasks.ingestCalibs.CalibsParseTask.getDestination
def getDestination(self, butler, info, filename)
Definition: ingestCalibs.py:52
lsst.pipe.tasks.ingestCalibs.CalibsRegisterTask.createTable
def createTable(self, conn, forceCreateTables=False)
Definition: ingestCalibs.py:96
lsst.pipe.tasks.ingest.IngestTask.expandFiles
def expandFiles(self, fileNameList)
Expand a set of filenames and globs, returning a list of filenames.
Definition: ingest.py:531
lsst.pipe.base.argumentParser.InputOnlyArgumentParser
Definition: argumentParser.py:914
lsst.pipe.tasks.ingest.IngestTask
Definition: ingest.py:380
strip
bool strip
Definition: fits.cc:911
lsst.pipe.tasks.ingest.ParseTask
Definition: ingest.py:67
lsst.pipe.tasks.ingestCalibs.CalibsRegisterConfig
Definition: ingestCalibs.py:73
lsst.pipe.tasks.ingest
Definition: ingest.py:1
lsst.pipe.tasks.ingestCalibs.CalibsRegisterTask.fixSubsetValidity
def fixSubsetValidity(self, conn, table, detectorData, validity)
Definition: ingestCalibs.py:125
lsst.pipe.base.task.Task.config
config
Definition: task.py:149
lsst.pipe.tasks.ingestCalibs.IngestCalibsArgumentParser
Definition: ingestCalibs.py:190
lsstDebug.getInfo
getInfo
Definition: lsstDebug.py:87
lsst.pipe.base.task.Task.log
log
Definition: task.py:148
lsst.pipe.tasks.ingest.RegisterTask
Definition: ingest.py:254
lsst.pipe.tasks.ingestCalibs.IngestCalibsArgumentParser.__init__
def __init__(self, *args, **kwargs)
Definition: ingestCalibs.py:193
lsst.pipe.tasks.ingestCalibs.CalibsRegisterTask.addRow
def addRow(self, conn, info, *args, **kwargs)
Definition: ingestCalibs.py:101
lsst::afw::image.readMetadata.readMetadataContinued.readMetadata
readMetadata
Definition: readMetadataContinued.py:28
lsst.pipe.tasks.ingest.IngestTask.ingest
def ingest(self, infile, outfile, mode="move", dryrun=False)
Definition: ingest.py:452
lsst.pipe.tasks.ingestCalibs.IngestCalibsConfig
Definition: ingestCalibs.py:206
lsst.pipe.tasks.ingestCalibs.CalibsRegisterTask.updateValidityRanges
def updateValidityRanges(self, conn, validity, tables=None)
Definition: ingestCalibs.py:107
lsst::afw::fits
Definition: fits.h:31
list
daf::base::PropertyList * list
Definition: fits.cc:913
lsst.pipe.tasks.ingestCalibs.CalibsParseTask
Definition: ingestCalibs.py:17
lsst.pipe.tasks.ingestCalibs.CalibsRegisterTask.openRegistry
def openRegistry(self, directory, create=False, dryrun=False, name="calibRegistry.sqlite3")
Definition: ingestCalibs.py:92
lsst.pipe.base
Definition: __init__.py:1
lsst.pipe.tasks.ingest.RegisterConfig
Definition: ingest.py:197
lsst.pipe.tasks.ingestCalibs.CalibsParseTask.getCalibType
def getCalibType(self, filename)
Definition: ingestCalibs.py:21
lsst.pipe.tasks.ingestCalibs.IngestCalibsTask.run
def run(self, args)
Definition: ingestCalibs.py:220
set
daf::base::PropertySet * set
Definition: fits.cc:912