Introduction
This sample demonstrates how messages published to a queue in one MB instance can be received by a consumer of the same queue in another MB instance. To demonstrate this, two MB instances need to be run as follows.
Reference | Listener Port |
---|---|
MB1 | 5672 |
MB2 | 5673 |
To run two MB instances on the same machine, configure a port offset for one of them in <MB_HOME>/repository/conf/carbon.xml. Alternatively, use the following command to start one of the MB instances with a port offset.
wso2server.sh -DportOffset=1
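The relationship between the port offset and the listener ports in the table above can be sketched as follows. This is a minimal illustration, assuming the listener port without an offset is 5672 and that the offset is simply added to it. The class and method names (BrokerPorts, listenerPort, connectionUrl) are hypothetical helpers introduced here and are not part of the MB distribution.

```java
public class BrokerPorts {

    // Assumed listener port of an instance started without an offset (MB1 in the table above).
    public static final int DEFAULT_LISTENER_PORT = 5672;

    // Returns the listener port that results from applying the given port offset.
    public static int listenerPort(int portOffset) {
        if (portOffset < 0) {
            throw new IllegalArgumentException("portOffset must not be negative");
        }
        return DEFAULT_LISTENER_PORT + portOffset;
    }

    // Builds a connection URL in the same form the sample clients use.
    public static String connectionUrl(int portOffset) {
        return "amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:"
                + listenerPort(portOffset) + "'";
    }

    public static void main(String[] args) {
        System.out.println("MB1: " + connectionUrl(0)); // no offset
        System.out.println("MB2: " + connectionUrl(1)); // started with -DportOffset=1
    }
}
```

With an offset of 1, the second instance listens on 5673, which is the port the publishing client connects to in this sample.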
Prerequisites
See Prerequisites to Run the MB Samples for a list of prerequisites.
Building the sample
The following classes need to be created for this sample.
A client that subscribes to the queue named myQueue at MB1. The code of this class should be as follows.

    import javax.jms.*;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;
    import java.util.Properties;

    public class ConsumeClient {

        public void consumeMessage() {
            // JNDI properties pointing the client at MB1 (listener port 5672)
            Properties initialContextProperties = new Properties();
            initialContextProperties.put("java.naming.factory.initial",
                    "org.wso2.andes.jndi.PropertiesFileInitialContextFactory");
            String connectionString = "amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'";
            initialContextProperties.put("connectionfactory.qpidConnectionfactory", connectionString);
            initialContextProperties.put("queue.myQueue", "myQueue");

            try {
                InitialContext initialContext = new InitialContext(initialContextProperties);
                QueueConnectionFactory queueConnectionFactory =
                        (QueueConnectionFactory) initialContext.lookup("qpidConnectionfactory");
                QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
                queueConnection.start();
                QueueSession queueSession =
                        queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);

                // Block until a message arrives on myQueue
                Destination destination = (Destination) initialContext.lookup("myQueue");
                MessageConsumer messageConsumer = queueSession.createConsumer(destination);
                TextMessage textMessage = (TextMessage) messageConsumer.receive();
                System.out.println("Got message ==> " + textMessage.getText());

                try {
                    Thread.sleep(9000);
                } catch (Exception e) {
                    System.out.println(e);
                }

                // Housekeeping
                messageConsumer.close();
                queueSession.close();
                queueConnection.stop();
                queueConnection.close();
            } catch (NamingException e) {
                e.printStackTrace();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) {
            ConsumeClient consumeClient = new ConsumeClient();
            consumeClient.consumeMessage();
        }
    }
A client that publishes messages to the queue named myQueue at MB2. The code of this class should be as follows.

    import javax.jms.*;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;
    import java.util.Properties;

    public class SendClient {

        public static void main(String[] args) {
            SendClient sendClient = new SendClient();
            sendClient.sendMessage();
        }

        public void sendMessage() {
            // JNDI properties pointing the client at MB2 (listener port 5673)
            Properties initialContextProperties = new Properties();
            initialContextProperties.put("java.naming.factory.initial",
                    "org.wso2.andes.jndi.PropertiesFileInitialContextFactory");
            String connectionString = "amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673'";
            initialContextProperties.put("connectionfactory.qpidConnectionfactory", connectionString);
            initialContextProperties.put("queue.myQueue", "myQueue");

            try {
                InitialContext initialContext = new InitialContext(initialContextProperties);
                QueueConnectionFactory queueConnectionFactory =
                        (QueueConnectionFactory) initialContext.lookup("qpidConnectionfactory");
                QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
                queueConnection.start();
                QueueSession queueSession =
                        queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);

                TextMessage textMessage = queueSession.createTextMessage();
                textMessage.setText("Test message");
                System.out.println("Sending Message : " + textMessage.getText().length());

                // Send message
                Queue queue = (Queue) initialContext.lookup("myQueue");
                QueueSender queueSender = queueSession.createSender(queue);
                queueSender.send(textMessage);

                // Housekeeping
                queueSender.close();
                queueSession.close();
                queueConnection.stop();
                queueConnection.close();
            } catch (NamingException e) {
                e.printStackTrace();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
Executing the sample
First, run the message sender class (SendClient) to publish a message to the myQueue queue at MB2. Then run the queue consumer class (ConsumeClient) to consume the message from the myQueue queue at MB1.
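Before running the two clients, it can help to confirm that both MB instances are accepting connections on the ports listed in the Introduction. The following is a minimal sketch, assuming both instances run on localhost. BrokerPortCheck and isListening are hypothetical names introduced here, and the check only verifies that a TCP connection can be opened, not that the broker completes a protocol handshake.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerPortCheck {

    // Returns true if a TCP connection to host:port can be opened within the timeout.
    public static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable
            return false;
        }
    }

    public static void main(String[] args) {
        // Listener ports of MB1 and MB2 as given in the Introduction
        int[] ports = {5672, 5673};
        for (int port : ports) {
            System.out.println("localhost:" + port + " listening: "
                    + isListening("localhost", port, 1000));
        }
    }
}
```

If either port is reported as not listening, start the corresponding MB instance (or verify its port offset) before running SendClient and ConsumeClient.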
Analyzing the output
The message published to MB2 is received by the consumer connected to MB1. You can check this in the output log of your console. Alternatively, you can check the contents of the myQueue queue in the Management Console of MB1.