__all__ = ("RawIngestTask", "RawIngestConfig", "makeTransferChoiceField")

import os.path

from dataclasses import dataclass, InitVar
from typing import List, Iterator, Iterable, Type, Optional, Any
from collections import defaultdict
from multiprocessing import Pool

from astro_metadata_translator import ObservationInfo, fix_header, merge_headers

from lsst.afw.fits import readMetadata
from lsst.daf.butler import (
    Butler,
    DataCoordinate,
    DatasetRef,
    DatasetType,
    DimensionRecord,
    DimensionUniverse,
    FileDataset,
)
from lsst.pex.config import Config, ChoiceField
from lsst.pipe.base import Task

from .instrument import Instrument, makeExposureRecordFromObsInfo
from .fitsRawFormatterBase import FitsRawFormatterBase
51 """Structure that holds information about a single dataset within a
55 dataId: DataCoordinate
56 """Data ID for this file (`lsst.daf.butler.DataCoordinate`).
58 This may be a minimal `~lsst.daf.butler.DataCoordinate` base instance, or
59 a complete `~lsst.daf.butler.ExpandedDataCoordinate`.
62 obsInfo: ObservationInfo
63 """Standardized observation metadata extracted directly from the file
64 headers (`astro_metadata_translator.ObservationInfo`).
70 """Structure that holds information about a single raw file, used during
74 datasets: List[RawFileDatasetInfo]
75 """The information describing each dataset within this raw file.
76 (`list` of `RawFileDatasetInfo`)
80 """Name of the file this information was extracted from (`str`).
82 This is the path prior to ingest, not the path after ingest.
85 FormatterClass: Type[FitsRawFormatterBase]
86 """Formatter class that should be used to ingest this file (`type`; as
87 subclass of `FitsRawFormatterBase`).
93 """Structure that holds information about a complete raw exposure, used
97 dataId: DataCoordinate
98 """Data ID for this exposure (`lsst.daf.butler.DataCoordinate`).
100 This may be a minimal `~lsst.daf.butler.DataCoordinate` base instance, or
101 a complete `~lsst.daf.butler.ExpandedDataCoordinate`.
104 files: List[RawFileData]
105 """List of structures containing file-level information.
108 universe: InitVar[DimensionUniverse]
109 """Set of all known dimensions.
112 record: Optional[DimensionRecord] =
None
113 """The exposure `DimensionRecord` that must be inserted into the
114 `~lsst.daf.butler.Registry` prior to file-level ingest (`DimensionRecord`).
124 """Create a Config field with options for how to transfer files between
127 The allowed options for the field are exactly those supported by
128 `lsst.daf.butler.Datastore.ingest`.
133 Documentation for the configuration field.
137 field : `lsst.pex.config.ChoiceField`
143 allowed={
"move":
"move",
145 "auto":
"choice will depend on datastore",
146 "link":
"hard link falling back to symbolic link",
147 "hardlink":
"hard link",
148 "symlink":
"symbolic (soft) link",
149 "relsymlink":
"relative symbolic link",
161 """Driver Task for ingesting raw data into Gen3 Butler repositories.
165 config : `RawIngestConfig`
166 Configuration for the task.
167 butler : `~lsst.daf.butler.Butler`
168 Writeable butler instance, with ``butler.run`` set to the appropriate
169 `~lsst.daf.butler.CollectionType.RUN` collection for these raw
172 Additional keyword arguments are forwarded to the `lsst.pipe.base.Task`
177 Each instance of `RawIngestTask` writes to the same Butler. Each
178 invocation of `RawIngestTask.run` ingests a list of files.

    ConfigClass = RawIngestConfig

    _DefaultName = "ingest"
186 """Return the DatasetType of the datasets ingested by this Task.
188 return DatasetType(
"raw", (
"instrument",
"detector",
"exposure"),
"Exposure",
189 universe=self.
butler.registry.dimensions)

    def __init__(self, config: Optional[RawIngestConfig] = None, *, butler: Butler, **kwargs: Any):
        super().__init__(config, **kwargs)
        self.butler = butler
        self.universe = self.butler.registry.dimensions
        self.datasetType = self.getDatasetType()
199 """Extract and process metadata from a single raw file.
209 A structure containing the metadata extracted from the file,
210 as well as the original filename. All fields will be populated,
211 but the `RawFileData.dataId` attribute will be a minimal
212 (unexpanded) `DataCoordinate` instance.
216 Assumes that there is a single dataset associated with the given
217 file. Instruments using a single file to store multiple datasets
218 must implement their own version of this method.
223 header = merge_headers([phdu,
readMetadata(filename)], mode=
"overwrite")
230 instrument = Instrument.fromName(datasets[0].dataId[
"instrument"], self.
butler.registry)
231 FormatterClass = instrument.getRawFormatter(datasets[0].dataId)
233 return RawFileData(datasets=datasets, filename=filename,
234 FormatterClass=FormatterClass)
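
    # A minimal sketch of the override mentioned above, assuming a
    # hypothetical ``readMetadataForEachHdu`` helper (not part of this
    # module) that yields one header per dataset in the file:
    #
    #     def extractMetadata(self, filename):
    #         datasets = [self._calculate_dataset_info(hdr, filename)
    #                     for hdr in readMetadataForEachHdu(filename)]
    #         instrument = Instrument.fromName(datasets[0].dataId["instrument"],
    #                                          self.butler.registry)
    #         FormatterClass = instrument.getRawFormatter(datasets[0].dataId)
    #         return RawFileData(datasets=datasets, filename=filename,
    #                            FormatterClass=FormatterClass)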

    def _calculate_dataset_info(self, header, filename):
        """Calculate a RawFileDatasetInfo from the supplied information.

        Parameters
        ----------
        header : `Mapping`
            Header from the dataset.
        filename : `str`
            Filename to use for error messages.

        Returns
        -------
        dataset : `RawFileDatasetInfo`
            The dataId, and observation information associated with this
            dataset.
        """
        obsInfo = ObservationInfo(header)
        dataId = DataCoordinate.standardize(instrument=obsInfo.instrument,
                                            exposure=obsInfo.exposure_id,
                                            detector=obsInfo.detector_num,
                                            universe=self.universe)
        return RawFileDatasetInfo(obsInfo=obsInfo, dataId=dataId)
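
    # For example, a translated header from a hypothetical camera might
    # standardize to (values illustrative only):
    #
    #     DataCoordinate.standardize(instrument="MyCam", exposure=903334,
    #                                detector=16, universe=self.universe)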
260 """Group an iterable of `RawFileData` by exposure.
264 files : iterable of `RawFileData`
265 File-level information to group.
269 exposures : `list` of `RawExposureData`
270 A list of structures that group the file-level information by
271 exposure. All fields will be populated. The
272 `RawExposureData.dataId` attributes will be minimal (unexpanded)
273 `DataCoordinate` instances.
275 exposureDimensions = self.
universe[
"exposure"].graph
276 byExposure = defaultdict(list)
279 byExposure[f.datasets[0].dataId.subset(exposureDimensions)].
append(f)
282 for dataId, exposureFiles
in byExposure.items()]
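
    # For example, three files spanning two exposures group into two
    # structures (data IDs abbreviated for illustration):
    #
    #     [f1(exp=1), f2(exp=1), f3(exp=2)]
    #         -> [RawExposureData(dataId={exposure: 1}, files=[f1, f2]),
    #             RawExposureData(dataId={exposure: 2}, files=[f3])]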
285 """Expand the data IDs associated with a raw exposure to include
286 additional metadata records.
290 exposure : `RawExposureData`
291 A structure containing information about the exposure to be
292 ingested. Must have `RawExposureData.records` populated. Should
293 be considered consumed upon return.
297 exposure : `RawExposureData`
298 An updated version of the input structure, with
299 `RawExposureData.dataId` and nested `RawFileData.dataId` attributes
300 containing `~lsst.daf.butler.ExpandedDataCoordinate` instances.
305 data.dataId = self.
butler.registry.expandDataId(
312 "exposure": data.record,
318 for file
in data.files:
319 for dataset
in file.datasets:
320 dataset.dataId = self.
butler.registry.expandDataId(
322 records=dict(data.dataId.records)
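
    # Once expanded, the dimension records travel with the data IDs, so
    # downstream code can read exposure metadata without further registry
    # queries, e.g. (a sketch; the available attributes depend on the
    # dimension schema):
    #
    #     data = task.expandDataIds(data)
    #     data.dataId.records["exposure"]  # the exposure DimensionRecord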

    def prep(self, files, *, pool: Optional[Pool] = None, processes: int = 1) -> Iterator[RawExposureData]:
        """Perform all ingest preprocessing steps that do not involve actually
        modifying the database.

        Parameters
        ----------
        files : iterable over `str` or path-like objects
            Paths to the files to be ingested. Will be made absolute
            if they are not already.
        pool : `multiprocessing.Pool`, optional
            If not `None`, a process pool with which to parallelize some
            operations.
        processes : `int`, optional
            The number of processes to use. Ignored if ``pool`` is not `None`.

        Yields
        ------
        exposure : `RawExposureData`
            Data structures containing dimension records, filenames, and data
            IDs to be ingested (one structure for each exposure).
        """
        if pool is None and processes > 1:
            pool = Pool(processes)
        mapFunc = map if pool is None else pool.imap_unordered
        # Extract metadata from each file (in parallel if a pool is
        # available), group the files by exposure, and expand the data IDs.
        fileData = mapFunc(self.extractMetadata, files)
        exposureData = self.groupByExposure(fileData)
        return mapFunc(self.expandDataIds, exposureData)
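
    # A parallel preprocessing sketch (the pool size and ``files`` list are
    # assumptions for the example):
    #
    #     with Pool(4) as pool:
    #         for exposure in task.prep(files, pool=pool):
    #             print(exposure.record)  # inspect before touching the database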

    def ingestExposureDatasets(self, exposure: RawExposureData, *, run: Optional[str] = None
                               ) -> List[DatasetRef]:
        """Ingest all raw files in one exposure.

        Parameters
        ----------
        exposure : `RawExposureData`
            A structure containing information about the exposure to be
            ingested. Must have `RawExposureData.record` populated and all
            data ID attributes expanded.
        run : `str`, optional
            Name of a RUN-type collection to write to, overriding
            ``self.butler.run``.

        Returns
        -------
        refs : `list` of `lsst.daf.butler.DatasetRef`
            Dataset references for ingested raws.
        """
        datasets = [FileDataset(path=os.path.abspath(file.filename),
                                refs=[DatasetRef(self.datasetType, d.dataId) for d in file.datasets],
                                formatter=file.FormatterClass)
                    for file in exposure.files]
        self.butler.ingest(*datasets, transfer=self.config.transfer, run=run)
        return [ref for dataset in datasets for ref in dataset.refs]
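
    # Each raw file becomes one FileDataset carrying a ref per contained
    # dataset, e.g. a hypothetical two-detector file would ingest as:
    #
    #     FileDataset(path="/abs/raw.fits",
    #                 refs=[DatasetRef(rawType, {"exposure": 1, "detector": 1}),
    #                       DatasetRef(rawType, {"exposure": 1, "detector": 2})],
    #                 formatter=SomeRawFormatter)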

    def run(self, files, *, pool: Optional[Pool] = None, processes: int = 1, run: Optional[str] = None):
        """Ingest files into a Butler data repository.

        This creates any new exposure or visit Dimension entries needed to
        identify the ingested files, creates new Dataset entries in the
        Registry and finally ingests the files themselves into the Datastore.
        Any needed instrument, detector, and physical_filter Dimension entries
        must exist in the Registry before `run` is called.

        Parameters
        ----------
        files : iterable over `str` or path-like objects
            Paths to the files to be ingested. Will be made absolute
            if they are not already.
        pool : `multiprocessing.Pool`, optional
            If not `None`, a process pool with which to parallelize some
            operations.
        processes : `int`, optional
            The number of processes to use. Ignored if ``pool`` is not `None`.
        run : `str`, optional
            Name of a RUN-type collection to write to, overriding
            ``self.butler.run``.

        Returns
        -------
        refs : `list` of `lsst.daf.butler.DatasetRef`
            Dataset references for ingested raws.

        Notes
        -----
        This method inserts all datasets for an exposure within a transaction,
        guaranteeing that partial exposures are never ingested. The exposure
        dimension record is inserted with `Registry.syncDimensionData` first
        (in its own transaction), which inserts only if a record with the same
        primary key does not already exist. This allows different files within
        the same exposure to be ingested in different runs.
        """
        exposureData = self.prep(files, pool=pool, processes=processes)
        refs = []
        for exposure in exposureData:
            self.butler.registry.syncDimensionData("exposure", exposure.record)
            with self.butler.transaction():
                refs.extend(self.ingestExposureDatasets(exposure, run=run))
        return refs
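

# A minimal end-to-end usage sketch (the repository path, run name, and file
# names are assumptions for the example, not part of this module):
#
#     from lsst.daf.butler import Butler
#
#     butler = Butler("/path/to/repo", run="raws/all")
#     task = RawIngestTask(butler=butler)
#     refs = task.run(["/data/exp1_det0.fits", "/data/exp1_det1.fits"])
#     # ``refs`` lists one DatasetRef per ingested raw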