See Prerequisites to Run the MB Samples for a list of prerequisites.

About the sample

This sample demonstrates how a Time to Live (TTL) value can be set on messages that are published to WSO2 Message Broker. It first introduces a sample JMS client named SampleQueueSender, which sends messages to a queue in WSO2 Message Broker either with or without a TTL value. It then introduces a sample JMS client named SampleQueueReceiver, which receives the messages that have not yet expired and prints the number of received messages on the console.


Code Block
package org.sample.jms;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;

/**
 * Sample sender that sends messages with or without a TTL value.
 */
public class SampleQueueSender {

    public static final String QPID_ICF =  	                  "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String QUEUE_NAME_PREFIX = "queue.";
    private static final String CF_NAME = "qpidConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5672";
    String queueName = "expirationTestQueue";
    private QueueConnection queueConnection;
    private QueueSession queueSession;

    /**
     * Send the specified number of messages with the specified TTL.
     * @param noOfMessages Number of messages that need to be sent
     * @param timeToLive Time to live value for messages
     * @throws NamingException
     * @throws JMSException
     */
    public void sendMessages(int noOfMessages, long timeToLive) throws  		   NamingException,JMSException {
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
        properties.put(CF_NAME_PREFIX + CF_NAME, 						     getTCPConnectionURL(userName, password));
        properties.put(QUEUE_NAME_PREFIX + queueName, queueName);
        InitialContext ctx = new InitialContext(properties);
        // Lookup connection factory
        QueueConnectionFactory connFactory = (QueueConnectionFactory)     ctx.lookup(CF_NAME);
        queueConnection = connFactory.createQueueConnection();
        queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        // Send message
        Queue queue = (Queue)ctx.lookup(queueName);
        // create the message to send
        TextMessage textMessage = queueSession.createTextMessage("Test Message Content");
        javax.jms.QueueSender queueSender = queueSession.createSender(queue);
        for(int i = 0; i < noOfMessages; i++){
	    //send the text message in persistent delivery mode with a time to live value at priority level 4
            queueSender.send(textMessage, DeliveryMode.PERSISTENT,4,timeToLive);

    /**
     * Creates the AMQP URL.
     * @param username The username for the AMQP URL.
     * @param password The password for the AMQP URL.
     * @return AMQP URL.
     */
    private String getTCPConnectionURL(String username, String password) {
        // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
        return new StringBuffer()
Code Block
package org.sample.jms;

import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;

/**
 * Sample receiver that receives the messages which have not expired.
 */
public class SampleQueueReceiver {
    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "qpidConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5672";
    String queueName = "expirationTestQueue";
    private QueueConnection queueConnection;
    private QueueSession queueSession;

    /**
     * Register a subscriber for a queue.
     * @return MessageConsumer The message consumer object of the subscriber.
     * @throws NamingException
     * @throws JMSException
     */
    public MessageConsumer registerSubscriber() throws NamingException, JMSException {
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
        properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
        properties.put("queue."+ queueName,queueName);
        InitialContext ctx = new InitialContext(properties);
        // Lookup connection factory
        QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(CF_NAME);
        queueConnection = connFactory.createQueueConnection();
        queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        //Receive message
        Queue queue =  (Queue) ctx.lookup(queueName);
        MessageConsumer consumer = queueSession.createConsumer(queue);
        return consumer;

    /**
     * Receive messages using the consumer.
     * @param consumer The message consumer object of the subscriber.
     * @throws NamingException
     * @throws JMSException
     */
    public void receiveMessages(MessageConsumer consumer) throws NamingException, JMSException {

        int receivedMessageCount = 0;
        // use a 5-second receive timeout so that the consumer stops once no more messages arrive
        while(null != consumer.receive(5000)){
            receivedMessageCount ++;
        System.out.println("Received message count: " + receivedMessageCount);


    /**
     * Close the connections at the end of the operation.
     * @param consumer The message consumer object of the subscriber.
     * @throws JMSException
     */
    public void closeConnections(MessageConsumer consumer) throws JMSException{

    /**
     * Creates the AMQP URL.
     * @param username The username for the AMQP URL.
     * @param password The password for the AMQP URL.
     * @return AMQP URL.
     */
    private String getTCPConnectionURL(String username, String password) {
        // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
        return new StringBuffer()
Code Block
package org.sample.jms;

import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.naming.NamingException;

/**
 * Sample executor class for the message TTL sample.
 */
public class Main {

    public static void main(String[] args) throws NamingException, JMSException {

        SampleQueueReceiver queueReceiver = new SampleQueueReceiver();
        MessageConsumer consumer = queueReceiver.registerSubscriber();

        // send messages with a very short time to live value
        System.out.println("Sending 5 messages with TTL value of 1sec");
        SampleQueueSender queueSenderWithTTL = new SampleQueueSender();

        //send messages without time to live value
        System.out.println("Sending 5 messages without TTL");
        SampleQueueSender queueSenderWithoutTTL = new SampleQueueSender();

        //send messages with considerable time to live value
        System.out.println("Sending 5 messages TTL value of 10sec");
        SampleQueueSender queueSenderWithMediumTTL = new SampleQueueSender();
        //close the connection

Building the sample

Run the ant command from the <MB_HOME>/Samples/JmsExpirationSample directory.

Analyzing the output

You will get the following log in your console.
