ingest.py
# This file is part of obs_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.


__all__ = ("RawIngestTask", "RawIngestConfig", "makeTransferChoiceField")

import os.path
from dataclasses import dataclass, InitVar
from typing import List, Iterator, Iterable, Type, Optional, Any
from collections import defaultdict
from multiprocessing import Pool

from astro_metadata_translator import ObservationInfo, fix_header, merge_headers
from lsst.afw.fits import readMetadata
from lsst.daf.butler import (
    Butler,
    DataCoordinate,
    DatasetRef,
    DatasetType,
    DimensionRecord,
    DimensionUniverse,
    FileDataset,
)
from lsst.pex.config import Config, ChoiceField
from lsst.pipe.base import Task

from .instrument import Instrument, makeExposureRecordFromObsInfo
from .fitsRawFormatterBase import FitsRawFormatterBase


@dataclass
class RawFileDatasetInfo:
    """Structure that holds information about a single dataset within a
    raw file.
    """

    dataId: DataCoordinate
    """Data ID for this file (`lsst.daf.butler.DataCoordinate`).

    This may be a minimal `~lsst.daf.butler.DataCoordinate` base instance, or
    a complete `~lsst.daf.butler.ExpandedDataCoordinate`.
    """

    obsInfo: ObservationInfo
    """Standardized observation metadata extracted directly from the file
    headers (`astro_metadata_translator.ObservationInfo`).
    """


@dataclass
class RawFileData:
    """Structure that holds information about a single raw file, used during
    ingest.
    """

    datasets: List[RawFileDatasetInfo]
    """The information describing each dataset within this raw file
    (`list` of `RawFileDatasetInfo`).
    """

    filename: str
    """Name of the file this information was extracted from (`str`).

    This is the path prior to ingest, not the path after ingest.
    """

    FormatterClass: Type[FitsRawFormatterBase]
    """Formatter class that should be used to ingest this file (`type`; a
    subclass of `FitsRawFormatterBase`).
    """


@dataclass
class RawExposureData:
    """Structure that holds information about a complete raw exposure, used
    during ingest.
    """

    dataId: DataCoordinate
    """Data ID for this exposure (`lsst.daf.butler.DataCoordinate`).

    This may be a minimal `~lsst.daf.butler.DataCoordinate` base instance, or
    a complete `~lsst.daf.butler.ExpandedDataCoordinate`.
    """

    files: List[RawFileData]
    """List of structures containing file-level information.
    """

    universe: InitVar[DimensionUniverse]
    """Set of all known dimensions.
    """

    record: Optional[DimensionRecord] = None
    """The exposure `DimensionRecord` that must be inserted into the
    `~lsst.daf.butler.Registry` prior to file-level ingest (`DimensionRecord`).
    """

    def __post_init__(self, universe: DimensionUniverse):
        # We don't care which file or dataset we read metadata from, because
        # we're assuming they'll all be the same; just use the first ones.
        self.record = makeExposureRecordFromObsInfo(self.files[0].datasets[0].obsInfo, universe)


def makeTransferChoiceField(doc="How to transfer files (None for no transfer).", default=None):
    """Create a Config field with options for how to transfer files between
    data repositories.

    The allowed options for the field are exactly those supported by
    `lsst.daf.butler.Datastore.ingest`.

    Parameters
    ----------
    doc : `str`
        Documentation for the configuration field.
    default : `str`, optional
        Default transfer mode for the field.

    Returns
    -------
    field : `lsst.pex.config.ChoiceField`
        Configuration field.
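
    Examples
    --------
    A hypothetical config class using this field (the class name and
    default are illustrative)::

        class MyIngestConfig(Config):
            transfer = makeTransferChoiceField(default="symlink")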
    """
    return ChoiceField(
        doc=doc,
        dtype=str,
        allowed={"move": "move",
                 "copy": "copy",
                 "auto": "choice will depend on datastore",
                 "link": "hard link falling back to symbolic link",
                 "hardlink": "hard link",
                 "symlink": "symbolic (soft) link",
                 "relsymlink": "relative symbolic link",
                 },
        optional=True,
        default=default
    )


class RawIngestConfig(Config):
    transfer = makeTransferChoiceField()


class RawIngestTask(Task):
    """Driver Task for ingesting raw data into Gen3 Butler repositories.

    Parameters
    ----------
    config : `RawIngestConfig`
        Configuration for the task.
    butler : `~lsst.daf.butler.Butler`
        Writeable butler instance, with ``butler.run`` set to the appropriate
        `~lsst.daf.butler.CollectionType.RUN` collection for these raw
        datasets.
    **kwargs
        Additional keyword arguments are forwarded to the `lsst.pipe.base.Task`
        constructor.

    Notes
    -----
    Each instance of `RawIngestTask` writes to the same Butler. Each
    invocation of `RawIngestTask.run` ingests a list of files.
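
    Examples
    --------
    A minimal sketch of typical use (the repository path, run collection
    name, and file names are hypothetical)::

        butler = Butler("/path/to/repo", run="raw/all")
        task = RawIngestTask(config=RawIngestConfig(), butler=butler)
        refs = task.run(["raw_0001.fits", "raw_0002.fits"])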
    """

    ConfigClass = RawIngestConfig

    _DefaultName = "ingest"

    def getDatasetType(self):
        """Return the DatasetType of the datasets ingested by this Task.
        """
        return DatasetType("raw", ("instrument", "detector", "exposure"), "Exposure",
                           universe=self.butler.registry.dimensions)

    def __init__(self, config: Optional[RawIngestConfig] = None, *, butler: Butler, **kwargs: Any):
        config.validate()  # Not a CmdlineTask nor PipelineTask, so have to validate the config here.
        super().__init__(config, **kwargs)
        self.butler = butler
        self.universe = self.butler.registry.dimensions
        self.datasetType = self.getDatasetType()

    def extractMetadata(self, filename: str) -> RawFileData:
        """Extract and process metadata from a single raw file.

        Parameters
        ----------
        filename : `str`
            Path to the file.

        Returns
        -------
        data : `RawFileData`
            A structure containing the metadata extracted from the file,
            as well as the original filename. All fields will be populated,
            but the `RawFileData.dataId` attribute will be a minimal
            (unexpanded) `DataCoordinate` instance.

        Notes
        -----
        Assumes that there is a single dataset associated with the given
        file. Instruments using a single file to store multiple datasets
        must implement their own version of this method.
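
        Examples
        --------
        A sketch of how an instrument whose files hold several datasets
        might override this method; the HDU range and the per-HDU reuse of
        `_calculate_dataset_info` are assumptions for illustration, not part
        of this base implementation::

            class MultiDatasetRawIngestTask(RawIngestTask):
                def extractMetadata(self, filename: str) -> RawFileData:
                    # Build one RawFileDatasetInfo per dataset in the file.
                    datasets = []
                    for hdu in range(1, 4):  # illustrative HDU indices
                        header = readMetadata(filename, hdu)
                        fix_header(header)
                        datasets.append(self._calculate_dataset_info(header, filename))
                    instrument = Instrument.fromName(datasets[0].dataId["instrument"],
                                                     self.butler.registry)
                    FormatterClass = instrument.getRawFormatter(datasets[0].dataId)
                    return RawFileData(datasets=datasets, filename=filename,
                                       FormatterClass=FormatterClass)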
        """
        # Manually merge the primary and "first data" headers here because we
        # do not know in general if an input file has set INHERIT=T.
        phdu = readMetadata(filename, 0)
        header = merge_headers([phdu, readMetadata(filename)], mode="overwrite")
        fix_header(header)
        datasets = [self._calculate_dataset_info(header, filename)]

        # The data model currently assumes that whilst multiple datasets
        # can be associated with a single file, they must all share the
        # same formatter.
        instrument = Instrument.fromName(datasets[0].dataId["instrument"], self.butler.registry)
        FormatterClass = instrument.getRawFormatter(datasets[0].dataId)

        return RawFileData(datasets=datasets, filename=filename,
                           FormatterClass=FormatterClass)

    def _calculate_dataset_info(self, header, filename):
        """Calculate a RawFileDatasetInfo from the supplied information.

        Parameters
        ----------
        header : `Mapping`
            Header from the dataset.
        filename : `str`
            Filename to use for error messages.

        Returns
        -------
        dataset : `RawFileDatasetInfo`
            The dataId and observation information associated with this
            dataset.
        """
        obsInfo = ObservationInfo(header)
        dataId = DataCoordinate.standardize(instrument=obsInfo.instrument,
                                            exposure=obsInfo.exposure_id,
                                            detector=obsInfo.detector_num,
                                            universe=self.universe)
        return RawFileDatasetInfo(obsInfo=obsInfo, dataId=dataId)

    def groupByExposure(self, files: Iterable[RawFileData]) -> List[RawExposureData]:
        """Group an iterable of `RawFileData` by exposure.

        Parameters
        ----------
        files : iterable of `RawFileData`
            File-level information to group.

        Returns
        -------
        exposures : `list` of `RawExposureData`
            A list of structures that group the file-level information by
            exposure. All fields will be populated. The
            `RawExposureData.dataId` attributes will be minimal (unexpanded)
            `DataCoordinate` instances.
        """
        exposureDimensions = self.universe["exposure"].graph
        byExposure = defaultdict(list)
        for f in files:
            # Assume that the first dataset is representative for the file.
            byExposure[f.datasets[0].dataId.subset(exposureDimensions)].append(f)

        return [RawExposureData(dataId=dataId, files=exposureFiles, universe=self.universe)
                for dataId, exposureFiles in byExposure.items()]

    def expandDataIds(self, data: RawExposureData) -> RawExposureData:
        """Expand the data IDs associated with a raw exposure to include
        additional metadata records.

        Parameters
        ----------
        data : `RawExposureData`
            A structure containing information about the exposure to be
            ingested. Must have `RawExposureData.record` populated. Should
            be considered consumed upon return.

        Returns
        -------
        data : `RawExposureData`
            An updated version of the input structure, with
            `RawExposureData.dataId` and nested `RawFileDatasetInfo.dataId`
            attributes containing `~lsst.daf.butler.ExpandedDataCoordinate`
            instances.
        """
        # We start by expanding the exposure-level data ID; we won't use that
        # directly in file ingest, but this lets us do some database lookups
        # once per exposure instead of once per file later.
        data.dataId = self.butler.registry.expandDataId(
            data.dataId,
            # We pass in the records we'll be inserting shortly so they aren't
            # looked up from the database. We do expect instrument and filter
            # records to be retrieved from the database here (though the
            # Registry may cache them so there isn't a lookup every time).
            records={
                "exposure": data.record,
            }
        )
        # Now we expand the per-file (exposure+detector) data IDs. This time
        # we pass in the records we just retrieved from the exposure data ID
        # expansion.
        for file in data.files:
            for dataset in file.datasets:
                dataset.dataId = self.butler.registry.expandDataId(
                    dataset.dataId,
                    records=dict(data.dataId.records)
                )
        return data

    def prep(self, files, *, pool: Optional[Pool] = None, processes: int = 1) -> Iterator[RawExposureData]:
        """Perform all ingest preprocessing steps that do not involve actually
        modifying the database.

        Parameters
        ----------
        files : iterable over `str` or path-like objects
            Paths to the files to be ingested. Will be made absolute
            if they are not already.
        pool : `multiprocessing.Pool`, optional
            If not `None`, a process pool with which to parallelize some
            operations.
        processes : `int`, optional
            The number of processes to use. Ignored if ``pool`` is not `None`.

        Yields
        ------
        exposure : `RawExposureData`
            Data structures containing dimension records, filenames, and data
            IDs to be ingested (one structure for each exposure).
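
        Examples
        --------
        A sketch of inspecting the prepared exposures before ingest (the
        file names are hypothetical)::

            for exposure in task.prep(["raw_0001.fits", "raw_0002.fits"]):
                print(exposure.dataId, len(exposure.files))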
        """
        if pool is None and processes > 1:
            pool = Pool(processes)
        mapFunc = map if pool is None else pool.imap_unordered

        # Extract metadata and build per-detector regions.
        fileData: Iterator[RawFileData] = mapFunc(self.extractMetadata, files)

        # Use that metadata to group files (and extracted metadata) by
        # exposure. Never parallelized because it's intrinsically a gather
        # step.
        exposureData: List[RawExposureData] = self.groupByExposure(fileData)

        # The next operation operates on RawExposureData instances (one at
        # a time) in-place and then returns the modified instance. We call it
        # as a pass-through instead of relying on the arguments we pass in to
        # have been modified because in the parallel case those arguments are
        # going to be pickled and unpickled, and I'm not certain
        # multiprocessing is careful enough with that for output arguments to
        # work.

        # Expand the data IDs to include all dimension metadata; we need this
        # because we may need to generate path templates that rely on that
        # metadata.
        # This is the first step that involves actual database calls (but just
        # SELECTs), so if there's going to be a problem with connections vs.
        # multiple processes, or lock contention (in SQLite) slowing things
        # down, it'll happen here.
        return mapFunc(self.expandDataIds, exposureData)

    def ingestExposureDatasets(self, exposure: RawExposureData, *, run: Optional[str] = None
                               ) -> List[DatasetRef]:
        """Ingest all raw files in one exposure.

        Parameters
        ----------
        exposure : `RawExposureData`
            A structure containing information about the exposure to be
            ingested. Must have `RawExposureData.record` populated and all
            data ID attributes expanded.
        run : `str`, optional
            Name of a RUN-type collection to write to, overriding
            ``self.butler.run``.

        Returns
        -------
        refs : `list` of `lsst.daf.butler.DatasetRef`
            Dataset references for ingested raws.
        """
        datasets = [FileDataset(path=os.path.abspath(file.filename),
                                refs=[DatasetRef(self.datasetType, d.dataId) for d in file.datasets],
                                formatter=file.FormatterClass)
                    for file in exposure.files]
        self.butler.ingest(*datasets, transfer=self.config.transfer, run=run)
        return [ref for dataset in datasets for ref in dataset.refs]

    def run(self, files, *, pool: Optional[Pool] = None, processes: int = 1, run: Optional[str] = None):
        """Ingest files into a Butler data repository.

        This creates any new exposure or visit Dimension entries needed to
        identify the ingested files, creates new Dataset entries in the
        Registry and finally ingests the files themselves into the Datastore.
        Any needed instrument, detector, and physical_filter Dimension entries
        must exist in the Registry before `run` is called.

        Parameters
        ----------
        files : iterable over `str` or path-like objects
            Paths to the files to be ingested. Will be made absolute
            if they are not already.
        pool : `multiprocessing.Pool`, optional
            If not `None`, a process pool with which to parallelize some
            operations.
        processes : `int`, optional
            The number of processes to use. Ignored if ``pool`` is not `None`.
        run : `str`, optional
            Name of a RUN-type collection to write to, overriding
            ``self.butler.run``.

        Returns
        -------
        refs : `list` of `lsst.daf.butler.DatasetRef`
            Dataset references for ingested raws.

        Notes
        -----
        This method inserts all datasets for an exposure within a transaction,
        guaranteeing that partial exposures are never ingested. The exposure
        dimension record is inserted with `Registry.syncDimensionData` first
        (in its own transaction), which inserts only if a record with the same
        primary key does not already exist. This allows different files within
        the same exposure to be ingested in different runs.
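
        Examples
        --------
        A hypothetical invocation that parallelizes metadata extraction
        across four processes and writes to an alternate RUN collection::

            refs = task.run(files, processes=4, run="raw/reingest")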
        """
        exposureData = self.prep(files, pool=pool, processes=processes)
        # Up to this point, we haven't modified the data repository at all.
        # Now we finally do that, with one transaction per exposure. This is
        # not parallelized at present because the performance of this step is
        # limited by the database server. That may or may not change in the
        # future once we increase our usage of bulk inserts and reduce our
        # usage of savepoints; we've tried to get everything but the database
        # operations done in advance to reduce the time spent inside
        # transactions.
        self.butler.registry.registerDatasetType(self.datasetType)
        refs = []
        for exposure in exposureData:
            self.butler.registry.syncDimensionData("exposure", exposure.record)
            with self.butler.transaction():
                refs.extend(self.ingestExposureDatasets(exposure, run=run))
        return refs