Tango Message channels/queues
A Proposal for Tango device message channels and queues
Introduction
Each Tango device actually supports:
- A state and a status
- A set of commands
- A set of attributes
Tango device message channel
A Tango device message channel will have:
- A name
- A label: The default will be the queue name
- A description: The default will be "No description"
MessageChannelInfoList *DeviceProxy::message_channel_list_query();
Pogo has to be modified in order to allow user to define the channel(s) their device will support.
Reading / writing into the message channel
Writing into the message channel
It is the device server which writes into the channel. Before writing into the channel, a channel identifier has to be retrieved from the channel name. The call to write data into the channel will look likes:long ChannelId = DeviceImpl::get_channel_id_by_name(const string &channel_name);
template<typename... Args>
DeviceImpl::channel_send(long ChannelId,const Args&... args);
In C++, this call will be a variadic template call from the C++0x standard. It's available in gcc V4.3 and above. For compilers which do not implement this yet, we could used the classical "varargs" method. Similar thing could be done for Python (with the *args method parameter). Check has to be done if it is supported by Boost. Java 1.5 also supports method with variable arguments number. The data which will be sent to the channel has to have a scope which is longer than the method in which they are used (device class data members, static data....). Here is an example of how data could be send to a channel:
In the device .h file
Tango::DevLong the_long;
Tango::DevString the_string;
Tango::DevDouble the double;
Tango::DevShort *the_short_array;
In the device .cpp file
....
the_long = ...;
the_string = ...;
the_double = ...;
the_short_array = new Tango::DevShort[10];
....
long ChannelId = get_queue_id_by_name("TheExampleChannel");
channel_send(ChannelId,the_long,the_string,the_short_array,the_string,the_double);
....
Reading from the message channel
To read from a message channel, a client use a message queue. He has to:
- Create a message queue using the message channel name on which the queue has to be connected
- Read data from the queue
- Close the connection to the channel
The following calls could be implemented:
Tango::MessageQueue DeviceProxy::create_message_queue(const string &channel_name);
Tango::MessageQueue::receive_raw(vector<DeviceAttribute> &arg);
Tango::MessageQueue::close();
To read the data sent to the channel in the previous example, the client code could be:
....
Tango::DevLong received_long;
Tango::DevString received_string;
Tango::devDouble received_double;
Tango::DevVarShortArray *received_short_array;
Tango::DeviceProxy dev("my/device/name");
Tango::MessageQueue mq = dev.create_message_queue("TheExampleChannel");
vector<Tango::DeviceAttribute> vda;
mq.receive_raw(vda);
for (unsigned long loop;loop < vda.size();loop++)
{
switch (vda[loop].get_type())
{
case DEV_LONG:
vda[loop] >> received_long;
do_what_you_want_with_data(received_long);
break;
case DEV_STRING:
vda[loop] >> received_string;
do_what_you_want_with_data(received_string);
break;
case DEV_VAR_SHORTARRAY:
vda[loop] >> received_short_array;
do_what_you_want_with_data(received_short_array);
break;
case DEV_DOUBLE:
vda[loop] >> received_double;
do_what_you_want_with_data(received_double);
break;
}
}
...
mq.close();
The MessageQueue destructor will also close the connection to the message channel in case it is not already closed.
The MessageQueue::receive() method is a blocking call. A similar method with a timeout will be provided. This timeout could be infinite (0). A non-blocking call and a call to get the number of messages in the queue will be provided.
A typed MessageQueue::receive() method could also be implemented. This method creates a strong dependency between the server and the client code (May be too strong dependency). Using this method, the number and types of the data used in the MessageQueue::receive() method has to be the same than those used in the server side when the message has been sent to the channel. No automatic type translation will be provided. This method looks like
Tango::MessageQueue::receive(const Args&... args)
Its usage is something like:
Tango::DevLong received_long;
Tango::DevString received_string,another_received_string;
Tango::devDouble received_double;
Tango::DevVarShortArray *received_short_array;
Tango::DeviceProxy dev("my/device/name");
Tango::MessageQueue mq = dev.create_message_queue("TheExampleChannel");
mq.receive(received_long,received_string,received_short_array,another_received_string,received_double);
....
mq.close();
This call is also a blocking call. It will block until all the data defined in the call has been received in a message boundary. As there is no type conversion, if the received data is not coherent with the data type used in the MesssageQueue::receive() call, an exception will be thrown. This is very similar to a C++ stream usage.
The queue is local to the client and will have:
- A default size
- A high water mark
- A low water mark
Another MessageQueue::create_message_queue_connection() method will be provided allowing the user to define the local queue size, high water mark and low water mark.
Implementation
The basis
In the same way that a Tango device actually has a command_factory() and an attribute_factory() methods, a new method named message_channel_factory() will be added to the the Tango::DeviceClass.Each message channel will be internally represented by an attribute of a new type TANGO::DEV_CHANNEL. These attributes will not be part of the classical Tango device attribute list. They will not be returned by the classical attribute_list_query() method. It will also not be possible to use them in a read_attribute() or write_attribute() classical call.
Nevertheless, by implementing them as attributes, it will be possible to use the Tango Event system (based only on attribute) to send the messages.
The Tango::Attribute class which is used in a Tango class to represent an attribute is foreseen to store only one
data of any Tango type supported for attribute. A new class has to be written inheriting from the already existing Tango::Attribute class but allowing the storage of any number of data from the attribute data type. This could be done using a vector of the union which is actually used to store single value.
The DeviceProxy::message_channel_list_query() call
The call to get all the channels supported by a device could be implemented using the already existing IDL operation Device_3::get_attribute_config_3() with a special string for the list of channels("__AllMessageChannels__").When a new Device_5 IDL interface will be defined, it will be possible to add a specific CORBA operation but it is not immediately required.
The queue filling management
Two commands will be added to the Tango admin device. These commands are:- StopSendingMessage(DevVarStringArray)
- StartSendingMessage(DevVarStringArray)
In a client, when the queue high water mark is reached, a StopSendingMessage will be sent to the server admin device. When the number of messages in the queue will be less or equal to the low water mark, the StartSendingMessage command will be sent to the server admin device.
What happens on the server side when a StopSendingMessage command has been received for a device queue?
Two possibilities:
- Throw exception when the user tries to send more data to the channel
- Manage an internal buffer which will be emptied by a specific thread when the StartSendingMessage command will be received. This is a more complicated solution:
- What will be the size of the internal buffer to store the un-sent messages? We could define a fixed size buffer and what will happen if this buffer becomes full?
- What will be the speed of the thread which will send the messages after the StartSendingMessage will arrive?
The communication behind the channel
At it has already been written, the communication behind the channel will be done using Tango events. A new type of event could be defined: QUEUE_EVENT. A new structure could be defined to transport the message data. It will be very similar to the one we already have for attribute but without several fields like the AttributeDim, the attribute quality, the error list,... This has to be done in the Tango IDL file but will not generate incompatibility because we only need the new methods to insert/extract these new structure types in/from the CORBA Any object used by the notification service. There will be one event per channel_send() method whatever the data number to be transferred will be.On the server side, the DeviceImpl::channel_send() method will be coded using a push_event() method.
On the client side, the DeviceProxy::create_message_queue_connection() will be implemented using a subscribe_event() call. The MessageQueue::close() method will use the unsubscribe_event() method.
The queue itself will be implemented using the already existing event queue. A TangoMonitor has to be added to the event queue to implement the MessageQueue::receive() blocking behavior.
Nevertheless, as Tango channels are not classical attributes, the synchronous calls done during event subscription and re-connection should not be done for channels.
The exception(s) received by the event callback in case of server crash will be passed to the caller during the MessageQueue::receive() call.

