...
Now, using the following "TopicPublisher" .NET client, you can send messages to the 'test-topic' topic created earlier.
Code Block | ||
---|---|---|
| ||
/*
 * Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
 *
 * WSO2 Inc. licenses this file to you under the Apache License,
 * Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using RabbitMQ.Client;

namespace MB_Topic_Publisher
{
    /// <summary>
    /// Sample client that publishes a single text message to the
    /// 'test-topic' topic through the broker's "amq.topic" exchange.
    /// </summary>
    class TopicPublisher
    {
        // Exchange the message is published to.
        private const string ExchangeName = "amq.topic";

        // Routing key (topic name); it must match the key the consumer binds with.
        private const string TopicName = "test-topic";

        /// <summary>
        /// Publishes one test message, reports it on the console and waits
        /// for the user to press Enter before exiting.
        /// </summary>
        static void Main(string[] args)
        {
            TopicPublisher topicPublisher = new TopicPublisher();
            topicPublisher.PublishMessage("Test Message");
            Console.WriteLine("Message Sent..");
            Console.ReadLine();
        }

        /// <summary>
        /// Connects to the message broker and publishes <paramref name="message"/>
        /// (encoded as UTF-8) to the 'test-topic' topic.
        /// </summary>
        /// <param name="message">Text of the message to publish.</param>
        public void PublishMessage(string message)
        {
            // Set up the connection with the message broker
            ConnectionFactory factory = new ConnectionFactory();
            factory.VirtualHost = "/carbon";
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.HostName = "localhost";
            factory.Port = 5672;
            factory.Protocol = Protocols.AMQP_0_8_QPID;

            using (IConnection conn = factory.CreateConnection())
            {
                using (IModel ch = conn.CreateModel())
                {
                    // Declare a topic exchange to publish messages, here we have used the default topic exchange of WSO2 MB
                    ch.ExchangeDeclare(ExchangeName, "topic");

                    // Publish the message to the exchange, it will send it to the routing key which is our topic name 'test-topic'.
                    // The syntax is ch.BasicPublish(<exchange_name>, <topic_name>, <message_properties>, <message_body>)
                    ch.BasicPublish(ExchangeName, TopicName, null, Encoding.UTF8.GetBytes(message));
                }
            }
        }
    }
} |
Next, run the following "TopicConsumer" .NET client to receive messages from 'test-topic'.
Code Block | ||
---|---|---|
| ||
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using RabbitMQ.Client;
namespace MB_TopicClient
{
/// <summary>
/// Sample client that binds to the 'test-topic' topic on the broker's
/// "amq.topic" exchange and prints every message it receives.
/// </summary>
class TopicConsumer
{
    // Exchange the topic is bound to.
    private const string ExchangeName = "amq.topic";

    // Used as the queue name and as the routing key of the binding.
    private const string TopicName = "test-topic";

    /// <summary>
    /// Creates a consumer and receives messages until the subscription ends.
    /// </summary>
    static void Main(string[] args)
    {
        TopicConsumer topicConsumer = new TopicConsumer();
        topicConsumer.getMessages();
    }

    /// <summary>
    /// Connects to the message broker, subscribes to 'test-topic' and writes
    /// each received message body (decoded as UTF-8) to the console,
    /// acknowledging it afterwards. Returns once the consumer's delivery
    /// queue has been closed.
    /// </summary>
    public void getMessages()
    {
        // Set up the connection with the message broker
        ConnectionFactory factory = new ConnectionFactory();
        factory.VirtualHost = "/carbon";
        factory.UserName = "admin";
        factory.Password = "admin";
        factory.HostName = "localhost";
        factory.Port = 5672;
        factory.Protocol = Protocols.AMQP_0_8_QPID;

        using (IConnection conn = factory.CreateConnection())
        {
            using (IModel ch = conn.CreateModel())
            {
                // Declare a topic exchange to be bound to retrieve messages, here we have used the default topic exchange of WSO2 MB
                ch.ExchangeDeclare(ExchangeName, "topic");

                // Declare a topic name, here we use a non-durable topic. To make it durable use the 2nd parameter as 'true'
                ch.QueueDeclare(TopicName, false, false, false, null);

                // Bind the topic to the exchange
                ch.QueueBind(TopicName, ExchangeName, TopicName);

                // Declare a consumer which listens on the messages published to 'test-topic' topic.
                // The second argument (noAck) is false, so every delivery is acknowledged explicitly below.
                QueueingBasicConsumer consumer = new QueueingBasicConsumer(ch);
                ch.BasicConsume(TopicName, false, consumer);

                while (true)
                {
                    try
                    {
                        RabbitMQ.Client.Events.BasicDeliverEventArgs delivery =
                            (RabbitMQ.Client.Events.BasicDeliverEventArgs)consumer.Queue.Dequeue();
                        string message = Encoding.UTF8.GetString(delivery.Body);
                        Console.WriteLine(message);
                        ch.BasicAck(delivery.DeliveryTag, false);
                    }
                    catch (System.IO.EndOfStreamException ex)
                    {
                        // Dequeue() throws EndOfStreamException once the consumer's queue is
                        // closed (consumer cancelled, channel or connection closed), so this
                        // is the normal way out of the receive loop.
                        Console.WriteLine(ex);
                        break;
                    }
                }
            }
        }
    }
}
} |