Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • DurableTopicSubscriber.java class creates a durable topic subscription named mySub1.

  • SampleMessageListener.java class implements the message listener that receives and prints the messages delivered to the durable topic subscription.

  • TopicPublisher.java class creates a publisher that publishes messages to the topic (newTopic) on which the durable subscription is created.

  • Main.java defines the main method, which runs the subscriber and publisher clients mentioned above.

Click the relevant tab to see the code.

Localtabgroup
Localtab
titleDurableTopicSubscriber.java
Code Block
languagejava
package org.sample.jms;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Properties;
/**
 * Sample JMS client that creates the durable topic subscription {@code mySub1} on
 * {@code newTopic} and consumes messages from it.
 *
 * <p>By default messages are handed to a {@link SampleMessageListener}; when
 * {@code useListener} is {@code false} a fixed number of messages is read with blocking
 * {@code receive()} calls instead.
 *
 * <p>{@link #stopSubscriber()} only closes the JMS objects and never calls
 * {@code TopicSession.unsubscribe}. NOTE(review): per the JMS durable-subscription
 * contract the broker is then expected to keep the subscription and retain messages
 * published while this client is disconnected - confirm against the broker in use.
 */
public class DurableTopicSubscriber {
    public static final String ANDES_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "andesConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static final String CARBON_CLIENT_ID = "carbon";
    private static final String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static final String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static final String CARBON_DEFAULT_PORT = "5672";
    private String topicName = "newTopic";
    private String subscriptionId = "mySub1";
    private boolean useListener = true;
    private int delayBetMessages = 200;
    private int messageCount = 10;
    private SampleMessageListener messageListener;
    private TopicConnection topicConnection;
    private TopicSession topicSession;
    private TopicSubscriber topicSubscriber;

    /**
     * Connects to the broker and attaches to the durable subscription.
     *
     * <p>In listener mode the connection is left open until {@link #stopSubscriber()} is
     * called. In synchronous mode, or when any step fails, every JMS object opened here is
     * closed before the method returns. Failures are printed to standard error and not
     * rethrown.
     */
    public void subscribe() {
        InitialContext ctx = null;
        // Becomes true only once a listener is installed; in every other case the
        // connection must not outlive this call.
        boolean keepOpen = false;
        try {
            System.out.println("Starting the subscriber");
            Properties properties = new Properties();
            properties.put(Context.INITIAL_CONTEXT_FACTORY, ANDES_ICF);
            properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
            properties.put("topic." + topicName, topicName);
            ctx = new InitialContext(properties);
            // Lookup connection factory
            TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
            topicConnection = connFactory.createTopicConnection();
            topicConnection.start();
            topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            // create durable subscriber with subscription ID
            Topic topic = (Topic) ctx.lookup(topicName);
            topicSubscriber = topicSession.createDurableSubscriber(topic, subscriptionId);
            if (useListener) {
                messageListener = new SampleMessageListener(delayBetMessages);
                topicSubscriber.setMessageListener(messageListener);
                keepOpen = true;
            } else {
                receiveMessages();
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeContextQuietly(ctx);
            if (!keepOpen) {
                releaseQuietly();
            }
        }
    }

    /**
     * Reads {@code messageCount} messages with blocking {@code receive()} calls, printing
     * each one and pausing {@code delayBetMessages} milliseconds between reads.
     */
    private void receiveMessages() throws JMSException, InterruptedException {
        for (int count = 0; count < messageCount; count++) {
            Message message = topicSubscriber.receive();
            System.out.println("count = " + count);
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                System.out.println(count + ". textMessage.getText() = " + textMessage.getText());
            }
            if (delayBetMessages != 0) {
                Thread.sleep(delayBetMessages);
            }
        }
    }

    /**
     * Builds the AMQP connection URL in the form
     * {@code amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'}.
     *
     * @param username user name placed in the URL
     * @param password password placed in the URL
     * @return the connection URL for the configured host and port
     */
    public String getTCPConnectionURL(String username, String password) {
        return new StringBuilder()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                .toString();
    }

    /**
     * Closes the subscriber, session and connection. Safe to call when
     * {@link #subscribe()} failed, was never called, or has already released its resources.
     *
     * @throws JMSException if the provider fails to close one of the JMS objects
     */
    public void stopSubscriber() throws JMSException {
        closeResources();
        System.out.println("Closing Subscriber");
    }

    /**
     * Closes whichever JMS objects are currently open and clears the fields so a second
     * call is a no-op. The connection is closed even if closing the subscriber or session
     * throws.
     */
    private void closeResources() throws JMSException {
        try {
            if (topicSubscriber != null) {
                topicSubscriber.close();
            }
            if (topicSession != null) {
                topicSession.close();
            }
        } finally {
            try {
                if (topicConnection != null) {
                    topicConnection.close();
                }
            } finally {
                topicSubscriber = null;
                topicSession = null;
                topicConnection = null;
            }
        }
    }

    /** Releases the JMS objects, printing rather than propagating any close failure. */
    private void releaseQuietly() {
        try {
            closeResources();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /** Closes the JNDI context if it was created, printing rather than propagating failures. */
    private void closeContextQuietly(InitialContext ctx) {
        if (ctx == null) {
            return;
        }
        try {
            ctx.close();
        } catch (javax.naming.NamingException e) {
            e.printStackTrace();
        }
    }
}
Localtab
titleSampleMessageListener.java
Code Block
languagejava
package org.sample.jms;
import javax.jms.*;
public class SampleMessageListener implements MessageListener {
    private int delay = 0;
    private int currentMsgCount = 0;
    public SampleMessageListener(int delay) {
        this.delay = delay;
    }
    public void onMessage(Message message) {
        TextMessage receivedMessage = (TextMessage) message;
        try {
            System.out.println("Got the message ==> " + (currentMsgCount+1) + " - "+ receivedMessage.getText());
            currentMsgCount++;
            if(delay != 0) {
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException e) {
                    //silently ignore
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
Localtab
titleTopicPublisher.java
Code Block
languagejava
package org.sample.jms;
import javax.jms.JMSException;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
/**
 * Sample JMS client that publishes text messages to {@code newTopic}.
 */
public class TopicPublisher {
    public static final String ANDES_ICF  = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "andesConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static final String CARBON_CLIENT_ID = "carbon";
    private static final String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static final String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static final String CARBON_DEFAULT_PORT = "5672";
    String topicName = "newTopic";

    /**
     * Publishes {@code numOfMsgs} copies of the text message {@code "Test Message"} to the
     * topic, sleeping one second after each publish. The connection and the JNDI context
     * are closed before the method returns, whether it completes or throws.
     *
     * @param numOfMsgs number of messages to publish; zero or less publishes nothing
     * @throws NamingException if the JNDI context, connection factory or topic lookup fails
     * @throws JMSException if the provider fails to connect, publish or close
     * @throws InterruptedException if the thread is interrupted while pausing between messages
     */
    public void publishMessage(int numOfMsgs) throws NamingException, JMSException, InterruptedException {
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, ANDES_ICF);
        properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
        properties.put("topic." + topicName, topicName);
        InitialContext ctx = new InitialContext(properties);
        TopicConnection topicConnection = null;
        try {
            // Lookup connection factory
            TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
            topicConnection = connFactory.createTopicConnection();
            topicConnection.start();
            TopicSession topicSession =
                    topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = (Topic) ctx.lookup(topicName);
            // Create the messages to send
            TextMessage textMessage = topicSession.createTextMessage("Test Message");
            javax.jms.TopicPublisher topicPublisher = topicSession.createPublisher(topic);
            System.out.println("Sending " + numOfMsgs + " messages to Topic: " + topicName);
            for (int i = 0; i < numOfMsgs; i++) {
                topicPublisher.publish(textMessage);
                Thread.sleep(1000);
            }
            topicPublisher.close();
            topicSession.close();
        } finally {
            // Runs on the failure path too, so a failed publish cannot leave the
            // connection open.
            try {
                if (topicConnection != null) {
                    topicConnection.close();
                }
            } finally {
                ctx.close();
            }
        }
    }

    /**
     * Builds the AMQP connection URL in the form
     * {@code amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'}.
     *
     * @param username user name placed in the URL
     * @param password password placed in the URL
     * @return the connection URL for the configured host and port
     */
    public String getTCPConnectionURL(String username, String password) {
        return new StringBuilder()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                .toString();
    }
}
Localtab
titleMain.java
Code Block
languagejava
package org.sample.jms;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.naming.NamingException;

/**
 * Drives the sample: publishes a batch while a durable subscriber is connected, a second
 * batch while no subscriber is connected, and a third batch after a new subscriber has
 * attached to the same subscription.
 */
public class Main {
    /** Number of messages sent in each publish round. */
    private static final int MESSAGES_PER_ROUND = 5;
    /** Pause after each publish round, in milliseconds. */
    private static final long PAUSE_AFTER_ROUND_MS = 5000;

    /**
     * Runs the three publish rounds in order.
     *
     * @param args unused
     * @throws NamingException if a JNDI lookup in the publisher fails
     * @throws JMSException if publishing or stopping the subscriber fails
     * @throws InterruptedException if the thread is interrupted while pausing
     */
    public static void main(String[] args) throws NamingException, JMSException, InterruptedException {
        // Round 1: subscriber connected while messages are published.
        publishWithActiveSubscriber();
        // Round 2: nobody is connected to the subscription while these are published.
        publishAndPause();
        // Round 3: a fresh subscriber attaches to the same subscription, then more
        // messages are published.
        publishWithActiveSubscriber();
    }

    /**
     * Starts a durable subscriber, publishes one round, and always stops the subscriber
     * afterwards so its connection is not left open when publishing fails.
     */
    private static void publishWithActiveSubscriber()
            throws NamingException, JMSException, InterruptedException {
        DurableTopicSubscriber durableTopicSubscriber = new DurableTopicSubscriber();
        durableTopicSubscriber.subscribe();
        try {
            publishAndPause();
        } finally {
            durableTopicSubscriber.stopSubscriber();
        }
    }

    /** Publishes one round of messages with a new publisher, then pauses. */
    private static void publishAndPause()
            throws NamingException, JMSException, InterruptedException {
        TopicPublisher topicPublisher = new TopicPublisher();
        topicPublisher.publishMessage(MESSAGES_PER_ROUND);
        Thread.sleep(PAUSE_AFTER_ROUND_MS);
    }
}

...