Objectives
This sample demonstrates how to create and use persistent queues in WSO2 Message Broker with the RabbitMQ .NET/C# client. It first introduces a sample .NET client named "QueuePublisher", which publishes messages to a known, already created queue in WSO2 Message Broker, and then a sample .NET client named "QueueConsumer", which receives the messages and prints their content to the console.
Prerequisites
To run this code sample, download the RabbitMQ.Client.dll file and add it as a reference in your .NET project. The DLL is available from http://www.rabbitmq.com/dotnet.html.
Running the Sample
Before running the "QueuePublisher" class, register at least one "QueueConsumer" binding for the queue so that published messages have a destination. You can do this in either of the following ways:
- Log in to the WSO2 Message Broker management console and create a queue named 'test-queue' ("Queues -> Add" in the "Main" menu). A quick guide to creating queues can be found in the section Adding Queues.
- Run the "QueueConsumer" class shown later on this page, which registers a binding to that queue. After you run it, the queue created by the QueueConsumer class is visible in the management console ("Queues -> Browse").
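The binding that the second option registers comes down to two channel calls. The following is a minimal sketch of just that step, assuming the same broker settings the samples on this page use. The class name QueueSetup and its namespace are hypothetical, and whether the broker keeps the queue and binding after this program disconnects has not been verified here.

```csharp
using System;
using RabbitMQ.Client;

namespace QueueSetupSketch
{
    // Hypothetical helper, not part of the original sample.
    class QueueSetup
    {
        static void Main(string[] args)
        {
            // Same connection settings as the QueuePublisher and QueueConsumer samples.
            ConnectionFactory factory = new ConnectionFactory();
            factory.VirtualHost = "/carbon";
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.HostName = "localhost";
            factory.Port = 5672;
            factory.Protocol = Protocols.AMQP_0_8_QPID;

            using (IConnection conn = factory.CreateConnection())
            using (IModel ch = conn.CreateModel())
            {
                // Arguments after the queue name: durable, exclusive, autoDelete, extra arguments.
                ch.QueueDeclare("test-queue", true, false, false, null);

                // Bind the queue to amq.direct using the queue name as the routing key.
                ch.QueueBind("test-queue", "amq.direct", "test-queue");

                Console.WriteLine("Declared and bound test-queue");
            }
        }
    }
}
```

If the sketch behaves as assumed, 'test-queue' should then appear under "Queues -> Browse" in the management console.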
Now use the following "QueuePublisher" .NET client to send messages to the 'test-queue' created earlier.
/*
 * Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
 *
 * WSO2 Inc. licenses this file to you under the Apache License,
 * Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using RabbitMQ.Client;

namespace RabbitMQ
{
    class QueuePublisher
    {
        static void Main(string[] args)
        {
            QueuePublisher publisher = new QueuePublisher();
            publisher.PublishMessage("This is a Test");
            Console.WriteLine("Sent Message");
            // Keep the console window open until Enter is pressed.
            Console.ReadLine();
        }

        public void PublishMessage(string message)
        {
            // Set up the connection with the message broker.
            ConnectionFactory factory = new ConnectionFactory();
            IProtocol protocol = Protocols.AMQP_0_8_QPID;
            factory.VirtualHost = "/carbon";
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.HostName = "localhost";
            factory.Port = 5672;
            factory.Protocol = protocol;

            using (IConnection conn = factory.CreateConnection())
            {
                using (IModel ch = conn.CreateModel())
                {
                    // Declare the exchange for the publisher. Here the exchange type is direct.
                    ch.ExchangeDeclare("amq.direct", "direct");

                    // Publish the message, using the queue name as the routing key.
                    ch.BasicPublish("amq.direct", "test-queue", null, Encoding.UTF8.GetBytes(message));
                }
            }
        }
    }
}
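To send several numbered messages, a publisher could reuse one connection and channel instead of reconnecting for each message. The following is a minimal sketch under that assumption, using only the calls that appear in the QueuePublisher sample. The class name BatchQueuePublisher, its namespace, and the message count of 5 are hypothetical choices made for illustration.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;

namespace BatchPublisherSketch
{
    // Hypothetical class, not part of the original sample.
    class BatchQueuePublisher
    {
        // Assumed value, chosen only for illustration.
        const int MessageCount = 5;

        static void Main(string[] args)
        {
            // Same connection settings as the QueuePublisher sample.
            ConnectionFactory factory = new ConnectionFactory();
            factory.VirtualHost = "/carbon";
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.HostName = "localhost";
            factory.Port = 5672;
            factory.Protocol = Protocols.AMQP_0_8_QPID;

            using (IConnection conn = factory.CreateConnection())
            using (IModel ch = conn.CreateModel())
            {
                // Declare the direct exchange once, then publish every message on the same channel.
                ch.ExchangeDeclare("amq.direct", "direct");

                for (int i = 1; i <= MessageCount; i++)
                {
                    string message = "This is a Test " + i;
                    ch.BasicPublish("amq.direct", "test-queue", null, Encoding.UTF8.GetBytes(message));
                    Console.WriteLine("Sent Message " + i);
                }
            }
        }
    }
}
```

Opening the connection once keeps the per-message cost down to a single BasicPublish call; the routing key and exchange are unchanged from the single-message sample.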
Next, run the following "QueueConsumer" .NET client to receive messages from 'test-queue'.
/*
 * Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
 *
 * WSO2 Inc. licenses this file to you under the Apache License,
 * Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using RabbitMQ.Client;

namespace QueueConsumer
{
    class QueueConsumer
    {
        static void Main(string[] args)
        {
            QueueConsumer qConsumer = new QueueConsumer();
            qConsumer.getMessage();
        }

        public void getMessage()
        {
            // Set up the connection with the message broker.
            ConnectionFactory factory = new ConnectionFactory();
            IProtocol protocol = Protocols.AMQP_0_8_QPID;
            factory.VirtualHost = "/carbon";
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.HostName = "localhost";
            factory.Port = 5672;
            factory.Protocol = protocol;

            using (IConnection conn = factory.CreateConnection())
            {
                using (IModel ch = conn.CreateModel())
                {
                    // Declare a queue to retrieve messages.
                    ch.QueueDeclare("test-queue", true, false, false, null);

                    // Create the binding between the queue and the exchange.
                    ch.QueueBind("test-queue", "amq.direct", "test-queue");

                    // Subscribe with noAck = false so that each message is acknowledged explicitly.
                    QueueingBasicConsumer consumer = new QueueingBasicConsumer(ch);
                    ch.BasicConsume("test-queue", false, consumer);

                    while (true)
                    {
                        try
                        {
                            // Block until the next message arrives.
                            RabbitMQ.Client.Events.BasicDeliverEventArgs e =
                                (RabbitMQ.Client.Events.BasicDeliverEventArgs)consumer.Queue.Dequeue();
                            byte[] body = e.Body;
                            string message = Encoding.UTF8.GetString(body);
                            Console.WriteLine(message);

                            // Acknowledge this single message.
                            ch.BasicAck(e.DeliveryTag, false);
                        }
                        catch (OperationCanceledException ex)
                        {
                            // Print the exception and stop consuming.
                            Console.WriteLine(ex);
                            break;
                        }
                    }
                }
            }
        }
    }
}