LSSTApplications  8.0.0.0+107,8.0.0.1+13,9.1+18,9.2,master-g084aeec0a4,master-g0aced2eed8+6,master-g15627eb03c,master-g28afc54ef9,master-g3391ba5ea0,master-g3d0fb8ae5f,master-g4432ae2e89+36,master-g5c3c32f3ec+17,master-g60f1e072bb+1,master-g6a3ac32d1b,master-g76a88a4307+1,master-g7bce1f4e06+57,master-g8ff4092549+31,master-g98e65bf68e,master-ga6b77976b1+53,master-gae20e2b580+3,master-gb584cd3397+53,master-gc5448b162b+1,master-gc54cf9771d,master-gc69578ece6+1,master-gcbf758c456+22,master-gcec1da163f+63,master-gcf15f11bcc,master-gd167108223,master-gf44c96c709
LSSTDataManagementBasePackage
EventSystem.cc
Go to the documentation of this file.
1 // -*- lsst-c++ -*-
2 
3 /*
4  * LSST Data Management System
5  * Copyright 2008, 2009, 2010 LSST Corporation.
6  *
7  * This product includes software developed by the
8  * LSST Project (http://www.lsst.org/).
9  *
10  * This program is free software: you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation, either version 3 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU General Public License for more details.
19  *
20  * You should have received a copy of the LSST License Statement and
21  * the GNU General Public License along with this program. If not,
22  * see <http://www.lsstcorp.org/LegalNotices/>.
23  */
24 
34 #include <iomanip>
35 #include <sstream>
36 #include <stdexcept>
37 
38 
39 #include <sys/types.h>
40 #include <sys/socket.h>
41 #include <netinet/in.h>
42 #include <arpa/inet.h>
43 #include <netdb.h>
44 
46 #include "lsst/pex/policy/Policy.h"
47 #include "lsst/pex/exceptions.h"
51 #include "lsst/ctrl/events/Event.h"
54 
55 namespace pexExceptions =lsst::pex::exceptions;
56 
57 using namespace std;
58 
59 namespace lsst {
60 namespace ctrl {
61 namespace events {
67 EventSystem::EventSystem() {
69 }
70 
73 EventSystem::~EventSystem() {
74 }
75 
80 EventSystem& EventSystem::getDefaultEventSystem() {
81  if (defaultEventSystem == 0) {
82 
83  // create the _IPId here, rather than
84  // reconstructing it every time we create an
85  // identificationId
86 
87  char buf [255];
88  struct hostent *ent;
89  unsigned char a,b,c,d;
90 
91  gethostname(buf, 255) ;
92  ent = (struct hostent *)gethostbyname(buf) ;
93 
94  a = ent->h_addr_list[0][0] & 0xFF;
95  b = ent->h_addr_list[0][1] & 0xFF;
96  c = ent->h_addr_list[0][2] & 0xFF;
97  d = ent->h_addr_list[0][3] & 0xFF;
98 
99  _IPId = (a << 24) | (b << 16) | (c << 8) | d;
100 
101  // create the default EventSystem object
102  defaultEventSystem = new EventSystem();
103  }
104  return *defaultEventSystem;
105 
106 }
107 
108 EventSystem *EventSystem::defaultEventSystem = 0;
109 int EventSystem::_IPId = 0;
110 short EventSystem::_localId = 0;
111 
115 void EventSystem::createTransmitter(const pexPolicy::Policy& policy) {
116  boost::shared_ptr<EventTransmitter> transmitter(new EventTransmitter(policy));
117  boost::shared_ptr<EventTransmitter> transmitter2;
118  if ((transmitter2 = getTransmitter(transmitter->getTopicName())) == 0) {
119  _transmitters.push_back(transmitter);
120  return;
121  }
122  throw LSST_EXCEPT(pexExceptions::RuntimeError, "topic "+ transmitter->getTopicName() + " is already registered with EventSystem");
123 }
124 
130 void EventSystem::createTransmitter(const std::string& hostName, const std::string& topicName, int hostPort) {
131  boost::shared_ptr<EventTransmitter> transmitter;
132  if ((transmitter = getTransmitter(topicName)) == 0) {
133  boost::shared_ptr<EventTransmitter> transmitter(new EventTransmitter(hostName, topicName, hostPort));
134  _transmitters.push_back(transmitter);
135  return;
136  }
137  throw LSST_EXCEPT(pexExceptions::RuntimeError, "topic "+ topicName + " is already registered with EventSystem");
138 }
139 
143 void EventSystem::createReceiver(const pexPolicy::Policy& policy) {
144  boost::shared_ptr<EventReceiver> receiver(new EventReceiver(policy));
145  boost::shared_ptr<EventReceiver> receiver2;
146  if ((receiver2 = getReceiver(receiver->getTopicName())) == 0) {
147  _receivers.push_back(receiver);
148  return;
149  }
150  throw LSST_EXCEPT(pexExceptions::RuntimeError, "topic " + receiver->getTopicName() + " is already registered with EventSystem");
151 }
152 
158 void EventSystem::createReceiver(const std::string& hostName, const std::string& topicName, int hostPort) {
159  boost::shared_ptr<EventReceiver> receiver;
160  if ((receiver = getReceiver(topicName)) == 0) {
161  boost::shared_ptr<EventReceiver> receiver(new EventReceiver(hostName, topicName, hostPort));
162  _receivers.push_back(receiver);
163  return;
164  }
165  throw LSST_EXCEPT(pexExceptions::RuntimeError, "topic "+ topicName + " is already registered with EventSystem");
166 }
167 
174 void EventSystem::createReceiver(const std::string& hostName, const std::string& topicName, const std::string& selector, int hostPort) {
175  boost::shared_ptr<EventReceiver> receiver(new EventReceiver(hostName, topicName, selector, hostPort));
176  _receivers.push_back(receiver);
177 }
178 
179 
186 void EventSystem::publishEvent(const std::string& topicName, Event& event) {
187  boost::shared_ptr<EventTransmitter> transmitter;
188  if ((transmitter = getTransmitter(topicName)) == 0) {
189  throw LSST_EXCEPT(pexExceptions::RuntimeError, "topic "+ topicName + " is not registered with EventSystem");
190  }
191  transmitter->publishEvent(event);
192 }
193 
196 boost::shared_ptr<EventTransmitter> EventSystem::getTransmitter(const std::string& name) {
197  list<boost::shared_ptr<EventTransmitter> >::iterator i;
198  for (i = _transmitters.begin(); i != _transmitters.end(); i++) {
199  if ((*i)->getTopicName() == name)
200  return *i;
201  }
202  return boost::shared_ptr<EventTransmitter>();
203 }
204 
205 
211 Event* EventSystem::receiveEvent(const std::string& topicName) {
212  return receiveEvent(topicName, EventReceiver::infiniteTimeout);
213 }
214 
223 Event* EventSystem::receiveEvent(const std::string& topicName, const long timeout) {
224  boost::shared_ptr<EventReceiver> receiver;
225  if ((receiver = getReceiver(topicName)) == 0) {
226  throw LSST_EXCEPT(pexExceptions::RuntimeError, "Topic "+ topicName +" is not registered with EventSystem");
227  }
228 
229  return receiver->receiveEvent(timeout);
230 }
231 
232 
235 boost::shared_ptr<EventReceiver> EventSystem::getReceiver(const std::string& name) {
236  list<boost::shared_ptr<EventReceiver> >::iterator i;
237  for (i = _receivers.begin(); i != _receivers.end(); i++) {
238  if ((*i)->getTopicName() == name)
239  return *i;
240  }
241  return boost::shared_ptr<EventReceiver>();
242 }
243 
244 int64_t EventSystem::createOriginatorId() {
245  int64_t pid = getpid();
246 
247  int64_t originatorId = _IPId & 0x0FFFFFFFF;
248  int64_t locid = _localId & 0x7FFF;
249 
250  originatorId = (locid << 49) | (pid << 32) | originatorId;
251 
252  _localId++;
253  return originatorId;
254 }
255 
261 int EventSystem::extractIPId(int64_t identificationId) {
262  return identificationId & 0xFFFFFFFF;
263 }
264 
268 int EventSystem::extractProcessId(int64_t identificationId) {
269  return (identificationId & 0x1FFFF00000000LL) >> 32;
270 }
271 
275 short EventSystem::extractLocalId(int64_t identificationId) {
276  return (identificationId & 0xFFFE000000000000LL) >> 49;
277 }
278 
279 StatusEvent* EventSystem::castToStatusEvent(Event* event) {
280  return (StatusEvent *)event;
281 }
282 
283 CommandEvent* EventSystem::castToCommandEvent(Event* event) {
284  return (CommandEvent *)event;
285 }
286 
287 }
288 }
289 }
Singleton used to make sure the events library is initialized.
Definition: EventLibrary.h:56
Receive events from the event bus.
Definition: EventReceiver.h:63
defines the EventLog class
a container for holding hierarchical configuration data in memory.
Definition: Policy.h:169
Representation of an LSST Event.
Definition: StatusEvent.h:63
int d
Definition: KDTree.cc:89
Representation of an LSST Event.
Definition: CommandEvent.h:63
Include files required for standard LSST Exception handling.
defines the EventLibrary class
Transmit events to the event bus.
defines the CommandEvent class
defines the Event class
static void initializeLibrary()
initialize the ActiveMQ library, but only do it once.
Definition: EventLibrary.cc:69
#define LSST_EXCEPT(type,...)
Definition: Exception.h:46
Representation of an LSST Event.
Definition: Event.h:67
afw::table::Key< double > b
Interface for PropertySet class.
Coordinate publishing and receiving events.
Definition: EventSystem.h:65
defines the EventSystem class