Queue sender and queue receiver
This guide demonstrates how persistent queues can be created and used in Message Broker using JMS API.Following JMS client is used to send messages to a known created queue in WSO2 Message Broker. Queue Receiver can receive messages and message is printed in console.
Â
First log into WSO2 Message Broker Management console and create a queue named 'testQueue'. Click on the 'Add' menu item under the 'Queues' menu to create a queue. To create a queue , the only thing needed to be provided is the name of the queue.
Note: To run this code sample,Â
- You need to have dependencies located at $CARBON_HOME/client-lib in class path.
- You need to run QueueReceiver class prior to QueueSender class when testing this sample (or register at lease one QueueReceiver prior sending messages to the queue as you see in $CARBON_HOME/samples/JMSClient sample).
Using following QueueSender JMS client messages can be sent to 'testQueue'
Â
/* * Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. * * WSO2 Inc. licenses this file to you under the Apache License, * Version 2.0 (the "License"); you may not use this file except * in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ import javax.jms.JMSException; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueSession; import javax.jms.TextMessage; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; import java.util.Properties; public class QueueSender { public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory"; private static final String CF_NAME_PREFIX = "connectionfactory."; private static final String QUEUE_NAME_PREFIX = "queue."; private static final String CF_NAME = "qpidConnectionfactory"; String userName = "admin"; String password = "admin"; private static String CARBON_CLIENT_ID = "carbon"; private static String CARBON_VIRTUAL_HOST_NAME = "carbon"; private static String CARBON_DEFAULT_HOSTNAME = "localhost"; private static String CARBON_DEFAULT_PORT = "5672"; String queueName = "testQueue"; public static void main(String[] args) throws NamingException, JMSException { QueueSender queueSender = new QueueSender(); queueSender.sendMessages(); } public void sendMessages() throws NamingException, JMSException { Properties properties = new Properties(); properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF); properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password)); properties.put(QUEUE_NAME_PREFIX + queueName, queueName); System.out.println("getTCPConnectionURL(userName,password) = " + getTCPConnectionURL(userName, password)); InitialContext ctx = new InitialContext(properties); // Lookup connection factory QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(CF_NAME); QueueConnection queueConnection = connFactory.createQueueConnection(); queueConnection.start(); QueueSession queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); // Send message Queue queue = (Queue)ctx.lookup(queueName); // create the message to send TextMessage textMessage = queueSession.createTextMessage("Test Message Content"); javax.jms.QueueSender queueSender = queueSession.createSender(queue); queueSender.send(textMessage); queueSender.close(); queueSession.close(); queueConnection.close(); } public String getTCPConnectionURL(String username, String password) { // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}' return new StringBuffer() .append("amqp://").append(username).append(":").append(password) .append("@").append(CARBON_CLIENT_ID) .append("/").append(CARBON_VIRTUAL_HOST_NAME) .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'") .toString(); } }
Â
You can view created queue and its increased message count using Message Broker management console.
Â
Using following QueueReceiver JMS client messages can be received from 'testQueue'
Â
Â
/* * Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. * * WSO2 Inc. licenses this file to you under the Apache License, * Version 2.0 (the "License"); you may not use this file except * in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ import javax.jms.*; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; import java.util.Properties; public class QueueReceiver { public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory"; private static final String CF_NAME_PREFIX = "connectionfactory."; private static final String CF_NAME = "qpidConnectionfactory"; String userName = "admin"; String password = "admin"; private static String CARBON_CLIENT_ID = "carbon"; private static String CARBON_VIRTUAL_HOST_NAME = "carbon"; private static String CARBON_DEFAULT_HOSTNAME = "localhost"; private static String CARBON_DEFAULT_PORT = "5672"; String queueName = "testQueue"; public static void main(String[] args) throws NamingException, JMSException { QueueReceiver queueReceiver = new QueueReceiver(); queueReceiver.receiveMessages(); } public void receiveMessages() throws NamingException, JMSException { Properties properties = new Properties(); properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF); properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password)); properties.put("queue."+ queueName,queueName); System.out.println("getTCPConnectionURL(userName,password) = " + getTCPConnectionURL(userName, password)); InitialContext ctx = new InitialContext(properties); // Lookup connection factory QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(CF_NAME); QueueConnection queueConnection = connFactory.createQueueConnection(); queueConnection.start(); QueueSession queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); //Receive message Queue queue = (Queue) ctx.lookup(queueName); MessageConsumer queueReceiver = queueSession.createConsumer(queue); TextMessage message = (TextMessage) queueReceiver.receive(); System.out.println("Got message ==>" + message.getText()); queueReceiver.close(); queueSession.close(); queueConnection.stop(); queueConnection.close(); } public String getTCPConnectionURL(String username, String password) { // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}' return new StringBuffer() .append("amqp://").append(username).append(":").append(password) .append("@").append(CARBON_CLIENT_ID) .append("/").append(CARBON_VIRTUAL_HOST_NAME) .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'") .toString(); } }
Â
Message received gets printed on the console. Now you should be able to see message count gets decreased for 'testQueue' using Management Console.
Â
Â
Â
Copyright © WSO2 Inc. 2005-2013