pipelineTask.py
1 # This file is part of pipe_base.
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (http://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 
22 """This module defines PipelineTask class and related methods.
23 """
24 
25 __all__ = ["DatasetTypeDescriptor", "PipelineTask"] # Classes in this module
26 
27 from lsst.daf.butler import DatasetType
28 from .config import (InputDatasetConfig, OutputDatasetConfig,
29  InitInputDatasetConfig, InitOutputDatasetConfig)
30 from .task import Task
31 
32 
33 class ScalarError(TypeError):
34  """Exception raised when dataset type is configured as scalar
35  but there are multiple DataIds in a Quantum for that dataset.
36 
37  Parameters
38  ----------
39  key : `str`
40  Name of the configuration field for dataset type.
41  numDataIds : `int`
42  Actual number of DataIds in a Quantum for this dataset type.
43  """
44  def __init__(self, key, numDataIds):
45  super().__init__(("Expected scalar for output dataset field {}, "
46  "received {} DataIds").format(key, numDataIds))
47 
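# A quick illustration of the resulting message; the field name "coadd" below
# is a hypothetical example, not a field defined in this module:
#
#     >>> str(ScalarError("coadd", 3))
#     'Expected scalar for output dataset field coadd, received 3 DataIds'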
48 
49 class DatasetTypeDescriptor:
50  """Describe DatasetType and its options for PipelineTask.
51 
52  This class contains DatasetType and all relevant options that are used by
53  PipelineTask. Typically this is derived from configuration classes but
54  sub-classes of PipelineTask can also define additional DatasetTypes that
55  are not part of the task configuration.
56 
57  Parameters
58  ----------
59  datasetType : `DatasetType`
60  scalar : `bool`
61  `True` if this is a scalar dataset.
62  """
63 
64  def __init__(self, datasetType, scalar, manualLoad):
65  self._datasetType = datasetType
66  self._scalar = scalar
67  self._manualLoad = manualLoad
68 
69  @classmethod
70  def fromConfig(cls, datasetConfig):
71  """Make DatasetTypeDescriptor instance from configuration object.
72 
73  Parameters
74  ----------
75  datasetConfig : `lsst.pex.config.Config`
76  Instance of one of the `InputDatasetConfig`, `OutputDatasetConfig`,
77  `InitInputDatasetConfig`, or `InitOutputDatasetConfig` types.
78 
79  Returns
80  -------
81  descriptor : `DatasetTypeDescriptor`
82  """
83  datasetType = DatasetType(name=datasetConfig.name,
84  dimensions=datasetConfig.dimensions,
85  storageClass=datasetConfig.storageClass)
86  # Use scalar=True for Init dataset types
87  scalar = getattr(datasetConfig, 'scalar', True)
88  manualLoad = getattr(datasetConfig, 'manualLoad', False)
89  return cls(datasetType=datasetType, scalar=scalar, manualLoad=manualLoad)
90 
91  @property
92  def datasetType(self):
93  """`DatasetType` instance.
94  """
95  return self._datasetType
96 
97  @property
98  def scalar(self):
99  """`True` if this is a scalar dataset.
100  """
101  return self._scalar
102 
103  @property
104  def manualLoad(self):
105  """`True` if the task will handle loading the data
106  """
107  return self._manualLoad
108 
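# Illustrative sketch (not part of the API above): a descriptor can also be
# built directly from a `DatasetType`, bypassing `fromConfig`. The dataset
# type name, dimensions, and storage class below are placeholders:
#
#     dt = DatasetType(name="calexp",
#                      dimensions=("instrument", "visit", "detector"),
#                      storageClass="ExposureF")
#     descriptor = DatasetTypeDescriptor(datasetType=dt, scalar=True,
#                                        manualLoad=False)
#     assert descriptor.scalar and not descriptor.manualLoad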
109 
110 class PipelineTask(Task):
111  """Base class for all pipeline tasks.
112 
113  This is an abstract base class for PipelineTasks, which represent
114  algorithms executed by a framework on data that comes from a data
115  butler; resulting data is also stored back in a data butler.
116 
117  PipelineTask inherits from `pipe.base.Task` and uses the same
118  configuration mechanism based on `pex.config`. A PipelineTask
119  sub-class typically implements a `run()` method which receives
120  Python-domain data objects and returns a `pipe.base.Struct` with
121  resulting data. The `run()` method is not supposed to perform any
122  I/O; it operates entirely on in-memory objects. `runQuantum()` is
123  the method (re-implementable in a sub-class) where all necessary
124  I/O is performed: it reads all input data from the data butler into
125  memory, calls `run()` with that data, examines the returned `Struct`,
126  and saves some or all of that data back to the data butler.
127  `runQuantum()` receives a `daf.butler.Quantum` instance which defines
128  all input and output datasets for a single invocation of PipelineTask.
129 
130  Subclasses must be constructable with exactly the arguments taken by the
131  PipelineTask base class constructor, but may support other signatures as
132  well.
133 
134  Attributes
135  ----------
136  canMultiprocess : bool, True by default (class attribute)
137  This class attribute is checked by the execution framework; sub-classes
138  can set it to ``False`` if the task does not support multiprocessing.
139 
140  Parameters
141  ----------
142  config : `pex.config.Config`, optional
143  Configuration for this task (an instance of ``self.ConfigClass``,
144  which is a task-specific subclass of `PipelineTaskConfig`).
145  If not specified then it defaults to `self.ConfigClass()`.
146  log : `lsst.log.Log`, optional
147  Logger instance whose name is used as a log name prefix, or ``None``
148  for no prefix.
149  initInputs : `dict`, optional
150  A dictionary of objects needed to construct this PipelineTask, with
151  keys matching the keys of the dictionary returned by
152  `getInitInputDatasetTypes` and values equivalent to what would be
153  obtained by calling `Butler.get` with those DatasetTypes and no data
154  IDs. While it is optional for the base class, subclasses are
155  permitted to require this argument.
156  """
157 
158  canMultiprocess = True
159 
160  def __init__(self, *, config=None, log=None, initInputs=None, **kwargs):
161  super().__init__(config=config, log=log, **kwargs)
162 
163  def getInitOutputDatasets(self):
164  """Return persistable outputs that are available immediately after
165  the task has been constructed.
166 
167  Subclasses that operate on catalogs should override this method to
168  return the schema(s) of the catalog(s) they produce.
169 
170  It is not necessary to return the PipelineTask's configuration or
171  other provenance information in order for it to be persisted; that is
172  the responsibility of the execution system.
173 
174  Returns
175  -------
176  datasets : `dict`
177  Dictionary with keys that match those of the dict returned by
178  `getInitOutputDatasetTypes` and values that can be written by calling
179  `Butler.put` with those DatasetTypes and no data IDs. An empty
180  `dict` should be returned by tasks that produce no initialization
181  outputs.
182  """
183  return {}
184 
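# A hedged sketch of an override for a task that writes a catalog schema as an
# initialization output; the "outputSchema" key and the `self.schema` attribute
# are hypothetical and must match an init-output config field of the task:
#
#     def getInitOutputDatasets(self):
#         return {"outputSchema": self.schema}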
185  @classmethod
186  def getInputDatasetTypes(cls, config):
187  """Return input dataset type descriptors for this task.
188 
189  Default implementation finds all fields of type `InputDatasetConfig`
190  in configuration (non-recursively) and uses them for constructing
191  `DatasetTypeDescriptor` instances. The names of these fields are used
192  as keys in returned dictionary. Subclasses can override this behavior.
193 
194  Parameters
195  ----------
196  config : `Config`
197  Configuration for this task. Typically datasets are defined in
198  a task configuration.
199 
200  Returns
201  -------
202  Dictionary where key is the name (arbitrary) of the input dataset
203  and value is the `DatasetTypeDescriptor` instance. Default
204  implementation uses configuration field name as dictionary key.
205  """
206  return cls.getDatasetTypes(config, InputDatasetConfig)
207 
208  @classmethod
209  def getOutputDatasetTypes(cls, config):
210  """Return output dataset type descriptors for this task.
211 
212  Default implementation finds all fields of type `OutputDatasetConfig`
213  in configuration (non-recursively) and uses them for constructing
214  `DatasetTypeDescriptor` instances. The names of these fields are used
215  as keys in returned dictionary. Subclasses can override this behavior.
216 
217  Parameters
218  ----------
219  config : `Config`
220  Configuration for this task. Typically datasets are defined in
221  a task configuration.
222 
223  Returns
224  -------
225  Dictionary where key is the name (arbitrary) of the output dataset
226  and value is the `DatasetTypeDescriptor` instance. Default
227  implementation uses configuration field name as dictionary key.
228  """
229  return cls.getDatasetTypes(config, OutputDatasetConfig)
230 
231  @classmethod
232  def getInitInputDatasetTypes(cls, config):
233  """Return dataset type descriptors that can be used to retrieve the
234  ``initInputs`` constructor argument.
235 
236  Datasets used in initialization may not be associated with any
237  Dimension (i.e. their data IDs must be empty dictionaries).
238 
239  Default implementation finds all fields of type
240  `InitInputDatasetConfig` in configuration (non-recursively) and
241  uses them for constructing `DatasetTypeDescriptor` instances. The
242  names of these fields are used as keys in returned dictionary.
243  Subclasses can override this behavior.
244 
245  Parameters
246  ----------
247  config : `Config`
248  Configuration for this task. Typically datasets are defined in
249  a task configuration.
250 
251  Returns
252  -------
253  Dictionary where key is the name (arbitrary) of the input dataset
254  and value is the `DatasetTypeDescriptor` instance. Default
255  implementation uses configuration field name as dictionary key.
256 
257  When the task requires no initialization inputs, should return an
258  empty dict.
259  """
260  return cls.getDatasetTypes(config, InitInputDatasetConfig)
261 
262  @classmethod
263  def getInitOutputDatasetTypes(cls, config):
264  """Return dataset type descriptors that can be used to write the
265  objects returned by `getInitOutputDatasets`.
266 
267  Datasets used in initialization may not be associated with any
268  Dimension (i.e. their data IDs must be empty dictionaries).
269 
270  Default implementation finds all fields of type
271  `InitOutputDatasetConfig` in configuration (non-recursively) and uses
272  them for constructing `DatasetTypeDescriptor` instances. The names of
273  these fields are used as keys in returned dictionary. Subclasses can
274  override this behavior.
275 
276  Parameters
277  ----------
278  config : `Config`
279  Configuration for this task. Typically datasets are defined in
280  a task configuration.
281 
282  Returns
283  -------
284  Dictionary where key is the name (arbitrary) of the output dataset
285  and value is the `DatasetTypeDescriptor` instance. Default
286  implementation uses configuration field name as dictionary key.
287 
288  When the task produces no initialization outputs, should return an
289  empty dict.
290  """
291  return cls.getDatasetTypes(config, InitOutputDatasetConfig)
292 
293  @classmethod
294  def getDatasetTypes(cls, config, configClass):
295  """Return dataset type descriptors defined in task configuration.
296 
297  This method can be used by other methods that need to extract dataset
298  types from task configuration (e.g. `getInputDatasetTypes` or
299  sub-class methods).
300 
301  Parameters
302  ----------
303  config : `Config`
304  Configuration for this task. Typically datasets are defined in
305  a task configuration.
306  configClass : `type`
307  Class of the configuration object which defines dataset type.
308 
309  Returns
310  -------
311  Dictionary where key is the name (arbitrary) of the dataset
312  and value is the `DatasetTypeDescriptor` instance. Default
313  implementation uses configuration field name as dictionary key.
314  Returns empty dict if configuration has no fields with the specified
315  ``configClass``.
316  """
317  dsTypes = {}
318  for key, value in config.items():
319  if isinstance(value, configClass):
320  dsTypes[key] = DatasetTypeDescriptor.fromConfig(value)
321  return dsTypes
322 
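# Illustrative sketch of how the default implementations above map config
# fields to descriptors; `ExampleTask` and its "input"/"output" config field
# names are hypothetical:
#
#     inputDescriptors = ExampleTask.getInputDatasetTypes(config)
#     # -> {"input": DatasetTypeDescriptor(...)}, keyed by config field name
#     outputDescriptors = ExampleTask.getOutputDatasetTypes(config)
#     # -> {"output": DatasetTypeDescriptor(...)}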
323  def adaptArgsAndRun(self, inputData, inputDataIds, outputDataIds, butler):
324  """Run task algorithm on in-memory data.
325 
326  This method is called by `runQuantum` to operate on input in-memory
327  data and produce corresponding output in-memory data. It receives
328  arguments which are dictionaries with input data and input/output
329  DataIds. Many simple tasks do not need to know DataIds, so the default
330  implementation of this method calls the `run` method, passing input
331  data objects as keyword arguments. Most simple tasks will implement
332  `run`; more complex tasks that need to know about output DataIds
333  will override this method instead.
334 
335  All three arguments to this method are dictionaries with keys equal
336  to the names of the configuration fields for dataset types. If a
337  dataset type is configured with the ``scalar`` field set to ``True``
338  then it is expected that only one dataset appears on input or output
339  for that dataset type and the dictionary value will be a single data
340  object or DataId. Otherwise, if ``scalar`` is ``False`` (default), the
341  value will be a list (even if only one item is in the list).
342 
343  The method returns a `Struct` instance with attributes matching the
344  configuration fields for output dataset types. Values stored in the
345  returned struct are a single object if ``scalar`` is ``True`` or a
346  list of objects otherwise. If a task produces more than one object
347  for some dataset type then the objects returned in ``struct`` must
348  match the corresponding DataIds in ``outputDataIds`` in count and order.
349 
350  Parameters
351  ----------
352  inputData : `dict`
353  Dictionary whose keys are the names of the configuration fields
354  describing input dataset types and values are Python-domain data
355  objects (or lists of objects) retrieved from data butler.
356  inputDataIds : `dict`
357  Dictionary whose keys are the names of the configuration fields
358  describing input dataset types and values are DataIds (or lists
359  of DataIds) that task consumes for corresponding dataset type.
360  DataIds are guaranteed to match data objects in ``inputData``.
361  outputDataIds : `dict`
362  Dictionary whose keys are the names of the configuration fields
363  describing output dataset types and values are DataIds (or lists
364  of DataIds) that task is to produce for corresponding dataset
365  type.
366 
367  Returns
368  -------
369  struct : `Struct`
369  Standard convention is that this method should return a `Struct`
370  instance containing all output data. Struct attribute names
371  should correspond to the names of the configuration fields
372  describing task output dataset types. If something different
373  is returned then the `saveStruct` method has to be re-implemented
374  accordingly.
376  """
377  return self.run(**inputData)
378 
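# A hedged sketch of overriding `adaptArgsAndRun` in a task that needs its
# output DataIds; the "warp"/"coadd" field names and the `makeCoadd` helper
# are hypothetical, and both datasets are assumed to be non-scalar.  `Struct`
# is `lsst.pipe.base.Struct`:
#
#     def adaptArgsAndRun(self, inputData, inputDataIds, outputDataIds, butler):
#         warps = inputData["warp"]            # list of in-memory objects
#         coaddIds = outputDataIds["coadd"]    # list of output DataIds
#         coadds = [self.makeCoadd(warps, dataId) for dataId in coaddIds]
#         return Struct(coadd=coadds)          # order matches ``coaddIds``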
379  def run(self, **kwargs):
380  """Run task algorithm on in-memory data.
381 
382  This method should be implemented in a subclass unless the task
383  overrides `adaptArgsAndRun` to do something different from its default
384  implementation. With the default implementation of `adaptArgsAndRun`
385  this method will receive keyword arguments whose names are the same as
386  the names of configuration fields describing input dataset types. Argument
387  values will be data objects retrieved from data butler. If a dataset
388  type is configured with ``scalar`` field set to ``True`` then argument
389  value will be a single object, otherwise it will be a list of objects.
390 
391  If the task needs to know its input or output DataIds then it has to
392  override `adaptArgsAndRun` method instead.
393 
394  Returns
395  -------
396  struct : `Struct`
397  See description of `adaptArgsAndRun` method.
398 
399  Examples
400  --------
401  Typical implementation of this method may look like::
402 
403  def run(self, input, calib):
404  # "input", "calib", and "output" are the names of the config fields
405 
406  # Assuming that input/calib datasets are `scalar` they are simple objects,
407  # do something with inputs and calibs, produce output image.
408  image = self.makeImage(input, calib)
409 
410  # If output dataset is `scalar` then return object, not list
411  return Struct(output=image)
412 
413  """
414  raise NotImplementedError("run() is not implemented")
415 
416  def runQuantum(self, quantum, butler):
417  """Execute PipelineTask algorithm on single quantum of data.
418 
419  Typical implementation of this method will use inputs from quantum
420  to retrieve Python-domain objects from data butler and call
421  `adaptArgsAndRun` method on that data. On return from
422  `adaptArgsAndRun` this method will extract data from returned
423  `Struct` instance and save that data to butler.
424 
425  The `Struct` returned from `adaptArgsAndRun` is expected to contain
426  data attributes with the names equal to the names of the
427  configuration fields defining output dataset types. The values of
428  the data attributes must be data objects corresponding to
429  the DataIds of output dataset types. All data objects will be
430  saved in butler using DataRefs from Quantum's output dictionary.
431 
432  This method does not return anything to the caller; on errors a
433  corresponding exception is raised.
434 
435  Parameters
436  ----------
437  quantum : `Quantum`
438  Object describing input and output corresponding to this
439  invocation of PipelineTask instance.
440  butler : object
441  Data butler instance.
442 
443  Raises
444  ------
445  `ScalarError` is raised if a dataset type is configured as scalar but
446  receives multiple DataIds in `quantum`. Any exceptions raised by the
447  data butler or by the `adaptArgsAndRun` method are propagated.
448  """
449 
450  def makeDataRefs(descriptors, refMap):
451  """Generate map of DatasetRefs and DataIds.
452 
453  Given a map of DatasetTypeDescriptors and a map of Quantum
454  DatasetRefs, makes maps of DataIds and DatasetRefs.
455  For scalar dataset types, unpacks DatasetRefs and DataIds.
456 
457  Parameters
458  ----------
459  descriptors : `dict`
460  Map of (dataset key, DatasetTypeDescriptor).
461  refMap : `dict`
462  Map of (dataset type name, DatasetRefs).
463 
464  Returns
465  -------
466  dataIds : `dict`
467  Map of (dataset key, DataIds)
468  dataRefs : `dict`
469  Map of (dataset key, DatasetRefs)
470 
471  Raises
472  ------
473  ScalarError
474  Raised if dataset type is configured as scalar but more than
475  one DatasetRef exists for it.
476  """
477  dataIds = {}
478  dataRefs = {}
479  for key, descriptor in descriptors.items():
480  keyDataRefs = refMap[descriptor.datasetType.name]
481  keyDataIds = [dataRef.dataId for dataRef in keyDataRefs]
482  if descriptor.scalar:
483  # unpack single-item lists
484  if len(keyDataRefs) != 1:
485  raise ScalarError(key, len(keyDataRefs))
486  keyDataRefs = keyDataRefs[0]
487  keyDataIds = keyDataIds[0]
488  dataIds[key] = keyDataIds
489  if not descriptor.manualLoad:
490  dataRefs[key] = keyDataRefs
491  return dataIds, dataRefs
492 
493  # lists of DataRefs/DataIds for input datasets
494  descriptors = self.getInputDatasetTypes(self.config)
495  inputDataIds, inputDataRefs = makeDataRefs(descriptors, quantum.predictedInputs)
496 
497  # get all data from butler
498  inputs = {}
499  for key, dataRefs in inputDataRefs.items():
500  if isinstance(dataRefs, list):
501  inputs[key] = [butler.get(dataRef) for dataRef in dataRefs]
502  else:
503  inputs[key] = butler.get(dataRefs)
504  del inputDataRefs
505 
506  # lists of DataRefs/DataIds for output datasets
507  descriptors = self.getOutputDatasetTypes(self.config)
508  outputDataIds, outputDataRefs = makeDataRefs(descriptors, quantum.outputs)
509 
510  # call run method with keyword arguments
511  struct = self.adaptArgsAndRun(inputs, inputDataIds, outputDataIds, butler)
512 
513  # store produced output data
514  self.saveStruct(struct, outputDataRefs, butler)
515 
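# Sketch of how an execution framework is expected to drive this method; the
# `quantum` and `butler` objects come from the butler-based middleware, and
# `ExampleTask` is a hypothetical concrete subclass:
#
#     task = ExampleTask(config=config, initInputs=initInputs)
#     task.runQuantum(quantum, butler)   # reads inputs, calls run(), puts outputs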
516  def saveStruct(self, struct, outputDataRefs, butler):
517  """Save data in butler.
518 
519  The convention is that the struct returned from ``run()`` has data
520  field(s) with the same names as the config fields defining
521  output DatasetTypes. Subclasses may override this method to implement
522  a different convention for `Struct` content or in case any
523  post-processing of the data is needed.
524 
525  Parameters
526  ----------
527  struct : `Struct`
528  Data produced by the task packed into `Struct` instance
529  outputDataRefs : `dict`
530  Dictionary whose keys are the names of the configuration fields
531  describing output dataset types and values are lists of DataRefs.
532  DataRefs must match corresponding data objects in ``struct`` in
533  number and order.
534  butler : object
535  Data butler instance.
536  """
537  structDict = struct.getDict()
538  descriptors = self.getOutputDatasetTypes(self.config)
539  for key in descriptors.keys():
540  dataList = structDict[key]
541  dataRefs = outputDataRefs[key]
542  if not isinstance(dataRefs, list):
543  # scalar outputs, make them lists again
544  dataRefs = [dataRefs]
545  dataList = [dataList]
546  # TODO: check that data objects and data refs are aligned
547  for dataRef, data in zip(dataRefs, dataList):
548  butler.put(data, dataRef.datasetType.name, dataRef.dataId)
549 
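# Sketch: with output config fields named "coadd" (non-scalar) and "summary"
# (scalar), both hypothetical, a `Struct` that this method can consume would
# look like:
#
#     Struct(coadd=[coadd1, coadd2], summary=summaryObject)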
550  def getResourceConfig(self):
551  """Return resource configuration for this task.
552 
553  Returns
554  -------
555  Object of type `~config.ResourceConfig` or ``None`` if resource
556  configuration is not defined for this task.
557  """
558  return getattr(self.config, "resources", None)
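# ---------------------------------------------------------------------------
# A minimal end-to-end sketch of a concrete PipelineTask (not part of this
# module).  It assumes a config class ``ExampleConfig`` that declares an input
# dataset field named "input" and an output dataset field named "output", both
# with ``scalar=True``; ``ExampleConfig`` and `self.process` are hypothetical:
#
#     from lsst.pipe.base import PipelineTask, Struct
#
#     class ExampleTask(PipelineTask):
#         ConfigClass = ExampleConfig
#         _DefaultName = "example"
#
#         def run(self, input):
#             # "input" and "output" are the names of the config fields
#             image = self.process(input)
#             return Struct(output=image)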