Table of Contents | ||||
---|---|---|---|---|
|
Introduction
This sample demonstrates how durable or non-durable topics can be created and used in WSO2 Message Broker using the RabbitMQ .NET/C# client. It first introduces a sample .NET client named TopicPublisher
that publishes messages to a known, previously created topic in Message Broker. Then it introduces a sample .NET client named TopicConsumer
that listens for messages and prints message contents to the console.
Table of Contents | ||||
---|---|---|---|---|
|
Prerequisites
To run this sample:
...
Create a
TopicConsumer .NET
client to receive messages from the test-topic
topic by adding a class with the following code.Code Block language java
/*
 * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
 *
 * WSO2 Inc. licenses this file to you under the Apache License,
 * Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using RabbitMQ.Client;

namespace MB_TopicClient
{
    /// <summary>
    /// Sample client that subscribes to the 'test-topic' topic on the
    /// 'amq.topic' exchange and prints every received message to the console.
    /// </summary>
    class TopicConsumer
    {
        /// <summary>
        /// Entry point: creates a consumer and starts receiving messages.
        /// </summary>
        static void Main(string[] args)
        {
            TopicConsumer topicConsumer = new TopicConsumer();
            topicConsumer.GetMessages();
        }

        /// <summary>
        /// Connects to the broker, binds 'test-topic' to 'amq.topic', and then
        /// loops dequeuing, printing and acknowledging messages until an
        /// <see cref="OperationCanceledException"/> is caught.
        /// </summary>
        public void GetMessages()
        {
            //Setup the connection with the message broker
            ConnectionFactory factory = new ConnectionFactory();
            IProtocol protocol = Protocols.AMQP_0_9_1;
            factory.VirtualHost = "/carbon";
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.HostName = "localhost";
            factory.Port = 5672;
            factory.Protocol = protocol;

            using (IConnection conn = factory.CreateConnection())
            {
                using (IModel ch = conn.CreateModel())
                {
                    // Declare a topic exchange to be bound to retrieve messages, here we have used the default topic exchange of WSO2 MB
                    ch.ExchangeDeclare("amq.topic", "topic");

                    // Declare a topic name, here we use a non-durable topic. To make it durable use the 2nd parameter as 'true'
                    ch.QueueDeclare("test-topic", false, false, false, null);

                    // Bind the Topic in to the exchange
                    ch.QueueBind("test-topic", "amq.topic", "test-topic");

                    // Declare a consumer which listens on the messages published to 'test-topic' topic, we need to declare an exclusive subscriber, in order to get this work.
                    // The syntax is BasicConsume(<queuename>, <noAck>,<consumerTag>, <noLocal>, <exclusive>, <arguments>, <Consumer>)
                    QueueingBasicConsumer consumer = new QueueingBasicConsumer(ch);
                    ch.BasicConsume("test-topic", false, "1", false, true, null, consumer);

                    while (true)
                    {
                        try
                        {
                            RabbitMQ.Client.Events.BasicDeliverEventArgs e = (RabbitMQ.Client.Events.BasicDeliverEventArgs)consumer.Queue.Dequeue();
                            byte[] body = e.Body;
                            string message = Encoding.UTF8.GetString(body);
                            Console.WriteLine("Received Message : " + message);

                            // noAck is false above, so each delivery is acknowledged explicitly.
                            ch.BasicAck(e.DeliveryTag, false);
                        }
                        catch (OperationCanceledException ex)
                        {
                            Console.WriteLine(ex);
                            break;
                        }
                    }
                }
            }
        }
    }
}
Info At least one TopicConsumer binding should exist before sending messages to the topic. Therefore, this TopicConsumer class should be run before the TopicPublisher class. Alternatively, you can manually create the test-topic topic in the MB Management Console. See Adding Topics for detailed instructions.
Create a
TopicPublisher .NET
client to send messages to the test-topic
topic by adding a class with the following code.Code Block language java /* * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. * * WSO2 Inc. licenses this file to you under the Apache License, * Version 2.0 (the "License"); you may not use this file except * in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ } }
Info At least one TopicConsumer binding should exist before sending messages to the topic. Therefore, this TopicConsumer class should be run before the TopicPublisher class. Alternatively, you can manually create the test-topic topic in the MB Management Console. See Adding Topics for detailed instructions.
Create a
TopicPublisher .NET
client to send messages to the test-topic
topic by adding a class with the following code.Code Block language java
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using RabbitMQ.Client;

namespace MB_Topic_Publisher
{
    /// <summary>
    /// Sample client that publishes a single text message to the 'test-topic'
    /// topic through the 'amq.topic' exchange.
    /// </summary>
    class TopicPublisher
    {
        /// <summary>
        /// Entry point: publishes "Test Message", reports it on the console and
        /// waits for a line of input before exiting.
        /// </summary>
        static void Main(string[] args)
        {
            TopicPublisher topicPublisher = new TopicPublisher();
            topicPublisher.PublishMessage("Test Message");
            Console.WriteLine("Message Sent..");
            Console.ReadLine();
        }

        /// <summary>
        /// Connects to the broker and publishes <paramref name="message"/>,
        /// UTF-8 encoded, to 'amq.topic' with the routing key 'test-topic'.
        /// </summary>
        /// <param name="message">Text of the message to publish.</param>
        public void PublishMessage(string message)
        {
            //Setup the connection with the message broker
            ConnectionFactory factory = new ConnectionFactory();
            IProtocol protocol = Protocols.AMQP_0_9_1;
            factory.VirtualHost = "/carbon";
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.HostName = "localhost";
            factory.Port = 5672;
            factory.Protocol = protocol;

            using (IConnection conn = factory.CreateConnection())
            {
                using (IModel ch = conn.CreateModel())
                {
                    // Declare a topic exchange to publish messages, here we have used the default topic exchange of WSO2 MB
                    ch.ExchangeDeclare("amq.topic", "topic");

                    IBasicProperties basicProperties = ch.CreateBasicProperties();

                    //Setting JMS Message ID.
                    basicProperties.MessageId = "ID:" + System.Guid.NewGuid().ToString();

                    //Setting content-type for message as we are sending a text message.
                    basicProperties.ContentType = "text/plain";

                    // Publish the message to the exchange, it will send it to the routing key which is our topic name 'test-topic'.
                    // The syntax is ch.BasicPublish(<exchange_name>, <topic_name>, <message_properties>,<message_body>)
                    ch.BasicPublish("amq.topic", "test-topic", basicProperties, Encoding.UTF8.GetBytes(message));
                }
            }
        }
    }
}
- Add a Main.java class defining the method to call both the classes mentioned above.
...