LSSTApplications  11.0-13-gbb96280,12.1+18,12.1+7,12.1-1-g14f38d3+72,12.1-1-g16c0db7+5,12.1-1-g5961e7a+84,12.1-1-ge22e12b+23,12.1-11-g06625e2+4,12.1-11-g0d7f63b+4,12.1-19-gd507bfc,12.1-2-g7dda0ab+38,12.1-2-gc0bc6ab+81,12.1-21-g6ffe579+2,12.1-21-gbdb6c2a+4,12.1-24-g941c398+5,12.1-3-g57f6835+7,12.1-3-gf0736f3,12.1-37-g3ddd237,12.1-4-gf46015e+5,12.1-5-g06c326c+20,12.1-5-g648ee80+3,12.1-5-gc2189d7+4,12.1-6-ga608fc0+1,12.1-7-g3349e2a+5,12.1-7-gfd75620+9,12.1-9-g577b946+5,12.1-9-gc4df26a+10
LSSTDataManagementBasePackage
multiMatch.py
Go to the documentation of this file.
from builtins import zip
from builtins import range
from builtins import object

import collections
import collections.abc
import numpy

import lsst.afw.geom

from .tableLib import SchemaMapper, CoordKey, SourceRecord
8 
class MultiMatch(object):
    """Incremental N-way spatial matcher for a sequence of catalogs that share
    a common schema.

    Catalogs are fed in one at a time via add(); finish() returns the merged,
    object-grouped result catalog and resets the matcher's per-run state.
    """

    def __init__(self, schema, dataIdFormat, coordField="coord", idField="id", radius=None,
                 RecordClass=SourceRecord):
        """Initialize a multi-catalog match.

        Arguments:
        schema -------- schema shared by all catalogs to be included in the match.
        dataIdFormat -- dict of name: type for all data ID keys (e.g. {"visit":int, "ccd":int}).
        coordField ---- prefix for _ra and _dec fields that contain the coordinates to use
                        for the match.
        idField ------- name of the field in schema that contains unique object IDs.
        radius -------- lsst.afw.geom.Angle; maximum separation for a match.  Defaults to
                        0.5 arcseconds.
        RecordClass --- type of record (a subclass of lsst.afw.table.BaseRecord) to expect
                        in catalogs to be matched.

        Raises ValueError if radius is given but is not an lsst.afw.geom.Angle.
        """
        if radius is None:
            radius = 0.5*lsst.afw.geom.arcseconds
        elif not isinstance(radius, lsst.afw.geom.Angle):
            raise ValueError("'radius' argument must be an Angle")
        self.radius = radius
        self.mapper = SchemaMapper(schema)
        self.mapper.addMinimalSchema(schema, True)
        self.coordKey = CoordKey(schema[coordField])
        self.idKey = schema.find(idField).key
        self.dataIdKeys = {}
        outSchema = self.mapper.editOutputSchema()
        self.objectKey = outSchema.addField("object", type=numpy.int64,
                                            doc="Unique ID for joined sources")
        for name, dataType in dataIdFormat.items():
            # Bug fix: the original passed the literal string "'%s' data ID component"
            # with no interpolation, so every data-ID field got the same doc text.
            self.dataIdKeys[name] = outSchema.addField(name, type=dataType,
                                                       doc="'%s' data ID component" % name)
        # self.result will be a catalog containing the union of all matched records, with
        # an 'object' ID column that can be used to group matches.  Sources that have
        # ambiguous matches may appear multiple times.
        self.result = None
        # self.reference will be a subset of self.result, with exactly one record for each
        # group of matches (we'll use the one from the first catalog matched into this group).
        # We'll use this to match against each subsequent catalog.
        self.reference = None
        # A set of ambiguous objects that we may want to ultimately remove from the final
        # merged catalog.
        self.ambiguous = set()
        # Table used to allocate new records for the output catalog.
        self.table = RecordClass.Table.make(self.mapper.getOutputSchema())
        # Counter used to assign the next object ID.
        self.nextObjId = 1

    def makeRecord(self, inputRecord, dataId, objId):
        """Create a new result record from the given input record, using the given data ID
        and object ID to fill in additional columns.

        Returns the newly-appended output record (owned by self.table).
        """
        outputRecord = self.table.copyRecord(inputRecord, self.mapper)
        for name, key in self.dataIdKeys.items():
            outputRecord.set(key, dataId[name])
        outputRecord.set(self.objectKey, objId)
        return outputRecord

    def add(self, catalog, dataId):
        """Add a new catalog to the match, corresponding to the given data ID.

        The first catalog added simply seeds the result and reference catalogs; each
        subsequent catalog is spatially matched against the reference, with unmatched
        sources becoming new objects and multiply-matched sources flagged as ambiguous.
        """
        if self.result is None:
            # First catalog: every source starts a new object.
            self.result = self.table.Catalog(self.table)
            for record in catalog:
                self.result.append(self.makeRecord(record, dataId, objId=self.nextObjId))
                self.nextObjId += 1
            self.reference = self.result.copy(deep=False)
            return
        # Temporary dict mapping object ID to reference record.
        # Will remove from this dict as objects are matched.
        objById = {record.get(self.objectKey): record for record in self.reference}
        # Temporary dict mapping source ID to new catalog record.
        # Will remove from this dict as objects are matched.
        newById = {record.get(self.idKey): record for record in catalog}
        # Temporary dict mapping new source ID to a set of associated objects.
        newToObj = {}
        # NOTE(review): lsst.afw.table is never imported by this module; this works only
        # because the module lives inside the lsst.afw.table package, which is already
        # imported by the time this method runs — confirm, or import explicitly.
        matches = lsst.afw.table.matchRaDec(self.reference, catalog, self.radius)
        for refRecord, newRecord, distance in matches:
            objId = refRecord.get(self.objectKey)
            if objById.pop(objId, None) is None:
                # We've already matched this object against another new source,
                # mark it as ambiguous.
                self.ambiguous.add(objId)
            if newById.pop(newRecord.get(self.idKey), None) is None:
                # We've already matched this new source to one or more other objects.
                # Mark all involved objects as ambiguous.
                self.ambiguous.add(objId)
                self.ambiguous |= newToObj.get(newRecord.get(self.idKey), set())
            # Populate the newToObj dict (setdefault trick is an idiom for appending
            # to a dict-of-sets).
            newToObj.setdefault(newRecord.get(self.idKey), set()).add(objId)
            # Add a new result record for this match.
            self.result.append(self.makeRecord(newRecord, dataId, objId))
        # Add any unmatched sources from the new catalog as new objects to both the
        # joined result catalog and the reference catalog.
        for newRecord in newById.values():
            resultRecord = self.makeRecord(newRecord, dataId, self.nextObjId)
            self.nextObjId += 1
            self.result.append(resultRecord)
            self.reference.append(resultRecord)

    def finish(self, removeAmbiguous=True):
        """Return the final match catalog, after sorting it by object, copying it to
        ensure contiguousness, and optionally removing ambiguous matches.

        After calling finish(), the in-progress state of the matcher is returned to the
        state it was in just after construction, with the exception of the object ID
        counter (which is not reset).
        """
        if removeAmbiguous:
            result = self.table.Catalog(self.table)
            for record in self.result:
                if record.get(self.objectKey) not in self.ambiguous:
                    result.append(record)
        else:
            result = self.result
        result.sort(self.objectKey)
        # Deep copy guarantees the returned catalog is contiguous in memory.
        result = result.copy(deep=True)
        self.result = None
        self.reference = None
        self.ambiguous = set()
        return result
124 
125 
class GroupView(collections.abc.Mapping):
    """A mapping (i.e. dict-like object) that provides convenient operations on the
    concatenated catalogs returned by a MultiMatch object.

    A GroupView provides access to a catalog of grouped objects, in which the grouping
    is indicated by a field for which all records in a group have the same value.  Once
    constructed, it allows operations similar to those supported by SQL "GROUP BY",
    such as filtering and aggregate calculation.

    Note: the base class is collections.abc.Mapping (collections.Mapping was removed
    in Python 3.10).
    """

    @classmethod
    def build(cls, catalog, groupField="object"):
        """!Construct a GroupView from a concatenated catalog.

        @param[in] catalog     Input catalog, containing records grouped by a field in
                               which all records in the same group have the same value.
                               Must be sorted by the group field.
        @param[in] groupField  Name or Key for the field that indicates groups.
        """
        groupKey = catalog.schema.find(groupField).key
        ids, indices = numpy.unique(catalog.get(groupKey), return_index=True)
        groups = numpy.zeros(len(ids), dtype=object)
        ends = list(indices[1:]) + [len(catalog)]
        for n, (i1, i2) in enumerate(zip(indices, ends)):
            groups[n] = catalog[int(i1):int(i2)]  # casts are a work-around for DM-8557
            assert (groups[n].get(groupKey) == ids[n]).all()
        return cls(catalog.schema, ids, groups)

    def __init__(self, schema, ids, groups):
        """Direct constructor; most users should call build() instead.

        This constructor takes the constituent arrays of the object directly, to allow
        multiple methods for construction.
        """
        self.schema = schema
        self.ids = ids
        self.groups = groups
        # Total number of records across all groups.
        self.count = sum(len(cat) for cat in self.groups)

    def __len__(self):
        """Return the number of groups."""
        return len(self.ids)

    def __iter__(self):
        """Iterate over group field values.

        Bug fix: __iter__ must return an iterator; the original returned the ndarray
        itself, which makes iter() raise TypeError.
        """
        return iter(self.ids)

    def __getitem__(self, key):
        """Return the catalog subset that corresponds to a group field value."""
        index = numpy.searchsorted(self.ids, key)
        # searchsorted returns len(self.ids) for keys beyond the last ID; guard the
        # index so such lookups raise KeyError (as documented) rather than IndexError.
        if index >= len(self.ids) or self.ids[index] != key:
            raise KeyError("Group with ID {0} not found".format(key))
        return self.groups[index]

    def where(self, predicate):
        """Return a new GroupView that contains only groups for which the given
        predicate function returns True.

        The predicate function is called once for each group, and passed a single
        argument: the subset catalog for that group.
        """
        mask = numpy.zeros(len(self), dtype=bool)
        for i in range(len(self)):
            mask[i] = predicate(self.groups[i])
        return type(self)(self.schema, self.ids[mask], self.groups[mask])

    def aggregate(self, function, field=None, dtype=float):
        """!Run an aggregate function on each group, returning an array with one
        element for each group.

        @param[in] function  Callable object that computes the aggregate value.  If
                             field is None, called with the entire subset catalog as an
                             argument.  If field is not None, called with an array view
                             into that field.
        @param[in] field     A string name or Key object that indicates a single field
                             the aggregate is computed over.
        @param[in] dtype     Data type for the output array.
        """
        result = numpy.zeros(len(self), dtype=dtype)
        if field is not None:
            key = self.schema.find(field).key
            f = lambda cat: function(cat.get(key))
        else:
            f = function
        for i in range(len(self)):
            result[i] = f(self.groups[i])
        return result

    def apply(self, function, field=None, dtype=float):
        """!Run a non-aggregate function on each group, returning an array with one
        element for each record.

        @param[in] function  Callable object that computes the per-record values.  If
                             field is None, called with the entire subset catalog as an
                             argument.  If field is not None, called with an array view
                             into that field.
        @param[in] field     A string name or Key object that indicates a single field
                             the function is computed over.
        @param[in] dtype     Data type for the output array.
        """
        result = numpy.zeros(self.count, dtype=dtype)
        if field is not None:
            key = self.schema.find(field).key
            f = lambda cat: function(cat.get(key))
        else:
            f = function
        # 'end' (not 'next') to avoid shadowing the builtin.
        last = 0
        for i in range(len(self)):
            end = last + len(self.groups[i])
            result[last:end] = f(self.groups[i])
            last = end
        return result
bool all(CoordinateExpr< N > const &expr)
Return true if all elements are true.
def aggregate
Run an aggregate function on each group, returning an array with one element for each group...
Definition: multiMatch.py:190
std::vector< Match< typename Cat::Record, typename Cat::Record > > matchRaDec(Cat const &cat, Angle radius, bool symmetric)
def build
Construct a GroupView from a concatenated catalog.
Definition: multiMatch.py:136
A class representing an Angle.
Definition: Angle.h:103
def apply
Run a non-aggregate function on each group, returning an array with one element for each record...
Definition: multiMatch.py:210
A FunctorKey used to get or set celestial coordinates from a pair of Angle keys.
Definition: aggregates.h:119