This sample demonstrates how a message expiration time can be set for messages sent to WSO2 MB. 

...

See Prerequisites to Run the MB Samples for a list of prerequisites.

About the sample

This sample demonstrates how a Time to Live (TTL) value can be set on messages that are published to WSO2 Message Broker. It first introduces a sample JMS client named SampleQueueSender, which sends messages to a queue in WSO2 Message Broker either with or without a TTL value. It then introduces a sample JMS client named SampleQueueReceiver, which receives the messages that have not yet expired and prints the number of received messages on the console.
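As a rough illustration of what a TTL value means for a message: in JMS, the expiration time of a message is its send time plus its time to live, and a TTL of 0 means that the message never expires. The sketch below reproduces only that arithmetic in plain Java. The class and method names (`TtlSketch`, `expirationTime`, `isExpired`) are hypothetical and are not part of the sample or of WSO2 MB.

```java
// Hypothetical helper illustrating JMS-style expiration arithmetic.
// It is not part of the WSO2 MB sample; it only mirrors the rule that
// expiration = send time + TTL, with 0 meaning "never expires".
final class TtlSketch {

    /** Returns the expiration timestamp in milliseconds, or 0 if the message never expires. */
    static long expirationTime(long sendTimeMillis, long timeToLiveMillis) {
        return timeToLiveMillis == 0 ? 0 : sendTimeMillis + timeToLiveMillis;
    }

    /** Returns true if a message with the given expiration timestamp has expired at the given time. */
    static boolean isExpired(long expirationMillis, long nowMillis) {
        return expirationMillis != 0 && nowMillis >= expirationMillis;
    }

    public static void main(String[] args) {
        long sentAt = 10_000L; // arbitrary send time used for the illustration
        long oneSecondTtl = expirationTime(sentAt, 1_000L);
        long noTtl = expirationTime(sentAt, 0L);
        System.out.println(isExpired(oneSecondTtl, sentAt + 5_000L)); // true
        System.out.println(isExpired(noTtl, sentAt + 5_000L));        // false
    }
}
```

In the sample itself the TTL is passed to the JMS provider through the `send` call, so no such arithmetic is needed in client code.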

The <MB_HOME>/Samples/JmsExpirationSample/src/org/sample/jms directory has the following classes, which implement the behaviour explained above:

SampleQueueSender.java

package org.sample.jms;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;

/**
 * Sample sender to send the messages with/without TTL
 */
public class SampleQueueSender {

    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String QUEUE_NAME_PREFIX = "queue.";
    private static final String CF_NAME = "qpidConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5672";
    String queueName = "expirationTestQueue";
    private QueueConnection queueConnection;
    private QueueSession queueSession;

    /**
     * Send the specified number of messages with the specified TTL.
     * @param noOfMessages Number of messages that need to be sent
     * @param timeToLive Time to live value for messages
     * @throws NamingException
     * @throws JMSException
     */
    public void sendMessages(int noOfMessages, long timeToLive) throws NamingException, JMSException {
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
        properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
        properties.put(QUEUE_NAME_PREFIX + queueName, queueName);
        InitialContext ctx = new InitialContext(properties);
        // Lookup connection factory
        QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(CF_NAME);
        queueConnection = connFactory.createQueueConnection();
        queueConnection.start();
        queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        // Send message
        Queue queue = (Queue) ctx.lookup(queueName);
        // Create the message to send
        TextMessage textMessage = queueSession.createTextMessage("Test Message Content");
        QueueSender queueSender = queueSession.createSender(queue);
        for (int i = 0; i < noOfMessages; i++) {
            // Send the text message in persistent delivery mode with a time to live value at priority level 4
            queueSender.send(textMessage, DeliveryMode.PERSISTENT, 4, timeToLive);
        }
        queueSender.close();
        queueSession.close();
        queueConnection.close();
    }

    /**
     * Creates amqp url.
     *
     * @param username The username for the amqp url.
     * @param password The password for the amqp url.
     * @return AMQP url.
     */
    private String getTCPConnectionURL(String username, String password) {
        // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
        return new StringBuffer()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT)
                .append("'")
                .toString();
    }
}
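To make the shape of the connection URL easier to see, here is a minimal standalone sketch of the string that the sender's getTCPConnectionURL method assembles. The class name `ConnectionUrlSketch` and the method `tcpConnectionUrl` are hypothetical; the argument values mirror the defaults used in the sample (admin/admin, carbon, localhost, 5672).

```java
// Standalone sketch of the AMQP connection URL assembled in the sample.
// ConnectionUrlSketch and tcpConnectionUrl are hypothetical names; no broker classes are used.
final class ConnectionUrlSketch {

    static String tcpConnectionUrl(String username, String password, String clientId,
                                   String virtualHost, String host, String port) {
        // amqp://{username}:{password}@{clientId}/{virtualHost}?brokerlist='tcp://{hostname}:{port}'
        return "amqp://" + username + ":" + password
                + "@" + clientId + "/" + virtualHost
                + "?brokerlist='tcp://" + host + ":" + port + "'";
    }

    public static void main(String[] args) {
        // prints amqp://admin:admin@carbon/carbon?brokerlist='tcp://localhost:5672'
        System.out.println(tcpConnectionUrl("admin", "admin", "carbon", "carbon", "localhost", "5672"));
    }
}
```

If the broker runs on a different host or port, only the last two arguments would need to change under this sketch.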
SampleQueueReceiver.java

package org.sample.jms;

import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;

/**
 * Sample Receiver to receive the messages which were not expired
 */
public class SampleQueueReceiver {
    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "qpidConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5672";
    String queueName = "expirationTestQueue";
    private QueueConnection queueConnection;
    private QueueSession queueSession;

    /**
     * Register Subscriber for a queue.
     * @return MessageConsumer The message consumer object of the subscriber.
     * @throws NamingException
     * @throws JMSException
     */
    public MessageConsumer registerSubscriber() throws NamingException, JMSException {
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
        properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
        properties.put("queue." + queueName, queueName);
        InitialContext ctx = new InitialContext(properties);
        // Lookup connection factory
        QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(CF_NAME);
        queueConnection = connFactory.createQueueConnection();
        queueConnection.start();
        queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        // Receive message
        Queue queue = (Queue) ctx.lookup(queueName);
        MessageConsumer consumer = queueSession.createConsumer(queue);
        return consumer;
    }

    /**
     * Receive messages using the consumer.
     * @param consumer The message consumer object of the subscriber.
     * @throws NamingException
     * @throws JMSException
     */
    public void receiveMessages(MessageConsumer consumer) throws NamingException, JMSException {
        int receivedMessageCount = 0;
        // Use 5 seconds as the receive timeout value to stop the consumer
        while (null != consumer.receive(5000)) {
            receivedMessageCount++;
        }
        System.out.println("Received message count: " + receivedMessageCount);
    }

    /**
     * Close the connections at the end of operation
     * @param consumer The message consumer object of the subscriber.
     * @throws JMSException
     */
    public void closeConnections(MessageConsumer consumer) throws JMSException {
        consumer.close();
        queueSession.close();
        queueConnection.stop();
        queueConnection.close();
    }

    /**
     * Creates amqp url.
     *
     * @param username The username for the amqp url.
     * @param password The password for the amqp url.
     * @return AMQP url.
     */
    private String getTCPConnectionURL(String username, String password) {
        // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
        return new StringBuffer()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT)
                .append("'")
                .toString();
    }
}
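The receiver counts messages by calling receive with a timeout until a call returns null. The sketch below shows the same "drain until the timeout elapses" pattern without a broker, using `java.util.concurrent.BlockingQueue.poll` as an in-memory stand-in for the JMS consumer. The class name `DrainLoopSketch`, the method `drain`, and the short 100 ms timeout are assumptions made for illustration only.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory stand-in for the receive loop; no broker or JMS classes are involved.
final class DrainLoopSketch {

    /** Polls until no element arrives within the timeout and returns how many were taken. */
    static int drain(BlockingQueue<String> queue, long timeoutMillis) {
        int count = 0;
        try {
            while (queue.poll(timeoutMillis, TimeUnit.MILLISECONDS) != null) {
                count++;
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status and stop draining
            Thread.currentThread().interrupt();
        }
        return count;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 5; i++) {
            queue.add("Test Message Content");
        }
        System.out.println("Received message count: " + drain(queue, 100)); // 5
        System.out.println("Received message count: " + drain(queue, 100)); // 0
    }
}
```

One consequence of this pattern is that every call waits for at least one full timeout before returning, which is why each step of the sample takes a few seconds even when no messages arrive.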
Main.java

package org.sample.jms;

import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.naming.NamingException;

/**
 * Sample executor class for message TTL
 */
public class Main {

    public static void main(String[] args) throws NamingException, JMSException {

        SampleQueueReceiver queueReceiver = new SampleQueueReceiver();
        MessageConsumer consumer = queueReceiver.registerSubscriber();

        // Send messages with a very short time to live value
        System.out.println("Sending 5 messages with TTL value of 1sec");
        SampleQueueSender queueSenderWithTTL = new SampleQueueSender();
        queueSenderWithTTL.sendMessages(5, 1000);
        queueReceiver.receiveMessages(consumer);

        // Send messages without a time to live value
        System.out.println("Sending 5 messages without TTL");
        SampleQueueSender queueSenderWithoutTTL = new SampleQueueSender();
        queueSenderWithoutTTL.sendMessages(5, 0);
        queueReceiver.receiveMessages(consumer);

        // Send messages with a considerable time to live value
        System.out.println("Sending 5 messages TTL value of 10sec");
        SampleQueueSender queueSenderWithMediumTTL = new SampleQueueSender();
        queueSenderWithMediumTTL.sendMessages(5, 10000);
        queueReceiver.receiveMessages(consumer);

        // Close the connection
        queueReceiver.closeConnections(consumer);
    }
}

Building the sample

Run the ant command from the <MB_HOME>/Samples/JmsExpirationSample directory.

Analyzing the output

You will get the following log in your console.

[java] Sending 5 messages with TTL value of 1sec
[java] Received message count: 0
[java] Sending 5 messages without TTL
[java] Received message count: 5
[java] Sending 5 messages TTL value of 10sec
[java] Received message count: 5

The first 5 messages were published with a TTL value of one second, and none of them were delivered because they had expired. In the second case, 5 messages were sent without a TTL value and all of them were delivered. In the last case, 5 messages were sent with a TTL value of ten seconds and all of them were delivered because they reached the recipient before they expired.