LSSTApplications  8.0.0.0+107,8.0.0.1+13,9.1+18,9.2,master-g084aeec0a4,master-g0aced2eed8+6,master-g15627eb03c,master-g28afc54ef9,master-g3391ba5ea0,master-g3d0fb8ae5f,master-g4432ae2e89+36,master-g5c3c32f3ec+17,master-g60f1e072bb+1,master-g6a3ac32d1b,master-g76a88a4307+1,master-g7bce1f4e06+57,master-g8ff4092549+31,master-g98e65bf68e,master-ga6b77976b1+53,master-gae20e2b580+3,master-gb584cd3397+53,master-gc5448b162b+1,master-gc54cf9771d,master-gc69578ece6+1,master-gcbf758c456+22,master-gcec1da163f+63,master-gcf15f11bcc,master-gd167108223,master-gf44c96c709
LSSTDataManagementBasePackage
Stages.cc
Go to the documentation of this file.
1 // -*- lsst-c++ -*-
2 
3 /*
4  * LSST Data Management System
5  * Copyright 2008, 2009, 2010 LSST Corporation.
6  *
7  * This product includes software developed by the
8  * LSST Project (http://www.lsst.org/).
9  *
10  * This program is free software: you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation, either version 3 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU General Public License for more details.
19  *
20  * You should have received a copy of the LSST License Statement and
21  * the GNU General Public License along with this program. If not,
22  * see <http://www.lsstcorp.org/LegalNotices/>.
23  */
24 
25 
50 #if LSST_HAVE_OPEN_MP
51 # include <omp.h>
52 #endif
53 
54 #include <algorithm>
55 #include <memory>
56 #include <utility>
57 
58 #include "boost/format.hpp"
59 #include "boost/scoped_array.hpp"
60 
62 #include "lsst/pex/exceptions.h"
63 #include "lsst/pex/logging/Log.h"
64 
65 #include "lsst/afw/detection/DiaSource.h"
66 #include "lsst/afw/image/Filter.h"
67 #include "lsst/mops/MovingObjectPrediction.h"
68 
69 #include "lsst/ap/ChunkManager.h"
70 #include "lsst/ap/Match.h"
71 #include "lsst/ap/Point.h"
72 #include "lsst/ap/Stages.h"
73 #include "lsst/ap/Time.h"
74 #include "lsst/ap/Utils.h"
75 
82 using lsst::afw::detection::DiaSource;
83 using lsst::afw::detection::DiaSourceSet;
84 using lsst::afw::detection::PersistableDiaSourceVector;
86 using lsst::mops::MovingObjectPrediction;
87 
88 namespace ex = lsst::pex::exceptions;
89 
90 
91 namespace lsst { namespace ap {
92 
93 // -- Constants ----------------
94 
95 static boost::uint32_t const HAS_MATCH = 1;
96 static boost::uint32_t const HAS_KNOWN_VARIABLE_MATCH = 2;
97 static boost::uint32_t const HAS_MOVING_OBJECT_MATCH = 4;
98 static char const * const VAR_PROB_THRESH_KEY[6] = {
99  "uVarProbThreshold",
100  "gVarProbThreshold",
101  "rVarProbThreshold",
102  "iVarProbThreshold",
103  "zVarProbThreshold",
104  "yVarProbThreshold"
105 };
106 
107 // -- typedefs and templates for chunks and spatial indexes ----------------
108 
109 namespace detail {
110 
112 
113 typedef std::vector<ObjectChunk> ObjectChunkVector;
114 
117 
119 
120 } // end of namespace detail
121 
122 #if defined(__GNUC__) && __GNUC__ > 3
123 # pragma GCC visibility push(hidden)
124 #endif
125 
127 template class ZoneEntry<detail::ObjectChunk>;
128 template class ZoneEntry<DiaSourceChunk>;
129 
132 
133 template class ZoneIndex<detail::ObjectEntry>;
134 template class ZoneIndex<detail::DiaSourceEntry>;
135 
139 #if defined(__GNUC__) && __GNUC__ > 3
140 # pragma GCC visibility pop
141 #endif
142 
143 
144 namespace detail {
145 
146 // -- Proper motion correction for objects ----------------
147 
149 std::pair<double, double> correctProperMotion(Object const& obj, double const epoch) {
150  static double const RAD_PER_MAS = (RADIANS_PER_DEGREE/360000.0);
151  // (rad/mas)*(sec/year)/(km/AU)
152  static double const SCALE = RAD_PER_MAS*(365.25*86400/149597870.691);
153 
154  double ra = radians(obj.getRa()); // rad
155  double decl = radians(obj.getDec()); // rad
156 
157  // Convert ra, dec to unit vector in cartesian coordinate system
158  double const sinDecl = std::sin(decl);
159  double const cosDecl = std::cos(decl);
160  double const sinRa = std::sin(ra);
161  double const cosRa = std::cos(ra);
162  double x = cosRa * cosDecl;
163  double y = sinRa * cosDecl;
164  double z = sinDecl;
165 
166  // compute space motion vector (radians per year)
167  double const pmRa = obj.getMuRa()*RAD_PER_MAS/cosDecl;
168  double const pmDecl = obj.getMuDecl()*RAD_PER_MAS;
169  // divide radial velociy by distance to source
170  double const w = obj.getParallax()*obj.getRadialVelocity()*SCALE;
171 
172  double const mx = - pmRa*y - pmDecl*cosRa + x*w;
173  double const my = pmRa*x - pmDecl*sinRa + y*w;
174  double const mz = pmDecl*cosDecl + z*w;
175 
176  // Linear interpolation of position
177  double const dt = (epoch - obj.getEpoch()) * (1/365.25); // julian years
178  x += mx*dt;
179  y += my*dt;
180  z += mz*dt;
181 
182  // Store unit vector for corrected position, convert back to
183  // spherical coords and store scaled integer ra/dec
184  double d2 = x*x + y*y;
185  ra = (d2 == 0.0) ? 0 : degrees(std::atan2(y, x));
186  decl = (z == 0.0) ? 0 : degrees(std::atan2(z, std::sqrt(d2)));
187  if (ra < 0.0) {
188  ra += 360.0;
189  }
190  return std::make_pair(ra, decl);
191 }
192 
193 
194 // -- Match processors ----------------
195 
197 template <typename ZoneEntryT>
199 
200 public :
201 
203  typedef typename std::vector<Match>::iterator MatchIterator;
204 
207  int const _threshold;
208 
210  VisitProcessingContext & context,
211  MatchPairVector & matches,
212  Filter const filter
213  ) :
214  _matches(matches),
215  _filter(filter),
216  _threshold(context.getPipelinePolicy()->getInt(VAR_PROB_THRESH_KEY[filter.getId()]))
217  {}
218 
219  void operator()(DiaSourceEntry & ds, MatchIterator begin, MatchIterator end) {
220  boost::uint32_t flags = HAS_MATCH;
221  do {
222  ZoneEntryT * const obj = begin->_match;
223  double const dist = degrees(begin->_distance);
224  ++begin;
225  // record match results (to be persisted later)
226  _matches.push_back(MatchPair(ds._data->getId(), obj->_data->getId(), dist));
227  if (obj->_data->getVarProb(_filter) >= _threshold) {
228  // flag ds as matching a known variable
229  flags |= HAS_KNOWN_VARIABLE_MATCH;
230  }
231  } while (begin != end);
232  ds._flags = flags;
233  }
234 };
235 
236 
239 
241  typedef std::vector<Match>::iterator MatchIterator;
242 
244 
245  explicit MovingObjectPredictionMatchProcessor(MatchPairVector & results) : _results(results) {}
246 
248  ds._flags |= HAS_MOVING_OBJECT_MATCH;
249  double dx = ell._cosRa * ell._cosDec - ds._x;
250  double dy = ell._sinRa * ell._cosDec - ds._y;
251  double dz = ell._sinDec - ds._z;
252  double dist = 2.0*std::asin(0.5*std::sqrt(dx*dx + dy*dy + dz*dz));
253  // record match results (to be persisted later)
254  _results.push_back(MatchPair(ell._data->getId(), ds._data->getId(), degrees(dist)));
255  }
256 };
257 
258 
259 // -- Zone index filters and functors ----------------
260 
263  bool operator()(DiaSourceEntry const & ds) {
264  return (ds._flags & HAS_KNOWN_VARIABLE_MATCH) == 0;
265  }
266 };
267 
268 
272 
273  DiscardLargeEllipseFilter(Policy::Ptr const policy) :
274  semiMajorAxisThreshold(policy->getDouble("semiMajorAxisThreshold")) {}
275 
276  bool operator()(lsst::mops::MovingObjectPrediction const & p) {
277  return p.getSemiMajorAxisLength() < semiMajorAxisThreshold;
278  }
279 };
280 
281 
284 
285  typedef std::map<int, ObjectChunk> ChunkMap;
286  typedef ChunkMap::value_type ChunkMapValue;
287  typedef ChunkMap::iterator ChunkMapIterator;
288 
291  Point const _fovCen;
292  double const _fovRad;
295  boost::int64_t const _idNamespace;
296 
298  IdPairVector & results,
299  VisitProcessingContext & context
300  ) :
301  _results(results),
302  _zsc(context.getDecomposition()),
303  _fovCen(context.getFov().getCenterRa(), context.getFov().getCenterDec()),
304  _fovRad(context.getFov().getRadius()),
305  _chunks(),
306  _filter(context.getFilter()),
307  _idNamespace(static_cast<boost::int64_t>(context.getFilter().getId() + 1) << 56)
308  {
309  ObjectChunkVector & chunks = context.getChunks();
310  // build a map of ids to chunks
311  for (ObjectChunkVector::iterator i(chunks.begin()), end(chunks.end()); i != end; ++i) {
312  _chunks.insert(ChunkMapValue(i->getId(), *i));
313  }
314  }
315 
316  void operator()(DiaSourceEntry const & entry) {
317  // TODO - this logic should be moved into Python to make it easier to change and more configureable
318  static boost::int64_t const SHAPE_DIFFERS_IN_BOTH_EXPOSURES_MASK = 1 << 2;
319  static boost::int64_t const POSITIVE_FLUX_EXCURSION_MASK = (1 << 3) | (1<< 4);
320  static boost::int64_t const idLimit = INT64_C(1) << 56;
321  // Generate at most 1 object for a pair of difference sources
322  if ((entry._data->getDiaSourceToId() & (1 << 30)) != 0) {
323  return;
324  }
325  boost::int64_t classFlags = entry._data->getFlagClassification();
326  // Don't generate objects for cosmic rays
327  if (((classFlags & SHAPE_DIFFERS_IN_BOTH_EXPOSURES_MASK) != 0) &&
328  ((classFlags & POSITIVE_FLUX_EXCURSION_MASK) != 0)) {
329  return;
330  }
331  // TODO: Don't generate objects for fast movers. Requires knowledge of
332  // ellipticity of difference source after PSF deconvolution, which is not
333  // available for DC3a.
334 
335  if ((entry._flags & (HAS_MATCH | HAS_KNOWN_VARIABLE_MATCH)) == 0) {
336  // difference source had no matches - record it as the source of a new object
337  boost::int64_t id = entry._data->getId();
338  if (id >= idLimit) {
339  throw LSST_EXCEPT(ex::RangeError, "DiaSource id doesn't fit in 56 bits");
340  }
341  // generate a new simplified object (id, position, proper motions, variability probabilities)
342  // and assign it to the appropriate chunk
343  Object obj;
344 
345  obj._objectId = id | _idNamespace;
346  obj._ra = entry._data->getRa();
347  obj._decl = entry._data->getDec();
348  obj._muRa = 0.0;
349  obj._muDecl = 0.0;
350  obj._parallax = 0.0;
351  obj._radialVelocity = 0.0;
352  obj._varProb[0] = 0;
353  obj._varProb[1] = 0;
354  obj._varProb[2] = 0;
355  obj._varProb[3] = 0;
356  obj._varProb[4] = 0;
357  obj._varProb[5] = 0;
358  obj._varProb[_filter.getId()] = 100;
359 
360  _results.push_back(IdPair(id, obj._objectId));
361 
362  // find the chunk the new object belongs to and insert the new object into it
363  int const chunkId = _zsc.radecToChunk(obj._ra, obj._decl);
364  ChunkMapIterator c = _chunks.find(chunkId);
365  if (c == _chunks.end()) {
366  throw LSST_EXCEPT(ex::RuntimeError,
367  (boost::format("new object from DIASource %1% ra,dec=(%2%, %3%) x,y=(%4%, %5%) "
368  "not in any chunk overlapping FOV with center (%6%, %7%), "
369  "radius=%8%: new object would go to chunk %9%; distance to FOV "
370  "center is %10% deg") % id % obj._ra % obj._decl %
371  entry._data->getXAstrom() % entry._data->getYAstrom() % _fovCen._ra %
372  _fovCen._dec % _fovRad % chunkId % _fovCen.distance(Point(obj._ra, obj._decl))).str());
373  }
374  c->second.insert(obj);
375  }
376  }
377 };
378 
379 
380 // -- Index creation ----------------
381 
382 template <typename EntryT>
384  ZoneIndex<EntryT> & index,
385  std::vector<typename EntryT::Chunk> const & chunks,
386  double const epoch
387 ) {
388  typedef typename EntryT::Data Data;
389  typedef typename EntryT::Chunk Chunk;
390  typedef std::vector<Chunk> ChunkVector;
391  typedef typename ChunkVector::const_iterator ChunkIterator;
392  typedef typename ChunkVector::size_type Size;
393 
394  index.clear();
395  if (chunks.empty()) {
396  return;
397  }
398 
399  Stopwatch watch(true);
400 
401  // determine stripe bounds for the input chunks
402  ZoneStripeChunkDecomposition const & zsc = index.getDecomposition();
403  int minStripe = 0x7FFFFFFF;
404  int maxStripe = -1 - minStripe;
405  ChunkIterator const end(chunks.end());
406  for (ChunkIterator c(chunks.begin()); c != end; ++c) {
407  int const stripeId = ZoneStripeChunkDecomposition::chunkToStripe(c->getId());
408  if (stripeId > maxStripe) {
409  maxStripe = stripeId;
410  }
411  if (stripeId < minStripe) {
412  minStripe = stripeId;
413  }
414  }
415  assert(maxStripe >= minStripe && "invalid stripe bounds for chunk list");
416 
417  // Partition input chunks into stripes
418  int const numStripes = maxStripe - minStripe + 1;
419  boost::scoped_array<ChunkVector> stripes(new ChunkVector[numStripes]);
420  for (ChunkIterator c(chunks.begin()); c != end; ++c) {
421  int const stripeId = ZoneStripeChunkDecomposition::chunkToStripe(c->getId());
422  assert(stripeId >= minStripe && stripeId <= maxStripe && "stripe id out of bounds");
423  stripes[stripeId - minStripe].push_back(*c);
424  }
425 
426  double minDec = zsc.getStripeDecMin(minStripe) - 0.001;
427  double maxDec = zsc.getStripeDecMax(maxStripe) + 0.001;
428  index.setDecBounds(std::max(minDec, -90.0), std::min(maxDec, 90.0));
429 
430  // Loop over stripes. Note that due to proper motion, objects from
431  // different stripes can end up in the same zone. Objects are never
432  // migrated across chunks/stripes; rather, their J2000 coordinates
433  // determine the chunks that they will belong to. As time goes on,
434  // the spatial extent of a chunk will grow, where the rate of growth
435  // is bounded by Barnard's star with a proper motion of ~10.5 arcsec/year.
436  // When intersecting the bounding circle of a FOV with chunk boundaries
437  // to determine which chunks must be loaded for association, the bounding
438  // circle must be padded to ensure all relevant objects are loaded.
439  try {
440  for (int s = 0; s < numStripes; ++s) {
441  ChunkVector & vec = stripes[s];
442  Size const numChunks = vec.size();
443 
444  // Loop over chunks in stripe
445  for (Size c = 0; c < numChunks; ++c) {
446  Chunk * const ch = &vec[c];
447  int const numBlocks = ch->blocks();
448  int i = 0;
449 
450  // loop over blocks in chunk
451  for (int b = 0; b < numBlocks; ++b) {
452  int const numEntries = ch->entries(b);
453  Data * const block = ch->getBlock(b);
454  ChunkEntryFlag const * const flags = ch->getFlagBlock(b);
455 
456  // loop over entries in block
457  for (int e = 0; e < numEntries; ++e, ++i) {
458  if ((flags[e] & Chunk::DELETED) == 0) {
459  std::pair<double, double> pos = correctProperMotion(block[e], epoch);
460  index.insert(pos.first, pos.second, &block[e], ch, i);
461  }
462  }
463  }
464  }
465  }
466  } catch(...) {
467  index.clear();
468  throw LSST_EXCEPT(ex::RuntimeError, "Failed to build zone index");
469  }
470 
471  watch.stop();
472  Log log(Log::getDefaultLog(), "lsst.ap");
473  int numElements = static_cast<int>(index.size());
474  Rec(log, Log::INFO) << "inserted elements into zone index" <<
475  Prop<int>("numElements", numElements) <<
476  Prop<double>("time", watch.seconds()) << Rec::endr;
477 
478  // zone structure is filled, sort individual zones (on right ascension)
479  watch.start();
480  index.sort();
481  watch.stop();
482  Rec(log, Log::INFO) << "sorted zone index" <<
483  Prop<int>("numElements", numElements) <<
484  Prop<double>("time", watch.seconds()) << Rec::endr;
485 }
486 
487 } // end of namespace detail
488 
489 
490 // -- Template instantiations ----------------
491 
492 #if defined(__GNUC__) && __GNUC__ > 3
493 # pragma GCC visibility push(hidden)
494 #endif
495 template class detail::ObjectMatchProcessor<detail::ObjectEntry>;
497 
498 template std::size_t distanceMatch<
501  PassthroughFilter<detail::DiaSourceEntry>,
502  PassthroughFilter<detail::ObjectEntry>,
503  detail::ObjectMatchProcessor<detail::ObjectEntry>
504 >(
505  ZoneIndex<detail::DiaSourceEntry> &,
506  ZoneIndex<detail::ObjectEntry> &,
507  double const,
508  PassthroughFilter<detail::DiaSourceEntry> &,
509  PassthroughFilter<detail::ObjectEntry> &,
510  detail::ObjectMatchProcessor<detail::ObjectEntry> &
511 );
512 
513 template std::size_t ellipseMatch<
514  MovingObjectPrediction,
516  PassthroughFilter<detail::MovingObjectEllipse>,
517  PassthroughFilter<detail::DiaSourceEntry>,
518  detail::MovingObjectPredictionMatchProcessor
519 >(
520  EllipseList<MovingObjectPrediction> &,
521  ZoneIndex<detail::DiaSourceEntry> &,
522  PassthroughFilter<detail::MovingObjectEllipse> &,
523  PassthroughFilter<detail::DiaSourceEntry> &,
524  detail::MovingObjectPredictionMatchProcessor &
525 );
527 #if defined(__GNUC__) && __GNUC__ > 3
528 # pragma GCC visibility pop
529 #endif
530 
531 
532 // -- VisitProcessingContext ----------------
533 
535  Policy::Ptr const policy,
536  PropertySet::Ptr const event,
537  std::string const & runId,
538  int const workerId,
539  int const numWorkers
540 ) :
541  lsst::daf::base::Citizen(typeid(*this)),
542  _policy(policy),
543  _chunkIds(),
544  _chunks(),
545  _objectIndex(policy->getInt("zonesPerDegree"),
546  policy->getInt("zonesPerStripe"),
547  policy->getInt("maxEntriesPerZoneEstimate")),
548  _diaSourceIndex(policy->getInt("zonesPerDegree"),
549  policy->getInt("zonesPerStripe"),
550  policy->getInt("maxEntriesPerZoneEstimate")),
551  _deadline(),
552  _fov(),
553  _runId(runId),
554  _visitId(-1),
555  _matchRadius(policy->getDouble("matchRadius")),
556  _ellipseScalingFactor(policy->getDouble("ellipseScalingFactor")),
557  _filter(),
558  _workerId(workerId),
559  _numWorkers(numWorkers),
560  _debugSharedMemory(policy->getBool("debugSharedMemory"))
561 {
562  double ra = event->getAsDouble("ra");
563  double dec = event->getAsDouble("decl");
564  _fov = CircularRegion(ra, dec, policy->getDouble("fovRadius"));
565  _visitId = event->getAsInt("visitId");
566  if (event->exists("matchRadius")) {
567  _matchRadius = event->getAsDouble("matchRadius");
568  }
569  _visitTime = event->getAsDouble("dateObs");
570 
571  // DC3a: set association pipeline deadline to 10 minutes
572  // after creation of a visit processing context.
574  _deadline.tv_sec += 600;
575  std::string filterName = event->getAsString("filter");
576  LogicalLocation location(policy->getString("filterTableLocation"));
577  _filter = Filter(filterName);
578 }
579 
580 
582 
583 
584 void VisitProcessingContext::setDiaSources(boost::shared_ptr<PersistableDiaSourceVector> diaSources) {
585  _diaSources = diaSources->getSources();
587  int const sz = static_cast<int>(_diaSources.size());
588  if (sz == 0) {
589  return;
590  }
591  Stopwatch watch(true);
592  double minDec = 90.0;
593  double maxDec = -90.0;
594  for (int i = 0; i < sz; ++i) {
595  double dec = _diaSources[i]->getDec();
596  if (dec < minDec) {
597  minDec = dec;
598  }
599  if (dec > maxDec) {
600  maxDec = dec;
601  }
602  }
603  assert(maxDec >= minDec && "invalid dec bounds for DiaSource list");
604  _diaSourceIndex.setDecBounds(minDec, maxDec);
605  watch.stop();
606  Log log(Log::getDefaultLog(), "lsst.ap");
607  Rec(log, Log::INFO) << "set dec bounds for difference source index" <<
608  Prop<int>("numElements", sz) <<
609  Prop<double>("time", watch.seconds()) << Rec::endr;
610  watch.start();
611  try {
612  for (int i = 0; i < sz; ++i) {
613  _diaSourceIndex.insert(_diaSources[i]->getRa(), _diaSources[i]->getDec(),
614  _diaSources[i].get(), 0, 0);
615  }
616  } catch (...) {
618  throw;
619  }
620  watch.stop();
621  Rec(log, Log::INFO) << "inserted difference sources into zone index" <<
622  Prop<int>("numElements", sz) <<
623  Prop<double>("time", watch.seconds()) << Rec::endr;
624  watch.start();
626  watch.stop();
627  Rec(log, Log::INFO) << "sorted difference source zone index" <<
628  Prop<int>("numElements", sz) <<
629  Prop<double>("time", watch.seconds()) << Rec::endr;
630 }
631 
632 
635 }
636 
637 
638 // -- Load stage ----------------
639 
644 void initialize(std::string const & runId) {
645  // create shared memory object if it doesn't already exist
646  volatile SharedObjectChunkManager manager(runId);
647 }
648 
649 
655  context.getChunkIds().clear();
656  computeChunkIds(context.getChunkIds(), context.getFov(), context.getDecomposition(), 0, 1);
657  SharedObjectChunkManager manager(context.getRunId());
658  manager.registerVisit(context.getVisitId());
659 }
660 
661 
667 
669  typedef std::vector<Chunk> ChunkVector;
670  typedef std::vector<Chunk>::iterator ChunkIterator;
671 
672  SharedObjectChunkManager manager(context.getRunId());
673  Log log(Log::getDefaultLog(), "lsst.ap");
674 
675  try {
676 
677  // From FOV, get chunk ids to be handled by this worker
678  Stopwatch watch(true);
680  context.getChunkIds(),
681  context.getFov(),
682  context.getDecomposition(),
683  context.getWorkerId(),
684  context.getNumWorkers()
685  );
686  watch.stop();
687  Rec(log, Log::INFO) << "computed chunk ids in FOV for worker " <<
688  Prop<int>("numChunks", static_cast<int>(context.getChunkIds().size())) <<
689  Prop<double>("time", watch.seconds()) << Rec::endr;
690 
691  // Register interest in or create chunks via the chunk manager
692  std::vector<Chunk> toRead;
693  std::vector<Chunk> toWaitFor;
694  watch.start();
695  manager.startVisit(toRead, toWaitFor, context.getVisitId(), context.getChunkIds());
696  watch.stop();
697  Rec(log, Log::INFO) << "started processing visit" <<
698  Prop<double>("time", watch.seconds()) << Rec::endr;
699 
700  // record pointers to all chunks being handled by the slice
701  ChunkVector & chunks = context.getChunks();
702  chunks.clear();
703  chunks.insert(chunks.end(), toRead.begin(), toRead.end());
704  chunks.insert(chunks.end(), toWaitFor.begin(), toWaitFor.end());
705 
706  // Read data files
707  watch.start();
708  std::string refNamePattern(context.getPipelinePolicy()->getString("objectChunkFileNamePattern"));
709  std::string deltaNamePattern(context.getPipelinePolicy()->getString("objectDeltaChunkFileNamePattern"));
711  ps->set<std::string>("runId", context.getRunId());
712  ChunkVector::size_type numToRead(toRead.size());
713  for (ChunkIterator i(toRead.begin()), end(toRead.end()); i != end; ++i) {
714  Chunk & c = *i;
715  ps->set<int>("chunkId", c.getId());
716  ps->set<int>("stripeId", ZoneStripeChunkDecomposition::chunkToStripe(c.getId()));
717  ps->set<int>("chunkSeqNum", ZoneStripeChunkDecomposition::chunkToSequence(c.getId()));
718  c.read(LogicalLocation(refNamePattern, ps).locString(), false);
719  c.readDelta(LogicalLocation(deltaNamePattern, ps).locString(), false);
720  c.setUsable();
721  }
722  watch.stop();
723  Rec(log, Log::INFO) << "read chunk files" <<
724  Prop<int>("numChunks", static_cast<int>(numToRead)) <<
725  Prop<double>("time", watch.seconds()) << Rec::endr;
726 
727  toRead.clear();
728  ChunkVector::size_type numToWaitFor = toWaitFor.size();
729  if (numToWaitFor > 0) {
730  watch.start();
731  // Wait for chunks that are owned by another visit
732  manager.waitForOwnership(toRead, toWaitFor, context.getVisitId(), context.getDeadline());
733  watch.stop();
734  Rec(log, Log::INFO) << "acquired ownership of pre-existing chunks" <<
735  Prop<int>("numChunks", static_cast<int>(numToWaitFor)) <<
736  Prop<double>("time", watch.seconds()) << Rec::endr;
737 
738  // Read in chunks that were not successfully read by the previous owner
739  watch.start();
740  numToRead = toRead.size();
741  for (ChunkIterator i(toRead.begin()), end(toRead.end()); i != end; ++i) {
742  Chunk & c = *i;
743  ps->set<int>("chunkId", c.getId());
744  ps->set<int>("stripeId", ZoneStripeChunkDecomposition::chunkToStripe(c.getId()));
745  ps->set<int>("chunkSeqNum", ZoneStripeChunkDecomposition::chunkToSequence(c.getId()));
746  c.read(LogicalLocation(refNamePattern, ps).locString(), false);
747  c.readDelta(LogicalLocation(deltaNamePattern, ps).locString(), false);
748  c.setUsable();
749  }
750  watch.stop();
751  Rec(log, Log::INFO) << "read straggling chunks" <<
752  Prop<int>("numChunks", static_cast<int>(numToRead)) <<
753  Prop<double>("time", watch.seconds()) << Rec::endr;
754  }
755 
756  } catch (ex::Exception & except) {
757  Rec(log, Log::FATAL) << except.what() << Rec::endr;
758  manager.failVisit(context.getVisitId());
759  } catch (std::exception & except) {
760  log.log(Log::FATAL, except.what());
761  manager.failVisit(context.getVisitId());
762  } catch (...) {
763  log.log(Log::FATAL, "caught unknown exception");
764  manager.failVisit(context.getVisitId());
765  }
766 }
767 
768 
776  if (!context.debugSharedMemory()) {
777  // if the shared memory object used for chunk storage hasn't yet been unlinked, do so now
779  }
780  SharedObjectChunkManager manager(context.getRunId());
781  if (manager.isVisitInFlight(context.getVisitId())) {
782  try {
783  // Build zone index on objects
784  manager.getChunks(context.getChunks(), context.getChunkIds());
785  context.buildObjectIndex();
786  } catch(...) {
787  manager.endVisit(context.getVisitId(), true);
788  throw;
789  }
790  } else {
791  // One or more workers failed in the load phase - rollback the visit
792  manager.endVisit(context.getVisitId(), true);
793  throw LSST_EXCEPT(ex::RuntimeError,
794  "Association pipeline failed to read Object data for FOV");
795  }
796 }
797 
798 
799 // -- MatchDiaSource stage ----------------
800 
809  MatchPairVector & matches,
810  VisitProcessingContext & context
811 ) {
812  SharedObjectChunkManager manager(context.getRunId());
813 
814  try {
815 
816  matches.clear();
817  matches.reserve(65536);
818 
819  detail::ObjectMatchProcessor<detail::ObjectEntry> mlp(context, matches, context.getFilter());
822 
823  Stopwatch watch(true);
824  std::size_t nm = distanceMatch<
830  >(
831  context.getDiaSourceIndex(),
832  context.getObjectIndex(),
833  context.getMatchRadius()/3600.0, // match routine expects degrees, not arc-seconds
834  pdf,
835  pof,
836  mlp
837  );
838  watch.stop();
839  Log log(Log::getDefaultLog(), "lsst.ap");
840  Rec(log, Log::INFO) << "matched difference sources to objects" <<
841  Prop<int>("numDiaSources", context.getDiaSourceIndex().size()) <<
842  Prop<int>("numObjects", context.getObjectIndex().size()) <<
843  Prop<int>("numMatches", static_cast<int>(nm)) <<
844  Prop<double>("time", watch.seconds()) << Rec::endr;
845 
846  } catch (...) {
847  manager.endVisit(context.getVisitId(), true);
848  throw;
849  }
850 }
851 
852 
853 // -- MatchMop stage ----------------
854 
867  MatchPairVector & matches,
868  IdPairVector & newObjects,
869  VisitProcessingContext & context,
870  lsst::mops::MovingObjectPredictionVector & predictions
871 ) {
872  SharedObjectChunkManager manager(context.getRunId());
873 
874  try {
875  double const ellScale = context.getPipelinePolicy()->getDouble("ellipseScalingFactor");
876  double const smaaClamp = context.getPipelinePolicy()->getDouble("semiMajorAxisClamp");
877  double const smiaClamp = context.getPipelinePolicy()->getDouble("semiMinorAxisClamp");
878 
879  matches.clear();
880  matches.reserve(8192);
881  newObjects.clear();
882  newObjects.reserve(8192);
883 
885 
886  // discard difference sources with known variable matches
887  Stopwatch watch(true);
889  int nr = context.getDiaSourceIndex().pack(dvf);
890  watch.stop();
891  Log log(Log::getDefaultLog(), "lsst.ap");
892  Rec(log, Log::INFO) << "removed difference sources matching known variables from index" <<
893  Prop<int>("numRemoved", nr) <<
894  Prop<double>("time", watch.seconds()) << Rec::endr;
895 
896  // build ellipses required for matching from predictions
897  watch.start();
899  ellipses.reserve(predictions.size());
901  lsst::mops::MovingObjectPredictionVector::iterator const end = predictions.end();
902  for (lsst::mops::MovingObjectPredictionVector::iterator i = predictions.begin(); i != end; ++i) {
903  i->setSemiMajorAxisLength(i->getSemiMajorAxisLength() * ellScale);
904  i->setSemiMinorAxisLength(i->getSemiMinorAxisLength() * ellScale);
905  if (elf(*i)) {
906  // clamp error ellipses if necessary
907  if (smaaClamp > 0.0 && i->getSemiMajorAxisLength() > smaaClamp) {
908  i->setSemiMajorAxisLength(smaaClamp);
909  }
910  if (smiaClamp > 0.0 && i->getSemiMinorAxisLength() > smiaClamp) {
911  i->setSemiMinorAxisLength(smiaClamp);
912  }
913  ellipses.push_back(Ellipse<lsst::mops::MovingObjectPrediction>(*i));
914  }
915  }
916  watch.stop();
917  Rec(log, Log::INFO) << "built list of match parameters for moving object predictions" <<
918  Prop<int>("numPredictions", static_cast<int>(ellipses.size())) <<
919  Prop<double>("time", watch.seconds()) << Rec::endr;
920 
921  // match them against difference sources
924  watch.start();
925  std::size_t nm = ellipseMatch<
926  MovingObjectPrediction,
931  >(
932  ellipses,
933  context.getDiaSourceIndex(),
934  pef,
935  pdf,
936  mpp
937  );
938  watch.stop();
939  Rec(log, Log::INFO) << "matched moving object predictions to difference sources" <<
940  Prop<int>("numPredictions", static_cast<int>(ellipses.size())) <<
941  Prop<int>("numDiaSources", context.getDiaSourceIndex().size()) <<
942  Prop<int>("numMatches", static_cast<int>(nm)) <<
943  Prop<double>("time", watch.seconds()) << Rec::endr;
944 
945  // Create new objects from difference sources with no matches
946  watch.start();
947  detail::NewObjectCreator createObjects(newObjects, context);
948  context.getDiaSourceIndex().apply(createObjects);
949  watch.stop();
950  Rec(log, Log::INFO) << "created new objects" <<
951  Prop<int>("numObjects", static_cast<int>(newObjects.size())) <<
952  Prop<double>("time", watch.seconds()) << Rec::endr;
953  } catch (...) {
954  manager.endVisit(context.getVisitId(), true);
955  throw;
956  }
957 }
958 
959 
960 // -- Store stage ----------------
961 
968 
970  typedef std::vector<Chunk> ChunkVector;
971  typedef std::vector<Chunk>::iterator ChunkIterator;
972 
973  SharedObjectChunkManager manager(context.getRunId());
974  Log log(Log::getDefaultLog(), "lsst.ap");
975  try {
976  Stopwatch watch(true);
977  std::string deltaNamePattern = context.getPipelinePolicy()->getString("objectDeltaChunkFileNamePattern");
979  ps->set<std::string>("runId", context.getRunId());
980  ChunkVector & chunks = context.getChunks();
981  for (ChunkIterator i(chunks.begin()), end(chunks.end()); i != end; ++i) {
982  Chunk & c = *i;
983  ps->set<int>("chunkId", c.getId());
984  ps->set<int>("stripeId", ZoneStripeChunkDecomposition::chunkToStripe(c.getId()));
985  ps->set<int>("chunkSeqNum", ZoneStripeChunkDecomposition::chunkToSequence(c.getId()));
986  std::string file = LogicalLocation(deltaNamePattern, ps).locString();
987  verifyPathName(file);
988  c.writeDelta(file, true, false);
989  }
990  watch.stop();
991  Rec(log, Log::INFO) << "wrote chunk delta files" <<
992  Prop<int>("numChunks", static_cast<int>(chunks.size())) <<
993  Prop<double>("time", watch.seconds()) << Rec::endr;
994  } catch (ex::Exception & except) {
995  Rec(log, Log::FATAL) << except.what() << Rec::endr;
996  manager.failVisit(context.getVisitId());
997  } catch (std::exception & except) {
998  log.log(Log::FATAL, except.what());
999  manager.failVisit(context.getVisitId());
1000  } catch (...) {
1001  log.log(Log::FATAL, "caught unknown exception");
1002  manager.failVisit(context.getVisitId());
1003  }
1004 }
1005 
1006 
1013  SharedObjectChunkManager manager(context.getRunId());
1014  manager.failVisit(context.getVisitId());
1015 }
1016 
1017 
1028 bool endVisit(VisitProcessingContext & context, bool const rollback) {
1029  SharedObjectChunkManager manager(context.getRunId());
1030  bool committed = manager.endVisit(context.getVisitId(), rollback);
1031  Log log(Log::getDefaultLog(), "lsst.ap");
1032  if (committed) {
1033  log.log(Log::INFO, "Committed visit");
1034  } else {
1035  log.log(Log::FATAL, "Rolled back visit");
1036  }
1037  return committed;
1038 }
1039 
1040 
1041 }} // end of namespace lsst::ap
1042 
DiaSourceIndex _diaSourceIndex
Definition: Stages.h:166
A circular region of the unit sphere (sky).
Holds a pair of ids and the distance between the corresponding positions on the unit sphere...
Definition: Results.h:65
ZoneEntry< DiaSourceChunk > DiaSourceEntry
Definition: Stages.cc:116
ChunkMap::iterator ChunkMapIterator
Definition: Stages.cc:287
double _ra
Definition: Object.h:56
Class for managing chunks of Object instances in shared memory.
double _cosDec
cosine of ellipse center dec
Definition: EllipseTypes.h:66
bool endVisit(int const visitId, bool const rollback)
Definition: ChunkManager.h:96
VisitProcessingContext(lsst::pex::policy::Policy::Ptr const policy, lsst::daf::base::PropertySet::Ptr const event, std::string const &runId, int const workerId, int const numWorkers)
Definition: Stages.cc:534
Container for inter-stage association pipeline state.
Definition: Stages.h:69
void computeChunkIds(std::vector< int > &chunkIds, CircularRegion const &region, ZoneStripeChunkDecomposition const &zsc, int const workerId=0, int const numWorkers=1)
Definition: SpatialUtil.cc:308
Ellipse< MovingObjectPrediction > MovingObjectEllipse
Definition: Stages.cc:118
double dx
Definition: ImageUtils.cc:90
double _sinRa
sine of ellipse center ra
Definition: EllipseTypes.h:67
void registerVisit(int const visitId)
Definition: ChunkManager.h:64
double _z
unit vector z coordinate of entity position
Definition: ZoneTypes.h:66
boost::uint32_t _flags
Reserved.
Definition: ZoneTypes.h:61
void operator()(MovingObjectEllipse &ell, DiaSourceEntry &ds)
Definition: Stages.cc:247
CsvWriter & endr(CsvWriter &w)
Definition: Csv.cc:330
A partial representation of a full LSST Object containing only id, position, proper motions...
Definition: Object.h:53
int y
LogRec Rec
Definition: Log.h:838
void storeSliceObjects(VisitProcessingContext &context)
Definition: Stages.cc:967
void insert(double const ra, double const dec, Data *const data, Chunk *const chunk, int const index)
Definition: ZoneTypes.h:229
boost::int64_t _objectId
Definition: Object.h:55
bool operator()(DiaSourceEntry const &ds)
Definition: Stages.cc:263
std::size_t ellipseMatch(EllipseList< FirstEntryT > &first, ZoneIndex< SecondEntryT > &second, FirstFilterT &firstFilter, SecondFilterT &secondFilter, MatchPairProcessorT &matchPairProcessor)
Definition: Match.h:386
Class for logical location of a persisted Persistable instance.
A manager for Object chunks that exist in shared memory.
Definition: ChunkManager.h:45
a container for holding hierarchical configuration data in memory.
Definition: Policy.h:169
CircularRegion const & getFov() const
Definition: Stages.h:120
std::string const & getRunId() const
Definition: Stages.h:135
SharedObjectChunkManager::ObjectChunk ObjectChunk
Definition: Stages.h:88
boost::shared_ptr< PropertySet > Ptr
Definition: PropertySet.h:90
Contains spatial information for a single point used during cross-matching.
Definition: ZoneTypes.h:54
ZoneStripeChunkDecomposition const & _zsc
Definition: Stages.cc:290
unsigned char ChunkEntryFlag
Definition: Chunk.h:142
void failVisit(VisitProcessingContext &context)
Definition: Stages.cc:1012
double getStripeDecMax(int const stripeId) const
Definition: SpatialUtil.h:181
void apply(FunctionT &function)
Definition: ZoneTypes.cc:321
std::vector< int > const & getChunkIds() const
Definition: Stages.h:95
Spatial crossmatch algorithms.
void operator()(DiaSourceEntry const &entry)
Definition: Stages.cc:316
double getDec() const
Definition: Object.h:72
double getRa() const
Definition: Object.h:68
double _radialVelocity
Definition: Object.h:61
lsst::afw::image::Filter const _filter
Definition: Stages.cc:294
void log(int importance, const std::string &message, const lsst::daf::base::PropertySet &properties)
double _sinDec
sine of ellipse center dec
Definition: EllipseTypes.h:65
void sort()
Sorts each zone in the index (on right ascension)
Definition: ZoneTypes.cc:279
void operator()(DiaSourceEntry &ds, MatchIterator begin, MatchIterator end)
Definition: Stages.cc:219
std::vector< Match >::iterator MatchIterator
Definition: Stages.cc:203
std::size_t distanceMatch(ZoneIndex< FirstEntryT > &first, ZoneIndex< SecondEntryT > &second, double const radius, FirstFilterT &firstFilter, SecondFilterT &secondFilter, MatchListProcessorT &matchListProcessor)
Definition: Match.h:153
double dy
Definition: ImageUtils.cc:90
std::map< int, ObjectChunk > ChunkMap
Definition: Stages.cc:285
lsst::afw::image::Filter getFilter() const
Definition: Stages.h:126
std::vector< MatchPair > MatchPairVector
A list of MatchPair instances.
Definition: Results.h:123
void initialize(std::string const &runId)
Definition: Stages.cc:644
A default &quot;let everything through&quot; filter implementation.
Definition: Match.h:70
a place to record messages and descriptions of the state of processing.
Definition: Log.h:154
Miscellaneous helper methods.
ZoneStripeChunkDecomposition const & getDecomposition() const
Definition: Stages.h:117
Contains spatial information for a single ellipse on the unit sphere (sky).
Definition: EllipseTypes.h:52
MovingObjectPredictionMatchProcessor(MatchPairVector &results)
Definition: Stages.cc:245
void matchMops(MatchPairVector &matches, IdPairVector &newObjects, VisitProcessingContext &context, lsst::mops::MovingObjectPredictionVector &predictions)
Definition: Stages.cc:866
double min
Definition: attributes.cc:216
A list of ellipses, implemented using std::vector.
Definition: EllipseTypes.h:141
ObjectMatchProcessor(VisitProcessingContext &context, MatchPairVector &matches, Filter const filter)
Definition: Stages.cc:209
Filter which discards difference sources matching known variable objects.
Definition: Stages.cc:262
void failVisit(int const visitId)
Definition: ChunkManager.h:67
void setDiaSources(boost::shared_ptr< lsst::afw::detection::PersistableDiaSourceVector > diaSources)
Definition: Stages.cc:584
lsst::afw::image::Filter _filter
Definition: Stages.h:176
ZoneStripeChunkDecomposition const & getDecomposition() const
Definition: ZoneTypes.h:281
definition of the DualLog class
lsst::daf::base::PropertySet PropertySet
Definition: Wcs.cc:57
Container for a sequence of adjacent zones.
Definition: ZoneTypes.h:199
C++ pipeline stage implementation methods.
ZoneEntry< ObjectChunk > ObjectEntry
Definition: Stages.cc:115
double max
Definition: attributes.cc:218
double _decl
Definition: Object.h:57
void buildZoneIndex(ZoneIndex< EntryT > &index, std::vector< typename EntryT::Chunk > const &chunks, double const epoch)
Definition: Stages.cc:383
Data * _data
Pointer to the corresponding data object.
Definition: ZoneTypes.h:58
void registerVisit(VisitProcessingContext &context)
Definition: Stages.cc:654
Include files required for standard LSST Exception handling.
Stores entries inside a single zone (a narrow declination stripe) in a sorted array.
Definition: ZoneTypes.h:113
DataT * _data
pointer to data object (not owned by this object!)
Definition: EllipseTypes.h:55
Records ids of difference sources with no matches.
Definition: Stages.cc:283
double w
Definition: CoaddPsf.cc:57
std::string const & locString(void) const
double _muRa
Definition: Object.h:58
A decomposition of the unit sphere into zones, stripes, and chunks.
Definition: SpatialUtil.h:69
int x
AngleUnit const radians
constant with units of radians
Definition: Angle.h:91
A point on the unit sphere (sky), specified in spherical polar coordinates.
Definition: Point.h:46
SharedObjectChunkManager::ObjectChunk ObjectChunk
Definition: Stages.cc:111
bool endVisit(VisitProcessingContext &context, bool const rollback)
Definition: Stages.cc:1028
TimeSpec & systemTime()
Definition: Time.cc:113
Filter which discards predicted moving objects with large position error ellipses.
Definition: Stages.cc:270
std::pair< boost::int64_t, boost::int64_t > IdPair
Holds a pair of ids.
Definition: Results.h:121
void getChunks(std::vector< ObjectChunk > &chunks, std::vector< int > const &chunkIds)
Definition: ChunkManager.h:89
Contains a pointer to a match and an associated distance.
Definition: Match.h:77
Convenience wrapper for the C library timespec struct and a simple profiling class.
std::vector< IdPair > IdPairVector
A list of IdPair instances.
Definition: Results.h:125
Holds an integer identifier for an LSST filter.
Definition: Filter.h:107
void buildObjectIndex(VisitProcessingContext &context)
Definition: Stages.cc:775
ObjectIndex & getObjectIndex()
Definition: Stages.h:108
double getRadialVelocity() const
Definition: Object.h:88
double _y
unit vector y coordinate of entity position
Definition: ZoneTypes.h:65
Utility class for profiling.
Definition: Time.h:98
void verifyPathName(std::string const &name)
Definition: Utils.cc:49
double _parallax
Definition: Object.h:60
virtual char const * what(void) const
Definition: Exception.cc:112
Processor for lists of objects matching a difference source.
Definition: Stages.cc:198
std::pair< double, double > correctProperMotion(Object const &obj, double const epoch)
Definition: Stages.cc:149
std::vector< Match >::iterator MatchIterator
Definition: Stages.cc:241
#define LSST_EXCEPT(type,...)
Definition: Exception.h:46
bool operator()(lsst::mops::MovingObjectPrediction const &p)
Definition: Stages.cc:276
MatchWithDistance< ZoneEntryT > Match
Definition: Stages.cc:202
Processor for matches between moving object predictions and difference sources.
Definition: Stages.cc:238
std::vector< lsst::afw::detection::DiaSource::Ptr > _diaSources
Definition: Stages.h:167
int pack(FilterT &filter)
Definition: ZoneTypes.cc:299
std::vector< ObjectChunk > const & getChunks() const
Definition: Stages.h:98
Class for storing generic metadata.
Definition: PropertySet.h:82
static void destroyInstance(std::string const &name)
double getMatchRadius() const
Definition: Stages.h:138
boost::int64_t const _idNamespace
Definition: Stages.cc:295
void clear()
Removes all entries from every zone in the index.
Definition: ZoneTypes.cc:205
Class for representing points on the sky, with support for random perturbations.
std::vector< ObjectChunk > ObjectChunkVector
Definition: Stages.cc:113
bool debugSharedMemory() const
Definition: Stages.h:156
static int chunkToSequence(int const chunkId)
Definition: SpatialUtil.h:169
afw::table::Key< double > b
static int chunkToStripe(int const chunkId)
Definition: SpatialUtil.h:164
TimeSpec const & getDeadline() const
Definition: Stages.h:123
void setDecBounds(double const minDec, double const maxDec)
Sets the range of declination values the index will accept data for.
Definition: ZoneTypes.cc:225
double getMuDecl() const
Definition: Object.h:80
AngleUnit const degrees
Definition: Angle.h:92
boost::shared_ptr< lsst::pex::policy::Policy > getPipelinePolicy()
Definition: Stages.h:132
NewObjectCreator(IdPairVector &results, VisitProcessingContext &context)
Definition: Stages.cc:297
void matchDiaSources(MatchPairVector &matches, VisitProcessingContext &context)
Definition: Stages.cc:808
double _muDecl
Definition: Object.h:59
ChunkMap::value_type ChunkMapValue
Definition: Stages.cc:286
int size() const
Returns the number of entries in the index.
Definition: ZoneTypes.cc:214
double getMuRa() const
Definition: Object.h:76
Interface for LogicalLocation class.
void loadSliceObjects(VisitProcessingContext &context)
Definition: Stages.cc:666
double _cosRa
cosine of ellipse center ra
Definition: EllipseTypes.h:68
dictionary Point
Definition: __init__.py:38
double _x
unit vector x coordinate of entity position
Definition: ZoneTypes.h:64
double seconds() const
Definition: Time.cc:156
DiscardLargeEllipseFilter(Policy::Ptr const policy)
Definition: Stages.cc:273
boost::int16_t _varProb[NUM_FILTERS]
Definition: Object.h:62
DiaSourceIndex & getDiaSourceIndex()
Definition: Stages.h:111
std::vector< ObjectChunk > _chunks
Definition: Stages.h:164
double getEpoch() const
Definition: Object.h:100
double getParallax() const
Definition: Object.h:84
double getStripeDecMin(int const stripeId) const
Definition: SpatialUtil.h:174
Class encapsulating an identifier for an LSST filter.