This site contains the documentation that is relevant to older WSO2 product versions and offerings.
For the latest WSO2 documentation, visit https://wso2.com/documentation/.

Publishing and Receiving Messages from a Topic

This sample demonstrates how durable or non-durable topics can be created and used in WSO2 Message Broker using the RabbitMQ .NET/C# client. It introduces a sample .NET client named TopicPublisher that publishes messages to a known, previously created topic in Message Broker, and a sample .NET client named TopicConsumer that listens for messages and prints the message contents to the console.

Prerequisites

To run this sample:

Building the sample

  1. Create a TopicConsumer .NET client to receive messages from the test-topic topic by adding a class with the following code.

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using RabbitMQ.Client;
     
    namespace MB_TopicClient
    {
        /// <summary>
        /// Sample subscriber that receives the messages published to the 'test-topic' topic
        /// through the 'amq.topic' exchange and prints each message body to the console.
        /// </summary>
        class TopicConsumer
        {
            /// <summary>
            /// Entry point: creates a consumer and starts receiving messages.
            /// </summary>
            static void Main(string[] args)
            {
                TopicConsumer topicConsumer = new TopicConsumer();
                topicConsumer.GetMessages();
            }

            /// <summary>
            /// Connects to the message broker, declares and binds the 'test-topic' topic, and then
            /// blocks receiving messages. Every message is decoded as UTF-8, printed and acknowledged.
            /// The method returns once the consumer's delivery queue has been closed.
            /// </summary>
            public void GetMessages()
            {
                // Set up the connection with the message broker.
                ConnectionFactory factory = new ConnectionFactory();
                factory.VirtualHost = "/carbon";
                factory.UserName = "admin";
                factory.Password = "admin";
                factory.HostName = "localhost";
                factory.Port = 5672;
                factory.Protocol = Protocols.AMQP_0_9_1;

                using (IConnection conn = factory.CreateConnection())
                {
                    using (IModel ch = conn.CreateModel())
                    {
                        // Declare the topic exchange to bind to; this sample uses the default topic exchange of WSO2 MB.
                        ch.ExchangeDeclare("amq.topic", "topic");
                        // Declare the topic. It is non-durable here; pass 'true' as the 2nd parameter to make it durable.
                        ch.QueueDeclare("test-topic", false, false, false, null);
                        // Bind the topic to the exchange, using the topic name as the routing key.
                        ch.QueueBind("test-topic", "amq.topic", "test-topic");

                        // Register a consumer for the messages published to 'test-topic'. The subscriber has to be
                        // exclusive for this to work.
                        // The syntax is BasicConsume(<queuename>, <noAck>, <consumerTag>, <noLocal>, <exclusive>, <arguments>, <Consumer>)
                        QueueingBasicConsumer consumer = new QueueingBasicConsumer(ch);
                        ch.BasicConsume("test-topic", false, "1", false, true, null, consumer);

                        while (true)
                        {
                            try
                            {
                                // Blocks until a delivery is available.
                                RabbitMQ.Client.Events.BasicDeliverEventArgs delivery =
                                    (RabbitMQ.Client.Events.BasicDeliverEventArgs)consumer.Queue.Dequeue();
                                string message = Encoding.UTF8.GetString(delivery.Body);
                                Console.WriteLine("Received Message : " + message);
                                // noAck is false above, so every delivery must be acknowledged explicitly.
                                ch.BasicAck(delivery.DeliveryTag, false);
                            }
                            catch (System.IO.EndOfStreamException e)
                            {
                                // Dequeue() reports a closed delivery queue (consumer cancelled, or the
                                // channel/connection was closed) with EndOfStreamException; stop listening.
                                Console.WriteLine(e);
                                break;
                            }
                            catch (OperationCanceledException e)
                            {
                                // Retained from the original sample so that a cancellation also ends the loop.
                                Console.WriteLine(e);
                                break;
                            }
                        }
                    }
                }
            }
        }
    }

    At least one TopicConsumer binding should exist before sending messages to the topic. Therefore, this TopicConsumer class should be run before the TopicPublisher class. Alternatively, you can manually create the test-topic topic in the MB Management Console. See Adding Topics for detailed instructions.

  2. Create a TopicPublisher .NET client to send messages to the test-topic topic by adding a class with the following code.

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using RabbitMQ.Client;
     
    namespace MB_Topic_Publisher
    {
        /// <summary>
        /// Sample publisher that sends a text message to the 'test-topic' topic
        /// through the 'amq.topic' exchange.
        /// </summary>
        class TopicPublisher
        {
            /// <summary>
            /// Entry point: publishes a single test message, then waits for a key press.
            /// </summary>
            static void Main(string[] args)
            {
                new TopicPublisher().PublishMessage("Test Message");
                Console.WriteLine("Message Sent..");
                Console.ReadLine();
            }

            /// <summary>
            /// Connects to the message broker and publishes <paramref name="message"/>, encoded as
            /// UTF-8, to the 'amq.topic' exchange with the routing key 'test-topic'.
            /// </summary>
            /// <param name="message">Text to send as the message body.</param>
            public void PublishMessage(string message)
            {
                // Connection settings for the message broker.
                ConnectionFactory connectionFactory = new ConnectionFactory
                {
                    VirtualHost = "/carbon",
                    UserName = "admin",
                    Password = "admin",
                    HostName = "localhost",
                    Port = 5672,
                    Protocol = Protocols.AMQP_0_9_1
                };

                using (IConnection connection = connectionFactory.CreateConnection())
                using (IModel channel = connection.CreateModel())
                {
                    // Declare the topic exchange to publish to; this sample uses the default topic exchange of WSO2 MB.
                    channel.ExchangeDeclare("amq.topic", "topic");

                    IBasicProperties properties = channel.CreateBasicProperties();
                    // JMS message ID.
                    properties.MessageId = "ID:" + Guid.NewGuid().ToString();
                    // The body is a text message, so mark the content type accordingly.
                    properties.ContentType = "text/plain";

                    // Publish to the exchange; the routing key is the topic name, 'test-topic'.
                    // The syntax is BasicPublish(<exchange_name>, <topic_name>, <message_properties>, <message_body>)
                    byte[] payload = Encoding.UTF8.GetBytes(message);
                    channel.BasicPublish("amq.topic", "test-topic", properties, payload);
                }
            }
        }
    }
  3. Add a C# class (for example, Main.cs) defining the method to call both the classes mentioned above.

Executing the sample

Run this sample from your C# project.