This site contains the documentation that is relevant to older WSO2 product versions and offerings.
For the latest WSO2 documentation, visit https://wso2.com/documentation/.

Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 3 Next »

Introduction

This sample demonstrates how to publish messages to topics and subtopics in a topic hierarchy, and how to create hierarchical topic subscriptions.

Prerequisites

See Prerequisites to Run the MB Samples for a list of prerequisites.

Building the sample

The <MB_HOME>/samples/HierarchicalTopicsSubscriber/src/org/sample/jms directory has the following classes.

  • SampleHierarchicalTopicsClient.java class defines a client that subscribes to a hierarchical topic structure of which the main topic is Games. The configuration of this class is as follows.

    /*
    *  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
    *
    *  WSO2 Inc. licenses this file to you under the Apache License,
    *  Version 2.0 (the "License"); you may not use this file except
    *  in compliance with the License.
    *  You may obtain a copy of the License at
    *
    *    http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing,
    * software distributed under the License is distributed on an
    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    * KIND, either express or implied.  See the License for the
    * specific language governing permissions and limitations
    * under the License.
    */
    package org.sample.jms;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.QueueSession;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    import javax.jms.TopicConnection;
    import javax.jms.TopicConnectionFactory;
    import javax.jms.TopicSession;
    import javax.jms.TopicSubscriber;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;
    import java.util.Properties;
    /**
     * Sample JMS client that subscribes to the hierarchical topic patterns
     * {@code Games.Cricket.*} and {@code Games.Cricket.#} and prints every text message
     * it receives to standard output.
     *
     * <p>The client is a {@link Thread}: {@link #run()} calls {@link #subscribe()} and prints
     * the stack trace of any naming or JMS failure.
     *
     * <p>NOTE(review): presumably {@code *} matches one hierarchy level and {@code #} any
     * number of levels, as in AMQP topic exchanges; confirm against the broker documentation.
     */
    public class SampleHierarchicalTopicsClient extends Thread {
        public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
        private static final String CF_NAME_PREFIX = "connectionfactory.";
        private static final String CF_NAME = "qpidConnectionfactory";
        /** Time in milliseconds a receive call waits before a subscriber is treated as drained. */
        private static final long RECEIVE_TIMEOUT_MS = 5000;
        String userName = "admin";
        String password = "admin";
        private static final String CARBON_CLIENT_ID = "carbon";
        private static final String CARBON_VIRTUAL_HOST_NAME = "carbon";
        private static final String CARBON_DEFAULT_HOSTNAME = "localhost";
        private static final String CARBON_DEFAULT_PORT = "5672";
        // Concrete topic names of the hierarchy. This client does not subscribe to them
        // directly; the fields are kept because they are package-visible.
        String topicName_1 = "Games";
        String topicName_2 = "Games.Cricket";
        String topicName_3 = "Games.Cricket.SL";
        String topicName_4 = "Games.Cricket.India";
        String topicName_5 = "Games.Cricket.India.Delhi";
        // Topic patterns this client actually subscribes to.
        String topicName_6 = "Games.Cricket.*";
        String topicName_7 = "Games.Cricket.#";

        /**
         * Runs {@link #subscribe()} on this thread and prints the stack trace if it fails.
         */
        @Override
        public void run() {
            try {
                subscribe();
            } catch (NamingException e) {
                e.printStackTrace();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

        /**
         * Connects to the broker, subscribes to {@code topicName_6} and {@code topicName_7},
         * prints the text messages received by each subscriber in turn, and disconnects.
         *
         * <p>Both subscribers are created before any message is read. The first subscriber is
         * read until a receive call times out, and only then is the second one read.
         *
         * @throws NamingException if the JNDI context cannot be created or a lookup fails
         * @throws JMSException    if connecting, subscribing, receiving or closing fails
         */
        public void subscribe() throws NamingException, JMSException {
            InitialContext ctx = init();
            // Lookup connection factory
            TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
            TopicConnection topicConnection = connFactory.createTopicConnection();
            try {
                topicConnection.start();
                TopicSession topicSession =
                        topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
                Topic singleLevelTopic = (Topic) ctx.lookup(topicName_6);
                Topic multiLevelTopic = (Topic) ctx.lookup(topicName_7);
                TopicSubscriber topicSubscriber1 = topicSession.createSubscriber(singleLevelTopic);
                TopicSubscriber topicSubscriber2 = topicSession.createSubscriber(multiLevelTopic);
                // Receive messages
                printReceivedMessages(topicSubscriber1, "subscriber1", topicName_6);
                printReceivedMessages(topicSubscriber2, "subscriber2", topicName_7);
                topicSubscriber1.close();
                topicSubscriber2.close();
                topicSession.close();
                topicConnection.stop();
            } finally {
                // Always release the connection, including when a lookup or receive call fails.
                topicConnection.close();
            }
        }

        /**
         * Prints a header line for the topic and then every text message the subscriber
         * delivers, until a receive call returns nothing within {@link #RECEIVE_TIMEOUT_MS}.
         * Messages that are not {@link TextMessage}s are consumed but not printed.
         *
         * @param subscriber      subscriber to read from
         * @param subscriberLabel name used for the subscriber in the printed lines
         * @param topicName       topic name printed in the header line
         * @throws JMSException if receiving a message or reading its text fails
         */
        private void printReceivedMessages(TopicSubscriber subscriber, String subscriberLabel,
                                           String topicName) throws JMSException {
            System.out.println(" Receiving messages for " + topicName + " :");
            Message message;
            while ((message = subscriber.receive(RECEIVE_TIMEOUT_MS)) != null) {
                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    System.out.println("Got Message from " + subscriberLabel + " => " + textMessage.getText());
                }
            }
        }

        /**
         * Builds the JNDI context holding the connection factory and the two subscribed topics.
         *
         * @return a new initial context backed by in-memory properties
         * @throws NamingException if the context cannot be created
         */
        private InitialContext init() throws NamingException {
            Properties properties = new Properties();
            properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
            properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
            properties.put("topic." + topicName_6, topicName_6);
            properties.put("topic." + topicName_7, topicName_7);
            return new InitialContext(properties);
        }

        /**
         * Builds the broker connection URL for the given credentials.
         *
         * @param username user name placed in the URL
         * @param password password placed in the URL
         * @return the connection URL string
         */
        private String getTCPConnectionURL(String username, String password) {
            // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
            return new StringBuilder()
                    .append("amqp://").append(username).append(":").append(password)
                    .append("@").append(CARBON_CLIENT_ID)
                    .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                    .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME)
                    .append(":").append(CARBON_DEFAULT_PORT).append("'")
                    .toString();
        }
    }
  • TopicPublisher.java class defines a client that publishes messages in the hierarchical topic structure mentioned above. The configuration of this class is as follows.

    /*
    *  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
    *
    *  WSO2 Inc. licenses this file to you under the Apache License,
    *  Version 2.0 (the "License"); you may not use this file except
    *  in compliance with the License.
    *  You may obtain a copy of the License at
    *
    *    http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing,
    * software distributed under the License is distributed on an
    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    * KIND, either express or implied.  See the License for the
    * specific language governing permissions and limitations
    * under the License.
    */
    package org.sample.jms;
    import javax.jms.JMSException;
    import javax.jms.QueueSession;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    import javax.jms.TopicConnection;
    import javax.jms.TopicConnectionFactory;
    import javax.jms.TopicSession;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;
    import java.util.Properties;
    /**
     * Sample JMS client that publishes one text message to each of four topics below
     * {@code Games} in the topic hierarchy.
     */
    public class TopicPublisher {
        public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
        private static final String CF_NAME_PREFIX = "connectionfactory.";
        private static final String CF_NAME = "qpidConnectionfactory";
        String userName = "admin";
        String password = "admin";
        private static final String CARBON_CLIENT_ID = "carbon";
        private static final String CARBON_VIRTUAL_HOST_NAME = "carbon";
        private static final String CARBON_DEFAULT_HOSTNAME = "localhost";
        private static final String CARBON_DEFAULT_PORT = "5672";
        // Root topic: registered in the JNDI context, but no message is published to it.
        String topicName_1 = "Games";
        String topicName_2 = "Games.Cricket";
        String topicName_3 = "Games.Cricket.SL";
        String topicName_4 = "Games.Cricket.India";
        String topicName_5 = "Games.Cricket.India.Delhi";

        /**
         * Connects to the broker and publishes one text message to each of
         * {@code topicName_2} through {@code topicName_5}, in that order, then disconnects.
         *
         * @throws NamingException if the JNDI context cannot be created or a lookup fails
         * @throws JMSException    if connecting, publishing or closing fails
         */
        public void publishMessage() throws NamingException, JMSException {
            InitialContext ctx = init();
            // Lookup connection factory
            TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
            TopicConnection topicConnection = connFactory.createTopicConnection();
            try {
                topicConnection.start();
                TopicSession topicSession =
                        topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
                publishText(ctx, topicSession, topicName_2, "Message for Cricket");
                publishText(ctx, topicSession, topicName_3, "Message for SL");
                publishText(ctx, topicSession, topicName_4, "Message for India");
                publishText(ctx, topicSession, topicName_5, "Message for Delhi");
                topicSession.close();
            } finally {
                // Always release the connection, including when a lookup or publish fails.
                topicConnection.close();
            }
        }

        /**
         * Looks up the named topic, publishes a single text message to it and closes the
         * publisher that was created for it.
         *
         * @param ctx       JNDI context in which the topic is registered
         * @param session   session used to create the publisher and the message
         * @param topicName JNDI name of the destination topic
         * @param text      body of the text message
         * @throws NamingException if the topic lookup fails
         * @throws JMSException    if creating the publisher or publishing fails
         */
        private void publishText(InitialContext ctx, TopicSession session, String topicName, String text)
                throws NamingException, JMSException {
            Topic topic = (Topic) ctx.lookup(topicName);
            javax.jms.TopicPublisher publisher = session.createPublisher(topic);
            try {
                publisher.publish(session.createTextMessage(text));
            } finally {
                publisher.close();
            }
        }

        /**
         * Builds the JNDI context holding the connection factory and the five topic names.
         *
         * @return a new initial context backed by in-memory properties
         * @throws NamingException if the context cannot be created
         */
        private InitialContext init() throws NamingException {
            Properties properties = new Properties();
            properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
            properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
            String[] topicNames = {topicName_1, topicName_2, topicName_3, topicName_4, topicName_5};
            for (String topicName : topicNames) {
                properties.put("topic." + topicName, topicName);
            }
            return new InitialContext(properties);
        }

        /**
         * Builds the broker connection URL for the given credentials.
         *
         * @param username user name placed in the URL
         * @param password password placed in the URL
         * @return the connection URL string
         */
        private String getTCPConnectionURL(String username, String password) {
            // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
            return new StringBuilder()
                    .append("amqp://").append(username).append(":").append(password)
                    .append("@").append(CARBON_CLIENT_ID)
                    .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                    .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME)
                    .append(":").append(CARBON_DEFAULT_PORT).append("'")
                    .toString();
        }
    }
  • Main.java class defines the main method that runs both clients. The configuration of this class is as follows.

    /*
    *  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
    *
    *  WSO2 Inc. licenses this file to you under the Apache License,
    *  Version 2.0 (the "License"); you may not use this file except
    *  in compliance with the License.
    *  You may obtain a copy of the License at
    *
    *    http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing,
    * software distributed under the License is distributed on an
    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    * KIND, either express or implied.  See the License for the
    * specific language governing permissions and limitations
    * under the License.
    */
    package org.sample.jms;
    
    import javax.jms.JMSException;
    import javax.naming.NamingException;
    /**
     * Entry point of the sample: starts the hierarchical-topic subscriber on its own thread
     * and then publishes the sample messages from the main thread.
     */
    public class Main {
        /**
         * Time in milliseconds the main thread waits after starting the subscriber thread
         * before it publishes. The subscriptions are created with {@code createSubscriber},
         * so messages published before they exist are not delivered to them. The wait is a
         * heuristic, and it must stay well below the subscriber's 5000 ms receive timeout,
         * otherwise the subscriber gives up before anything is published.
         */
        private static final long SUBSCRIBER_STARTUP_DELAY_MS = 2000;

        /**
         * Starts the subscriber thread, waits {@link #SUBSCRIBER_STARTUP_DELAY_MS} and then
         * publishes the sample messages.
         *
         * @param args command-line arguments; not used
         * @throws NamingException if the publisher's JNDI setup or a lookup fails
         * @throws JMSException    if the publisher fails to connect or publish
         */
        public static void main(String[] args) throws NamingException, JMSException {
            SampleHierarchicalTopicsClient hierarchicalTopicsClient = new SampleHierarchicalTopicsClient();
            hierarchicalTopicsClient.start();
            try {
                // Give the subscriber thread time to connect and subscribe before publishing.
                Thread.sleep(SUBSCRIBER_STARTUP_DELAY_MS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and publish anyway, as the sample always has.
                Thread.currentThread().interrupt();
            }
            TopicPublisher topicPublisher = new TopicPublisher();
            topicPublisher.publishMessage();
        }
    }

Executing the sample

Run the ant command from the <MB_HOME>/samples/HierarchicalTopicsSubscriber directory.

Analyzing the output

When you run the sample, you will see the following in the output log in the console.

     [java]  Receiving messages for Games.Cricket.* :
     [java]  Receiving messages for Games.Cricket.# :
  • No labels