This sample demonstrates how the Time to Live (TTL) can be set for messages published to WSO2 message broker (WSO2 MB). Go to Configuring Message Expiration for more information on this feature.
Prerequisites
See Prerequisites to Run the MB Samples for a list of prerequisites.
About the sample
This sample demonstrates how the Time to Live (TTL) value can be set for messages that are published to WSO2 Message Broker. It first introduces a sample JMS client named QueueSender, which is used to send messages to a queue in WSO2 Message Broker either with or without a TTL value set. It then uses a sample JMS client named QueueReceiver to receive the messages that have not yet expired and to print the number of received messages on the console.
...
Localtabgroup |
---|
Localtab |
---|
title | SampleQueueSender.java |
---|
| Code Block |
---|
| package org.sample.jms;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
/**
* Sample sender to send the messages with/without TTL
*/
public class SampleQueueSender {
public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
private static final String CF_NAME_PREFIX = "connectionfactory.";
private static final String QUEUE_NAME_PREFIX = "queue.";
private static final String CF_NAME = "qpidConnectionfactory";
String userName = "admin";
String password = "admin";
private static String CARBON_CLIENT_ID = "carbon";
private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
private static String CARBON_DEFAULT_HOSTNAME = "localhost";
private static String CARBON_DEFAULT_PORT = "5672";
String queueName = "expirationTestQueue";
private QueueConnection queueConnection;
private QueueSession queueSession;
/**
 * Sends the specified number of messages with the specified TTL.
 * <p>
 * Every call opens its own connection to the broker, publishes
 * {@code noOfMessages} identical text messages to the queue named by
 * {@code queueName}, and closes the connection again before returning,
 * whether or not the send succeeded.
 *
 * @param noOfMessages Number of messages that need to be sent
 * @param timeToLive Time to live value for messages, in milliseconds;
 *                   0 means the messages are sent without a TTL
 * @throws NamingException if the connection factory or the queue cannot be looked up
 * @throws JMSException if connecting to the broker or sending a message fails
 */
public void sendMessages(int noOfMessages, long timeToLive) throws NamingException, JMSException {
    Properties properties = new Properties();
    properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
    properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
    properties.put(QUEUE_NAME_PREFIX + queueName, queueName);
    InitialContext ctx = new InitialContext(properties);

    // Resolve both JNDI entries before opening the connection, so that a failed
    // lookup cannot leave an open connection behind.
    QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(CF_NAME);
    Queue queue = (Queue) ctx.lookup(queueName);

    queueConnection = connFactory.createQueueConnection();
    try {
        queueConnection.start();
        queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);

        // Create the message to send; the same message object is sent repeatedly
        TextMessage textMessage = queueSession.createTextMessage("Test Message Content");
        javax.jms.QueueSender queueSender = queueSession.createSender(queue);
        for (int i = 0; i < noOfMessages; i++) {
            // Send the text message in persistent delivery mode with a time to live value at priority level 4
            queueSender.send(textMessage, DeliveryMode.PERSISTENT, 4, timeToLive);
        }
        queueSender.close();
        queueSession.close();
    } finally {
        // Always release the connection, even when sending failed part-way through.
        // Closing a JMS connection also closes its sessions and producers.
        queueConnection.close();
    }
}
/**
 * Builds the AMQP connection URL used to reach the broker.
 * <p>
 * Format: {@code amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'}
 *
 * @param username The username for the amqp url.
 * @param password The password for the amqp url.
 * @return AMQP url.
 */
private String getTCPConnectionURL(String username, String password) {
    String credentials = username + ":" + password;
    String brokerAddress = "tcp://" + CARBON_DEFAULT_HOSTNAME + ":" + CARBON_DEFAULT_PORT;
    return "amqp://" + credentials
            + "@" + CARBON_CLIENT_ID
            + "/" + CARBON_VIRTUAL_HOST_NAME
            + "?brokerlist='" + brokerAddress + "'";
}
} |
|
Localtab |
---|
title | SampleQueueReceiver.java |
---|
| Code Block |
---|
| package org.sample.jms;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
/**
* Sample Receiver to receive the messages which were not expired
*/
public class SampleQueueReceiver {
public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
private static final String CF_NAME_PREFIX = "connectionfactory.";
private static final String CF_NAME = "qpidConnectionfactory";
String userName = "admin";
String password = "admin";
private static String CARBON_CLIENT_ID = "carbon";
private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
private static String CARBON_DEFAULT_HOSTNAME = "localhost";
private static String CARBON_DEFAULT_PORT = "5672";
String queueName = "expirationTestQueue";
private QueueConnection queueConnection;
private QueueSession queueSession;
/**
 * Register Subscriber for a queue.
 * <p>
 * Opens and starts a connection to the broker, creates an auto-acknowledge
 * session on it and a consumer for the queue named by {@code queueName}.
 * On success the connection and session stay open and are kept in this
 * object; release them with {@link #closeConnections(MessageConsumer)}.
 * If registration fails, the connection opened here is closed before the
 * exception is propagated.
 *
 * @return MessageConsumer The message consumer object of the subscriber.
 * @throws NamingException if the connection factory or the queue cannot be looked up
 * @throws JMSException if the connection, session or consumer cannot be created
 */
public MessageConsumer registerSubscriber() throws NamingException, JMSException {
    Properties properties = new Properties();
    properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
    properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
    properties.put("queue." + queueName, queueName);
    InitialContext ctx = new InitialContext(properties);

    // Resolve both JNDI entries before opening the connection, so that a failed
    // lookup cannot leave an open connection behind.
    QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(CF_NAME);
    Queue queue = (Queue) ctx.lookup(queueName);

    queueConnection = connFactory.createQueueConnection();
    boolean registered = false;
    try {
        queueConnection.start();
        queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = queueSession.createConsumer(queue);
        registered = true;
        return consumer;
    } finally {
        if (!registered) {
            // The caller never receives a consumer on failure and so cannot call
            // closeConnections(); release the connection here instead.
            queueConnection.close();
        }
    }
}
/**
 * Drains the queue through the given consumer and reports how many
 * messages arrived.
 * <p>
 * Messages are received one at a time until a receive call waits the
 * full timeout of 5 seconds without getting a message; the total is
 * then printed on the console.
 *
 * @param consumer The message consumer object of the subscriber.
 * @throws NamingException
 * @throws JMSException
 */
public void receiveMessages(MessageConsumer consumer) throws NamingException, JMSException {
    // A receive that times out returns null, which is what stops the loop
    final long receiveTimeoutMillis = 5000;
    int count = 0;
    while (true) {
        if (consumer.receive(receiveTimeoutMillis) == null) {
            break;
        }
        count++;
    }
    System.out.println("Received message count: " + count);
}
/**
 * Close the connections at the end of operation.
 * <p>
 * Closes the consumer and the session, stops the connection and finally
 * closes it. The connection is closed even if one of the earlier steps
 * throws, so that it is not left open.
 *
 * @param consumer The message consumer object of the subscriber.
 * @throws JMSException if closing any of the JMS resources fails
 */
public void closeConnections(MessageConsumer consumer) throws JMSException {
    try {
        consumer.close();
        queueSession.close();
        queueConnection.stop();
    } finally {
        // Closing a JMS connection also closes any session or consumer that is
        // still open, so this single call cleans up after a partial failure.
        queueConnection.close();
    }
}
/**
 * Builds the AMQP connection URL used to reach the broker.
 * <p>
 * Format: {@code amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'}
 *
 * @param username The username for the amqp url.
 * @param password The password for the amqp url.
 * @return AMQP url.
 */
private String getTCPConnectionURL(String username, String password) {
    String credentials = username + ":" + password;
    String brokerAddress = "tcp://" + CARBON_DEFAULT_HOSTNAME + ":" + CARBON_DEFAULT_PORT;
    return "amqp://" + credentials
            + "@" + CARBON_CLIENT_ID
            + "/" + CARBON_VIRTUAL_HOST_NAME
            + "?brokerlist='" + brokerAddress + "'";
}
} |
|
Localtab |
---|
| Code Block |
---|
| package org.sample.jms;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.naming.NamingException;
/**
* Sample executor class for message TTL
*/
public class Main {
/**
 * Runs the TTL sample: registers one subscriber, then three times in a row
 * sends a batch of 5 messages (TTL of 1 second, no TTL, TTL of 10 seconds)
 * and prints how many messages the subscriber received after each batch.
 * The subscriber's connection is closed when the run ends, including when
 * one of the steps fails.
 *
 * @param args command-line arguments (not used)
 * @throws NamingException if a JNDI lookup performed by the sender or receiver fails
 * @throws JMSException if communication with the broker fails
 */
public static void main(String[] args) throws NamingException, JMSException {
    SampleQueueReceiver queueReceiver = new SampleQueueReceiver();
    MessageConsumer consumer = queueReceiver.registerSubscriber();
    try {
        // Send messages with a very short time to live value
        System.out.println("Sending 5 messages with TTL value of 1sec");
        SampleQueueSender queueSenderWithTTL = new SampleQueueSender();
        queueSenderWithTTL.sendMessages(5, 1000);
        queueReceiver.receiveMessages(consumer);

        // Send messages without a time to live value
        System.out.println("Sending 5 messages without TTL");
        SampleQueueSender queueSenderWithoutTTL = new SampleQueueSender();
        queueSenderWithoutTTL.sendMessages(5, 0);
        queueReceiver.receiveMessages(consumer);

        // Send messages with a considerable time to live value
        System.out.println("Sending 5 messages TTL value of 10sec");
        SampleQueueSender queueSenderWithMediumTTL = new SampleQueueSender();
        queueSenderWithMediumTTL.sendMessages(5, 10000);
        queueReceiver.receiveMessages(consumer);
    } finally {
        // Close the subscriber's connection even if a send or receive step failed
        queueReceiver.closeConnections(consumer);
    }
}
} |
|
|
Building the sample
Run the ant command from the <MB_HOME>/Samples/JmsExpirationSample
directory.
Analyzing the output
You will get the following log in your console.
...