Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Objectives

...

Prerequisites

Install and run the WSO2 Message Broker using the instructions in section Installation Guide.

 

Create subscriptions to available hierarchical topic patterns

...

languagejava

...

This sample demonstrates how to publish messages to topics and sub topics in a topic hierarchy and to create hierarchical topic subscriptions.

Table of Contents
maxLevel3
minLevel3

About the sample

The <MB_HOME>/Samples/HierarchicalTopicsSubscriber/src/org/sample/jms directory has the following classes:

  • SampleHierarchicalTopicsClient.java class defines a client that subscribes to a hierarchical topic structure of which the main topic is Games.

  • TopicPublisher.java class defines a client that publishes messages in the hierarchical topic structure mentioned above.

  • Main.java class defines the method to call both the clients.

Click the relevant tab to see the code.

Localtabgroup
Localtab
titleSampleHierarchicalTopicsClient.java
Code Block
languagejava
package org.sample.jms;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;

public class SampleHierarchicalTopicsClient extends Thread{
    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "qpidConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5672";
    String topicName_1 = "Games";
    String topicName_2 = "Games.Cricket";
    String topicName_3 = "Games.Cricket.SL";
    String topicName_4 = "Games.Cricket.India";
    String topicName_5 = "Games.Cricket.India.Delhi";
    String topicName_6 = "Games.Cricket.*";
    String topicName_7 = "Games.Cricket.#";

    private boolean isSubscriptionComplete = false;

    @Override
    public void run() {
        try {
            subscribe();
        } catch (NamingException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public void subscribe() throws NamingException, JMSException {
        InitialContext ctx = init();
        // Lookup connection factory
        TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
        TopicConnection topicConnection = connFactory.createTopicConnection();
        topicConnection.start();

        //Create two topic sessions since a number of clients cannot be connected from the same session
        TopicSession topicSession1 =
                topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        TopicSession topicSession2 =
                topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);

        Topic topic1 = topicSession1.createTopic(topicName_1);
        Topic topic2 = topicSession1.createTopic(topicName_2);
        Topic topic3 = topicSession1.createTopic(topicName_3);
        Topic topic4 = topicSession1.createTopic(topicName_4);
        Topic topic5 = topicSession1.createTopic(topicName_5);
        Topic topic6 = (Topic) ctx.lookup(topicName_6);
        Topic topic7 = (Topic) ctx.lookup(topicName_7);
        
SampleHierarchicalTopicsClient
TopicSubscriber 
hierarchicalTopicsClient
topicSubscriber1 = 
new SampleHierarchicalTopicsClient(
topicSession1.createSubscriber(topic6);
        TopicSubscriber topicSubscriber2 = 
hierarchicalTopicsClient
topicSession2.
subscribe
createSubscriber(topic7);

       
}
 isSubscriptionComplete = true;
  
public
 
void
 
subscribe()
 
throws
 
NamingException,
 
JMSException
 
{
// Receive messages
      
InitialContext
 
ctx
 
=
Message 
init()
message1;
	System.out.println(" Receiving messages for " + topicName_6 + 
// Lookup connection factory
" :");
        
TopicConnectionFactory connFactory
while ((message1 = topicSubscriber1.receive(
TopicConnectionFactory
5000))
ctx.lookup(CF_NAME);
 != null){
        
TopicConnection
 
topicConnection
 
=
 
connFactory.createTopicConnection();
 if (message1 instanceof TextMessage) {
   
topicConnection.start();
         
TopicSession
 
topicSession
 
=
  TextMessage textMessage = (TextMessage) message1;
          
topicConnection.createTopicSession(false,
 
QueueSession.AUTO_ACKNOWLEDGE);
     System.out.println("Got Message from subscriber1 => 
Topic
" 
topic1 =
+ 
topicSession
textMessage.
createTopic
getText(
topicName_1
));

     
Topic
 
topic2
 
=
 
topicSession.createTopic(topicName_2);
    }
    
Topic
 
topic3
 
=
 
(Topic) ctx.lookup(topicName_3);
 }

       
Topic
 
topic4
Message 
= (Topic) ctx.lookup(topicName_4); TopicSubscriber topicSubscriber1 = topicSession.createSubscriber(topic3
message2;
	System.out.println(" Receiving messages for " + topicName_7 + " :");
        
TopicSubscriber
while 
topicSubscriber2
((message2 = 
topicSession
topicSubscriber2.
createSubscriber
receive(
topic4
5000))
;
 != null){
            if (message2 instanceof 
// Receive messages
TextMessage) {
      
Message
 
message1;
         
while
TextMessage 
((message1
textMessage = 
topicSubscriber1.receive
(
5000)
TextMessage) 
!= null){
message2;
             
if
 
(message1
 
instanceof TextMessage) {
 System.out.println("Got Message from subscriber2 => " + textMessage.getText());
          
TextMessage
 
textMessage
 
=
}
(TextMessage)
 
message1;
       }

        
System
topicSubscriber1.
out.println("Got Message from subscriber1 => " + textMessage.getText());
close();
        topicSubscriber2.close();
        topicSession1.close();
       
}
 topicSession2.close();
       
}
 topicConnection.stop();
       
Message
 
message2
topicConnection.close();
    }

   
while
 
((message2
private 
=
InitialContext 
topicSubscriber2.receive(5000)) != null){
init() throws NamingException {
        Properties 
if
properties 
(message2
= 
instanceof
new 
TextMessage
Properties();
{
        properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
       
TextMessage textMessage = (TextMessage) message2;
 properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
        
System.out.println("Got Message from subscriber2 => " + textMessage.getText()
properties.put("topic."+topicName_6,topicName_6);
        
}
properties.put("topic."+topicName_7,topicName_7);
      
}
  return new InitialContext(properties);
    }

    private String 
topicSubscriber1.close();
getTCPConnectionURL(String username, String password) {
    
topicSubscriber2.close();
    
// amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
  
topicSession.close();
      return new 
topicConnection.stop
StringBuffer()
;

        
topicConnection.close();
     
}
   
private InitialContext init() throws NamingException {
.append("amqp://").append(username).append(":").append(password)
   
Properties
 
properties
 
=
 
new
 
Properties();
         
properties
.
put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
append("@").append(CARBON_CLIENT_ID)
                
properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
.append("/").append(CARBON_VIRTUAL_HOST_NAME)
                
properties
.
put("topic."+topicName_3,topicName_3);
append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                
properties
.
put("topic."+topicName_4,topicName_4
toString();
    }
    
return
public 
new
boolean 
InitialContext
isSubscriptionComplete(
properties); } private String getTCPConnectionURL(String username, String password) { // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}' return new StringBuffer() .append("amqp://").append(username).append(":").append(password) .append("@").append(CARBON_CLIENT_ID) .append("/").append(CARBON_VIRTUAL_HOST_NAME) .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'") .toString(); } }

Publish a message to each parent and child topic

Code Block
languagejava
/* * Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. * * WSO2 Inc. licenses this file to you under the Apache License, * Version 2.0 (the "License"); you may not use this file except * in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ import javax.jms.JMSException; import javax.jms.QueueSession; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicSession; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; import java.util.Properties; public class TopicPublisher { public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory"; private static final String CF_NAME_PREFIX = "connectionfactory."; private static final String CF_NAME = "qpidConnectionfactory"; String userName = "admin"; String password = "admin"; private static String CARBON_CLIENT_ID = "carbon"; private static String CARBON_VIRTUAL_HOST_NAME = "carbon"; private static String CARBON_DEFAULT_HOSTNAME = "localhost"; private static String CARBON_DEFAULT_PORT = "5672"; String topicName_1 = "WSO2"; String topicName_2 = "WSO2.MB"; public static void main(String[] args) throws NamingException, JMSException {
){
        return this.isSubscriptionComplete;
    }
}
Localtab
titleTopicPublisher.java
Code Block
languagejava
package org.sample.jms;

import javax.jms.JMSException;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;

public class TopicPublisher {
    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "qpidConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5672";
    String topicName_1 = "Games";
    String topicName_2 = "Games.Cricket";
    String topicName_3 = "Games.Cricket.SL";
    String topicName_4 = "Games.Cricket.India";
    String topicName_5 = "Games.Cricket.India.Delhi";
    public void publishMessage() throws NamingException, JMSException {

        InitialContext ctx = init();
        // Lookup connection factory
        TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
        TopicConnection topicConnection = connFactory.createTopicConnection();
        topicConnection.start();
        TopicSession topicSession =
                topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        Topic topic1 = (Topic) ctx.lookup(topicName_1);
        Topic topic2 = (Topic) ctx.lookup(topicName_2);
        Topic topic3 = (Topic) ctx.lookup(topicName_3);
        Topic topic4 = (Topic) ctx.lookup(topicName_4);
        Topic topic5 = (Topic) ctx.lookup(topicName_5);

        javax.jms.TopicPublisher topicPublisher1 = topicSession.createPublisher(topic1);
        javax.jms.TopicPublisher topicPublisher2 = topicSession.createPublisher(topic2);
        javax.jms.TopicPublisher topicPublisher3 = topicSession.createPublisher(topic3);
        javax.jms.TopicPublisher topicPublisher4 = topicSession.createPublisher(topic4);
        javax.jms.TopicPublisher topicPublisher5 = topicSession.createPublisher(topic5);

        // Create the messages to send
        
TopicPublisher
TextMessage 
topicPublisher
textMessage1 =
new TopicPublisher();
 topicSession.createTextMessage("Message for Games");
        TextMessage textMessage2 = 
topicPublisher
topicSession.
publishMessage(
createTextMessage("Message for Cricket");
    
}
    TextMessage 
public
textMessage3 
void
= 
publishMessage() throws NamingException, JMSException {
topicSession.createTextMessage("Message for SL");
        
InitialContext
TextMessage 
ctx
textMessage4 = 
init(
topicSession.createTextMessage("Message for India");
        
//
TextMessage 
Lookup
textMessage5 
connection factory
= topicSession.createTextMessage("Message for Delhi");
     
TopicConnectionFactory
 
connFactory
 
=
 topicPublisher1.publish(
TopicConnectionFactory) ctx.lookup(CF_NAME
textMessage1);
        topicPublisher2.publish(textMessage2);
        
TopicConnection topicConnection = connFactory.createTopicConnection(
topicPublisher3.publish(textMessage3);
        
topicConnection
topicPublisher4.
start
publish(textMessage4);
        
TopicSession topicSession =
topicPublisher5.publish(textMessage5);
        
topicSession.close();
        topicConnection.
createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE
close();
    }

   
Topic
 
topic1
private 
=
InitialContext init(
Topic
)
ctx.lookup(topicName_1);
 throws NamingException {
        
Topic
Properties 
topic2
properties = 
(Topic) ctx.lookup(topicName_2
new Properties();
        
javax
properties.
jms.TopicPublisher topicPublisher1 = topicSession.createPublisher(topic1
put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
        
javax.jms.TopicPublisher topicPublisher2 = topicSession.createPublisher(topic2);
properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
   
//
 
Create
 
the
 
messages
 
to send
 properties.put("topic."+topicName_1,topicName_1);
        
TextMessage textMessage1 = topicSession.createTextMessage("Message for WSO2"
properties.put("topic."+topicName_2,topicName_2);
        
TextMessage textMessage2 = topicSession.createTextMessage("Message for WSO2.MB"
properties.put("topic."+topicName_3,topicName_3);
        
topicPublisher1
properties.
publish(textMessage1
put("topic."+topicName_4,topicName_4);
        
topicPublisher2
properties.
publish(textMessage2);
put("topic."+topicName_5,topicName_5);
        return new 
topicSession.close
InitialContext(properties);
    }

    
topicConnection.close(); }
private String getTCPConnectionURL(String username, String password) {
    
private
 
InitialContext
 
init()
 
throws
 
NamingException {
// amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
        
Properties properties =
return new 
Properties
StringBuffer()
;

         
properties.put(Context.INITIAL_CONTEXT_FACTORY,
 
QPID_ICF);
      .append("amqp://").append(username).append(":").append(password)
  
properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
              
properties
.
put
append("@"
topic."+topicName_1,topicName_1);
).append(CARBON_CLIENT_ID)
                
properties
.
put
append("/"
topic."+topicName_2,topicName_2);
).append(CARBON_VIRTUAL_HOST_NAME)
         
return
 
new
 
InitialContext(properties);
     
}
.append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
     
private
 
String
 
getTCPConnectionURL(String
 
username,
 
String
 
password)
 
{
     .toString();
   
//
 
amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}' return new StringBuffer()
}
}
Localtab
titleMain.java
Code Block
languagejava
package org.sample.jms;
 
import javax.jms.JMSException;
import javax.naming.NamingException;
public class Main {
    public static void main(String[] args) throws NamingException, JMSException, InterruptedException {
  
.append("amqp://").append(username).append(":").append(password)
      SampleHierarchicalTopicsClient hierarchicalTopicsClient = new SampleHierarchicalTopicsClient();
        hierarchicalTopicsClient.
append
start(
"@").append(CARBON_CLIENT_ID)
);
        while (!hierarchicalTopicsClient.isSubscriptionComplete()){
            Thread.
append
sleep(
"/").append(CARBON_VIRTUAL_HOST_NAME)
500);
        }
        TopicPublisher topicPublisher 
.append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
= new TopicPublisher();
        topicPublisher.publishMessage();
    
}

}

Prerequisites

See Prerequisites to Run the MB Samples for a list of prerequisites.

Executing the sample

Run the ant command from <MB_HOME>/samples/HierarchicalTopicsSubscriber directory.

Analyzing the output

When you run the sample, you will see the following in the output log in the console.

Code Block
      .toString();[java]  Receiving   }
}

According to the above sample topicSubscriber1 is listening on pattern "WSO2.*" where it receives only the messages published to all the child elements under the parent topic '' WSO2 ". As the topicSubscriber2 listens on pattern "WSO2.#" it receives both the messages published for parent topic ' WSO2 ' as well as to its child topics like MB etc. Hence topicSubscriber1 only receives 1 message while the topicSubscriber2 receives 2 messages.

...

messages for Games.Cricket.* :
     [java]  Receiving messages for Games.Cricket.# :