Publishing and Receiving Messages from a Topic
This sample demonstrates how durable or non-durable topics can be created and used in WSO2 Message Broker using the RabbitMQ .NET/C# client. It first introduces a sample .NET client named TopicPublisher, which publishes messages to an existing topic in Message Broker. Then it introduces a sample .NET client named TopicConsumer, which listens for messages on that topic and prints their contents to the console.
Prerequisites
To run this sample:
- Download and add the RabbitMQ.Client.dll file as a reference in your .NET project. You can download this file from http://www.rabbitmq.com/dotnet.html or the WSO2 repository.
- See Prerequisites to Run the MB Samples for a list of other prerequisites.
Building the sample
- Create a TopicConsumer .NET client to receive messages from the test-topic topic by adding a class with the following code.

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using RabbitMQ.Client;

    namespace MB_TopicClient
    {
        class TopicConsumer
        {
            static void Main(string[] args)
            {
                TopicConsumer topicConsumer = new TopicConsumer();
                topicConsumer.GetMessages();
            }

            public void GetMessages()
            {
                // Set up the connection with the message broker
                ConnectionFactory factory = new ConnectionFactory();
                IProtocol protocol = Protocols.AMQP_0_9_1;
                factory.VirtualHost = "/carbon";
                factory.UserName = "admin";
                factory.Password = "admin";
                factory.HostName = "localhost";
                factory.Port = 5672;
                factory.Protocol = protocol;

                using (IConnection conn = factory.CreateConnection())
                {
                    using (IModel ch = conn.CreateModel())
                    {
                        // Declare a topic exchange to be bound to retrieve messages.
                        // Here we use the default topic exchange of WSO2 MB.
                        ch.ExchangeDeclare("amq.topic", "topic");

                        // Declare a topic name. Here we use a non-durable topic.
                        // To make it durable, pass 'true' as the 2nd parameter.
                        ch.QueueDeclare("test-topic", false, false, false, null);

                        // Bind the topic to the exchange
                        ch.QueueBind("test-topic", "amq.topic", "test-topic");

                        // Declare a consumer that listens for messages published to the
                        // 'test-topic' topic. The subscriber must be declared as exclusive
                        // for this to work.
                        // The syntax is BasicConsume(<queuename>, <noAck>, <consumerTag>,
                        // <noLocal>, <exclusive>, <arguments>, <Consumer>)
                        QueueingBasicConsumer consumer = new QueueingBasicConsumer(ch);
                        ch.BasicConsume("test-topic", false, "1", false, true, null, consumer);

                        while (true)
                        {
                            try
                            {
                                RabbitMQ.Client.Events.BasicDeliverEventArgs e =
                                    (RabbitMQ.Client.Events.BasicDeliverEventArgs)consumer.Queue.Dequeue();
                                byte[] body = e.Body;
                                string message = Encoding.UTF8.GetString(body);
                                Console.WriteLine("Received Message : " + message);

                                // Acknowledge the message once it has been processed
                                ch.BasicAck(e.DeliveryTag, false);
                            }
                            catch (OperationCanceledException e)
                            {
                                Console.WriteLine(e);
                                break;
                            }
                        }
                    }
                }
            }
        }
    }
At least one TopicConsumer binding should exist before sending messages to the topic. Therefore, this TopicConsumer class should be run before the TopicPublisher class. Alternatively, you can manually create the test-topic topic in the MB Management Console. See Adding Topics for detailed instructions.
- Create a TopicPublisher .NET client to send messages to the test-topic topic by adding a class with the following code.

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using RabbitMQ.Client;

    namespace MB_Topic_Publisher
    {
        class TopicPublisher
        {
            static void Main(string[] args)
            {
                TopicPublisher topicPublisher = new TopicPublisher();
                topicPublisher.PublishMessage("Test Message");
                Console.WriteLine("Message Sent..");
                Console.ReadLine();
            }

            public void PublishMessage(string message)
            {
                // Set up the connection with the message broker
                ConnectionFactory factory = new ConnectionFactory();
                IProtocol protocol = Protocols.AMQP_0_9_1;
                factory.VirtualHost = "/carbon";
                factory.UserName = "admin";
                factory.Password = "admin";
                factory.HostName = "localhost";
                factory.Port = 5672;
                factory.Protocol = protocol;

                using (IConnection conn = factory.CreateConnection())
                {
                    using (IModel ch = conn.CreateModel())
                    {
                        // Declare a topic exchange to publish messages.
                        // Here we use the default topic exchange of WSO2 MB.
                        ch.ExchangeDeclare("amq.topic", "topic");

                        IBasicProperties basicProperties = ch.CreateBasicProperties();

                        // Set the JMS message ID
                        basicProperties.MessageId = "ID:" + System.Guid.NewGuid().ToString();

                        // Set the content type, since we are sending a text message
                        basicProperties.ContentType = "text/plain";

                        // Publish the message to the exchange. It is routed using the
                        // routing key, which is our topic name 'test-topic'.
                        // The syntax is ch.BasicPublish(<exchange_name>, <topic_name>,
                        // <message_properties>, <message_body>)
                        ch.BasicPublish("amq.topic", "test-topic", basicProperties, Encoding.UTF8.GetBytes(message));
                    }
                }
            }
        }
    }
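- Make sure each program has a single entry point. Both classes above already define a Main method, so either place them in separate console projects or select the startup object in a shared project, instead of adding a separate Main class.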
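
The consumer code notes that the topic becomes durable when the second parameter of QueueDeclare is set to true. The following is a minimal sketch of that idea, using only the calls that already appear in the sample. The TopicSetup class, the DeclareAndBind method, and the MB_TopicClient_Sketch namespace are hypothetical names introduced here for illustration; they are not part of the WSO2 sample or of the RabbitMQ client. The sketch assumes the same RabbitMQ.Client.dll version as the sample.

```csharp
using System;
using RabbitMQ.Client;

namespace MB_TopicClient_Sketch
{
    // Hypothetical helper (not part of the original sample): groups the three
    // declaration calls from TopicConsumer so the durable flag is explicit.
    static class TopicSetup
    {
        public static void DeclareAndBind(IModel ch, string topicName, bool durable)
        {
            // Same default topic exchange as in the sample
            ch.ExchangeDeclare("amq.topic", "topic");

            // The 2nd parameter controls durability, as noted in the sample's comment;
            // the remaining arguments are unchanged from the sample.
            ch.QueueDeclare(topicName, durable, false, false, null);

            // Bind the topic to the exchange, using the topic name as the routing key
            ch.QueueBind(topicName, "amq.topic", topicName);
        }
    }
}
```

Under these assumptions, calling TopicSetup.DeclareAndBind(ch, "test-topic", false) inside the consumer's using block would reproduce the sample's non-durable setup, and passing true would request a durable topic instead.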
Executing the sample
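Run this sample from your C# project. Start TopicConsumer first so that a binding to test-topic exists, and then run TopicPublisher. The publisher prints "Message Sent.." and the consumer prints "Received Message : Test Message".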
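
The publisher sends the message text as UTF-8 bytes and tags it with an ID of the form "ID:" followed by a GUID, and the consumer decodes the same bytes back into a string. The following standalone sketch shows that round trip without a broker connection, which can help when checking the payload handling in isolation. The MessagePayload and Program classes and the MB_TopicClient_Sketch namespace are hypothetical names used only for this illustration.

```csharp
using System;
using System.Text;

namespace MB_TopicClient_Sketch
{
    // Hypothetical helper (not part of the original sample): isolates the
    // payload handling used by TopicPublisher and TopicConsumer.
    static class MessagePayload
    {
        // Mirrors the publisher: "ID:" followed by a newly generated GUID
        public static string NewMessageId()
        {
            return "ID:" + Guid.NewGuid().ToString();
        }

        // Mirrors the publisher: the message body is the UTF-8 encoding of the text
        public static byte[] Encode(string text)
        {
            return Encoding.UTF8.GetBytes(text);
        }

        // Mirrors the consumer: the body is decoded as UTF-8 text
        public static string Decode(byte[] body)
        {
            return Encoding.UTF8.GetString(body);
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            byte[] body = MessagePayload.Encode("Test Message");
            Console.WriteLine("Body length in bytes : " + body.Length);
            Console.WriteLine("Decoded text         : " + MessagePayload.Decode(body));
            Console.WriteLine("Message ID           : " + MessagePayload.NewMessageId());
        }
    }
}
```

Because both sides use the same encoding, the decoded text matches the text that was published. If one side is changed to a different encoding, the other side would need the same change.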