Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Excerpt
hiddentrue

Clustering example explaining the behavior of the WSO2 Message Broker.

This example is based on the External Cassandra and ZooKeeper Servers clustering scenario.  The steps below walk you through an example implementation of the deployment pattern.

1. In your local machine, set up two WSO2 MB instances running with external Cassandra server and Zookeeper server (you can setup Cassandra and Zookeeper profiles to run on the same machine). To do this, define a port offset in $CARBON_Home

Table of Contents
maxLevel3
minLevel3
printablefalse

Introduction

This sample demonstrates how to receive messages published to a queue in a particular MB instance can be received by a consumer of the same queue in another MB instance. In order to demonstrate this, two MB instances need to be run as follows.

ReferenceListener Port
MB15672
MB25673
Info

In order to run two MB instances, you should do a Port Offset Configuration in <MB_HOME>/repository/conf/carbon.xml

...

. Alternatively, use the following command to start one of the

...

MB instances.

The port offset avoids possible port conflicts when multiple WSO2 Carbon-based products are run on the same host. For example,

Code Block
languagehtml/xml
<!-- Ports offset. This entry will set the value of the ports defined below 
to the define value + Offset. e.g. Offset=2 and HTTPS port=9443 will set
 the effective HTTPS port to 9445 -->
 
<Offset>1</Offset>

 2. Let's call the first broker instance MB1 (runs on port 5672) and the other MB2 (runs on port 5673).

...

wso2server.sh -DportOffset=1

Prerequisites

See Prerequisites to Run the MB Samples for a list of prerequisites.

Building the sample

The following classes need to be created for this sample.

  • A client to make subscriptions to the queue named MyQueue at MB1. The code of this class should be as follows.

    Code Block
    languagejava
    import javax.jms.*;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;
    import java.util.Properties;
    public class ConsumeClient {
      public void consumeMessage() {
       Properties initialContextProperties = new Properties();
       initialContextProperties.put("java.naming.factory.initial",
            "org.wso2.andes.jndi.PropertiesFileInitialContextFactory");
       String connectionString = "amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'";
             initialContextProperties.put("connectionfactory.qpidConnectionfactory", connectionString);
       initialContextProperties.put("queue.myQueue", "myQueue");
            try {
                InitialContext initialContext = new InitialContext(initialContextProperties);
                QueueConnectionFactory queueConnectionFactory
                        = (QueueConnectionFactory) initialContext.lookup("qpidConnectionfactory");
                QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
                queueConnection.start();
                QueueSession queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
                Destination destination = (Destination) initialContext.lookup("myQueue");
                MessageConsumer messageConsumer = queueSession.createConsumer(destination);
                TextMessage textMessage = (TextMessage) messageConsumer.receive();
                System.out.println("Got message ==> " + textMessage.getText());
                try {
                    Thread.sleep(9000);
                } catch (Exception e) {
                    System.out.println(e);
                }
                messageConsumer.close();
                queueSession.close();
                queueConnection.stop();
                queueConnection.close();
            } catch (NamingException e) {
                e.printStackTrace();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        public static void main(String[] args) {
            ConsumeClient sendConsumeClient = new ConsumeClient();
            sendConsumeClient.consumeMessage();
        }
    }

...

  • A client to publish messages to the queue named MyQueue at MB2. The code of this class should be as follows.

    Code Block
    languagejava
    import javax.jms.*;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;
    import java.util.Properties;
    public class SendClient {
        public static void main(String[] args) {
            SendClient sendClient = new SendClient();
            sendClient.sendMessage();
        }
        public void sendMessage() {
            Properties initialContextProperties = new Properties();
            initialContextProperties.put("java.naming.factory.initial",
                    "org.wso2.andes.jndi.PropertiesFileInitialContextFactory");
            String connectionString = "amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673'";
            initialContextProperties.put("connectionfactory.qpidConnectionfactory", connectionString);
            initialContextProperties.put("queue.myQueue", "myQueue");
    
            try {
                InitialContext initialContext = new InitialContext(initialContextProperties);
                QueueConnectionFactory queueConnectionFactory
                        = (QueueConnectionFactory) initialContext.lookup("qpidConnectionfactory");
                QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
                queueConnection.start();
                QueueSession queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
                TextMessage textMessage = queueSession.createTextMessage();
                textMessage.setText("Test message");
                System.out.println("Sending Message : " + textMessage.getText().length());
                // Send message
                Queue queue = (Queue) initialContext.lookup("myQueue");
                QueueSender queueSender = queueSession.createSender(queue);
                queueSender.send(textMessage);
                // Housekeeping
                queueSender.close();
                queueSession.close();
                queueConnection.stop();
                queueConnection.close();
            } catch (NamingException e) {
                e.printStackTrace();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

5. Run the consumer to receive messages from MB1. You will notice that the message sent to MB2 can be received by MB1. Even if the MB2 instance is stopped, the message will be sent to the consumer.

The deployment diagram for this example is similar to the following.                                                   

...

Executing the sample

First run the message sender class to publish messages in the MyQueue queue at MB2. Then run the queue consumer class to consume messages published to the MyQueue queue from MB1.

Analyzing the output

It will be possible to view the message published in MB2 from MB3. You can check this in the output log of your console. Alternatively, you can check the queue contents of MyQueue in the Management Console of MB1.