23 __all__ = (
"RawIngestTask",
"RawIngestConfig",
"makeTransferChoiceField")
30 from sqlalchemy.exc
import IntegrityError
32 from astro_metadata_translator
import ObservationInfo, fix_header
35 from lsst.daf.butler
import DatasetType, Run, DataId, ConflictingDefinitionError, Butler
36 from lsst.daf.butler.instrument
import (Instrument, updateExposureEntryFromObsInfo,
37 updateVisitEntryFromObsInfo)
52 allowed={
"move":
"move",
54 "hardlink":
"hard link",
55 "symlink":
"symbolic (soft) link"},
64 (
"What to do if a raw Dataset with the same data ID as an " 65 "ingested file already exists in the Butler's Collection."),
67 allowed={
"ignore": (
"Do not add the new file to the Collection. If " 68 "'stash' is not None, the new file will be " 69 "ingested into the stash Collection instead."),
70 "fail": (
"Raise RuntimeError if a conflict is encountered " 71 "(which may then be caught if onError == 'continue')."),
77 "Name of an alternate Collection to hold Datasets that lose conflicts.",
82 "What to do if an error (including fatal conflicts) occurs.",
84 allowed={
"continue":
"Warn and continue with the next file.",
85 "break": (
"Stop processing immediately, but leave " 86 "already-ingested datasets in the repository."),
87 "rollback": (
"Stop processing and attempt to remove aleady-" 88 "ingested datasets from the repository."),
96 doc=
"Add regions when ingesting tasks" 101 doc=
"Pad an image with specified number of pixels before calculating region" 106 """Driver Task for ingesting raw data into Gen3 Butler repositories. 108 This Task is intended to be runnable from the command-line, but it doesn't 109 meet the other requirements of CmdLineTask or PipelineTask, and wouldn't 110 gain much from being one. It also wouldn't really be appropriate as a 111 subtask of a CmdLineTask or PipelineTask; it's a Task essentially just to 112 leverage the logging and configurability functionality that provides. 114 Each instance of `RawIngestTask` writes to the same Butler and maintains a 115 cache of Dimension entries that have already been added to or extracted 116 from its Registry. Each invocation of `RawIngestTask.run` ingests a list 117 of files (possibly semi-atomically; see `RawIngestConfig.onError`). 119 RawIngestTask may be subclassed to specialize ingest for the actual 120 structure of raw data files produced by a particular instrument, but this 121 is usually unnecessary because the instrument-specific header-extraction 122 provided by the ``astro_metadata_translator`` is usually enough. 126 config : `RawIngestConfig` 127 Configuration for whether/how to transfer files and how to handle 128 conflicts and errors. 129 butler : `~lsst.daf.butler.Butler` 130 Butler instance. Ingested Datasets will be created as part of 131 ``butler.run`` and associated with its Collection. 133 Other keyword arguments are forwarded to the Task base class constructor. 136 ConfigClass = RawIngestConfig
138 _DefaultName =
"ingest" 141 """Return the DatasetType of the Datasets ingested by this Task. 143 return DatasetType(
"raw", (
"instrument",
"detector",
"exposure"),
"Exposure",
144 universe=self.
butler.registry.dimensions)
146 def __init__(self, config=None, *, butler, **kwds):
150 self.
dimensions = butler.registry.dimensions.extract([
"instrument",
"detector",
"physical_filter",
151 "visit",
"exposure"])
164 def _addVisitRegions(self):
165 """Adds a region associated with a Visit to registry. 167 Visits will be created using regions for individual ccds that are 168 defined in the visitRegions dict field on self, joined against an 169 existing region if one exists. The dict field is formatted using 170 instrument and visit as a tuple for a key, with values that are a 171 list of regions for detectors associated with the region. 175 existingRegion = self.
butler.registry.expandDataId({
"instrument": instrument,
"visit": visit},
177 if existingRegion
is not None:
178 vertices =
list(existingRegion.getVertices()) + vertices
180 self.
butler.registry.setDimensionRegion(instrument=instrument, visit=visit, region=region)
183 """Ingest files into a Butler data repository. 185 This creates any new exposure or visit Dimension entries needed to 186 identify the ingested files, creates new Dataset entries in the 187 Registry and finally ingests the files themselves into the Datastore. 188 Any needed instrument, detector, and physical_filter Dimension entries 189 must exist in the Registry before `run` is called. 193 files : iterable over `str` or path-like objects 194 Paths to the files to be ingested. Will be made absolute 195 if they are not already. 198 if self.
config.onError ==
"rollback":
199 with self.
butler.transaction():
202 if self.
config.doAddRegions:
204 elif self.
config.onError ==
"break":
207 if self.
config.doAddRegions:
209 elif self.
config.onError ==
"continue":
213 except Exception
as err:
214 self.
log.
warnf(
"Error processing '{}': {}", file, err)
215 if self.
config.doAddRegions:
219 """Read and return any relevant headers from the given file. 221 The default implementation simply reads the header of the first 222 non-empty HDU, so it always returns a single-element list. 226 file : `str` or path-like object 227 Absolute path to the file to be ingested. 231 headers : `list` of `~lsst.daf.base.PropertyList` 232 Single-element list containing the header of the first 241 """Builds a region from information contained in a header 245 headers : `lsst.daf.base.PropertyList` 246 Property list containing the information from the header of 251 region : `lsst.sphgeom.ConvexPolygon` 256 If required header keys can not be found to construct region 262 if self.
config.padRegionAmount > 0:
263 bbox.grow(self.
config.padRegionAmount)
264 corners = bbox.getCorners()
265 sphCorners = [wcs.pixelToSky(point).getVector()
for point
in corners]
269 """Extract metadata from a raw file and add exposure and visit 272 Any needed instrument, detector, and physical_filter Dimension entries must 273 exist in the Registry before `run` is called. 277 file : `str` or path-like object 278 Absolute path to the file to be ingested. 282 headers : `list` of `~lsst.daf.base.PropertyList` 283 Result of calling `readHeaders`. 285 Data ID dictionary, as returned by `extractDataId`. 288 obsInfo = ObservationInfo(headers[0])
291 fullDataId = self.
extractDataId(file, headers, obsInfo=obsInfo)
294 if fullDataId.get(dimension.name)
is None:
296 dimensionDataId = DataId(fullDataId, dimension=dimension)
299 dimensionEntryDict = self.
butler.registry.findDimensionEntry(dimension, dimensionDataId)
300 if dimensionEntryDict
is None:
301 if dimension.name
in (
"visit",
"exposure"):
303 self.
butler.registry.addDimensionEntry(dimension, dimensionDataId)
306 f
"Entry for {dimension.name} with ID {dimensionDataId} not found; must be " 307 f
"present in Registry prior to ingest." 312 if self.
config.doAddRegions:
315 self.
butler.registry.setDimensionRegion(DataId(fullDataId,
316 dimensions=[
'visit',
'detector',
'instrument'],
319 self.
visitRegions.setdefault((fullDataId[
'instrument'], fullDataId[
'visit']),
320 []).extend(region.getVertices())
321 except IntegrityError:
326 return headers, fullDataId
329 """Ingest a single raw file into the repository. 331 All necessary Dimension entries must already be present. 335 file : `str` or path-like object 336 Absolute path to the file to be ingested. 337 headers : `list` of `~lsst.daf.base.PropertyList` 338 Result of calling `readHeaders`. 340 Data ID dictionary, as returned by `extractDataId`. 341 run : `~lsst.daf.butler.Run`, optional 342 Run to add the Dataset to; defaults to ``self.butler.run``. 347 Reference to the ingested dataset. 351 ConflictingDefinitionError 352 Raised if the dataset already exists in the registry. 354 if run
is not None and run != self.
butler.run:
355 butler = Butler(butler=self.
butler, run=run)
359 return butler.ingest(file, self.
datasetType, dataId, transfer=self.
config.transfer,
361 except ConflictingDefinitionError
as err:
365 """Ingest a single raw data file after extracting metadata. 367 This creates any new exposure or visit Dimension entries needed to 368 identify the ingest file, creates a new Dataset entry in the 369 Registry and finally ingests the file itself into the Datastore. 370 Any needed instrument, detector, and physical_filter Dimension entries must 371 exist in the Registry before `run` is called. 375 file : `str` or path-like object 376 Absolute path to the file to be ingested. 380 except Exception
as err:
381 raise RuntimeError(f
"Unexpected error adding dimensions for {file}.")
from err
384 with self.
butler.transaction():
388 except IngestConflictError:
389 if self.
config.conflict ==
"fail":
391 if self.
config.conflict ==
"ignore":
395 self.
log.
infof(
"Conflict on {} ({}); ingesting to stash '{}' instead.",
396 dataId, file, self.
config.stash)
397 with self.
butler.transaction():
400 self.
log.
infof(
"Conflict on {} ({}); ignoring.", dataId, file)
403 """Return the Data ID dictionary that should be used to label a file. 407 file : `str` or path-like object 408 Absolute path to the file being ingested (prior to any transfers). 409 headers : `list` of `~lsst.daf.base.PropertyList` 410 All headers returned by `readHeaders()`. 411 obsInfo : `astro_metadata_translator.ObservationInfo` 412 Observational metadata extracted from the headers. 417 A mapping whose key-value pairs uniquely identify raw datasets. 418 Must have ``dataId.dimensions() <= self.dimensions``, with at least 419 instrument, exposure, and detector present. 422 if obsInfo.visit_id
is None:
423 toRemove.add(
"visit")
424 if obsInfo.physical_filter
is None:
425 toRemove.add(
"physical_filter")
427 dimensions = self.
dimensions.toSet().difference(toRemove)
431 dimensions=dimensions,
432 instrument=obsInfo.instrument,
433 exposure=obsInfo.exposure_id,
434 visit=obsInfo.visit_id,
435 detector=obsInfo.detector_num,
436 physical_filter=obsInfo.physical_filter,
438 updateExposureEntryFromObsInfo(dataId, obsInfo)
439 if obsInfo.visit_id
is not None:
440 updateVisitEntryFromObsInfo(dataId, obsInfo)
444 """Return the Formatter that should be used to read this file after 447 The default implementation obtains the formatter from the Instrument 448 class for the given data ID. 451 if instrument
is None:
452 instrument = Instrument.factories[dataId[
"instrument"]]()
454 return instrument.getRawFormatter(dataId)
def ensureDimensions(self, file)
A 2-dimensional celestial WCS that transforms pixels to ICRS RA/Dec, using the LSST standard for pixel...
def infof(fmt, args, kwargs)
A floating-point coordinate rectangle geometry.
std::vector< SchemaItem< Flag > > * items
def extractDataId(self, file, headers, obsInfo)
def readHeaders(self, file)
daf::base::PropertySet * set
def __init__(self, config=None, butler, kwds)
def buildRegion(self, headers)
def makeTransferChoiceField(doc="How to transfer files (None for no transfer).", default=None)
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
ConvexPolygon is a closed convex polygon on the unit sphere.
def ingestFile(self, file, headers, dataId, run=None)
def warnf(fmt, args, kwargs)
def _addVisitRegions(self)
def getFormatter(self, file, headers, dataId)
def processFile(self, file)
Backwards-compatibility support for depersisting the old Calib (FluxMag0/FluxMag0Err) objects...
daf::base::PropertyList * list
lsst::geom::Box2I bboxFromMetadata(daf::base::PropertySet &metadata)
Determine the image bounding box from its metadata (FITS header)