LSSTApplications  8.0.0.0+107,8.0.0.1+13,9.1+18,9.2,master-g084aeec0a4,master-g0aced2eed8+6,master-g15627eb03c,master-g28afc54ef9,master-g3391ba5ea0,master-g3d0fb8ae5f,master-g4432ae2e89+36,master-g5c3c32f3ec+17,master-g60f1e072bb+1,master-g6a3ac32d1b,master-g76a88a4307+1,master-g7bce1f4e06+57,master-g8ff4092549+31,master-g98e65bf68e,master-ga6b77976b1+53,master-gae20e2b580+3,master-gb584cd3397+53,master-gc5448b162b+1,master-gc54cf9771d,master-gc69578ece6+1,master-gcbf758c456+22,master-gcec1da163f+63,master-gcf15f11bcc,master-gd167108223,master-gf44c96c709
LSSTDataManagementBasePackage
EventReceiver.cc
Go to the documentation of this file.
1 // -*- lsst-c++ -*-
2 
3 /*
4  * LSST Data Management System
5  * Copyright 2008, 2009, 2010 LSST Corporation.
6  *
7  * This product includes software developed by the
8  * LSST Project (http://www.lsst.org/).
9  *
10  * This program is free software: you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation, either version 3 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU General Public License for more details.
19  *
20  * You should have received a copy of the LSST License Statement and
21  * the GNU General Public License along with this program. If not,
22  * see <http://www.lsstcorp.org/LegalNotices/>.
23  */
24 
34 #include <iomanip>
35 #include <sstream>
36 #include <stdexcept>
37 #include <cstring>
38 
42 #include "lsst/daf/base/DateTime.h"
44 #include "lsst/pex/exceptions.h"
46 #include "lsst/pex/policy/Policy.h"
48 #include <sys/socket.h>
49 #include <sys/un.h>
52 
53 #include <activemq/core/ActiveMQConnectionFactory.h>
54 #include <activemq/exceptions/ActiveMQException.h>
55 
56 namespace pexPolicy = lsst::pex::policy;
57 namespace pexExceptions = lsst::pex::exceptions;
58 
59 namespace activemqCore = activemq::core;
60 
61 namespace lsst {
62 namespace ctrl {
63 namespace events {
64 
74  //EventLibrary().initializeLibrary();
75  int hostPort;
76 
77  try {
78  _turnEventsOff = policy.getBool("turnEventsOff");
79  } catch (pexPolicy::NameNotFound& e) {
80  _turnEventsOff = false;
81  }
82  if (_turnEventsOff == true)
83  return;
84 
85  if (!policy.exists("topicName")) {
86  throw LSST_EXCEPT(pexExceptions::NotFoundError, "topicName not found in policy");
87  }
88 
89  std::string topicName = policy.getString("topicName");
90  try {
91  _turnEventsOff = policy.getBool("turnEventsOff");
92  } catch (pexPolicy::NameNotFound& e) {
93  _turnEventsOff = false;
94  }
95 
96  if (!policy.exists("hostName")) {
97  throw LSST_EXCEPT(pexExceptions::NotFoundError, "hostName not found in policy");
98  }
99 
100  std::string hostName = policy.getString("hostName");
101 
102  try {
103  hostPort = policy.getInt("hostPort");
104  } catch (pexPolicy::NameNotFound& e) {
105  hostPort = EventBroker::DEFAULTHOSTPORT;
106  }
107 
108  try {
109  _selector = policy.getString("selector");
110  } catch (pexPolicy::NameNotFound& e) {
111  _selector = "";
112  }
113  init(hostName, topicName, _selector, hostPort);
114 }
115 
123 EventReceiver::EventReceiver(const std::string& hostName, const std::string& topicName, int hostPort) {
124  _turnEventsOff = false;
125  init(hostName, topicName, "", hostPort);
126 }
127 
136 EventReceiver::EventReceiver(const std::string& hostName, const std::string& topicName, const std::string& selector, int hostPort) {
137  _turnEventsOff = false;
138  init(hostName, topicName, selector, hostPort);
139 }
140 
144 void EventReceiver::init(const std::string& hostName, const std::string& topicName, const std::string& selector, int hostPort) {
145 
147  _connection = NULL;
148  _session = NULL;
149  _destination = NULL;
150  _consumer = NULL;
151  _topic = topicName;
152  _selector = selector;
153 
154  if (_turnEventsOff == true)
155  return;
156 
157  try {
158  std::stringstream ss;
159 
160  ss << hostPort;
161 
162  string jmsURL = "tcp://"+hostName+":"+ss.str()+"?wireFormat=openwire";
163 
164  activemqCore::ActiveMQConnectionFactory* connectionFactory =
165  new activemqCore::ActiveMQConnectionFactory( jmsURL );
166 
167  _connection = 0;
168  try {
169  _connection = connectionFactory->createConnection();
170  _connection->start();
171  delete connectionFactory;
172  }
173  catch (cms::CMSException& e) {
174  delete connectionFactory;
175  std::string msg("Failed to connect to broker: ");
176  msg += e.getMessage();
177  msg += " (is broker running?)";
178  throw LSST_EXCEPT(pexExceptions::RuntimeError, msg);
179  }
180 
181  _session = _connection->createSession( cms::Session::AUTO_ACKNOWLEDGE );
182 
183  _destination = _session->createTopic( topicName );
184 
185  if (_selector == "")
186  _consumer = _session->createConsumer( _destination );
187  else
188  _consumer = _session->createConsumer( _destination, selector );
189 
190  } catch ( cms::CMSException& e ) {
191  throw LSST_EXCEPT(pexExceptions::RuntimeError, std::string("Trouble creating EventReceiver: ") + e.getMessage());
192  }
193 }
194 
201 }
202 
209  PropertySet::Ptr psp;
210 
211  if (_turnEventsOff == true)
212  return NULL;
213 
214 
215  cms::TextMessage* textMessage;
216  try {
217  cms::Message* msg = _consumer->receive(timeout);
218  if (msg == NULL) return NULL;
219  textMessage = dynamic_cast<cms::TextMessage* >(msg);
220  if (textMessage == NULL)
221  throw LSST_EXCEPT(pexExceptions::RuntimeError, "Unexpected JMS Message type");
222  } catch (activemq::exceptions::ActiveMQException& e) {
223  throw LSST_EXCEPT(pexExceptions::RuntimeError, e.getMessage());
224  }
225 
226 
227  Event* event = EventFactory().createEvent(textMessage);
228  delete textMessage;
229 
230  return event;
231 }
232 
236  return _topic;
237 }
238 
242 
243  // Destroy resources.
244  try {
245  if( _destination != NULL )
246  delete _destination;
247  } catch ( cms::CMSException& e ) {
248  e.printStackTrace();
249  }
250  _destination = NULL;
251 
252  try {
253  if( _consumer != NULL )
254  delete _consumer;
255  } catch ( cms::CMSException& e ) {
256  e.printStackTrace();
257  }
258  _consumer = NULL;
259 
260  // Close open resources.
261  try {
262  if( _session != NULL )
263  _session->close();
264  } catch ( cms::CMSException& e ) {
265  e.printStackTrace();
266  }
267  try {
268  if( _connection != NULL )
269  _connection->close();
270  } catch ( cms::CMSException& e ) {
271  e.printStackTrace();
272  }
273 
274  try {
275  if( _session != NULL )
276  delete _session;
277  } catch ( cms::CMSException& e ) {
278  e.printStackTrace();
279  }
280  _session = NULL;
281 
282  try {
283  if( _connection != NULL )
284  delete _connection;
285  } catch ( cms::CMSException& e ) {
286  e.printStackTrace();
287  }
288  _connection = NULL;
289 }
290 
291 }
292 }
293 }
int getInt(const std::string &name) const
Definition: Policy.h:603
bool exists(const std::string &name) const
Definition: Policy.h:944
Singleton use to make sure the events library is initialized.
Definition: EventLibrary.h:56
void init(const std::string &hostName, const std::string &topicName, const std::string &selector, int hostPort)
create LSST Events from JMS Messages
Definition: EventFactory.h:62
bool getBool(const std::string &name) const
Definition: Policy.h:589
EventReceiver(const pexPolicy::Policy &policy)
Receives events based on Policy file contents.
a container for holding hierarchical configuration data in memory.
Definition: Policy.h:169
boost::shared_ptr< PropertySet > Ptr
Definition: PropertySet.h:90
const std::string getString(const std::string &name) const
Definition: Policy.h:631
cms::MessageConsumer * _consumer
Definition: EventReceiver.h:94
static const int DEFAULTHOSTPORT
Definition: EventBroker.h:43
static Event * createEvent(cms::TextMessage *msg)
return an Event object, based on the type received in the TextMessage.
Definition: EventFactory.cc:76
definition of the LogRecord, RecordProperty and Prop classes
defines the EventFactory class
Include files required for standard LSST Exception handling.
defines the EventLibrary class
Defines the (deprecated) Component class.
virtual ~EventReceiver()
destructor method
static const long infiniteTimeout
Definition: EventReceiver.h:79
Interface for DateTime class.
cms::Destination * _destination
Definition: EventReceiver.h:91
defines the EventReceiver class
static void initializeLibrary()
initialize the ActiveMQ library, but only do it once.
Definition: EventLibrary.cc:69
#define LSST_EXCEPT(type,...)
Definition: Exception.h:46
defines information pertaining to the Event Broker
std::string getTopicName()
returns the topic for this EventReceiver
Representation of an LSST Event.
Definition: Event.h:67
Interface for PropertySet class.
Event * receiveEvent()
wait until an event is received.
defines the EventSystem class