__all__ = ("RawIngestTask", "RawIngestConfig", "makeTransferChoiceField")

import os
from dataclasses import dataclass, InitVar
from typing import List, Iterator, Iterable, Tuple, Type, Optional, Any
from collections import defaultdict
from multiprocessing import Pool

from astro_metadata_translator import ObservationInfo, merge_headers

from lsst.afw.fits import readMetadata
from lsst.daf.butler import (
    Butler,
    CollectionType,
    DataCoordinate,
    DatasetRef,
    DatasetType,
    DimensionRecord,
    DimensionUniverse,
    FileDataset,
    Formatter,
)
from lsst.pex.config import ChoiceField, Config
from lsst.pipe.base import Task

from ._instrument import Instrument, makeExposureRecordFromObsInfo
from ._fitsRawFormatterBase import FitsRawFormatterBase
53 """Structure that holds information about a single dataset within a
57 dataId: DataCoordinate
58 """Data ID for this file (`lsst.daf.butler.DataCoordinate`).
61 obsInfo: ObservationInfo
62 """Standardized observation metadata extracted directly from the file
63 headers (`astro_metadata_translator.ObservationInfo`).
69 """Structure that holds information about a single raw file, used during
73 datasets: List[RawFileDatasetInfo]
74 """The information describing each dataset within this raw file.
75 (`list` of `RawFileDatasetInfo`)
79 """Name of the file this information was extracted from (`str`).
81 This is the path prior to ingest, not the path after ingest.
84 FormatterClass: Type[FitsRawFormatterBase]
85 """Formatter class that should be used to ingest this file (`type`; as
86 subclass of `FitsRawFormatterBase`).
89 instrumentClass: Optional[Type[Instrument]]
90 """The `Instrument` class associated with this file. Can be `None`
91 if ``datasets`` is an empty list."""
96 """Structure that holds information about a complete raw exposure, used
100 dataId: DataCoordinate
101 """Data ID for this exposure (`lsst.daf.butler.DataCoordinate`).
104 files: List[RawFileData]
105 """List of structures containing file-level information.
108 universe: InitVar[DimensionUniverse]
109 """Set of all known dimensions.
112 record: Optional[DimensionRecord] =
None
113 """The exposure `DimensionRecord` that must be inserted into the
114 `~lsst.daf.butler.Registry` prior to file-level ingest (`DimensionRecord`).
124 """Create a Config field with options for how to transfer files between
127 The allowed options for the field are exactly those supported by
128 `lsst.daf.butler.Datastore.ingest`.
133 Documentation for the configuration field.
137 field : `lsst.pex.config.ChoiceField`
    return ChoiceField(
        doc=doc,
        dtype=str,
        allowed={
            "move": "move",
            "copy": "copy",
            "auto": "choice will depend on datastore",
            "direct": "use URI to ingested file directly in datastore",
            "link": "hard link falling back to symbolic link",
            "hardlink": "hard link",
            "symlink": "symbolic (soft) link",
            "relsymlink": "relative symbolic link",
        },
        optional=True,
        default=default,
    )
162 """Driver Task for ingesting raw data into Gen3 Butler repositories.
166 config : `RawIngestConfig`
167 Configuration for the task.
168 butler : `~lsst.daf.butler.Butler`
169 Writeable butler instance, with ``butler.run`` set to the appropriate
170 `~lsst.daf.butler.CollectionType.RUN` collection for these raw
173 Additional keyword arguments are forwarded to the `lsst.pipe.base.Task`
178 Each instance of `RawIngestTask` writes to the same Butler. Each
179 invocation of `RawIngestTask.run` ingests a list of files.
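
    # Typical usage (a sketch; the repository path and file names below are
    # hypothetical):
    #
    #     butler = Butler("/path/to/repo", writeable=True)
    #     task = RawIngestTask(config=RawIngestConfig(), butler=butler)
    #     refs = task.run(["raw_0001.fits", "raw_0002.fits"])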

    ConfigClass = RawIngestConfig

    _DefaultName = "ingest"

    def getDatasetType(self):
        """Return the DatasetType of the datasets ingested by this Task.
        """
        return DatasetType("raw", ("instrument", "detector", "exposure"),
                           "Exposure", universe=self.butler.registry.dimensions)
    def __init__(self, config: Optional[RawIngestConfig] = None, *, butler: Butler, **kwargs: Any):
        super().__init__(config, **kwargs)
        self.butler = butler
        self.universe = self.butler.registry.dimensions
        self.datasetType = self.getDatasetType()

        # Import all the instrument classes so that we ensure that we
        # have all the relevant metadata translators loaded.
        Instrument.importAll(self.butler.registry)

    def _reduce_kwargs(self):
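        # Add the butler to the pickled state so that the Task can be
        # reconstructed in subprocess workers when a multiprocessing pool is
        # used.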
        return dict(**super()._reduce_kwargs(), butler=self.butler)
208 """Extract and process metadata from a single raw file.
218 A structure containing the metadata extracted from the file,
219 as well as the original filename. All fields will be populated,
220 but the `RawFileData.dataId` attribute will be a minimal
221 (unexpanded) `DataCoordinate` instance.
225 Assumes that there is a single dataset associated with the given
226 file. Instruments using a single file to store multiple datasets
227 must implement their own version of this method.
        try:
            # Manually merge the primary and "first data" headers, because we
            # do not know in general whether the file sets INHERIT=T.
            phdu = readMetadata(filename, 0)
            header = merge_headers([phdu, readMetadata(filename)], mode="overwrite")
            datasets = [self._calculate_dataset_info(header, filename)]
        except Exception as e:
            self.log.debug("Problem extracting metadata from %s: %s", filename, e)
            datasets = []
            FormatterClass = Formatter
            instrument = None
        else:
            self.log.debug("Extracted metadata from file %s", filename)
            # All datasets within a single file are assumed to share the same
            # formatter, which is determined by the instrument.
            try:
                instrument = Instrument.fromName(datasets[0].dataId["instrument"],
                                                 self.butler.registry)
            except LookupError:
                self.log.warning("Instrument %s for file %s not known to registry",
                                 datasets[0].dataId["instrument"], filename)
                datasets = []
                FormatterClass = Formatter
                instrument = None
            else:
                FormatterClass = instrument.getRawFormatter(datasets[0].dataId)

        return RawFileData(datasets=datasets, filename=filename,
                           FormatterClass=FormatterClass,
                           instrumentClass=instrument)

    def _calculate_dataset_info(self, header, filename):
        """Calculate a RawFileDatasetInfo from the supplied information.

        Parameters
        ----------
        header : `Mapping`
            Header from the dataset.
        filename : `str`
            Filename to use for error messages.

        Returns
        -------
        dataset : `RawFileDatasetInfo`
            The data ID and observation information associated with this
            dataset.
        """
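        # To avoid unnecessary work, list explicitly the observation
        # properties needed for ingest: True means the property is required
        # and must be computed, False means it is optional.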
287 "altaz_begin":
False,
288 "boresight_rotation_coord":
False,
289 "boresight_rotation_angle":
False,
291 "datetime_begin":
True,
292 "datetime_end":
True,
293 "detector_num":
True,
294 "exposure_group":
False,
296 "exposure_time":
True,
298 "tracking_radec":
False,
300 "observation_counter":
False,
301 "observation_id":
True,
302 "observation_reason":
False,
303 "observation_type":
True,
304 "observing_day":
False,
305 "physical_filter":
True,
306 "science_program":
False,
        obsInfo = ObservationInfo(header, pedantic=False, filename=filename,
                                  required={k for k in ingest_subset if ingest_subset[k]},
                                  subset=set(ingest_subset))
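        # Build the minimal (unexpanded) data ID for this dataset; for
        # example (values hypothetical): instrument="HSC", exposure=12345,
        # detector=42.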
        dataId = DataCoordinate.standardize(instrument=obsInfo.instrument,
                                            exposure=obsInfo.exposure_id,
                                            detector=obsInfo.detector_num,
                                            universe=self.universe)
        return RawFileDatasetInfo(obsInfo=obsInfo, dataId=dataId)
321 """Group an iterable of `RawFileData` by exposure.
325 files : iterable of `RawFileData`
326 File-level information to group.
330 exposures : `list` of `RawExposureData`
331 A list of structures that group the file-level information by
332 exposure. All fields will be populated. The
333 `RawExposureData.dataId` attributes will be minimal (unexpanded)
334 `DataCoordinate` instances.
        exposureDimensions = self.universe["exposure"].graph
        byExposure = defaultdict(list)
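        # Assume the first dataset in each file is representative when
        # extracting the exposure-level data ID.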
        for f in files:
            byExposure[f.datasets[0].dataId.subset(exposureDimensions)].append(f)

        return [RawExposureData(dataId=dataId, files=exposureFiles, universe=self.universe)
                for dataId, exposureFiles in byExposure.items()]
346 """Expand the data IDs associated with a raw exposure to include
347 additional metadata records.
351 exposure : `RawExposureData`
352 A structure containing information about the exposure to be
353 ingested. Must have `RawExposureData.records` populated. Should
354 be considered consumed upon return.
358 exposure : `RawExposureData`
359 An updated version of the input structure, with
360 `RawExposureData.dataId` and nested `RawFileData.dataId` attributes
361 updated to data IDs for which `DataCoordinate.hasRecords` returns
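        # Expand the exposure-level data ID first; it is not used directly
        # for file ingest, but it lets us do the database lookups once per
        # exposure rather than once per file.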
        data.dataId = self.butler.registry.expandDataId(
            data.dataId,
            # Pass in the records we already have to avoid redundant lookups.
            records={
                self.butler.registry.dimensions["exposure"]: data.record,
            }
        )
        for file in data.files:
            for dataset in file.datasets:
                dataset.dataId = self.butler.registry.expandDataId(
                    dataset.dataId,
                    records=dict(data.dataId.records)
                )
        return data

    def prep(self, files, *, pool: Optional[Pool] = None, processes: int = 1
             ) -> Tuple[Iterator[RawExposureData], List[str]]:
        """Perform all ingest preprocessing steps that do not involve actually
        modifying the database.

        Parameters
        ----------
        files : iterable over `str` or path-like objects
            Paths to the files to be ingested. Will be made absolute
            if they are not already.
        pool : `multiprocessing.Pool`, optional
            If not `None`, a process pool with which to parallelize some
            operations.
        processes : `int`, optional
            The number of processes to use. Ignored if ``pool`` is not `None`.

        Returns
        -------
        exposures : iterator of `RawExposureData`
            Data structures containing dimension records, filenames, and data
            IDs to be ingested (one structure for each exposure).
        bad_files : `list` of `str`
            List of all the files that could not have metadata extracted.
        """
        if pool is None and processes > 1:
            pool = Pool(processes)
        mapFunc = map if pool is None else pool.imap_unordered
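
        # Extract metadata from each file, serially or via the pool; note
        # that imap_unordered yields results as they complete, so file order
        # is not preserved.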
        fileData = mapFunc(self.extractMetadata, files)

        # Filter out files from which we could not extract metadata.
        good_files = []
        bad_files = []
        for fileDatum in fileData:
            if not fileDatum.datasets:
                bad_files.append(fileDatum.filename)
            else:
                good_files.append(fileDatum)
        fileData = good_files

        self.log.info("Successfully extracted metadata from %d file%s with %d failure%s",
                      len(fileData), "" if len(fileData) == 1 else "s",
                      len(bad_files), "" if len(bad_files) == 1 else "s")

        # Group the remaining files by exposure; this is intrinsically a
        # gather step and so is never parallelized.
        exposureData = self.groupByExposure(fileData)

        # Expand the data IDs to include all dimension metadata; this reads
        # from the database but does not modify it.
        return mapFunc(self.expandDataIds, exposureData), bad_files
458 ) -> List[DatasetRef]:
459 """Ingest all raw files in one exposure.
463 exposure : `RawExposureData`
464 A structure containing information about the exposure to be
465 ingested. Must have `RawExposureData.records` populated and all
466 data ID attributes expanded.
467 run : `str`, optional
468 Name of a RUN-type collection to write to, overriding
473 refs : `list` of `lsst.daf.butler.DatasetRef`
474 Dataset references for ingested raws.
        datasets = [FileDataset(path=os.path.abspath(file.filename),
                                refs=[DatasetRef(self.datasetType, d.dataId) for d in file.datasets],
                                formatter=file.FormatterClass)
                    for file in exposure.files]
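        # Butler.ingest both registers the datasets in the Registry and
        # transfers the files into the Datastore, using the configured
        # transfer mode.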
        self.butler.ingest(*datasets, transfer=self.config.transfer, run=run)
        return [ref for dataset in datasets for ref in dataset.refs]

    def run(self, files, *, pool: Optional[Pool] = None, processes: int = 1,
            run: Optional[str] = None):
484 """Ingest files into a Butler data repository.
486 This creates any new exposure or visit Dimension entries needed to
487 identify the ingested files, creates new Dataset entries in the
488 Registry and finally ingests the files themselves into the Datastore.
489 Any needed instrument, detector, and physical_filter Dimension entries
490 must exist in the Registry before `run` is called.
494 files : iterable over `str` or path-like objects
495 Paths to the files to be ingested. Will be made absolute
496 if they are not already.
497 pool : `multiprocessing.Pool`, optional
498 If not `None`, a process pool with which to parallelize some
500 processes : `int`, optional
501 The number of processes to use. Ignored if ``pool`` is not `None`.
502 run : `str`, optional
503 Name of a RUN-type collection to write to, overriding
504 the default derived from the instrument name.
508 refs : `list` of `lsst.daf.butler.DatasetRef`
509 Dataset references for ingested raws.

        Notes
        -----
        This method inserts all datasets for an exposure within a transaction,
        guaranteeing that partial exposures are never ingested. The exposure
        dimension record is inserted with `Registry.syncDimensionData` first
        (in its own transaction), which inserts only if a record with the same
        primary key does not already exist. This allows different files within
        the same exposure to be ingested in different runs.
        """
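        # Metadata extraction, grouping, and data ID expansion happen in
        # prep(); nothing there modifies the data repository.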
        exposureData, bad_files = self.prep(files, pool=pool, processes=processes)

        refs = []
        runs = set()
        n_exposures = 0
        n_exposures_failed = 0
        n_ingests_failed = 0
        for exposure in exposureData:
            self.log.debug("Attempting to ingest %d file%s from exposure %s:%s",
                           len(exposure.files), "" if len(exposure.files) == 1 else "s",
                           exposure.record.instrument, exposure.record.obs_id)
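            # Insert the exposure dimension record in its own transaction;
            # syncDimensionData only inserts if no record with the same
            # primary key already exists.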
            try:
                self.butler.registry.syncDimensionData("exposure", exposure.record)
            except Exception as e:
                n_exposures_failed += 1
                self.log.warning("Exposure %s:%s could not be registered: %s",
                                 exposure.record.instrument, exposure.record.obs_id, e)
                continue
            # Determine the output RUN collection, unless one was given
            # explicitly.
            if run is None:
                instrumentClass = exposure.files[0].instrumentClass
                this_run = instrumentClass.makeDefaultRawIngestRunName()
            else:
                this_run = run
            if this_run not in runs:
                self.butler.registry.registerCollection(this_run, type=CollectionType.RUN)
                runs.add(this_run)
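            # Ingest everything for this exposure inside a single transaction
            # so that a partial exposure is never left behind on failure.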
            try:
                with self.butler.transaction():
                    refs.extend(self.ingestExposureDatasets(exposure, run=this_run))
            except Exception as e:
                n_ingests_failed += 1
                self.log.warning("Failed to ingest the following for reason: %s", e)
                for f in exposure.files:
                    self.log.warning("- %s", f.filename)
                continue
            n_exposures += 1
            self.log.info("Exposure %s:%s ingested successfully",
                          exposure.record.instrument, exposure.record.obs_id)

        had_failure = False

        if bad_files:
            had_failure = True
            self.log.warning("Could not extract observation metadata from the following:")
            for f in bad_files:
                self.log.warning("- %s", f)

        self.log.info("Successfully processed data from %d exposure%s with %d failure%s from exposure"
                      " registration and %d failure%s from file ingest.",
                      n_exposures, "" if n_exposures == 1 else "s",
                      n_exposures_failed, "" if n_exposures_failed == 1 else "s",
                      n_ingests_failed, "" if n_ingests_failed == 1 else "s")
        if n_exposures_failed > 0 or n_ingests_failed > 0:
            had_failure = True

        self.log.info("Ingested %d distinct Butler dataset%s", len(refs),
                      "" if len(refs) == 1 else "s")

        if had_failure:
            raise RuntimeError("Some failures encountered during ingestion")

        return refs