LSST Applications g0b6bd0c080+a72a5dd7e6,g1182afd7b4+2a019aa3bb,g17e5ecfddb+2b8207f7de,g1d67935e3f+06cf436103,g38293774b4+ac198e9f13,g396055baef+6a2097e274,g3b44f30a73+6611e0205b,g480783c3b1+98f8679e14,g48ccf36440+89c08d0516,g4b93dc025c+98f8679e14,g5c4744a4d9+a302e8c7f0,g613e996a0d+e1c447f2e0,g6c8d09e9e7+25247a063c,g7271f0639c+98f8679e14,g7a9cd813b8+124095ede6,g9d27549199+a302e8c7f0,ga1cf026fa3+ac198e9f13,ga32aa97882+7403ac30ac,ga786bb30fb+7a139211af,gaa63f70f4e+9994eb9896,gabf319e997+ade567573c,gba47b54d5d+94dc90c3ea,gbec6a3398f+06cf436103,gc6308e37c7+07dd123edb,gc655b1545f+ade567573c,gcc9029db3c+ab229f5caf,gd01420fc67+06cf436103,gd877ba84e5+06cf436103,gdb4cecd868+6f279b5b48,ge2d134c3d5+cc4dbb2e3f,ge448b5faa6+86d1ceac1d,gecc7e12556+98f8679e14,gf3ee170dca+25247a063c,gf4ac96e456+ade567573c,gf9f5ea5b4d+ac198e9f13,gff490e6085+8c2580be5c,w.2022.27
LSST Data Management Base Package
repositoryIterator.py
Go to the documentation of this file.
2# LSST Data Management System
3# Copyright 2008, 2009, 2010, 2011, 2012 LSST Corporation.
4#
5# This product includes software developed by the
6# LSST Project (http://www.lsst.org/).
7#
8# This program is free software: you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation, either version 3 of the License, or
11# (at your option) any later version.
12#
13# This program is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16# GNU General Public License for more details.
17#
18# You should have received a copy of the LSST License Statement and
19# the GNU General Public License along with this program. If not,
20# see <http://www.lsstcorp.org/LegalNotices/>.
21#
22"""Tools to help you iterate over a set of repositories.
23
24Helpful while creating them or harvesting data from them.
25"""
26import itertools
27
28import numpy
29
30STR_PADDING = 5 # used by _getDTypeList; the number of characters to add to the first string value seen
31# when estimating the number of characters needed to store values for a key
32
33
34def _getDTypeList(keyTuple, valTuple):
35 """Construct a numpy dtype for a data ID or repository ID
36
37 @param[in] keyTuple: ID key names, in order
38 @param[in] valTuple: a value tuple
39 @return numpy dtype as a list
40
41 @warning: this guesses at string length (STR_PADDING + length of string in valTuple);
42 longer strings will be truncated when inserted into numpy structured arrays
43 """
44 typeList = []
45 for name, val in zip(keyTuple, valTuple):
46 if isinstance(val, str):
47 predLen = len(val) + STR_PADDING
48 typeList.append((name, str, predLen))
49 else:
50 typeList.append((name, numpy.array([val]).dtype))
51 return typeList
52
53
55 """Accumulate a set of measurements from a set of source tables
56
57 To use:
58 - specify the desired source measurements when constructing this object
59 - call addSourceMetrics for each repository you harvest data from
60 - call finalize to produce the final data
61
62 Data available after calling finalize:
63 - self.sourceArrsourceArr: a numpy structured array of shape (num repositories, num sources)
64 containing named columns for:
65 - source ID
66 - each data ID key
67 - each item of data extracted from the source table
68 - self.sourceIdDictsourceIdDict: a dict of (source ID: index of axis 1 of self.sourceArrsourceArr)
69 - self.repoArrrepoArr: a numpy structured array of shape (num repositories,)
70 containing a named column for each repository key (see RepositoryIterator)
71
72 @note: sources that had non-finite data (e.g. NaN) for every value extracted are silently omitted
73 """
74
75 def __init__(self, datasetType, sourceKeyTuple):
76 """
77 @param[in] datasetType: dataset type for source
78 @param[in] sourceKeyTuple: list of keys of data items to extract from the source tables
79
80 @raise RuntimeError if sourceKeyTuple is empty
81 """
82 if len(sourceKeyTuple) < 1:
83 raise RuntimeError("Must specify at least one key in sourceKeyTuple")
84 self.datasetTypedatasetType = datasetType
85 self._sourceKeyTuple_sourceKeyTuple = tuple(sourceKeyTuple)
86
87 self._idKeyTuple_idKeyTuple = None # tuple of data ID keys, in order; set by first call to _getSourceMetrics
88 self._idKeyDTypeList_idKeyDTypeList = None # numpy dtype for data ID tuple, as a list of (key, type);
89 # set by first call to _getSourceMetrics
90 self._sourceDTypeList_sourceDTypeList = None # numpy dtype for source data, as a list of (key, type);
91 # set by first call to _getSourceMetrics
92 self._repoKeyTuple_repoKeyTuple = None # tuple of repo ID keys, in order; set by first call to addSourceMetrics
93 self._repoDTypeList_repoDTypeList = None # numpy dtype for repoArr, as a list of (key, type);
94 # set by first call to addSourceMetrics
95
96 self._tempDataList_tempDataList = [] # list (one entry per repository)
97 # of dict of source ID: tuple of data ID data concatenated with source metric data, where:
98 # data ID data is in order self._idKeyTuple
99 # source metric data is in order self._sourceKeyTuple
100 self.repoInfoListrepoInfoList = [] # list of repoInfo
101
102 def _getSourceMetrics(self, idKeyTuple, idValList, sourceTableList):
103 """Obtain the desired source measurements from a list of source tables
104
105 Extracts a set of source measurements (specified by sourceKeyTuple) from a list of source tables
106 (one per data ID) and saves them as a dict of source ID: list of data
107
108 @param[in] idKeyTuple: a tuple of data ID keys; must be the same for each call
109 @param[in] idValList: a list of data ID value tuples;
110 each tuple contains values in the order in idKeyTuple
111 @param[in] sourceTableList: a list of source tables, one per entry in idValList
112
113 @return a dict of source id: data id tuple + source data tuple
114 where source data tuple order matches sourceKeyTuple
115 and data id tuple matches self._idKeyTuple_idKeyTuple (which is set from the first idKeyTuple)
116
117 @raise RuntimeError if idKeyTuple is different than it was for the first call.
118
119 GetRepositoryDataTask.run returns idKeyTuple and idValList; you can easily make
120 a subclass of GetRepositoryDataTask that also returns sourceTableList.
121
122 Updates instance variables:
123 - self._idKeyTuple_idKeyTuple if not already set.
124 """
125 if self._idKeyTuple_idKeyTuple is None:
126 self._idKeyTuple_idKeyTuple = tuple(idKeyTuple)
127 self._idKeyDTypeList_idKeyDTypeList = _getDTypeList(keyTuple=self._idKeyTuple_idKeyTuple,
128 valTuple=idValList[0])
129 else:
130 if self._idKeyTuple_idKeyTuple != tuple(idKeyTuple):
131 raise RuntimeError("idKeyTuple = %s != %s = first idKeyTuple; must be the same each time" %
132 (idKeyTuple, self._idKeyTuple_idKeyTuple))
133
134 dataDict = {}
135 for idTuple, sourceTable in zip(idValList, sourceTableList):
136 if len(sourceTable) == 0:
137 continue
138
139 idList = sourceTable.get("id")
140 dataList = [sourceTable.get(key) for key in self._sourceKeyTuple_sourceKeyTuple]
141
142 if self._sourceDTypeList_sourceDTypeList is None:
143 self._sourceDTypeList_sourceDTypeList = [(key, arr.dtype)
144 for key, arr in zip(self._sourceKeyTuple_sourceKeyTuple, dataList)]
145
146 transposedDataList = list(zip(*dataList))
147 del dataList
148
149 dataDict.update((srcId, idTuple + tuple(data))
150 for srcId, data in zip(idList, transposedDataList))
151 return dataDict
152
153 def addSourceMetrics(self, repoInfo, idKeyTuple, idValList, sourceTableList):
154 """Accumulate source measurements from a list of source tables.
155
156 Once you have accumulated all source measurements, call finalize to process the data.
157
158 @param[in] repoInfo: a RepositoryInfo instance
159 @param[in] idKeyTuple: a tuple of data ID keys; must be the same for each call
160 @param[in] idValList: a list of data ID value tuples;
161 each tuple contains values in the order in idKeyTuple
162 @param[in] sourceTableList: a list of source tables, one per entry in idValList
163
164 @raise RuntimeError if idKeyTuple is different than it was for the first call.
165
166 Accumulates the data in temporary cache self._tempDataList_tempDataList.
167
168 @return number of sources
169 """
170 if self._repoKeyTuple_repoKeyTuple is None:
171 self._repoKeyTuple_repoKeyTuple = repoInfo.keyTuple
172 self._repoDTypeList_repoDTypeList = repoInfo.dtype
173
174 dataDict = self._getSourceMetrics_getSourceMetrics(idKeyTuple, idValList, sourceTableList)
175
176 self._tempDataList_tempDataList.append(dataDict)
177 self.repoInfoListrepoInfoList.append(repoInfo)
178 return len(dataDict)
179
180 def finalize(self):
181 """Process the accumulated source measurements to create the final data products.
182
183 Only call this after you have added all source metrics using addSourceMetrics.
184
185 Reads temporary cache self._tempDataList_tempDataList and then deletes it.
186 """
187 if len(self._tempDataList_tempDataList) == 0:
188 raise RuntimeError("No data found")
189
190 fullSrcIdSet = set()
191 for dataIdDict in self._tempDataList_tempDataList:
192 fullSrcIdSet.update(iter(dataIdDict.keys()))
193
194 # source data
195 sourceArrDType = [("sourceId", int)] + self._idKeyDTypeList_idKeyDTypeList + self._sourceDTypeList_sourceDTypeList
196 # data for missing sources (only for the data in the source data dict, so excludes srcId)
197 nullSourceTuple = tuple(numpy.zeros(1, dtype=self._idKeyDTypeList_idKeyDTypeList + self._sourceDTypeList_sourceDTypeList)[0])
198
199 sourceData = [[(srcId,) + srcDataDict.get(srcId, nullSourceTuple) for srcId in fullSrcIdSet]
200 for srcDataDict in self._tempDataList_tempDataList]
201
202 self.sourceArrsourceArr = numpy.array(sourceData, dtype=sourceArrDType)
203 del sourceData
204
205 self.sourceIdDictsourceIdDict = dict((srcId, i) for i, srcId in enumerate(fullSrcIdSet))
206
207 # repository data
208 repoData = [repoInfo.valTuple for repoInfo in self.repoInfoListrepoInfoList]
209 self.repoArrrepoArr = numpy.array(repoData, dtype=self._repoDTypeList_repoDTypeList)
210
211 self._tempDataList_tempDataList = None
212
213
215 """Information about one data repository
216
217 Constructed by RepositoryIterator and used by SourceData.
218 """
219
220 def __init__(self, keyTuple, valTuple, dtype, name):
221 if len(keyTuple) != len(valTuple):
222 raise RuntimeError("lengths of keyTuple=%s and valTuple=%s do not match" % (keyTuple, valTuple))
223 self.keyTuplekeyTuple = tuple(keyTuple)
224 self.valTuplevalTuple = tuple(valTuple)
225 self.dtypedtype = dtype
226 self.namename = name
227
228
230 """Iterate over a set of data repositories that use a naming convention based on parameter values
231 """
232
233 def __init__(self, formatStr, **dataDict):
234 """Construct a repository iterator from a dict of name: valueList
235
236 @param[in] formatStr: format string using dictionary notation, e.g.: "%(foo)s_%(bar)d"
237 @param[in] **dataDict: name=valueList pairs
238 """
239 self._formatStr_formatStr = formatStr
240 self._keyTuple_keyTuple = tuple(sorted(dataDict.keys()))
241 self._valListOfLists_valListOfLists = [numpy.array(dataDict[key]) for key in self._keyTuple_keyTuple]
242 self._dtype_dtype = [(key, self._valListOfLists_valListOfLists[i].dtype)
243 for i, key in enumerate(self._keyTuple_keyTuple)]
244
245 def __iter__(self):
246 """Retrieve next RepositoryInfo object
247 """
248 for valTuple in itertools.product(*self._valListOfLists_valListOfLists):
249 valDict = dict(zip(self._keyTuple_keyTuple, valTuple))
250 name = self.formatformat(valDict)
251 yield RepositoryInfo(keyTuple=self._keyTuple_keyTuple, valTuple=valTuple, dtype=self._dtype_dtype, name=name)
252
253 def __len__(self):
254 """Return the number of items in the iterator"""
255 n = 1
256 for valTuple in self._valListOfLists_valListOfLists:
257 n *= len(valTuple)
258 return n
259
260 def format(self, valDict):
261 """Return formatted string for a specified value dictionary
262
263 @param[in] valDict: a dict of key: value pairs that identify a repository
264 """
265 return self._formatStr_formatStr % valDict
266
267 def getKeyTuple(self):
268 """Return the a tuple of keys in the same order as items in value tuples
269 """
270 return self._keyTuple_keyTuple
271
272 def _getDTypeList(self):
273 """Get a dtype for a structured array of repository keys
274 """
275 return self._dtype_dtype
def __init__(self, keyTuple, valTuple, dtype, name)
def _getSourceMetrics(self, idKeyTuple, idValList, sourceTableList)
def __init__(self, datasetType, sourceKeyTuple)
def addSourceMetrics(self, repoInfo, idKeyTuple, idValList, sourceTableList)
daf::base::PropertyList * list
Definition: fits.cc:913
daf::base::PropertySet * set
Definition: fits.cc:912
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
Definition: functional.cc:33