Objectives
This sample demonstrates how persistent queues can be created and used in Message Broker using the JMS API. It first introduces a sample JMS client by the name "QueueSender" which is used to send messages to a known, created queue in WSO2 Message Broker, and then introduces a sample JMS client by the name "QueueReceiver" to receive messages and print in the console.
Prerequisites
1. Ensure that you have,
- Dependencies located in $CARBON_HOME/client-lib in class path.
Running the Sample
Prior to running following "QueueSender" class we need to register at least one "QueueReceiver" binding prior sending messages to the queue as you see in $CARBON_HOME/samples/JMSClient. This can be done by one of following ways.
- First log into WSO2 Message Broker management console and create a queue named 'testQueue' ("Queues -> Add" menu in the "Main" menu). A quick guide on creating queues can be found in section Adding Queues.
- Run "QueueReceiver" class depicted below. It will register a binding to that queue. When you have run that client code you will see queue created by the queue receiver class is visible in the management console ("Queues -> Browse").
Now using the following "QueueSender" JMS client, messages can be sent to 'testQueue' created earlier.
/* * Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. * * WSO2 Inc. licenses this file to you under the Apache License, * Version 2.0 (the "License"); you may not use this file except * in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ import javax.jms.JMSException; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueSession; import javax.jms.TextMessage; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; import java.util.Properties; public class QueueSender { public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory"; private static final String CF_NAME_PREFIX = "connectionfactory."; private static final String QUEUE_NAME_PREFIX = "queue."; private static final String CF_NAME = "qpidConnectionfactory"; String userName = "admin"; String password = "admin"; private static String CARBON_CLIENT_ID = "carbon"; private static String CARBON_VIRTUAL_HOST_NAME = "carbon"; private static String CARBON_DEFAULT_HOSTNAME = "localhost"; private static String CARBON_DEFAULT_PORT = "5672"; String queueName = "testQueue"; public static void main(String[] args) throws NamingException, JMSException { QueueSender queueSender = new QueueSender(); queueSender.sendMessages(); } public void sendMessages() throws NamingException, JMSException { Properties properties = new Properties(); properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF); properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password)); properties.put(QUEUE_NAME_PREFIX + queueName, queueName); System.out.println("getTCPConnectionURL(userName,password) = " + getTCPConnectionURL(userName, password)); InitialContext ctx = new InitialContext(properties); // Lookup connection factory QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(CF_NAME); QueueConnection queueConnection = connFactory.createQueueConnection(); queueConnection.start(); QueueSession queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); // Send message Queue queue = (Queue)ctx.lookup(queueName); // create the message to send TextMessage textMessage = queueSession.createTextMessage("Test Message Content"); javax.jms.QueueSender queueSender = queueSession.createSender(queue); queueSender.send(textMessage); queueSender.close(); queueSession.close(); queueConnection.close(); } public String getTCPConnectionURL(String username, String password) { // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}' return new StringBuffer() .append("amqp://").append(username).append(":").append(password) .append("@").append(CARBON_CLIENT_ID) .append("/").append(CARBON_VIRTUAL_HOST_NAME) .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'") .toString(); } }
Execute the code and view the created queue and its increased message count using Message Broker management console.
In addition to using javax.jms.QueueSender class to send the messages you can also use a javax.jms.MessageProducer client and send the messages to a destination queue. Following is the way of creating a JMS MessageProducer.
javax.jms.MessageProducer messageProducer = queueSession.createProducer(queue);
messageProducer.send(textMessage);
messageProducer.close();
/* * Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. * * WSO2 Inc. licenses this file to you under the Apache License, * Version 2.0 (the "License"); you may not use this file except * in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ import javax.jms.*; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; import java.util.Properties; public class QueueReceiver { public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory"; private static final String CF_NAME_PREFIX = "connectionfactory."; private static final String CF_NAME = "qpidConnectionfactory"; String userName = "admin"; String password = "admin"; private static String CARBON_CLIENT_ID = "carbon"; private static String CARBON_VIRTUAL_HOST_NAME = "carbon"; private static String CARBON_DEFAULT_HOSTNAME = "localhost"; private static String CARBON_DEFAULT_PORT = "5672"; String queueName = "testQueue"; public static void main(String[] args) throws NamingException, JMSException { QueueReceiver queueReceiver = new QueueReceiver(); queueReceiver.receiveMessages(); } public void receiveMessages() throws NamingException, JMSException { Properties properties = new Properties(); properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF); properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password)); properties.put("queue."+ queueName,queueName); System.out.println("getTCPConnectionURL(userName,password) = " + getTCPConnectionURL(userName, password)); InitialContext ctx = new InitialContext(properties); // Lookup connection factory QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(CF_NAME); QueueConnection queueConnection = connFactory.createQueueConnection(); queueConnection.start(); QueueSession queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); //Receive message Queue queue = (Queue) ctx.lookup(queueName); MessageConsumer queueReceiver = queueSession.createConsumer(queue); TextMessage message = (TextMessage) queueReceiver.receive(); System.out.println("Got message ==>" + message.getText()); queueReceiver.close(); queueSession.close(); queueConnection.stop(); queueConnection.close(); } public String getTCPConnectionURL(String username, String password) { // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}' return new StringBuffer() .append("amqp://").append(username).append(":").append(password) .append("@").append(CARBON_CLIENT_ID) .append("/").append(CARBON_VIRTUAL_HOST_NAME) .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'") .toString(); } }