This documentation is for WSO2 Message Broker version 2.0.1. View documentation for the latest release.

Message Broker Clustering Example

This example is based on the following scenarios discussed in section clustered deployment patterns.

The steps below walk you through an example implementation of the given deployment pattern.

1. In your local machine, set up two WSO2 MB instances running with external Cassandra server and Zookeeper server (note that you can setup Cassandra and Zookeeper to run on the same machine). To do this, define a port offset in $CARBON_Home/repository/conf/carbon.xml in one of the two WSO2 MB instances.

The port offset avoids possible port conflicts when multiple WSO2 Carbon-based products are run on same host. For example,

<!-- Ports offset. This entry will set the value of the ports defined below 
to the define value + Offset. e.g. Offset=2 and HTTPS port=9443 will set
 the effective HTTPS port to 9445 -->
 
<Offset>1</Offset>

 2. Let's call the first broker instance MB1 (runs on port 5672) and the other MB2 (runs on port 5673).

3. Using the following JMS client, make subscriptions to a queue "myQueue" at MB1.

import javax.jms.*;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
public class ConsumeClient {
  public void consumeMessage() {
   Properties initialContextProperties = new Properties();
   initialContextProperties.put("java.naming.factory.initial",
        "org.wso2.andes.jndi.PropertiesFileInitialContextFactory");
   String connectionString = "amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'";
         initialContextProperties.put("connectionfactory.qpidConnectionfactory", connectionString);
   initialContextProperties.put("queue.myQueue", "myQueue");
        try {
            InitialContext initialContext = new InitialContext(initialContextProperties);
            QueueConnectionFactory queueConnectionFactory
                    = (QueueConnectionFactory) initialContext.lookup("qpidConnectionfactory");
            QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
            queueConnection.start();
            QueueSession queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
            Destination destination = (Destination) initialContext.lookup("myQueue");
            MessageConsumer messageConsumer = queueSession.createConsumer(destination);
            TextMessage textMessage = (TextMessage) messageConsumer.receive();
            System.out.println("Got message ==> " + textMessage.getText());
            try {
                Thread.sleep(9000);
            } catch (Exception e) {
                System.out.println(e);
            }
            messageConsumer.close();
            queueSession.close();
            queueConnection.stop();
            queueConnection.close();
        } catch (NamingException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) {
        ConsumeClient sendConsumeClient = new ConsumeClient();
        sendConsumeClient.consumeMessage();
    }
}

4. Using the Message Sender described below, send a message to "myQueue" at MB2.

import javax.jms.*;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
public class SendClient {
    public static void main(String[] args) {
        SendClient sendClient = new SendClient();
        sendClient.sendMessage();
    }
    public void sendMessage() {
        Properties initialContextProperties = new Properties();
        initialContextProperties.put("java.naming.factory.initial",
                "org.wso2.andes.jndi.PropertiesFileInitialContextFactory");
        String connectionString = "amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673'";
        initialContextProperties.put("connectionfactory.qpidConnectionfactory", connectionString);
        initialContextProperties.put("queue.myQueue", "myQueue");

        try {
            InitialContext initialContext = new InitialContext(initialContextProperties);
            QueueConnectionFactory queueConnectionFactory
                    = (QueueConnectionFactory) initialContext.lookup("qpidConnectionfactory");
            QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
            queueConnection.start();
            QueueSession queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
            TextMessage textMessage = queueSession.createTextMessage();
            textMessage.setText("Test message");
            System.out.println("Sending Message : " + textMessage.getText().length());
            // Send message
            Queue queue = (Queue) initialContext.lookup("myQueue");
            QueueSender queueSender = queueSession.createSender(queue);
            queueSender.send(textMessage);
            // Housekeeping
            queueSender.close();
            queueSession.close();
            queueConnection.stop();
            queueConnection.close();
        } catch (NamingException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

5. Run the consumer to receive messages from MB1. You will notice that the message sent to MB2 can be received by MB1. Even if the MB2 instance is stopped, the message will be sent to the consumer.

The deployment diagram for this example is similar to the following.

                                                   

typical deployment

Figure: Deployment with single, external Zookeeper and Cassandra servers.