LSSTApplications  11.0-13-gbb96280,12.1+18,12.1+7,12.1-1-g14f38d3+72,12.1-1-g16c0db7+5,12.1-1-g5961e7a+84,12.1-1-ge22e12b+23,12.1-11-g06625e2+4,12.1-11-g0d7f63b+4,12.1-19-gd507bfc,12.1-2-g7dda0ab+38,12.1-2-gc0bc6ab+81,12.1-21-g6ffe579+2,12.1-21-gbdb6c2a+4,12.1-24-g941c398+5,12.1-3-g57f6835+7,12.1-3-gf0736f3,12.1-37-g3ddd237,12.1-4-gf46015e+5,12.1-5-g06c326c+20,12.1-5-g648ee80+3,12.1-5-gc2189d7+4,12.1-6-ga608fc0+1,12.1-7-g3349e2a+5,12.1-7-gfd75620+9,12.1-9-g577b946+5,12.1-9-gc4df26a+10
LSSTDataManagementBasePackage
repositoryIterator.py
Go to the documentation of this file.
1 #
2 # LSST Data Management System
3 # Copyright 2008, 2009, 2010, 2011, 2012 LSST Corporation.
4 #
5 # This product includes software developed by the
6 # LSST Project (http://www.lsst.org/).
7 #
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the LSST License Statement and
19 # the GNU General Public License along with this program. If not,
20 # see <http://www.lsstcorp.org/LegalNotices/>.
21 #
22 """Tools to help you iterate over a set of repositories.
23 
24 Helpful while creating them or harvesting data from them.
25 """
26 from builtins import zip
27 from builtins import object
28 import itertools
29 
30 import numpy
31 
32 STR_PADDING = 5 # used by _getDTypeList; the number of characters to add to the first string value seen
33 # when estimating the number of characters needed to store values for a key
34 
35 
36 def _getDTypeList(keyTuple, valTuple):
37  """Construct a numpy dtype for a data ID or repository ID
38 
39  @param[in] keyTuple: ID key names, in order
40  @param[in] valTuple: a value tuple
41  @return numpy dtype as a list
42 
43  @warning: this guesses at string length (STR_PADDING + length of string in valTuple);
44  longer strings will be truncated when inserted into numpy structured arrays
45  """
46  typeList = []
47  for name, val in zip(keyTuple, valTuple):
48  if isinstance(val, str):
49  predLen = len(val) + STR_PADDING
50  typeList.append((name, str, predLen))
51  else:
52  typeList.append((name, numpy.array([val]).dtype))
53  return typeList
54 
55 
class SourceData(object):
    """Accumulate a set of measurements from a set of source tables

    To use:
    - specify the desired source measurements when constructing this object
    - call addSourceMetrics for each repository you harvest data from
    - call finalize to produce the final data

    Data available after calling finalize:
    - self.sourceArr: a numpy structured array of shape (num repositories, num sources)
        containing named columns for:
        - source ID (column "sourceId")
        - each data ID key
        - each item of data extracted from the source table
        Sources are ordered along axis 1 by increasing source ID.
    - self.sourceIdDict: a dict of (source ID: index of axis 1 of self.sourceArr)
    - self.repoArr: a numpy structured array of shape (num repositories,)
        containing a named column for each repository key (see RepositoryIterator)

    @note: a source that was not seen in a given repository gets a zero-filled entry
        (zeros for numeric fields, empty strings for string fields) in that repository's row.

    NOTE(review): earlier documentation said that sources with non-finite data (e.g. NaN) for every
    extracted value are silently omitted; this class performs no such filtering itself, so confirm
    whether the source tables are filtered upstream.
    """

    def __init__(self, datasetType, sourceKeyTuple):
        """Construct a SourceData

        @param[in] datasetType: dataset type for source
        @param[in] sourceKeyTuple: list of keys of data items to extract from the source tables

        @raise RuntimeError if sourceKeyTuple is empty
        """
        if len(sourceKeyTuple) < 1:
            raise RuntimeError("Must specify at least one key in sourceKeyTuple")
        self.datasetType = datasetType
        self._sourceKeyTuple = tuple(sourceKeyTuple)

        # tuple of data ID keys, in order; set by the first call to _getSourceMetrics that has data IDs
        self._idKeyTuple = None
        # numpy dtype for the data ID tuple, as a list of (key, type); set together with _idKeyTuple
        self._idKeyDTypeList = None
        # numpy dtype for source data, as a list of (key, type);
        # set by _getSourceMetrics from the first non-empty source table
        self._sourceDTypeList = None
        # tuple of repo ID keys, in order; set by first call to addSourceMetrics
        self._repoKeyTuple = None
        # numpy dtype for repoArr, as a list of (key, type); set by first call to addSourceMetrics
        self._repoDTypeList = None

        # list (one entry per repository) of dict of
        # source ID: tuple of data ID data concatenated with source metric data, where:
        #   data ID data is in order self._idKeyTuple
        #   source metric data is in order self._sourceKeyTuple
        # set to None by finalize
        self._tempDataList = []
        self.repoInfoList = []  # list of repoInfo, one per call to addSourceMetrics

    def _getSourceMetrics(self, idKeyTuple, idValList, sourceTableList):
        """Obtain the desired source measurements from a list of source tables

        Extracts a set of source measurements (specified by sourceKeyTuple) from a list of source tables
        (one per data ID) and returns them as a dict of source ID: tuple of data

        @param[in] idKeyTuple: a tuple of data ID keys; must be the same for each call
        @param[in] idValList: a list of data ID value tuples;
            each tuple contains values in the order in idKeyTuple
        @param[in] sourceTableList: a list of source tables, one per entry in idValList;
            each table must support len() and get(key), where get("id") returns the source IDs

        @return a dict of source id: data id tuple + source data tuple
            where source data tuple order matches sourceKeyTuple
            and data id tuple matches self._idKeyTuple (which is set from the first idKeyTuple)

        @raise RuntimeError if idKeyTuple is different than it was for the first call.

        Updates instance variables:
        - self._idKeyTuple and self._idKeyDTypeList, if not already set and idValList is not empty
        - self._sourceDTypeList, if not already set and a non-empty source table is found
        """
        idKeyTuple = tuple(idKeyTuple)
        if self._idKeyTuple is not None:
            if self._idKeyTuple != idKeyTuple:
                raise RuntimeError("idKeyTuple = %s != %s = first idKeyTuple; must be the same each time" %
                                   (idKeyTuple, self._idKeyTuple))
        elif len(idValList) > 0:
            # compute the dtype before recording the keys, so a failure cannot leave
            # _idKeyTuple set while _idKeyDTypeList is still None
            idKeyDTypeList = _getDTypeList(keyTuple=idKeyTuple, valTuple=idValList[0])
            self._idKeyTuple = idKeyTuple
            self._idKeyDTypeList = idKeyDTypeList

        dataDict = {}
        for idVals, sourceTable in zip(idValList, sourceTableList):
            if len(sourceTable) == 0:
                continue

            idTuple = tuple(idVals)  # accept lists as well as tuples of data ID values
            idList = sourceTable.get("id")
            dataList = [sourceTable.get(key) for key in self._sourceKeyTuple]

            if self._sourceDTypeList is None:
                self._sourceDTypeList = [(key, arr.dtype)
                                         for key, arr in zip(self._sourceKeyTuple, dataList)]

            # zip(*dataList) turns one column per key into one row per source
            dataDict.update((srcId, idTuple + tuple(data))
                            for srcId, data in zip(idList, zip(*dataList)))
        return dataDict

    def addSourceMetrics(self, repoInfo, idKeyTuple, idValList, sourceTableList):
        """Accumulate source measurements from a list of source tables.

        Once you have accumulated all source measurements, call finalize to process the data.

        @param[in] repoInfo: a RepositoryInfo instance
        @param[in] idKeyTuple: a tuple of data ID keys; must be the same for each call
        @param[in] idValList: a list of data ID value tuples;
            each tuple contains values in the order in idKeyTuple
        @param[in] sourceTableList: a list of source tables, one per entry in idValList

        @raise RuntimeError if idKeyTuple is different than it was for the first call,
            or if finalize has already been called.

        Accumulates the data in temporary cache self._tempDataList.

        @return number of sources
        """
        if self._tempDataList is None:
            raise RuntimeError("Cannot add source metrics after finalize has been called")

        if self._repoKeyTuple is None:
            self._repoKeyTuple = repoInfo.keyTuple
            self._repoDTypeList = repoInfo.dtype

        dataDict = self._getSourceMetrics(idKeyTuple, idValList, sourceTableList)

        self._tempDataList.append(dataDict)
        self.repoInfoList.append(repoInfo)
        return len(dataDict)

    def finalize(self):
        """Process the accumulated source measurements to create the final data products.

        Only call this after you have added all source metrics using addSourceMetrics.

        Reads temporary cache self._tempDataList and then discards it (sets it to None).

        Sets self.sourceArr, self.sourceIdDict and self.repoArr.

        @raise RuntimeError if no source metrics were added, if every source table that was added
            was empty, or if finalize has already been called
        """
        if not self._tempDataList:
            raise RuntimeError("No data found")
        if self._idKeyDTypeList is None or self._sourceDTypeList is None:
            # repositories were added, but none of them contained any source data,
            # so the column types are unknown
            raise RuntimeError("No data found")

        fullSrcIdSet = set()
        for dataIdDict in self._tempDataList:
            fullSrcIdSet.update(dataIdDict)
        # fix the source order once, so that sourceArr and sourceIdDict agree and are reproducible
        srcIdList = sorted(fullSrcIdSet)

        # source data
        dataDType = self._idKeyDTypeList + self._sourceDTypeList
        sourceArrDType = [("sourceId", int)] + dataDType
        # data for missing sources (only for the data in the source data dict, so excludes srcId)
        nullSourceTuple = tuple(numpy.zeros(1, dtype=dataDType)[0])

        sourceData = [[(srcId,) + srcDataDict.get(srcId, nullSourceTuple) for srcId in srcIdList]
                      for srcDataDict in self._tempDataList]

        self.sourceArr = numpy.array(sourceData, dtype=sourceArrDType)
        del sourceData

        self.sourceIdDict = dict((srcId, i) for i, srcId in enumerate(srcIdList))

        # repository data
        repoData = [repoInfo.valTuple for repoInfo in self.repoInfoList]
        self.repoArr = numpy.array(repoData, dtype=self._repoDTypeList)

        self._tempDataList = None
214 
215 
class RepositoryInfo(object):
    """Description of a single data repository

    Holds the repository's identifying keys and values, the numpy dtype used to store
    those values, and the repository name.

    Constructed by RepositoryIterator and used by SourceData.
    """

    def __init__(self, keyTuple, valTuple, dtype, name):
        """Construct a RepositoryInfo

        @param[in] keyTuple: repository key names, in order; stored as a tuple
        @param[in] valTuple: repository values, one per key, in the same order; stored as a tuple
        @param[in] dtype: numpy dtype for the values; stored unchanged
        @param[in] name: repository name; stored unchanged

        @raise RuntimeError if keyTuple and valTuple have different lengths
        """
        numKeys = len(keyTuple)
        numVals = len(valTuple)
        if numKeys != numVals:
            raise RuntimeError("lengths of keyTuple=%s and valTuple=%s do not match" % (keyTuple, valTuple))

        self.name = name
        self.dtype = dtype
        self.keyTuple = tuple(keyTuple)
        self.valTuple = tuple(valTuple)
229 
230 
class RepositoryIterator(object):
    """Iterate over a set of data repositories whose names are built from parameter values

    Iterating yields one RepositoryInfo for every combination of the supplied values.
    """

    def __init__(self, formatStr, **dataDict):
        """Construct a repository iterator from name=valueList pairs

        @param[in] formatStr: format string using dictionary notation, e.g.: "%(foo)s_%(bar)d"
        @param[in] **dataDict: name=valueList pairs; keys are used in sorted order
        """
        self._formatStr = formatStr
        self._keyTuple = tuple(sorted(dataDict))

        # convert each value list to a numpy array, so numpy chooses the dtype for that key
        valArrList = []
        dtypeList = []
        for key in self._keyTuple:
            valArr = numpy.array(dataDict[key])
            valArrList.append(valArr)
            dtypeList.append((key, valArr.dtype))
        self._valListOfLists = valArrList
        self._dtype = dtypeList

    def __iter__(self):
        """Generate a RepositoryInfo for each combination of values
        """
        for vals in itertools.product(*self._valListOfLists):
            repoName = self.format(dict(zip(self._keyTuple, vals)))
            yield RepositoryInfo(keyTuple=self._keyTuple, valTuple=vals, dtype=self._dtype, name=repoName)

    def __len__(self):
        """Return the number of items in the iterator"""
        # the number of combinations is the product of the lengths of the value lists
        numRepos = 1
        for valArr in self._valListOfLists:
            numRepos = numRepos * len(valArr)
        return numRepos

    def format(self, valDict):
        """Return the formatted string for a specified value dictionary

        @param[in] valDict: a dict of key: value pairs that identify a repository
        """
        formatted = self._formatStr % valDict
        return formatted

    def getKeyTuple(self):
        """Return a tuple of keys, in the same order as items in value tuples
        """
        return self._keyTuple

    def _getDTypeList(self):
        """Return the dtype for a structured array of repository keys, as a list of (key, dtype)
        """
        return self._dtype
int iter