Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

This sample demonstrates how to publish messages to topics and sub topics in a topic hierarchy and to create hierarchical topic subscriptions.

Table of Contents
maxLevel3
minLevel3

About the sample

The <MB_HOME>/Samples/HierarchicalTopicsSubscriber/src/org/sample/jms directory has the following classes:

...

Localtabgroup
Localtab
titleSampleHierarchicalTopicsClient.java
Code Block
languagejava
/*
*package org.sample.jms;

Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
*  WSO2 Inc. licenses this file to you under the Apache License,
*  Version 2.0 (the "License"); you may not use this file except
*  in compliance with the License.
*  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.sample.jms;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
public class SampleHierarchicalTopicsClient extends Thread{
    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "qpidConnectionfactoryimport javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;

public class SampleHierarchicalTopicsClient extends Thread{
    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "qpidConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5672";
    String topicName_1 = "Games";
    String topicName_2 = "Games.Cricket";
    String topicName_3 = "Games.Cricket.SL";
    String topicName_4 = "Games.Cricket.India";
    String topicName_5 = "Games.Cricket.India.Delhi";
    String topicName_6 = "Games.Cricket.*";
    String userNametopicName_7 = "adminGames.Cricket.#";

    private Stringboolean passwordisSubscriptionComplete = "admin"false;

    private@Override
static String CARBON_CLIENT_ID = "carbon";public void run() {
 private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
     try {
   private static String CARBON_DEFAULT_HOSTNAME = "localhost";    subscribe();
private static String CARBON_DEFAULT_PORT = "5672";   } catch String topicName_1 = "Games";(NamingException e) {
     String topicName_2 = "Games.Cricket";     e.printStackTrace();
      String topicName_3 = "Games.Cricket.SL";
    String topicName_4 = "Games.Cricket.India";} catch (JMSException e) {
         String topicName_5 = "Games.Cricket.India.Delhi"e.printStackTrace();
    String topicName_6 = "Games.Cricket.*";
 }
   String topicName_7 = "Games.Cricket.#"; }

   @Override     public void runsubscribe() {throws NamingException,        try JMSException {
        InitialContext ctx =  subscribeinit();
        }// catchLookup (NamingExceptionconnection e)factory
{        TopicConnectionFactory connFactory =  (TopicConnectionFactory) ectx.printStackTracelookup(CF_NAME);
        }TopicConnection topicConnection catch= connFactory.createTopicConnection(JMSException e);
{             e.printStackTracetopicConnection.start();

       } //Create two topic sessions }since a number of clients publiccannot voidbe subscribe()connected throwsfrom NamingException,the JMSExceptionsame {session
        InitialContextTopicSession ctxtopicSession1 =
 init();         // Lookup connection factory   topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
    TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
  TopicSession topicSession2 =
      TopicConnection topicConnection = connFactory.createTopicConnection();
        topicConnection.start(createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);

       TopicSession Topic topicSessiontopic1 =                 topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        Topic topic1 = topicSessiontopicSession1.createTopic(topicName_1);
        Topic topic2 = topicSessiontopicSession1.createTopic(topicName_2);
        Topic topic3 = topicSessiontopicSession1.createTopic(topicName_3);
        Topic topic4 = topicSessiontopicSession1.createTopic(topicName_4);
        Topic topic5 = topicSessiontopicSession1.createTopic(topicName_5);
        Topic topic6 = (Topic) ctx.lookup(topicName_6);
        Topic topic7 = (Topic) ctx.lookup(topicName_7);
        TopicSubscriber topicSubscriber1 = topicSessiontopicSession1.createSubscriber(topic6);
        TopicSubscriber topicSubscriber2 = topicSessiontopicSession2.createSubscriber(topic7);

       // ReceiveisSubscriptionComplete messages= true;
       Message message1 // Receive messages
        Message message1;
	System.out.println(" Receiving messages for " + topicName_6 + " :");
        while ((message1 = topicSubscriber1.receive(5000)) != null){
            if (message1 instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message1;
                System.out.println("Got Message from subscriber1 => " + textMessage.getText());
            }
        }

        Message message2;
	System.out.println(" Receiving messages for " + topicName_7 + " :");
        while ((message2 = topicSubscriber2.receive(5000)) != null){
            if (message2 instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message2;
                System.out.println("Got Message from subscriber2 => " + textMessage.getText());
            }
        }

        topicSubscriber1.close();
        topicSubscriber2.close();
        topicSessiontopicSession1.close();
        topicSession2.close();
        topicConnection.stop();
        topicConnection.close();
    }

    private InitialContext init() throws NamingException {
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
        properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
        properties.put("topic."+topicName_6,topicName_6);
        properties.put("topic."+topicName_7,topicName_7);
        return new InitialContext(properties);
    }

    private String getTCPConnectionURL(String username, String password) {
        // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
        return new StringBuffer()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                .toString();
    }
}
Localtab
titleTopicPublisher.java
Code Block
languagejava
/*
*  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
*  WSO2 Inc. licenses this file to you under the Apache License,
*  Version 2.0 (the "License"); you may not use this file except
*  in compliance with the License.
*  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.sample.jms;
import javax.jms.JMSException;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
public class TopicPublisher {
    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "qpidConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5672";
    String topicName_1 = "Games";
    String topicName_2 = "Games.Cricket";
    String topicName_3 = "Games.Cricket.SL";
    String topicName_4 = "Games.Cricket.India    public boolean isSubscriptionComplete(){
        return this.isSubscriptionComplete;
    }
}
Localtab
titleTopicPublisher.java
Code Block
languagejava
package org.sample.jms;

import javax.jms.JMSException;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;

public class TopicPublisher {
    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "qpidConnectionfactory";
    String topicName_5userName = "Games.Cricket.India.Delhiadmin";
    publicString voidpassword publishMessage() throws NamingException, JMSException {= "admin";
    private static String  InitialContext ctx = init()CARBON_CLIENT_ID = "carbon";
    private static String  // Lookup connection factoryCARBON_VIRTUAL_HOST_NAME = "carbon";
    private static     TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME)String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String  TopicConnection topicConnection = connFactory.createTopicConnection();
    CARBON_DEFAULT_PORT = "5672";
    String topicName_1 = "Games";
   topicConnection.start(); String topicName_2 = "Games.Cricket";
    TopicSessionString topicSessiontopicName_3 = "Games.Cricket.SL";
    String topicName_4 = "Games.Cricket.India";
    String topicName_5   topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE)= "Games.Cricket.India.Delhi";
    public void publishMessage() throws TopicNamingException, topic1 = (Topic) ctx.lookup(topicName_1);JMSException {

        TopicInitialContext topic2ctx = (Topic) ctx.lookup(topicName_2 init();
        Topic// topic3Lookup =connection (Topic) ctx.lookup(topicName_3);factory
        TopicConnectionFactory Topic topic4connFactory = (TopicTopicConnectionFactory) ctx.lookup(topicNameCF_4NAME);
        TopicTopicConnection topic5topicConnection = connFactory.createTopicConnection(Topic) ctx.lookup(topicName_5;
        topicConnection.start();
         javax.jms.TopicPublisher topicPublisher1TopicSession topicSession =
topicSession.createPublisher(topic1);             javax.jms.TopicPublisher topicPublisher2 = topicSessiontopicConnection.createPublisher(topic2createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        javax.jms.TopicPublisher topicPublisher3Topic topic1 = topicSession.createPublisher(topic3(Topic) ctx.lookup(topicName_1);
        javax.jms.TopicPublisher topicPublisher4Topic topic2 = (Topic) topicSessionctx.createPublisherlookup(topic4topicName_2);
        javax.jms.TopicPublisher topicPublisher5Topic topic3 = (Topic) topicSessionctx.createPublisherlookup(topic5topicName_3);
        //Topic Createtopic4 the= messages to send(Topic) ctx.lookup(topicName_4);
        Topic TextMessagetopic5 textMessage1= =(Topic) topicSessionctx.createTextMessage("Message for Cricket"lookup(topicName_5);

       TextMessage textMessage2 javax.jms.TopicPublisher topicPublisher1 = topicSession.createTextMessage("Message for SL"createPublisher(topic1);
        TextMessage textMessage3javax.jms.TopicPublisher topicPublisher2 = topicSession.createTextMessage("Message for India"createPublisher(topic2);
        TextMessage textMessage4javax.jms.TopicPublisher topicPublisher3 = topicSession.createTextMessage("Message for Delhi");createPublisher(topic3);
        javax.jms.TopicPublisher topicPublisher4 = topicPublisher2topicSession.publishcreatePublisher(textMessage1topic4);
        topicPublisher3.publish(textMessage2javax.jms.TopicPublisher topicPublisher5 = topicSession.createPublisher(topic5);

       topicPublisher4.publish(textMessage3);
        topicPublisher5.publish(textMessage4); // Create the messages to send
        TextMessage textMessage1 = topicSession.close(createTextMessage("Message for Games");
        TextMessage textMessage2 = topicSession.createTextMessage("Message  topicConnection.close(for Cricket");
    }    TextMessage privatetextMessage3 InitialContext= init() throws NamingException {topicSession.createTextMessage("Message for SL");
        TextMessage PropertiestextMessage4 properties = new Properties();= topicSession.createTextMessage("Message for India");
        TextMessage textMessage5 = propertiestopicSession.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICFcreateTextMessage("Message for Delhi");
        propertiestopicPublisher1.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password))publish(textMessage1);
        topicPublisher2.publish(textMessage2);
        propertiestopicPublisher3.put("topic."+topicName_1,topicName_1publish(textMessage3);
        properties.put("topic."+topicName_2,topicName_2topicPublisher4.publish(textMessage4);
        propertiestopicPublisher5.put("topic."+topicName_3,topicName_3publish(textMessage5);
        propertiestopicSession.put("topic."+topicName_4,topicName_4close();
        propertiestopicConnection.put("topic."+topicName_5,topicName_5close();
    }

   return newprivate InitialContext init(properties); throws NamingException {
  }     private StringProperties getTCPConnectionURL(String username, String password) {properties = new Properties();
         // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
         return new StringBuffer()properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
        properties.put("topic."+topicName_1,topicName_1);
        properties.appendput("amqp://").append(username).append(":").append(password)topic."+topicName_2,topicName_2);
        properties.put("topic."+topicName_3,topicName_3);
        properties.appendput("@").append(CARBON_CLIENT_ID)topic."+topicName_4,topicName_4);
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)properties.put("topic."+topicName_5,topicName_5);
        return new InitialContext(properties);
     .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")}

    private String getTCPConnectionURL(String username, String password) {
     .toString();     }
}
Localtab
titleMain.java
Code Block
languagejava
/*
*  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
*  WSO2 Inc. licenses this file to you under the Apache License,
*  Version 2.0 (the "License"); you may not use this file except
*  in compliance with the License.
*  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/
// amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
        return new StringBuffer()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                .toString();
    }
}
Localtab
titleMain.java
Code Block
languagejava
package org.sample.jms;
 
import javax.jms.JMSException;
import javax.naming.NamingException;
public class Main {
    public static void main(String[] args) throws NamingException, JMSException, InterruptedException {
        SampleHierarchicalTopicsClient hierarchicalTopicsClient = new SampleHierarchicalTopicsClient();
        hierarchicalTopicsClient.start();
        while (!hierarchicalTopicsClient.isSubscriptionComplete()){
            Thread.sleep(500);
        }
        TopicPublisher topicPublisher = new TopicPublisher();
        topicPublisher.publishMessage();
    }

}

Prerequisites

See Prerequisites to Run the MB Samples for a list of prerequisites.

...