50 #include <sys/socket.h>
55 #include <activemq/core/ActiveMQConnectionFactory.h>
57 namespace dafBase = lsst::daf::base;
58 namespace pexExceptions = lsst::pex::exceptions;
59 namespace pexLogging = lsst::pex::logging;
63 using std::numeric_limits;
100 _turnEventsOff = policy.
getBool(
"turnEventsoff");
102 _turnEventsOff =
false;
104 if (_turnEventsOff ==
true)
107 if (!policy.
exists(
"topicName")) {
108 throw LSST_EXCEPT(pexExceptions::NotFoundError,
"topicName not found in policy");
110 _topicName = policy.
getString(
"topicName");
112 std::string hostName;
116 throw LSST_EXCEPT(pexExceptions::NotFoundError,
"hostName not found in policy");
120 hostPort = policy.
getInt(
"hostPort");
122 hostPort = EventBroker::DEFAULTHOSTPORT;
124 init(hostName, _topicName, hostPort);
136 EventTransmitter::EventTransmitter(
const std::string& hostName,
const std::string& topicName,
int hostPort) {
139 _turnEventsOff =
false;
140 init(hostName, topicName, hostPort);
150 _topicName = topicName;
153 if (_turnEventsOff ==
true)
158 std::stringstream ss;
166 string brokerUri =
"tcp://"+hostName+
":"+ss.str()+
"?wireFormat=openwire&transport.useAsyncSend=true";
168 activemq::core::ActiveMQConnectionFactory* connectionFactory =
169 new activemq::core::ActiveMQConnectionFactory( brokerUri );
173 _connection = connectionFactory->createConnection();
174 _connection->start();
175 delete connectionFactory;
177 catch (cms::CMSException& e) {
178 delete connectionFactory;
179 std::string msg(
"Failed to connect to broker: ");
180 msg += e.getMessage();
181 msg +=
" (is broker running?)";
182 throw LSST_EXCEPT(pexExceptions::RuntimeError, msg);
185 _session = _connection->createSession( cms::Session::AUTO_ACKNOWLEDGE );
191 _topic =
new activemq::commands::ActiveMQTopic(_topicName);
195 _producer = _session->createProducer(NULL);
196 _producer->setDeliveryMode( cms::DeliveryMode::NON_PERSISTENT );
197 }
catch ( cms::CMSException& e ) {
198 throw LSST_EXCEPT(pexExceptions::RuntimeError, std::string(
"Trouble creating EventTransmitter: ") + e.getMessage());
202 void EventTransmitter::publishEvent(
Event& event) {
204 cms::TextMessage* message = _session->createTextMessage();
206 event.marshall(message);
208 message->setStringProperty(
"TOPIC", _topicName);
212 message->setLongProperty(
"PUBTIME", pubtime);
214 _producer->send(_topic, message);
220 std::string EventTransmitter::getTopicName() {
226 EventTransmitter::~EventTransmitter() {
242 if( _producer != NULL )
244 }
catch ( cms::CMSException& e ) {
251 if( _session != NULL )
253 if( _connection != NULL )
254 _connection->close();
255 }
catch ( cms::CMSException& e ) {
260 if( _session != NULL )
262 }
catch ( cms::CMSException& e ) {
268 if( _connection != NULL )
270 }
catch ( cms::CMSException& e ) {
static DateTime now(void)
int getInt(const std::string &name) const
defines the EventTransmitter class
bool exists(const std::string &name) const
Singleton use to make sure the events library is initialized.
bool getBool(const std::string &name) const
a container for holding hierarchical configuration data in memory.
const std::string getString(const std::string &name) const
long long nsecs(Timescale scale=TAI) const
definition of the LogRecord, RecordProperty and Prop classes
Include files required for standard LSST Exception handling.
defines the EventLibrary class
Defines the (deprecated) Component class.
Interface for DateTime class.
static void initializeLibrary()
initialize the ActiveMQ library, but only do it once.
#define LSST_EXCEPT(type,...)
defines information pertaining to the Event Broker
Representation of an LSST Event.
Interface for PropertySet class.
defines the EventSystem class