23 __all__ = (
"RawIngestTask",
"RawIngestConfig",
"makeTransferChoiceField")
30 from sqlalchemy.exc
import IntegrityError
32 from astro_metadata_translator
import ObservationInfo, fix_header, merge_headers
36 from lsst.daf.butler
import DatasetType, Run, DataId, ConflictingDefinitionError, Butler
37 from lsst.daf.butler.instrument
import (Instrument, updateExposureEntryFromObsInfo,
38 updateVisitEntryFromObsInfo)
40 from lsst.pex.config
import Config, Field, ChoiceField
53 allowed={
"move":
"move",
55 "hardlink":
"hard link",
56 "symlink":
"symbolic (soft) link"},
64 conflict = ChoiceField(
65 (
"What to do if a raw Dataset with the same data ID as an " 66 "ingested file already exists in the Butler's Collection."),
68 allowed={
"ignore": (
"Do not add the new file to the Collection. If " 69 "'stash' is not None, the new file will be " 70 "ingested into the stash Collection instead."),
71 "fail": (
"Raise RuntimeError if a conflict is encountered " 72 "(which may then be caught if onError == 'continue')."),
78 "Name of an alternate Collection to hold Datasets that lose conflicts.",
82 onError = ChoiceField(
83 "What to do if an error (including fatal conflicts) occurs.",
85 allowed={
"continue":
"Warn and continue with the next file.",
86 "break": (
"Stop processing immediately, but leave " 87 "already-ingested datasets in the repository."),
88 "rollback": (
"Stop processing and attempt to remove aleady-" 89 "ingested datasets from the repository."),
97 doc=
"Add regions when ingesting tasks" 99 padRegionAmount = Field(
102 doc=
"Pad an image with specified number of pixels before calculating region" 107 """Driver Task for ingesting raw data into Gen3 Butler repositories. 109 This Task is intended to be runnable from the command-line, but it doesn't 110 meet the other requirements of CmdLineTask or PipelineTask, and wouldn't 111 gain much from being one. It also wouldn't really be appropriate as a 112 subtask of a CmdLineTask or PipelineTask; it's a Task essentially just to 113 leverage the logging and configurability functionality that provides. 115 Each instance of `RawIngestTask` writes to the same Butler and maintains a 116 cache of Dimension entries that have already been added to or extracted 117 from its Registry. Each invocation of `RawIngestTask.run` ingests a list 118 of files (possibly semi-atomically; see `RawIngestConfig.onError`). 120 RawIngestTask may be subclassed to specialize ingest for the actual 121 structure of raw data files produced by a particular instrument, but this 122 is usually unnecessary because the instrument-specific header-extraction 123 provided by the ``astro_metadata_translator`` is usually enough. 127 config : `RawIngestConfig` 128 Configuration for whether/how to transfer files and how to handle 129 conflicts and errors. 130 butler : `~lsst.daf.butler.Butler` 131 Butler instance. Ingested Datasets will be created as part of 132 ``butler.run`` and associated with its Collection. 134 Other keyword arguments are forwarded to the Task base class constructor. 137 ConfigClass = RawIngestConfig
139 _DefaultName =
"ingest" 142 """Return the DatasetType of the Datasets ingested by this Task. 144 return DatasetType(
"raw", (
"instrument",
"detector",
"exposure"),
"Exposure",
145 universe=self.
butler.registry.dimensions)
147 def __init__(self, config=None, *, butler, **kwds):
151 self.
dimensions = butler.registry.dimensions.extract([
"instrument",
"detector",
"physical_filter",
152 "visit",
"exposure"])
165 def _addVisitRegions(self):
166 """Adds a region associated with a Visit to registry. 168 Visits will be created using regions for individual ccds that are 169 defined in the visitRegions dict field on self, joined against an 170 existing region if one exists. The dict field is formatted using 171 instrument and visit as a tuple for a key, with values that are a 172 list of regions for detectors associated with the region. 176 existingRegion = self.
butler.registry.expandDataId({
"instrument": instrument,
"visit": visit},
178 if existingRegion
is not None:
179 vertices =
list(existingRegion.getVertices()) + vertices
181 self.
butler.registry.setDimensionRegion(instrument=instrument, visit=visit, region=region)
184 """Ingest files into a Butler data repository. 186 This creates any new exposure or visit Dimension entries needed to 187 identify the ingested files, creates new Dataset entries in the 188 Registry and finally ingests the files themselves into the Datastore. 189 Any needed instrument, detector, and physical_filter Dimension entries 190 must exist in the Registry before `run` is called. 194 files : iterable over `str` or path-like objects 195 Paths to the files to be ingested. Will be made absolute 196 if they are not already. 199 if self.
config.onError ==
"rollback":
200 with self.
butler.transaction():
203 if self.
config.doAddRegions:
205 elif self.
config.onError ==
"break":
208 if self.
config.doAddRegions:
210 elif self.
config.onError ==
"continue":
214 except Exception
as err:
215 self.
log.
warnf(
"Error processing '{}': {}", file, err)
216 if self.
config.doAddRegions:
220 """Read and return any relevant headers from the given file. 222 The default implementation simply reads the header of the first 223 non-empty HDU, so it always returns a single-element list. 227 file : `str` or path-like object 228 Absolute path to the file to be ingested. 232 headers : `list` of `~lsst.daf.base.PropertyList` 233 Single-element list containing the header of the first 237 headers = [merge_headers([phdu,
readMetadata(file)], mode=
"overwrite")]
243 """Builds a region from information contained in a header 247 headers : `lsst.daf.base.PropertyList` 248 Property list containing the information from the header of 253 region : `lsst.sphgeom.ConvexPolygon` 258 If required header keys can not be found to construct region 264 if self.
config.padRegionAmount > 0:
265 bbox.grow(self.
config.padRegionAmount)
266 corners = bbox.getCorners()
267 sphCorners = [wcs.pixelToSky(point).getVector()
for point
in corners]
271 """Extract metadata from a raw file and add exposure and visit 274 Any needed instrument, detector, and physical_filter Dimension entries must 275 exist in the Registry before `run` is called. 279 file : `str` or path-like object 280 Absolute path to the file to be ingested. 284 headers : `list` of `~lsst.daf.base.PropertyList` 285 Result of calling `readHeaders`. 287 Data ID dictionary, as returned by `extractDataId`. 290 obsInfo = ObservationInfo(headers[0])
293 fullDataId = self.
extractDataId(file, headers, obsInfo=obsInfo)
296 if fullDataId.get(dimension.name)
is None:
298 dimensionDataId = DataId(fullDataId, dimension=dimension)
301 dimensionEntryDict = self.
butler.registry.findDimensionEntry(dimension, dimensionDataId)
302 if dimensionEntryDict
is None:
303 if dimension.name
in (
"visit",
"exposure"):
305 self.
butler.registry.addDimensionEntry(dimension, dimensionDataId)
308 f
"Entry for {dimension.name} with ID {dimensionDataId} not found; must be " 309 f
"present in Registry prior to ingest." 314 if self.
config.doAddRegions
and obsInfo.tracking_radec
is not None:
317 self.
butler.registry.setDimensionRegion(DataId(fullDataId,
318 dimensions=[
'visit',
'detector',
'instrument'],
321 self.
visitRegions.setdefault((fullDataId[
'instrument'], fullDataId[
'visit']),
322 []).extend(region.getVertices())
323 except IntegrityError:
328 return headers, fullDataId
331 """Ingest a single raw file into the repository. 333 All necessary Dimension entries must already be present. 337 file : `str` or path-like object 338 Absolute path to the file to be ingested. 339 headers : `list` of `~lsst.daf.base.PropertyList` 340 Result of calling `readHeaders`. 342 Data ID dictionary, as returned by `extractDataId`. 343 run : `~lsst.daf.butler.Run`, optional 344 Run to add the Dataset to; defaults to ``self.butler.run``. 349 Reference to the ingested dataset. 353 ConflictingDefinitionError 354 Raised if the dataset already exists in the registry. 356 if run
is not None and run != self.
butler.run:
357 butler = Butler(butler=self.
butler, run=run)
361 return butler.ingest(file, self.
datasetType, dataId, transfer=self.
config.transfer,
363 except ConflictingDefinitionError
as err:
367 """Ingest a single raw data file after extracting metadata. 369 This creates any new exposure or visit Dimension entries needed to 370 identify the ingest file, creates a new Dataset entry in the 371 Registry and finally ingests the file itself into the Datastore. 372 Any needed instrument, detector, and physical_filter Dimension entries must 373 exist in the Registry before `run` is called. 377 file : `str` or path-like object 378 Absolute path to the file to be ingested. 382 except Exception
as err:
383 raise RuntimeError(f
"Unexpected error adding dimensions for {file}.")
from err
386 with self.
butler.transaction():
390 except IngestConflictError:
391 if self.
config.conflict ==
"fail":
393 if self.
config.conflict ==
"ignore":
397 self.
log.
infof(
"Conflict on {} ({}); ingesting to stash '{}' instead.",
398 dataId, file, self.
config.stash)
399 with self.
butler.transaction():
402 self.
log.
infof(
"Conflict on {} ({}); ignoring.", dataId, file)
405 """Return the Data ID dictionary that should be used to label a file. 409 file : `str` or path-like object 410 Absolute path to the file being ingested (prior to any transfers). 411 headers : `list` of `~lsst.daf.base.PropertyList` 412 All headers returned by `readHeaders()`. 413 obsInfo : `astro_metadata_translator.ObservationInfo` 414 Observational metadata extracted from the headers. 419 A mapping whose key-value pairs uniquely identify raw datasets. 420 Must have ``dataId.dimensions() <= self.dimensions``, with at least 421 instrument, exposure, and detector present. 424 if obsInfo.visit_id
is None:
425 toRemove.add(
"visit")
426 if obsInfo.physical_filter
is None:
427 toRemove.add(
"physical_filter")
429 dimensions = self.
dimensions.toSet().difference(toRemove)
433 dimensions=dimensions,
434 instrument=obsInfo.instrument,
435 exposure=obsInfo.exposure_id,
436 visit=obsInfo.visit_id,
437 detector=obsInfo.detector_num,
438 physical_filter=obsInfo.physical_filter,
440 updateExposureEntryFromObsInfo(dataId, obsInfo)
441 if obsInfo.visit_id
is not None:
442 updateVisitEntryFromObsInfo(dataId, obsInfo)
446 """Return the Formatter that should be used to read this file after 449 The default implementation obtains the formatter from the Instrument 450 class for the given data ID. 453 if instrument
is None:
454 instrument = Instrument.factories[dataId[
"instrument"]]()
456 return instrument.getRawFormatter(dataId)
def ensureDimensions(self, file)
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
A 2-dimensional celestial WCS that transform pixels to ICRS RA/Dec, using the LSST standard for pixel...
def infof(fmt, args, kwargs)
A floating-point coordinate rectangle geometry.
std::vector< SchemaItem< Flag > > * items
def extractDataId(self, file, headers, obsInfo)
def readHeaders(self, file)
daf::base::PropertySet * set
def __init__(self, config=None, butler, kwds)
def buildRegion(self, headers)
def makeTransferChoiceField(doc="How to transfer files (None for no transfer).", default=None)
ConvexPolygon is a closed convex polygon on the unit sphere.
def ingestFile(self, file, headers, dataId, run=None)
def warnf(fmt, args, kwargs)
def _addVisitRegions(self)
def getFormatter(self, file, headers, dataId)
def processFile(self, file)
Backwards-compatibility support for depersisting the old Calib (FluxMag0/FluxMag0Err) objects...
daf::base::PropertyList * list
lsst::geom::Box2I bboxFromMetadata(daf::base::PropertySet &metadata)
Determine the image bounding box from its metadata (FITS header)