LSSTApplications  18.1.0
LSSTDataManagementBasePackage
multiMatch.py
Go to the documentation of this file.
1 import collections.abc
2 
3 import numpy
4 
5 import lsst.geom
6 from .schemaMapper import SchemaMapper
7 from .aggregates import CoordKey
8 from .source import SourceRecord
9 
10 
11 class MultiMatch:
12 
13  def __init__(self, schema, dataIdFormat, coordField="coord", idField="id", radius=None,
14  RecordClass=SourceRecord):
15  """Initialize a multi-catalog match.
16 
17  Arguments:
18  schema -------- schema shared by all catalogs to be included in the match.
19  dataIdFormat -- dict of name: type for all data ID keys (e.g. {"visit":int, "ccd":int}).
20  coordField ---- prefix for _ra and _dec fields that contain the coordinates to use for the match.
21  idField ------- name of the field in schema that contains unique object IDs.
22  radius -------- lsst.geom.Angle; maximum separation for a match. Defaults to 0.5 arcseconds.
23  RecordClass --- type of record (a subclass of lsst.afw.table.BaseRecord) to expect in catalogs
24  to be matched.
25  """
26  if radius is None:
27  radius = 0.5*lsst.geom.arcseconds
28  elif not isinstance(radius, lsst.geom.Angle):
29  raise ValueError("'radius' argument must be an Angle")
30  self.radius = radius
31  self.mapper = SchemaMapper(schema)
32  self.mapper.addMinimalSchema(schema, True)
33  self.coordKey = CoordKey(schema[coordField])
34  self.idKey = schema.find(idField).key
35  self.dataIdKeys = {}
36  outSchema = self.mapper.editOutputSchema()
37  outSchema.setAliasMap(self.mapper.getInputSchema().getAliasMap())
38  self.objectKey = outSchema.addField(
39  "object", type=numpy.int64, doc="Unique ID for joined sources")
40  for name, dataType in dataIdFormat.items():
41  self.dataIdKeys[name] = outSchema.addField(
42  name, type=dataType, doc="'%s' data ID component")
43  # self.result will be a catalog containing the union of all matched records, with an 'object' ID
44  # column that can be used to group matches. Sources that have ambiguous matches may appear
45  # multiple times.
46  self.result = None
47  # self.reference will be a subset of self.result, with exactly one record for each group of matches
48  # (we'll use the one from the first catalog matched into this group)
49  # We'll use this to match against each subsequent catalog.
50  self.reference = None
51  # A set of ambiguous objects that we may want to ultimately remove from
52  # the final merged catalog.
53  self.ambiguous = set()
54  # Table used to allocate new records for the ouput catalog.
55  self.table = RecordClass.Table.make(self.mapper.getOutputSchema())
56  # Counter used to assign the next object ID
57  self.nextObjId = 1
58 
59  def makeRecord(self, inputRecord, dataId, objId):
60  """Create a new result record from the given input record, using the given data ID and object ID
61  to fill in additional columns."""
62  outputRecord = self.table.copyRecord(inputRecord, self.mapper)
63  for name, key in self.dataIdKeys.items():
64  outputRecord.set(key, dataId[name])
65  outputRecord.set(self.objectKey, objId)
66  return outputRecord
67 
68  def add(self, catalog, dataId):
69  """Add a new catalog to the match, corresponding to the given data ID.
70  """
71  if self.result is None:
72  self.result = self.table.Catalog(self.table)
73  for record in catalog:
74  self.result.append(self.makeRecord(
75  record, dataId, objId=self.nextObjId))
76  self.nextObjId += 1
77  self.reference = self.result.copy(deep=False)
78  return
79  catalog.sort(self.idKey) # pre-sort for speedy by-id access later.
80  # Will remove from this set as objects are matched.
81  unmatchedIds = {record.get(self.idKey) for record in catalog}
82  # Temporary dict mapping new source ID to a set of associated objects.
83  newToObj = {}
84  matches = lsst.afw.table.matchRaDec(self.reference, catalog, self.radius)
85  matchedRefIds = set()
86  matchedCatIds = set()
87  for refRecord, newRecord, distance in matches:
88  objId = refRecord.get(self.objectKey)
89  if objId in matchedRefIds:
90  # We've already matched this object against another new source,
91  # mark it as ambiguous.
92  self.ambiguous.add(objId)
93  matchedRefIds.add(objId)
94  if newRecord.get(self.idKey) in matchedCatIds:
95  # We've already matched this new source to one or more other objects
96  # Mark all involved objects as ambiguous
97  self.ambiguous.add(objId)
98  self.ambiguous |= newToObj.get(newRecord.get(self.idKey), set())
99  matchedCatIds.add(newRecord.get(self.idKey))
100  unmatchedIds.discard(newRecord.get(self.idKey))
101  # Populate the newToObj dict (setdefault trick is an idiom for
102  # appending to a dict-of-sets)
103  newToObj.setdefault(newRecord.get(self.idKey), set()).add(objId)
104  # Add a new result record for this match.
105  self.result.append(self.makeRecord(newRecord, dataId, objId))
106  # Add any unmatched sources from the new catalog as new objects to both
107  # the joined result catalog and the reference catalog.
108  for objId in unmatchedIds:
109  newRecord = catalog.find(objId, self.idKey)
110  resultRecord = self.makeRecord(newRecord, dataId, self.nextObjId)
111  self.nextObjId += 1
112  self.result.append(resultRecord)
113  self.reference.append(resultRecord)
114 
115  def finish(self, removeAmbiguous=True):
116  """Return the final match catalog, after sorting it by object, copying it to ensure contiguousness,
117  and optionally removing ambiguous matches.
118 
119  After calling finish(), the in-progress state of the matcher is returned to the state it was
120  just after construction, with the exception of the object ID counter (which is not reset).
121  """
122  if removeAmbiguous:
123  result = self.table.Catalog(self.table)
124  for record in self.result:
125  if record.get(self.objectKey) not in self.ambiguous:
126  result.append(record)
127  else:
128  result = self.result
129  result.sort(self.objectKey)
130  result = result.copy(deep=True)
131  self.result = None
132  self.reference = None
133  self.ambiguous = set()
134  return result
135 
136 
137 class GroupView(collections.abc.Mapping):
138  """A mapping (i.e. dict-like object) that provides convenient operations on the concatenated
139  catalogs returned by a MultiMatch object.
140 
141  A GroupView provides access to a catalog of grouped objects, in which the grouping is indicated by
142  a field for which all records in a group have the same value. Once constructed, it allows operations
143  similar to those supported by SQL "GROUP BY", such as filtering and aggregate calculation.
144  """
145 
146  @classmethod
147  def build(cls, catalog, groupField="object"):
148  """!Construct a GroupView from a concatenated catalog.
149 
150  @param[in] cls (Class; omit this argument, but Doxygen wants it mentioned)
151  @param[in] catalog Input catalog, containing records grouped by a field in which all records
152  in the same group have the same value. Must be sorted by the group field.
153  @param[in] groupField Name or Key for the field that indicates groups.
154  """
155  groupKey = catalog.schema.find(groupField).key
156  ids, indices = numpy.unique(catalog.get(groupKey), return_index=True)
157  groups = numpy.zeros(len(ids), dtype=object)
158  ends = list(indices[1:]) + [len(catalog)]
159  for n, (i1, i2) in enumerate(zip(indices, ends)):
160  # casts are a work-around for DM-8557
161  groups[n] = catalog[int(i1):int(i2)]
162  assert (groups[n].get(groupKey) == ids[n]).all()
163  return cls(catalog.schema, ids, groups)
164 
165  def __init__(self, schema, ids, groups):
166  """Direct constructor; most users should call build() instead.
167 
168  This constructor takes the constituent arrays of the object directly, to allow multiple
169  methods for construction.
170  """
171  self.schema = schema
172  self.ids = ids
173  self.groups = groups
174  self.count = sum(len(cat) for cat in self.groups)
175 
176  def __len__(self):
177  """Return the number of groups"""
178  return len(self.ids)
179 
180  def __iter__(self):
181  """Iterate over group field values"""
182  return self.ids
183 
184  def __getitem__(self, key):
185  """Return the catalog subset that corresponds to an group field value"""
186  index = numpy.searchsorted(self.ids, key)
187  if self.ids[index] != key:
188  raise KeyError("Group with ID {0} not found".format(key))
189  return self.groups[index]
190 
191  def where(self, predicate):
192  """Return a new GroupView that contains only groups for which the given predicate function
193  returns True.
194 
195  The predicate function is called once for each group, and passed a single argument: the subset
196  catalog for that group.
197  """
198  mask = numpy.zeros(len(self), dtype=bool)
199  for i in range(len(self)):
200  mask[i] = predicate(self.groups[i])
201  return type(self)(self.schema, self.ids[mask], self.groups[mask])
202 
203  def aggregate(self, function, field=None, dtype=float):
204  """!Run an aggregate function on each group, returning an array with one element for each group.
205 
206  @param[in] function Callable object that computes the aggregate value. If field is None,
207  called with the entire subset catalog as an argument. If field is not
208  None, called with an array view into that field.
209  @param[in] field A string name or Key object that indicates a single field the aggregate
210  is computed over.
211  @param[in] dtype Data type for the output array.
212  """
213  result = numpy.zeros(len(self), dtype=dtype)
214  if field is not None:
215  key = self.schema.find(field).key
216 
217  def f(cat):
218  return function(cat.get(key))
219  else:
220  f = function
221  for i in range(len(self)):
222  result[i] = f(self.groups[i])
223  return result
224 
225  def apply(self, function, field=None, dtype=float):
226  """!Run a non-aggregate function on each group, returning an array with one element for each record.
227 
228  @param[in] function Callable object that computes the aggregate value. If field is None,
229  called with the entire subset catalog as an argument. If field is not
230  None, called with an array view into that field.
231  @param[in] field A string name or Key object that indicates a single field the aggregate
232  is computed over.
233  @param[in] dtype Data type for the output array.
234  """
235  result = numpy.zeros(self.count, dtype=dtype)
236  if field is not None:
237  key = self.schema.find(field).key
238 
239  def f(cat):
240  return function(cat.get(key))
241  else:
242  f = function
243  last = 0
244  for i in range(len(self)):
245  next = last + len(self.groups[i])
246  result[last:next] = f(self.groups[i])
247  last = next
248  return result
def getInputSchema(task, butler=None, schema=None)
Obtain the input schema either directly or froma butler reference.
def __init__(self, schema, dataIdFormat, coordField="coord", idField="id", radius=None, RecordClass=SourceRecord)
Definition: multiMatch.py:14
def build(cls, catalog, groupField="object")
Construct a GroupView from a concatenated catalog.
Definition: multiMatch.py:147
std::shared_ptr< FrameSet > append(FrameSet const &first, FrameSet const &second)
Construct a FrameSet that performs two transformations in series.
Definition: functional.cc:33
daf::base::PropertySet * set
Definition: fits.cc:884
A class representing an angle.
Definition: Angle.h:127
def apply(self, function, field=None, dtype=float)
Run a non-aggregate function on each group, returning an array with one element for each record...
Definition: multiMatch.py:225
template SourceMatchVector matchRaDec(SourceCatalog const &, lsst::geom::Angle, MatchControl const &)
def makeRecord(self, inputRecord, dataId, objId)
Definition: multiMatch.py:59
def __init__(self, schema, ids, groups)
Definition: multiMatch.py:165
bool all(CoordinateExpr< N > const &expr) noexcept
Return true if all elements are true.
table::Key< int > type
Definition: Detector.cc:167
def finish(self, removeAmbiguous=True)
Definition: multiMatch.py:115
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
Definition: history.py:168
def aggregate(self, function, field=None, dtype=float)
Run an aggregate function on each group, returning an array with one element for each group...
Definition: multiMatch.py:203
std::vector< SchemaItem< Flag > > * items
def add(self, catalog, dataId)
Definition: multiMatch.py:68
daf::base::PropertyList * list
Definition: fits.cc:885