pipelineTask.py
# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""This module defines the PipelineTask class and related methods.
"""

__all__ = ["DatasetTypeDescriptor", "PipelineTask"]  # Classes in this module

from lsst.daf.butler import DatasetType
from .config import (InputDatasetConfig, OutputDatasetConfig,
                     InitInputDatasetConfig, InitOutputDatasetConfig)
from .task import Task


class ScalarError(TypeError):
    """Exception raised when a dataset type is configured as scalar
    but there are multiple DataIds in a Quantum for that dataset.

    Parameters
    ----------
    key : `str`
        Name of the configuration field for the dataset type.
    numDataIds : `int`
        Actual number of DataIds in a Quantum for this dataset type.
    """
    def __init__(self, key, numDataIds):
        super().__init__(("Expected scalar for output dataset field {}, "
                          "received {} DataIds").format(key, numDataIds))


class DatasetTypeDescriptor:
    """Describe DatasetType and its options for PipelineTask.

    This class contains DatasetType and all relevant options that are used by
    PipelineTask. Typically this is derived from configuration classes, but
    sub-classes of PipelineTask can also define additional DatasetTypes that
    are not part of the task configuration.

    Parameters
    ----------
    datasetType : `DatasetType`
        The dataset type to describe.
    scalar : `bool`
        `True` if this is a scalar dataset.
    manualLoad : `bool`
        `True` if this dataset will be manually loaded by a concrete
        `PipelineTask` instead of loaded automatically by the base class.
    """

    def __init__(self, datasetType, scalar, manualLoad):
        self._datasetType = datasetType
        self._scalar = scalar
        self._manualLoad = manualLoad

    @classmethod
    def fromConfig(cls, datasetConfig):
        """Make a DatasetTypeDescriptor instance from a configuration object.

        Parameters
        ----------
        datasetConfig : `lsst.pex.config.Config`
            Instance of one of the `InputDatasetConfig`, `OutputDatasetConfig`,
            `InitInputDatasetConfig`, or `InitOutputDatasetConfig` types.

        Returns
        -------
        descriptor : `DatasetTypeDescriptor`
            The new descriptor instance.
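
        Examples
        --------
        A minimal sketch, assuming a task config with an `InputDatasetConfig`
        field named ``calexp`` (the field name is illustrative only)::

            descriptor = DatasetTypeDescriptor.fromConfig(config.calexp)
            print(descriptor.datasetType.name, descriptor.scalar)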
        """
        datasetType = DatasetType(name=datasetConfig.name,
                                  dimensions=datasetConfig.dimensions,
                                  storageClass=datasetConfig.storageClass)
        # Use scalar=True for Init dataset types
        scalar = getattr(datasetConfig, 'scalar', True)
        manualLoad = getattr(datasetConfig, 'manualLoad', False)
        return cls(datasetType=datasetType, scalar=scalar, manualLoad=manualLoad)

    @property
    def datasetType(self):
        """`DatasetType` instance.
        """
        return self._datasetType

    @property
    def scalar(self):
        """`True` if this is a scalar dataset.
        """
        return self._scalar

    @property
    def manualLoad(self):
        """`True` if the task will handle loading the data.
        """
        return self._manualLoad


class PipelineTask(Task):
    """Base class for all pipeline tasks.

    This is an abstract base class for PipelineTasks, which represent
    algorithms executed by the framework(s) on data retrieved from the data
    butler; the resulting data is also stored in the data butler.

    PipelineTask inherits from `pipe.base.Task` and uses the same
    configuration mechanism based on `pex.config`. A PipelineTask sub-class
    typically implements a `run()` method which receives Python-domain data
    objects and returns a `pipe.base.Struct` object with the resulting data.
    The `run()` method is not supposed to perform any I/O; it operates
    entirely on in-memory objects. `runQuantum()` is the method (which can be
    re-implemented in a sub-class) where all necessary I/O is performed: it
    reads all input data from the data butler into memory, calls the `run()`
    method with that data, examines the returned `Struct` object, and saves
    some or all of that data back to the data butler. The `runQuantum()`
    method receives a `daf.butler.Quantum` instance which defines all input
    and output datasets for a single invocation of the PipelineTask.

    Subclasses must be constructable with exactly the arguments taken by the
    PipelineTask base class constructor, but may support other signatures as
    well.

    Attributes
    ----------
    canMultiprocess : bool, True by default (class attribute)
        This class attribute is checked by the execution framework;
        sub-classes can set it to ``False`` if the task does not support
        multiprocessing.

    Parameters
    ----------
    config : `pex.config.Config`, optional
        Configuration for this task (an instance of ``self.ConfigClass``,
        which is a task-specific subclass of `PipelineTaskConfig`).
        If not specified then it defaults to ``self.ConfigClass()``.
    log : `lsst.log.Log`, optional
        Logger instance whose name is used as a log name prefix, or ``None``
        for no prefix.
    initInputs : `dict`, optional
        A dictionary of objects needed to construct this PipelineTask, with
        keys matching the keys of the dictionary returned by
        `getInitInputDatasetTypes` and values equivalent to what would be
        obtained by calling `Butler.get` with those DatasetTypes and no data
        IDs. While it is optional for the base class, subclasses are
        permitted to require this argument.
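
    Examples
    --------
    A minimal concrete sub-class might look like the sketch below; it is
    illustrative only, and the names ``ExampleConfig``, ``exposure``,
    ``background``, and ``estimateBackground`` are hypothetical (``exposure``
    and ``background`` stand for `InputDatasetConfig` and
    `OutputDatasetConfig` fields of the task config)::

        class ExampleTask(PipelineTask):
            ConfigClass = ExampleConfig
            _DefaultName = "example"

            def run(self, exposure):
                # ``exposure`` is a scalar input; return a Struct whose
                # attribute name matches the output config field name.
                background = self.estimateBackground(exposure)
                return Struct(background=background)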
    """

    canMultiprocess = True

    def __init__(self, *, config=None, log=None, initInputs=None, **kwargs):
        super().__init__(config=config, log=log, **kwargs)

    def getInitOutputDatasets(self):
        """Return persistable outputs that are available immediately after
        the task has been constructed.

        Subclasses that operate on catalogs should override this method to
        return the schema(s) of the catalog(s) they produce.

        It is not necessary to return the PipelineTask's configuration or
        other provenance information in order for it to be persisted; that is
        the responsibility of the execution system.

        Returns
        -------
        datasets : `dict`
            Dictionary with keys that match those of the dict returned by
            `getInitOutputDatasetTypes` and values that can be written by
            calling `Butler.put` with those DatasetTypes and no data IDs. An
            empty `dict` should be returned by tasks that produce no
            initialization outputs.
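
        Examples
        --------
        For example, a task whose config defines an `InitOutputDatasetConfig`
        field named ``outputSchema`` (a hypothetical name) could return the
        schema of the catalog it will produce::

            def getInitOutputDatasets(self):
                # ``self.schema`` is assumed to have been set up in __init__.
                return {"outputSchema": self.schema}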
        """
        return {}

    @classmethod
    def getInputDatasetTypes(cls, config):
        """Return input dataset type descriptors for this task.

        The default implementation finds all fields of type
        `InputDatasetConfig` in the configuration (non-recursively) and uses
        them to construct `DatasetTypeDescriptor` instances. The names of
        these fields are used as keys in the returned dictionary. Subclasses
        can override this behavior.

        Parameters
        ----------
        config : `Config`
            Configuration for this task. Typically datasets are defined in
            a task configuration.

        Returns
        -------
        Dictionary where the key is the name (arbitrary) of the input dataset
        and the value is the `DatasetTypeDescriptor` instance. The default
        implementation uses the configuration field name as the dictionary
        key.
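
        Examples
        --------
        A sketch with assumed names: if ``MyTask.ConfigClass`` defines two
        `InputDatasetConfig` fields named ``exposure`` and ``catalog``, the
        default implementation returns a dictionary keyed by those field
        names::

            descriptors = MyTask.getInputDatasetTypes(config)
            # descriptors == {"exposure": <DatasetTypeDescriptor>,
            #                 "catalog": <DatasetTypeDescriptor>}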
        """
        return cls.getDatasetTypes(config, InputDatasetConfig)

    @classmethod
    def getOutputDatasetTypes(cls, config):
        """Return output dataset type descriptors for this task.

        The default implementation finds all fields of type
        `OutputDatasetConfig` in the configuration (non-recursively) and uses
        them to construct `DatasetTypeDescriptor` instances. The names of
        these fields are used as keys in the returned dictionary. Subclasses
        can override this behavior.

        Parameters
        ----------
        config : `Config`
            Configuration for this task. Typically datasets are defined in
            a task configuration.

        Returns
        -------
        Dictionary where the key is the name (arbitrary) of the output dataset
        and the value is the `DatasetTypeDescriptor` instance. The default
        implementation uses the configuration field name as the dictionary
        key.
        """
        return cls.getDatasetTypes(config, OutputDatasetConfig)

    @classmethod
    def getPrerequisiteDatasetTypes(cls, config):
        """Return the local names of input dataset types that should be
        assumed to exist instead of constraining what data to process with
        this task.

        Usually, when running a `PipelineTask`, the presence of input datasets
        constrains the processing to be done (as defined by the `QuantumGraph`
        generated during "preflight"). "Prerequisites" are special input
        datasets that do not constrain that graph, but instead cause a hard
        failure when missing. Calibration products and reference catalogs
        are examples of dataset types that should usually be marked as
        prerequisites.

        Parameters
        ----------
        config : `Config`
            Configuration for this task. Typically datasets are defined in
            a task configuration.

        Returns
        -------
        prerequisite : `~collections.abc.Set` of `str`
            The keys in the dictionary returned by `getInputDatasetTypes` that
            represent dataset types that should be considered prerequisites.
            Names returned here that are not keys in that dictionary are
            ignored; that way, if a config option removes an input dataset
            type, only `getInputDatasetTypes` needs to be updated.
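
        Examples
        --------
        For illustration only: a task with input config fields named
        ``exposure`` and ``refCat`` (hypothetical names) that treats the
        reference catalog as a prerequisite could override this method as::

            @classmethod
            def getPrerequisiteDatasetTypes(cls, config):
                return frozenset(["refCat"])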
        """
        return frozenset()

    @classmethod
    def getInitInputDatasetTypes(cls, config):
        """Return dataset type descriptors that can be used to retrieve the
        ``initInputs`` constructor argument.

        Datasets used in initialization may not be associated with any
        Dimension (i.e. their data IDs must be empty dictionaries).

        The default implementation finds all fields of type
        `InitInputDatasetConfig` in the configuration (non-recursively) and
        uses them to construct `DatasetTypeDescriptor` instances. The names
        of these fields are used as keys in the returned dictionary.
        Subclasses can override this behavior.

        Parameters
        ----------
        config : `Config`
            Configuration for this task. Typically datasets are defined in
            a task configuration.

        Returns
        -------
        Dictionary where the key is the name (arbitrary) of the input dataset
        and the value is the `DatasetTypeDescriptor` instance. The default
        implementation uses the configuration field name as the dictionary
        key.

        When the task requires no initialization inputs, this method should
        return an empty dict.
        """
        return cls.getDatasetTypes(config, InitInputDatasetConfig)

    @classmethod
    def getInitOutputDatasetTypes(cls, config):
        """Return dataset type descriptors that can be used to write the
        objects returned by `getInitOutputDatasets`.

        Datasets used in initialization may not be associated with any
        Dimension (i.e. their data IDs must be empty dictionaries).

        The default implementation finds all fields of type
        `InitOutputDatasetConfig` in the configuration (non-recursively) and
        uses them to construct `DatasetTypeDescriptor` instances. The names
        of these fields are used as keys in the returned dictionary.
        Subclasses can override this behavior.

        Parameters
        ----------
        config : `Config`
            Configuration for this task. Typically datasets are defined in
            a task configuration.

        Returns
        -------
        Dictionary where the key is the name (arbitrary) of the output dataset
        and the value is the `DatasetTypeDescriptor` instance. The default
        implementation uses the configuration field name as the dictionary
        key.

        When the task produces no initialization outputs, this method should
        return an empty dict.
        """
        return cls.getDatasetTypes(config, InitOutputDatasetConfig)

    @classmethod
    def getDatasetTypes(cls, config, configClass):
        """Return dataset type descriptors defined in task configuration.

        This method can be used by other methods that need to extract dataset
        types from task configuration (e.g. `getInputDatasetTypes` or
        sub-class methods).

        Parameters
        ----------
        config : `Config`
            Configuration for this task. Typically datasets are defined in
            a task configuration.
        configClass : `type`
            Class of the configuration object which defines dataset type.

        Returns
        -------
        Dictionary where the key is the name (arbitrary) of the dataset and
        the value is the `DatasetTypeDescriptor` instance. The default
        implementation uses the configuration field name as the dictionary
        key. Returns an empty dict if the configuration has no fields with
        the specified ``configClass``.
        """
        dsTypes = {}
        for key, value in config.items():
            if isinstance(value, configClass):
                dsTypes[key] = DatasetTypeDescriptor.fromConfig(value)
        return dsTypes

    @classmethod
    def getPerDatasetTypeDimensions(cls, config):
        """Return any Dimensions that are permitted to have different values
        for different DatasetTypes within the same quantum.

        Parameters
        ----------
        config : `Config`
            Configuration for this task.

        Returns
        -------
        dimensions : `~collections.abc.Set` of `Dimension` or `str`
            The dimensions or names thereof that should be considered
            per-DatasetType.

        Notes
        -----
        Any Dimension declared to be per-DatasetType by a PipelineTask must
        also be declared to be per-DatasetType by other PipelineTasks in the
        same Pipeline.

        The classic example of a per-DatasetType dimension is the
        ``CalibrationLabel`` dimension that maps to a validity range for
        master calibrations. When running Instrument Signature Removal, one
        does not care that different dataset types like flat, bias, and dark
        have different validity ranges, as long as those validity ranges all
        overlap the relevant observation.
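
        Examples
        --------
        A task that consumes master calibrations could declare the
        validity-range dimension from the example above as per-DatasetType::

            @classmethod
            def getPerDatasetTypeDimensions(cls, config):
                return frozenset(["CalibrationLabel"])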
        """
        return frozenset()

    def adaptArgsAndRun(self, inputData, inputDataIds, outputDataIds, butler):
        """Run task algorithm on in-memory data.

        This method is called by `runQuantum` to operate on input in-memory
        data and produce corresponding output in-memory data. It receives
        arguments which are dictionaries with input data and input/output
        DataIds. Many simple tasks do not need to know DataIds, so the
        default implementation of this method calls the `run` method, passing
        the input data objects as keyword arguments. Most simple tasks will
        implement the `run` method; more complex tasks that need to know
        about output DataIds will override this method instead.

        All three dictionary arguments to this method have keys equal to the
        names of the configuration fields for dataset types. If a dataset
        type is configured with the ``scalar`` field set to ``True`` then it
        is expected that only one dataset appears on input or output for that
        dataset type, and the dictionary value will be a single data object
        or DataId. Otherwise, if ``scalar`` is ``False`` (the default), the
        value will be a list (even if only one item is in the list).

        The method returns a `Struct` instance with attributes matching the
        configuration fields for output dataset types. Values stored in the
        returned struct are a single object if ``scalar`` is ``True``, or a
        list of objects otherwise. If the task produces more than one object
        for some dataset type then the data objects returned in ``struct``
        must match in count and order the corresponding DataIds in
        ``outputDataIds``.

        Parameters
        ----------
        inputData : `dict`
            Dictionary whose keys are the names of the configuration fields
            describing input dataset types and values are Python-domain data
            objects (or lists of objects) retrieved from the data butler.
        inputDataIds : `dict`
            Dictionary whose keys are the names of the configuration fields
            describing input dataset types and values are DataIds (or lists
            of DataIds) that the task consumes for the corresponding dataset
            type. DataIds are guaranteed to match the data objects in
            ``inputData``.
        outputDataIds : `dict`
            Dictionary whose keys are the names of the configuration fields
            describing output dataset types and values are DataIds (or lists
            of DataIds) that the task is to produce for the corresponding
            dataset type.
        butler : object
            Data butler instance.

        Returns
        -------
        struct : `Struct`
            The standard convention is that this method should return a
            `Struct` instance containing all output data. Struct attribute
            names should correspond to the names of the configuration fields
            describing task output dataset types. If something different is
            returned then the `saveStruct` method has to be re-implemented
            accordingly.
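
        Examples
        --------
        A sketch of an override for a task that needs its output DataIds; the
        config field names ``coadd`` and ``measCat`` and the extra ``tract``
        argument to `run` are hypothetical, and both fields are assumed to be
        scalar::

            def adaptArgsAndRun(self, inputData, inputDataIds, outputDataIds,
                                butler):
                # Pass the tract from the (scalar) output DataId to run().
                tract = outputDataIds["measCat"]["tract"]
                return self.run(coadd=inputData["coadd"], tract=tract)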
        """
        return self.run(**inputData)

    def run(self, **kwargs):
        """Run task algorithm on in-memory data.

        This method should be implemented in a subclass unless the task
        overrides `adaptArgsAndRun` to do something different from its
        default implementation. With the default implementation of
        `adaptArgsAndRun` this method will receive keyword arguments whose
        names will be the same as the names of configuration fields
        describing input dataset types. Argument values will be data objects
        retrieved from the data butler. If a dataset type is configured with
        the ``scalar`` field set to ``True`` then the argument value will be
        a single object, otherwise it will be a list of objects.

        If the task needs to know its input or output DataIds then it has to
        override the `adaptArgsAndRun` method instead.

        Returns
        -------
        struct : `Struct`
            See the description of the `adaptArgsAndRun` method.

        Examples
        --------
        Typical implementation of this method may look like::

            def run(self, input, calib):
                # "input", "calib", and "output" are the names of the config
                # fields

                # Assuming that the input/calib datasets are `scalar`, they
                # are simple objects; do something with inputs and calibs,
                # produce an output image.
                image = self.makeImage(input, calib)

                # If the output dataset is `scalar` then return an object,
                # not a list.
                return Struct(output=image)

        """
        raise NotImplementedError("run() is not implemented")

    def runQuantum(self, quantum, butler):
        """Execute PipelineTask algorithm on a single quantum of data.

        A typical implementation of this method will use the inputs from
        ``quantum`` to retrieve Python-domain objects from the data butler
        and call the `adaptArgsAndRun` method on that data. On return from
        `adaptArgsAndRun` this method will extract data from the returned
        `Struct` instance and save that data to the butler.

        The `Struct` returned from `adaptArgsAndRun` is expected to contain
        data attributes with names equal to the names of the configuration
        fields defining output dataset types. The values of the data
        attributes must be data objects corresponding to the DataIds of the
        output dataset types. All data objects will be saved in the butler
        using DataRefs from the Quantum's output dictionary.

        This method does not return anything to the caller; on errors a
        corresponding exception is raised.

        Parameters
        ----------
        quantum : `Quantum`
            Object describing input and output corresponding to this
            invocation of the PipelineTask instance.
        butler : object
            Data butler instance.

        Raises
        ------
        ScalarError
            Raised if a dataset type is configured as scalar but receives
            multiple DataIds in ``quantum``. Any exceptions raised by the
            data butler or by the `adaptArgsAndRun` method are propagated.
        """

        def makeDataRefs(descriptors, refMap):
            """Generate maps of DatasetRefs and DataIds.

            Given a map of DatasetTypeDescriptor and a map of Quantum
            DatasetRefs, makes maps of DataIds and DatasetRefs.
            For scalar dataset types, unpacks DatasetRefs and DataIds.

            Parameters
            ----------
            descriptors : `dict`
                Map of (dataset key, DatasetTypeDescriptor).
            refMap : `dict`
                Map of (dataset type name, DatasetRefs).

            Returns
            -------
            dataIds : `dict`
                Map of (dataset key, DataIds)
            dataRefs : `dict`
                Map of (dataset key, DatasetRefs)

            Raises
            ------
            ScalarError
                Raised if a dataset type is configured as scalar but more
                than one DatasetRef exists for it.
            """
            dataIds = {}
            dataRefs = {}
            for key, descriptor in descriptors.items():
                keyDataRefs = refMap[descriptor.datasetType.name]
                keyDataIds = [dataRef.dataId for dataRef in keyDataRefs]
                if descriptor.scalar:
                    # unpack single-item lists
                    if len(keyDataRefs) != 1:
                        raise ScalarError(key, len(keyDataRefs))
                    keyDataRefs = keyDataRefs[0]
                    keyDataIds = keyDataIds[0]
                dataIds[key] = keyDataIds
                if not descriptor.manualLoad:
                    dataRefs[key] = keyDataRefs
            return dataIds, dataRefs

        # lists of DataRefs/DataIds for input datasets
        descriptors = self.getInputDatasetTypes(self.config)
        inputDataIds, inputDataRefs = makeDataRefs(descriptors, quantum.predictedInputs)

        # get all data from butler
        inputs = {}
        for key, dataRefs in inputDataRefs.items():
            if isinstance(dataRefs, list):
                inputs[key] = [butler.get(dataRef) for dataRef in dataRefs]
            else:
                inputs[key] = butler.get(dataRefs)
        del inputDataRefs

        # lists of DataRefs/DataIds for output datasets
        descriptors = self.getOutputDatasetTypes(self.config)
        outputDataIds, outputDataRefs = makeDataRefs(descriptors, quantum.outputs)

        # call run method with keyword arguments
        struct = self.adaptArgsAndRun(inputs, inputDataIds, outputDataIds, butler)

        # store produced output data
        self.saveStruct(struct, outputDataRefs, butler)

    def saveStruct(self, struct, outputDataRefs, butler):
        """Save data in butler.

        The convention is that the struct returned from ``run()`` has data
        field(s) with the same names as the config fields defining the
        output DatasetTypes. Subclasses may override this method to implement
        a different convention for `Struct` content, or if any
        post-processing of the data is needed before saving.

        Parameters
        ----------
        struct : `Struct`
            Data produced by the task packed into a `Struct` instance.
        outputDataRefs : `dict`
            Dictionary whose keys are the names of the configuration fields
            describing output dataset types and values are lists of DataRefs.
            DataRefs must match corresponding data objects in ``struct`` in
            number and order.
        butler : object
            Data butler instance.
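
        Examples
        --------
        A subclass that needs to post-process its outputs before they are
        written could override this method and delegate the actual writing to
        the base class; ``output`` and ``attachVersionMetadata`` are
        hypothetical names::

            def saveStruct(self, struct, outputDataRefs, butler):
                self.attachVersionMetadata(struct.output)
                super().saveStruct(struct, outputDataRefs, butler)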
        """
        structDict = struct.getDict()
        descriptors = self.getOutputDatasetTypes(self.config)
        for key in descriptors.keys():
            dataList = structDict[key]
            dataRefs = outputDataRefs[key]
            if not isinstance(dataRefs, list):
                # scalar outputs, make them lists again
                dataRefs = [dataRefs]
                dataList = [dataList]
            # TODO: check that data objects and data refs are aligned
            for dataRef, data in zip(dataRefs, dataList):
                butler.put(data, dataRef.datasetType.name, dataRef.dataId)

    def getResourceConfig(self):
        """Return resource configuration for this task.

        Returns
        -------
        Object of type `~config.ResourceConfig` or ``None`` if resource
        configuration is not defined for this task.
        """
        return getattr(self.config, "resources", None)