LSSTApplications  10.0-2-g4f67435,11.0.rc2+1,11.0.rc2+12,11.0.rc2+3,11.0.rc2+4,11.0.rc2+5,11.0.rc2+6,11.0.rc2+7,11.0.rc2+8
LSSTDataManagementBasePackage
repositoryIterator.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # LSST Data Management System
4 # Copyright 2008, 2009, 2010, 2011, 2012 LSST Corporation.
5 #
6 # This product includes software developed by the
7 # LSST Project (http://www.lsst.org/).
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the LSST License Statement and
20 # the GNU General Public License along with this program. If not,
21 # see <http://www.lsstcorp.org/LegalNotices/>.
22 #
23 """Tools to help you iterate over a set of repositories.
24 
25 Helpful while creating them or harvesting data from them.
26 """
27 import itertools
28 
29 import numpy
30 
31 STR_PADDING = 5 # used by _getDTypeList; the number of characters to add to the first string value seen
32  # when estimating the number of characters needed to store values for a key
33 
34 def _getDTypeList(keyTuple, valTuple):
35  """Construct a numpy dtype for a data ID or repository ID
36 
37  @param[in] keyTuple: ID key names, in order
38  @param[in] valTuple: a value tuple
39  @return numpy dtype as a list
40 
41  @warning: this guesses at string length (STR_PADDING + length of string in valTuple);
42  longer strings will be truncated when inserted into numpy structured arrays
43  """
44  typeList = []
45  for name, val in itertools.izip(keyTuple, valTuple):
46  if isinstance(val, str):
47  predLen = len(val) + STR_PADDING
48  typeList.append((name, str, predLen))
49  else:
50  typeList.append((name, numpy.array([val]).dtype))
51  return typeList
52 
class SourceData(object):
    """Accumulate a set of measurements from a set of source tables

    To use:
    - specify the desired source measurements when constructing this object
    - call addSourceMetrics for each repository you harvest data from
    - call finalize to produce the final data

    Data available after calling finalize:
    - self.sourceArr: a numpy structured array of shape (num repositories, num sources)
        containing named columns for:
        - source ID (column name "sourceId")
        - each data ID key
        - each item of data extracted from the source table
    - self.sourceIdDict: a dict of (source ID: index of axis 1 of self.sourceArr)
    - self.repoArr: a numpy structured array of shape (num repositories,)
        containing a named column for each repository key (see RepositoryIterator)

    @note: a source that is missing from a repository is represented in that repository's row of
        self.sourceArr by its source ID followed by all-zero values. This class does not itself
        filter out non-finite values (e.g. NaN).
    """
    def __init__(self, datasetType, sourceKeyTuple):
        """
        @param[in] datasetType: dataset type for source
        @param[in] sourceKeyTuple: list of keys of data items to extract from the source tables

        @raise RuntimeError if sourceKeyTuple is empty
        """
        if len(sourceKeyTuple) < 1:
            raise RuntimeError("Must specify at least one key in sourceKeyTuple")
        self.datasetType = datasetType
        self._sourceKeyTuple = tuple(sourceKeyTuple)

        # tuple of data ID keys, in order; set by first call to _getSourceMetrics
        self._idKeyTuple = None
        # numpy dtype for data ID tuple, as a list of (key, type);
        # set by the first call to _getSourceMetrics that supplies at least one data ID
        self._idKeyDTypeList = None
        # numpy dtype for source data, as a list of (key, type);
        # set by the first call to _getSourceMetrics that sees a non-empty source table
        self._sourceDTypeList = None
        # tuple of repo ID keys, in order; set by first call to addSourceMetrics
        self._repoKeyTuple = None
        # numpy dtype for repoArr, as a list of (key, type); set by first call to addSourceMetrics
        self._repoDTypeList = None

        # list (one entry per repository)
        # of dict of source ID: tuple of data ID data concatenated with source metric data, where:
        #   data ID data is in order self._idKeyTuple
        #   source metric data is in order self._sourceKeyTuple
        self._tempDataList = []
        # list of repoInfo, one per call to addSourceMetrics
        self.repoInfoList = []

    def _getSourceMetrics(self, idKeyTuple, idValList, sourceTableList):
        """Obtain the desired source measurements from a list of source tables

        Extracts a set of source measurements (specified by sourceKeyTuple) from a list of source tables
        (one per data ID) and saves them as a dict of source ID: tuple of data.
        Empty source tables are skipped.

        @param[in] idKeyTuple: a tuple of data ID keys; must be the same for each call
        @param[in] idValList: a list of data ID value tuples;
            each tuple contains values in the order in idKeyTuple
        @param[in] sourceTableList: a list of source tables, one per entry in idValList

        @return a dict of source id: data id tuple + source data tuple
            where source data tuple order matches sourceKeyTuple
            and data id tuple matches self._idKeyTuple (which is set from the first idKeyTuple)

        @raise RuntimeError if idKeyTuple is different than it was for the first call.

        GetRepositoryDataTask.run returns idKeyTuple and idValList; you can easily make
        a subclass of GetRepositoryDataTask that also returns sourceTableList.

        Updates instance variables:
        - self._idKeyTuple if not already set.
        - self._idKeyDTypeList if not already set and idValList is not empty.
        - self._sourceDTypeList if not already set and a non-empty source table is seen.
        """
        if self._idKeyTuple is None:
            self._idKeyTuple = tuple(idKeyTuple)
        elif self._idKeyTuple != tuple(idKeyTuple):
            raise RuntimeError("idKeyTuple = %s != %s = first idKeyTuple; must be the same each time" %
                               (idKeyTuple, self._idKeyTuple))

        # The dtype is inferred from the first data ID seen; defer this until a data ID is
        # available so that a call with an empty idValList does not fail with IndexError.
        if self._idKeyDTypeList is None and len(idValList) > 0:
            self._idKeyDTypeList = _getDTypeList(keyTuple=self._idKeyTuple, valTuple=idValList[0])

        dataDict = {}
        # builtin zip is used (not itertools.izip) so that this runs on both Python 2 and 3
        for idTuple, sourceTable in zip(idValList, sourceTableList):
            if len(sourceTable) == 0:
                continue

            idList = sourceTable.get("id")
            # one column per requested key, in the order of self._sourceKeyTuple
            dataList = [sourceTable.get(key) for key in self._sourceKeyTuple]

            if self._sourceDTypeList is None:
                self._sourceDTypeList = [(key, arr.dtype)
                                         for key, arr in zip(self._sourceKeyTuple, dataList)]

            # transpose columns into one tuple of values per source
            transposedDataList = zip(*dataList)
            del dataList

            # tuple(idTuple) allows a data ID to be supplied as any sequence, not only a tuple
            idValTuple = tuple(idTuple)
            dataDict.update((srcId, idValTuple + tuple(data))
                            for srcId, data in zip(idList, transposedDataList))
        return dataDict

    def addSourceMetrics(self, repoInfo, idKeyTuple, idValList, sourceTableList):
        """Accumulate source measurements from a list of source tables.

        Once you have accumulated all source measurements, call finalize to process the data.

        @param[in] repoInfo: a RepositoryInfo instance
        @param[in] idKeyTuple: a tuple of data ID keys; must be the same for each call
        @param[in] idValList: a list of data ID value tuples;
            each tuple contains values in the order in idKeyTuple
        @param[in] sourceTableList: a list of source tables, one per entry in idValList

        @raise RuntimeError if idKeyTuple is different than it was for the first call.

        Accumulates the data in temporary cache self._tempDataList
        and appends repoInfo to self.repoInfoList.

        @return number of sources
        """
        if self._repoKeyTuple is None:
            # the first repository defines the keys and dtype used for self.repoArr
            self._repoKeyTuple = repoInfo.keyTuple
            self._repoDTypeList = repoInfo.dtype

        dataDict = self._getSourceMetrics(idKeyTuple, idValList, sourceTableList)

        self._tempDataList.append(dataDict)
        self.repoInfoList.append(repoInfo)
        return len(dataDict)

    def finalize(self):
        """Process the accumulated source measurements to create the final data products.

        Only call this after you have added all source metrics using addSourceMetrics.

        Reads temporary cache self._tempDataList and then discards it (sets it to None).

        Sets self.sourceArr, self.sourceIdDict and self.repoArr.

        @raise RuntimeError if no data was added, or if every source table that was added was empty
        """
        if not self._tempDataList:
            raise RuntimeError("No data found")
        if self._idKeyDTypeList is None or self._sourceDTypeList is None:
            # every source table was empty, so the column types were never determined
            raise RuntimeError("No sources found in any source table")

        fullSrcIdSet = set()
        for dataIdDict in self._tempDataList:
            fullSrcIdSet.update(dataIdDict)
        # freeze the iteration order once, so that the columns of sourceArr
        # and the indices in sourceIdDict are guaranteed to agree
        srcIdList = list(fullSrcIdSet)

        # source data
        sourceArrDType = [("sourceId", int)] + self._idKeyDTypeList + self._sourceDTypeList
        # data for missing sources (only for the data in the source data dict, so excludes srcId)
        nullSourceTuple = tuple(numpy.zeros(1, dtype=self._idKeyDTypeList + self._sourceDTypeList)[0])

        sourceData = [[(srcId,) + srcDataDict.get(srcId, nullSourceTuple) for srcId in srcIdList]
                      for srcDataDict in self._tempDataList]

        self.sourceArr = numpy.array(sourceData, dtype=sourceArrDType)
        del sourceData

        self.sourceIdDict = dict((srcId, i) for i, srcId in enumerate(srcIdList))

        # repository data
        repoData = [repoInfo.valTuple for repoInfo in self.repoInfoList]
        self.repoArr = numpy.array(repoData, dtype=self._repoDTypeList)

        self._tempDataList = None
210 
211 
class RepositoryInfo(object):
    """Description of a single data repository

    Instances are created by RepositoryIterator and consumed by SourceData.

    Attributes:
    - keyTuple: tuple of repository key names
    - valTuple: tuple of values, one per entry in keyTuple
    - dtype: the dtype passed to the constructor, stored unchanged
    - name: the name passed to the constructor, stored unchanged
    """
    def __init__(self, keyTuple, valTuple, dtype, name):
        """
        @param[in] keyTuple: sequence of repository key names
        @param[in] valTuple: sequence of values, one per entry in keyTuple
        @param[in] dtype: dtype describing the repository keys
        @param[in] name: name of the repository

        @raise RuntimeError if keyTuple and valTuple have different lengths
        """
        numKeys = len(keyTuple)
        numVals = len(valTuple)
        if numKeys != numVals:
            raise RuntimeError("lengths of keyTuple=%s and valTuple=%s do not match" % (keyTuple, valTuple))
        self.name = name
        self.dtype = dtype
        # store immutable copies of the keys and values
        self.keyTuple = tuple(keyTuple)
        self.valTuple = tuple(valTuple)
224 
225 
class RepositoryIterator(object):
    """Iterate over a set of data repositories whose names follow a convention based on parameter values

    Iteration yields one RepositoryInfo per combination of the supplied parameter values.
    """
    def __init__(self, formatStr, **dataDict):
        """Construct a repository iterator from name=valueList keyword arguments

        @param[in] formatStr: format string using dictionary notation, e.g.: "%(foo)s_%(bar)d"
        @param[in] **dataDict: name=valueList pairs

        Keys are stored in sorted order; each value list is stored as a numpy array,
        and the dtype of each array is used as the dtype for that key.
        """
        self._formatStr = formatStr
        self._keyTuple = tuple(sorted(dataDict))
        valueArrays = []
        for key in self._keyTuple:
            valueArrays.append(numpy.array(dataDict[key]))
        self._valListOfLists = valueArrays
        self._dtype = [(key, valArr.dtype) for key, valArr in zip(self._keyTuple, self._valListOfLists)]

    def __iter__(self):
        """Generate a RepositoryInfo for each combination of values (the Cartesian product)
        """
        for combo in itertools.product(*self._valListOfLists):
            repoName = self.format(dict(zip(self._keyTuple, combo)))
            yield RepositoryInfo(keyTuple=self._keyTuple, valTuple=combo, dtype=self._dtype, name=repoName)

    def __len__(self):
        """Return the number of items in the iterator (the product of the value list lengths)"""
        numItems = 1
        for valArr in self._valListOfLists:
            numItems = numItems * len(valArr)
        return numItems

    def format(self, valDict):
        """Return the formatted string for a specified value dictionary

        @param[in] valDict: a dict of key: value pairs that identify a repository
        """
        formatted = self._formatStr % valDict
        return formatted

    def getKeyTuple(self):
        """Return a tuple of keys in the same order as items in value tuples
        """
        return self._keyTuple

    def _getDTypeList(self):
        """Return the dtype for a structured array of repository keys, as a list of (key, dtype)
        """
        return self._dtype