# This file is part of obs_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


__all__ = ("RawIngestTask", "RawIngestConfig", "makeTransferChoiceField")

import os.path

# This should really be an error that is caught in daf_butler and rethrown
# with our own, but it is not, so this exists here pending some error
# refactoring in daf_butler.
from sqlalchemy.exc import IntegrityError

from astro_metadata_translator import ObservationInfo, fix_header, merge_headers
from lsst.afw.image import bboxFromMetadata
from lsst.afw.fits import readMetadata
from lsst.afw.geom import SkyWcs
from lsst.daf.butler import DatasetType, Run, DataId, ConflictingDefinitionError, Butler
from lsst.daf.butler.instrument import (Instrument, updateExposureEntryFromObsInfo,
                                        updateVisitEntryFromObsInfo)
from lsst.geom import Box2D
from lsst.pex.config import Config, Field, ChoiceField
from lsst.pipe.base import Task
from lsst.sphgeom import ConvexPolygon

class IngestConflictError(ConflictingDefinitionError):
    pass

def makeTransferChoiceField(doc="How to transfer files (None for no transfer).", default=None):
    """Create a `~lsst.pex.config.ChoiceField` for selecting how files are
    transferred into the data repository.
    """
    return ChoiceField(
        doc=doc,
        dtype=str,
        allowed={"move": "move",
                 "copy": "copy",
                 "hardlink": "hard link",
                 "symlink": "symbolic (soft) link"},
        optional=True,
        default=default,
    )

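# A minimal usage sketch (``MyIngestConfig`` is hypothetical, not part of
# this module) showing how the helper above plugs into a pex_config Config:
#
#     class MyIngestConfig(Config):
#         transfer = makeTransferChoiceField(default="symlink")
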
class RawIngestConfig(Config):
    transfer = makeTransferChoiceField()
    conflict = ChoiceField(
        ("What to do if a raw Dataset with the same data ID as an "
         "ingested file already exists in the Butler's Collection."),
        dtype=str,
        allowed={"ignore": ("Do not add the new file to the Collection.  If "
                            "'stash' is not None, the new file will be "
                            "ingested into the stash Collection instead."),
                 "fail": ("Raise RuntimeError if a conflict is encountered "
                          "(which may then be caught if onError == 'continue')."),
                 },
        optional=False,
        default="ignore",
    )
    stash = Field(
        "Name of an alternate Collection to hold Datasets that lose conflicts.",
        dtype=str,
        default=None,
    )
    onError = ChoiceField(
        "What to do if an error (including fatal conflicts) occurs.",
        dtype=str,
        allowed={"continue": "Warn and continue with the next file.",
                 "break": ("Stop processing immediately, but leave "
                           "already-ingested datasets in the repository."),
                 "rollback": ("Stop processing and attempt to remove already-"
                              "ingested datasets from the repository."),
                 },
        optional=False,
        default="continue",
    )
    doAddRegions = Field(
        dtype=bool,
        default=True,
        doc="Add visit/detector regions to the Registry when ingesting files."
    )
    padRegionAmount = Field(
        dtype=int,
        default=0,
        doc="Number of pixels by which to pad the image bounding box before computing its region."
    )
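# A hedged example (values are illustrative only) of overriding these options
# from a config override file, as with any pex_config Config:
#
#     config.transfer = "symlink"   # ingest via symbolic links
#     config.conflict = "fail"      # raise instead of ignoring duplicates
#     config.padRegionAmount = 10   # pad regions by 10 pixels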


class RawIngestTask(Task):
    """Driver Task for ingesting raw data into Gen3 Butler repositories.

    This Task is intended to be runnable from the command-line, but it doesn't
    meet the other requirements of CmdLineTask or PipelineTask, and wouldn't
    gain much from being one.  It also wouldn't really be appropriate as a
    subtask of a CmdLineTask or PipelineTask; it's a Task essentially just to
    leverage the logging and configurability that the Task framework provides.

    Each instance of `RawIngestTask` writes to the same Butler and maintains a
    cache of Dimension entries that have already been added to or extracted
    from its Registry.  Each invocation of `RawIngestTask.run` ingests a list
    of files (possibly semi-atomically; see `RawIngestConfig.onError`).

    RawIngestTask may be subclassed to specialize ingest for the actual
    structure of raw data files produced by a particular instrument, but this
    is usually unnecessary because the instrument-specific header extraction
    provided by ``astro_metadata_translator`` is usually enough.

    Parameters
    ----------
    config : `RawIngestConfig`
        Configuration for whether/how to transfer files and how to handle
        conflicts and errors.
    butler : `~lsst.daf.butler.Butler`
        Butler instance.  Ingested Datasets will be created as part of
        ``butler.run`` and associated with its Collection.

    Other keyword arguments are forwarded to the Task base class constructor.
    """
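
    # A minimal usage sketch (the repository path, run name, and file names
    # are hypothetical):
    #
    #     butler = Butler("/path/to/gen3/repo", run="raws")
    #     task = RawIngestTask(butler=butler)
    #     task.run(["file1.fits", "file2.fits"])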

    ConfigClass = RawIngestConfig

    _DefaultName = "ingest"

    def getDatasetType(self):
        """Return the DatasetType of the Datasets ingested by this Task.
        """
        return DatasetType("raw", ("instrument", "detector", "exposure"), "Exposure",
                           universe=self.butler.registry.dimensions)

    def __init__(self, config=None, *, butler, **kwds):
        super().__init__(config, **kwds)
        self.butler = butler
        self.datasetType = self.getDatasetType()
        self.dimensions = butler.registry.dimensions.extract(["instrument", "detector", "physical_filter",
                                                              "visit", "exposure"])
        # Dictionary of {Dimension: set(DataId)} indicating Dimension entries
        # we know are in the Registry.
        self.dimensionEntriesDone = {k: set() for k in self.dimensions}
        # Cache of instrument instances retrieved from Registry; needed to
        # look up formatters.
        self.instrumentCache = {}
        # (Possibly) create a Run object for the "stash": where we put
        # datasets that lose conflicts.  Note that this doesn't actually add
        # this Run to the Registry; we only do that on first use.
        self.stashRun = Run(self.config.stash) if self.config.stash is not None else None
        self.visitRegions = {}

    def _addVisitRegions(self):
        """Add a region for each visit to the Registry, built from the
        regions of the individual detectors in that visit.

        The detector regions are taken from ``self.visitRegions``, a dict
        keyed by ``(instrument, visit)`` tuples whose values are lists of
        region vertices for the detectors associated with that visit.  If the
        visit already has a region in the Registry, the new vertices are
        joined with it.
        """
        for (instrument, visit), vertices in self.visitRegions.items():
            # If there is an existing region it should be updated.
            existingRegion = self.butler.registry.expandDataId({"instrument": instrument, "visit": visit},
                                                               region=True).region
            if existingRegion is not None:
                vertices = list(existingRegion.getVertices()) + vertices
            region = ConvexPolygon(vertices)
            self.butler.registry.setDimensionRegion(instrument=instrument, visit=visit, region=region)

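    # Shape of the accumulator consumed above (values are illustrative; the
    # vertices are the unit vectors returned by ConvexPolygon.getVertices):
    #
    #     self.visitRegions = {
    #         ("HSC", 904024): [vertex1, vertex2, ...],
    #     }
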
    def run(self, files):
        """Ingest files into a Butler data repository.

        This creates any new exposure or visit Dimension entries needed to
        identify the ingested files, creates new Dataset entries in the
        Registry and finally ingests the files themselves into the Datastore.
        Any needed instrument, detector, and physical_filter Dimension
        entries must exist in the Registry before `run` is called.

        Parameters
        ----------
        files : iterable over `str` or path-like objects
            Paths to the files to be ingested.  Will be made absolute
            if they are not already.
        """
        self.butler.registry.registerDatasetType(self.getDatasetType())
        if self.config.onError == "rollback":
            with self.butler.transaction():
                for file in files:
                    self.processFile(os.path.abspath(file))
                if self.config.doAddRegions:
                    self._addVisitRegions()
        elif self.config.onError == "break":
            for file in files:
                self.processFile(os.path.abspath(file))
            if self.config.doAddRegions:
                self._addVisitRegions()
        elif self.config.onError == "continue":
            for file in files:
                try:
                    self.processFile(os.path.abspath(file))
                except Exception as err:
                    self.log.warnf("Error processing '{}': {}", file, err)
            if self.config.doAddRegions:
                self._addVisitRegions()

    def readHeaders(self, file):
        """Read and return any relevant headers from the given file.

        The default implementation merges the header of the primary HDU with
        the header of the first non-empty HDU, so it always returns a
        single-element list.

        Parameters
        ----------
        file : `str` or path-like object
            Absolute path to the file to be ingested.

        Returns
        -------
        headers : `list` of `~lsst.daf.base.PropertyList`
            Single-element list containing the merged header.
        """
        phdu = readMetadata(file, 0)
        headers = [merge_headers([phdu, readMetadata(file)], mode="overwrite")]
        for h in headers:
            fix_header(h)
        return headers

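    # What the default implementation hands back (shapes only; the file name
    # is hypothetical):
    #
    #     headers = task.readHeaders("/abs/path/raw.fits")
    #     assert len(headers) == 1   # one merged, fixed-up PropertyList
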
    def buildRegion(self, headers):
        """Build a region from information contained in a header.

        Parameters
        ----------
        headers : `list` of `~lsst.daf.base.PropertyList`
            Result of calling `readHeaders`; only the first element is used.

        Returns
        -------
        region : `lsst.sphgeom.ConvexPolygon`
            Region corresponding to the image bounding box, optionally padded
            by ``config.padRegionAmount`` pixels.

        Raises
        ------
        ValueError
            Raised if the header keys required to construct the region cannot
            be found.
        """
        # Default implementation is for headers to be a one-element list.
        header = headers[0]
        wcs = SkyWcs(header)
        bbox = Box2D(bboxFromMetadata(header))
        if self.config.padRegionAmount > 0:
            bbox.grow(self.config.padRegionAmount)
        corners = bbox.getCorners()
        sphCorners = [wcs.pixelToSky(point).getVector() for point in corners]
        return ConvexPolygon(sphCorners)

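    # Worked illustration of the padding step above (hypothetical values):
    # Box2D.grow expands the box in every direction, so with
    # padRegionAmount=10 a box spanning (0, 0)-(2048, 2048) becomes
    # (-10, -10)-(2058, 2058) before its corners are mapped through the WCS
    # to unit vectors on the sky.
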
    def ensureDimensions(self, file):
        """Extract metadata from a raw file and add exposure and visit
        Dimension entries.

        Any needed instrument, detector, and physical_filter Dimension
        entries must exist in the Registry before `run` is called.

        Parameters
        ----------
        file : `str` or path-like object
            Absolute path to the file to be ingested.

        Returns
        -------
        headers : `list` of `~lsst.daf.base.PropertyList`
            Result of calling `readHeaders`.
        dataId : `DataId`
            Data ID dictionary, as returned by `extractDataId`.
        """
        headers = self.readHeaders(file)
        obsInfo = ObservationInfo(headers[0])

        # Extract a DataId that covers all of self.dimensions.
        fullDataId = self.extractDataId(file, headers, obsInfo=obsInfo)

        for dimension in self.dimensions:
            if fullDataId.get(dimension.name) is None:
                continue
            dimensionDataId = DataId(fullDataId, dimension=dimension)
            if dimensionDataId not in self.dimensionEntriesDone[dimension]:
                # Next look in the Registry.
                dimensionEntryDict = self.butler.registry.findDimensionEntry(dimension, dimensionDataId)
                if dimensionEntryDict is None:
                    if dimension.name in ("visit", "exposure"):
                        # Add the entry into the Registry.
                        self.butler.registry.addDimensionEntry(dimension, dimensionDataId)
                    else:
                        raise LookupError(
                            f"Entry for {dimension.name} with ID {dimensionDataId} not found; must be "
                            f"present in Registry prior to ingest."
                        )
                # Record that we've handled this entry.
                self.dimensionEntriesDone[dimension].add(dimensionDataId)
        # Do this after the loop to ensure all the dimensions are added.
        if self.config.doAddRegions and obsInfo.tracking_radec is not None:
            region = self.buildRegion(headers)
            try:
                self.butler.registry.setDimensionRegion(DataId(fullDataId,
                                                               dimensions=["visit", "detector",
                                                                           "instrument"],
                                                               region=region),
                                                        update=False)
                self.visitRegions.setdefault((fullDataId["instrument"], fullDataId["visit"]),
                                             []).extend(region.getVertices())
            except IntegrityError:
                # This means that there were already regions for the
                # dimensions in the database, and nothing should be done.
                pass

        return headers, fullDataId

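    # Illustrative return value for a single file (all values hypothetical):
    #
    #     headers, dataId = task.ensureDimensions("/abs/path/raw.fits")
    #     # dataId -> {"instrument": "HSC", "detector": 50,
    #     #            "exposure": 904024, "visit": 904024,
    #     #            "physical_filter": "HSC-I"}
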
    def ingestFile(self, file, headers, dataId, run=None):
        """Ingest a single raw file into the repository.

        All necessary Dimension entries must already be present.

        Parameters
        ----------
        file : `str` or path-like object
            Absolute path to the file to be ingested.
        headers : `list` of `~lsst.daf.base.PropertyList`
            Result of calling `readHeaders`.
        dataId : `dict`
            Data ID dictionary, as returned by `extractDataId`.
        run : `~lsst.daf.butler.Run`, optional
            Run to add the Dataset to; defaults to ``self.butler.run``.

        Returns
        -------
        ref : `DatasetRef`
            Reference to the ingested dataset.

        Raises
        ------
        ConflictingDefinitionError
            Raised if the dataset already exists in the registry.
        """
        if run is not None and run != self.butler.run:
            butler = Butler(butler=self.butler, run=run)
        else:
            butler = self.butler
        try:
            return butler.ingest(file, self.datasetType, dataId, transfer=self.config.transfer,
                                 formatter=self.getFormatter(file, headers, dataId))
        except ConflictingDefinitionError as err:
            raise IngestConflictError("Ingest conflict on {} {}".format(file, dataId)) from err

    def processFile(self, file):
        """Ingest a single raw data file after extracting metadata.

        This creates any new exposure or visit Dimension entries needed to
        identify the ingested file, creates a new Dataset entry in the
        Registry and finally ingests the file itself into the Datastore.
        Any needed instrument, detector, and physical_filter Dimension
        entries must exist in the Registry before `run` is called.

        Parameters
        ----------
        file : `str` or path-like object
            Absolute path to the file to be ingested.
        """
        try:
            headers, dataId = self.ensureDimensions(file)
        except Exception as err:
            raise RuntimeError(f"Unexpected error adding dimensions for {file}.") from err
        # We want ingesting a single file to be atomic even if we are
        # not trying to ingest the list of files atomically.
        with self.butler.transaction():
            try:
                self.ingestFile(file, headers, dataId)
                return
            except IngestConflictError:
                if self.config.conflict == "fail":
                    raise
        if self.config.conflict == "ignore":
            if self.stashRun is not None:
                if self.stashRun.id is None:
                    self.butler.registry.ensureRun(self.stashRun)
                self.log.infof("Conflict on {} ({}); ingesting to stash '{}' instead.",
                               dataId, file, self.config.stash)
                with self.butler.transaction():
                    self.ingestFile(file, headers, dataId, run=self.stashRun)
            else:
                self.log.infof("Conflict on {} ({}); ignoring.", dataId, file)

    def extractDataId(self, file, headers, obsInfo):
        """Return the Data ID dictionary that should be used to label a file.

        Parameters
        ----------
        file : `str` or path-like object
            Absolute path to the file being ingested (prior to any transfers).
        headers : `list` of `~lsst.daf.base.PropertyList`
            All headers returned by `readHeaders()`.
        obsInfo : `astro_metadata_translator.ObservationInfo`
            Observational metadata extracted from the headers.

        Returns
        -------
        dataId : `DataId`
            A mapping whose key-value pairs uniquely identify raw datasets.
            Must have ``dataId.dimensions() <= self.dimensions``, with at
            least instrument, exposure, and detector present.
        """
        toRemove = set()
        if obsInfo.visit_id is None:
            toRemove.add("visit")
        if obsInfo.physical_filter is None:
            toRemove.add("physical_filter")
        if toRemove:
            dimensions = self.dimensions.toSet().difference(toRemove)
        else:
            dimensions = self.dimensions
        dataId = DataId(
            dimensions=dimensions,
            instrument=obsInfo.instrument,
            exposure=obsInfo.exposure_id,
            visit=obsInfo.visit_id,
            detector=obsInfo.detector_num,
            physical_filter=obsInfo.physical_filter,
        )
        updateExposureEntryFromObsInfo(dataId, obsInfo)
        if obsInfo.visit_id is not None:
            updateVisitEntryFromObsInfo(dataId, obsInfo)
        return dataId

    def getFormatter(self, file, headers, dataId):
        """Return the Formatter that should be used to read this file after
        ingestion.

        The default implementation obtains the formatter from the Instrument
        class for the given data ID.
        """
        instrument = self.instrumentCache.get(dataId["instrument"])
        if instrument is None:
            instrument = Instrument.factories[dataId["instrument"]]()
            self.instrumentCache[dataId["instrument"]] = instrument
        return instrument.getRawFormatter(dataId)
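

# A minimal subclassing sketch (``MyCameraRawIngestTask`` and
# ``MyCameraRawFormatter`` are hypothetical) for instruments whose raw files
# need special handling beyond the default header-driven behavior:
#
#     class MyCameraRawIngestTask(RawIngestTask):
#
#         def getFormatter(self, file, headers, dataId):
#             # Always use a fixed formatter instead of consulting the
#             # Instrument class.
#             return MyCameraRawFormatter()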