This site contains the documentation that is relevant to older WSO2 product versions and offerings.
For the latest WSO2 documentation, visit https://wso2.com/documentation/.

Receiving Messages from an MB Cluster

This sample demonstrates how to receive messages published to a queue in a particular MB instance can be received by a consumer of the same queue in another MB instance. In order to demonstrate this, two MB instances need to be run as follows.

ReferenceListener Port
MB15672
MB25673

In order to run two MB instances, you should do a Port Offset Configuration in <MB_HOME>/repository/conf/carbon.xml. Alternatively, use the following command to start one of the MB instances.

wso2server.sh -DportOffset=1

See the following topics for instructions:

Prerequisites

See Prerequisites to Run the MB Samples for a list of prerequisites.

Building the sample

The following classes need to be created for this sample.

  • A client to make subscriptions to the queue named MyQueue at MB1. The code of this class should be as follows.

    import javax.jms.*;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;
    import java.util.Properties;
    public class ConsumeClient {
      public void consumeMessage() {
       Properties initialContextProperties = new Properties();
       initialContextProperties.put("java.naming.factory.initial",
            "org.wso2.andes.jndi.PropertiesFileInitialContextFactory");
       String connectionString = "amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'";
             initialContextProperties.put("connectionfactory.qpidConnectionfactory", connectionString);
       initialContextProperties.put("queue.myQueue", "myQueue");
            try {
                InitialContext initialContext = new InitialContext(initialContextProperties);
                QueueConnectionFactory queueConnectionFactory
                        = (QueueConnectionFactory) initialContext.lookup("qpidConnectionfactory");
                QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
                queueConnection.start();
                QueueSession queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
                Destination destination = (Destination) initialContext.lookup("myQueue");
                MessageConsumer messageConsumer = queueSession.createConsumer(destination);
                TextMessage textMessage = (TextMessage) messageConsumer.receive();
                System.out.println("Got message ==> " + textMessage.getText());
                try {
                    Thread.sleep(9000);
                } catch (Exception e) {
                    System.out.println(e);
                }
                messageConsumer.close();
                queueSession.close();
                queueConnection.stop();
                queueConnection.close();
            } catch (NamingException e) {
                e.printStackTrace();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        public static void main(String[] args) {
            ConsumeClient sendConsumeClient = new ConsumeClient();
            sendConsumeClient.consumeMessage();
        }
    }
  • A client to publish messages to the queue named MyQueue at MB2. The code of this class should be as follows.

    import javax.jms.*;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;
    import java.util.Properties;
    public class SendClient {
        public static void main(String[] args) {
            SendClient sendClient = new SendClient();
            sendClient.sendMessage();
        }
        public void sendMessage() {
            Properties initialContextProperties = new Properties();
            initialContextProperties.put("java.naming.factory.initial",
                    "org.wso2.andes.jndi.PropertiesFileInitialContextFactory");
            String connectionString = "amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673'";
            initialContextProperties.put("connectionfactory.qpidConnectionfactory", connectionString);
            initialContextProperties.put("queue.myQueue", "myQueue");
    
            try {
                InitialContext initialContext = new InitialContext(initialContextProperties);
                QueueConnectionFactory queueConnectionFactory
                        = (QueueConnectionFactory) initialContext.lookup("qpidConnectionfactory");
                QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
                queueConnection.start();
                QueueSession queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
                TextMessage textMessage = queueSession.createTextMessage();
                textMessage.setText("Test message");
                System.out.println("Sending Message : " + textMessage.getText().length());
                // Send message
                Queue queue = (Queue) initialContext.lookup("myQueue");
                QueueSender queueSender = queueSession.createSender(queue);
                queueSender.send(textMessage);
                // Housekeeping
                queueSender.close();
                queueSession.close();
                queueConnection.stop();
                queueConnection.close();
            } catch (NamingException e) {
                e.printStackTrace();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

Executing the sample

First run the message sender class to publish messages in the MyQueue queue at MB2. Then run the queue consumer class to consume messages published to the MyQueue queue from MB1.

Analyzing the output

It will be possible to view the message published in MB2 from MB3. You can check this in the output log of your console. Alternatively, you can check the queue contents of MyQueue in the Management Console of MB1.