Objectives

A durable topic subscription is kept by the broker on behalf of the client while the client is offline. In simple terms, a durable topic keeps messages persistently until a suitable consumer is available to consume them. Durable topic subscribers are used when an application needs to receive messages that are published even while the application is inactive.
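
The difference between a durable and a non-durable subscription can be illustrated with a small in-memory model. This is only a sketch: the ToyDurableTopic class and its connect, disconnect and publish methods are hypothetical names made up for this illustration, and they are not part of JMS or of WSO2 Message Broker.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Toy in-memory model of a topic. Hypothetical class, NOT the JMS or WSO2 MB API.
class ToyDurableTopic {
    // Backlog per durable subscription ID; it survives while the subscriber is offline.
    private final Map<String, Queue<String>> durableBacklogs = new HashMap<>();
    // Inboxes of the subscribers (durable or not) that are currently connected.
    private final Map<String, List<String>> online = new HashMap<>();

    // Connects a subscriber and returns its inbox, pre-filled with any durable backlog.
    List<String> connect(String id, boolean durable) {
        List<String> inbox = new ArrayList<>();
        if (durable) {
            Queue<String> backlog = durableBacklogs.computeIfAbsent(id, k -> new ArrayDeque<>());
            inbox.addAll(backlog);
            backlog.clear();
        }
        online.put(id, inbox);
        return inbox;
    }

    void disconnect(String id) {
        online.remove(id);
    }

    void publish(String message) {
        // Connected subscribers receive the message immediately.
        for (List<String> inbox : online.values()) {
            inbox.add(message);
        }
        // Offline durable subscriptions keep the message for later delivery.
        for (Map.Entry<String, Queue<String>> entry : durableBacklogs.entrySet()) {
            if (!online.containsKey(entry.getKey())) {
                entry.getValue().add(message);
            }
        }
    }

    // Returns {messages seen by the durable subscriber, messages seen by the plain one}.
    static int[] demo() {
        ToyDurableTopic topic = new ToyDurableTopic();
        topic.connect("mySub1", true);
        topic.connect("plainSub", false);
        topic.disconnect("mySub1");
        topic.disconnect("plainSub");
        for (int i = 1; i <= 3; i++) {
            topic.publish("message " + i); // published while both are offline
        }
        int durableCount = topic.connect("mySub1", true).size();
        int plainCount = topic.connect("plainSub", false).size();
        return new int[] {durableCount, plainCount};
    }

    public static void main(String[] args) {
        int[] counts = demo();
        System.out.println("durable subscriber received: " + counts[0]);
        System.out.println("non-durable subscriber received: " + counts[1]);
    }
}
```

In this model the durable subscriber gets the three messages published during its absence when it reconnects, while the non-durable subscriber gets none of them.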

Prerequisites

Install and run the WSO2 Message Broker using the instructions in the Getting Started section.

Create a durable topic subscription

The following code can be used to create a durable topic subscription with subscription ID "mySub1". 

...

...

See Managing Durable Topic Subscriptions for more information.

About the sample

The <MB_HOME>/samples/DurableTopicSubscriber/src/org/sample/jms directory contains the following classes.

  • The DurableTopicSubscriber.java class creates a durable topic subscription named mySub1.

  • The SampleMessageListener.java class is the message listener that consumes the messages delivered to the durable topic subscription.

  • The TopicPublisher.java class creates a publisher that publishes messages to the topic.

  • The Main.java class defines the main method that runs the classes mentioned above.

The code of each class is listed below.

DurableTopicSubscriber.java
package org.sample.jms;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Properties;
public class DurableTopicSubscriber {
    public static final String ANDES_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "andesConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5672";
    private String topicName = "newTopic";
    private String subscriptionId = "mySub1";
    private boolean useListener = true;
    private int delayBetMessages = 200;
    private int messageCount = 10;
    private SampleMessageListener messageListener;
    private TopicConnection topicConnection;
    private TopicSession topicSession;
    private TopicSubscriber topicSubscriber;
    public void subscribe() {
        try {
            System.out.println("Starting the subscriber");
            Properties properties = new Properties();
            properties.put(Context.INITIAL_CONTEXT_FACTORY, ANDES_ICF);
            properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
            properties.put("topic." + topicName, topicName);
            InitialContext ctx = new InitialContext(properties);
            // Lookup connection factory
            TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
            topicConnection = connFactory.createTopicConnection();
            topicConnection.start();
            topicSession =
                    topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
            // create durable subscriber with subscription ID
            Topic topic = (Topic) ctx.lookup(topicName);
            topicSubscriber = topicSession.createDurableSubscriber(topic, subscriptionId);
            if (!useListener) {
                // Synchronous mode: block on receive() for a fixed number of messages
                for (int count = 0; count < messageCount; count++) {
                    Message message = topicSubscriber.receive();
                    System.out.println("count = " + count);
                    if (message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println(count + ". textMessage.getText() = " + textMessage.getText());
                    }
                    if (delayBetMessages != 0) {
                        Thread.sleep(delayBetMessages);
                    }
                }
                topicConnection.close();
            } else {
                // Asynchronous mode: messages are handed to the listener as they arrive
                messageListener = new SampleMessageListener(delayBetMessages);
                topicSubscriber.setMessageListener(messageListener);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public String getTCPConnectionURL(String username, String password) {
        // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
        return new StringBuffer()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                .toString();
    }
    public void stopSubscriber() throws JMSException {
        topicSubscriber.close();
        topicSession.close();
        topicConnection.close();
        System.out.println("Closing Subscriber");
    }
}

SampleMessageListener.java
package org.sample.jms;
import javax.jms.*;
public class SampleMessageListener implements MessageListener {
    private int delay = 0;
    private int currentMsgCount = 0;
    public SampleMessageListener(int delay) {
        this.delay = delay;
    }
    public void onMessage(Message message) {
        TextMessage receivedMessage = (TextMessage) message;
        try {
            System.out.println("Got the message ==> " + (currentMsgCount+1) + " - "+ receivedMessage.getText());
            currentMsgCount++;
            if(delay != 0) {
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException e) {
                    //silently ignore
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

TopicPublisher.java
package org.sample.jms;
import javax.jms.JMSException;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
public class TopicPublisher {
    public static final String ANDES_ICF  = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "andesConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5672";
    String topicName = "newTopic";
    public void publishMessage(int numOfMsgs) throws NamingException, JMSException, InterruptedException {
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, ANDES_ICF);
        properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
        properties.put("topic."+topicName,topicName);
        InitialContext ctx = new InitialContext(properties);
        // Lookup connection factory
        TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
        TopicConnection topicConnection = connFactory.createTopicConnection();
        topicConnection.start();
        TopicSession topicSession =
                topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        Topic topic = (Topic)ctx.lookup(topicName);
        // Create the messages to send
        TextMessage textMessage = topicSession.createTextMessage("Test Message");
        javax.jms.TopicPublisher topicPublisher = topicSession.createPublisher(topic);
        System.out.println("Sending " + numOfMsgs + " messages to Topic: " + topicName);
        for (int i = 0; i < numOfMsgs; i++)
        {
            topicPublisher.publish(textMessage);
            Thread.sleep(1000);
        }
        topicPublisher.close();
        topicSession.close();
        topicConnection.close();
    }
    public String getTCPConnectionURL(String username, String password) {
        // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
        return new StringBuffer()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                .toString();
    }
}

Main.java
package org.sample.jms;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.naming.NamingException;

public class Main {
    public static void main(String[] args) throws NamingException, JMSException, InterruptedException {
        DurableTopicSubscriber durableTopicSubscriber = new DurableTopicSubscriber();

        durableTopicSubscriber.subscribe();
        TopicPublisher topicPublisher = new TopicPublisher();
        topicPublisher.publishMessage(5);
        Thread.sleep(5000);
        durableTopicSubscriber.stopSubscriber();

        TopicPublisher topicPublisher2 = new TopicPublisher();
        topicPublisher2.publishMessage(5);
        Thread.sleep(5000);
        DurableTopicSubscriber durableTopicSubscriber2 = new DurableTopicSubscriber();
        durableTopicSubscriber2.subscribe();
        TopicPublisher topicPublisher3 = new TopicPublisher();
        topicPublisher3.publishMessage(5);
        Thread.sleep(5000);
        durableTopicSubscriber2.stopSubscriber();
    }
}
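
Both DurableTopicSubscriber and TopicPublisher build the same AMQP connection URL and the same JNDI properties before they look up the connection factory. The following sketch isolates that step so the resulting values can be inspected without a running broker. The AndesJndiSettings class and its method names are hypothetical helpers introduced only for this illustration; the property keys, the factory class name and the URL layout are copied from the sample classes. The sketch deliberately stops before creating an InitialContext, because that would require the broker client libraries on the classpath.

```java
import java.util.Properties;
import javax.naming.Context;

// Hypothetical helper (not part of the sample): reproduces the JNDI setup of the sample classes.
class AndesJndiSettings {
    static final String ANDES_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    static final String CF_NAME = "andesConnectionfactory";

    // Layout: amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
    static String connectionUrl(String username, String password, String host, int port) {
        return "amqp://" + username + ":" + password + "@carbon/carbon"
                + "?brokerlist='tcp://" + host + ":" + port + "'";
    }

    // Builds the properties that the sample passes to new InitialContext(properties).
    static Properties jndiProperties(String connectionUrl, String topicName) {
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, ANDES_ICF);
        properties.put("connectionfactory." + CF_NAME, connectionUrl);
        properties.put("topic." + topicName, topicName);
        return properties;
    }

    public static void main(String[] args) {
        String url = connectionUrl("admin", "admin", "localhost", 5672);
        Properties properties = jndiProperties(url, "newTopic");
        // Print each key/value pair so the configuration can be checked by eye.
        properties.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

With the default values used in the sample (admin/admin, localhost, port 5672), the connection factory entry resolves to amqp://admin:admin@carbon/carbon?brokerlist='tcp://localhost:5672'.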

To publish messages to the topic created above, you can use ordinary topic publisher code.

Scenario demonstrating the functionality of durable topic subscriptions

...

Prerequisites

See Prerequisites to Run the MB Samples for a list of prerequisites.

Executing the sample

Run the ant command from the <MB_HOME>/samples/DurableTopicSubscriber directory.

Analyzing the output

The scenario used in this sample to demonstrate durable topic subscriptions is as follows.

  1. durableTopicSubscriber is run to create a durable topic subscriber.
  2. 5 messages are sent to the "newTopic" topic. The messages are received and printed by the subscriber named durableTopicSubscriber.
  3. The durableTopicSubscriber is stopped.
  4. The publisher is run again and 5 more messages are sent.
  5. While durableTopicSubscriber is running again, 5 different messages are sent to the same topic. You will see that all 10 messages (including the messages sent to the topic when the subscriber was absent) are consumed by the durableTopicSubscriber.
  6. Now continue to send 200 more messages.
  7. After 100 messages are received, durableTopicSubscriber will close. 
  8. After running durableTopicSubscriber again you will receive nothing (as durableTopicSubscriber was unsubscribed after receiving 100 messages).
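
Steps 1 to 5 of the scenario can be traced with a small stand-alone simulation that only counts messages. This is a sketch under stated assumptions: the ScenarioWalkthrough class is a hypothetical in-memory stand-in for the single durable subscription mySub1, it does not connect to the broker, and it does not model the later steps of the list.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical in-memory stand-in for one durable subscription; not the broker itself.
class ScenarioWalkthrough {
    private final Queue<String> backlog = new ArrayDeque<>(); // held while the subscriber is stopped
    private final List<String> received = new ArrayList<>();  // delivered since the last start
    private boolean online = false;

    void startSubscriber() {
        online = true;
        received.clear();
        received.addAll(backlog); // messages kept during the absence are delivered first
        backlog.clear();
    }

    void stopSubscriber() {
        online = false;
    }

    void publish(int count, String label) {
        for (int i = 1; i <= count; i++) {
            String message = label + "-" + i;
            if (online) {
                received.add(message);
            } else {
                backlog.add(message);
            }
        }
    }

    // Returns {messages received in the first run, messages received in the second run}.
    static int[] run() {
        ScenarioWalkthrough scenario = new ScenarioWalkthrough();
        scenario.startSubscriber();       // step 1: subscriber is started
        scenario.publish(5, "first");     // step 2: 5 messages while it is running
        int firstRun = scenario.received.size();
        scenario.stopSubscriber();        // step 3: subscriber is stopped
        scenario.publish(5, "second");    // step 4: 5 messages while it is absent
        scenario.startSubscriber();       // step 5: subscriber runs again...
        scenario.publish(5, "third");     // ...and 5 more messages are sent
        int secondRun = scenario.received.size();
        return new int[] {firstRun, secondRun};
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println("received in first run: " + counts[0]);
        System.out.println("received in second run: " + counts[1]);
    }
}
```

The second run reports 10 messages: the 5 that were published while the subscriber was stopped plus the 5 published after it was restarted, which matches step 5.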
