Versions Compared


  • This line was added.
  • This line was removed.
  • Formatting was changed.


Code Block
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using RabbitMQ.Client;

namespace MB_TopicClient
{
    /// <summary>
    /// Sample topic subscriber: connects to the message broker over AMQP,
    /// binds the "test-topic" queue to the "amq.topic" exchange and
    /// consumes the messages published to that topic.
    /// </summary>
    class TopicConsumer
    {
        /// <summary>
        /// Program entry point: creates a consumer and starts receiving messages.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            TopicConsumer topicConsumer = new TopicConsumer();
            // Start consuming; without this call the program would exit immediately.
            topicConsumer.getMessages();
        }

        /// <summary>
        /// Opens a connection and channel to the broker, subscribes to
        /// "test-topic" and acknowledges each delivery after decoding it as
        /// UTF-8 text. Loops until an <see cref="OperationCanceledException"/>
        /// is raised while waiting for or processing a message.
        /// </summary>
        public void getMessages()
        {
            // Set up the connection with the message broker.
            ConnectionFactory factory = new ConnectionFactory();
            IProtocol protocol = Protocols.AMQP_0_8_QPID;
            factory.VirtualHost = "/carbon";
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.HostName = "localhost";
            factory.Port = 5672;
            factory.Protocol = protocol;

            // Both the connection and the channel are disposed when their
            // using blocks exit, including when the receive loop ends.
            using (IConnection conn = factory.CreateConnection())
            {
                using (IModel ch = conn.CreateModel())
                {
                    // Declare the topic exchange to bind to; this is the
                    // default topic exchange of WSO2 MB.
                    ch.ExchangeDeclare("amq.topic", "topic");

                    // Declare the topic name. This topic is non-durable; pass
                    // 'true' as the second parameter to make it durable.
                    ch.QueueDeclare("test-topic", false, false, false, null);

                    // Bind the topic to the exchange.
                    ch.QueueBind("test-topic", "amq.topic", "test-topic");

                    // Consumer that queues up the messages published to 'test-topic'.
                    QueueingBasicConsumer consumer = new QueueingBasicConsumer(ch);

                    // The subscriber must be declared exclusive for this to work.
                    // Syntax: BasicConsume(<queuename>, <noAck>, <consumerTag>,
                    //                      <noLocal>, <exclusive>, <arguments>, <Consumer>)
                    ch.BasicConsume("test-topic", false, "1", false, true, null, consumer);

                    while (true)
                    {
                        try
                        {
                            RabbitMQ.Client.Events.BasicDeliverEventArgs delivery =
                                (RabbitMQ.Client.Events.BasicDeliverEventArgs)consumer.Queue.Dequeue();
                            byte[] body = delivery.Body;
                            string message = Encoding.UTF8.GetString(body);
                            Console.WriteLine("Received Message : " + message);

                            // noAck is false above, so each delivery is acknowledged explicitly.
                            ch.BasicAck(delivery.DeliveryTag, false);
                        }
                        catch (OperationCanceledException ex)
                        {
                            // Report the cancellation and leave the loop instead of spinning forever.
                            Console.WriteLine(ex);
                            break;
                        }
                    }
                }
            }
        }
    }
}

