Introduction

This sample demonstrates how to publish messages to topics and subtopics in a topic hierarchy and how to create hierarchical topic subscriptions.

Prerequisites

See Prerequisites to Run the MB Samples for a list of prerequisites.

About the sample

The <MB_HOME>/samples/HierarchicalTopicsSubscriber/src/org/sample/jms directory has the following classes:

  • SampleHierarchicalTopicsClient.java class defines a client that subscribes to a hierarchical topic structure whose main topic is Games.

  • TopicPublisher.java class defines a client that publishes messages in the hierarchical topic structure mentioned above.

  • Main.java class defines the method to call both the clients.
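
The subscriber client uses two wildcard topic names, Games.Cricket.* and Games.Cricket.#. The following plain-Java sketch illustrates how such patterns are commonly interpreted, assuming AMQP-style topic semantics in which * matches exactly one level and # matches zero or more levels. The class TopicPatternSketch and its helpers are hypothetical and are not part of the sample or of the broker; the broker's own matching rules may differ, so treat this as an illustration only.

```java
import java.util.ArrayList;
import java.util.List;

class TopicPatternSketch {

    // Hypothetical helper: true if the dot-separated topic matches the pattern.
    // Assumption: "*" stands for exactly one level, "#" for zero or more levels.
    public static boolean matches(String pattern, String topic) {
        return matches(pattern.split("\\."), 0, topic.split("\\."), 0);
    }

    private static boolean matches(String[] p, int i, String[] t, int j) {
        if (i == p.length) {
            // Pattern exhausted: match only if the topic is exhausted too.
            return j == t.length;
        }
        if (p[i].equals("#")) {
            // Let "#" consume zero levels, then one, two, ... up to the rest of the topic.
            for (int k = j; k <= t.length; k++) {
                if (matches(p, i + 1, t, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == t.length) {
            // Pattern still expects a level but the topic has none left.
            return false;
        }
        if (p[i].equals("*") || p[i].equals(t[j])) {
            return matches(p, i + 1, t, j + 1);
        }
        return false;
    }

    // Returns the topics that match the pattern, in their original order.
    public static List<String> matching(String pattern, String... topics) {
        List<String> result = new ArrayList<>();
        for (String topic : topics) {
            if (matches(pattern, topic)) {
                result.add(topic);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // The five concrete topic names used by the sample.
        String[] topics = {
            "Games", "Games.Cricket", "Games.Cricket.SL",
            "Games.Cricket.India", "Games.Cricket.India.Delhi"
        };
        System.out.println("Games.Cricket.* -> " + matching("Games.Cricket.*", topics));
        System.out.println("Games.Cricket.# -> " + matching("Games.Cricket.#", topics));
    }
}
```

Under these assumed semantics, Games.Cricket.* covers only the immediate children of Games.Cricket, while Games.Cricket.# also covers Games.Cricket itself and deeper levels such as Games.Cricket.India.Delhi.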

The code for each class is listed below.

SampleHierarchicalTopicsClient.java:

package org.sample.jms;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;

public class SampleHierarchicalTopicsClient extends Thread{
    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "qpidConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5672";
    String topicName_1 = "Games";
    String topicName_2 = "Games.Cricket";
    String topicName_3 = "Games.Cricket.SL";
    String topicName_4 = "Games.Cricket.India";
    String topicName_5 = "Games.Cricket.India.Delhi";
    String topicName_6 = "Games.Cricket.*";
    String topicName_7 = "Games.Cricket.#";

    private boolean isSubscriptionComplete = false;

    @Override
    public void run() {
        try {
            subscribe();
        } catch (NamingException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public void subscribe() throws NamingException, JMSException {
        InitialContext ctx = init();
        // Lookup connection factory
        TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
        TopicConnection topicConnection = connFactory.createTopicConnection();
        topicConnection.start();

        //Create two topic sessions since a number of clients cannot be connected from the same session
        TopicSession topicSession1 =
                topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        TopicSession topicSession2 =
                topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);

        Topic topic1 = topicSession1.createTopic(topicName_1);
        Topic topic2 = topicSession1.createTopic(topicName_2);
        Topic topic3 = topicSession1.createTopic(topicName_3);
        Topic topic4 = topicSession1.createTopic(topicName_4);
        Topic topic5 = topicSession1.createTopic(topicName_5);
        Topic topic6 = (Topic) ctx.lookup(topicName_6);
        Topic topic7 = (Topic) ctx.lookup(topicName_7);
        TopicSubscriber topicSubscriber1 = topicSession1.createSubscriber(topic6);
        TopicSubscriber topicSubscriber2 = topicSession2.createSubscriber(topic7);

        isSubscriptionComplete = true;

        // Receive messages
        Message message1;
        System.out.println(" Receiving messages for " + topicName_6 + " :");
        while ((message1 = topicSubscriber1.receive(5000)) != null){
            if (message1 instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message1;
                System.out.println("Got Message from subscriber1 => " + textMessage.getText());
            }
        }

        Message message2;
        System.out.println(" Receiving messages for " + topicName_7 + " :");
        while ((message2 = topicSubscriber2.receive(5000)) != null){
            if (message2 instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message2;
                System.out.println("Got Message from subscriber2 => " + textMessage.getText());
            }
        }

        topicSubscriber1.close();
        topicSubscriber2.close();
        topicSession1.close();
        topicSession2.close();
        topicConnection.stop();
        topicConnection.close();
    }

    private InitialContext init() throws NamingException {
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
        properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
        properties.put("topic."+topicName_6,topicName_6);
        properties.put("topic."+topicName_7,topicName_7);
        return new InitialContext(properties);
    }

    private String getTCPConnectionURL(String username, String password) {
        // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
        return new StringBuffer()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                .toString();
    }
    public boolean isSubscriptionComplete(){
        return this.isSubscriptionComplete;
    }
}
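
The getTCPConnectionURL method assembles the broker connection URL described in its comment, amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'. As a minimal standalone sketch of the same assembly, the hypothetical class ConnectionUrlSketch below builds that string from explicit parameters instead of class fields. The class and method names are illustrative assumptions and do not appear in the sample; StringBuilder is used here simply because no synchronization is needed for a local buffer.

```java
class ConnectionUrlSketch {

    // Hypothetical helper mirroring the URL layout used by the sample:
    // amqp://{username}:{password}@{clientId}/{virtualHost}?brokerlist='tcp://{hostname}:{port}'
    public static String connectionUrl(String username, String password,
                                       String clientId, String virtualHost,
                                       String hostname, String port) {
        return new StringBuilder()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(clientId)
                .append("/").append(virtualHost)
                .append("?brokerlist='tcp://").append(hostname).append(":").append(port).append("'")
                .toString();
    }

    public static void main(String[] args) {
        // Values taken from the constants declared in the sample classes.
        System.out.println(connectionUrl("admin", "admin", "carbon", "carbon", "localhost", "5672"));
    }
}
```

With the sample's default values this prints amqp://admin:admin@carbon/carbon?brokerlist='tcp://localhost:5672'.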

TopicPublisher.java:

package org.sample.jms;

import javax.jms.JMSException;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;

public class TopicPublisher {
    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "qpidConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5672";
    String topicName_1 = "Games";
    String topicName_2 = "Games.Cricket";
    String topicName_3 = "Games.Cricket.SL";
    String topicName_4 = "Games.Cricket.India";
    String topicName_5 = "Games.Cricket.India.Delhi";

    public void publishMessage() throws NamingException, JMSException {

        InitialContext ctx = init();
        // Lookup connection factory
        TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
        TopicConnection topicConnection = connFactory.createTopicConnection();
        topicConnection.start();
        TopicSession topicSession =
                topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        Topic topic1 = (Topic) ctx.lookup(topicName_1);
        Topic topic2 = (Topic) ctx.lookup(topicName_2);
        Topic topic3 = (Topic) ctx.lookup(topicName_3);
        Topic topic4 = (Topic) ctx.lookup(topicName_4);
        Topic topic5 = (Topic) ctx.lookup(topicName_5);

        javax.jms.TopicPublisher topicPublisher1 = topicSession.createPublisher(topic1);
        javax.jms.TopicPublisher topicPublisher2 = topicSession.createPublisher(topic2);
        javax.jms.TopicPublisher topicPublisher3 = topicSession.createPublisher(topic3);
        javax.jms.TopicPublisher topicPublisher4 = topicSession.createPublisher(topic4);
        javax.jms.TopicPublisher topicPublisher5 = topicSession.createPublisher(topic5);

        // Create the messages to send
        TextMessage textMessage1 = topicSession.createTextMessage("Message for Games");
        TextMessage textMessage2 = topicSession.createTextMessage("Message for Cricket");
        TextMessage textMessage3 = topicSession.createTextMessage("Message for SL");
        TextMessage textMessage4 = topicSession.createTextMessage("Message for India");
        TextMessage textMessage5 = topicSession.createTextMessage("Message for Delhi");
        topicPublisher1.publish(textMessage1);
        topicPublisher2.publish(textMessage2);
        topicPublisher3.publish(textMessage3);
        topicPublisher4.publish(textMessage4);
        topicPublisher5.publish(textMessage5);
        topicSession.close();
        topicConnection.close();
    }

    private InitialContext init() throws NamingException {
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
        properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
        properties.put("topic."+topicName_1,topicName_1);
        properties.put("topic."+topicName_2,topicName_2);
        properties.put("topic."+topicName_3,topicName_3);
        properties.put("topic."+topicName_4,topicName_4);
        properties.put("topic."+topicName_5,topicName_5);
        return new InitialContext(properties);
    }

    private String getTCPConnectionURL(String username, String password) {
        // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
        return new StringBuffer()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                .toString();
    }
}
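
Both classes build their JNDI environment in an init() method: one entry names the initial context factory, one entry with the connectionfactory. prefix holds the connection URL, and one entry with the topic. prefix is added per topic that is later looked up. The sketch below only assembles and prints such a Properties object so that the resulting keys are visible. It does not create an InitialContext, because that would require the broker client library on the classpath. The class JndiEnvironmentSketch and its method are hypothetical names introduced for illustration.

```java
import java.util.Properties;
import java.util.TreeSet;

class JndiEnvironmentSketch {

    // Standard JNDI key; this is the value of javax.naming.Context.INITIAL_CONTEXT_FACTORY.
    static final String INITIAL_CONTEXT_FACTORY = "java.naming.factory.initial";

    // Hypothetical helper that mirrors the entries the sample's init() methods put in place.
    public static Properties jndiEnvironment(String factoryClass, String cfName,
                                             String connectionUrl, String... topicNames) {
        Properties properties = new Properties();
        properties.put(INITIAL_CONTEXT_FACTORY, factoryClass);
        properties.put("connectionfactory." + cfName, connectionUrl);
        for (String topicName : topicNames) {
            // The JNDI name and the physical topic name are the same string in the sample.
            properties.put("topic." + topicName, topicName);
        }
        return properties;
    }

    public static void main(String[] args) {
        Properties env = jndiEnvironment(
                "org.wso2.andes.jndi.PropertiesFileInitialContextFactory",
                "qpidConnectionfactory",
                "amqp://admin:admin@carbon/carbon?brokerlist='tcp://localhost:5672'",
                "Games", "Games.Cricket", "Games.Cricket.SL",
                "Games.Cricket.India", "Games.Cricket.India.Delhi");
        // Sort the keys so the listing is stable; Properties itself is unordered.
        for (String key : new TreeSet<>(env.stringPropertyNames())) {
            System.out.println(key + " = " + env.getProperty(key));
        }
    }
}
```

The printed entries have the same shape as the lines of a jndi.properties file, which is a common alternative to configuring the environment in code.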

Main.java:

package org.sample.jms;
 
import javax.jms.JMSException;
import javax.naming.NamingException;
public class Main {
    public static void main(String[] args) throws NamingException, JMSException, InterruptedException {
        SampleHierarchicalTopicsClient hierarchicalTopicsClient = new SampleHierarchicalTopicsClient();
        hierarchicalTopicsClient.start();
        while (!hierarchicalTopicsClient.isSubscriptionComplete()){
            Thread.sleep(500);
        }
        TopicPublisher topicPublisher = new TopicPublisher();
        topicPublisher.publishMessage();
    }

}
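
Main waits for the subscriber by polling isSubscriptionComplete() every 500 ms. Because that flag is a plain field written by one thread and read by another, the Java memory model does not guarantee that the polling thread sees the update promptly. One possible alternative, sketched here as an assumption rather than as part of the sample, is to signal readiness with a java.util.concurrent.CountDownLatch. The class SubscriptionLatchSketch and its nested SubscriberThread are hypothetical stand-ins and make no JMS calls.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class SubscriptionLatchSketch {

    // Hypothetical stand-in for SampleHierarchicalTopicsClient; it makes no JMS calls.
    static class SubscriberThread extends Thread {
        private final CountDownLatch subscribed = new CountDownLatch(1);

        @Override
        public void run() {
            // The real client would create its sessions and subscribers here.
            subscribed.countDown(); // signal that the subscriptions are in place
            // The real client would then enter its receive loops.
        }

        // Blocks until the subscriptions are in place or the timeout elapses.
        boolean awaitSubscribed(long timeout, TimeUnit unit) throws InterruptedException {
            return subscribed.await(timeout, unit);
        }
    }

    // Starts the subscriber and reports whether it became ready within the timeout.
    public static boolean startAndAwait(long timeoutMillis) {
        SubscriberThread subscriber = new SubscriberThread();
        subscriber.start();
        try {
            boolean ready = subscriber.awaitSubscribed(timeoutMillis, TimeUnit.MILLISECONDS);
            subscriber.join(); // wait for the stand-in thread to finish
            return ready;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        // A publisher would be started only after this reports true.
        System.out.println("subscriber ready: " + startAndAwait(5000));
    }
}
```

A smaller change with a similar visibility guarantee would be to declare the existing flag volatile and keep the polling loop.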

Executing the sample

Run the ant command from the <MB_HOME>/samples/HierarchicalTopicsSubscriber directory.

...