Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • DurableTopicSubscriber.java class creates a durable topic subscription named mySub1. The configuration code of this file class is as follows.

    Code Block
    languagejava
    /*
    *  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
    *
    *  WSO2 Inc. licenses this file to you under the Apache License,
    *  Version 2.0 (the "License"); you may not use this file except
    *  in compliance with the License.
    *  You may obtain a copy of the License at
    *
    *    http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing,
    * software distributed under the License is distributed on an
    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    * KIND, either express or implied.  See the License for the
    * specific language governing permissions and limitations
    * under the License.
    */
    package org.sample.jms;
    import javax.jms.*;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import java.util.Properties;
    public class DurableTopicSubscriber {
        public static final String ANDES_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
        private static final String CF_NAME_PREFIX = "connectionfactory.";
        private static final String CF_NAME = "andesConnectionfactory";
        String userName = "admin";
        String password = "admin";
        private static String CARBON_CLIENT_ID = "carbon";
        private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
        private static String CARBON_DEFAULT_HOSTNAME = "localhost";
        private static String CARBON_DEFAULT_PORT = "5672";
        private String topicName = "newTopic";
        private String subscriptionId = "mySub1";
        private boolean useListener = true;
        private int delayBetMessages = 200;
        private int messageCount = 10;
        private SampleMessageListener messageListener;
        private TopicConnection topicConnection;
        private TopicSession topicSession;
        private TopicSubscriber topicSubscriber;
        public void subscribe() {
            try {
                System.out.println("Starting the subscriber");
                Properties properties = new Properties();
                properties.put(Context.INITIAL_CONTEXT_FACTORY, ANDES_ICF);
                properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
                properties.put("topic." + topicName, topicName);
                InitialContext ctx = new InitialContext(properties);
                // Lookup connection factory
                TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
                topicConnection = connFactory.createTopicConnection();
                topicConnection.start();
                topicSession =
                        topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
                // create durable subscriber with subscription ID
                Topic topic = (Topic) ctx.lookup(topicName);
                topicSubscriber = topicSession.createDurableSubscriber(topic, subscriptionId);
                if (!useListener) {
                    for (int count = 0; count < messageCount; count++) {
                        Message message = topicSubscriber.receive();
                        System.out.println("count = " + count);
                        if (message instanceof TextMessage) {
                            TextMessage textMessage = (TextMessage) message;
                            System.out.println(count + ". textMessage.getText() = " + textMessage.getText());
                        }
                        if (delayBetMessages != 0) {
                            Thread.sleep(delayBetMessages);
                        }
                    }
                    topicConnection.close();
                } else {
                    messageListener = new SampleMessageListener(delayBetMessages);
                    topicSubscriber.setMessageListener(messageListener);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        public String getTCPConnectionURL(String username, String password) {
            return new StringBuffer()
                    .append("amqp://").append(username).append(":").append(password)
                    .append("@").append(CARBON_CLIENT_ID)
                    .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                    .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                    .toString();
        }
        public void stopSubscriber() throws JMSException {
            topicSubscriber.close();
            topicSession.close();
            topicConnection.close();
            System.out.println("Closing Subscriber");
        }
    }
  • SampleMessageListener.java class creates a consumer for the durable topic subscription. The configuration code of this file class is as follows.

    Code Block
    languagejava
    /*
     * Copyright 2004,2005 The Apache Software Foundation.
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *      http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package org.sample.jms;
    import javax.jms.*;
    public class SampleMessageListener implements MessageListener {
        private int delay = 0;
        private int currentMsgCount = 0;
        public SampleMessageListener(int delay) {
            this.delay = delay;
        }
        public void onMessage(Message message) {
            TextMessage receivedMessage = (TextMessage) message;
            try {
                System.out.println("Got the message ==> " + (currentMsgCount+1) + " - "+ receivedMessage.getText());
                currentMsgCount++;
                if(delay != 0) {
                    try {
                        Thread.sleep(delay);
                    } catch (InterruptedException e) {
                        //silently ignore
                    }
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
  • TopicPublisher.java class creates a publisher to publish messages in the durable topic. The

    configuration

    code of this

    file

    class is as follows.

    Main.java defines the method for calling the three clients mentioned above. The configuration of this file

    Code Block
    languagejava
    /*
    *  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
    *
    *  WSO2 Inc. licenses this file to you under the Apache License,
    *  Version 2.0 (the "License"); you may not use this file except
    *  in compliance with the License.
    *  You may obtain a copy of the License at
    *
    *    http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing,
    * software distributed under the License is distributed on an
    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    * KIND, either express or implied.  See the License for the
    * specific language governing permissions and limitations
    * under the License.
    */
    package org.sample.jms;
    import javax.jms.JMSException;
    import javax.jms.QueueSession;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    import javax.jms.TopicConnection;
    import javax.jms.TopicConnectionFactory;
    import javax.jms.TopicSession;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;
    import java.util.Properties;
    public class TopicPublisher {
        public static final String ANDES_ICF  = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
        private static final String CF_NAME_PREFIX = "connectionfactory.";
        private static final String CF_NAME = "andesConnectionfactory";
        String userName = "admin";
        String password = "admin";
        private static String CARBON_CLIENT_ID = "carbon";
        private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
        private static String CARBON_DEFAULT_HOSTNAME = "localhost";
        private static String CARBON_DEFAULT_PORT = "5672";
        String topicName = "newTopic";
        public void publishMessage(int numOfMsgs) throws NamingException, JMSException, InterruptedException {
            Properties properties = new Properties();
            properties.put(Context.INITIAL_CONTEXT_FACTORY, ANDES_ICF);
            properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
            properties.put("topic."+topicName,topicName);
            InitialContext ctx = new InitialContext(properties);
            // Lookup connection factory
            TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
            TopicConnection topicConnection = connFactory.createTopicConnection();
            topicConnection.start();
            TopicSession topicSession =
                    topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
            Topic topic = (Topic)ctx.lookup(topicName);
            // Create the messages to send
            TextMessage textMessage = topicSession.createTextMessage("Test Message");
            javax.jms.TopicPublisher topicPublisher = topicSession.createPublisher(topic);
            System.out.println("Sending " + numOfMsgs + " messages to Topic: " + topicName);
            for (int i = 0; i < numOfMsgs; i++)
             {
                 topicPublisher.publish(textMessage);
                 Thread.sleep(1000);
             }
            topicPublisher.close();
            topicSession.close();
            topicConnection.close();
        }
        public String getTCPConnectionURL(String username, String password) {
            // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
            return new StringBuffer()
                    .append("amqp://").append(username).append(":").append(password)
                    .append("@").append(CARBON_CLIENT_ID)
                    .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                    .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                    .toString();
        }
    }
  • Main.java defines the method for calling the three clients mentioned above. The code of this class is as follows.

    Code Block
    languagejava
    /*
    *  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
    *
    *  WSO2 Inc. licenses this file to you under the Apache License,
    *  Version 2.0 (the "License"); you may not use this file except
    *  in compliance with the License.
    *  You may obtain a copy of the License at
    *
    *    http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing,
    * software distributed under the License is distributed on an
    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    * KIND, either express or implied.  See the License for the
    * specific language governing permissions and limitations
    * under the License.
    */
    
    package org.sample.jms;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.naming.NamingException;
    
    public class Main {
        public static void main(String[] args) throws NamingException, JMSException, InterruptedException {
            DurableTopicSubscriber durableTopicSubscriber = new DurableTopicSubscriber();
            durableTopicSubscriber.subscribe();
            TopicPublisher topicPublisher = new TopicPublisher();
            topicPublisher.publishMessage(5);
            Thread.sleep(5000);
            durableTopicSubscriber.stopSubscriber();
            TopicPublisher topicPublisher2 = new TopicPublisher();
            topicPublisher2.publishMessage(5);
            Thread.sleep(5000);
            DurableTopicSubscriber durableTopicSubscriber2 = new DurableTopicSubscriber();
            durableTopicSubscriber2.subscribe();
            TopicPublisher topicPublisher3 = new TopicPublisher();
            topicPublisher3.publishMessage(5);
            Thread.sleep(5000);
            durableTopicSubscriber2.stopSubscriber();
        }
    }

...