LSSTApplications  11.0-13-gbb96280,12.1.rc1,12.1.rc1+1,12.1.rc1+2,12.1.rc1+5,12.1.rc1+8,12.1.rc1-1-g06d7636+1,12.1.rc1-1-g253890b+5,12.1.rc1-1-g3d31b68+7,12.1.rc1-1-g3db6b75+1,12.1.rc1-1-g5c1385a+3,12.1.rc1-1-g83b2247,12.1.rc1-1-g90cb4cf+6,12.1.rc1-1-g91da24b+3,12.1.rc1-2-g3521f8a,12.1.rc1-2-g39433dd+4,12.1.rc1-2-g486411b+2,12.1.rc1-2-g4c2be76,12.1.rc1-2-gc9c0491,12.1.rc1-2-gda2cd4f+6,12.1.rc1-3-g3391c73+2,12.1.rc1-3-g8c1bd6c+1,12.1.rc1-3-gcf4b6cb+2,12.1.rc1-4-g057223e+1,12.1.rc1-4-g19ed13b+2,12.1.rc1-4-g30492a7
LSSTDataManagementBasePackage
repositoryIterator.py
Go to the documentation of this file.
1 #
2 # LSST Data Management System
3 # Copyright 2008, 2009, 2010, 2011, 2012 LSST Corporation.
4 #
5 # This product includes software developed by the
6 # LSST Project (http://www.lsst.org/).
7 #
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the LSST License Statement and
19 # the GNU General Public License along with this program. If not,
20 # see <http://www.lsstcorp.org/LegalNotices/>.
21 #
22 """Tools to help you iterate over a set of repositories.
23 
24 Helpful while creating them or harvesting data from them.
25 """
26 import itertools
27 
28 import numpy
29 
# Used by _getDTypeList: the number of characters to add to the first string value seen
# when estimating the number of characters needed to store values for a key.
STR_PADDING = 5


def _getDTypeList(keyTuple, valTuple):
    """Construct a numpy dtype for a data ID or repository ID

    @param[in] keyTuple: ID key names, in order
    @param[in] valTuple: a value tuple; one value per entry in keyTuple
    @return numpy dtype as a list of (name, type) or, for str values, (name, str, length)

    If keyTuple and valTuple differ in length, the extra entries of the longer one are ignored.

    @warning: this guesses at string length (STR_PADDING + length of string in valTuple);
        longer strings will be truncated when inserted into numpy structured arrays
    """
    typeList = []
    # The builtin zip is used instead of itertools.izip, which does not exist on Python 3;
    # zip behaves the same way here on both Python 2 and Python 3.
    for name, val in zip(keyTuple, valTuple):
        if isinstance(val, str):
            # reserve some extra room, since later values for this key may be longer
            predLen = len(val) + STR_PADDING
            typeList.append((name, str, predLen))
        else:
            # let numpy pick the dtype it would use to store this value
            typeList.append((name, numpy.array([val]).dtype))
    return typeList
51 
class SourceData(object):
    """Accumulate a set of measurements from a set of source tables

    To use:
    - specify the desired source measurements when constructing this object
    - call addSourceMetrics for each repository you harvest data from
    - call finalize to produce the final data

    Data available after calling finalize:
    - self.sourceArr: a numpy structured array of shape (num repositories, num sources)
        containing named columns for:
        - source ID (column name "sourceId")
        - each data ID key
        - each item of data extracted from the source table
    - self.sourceIdDict: a dict of (source ID: index of axis 1 of self.sourceArr)
    - self.repoArr: a numpy structured array of shape (num repositories,)
        containing a named column for each repository key (see RepositoryIterator)

    @note: a source that is absent from a given repository is stored in self.sourceArr as a
        zero-filled record (apart from its source ID).
    @note: the original documentation states that sources that had non-finite data (e.g. NaN)
        for every value extracted are silently omitted. NOTE(review): no such filtering is visible
        in this class; confirm whether it is expected to happen in the caller.
    """

    def __init__(self, datasetType, sourceKeyTuple):
        """
        @param[in] datasetType: dataset type for source
        @param[in] sourceKeyTuple: list of keys of data items to extract from the source tables

        @raise RuntimeError if sourceKeyTuple is empty
        """
        if len(sourceKeyTuple) < 1:
            raise RuntimeError("Must specify at least one key in sourceKeyTuple")
        self.datasetType = datasetType
        self._sourceKeyTuple = tuple(sourceKeyTuple)

        # tuple of data ID keys, in order; set by first call to _getSourceMetrics
        self._idKeyTuple = None
        # numpy dtype for data ID tuple, as a list of (key, type); set by first call to _getSourceMetrics
        self._idKeyDTypeList = None
        # numpy dtype for source data, as a list of (key, type);
        # set by the first call to _getSourceMetrics that sees a non-empty source table
        self._sourceDTypeList = None
        # tuple of repo ID keys, in order; set by first call to addSourceMetrics
        self._repoKeyTuple = None
        # numpy dtype for repoArr, as a list of (key, type); set by first call to addSourceMetrics
        self._repoDTypeList = None

        # list (one entry per repository) of dict of
        # source ID: tuple of data ID data concatenated with source metric data, where:
        # - data ID data is in order self._idKeyTuple
        # - source metric data is in order self._sourceKeyTuple
        # Set to None by finalize.
        self._tempDataList = []
        # list of repoInfo, one entry per call to addSourceMetrics
        self.repoInfoList = []

    def _getSourceMetrics(self, idKeyTuple, idValList, sourceTableList):
        """Obtain the desired source measurements from a list of source tables

        Extracts a set of source measurements (specified by sourceKeyTuple) from a list of source tables
        (one per data ID) and saves them as a dict of source ID: list of data

        @param[in] idKeyTuple: a tuple of data ID keys; must be the same for each call
        @param[in] idValList: a list of data ID value tuples;
            each tuple contains values in the order in idKeyTuple
        @param[in] sourceTableList: a list of source tables, one per entry in idValList;
            each table must support len(table) and table.get(key), and must have an "id" column

        @return a dict of source id: data id tuple + source data tuple
            where source data tuple order matches sourceKeyTuple
            and data id tuple matches self._idKeyTuple (which is set from the first idKeyTuple)

        @raise RuntimeError if idKeyTuple is different than it was for the first call.

        GetRepositoryDataTask.run returns idKeyTuple and idValList; you can easily make
        a subclass of GetRepositoryDataTask that also returns sourceTableList.

        Updates instance variables:
        - self._idKeyTuple and self._idKeyDTypeList if not already set.
        - self._sourceDTypeList if not already set and a non-empty source table is seen.
        """
        if self._idKeyTuple is None:
            # Compute everything before assigning, so that a failure here (e.g. an empty idValList)
            # cannot leave self._idKeyTuple set while self._idKeyDTypeList is still None.
            firstIdKeyTuple = tuple(idKeyTuple)
            idKeyDTypeList = _getDTypeList(keyTuple=firstIdKeyTuple, valTuple=idValList[0])
            self._idKeyTuple = firstIdKeyTuple
            self._idKeyDTypeList = idKeyDTypeList
        elif self._idKeyTuple != tuple(idKeyTuple):
            raise RuntimeError("idKeyTuple = %s != %s = first idKeyTuple; must be the same each time" %
                               (idKeyTuple, self._idKeyTuple))

        dataDict = {}
        # builtin zip replaces itertools.izip, which does not exist on Python 3
        for idTuple, sourceTable in zip(idValList, sourceTableList):
            if len(sourceTable) == 0:
                continue

            idList = sourceTable.get("id")
            dataList = [sourceTable.get(key) for key in self._sourceKeyTuple]

            if self._sourceDTypeList is None:
                # take the column dtypes from the first non-empty table seen
                self._sourceDTypeList = [(key, arr.dtype)
                                         for key, arr in zip(self._sourceKeyTuple, dataList)]

            # one tuple of measurements per source, in the order of self._sourceKeyTuple
            transposedDataList = zip(*dataList)
            del dataList

            # coerce to a tuple so that data ID values supplied as lists can be concatenated too
            idTuple = tuple(idTuple)
            dataDict.update((srcId, idTuple + tuple(data))
                            for srcId, data in zip(idList, transposedDataList))
        return dataDict

    def addSourceMetrics(self, repoInfo, idKeyTuple, idValList, sourceTableList):
        """Accumulate source measurements from a list of source tables.

        Once you have accumulated all source measurements, call finalize to process the data.

        @param[in] repoInfo: a RepositoryInfo instance
        @param[in] idKeyTuple: a tuple of data ID keys; must be the same for each call
        @param[in] idValList: a list of data ID value tuples;
            each tuple contains values in the order in idKeyTuple
        @param[in] sourceTableList: a list of source tables, one per entry in idValList

        @raise RuntimeError if idKeyTuple is different than it was for the first call.

        Accumulates the data in temporary cache self._tempDataList.

        @return number of sources
        """
        if self._repoKeyTuple is None:
            self._repoKeyTuple = repoInfo.keyTuple
            self._repoDTypeList = repoInfo.dtype

        dataDict = self._getSourceMetrics(idKeyTuple, idValList, sourceTableList)

        self._tempDataList.append(dataDict)
        self.repoInfoList.append(repoInfo)
        return len(dataDict)

    def finalize(self):
        """Process the accumulated source measurements to create the final data products.

        Only call this after you have added all source metrics using addSourceMetrics.

        Reads temporary cache self._tempDataList and then deletes it.

        Sets self.sourceArr, self.sourceIdDict and self.repoArr.

        @raise RuntimeError if no data has been added, if finalize has already been called,
            or if every source table that was added was empty
        """
        # "not" also covers None, which is what finalize leaves behind after a successful call
        if not self._tempDataList:
            raise RuntimeError("No data found")
        if self._sourceDTypeList is None:
            # every source table was empty, so the source column dtypes were never determined
            raise RuntimeError("No source data found: every source table was empty")

        fullSrcIdSet = set()
        for dataIdDict in self._tempDataList:
            # iterating a dict yields its keys on both Python 2 and 3 (iterkeys is Python 2 only)
            fullSrcIdSet.update(dataIdDict)
        # fix the order once, so that self.sourceArr columns and self.sourceIdDict are guaranteed to agree
        srcIdList = list(fullSrcIdSet)

        # source data
        dataDTypeList = self._idKeyDTypeList + self._sourceDTypeList
        sourceArrDType = [("sourceId", int)] + dataDTypeList
        # data for missing sources (only for the data in the source data dict, so excludes srcId)
        nullSourceTuple = tuple(numpy.zeros(1, dtype=dataDTypeList)[0])

        sourceData = [[(srcId,) + srcDataDict.get(srcId, nullSourceTuple) for srcId in srcIdList]
                      for srcDataDict in self._tempDataList]

        self.sourceArr = numpy.array(sourceData, dtype=sourceArrDType)
        del sourceData

        self.sourceIdDict = dict((srcId, i) for i, srcId in enumerate(srcIdList))

        # repository data
        repoData = [repoInfo.valTuple for repoInfo in self.repoInfoList]
        self.repoArr = numpy.array(repoData, dtype=self._repoDTypeList)

        self._tempDataList = None
209 
210 
class RepositoryInfo(object):
    """Information about one data repository

    Constructed by RepositoryIterator and used by SourceData.

    Attributes:
    - keyTuple: tuple of repository key names
    - valTuple: tuple of repository key values, in the same order as keyTuple
    - dtype: the dtype supplied at construction, stored unchanged
    - name: the name supplied at construction, stored unchanged
    """

    def __init__(self, keyTuple, valTuple, dtype, name):
        """
        @param[in] keyTuple: repository key names, in order; any iterable
        @param[in] valTuple: repository key values, in the same order as keyTuple; any iterable
        @param[in] dtype: numpy dtype describing the keys (SourceData uses it as the dtype of repoArr)
        @param[in] name: repository name

        @raise RuntimeError if keyTuple and valTuple have different lengths
        """
        # Convert before measuring, so that iterables without len() (e.g. generators) are accepted.
        keyTuple = tuple(keyTuple)
        valTuple = tuple(valTuple)
        if len(keyTuple) != len(valTuple):
            raise RuntimeError("lengths of keyTuple=%s and valTuple=%s do not match" % (keyTuple, valTuple))
        self.keyTuple = keyTuple
        self.valTuple = valTuple
        self.dtype = dtype
        self.name = name
223 
224 
class RepositoryIterator(object):
    """Iterate over a set of data repositories that use a naming convention based on parameter values

    Iterating yields one RepositoryInfo per combination of parameter values
    (the Cartesian product of the value lists).
    """

    def __init__(self, formatStr, **dataDict):
        """Construct a repository iterator from a dict of name: valueList

        @param[in] formatStr: format string using dictionary notation, e.g.: "%(foo)s_%(bar)d"
        @param[in] **dataDict: name=valueList pairs; a single scalar value is treated
            as a list containing that one value

        Keys are stored in sorted order; value tuples use that same order.
        """
        self._formatStr = formatStr
        self._keyTuple = tuple(sorted(dataDict))
        # atleast_1d promotes a scalar (0-d array) to a one-element array, so that
        # len() and iteration work; arrays built from sequences are left as they are
        self._valListOfLists = [numpy.atleast_1d(numpy.array(dataDict[key])) for key in self._keyTuple]
        self._dtype = [(key, valArr.dtype)
                       for key, valArr in zip(self._keyTuple, self._valListOfLists)]

    def __iter__(self):
        """Yield a RepositoryInfo object for each combination of parameter values
        """
        for valTuple in itertools.product(*self._valListOfLists):
            valDict = dict(zip(self._keyTuple, valTuple))
            name = self.format(valDict)
            yield RepositoryInfo(keyTuple=self._keyTuple, valTuple=valTuple, dtype=self._dtype, name=name)

    def __len__(self):
        """Return the number of items in the iterator"""
        n = 1
        for valArr in self._valListOfLists:
            n *= len(valArr)
        return n

    def format(self, valDict):
        """Return formatted string for a specified value dictionary

        @param[in] valDict: a dict of key: value pairs that identify a repository
        """
        return self._formatStr % valDict

    def getKeyTuple(self):
        """Return a tuple of keys in the same order as items in value tuples
        """
        return self._keyTuple

    def _getDTypeList(self):
        """Get a dtype for a structured array of repository keys
        """
        return self._dtype