46 #include <sys/socket.h>
51 #include <activemq/core/ActiveMQConnectionFactory.h>
53 namespace dafBase = lsst::daf::base;
54 namespace pexExceptions = lsst::pex::exceptions;
58 using std::numeric_limits;
64 EventTransmitter::EventTransmitter(
const std::string& hostName,
const std::string& topicName,
int hostPort) {
67 init(hostName, topicName, hostPort);
78 _topicName = topicName;
91 string brokerUri =
"tcp://"+hostName+
":"+ss.str()+
"?wireFormat=openwire&transport.useAsyncSend=true";
93 activemq::core::ActiveMQConnectionFactory* connectionFactory =
94 new activemq::core::ActiveMQConnectionFactory( brokerUri );
98 _connection = connectionFactory->createConnection();
100 delete connectionFactory;
102 catch (cms::CMSException& e) {
103 delete connectionFactory;
104 std::string msg(
"Failed to connect to broker: ");
105 msg += e.getMessage();
106 msg +=
" (is broker running?)";
107 throw LSST_EXCEPT(pexExceptions::RuntimeError, msg);
110 _session = _connection->createSession( cms::Session::AUTO_ACKNOWLEDGE );
116 _topic =
new activemq::commands::ActiveMQTopic(_topicName);
120 _producer = _session->createProducer(NULL);
121 _producer->setDeliveryMode( cms::DeliveryMode::NON_PERSISTENT );
122 }
catch ( cms::CMSException& e ) {
123 throw LSST_EXCEPT(pexExceptions::RuntimeError, std::string(
"Trouble creating EventTransmitter: ") + e.getMessage());
127 void EventTransmitter::publishEvent(
Event& event) {
129 cms::TextMessage* message = _session->createTextMessage();
131 event.marshall(message);
133 message->setStringProperty(
"TOPIC", _topicName);
137 message->setLongProperty(
"PUBTIME", pubtime);
139 _producer->send(_topic, message);
143 std::string EventTransmitter::getTopicName() {
147 EventTransmitter::~EventTransmitter() {
154 if( _producer != NULL )
156 }
catch ( cms::CMSException& e ) {
163 if( _session != NULL )
165 if( _connection != NULL )
166 _connection->close();
167 }
catch ( cms::CMSException& e ) {
172 if( _session != NULL )
174 }
catch ( cms::CMSException& e ) {
180 if( _connection != NULL )
182 }
catch ( cms::CMSException& e ) {
static DateTime now(void)
defines the EventTransmitter class
Singleton use to make sure the events library is initialized.
long long nsecs(Timescale scale=TAI) const
defines the EventLibrary class
Interface for DateTime class.
#define LSST_EXCEPT(type,...)
information about the Event Broker
Representation of an LSST Event.
static void initializeLibrary()
initialize the ActiveMQ library, but only do it once.
Interface for PropertySet class.
defines the EventSystem class
Include files required for standard LSST Exception handling.