Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Migrated to Confluence 5.3

...

This sample demonstrates how persistent queues can be created and used in Message Broker using the JMS API. It first introduces a sample JMS client by the name "QueueSender" which is used to send messages to a known, created queue in WSO2 Message Broker, and then introduces a sample JMS client by the name "QueueReceiver" to receive messages and print in the console.

Prerequisites

1.  Ensure Ensure that you have ,the following: 

  1. Dependencies located in

...

  1. <PRODUCT_

...

  1. HOME>/client-lib in class path

...

Running the Sample

Prior to running following "QueueSender" class we need to register at least one "QueueReceiver" binding prior sending messages to the queue as you see in $CARBON_HOME/samples/JMSClient. This can be done by one of following ways.

  1. First log into WSO2 Message Broker management console and create a queue named 'testQueue' ("Queues -> Add" menu in the "Main" menu). A quick guide on creating queues can be found in section Adding Queues. 
  2. Run "QueueReceiver" class depicted below. It

    Maven dependencies to run the JMS client :

    Code Block
    languagehtml/xml
    <dependency>
       <groupId>org.wso2.andes.wso2</groupId>
       <artifactId>andes-client</artifactId>
       <version>0.13.wso2v3</version>
    </dependency>
    <dependency>
       <groupId>org.apache.geronimo.specs.wso2</groupId>
       <artifactId>geronimo-jms_1.1_spec</artifactId>
       <version>1.1.0.wso2v1</version>
    </dependency>
    <dependency>            
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
       <version>1.2.17</version>
    </dependency>
    <dependency>
       <groupId>slf4j.wso2</groupId>
       <artifactId>slf4j</artifactId>
       <version>1.5.10.wso2v1</version>
    </dependency>


Running the Sample

Prior to running following "QueueSender" class we need to register at least one "QueueReceiver" binding prior sending messages to the queue as you see in $CARBON_HOME/samples/JMSClient. This can be done by one of following ways.

  1. First log into WSO2 Message Broker management console and create a queue named 'testQueue' ("Queues -> Add" menu in the "Main" menu). A quick guide on creating queues can be found in section Adding Queues. 
  2. Run "QueueReceiver" class depicted below. It will register a binding to that queue. When you have run that client code you will see queue created by the queue receiver class is visible in the management console ("Queues -> Browse").  

...

Code Block
languagejava
/*
*  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
*  WSO2 Inc. licenses this file to you under the Apache License,
*  Version 2.0 (the "License"); you may not use this file except
*  in compliance with the License.
*  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
public class QueueSender {
    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String QUEUE_NAME_PREFIX = "queue.";
    private static final String CF_NAME = "qpidConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5672";
    String queueName = "testQueue";

    public static void main(String[] args) throws NamingException, JMSException {
        QueueSender queueSender = new QueueSender();
        queueSender.sendMessages();
    }
    public void sendMessages() throws NamingException, JMSException {
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
        properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
        properties.put(QUEUE_NAME_PREFIX + queueName, queueName);
        System.out.println("getTCPConnectionURL(userName,password) = " + getTCPConnectionURL(userName, password));
        InitialContext ctx = new InitialContext(properties);
        // Lookup connection factory
        QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(CF_NAME);
        QueueConnection queueConnection = connFactory.createQueueConnection();
        queueConnection.start();
        QueueSession queueSession =
                queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        // Send message
        Queue queue = (Queue)ctx.lookup(queueName);
        // create the message to send
        TextMessage textMessage = queueSession.createTextMessage("Test Message Content");
        javax.jms.QueueSender queueSender = queueSession.createSender(queue);
        queueSender.send(textMessage);
        queueSender.close();
        queueSession.close();
        queueConnection.close();
    }
    public String getTCPConnectionURL(String username, String password) {
        // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
        return new StringBuffer()
                .append("amqp://").append(username).append(":").append(password)
                .append(":@").append(passwordCARBON_CLIENT_ID)
                .append("@/").append(CARBON_VIRTUAL_CLIENTHOST_IDNAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_VIRTUAL_HOST_NAME)_DEFAULT_PORT).append("'")
                .toString();
          .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                .toString();
    }

}

Execute the code and view the created queue and its increased message count using Message Broker management console.

...

}

}

Execute the code and view the created queue and its increased message count using Message Broker management console.

Info

In addition to using javax.jms.QueueSender class to send the messages you can also use a javax.jms.MessageProducer client and send the messages to a destination queue. Following is the way of creating a JMS MessageProducer.

javax.jms.MessageProducer messageProducer = queueSession.createProducer(queue);

messageProducer.send(textMessage);

messageProducer.close();


Next, execute the following "QueueReceiver" JMS client, using which messages can be received to 'testQueue'.

 

Code Block
languagejava
/*
*  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
*  WSO2 Inc. licenses this file to you under the Apache License,
*  Version 2.0 (the "License"); you may not use this file except
*  in compliance with the License.
*  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
public class QueueReceiver {
    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "qpidConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5672";
    String queueName = "testQueue";

    public static void main(String[] args) throws NamingException, JMSException {
        QueueReceiver queueReceiver = new QueueReceiver();
        queueReceiver.receiveMessages();
    }
    public void receiveMessages() throws NamingException, JMSException {
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
        properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
        properties.put("queue."+ queueName,queueName);
        System.out.println("getTCPConnectionURL(userName,password) = " + getTCPConnectionURL(userName, password));
        InitialContext ctx = new InitialContext(properties);
        // Lookup connection factory
        QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(CF_NAME);
        QueueConnection queueConnection = connFactory.createQueueConnection();
        queueConnection.start();
        QueueSession queueSession =
                queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        //Receive message
        Queue queue =  (Queue) ctx.lookup(queueName);
        MessageConsumer queueReceiver = queueSession.createConsumer(queue);
        TextMessage message = (TextMessage) queueReceiver.receive();
        System.out.println("Got message ==>" + message.getText());
        queueReceiver.close();
        queueSession.close();
        queueConnection.stop();
        queueConnection.close();
    }
    public String getTCPConnectionURL(String username, String password) {
        // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
        return new StringBuffer()
                .append("amqp://").append(username)).append(":").append(password)
                .append(":@").append(passwordCARBON_CLIENT_ID)
                .append("@/").append(CARBON_CLIENTVIRTUAL_HOST_IDNAME)
                .append("?brokerlist='tcp://").append(CARBON_VIRTUAL_HOST_NAME)_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                .toString();
    }

.append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                .toString();
    }

}
The received messages get printed on the console. Note the message count getting decreased for 'testQueue' using the Management Console.
}

The received messages get printed on the console. Note the message count getting decreased for 'testQueue' using the Management Console.
 
Info
titleImportant

When subscribing and publishing to a queue in a tenant the qualified queue name, 'DOMAIN_NAME/Queuename' should be given as follows.

String queueName = "mydomain.com/testQueue";
Queue queue =  (Queue) ctx.lookup(queueName);