46 #include <sys/socket.h>
51 #include <activemq/core/ActiveMQConnectionFactory.h>
53 namespace dafBase = lsst::daf::base;
54 namespace pexExceptions = lsst::pex::exceptions;
58 using std::numeric_limits;
// Constructs an EventTransmitter by forwarding all three arguments to init().
//   hostName  - passed through to init() unchanged
//   topicName - passed through to init() unchanged
//   hostPort  - passed through to init() unchanged
// Any exception raised by init() propagates out of the constructor.
64 EventTransmitter::EventTransmitter(
const std::string& hostName,
const std::string& topicName,
int hostPort) {
67 init(hostName, topicName, hostPort);
// NOTE(review): this is the body of what appears to be EventTransmitter::init();
// the signature, the `try {` openers and several closing braces are not visible
// in this excerpt -- confirm against the full file.
//
// Remember the topic name; it is later attached to each outgoing message.
78 _topicName = topicName;
// Build the broker URI as "tcp://<hostName>:<ss.str()>?<options>".
// `ss` is declared on a line not shown here -- presumably a string stream
// holding hostPort (TODO confirm).
91 string brokerUri =
"tcp://"+hostName+
":"+ss.str()+
"?wireFormat=openwire&transport.useAsyncSend=true";
// The factory is heap-allocated and only needed long enough to create the connection.
93 activemq::core::ActiveMQConnectionFactory* connectionFactory =
94 new activemq::core::ActiveMQConnectionFactory( brokerUri );
98 _connection = connectionFactory->createConnection();
// Success path: the factory is released once the connection exists.
100 delete connectionFactory;
102 catch (cms::CMSException& e) {
// Failure path: release the factory here too, then translate the CMS error
// into an LSST RuntimeError carrying the broker's message text.
103 delete connectionFactory;
104 std::string msg(
"Failed to connect to broker: ");
105 msg += e.getMessage();
106 msg +=
" (is broker running?)";
107 throw LSST_EXCEPT(pexExceptions::RuntimeError, msg);
// Create a session on the connection using the AUTO_ACKNOWLEDGE mode.
110 _session = _connection->createSession( cms::Session::AUTO_ACKNOWLEDGE );
// Destination object for the topic name stored above.
116 _topic =
new activemq::commands::ActiveMQTopic(_topicName);
// The producer is created with a NULL destination; publishEvent() passes
// _topic explicitly on each send() call.
120 _producer = _session->createProducer(NULL);
121 _producer->setDeliveryMode( cms::DeliveryMode::NON_PERSISTENT );
122 }
catch ( cms::CMSException& e ) {
// Any CMS failure in the guarded setup above is rethrown as an LSST RuntimeError.
123 throw LSST_EXCEPT(pexExceptions::RuntimeError, std::string(
"Trouble creating EventTransmitter: ") + e.getMessage());
// Publishes `event` on this transmitter's topic.
// A text message is created from the session, filled in by event.marshall(),
// tagged with "TOPIC" and "PUBTIME" properties, and sent through the producer
// to _topic.
//   event - the Event to publish; taken by non-const reference and handed the
//           message via marshall().
// NOTE(review): `message` is a raw pointer returned by createTextMessage(); no
// release is visible in this excerpt -- confirm it is deleted after send().
127 void EventTransmitter::publishEvent(
Event& event) {
129 cms::TextMessage* message = _session->createTextMessage();
131 event.marshall(message);
// Record which topic this message was published on.
133 message->setStringProperty(
"TOPIC", _topicName);
// `pubtime` is computed on a line not shown here -- presumably the current
// time in nanoseconds (TODO confirm).
137 message->setLongProperty(
"PUBTIME", pubtime);
// The destination is supplied per send() call.
139 _producer->send(_topic, message);
143 std::string EventTransmitter::getTopicName() {
// Destructor: tears down the producer, session and connection.
// Each step is guarded by a NULL check and followed by its own handler for
// cms::CMSException, so a failure in one step is caught separately from the others.
// NOTE(review): the `try {` openers, most guarded statements and the catch
// bodies are not visible in this excerpt; the only guarded action shown is
// _connection->close(). Confirm the rest against the full file.
147 EventTransmitter::~EventTransmitter() {
// Step guarded on the producer.
154 if( _producer != NULL )
156 }
catch ( cms::CMSException& e ) {
// Step guarded on the session, then the connection (closed here).
163 if( _session != NULL )
165 if( _connection != NULL )
166 _connection->close();
167 }
catch ( cms::CMSException& e ) {
// Second step guarded on the session.
172 if( _session != NULL )
174 }
catch ( cms::CMSException& e ) {
// Second step guarded on the connection.
180 if( _connection != NULL )
182 }
catch ( cms::CMSException& e ) {
static DateTime now(void)
defines the EventTransmitter class
Singleton used to make sure the events library is initialized.
Include files required for standard LSST Exception handling.
long long nsecs(Timescale scale=TAI) const
defines the EventLibrary class
Interface for DateTime class.
#define LSST_EXCEPT(type,...)
information about the Event Broker
Representation of an LSST Event.
static void initializeLibrary()
initialize the ActiveMQ library, but only do it once.
Interface for PropertySet class.
defines the EventSystem class