58 #include "boost/format.hpp"
59 #include "boost/scoped_array.hpp"
65 #include "lsst/afw/detection/DiaSource.h"
67 #include "lsst/mops/MovingObjectPrediction.h"
82 using lsst::afw::detection::DiaSource;
83 using lsst::afw::detection::DiaSourceSet;
84 using lsst::afw::detection::PersistableDiaSourceVector;
86 using lsst::mops::MovingObjectPrediction;
88 namespace ex = lsst::pex::exceptions;
91 namespace lsst {
namespace ap {
95 static boost::uint32_t
const HAS_MATCH = 1;
96 static boost::uint32_t
const HAS_KNOWN_VARIABLE_MATCH = 2;
97 static boost::uint32_t
const HAS_MOVING_OBJECT_MATCH = 4;
98 static char const *
const VAR_PROB_THRESH_KEY[6] = {
122 #if defined(__GNUC__) && __GNUC__ > 3
123 # pragma GCC visibility push(hidden)
139 #if defined(__GNUC__) && __GNUC__ > 3
140 # pragma GCC visibility pop
150 static double const RAD_PER_MAS = (RADIANS_PER_DEGREE/360000.0);
152 static double const SCALE = RAD_PER_MAS*(365.25*86400/149597870.691);
158 double const sinDecl = std::sin(decl);
159 double const cosDecl = std::cos(decl);
160 double const sinRa = std::sin(ra);
161 double const cosRa = std::cos(ra);
162 double x = cosRa * cosDecl;
163 double y = sinRa * cosDecl;
167 double const pmRa = obj.
getMuRa()*RAD_PER_MAS/cosDecl;
168 double const pmDecl = obj.
getMuDecl()*RAD_PER_MAS;
172 double const mx = - pmRa*y - pmDecl*cosRa + x*
w;
173 double const my = pmRa*x - pmDecl*sinRa + y*
w;
174 double const mz = pmDecl*cosDecl + z*
w;
177 double const dt = (epoch - obj.
getEpoch()) * (1/365.25);
184 double d2 = x*x + y*
y;
185 ra = (d2 == 0.0) ? 0 :
degrees(std::atan2(y, x));
186 decl = (z == 0.0) ? 0 :
degrees(std::atan2(z, std::sqrt(d2)));
190 return std::make_pair(ra, decl);
197 template <
typename ZoneEntryT>
216 _threshold(context.getPipelinePolicy()->getInt(VAR_PROB_THRESH_KEY[filter.getId()]))
220 boost::uint32_t flags = HAS_MATCH;
222 ZoneEntryT *
const obj = begin->_match;
223 double const dist =
degrees(begin->_distance);
226 _matches.push_back(
MatchPair(ds.
_data->getId(), obj->_data->getId(), dist));
227 if (obj->_data->getVarProb(_filter) >= _threshold) {
229 flags |= HAS_KNOWN_VARIABLE_MATCH;
231 }
while (begin != end);
248 ds.
_flags |= HAS_MOVING_OBJECT_MATCH;
252 double dist = 2.0*std::asin(0.5*std::sqrt(dx*dx + dy*dy + dz*dz));
264 return (ds.
_flags & HAS_KNOWN_VARIABLE_MATCH) == 0;
274 semiMajorAxisThreshold(policy->getDouble(
"semiMajorAxisThreshold")) {}
276 bool operator()(lsst::mops::MovingObjectPrediction
const & p) {
277 return p.getSemiMajorAxisLength() < semiMajorAxisThreshold;
302 _zsc(context.getDecomposition()),
303 _fovCen(context.getFov().getCenterRa(), context.getFov().getCenterDec()),
304 _fovRad(context.getFov().getRadius()),
306 _filter(context.getFilter()),
307 _idNamespace(static_cast<boost::int64_t>(context.getFilter().getId() + 1) << 56)
311 for (ObjectChunkVector::iterator i(chunks.begin()), end(chunks.end()); i != end; ++i) {
318 static boost::int64_t
const SHAPE_DIFFERS_IN_BOTH_EXPOSURES_MASK = 1 << 2;
319 static boost::int64_t
const POSITIVE_FLUX_EXCURSION_MASK = (1 << 3) | (1<< 4);
320 static boost::int64_t
const idLimit = INT64_C(1) << 56;
322 if ((entry.
_data->getDiaSourceToId() & (1 << 30)) != 0) {
325 boost::int64_t classFlags = entry.
_data->getFlagClassification();
327 if (((classFlags & SHAPE_DIFFERS_IN_BOTH_EXPOSURES_MASK) != 0) &&
328 ((classFlags & POSITIVE_FLUX_EXCURSION_MASK) != 0)) {
335 if ((entry.
_flags & (HAS_MATCH | HAS_KNOWN_VARIABLE_MATCH)) == 0) {
337 boost::int64_t
id = entry.
_data->getId();
339 throw LSST_EXCEPT(ex::RangeError,
"DiaSource id doesn't fit in 56 bits");
358 obj.
_varProb[_filter.getId()] = 100;
363 int const chunkId = _zsc.radecToChunk(obj.
_ra, obj.
_decl);
365 if (c == _chunks.end()) {
367 (
boost::format(
"new object from DIASource %1% ra,dec=(%2%, %3%) x,y=(%4%, %5%) "
368 "not in any chunk overlapping FOV with center (%6%, %7%), "
369 "radius=%8%: new object would go to chunk %9%; distance to FOV "
370 "center is %10% deg") %
id % obj.
_ra % obj.
_decl %
371 entry.
_data->getXAstrom() % entry.
_data->getYAstrom() % _fovCen._ra %
372 _fovCen._dec % _fovRad % chunkId % _fovCen.distance(
Point(obj.
_ra, obj.
_decl))).str());
374 c->second.insert(obj);
382 template <
typename EntryT>
385 std::vector<typename EntryT::Chunk>
const & chunks,
388 typedef typename EntryT::Data Data;
389 typedef typename EntryT::Chunk Chunk;
390 typedef std::vector<Chunk> ChunkVector;
391 typedef typename ChunkVector::const_iterator ChunkIterator;
392 typedef typename ChunkVector::size_type Size;
395 if (chunks.empty()) {
403 int minStripe = 0x7FFFFFFF;
404 int maxStripe = -1 - minStripe;
405 ChunkIterator
const end(chunks.end());
406 for (ChunkIterator c(chunks.begin()); c != end; ++c) {
408 if (stripeId > maxStripe) {
409 maxStripe = stripeId;
411 if (stripeId < minStripe) {
412 minStripe = stripeId;
415 assert(maxStripe >= minStripe &&
"invalid stripe bounds for chunk list");
418 int const numStripes = maxStripe - minStripe + 1;
419 boost::scoped_array<ChunkVector> stripes(
new ChunkVector[numStripes]);
420 for (ChunkIterator c(chunks.begin()); c != end; ++c) {
422 assert(stripeId >= minStripe && stripeId <= maxStripe &&
"stripe id out of bounds");
423 stripes[stripeId - minStripe].push_back(*c);
440 for (
int s = 0; s < numStripes; ++s) {
441 ChunkVector & vec = stripes[s];
442 Size
const numChunks = vec.size();
445 for (Size c = 0; c < numChunks; ++c) {
446 Chunk *
const ch = &vec[c];
447 int const numBlocks = ch->blocks();
451 for (
int b = 0;
b < numBlocks; ++
b) {
452 int const numEntries = ch->entries(
b);
453 Data *
const block = ch->getBlock(
b);
457 for (
int e = 0; e < numEntries; ++e, ++i) {
458 if ((flags[e] & Chunk::DELETED) == 0) {
460 index.
insert(pos.first, pos.second, &block[e], ch, i);
468 throw LSST_EXCEPT(ex::RuntimeError,
"Failed to build zone index");
472 Log log(Log::getDefaultLog(),
"lsst.ap");
473 int numElements =
static_cast<int>(index.
size());
474 Rec(log, Log::INFO) <<
"inserted elements into zone index" <<
482 Rec(log, Log::INFO) <<
"sorted zone index" <<
492 #if defined(__GNUC__) && __GNUC__ > 3
493 # pragma GCC visibility push(hidden)
495 template class detail::ObjectMatchProcessor<detail::ObjectEntry>;
501 PassthroughFilter<detail::DiaSourceEntry>,
502 PassthroughFilter<detail::ObjectEntry>,
503 detail::ObjectMatchProcessor<detail::ObjectEntry>
505 ZoneIndex<detail::DiaSourceEntry> &,
506 ZoneIndex<detail::ObjectEntry> &,
508 PassthroughFilter<detail::DiaSourceEntry> &,
509 PassthroughFilter<detail::ObjectEntry> &,
510 detail::ObjectMatchProcessor<detail::ObjectEntry> &
514 MovingObjectPrediction,
516 PassthroughFilter<detail::MovingObjectEllipse>,
517 PassthroughFilter<detail::DiaSourceEntry>,
518 detail::MovingObjectPredictionMatchProcessor
520 EllipseList<MovingObjectPrediction> &,
521 ZoneIndex<detail::DiaSourceEntry> &,
522 PassthroughFilter<detail::MovingObjectEllipse> &,
523 PassthroughFilter<detail::DiaSourceEntry> &,
524 detail::MovingObjectPredictionMatchProcessor &
527 #if defined(__GNUC__) && __GNUC__ > 3
528 # pragma GCC visibility pop
535 Policy::Ptr
const policy,
537 std::string
const & runId,
541 lsst::daf::base::Citizen(typeid(*this)),
545 _objectIndex(policy->getInt(
"zonesPerDegree"),
546 policy->getInt(
"zonesPerStripe"),
547 policy->getInt(
"maxEntriesPerZoneEstimate")),
548 _diaSourceIndex(policy->getInt(
"zonesPerDegree"),
549 policy->getInt(
"zonesPerStripe"),
550 policy->getInt(
"maxEntriesPerZoneEstimate")),
555 _matchRadius(policy->getDouble(
"matchRadius")),
556 _ellipseScalingFactor(policy->getDouble(
"ellipseScalingFactor")),
559 _numWorkers(numWorkers),
560 _debugSharedMemory(policy->getBool(
"debugSharedMemory"))
562 double ra =
event->getAsDouble(
"ra");
563 double dec =
event->getAsDouble(
"decl");
565 _visitId =
event->getAsInt(
"visitId");
566 if (event->exists(
"matchRadius")) {
575 std::string filterName =
event->getAsString(
"filter");
587 int const sz =
static_cast<int>(
_diaSources.size());
592 double minDec = 90.0;
593 double maxDec = -90.0;
594 for (
int i = 0; i < sz; ++i) {
603 assert(maxDec >= minDec &&
"invalid dec bounds for DiaSource list");
606 Log log(Log::getDefaultLog(),
"lsst.ap");
607 Rec(log, Log::INFO) <<
"set dec bounds for difference source index" <<
612 for (
int i = 0; i < sz; ++i) {
621 Rec(log, Log::INFO) <<
"inserted difference sources into zone index" <<
627 Rec(log, Log::INFO) <<
"sorted difference source zone index" <<
669 typedef std::vector<Chunk> ChunkVector;
670 typedef std::vector<Chunk>::iterator ChunkIterator;
673 Log log(Log::getDefaultLog(),
"lsst.ap");
687 Rec(log, Log::INFO) <<
"computed chunk ids in FOV for worker " <<
692 std::vector<Chunk> toRead;
693 std::vector<Chunk> toWaitFor;
697 Rec(log, Log::INFO) <<
"started processing visit" <<
701 ChunkVector & chunks = context.
getChunks();
703 chunks.insert(chunks.end(), toRead.begin(), toRead.end());
704 chunks.insert(chunks.end(), toWaitFor.begin(), toWaitFor.end());
708 std::string refNamePattern(context.
getPipelinePolicy()->getString(
"objectChunkFileNamePattern"));
709 std::string deltaNamePattern(context.
getPipelinePolicy()->getString(
"objectDeltaChunkFileNamePattern"));
711 ps->set<std::string>(
"runId", context.
getRunId());
712 ChunkVector::size_type numToRead(toRead.size());
713 for (ChunkIterator i(toRead.begin()), end(toRead.end()); i != end; ++i) {
715 ps->set<
int>(
"chunkId", c.getId());
723 Rec(log, Log::INFO) <<
"read chunk files" <<
724 Prop<int>(
"numChunks",
static_cast<int>(numToRead)) <<
728 ChunkVector::size_type numToWaitFor = toWaitFor.size();
729 if (numToWaitFor > 0) {
734 Rec(log, Log::INFO) <<
"acquired ownership of pre-existing chunks" <<
735 Prop<int>(
"numChunks",
static_cast<int>(numToWaitFor)) <<
740 numToRead = toRead.size();
741 for (ChunkIterator i(toRead.begin()), end(toRead.end()); i != end; ++i) {
743 ps->set<
int>(
"chunkId", c.getId());
751 Rec(log, Log::INFO) <<
"read straggling chunks" <<
752 Prop<int>(
"numChunks",
static_cast<int>(numToRead)) <<
759 }
catch (std::exception & except) {
760 log.log(Log::FATAL, except.what());
763 log.log(Log::FATAL,
"caught unknown exception");
781 if (manager.isVisitInFlight(context.
getVisitId())) {
794 "Association pipeline failed to read Object data for FOV");
817 matches.reserve(65536);
839 Log log(Log::getDefaultLog(),
"lsst.ap");
840 Rec(log, Log::INFO) <<
"matched difference sources to objects" <<
843 Prop<int>(
"numMatches", static_cast<int>(nm)) <<
870 lsst::mops::MovingObjectPredictionVector & predictions
875 double const ellScale = context.
getPipelinePolicy()->getDouble(
"ellipseScalingFactor");
876 double const smaaClamp = context.
getPipelinePolicy()->getDouble(
"semiMajorAxisClamp");
877 double const smiaClamp = context.
getPipelinePolicy()->getDouble(
"semiMinorAxisClamp");
880 matches.reserve(8192);
882 newObjects.reserve(8192);
891 Log log(Log::getDefaultLog(),
"lsst.ap");
892 Rec(log, Log::INFO) <<
"removed difference sources matching known variables from index" <<
899 ellipses.reserve(predictions.size());
901 lsst::mops::MovingObjectPredictionVector::iterator
const end = predictions.end();
902 for (lsst::mops::MovingObjectPredictionVector::iterator i = predictions.begin(); i != end; ++i) {
903 i->setSemiMajorAxisLength(i->getSemiMajorAxisLength() * ellScale);
904 i->setSemiMinorAxisLength(i->getSemiMinorAxisLength() * ellScale);
907 if (smaaClamp > 0.0 && i->getSemiMajorAxisLength() > smaaClamp) {
908 i->setSemiMajorAxisLength(smaaClamp);
910 if (smiaClamp > 0.0 && i->getSemiMinorAxisLength() > smiaClamp) {
911 i->setSemiMinorAxisLength(smiaClamp);
917 Rec(log, Log::INFO) <<
"built list of match parameters for moving object predictions" <<
918 Prop<int>(
"numPredictions",
static_cast<int>(ellipses.size())) <<
926 MovingObjectPrediction,
939 Rec(log, Log::INFO) <<
"matched moving object predictions to difference sources" <<
940 Prop<int>(
"numPredictions",
static_cast<int>(ellipses.size())) <<
942 Prop<int>(
"numMatches", static_cast<int>(nm)) <<
950 Rec(log, Log::INFO) <<
"created new objects" <<
951 Prop<int>(
"numObjects",
static_cast<int>(newObjects.size())) <<
970 typedef std::vector<Chunk> ChunkVector;
971 typedef std::vector<Chunk>::iterator ChunkIterator;
974 Log log(Log::getDefaultLog(),
"lsst.ap");
977 std::string deltaNamePattern = context.
getPipelinePolicy()->getString(
"objectDeltaChunkFileNamePattern");
979 ps->set<std::string>(
"runId", context.
getRunId());
980 ChunkVector & chunks = context.
getChunks();
981 for (ChunkIterator i(chunks.begin()), end(chunks.end()); i != end; ++i) {
983 ps->set<
int>(
"chunkId", c.getId());
988 c.writeDelta(file,
true,
false);
991 Rec(log, Log::INFO) <<
"wrote chunk delta files" <<
992 Prop<int>(
"numChunks",
static_cast<int>(chunks.size())) <<
997 }
catch (std::exception & except) {
998 log.log(Log::FATAL, except.what());
1001 log.log(Log::FATAL,
"caught unknown exception");
1031 Log log(Log::getDefaultLog(),
"lsst.ap");
1033 log.
log(Log::INFO,
"Committed visit");
1035 log.
log(Log::FATAL,
"Rolled back visit");
DiaSourceIndex _diaSourceIndex
A circular region of the unit sphere (sky).
Holds a pair of ids and the distance between the corresponding positions on the unit sphere...
ZoneEntry< DiaSourceChunk > DiaSourceEntry
ChunkMap::iterator ChunkMapIterator
Class for managing chunks of Object instances in shared memory.
double _cosDec
cosine of ellipse center dec
bool endVisit(int const visitId, bool const rollback)
VisitProcessingContext(lsst::pex::policy::Policy::Ptr const policy, lsst::daf::base::PropertySet::Ptr const event, std::string const &runId, int const workerId, int const numWorkers)
Container for inter-stage association pipeline state.
void computeChunkIds(std::vector< int > &chunkIds, CircularRegion const ®ion, ZoneStripeChunkDecomposition const &zsc, int const workerId=0, int const numWorkers=1)
Ellipse< MovingObjectPrediction > MovingObjectEllipse
double _sinRa
sine of ellipse center ra
void registerVisit(int const visitId)
double _z
unit vector z coordinate of entity position
boost::uint32_t _flags
Reserved.
void operator()(MovingObjectEllipse &ell, DiaSourceEntry &ds)
CsvWriter & endr(CsvWriter &w)
A partial representation of a full LSST Object containing only id, position, proper motions...
void storeSliceObjects(VisitProcessingContext &context)
void insert(double const ra, double const dec, Data *const data, Chunk *const chunk, int const index)
bool operator()(DiaSourceEntry const &ds)
std::size_t ellipseMatch(EllipseList< FirstEntryT > &first, ZoneIndex< SecondEntryT > &second, FirstFilterT &firstFilter, SecondFilterT &secondFilter, MatchPairProcessorT &matchPairProcessor)
Class for logical location of a persisted Persistable instance.
A manager for Object chunks that exist in shared memory.
a container for holding hierarchical configuration data in memory.
CircularRegion const & getFov() const
std::string const & getRunId() const
SharedObjectChunkManager::ObjectChunk ObjectChunk
boost::shared_ptr< PropertySet > Ptr
Contains spatial information for a single point used during cross-matching.
ZoneStripeChunkDecomposition const & _zsc
unsigned char ChunkEntryFlag
void failVisit(VisitProcessingContext &context)
double getStripeDecMax(int const stripeId) const
void apply(FunctionT &function)
std::vector< int > const & getChunkIds() const
Spatial crossmatch algorithms.
void operator()(DiaSourceEntry const &entry)
lsst::afw::image::Filter const _filter
void log(int importance, const std::string &message, const lsst::daf::base::PropertySet &properties)
double _sinDec
sine of ellipse center dec
void sort()
Sorts each zone in the index (on right ascension)
void operator()(DiaSourceEntry &ds, MatchIterator begin, MatchIterator end)
std::vector< Match >::iterator MatchIterator
std::size_t distanceMatch(ZoneIndex< FirstEntryT > &first, ZoneIndex< SecondEntryT > &second, double const radius, FirstFilterT &firstFilter, SecondFilterT &secondFilter, MatchListProcessorT &matchListProcessor)
MatchPairVector & _matches
std::map< int, ObjectChunk > ChunkMap
double semiMajorAxisThreshold
lsst::afw::image::Filter getFilter() const
std::vector< MatchPair > MatchPairVector
A list of MatchPair instances.
void initialize(std::string const &runId)
A default "let everything through" filter implementation.
a place to record messages and descriptions of the state of processing.
Miscellaneous helper methods.
ZoneStripeChunkDecomposition const & getDecomposition() const
Contains spatial information for a single ellipse on the unit sphere (sky).
MovingObjectPredictionMatchProcessor(MatchPairVector &results)
void matchMops(MatchPairVector &matches, IdPairVector &newObjects, VisitProcessingContext &context, lsst::mops::MovingObjectPredictionVector &predictions)
A list of ellipses, implemented using std::vector.
ObjectMatchProcessor(VisitProcessingContext &context, MatchPairVector &matches, Filter const filter)
Filter which discards difference sources matching known variable objects.
void failVisit(int const visitId)
void setDiaSources(boost::shared_ptr< lsst::afw::detection::PersistableDiaSourceVector > diaSources)
lsst::afw::image::Filter _filter
ZoneStripeChunkDecomposition const & getDecomposition() const
definition of the DualLog class
lsst::daf::base::PropertySet PropertySet
Container for a sequence of adjacent zones.
C++ pipeline stage implementation methods.
ZoneEntry< ObjectChunk > ObjectEntry
Manager::Chunk ObjectChunk
void buildZoneIndex(ZoneIndex< EntryT > &index, std::vector< typename EntryT::Chunk > const &chunks, double const epoch)
Data * _data
Pointer to the corresponding data object.
void registerVisit(VisitProcessingContext &context)
Include files required for standard LSST Exception handling.
Stores entries inside a single zone (a narrow declination stripe) in a sorted array.
DataT * _data
pointer to data object (not owned by this object!)
Records ids of difference sources with no matches.
std::string const & locString(void) const
A decomposition of the unit sphere into zones, stripes, and chunks.
AngleUnit const radians
constant with units of radians
A point on the unit sphere (sky), specified in spherical polar coordinates.
SharedObjectChunkManager::ObjectChunk ObjectChunk
int getNumWorkers() const
bool endVisit(VisitProcessingContext &context, bool const rollback)
Filter which discards predicted moving objects with large position error ellipses.
std::pair< boost::int64_t, boost::int64_t > IdPair
Holds a pair of ids.
void getChunks(std::vector< ObjectChunk > &chunks, std::vector< int > const &chunkIds)
Contains a pointer to a match and an associated distance.
Convenience wrapper for the C library timespec struct and a simple profiling class.
std::vector< IdPair > IdPairVector
A list of IdPair instances.
~VisitProcessingContext()
Holds an integer identifier for an LSST filter.
void buildObjectIndex(VisitProcessingContext &context)
ObjectIndex & getObjectIndex()
double getRadialVelocity() const
double _y
unit vector y coordinate of entity position
Utility class for profiling.
void verifyPathName(std::string const &name)
virtual char const * what(void) const
Processor for lists of objects matching a difference source.
std::pair< double, double > correctProperMotion(Object const &obj, double const epoch)
std::vector< Match >::iterator MatchIterator
#define LSST_EXCEPT(type,...)
bool operator()(lsst::mops::MovingObjectPrediction const &p)
MatchWithDistance< ZoneEntryT > Match
Processor for matches between moving object predictions and difference sources.
std::vector< lsst::afw::detection::DiaSource::Ptr > _diaSources
int pack(FilterT &filter)
std::vector< ObjectChunk > const & getChunks() const
Class for storing generic metadata.
static void destroyInstance(std::string const &name)
double getMatchRadius() const
boost::int64_t const _idNamespace
void clear()
Removes all entries from every zone in the index.
Class for representing points on the sky, with support for random perturbations.
std::vector< ObjectChunk > ObjectChunkVector
bool debugSharedMemory() const
static int chunkToSequence(int const chunkId)
afw::table::Key< double > b
static int chunkToStripe(int const chunkId)
TimeSpec const & getDeadline() const
void setDecBounds(double const minDec, double const maxDec)
Sets the range of declination values the index will accept data for.
boost::shared_ptr< lsst::pex::policy::Policy > getPipelinePolicy()
NewObjectCreator(IdPairVector &results, VisitProcessingContext &context)
void matchDiaSources(MatchPairVector &matches, VisitProcessingContext &context)
ChunkMap::value_type ChunkMapValue
int size() const
Returns the number of entries in the index.
Interface for LogicalLocation class.
void loadSliceObjects(VisitProcessingContext &context)
double _cosRa
cosine of ellipse center ra
double _x
unit vector x coordinate of entity position
MatchPairVector & _results
DiscardLargeEllipseFilter(Policy::Ptr const policy)
boost::int16_t _varProb[NUM_FILTERS]
DiaSourceIndex & getDiaSourceIndex()
std::vector< ObjectChunk > _chunks
double getParallax() const
double getStripeDecMin(int const stripeId) const
Class encapsulating an identifier for an LSST filter.