48 #include <sys/socket.h>
53 #include <activemq/core/ActiveMQConnectionFactory.h>
54 #include <activemq/exceptions/ActiveMQException.h>
56 namespace pexPolicy = lsst::pex::policy;
57 namespace pexExceptions = lsst::pex::exceptions;
59 namespace activemqCore = activemq::core;
85 if (!policy.
exists(
"topicName")) {
86 throw LSST_EXCEPT(pexExceptions::NotFoundError,
"topicName not found in policy");
89 std::string topicName = policy.
getString(
"topicName");
96 if (!policy.
exists(
"hostName")) {
97 throw LSST_EXCEPT(pexExceptions::NotFoundError,
"hostName not found in policy");
100 std::string hostName = policy.
getString(
"hostName");
103 hostPort = policy.
getInt(
"hostPort");
125 init(hostName, topicName,
"", hostPort);
138 init(hostName, topicName, selector, hostPort);
144 void EventReceiver::init(
const std::string& hostName,
const std::string& topicName,
const std::string& selector,
int hostPort) {
158 std::stringstream ss;
162 string jmsURL =
"tcp://"+hostName+
":"+ss.str()+
"?wireFormat=openwire";
164 activemqCore::ActiveMQConnectionFactory* connectionFactory =
165 new activemqCore::ActiveMQConnectionFactory( jmsURL );
169 _connection = connectionFactory->createConnection();
171 delete connectionFactory;
173 catch (cms::CMSException& e) {
174 delete connectionFactory;
175 std::string msg(
"Failed to connect to broker: ");
176 msg += e.getMessage();
177 msg +=
" (is broker running?)";
178 throw LSST_EXCEPT(pexExceptions::RuntimeError, msg);
190 }
catch ( cms::CMSException& e ) {
191 throw LSST_EXCEPT(pexExceptions::RuntimeError, std::string(
"Trouble creating EventReceiver: ") + e.getMessage());
215 cms::TextMessage* textMessage;
217 cms::Message* msg =
_consumer->receive(timeout);
218 if (msg == NULL)
return NULL;
219 textMessage =
dynamic_cast<cms::TextMessage*
>(msg);
220 if (textMessage == NULL)
221 throw LSST_EXCEPT(pexExceptions::RuntimeError,
"Unexpected JMS Message type");
222 }
catch (activemq::exceptions::ActiveMQException& e) {
223 throw LSST_EXCEPT(pexExceptions::RuntimeError, e.getMessage());
247 }
catch ( cms::CMSException& e ) {
255 }
catch ( cms::CMSException& e ) {
264 }
catch ( cms::CMSException& e ) {
270 }
catch ( cms::CMSException& e ) {
277 }
catch ( cms::CMSException& e ) {
285 }
catch ( cms::CMSException& e ) {
int getInt(const std::string &name) const
bool exists(const std::string &name) const
Singleton use to make sure the events library is initialized.
void init(const std::string &hostName, const std::string &topicName, const std::string &selector, int hostPort)
create LSST Events from JMS Messages
bool getBool(const std::string &name) const
EventReceiver(const pexPolicy::Policy &policy)
Receives events based on Policy file contents.
a container for holding hierarchical configuration data in memory.
boost::shared_ptr< PropertySet > Ptr
const std::string getString(const std::string &name) const
cms::MessageConsumer * _consumer
static const int DEFAULTHOSTPORT
static Event * createEvent(cms::TextMessage *msg)
return an Event object, based on the type received in the TextMessage.
definition of the LogRecord, RecordProperty and Prop classes
defines the EventFactory class
Include files required for standard LSST Exception handling.
defines the EventLibrary class
cms::Connection * _connection
Defines the (deprecated) Component class.
virtual ~EventReceiver()
destructor method
static const long infiniteTimeout
Interface for DateTime class.
cms::Destination * _destination
defines the EventReceiver class
static void initializeLibrary()
initialize the ActiveMQ library, but only do it once.
#define LSST_EXCEPT(type,...)
defines information pertaining to the Event Broker
std::string getTopicName()
returns the topic for this EventReceiver
Representation of an LSST Event.
Interface for PropertySet class.
Event * receiveEvent()
wait until an event is received.
defines the EventSystem class