JMS Message Listener
Objectives
This sample demonstrates how to receive messages asynchronously from a JMS Queue or a Topic. The sample message listener will be waiting for 10 messages to be received into each topic and the queue and will be closing itself after that.
Prerequisites
Ensure that you have the following:
- Dependencies located in
<PRODUCT_HOME>/client-lib
in class path Maven dependencies to run the JMS client :
<dependency> <groupId>org.wso2.andes.wso2</groupId> <artifactId>andes-client</artifactId> <version>0.13.wso2v3</version> </dependency> <dependency> <groupId>org.apache.geronimo.specs.wso2</groupId> <artifactId>geronimo-jms_1.1_spec</artifactId> <version>1.1.0.wso2v1</version> </dependency>
Running the Sample
You need to run the "MessageListenerClient" class when testing this sample.
MessageListenerClient: The following code is used to publish messages to a given queue and topic and listen asynchronously on them.
/* * Copyright 2004,2005 The Apache Software Foundation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueSession; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; import java.util.Properties; public class MessageListenerClient { public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory"; private static final String CF_NAME_PREFIX = "connectionfactory."; private static final String CF_NAME = "qpidConnectionfactory"; String userName = "admin"; String password = "admin"; private static String CARBON_CLIENT_ID = "carbon"; private static String CARBON_VIRTUAL_HOST_NAME = "carbon"; private static String CARBON_DEFAULT_HOSTNAME = "localhost"; private static String CARBON_DEFAULT_PORT = "5672"; String topicName = "foo.bar"; String queueName = "queue"; public static void main(String[] args) throws NamingException, JMSException, InterruptedException { MessageListenerClient messageListenerClient = new MessageListenerClient(); messageListenerClient.registerSubscribers(); } public void registerSubscribers() throws NamingException,InterruptedException, JMSException { try { InitialContext ctx = initQueue(); TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) ctx.lookup("qpidConnectionfactory"); TopicConnection topicConnection = topicConnectionFactory.createTopicConnection(); topicConnection.start(); TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = (Topic) ctx.lookup(topicName); TopicSubscriber topicSubscriber = topicSession.createSubscriber(topic); topicSubscriber.setMessageListener(new SampleMessageListener(topicConnection, topicSession, topicSubscriber)); publishMessagesToTopic(); Thread.sleep(5000); QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(CF_NAME); QueueConnection queueConnection = connFactory.createQueueConnection(); queueConnection.start(); QueueSession queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); Queue queue = (Queue) ctx.lookup(queueName); MessageConsumer queueReceiver = queueSession.createConsumer(queue); queueReceiver.setMessageListener(new SampleMessageListener(queueReceiver, queueSession, queueConnection)); publishMessagesToQueue(); } catch (NamingException e) { e.printStackTrace(); } catch (JMSException e) { e.printStackTrace(); } } private void publishMessagesToTopic() throws NamingException, JMSException, InterruptedException { InitialContext ctx = initQueue(); TopicConnectionFactory tConnectionFactory = (TopicConnectionFactory) ctx.lookup("qpidConnectionfactory"); TopicConnection tConnection = tConnectionFactory.createTopicConnection(); tConnection.start(); TopicSession tSession = tConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = (Topic) ctx.lookup(topicName); javax.jms.TopicPublisher topicPublisher = tSession.createPublisher(topic); for (int i = 0; i < 10; i++) { TextMessage topicMessage = tSession.createTextMessage("Topic Message - " + (i + 1)); topicPublisher.publish(topicMessage); Thread.sleep(1000); } tConnection.close(); tSession.close(); topicPublisher.close(); } private void publishMessagesToQueue() throws NamingException, JMSException, InterruptedException { InitialContext ctx = initQueue(); QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(CF_NAME); QueueConnection connection = connFactory.createQueueConnection(); connection.start(); QueueSession session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); Queue queue = (Queue) ctx.lookup(queueName); javax.jms.QueueSender queueSender = session.createSender(queue); for (int i = 0; i < 10; i++) { TextMessage queueMessage = session.createTextMessage(" Queue Message - " + (i + 1)); queueSender.send(queueMessage); Thread.sleep(1000); } connection.close(); session.close(); queueSender.close(); } private InitialContext initQueue() throws NamingException { Properties properties = new Properties(); properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF); properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password)); properties.put("queue." + queueName, queueName); properties.put("topic." + topicName, topicName); InitialContext ctx = new InitialContext(properties); return ctx; } private String getTCPConnectionURL(String username, String password) { // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}' return new StringBuffer() .append("amqp://").append(username).append(":").append(password) .append("@").append(CARBON_CLIENT_ID) .append("/").append(CARBON_VIRTUAL_HOST_NAME) .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'") .toString(); } }
SampleMessageListener : The following code acts as the sample message listener, which is implemented from javax.jms.MessageListener.
/* * Copyright 2004,2005 The Apache Software Foundation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. **/ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.QueueConnection; import javax.jms.QueueSession; import javax.jms.TextMessage; import javax.jms.TopicConnection; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; public class SampleMessageListener implements javax.jms.MessageListener { private TopicConnection topicConnection; private TopicSession topicSession; private TopicSubscriber topicSubscriber; private QueueConnection queueConnection; private QueueSession queueSession; private MessageConsumer queueReceiver; private int count = 0; public SampleMessageListener(MessageConsumer queueReceiver, QueueSession queueSession, QueueConnection queueConnection) { this.queueReceiver = queueReceiver; this.queueSession = queueSession; this.queueConnection = queueConnection; System.out.println("Starting Queue Listener...."); } public SampleMessageListener(TopicConnection topicConnection, TopicSession topicSession, TopicSubscriber topicSubscriber) { this.topicConnection = topicConnection; this.topicSession = topicSession; this.topicSubscriber = topicSubscriber; System.out.println("Starting Topic Listener...."); } /** * Override this method and add the operation which is needed to be done when a message is arrived * * @param message - the next received message */ @Override public void onMessage(Message message) { count++; TextMessage receivedMessage = (TextMessage) message; try { System.out.println("Got the message ==> " + receivedMessage.getText()); if (count >= 10) { closeAll(receivedMessage); } } catch (JMSException e) { e.printStackTrace(); } } private void closeAll(TextMessage receivedMessage) { try { if (receivedMessage.getText().contains("Queue")) { System.out.println("Closing Queue Listener..........."); queueConnection.close(); queueSession.close(); queueReceiver.close(); } else { System.out.println("Closing Topic Listener..........."); topicConnection.close(); topicSession.close(); topicSubscriber.close(); } } catch (JMSException e) { e.printStackTrace(); } } }
Once the MessageListenerClient class is run, in the console you should see that the MessageListener starts, receives messages and closes after the received message count is 10, for both Topic and Queue. The created topic and queue will be in management console as well.