This documentation is for WSO2 Message Broker version 2.1.0. View documentation for the latest release.

Unknown macro: {next_previous_link3}
Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 2 Current »

Objectives

Durable topic subscriptions persists the subscription on behalf of the client subscription when it is not online. In simple words, durable topics keep messages around persistently for any suitable consumer to consume them. Durable topic subscribers are used when an application needs to receive messages that are published even while the application is inactive.

Prerequisites

Install and run the WSO2 Message Broker using the instructions in section Getting Started.

Create a durable topic Subscription 

Following code can be used to create a durable topic subscription with subscription Id  "mySub1". 

/*
*  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
*  WSO2 Inc. licenses this file to you under the Apache License,
*  Version 2.0 (the "License"); you may not use this file except
*  in compliance with the License.
*  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import javax.jms.Message;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Properties;

public class DurableTopicSubscriber {
    public static final String ANDES_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "andesConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5672";
    private String topicName = "myTopic";
    private String subscriptionId = "mySub1";
    private boolean useListener = true;
    private int delayBetMessages = 200;
    private int messageCount = 100;
    private SampleMessageListener messageListener;
    public static void main(String[] args) {
        DurableTopicSubscriber durableTopicSubscriber = new DurableTopicSubscriber();
        durableTopicSubscriber.subscribe();
    }
    public void subscribe()  {
        try {
            Properties properties = new Properties();
            properties.put(Context.INITIAL_CONTEXT_FACTORY, ANDES_ICF);
            properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
            properties.put("topic."+topicName,topicName);
            System.out.println("getTCPConnectionURL(userName,password) = " + getTCPConnectionURL(userName, password));
            InitialContext ctx = new InitialContext(properties);
            // Lookup connection factory
            TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
            TopicConnection topicConnection = connFactory.createTopicConnection();
            topicConnection.start();
            TopicSession topicSession =
                    topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
            // create durable subscriber with subscription ID
            Topic topic = (Topic) ctx.lookup(topicName);
            javax.jms.TopicSubscriber  topicSubscriber = topicSession.createDurableSubscriber(topic,subscriptionId);
            if(!useListener)  {
                for(int count=0;count<messageCount;count++) {
                    Message message = topicSubscriber.receive();
                    System.out.println("count = " + count);
                    if (message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println(count+". textMessage.getText() = " + textMessage.getText());
                    }
                    if(delayBetMessages !=0)    {
                        Thread.sleep(delayBetMessages);
                    }
                }
                topicConnection.close();
            } else {
                 messageListener = new SampleMessageListener(topicConnection,topicSession,topicSubscriber,
                         delayBetMessages,messageCount,subscriptionId);
                 topicSubscriber.setMessageListener(messageListener);
                 Thread.sleep(90*1000*60);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public String getTCPConnectionURL(String username, String password) {
        return new StringBuffer()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                .toString();
    }
    public void stop(){
          this.messageListener.close();
    }
}


 

Following  is the SampleMessageListener.java class used to consume messages used in the above code.

/*
 * Copyright 2004,2005 The Apache Software Foundation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import javax.jms.*;
public class SampleMessageListener implements MessageListener {
    private TopicConnection topicConnection;
    private TopicSession topicSession;
    private TopicSubscriber topicSubscriber;
    private int delay = 0;
    private int messageCount = 0;
    private int currentMsgCount = 0;
    private String subscriptionId = "";
    public SampleMessageListener(TopicConnection topicConnection,
                                 TopicSession topicSession,
                                 TopicSubscriber topicSubscriber, int delay, int messageCount, String subscriptionId) {
        this.topicConnection = topicConnection;
        this.topicSession = topicSession;
        this.topicSubscriber = topicSubscriber;
        this.delay = delay;
        this.messageCount = messageCount;
        this.subscriptionId = subscriptionId;
    }
    public void onMessage(Message message) {
        TextMessage receivedMessage = (TextMessage) message;
        try {
            System.out.println(currentMsgCount+". Got the message ==> " + receivedMessage.getText());
            currentMsgCount++;
            if(currentMsgCount == messageCount){
                close();
            }
            if(delay != 0) {
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException e) {
                    //silently ignore
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
    public void close() {
        try {
            System.out.println("unSubscribing Subscriber");
            this.topicSession.unsubscribe(subscriptionId);
            this.topicConnection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
    public void stop() {
        try {
            System.out.println("closing Subscriber");
            topicSubscriber.close();
            this.topicConnection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

 

To publish messages to the above created topic a normal topic publisher code can be used.

Scenario demonstrating  functionality of durable topic subscriptions

 

  1. Run DurableTopicSubscriber and make a durable topic subscriber.
  2. Send 5 messages to the topic "myTopic".
  3. They will be received and printed by durableTopicSubscriber.
  4. Now stop the  durableTopicSubscriber.
  5. Run the publisher again. Send above 5 messages again.
  6. Now while running  durableTopicSubscriber again, send 5 more different messages to the same topic.
  7. You will see all 10 (including messages sent to the topic when the subscriber was absent) messages are consumed by the  durableTopicSubscriber.
  8. Now continue to send 200 more messages.
  9. After 100 messages are received, durableTopicSubscriber will be closed. 
  10. After running durableTopicSubscriber again you will receive nothing (as durableTopicSubscriber was unsubscribed after receiving 100 messages).

 

 




  • No labels