Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents
maxLevel3
minLevel3

Introduction

This sample demonstrates how to publish messages to topics and sub topics in a topic hierarchy and to create hierarchical topic subscriptions.

Prerequisites

See Prerequisites to Run the MB Samples for a list of prerequisites.

Building the sample

The <MB_HOME>/Samples/HierarchicalTopicsSubscriber/src/org/sample/jms directory has the following classes.

SampleHierarchicalTopicsClient.java class defines a client that subscribes to a hierarchical topic structure of which the main topic is Games. The configuration of this class is as follows.

...

languagejava

...

This sample demonstrates how to publish messages to topics and sub topics in a topic hierarchy and to create hierarchical topic subscriptions.

Table of Contents
maxLevel3
minLevel3

About the sample

The <MB_HOME>/Samples/HierarchicalTopicsSubscriber/src/org/sample/jms directory has the following classes:

  • SampleHierarchicalTopicsClient.java class defines a client that subscribes to a hierarchical topic structure of which the main topic is Games.

  • TopicPublisher.java class defines a client that publishes messages in the hierarchical topic structure mentioned above.

  • Main.java class defines the method to call both the clients.

Click the relevant tab to see the code.

Localtabgroup
Localtab
titleSampleHierarchicalTopicsClient.java
Code Block
languagejava
package org.sample.jms;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;

public class SampleHierarchicalTopicsClient extends Thread{
    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "qpidConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5672";
    String topicName_1 = "Games";
    String topicName_2 = "Games.Cricket";
    String topicName_3 = "Games.Cricket.SL";
    String topicName_4 = "Games.Cricket.India";
    String topicName_5 = "Games.Cricket.India.Delhi";
    String topicName_6 = "Games.Cricket.*";
    String topicName_7 = "Games.Cricket.#";

    private boolean isSubscriptionComplete = false;

    @Override
    public void run() {
        try {
         

...

 

...

 

...

 

...

subscribe();
        

...

} 

...

catch 

...

(NamingException e) {
         

...

 

...

 

...

 

...

e.

...

printStackTrace(

...

);
        

...

} 

...

catch 

...

(JMSException e) {
        

...

 

...

 

...

 

...

 

...

e.

...

printStackTrace(

...

);
        }

...

 

...

 

...

 

...

 }

    public void subscribe() 

...

throws 

...

NamingException, 

...

JMSException {
        

...

InitialContext 

...

ctx = 

...

init(

...

);
        // 

...

Lookup 

...

connection factory
       

...

 

...

TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
        

...

TopicConnection 

...

topicConnection = 

...

connFactory.

...

createTopicConnection(

...

)

...

;

...

 

...

       topicConnection.start();

    

...

 

...

 

...

 

...

 

...

//Create two topic sessions since a number of clients cannot be connected from the same 

...

session

...

 

...

 

...

 

...

     TopicSession topicSession1 =
         

...

 

...

 

...

 

...

 

...

 

...

 

...

 

...

topicConnection.

...

createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        TopicSession topicSession2 =
 

...

         

...

      topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);

...

 

...

 

...

 

...

 

...

 

...

 

...

 

...

 

...

Topic 

...

topic1 

...

= topicSession1.createTopic(topicName_1);
        

...

Topic 

...

topic2 = 

...

topicSession1.

...

createTopic(

...

topicName_2)

...

;

...

 

...

       Topic topic3 = topicSession1.createTopic(topicName_3);
  

...

 

...

 

...

 

...

 

...

  Topic topic4 = topicSession1.createTopic(topicName_4);
        Topic topic5 

...

= topicSession1.createTopic(topicName_5);
        Topic topic6 = (Topic) ctx.lookup(topicName_6);
      

...

 

...

 

...

Topic 

...

topic7 =

...

 

...

(Topic) ctx.lookup(topicName_7);
        TopicSubscriber topicSubscriber1 = topicSession1.createSubscriber(topic6);

...

  

...

      TopicSubscriber topicSubscriber2 = 

...

topicSession2.

...

createSubscriber(topic7);

       

...

 isSubscriptionComplete = true;
        

...

// Receive messages
        Message 

...

message1;
	System.out.println(" Receiving messages for " + topicName_6 + " :");
    

...

    while 

...

((message1 

...

= 

...

topicSubscriber1.receive(5000)) 

...

!= null){
         

...

 

...

 

...

 

...

if 

...

(message1 instanceof TextMessage)

...

 

...

{
      

...

 

...

         

...

TextMessage textMessage = (TextMessage) message1;
       

...

         

...

System.out.

...

println("

...

Got Message from subscriber1 => " + textMessage.getText());
     

...

 

...

 

...

 

...

 

...

 

...

 

...

 }
       

...

 

...

}

        

...

Message 

...

message2;
	System.out.println(" Receiving messages for " + topicName_7 + " :");
        while 

...

(

...

(message2 = topicSubscriber2.receive(5000)) != null){
            

...

if 

...

(message2 instanceof TextMessage) {
      

...

          TextMessage textMessage = (TextMessage) message2;
  

...

              System.out.println("Got Message from subscriber2 => " + textMessage.

...

getText());
           

...

TopicPublisher.java class defines a client that publishes messages in the hierarchical topic structure mentioned above. The configuration of this class is as follows.

...

languagejava

...

 }
        }

        topicSubscriber1.close();
        topicSubscriber2.close();
        topicSession1.close();
        topicSession2.close();
        topicConnection.stop();
        topicConnection.close();
    }

    private InitialContext init() throws NamingException {
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
        properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
        properties.put("topic."+topicName_6,topicName_6);
        properties.put("topic."+topicName_7,topicName_7);
        return new InitialContext(properties);
    }

    private String getTCPConnectionURL(String username, String password) {
        // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
        return new StringBuffer()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                .toString();
    }
    public boolean isSubscriptionComplete(){
        return this.isSubscriptionComplete;
    }
}
Localtab
titleTopicPublisher.java
Code Block
languagejava
package org.sample.jms;

import javax.jms.JMSException;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;

public class TopicPublisher {
    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "qpidConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5672";
    String topicName_1 = "Games";
    String topicName_2 = "Games.Cricket";
    String topicName_3 = "Games.Cricket.SL";
    String topicName_4 = "Games.Cricket.India";
    String topicName_5 = "Games.Cricket.India.Delhi";
    public void publishMessage() throws NamingException, JMSException {

        InitialContext ctx = init();
        

...

// Lookup connection factory
        

...

TopicConnectionFactory connFactory = 

...

(TopicConnectionFactory) ctx.lookup(CF_NAME);
        

...

TopicConnection 

...

topicConnection 

...

= connFactory.createTopicConnection();
        

...

topicConnection.start();
        

...

TopicSession 

...

topicSession =

...

             

...

 

...

 

...

 

...

topicConnection.

...

createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        

...

Topic 

...

topic1 = (Topic) 

...

ctx.

...

lookup(topicName_1);
        Topic topic2 = (Topic) 

...

ctx.

...

lookup(

...

topicName_2);
        Topic topic3 = (Topic) 

...

ctx.

...

lookup(

...

topicName_3);

...


     

...

   Topic topic4 = 

...

(Topic) ctx.lookup(topicName_4);
        

...

Topic topic5 = (Topic) ctx.lookup(topicName_5);

        

...

javax.jms.TopicPublisher topicPublisher1 = topicSession.createPublisher(topic1);
    

...

    javax.jms.TopicPublisher 

...

topicPublisher2 

...

= 

...

topicSession.createPublisher(topic2);

...

 

...

 

...

      

...

javax.jms.TopicPublisher topicPublisher3 = 

...

topicSession.createPublisher(topic3);
        

...

javax.

...

jms.TopicPublisher topicPublisher4 = topicSession.createPublisher(topic4);
        

...

javax.jms.TopicPublisher topicPublisher5 = topicSession.createPublisher(topic5);

       

...

 // Create the messages to send
        TextMessage textMessage1 = 

...

topicSession.

...

createTextMessage("

...

Message for Games");
        

...

TextMessage textMessage2 = topicSession.createTextMessage("Message for Cricket");
        TextMessage 

...

textMessage3 = topicSession.createTextMessage("Message for SL");
        

...

TextMessage textMessage4 = topicSession.createTextMessage("Message for India");
    

...

    TextMessage 

...

textMessage5 

...

= 

...

topicSession.createTextMessage("Message for Delhi");
        

...

topicPublisher1.publish(textMessage1);
        topicPublisher2.publish(textMessage2);
      

...

 

...

 

...

topicPublisher3.publish(textMessage3);
        topicPublisher4.publish(textMessage4);
        topicPublisher5.

...

publish(textMessage5);
        topicSession.close();
        topicConnection.close();
    }

    private InitialContext init() throws NamingException {
    

...

    Properties properties = new Properties();
        properties.

...

put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
        

...

properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
        

...

properties.

...

put("topic."+topicName_1,topicName_1);
    

...

Main.java class defines the method to call both the clients. The configuration of this class is as follows.

Code Block
languagejava
/*
*  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
*  WSO2 Inc. licenses this file to you under the Apache License,
*  Version 2.0 (the "License"); you may not use this file except
*  in compliance with the License.
*  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.sample.jms;

import javax.jms.JMSException;
import javax.naming.NamingException;
public class Main {
    public static void main(String[] args) throws NamingException, JMSException {
        SampleHierarchicalTopicsClient hierarchicalTopicsClient = new SampleHierarchicalTopicsClient();
        hierarchicalTopicsClient.start();
        TopicPublisher topicPublisher = new TopicPublisher();
        topicPublisher.publishMessage();
    }
}

Executing the sample

Run the ant command from <MB_HOME>/samples/HierarchicalTopicsSubscriber directory.

Analyzing the output

When you run the sample, you will see the following in the output log in the console.

Code Block
[java] log4j:WARN No appenders could be found for logger (org.wso2.andes.client.AMQDestination). [java] log4j:WARN Please initialize the log4j system properly. [java] log4j:WARN No appenders could be found for logger (org.wso2.andes.client.AMQDestination). [java] log4j:WARN Please initialize the log4j system properly.
    properties.put("topic."+topicName_2,topicName_2);
        properties.put("topic."+topicName_3,topicName_3);
        properties.put("topic."+topicName_4,topicName_4);
        properties.put("topic."+topicName_5,topicName_5);
        return new InitialContext(properties);
    }

    private String getTCPConnectionURL(String username, String password) {
        // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
        return new StringBuffer()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                .toString();
    }
}
Localtab
titleMain.java
Code Block
languagejava
package org.sample.jms;
 
import javax.jms.JMSException;
import javax.naming.NamingException;
public class Main {
    public static void main(String[] args) throws NamingException, JMSException, InterruptedException {
        SampleHierarchicalTopicsClient hierarchicalTopicsClient = new SampleHierarchicalTopicsClient();
        hierarchicalTopicsClient.start();
        while (!hierarchicalTopicsClient.isSubscriptionComplete()){
            Thread.sleep(500);
        }
        TopicPublisher topicPublisher = new TopicPublisher();
        topicPublisher.publishMessage();
    }

}

Prerequisites

See Prerequisites to Run the MB Samples for a list of prerequisites.

Executing the sample

Run the ant command from <MB_HOME>/samples/HierarchicalTopicsSubscriber directory.

Analyzing the output

When you run the sample, you will see the following in the output log in the console.

Code Block
     [java]  Receiving messages for Games.Cricket.* :
     [java]  Receiving messages for Games.Cricket.# :