Objectives

Durable topic subscriptions persist the subscription on behalf of the client when the client is not online.

Introduction

Durable topics persist messages until a suitable consumer is available to consume them. Durable topic subscribers are used when an application needs to receive messages that are published even while the application is inactive. See Managing Durable Topic Subscriptions for more information.

Prerequisites

Install and run the WSO2 Message Broker using the instructions in the Getting Started section.

Create a durable topic Subscription 

The following code can be used to create a durable topic subscription with subscription ID "mySub1". 

...

See Prerequisites to Run the MB Samples for a list of prerequisites.

Building the sample

The <MB_HOME>/Samples/DurableTopicSubscriber/src/org/sample/jms directory has the following classes.

  • DurableTopicSubscriber.java creates a durable topic subscription named mySub1. The configuration of this file is as follows.

    Code Block
    languagejava
    /*
    *  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
    *
    *  WSO2 Inc. licenses this file to you under the Apache License,
    *  Version 2.0 (the "License"); you may not use this file except
    *  in compliance with the License.
    *  You may obtain a copy of the License at
    *
    *    http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing,
    * software distributed under the License is distributed on an
    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    * KIND, either express or implied.  See the License for the
    * specific language governing permissions and limitations
    * under the License.
    */

    package org.sample.jms;

    import javax.jms.*;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import java.util.Properties;

    public class DurableTopicSubscriber {
        public static final String ANDES_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
        private static final String CF_NAME_PREFIX = "connectionfactory.";
        private static final String CF_NAME = "andesConnectionfactory";
        String userName = "admin";
        String password = "admin";
        private static String CARBON_CLIENT_ID = "carbon";
        private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
        private static String CARBON_DEFAULT_HOSTNAME = "localhost";
        private static String CARBON_DEFAULT_PORT = "5672";
        private String topicName = "newTopic";
        private String subscriptionId = "mySub1";
        private boolean useListener = true;
        private int delayBetMessages = 200;
        private int messageCount = 10;
        private SampleMessageListener messageListener;
        private TopicConnection topicConnection;
        private TopicSession topicSession;
        private TopicSubscriber topicSubscriber;

        public void subscribe() {
            try {
                System.out.println("Starting the subscriber");
                Properties properties = new Properties();
                properties.put(Context.INITIAL_CONTEXT_FACTORY, ANDES_ICF);
                properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
                properties.put("topic." + topicName, topicName);
                InitialContext ctx = new InitialContext(properties);
                // Lookup connection factory
                TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
                topicConnection = connFactory.createTopicConnection();
                topicConnection.start();
                topicSession =
                        topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
                // create durable subscriber with subscription ID
                Topic topic = (Topic) ctx.lookup(topicName);
                topicSubscriber = topicSession.createDurableSubscriber(topic, subscriptionId);
                if (!useListener) {
                    for (int count = 0; count < messageCount; count++) {
                        Message message = topicSubscriber.receive();
                        System.out.println("count = " + count);
                        if (message instanceof TextMessage) {
                            TextMessage textMessage = (TextMessage) message;
                            System.out.println(count + ". textMessage.getText() = " + textMessage.getText());
                        }
                        if (delayBetMessages != 0) {
                            Thread.sleep(delayBetMessages);
                        }
                    }
                    topicConnection.close();
                } else {
                    messageListener = new SampleMessageListener(delayBetMessages);
                    topicSubscriber.setMessageListener(messageListener);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public String getTCPConnectionURL(String username, String password) {
            return new StringBuffer()
                    .append("amqp://").append(username).append(":").append(password)
                    .append("@").append(CARBON_CLIENT_ID)
                    .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                    .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                    .toString();
        }

        public void stopSubscriber() throws JMSException {
            topicSubscriber.close();
            topicSession.close();
            topicConnection.close();
            System.out.println("Closing Subscriber");
        }
    }
  • SampleMessageListener.java creates a consumer for the durable topic subscription. The configuration of this file is as follows.

    Code Block
    languagejava
    /*
     * Copyright 2004,2005 The Apache Software Foundation.
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *      http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package org.sample.jms;

    import javax.jms.*;

    public class SampleMessageListener implements MessageListener {
        private int delay = 0;
        private int currentMsgCount = 0;

        public SampleMessageListener(int delay) {
            this.delay = delay;
        }

        public void onMessage(Message message) {
            TextMessage receivedMessage = (TextMessage) message;
            try {
                System.out.println("Got the message ==> " + (currentMsgCount+1) + " - "+ receivedMessage.getText());
                currentMsgCount++;
                if(delay != 0) {
                    try {
                        Thread.sleep(delay);
                    } catch (InterruptedException e) {
                        //silently ignore
                    }
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
  • TopicPublisher.java creates a publisher to publish messages in the durable topic. The configuration of this file is as follows.
  • Main.java defines the method for calling the three clients mentioned above. The configuration of this file is as follows.

    Code Block
    languagejava
    /*
    *  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
    *
    *  WSO2 Inc. licenses this file to you under the Apache License,
    *  Version 2.0 (the "License"); you may not use this file except
    *  in compliance with the License.
    *  You may obtain a copy of the License at
    *
    *    http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing,
    * software distributed under the License is distributed on an
    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    * KIND, either express or implied.  See the License for the
    * specific language governing permissions and limitations
    * under the License.
    */
    
    package org.sample.jms;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.naming.NamingException;
    
    public class Main {
        public static void main(String[] args) throws NamingException, JMSException, InterruptedException {
            DurableTopicSubscriber durableTopicSubscriber = new DurableTopicSubscriber();
            durableTopicSubscriber.subscribe();
            TopicPublisher topicPublisher = new TopicPublisher();
            topicPublisher.publishMessage(5);
            Thread.sleep(5000);
            durableTopicSubscriber.stopSubscriber();
            TopicPublisher topicPublisher2 = new TopicPublisher();
            topicPublisher2.publishMessage(5);
            Thread.sleep(5000);
            DurableTopicSubscriber durableTopicSubscriber2 = new DurableTopicSubscriber();
            durableTopicSubscriber2.subscribe();
            TopicPublisher topicPublisher3 = new TopicPublisher();
            topicPublisher3.publishMessage(5);
            Thread.sleep(5000);
            durableTopicSubscriber2.stopSubscriber();
        }
    }
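
The connection setup performed in DurableTopicSubscriber.subscribe() can be easier to follow in isolation. The following is a minimal sketch, not part of the shipped sample, that only assembles the JNDI properties and the AMQP connection URL using the same constants and URL layout as the sample. The class name JndiSetupSketch and its method names are hypothetical. It uses only JDK classes and does not create an InitialContext, because that requires the Andes client libraries on the classpath.

```java
import java.util.Properties;
import javax.naming.Context;

// Hypothetical helper (not part of the MB sample): it only builds the
// properties that DurableTopicSubscriber hands to InitialContext.
final class JndiSetupSketch {
    static final String ANDES_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    static final String CF_NAME_PREFIX = "connectionfactory.";
    static final String CF_NAME = "andesConnectionfactory";

    // Same layout as getTCPConnectionURL in the sample:
    // amqp://<user>:<password>@<clientId>/<virtualHost>?brokerlist='tcp://<host>:<port>'
    // The client ID and virtual host are both "carbon", as in the sample.
    static String tcpConnectionUrl(String user, String password, String host, String port) {
        return "amqp://" + user + ":" + password + "@carbon/carbon"
                + "?brokerlist='tcp://" + host + ":" + port + "'";
    }

    // Builds the three entries the sample registers: the context factory,
    // the connection factory URL, and the topic binding.
    static Properties jndiProperties(String user, String password, String topicName) {
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, ANDES_ICF);
        properties.put(CF_NAME_PREFIX + CF_NAME,
                tcpConnectionUrl(user, password, "localhost", "5672"));
        properties.put("topic." + topicName, topicName);
        return properties;
    }

    public static void main(String[] args) {
        Properties p = jndiProperties("admin", "admin", "newTopic");
        // prints amqp://admin:admin@carbon/carbon?brokerlist='tcp://localhost:5672'
        System.out.println(p.getProperty(CF_NAME_PREFIX + CF_NAME));
    }
}
```

In the sample, the resulting Properties object is passed to new InitialContext(properties), after which the connection factory is looked up by CF_NAME and the topic by its name.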

...

To publish messages to the topic created above, a normal topic publisher code can be used.

Scenario demonstrating the functionality of durable topic subscriptions

...

Executing the sample

Run the ant command from the <MB_HOME>/samples/DurableTopicSubscriber directory.

Analyzing the output

The scenario used in this sample to demonstrate durable topic subscriptions is as follows.

  1. durableTopicSubscriber is run to create a durable topic subscriber.
  2. 5 messages are sent to the "myTopic" topic. The messages are received and printed by the subscriber named durableTopicSubscriber.
  3. durableTopicSubscriber is stopped.
  4. The publisher is run again and 5 more messages are sent.
  5. While durableTopicSubscriber is running again, 5 different messages are sent to the same topic. You will see that all 10 messages (including the messages sent to the topic while the subscriber was absent) are consumed by durableTopicSubscriber.
  6. 200 more messages are sent. durableTopicSubscriber closes after 100 messages are received. After running durableTopicSubscriber again, you will receive nothing (as durableTopicSubscriber was unsubscribed after receiving 100 messages).
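
The behaviour in the steps above can be illustrated without a broker. The following is a toy in-memory model, assuming a single topic and plain string messages. It is not the WSO2 MB or JMS API, and the class DurableTopicModel and its methods are hypothetical names introduced for illustration only. It shows why a subscriber that disconnects still receives the messages published in its absence, while an unsubscribed one does not.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of one topic with durable subscriptions; NOT the MB or JMS API.
final class DurableTopicModel {
    // Messages retained per subscription ID while its subscriber is offline.
    private final Map<String, Deque<String>> backlog = new HashMap<>();
    // Messages delivered so far to each currently connected subscription.
    private final Map<String, List<String>> online = new HashMap<>();

    // Connect (or reconnect) a durable subscriber; retained messages arrive first.
    List<String> subscribe(String subscriptionId) {
        Deque<String> pending = backlog.computeIfAbsent(subscriptionId, id -> new ArrayDeque<>());
        List<String> received = new ArrayList<>(pending);
        pending.clear();
        online.put(subscriptionId, received);
        return received;
    }

    // Disconnect without unsubscribing: the subscription itself survives.
    void disconnect(String subscriptionId) {
        online.remove(subscriptionId);
    }

    // Unsubscribe: the subscription is removed, so nothing is retained afterwards.
    void unsubscribe(String subscriptionId) {
        online.remove(subscriptionId);
        backlog.remove(subscriptionId);
    }

    // Deliver to connected subscriptions, retain for disconnected ones.
    void publish(String message) {
        for (Map.Entry<String, Deque<String>> entry : backlog.entrySet()) {
            List<String> live = online.get(entry.getKey());
            if (live != null) {
                live.add(message);
            } else {
                entry.getValue().addLast(message);
            }
        }
    }

    public static void main(String[] args) {
        DurableTopicModel topic = new DurableTopicModel();
        List<String> firstRun = topic.subscribe("mySub1");
        for (int i = 1; i <= 5; i++) topic.publish("msg-" + i);   // received live
        topic.disconnect("mySub1");
        for (int i = 6; i <= 10; i++) topic.publish("msg-" + i);  // retained
        List<String> secondRun = topic.subscribe("mySub1");       // backlog delivered
        for (int i = 11; i <= 15; i++) topic.publish("msg-" + i); // received live
        // prints "5 then 10"
        System.out.println(firstRun.size() + " then " + secondRun.size());
    }
}
```

The second run receives 10 messages, matching step 5: the 5 retained while the subscriber was stopped plus the 5 sent while it was running again. Calling unsubscribe instead of disconnect drops the subscription, which corresponds to the empty result described in step 6.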

...