22 """Tools to help you iterate over a set of repositories.
24 Helpful while creating them or harvesting data from them.
34 def _getDTypeList(keyTuple, valTuple):
35 """Construct a numpy dtype for a data ID or repository ID
37 @param[in] keyTuple: ID key names, in order
38 @param[in] valTuple: a value tuple
39 @return numpy dtype as a list
41 @warning: this guesses at string length (STR_PADDING + length of string in valTuple);
42 longer strings will be truncated when inserted into numpy structured arrays
45 for name, val
in zip(keyTuple, valTuple):
46 if isinstance(val, str):
47 predLen = len(val) + STR_PADDING
48 typeList.append((name, str, predLen))
50 typeList.append((name, numpy.array([val]).dtype))
55 """Accumulate a set of measurements from a set of source tables
58 - specify the desired source measurements when constructing this object
59 - call addSourceMetrics for each repository you harvest data from
60 - call finalize to produce the final data
62 Data available after calling finalize:
63 - self.sourceArr: a numpy structured array of shape (num repositories, num sources)
64 containing named columns for:
67 - each item of data extracted from the source table
68 - self.sourceIdDict: a dict of (source ID: index of axis 1 of self.sourceArr)
69 - self.repoArr: a numpy structured array of shape (num repositories,)
70 containing a named column for each repository key (see RepositoryIterator)
72 @note: sources that had non-finite data (e.g. NaN) for every value extracted are silently omitted
75 def __init__(self, datasetType, sourceKeyTuple):
77 @param[in] datasetType: dataset type for source
78 @param[in] sourceKeyTuple: list of keys of data items to extract from the source tables
80 @raise RuntimeError if sourceKeyTuple is empty
82 if len(sourceKeyTuple) < 1:
83 raise RuntimeError(
"Must specify at least one key in sourceKeyTuple")
102 def _getSourceMetrics(self, idKeyTuple, idValList, sourceTableList):
103 """Obtain the desired source measurements from a list of source tables
105 Extracts a set of source measurements (specified by sourceKeyTuple) from a list of source tables
106 (one per data ID) and saves them as a dict of source ID: list of data
108 @param[in] idKeyTuple: a tuple of data ID keys; must be the same for each call
109 @param[in] idValList: a list of data ID value tuples;
110 each tuple contains values in the order in idKeyTuple
111 @param[in] sourceTableList: a list of source tables, one per entry in idValList
113 @return a dict of source id: data id tuple + source data tuple
114 where source data tuple order matches sourceKeyTuple
115 and data id tuple matches self._idKeyTuple (which is set from the first idKeyTuple)
117 @raise RuntimeError if idKeyTuple is different than it was for the first call.
119 GetRepositoryDataTask.run returns idKeyTuple and idValList; you can easily make
120 a subclass of GetRepositoryDataTask that also returns sourceTableList.
122 Updates instance variables:
123 - self._idKeyTuple if not already set.
128 valTuple=idValList[0])
130 if self.
_idKeyTuple_idKeyTuple != tuple(idKeyTuple):
131 raise RuntimeError(
"idKeyTuple = %s != %s = first idKeyTuple; must be the same each time" %
135 for idTuple, sourceTable
in zip(idValList, sourceTableList):
136 if len(sourceTable) == 0:
139 idList = sourceTable.get(
"id")
140 dataList = [sourceTable.get(key)
for key
in self.
_sourceKeyTuple_sourceKeyTuple]
146 transposedDataList =
list(zip(*dataList))
149 dataDict.update((srcId, idTuple + tuple(data))
150 for srcId, data
in zip(idList, transposedDataList))
154 """Accumulate source measurements from a list of source tables.
156 Once you have accumulated all source measurements, call finalize to process the data.
158 @param[in] repoInfo: a RepositoryInfo instance
159 @param[in] idKeyTuple: a tuple of data ID keys; must be the same for each call
160 @param[in] idValList: a list of data ID value tuples;
161 each tuple contains values in the order in idKeyTuple
162 @param[in] sourceTableList: a list of source tables, one per entry in idValList
164 @raise RuntimeError if idKeyTuple is different than it was for the first call.
166 Accumulates the data in temporary cache self._tempDataList.
168 @return number of sources
174 dataDict = self.
_getSourceMetrics_getSourceMetrics(idKeyTuple, idValList, sourceTableList)
181 """Process the accumulated source measurements to create the final data products.
183 Only call this after you have added all source metrics using addSourceMetrics.
185 Reads temporary cache self._tempDataList and then deletes it.
188 raise RuntimeError(
"No data found")
192 fullSrcIdSet.update(
iter(dataIdDict.keys()))
199 sourceData = [[(srcId,) + srcDataDict.get(srcId, nullSourceTuple)
for srcId
in fullSrcIdSet]
202 self.
sourceArrsourceArr = numpy.array(sourceData, dtype=sourceArrDType)
205 self.
sourceIdDictsourceIdDict = dict((srcId, i)
for i, srcId
in enumerate(fullSrcIdSet))
208 repoData = [repoInfo.valTuple
for repoInfo
in self.
repoInfoListrepoInfoList]
215 """Information about one data repository
217 Constructed by RepositoryIterator and used by SourceData.
220 def __init__(self, keyTuple, valTuple, dtype, name):
221 if len(keyTuple) != len(valTuple):
222 raise RuntimeError(
"lengths of keyTuple=%s and valTuple=%s do not match" % (keyTuple, valTuple))
230 """Iterate over a set of data repositories that use a naming convention based on parameter values
234 """Construct a repository iterator from a dict of name: valueList
236 @param[in] formatStr: format string using dictionary notation, e.g.: "%(foo)s_%(bar)d"
237 @param[in] **dataDict: name=valueList pairs
240 self.
_keyTuple_keyTuple = tuple(sorted(dataDict.keys()))
243 for i, key
in enumerate(self.
_keyTuple_keyTuple)]
246 """Retrieve next RepositoryInfo object
248 for valTuple
in itertools.product(*self.
_valListOfLists_valListOfLists):
249 valDict = dict(zip(self.
_keyTuple_keyTuple, valTuple))
250 name = self.
formatformat(valDict)
254 """Return the number of items in the iterator"""
261 """Return formatted string for a specified value dictionary
263 @param[in] valDict: a dict of key: value pairs that identify a repository
268 """Return a tuple of keys in the same order as items in value tuples
272 def _getDTypeList(self):
273 """Get a dtype for a structured array of repository keys
def __init__(self, keyTuple, valTuple, dtype, name)
def __init__(self, formatStr, **dataDict)
def format(self, valDict)
def _getSourceMetrics(self, idKeyTuple, idValList, sourceTableList)
def __init__(self, datasetType, sourceKeyTuple)
def addSourceMetrics(self, repoInfo, idKeyTuple, idValList, sourceTableList)
daf::base::PropertyList * list
daf::base::PropertySet * set
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.