LSSTApplications  10.0+286,10.0+36,10.0+46,10.0-2-g4f67435,10.1+152,10.1+37,11.0,11.0+1,11.0-1-g47edd16,11.0-1-g60db491,11.0-1-g7418c06,11.0-2-g04d2804,11.0-2-g68503cd,11.0-2-g818369d,11.0-2-gb8b8ce7
LSSTDataManagementBasePackage
EventReceiver.cc
Go to the documentation of this file.
1 // -*- lsst-c++ -*-
2 
3 /*
4  * LSST Data Management System
5  * Copyright 2008-2015 AURA/LSST.
6  *
7  * This product includes software developed by the
8  * LSST Project (http://www.lsst.org/).
9  *
10  * This program is free software: you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation, either version 3 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU General Public License for more details.
19  *
20  * You should have received a copy of the LSST License Statement and
21  * the GNU General Public License along with this program. If not,
22  * see <https://www.lsstcorp.org/LegalNotices/>.
23  */
24 
33 #include <iomanip>
34 #include <sstream>
35 #include <stdexcept>
36 #include <cstring>
37 
41 #include "lsst/daf/base/DateTime.h"
43 #include "lsst/pex/exceptions.h"
44 #include <sys/socket.h>
45 #include <sys/un.h>
48 
49 #include <activemq/core/ActiveMQConnectionFactory.h>
50 #include <activemq/exceptions/ActiveMQException.h>
51 
52 namespace pexExceptions = lsst::pex::exceptions;
53 
54 namespace activemqCore = activemq::core;
55 
56 namespace lsst {
57 namespace ctrl {
58 namespace events {
59 
60 EventReceiver::EventReceiver(const std::string& hostName, const std::string& topicName, int hostPort) {
61  init(hostName, topicName, "", hostPort);
62 }
63 
64 EventReceiver::EventReceiver(const std::string& hostName, const std::string& topicName, const std::string& selector, int hostPort) {
65  init(hostName, topicName, selector, hostPort);
66 }
67 
70 void EventReceiver::init(const std::string& hostName, const std::string& topicName, const std::string& selector, int hostPort) {
71 
73  _connection = NULL;
74  _session = NULL;
75  _destination = NULL;
76  _consumer = NULL;
77  _topic = topicName;
78  _selector = selector;
79 
80  try {
81  std::stringstream ss;
82 
83  ss << hostPort;
84 
85  string jmsURL = "tcp://"+hostName+":"+ss.str()+"?wireFormat=openwire";
86 
87  activemqCore::ActiveMQConnectionFactory* connectionFactory =
88  new activemqCore::ActiveMQConnectionFactory( jmsURL );
89 
90  _connection = 0;
91  try {
92  _connection = connectionFactory->createConnection();
93  _connection->start();
94  delete connectionFactory;
95  }
96  catch (cms::CMSException& e) {
97  delete connectionFactory;
98  std::string msg("Failed to connect to broker: ");
99  msg += e.getMessage();
100  msg += " (is broker running?)";
101  throw LSST_EXCEPT(pexExceptions::RuntimeError, msg);
102  }
103 
104  _session = _connection->createSession( cms::Session::AUTO_ACKNOWLEDGE );
105 
106  _destination = _session->createTopic( topicName );
107 
108  if (_selector == "")
109  _consumer = _session->createConsumer( _destination );
110  else
111  _consumer = _session->createConsumer( _destination, selector );
112 
113  } catch ( cms::CMSException& e ) {
114  throw LSST_EXCEPT(pexExceptions::RuntimeError, std::string("Trouble creating EventReceiver: ") + e.getMessage());
115  }
116 }
117 
120 }
121 
123  PropertySet::Ptr psp;
124 
125  cms::TextMessage* textMessage;
126  try {
127  cms::Message* msg = _consumer->receive(timeout);
128  if (msg == NULL) return NULL;
129  textMessage = dynamic_cast<cms::TextMessage* >(msg);
130  if (textMessage == NULL)
131  throw LSST_EXCEPT(pexExceptions::RuntimeError, "Unexpected JMS Message type");
132  } catch (activemq::exceptions::ActiveMQException& e) {
133  throw LSST_EXCEPT(pexExceptions::RuntimeError, e.getMessage());
134  }
135 
136 
137  Event* event = EventFactory().createEvent(textMessage);
138  delete textMessage;
139 
140  return event;
141 }
142 
144  return _topic;
145 }
146 
148 
149  // Destroy resources.
150  try {
151  if( _destination != NULL )
152  delete _destination;
153  } catch ( cms::CMSException& e ) {
154  e.printStackTrace();
155  }
156  _destination = NULL;
157 
158  try {
159  if( _consumer != NULL )
160  delete _consumer;
161  } catch ( cms::CMSException& e ) {
162  e.printStackTrace();
163  }
164  _consumer = NULL;
165 
166  // Close open resources.
167  try {
168  if( _session != NULL )
169  _session->close();
170  } catch ( cms::CMSException& e ) {
171  e.printStackTrace();
172  }
173  try {
174  if( _connection != NULL )
175  _connection->close();
176  } catch ( cms::CMSException& e ) {
177  e.printStackTrace();
178  }
179 
180  try {
181  if( _session != NULL )
182  delete _session;
183  } catch ( cms::CMSException& e ) {
184  e.printStackTrace();
185  }
186  _session = NULL;
187 
188  try {
189  if( _connection != NULL )
190  delete _connection;
191  } catch ( cms::CMSException& e ) {
192  e.printStackTrace();
193  }
194  _connection = NULL;
195 }
196 
197 }}}
virtual ~EventReceiver()
destructor
Singleton used to make sure the events library is initialized.
Definition: EventLibrary.h:55
create LSST Events from JMS Messages
Definition: EventFactory.h:58
Include files required for standard LSST Exception handling.
boost::shared_ptr< PropertySet > Ptr
Definition: PropertySet.h:90
EventReceiver(const std::string &hostName, const std::string &topicName, int hostPort=EventBroker::DEFAULTHOSTPORT)
Receives events from the specified host and topic.
defines the EventFactory class
static Event * createEvent(cms::TextMessage *msg)
Definition: EventFactory.cc:63
defines the EventLibrary class
Interface for DateTime class.
defines the EventReceiver class
#define LSST_EXCEPT(type,...)
Definition: Exception.h:46
information about the Event Broker
Representation of an LSST Event.
Definition: Event.h:62
void init(const std::string &hostName, const std::string &topicName, const std::string &selector, int hostPort)
cms::MessageConsumer * _consumer
static void initializeLibrary()
initialize the ActiveMQ library, but only do it once.
Definition: EventLibrary.cc:50
Interface for PropertySet class.
std::string getTopicName()
get topic name
Event * receiveEvent()
Wait until an Event is received.
defines the EventSystem class