lsst.obs.base.gen3.ingest.RawIngestTask Class Reference
Inheritance diagram for lsst.obs.base.gen3.ingest.RawIngestTask:
lsst.pipe.base.task.Task

Public Member Functions

def getDatasetType (self)
 
def __init__ (self, config=None, *, butler, **kwds)
 
def run (self, files)
 
def readHeaders (self, file)
 
def buildRegion (self, headers)
 
def ensureDimensions (self, file)
 
def ingestFile (self, file, headers, dataId, run=None)
 
def processFile (self, file)
 
def extractDataId (self, file, headers, obsInfo)
 
def getFormatter (self, file, headers, dataId)
 
def emptyMetadata (self)
 
def getSchemaCatalogs (self)
 
def getAllSchemaCatalogs (self)
 
def getFullMetadata (self)
 
def getFullName (self)
 
def getName (self)
 
def getTaskDict (self)
 
def makeSubtask (self, name, **keyArgs)
 
def timer (self, name, logLevel=Log.DEBUG)
 
def makeField (cls, doc)
 
def __reduce__ (self)
 

Public Attributes

 butler
 
 datasetType
 
 dimensions
 
 dimensionEntriesDone
 
 instrumentCache
 
 stashRun
 
 visitRegions
 
 metadata
 
 log
 
 config
 

Static Public Attributes

 ConfigClass = RawIngestConfig
 

Detailed Description

Driver Task for ingesting raw data into Gen3 Butler repositories.

This Task is intended to be runnable from the command-line, but it doesn't
meet the other requirements of CmdLineTask or PipelineTask, and wouldn't
gain much from being one.  It also wouldn't really be appropriate as a
subtask of a CmdLineTask or PipelineTask; it is a Task essentially just to
leverage the logging and configurability functionality that `Task` provides.

Each instance of `RawIngestTask` writes to the same Butler and maintains a
cache of Dimension entries that have already been added to or extracted
from its Registry.  Each invocation of `RawIngestTask.run` ingests a list
of files (possibly semi-atomically; see `RawIngestConfig.onError`).

RawIngestTask may be subclassed to specialize ingest for the actual
structure of raw data files produced by a particular instrument, but this
is usually unnecessary because the instrument-specific header extraction
provided by ``astro_metadata_translator`` is generally sufficient.

Parameters
----------
config : `RawIngestConfig`
    Configuration for whether/how to transfer files and how to handle
    conflicts and errors.
butler : `~lsst.daf.butler.Butler`
    Butler instance.  Ingested Datasets will be created as part of
    ``butler.run`` and associated with its Collection.

Other keyword arguments are forwarded to the Task base class constructor.
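
A minimal usage sketch, assuming a Gen3 repository already exists and the
required instrument, detector, and physical_filter Dimension entries have
been registered; the repository path, run name, and file paths below are
illustrative::

    from lsst.daf.butler import Butler
    from lsst.obs.base.gen3.ingest import RawIngestTask, RawIngestConfig

    butler = Butler("/path/to/repo", run="raw")
    task = RawIngestTask(config=RawIngestConfig(), butler=butler)
    task.run(["raw_0001.fits", "raw_0002.fits"])  # paths are made absolute internally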

Definition at line 106 of file ingest.py.

Constructor & Destructor Documentation

◆ __init__()

def lsst.obs.base.gen3.ingest.RawIngestTask.__init__ (   self,
  config = None,
  *,
  butler,
  **kwds 
)

Definition at line 147 of file ingest.py.

def __init__(self, config=None, *, butler, **kwds):
    super().__init__(config, **kwds)
    self.butler = butler
    self.datasetType = self.getDatasetType()
    self.dimensions = butler.registry.dimensions.extract(["instrument", "detector", "physical_filter",
                                                          "visit", "exposure"])
    # Dictionary of {Dimension: set(DataId)} indicating Dimension entries
    # we know are in the Registry.
    self.dimensionEntriesDone = {k: set() for k in self.dimensions}
    # Cache of instrument instances retrieved from Registry; needed to look
    # up formatters.
    self.instrumentCache = {}
    # (Possibly) create a Run object for the "stash": where we put datasets
    # that lose conflicts.  Note that this doesn't actually add this Run
    # to the Registry; we only do that on first use.
    self.stashRun = Run(self.config.stash) if self.config.stash is not None else None
    self.visitRegions = {}

Member Function Documentation

◆ __reduce__()

def lsst.pipe.base.task.Task.__reduce__ (   self)
inherited
Pickler.

Definition at line 373 of file task.py.

def __reduce__(self):
    """Pickler.
    """
    return self.__class__, (self.config, self._name, self._parentTask, None)

◆ buildRegion()

def lsst.obs.base.gen3.ingest.RawIngestTask.buildRegion (   self,
  headers 
)
Build a region from information contained in a header.

Parameters
----------
headers : `lsst.daf.base.PropertyList`
    Property list containing the information from the header of
    one file.

Returns
-------
region : `lsst.sphgeom.ConvexPolygon`

Raises
------
ValueError
    Raised if the header keys required to construct the region cannot
    be found.

Definition at line 242 of file ingest.py.

def buildRegion(self, headers):
    """Build a region from information contained in a header.

    Parameters
    ----------
    headers : `lsst.daf.base.PropertyList`
        Property list containing the information from the header of
        one file.

    Returns
    -------
    region : `lsst.sphgeom.ConvexPolygon`

    Raises
    ------
    ValueError
        Raised if the header keys required to construct the region
        cannot be found.
    """
    # Default implementation is for headers to be a one-element list.
    header = headers[0]
    wcs = SkyWcs(header)
    bbox = Box2D(bboxFromMetadata(header))
    if self.config.padRegionAmount > 0:
        bbox.grow(self.config.padRegionAmount)
    corners = bbox.getCorners()
    sphCorners = [wcs.pixelToSky(point).getVector() for point in corners]
    return ConvexPolygon(sphCorners)
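Region construction and padding are driven by configuration; a minimal
sketch, where the 50-pixel padding value is illustrative::

    config = RawIngestConfig()
    config.doAddRegions = True     # compute and store visit/detector regions
    config.padRegionAmount = 50    # grow the bounding box before projecting to sky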

◆ emptyMetadata()

def lsst.pipe.base.task.Task.emptyMetadata (   self)
inherited
Empty (clear) the metadata for this Task and all sub-Tasks.

Definition at line 153 of file task.py.

def emptyMetadata(self):
    """Empty (clear) the metadata for this Task and all sub-Tasks.
    """
    for subtask in self._taskDict.values():
        subtask.metadata = dafBase.PropertyList()

◆ ensureDimensions()

def lsst.obs.base.gen3.ingest.RawIngestTask.ensureDimensions (   self,
  file 
)
Extract metadata from a raw file and add exposure and visit
Dimension entries.

Any needed instrument, detector, and physical_filter Dimension entries must
exist in the Registry before `run` is called.

Parameters
----------
file : `str` or path-like object
    Absolute path to the file to be ingested.

Returns
-------
headers : `list` of `~lsst.daf.base.PropertyList`
    Result of calling `readHeaders`.
dataId : `DataId`
    Data ID dictionary, as returned by `extractDataId`.

Definition at line 270 of file ingest.py.

def ensureDimensions(self, file):
    """Extract metadata from a raw file and add exposure and visit
    Dimension entries.

    Any needed instrument, detector, and physical_filter Dimension entries
    must exist in the Registry before `run` is called.

    Parameters
    ----------
    file : `str` or path-like object
        Absolute path to the file to be ingested.

    Returns
    -------
    headers : `list` of `~lsst.daf.base.PropertyList`
        Result of calling `readHeaders`.
    dataId : `DataId`
        Data ID dictionary, as returned by `extractDataId`.
    """
    headers = self.readHeaders(file)
    obsInfo = ObservationInfo(headers[0])

    # Extract a DataId that covers all of self.dimensions.
    fullDataId = self.extractDataId(file, headers, obsInfo=obsInfo)

    for dimension in self.dimensions:
        if fullDataId.get(dimension.name) is None:
            continue
        dimensionDataId = DataId(fullDataId, dimension=dimension)
        if dimensionDataId not in self.dimensionEntriesDone[dimension]:
            # Next look in the Registry.
            dimensionEntryDict = self.butler.registry.findDimensionEntry(dimension, dimensionDataId)
            if dimensionEntryDict is None:
                if dimension.name in ("visit", "exposure"):
                    # Add the entry into the Registry.
                    self.butler.registry.addDimensionEntry(dimension, dimensionDataId)
                else:
                    raise LookupError(
                        f"Entry for {dimension.name} with ID {dimensionDataId} not found; must be "
                        f"present in Registry prior to ingest."
                    )
            # Record that we've handled this entry.
            self.dimensionEntriesDone[dimension].add(dimensionDataId)
    # Do this after the loop to ensure all the dimensions are added.
    if self.config.doAddRegions and obsInfo.tracking_radec is not None:
        region = self.buildRegion(headers)
        try:
            self.butler.registry.setDimensionRegion(
                DataId(fullDataId, dimensions=['visit', 'detector', 'instrument'], region=region),
                update=False)
            self.visitRegions.setdefault((fullDataId['instrument'], fullDataId['visit']),
                                         []).extend(region.getVertices())
        except IntegrityError:
            # This means that there were already regions for the dimensions
            # in the database, and nothing should be done.
            pass

    return headers, fullDataId

◆ extractDataId()

def lsst.obs.base.gen3.ingest.RawIngestTask.extractDataId (   self,
  file,
  headers,
  obsInfo 
)
Return the Data ID dictionary that should be used to label a file.

Parameters
----------
file : `str` or path-like object
    Absolute path to the file being ingested (prior to any transfers).
headers : `list` of `~lsst.daf.base.PropertyList`
    All headers returned by `readHeaders()`.
obsInfo : `astro_metadata_translator.ObservationInfo`
    Observational metadata extracted from the headers.

Returns
-------
dataId : `DataId`
    A mapping whose key-value pairs uniquely identify raw datasets.
    Must have ``dataId.dimensions() <= self.dimensions``, with at least
    instrument, exposure, and detector present.

Definition at line 404 of file ingest.py.

def extractDataId(self, file, headers, obsInfo):
    """Return the Data ID dictionary that should be used to label a file.

    Parameters
    ----------
    file : `str` or path-like object
        Absolute path to the file being ingested (prior to any transfers).
    headers : `list` of `~lsst.daf.base.PropertyList`
        All headers returned by `readHeaders()`.
    obsInfo : `astro_metadata_translator.ObservationInfo`
        Observational metadata extracted from the headers.

    Returns
    -------
    dataId : `DataId`
        A mapping whose key-value pairs uniquely identify raw datasets.
        Must have ``dataId.dimensions() <= self.dimensions``, with at least
        instrument, exposure, and detector present.
    """
    toRemove = set()
    if obsInfo.visit_id is None:
        toRemove.add("visit")
    if obsInfo.physical_filter is None:
        toRemove.add("physical_filter")
    if toRemove:
        dimensions = self.dimensions.toSet().difference(toRemove)
    else:
        dimensions = self.dimensions
    dataId = DataId(
        dimensions=dimensions,
        instrument=obsInfo.instrument,
        exposure=obsInfo.exposure_id,
        visit=obsInfo.visit_id,
        detector=obsInfo.detector_num,
        physical_filter=obsInfo.physical_filter,
    )
    updateExposureEntryFromObsInfo(dataId, obsInfo)
    if obsInfo.visit_id is not None:
        updateVisitEntryFromObsInfo(dataId, obsInfo)
    return dataId

◆ getAllSchemaCatalogs()

def lsst.pipe.base.task.Task.getAllSchemaCatalogs (   self)
inherited
Get schema catalogs for all tasks in the hierarchy, combining the results into a single dict.

Returns
-------
schemacatalogs : `dict`
    Keys are butler dataset type, values are an empty catalog (an instance of the appropriate
    lsst.afw.table Catalog type) for all tasks in the hierarchy, from the top-level task down
    through all subtasks.

Notes
-----
This method may be called on any task in the hierarchy; it will return the same answer, regardless.

The default implementation should always suffice. If your subtask uses schemas, then override
`Task.getSchemaCatalogs`, not this method.

Definition at line 188 of file task.py.

def getAllSchemaCatalogs(self):
    """Get schema catalogs for all tasks in the hierarchy, combining the results into a single dict.

    Returns
    -------
    schemacatalogs : `dict`
        Keys are butler dataset type, values are an empty catalog (an instance of the appropriate
        lsst.afw.table Catalog type) for all tasks in the hierarchy, from the top-level task down
        through all subtasks.

    Notes
    -----
    This method may be called on any task in the hierarchy; it will return the same answer,
    regardless.

    The default implementation should always suffice. If your subtask uses schemas, then override
    `Task.getSchemaCatalogs`, not this method.
    """
    schemaDict = self.getSchemaCatalogs()
    for subtask in self._taskDict.values():
        schemaDict.update(subtask.getSchemaCatalogs())
    return schemaDict

◆ getDatasetType()

def lsst.obs.base.gen3.ingest.RawIngestTask.getDatasetType (   self)
Return the DatasetType of the Datasets ingested by this Task.

Definition at line 141 of file ingest.py.

def getDatasetType(self):
    """Return the DatasetType of the Datasets ingested by this Task.
    """
    return DatasetType("raw", ("instrument", "detector", "exposure"), "Exposure",
                       universe=self.butler.registry.dimensions)

◆ getFormatter()

def lsst.obs.base.gen3.ingest.RawIngestTask.getFormatter (   self,
  file,
  headers,
  dataId 
)
Return the Formatter that should be used to read this file after
ingestion.

The default implementation obtains the formatter from the Instrument
class for the given data ID.

Definition at line 445 of file ingest.py.

def getFormatter(self, file, headers, dataId):
    """Return the Formatter that should be used to read this file after
    ingestion.

    The default implementation obtains the formatter from the Instrument
    class for the given data ID.
    """
    instrument = self.instrumentCache.get(dataId["instrument"])
    if instrument is None:
        instrument = Instrument.factories[dataId["instrument"]]()
        self.instrumentCache[dataId["instrument"]] = instrument
    return instrument.getRawFormatter(dataId)

◆ getFullMetadata()

def lsst.pipe.base.task.Task.getFullMetadata (   self)
inherited
Get metadata for all tasks.

Returns
-------
metadata : `lsst.daf.base.PropertySet`
    The `~lsst.daf.base.PropertySet` keys are the full task name. Values are metadata
    for the top-level task and all subtasks, sub-subtasks, etc.

Notes
-----
The returned metadata includes timing information (if ``@timer.timeMethod`` is used)
and any metadata set by the task. The name of each item consists of the full task name
with ``.`` replaced by ``:``, followed by ``.`` and the name of the item, e.g.::

    topLevelTaskName:subtaskName:subsubtaskName.itemName

Using ``:`` in the full task name disambiguates the rare situation in which a task has
a subtask and a metadata item with the same name.

Definition at line 210 of file task.py.

def getFullMetadata(self):
    """Get metadata for all tasks.

    Returns
    -------
    metadata : `lsst.daf.base.PropertySet`
        The `~lsst.daf.base.PropertySet` keys are the full task name. Values are metadata
        for the top-level task and all subtasks, sub-subtasks, etc.

    Notes
    -----
    The returned metadata includes timing information (if ``@timer.timeMethod`` is used)
    and any metadata set by the task. The name of each item consists of the full task name
    with ``.`` replaced by ``:``, followed by ``.`` and the name of the item, e.g.::

        topLevelTaskName:subtaskName:subsubtaskName.itemName

    Using ``:`` in the full task name disambiguates the rare situation in which a task has
    a subtask and a metadata item with the same name.
    """
    fullMetadata = dafBase.PropertySet()
    for fullName, task in self.getTaskDict().items():
        fullMetadata.set(fullName.replace(".", ":"), task.metadata)
    return fullMetadata

◆ getFullName()

def lsst.pipe.base.task.Task.getFullName (   self)
inherited
Get the task name as a hierarchical name including parent task names.

Returns
-------
fullName : `str`
    The full name consists of the name of the parent task and each subtask separated by periods.
    For example:

    - The full name of top-level task "top" is simply "top".
    - The full name of subtask "sub" of top-level task "top" is "top.sub".
    - The full name of subtask "sub2" of subtask "sub" of top-level task "top" is "top.sub.sub2".

Definition at line 235 of file task.py.

def getFullName(self):
    """Get the task name as a hierarchical name including parent task names.

    Returns
    -------
    fullName : `str`
        The full name consists of the name of the parent task and each subtask separated by periods.
        For example:

        - The full name of top-level task "top" is simply "top".
        - The full name of subtask "sub" of top-level task "top" is "top.sub".
        - The full name of subtask "sub2" of subtask "sub" of top-level task "top" is "top.sub.sub2".
    """
    return self._fullName

◆ getName()

def lsst.pipe.base.task.Task.getName (   self)
inherited
Get the name of the task.

Returns
-------
taskName : `str`
    Name of the task.

See also
--------
getFullName

Definition at line 250 of file task.py.

def getName(self):
    """Get the name of the task.

    Returns
    -------
    taskName : `str`
        Name of the task.

    See also
    --------
    getFullName
    """
    return self._name

◆ getSchemaCatalogs()

def lsst.pipe.base.task.Task.getSchemaCatalogs (   self)
inherited
Get the schemas generated by this task.

Returns
-------
schemaCatalogs : `dict`
    Keys are butler dataset type, values are an empty catalog (an instance of the appropriate
    `lsst.afw.table` Catalog type) for this task.

Notes
-----

.. warning::

Subclasses that use schemas must override this method. The default implementation returns
an empty dict.

This method may be called at any time after the Task is constructed, which means that all task
schemas should be computed at construction time, *not* when data is actually processed. This
reflects the philosophy that the schema should not depend on the data.

Returning catalogs rather than just schemas allows us to save e.g. slots for SourceCatalog as well.

See also
--------
Task.getAllSchemaCatalogs

Definition at line 159 of file task.py.

def getSchemaCatalogs(self):
    """Get the schemas generated by this task.

    Returns
    -------
    schemaCatalogs : `dict`
        Keys are butler dataset type, values are an empty catalog (an instance of the appropriate
        `lsst.afw.table` Catalog type) for this task.

    Notes
    -----

    .. warning::

       Subclasses that use schemas must override this method. The default implementation returns
       an empty dict.

    This method may be called at any time after the Task is constructed, which means that all task
    schemas should be computed at construction time, *not* when data is actually processed. This
    reflects the philosophy that the schema should not depend on the data.

    Returning catalogs rather than just schemas allows us to save e.g. slots for SourceCatalog as well.

    See also
    --------
    Task.getAllSchemaCatalogs
    """
    return {}

◆ getTaskDict()

def lsst.pipe.base.task.Task.getTaskDict (   self)
inherited
Get a dictionary of all tasks as a shallow copy.

Returns
-------
taskDict : `dict`
    Dictionary containing full task name: task object for the top-level task and all subtasks,
    sub-subtasks, etc.

Definition at line 264 of file task.py.

def getTaskDict(self):
    """Get a dictionary of all tasks as a shallow copy.

    Returns
    -------
    taskDict : `dict`
        Dictionary containing full task name: task object for the top-level task and all subtasks,
        sub-subtasks, etc.
    """
    return self._taskDict.copy()

◆ ingestFile()

def lsst.obs.base.gen3.ingest.RawIngestTask.ingestFile (   self,
  file,
  headers,
  dataId,
  run = None 
)
Ingest a single raw file into the repository.

All necessary Dimension entries must already be present.

Parameters
----------
file : `str` or path-like object
    Absolute path to the file to be ingested.
headers : `list` of `~lsst.daf.base.PropertyList`
    Result of calling `readHeaders`.
dataId : `dict`
    Data ID dictionary, as returned by `extractDataId`.
run : `~lsst.daf.butler.Run`, optional
    Run to add the Dataset to; defaults to ``self.butler.run``.

Returns
-------
ref : `DatasetRef`
    Reference to the ingested dataset.

Raises
------
ConflictingDefinitionError
    Raised if the dataset already exists in the registry.

Definition at line 330 of file ingest.py.

def ingestFile(self, file, headers, dataId, run=None):
    """Ingest a single raw file into the repository.

    All necessary Dimension entries must already be present.

    Parameters
    ----------
    file : `str` or path-like object
        Absolute path to the file to be ingested.
    headers : `list` of `~lsst.daf.base.PropertyList`
        Result of calling `readHeaders`.
    dataId : `dict`
        Data ID dictionary, as returned by `extractDataId`.
    run : `~lsst.daf.butler.Run`, optional
        Run to add the Dataset to; defaults to ``self.butler.run``.

    Returns
    -------
    ref : `DatasetRef`
        Reference to the ingested dataset.

    Raises
    ------
    ConflictingDefinitionError
        Raised if the dataset already exists in the registry.
    """
    if run is not None and run != self.butler.run:
        butler = Butler(butler=self.butler, run=run)
    else:
        butler = self.butler
    try:
        return butler.ingest(file, self.datasetType, dataId, transfer=self.config.transfer,
                             formatter=self.getFormatter(file, headers, dataId))
    except ConflictingDefinitionError as err:
        raise IngestConflictError("Ingest conflict on {} {}".format(file, dataId)) from err

◆ makeField()

def lsst.pipe.base.task.Task.makeField (   cls,
  doc 
)
inherited
Make a `lsst.pex.config.ConfigurableField` for this task.

Parameters
----------
doc : `str`
    Help text for the field.

Returns
-------
configurableField : `lsst.pex.config.ConfigurableField`
    A `~ConfigurableField` for this task.

Examples
--------
Provides a convenient way to specify this task is a subtask of another task.

Here is an example of use::

    class OtherTaskConfig(lsst.pex.config.Config):
        aSubtask = ATaskClass.makeField("a brief description of what this task does")

Definition at line 329 of file task.py.

def makeField(cls, doc):
    """Make a `lsst.pex.config.ConfigurableField` for this task.

    Parameters
    ----------
    doc : `str`
        Help text for the field.

    Returns
    -------
    configurableField : `lsst.pex.config.ConfigurableField`
        A `~ConfigurableField` for this task.

    Examples
    --------
    Provides a convenient way to specify this task is a subtask of another task.

    Here is an example of use::

        class OtherTaskConfig(lsst.pex.config.Config):
            aSubtask = ATaskClass.makeField("a brief description of what this task does")
    """
    return ConfigurableField(doc=doc, target=cls)

◆ makeSubtask()

def lsst.pipe.base.task.Task.makeSubtask (   self,
  name,
  **keyArgs 
)
inherited
Create a subtask as a new instance, assigning it as the ``name`` attribute of this task.

Parameters
----------
name : `str`
    Brief name of the subtask.
keyArgs
    Extra keyword arguments used to construct the task. The following arguments are automatically
    provided and cannot be overridden:

    - "config".
    - "parentTask".

Notes
-----
The subtask must be defined by ``Task.config.name``, an instance of pex_config ConfigurableField
or RegistryField.

Definition at line 275 of file task.py.

def makeSubtask(self, name, **keyArgs):
    """Create a subtask as a new instance, assigning it as the ``name`` attribute of this task.

    Parameters
    ----------
    name : `str`
        Brief name of the subtask.
    keyArgs
        Extra keyword arguments used to construct the task. The following arguments are automatically
        provided and cannot be overridden:

        - "config".
        - "parentTask".

    Notes
    -----
    The subtask must be defined by ``Task.config.name``, an instance of pex_config ConfigurableField
    or RegistryField.
    """
    taskField = getattr(self.config, name, None)
    if taskField is None:
        raise KeyError("%s's config does not have field %r" % (self.getFullName(), name))
    subtask = taskField.apply(name=name, parentTask=self, **keyArgs)
    setattr(self, name, subtask)
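For example, a parent task whose config declares a `~lsst.pex.config.ConfigurableField`
(typically created with `Task.makeField`) can construct the subtask in its own
constructor; a minimal sketch, where ``ParentTask``, ``ParentTaskConfig``, and
``ATaskClass`` are illustrative names::

    class ParentTaskConfig(lsst.pex.config.Config):
        aSubtask = ATaskClass.makeField("a subtask run by ParentTask")

    class ParentTask(lsst.pipe.base.Task):
        ConfigClass = ParentTaskConfig

        def __init__(self, **kwds):
            super().__init__(**kwds)
            self.makeSubtask("aSubtask")  # now available as self.aSubtask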

◆ processFile()

def lsst.obs.base.gen3.ingest.RawIngestTask.processFile (   self,
  file 
)
Ingest a single raw data file after extracting metadata.

This creates any new exposure or visit Dimension entries needed to
identify the ingested file, creates a new Dataset entry in the
Registry and finally ingests the file itself into the Datastore.
Any needed instrument, detector, and physical_filter Dimension entries must
exist in the Registry before `run` is called.

Parameters
----------
file : `str` or path-like object
    Absolute path to the file to be ingested.

Definition at line 366 of file ingest.py.

def processFile(self, file):
    """Ingest a single raw data file after extracting metadata.

    This creates any new exposure or visit Dimension entries needed to
    identify the ingested file, creates a new Dataset entry in the
    Registry and finally ingests the file itself into the Datastore.
    Any needed instrument, detector, and physical_filter Dimension entries must
    exist in the Registry before `run` is called.

    Parameters
    ----------
    file : `str` or path-like object
        Absolute path to the file to be ingested.
    """
    try:
        headers, dataId = self.ensureDimensions(file)
    except Exception as err:
        raise RuntimeError(f"Unexpected error adding dimensions for {file}.") from err
    # We want ingesting a single file to be atomic even if we are
    # not trying to ingest the list of files atomically.
    with self.butler.transaction():
        try:
            self.ingestFile(file, headers, dataId)
            return
        except IngestConflictError:
            if self.config.conflict == "fail":
                raise
    if self.config.conflict == "ignore":
        if self.stashRun is not None:
            if self.stashRun.id is None:
                self.butler.registry.ensureRun(self.stashRun)
            self.log.infof("Conflict on {} ({}); ingesting to stash '{}' instead.",
                           dataId, file, self.config.stash)
            with self.butler.transaction():
                self.ingestFile(file, headers, dataId, run=self.stashRun)
        else:
            self.log.infof("Conflict on {} ({}); ignoring.", dataId, file)
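Conflict handling is configuration-driven; a minimal sketch that stashes
conflicting datasets in a separate Run rather than failing, given a
``butler`` as in the class-level example (the stash collection name is
illustrative)::

    config = RawIngestConfig()
    config.conflict = "ignore"   # don't fail on already-ingested datasets
    config.stash = "raw/stash"   # losing datasets are ingested here instead
    task = RawIngestTask(config=config, butler=butler)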

◆ readHeaders()

def lsst.obs.base.gen3.ingest.RawIngestTask.readHeaders (   self,
  file 
)
Read and return any relevant headers from the given file.

The default implementation simply reads the header of the first
non-empty HDU, so it always returns a single-element list.

Parameters
----------
file : `str` or path-like object
    Absolute path to the file to be ingested.

Returns
-------
headers : `list` of `~lsst.daf.base.PropertyList`
    Single-element list containing the header of the first
    non-empty HDU.

Definition at line 219 of file ingest.py.

def readHeaders(self, file):
    """Read and return any relevant headers from the given file.

    The default implementation simply reads the header of the first
    non-empty HDU, so it always returns a single-element list.

    Parameters
    ----------
    file : `str` or path-like object
        Absolute path to the file to be ingested.

    Returns
    -------
    headers : `list` of `~lsst.daf.base.PropertyList`
        Single-element list containing the header of the first
        non-empty HDU.
    """
    phdu = readMetadata(file, 0)
    headers = [merge_headers([phdu, readMetadata(file)], mode="overwrite")]
    for h in headers:
        fix_header(h)
    return headers
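A subclass can override this to return more than one header; a minimal
sketch for a hypothetical multi-extension layout, where the class name and
HDU range are illustrative assumptions::

    class MultiHduIngestTask(RawIngestTask):

        def readHeaders(self, file):
            # Read one header per extension HDU; the range is an assumption
            # about the instrument's file layout.
            headers = [readMetadata(file, hdu) for hdu in range(1, 4)]
            for h in headers:
                fix_header(h)
            return headers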

◆ run()

def lsst.obs.base.gen3.ingest.RawIngestTask.run (   self,
  files 
)
Ingest files into a Butler data repository.

This creates any new exposure or visit Dimension entries needed to
identify the ingested files, creates new Dataset entries in the
Registry and finally ingests the files themselves into the Datastore.
Any needed instrument, detector, and physical_filter Dimension entries
must exist in the Registry before `run` is called.

Parameters
----------
files : iterable over `str` or path-like objects
    Paths to the files to be ingested.  Will be made absolute
    if they are not already.

Definition at line 183 of file ingest.py.

def run(self, files):
    """Ingest files into a Butler data repository.

    This creates any new exposure or visit Dimension entries needed to
    identify the ingested files, creates new Dataset entries in the
    Registry and finally ingests the files themselves into the Datastore.
    Any needed instrument, detector, and physical_filter Dimension entries
    must exist in the Registry before `run` is called.

    Parameters
    ----------
    files : iterable over `str` or path-like objects
        Paths to the files to be ingested.  Will be made absolute
        if they are not already.
    """
    self.butler.registry.registerDatasetType(self.getDatasetType())
    if self.config.onError == "rollback":
        with self.butler.transaction():
            for file in files:
                self.processFile(os.path.abspath(file))
            if self.config.doAddRegions:
                self._addVisitRegions()
    elif self.config.onError == "break":
        for file in files:
            self.processFile(os.path.abspath(file))
        if self.config.doAddRegions:
            self._addVisitRegions()
    elif self.config.onError == "continue":
        for file in files:
            try:
                self.processFile(os.path.abspath(file))
            except Exception as err:
                self.log.warnf("Error processing '{}': {}", file, err)
        if self.config.doAddRegions:
            self._addVisitRegions()
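Per-file error handling is controlled by ``RawIngestConfig.onError``; a
minimal sketch, given a ``butler`` as in the class-level example (the file
paths are illustrative)::

    config = RawIngestConfig()
    config.onError = "continue"  # alternatives: "rollback", "break"
    task = RawIngestTask(config=config, butler=butler)
    task.run(["night1/raw_001.fits", "night1/raw_002.fits"])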

◆ timer()

def lsst.pipe.base.task.Task.timer (   self,
  name,
  logLevel = Log.DEBUG 
)
inherited
Context manager to log performance data for an arbitrary block of code.

Parameters
----------
name : `str`
    Name of code being timed; data will be logged using item name: ``Start`` and ``End``.
logLevel
    A `lsst.log` level constant.

Examples
--------
Creating a timer context::

    with self.timer("someCodeToTime"):
        pass  # code to time

See also
--------
timer.logInfo

Definition at line 301 of file task.py.

def timer(self, name, logLevel=Log.DEBUG):
    """Context manager to log performance data for an arbitrary block of code.

    Parameters
    ----------
    name : `str`
        Name of code being timed; data will be logged using item name: ``Start`` and ``End``.
    logLevel
        A `lsst.log` level constant.

    Examples
    --------
    Creating a timer context::

        with self.timer("someCodeToTime"):
            pass  # code to time

    See also
    --------
    timer.logInfo
    """
    logInfo(obj=self, prefix=name + "Start", logLevel=logLevel)
    try:
        yield
    finally:
        logInfo(obj=self, prefix=name + "End", logLevel=logLevel)

Member Data Documentation

◆ butler

lsst.obs.base.gen3.ingest.RawIngestTask.butler

Definition at line 149 of file ingest.py.

◆ config

lsst.pipe.base.task.Task.config
inherited

Definition at line 149 of file task.py.

◆ ConfigClass

lsst.obs.base.gen3.ingest.RawIngestTask.ConfigClass = RawIngestConfig
static

Definition at line 137 of file ingest.py.

◆ datasetType

lsst.obs.base.gen3.ingest.RawIngestTask.datasetType

Definition at line 150 of file ingest.py.

◆ dimensionEntriesDone

lsst.obs.base.gen3.ingest.RawIngestTask.dimensionEntriesDone

Definition at line 155 of file ingest.py.

◆ dimensions

lsst.obs.base.gen3.ingest.RawIngestTask.dimensions

Definition at line 151 of file ingest.py.

◆ instrumentCache

lsst.obs.base.gen3.ingest.RawIngestTask.instrumentCache

Definition at line 158 of file ingest.py.

◆ log

lsst.pipe.base.task.Task.log
inherited

Definition at line 148 of file task.py.

◆ metadata

lsst.pipe.base.task.Task.metadata
inherited

Definition at line 121 of file task.py.

◆ stashRun

lsst.obs.base.gen3.ingest.RawIngestTask.stashRun

Definition at line 162 of file ingest.py.

◆ visitRegions

lsst.obs.base.gen3.ingest.RawIngestTask.visitRegions

Definition at line 163 of file ingest.py.


The documentation for this class was generated from the following file:
  • /j/snowflake/release/lsstsw/stack/Linux64/obs_base/18.1.0-18-gb5d19ff+1/python/lsst/obs/base/gen3/ingest.py