LSST Data Management Base Package
lsst.obs.base.ingest.RawIngestTask Class Reference
Inheritance diagram for lsst.obs.base.ingest.RawIngestTask:
Inherits lsst.pipe.base.task.Task; inherited by lsst.obs.decam.ingest.DecamRawIngestTask.

Public Member Functions

def getDatasetType (self)
 
def __init__ (self, Optional[RawIngestConfig] config=None, *Butler butler, **Any kwargs)
 
RawFileData extractMetadata (self, str filename)
 
List[RawExposureData] groupByExposure (self, Iterable[RawFileData] files)
 
RawExposureData expandDataIds (self, RawExposureData data)
 
Iterator[RawExposureData] prep (self, files, *Optional[Pool] pool=None, int processes=1)
 
List[DatasetRef] ingestExposureDatasets (self, RawExposureData exposure, *Optional[str] run=None)
 
def run (self, files, *Optional[Pool] pool=None, int processes=1, Optional[str] run=None)
 
def emptyMetadata (self)
 
def getSchemaCatalogs (self)
 
def getAllSchemaCatalogs (self)
 
def getFullMetadata (self)
 
def getFullName (self)
 
def getName (self)
 
def getTaskDict (self)
 
def makeSubtask (self, name, **keyArgs)
 
def timer (self, name, logLevel=Log.DEBUG)
 
def makeField (cls, doc)
 
def __reduce__ (self)
 

Public Attributes

 butler
 
 universe
 
 datasetType
 
 metadata
 
 log
 
 config
 

Static Public Attributes

 ConfigClass = RawIngestConfig
 

Detailed Description

Driver Task for ingesting raw data into Gen3 Butler repositories.

Parameters
----------
config : `RawIngestConfig`
    Configuration for the task.
butler : `~lsst.daf.butler.Butler`
    Writeable butler instance, with ``butler.run`` set to the appropriate
    `~lsst.daf.butler.CollectionType.RUN` collection for these raw
    datasets.
**kwargs
    Additional keyword arguments are forwarded to the `lsst.pipe.base.Task`
    constructor.

Notes
-----
Each instance of `RawIngestTask` writes to the same Butler.  Each
invocation of `RawIngestTask.run` ingests a list of files.
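
Examples
--------
A minimal usage sketch (the repository path, ``transfer`` mode, and file name
below are illustrative assumptions, not values defined by this class):

.. code-block:: python

    from lsst.daf.butler import Butler
    from lsst.obs.base.ingest import RawIngestConfig, RawIngestTask

    # Writeable butler for an existing Gen3 repository (path assumed).
    butler = Butler("/repo/example", writeable=True)

    config = RawIngestConfig()
    config.transfer = "symlink"  # how raw files enter the datastore

    task = RawIngestTask(config=config, butler=butler)
    refs = task.run(["/data/raw/exp0001.fits"])  # hypothetical raw file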

Definition at line 161 of file ingest.py.

Constructor & Destructor Documentation

◆ __init__()

def lsst.obs.base.ingest.RawIngestTask.__init__ (   self,
Optional[RawIngestConfig]   config = None,
*Butler  butler,
**Any  kwargs 
)

Definition at line 192 of file ingest.py.

192  def __init__(self, config: Optional[RawIngestConfig] = None, *, butler: Butler, **kwargs: Any):
193  config.validate() # Not a CmdlineTask nor PipelineTask, so have to validate the config here.
194  super().__init__(config, **kwargs)
195  self.butler = butler
196  self.universe = self.butler.registry.dimensions
197  self.datasetType = self.getDatasetType()
198 
199  # Import all the instrument classes so that we ensure that we
200  # have all the relevant metadata translators loaded.
201  Instrument.importAll(self.butler.registry)
202 

Member Function Documentation

◆ __reduce__()

def lsst.pipe.base.task.Task.__reduce__ (   self)
inherited
Pickler.

Reimplemented in lsst.pipe.drivers.multiBandDriver.MultiBandDriverTask, and lsst.pipe.drivers.coaddDriver.CoaddDriverTask.

Definition at line 432 of file task.py.

432  def __reduce__(self):
433  """Pickler.
434  """
435  return self._unpickle_via_factory, (self.__class__, [], self._reduce_kwargs())

◆ emptyMetadata()

def lsst.pipe.base.task.Task.emptyMetadata (   self)
inherited
Empty (clear) the metadata for this Task and all sub-Tasks.

Definition at line 166 of file task.py.

166  def emptyMetadata(self):
167  """Empty (clear) the metadata for this Task and all sub-Tasks.
168  """
169  for subtask in self._taskDict.values():
170  subtask.metadata = dafBase.PropertyList()
171 

◆ expandDataIds()

RawExposureData lsst.obs.base.ingest.RawIngestTask.expandDataIds (   self,
RawExposureData  data 
)
Expand the data IDs associated with a raw exposure to include
additional metadata records.

Parameters
----------
data : `RawExposureData`
    A structure containing information about the exposure to be
    ingested.  Must have `RawExposureData.records` populated. Should
    be considered consumed upon return.

Returns
-------
exposure : `RawExposureData`
    An updated version of the input structure, with
    `RawExposureData.dataId` and nested `RawFileData.dataId` attributes
    updated to data IDs for which `DataCoordinate.hasRecords` returns
    `True`.
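
Examples
--------
A sketch of where this call sits in the ingest flow (``task`` and ``file_data``
are assumed to exist; `prep` performs the same steps internally):

.. code-block:: python

    # Group previously extracted file metadata by exposure, then expand
    # each exposure's data IDs in place before ingesting.
    exposures = [task.expandDataIds(exp)
                 for exp in task.groupByExposure(file_data)]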

Definition at line 345 of file ingest.py.

345  def expandDataIds(self, data: RawExposureData) -> RawExposureData:
346  """Expand the data IDs associated with a raw exposure to include
347  additional metadata records.
348 
349  Parameters
350  ----------
351  exposure : `RawExposureData`
352  A structure containing information about the exposure to be
353  ingested. Must have `RawExposureData.records` populated. Should
354  be considered consumed upon return.
355 
356  Returns
357  -------
358  exposure : `RawExposureData`
359  An updated version of the input structure, with
360  `RawExposureData.dataId` and nested `RawFileData.dataId` attributes
361  updated to data IDs for which `DataCoordinate.hasRecords` returns
362  `True`.
363  """
364  # We start by expanding the exposure-level data ID; we won't use that
365  # directly in file ingest, but this lets us do some database lookups
366  # once per exposure instead of once per file later.
367  data.dataId = self.butler.registry.expandDataId(
368  data.dataId,
369  # We pass in the records we'll be inserting shortly so they aren't
370  # looked up from the database. We do expect instrument and filter
371  # records to be retrieved from the database here (though the
372  # Registry may cache them so there isn't a lookup every time).
373  records={
374  self.butler.registry.dimensions["exposure"]: data.record,
375  }
376  )
377  # Now we expand the per-file (exposure+detector) data IDs. This time
378  # we pass in the records we just retrieved from the exposure data ID
379  # expansion.
380  for file in data.files:
381  for dataset in file.datasets:
382  dataset.dataId = self.butler.registry.expandDataId(
383  dataset.dataId,
384  records=dict(data.dataId.records)
385  )
386  return data
387 

◆ extractMetadata()

RawFileData lsst.obs.base.ingest.RawIngestTask.extractMetadata (   self,
str  filename 
)
Extract and process metadata from a single raw file.

Parameters
----------
filename : `str`
    Path to the file.

Returns
-------
data : `RawFileData`
    A structure containing the metadata extracted from the file,
    as well as the original filename.  All fields will be populated,
    but the `RawFileData.dataId` attribute will be a minimal
    (unexpanded) `DataCoordinate` instance.

Notes
-----
Assumes that there is a single dataset associated with the given
file.  Instruments using a single file to store multiple datasets
must implement their own version of this method.
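
Examples
--------
A sketch of probing a single file (``task`` is assumed constructed and the
path is hypothetical):

.. code-block:: python

    data = task.extractMetadata("/data/raw/exp0001.fits")
    if not data.datasets:
        # Metadata extraction failed; prep()/run() would report the file.
        print("could not read", data.filename)
    else:
        # Minimal (unexpanded) data ID of the first dataset in the file.
        print(data.datasets[0].dataId)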

Reimplemented in lsst.obs.decam.ingest.DecamRawIngestTask.

Definition at line 207 of file ingest.py.

207  def extractMetadata(self, filename: str) -> RawFileData:
208  """Extract and process metadata from a single raw file.
209 
210  Parameters
211  ----------
212  filename : `str`
213  Path to the file.
214 
215  Returns
216  -------
217  data : `RawFileData`
218  A structure containing the metadata extracted from the file,
219  as well as the original filename. All fields will be populated,
220  but the `RawFileData.dataId` attribute will be a minimal
221  (unexpanded) `DataCoordinate` instance.
222 
223  Notes
224  -----
225  Assumes that there is a single dataset associated with the given
226  file. Instruments using a single file to store multiple datasets
227  must implement their own version of this method.
228  """
229 
230  # We do not want to stop ingest if we are given a bad file.
231  # Instead return a RawFileData with no datasets and allow
232  # the caller to report the failure.
233 
234  try:
235  # Manually merge the primary and "first data" headers here because
236  # we do not know in general if an input file has set INHERIT=T.
237  phdu = readMetadata(filename, 0)
238  header = merge_headers([phdu, readMetadata(filename)], mode="overwrite")
239  datasets = [self._calculate_dataset_info(header, filename)]
240  except Exception as e:
241  self.log.debug("Problem extracting metadata from %s: %s", filename, e)
242  # Indicate to the caller that we failed to read
243  datasets = []
244  FormatterClass = Formatter
245  instrument = None
246  else:
247  self.log.debug("Extracted metadata from file %s", filename)
248  # The data model currently assumes that whilst multiple datasets
249  # can be associated with a single file, they must all share the
250  # same formatter.
251  try:
252  instrument = Instrument.fromName(datasets[0].dataId["instrument"], self.butler.registry)
253  except LookupError:
254  self.log.warning("Instrument %s for file %s not known to registry",
255  datasets[0].dataId["instrument"], filename)
256  datasets = []
257  FormatterClass = Formatter
258  instrument = None
259  else:
260  FormatterClass = instrument.getRawFormatter(datasets[0].dataId)
261 
262  return RawFileData(datasets=datasets, filename=filename,
263  FormatterClass=FormatterClass,
264  instrumentClass=instrument)
265 

◆ getAllSchemaCatalogs()

def lsst.pipe.base.task.Task.getAllSchemaCatalogs (   self)
inherited
Get schema catalogs for all tasks in the hierarchy, combining the
results into a single dict.

Returns
-------
schemacatalogs : `dict`
    Keys are butler dataset type, values are an empty catalog (an
    instance of the appropriate `lsst.afw.table` Catalog type) for all
    tasks in the hierarchy, from the top-level task down
    through all subtasks.

Notes
-----
This method may be called on any task in the hierarchy; it will return
the same answer, regardless.

The default implementation should always suffice. If your subtask uses
schemas, override `Task.getSchemaCatalogs`, not this method.

Definition at line 204 of file task.py.

204  def getAllSchemaCatalogs(self):
205  """Get schema catalogs for all tasks in the hierarchy, combining the
206  results into a single dict.
207 
208  Returns
209  -------
210  schemacatalogs : `dict`
211  Keys are butler dataset type, values are an empty catalog (an
212  instance of the appropriate `lsst.afw.table` Catalog type) for all
213  tasks in the hierarchy, from the top-level task down
214  through all subtasks.
215 
216  Notes
217  -----
218  This method may be called on any task in the hierarchy; it will return
219  the same answer, regardless.
220 
221  The default implementation should always suffice. If your subtask uses
222  schemas, override `Task.getSchemaCatalogs`, not this method.
223  """
224  schemaDict = self.getSchemaCatalogs()
225  for subtask in self._taskDict.values():
226  schemaDict.update(subtask.getSchemaCatalogs())
227  return schemaDict
228 

◆ getDatasetType()

def lsst.obs.base.ingest.RawIngestTask.getDatasetType (   self)
Return the DatasetType of the datasets ingested by this Task.
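
Examples
--------
`run` registers this dataset type before ingesting; a sketch of doing the same
by hand (``task`` assumed constructed):

.. code-block:: python

    dataset_type = task.getDatasetType()
    task.butler.registry.registerDatasetType(dataset_type)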

Definition at line 186 of file ingest.py.

186  def getDatasetType(self):
187  """Return the DatasetType of the datasets ingested by this Task.
188  """
189  return DatasetType("raw", ("instrument", "detector", "exposure"), "Exposure",
190  universe=self.butler.registry.dimensions)
191 

◆ getFullMetadata()

def lsst.pipe.base.task.Task.getFullMetadata (   self)
inherited
Get metadata for all tasks.

Returns
-------
metadata : `lsst.daf.base.PropertySet`
    The `~lsst.daf.base.PropertySet` keys are the full task name.
    Values are metadata for the top-level task and all subtasks,
    sub-subtasks, etc.

Notes
-----
The returned metadata includes timing information (if
``@timer.timeMethod`` is used) and any metadata set by the task. The
name of each item consists of the full task name with ``.`` replaced
by ``:``, followed by ``.`` and the name of the item, e.g.::

    topLevelTaskName:subtaskName:subsubtaskName.itemName

Using ``:`` in the full task name disambiguates the rare situation
that a task has a subtask and a metadata item with the same name.

Definition at line 229 of file task.py.

229  def getFullMetadata(self):
230  """Get metadata for all tasks.
231 
232  Returns
233  -------
234  metadata : `lsst.daf.base.PropertySet`
235  The `~lsst.daf.base.PropertySet` keys are the full task name.
236  Values are metadata for the top-level task and all subtasks,
237  sub-subtasks, etc.
238 
239  Notes
240  -----
241  The returned metadata includes timing information (if
242  ``@timer.timeMethod`` is used) and any metadata set by the task. The
243  name of each item consists of the full task name with ``.`` replaced
244  by ``:``, followed by ``.`` and the name of the item, e.g.::
245 
246  topLevelTaskName:subtaskName:subsubtaskName.itemName
247 
248  using ``:`` in the full task name disambiguates the rare situation
249  that a task has a subtask and a metadata item with the same name.
250  """
251  fullMetadata = dafBase.PropertySet()
252  for fullName, task in self.getTaskDict().items():
253  fullMetadata.set(fullName.replace(".", ":"), task.metadata)
254  return fullMetadata
255 

◆ getFullName()

def lsst.pipe.base.task.Task.getFullName (   self)
inherited
Get the task name as a hierarchical name including parent task
names.

Returns
-------
fullName : `str`
    The full name consists of the name of the parent task and each
    subtask separated by periods. For example:

    - The full name of top-level task "top" is simply "top".
    - The full name of subtask "sub" of top-level task "top" is
      "top.sub".
    - The full name of subtask "sub2" of subtask "sub" of top-level
      task "top" is "top.sub.sub2".

Definition at line 256 of file task.py.

256  def getFullName(self):
257  """Get the task name as a hierarchical name including parent task
258  names.
259 
260  Returns
261  -------
262  fullName : `str`
263  The full name consists of the name of the parent task and each
264  subtask separated by periods. For example:
265 
266  - The full name of top-level task "top" is simply "top".
267  - The full name of subtask "sub" of top-level task "top" is
268  "top.sub".
269  - The full name of subtask "sub2" of subtask "sub" of top-level
270  task "top" is "top.sub.sub2".
271  """
272  return self._fullName
273 

◆ getName()

def lsst.pipe.base.task.Task.getName (   self)
inherited
Get the name of the task.

Returns
-------
taskName : `str`
    Name of the task.

See also
--------
getFullName

Definition at line 274 of file task.py.

274  def getName(self):
275  """Get the name of the task.
276 
277  Returns
278  -------
279  taskName : `str`
280  Name of the task.
281 
282  See also
283  --------
284  getFullName
285  """
286  return self._name
287 

◆ getSchemaCatalogs()

def lsst.pipe.base.task.Task.getSchemaCatalogs (   self)
inherited
Get the schemas generated by this task.

Returns
-------
schemaCatalogs : `dict`
    Keys are butler dataset type, values are an empty catalog (an
    instance of the appropriate `lsst.afw.table` Catalog type) for
    this task.

Notes
-----

.. warning::

   Subclasses that use schemas must override this method. The default
   implementation returns an empty dict.

This method may be called at any time after the Task is constructed,
which means that all task schemas should be computed at construction
time, *not* when data is actually processed. This reflects the
philosophy that the schema should not depend on the data.

Returning catalogs rather than just schemas allows us to save e.g.
slots for SourceCatalog as well.

See also
--------
Task.getAllSchemaCatalogs

Definition at line 172 of file task.py.

172  def getSchemaCatalogs(self):
173  """Get the schemas generated by this task.
174 
175  Returns
176  -------
177  schemaCatalogs : `dict`
178  Keys are butler dataset type, values are an empty catalog (an
179  instance of the appropriate `lsst.afw.table` Catalog type) for
180  this task.
181 
182  Notes
183  -----
184 
185  .. warning::
186 
187  Subclasses that use schemas must override this method. The default
188  implementation returns an empty dict.
189 
190  This method may be called at any time after the Task is constructed,
191  which means that all task schemas should be computed at construction
192  time, *not* when data is actually processed. This reflects the
193  philosophy that the schema should not depend on the data.
194 
195  Returning catalogs rather than just schemas allows us to save e.g.
196  slots for SourceCatalog as well.
197 
198  See also
199  --------
200  Task.getAllSchemaCatalogs
201  """
202  return {}
203 

◆ getTaskDict()

def lsst.pipe.base.task.Task.getTaskDict (   self)
inherited
Get a dictionary of all tasks as a shallow copy.

Returns
-------
taskDict : `dict`
    Dictionary containing full task name: task object for the top-level
    task and all subtasks, sub-subtasks, etc.

Definition at line 288 of file task.py.

288  def getTaskDict(self):
289  """Get a dictionary of all tasks as a shallow copy.
290 
291  Returns
292  -------
293  taskDict : `dict`
294  Dictionary containing full task name: task object for the top-level
295  task and all subtasks, sub-subtasks, etc.
296  """
297  return self._taskDict.copy()
298 

◆ groupByExposure()

List[RawExposureData] lsst.obs.base.ingest.RawIngestTask.groupByExposure (   self,
Iterable[RawFileData]  files
)
Group an iterable of `RawFileData` by exposure.

Parameters
----------
files : iterable of `RawFileData`
    File-level information to group.

Returns
-------
exposures : `list` of `RawExposureData`
    A list of structures that group the file-level information by
    exposure. All fields will be populated.  The
    `RawExposureData.dataId` attributes will be minimal (unexpanded)
    `DataCoordinate` instances.
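
Examples
--------
A sketch of grouping previously extracted metadata (``task`` and ``filenames``
are assumed):

.. code-block:: python

    file_data = [task.extractMetadata(f) for f in filenames]
    good = [d for d in file_data if d.datasets]  # drop failed reads
    for exposure in task.groupByExposure(good):
        print(exposure.dataId, len(exposure.files))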

Definition at line 320 of file ingest.py.

320  def groupByExposure(self, files: Iterable[RawFileData]) -> List[RawExposureData]:
321  """Group an iterable of `RawFileData` by exposure.
322 
323  Parameters
324  ----------
325  files : iterable of `RawFileData`
326  File-level information to group.
327 
328  Returns
329  -------
330  exposures : `list` of `RawExposureData`
331  A list of structures that group the file-level information by
332  exposure. All fields will be populated. The
333  `RawExposureData.dataId` attributes will be minimal (unexpanded)
334  `DataCoordinate` instances.
335  """
336  exposureDimensions = self.universe["exposure"].graph
337  byExposure = defaultdict(list)
338  for f in files:
339  # Assume that the first dataset is representative for the file
340  byExposure[f.datasets[0].dataId.subset(exposureDimensions)].append(f)
341 
342  return [RawExposureData(dataId=dataId, files=exposureFiles, universe=self.universe)
343  for dataId, exposureFiles in byExposure.items()]
344 

◆ ingestExposureDatasets()

List[DatasetRef] lsst.obs.base.ingest.RawIngestTask.ingestExposureDatasets (   self,
RawExposureData  exposure,
*Optional[str]   run = None 
)
Ingest all raw files in one exposure.

Parameters
----------
exposure : `RawExposureData`
    A structure containing information about the exposure to be
    ingested.  Must have `RawExposureData.records` populated and all
    data ID attributes expanded.
run : `str`, optional
    Name of a RUN-type collection to write to, overriding
    ``self.butler.run``.

Returns
-------
refs : `list` of `lsst.daf.butler.DatasetRef`
    Dataset references for ingested raws.
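
Examples
--------
A sketch mirroring what `run` does for each exposure (the run-collection name
is hypothetical; `run` also syncs the exposure dimension record and registers
the collection first, which is omitted here):

.. code-block:: python

    refs = []
    for exposure in exposures:  # expanded RawExposureData from prep()
        with task.butler.transaction():
            refs.extend(task.ingestExposureDatasets(exposure, run="DECam/raw/all"))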

Definition at line 457 of file ingest.py.

 457  def ingestExposureDatasets(self, exposure: RawExposureData, *, run: Optional[str] = None
 458  ) -> List[DatasetRef]:
459  """Ingest all raw files in one exposure.
460 
461  Parameters
462  ----------
463  exposure : `RawExposureData`
464  A structure containing information about the exposure to be
465  ingested. Must have `RawExposureData.records` populated and all
466  data ID attributes expanded.
467  run : `str`, optional
468  Name of a RUN-type collection to write to, overriding
469  ``self.butler.run``.
470 
471  Returns
472  -------
473  refs : `list` of `lsst.daf.butler.DatasetRef`
474  Dataset references for ingested raws.
475  """
476  datasets = [FileDataset(path=os.path.abspath(file.filename),
477  refs=[DatasetRef(self.datasetType, d.dataId) for d in file.datasets],
478  formatter=file.FormatterClass)
479  for file in exposure.files]
480  self.butler.ingest(*datasets, transfer=self.config.transfer, run=run)
481  return [ref for dataset in datasets for ref in dataset.refs]
482 

◆ makeField()

def lsst.pipe.base.task.Task.makeField (   cls,
  doc 
)
inherited
Make a `lsst.pex.config.ConfigurableField` for this task.

Parameters
----------
doc : `str`
    Help text for the field.

Returns
-------
configurableField : `lsst.pex.config.ConfigurableField`
    A `~ConfigurableField` for this task.

Examples
--------
Provides a convenient way to specify this task is a subtask of another
task.

Here is an example of use:

.. code-block:: python

    class OtherTaskConfig(lsst.pex.config.Config):
        aSubtask = ATaskClass.makeField("brief description of task")

Definition at line 359 of file task.py.

359  def makeField(cls, doc):
360  """Make a `lsst.pex.config.ConfigurableField` for this task.
361 
362  Parameters
363  ----------
364  doc : `str`
365  Help text for the field.
366 
367  Returns
368  -------
369  configurableField : `lsst.pex.config.ConfigurableField`
370  A `~ConfigurableField` for this task.
371 
372  Examples
373  --------
374  Provides a convenient way to specify this task is a subtask of another
375  task.
376 
377  Here is an example of use:
378 
379  .. code-block:: python
380 
381  class OtherTaskConfig(lsst.pex.config.Config):
382  aSubtask = ATaskClass.makeField("brief description of task")
383  """
384  return ConfigurableField(doc=doc, target=cls)
385 

◆ makeSubtask()

def lsst.pipe.base.task.Task.makeSubtask (   self,
  name,
**  keyArgs 
)
inherited
Create a subtask as a new instance, assigned to the ``name`` attribute of
this task.

Parameters
----------
name : `str`
    Brief name of the subtask.
keyArgs
    Extra keyword arguments used to construct the task. The following
    arguments are automatically provided and cannot be overridden:

    - "config".
    - "parentTask".

Notes
-----
The subtask must be defined by ``Task.config.name``, an instance of
`~lsst.pex.config.ConfigurableField` or
`~lsst.pex.config.RegistryField`.
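
Examples
--------
A minimal sketch reusing the hypothetical ``ATaskClass`` from the `makeField`
example:

.. code-block:: python

    class OtherTaskConfig(lsst.pex.config.Config):
        aSubtask = ATaskClass.makeField("brief description of task")

    class OtherTask(lsst.pipe.base.Task):
        ConfigClass = OtherTaskConfig
        _DefaultName = "other"

        def __init__(self, **kwargs):
            super().__init__(**kwargs)
            # Creates self.aSubtask from the config field of the same name.
            self.makeSubtask("aSubtask")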

Definition at line 299 of file task.py.

299  def makeSubtask(self, name, **keyArgs):
300  """Create a subtask as a new instance as the ``name`` attribute of this
301  task.
302 
303  Parameters
304  ----------
305  name : `str`
306  Brief name of the subtask.
307  keyArgs
308  Extra keyword arguments used to construct the task. The following
309  arguments are automatically provided and cannot be overridden:
310 
311  - "config".
312  - "parentTask".
313 
314  Notes
315  -----
316  The subtask must be defined by ``Task.config.name``, an instance of
317  `~lsst.pex.config.ConfigurableField` or
318  `~lsst.pex.config.RegistryField`.
319  """
320  taskField = getattr(self.config, name, None)
321  if taskField is None:
322  raise KeyError(f"{self.getFullName()}'s config does not have field {name!r}")
323  subtask = taskField.apply(name=name, parentTask=self, **keyArgs)
324  setattr(self, name, subtask)
325 

◆ prep()

Iterator[RawExposureData] lsst.obs.base.ingest.RawIngestTask.prep (   self,
  files,
*Optional[Pool]   pool = None,
int   processes = 1 
)
Perform all ingest preprocessing steps that do not involve actually
modifying the database.

Parameters
----------
files : iterable over `str` or path-like objects
    Paths to the files to be ingested.  Will be made absolute
    if they are not already.
pool : `multiprocessing.Pool`, optional
    If not `None`, a process pool with which to parallelize some
    operations.
processes : `int`, optional
    The number of processes to use.  Ignored if ``pool`` is not `None`.

Yields
------
exposure : `RawExposureData`
    Data structures containing dimension records, filenames, and data
    IDs to be ingested (one structure for each exposure).
bad_files : `list` of `str`
    List of all the files that could not have metadata extracted.
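
Examples
--------
A sketch of separating preprocessing from ingest (``task`` and ``filenames``
are assumed; `run` performs the full sequence, including collection
registration and exposure record insertion):

.. code-block:: python

    exposures, bad_files = task.prep(filenames, processes=4)
    for exposure in exposures:
        ...  # e.g. hand each exposure to ingestExposureDatasets()
    if bad_files:
        print("metadata extraction failed for:", bad_files)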

Definition at line 388 of file ingest.py.

388  def prep(self, files, *, pool: Optional[Pool] = None, processes: int = 1) -> Iterator[RawExposureData]:
389  """Perform all ingest preprocessing steps that do not involve actually
390  modifying the database.
391 
392  Parameters
393  ----------
394  files : iterable over `str` or path-like objects
395  Paths to the files to be ingested. Will be made absolute
396  if they are not already.
397  pool : `multiprocessing.Pool`, optional
398  If not `None`, a process pool with which to parallelize some
399  operations.
400  processes : `int`, optional
401  The number of processes to use. Ignored if ``pool`` is not `None`.
402 
403  Yields
404  ------
405  exposure : `RawExposureData`
406  Data structures containing dimension records, filenames, and data
407  IDs to be ingested (one structure for each exposure).
408  bad_files : `list` of `str`
409  List of all the files that could not have metadata extracted.
410  """
411  if pool is None and processes > 1:
412  pool = Pool(processes)
413  mapFunc = map if pool is None else pool.imap_unordered
414 
415  # Extract metadata and build per-detector regions.
416  # This could run in a subprocess so collect all output
417  # before looking at failures.
418  fileData: Iterator[RawFileData] = mapFunc(self.extractMetadata, files)
419 
420  # Filter out all the failed reads and store them for later
421  # reporting
422  good_files = []
423  bad_files = []
424  for fileDatum in fileData:
425  if not fileDatum.datasets:
426  bad_files.append(fileDatum.filename)
427  else:
428  good_files.append(fileDatum)
429  fileData = good_files
430 
431  self.log.info("Successfully extracted metadata from %d file%s with %d failure%s",
432  len(fileData), "" if len(fileData) == 1 else "s",
433  len(bad_files), "" if len(bad_files) == 1 else "s")
434 
435  # Use that metadata to group files (and extracted metadata) by
436  # exposure. Never parallelized because it's intrinsically a gather
437  # step.
438  exposureData: List[RawExposureData] = self.groupByExposure(fileData)
439 
440  # The next operation operates on RawExposureData instances (one at
441  # a time) in-place and then returns the modified instance. We call it
442  # as a pass-through instead of relying on the arguments we pass in to
443  # have been modified because in the parallel case those arguments are
444  # going to be pickled and unpickled, and I'm not certain
445  # multiprocessing is careful enough with that for output arguments to
446  # work.
447 
448  # Expand the data IDs to include all dimension metadata; we need this
449  # because we may need to generate path templates that rely on that
450  # metadata.
451  # This is the first step that involves actual database calls (but just
452  # SELECTs), so if there's going to be a problem with connections vs.
453  # multiple processes, or lock contention (in SQLite) slowing things
454  # down, it'll happen here.
455  return mapFunc(self.expandDataIds, exposureData), bad_files
456 

◆ run()

def lsst.obs.base.ingest.RawIngestTask.run (   self,
  files,
*Optional[Pool]   pool = None,
int   processes = 1,
Optional[str]   run = None 
)
Ingest files into a Butler data repository.

This creates any new exposure or visit Dimension entries needed to
identify the ingested files, creates new Dataset entries in the
Registry and finally ingests the files themselves into the Datastore.
Any needed instrument, detector, and physical_filter Dimension entries
must exist in the Registry before `run` is called.

Parameters
----------
files : iterable over `str` or path-like objects
    Paths to the files to be ingested.  Will be made absolute
    if they are not already.
pool : `multiprocessing.Pool`, optional
    If not `None`, a process pool with which to parallelize some
    operations.
processes : `int`, optional
    The number of processes to use.  Ignored if ``pool`` is not `None`.
run : `str`, optional
    Name of a RUN-type collection to write to, overriding
    the default derived from the instrument name.

Returns
-------
refs : `list` of `lsst.daf.butler.DatasetRef`
    Dataset references for ingested raws.

Notes
-----
This method inserts all datasets for an exposure within a transaction,
guaranteeing that partial exposures are never ingested.  The exposure
dimension record is inserted with `Registry.syncDimensionData` first
(in its own transaction), which inserts only if a record with the same
primary key does not already exist.  This allows different files within
the same exposure to be ingested in different runs.
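
Examples
--------
A sketch of calling `run` with multiprocessing (``task`` and ``raw_files`` are
assumed); note that a `RuntimeError` is raised if any exposure fails to
register or ingest, after the per-file details have been logged:

.. code-block:: python

    try:
        refs = task.run(raw_files, processes=8)
    except RuntimeError:
        # Some files failed; specifics were already written to the log.
        raise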

Definition at line 483 of file ingest.py.

483  def run(self, files, *, pool: Optional[Pool] = None, processes: int = 1, run: Optional[str] = None):
484  """Ingest files into a Butler data repository.
485 
486  This creates any new exposure or visit Dimension entries needed to
487  identify the ingested files, creates new Dataset entries in the
488  Registry and finally ingests the files themselves into the Datastore.
489  Any needed instrument, detector, and physical_filter Dimension entries
490  must exist in the Registry before `run` is called.
491 
492  Parameters
493  ----------
494  files : iterable over `str` or path-like objects
495  Paths to the files to be ingested. Will be made absolute
496  if they are not already.
497  pool : `multiprocessing.Pool`, optional
498  If not `None`, a process pool with which to parallelize some
499  operations.
500  processes : `int`, optional
501  The number of processes to use. Ignored if ``pool`` is not `None`.
502  run : `str`, optional
503  Name of a RUN-type collection to write to, overriding
504  the default derived from the instrument name.
505 
506  Returns
507  -------
508  refs : `list` of `lsst.daf.butler.DatasetRef`
509  Dataset references for ingested raws.
510 
511  Notes
512  -----
513  This method inserts all datasets for an exposure within a transaction,
514  guaranteeing that partial exposures are never ingested. The exposure
515  dimension record is inserted with `Registry.syncDimensionData` first
516  (in its own transaction), which inserts only if a record with the same
517  primary key does not already exist. This allows different files within
518  the same exposure to be ingested in different runs.
519  """
520  exposureData, bad_files = self.prep(files, pool=pool, processes=processes)
521  # Up to this point, we haven't modified the data repository at all.
522  # Now we finally do that, with one transaction per exposure. This is
523  # not parallelized at present because the performance of this step is
524  # limited by the database server. That may or may not change in the
525  # future once we increase our usage of bulk inserts and reduce our
526  # usage of savepoints; we've tried to get everything but the database
527  # operations done in advance to reduce the time spent inside
528  # transactions.
529  self.butler.registry.registerDatasetType(self.datasetType)
530  refs = []
531  runs = set()
532  n_exposures = 0
533  n_exposures_failed = 0
534  n_ingests_failed = 0
535  for exposure in exposureData:
536 
537  self.log.debug("Attempting to ingest %d file%s from exposure %s:%s",
538  len(exposure.files), "" if len(exposure.files) == 1 else "s",
539  exposure.record.instrument, exposure.record.obs_id)
540 
541  try:
542  self.butler.registry.syncDimensionData("exposure", exposure.record)
543  except Exception as e:
544  n_exposures_failed += 1
545  self.log.warning("Exposure %s:%s could not be registered: %s",
546  exposure.record.instrument, exposure.record.obs_id, e)
547  continue
548 
549  # Use the instrument's default raw run if none was specified explicitly
550  if run is None:
551  instrumentClass = exposure.files[0].instrumentClass
552  this_run = instrumentClass.makeDefaultRawIngestRunName()
553  else:
554  this_run = run
555  if this_run not in runs:
556  self.butler.registry.registerCollection(this_run, type=CollectionType.RUN)
557  runs.add(this_run)
558  try:
559  with self.butler.transaction():
560  refs.extend(self.ingestExposureDatasets(exposure, run=this_run))
561  except Exception as e:
562  n_ingests_failed += 1
563  self.log.warning("Failed to ingest the following for reason: %s", e)
564  for f in exposure.files:
565  self.log.warning("- %s", f.filename)
566  continue
567 
568  # Success for this exposure
569  n_exposures += 1
570  self.log.info("Exposure %s:%s ingested successfully",
571  exposure.record.instrument, exposure.record.obs_id)
572 
573  had_failure = False
574 
575  if bad_files:
576  had_failure = True
577  self.log.warning("Could not extract observation metadata from the following:")
578  for f in bad_files:
579  self.log.warning("- %s", f)
580 
581  self.log.info("Successfully processed data from %d exposure%s with %d failure%s from exposure"
582  " registration and %d failure%s from file ingest.",
583  n_exposures, "" if n_exposures == 1 else "s",
584  n_exposures_failed, "" if n_exposures_failed == 1 else "s",
585  n_ingests_failed, "" if n_ingests_failed == 1 else "s")
586  if n_exposures_failed > 0 or n_ingests_failed > 0:
587  had_failure = True
588  self.log.info("Ingested %d distinct Butler dataset%s",
589  len(refs), "" if len(refs) == 1 else "s")
590 
591  if had_failure:
592  raise RuntimeError("Some failures encountered during ingestion")
593 
594  return refs

◆ timer()

def lsst.pipe.base.task.Task.timer (   self,
  name,
  logLevel = Log.DEBUG 
)
inherited
Context manager to log performance data for an arbitrary block of
code.

Parameters
----------
name : `str`
    Name of code being timed; data will be logged using item name:
    ``Start`` and ``End``.
logLevel
    A `lsst.log` level constant.

Examples
--------
Creating a timer context:

.. code-block:: python

    with self.timer("someCodeToTime"):
        pass  # code to time

See also
--------
timer.logInfo

Definition at line 327 of file task.py.

327  def timer(self, name, logLevel=Log.DEBUG):
328  """Context manager to log performance data for an arbitrary block of
329  code.
330 
331  Parameters
332  ----------
333  name : `str`
334  Name of code being timed; data will be logged using item name:
335  ``Start`` and ``End``.
336  logLevel
337  A `lsst.log` level constant.
338 
339  Examples
340  --------
341  Creating a timer context:
342 
343  .. code-block:: python
344 
345  with self.timer("someCodeToTime"):
346  pass # code to time
347 
348  See also
349  --------
350  timer.logInfo
351  """
352  logInfo(obj=self, prefix=name + "Start", logLevel=logLevel)
353  try:
354  yield
355  finally:
356  logInfo(obj=self, prefix=name + "End", logLevel=logLevel)
357 

Member Data Documentation

◆ butler

lsst.obs.base.ingest.RawIngestTask.butler

Definition at line 195 of file ingest.py.

◆ config

lsst.pipe.base.task.Task.config
inherited

Definition at line 162 of file task.py.

◆ ConfigClass

lsst.obs.base.ingest.RawIngestTask.ConfigClass = RawIngestConfig
static

Definition at line 182 of file ingest.py.

◆ datasetType

lsst.obs.base.ingest.RawIngestTask.datasetType

Definition at line 197 of file ingest.py.

◆ log

lsst.pipe.base.task.Task.log
inherited

Definition at line 161 of file task.py.

◆ metadata

lsst.pipe.base.task.Task.metadata
inherited

Definition at line 134 of file task.py.

◆ universe

lsst.obs.base.ingest.RawIngestTask.universe

Definition at line 196 of file ingest.py.


The documentation for this class was generated from the following file: