Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

 This sample demonstrates how the Time to Live (TTL)  can be set for messages published to WSO2 message broker (WSO2 MB). Go to Configuring Message Expiration for more information on this feature.

Table of Contents
maxLevel3
minLevel3

Prerequisites

See Prerequisites to Run the MB Samples for a list of prerequisites.

About the sample

This sample demonstrates how the Time to Live(TTL) value can be set to messages that are published to WSO2 Message Broker. It first  first introduces a sample JMS client named QueueSender, which is used to send messages with TTL value set or without a TTL value set for a queue in WSO2 Message Broker. Then it introduces uses a sample JMS client named QueueReceiver to receive the messages, which are not expired at that time and prints the number of received messages on the console.

...

Localtabgroup
Localtab
titleSampleQueueSender.Java
Code Block
languagejava
package org.sample.jms;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;

/**
 * Sample sender to send the messages with/without TTL
 */
public class SampleQueueSender {

    public static final String QPID_ICF =  	                  "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String QUEUE_NAME_PREFIX = "queue.";
    private static final String CF_NAME = "qpidConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5672";
    String queueName = "expirationTestQueue";
    private QueueConnection queueConnection;
    private QueueSession queueSession;

    /**
     * Send the specified number of messages with the specified ttl.
     * @param noOfMessages Number of messages that need to be sent
     * @param timeToLive Time to live value for mesages
     * @throws NamingException
     * @throws JMSException
     */
    public void sendMessages(int noOfMessages, long timeToLive) throws  		   NamingException,JMSException {
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
        properties.put(CF_NAME_PREFIX + CF_NAME, 						     getTCPConnectionURL(userName, password));
        properties.put(QUEUE_NAME_PREFIX + queueName, queueName);
        InitialContext ctx = new InitialContext(properties);
        // Lookup connection factory
        QueueConnectionFactory connFactory = (QueueConnectionFactory)     ctx.lookup(CF_NAME);
        queueConnection = connFactory.createQueueConnection();
        queueConnection.start();
        queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        // Send message
        Queue queue = (Queue)ctx.lookup(queueName);
        // create the message to send
        TextMessage textMessage = queueSession.createTextMessage("Test Message Content");
        javax.jms.QueueSender queueSender = queueSession.createSender(queue);
        for(int i = 0; i < noOfMessages; i++){
	    //send the text message in persistent delivery mode with a time to live value at priority level 4
            queueSender.send(textMessage, DeliveryMode.PERSISTENT,4,timeToLive);
        }
        queueSender.close();
        queueSession.close();
        queueConnection.close();
    }

    /**
     * Creates amqp url.
     *
     * @param username The username for the amqp url.
     * @param password The password for the amqp url.
     * @return AMQP url.
     */
    private String getTCPConnectionURL(String username, String password) {
        // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
        return new StringBuffer()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT)
                .append("'")
                .toString();
    }
}
Localtab
titleSampleQueueReceiver.Java
Code Block
languagejava
package org.sample.jms;

import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;

/**
 * Sample Receiver to receive the messages which were not expired
 */
public class SampleQueueReceiver {
    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "qpidConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5672";
    String queueName = "expirationTestQueue";
    private QueueConnection queueConnection;
    private QueueSession queueSession;

    /**
     * Register Subscriber for a queue.
     * @return MessageConsumer The message consumer object of the subscriber.
     * @throws NamingException
     * @throws JMSException
     */
    public MessageConsumer registerSubscriber() throws NamingException, JMSException {
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
        properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
        properties.put("queue."+ queueName,queueName);
        InitialContext ctx = new InitialContext(properties);
        // Lookup connection factory
        QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(CF_NAME);
        queueConnection = connFactory.createQueueConnection();
        queueConnection.start();
        queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        //Receive message
        Queue queue =  (Queue) ctx.lookup(queueName);
        MessageConsumer consumer = queueSession.createConsumer(queue);
        return consumer;
    }

    /**
     * Receive messages using the consumer.
     * @param consumer The message consumer object of the subscriber.
     * @throws NamingException
     * @throws JMSException
     */
    public void receiveMessages(MessageConsumer consumer) throws NamingException, JMSException {

        int receivedMessageCount = 0;
	//have 5 seconds as receive timeout value to stop the consumer
        while(null != consumer.receive(5000)){
            receivedMessageCount ++;
        }
        System.out.println("Received message count: " + receivedMessageCount);

    }

    /**
     * Close the connections at the end of operation
     * @param consumer The message consumer object of the subscriber.
     * @throws JMSException
     */
    public void closeConnections(MessageConsumer consumer) throws JMSException{
        consumer.close();
        queueSession.close();
        queueConnection.stop();
        queueConnection.close();
    }

    /**
     * Creates amqp url.
     *
     * @param username The username for the amqp url.
     * @param password The password for the amqp url.
     * @return AMQP url.
     */
    private String getTCPConnectionURL(String username, String password) {
        // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
        return new StringBuffer()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT)
                .append("'")
                .toString();
    }
}
Localtab
titleMain.java
Code Block
languagejava
package org.sample.jms;

import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.naming.NamingException;

/**
 * Sample executor class for message TTL
 */
public class Main {

    public static void main(String[] args) throws NamingException, JMSException {

        SampleQueueReceiver queueReceiver = new SampleQueueReceiver();
        MessageConsumer consumer = queueReceiver.registerSubscriber();

        //Send messages with very less time to live value
        System.out.println("Sending 5 messages with TTL value of 1sec");
        SampleQueueSender queueSenderWithTTL = new SampleQueueSender();
        queueSenderWithTTL.sendMessages(5,1000);
        queueReceiver.receiveMessages(consumer);

        //send messages without time to live value
        System.out.println("Sending 5 messages without TTL");
        SampleQueueSender queueSenderWithoutTTL = new SampleQueueSender();
        queueSenderWithoutTTL.sendMessages(5,0);
        queueReceiver.receiveMessages(consumer);

        //send messages with considerable time to live value
        System.out.println("Sending 5 messages TTL value of 10sec");
        SampleQueueSender queueSenderWithMediumTTL = new SampleQueueSender();
        queueSenderWithMediumTTL.sendMessages(5,10000);
        queueReceiver.receiveMessages(consumer);
        //close the connection
        queueReceiver.closeConnections(consumer);
    }
}

Building the sample

Run the ant command from the <MB_HOME>/Samples/JmsExpirationSample directory.

Analyzing the output

You will get the following log in your console.

...