LSSTApplications  11.0-13-gbb96280,12.1.rc1,12.1.rc1+1,12.1.rc1+2,12.1.rc1+5,12.1.rc1+8,12.1.rc1-1-g06d7636+1,12.1.rc1-1-g253890b+5,12.1.rc1-1-g3d31b68+7,12.1.rc1-1-g3db6b75+1,12.1.rc1-1-g5c1385a+3,12.1.rc1-1-g83b2247,12.1.rc1-1-g90cb4cf+6,12.1.rc1-1-g91da24b+3,12.1.rc1-2-g3521f8a,12.1.rc1-2-g39433dd+4,12.1.rc1-2-g486411b+2,12.1.rc1-2-g4c2be76,12.1.rc1-2-gc9c0491,12.1.rc1-2-gda2cd4f+6,12.1.rc1-3-g3391c73+2,12.1.rc1-3-g8c1bd6c+1,12.1.rc1-3-gcf4b6cb+2,12.1.rc1-4-g057223e+1,12.1.rc1-4-g19ed13b+2,12.1.rc1-4-g30492a7
LSSTDataManagementBasePackage
lsst::ctrl::events ctrl_events Package

Messaging Overview

The ctrl_events package is used to send messages between networked entities. Messages are send to a message broker on either a “queue” or a “topic”. The broker passes messages to consumers.

Generally speaking the difference between a queue and topic is that a queue is considered a “one-to-one” relationship between a message producer and a message consumer, in the sense that one producer provides a message which can be retrieved by one, and only one, consumer. When a message is sent to a queue, that message is retained by the broker until the message is retrieved by a consumer or until the message queue is destroyed. Once a message is retrieved by a consumer, that message can not be retrieved by any other consumer. Note that one consumer can be waiting on the same queue for a single message. In this case it is indeterminate which consumer will receive the message.

A producer and a consumer relationship on a topic is consider to be “one-to-many”. One producer can produce a message that can be retrieved by one or more consumers. Each consumer will get a copy of the message. The caveat here is that a consumer must be connected to the message broker before the producer sends the message, or it will be lost. If there are no consumers attached to the broker at the time the message is sent, the message is dropped.

There can be more than one producer sending messages on a queue or topic, but the rules for retrieving those messages are as stated above.

Events

The ctrl_events package is a C++ and Python layer on top of the ActiveMQCPP library. The ctrl_events package abstracts the use of the ActiveMQCPP library, providing a simpler interface and a way to send an LSST primitive, PropertySet, easily.

The basic message object type in the package is an Event, which packages a user-defined PropertySet object.

ps.set(“one”, 1)
ps.set(“two”, “two”)
ps.set(“three”, 3.4)
ps.set(“four”, (5, 6, 7, 8))
ev = Event(ps)

In addition to the user-defined PropertySet, there are additional elements which are added to the Event, which include the Event type, its creation time, and its publication time.

Sending Events

There are two ways to send events to the message broker, either via an EventTransmitter or EventEnqueuer.

The EventTransmitter is used to send events to topics. The following code creates an EventTransmitter which is bound to a message broker located on machine “myhost.mydomain.com” for topic “mytopic”. The publishEvent method takes an Event object as an argument, and sends a message containing the data from the Event to the message broker.

transmitter = EventTransmitter(“myhost.mydomain.com”, “mytopic”)
transmitter.publishEvent(ev)

The EventEnqueuer is used to send events to queues. The following code creates an EventEnqueuer which is bound to a message broker located on machine “myotherhost.mydomain.com” for queue “myqueue”. The publishEvent method takes an Event object as an argument, and sends a message containing the data from the Event to the message broker.

enqueuer = EventEnqueuer(“myotherhost.mydomain.com”, “myqueue”)
enqueuer.publishEvent(ev)

Receiving Events

The objects EventReceiver and EventDequeuer are used to obtain Events which were sent to the broker.

To receive events that were sent to a topic, create an EventReceiver. From the time the EventReceiver is created, it can retrieve all Events sent to the topic it is bound to until the EventReceiver is destroyed. Events that were sent on a topic before an EventReceiver was created can not be retrieved by that EventReceiver. If the EventReceiver is destroyed and recreated by the same process, any messages sent to the topic during that time can not be retrieved.

recv = EventReceiver(“myhost.mydomain.com”, “mytopic”)
myEvent = recv.receiveEvent()

Note that all EventReceivers bound to a particular topic will be able to retrieve messages from that topic, and each at their own pace. In other words, one EventReceiver may read all the messages available to it within a minute, while another EventReceiver bound to the same topic may read one message every hour, but each receives all the messages sent to the topic since their creation time. Two different EventReceivers do not interact with each other, and do not affect the messages the other receives.

To receive events that were sent to a queue, create an EventDequeuer, and call its receiveEvent method. An EventDequeuer can retrieve any messages available on the queue it is bound to, until those messages are consumed. Events can be sent to a queue before an EventDequeuer is create, and remain available for consumption until the message broker destroys the queue. (Note that it is possible to configure the message broker to automatically destroy queues and their messages due to inactivity). Once a message is retrieved by an EventDequeuer bound to a queue on that broker, that message is removed from the queue by the broker, and no other EventDequeuer can retrieve that message. If two different EventDequeuers are actively attempting to consume messages on a queue, there are no guarantees about which EventDequeuer will retrieve a particular message. The messages are consumed at the rate that they are requested, and will not be sent to other EventDequeuers once they are sent.

Filtering

As mentioned previously, Events contain some information which is common to all Events that are sent. This specific information is kept in the message “header”, so they can be filtered (see below). The generic information, meaning the information which is not part of the header, is kept in the message payload; this is mainly because the message header can hold only primitive data types.

There are several Event Types that are available in the ctrl_events package. These event types contain additional information which is kept in the header.

Event

StatusEvent

CommandEvent

LogEvent

One of the reasons that some of the information is kept in the header and some is kept in the payload is so that messages can be filtered. EventReceivers and EventDequeuers can request this filtering be done on the messages broker, so they can receive only the messages they are interested in. Filtering is done by specifying a selector string in SQL 92 syntax when creating an EventReceiver or EventDequeuer.

For example, this constructor create an EventReceiver specifies a selector with constant DEST_HOSTNAME equal to ‘myhost.com’. Any messages which are sent to “thistopic” and contain the DEST_HOSTNAME string equal to ‘myhost.com’ will be received by this EventReceiver.

recv = EventReceiver(“mybroker.com”, “thistopic”, "%s = '%s'" % (events.CommandEvent.DEST_HOSTNAME, “myhost.com”))

The code is similar for queues:

recv = EventDequeuer(“mybroker.com”, “thisqueue”, "%s = '%s'" % (events.CommandEvent.DEST_HOSTNAME, “myhost.com”))

There may be a situation where you want to create a selector to filter a value which isn’t part of any existing Event type. Instead of having to create a new Event type, you can specify a set of filterable values like this:

filterable = PropertySet()
filterable.set(“system”, “centos”)
filterable.location(“version”, “7.0”)
ps.set(“values”, (1, 2, 3, 4, 5))
ev = Event(ps, filterable)

In this example, both the values of “system” and “version” would be able to be filtered by a selector string. Please note that only primitive data types can be filtered; complex data type, such as lists, can not.

EventSystem

All producer and consumer types can be created with the EventSystem object, and then addressed directly through that object to send and receive Events.

For example, you can create an EventTransmitter via EventSystem in an initialization routine like this:

es = EventSystem.getDefaultEventSystem()
es.createTransmitter(“mybroker.mydomain.com”, “mytopic”)

and then later address it like this:

es = EventSystem.getDefaultEventSystem()
es.publishEvent(“mytopic”, event)

Similar methods are available from EventSystem for all producer and consumer types.