Clustering example explaining the behavior of the WSO2 Message Broker.
This example is based on the External Cassandra and ZooKeeper Servers clustering scenario. The steps below walk you through an example implementation of the deployment pattern.
1. On your local machine, set up two WSO2 MB instances running with an external Cassandra server and ZooKeeper server (you can set up the Cassandra and ZooKeeper profiles to run on the same machine). To do this, define a port offset in $CARBON_Home
Introduction
This sample demonstrates how messages published to a queue on one MB instance can be received by a consumer of the same queue on another MB instance. To demonstrate this, two MB instances need to run as follows.
Reference | Listener Port |
---|---|
MB1 | 5672 |
MB2 | 5673 |
Info |
---|
To run two MB instances, configure a port offset in |
...
. Alternatively, use the following command to start one of the |
...
MB instances. |
The port offset avoids possible port conflicts when multiple WSO2 Carbon-based products are run on the same host. For example,
```xml
<!-- Ports offset. This entry will set the value of the ports defined below
     to the define value + Offset. e.g. Offset=2 and HTTPS port=9443 will set
     the effective HTTPS port to 9445 -->
<Offset>1</Offset>
```
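The offset arithmetic described in the configuration comment can be sketched in a few lines of Java. This is only an illustration under stated assumptions: `PortOffset` and `effectivePort` are hypothetical names and not part of WSO2 MB, and the sketch assumes the offset is simply added to each default port, as the comment states.

```java
// Hypothetical helper, not part of WSO2 MB: illustrates the offset arithmetic only.
final class PortOffset {

    /** AMQP listener port of MB1 in this example (offset 0). */
    static final int BASE_AMQP_PORT = 5672;

    /** Returns the port a server binds to once the offset is added to the default port. */
    static int effectivePort(int defaultPort, int offset) {
        return defaultPort + offset;
    }

    public static void main(String[] args) {
        // Offset 0 for MB1 and offset 1 for MB2, matching the listener ports in this example.
        System.out.println("MB1 AMQP port: " + effectivePort(BASE_AMQP_PORT, 0));
        System.out.println("MB2 AMQP port: " + effectivePort(BASE_AMQP_PORT, 1));
        // The HTTPS case quoted in the configuration comment (Offset=2, HTTPS port=9443).
        System.out.println("HTTPS port with offset 2: " + effectivePort(9443, 2));
    }
}
```

With an offset of 1, the second instance listens for AMQP connections on 5673, which is the port the publisher in this sample connects to.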
2. Let's call the first broker instance MB1 (runs on port 5672) and the other MB2 (runs on port 5673).
...
Prerequisites
See Prerequisites to Run the MB Samples for a list of prerequisites.
Building the sample
The following classes need to be created for this sample.
A client that subscribes to the queue named myQueue at MB1. The code of this class should be as follows.

```java
import javax.jms.*;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;

public class ConsumeClient {

    public void consumeMessage() {
        Properties initialContextProperties = new Properties();
        initialContextProperties.put("java.naming.factory.initial",
                "org.wso2.andes.jndi.PropertiesFileInitialContextFactory");
        // MB1 listens on port 5672
        String connectionString = "amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'";
        initialContextProperties.put("connectionfactory.qpidConnectionfactory", connectionString);
        initialContextProperties.put("queue.myQueue", "myQueue");
        try {
            InitialContext initialContext = new InitialContext(initialContextProperties);
            QueueConnectionFactory queueConnectionFactory =
                    (QueueConnectionFactory) initialContext.lookup("qpidConnectionfactory");
            QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
            queueConnection.start();
            QueueSession queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
            Destination destination = (Destination) initialContext.lookup("myQueue");
            MessageConsumer messageConsumer = queueSession.createConsumer(destination);
            // Blocks until a message arrives on the queue
            TextMessage textMessage = (TextMessage) messageConsumer.receive();
            System.out.println("Got message ==> " + textMessage.getText());
            try {
                Thread.sleep(9000);
            } catch (Exception e) {
                System.out.println(e);
            }
            // Housekeeping
            messageConsumer.close();
            queueSession.close();
            queueConnection.stop();
            queueConnection.close();
        } catch (NamingException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        ConsumeClient sendConsumeClient = new ConsumeClient();
        sendConsumeClient.consumeMessage();
    }
}
```
...
A client that publishes messages to the queue named myQueue at MB2. The code of this class should be as follows.

```java
import javax.jms.*;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;

public class SendClient {

    public static void main(String[] args) {
        SendClient sendClient = new SendClient();
        sendClient.sendMessage();
    }

    public void sendMessage() {
        Properties initialContextProperties = new Properties();
        initialContextProperties.put("java.naming.factory.initial",
                "org.wso2.andes.jndi.PropertiesFileInitialContextFactory");
        // MB2 listens on port 5673
        String connectionString = "amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673'";
        initialContextProperties.put("connectionfactory.qpidConnectionfactory", connectionString);
        initialContextProperties.put("queue.myQueue", "myQueue");
        try {
            InitialContext initialContext = new InitialContext(initialContextProperties);
            QueueConnectionFactory queueConnectionFactory =
                    (QueueConnectionFactory) initialContext.lookup("qpidConnectionfactory");
            QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
            queueConnection.start();
            QueueSession queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
            TextMessage textMessage = queueSession.createTextMessage();
            textMessage.setText("Test message");
            // Logs the length of the message text
            System.out.println("Sending Message : " + textMessage.getText().length());
            // Send message
            Queue queue = (Queue) initialContext.lookup("myQueue");
            QueueSender queueSender = queueSession.createSender(queue);
            queueSender.send(textMessage);
            // Housekeeping
            queueSender.close();
            queueSession.close();
            queueConnection.stop();
            queueConnection.close();
        } catch (NamingException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
```
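The two clients build the same JNDI properties and differ only in the broker port inside the connection string (5672 for MB1, 5673 for MB2). A minimal sketch of factoring that out is shown here; `BrokerJndiProperties`, `connectionUrl`, and `forBroker` are hypothetical names introduced for illustration, while the property keys and the URL format are copied from the sample classes. The sketch only builds the `Properties` object and does not open a connection.

```java
import java.util.Properties;

// Hypothetical helper, not part of the WSO2 MB client libraries.
final class BrokerJndiProperties {

    /** Builds the AMQP connection URL used in the sample for a broker on localhost. */
    static String connectionUrl(int amqpPort) {
        return "amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:" + amqpPort + "'";
    }

    /** Builds the JNDI properties that both sample clients pass to InitialContext. */
    static Properties forBroker(int amqpPort, String queueName) {
        Properties properties = new Properties();
        properties.put("java.naming.factory.initial",
                "org.wso2.andes.jndi.PropertiesFileInitialContextFactory");
        properties.put("connectionfactory.qpidConnectionfactory", connectionUrl(amqpPort));
        // JNDI alias and physical queue name are the same in the sample ("myQueue").
        properties.put("queue." + queueName, queueName);
        return properties;
    }

    public static void main(String[] args) {
        // Consumer side (MB1) and publisher side (MB2) differ only in the port.
        System.out.println(forBroker(5672, "myQueue").getProperty("connectionfactory.qpidConnectionfactory"));
        System.out.println(forBroker(5673, "myQueue").getProperty("connectionfactory.qpidConnectionfactory"));
    }
}
```

Either client could then create its context with `new InitialContext(BrokerJndiProperties.forBroker(port, "myQueue"))`, which keeps the queue name and credentials in one place.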
5. Run the consumer to receive messages from MB1. You will notice that the message sent to MB2 is received by the consumer connected to MB1. Even if the MB2 instance is stopped, the message is still delivered to the consumer.
The deployment diagram for this example is similar to the following.
...
Executing the sample
First, run the message sender class (SendClient) to publish a message to the myQueue queue at MB2. Then run the queue consumer class (ConsumeClient) to consume the message from the myQueue queue at MB1.
Analyzing the output
The message published to MB2 can be viewed from MB1. You can check this in the console output of the consumer. Alternatively, you can check the contents of the myQueue queue in the Management Console of MB1.