ingest.py
# This file is part of obs_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


__all__ = ("RawIngestTask", "RawIngestConfig", "makeTransferChoiceField")

import os.path

# This should really be an error that is caught in daf_butler and re-raised
# as one of our own, but it is not, so this import exists here pending some
# error refactoring in daf_butler.
from sqlalchemy.exc import IntegrityError

from astro_metadata_translator import ObservationInfo, fix_header
from lsst.afw.image import readMetadata, bboxFromMetadata
from lsst.afw.geom import SkyWcs
from lsst.daf.butler import DatasetType, Run, DataId, ConflictingDefinitionError, Butler
from lsst.daf.butler.instrument import (Instrument, updateExposureEntryFromObsInfo,
                                        updateVisitEntryFromObsInfo)
from lsst.geom import Box2D
from lsst.pex.config import Config, Field, ChoiceField
from lsst.pipe.base import Task
from lsst.sphgeom import ConvexPolygon


class IngestConflictError(ConflictingDefinitionError):
    pass


def makeTransferChoiceField(doc="How to transfer files (None for no transfer).", default=None):
    return ChoiceField(
        doc=doc,
        dtype=str,
        allowed={"move": "move",
                 "copy": "copy",
                 "hardlink": "hard link",
                 "symlink": "symbolic (soft) link"},
        optional=True,
        default=default,
    )
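

# A minimal sketch (not part of the original file) of how
# ``makeTransferChoiceField`` is used: any Config class gains a validated
# transfer option by calling the factory.  ``MyIngestConfig`` is a
# hypothetical example; only "move", "copy", "hardlink", "symlink", or None
# are accepted values.
class MyIngestConfig(Config):
    transfer = makeTransferChoiceField(default="symlink")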


class RawIngestConfig(Config):
    transfer = makeTransferChoiceField()
    conflict = ChoiceField(
        ("What to do if a raw Dataset with the same data ID as an "
         "ingested file already exists in the Butler's Collection."),
        dtype=str,
        allowed={"ignore": ("Do not add the new file to the Collection.  If "
                            "'stash' is not None, the new file will be "
                            "ingested into the stash Collection instead."),
                 "fail": ("Raise RuntimeError if a conflict is encountered "
                          "(which may then be caught if onError == 'continue')."),
                 },
        optional=False,
        default="ignore",
    )
    stash = Field(
        "Name of an alternate Collection to hold Datasets that lose conflicts.",
        dtype=str,
        default=None,
    )
    onError = ChoiceField(
        "What to do if an error (including fatal conflicts) occurs.",
        dtype=str,
        allowed={"continue": "Warn and continue with the next file.",
                 "break": ("Stop processing immediately, but leave "
                           "already-ingested datasets in the repository."),
                 "rollback": ("Stop processing and attempt to remove already-"
                              "ingested datasets from the repository."),
                 },
        optional=False,
        default="continue",
    )
    doAddRegions = Field(
        dtype=bool,
        default=True,
        doc="Add visit/exposure regions to the Registry when ingesting files."
    )
    padRegionAmount = Field(
        dtype=int,
        default=0,
        doc="Pad the image bounding box by this many pixels before computing its region."
    )
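

# A usage sketch (not part of the original file): overriding the config
# fields defined above before constructing the task.  All values shown are
# drawn from the ChoiceField options; ``exampleConfig`` is illustrative only.
exampleConfig = RawIngestConfig()
exampleConfig.transfer = "copy"       # copy files into the Datastore
exampleConfig.conflict = "fail"       # raise on duplicate data IDs
exampleConfig.onError = "rollback"    # ingest the whole file list atomically
exampleConfig.padRegionAmount = 10    # pad regions by 10 pixels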


class RawIngestTask(Task):
    """Driver Task for ingesting raw data into Gen3 Butler repositories.

    This Task is intended to be runnable from the command-line, but it
    doesn't meet the other requirements of CmdLineTask or PipelineTask, and
    wouldn't gain much from being one.  It also wouldn't really be
    appropriate as a subtask of a CmdLineTask or PipelineTask; it's a Task
    essentially just to leverage the logging and configurability
    functionality that `Task` provides.

    Each instance of `RawIngestTask` writes to the same Butler and maintains
    a cache of Dimension entries that have already been added to or extracted
    from its Registry.  Each invocation of `RawIngestTask.run` ingests a list
    of files (possibly semi-atomically; see `RawIngestConfig.onError`).

    RawIngestTask may be subclassed to specialize ingest for the actual
    structure of raw data files produced by a particular instrument, but this
    is usually unnecessary because the instrument-specific header extraction
    provided by ``astro_metadata_translator`` is usually enough.

    Parameters
    ----------
    config : `RawIngestConfig`
        Configuration for whether/how to transfer files and how to handle
        conflicts and errors.
    butler : `~lsst.daf.butler.Butler`
        Butler instance.  Ingested Datasets will be created as part of
        ``butler.run`` and associated with its Collection.
    kwds
        Other keyword arguments are forwarded to the Task base class
        constructor.
    """

    ConfigClass = RawIngestConfig

    _DefaultName = "ingest"

    def getDatasetType(self):
        """Return the DatasetType of the Datasets ingested by this Task.
        """
        return DatasetType("raw", ("instrument", "detector", "exposure"), "Exposure",
                           universe=self.butler.registry.dimensions)

    def __init__(self, config=None, *, butler, **kwds):
        super().__init__(config, **kwds)
        self.butler = butler
        self.datasetType = self.getDatasetType()
        self.dimensions = butler.registry.dimensions.extract(["instrument", "detector", "physical_filter",
                                                              "visit", "exposure"])
        # Dictionary of {Dimension: set(DataId)} indicating Dimension entries
        # we know are in the Registry.
        self.dimensionEntriesDone = {k: set() for k in self.dimensions}
        # Cache of instrument instances retrieved from Registry; needed to
        # look up formatters.
        self.instrumentCache = {}
        # (Possibly) create a Run object for the "stash": where we put
        # datasets that lose conflicts.  Note that this doesn't actually add
        # this Run to the Registry; we only do that on first use.
        self.stashRun = Run(self.config.stash) if self.config.stash is not None else None
        # Vertices of per-detector regions, keyed by (instrument, visit) and
        # accumulated during ingest so full visit regions can be set at the
        # end of a run.
        self.visitRegions = {}

    def _addVisitRegions(self):
        """Add a region associated with each Visit to the Registry.

        Visit regions are built from the regions of the individual ccds that
        have been accumulated in the ``visitRegions`` dict on ``self``,
        joined against an existing region if one exists.  The dict is keyed
        by ``(instrument, visit)`` tuples, with values that are lists of
        region vertices for the detectors associated with that visit.
        """
        for (instrument, visit), vertices in self.visitRegions.items():
            # If there is an existing region it should be updated.
            existingRegion = self.butler.registry.expandDataId({"instrument": instrument, "visit": visit},
                                                               region=True).region
            if existingRegion is not None:
                vertices = list(existingRegion.getVertices()) + vertices
            region = ConvexPolygon(vertices)
            self.butler.registry.setDimensionRegion(instrument=instrument, visit=visit, region=region)

    def run(self, files):
        """Ingest files into a Butler data repository.

        This creates any new exposure or visit Dimension entries needed to
        identify the ingested files, creates new Dataset entries in the
        Registry, and finally ingests the files themselves into the
        Datastore.  Any needed instrument, detector, and physical_filter
        Dimension entries must exist in the Registry before `run` is called.

        Parameters
        ----------
        files : iterable over `str` or path-like objects
            Paths to the files to be ingested.  Will be made absolute
            if they are not already.
        """
        self.butler.registry.registerDatasetType(self.getDatasetType())
        if self.config.onError == "rollback":
            with self.butler.transaction():
                for file in files:
                    self.processFile(os.path.abspath(file))
                if self.config.doAddRegions:
                    self._addVisitRegions()
        elif self.config.onError == "break":
            for file in files:
                self.processFile(os.path.abspath(file))
            if self.config.doAddRegions:
                self._addVisitRegions()
        elif self.config.onError == "continue":
            for file in files:
                try:
                    self.processFile(os.path.abspath(file))
                except Exception as err:
                    self.log.warnf("Error processing '{}': {}", file, err)
            if self.config.doAddRegions:
                self._addVisitRegions()
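
    # A usage sketch (not part of the original file); the repository path,
    # run name, and file names below are hypothetical, and the instrument,
    # detector, and physical_filter entries are assumed to be registered
    # already:
    #
    #     butler = Butler("/path/to/repo", run="raws/example")
    #     task = RawIngestTask(config=RawIngestConfig(), butler=butler)
    #     task.run(["raw_000001.fits", "raw_000002.fits"])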

    def readHeaders(self, file):
        """Read and return any relevant headers from the given file.

        The default implementation simply reads the header of the first
        non-empty HDU, so it always returns a single-element list.

        Parameters
        ----------
        file : `str` or path-like object
            Absolute path to the file to be ingested.

        Returns
        -------
        headers : `list` of `~lsst.daf.base.PropertyList`
            Single-element list containing the header of the first
            non-empty HDU.
        """
        headers = [readMetadata(file)]
        for h in headers:
            fix_header(h)
        return headers

    def buildRegion(self, headers):
        """Build a sky region from information contained in a header.

        Parameters
        ----------
        headers : `list` of `~lsst.daf.base.PropertyList`
            Result of calling `readHeaders`; only the first element is used.

        Returns
        -------
        region : `lsst.sphgeom.ConvexPolygon`
            Region corresponding to the detector footprint on the sky.

        Raises
        ------
        ValueError
            Raised if the header keys required to construct the region
            cannot be found.
        """
        # Default implementation is for headers to be a one-element list.
        header = headers[0]
        wcs = SkyWcs(header)
        bbox = Box2D(bboxFromMetadata(header))
        if self.config.padRegionAmount > 0:
            bbox.grow(self.config.padRegionAmount)
        corners = bbox.getCorners()
        sphCorners = [wcs.pixelToSky(point).getVector() for point in corners]
        return ConvexPolygon(sphCorners)
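
    # Geometry sketch (not part of the original file): for a hypothetical
    # 2048x2048 detector with padRegionAmount=10, the box grows by 10 pixels
    # on every side before its corners are mapped to the sky:
    #
    #     from lsst.geom import Box2D, Point2D
    #     bbox = Box2D(Point2D(0.0, 0.0), Point2D(2048.0, 2048.0))
    #     bbox.grow(10)                # now spans (-10, -10) .. (2058, 2058)
    #     corners = bbox.getCorners()  # four Point2D corners
    #
    # Each corner is then mapped through wcs.pixelToSky(...).getVector(),
    # and the four unit vectors define the ConvexPolygon.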

    def ensureDimensions(self, file):
        """Extract metadata from a raw file and add exposure and visit
        Dimension entries.

        Any needed instrument, detector, and physical_filter Dimension
        entries must exist in the Registry before `run` is called.

        Parameters
        ----------
        file : `str` or path-like object
            Absolute path to the file to be ingested.

        Returns
        -------
        headers : `list` of `~lsst.daf.base.PropertyList`
            Result of calling `readHeaders`.
        dataId : `DataId`
            Data ID dictionary, as returned by `extractDataId`.
        """
        headers = self.readHeaders(file)
        obsInfo = ObservationInfo(headers[0])

        # Extract a DataId that covers all of self.dimensions.
        fullDataId = self.extractDataId(file, headers, obsInfo=obsInfo)

        for dimension in self.dimensions:
            if fullDataId.get(dimension.name) is None:
                continue
            dimensionDataId = DataId(fullDataId, dimension=dimension)
            if dimensionDataId not in self.dimensionEntriesDone[dimension]:
                # Next look in the Registry.
                dimensionEntryDict = self.butler.registry.findDimensionEntry(dimension, dimensionDataId)
                if dimensionEntryDict is None:
                    if dimension.name in ("visit", "exposure"):
                        # Add the entry into the Registry.
                        self.butler.registry.addDimensionEntry(dimension, dimensionDataId)
                    else:
                        raise LookupError(
                            f"Entry for {dimension.name} with ID {dimensionDataId} not found; must be "
                            f"present in Registry prior to ingest."
                        )
                # Record that we've handled this entry.
                self.dimensionEntriesDone[dimension].add(dimensionDataId)
        # Do this after the loop to ensure all the dimensions are added.
        if self.config.doAddRegions:
            region = self.buildRegion(headers)
            try:
                self.butler.registry.setDimensionRegion(DataId(fullDataId,
                                                               dimensions=['visit', 'detector',
                                                                           'instrument'],
                                                               region=region),
                                                        update=False)
                self.visitRegions.setdefault((fullDataId['instrument'], fullDataId['visit']),
                                             []).extend(region.getVertices())
            except IntegrityError:
                # This means that there were already regions for the
                # dimensions in the database, and nothing should be done.
                pass

        return headers, fullDataId
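
    # Note (a sketch, not part of the original file): instrument, detector,
    # and physical_filter entries are *not* created by this method, so an
    # obs package must register them first, e.g. (data values hypothetical):
    #
    #     registry.addDimensionEntry("instrument", {"instrument": "MyCam"})
    #     registry.addDimensionEntry("detector",
    #                                {"instrument": "MyCam", "detector": 0})
    #     registry.addDimensionEntry("physical_filter",
    #                                {"instrument": "MyCam",
    #                                 "physical_filter": "MyCam-g"})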

    def ingestFile(self, file, headers, dataId, run=None):
        """Ingest a single raw file into the repository.

        All necessary Dimension entries must already be present.

        Parameters
        ----------
        file : `str` or path-like object
            Absolute path to the file to be ingested.
        headers : `list` of `~lsst.daf.base.PropertyList`
            Result of calling `readHeaders`.
        dataId : `dict`
            Data ID dictionary, as returned by `extractDataId`.
        run : `~lsst.daf.butler.Run`, optional
            Run to add the Dataset to; defaults to ``self.butler.run``.

        Returns
        -------
        ref : `DatasetRef`
            Reference to the ingested dataset.

        Raises
        ------
        ConflictingDefinitionError
            Raised if the dataset already exists in the registry.
        """
        if run is not None and run != self.butler.run:
            butler = Butler(butler=self.butler, run=run)
        else:
            butler = self.butler
        try:
            return butler.ingest(file, self.datasetType, dataId, transfer=self.config.transfer,
                                 formatter=self.getFormatter(file, headers, dataId))
        except ConflictingDefinitionError as err:
            raise IngestConflictError(f"Ingest conflict on {file} {dataId}") from err
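
    # A sketch (not part of the original file) of calling ingestFile directly
    # and redirecting conflicts to an alternate Run, mirroring what
    # processFile does below:
    #
    #     try:
    #         task.ingestFile(file, headers, dataId)
    #     except IngestConflictError:
    #         task.ingestFile(file, headers, dataId, run=stashRun)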

    def processFile(self, file):
        """Ingest a single raw data file after extracting metadata.

        This creates any new exposure or visit Dimension entries needed to
        identify the ingested file, creates a new Dataset entry in the
        Registry, and finally ingests the file itself into the Datastore.
        Any needed instrument, detector, and physical_filter Dimension
        entries must exist in the Registry before `run` is called.

        Parameters
        ----------
        file : `str` or path-like object
            Absolute path to the file to be ingested.
        """
        try:
            headers, dataId = self.ensureDimensions(file)
        except Exception as err:
            raise RuntimeError(f"Unexpected error adding dimensions for {file}.") from err
        # We want ingesting a single file to be atomic even if we are
        # not trying to ingest the list of files atomically.
        with self.butler.transaction():
            try:
                self.ingestFile(file, headers, dataId)
                return
            except IngestConflictError:
                if self.config.conflict == "fail":
                    raise
        if self.config.conflict == "ignore":
            if self.stashRun is not None:
                if self.stashRun.id is None:
                    self.butler.registry.ensureRun(self.stashRun)
                self.log.infof("Conflict on {} ({}); ingesting to stash '{}' instead.",
                               dataId, file, self.config.stash)
                with self.butler.transaction():
                    self.ingestFile(file, headers, dataId, run=self.stashRun)
            else:
                self.log.infof("Conflict on {} ({}); ignoring.", dataId, file)

    def extractDataId(self, file, headers, obsInfo):
        """Return the Data ID dictionary that should be used to label a file.

        Parameters
        ----------
        file : `str` or path-like object
            Absolute path to the file being ingested (prior to any
            transfers).
        headers : `list` of `~lsst.daf.base.PropertyList`
            All headers returned by `readHeaders()`.
        obsInfo : `astro_metadata_translator.ObservationInfo`
            Observational metadata extracted from the headers.

        Returns
        -------
        dataId : `DataId`
            A mapping whose key-value pairs uniquely identify raw datasets.
            Must have ``dataId.dimensions() <= self.dimensions``, with at
            least instrument, exposure, and detector present.
        """
        toRemove = set()
        if obsInfo.visit_id is None:
            toRemove.add("visit")
        if obsInfo.physical_filter is None:
            toRemove.add("physical_filter")
        if toRemove:
            dimensions = self.dimensions.toSet().difference(toRemove)
        else:
            dimensions = self.dimensions
        dataId = DataId(
            dimensions=dimensions,
            instrument=obsInfo.instrument,
            exposure=obsInfo.exposure_id,
            visit=obsInfo.visit_id,
            detector=obsInfo.detector_num,
            physical_filter=obsInfo.physical_filter,
        )
        updateExposureEntryFromObsInfo(dataId, obsInfo)
        if obsInfo.visit_id is not None:
            updateVisitEntryFromObsInfo(dataId, obsInfo)
        return dataId
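
    # For a hypothetical exposure (a sketch, not part of the original file),
    # the returned DataId carries key-value pairs like:
    #
    #     {"instrument": "MyCam", "exposure": 42001, "visit": 42001,
    #      "detector": 12, "physical_filter": "MyCam-g"}
    #
    # with "visit" and "physical_filter" omitted when the corresponding
    # ObservationInfo fields are None.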

    def getFormatter(self, file, headers, dataId):
        """Return the Formatter that should be used to read this file after
        ingestion.

        The default implementation obtains the formatter from the Instrument
        class for the given data ID.
        """
        instrument = self.instrumentCache.get(dataId["instrument"])
        if instrument is None:
            instrument = Instrument.factories[dataId["instrument"]]()
            self.instrumentCache[dataId["instrument"]] = instrument
        return instrument.getRawFormatter(dataId)
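

# A subclassing sketch (not part of the original file): per the class
# docstring, RawIngestTask may be specialized for an instrument whose raw
# files need different handling.  ``MyCamRawIngestTask`` is hypothetical; it
# reads the header from a specific HDU and leaves everything else to the
# base class.
class MyCamRawIngestTask(RawIngestTask):
    def readHeaders(self, file):
        # Assume this camera's useful metadata lives in HDU 1.
        headers = [readMetadata(file, hdu=1)]
        for h in headers:
            fix_header(h)
        return headers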