22 """Tools to help you iterate over a set of repositories. 
   24 Helpful while creating them or harvesting data from them. 
   34 def _getDTypeList(keyTuple, valTuple):
 
   35     """Construct a numpy dtype for a data ID or repository ID 
   37     @param[in] keyTuple: ID key names, in order 
   38     @param[in] valTuple: a value tuple 
   39     @return numpy dtype as a list 
   41     @warning: this guesses at string length (STR_PADDING + length of string in valTuple); 
   42     longer strings will be truncated when inserted into numpy structured arrays 
   45     for name, val 
in zip(keyTuple, valTuple):
 
   46         if isinstance(val, str):
 
   47             predLen = len(val) + STR_PADDING
 
   48             typeList.append((name, str, predLen))
 
   50             typeList.append((name, numpy.array([val]).dtype))
 
   55     """Accumulate a set of measurements from a set of source tables 
   58     - specify the desired source measurements when constructing this object 
   59     - call addSourceMetrics for each repository you harvest data from 
   60     - call finalize to produce the final data 
   62     Data available after calling finalize: 
   63     - self.sourceArr: a numpy structured array of shape (num repositories, num sources) 
   64         containing named columns for: 
   67         - each item of data extracted from the source table 
   68     - self.sourceIdDict: a dict of (source ID: index of axis 1 of self.sourceArr) 
   69     - self.repoArr: a numpy structured array of shape (num repositories,) 
   70         containing a named column for each repository key (see RepositoryIterator) 
   72     @note: sources that had non-finite data (e.g. NaN) for every value extracted are silently omitted 
   75     def __init__(self, datasetType, sourceKeyTuple):
 
   77         @param[in] datasetType: dataset type for source 
   78         @param[in] sourceKeyTuple: list of keys of data items to extract from the source tables 
   80         @raise RuntimeError if sourceKeyTuple is empty 
   82         if len(sourceKeyTuple) < 1:
 
   83             raise RuntimeError(
"Must specify at least one key in sourceKeyTuple")
 
  102     def _getSourceMetrics(self, idKeyTuple, idValList, sourceTableList):
 
  103         """Obtain the desired source measurements from a list of source tables 
  105         Extracts a set of source measurements (specified by sourceKeyTuple) from a list of source tables 
  106         (one per data ID) and saves them as a dict of source ID: list of data 
  108         @param[in] idKeyTuple: a tuple of data ID keys; must be the same for each call 
  109         @param[in] idValList: a list of data ID value tuples; 
  110             each tuple contains values in the order in idKeyTuple 
  111         @param[in] sourceTableList: a list of source tables, one per entry in idValList 
  113         @return a dict of source id: data id tuple + source data tuple 
  114             where source data tuple order matches sourceKeyTuple 
  115             and data id tuple matches self._idKeyTuple (which is set from the first idKeyTuple) 
  117         @raise RuntimeError if idKeyTuple is different than it was for the first call. 
  119         GetRepositoryDataTask.run returns idKeyTuple and idValList; you can easily make 
  120         a subclass of GetRepositoryDataTask that also returns sourceTableList. 
  122         Updates instance variables: 
  123         - self._idKeyTuple if not already set. 
  128                                                  valTuple=idValList[0])
 
  130             if self._idKeyTuple != tuple(idKeyTuple):
 
  131                 raise RuntimeError(
"idKeyTuple = %s != %s = first idKeyTuple; must be the same each time" %
 
  135         for idTuple, sourceTable 
in zip(idValList, sourceTableList):
 
  136             if len(sourceTable) == 0:
 
  139             idList = sourceTable.get(
"id")
 
  140             dataList = [sourceTable.get(key) for key in self._sourceKeyTuple]
 
  146             transposedDataList = 
list(zip(*dataList))
 
  149             dataDict.update((srcId, idTuple + tuple(data))
 
  150                             for srcId, data 
in zip(idList, transposedDataList))
 
  154         """Accumulate source measurements from a list of source tables. 
  156         Once you have accumulated all source measurements, call finalize to process the data. 
  158         @param[in] repoInfo: a RepositoryInfo instance 
  159         @param[in] idKeyTuple: a tuple of data ID keys; must be the same for each call 
  160         @param[in] idValList: a list of data ID value tuples; 
  161             each tuple contains values in the order in idKeyTuple 
  162         @param[in] sourceTableList: a list of source tables, one per entry in idValList 
  164         @raise RuntimeError if idKeyTuple is different than it was for the first call. 
  166         Accumulates the data in temporary cache self._tempDataList. 
  168         @return number of sources 
  174         dataDict = self._getSourceMetrics(idKeyTuple, idValList, sourceTableList)
 
  181         """Process the accumulated source measurements to create the final data products. 
  183         Only call this after you have added all source metrics using addSourceMetrics. 
  185         Reads temporary cache self._tempDataList and then deletes it. 
  188             raise RuntimeError(
"No data found")
 
  192             fullSrcIdSet.update(
iter(dataIdDict.keys()))
 
  199         sourceData = [[(srcId,) + srcDataDict.get(srcId, nullSourceTuple) 
for srcId 
in fullSrcIdSet]
 
  202         self.sourceArr = numpy.array(sourceData, dtype=sourceArrDType)
 
  205         self.sourceIdDict = dict((srcId, i) for i, srcId in enumerate(fullSrcIdSet))
 
  208         repoData = [repoInfo.valTuple for repoInfo in self.repoInfoList]
 
  215     """Information about one data repository 
  217     Constructed by RepositoryIterator and used by SourceData. 
  220     def __init__(self, keyTuple, valTuple, dtype, name):
 
  221         if len(keyTuple) != len(valTuple):
 
  222             raise RuntimeError(
"lengths of keyTuple=%s and valTuple=%s do not match" % (keyTuple, valTuple))
 
  230     """Iterate over a set of data repositories that use a naming convention based on parameter values 
  234         """Construct a repository iterator from a dict of name: valueList 
  236         @param[in] formatStr: format string using dictionary notation, e.g.: "%(foo)s_%(bar)d" 
  237         @param[in] **dataDict: name=valueList pairs 
  240         self._keyTuple = tuple(sorted(dataDict.keys()))
 
  243                        for i, key in enumerate(self._keyTuple)]
 
  246         """Retrieve next RepositoryInfo object 
  248         for valTuple in itertools.product(*self._valListOfLists):
 
  249             valDict = dict(zip(self._keyTuple, valTuple))
 
  250             name = self.format(valDict)
 
  254         """Return the number of items in the iterator""" 
  261         """Return formatted string for a specified value dictionary 
  263         @param[in] valDict: a dict of key: value pairs that identify a repository 
  268         """Return a tuple of keys in the same order as items in value tuples 
  272     def _getDTypeList(self):
 
  273         """Get a dtype for a structured array of repository keys 
def __init__(self, keyTuple, valTuple, dtype, name)
def __init__(self, formatStr, **dataDict)
def format(self, valDict)
def _getSourceMetrics(self, idKeyTuple, idValList, sourceTableList)
def __init__(self, datasetType, sourceKeyTuple)
def addSourceMetrics(self, repoInfo, idKeyTuple, idValList, sourceTableList)
daf::base::PropertyList * list
daf::base::PropertySet * set
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.