Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagecsharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using RabbitMQ.Client;

namespace MB_TopicClient

{

    class TopicConsumer
    {
        static void Main(string[] args)
       {
            TopicConsumer topicConsumer = new TopicConsumer();
            topicConsumer.getMessages();
        }


        public void getMessages()
        {

    //Setup the connection with the message broker
            ConnectionFactory factory = new ConnectionFactory();  
            IProtocol protocol = Protocols.AMQP_0_8_QPID;
            factory.VirtualHost = "/carbon";
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.HostName = "localhost";
            factory.Port = 5672;
            factory.Protocol = protocol;

            using (IConnection conn = factory.CreateConnection())
            {
                using (IModel ch = conn.CreateModel())

                { 
// Declare a topic exchange to be bound to retrieve messages, here we have used the default topic exchange of WSO2 MB
                    ch.ExchangeDeclare("amq.topic", "topic");
            
// Declare a topic name, here we use a non-durable topic. To make it durable use the 2nd parameter as 'true'  
                    ch.QueueDeclare("test-topic", false, false, false, null);
 
// Bind the Topic in to the exchange
                    ch.QueueBind("test-topic", "amq.topic", "test-topic");
                    

// Declare a consumer which listens on the messages published to 'test-topic' topic, we need to declare an exclusive subscriber, in order to get this work.
// The syntax is BasicConsume(<queuename>, <noAck>,<consumerTag>, <noLocal>, <exclusive>, <arguments>, <Consumer>)
                      
					
					QueueingBasicConsumer consumer = new QueueingBasicConsumer(ch);

                    ch.BasicConsume("test-topic", false, "1", false, true, null, consumer);
                    while (true)
                    {
                    	try
                    	{
                    		RabbitMQ.Client.Events.BasicDeliverEventArgs e =(RabbitMQ.Client.Events.BasicDeliverEventArgs)consumer.Queue.Dequeue();
                  			byte[] body = e.Body;
                  			string message = Encoding.UTF8.GetString(body);
                  			Console.WriteLine(message);
                  			ch.BasicAck(e.DeliveryTag, false);

                 		}
                 		catch (OperationCanceledException e)
                 		{
                 			Console.WriteLine(e);
                  			break;
                 		} 

                    } 
                }
            }
        }
    }
} 

...