...
Prerequisites
See Prerequisites to Run the MB Samples for a list of prerequisites.
About the sample
This sample demonstrates how the Time to Live(TTL) value can be set to messages that are published to WSO2 Message Broker. It first introduces a sample JMS client named QueueSender, which is used to send messages with TTL value set or without a TTL value for a queue in WSO2 Message Broker. Then it introduces a sample JMS client named QueueReceiver to receive the messages, which are not expired at that time and prints the number of received messages on the console.
...
Localtabgroup |
---|
Localtab |
---|
title | SampleQueueSender.Java |
---|
| Code Block |
---|
| package org.sample.jms;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
/**
* Sample sender to send the messages with/without TTL
*/
public class SampleQueueSender {
public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
private static final String CF_NAME_PREFIX = "connectionfactory.";
private static final String QUEUE_NAME_PREFIX = "queue.";
private static final String CF_NAME = "qpidConnectionfactory";
String userName = "admin";
String password = "admin";
private static String CARBON_CLIENT_ID = "carbon";
private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
private static String CARBON_DEFAULT_HOSTNAME = "localhost";
private static String CARBON_DEFAULT_PORT = "5672";
String queueName = "expirationTestQueue";
private QueueConnection queueConnection;
private QueueSession queueSession;
/**
* Send the specified number of messages with the specified ttl.
* @param noOfMessages Number of messages that need to be sent
* @param timeToLive Time to live value for mesages
* @throws NamingException
* @throws JMSException
*/
public void sendMessages(int noOfMessages, long timeToLive) throws NamingException,JMSException {
Properties properties = new Properties();
properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
properties.put(QUEUE_NAME_PREFIX + queueName, queueName);
InitialContext ctx = new InitialContext(properties);
// Lookup connection factory
QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(CF_NAME);
queueConnection = connFactory.createQueueConnection();
queueConnection.start();
queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
// Send message
Queue queue = (Queue)ctx.lookup(queueName);
// create the message to send
TextMessage textMessage = queueSession.createTextMessage("Test Message Content");
javax.jms.QueueSender queueSender = queueSession.createSender(queue);
for(int i = 0; i < noOfMessages; i++){
//send the text message in persistent delivery mode with a time to live value at priority level 4
queueSender.send(textMessage, DeliveryMode.PERSISTENT,4,timeToLive);
}
queueSender.close();
queueSession.close();
queueConnection.close();
}
/**
* Creates amqp url.
*
* @param username The username for the amqp url.
* @param password The password for the amqp url.
* @return AMQP url.
*/
private String getTCPConnectionURL(String username, String password) {
// amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
return new StringBuffer()
.append("amqp://").append(username).append(":").append(password)
.append("@").append(CARBON_CLIENT_ID)
.append("/").append(CARBON_VIRTUAL_HOST_NAME)
.append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT)
.append("'")
.toString();
}
} |
|
Localtab |
---|
title | SampleQueueReceiver.Java |
---|
| Code Block |
---|
| package org.sample.jms;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
/**
* Sample Receiver to receive the messages which were not expired
*/
public class SampleQueueReceiver {
public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
private static final String CF_NAME_PREFIX = "connectionfactory.";
private static final String CF_NAME = "qpidConnectionfactory";
String userName = "admin";
String password = "admin";
private static String CARBON_CLIENT_ID = "carbon";
private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
private static String CARBON_DEFAULT_HOSTNAME = "localhost";
private static String CARBON_DEFAULT_PORT = "5672";
String queueName = "expirationTestQueue";
private QueueConnection queueConnection;
private QueueSession queueSession;
/**
* Register Subscriber for a queue.
* @return MessageConsumer The message consumer object of the subscriber.
* @throws NamingException
* @throws JMSException
*/
public MessageConsumer registerSubscriber() throws NamingException, JMSException {
Properties properties = new Properties();
properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
properties.put("queue."+ queueName,queueName);
InitialContext ctx = new InitialContext(properties);
// Lookup connection factory
QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(CF_NAME);
queueConnection = connFactory.createQueueConnection();
queueConnection.start();
queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
//Receive message
Queue queue = (Queue) ctx.lookup(queueName);
MessageConsumer consumer = queueSession.createConsumer(queue);
return consumer;
}
/**
* Receive messages using the consumer.
* @param consumer The message consumer object of the subscriber.
* @throws NamingException
* @throws JMSException
*/
public void receiveMessages(MessageConsumer consumer) throws NamingException, JMSException {
int receivedMessageCount = 0;
//have 5 seconds as receive timeout value to stop the consumer
while(null != consumer.receive(5000)){
receivedMessageCount ++;
}
System.out.println("Received message count: " + receivedMessageCount);
}
/**
* Close the connections at the end of operation
* @param consumer The message consumer object of the subscriber.
* @throws JMSException
*/
public void closeConnections(MessageConsumer consumer) throws JMSException{
consumer.close();
queueSession.close();
queueConnection.stop();
queueConnection.close();
}
/**
* Creates amqp url.
*
* @param username The username for the amqp url.
* @param password The password for the amqp url.
* @return AMQP url.
*/
private String getTCPConnectionURL(String username, String password) {
// amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
return new StringBuffer()
.append("amqp://").append(username).append(":").append(password)
.append("@").append(CARBON_CLIENT_ID)
.append("/").append(CARBON_VIRTUAL_HOST_NAME)
.append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT)
.append("'")
.toString();
}
} |
|
Localtab |
---|
| Code Block |
---|
| package org.sample.jms;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.naming.NamingException;
/**
* Sample executor class for message TTL
*/
public class Main {
public static void main(String[] args) throws NamingException, JMSException {
SampleQueueReceiver queueReceiver = new SampleQueueReceiver();
MessageConsumer consumer = queueReceiver.registerSubscriber();
//Send messages with very less time to live value
System.out.println("Sending 5 messages with TTL value of 1sec");
SampleQueueSender queueSenderWithTTL = new SampleQueueSender();
queueSenderWithTTL.sendMessages(5,1000);
queueReceiver.receiveMessages(consumer);
//send messages without time to live value
System.out.println("Sending 5 messages without TTL");
SampleQueueSender queueSenderWithoutTTL = new SampleQueueSender();
queueSenderWithoutTTL.sendMessages(5,0);
queueReceiver.receiveMessages(consumer);
//send messages with considerable time to live value
System.out.println("Sending 5 messages TTL value of 10sec");
SampleQueueSender queueSenderWithMediumTTL = new SampleQueueSender();
queueSenderWithMediumTTL.sendMessages(5,10000);
queueReceiver.receiveMessages(consumer);
//close the connection
queueReceiver.closeConnections(consumer);
}
} |
|
|
Building the sample
Run the ant command from the <MB_HOME>/Samples/JmsExpirationSample
directory.
Analyzing the output
You will get the following log in your console.
...