Objectives
...
Prerequisites
Install and run the WSO2 Message Broker by following the instructions in the Installation Guide section.
Create subscriptions to available hierarchical topic patterns
...
...
This sample demonstrates how to publish messages to topics and sub topics in a topic hierarchy and to create hierarchical topic subscriptions.
About the sample
The <MB_HOME>/samples/HierarchicalTopicsSubscriber/src/org/sample/jms
directory has the following classes:
SampleHierarchicalTopicsClient.java
class defines a client that subscribes to a hierarchical topic structure of which the main topic is Games
.
TopicPublisher.java
class defines a client that publishes messages in the hierarchical topic structure mentioned above.
Main.java
class defines the method to call both the clients.
Click the relevant tab to see the code.
Localtabgroup |
---|
Localtab |
---|
title | SampleHierarchicalTopicsClient.java |
---|
| Code Block |
---|
| package org.sample.jms;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
public class SampleHierarchicalTopicsClient extends Thread{
public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
private static final String CF_NAME_PREFIX = "connectionfactory.";
private static final String CF_NAME = "qpidConnectionfactory";
String userName = "admin";
String password = "admin";
private static String CARBON_CLIENT_ID = "carbon";
private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
private static String CARBON_DEFAULT_HOSTNAME = "localhost";
private static String CARBON_DEFAULT_PORT = "5672";
String topicName_1 = "Games";
String topicName_2 = "Games.Cricket";
String topicName_3 = "Games.Cricket.SL";
String topicName_4 = "Games.Cricket.India";
String topicName_5 = "Games.Cricket.India.Delhi";
String topicName_6 = "Games.Cricket.*";
String topicName_7 = "Games.Cricket.#";
private boolean isSubscriptionComplete = false;
/**
 * Thread entry point: performs the topic subscription by delegating to
 * {@link #subscribe()}.
 * <p>
 * {@code run()} cannot declare checked exceptions, so any failure raised by
 * {@code subscribe()} is reported by printing its stack trace, after which
 * the method returns.
 */
@Override
public void run() {
    try {
        subscribe();
    } catch (JMSException jmsFailure) {
        // Raised by the JMS calls made while subscribing.
        jmsFailure.printStackTrace();
    } catch (NamingException namingFailure) {
        // Raised by the JNDI set-up / lookups made while subscribing.
        namingFailure.printStackTrace();
    }
}
public void subscribe() throws NamingException, JMSException {
InitialContext ctx = init();
// Lookup connection factory
TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
TopicConnection topicConnection = connFactory.createTopicConnection();
topicConnection.start();
//Create two topic sessions since a number of clients cannot be connected from the same session
TopicSession topicSession1 =
topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
TopicSession topicSession2 =
topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
Topic topic1 = topicSession1.createTopic(topicName_1);
Topic topic2 = topicSession1.createTopic(topicName_2);
Topic topic3 = topicSession1.createTopic(topicName_3);
Topic topic4 = topicSession1.createTopic(topicName_4);
Topic topic5 = topicSession1.createTopic(topicName_5);
Topic topic6 = (Topic) ctx.lookup(topicName_6);
Topic topic7 = (Topic) ctx.lookup(topicName_7);
|
| SampleHierarchicalTopicsClienthierarchicalTopicsClientnew SampleHierarchicalTopicsClient(topicSession1.createSubscriber(topic6);
TopicSubscriber topicSubscriber2 = |
| hierarchicalTopicsClientsubscribecreateSubscriber(topic7);
|
| } isSubscriptionComplete = true;
|
| publicvoidsubscribe()throwsNamingException,JMSException{InitialContextctx=init()message1;
System.out.println(" Receiving messages for " + topicName_6 + |
| // Lookup connection factory TopicConnectionFactory connFactorywhile ((message1 = topicSubscriber1.receive( |
| TopicConnectionFactory ctx.lookup(CF_NAME);TopicConnectiontopicConnection=connFactory.createTopicConnection(); if (message1 instanceof TextMessage) {
|
| topicConnection.start();TopicSessiontopicSession= TextMessage textMessage = (TextMessage) message1;
|
| topicConnection.createTopicSession(false,QueueSession.AUTO_ACKNOWLEDGE); System.out.println("Got Message from subscriber1 => |
| Topictopic1 =topicSessioncreateTopictopicName_1
Topictopic2=topicSession.createTopic(topicName_2);Topictopic3=(Topic) ctx.lookup(topicName_3);Topictopic4= (Topic) ctx.lookup(topicName_4);
TopicSubscriber topicSubscriber1 = topicSession.createSubscriber(topic3message2;
System.out.println(" Receiving messages for " + topicName_7 + " :");
|
| TopicSubscribertopicSubscriber2topicSessioncreateSubscribertopic4; != null){
if (message2 instanceof |
| // Receive messages
Messagemessage1;while((message1topicSubscriber1.receive5000)!= null){if(message1instanceof TextMessage) {
System.out.println("Got Message from subscriber2 => " + textMessage.getText());
|
| TextMessagetextMessage=(TextMessage)message1;Systemout.println("Got Message from subscriber1 => " + textMessage.getText());close();
topicSubscriber2.close();
topicSession1.close();
|
| }}Messagemessage2topicConnection.close();
}
|
| while((message2=topicSubscriber2.receive(5000)) != null){
init() throws NamingException {
Properties |
| if(message2instanceofTextMessage{ properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
|
| TextMessage textMessage = (TextMessage) message2;
properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
|
| System.out.println("Got Message from subscriber2 => " + textMessage.getText()properties.put("topic."+topicName_6,topicName_6);
|
| }
properties.put("topic."+topicName_7,topicName_7);
|
| } return new InitialContext(properties);
}
private String |
| topicSubscriber1.close();
getTCPConnectionURL(String username, String password) {
|
| topicSubscriber2.close(); // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
|
| topicSession.close(); topicConnection.stop;topicConnection.close();} private InitialContext init() throws NamingException {
.append("amqp://").append(username).append(":").append(password)
|
| Propertiesproperties=newProperties();propertiesput(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);append("@").append(CARBON_CLIENT_ID)
|
| properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));.append("/").append(CARBON_VIRTUAL_HOST_NAME)
|
| propertiesput("topic."+topicName_3,topicName_3);append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
|
| propertiesput("topic."+topicName_4,topicName_4returnnewInitialContextproperties);
}
/**
 * Builds the AMQP connection URL that is registered for the connection factory.
 * <p>
 * The result has the form
 * {@code amqp://{username}:{password}@{clientId}/{virtualHost}?brokerlist='tcp://{hostname}:{port}'},
 * where client id, virtual host, hostname and port are taken from the
 * {@code CARBON_*} constants of this class.
 *
 * @param username user name placed in the URL
 * @param password password placed in the URL
 * @return the assembled connection URL
 */
private String getTCPConnectionURL(String username, String password) {
    // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
    // The buffer never leaves this method, so the unsynchronized StringBuilder
    // is used instead of StringBuffer; the produced string is identical.
    return new StringBuilder()
            .append("amqp://").append(username).append(":").append(password)
            .append("@").append(CARBON_CLIENT_ID)
            .append("/").append(CARBON_VIRTUAL_HOST_NAME)
            .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME)
            .append(":").append(CARBON_DEFAULT_PORT).append("'")
            .toString();
}
} |
Publish a message to each parent and child topic
Code Block |
---|
|
/*
* Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import javax.jms.JMSException;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
public class TopicPublisher {
public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
private static final String CF_NAME_PREFIX = "connectionfactory.";
private static final String CF_NAME = "qpidConnectionfactory";
String userName = "admin";
String password = "admin";
private static String CARBON_CLIENT_ID = "carbon";
private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
private static String CARBON_DEFAULT_HOSTNAME = "localhost";
private static String CARBON_DEFAULT_PORT = "5672";
String topicName_1 = "WSO2";
String topicName_2 = "WSO2.MB";
public static void main(String[] args) throws NamingException, JMSException {){
return this.isSubscriptionComplete;
}
} |
|
Localtab |
---|
| Code Block |
---|
| package org.sample.jms;
import javax.jms.JMSException;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
public class TopicPublisher {
public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
private static final String CF_NAME_PREFIX = "connectionfactory.";
private static final String CF_NAME = "qpidConnectionfactory";
String userName = "admin";
String password = "admin";
private static String CARBON_CLIENT_ID = "carbon";
private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
private static String CARBON_DEFAULT_HOSTNAME = "localhost";
private static String CARBON_DEFAULT_PORT = "5672";
String topicName_1 = "Games";
String topicName_2 = "Games.Cricket";
String topicName_3 = "Games.Cricket.SL";
String topicName_4 = "Games.Cricket.India";
String topicName_5 = "Games.Cricket.India.Delhi";
public void publishMessage() throws NamingException, JMSException {
InitialContext ctx = init();
// Lookup connection factory
TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
TopicConnection topicConnection = connFactory.createTopicConnection();
topicConnection.start();
TopicSession topicSession =
topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
Topic topic1 = (Topic) ctx.lookup(topicName_1);
Topic topic2 = (Topic) ctx.lookup(topicName_2);
Topic topic3 = (Topic) ctx.lookup(topicName_3);
Topic topic4 = (Topic) ctx.lookup(topicName_4);
Topic topic5 = (Topic) ctx.lookup(topicName_5);
javax.jms.TopicPublisher topicPublisher1 = topicSession.createPublisher(topic1);
javax.jms.TopicPublisher topicPublisher2 = topicSession.createPublisher(topic2);
javax.jms.TopicPublisher topicPublisher3 = topicSession.createPublisher(topic3);
javax.jms.TopicPublisher topicPublisher4 = topicSession.createPublisher(topic4);
javax.jms.TopicPublisher topicPublisher5 = topicSession.createPublisher(topic5);
// Create the messages to send
|
| TopicPublishertopicPublisher new TopicPublisher(); topicSession.createTextMessage("Message for Games");
TextMessage textMessage2 = |
| topicPublisherpublishMessage(createTextMessage("Message for Cricket");
|
| }publicvoidpublishMessage() throws NamingException, JMSException {
topicSession.createTextMessage("Message for SL");
|
| InitialContextctxinit(topicSession.createTextMessage("Message for India");
|
| //Lookupconnection factory
= topicSession.createTextMessage("Message for Delhi");
|
| TopicConnectionFactoryconnFactory=TopicConnectionFactory) ctx.lookup(CF_NAMEtextMessage1);
topicPublisher2.publish(textMessage2);
|
| TopicConnection topicConnection = connFactory.createTopicConnection(topicPublisher3.publish(textMessage3);
|
| topicConnectionstartTopicSession topicSession =topicPublisher5.publish(textMessage5);
|
| topicSession.close();
topicConnection. |
| createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGETopictopic1=Topic ctx.lookup(topicName_1);Topictopic2(Topic) ctx.lookup(topicName_2javaxjms.TopicPublisher topicPublisher1 = topicSession.createPublisher(topic1put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
|
| javax.jms.TopicPublisher topicPublisher2 = topicSession.createPublisher(topic2);
properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
|
| //Createthemessagesto send properties.put("topic."+topicName_1,topicName_1);
|
| TextMessage textMessage1 = topicSession.createTextMessage("Message for WSO2"properties.put("topic."+topicName_2,topicName_2);
|
| TextMessage textMessage2 = topicSession.createTextMessage("Message for WSO2.MB"properties.put("topic."+topicName_3,topicName_3);
|
| topicPublisher1publish(textMessage1put("topic."+topicName_4,topicName_4);
|
| topicPublisher2publish(textMessage2);put("topic."+topicName_5,topicName_5);
return new |
| topicSession.closeInitialContext(properties);
}
|
| topicConnection.close();
}
private String getTCPConnectionURL(String username, String password) {
|
| privateInitialContextinit()throwsNamingException {// amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
|
| Properties properties =Properties;properties.put(Context.INITIAL_CONTEXT_FACTORY,QPID_ICF); .append("amqp://").append(username).append(":").append(password)
|
| properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));propertiesputtopic."+topicName_1,topicName_1);).append(CARBON_CLIENT_ID)
|
| propertiesputtopic."+topicName_2,topicName_2);).append(CARBON_VIRTUAL_HOST_NAME)
|
| returnnewInitialContext(properties);}.append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
|
| privateStringgetTCPConnectionURL(Stringusername,Stringpassword){//amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
return new StringBuffer() Localtab |
---|
| Code Block |
---|
| package org.sample.jms;
import javax.jms.JMSException;
import javax.naming.NamingException;
public class Main {
public static void main(String[] args) throws NamingException, JMSException, InterruptedException {
|
| .append("amqp://").append(username).append(":").append(password) SampleHierarchicalTopicsClient hierarchicalTopicsClient = new SampleHierarchicalTopicsClient();
hierarchicalTopicsClient. |
| append"@").append(CARBON_CLIENT_ID));
while (!hierarchicalTopicsClient.isSubscriptionComplete()){
Thread. |
| append"/").append(CARBON_VIRTUAL_HOST_NAME)500);
}
TopicPublisher topicPublisher |
| .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
= new TopicPublisher();
topicPublisher.publishMessage();
|
| |
Prerequisites
See Prerequisites to Run the MB Samples for a list of prerequisites.
Executing the sample
Run the ant command from the <MB_HOME>/samples/HierarchicalTopicsSubscriber
directory.
Analyzing the output
When you run the sample, you will see the following in the output log in the console.
Code Block |
---|
.toString();[java] Receiving }
} |
In the above sample, topicSubscriber1 listens on the pattern "WSO2.*", so it receives only the messages published to the child topics under the parent topic "WSO2". Because topicSubscriber2 listens on the pattern "WSO2.#", it receives the messages published to the parent topic "WSO2" as well as those published to its child topics, such as "MB". Hence topicSubscriber1 receives only 1 message, while topicSubscriber2 receives 2 messages.
...
messages for Games.Cricket.* :
[java] Receiving messages for Games.Cricket.# : |