

Introduction

This sample demonstrates how to publish messages to topics and subtopics in a topic hierarchy, and how to create hierarchical topic subscriptions.

Prerequisites

See Prerequisites to Run the MB Samples for a list of prerequisites.

Building the sample

The <MB_HOME>/samples/HierarchicalTopicsSubscriber/src/org/sample/jms directory contains the following classes.

  • The SampleHierarchicalTopicsClient.java class defines a client that subscribes to a hierarchical topic structure whose root topic is Games. The source of this class is as follows.

    Code Block
    /*
    *  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
    *
    *  WSO2 Inc. licenses this file to you under the Apache License,
    *  Version 2.0 (the "License"); you may not use this file except
    *  in compliance with the License.
    *  You may obtain a copy of the License at
    *
    *    http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing,
    * software distributed under the License is distributed on an
    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    * KIND, either express or implied.  See the License for the
    * specific language governing permissions and limitations
    * under the License.
    */
    package org.sample.jms;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.QueueSession;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    import javax.jms.TopicConnection;
    import javax.jms.TopicConnectionFactory;
    import javax.jms.TopicSession;
    import javax.jms.TopicSubscriber;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;
    import java.util.Properties;

    public class SampleHierarchicalTopicsClient extends Thread{
        public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
        private static final String CF_NAME_PREFIX = "connectionfactory.";
        private static final String CF_NAME = "qpidConnectionfactory";
        String userName = "admin";
        String password = "admin";
        private static String CARBON_CLIENT_ID = "carbon";
        private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
        private static String CARBON_DEFAULT_HOSTNAME = "localhost";
        private static String CARBON_DEFAULT_PORT = "5672";
        String topicName_1 = "Games";
        String topicName_2 = "Games.Cricket";
        String topicName_3 = "Games.Cricket.SL";
        String topicName_4 = "Games.Cricket.India";
        String topicName_5 = "Games.Cricket.India.Delhi";
        String topicName_6 = "Games.Cricket.*";
        String topicName_7 = "Games.Cricket.#";
        @Override
        public void run() {
            try {
                subscribe();
            } catch (NamingException e) {
                e.printStackTrace();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        public void subscribe() throws NamingException, JMSException {
            InitialContext ctx = init();
            // Lookup connection factory
            TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
            TopicConnection topicConnection = connFactory.createTopicConnection();
            topicConnection.start();
            TopicSession topicSession =
                    topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
            Topic topic1 = topicSession.createTopic(topicName_1);
            Topic topic2 = topicSession.createTopic(topicName_2);
            Topic topic3 = topicSession.createTopic(topicName_3);
            Topic topic4 = topicSession.createTopic(topicName_4);
            Topic topic5 = topicSession.createTopic(topicName_5);
            Topic topic6 = (Topic) ctx.lookup(topicName_6);
            Topic topic7 = (Topic) ctx.lookup(topicName_7);
            TopicSubscriber topicSubscriber1 = topicSession.createSubscriber(topic6);
            TopicSubscriber topicSubscriber2 = topicSession.createSubscriber(topic7);
            // Receive messages
            Message message1;
            System.out.println(" Receiving messages for " + topicName_6 + " :");
            while ((message1 = topicSubscriber1.receive(5000)) != null){
                if (message1 instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message1;
                    System.out.println("Got Message from subscriber1 => " + textMessage.getText());
                }
            }
            Message message2;
            System.out.println(" Receiving messages for " + topicName_7 + " :");
            while ((message2 = topicSubscriber2.receive(5000)) != null){
                if (message2 instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message2;
                    System.out.println("Got Message from subscriber2 => " + textMessage.getText());
                }
            }
            topicSubscriber1.close();
            topicSubscriber2.close();
            topicSession.close();
            topicConnection.stop();
            topicConnection.close();
        }
        private InitialContext init() throws NamingException {
            Properties properties = new Properties();
            properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
            properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
            properties.put("topic."+topicName_6,topicName_6);
            properties.put("topic."+topicName_7,topicName_7);
            return new InitialContext(properties);
        }
        private String getTCPConnectionURL(String username, String password) {
            // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
            return new StringBuffer()
                    .append("amqp://").append(username).append(":").append(password)
                    .append("@").append(CARBON_CLIENT_ID)
                    .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                    .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                    .toString();
        }
    }
  • The TopicPublisher.java class defines a client that publishes messages to the topics in the hierarchical topic structure described above. The source of this class is as follows.

    Code Block
    /*
    *  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
    *
    *  WSO2 Inc. licenses this file to you under the Apache License,
    *  Version 2.0 (the "License"); you may not use this file except
    *  in compliance with the License.
    *  You may obtain a copy of the License at
    *
    *    http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing,
    * software distributed under the License is distributed on an
    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    * KIND, either express or implied.  See the License for the
    * specific language governing permissions and limitations
    * under the License.
    */
    package org.sample.jms;
    import javax.jms.JMSException;
    import javax.jms.QueueSession;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    import javax.jms.TopicConnection;
    import javax.jms.TopicConnectionFactory;
    import javax.jms.TopicSession;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;
    import java.util.Properties;
    public class TopicPublisher {
        public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
        private static final String CF_NAME_PREFIX = "connectionfactory.";
        private static final String CF_NAME = "qpidConnectionfactory";
        String userName = "admin";
        String password = "admin";
        private static String CARBON_CLIENT_ID = "carbon";
        private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
        private static String CARBON_DEFAULT_HOSTNAME = "localhost";
        private static String CARBON_DEFAULT_PORT = "5672";
        String topicName_1 = "Games";
        String topicName_2 = "Games.Cricket";
        String topicName_3 = "Games.Cricket.SL";
        String topicName_4 = "Games.Cricket.India";
        String topicName_5 = "Games.Cricket.India.Delhi";
        public void publishMessage() throws NamingException, JMSException {
            InitialContext ctx = init();
            // Lookup connection factory
            TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
            TopicConnection topicConnection = connFactory.createTopicConnection();
            topicConnection.start();
            TopicSession topicSession =
                    topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
            Topic topic1 = (Topic) ctx.lookup(topicName_1);
            Topic topic2 = (Topic) ctx.lookup(topicName_2);
            Topic topic3 = (Topic) ctx.lookup(topicName_3);
            Topic topic4 = (Topic) ctx.lookup(topicName_4);
            Topic topic5 = (Topic) ctx.lookup(topicName_5);

            javax.jms.TopicPublisher topicPublisher1 = topicSession.createPublisher(topic1);
            javax.jms.TopicPublisher topicPublisher2 = topicSession.createPublisher(topic2);
            javax.jms.TopicPublisher topicPublisher3 = topicSession.createPublisher(topic3);
            javax.jms.TopicPublisher topicPublisher4 = topicSession.createPublisher(topic4);
            javax.jms.TopicPublisher topicPublisher5 = topicSession.createPublisher(topic5);
            // Create the messages to send
            TextMessage textMessage1 = topicSession.createTextMessage("Message for Cricket");
            TextMessage textMessage2 = topicSession.createTextMessage("Message for SL");
            TextMessage textMessage3 = topicSession.createTextMessage("Message for India");
            TextMessage textMessage4 = topicSession.createTextMessage("Message for Delhi");
            topicPublisher2.publish(textMessage1);
            topicPublisher3.publish(textMessage2);
            topicPublisher4.publish(textMessage3);
            topicPublisher5.publish(textMessage4);
            topicSession.close();
            topicConnection.close();
        }
        private InitialContext init() throws NamingException {
            Properties properties = new Properties();
            properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
            properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
            properties.put("topic."+topicName_1,topicName_1);
            properties.put("topic."+topicName_2,topicName_2);
            properties.put("topic."+topicName_3,topicName_3);
            properties.put("topic."+topicName_4,topicName_4);
            properties.put("topic."+topicName_5,topicName_5);

            return new InitialContext(properties);
        }
        private String getTCPConnectionURL(String username, String password) {
            // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
            return new StringBuffer()
                    .append("amqp://").append(username).append(":").append(password)
                    .append("@").append(CARBON_CLIENT_ID)
                    .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                    .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                    .toString();
        }
    }

    In this sample, topicSubscriber1 listens on the pattern "Games.Cricket.*", so it receives only the messages published to the immediate child topics of Games.Cricket (here, SL and India). topicSubscriber2 listens on the pattern "Games.Cricket.#", so it receives the messages published to the parent topic Games.Cricket as well as to all the topics below it. Of the four messages published above, topicSubscriber1 should therefore receive two, while topicSubscriber2 should receive all four.
  • The Main.java class defines the main method, which starts the subscriber client in its own thread and then runs the publisher. The source of this class is as follows.

    Code Block
    /*
    *  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
    *
    *  WSO2 Inc. licenses this file to you under the Apache License,
    *  Version 2.0 (the "License"); you may not use this file except
    *  in compliance with the License.
    *  You may obtain a copy of the License at
    *
    *    http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing,
    * software distributed under the License is distributed on an
    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    * KIND, either express or implied.  See the License for the
    * specific language governing permissions and limitations
    * under the License.
    */
    package org.sample.jms;
    
    import javax.jms.JMSException;
    import javax.naming.NamingException;
    public class Main {
        public static void main(String[] args) throws NamingException, JMSException {
            SampleHierarchicalTopicsClient hierarchicalTopicsClient = new SampleHierarchicalTopicsClient();
            hierarchicalTopicsClient.start();
            TopicPublisher topicPublisher = new TopicPublisher();
            topicPublisher.publishMessage();
        }
    }
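
Both clients configure JNDI the same way: a connection-factory entry keyed by "connectionfactory." plus the factory name, holding an AMQP connection URL, and one "topic." entry per topic that is later looked up. The following is a minimal sketch of that setup that runs without a broker. The class name JndiPropertiesSketch and the helpers connectionUrl and jndiProperties are illustrative and not part of the sample, and StringBuilder is used here in place of the sample's StringBuffer.

```java
import java.util.Properties;
import javax.naming.Context;

public class JndiPropertiesSketch {

    // Builds amqp://{username}:{password}@{clientId}/{virtualHost}?brokerlist='tcp://{host}:{port}'
    public static String connectionUrl(String username, String password, String clientId,
                                       String virtualHost, String host, String port) {
        return new StringBuilder()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(clientId)
                .append("/").append(virtualHost)
                .append("?brokerlist='tcp://").append(host).append(":").append(port).append("'")
                .toString();
    }

    // Collects the same JNDI entries that the sample's init() methods put together.
    public static Properties jndiProperties(String url, String... topicNames) {
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY,
                "org.wso2.andes.jndi.PropertiesFileInitialContextFactory");
        properties.put("connectionfactory.qpidConnectionfactory", url);
        for (String topicName : topicNames) {
            // The JNDI name and the physical topic name are the same string in the sample.
            properties.put("topic." + topicName, topicName);
        }
        return properties;
    }

    public static void main(String[] args) {
        String url = connectionUrl("admin", "admin", "carbon", "carbon", "localhost", "5672");
        Properties properties = jndiProperties(url, "Games.Cricket.*", "Games.Cricket.#");
        properties.stringPropertyNames().stream().sorted()
                .forEach(key -> System.out.println(key + " = " + properties.getProperty(key)));
    }
}
```

Passing the resulting Properties object to new InitialContext(...) requires the broker's client libraries on the classpath, which is why the sketch stops at printing the entries.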

Executing the sample

Run the ant command from the <MB_HOME>/samples/HierarchicalTopicsSubscriber directory.

Analyzing the output

When you run the sample, the console shows output similar to the following.

Code Block
     [java] log4j:WARN No appenders could be found for logger (org.wso2.andes.client.AMQDestination).
     [java] log4j:WARN Please initialize the log4j system properly.
     [java] log4j:WARN No appenders could be found for logger (org.wso2.andes.client.AMQDestination).
     [java] log4j:WARN Please initialize the log4j system properly.
     [java]  Receiving messages for Games.Cricket.* :
     [java]  Receiving messages for Games.Cricket.# :
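
To see which of the published topics each of the two subscription patterns covers, the matching rule can be sketched in plain Java. This is only an illustration: it assumes the AMQP topic-exchange convention in which * stands for exactly one level and # for zero or more levels. The broker's own matching may differ in edge cases, and the class TopicPatternSketch and its matches method are hypothetical names, not part of the sample or the broker API.

```java
import java.util.Arrays;
import java.util.List;

public class TopicPatternSketch {

    // Returns true if the dot-separated topic name matches the subscription pattern.
    public static boolean matches(String pattern, String topic) {
        return matches(pattern.split("\\."), 0, topic.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int i, String[] topic, int j) {
        if (i == pattern.length) {
            // Pattern exhausted: match only if the topic is exhausted too.
            return j == topic.length;
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more of the remaining topic levels.
            for (int k = j; k <= topic.length; k++) {
                if (matches(pattern, i + 1, topic, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == topic.length) {
            return false;
        }
        // '*' consumes exactly one level; any other word must match literally.
        if (pattern[i].equals("*") || pattern[i].equals(topic[j])) {
            return matches(pattern, i + 1, topic, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> publishedTopics = Arrays.asList(
                "Games.Cricket", "Games.Cricket.SL",
                "Games.Cricket.India", "Games.Cricket.India.Delhi");
        for (String pattern : new String[] {"Games.Cricket.*", "Games.Cricket.#"}) {
            for (String topic : publishedTopics) {
                System.out.println(pattern + " vs " + topic + " -> " + matches(pattern, topic));
            }
        }
    }
}
```

Under these assumptions, Games.Cricket.* matches only Games.Cricket.SL and Games.Cricket.India, while Games.Cricket.# matches all four topics that TopicPublisher publishes to.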