LSSTApplications  11.0-13-gbb96280,12.1.rc1,12.1.rc1+1,12.1.rc1+2,12.1.rc1+5,12.1.rc1+8,12.1.rc1-1-g06d7636+1,12.1.rc1-1-g253890b+5,12.1.rc1-1-g3d31b68+7,12.1.rc1-1-g3db6b75+1,12.1.rc1-1-g5c1385a+3,12.1.rc1-1-g83b2247,12.1.rc1-1-g90cb4cf+6,12.1.rc1-1-g91da24b+3,12.1.rc1-2-g3521f8a,12.1.rc1-2-g39433dd+4,12.1.rc1-2-g486411b+2,12.1.rc1-2-g4c2be76,12.1.rc1-2-gc9c0491,12.1.rc1-2-gda2cd4f+6,12.1.rc1-3-g3391c73+2,12.1.rc1-3-g8c1bd6c+1,12.1.rc1-3-gcf4b6cb+2,12.1.rc1-4-g057223e+1,12.1.rc1-4-g19ed13b+2,12.1.rc1-4-g30492a7
LSSTDataManagementBasePackage
Transmitter.cc
Go to the documentation of this file.
1 // -*- lsst-c++ -*-
2 
3 /*
4  * LSST Data Management System
5  * Copyright 2008-2015 AURA/LSST.
6  *
7  * This product includes software developed by the
8  * LSST Project (http://www.lsst.org/).
9  *
10  * This program is free software: you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation, either version 3 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU General Public License for more details.
19  *
20  * You should have received a copy of the LSST License Statement and
21  * the GNU General Public License along with this program. If not,
22  * see <https://www.lsstcorp.org/LegalNotices/>.
23  */
24 
36 
37 #include "lsst/daf/base/DateTime.h"
38 #include "lsst/pex/exceptions.h"
39 
40 #include <activemq/core/ActiveMQConnectionFactory.h>
41 
42 namespace dafBase = lsst::daf::base;
43 namespace pexExceptions = lsst::pex::exceptions;
44 
45 
46 using namespace std;
47 using std::numeric_limits;
48 
49 namespace lsst {
50 namespace ctrl {
51 namespace events {
52 
53 Transmitter::Transmitter() {
55 }
56 
57 
58 /*
59  * private initialization method for configuring Transmitter
60  */
61 void Transmitter::init( const std::string& hostName, const std::string& destinationName, bool createQueue, int hostPort) {
62  _connection = NULL;
63  _session = NULL;
64 
65  _producer = NULL;
66  _destinationName = destinationName;
67  _destination = NULL;
68 
69  // set up a connection to the ActiveMQ server for message transmission
70  try {
71  std::stringstream ss;
72 
73  ss << hostPort;
74 
75  /*
76  * Create a ConnectionFactory to connect to hostName, and
77  * create a topic for this.
78  */
79  string brokerUri = "tcp://"+hostName+":"+ss.str()+"?wireFormat=openwire&transport.useAsyncSend=true";
80 
81  activemq::core::ActiveMQConnectionFactory* connectionFactory =
82  new activemq::core::ActiveMQConnectionFactory( brokerUri );
83 
84  _connection = 0;
85  try {
86  _connection = connectionFactory->createConnection();
87  _connection->start();
88  delete connectionFactory;
89  }
90  catch (cms::CMSException& e) {
91  delete connectionFactory;
92  std::string msg("Failed to connect to broker: ");
93  msg += e.getMessage();
94  msg += " (is broker running?)";
95  throw LSST_EXCEPT(pexExceptions::RuntimeError, msg);
96  }
97 
98  _session = _connection->createSession( cms::Session::AUTO_ACKNOWLEDGE );
99 
100  if (createQueue) {
101  _destination = _session->createQueue(_destinationName);
102  } else {
103  _destination = _session->createTopic(_destinationName);
104  }
105 
106  // Create a MessageProducer from the Session to the Topic or Queue
107  _producer = _session->createProducer(NULL);
108 
109  /* XXX - not sure about this setting for queues */
110  _producer->setDeliveryMode( cms::DeliveryMode::NON_PERSISTENT );
111  } catch ( cms::CMSException& e ) {
112  throw LSST_EXCEPT(pexExceptions::RuntimeError, std::string("Trouble creating Transmitter: ") + e.getMessage());
113  }
114 }
115 
116 void Transmitter::publishEvent(Event& event) {
117  long long pubtime;
118  cms::TextMessage* message = _session->createTextMessage();
119 
120  event.marshall(message);
121 
122  message->setStringProperty(getDestinationPropertyName(), _destinationName);
123 
124  // wait until the last moment to timestamp publication time
125  pubtime = dafBase::DateTime::now().nsecs();
126  message->setLongProperty("PUBTIME", pubtime);
127 
128  _producer->send(_destination, message);
129  delete message;
130 }
131 
132 std::string Transmitter::getDestinationName() {
133  return _destinationName;
134 }
135 
136 Transmitter::~Transmitter() {
137 
138  if (_destination != NULL)
139  delete _destination;
140  _destination = NULL;
141 
142  try {
143  if( _producer != NULL )
144  delete _producer;
145  } catch ( cms::CMSException& e ) {
146  e.printStackTrace();
147  }
148  _producer = NULL;
149 
150  // Close open resources.
151  try {
152  if( _session != NULL )
153  _session->close();
154  if( _connection != NULL )
155  _connection->close();
156  } catch ( cms::CMSException& e ) {
157  e.printStackTrace();
158  }
159 
160  try {
161  if( _session != NULL )
162  delete _session;
163  } catch ( cms::CMSException& e ) {
164  e.printStackTrace();
165  }
166  _session = NULL;
167 
168  try {
169  if( _connection != NULL )
170  delete _connection;
171  } catch ( cms::CMSException& e ) {
172  e.printStackTrace();
173  }
174  _connection = NULL;
175 }
176 
177 }}}
static DateTime now(void)
Definition: DateTime.cc:601
Singleton use to make sure the events library is initialized.
Definition: EventLibrary.h:49
long long nsecs(Timescale scale=TAI) const
Definition: DateTime.cc:496
defines the EventLibrary class
defines the Transmitter class
Interface for DateTime class.
#define LSST_EXCEPT(type,...)
Definition: Exception.h:46
Representation of an LSST Event.
Definition: Event.h:61
static void initializeLibrary()
initialize the ActiveMQ library, but only do it once.
Definition: EventLibrary.cc:41
Include files required for standard LSST Exception handling.