Introduction
This sample demonstrates how to publish messages to topics and sub topics in a topic hierarchy and to create hierarchical topic subscriptions.
Prerequisites
See Prerequisites to Run the MB Samples for a list of prerequisites.
...
This sample demonstrates how to publish messages to topics and sub topics in a topic hierarchy and to create hierarchical topic subscriptions.
About the sample
The <MB_HOME>/Samples/HierarchicalTopicsSubscriber/src/org/sample/jms
directory has the following classes:
SampleHierarchicalTopicsClient.java
class defines a client that subscribes to a hierarchical topic structure whose main topic is Games.
The code of this class is as follows.
Code Block |
---|
|
/*
* Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.sample.jms;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
public class SampleHierarchicalTopicsClient extends Thread{
public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
private static final String CF_NAME_PREFIX = "connectionfactory.";
private static final String CF_NAME = "qpidConnectionfactory";
String userName = "admin";
String password = "admin";
private static String CARBON_CLIENT_ID = "carbon";
private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
private static String CARBON_DEFAULT_HOSTNAME = "localhost";
private static String CARBON_DEFAULT_PORT = "5672";
String topicName_1 = "Games";
String topicName_2 = "Games.Cricket";
String topicName_3 = "Games.Cricket.SL";
String topicName_4 = "Games.Cricket.India";
String topicName_5 = "Games.Cricket.India.Delhi";
String topicName_6 = "Games.Cricket.*";
String topicName_7 = "Games.Cricket.#";
/**
 * Thread entry point: delegates to {@link #subscribe()}.
 * Failures are not propagated to the caller; the stack trace is printed
 * and the thread then finishes.
 */
@Override
public void run() {
    try {
        subscribe();
    } catch (JMSException jmsFailure) {
        // Messaging failure raised while subscribing.
        jmsFailure.printStackTrace();
    } catch (NamingException namingFailure) {
        // JNDI context creation or lookup failure raised while subscribing.
        namingFailure.printStackTrace();
    }
}
public void subscribe() throws NamingException, JMSException {
InitialContext ctx = init();
// Lookup connection factory
TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
TopicConnection topicConnection = connFactory.createTopicConnection();
topicConnection.start();
TopicSession topicSession =
.
TopicPublisher.java
class defines a client that publishes messages in the hierarchical topic structure mentioned above.
Main.java
class defines the method to call both the clients.
Click the relevant tab to see the code.
Localtabgroup |
---|
Localtab |
---|
title | SampleHierarchicalTopicsClient.java |
---|
| Code Block |
---|
| package org.sample.jms;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
public class SampleHierarchicalTopicsClient extends Thread{
public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
private static final String CF_NAME_PREFIX = "connectionfactory.";
private static final String CF_NAME = "qpidConnectionfactory";
String userName = "admin";
String password = "admin";
private static String CARBON_CLIENT_ID = "carbon";
private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
private static String CARBON_DEFAULT_HOSTNAME = "localhost";
private static String CARBON_DEFAULT_PORT = "5672";
String topicName_1 = "Games";
String topicName_2 = "Games.Cricket";
String topicName_3 = "Games.Cricket.SL";
String topicName_4 = "Games.Cricket.India";
String topicName_5 = "Games.Cricket.India.Delhi";
String topicName_6 = "Games.Cricket.*";
String topicName_7 = "Games.Cricket.#";
// Set by the subscriber thread and polled from another thread through isSubscriptionComplete();
// volatile guarantees the polling thread observes the update.
private volatile boolean isSubscriptionComplete = false;
@Override
public void run() {
try {
|
|
|
...
...
...
catch (NamingException e) {
|
|
|
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
...
throws NamingException, JMSException {
|
|
|
...
...
...
...
...
...
...
...
...
...
...
(TopicConnectionFactory) ctx.lookup(CF_NAME);
|
|
|
...
...
...
= connFactory.createTopicConnection();
|
|
|
...
...
topicConnection.start();
//Create two topic sessions since a number of clients cannot be connected from the same session
TopicSession topicSession1 =
|
|
|
...
...
...
...
...
topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
|
|
|
...
...
...
...
...
topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
Topic topic1 = topicSession1.createTopic(topicName_1);
Topic topic2 = topicSession1.createTopic(topicName_2);
|
|
|
...
Topic topic3 = topicSession1.createTopic(topicName_3);
|
|
|
...
...
= topicSession1.createTopic(topicName_4);
Topic topic5 = topicSession1.createTopic(topicName_5);
|
|
|
...
...
...
...
...
ctx.lookup(topicName_6);
Topic topic7 = |
|
|
...
(Topic) ctx.lookup(topicName_7);
TopicSubscriber topicSubscriber1 = topicSession1.createSubscriber(topic6);
|
|
|
...
...
topicSubscriber2 = topicSession2.createSubscriber( |
|
|
...
...
isSubscriptionComplete = true;
|
|
|
...
...
...
...
...
...
...
...
Message message1;
System.out.println(" Receiving messages for |
|
|
...
...
...
while ((message1 = topicSubscriber1. |
|
|
...
...
...
...
...
...
...
...
instanceof TextMessage) {
|
|
|
...
...
...
...
...
...
...
...
...
...
...
...
...
System.out.println("Got Message from subscriber1 => " + |
|
|
...
...
...
...
...
message2;
System.out.println(" Receiving messages for " + topicName_7 + " :");
|
|
|
...
...
...
...
topicSubscriber2.receive(5000)) != null){
if (message2 |
|
|
...
instanceof TextMessage) {
TextMessage |
|
|
...
...
...
(TextMessage) message2;
System.out. |
|
|
...
...
Got Message from subscriber2 => " + textMessage.getText());
}
}
topicSubscriber1. |
|
|
...
...
);
topicSubscriber2.close();
topicSession1.close();
topicSession2. |
|
|
...
...
);
topicConnection.stop();
topicConnection. |
|
|
...
close();
}
private InitialContext init() throws NamingException {
Properties properties = new |
|
|
...
...
...
TopicPublisher.java
class defines a client that publishes messages in the hierarchical topic structure mentioned above. The code of this class is as follows.
...
...
properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
properties.put("topic."+topicName_6,topicName_6);
properties.put("topic."+topicName_7,topicName_7);
return new InitialContext(properties);
}
/**
 * Builds the AMQP connection URL used to register the connection factory.
 * The result has the form
 * {@code amqp://{username}:{password}@{clientId}/{virtualHost}?brokerlist='tcp://{hostname}:{port}'},
 * where the client id, virtual host, hostname and port come from the
 * CARBON_* constants of this class.
 *
 * @param username user name placed in the URL
 * @param password password placed in the URL
 * @return the assembled connection URL
 */
private String getTCPConnectionURL(String username, String password) {
    // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
    // The buffer never leaves this method, so the unsynchronized StringBuilder
    // is used instead of StringBuffer.
    return new StringBuilder()
            .append("amqp://").append(username).append(":").append(password)
            .append("@").append(CARBON_CLIENT_ID)
            .append("/").append(CARBON_VIRTUAL_HOST_NAME)
            .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
            .toString();
}
/**
 * Reports the current state of the subscription-complete flag.
 *
 * @return {@code true} once the flag has been set, {@code false} otherwise
 */
public boolean isSubscriptionComplete() {
    return isSubscriptionComplete;
}
} |
|
Localtab |
---|
| Code Block |
---|
| package org.sample.jms;
import javax.jms.JMSException;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
public class TopicPublisher {
public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
private static final String |
|
|
...
...
...
...
connectionfactory.";
private static final String |
|
|
...
...
...
qpidConnectionfactory";
String |
|
|
...
...
...
...
admin";
private static String |
|
|
...
...
...
carbon";
private static String |
|
|
...
CARBON_VIRTUAL_HOST_NAME = " |
|
|
...
...
...
...
String CARBON_DEFAULT_HOSTNAME = "localhost";
private static |
|
|
...
String CARBON_DEFAULT_PORT = |
|
|
...
"5672";
String topicName_1 = "Games";
|
|
|
...
...
...
...
String topicName_2 = "Games.Cricket";
|
|
|
...
...
...
"Games.Cricket.SL";
String |
|
|
...
topicName_4 = "Games.Cricket.India";
|
|
|
...
String topicName_5 = "Games.Cricket.India.Delhi";
|
|
|
...
...
...
publishMessage() throws NamingException, JMSException {
InitialContext ctx = |
|
|
...
...
...
...
...
...
...
...
TopicConnectionFactory) ctx.lookup( |
|
|
...
...
...
...
...
connFactory.createTopicConnection();
|
|
|
...
...
...
...
...
...
...
topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
|
|
|
...
...
(Topic) ctx.lookup(topicName_1);
|
|
|
...
...
(Topic) ctx.lookup(topicName_2);
|
|
|
...
...
(Topic) ctx.lookup(topicName_3);
|
|
|
...
...
...
...
...
...
...
...
(Topic) ctx.lookup(topicName_5);
|
|
|
...
javax.jms.TopicPublisher topicPublisher1 = topicSession. |
|
|
...
createPublisher(topic1);
javax.jms.TopicPublisher |
|
|
...
...
...
...
javax.jms.TopicPublisher topicPublisher3 = topicSession. |
|
|
...
...
javax.jms.TopicPublisher topicPublisher4 = topicSession. |
|
|
...
createPublisher(topic4);
javax.jms.TopicPublisher topicPublisher5 = |
|
|
...
...
...
...
// Create the messages to send
TextMessage textMessage1 = |
|
|
...
...
createTextMessage("Message for Games");
TextMessage textMessage2 = topicSession. |
|
|
...
createTextMessage("Message for Cricket");
TextMessage |
|
|
...
textMessage3 = topicSession.createTextMessage("Message for SL");
|
|
|
...
...
...
...
topicSession.createTextMessage("Message for India");
TextMessage |
|
|
...
...
= topicSession.createTextMessage("Message for Delhi");
|
|
|
...
...
...
topicPublisher2.publish(textMessage2);
|
|
|
...
...
...
...
...
...
...
...
...
...
...
...
private InitialContext init( |
|
|
...
) throws NamingException {
Properties properties = new Properties();
|
|
|
...
...
...
properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
|
|
|
...
properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
|
|
|
...
...
...
properties.put("topic."+topicName_1,topicName_1);
properties.put("topic."+topicName_2,topicName_2);
properties. |
|
|
...
...
topic."+topicName_3,topicName_3);
properties.put("topic."+topicName_4,topicName_4);
properties. |
|
|
...
...
topic."+topicName_5,topicName_5);
return new InitialContext(properties);
}
private String |
|
|
...
getTCPConnectionURL(String username, String password) {
|
|
|
...
// amqp://{username}:{password}@carbon/carbon?brokerlist='tcp:// |
|
|
...
{hostname}:{port}'
return |
|
|
...
...
...
Main.java
class defines the method to call both the clients. The code of this class is as follows.
...
...
.append("amqp://").append(username).append(":").append(password)
.append("@").append(CARBON_CLIENT_ID)
.append("/").append(CARBON_VIRTUAL_HOST_NAME)
.append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
.toString();
}
} |
|
Localtab |
---|
| Code Block |
---|
| package org.sample.jms;
import javax.jms.JMSException;
import javax.naming.NamingException;
public class Main {
public static void main(String[] args) throws NamingException, JMSException, InterruptedException {
SampleHierarchicalTopicsClient hierarchicalTopicsClient = new SampleHierarchicalTopicsClient();
hierarchicalTopicsClient.start();
while (!hierarchicalTopicsClient.isSubscriptionComplete()){
|
|
|
...
...
...
...
...
...
}
TopicPublisher topicPublisher = new TopicPublisher();
topicPublisher.publishMessage();
}
} |
|
|
Prerequisites
See Prerequisites to Run the MB Samples for a list of prerequisites.
Executing the sample
Run the ant command from the <MB_HOME>/samples/HierarchicalTopicsSubscriber
directory.
...