LSSTApplications  8.0.0.0+107,8.0.0.1+13,9.1+18,9.2,master-g084aeec0a4,master-g0aced2eed8+6,master-g15627eb03c,master-g28afc54ef9,master-g3391ba5ea0,master-g3d0fb8ae5f,master-g4432ae2e89+36,master-g5c3c32f3ec+17,master-g60f1e072bb+1,master-g6a3ac32d1b,master-g76a88a4307+1,master-g7bce1f4e06+57,master-g8ff4092549+31,master-g98e65bf68e,master-ga6b77976b1+53,master-gae20e2b580+3,master-gb584cd3397+53,master-gc5448b162b+1,master-gc54cf9771d,master-gc69578ece6+1,master-gcbf758c456+22,master-gcec1da163f+63,master-gcf15f11bcc,master-gd167108223,master-gf44c96c709
LSSTDataManagementBasePackage
EventTransmitter.cc
Go to the documentation of this file.
1 // -*- lsst-c++ -*-
2 
3 /*
4  * LSST Data Management System
5  * Copyright 2008, 2009, 2010 LSST Corporation.
6  *
7  * This product includes software developed by the
8  * LSST Project (http://www.lsst.org/).
9  *
10  * This program is free software: you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation, either version 3 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU General Public License for more details.
19  *
20  * You should have received a copy of the LSST License Statement and
21  * the GNU General Public License along with this program. If not,
22  * see <http://www.lsstcorp.org/LegalNotices/>.
23  */
24 
34 #include <iomanip>
35 #include <sstream>
36 #include <stdexcept>
37 #include <limits>
38 #include <cstring>
39 #include <time.h>
40 
41 #include "lsst/ctrl/events/Event.h"
44 #include "lsst/daf/base/DateTime.h"
46 #include "lsst/pex/exceptions.h"
48 #include "lsst/pex/policy/Policy.h"
50 #include <sys/socket.h>
51 #include <sys/un.h>
54 
55 #include <activemq/core/ActiveMQConnectionFactory.h>
56 
57 namespace dafBase = lsst::daf::base;
58 namespace pexExceptions = lsst::pex::exceptions;
59 namespace pexLogging = lsst::pex::logging;
60 
61 
62 using namespace std;
63 using std::numeric_limits;
64 
65 namespace lsst {
66 namespace ctrl {
67 namespace events {
68 
94 EventTransmitter::EventTransmitter( const pexPolicy::Policy& policy) {
95  int hostPort;
96 
98 
99  try {
100  _turnEventsOff = policy.getBool("turnEventsoff");
101  } catch (pexPolicy::NameNotFound& e) {
102  _turnEventsOff = false;
103  }
104  if (_turnEventsOff == true)
105  return;
106 
107  if (!policy.exists("topicName")) {
108  throw LSST_EXCEPT(pexExceptions::NotFoundError, "topicName not found in policy");
109  }
110  _topicName = policy.getString("topicName");
111 
112  std::string hostName;
113  try {
114  hostName = policy.getString("hostName");
115  } catch (pexPolicy::NameNotFound& e) {
116  throw LSST_EXCEPT(pexExceptions::NotFoundError, "hostName not found in policy");
117  }
118 
119  try {
120  hostPort = policy.getInt("hostPort");
121  } catch (pexPolicy::NameNotFound& e) {
122  hostPort = EventBroker::DEFAULTHOSTPORT;
123  }
124  init(hostName, _topicName, hostPort);
125 }
126 
136 EventTransmitter::EventTransmitter( const std::string& hostName, const std::string& topicName, int hostPort) {
138 
139  _turnEventsOff = false;
140  init(hostName, topicName, hostPort);
141 }
142 
145 void EventTransmitter::init( const std::string& hostName, const std::string& topicName, int hostPort) {
146  _connection = NULL;
147  _session = NULL;
148  // _destination = NULL;
149  _producer = NULL;
150  _topicName = topicName;
151  _topic = NULL;
152 
153  if (_turnEventsOff == true)
154  return;
155 
156  // set up a connection to the ActiveMQ server for message transmission
157  try {
158  std::stringstream ss;
159 
160  ss << hostPort;
161 
162  /*
163  * Create a ConnectionFactory to connect to hostName, and
164  * create a topic for this.
165  */
166  string brokerUri = "tcp://"+hostName+":"+ss.str()+"?wireFormat=openwire&transport.useAsyncSend=true";
167 
168  activemq::core::ActiveMQConnectionFactory* connectionFactory =
169  new activemq::core::ActiveMQConnectionFactory( brokerUri );
170 
171  _connection = 0;
172  try {
173  _connection = connectionFactory->createConnection();
174  _connection->start();
175  delete connectionFactory;
176  }
177  catch (cms::CMSException& e) {
178  delete connectionFactory;
179  std::string msg("Failed to connect to broker: ");
180  msg += e.getMessage();
181  msg += " (is broker running?)";
182  throw LSST_EXCEPT(pexExceptions::RuntimeError, msg);
183  }
184 
185  _session = _connection->createSession( cms::Session::AUTO_ACKNOWLEDGE );
186 
187  // Create the destination topic
188  //_destination = _session->createTopic( topicName );
189 
190  // Create Topic
191  _topic = new activemq::commands::ActiveMQTopic(_topicName);
192 
193 
194  // Create a MessageProducer from the Session to the Topic or Queue
195  _producer = _session->createProducer(NULL);
196  _producer->setDeliveryMode( cms::DeliveryMode::NON_PERSISTENT );
197  } catch ( cms::CMSException& e ) {
198  throw LSST_EXCEPT(pexExceptions::RuntimeError, std::string("Trouble creating EventTransmitter: ") + e.getMessage());
199  }
200 }
201 
202 void EventTransmitter::publishEvent(Event& event) {
203  long long pubtime;
204  cms::TextMessage* message = _session->createTextMessage();
205 
206  event.marshall(message);
207 
208  message->setStringProperty("TOPIC", _topicName);
209 
210  // wait until the last moment to timestamp publication time
211  pubtime = dafBase::DateTime::now().nsecs();
212  message->setLongProperty("PUBTIME", pubtime);
213 
214  _producer->send(_topic, message);
215  delete message;
216 }
217 
220 std::string EventTransmitter::getTopicName() {
221  return _topicName;
222 }
223 
226 EventTransmitter::~EventTransmitter() {
227 
228 /*
229  // Destroy resources.
230  try {
231  if( _destination != NULL )
232  delete _destination;
233  } catch ( cms::CMSException& e ) {
234  e.printStackTrace();
235  }
236  _destination = NULL;
237 */
238  if (_topic != NULL)
239  delete _topic;
240 
241  try {
242  if( _producer != NULL )
243  delete _producer;
244  } catch ( cms::CMSException& e ) {
245  e.printStackTrace();
246  }
247  _producer = NULL;
248 
249  // Close open resources.
250  try {
251  if( _session != NULL )
252  _session->close();
253  if( _connection != NULL )
254  _connection->close();
255  } catch ( cms::CMSException& e ) {
256  e.printStackTrace();
257  }
258 
259  try {
260  if( _session != NULL )
261  delete _session;
262  } catch ( cms::CMSException& e ) {
263  e.printStackTrace();
264  }
265  _session = NULL;
266 
267  try {
268  if( _connection != NULL )
269  delete _connection;
270  } catch ( cms::CMSException& e ) {
271  e.printStackTrace();
272  }
273  _connection = NULL;
274 }
275 
276 }
277 }
278 }
static DateTime now(void)
Definition: DateTime.cc:553
int getInt(const std::string &name) const
Definition: Policy.h:603
defines the EventTransmitter class
bool exists(const std::string &name) const
Definition: Policy.h:944
Singleton use to make sure the events library is initialized.
Definition: EventLibrary.h:56
bool getBool(const std::string &name) const
Definition: Policy.h:589
a container for holding hierarchical configuration data in memory.
Definition: Policy.h:169
const std::string getString(const std::string &name) const
Definition: Policy.h:631
long long nsecs(Timescale scale=TAI) const
Definition: DateTime.cc:440
definition of the LogRecord, RecordProperty and Prop classes
Include files required for standard LSST Exception handling.
defines the EventLibrary class
Defines the (deprecated) Component class.
defines the Event class
Interface for DateTime class.
static void initializeLibrary()
initialize the ActiveMQ library, but only do it once.
Definition: EventLibrary.cc:69
#define LSST_EXCEPT(type,...)
Definition: Exception.h:46
defines information pertaining to the Event Broker
Representation of an LSST Event.
Definition: Event.h:67
Interface for PropertySet class.
defines the EventSystem class