LSSTApplications  8.0.0.0+107,8.0.0.1+13,9.1+18,9.2,master-g084aeec0a4,master-g0aced2eed8+6,master-g15627eb03c,master-g28afc54ef9,master-g3391ba5ea0,master-g3d0fb8ae5f,master-g4432ae2e89+36,master-g5c3c32f3ec+17,master-g60f1e072bb+1,master-g6a3ac32d1b,master-g76a88a4307+1,master-g7bce1f4e06+57,master-g8ff4092549+31,master-g98e65bf68e,master-ga6b77976b1+53,master-gae20e2b580+3,master-gb584cd3397+53,master-gc5448b162b+1,master-gc54cf9771d,master-gc69578ece6+1,master-gcbf758c456+22,master-gcec1da163f+63,master-gcf15f11bcc,master-gd167108223,master-gf44c96c709
LSSTDataManagementBasePackage
Public Member Functions | Static Public Attributes | Private Member Functions | Private Attributes | List of all members
lsst.ctrl.events::EventReceiver Class Reference

Receive events from the event bus. More...

#include <EventReceiver.h>

Public Member Functions

 EventReceiver (const pexPolicy::Policy &policy)
 Receives events based on Policy file contents. More...
 
 EventReceiver (const std::string &hostName, const std::string &topicName, int hostPort=EventBroker::DEFAULTHOSTPORT)
 Receives events from the specified host and topic. More...
 
 EventReceiver (const std::string &hostName, const std::string &topicName, const std::string &selector, int hostPort=EventBroker::DEFAULTHOSTPORT)
 Receives events from the specified host and topic. More...
 
virtual ~EventReceiver ()
 destructor method More...
 
Event * receiveEvent ()
 wait until an event is received. More...
 
Event * receiveEvent (long timeout)
 wait for a length of time for an event to be received. More...
 
std::string getTopicName ()
 returns the topic for this EventReceiver More...
 

Static Public Attributes

static const long infiniteTimeout = -1
 

Private Member Functions

void init (const std::string &hostName, const std::string &topicName, const std::string &selector, int hostPort)
 

Private Attributes

cms::Connection * _connection
 
cms::Session * _session
 
cms::Destination * _destination
 
cms::MessageConsumer * _consumer
 
bool _turnEventsOff
 
std::string _topic
 
std::string _selector
 

Detailed Description

Receive events from the event bus.

Definition at line 63 of file EventReceiver.h.

Constructor & Destructor Documentation

lsst.ctrl.events::EventReceiver::EventReceiver ( const pexPolicy::Policy & policy )

Receives events based on Policy file contents.

Parameters
policy: the policy object to use when building the receiver
Exceptions
throws lsst::pex::exceptions::NotFoundError if topicName isn't specified
throws lsst::pex::exceptions::NotFoundError if hostName isn't specified
throws lsst::pex::exceptions::RuntimeError if connection fails to initialize

Definition at line 73 of file EventReceiver.cc.

73  {
74  //EventLibrary().initializeLibrary();
75  int hostPort;
76 
77  try {
78  _turnEventsOff = policy.getBool("turnEventsOff");
79  } catch (pexPolicy::NameNotFound& e) {
80  _turnEventsOff = false;
81  }
82  if (_turnEventsOff == true)
83  return;
84 
85  if (!policy.exists("topicName")) {
86  throw LSST_EXCEPT(pexExceptions::NotFoundError, "topicName not found in policy");
87  }
88 
89  std::string topicName = policy.getString("topicName");
90  try {
91  _turnEventsOff = policy.getBool("turnEventsOff");
92  } catch (pexPolicy::NameNotFound& e) {
93  _turnEventsOff = false;
94  }
95 
96  if (!policy.exists("hostName")) {
97  throw LSST_EXCEPT(pexExceptions::NotFoundError, "hostName not found in policy");
98  }
99 
100  std::string hostName = policy.getString("hostName");
101 
102  try {
103  hostPort = policy.getInt("hostPort");
104  } catch (pexPolicy::NameNotFound& e) {
105  hostPort = EventBroker::DEFAULTHOSTPORT;
106  }
107 
108  try {
109  _selector = policy.getString("selector");
110  } catch (pexPolicy::NameNotFound& e) {
111  _selector = "";
112  }
113  init(hostName, topicName, _selector, hostPort);
114 }
int getInt(const std::string &name) const
Definition: Policy.h:603
bool exists(const std::string &name) const
Definition: Policy.h:944
void init(const std::string &hostName, const std::string &topicName, const std::string &selector, int hostPort)
bool getBool(const std::string &name) const
Definition: Policy.h:589
const std::string getString(const std::string &name) const
Definition: Policy.h:631
static const int DEFAULTHOSTPORT
Definition: EventBroker.h:43
#define LSST_EXCEPT(type,...)
Definition: Exception.h:46
lsst.ctrl.events::EventReceiver::EventReceiver ( const std::string &  hostName,
const std::string &  topicName,
int  hostPort = EventBroker::DEFAULTHOSTPORT 
)

Receives events from the specified host and topic.

Parameters
hostName: the machine hosting the message broker
topicName: the topic to receive events from
hostPort: the port the message broker is listening on
Exceptions
throws lsst::pex::exceptions::RuntimeError if connection fails to initialize

Definition at line 123 of file EventReceiver.cc.

123  {
124  _turnEventsOff = false;
125  init(hostName, topicName, "", hostPort);
126 }
void init(const std::string &hostName, const std::string &topicName, const std::string &selector, int hostPort)
lsst.ctrl.events::EventReceiver::EventReceiver ( const std::string &  hostName,
const std::string &  topicName,
const std::string &  selector,
int  hostPort = EventBroker::DEFAULTHOSTPORT 
)

Receives events from the specified host and topic.

Parameters
hostName: the machine hosting the message broker
topicName: the topic to receive events from
selector: the message selector expression to use. A selector value of "" is equivalent to no selector.
hostPort: the port the message broker is listening on
Exceptions
throws lsst::pex::exceptions::RuntimeError if connection fails to initialize

Definition at line 136 of file EventReceiver.cc.

136  {
137  _turnEventsOff = false;
138  init(hostName, topicName, selector, hostPort);
139 }
void init(const std::string &hostName, const std::string &topicName, const std::string &selector, int hostPort)
lsst.ctrl.events::EventReceiver::~EventReceiver ( )
virtual

destructor method

Definition at line 241 of file EventReceiver.cc.

241  {
242 
243  // Destroy resources.
244  try {
245  if( _destination != NULL )
246  delete _destination;
247  } catch ( cms::CMSException& e ) {
248  e.printStackTrace();
249  }
250  _destination = NULL;
251 
252  try {
253  if( _consumer != NULL )
254  delete _consumer;
255  } catch ( cms::CMSException& e ) {
256  e.printStackTrace();
257  }
258  _consumer = NULL;
259 
260  // Close open resources.
261  try {
262  if( _session != NULL )
263  _session->close();
264  } catch ( cms::CMSException& e ) {
265  e.printStackTrace();
266  }
267  try {
268  if( _connection != NULL )
269  _connection->close();
270  } catch ( cms::CMSException& e ) {
271  e.printStackTrace();
272  }
273 
274  try {
275  if( _session != NULL )
276  delete _session;
277  } catch ( cms::CMSException& e ) {
278  e.printStackTrace();
279  }
280  _session = NULL;
281 
282  try {
283  if( _connection != NULL )
284  delete _connection;
285  } catch ( cms::CMSException& e ) {
286  e.printStackTrace();
287  }
288  _connection = NULL;
289 }
cms::MessageConsumer * _consumer
Definition: EventReceiver.h:94
cms::Destination * _destination
Definition: EventReceiver.h:91

Member Function Documentation

std::string lsst.ctrl.events::EventReceiver::getTopicName ( )

returns the topic for this EventReceiver

Definition at line 235 of file EventReceiver.cc.

235  {
236  return _topic;
237 }
void lsst.ctrl.events::EventReceiver::init ( const std::string &  hostName,
const std::string &  topicName,
const std::string &  selector,
int  hostPort 
)
private

private method for initialization of EventReceiver. Sets up use of local sockets or activemq, depending on how the policy file was configured.

Definition at line 144 of file EventReceiver.cc.

144  {
145 
146  EventLibrary().initializeLibrary();
147  _connection = NULL;
148  _session = NULL;
149  _destination = NULL;
150  _consumer = NULL;
151  _topic = topicName;
152  _selector = selector;
153 
154  if (_turnEventsOff == true)
155  return;
156 
157  try {
158  std::stringstream ss;
159 
160  ss << hostPort;
161 
162  string jmsURL = "tcp://"+hostName+":"+ss.str()+"?wireFormat=openwire";
163 
164  activemqCore::ActiveMQConnectionFactory* connectionFactory =
165  new activemqCore::ActiveMQConnectionFactory( jmsURL );
166 
167  _connection = 0;
168  try {
169  _connection = connectionFactory->createConnection();
170  _connection->start();
171  delete connectionFactory;
172  }
173  catch (cms::CMSException& e) {
174  delete connectionFactory;
175  std::string msg("Failed to connect to broker: ");
176  msg += e.getMessage();
177  msg += " (is broker running?)";
178  throw LSST_EXCEPT(pexExceptions::RuntimeError, msg);
179  }
180 
181  _session = _connection->createSession( cms::Session::AUTO_ACKNOWLEDGE );
182 
183  _destination = _session->createTopic( topicName );
184 
185  if (_selector == "")
186  _consumer = _session->createConsumer( _destination );
187  else
188  _consumer = _session->createConsumer( _destination, selector );
189 
190  } catch ( cms::CMSException& e ) {
191  throw LSST_EXCEPT(pexExceptions::RuntimeError, std::string("Trouble creating EventReceiver: ") + e.getMessage());
192  }
193 }
cms::MessageConsumer * _consumer
Definition: EventReceiver.h:94
cms::Destination * _destination
Definition: EventReceiver.h:91
#define LSST_EXCEPT(type,...)
Definition: Exception.h:46
Event * lsst.ctrl.events::EventReceiver::receiveEvent ( )

wait until an event is received.

Wait until an Event is received. Note: Caller is responsible for deleting received Event.

Definition at line 199 of file EventReceiver.cc.

199  {
201 }
static const long infiniteTimeout
Definition: EventReceiver.h:79
Event * receiveEvent()
wait until an event is received.
Event * lsst.ctrl.events::EventReceiver::receiveEvent ( long  timeout)

wait for a length of time for an event to be received.

Wait to receive an event for a length of time. Note: Caller is responsible for deleting received Event.

Parameters
timeout: the length of time to wait, in milliseconds

Definition at line 208 of file EventReceiver.cc.

208  {
209  PropertySet::Ptr psp;
210 
211  if (_turnEventsOff == true)
212  return NULL;
213 
214 
215  cms::TextMessage* textMessage;
216  try {
217  cms::Message* msg = _consumer->receive(timeout);
218  if (msg == NULL) return NULL;
219  textMessage = dynamic_cast<cms::TextMessage* >(msg);
220  if (textMessage == NULL)
221  throw LSST_EXCEPT(pexExceptions::RuntimeError, "Unexpected JMS Message type");
222  } catch (activemq::exceptions::ActiveMQException& e) {
223  throw LSST_EXCEPT(pexExceptions::RuntimeError, e.getMessage());
224  }
225 
226 
227  Event* event = EventFactory().createEvent(textMessage);
228  delete textMessage;
229 
230  return event;
231 }
boost::shared_ptr< PropertySet > Ptr
Definition: PropertySet.h:90
cms::MessageConsumer * _consumer
Definition: EventReceiver.h:94
#define LSST_EXCEPT(type,...)
Definition: Exception.h:46

Member Data Documentation

cms::Connection* lsst.ctrl.events::EventReceiver::_connection
private

Definition at line 85 of file EventReceiver.h.

cms::MessageConsumer* lsst.ctrl.events::EventReceiver::_consumer
private

Definition at line 94 of file EventReceiver.h.

cms::Destination* lsst.ctrl.events::EventReceiver::_destination
private

Definition at line 91 of file EventReceiver.h.

std::string lsst.ctrl.events::EventReceiver::_selector
private

Definition at line 103 of file EventReceiver.h.

cms::Session* lsst.ctrl.events::EventReceiver::_session
private

Definition at line 88 of file EventReceiver.h.

std::string lsst.ctrl.events::EventReceiver::_topic
private

Definition at line 100 of file EventReceiver.h.

bool lsst.ctrl.events::EventReceiver::_turnEventsOff
private

Definition at line 97 of file EventReceiver.h.

const long lsst.ctrl.events::EventReceiver::infiniteTimeout = -1
static

Definition at line 79 of file EventReceiver.h.


The documentation for this class was generated from the following files: