LSSTApplications  10.0+286,10.0+36,10.0+46,10.0-2-g4f67435,10.1+152,10.1+37,11.0,11.0+1,11.0-1-g47edd16,11.0-1-g60db491,11.0-1-g7418c06,11.0-2-g04d2804,11.0-2-g68503cd,11.0-2-g818369d,11.0-2-gb8b8ce7
LSSTDataManagementBasePackage
EventTransmitter.cc
Go to the documentation of this file.
1 // -*- lsst-c++ -*-
2 
3 /*
4  * LSST Data Management System
5  * Copyright 2008-2015 AURA/LSST.
6  *
7  * This product includes software developed by the
8  * LSST Project (http://www.lsst.org/).
9  *
10  * This program is free software: you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation, either version 3 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU General Public License for more details.
19  *
20  * You should have received a copy of the LSST License Statement and
21  * the GNU General Public License along with this program. If not,
22  * see <https://www.lsstcorp.org/LegalNotices/>.
23  */
24 
33 #include <iomanip>
34 #include <sstream>
35 #include <stdexcept>
36 #include <limits>
37 #include <cstring>
38 #include <time.h>
39 
40 #include "lsst/ctrl/events/Event.h"
43 #include "lsst/daf/base/DateTime.h"
45 #include "lsst/pex/exceptions.h"
46 #include <sys/socket.h>
47 #include <sys/un.h>
50 
51 #include <activemq/core/ActiveMQConnectionFactory.h>
52 
53 namespace dafBase = lsst::daf::base;
54 namespace pexExceptions = lsst::pex::exceptions;
55 
56 
57 using namespace std;
58 using std::numeric_limits;
59 
60 namespace lsst {
61 namespace ctrl {
62 namespace events {
63 
64 EventTransmitter::EventTransmitter( const std::string& hostName, const std::string& topicName, int hostPort) {
66 
67  init(hostName, topicName, hostPort);
68 }
69 
70 /*
71  * private initialization method for configuring EventTransmitter
72  */
73 void EventTransmitter::init( const std::string& hostName, const std::string& topicName, int hostPort) {
74  _connection = NULL;
75  _session = NULL;
76 
77  _producer = NULL;
78  _topicName = topicName;
79  _topic = NULL;
80 
81  // set up a connection to the ActiveMQ server for message transmission
82  try {
83  std::stringstream ss;
84 
85  ss << hostPort;
86 
87  /*
88  * Create a ConnectionFactory to connect to hostName, and
89  * create a topic for this.
90  */
91  string brokerUri = "tcp://"+hostName+":"+ss.str()+"?wireFormat=openwire&transport.useAsyncSend=true";
92 
93  activemq::core::ActiveMQConnectionFactory* connectionFactory =
94  new activemq::core::ActiveMQConnectionFactory( brokerUri );
95 
96  _connection = 0;
97  try {
98  _connection = connectionFactory->createConnection();
99  _connection->start();
100  delete connectionFactory;
101  }
102  catch (cms::CMSException& e) {
103  delete connectionFactory;
104  std::string msg("Failed to connect to broker: ");
105  msg += e.getMessage();
106  msg += " (is broker running?)";
107  throw LSST_EXCEPT(pexExceptions::RuntimeError, msg);
108  }
109 
110  _session = _connection->createSession( cms::Session::AUTO_ACKNOWLEDGE );
111 
112  // Create the destination topic
113  //_destination = _session->createTopic( topicName );
114 
115  // Create Topic
116  _topic = new activemq::commands::ActiveMQTopic(_topicName);
117 
118 
119  // Create a MessageProducer from the Session to the Topic or Queue
120  _producer = _session->createProducer(NULL);
121  _producer->setDeliveryMode( cms::DeliveryMode::NON_PERSISTENT );
122  } catch ( cms::CMSException& e ) {
123  throw LSST_EXCEPT(pexExceptions::RuntimeError, std::string("Trouble creating EventTransmitter: ") + e.getMessage());
124  }
125 }
126 
127 void EventTransmitter::publishEvent(Event& event) {
128  long long pubtime;
129  cms::TextMessage* message = _session->createTextMessage();
130 
131  event.marshall(message);
132 
133  message->setStringProperty("TOPIC", _topicName);
134 
135  // wait until the last moment to timestamp publication time
136  pubtime = dafBase::DateTime::now().nsecs();
137  message->setLongProperty("PUBTIME", pubtime);
138 
139  _producer->send(_topic, message);
140  delete message;
141 }
142 
143 std::string EventTransmitter::getTopicName() {
144  return _topicName;
145 }
146 
147 EventTransmitter::~EventTransmitter() {
148 
149  if (_topic != NULL)
150  delete _topic;
151  _topic = NULL;
152 
153  try {
154  if( _producer != NULL )
155  delete _producer;
156  } catch ( cms::CMSException& e ) {
157  e.printStackTrace();
158  }
159  _producer = NULL;
160 
161  // Close open resources.
162  try {
163  if( _session != NULL )
164  _session->close();
165  if( _connection != NULL )
166  _connection->close();
167  } catch ( cms::CMSException& e ) {
168  e.printStackTrace();
169  }
170 
171  try {
172  if( _session != NULL )
173  delete _session;
174  } catch ( cms::CMSException& e ) {
175  e.printStackTrace();
176  }
177  _session = NULL;
178 
179  try {
180  if( _connection != NULL )
181  delete _connection;
182  } catch ( cms::CMSException& e ) {
183  e.printStackTrace();
184  }
185  _connection = NULL;
186 }
187 
188 }}}
static DateTime now(void)
Definition: DateTime.cc:554
defines the EventTransmitter class
Singleton use to make sure the events library is initialized.
Definition: EventLibrary.h:55
Include files required for standard LSST Exception handling.
long long nsecs(Timescale scale=TAI) const
Definition: DateTime.cc:441
defines the EventLibrary class
defines the Event class
Interface for DateTime class.
#define LSST_EXCEPT(type,...)
Definition: Exception.h:46
information about the Event Broker
Representation of an LSST Event.
Definition: Event.h:62
static void initializeLibrary()
initialize the ActiveMQ library, but only do it once.
Definition: EventLibrary.cc:50
Interface for PropertySet class.
defines the EventSystem class