This site contains the documentation that is relevant to older WSO2 product versions and offerings.
For the latest WSO2 documentation, visit https://wso2.com/documentation/.

Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 5 Next »

Introduction

This sample demonstrates how persistent queues can be created and used in WSO2 Message Broker using the JMS API. It first introduces a sample JMS client named QueueSender which is used to send messages to a known, created queue in WSO2 Message Broker. Then it introduces a sample JMS client named QueueReceiver to receive messages and print them in the console.

Prerequisites

See Prerequisites to Run the MB Samples for a list of prerequisites.

Building the sample

The <MB_HOME>/Samples/JMSQueueClient/src/org/sample/jms directory has the following classes:

  • SampleQueueSender.java class is used to create the sample client which sends messages to the queue named testQueue in WSO2 MB. The code of this class is as follows.

    /*
    *  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
    *
    *  WSO2 Inc. licenses this file to you under the Apache License,
    *  Version 2.0 (the "License"); you may not use this file except
    *  in compliance with the License.
    *  You may obtain a copy of the License at
    *
    *    http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing,
    * software distributed under the License is distributed on an
    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    * KIND, either express or implied.  See the License for the
    * specific language governing permissions and limitations
    * under the License.
    */
    package org.sample.jms;
    import javax.jms.JMSException;
    import javax.jms.Queue;
    import javax.jms.QueueConnection;
    import javax.jms.QueueConnectionFactory;
    import javax.jms.QueueSession;
    import javax.jms.TextMessage;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;
    import java.util.Properties;
    public class SampleQueueSender {
        public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
        private static final String CF_NAME_PREFIX = "connectionfactory.";
        private static final String QUEUE_NAME_PREFIX = "queue.";
        private static final String CF_NAME = "qpidConnectionfactory";
        String userName = "admin";
        String password = "admin";
        private static String CARBON_CLIENT_ID = "carbon";
        private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
        private static String CARBON_DEFAULT_HOSTNAME = "localhost";
        private static String CARBON_DEFAULT_PORT = "5672";
        String queueName = "testQueue";
        private QueueConnection queueConnection;
        private QueueSession queueSession;
        public void sendMessages() throws NamingException, JMSException {
            Properties properties = new Properties();
            properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
            properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
            properties.put(QUEUE_NAME_PREFIX + queueName, queueName);
            InitialContext ctx = new InitialContext(properties);
            // Lookup connection factory
            QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(CF_NAME);
            queueConnection = connFactory.createQueueConnection();
            queueConnection.start();
            queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
            // Send message
            Queue queue = (Queue)ctx.lookup(queueName);
            // create the message to send
            TextMessage textMessage = queueSession.createTextMessage("Test Message Content");
            javax.jms.QueueSender queueSender = queueSession.createSender(queue);
            queueSender.send(textMessage);
            queueSender.close();
            queueSession.close();
            queueConnection.close();
        }
        private String getTCPConnectionURL(String username, String password) {
            // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
            return new StringBuffer()
                    .append("amqp://").append(username).append(":").append(password)
                    .append("@").append(CARBON_CLIENT_ID)
                    .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                    .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                    .toString();
        }
     
    }
  • SampleQueueReceiver.java class is used to create the sample client which receives the message sent to the testQueue queue and prints it in the console. The code of this class is as follows.

    /*
    *  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
    *
    *  WSO2 Inc. licenses this file to you under the Apache License,
    *  Version 2.0 (the "License"); you may not use this file except
    *  in compliance with the License.
    *  You may obtain a copy of the License at
    *
    *    http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing,
    * software distributed under the License is distributed on an
    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    * KIND, either express or implied.  See the License for the
    * specific language governing permissions and limitations
    * under the License.
    */
    package org.sample.jms;
    import javax.jms.JMSException;
    import javax.jms.Queue;
    import javax.jms.QueueConnection;
    import javax.jms.QueueConnectionFactory;
    import javax.jms.QueueSession;
    import javax.jms.TextMessage;
    import javax.jms.MessageConsumer;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;
    import java.util.Properties;
    public class SampleQueueReceiver {
        public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
        private static final String CF_NAME_PREFIX = "connectionfactory.";
        private static final String CF_NAME = "qpidConnectionfactory";
        String userName = "admin";
        String password = "admin";
        private static String CARBON_CLIENT_ID = "carbon";
        private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
        private static String CARBON_DEFAULT_HOSTNAME = "localhost";
        private static String CARBON_DEFAULT_PORT = "5672";
        String queueName = "testQueue";
        private QueueConnection queueConnection;
        private QueueSession queueSession;
        public MessageConsumer registerSubscriber() throws NamingException, JMSException{
            Properties properties = new Properties();
            properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
            properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
            properties.put("queue."+ queueName,queueName);
            InitialContext ctx = new InitialContext(properties);
            // Lookup connection factory
            QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(CF_NAME);
            queueConnection = connFactory.createQueueConnection();
            queueConnection.start();
            queueSession =
                    queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
            //Receive message
            Queue queue =  (Queue) ctx.lookup(queueName);
            MessageConsumer consumer = queueSession.createConsumer(queue);
            return consumer;
        }
        public void receiveMessages(MessageConsumer consumer) throws NamingException, JMSException {
            TextMessage message = (TextMessage) consumer.receive();
            System.out.println("Got message from queue receiver==>" + message.getText());
            // Housekeeping
            consumer.close();
            queueSession.close();
            queueConnection.stop();
            queueConnection.close();
        }
        private String getTCPConnectionURL(String username, String password) {
            // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
            return new StringBuffer()
                    .append("amqp://").append(username).append(":").append(password)
                    .append("@").append(CARBON_CLIENT_ID)
                    .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                    .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                    .toString();
        }
     
    }
  • The Main.java class defines the main method for calling both the clients mentioned above. The code of this class is as follows.

     /*
    *  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
    *
    *  WSO2 Inc. licenses this file to you under the Apache License,
    *  Version 2.0 (the "License"); you may not use this file except
    *  in compliance with the License.
    *  You may obtain a copy of the License at
    *
    *    http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing,
    * software distributed under the License is distributed on an
    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    * KIND, either express or implied.  See the License for the
    * specific language governing permissions and limitations
    * under the License.
    */
    
    package org.sample.jms;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.naming.NamingException;
    
    public class Main {
    	public static void main(String[] args) throws NamingException, JMSException {
            SampleQueueReceiver queueReceiver = new SampleQueueReceiver();
            MessageConsumer consumer = queueReceiver.registerSubscriber();
            SampleQueueSender queueSender = new SampleQueueSender();
            queueSender.sendMessages();
            queueReceiver.receiveMessages(consumer);
        }
    }

The following should be noted if you are writing a similar sample:

  • It is not possible to use the @ symbol in the username or password.

    It is also not possible to use the percentage (%) sign in the password. When building the connection string URL inside the andes client code of MB, the URL is parsed. This parsing exception happens because the percentage (%) sign acts as the escape character in URL parsing. If using the percentage (%) sign in the connection string is required, use the respective encoding character for the percentage (%) sign in the connection string. For example:

    If you need to pass adm%in as the password, then the % should be encoded with its respective URL encoding character. Therefore, you have to send it as adm%25in.

    For a list of possible URL parsing patterns, see URL encoding reference.
     

  • In addition to using javax.jms.QueueSender class to send the messages you can also use a javax.jms.MessageProducer client and send the messages to a destination queue. Following is the way of creating a JMS MessageProducer.

    javax.jms.MessageProducer messageProducer = queueSession.createProducer(queue);

    messageProducer.send(textMessage);

    messageProducer.close();
     

  • When subscribing and publishing to a queue in a tenant the qualified queue name, DOMAIN_NAME/Queuename should be given as follows.

    String queueName = "mydomain.com/testQueue";

    Queue queue = (Queue) ctx.lookup(queueName);

    See Managing Tenant-specific Subscriptions for more information.

 

Executing the sample

Run the ant command from <MB_HOME>/Samples/JMSQueueClient directory.

Analyzing the output

You will get the following log in your console.

[java] Got message from queue receiver==>Test Message Content

To view the queue in the MB Management Console, log into the MB Management Console. Then click  Browse under  Queus in the  Main  tab. You will see a queue named TestQueue automatically created

  • No labels