Objectives
This sample demonstrates how durable or non-durable topics can be created and used in WSO2 Message Broker using the RabbitMQ .NET/C# client. It first introduces a sample .NET named client named TopicPublisher
, which is used to publish that publishes messages to a known, created topic in Message Broker, and then . Then it introduces a sample .NET client named named TopicConsumer
to listen that listens for messages and print prints message contents to the console.
Table of Contents | ||||
---|---|---|---|---|
|
Prerequisites
To run this code sample, you must download sample:
- Download and add
...
- the
RabbitMQ.Client.dll
...
- file as a reference in your .NET project. You can download this file from http://www.rabbitmq.com/dotnet.html
...
Running the sample
Before running the TopicPublisher class, you must register at least one TopicConsumer binding before sending messages to the topic by doing one of the following:
- Log into WSO2 Message Broker management console and create a topic named 'test-topic' (from the Main menu, choose Topics -> Add).
OR - Run the TopicConsumer class depicted below, which will register a binding to that topic. When you have run the TopicConsumer class, you will see the topic subscription it created is visible in the management console when you choose Topics -> Browse.
...
- See Prerequisites to Run the MB Samples for a list of other prerequisites.
Building the sample
...
Create a
TopicConsumer .NE
T client to receive messages from thetest-topic
topic by adding a class with the following code.Code Block language
...
java using System; using System.Collections.Generic; using System.Linq; using System.Text; using RabbitMQ.Client; namespace MB_
...
TopicClient
...
{
...
class
...
TopicConsumer { static void Main(string[] args)
...
{
...
TopicConsumer
...
topicConsumer = new
...
TopicConsumer();
...
topicConsumer.
...
GetMessages(
...
); }
...
...
public
...
void GetMessages()
...
...
{
...
//Setup the connection with the message broker ConnectionFactory factory = new ConnectionFactory(); IProtocol protocol = Protocols.AMQP_0_
...
9_
...
1; factory.VirtualHost = "/carbon"; factory.UserName = "admin"; factory.Password = "admin"; factory.HostName = "localhost"; factory.Port = 5672; factory.Protocol = protocol;
...
using (IConnection conn = factory.CreateConnection())
...
{ using (IModel ch = conn.CreateModel()) { // Declare a topic exchange to
...
be bound to retrieve messages, here we have used the default topic exchange of WSO2 MB ch.ExchangeDeclare("amq.topic", "topic");
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
//
...
Declare
...
a
...
topic name
...
, here we use a non-durable topic. To make it durable use the 2nd parameter as 'true' ch.
...
QueueDeclare("
...
test-topic", false,
...
false,
...
false,
...
null);
...
// Bind the Topic in to the exchange
...
...
...
Next, execute the following TopicConsumer .NET client, which receives messages from 'test-topic'.
...
language | csharp |
---|
...
ch.QueueBind("test-topic", "amq.topic", "test-topic"); // Declare a consumer which listens on the messages published to 'test-topic' topic, we need to declare an exclusive subscriber, in order to get this work.
...
...
...
// The syntax is
...
BasicConsume(<queuename>, <noAck>,<consumerTag>, <noLocal>, <exclusive>, <arguments>, <Consumer>)
...
...
QueueingBasicConsumer
...
consumer = new
...
QueueingBasicConsumer(ch);
...
ch.BasicConsume("test-topic", false, "1", false, true, null, consumer);
...
...
...
while
...
(true)
...
{ try {
...
RabbitMQ.Client.Events.BasicDeliverEventArgs e = (RabbitMQ.Client.Events.BasicDeliverEventArgs)consumer.Queue.Dequeue();
...
byte[] body =
...
e.Body;
...
string message = Encoding.UTF8.GetString(body);
...
Console.WriteLine("Received Message : " + message);
...
...
...
...
ch.BasicAck(e.DeliveryTag, false);
...
...
...
}
...
...
...
...
...
...
...
catch
...
(
...
OperationCanceledException e) {
...
Console.WriteLine(e);
...
...
...
...
...
...
...
...
...
...
...
break;
...
...
...
...
...
...
...
...
...
...
...
...
}
...
...
}
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
}
...
...
...
...
...
} }
...
} }
Info At least one TopicConsumer binding should exist before sending messages to the topic. Therefore, this TopicConsumer class should be run before the TopicPublisher class. Alternatively, you can manually create the test-topic topic in the MB Management Console. See Adding Topics for detailed instructions.
Create a
TopicPublisher .NET
client to send messages to thetest-topic
topic by adding a class with the following code.Code Block language java using System; using System.Collections.Generic; using System.Linq; using System.Text; using RabbitMQ.Client; namespace MB_Topic_Publisher { class TopicPublisher { static void Main(string[] args) { TopicPublisher topicPublisher = new TopicPublisher();
...
topicPublisher.PublishMessage("Test Message"); Console.WriteLine("Message Sent.."); Console.ReadLine(); }
...
...
public
...
void
...
PublishMessage(string message) { //Setup the connection with the message broker ConnectionFactory factory = new ConnectionFactory(); IProtocol protocol = Protocols.AMQP_0_9_1;
...
...
factory.VirtualHost = "/carbon"; factory.UserName = "admin";
...
factory.Password = "admin"; factory.HostName = "localhost"; factory.Port = 5672; factory.Protocol = protocol;
...
using (IConnection conn = factory.CreateConnection()) {
...
using (IModel ch = conn.CreateModel())
...
{
...
// Declare a topic exchange to publish messages, here we have used the default
...
topic exchange of WSO2 MB
...
...
...
ch.ExchangeDeclare("amq.topic", "topic");
...
IBasicProperties basicProperties =
...
ch.
...
CreateBasicProperties(
...
);
...
//Setting JMS Message ID. basicProperties.MessageId
...
= "ID:" + System.Guid.NewGuid().ToString(); //Setting content-type for message as we are sending
...
a text message.
...
...
...
basicProperties.ContentType = "text/plain";
...
// Publish the message to the exchange, it will
...
send it to the routing key which is our name 'myTopic'.
...
// The syntax is ch.BasicPublish(<exchange_name>,
...
<topic_name>, <message_properties>,<message_body>) ch.BasicPublish("amq.topic",
...
"test-topic", basicProperties, Encoding.UTF8.GetBytes(message)); } } } } }
...
...
- Add a Main.java class defining the method to call both the classes mentioned above.
Executing the sample
Run this sample from your C# project.