LSSTApplications  11.0-13-gbb96280,12.1.rc1,12.1.rc1+1,12.1.rc1+2,12.1.rc1+5,12.1.rc1+8,12.1.rc1-1-g06d7636+1,12.1.rc1-1-g253890b+5,12.1.rc1-1-g3d31b68+7,12.1.rc1-1-g3db6b75+1,12.1.rc1-1-g5c1385a+3,12.1.rc1-1-g83b2247,12.1.rc1-1-g90cb4cf+6,12.1.rc1-1-g91da24b+3,12.1.rc1-2-g3521f8a,12.1.rc1-2-g39433dd+4,12.1.rc1-2-g486411b+2,12.1.rc1-2-g4c2be76,12.1.rc1-2-gc9c0491,12.1.rc1-2-gda2cd4f+6,12.1.rc1-3-g3391c73+2,12.1.rc1-3-g8c1bd6c+1,12.1.rc1-3-gcf4b6cb+2,12.1.rc1-4-g057223e+1,12.1.rc1-4-g19ed13b+2,12.1.rc1-4-g30492a7
LSSTDataManagementBasePackage
EventSystem.cc
Go to the documentation of this file.
1 // -*- lsst-c++ -*-
2 
3 /*
4  * LSST Data Management System
5  * Copyright 2008-2015 AURA/LSST.
6  *
7  * This product includes software developed by the
8  * LSST Project (http://www.lsst.org/).
9  *
10  * This program is free software: you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation, either version 3 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU General Public License for more details.
19  *
20  * You should have received a copy of the LSST License Statement and
21  * the GNU General Public License along with this program. If not,
22  * see <https://www.lsstcorp.org/LegalNotices/>.
23  */
24 
34 #include <memory>
35 
38 
39 namespace pexExceptions = lsst::pex::exceptions;
40 
41 namespace lsst {
42 namespace ctrl {
43 namespace events {
44 
47 }
48 
50 }
51 
53  static EventSystem eventSystem;
54  return eventSystem;
55 
56 }
57 
// Definitions of EventSystem's static endpoint lists. The create* methods in
// this file append to them, and getTransmitter()/getReceiver() search them by
// topic name (_transmitters, _receivers) or queue name (_enqueuers, _dequeuers).
59 std::list<PTR(EventTransmitter)> EventSystem::_transmitters;
60 std::list<PTR(EventReceiver)> EventSystem::_receivers;
61 std::list<PTR(EventEnqueuer)> EventSystem::_enqueuers;
62 std::list<PTR(EventDequeuer)> EventSystem::_dequeuers;
63 
64 void EventSystem::createTransmitter(std::string const& hostName, std::string const& topicName, int hostPort) {
65  PTR(Transmitter) transmitter;
66  if ((transmitter = getTransmitter(topicName)) != 0)
67  throw LSST_EXCEPT(pexExceptions::RuntimeError, "topic "+ topicName + " is already registered with EventSystem");
68  PTR(EventTransmitter) evTransmitter(new EventTransmitter(hostName, topicName, hostPort));
69  _transmitters.push_back(evTransmitter);
70 }
71 
72 void EventSystem::createEnqueuer(std::string const& hostName, std::string const& queueName, int hostPort) {
73  PTR(Transmitter) transmitter;
74  if ((transmitter = getTransmitter(queueName)) != 0)
75  throw LSST_EXCEPT(pexExceptions::RuntimeError, "queue "+ queueName + " is already registered with EventSystem");
76  PTR(EventEnqueuer) evTransmitter(new EventEnqueuer(hostName, queueName, hostPort));
77  _enqueuers.push_back(evTransmitter);
78 }
79 
80 void EventSystem::createReceiver(std::string const& hostName, std::string const& topicName, int hostPort) {
81  PTR(Receiver) receiver;
82  if ((receiver = getReceiver(topicName)) == 0) {
83  PTR(EventReceiver) receiver(new EventReceiver(hostName, topicName, hostPort));
84  _receivers.push_back(receiver);
85  return;
86  }
87  throw LSST_EXCEPT(pexExceptions::RuntimeError, "topic "+ topicName + " is already registered with EventSystem");
88 }
89 
90 void EventSystem::createDequeuer(std::string const& hostName, std::string const& queueName, int hostPort) {
91  PTR(Receiver) receiver;
92  if ((receiver = getReceiver(queueName)) != 0)
93  throw LSST_EXCEPT(pexExceptions::RuntimeError, "queue "+ queueName + " is already registered with EventSystem");
94 
95  PTR(EventDequeuer) evReceiver(new EventDequeuer(hostName, queueName, hostPort));
96  _dequeuers.push_back(evReceiver);
97 }
98 
99 void EventSystem::createReceiver(std::string const& hostName, std::string const& topicName, std::string const& selector, int hostPort) {
100  PTR(Receiver) receiver;
101  if ((receiver = getReceiver(topicName)) != 0)
102  throw LSST_EXCEPT(pexExceptions::RuntimeError, "topic"+ topicName + " is already registered with EventSystem");
103 
104  PTR(EventReceiver) evReceiver(new EventReceiver(hostName, topicName, selector, hostPort));
105  _receivers.push_back(evReceiver);
106 }
107 
108 void EventSystem::createDequeuer(std::string const& hostName, std::string const& queueName, std::string const& selector, int hostPort) {
109  PTR(Receiver) receiver;
110  if ((receiver = getReceiver(queueName)) != 0)
111  throw LSST_EXCEPT(pexExceptions::RuntimeError, "queue"+ queueName + " is already registered with EventSystem");
112 
113  PTR(EventDequeuer) evReceiver(new EventDequeuer(hostName, queueName, selector, hostPort));
114  _dequeuers.push_back(evReceiver);
115 }
116 
117 void EventSystem::publishEvent(std::string const& destinationName, Event& event) {
118  PTR(Transmitter) transmitter;
119  if ((transmitter = getTransmitter(destinationName)) == 0) {
120  throw LSST_EXCEPT(pexExceptions::RuntimeError, "destination "+ destinationName + " is not registered with EventSystem");
121  }
122  transmitter->publishEvent(event);
123 }
124 
127 PTR(Transmitter) EventSystem::getTransmitter(std::string const& name) {
128  for (PTR(EventTransmitter) transmitter : _transmitters) {
129  if ((transmitter)->getTopicName() == name) {
130  return transmitter;
131  }
132  }
133 
134  for (PTR(EventEnqueuer) qTransmitter : _enqueuers) {
135  if ((qTransmitter)->getQueueName() == name) {
136  return qTransmitter;
137  }
138  }
139  return PTR(Transmitter)();
140 }
141 
142 
143 PTR(Event) EventSystem::receiveEvent(std::string const& destinationName) {
144  return receiveEvent(destinationName, EventReceiver::infiniteTimeout);
145 }
146 
147 PTR(Event) EventSystem::receiveEvent(std::string const& destinationName, const long timeout) {
148  PTR(Receiver) receiver;
149  if ((receiver = getReceiver(destinationName)) == 0) {
150  throw LSST_EXCEPT(pexExceptions::RuntimeError, "destination "+ destinationName +" is not registered with EventSystem");
151  }
152 
153  return receiver->receiveEvent(timeout);
154 }
155 
156 PTR(LocationId) EventSystem::createOriginatorId() const {
157  return PTR(LocationId)(new LocationId());
158 }
159 
162 PTR(Receiver) EventSystem::getReceiver(std::string const& name) {
163  for (PTR(EventReceiver) receiver : _receivers) {
164  if ((receiver)->getTopicName() == name)
165  return receiver;
166  }
167  for (PTR(EventDequeuer) qReceiver : _dequeuers) {
168  if ((qReceiver)->getQueueName() == name)
169  return qReceiver;
170  }
171  return PTR(Receiver)();
172 }
173 
174 
175 PTR(StatusEvent) EventSystem::castToStatusEvent(PTR(Event) event) {
176  return std::static_pointer_cast<lsst::ctrl::events::StatusEvent>(event);
177 }
178 
179 PTR(CommandEvent) EventSystem::castToCommandEvent(PTR(Event) event) {
180  return std::static_pointer_cast<lsst::ctrl::events::CommandEvent>(event);
181 }
182 
183 }}}
static EventSystem & getDefaultEventSystem()
return the default EventSystem object, which can access all previously created Transmitters and recei...
Definition: EventSystem.cc:52
void publishEvent(std::string const &destinationName, Event &event)
send an event to a destination
Definition: EventSystem.cc:117
Singleton used to make sure the events library is initialized.
Definition: EventLibrary.h:49
void createEnqueuer(std::string const &hostName, std::string const &queueName, int hostPort=EventBroker::DEFAULTHOSTPORT)
create an EventEnqueuer to send messages to the message broker
Definition: EventSystem.cc:72
Receive events from the event bus.
Definition: EventReceiver.h:58
table::Key< std::string > name
Definition: ApCorrMap.cc:71
static std::list< boost::shared_ptr< EventTransmitter > > _transmitters
Definition: EventSystem.h:192
static std::list< boost::shared_ptr< EventDequeuer > > _dequeuers
Definition: EventSystem.h:196
void createReceiver(std::string const &hostName, std::string const &topicName, int hostPort=EventBroker::DEFAULTHOSTPORT)
create an EventReceiver which will receive messages
Definition: EventSystem.cc:80
Receive events from the event bus.
Definition: EventDequeuer.h:58
Representation of an LSST Event.
Definition: StatusEvent.h:61
Representation of an LSST CommandEvent.
Definition: CommandEvent.h:61
defines the EventLibrary class
Receive events from the event bus.
Definition: Receiver.h:61
static std::list< boost::shared_ptr< EventReceiver > > _receivers
Definition: EventSystem.h:193
static const long infiniteTimeout
Definition: Receiver.h:101
Transmit events to the event bus.
void createDequeuer(std::string const &hostName, std::string const &queueName, int hostPort=EventBroker::DEFAULTHOSTPORT)
create an EventDequeuer which will receive messages
Definition: EventSystem.cc:90
Transmit events to the event bus.
Definition: Transmitter.h:59
static EventSystem * defaultEventSystem
Definition: EventSystem.h:183
#define LSST_EXCEPT(type,...)
Definition: Exception.h:46
static std::list< boost::shared_ptr< EventEnqueuer > > _enqueuers
Definition: EventSystem.h:195
Representation of an LSST Event.
Definition: Event.h:61
boost::shared_ptr< Transmitter > getTransmitter(std::string const &name)
Definition: EventSystem.cc:127
#define PTR(...)
Definition: base.h:41
Represent process that created an event.
Definition: LocationId.h:52
void createTransmitter(std::string const &hostName, std::string const &topicName, int hostPort=EventBroker::DEFAULTHOSTPORT)
create an EventTransmitter to send messages to the message broker
Definition: EventSystem.cc:64
static void initializeLibrary()
initialize the ActiveMQ library, but only do it once.
Definition: EventLibrary.cc:41
boost::shared_ptr< Receiver > getReceiver(std::string const &name)
Definition: EventSystem.cc:162
Transmit events to the event bus.
Definition: EventEnqueuer.h:57
This object allows creation of the system's event transmitters and receivers, which can be specified ...
Definition: EventSystem.h:65
defines the EventSystem class