Objectives
...
Prerequisites
Install and run WSO2 Message Broker by following the instructions in the Installation Guide.
Create subscriptions to available hierarchical topic patterns
...
language | java |
---|
...
Table of Contents | ||||
---|---|---|---|---|
|
Introduction
This sample demonstrates how to publish messages to topics and subtopics in a topic hierarchy, and how to create hierarchical topic subscriptions.
Prerequisites
See Prerequisites to Run the MB Samples for a list of prerequisites.
Building the sample
The <MB_HOME>/samples/HierarchicalTopicsSubscriber/src/org/sample/jms
directory has the following classes.
SampleHierarchicalTopicsClient.java
class defines a client that subscribes to a hierarchical topic structure whose main topic is Games
. The configuration of this class is as follows.Code Block language java /* * Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. * * WSO2 Inc. licenses this file to you under the Apache License, * Version 2.0 (the "License"); you may not use this file except * in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.sample.jms; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.QueueSession; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; import java.util.Properties;
...
public class SampleHierarchicalTopicsClient extends Thread{ public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory"; private static final String CF_NAME_PREFIX = "connectionfactory."; private static final String CF_NAME = "qpidConnectionfactory"; String userName = "admin"; String password = "admin"; private static String CARBON_CLIENT_ID = "carbon"; private static String CARBON_VIRTUAL_HOST_NAME = "carbon"; private static String CARBON_DEFAULT_HOSTNAME = "localhost"; private static String CARBON_DEFAULT_PORT = "5672"; String topicName_1 = "
...
Games"; String topicName_2 = "
...
Games.
...
Cricket"; String topicName_3 = "
...
Games.Cricket.
...
SL"; String topicName_4 = "
...
Games.Cricket.
...
India"; String
...
topicName_5
...
= "Games.Cricket.India.Delhi"; String topicName_6 = "Games.Cricket.*"; String topicName_7
...
= "Games.Cricket.#"; @Override public void
...
run()
...
{
...
try
...
{
...
...
...
...
...
...
subscribe(); }
...
catch
...
(NamingException
...
e)
...
{
...
...
...
...
...
e.
...
printStackTrace(
...
);
...
}
...
catch
...
(JMSException e) {
...
e.printStackTrace();
...
}
...
...
} public void subscribe()
...
throws NamingException, JMSException {
...
InitialContext
...
ctx =
...
init();
...
// Lookup connection factory TopicConnectionFactory
...
connFactory
...
= (
...
TopicConnectionFactory) ctx.lookup(
...
CF_
...
NAME);
...
TopicConnection
...
topicConnection =
...
connFactory.createTopicConnection();
...
topicConnection.start();
...
TopicSession
...
topicSession =
...
...
topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE); Topic topic1
...
= topicSession.createTopic(topicName_1);
...
Topic
...
topic2 =
...
topicSession.
...
createTopic(
...
topicName_2)
...
;
...
...
Topic topic3
...
= topicSession.createTopic(topicName_3); Topic topic4 = topicSession.createTopic(topicName_4);
...
Topic topic5 =
...
topicSession.createTopic(topicName_5); Topic topic6
...
= (Topic) ctx.lookup(topicName_6); Topic topic7 =
...
(Topic) ctx.lookup(topicName_7);
...
TopicSubscriber topicSubscriber1
...
= topicSession.createSubscriber(topic6);
...
TopicSubscriber
...
topicSubscriber2 =
...
topicSession.
...
createSubscriber(
...
topic7)
...
;
...
...
// Receive messages
...
...
...
...
...
Message message1; System.out.println(" Receiving messages for " + topicName_6 + " :");
...
while
...
((message1 = topicSubscriber1.receive(
...
5000))
...
!= null){ if (message1
...
instanceof TextMessage) { TextMessage textMessage
...
= (TextMessage) message1;
...
System.out.println("Got Message from subscriber1 => " +
...
textMessage.
...
getText());
...
}
...
...
}
...
Message
...
message2; System.out.println(" Receiving messages for " + topicName_7 + " :");
...
while ((message2
...
= topicSubscriber2.receive(5000)) != null){
...
...
...
...
if
...
(
...
message2 instanceof TextMessage) {
...
...
TextMessage textMessage =
...
(TextMessage) message2;
...
...
System.out.println("Got Message from subscriber2 => " + textMessage.getText());
...
...
...
}
...
}
...
...
...
...
...
...
...
topicSubscriber1.close();
...
topicSubscriber2.close();
...
topicSession.close(); topicConnection.stop(); topicConnection.
...
close(); } private InitialContext init() throws NamingException
...
{
...
Properties properties = new Properties(); properties.
...
put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF); properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password)); properties.
...
put("
...
topic."+topicName_6,topicName_6); properties.put("topic."+topicName_7,topicName_7); return new InitialContext(properties); } private String getTCPConnectionURL(String username, String password) {
...
...
Publish a message to each parent and child topic
...
language | java |
---|
...
// amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}' return new StringBuffer() .append("amqp://").append(username).append(":").append(password) .append("@").append(CARBON_CLIENT_ID) .append("/").append(CARBON_VIRTUAL_HOST_NAME) .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'") .toString(); } }
TopicPublisher.java
class defines a client that publishes messages in the hierarchical topic structure mentioned above. The configuration of this class is as follows.Code Block language java /* * Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. * * WSO2 Inc. licenses this file to you under the Apache License, * Version 2.0 (the "License"); you may not use this file except * in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.sample.jms; import javax.jms.JMSException; import javax.jms.QueueSession; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicSession; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; import java.util.Properties; public class TopicPublisher { public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory"; private static final String CF_NAME_PREFIX = "connectionfactory."; private static final String CF_NAME = "qpidConnectionfactory"; String userName = "admin"; String password = "admin"; private static String CARBON_CLIENT_ID = "carbon"; private static String CARBON_VIRTUAL_HOST_NAME = "carbon"; private static String CARBON_DEFAULT_HOSTNAME = "localhost"; private static String CARBON_DEFAULT_PORT = "5672"; String topicName_1 = "Games"; String topicName_2 = "Games.Cricket"; String topicName_3 = "Games.Cricket.SL"; String topicName_4 = "Games.Cricket.India"; String topicName_5 = "Games.Cricket.India.Delhi"; public void publishMessage() throws NamingException, 
JMSException { InitialContext ctx = init();
...
// Lookup connection factory TopicConnectionFactory
...
connFactory
...
=
...
(TopicConnectionFactory) ctx.lookup(CF_NAME); TopicConnection
...
topicConnection =
...
connFactory.
...
createTopicConnection(
...
);
...
...
...
topicConnection.start();
...
TopicSession topicSession =
...
...
topicConnection.
...
createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
...
Topic topic1 = (Topic) ctx.lookup(topicName_1); Topic
...
topic2
...
=
...
(Topic)
...
ctx.lookup(topicName_2); Topic topic3
...
=
...
(Topic) ctx.lookup(topicName_3); Topic topic4 = (Topic)
...
ctx.
...
lookup(
...
topicName_4);
...
Topic topic5 = (Topic) ctx.lookup(topicName_5);
...
javax.jms.TopicPublisher topicPublisher1 = topicSession.createPublisher(topic1); javax.jms.TopicPublisher
...
topicPublisher2
...
=
...
topicSession.createPublisher(
...
topic2);
...
javax.jms.TopicPublisher topicPublisher3
...
= topicSession.createPublisher(topic3);
...
javax.jms.TopicPublisher topicPublisher4 = topicSession.createPublisher(topic4); javax.jms.TopicPublisher
...
topicPublisher5
...
=
...
topicSession.createPublisher(topic5); // Create the messages to send
...
TextMessage textMessage1
...
= topicSession.createTextMessage("Message for Cricket"); TextMessage textMessage2
...
= topicSession.createTextMessage("Message for SL"); TextMessage textMessage3 = topicSession.createTextMessage("Message for India"); TextMessage textMessage4 = topicSession.createTextMessage("Message for Delhi"); topicPublisher2.
...
publish(textMessage1); topicPublisher3.publish(textMessage2); topicPublisher4.publish(textMessage3); topicPublisher5.
...
publish(textMessage4);
...
According to the above sample, topicSubscriber1 listens on the pattern "WSO2.*", so it receives only the messages published to the child topics under the parent topic "WSO2". Because topicSubscriber2 listens on the pattern "WSO2.#", it receives the messages published to the parent topic "WSO2" as well as those published to its child topics, such as MB. Hence, topicSubscriber1 receives only 1 message, while topicSubscriber2 receives 2 messages.
...
topicSession.close(); topicConnection.close(); } private InitialContext init() throws NamingException { Properties properties = new Properties(); properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF); properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password)); properties.put("topic."+topicName_1,topicName_1); properties.put("topic."+topicName_2,topicName_2); properties.put("topic."+topicName_3,topicName_3); properties.put("topic."+topicName_4,topicName_4); properties.put("topic."+topicName_5,topicName_5); return new InitialContext(properties); } private String getTCPConnectionURL(String username, String password) { // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}' return new StringBuffer() .append("amqp://").append(username).append(":").append(password) .append("@").append(CARBON_CLIENT_ID) .append("/").append(CARBON_VIRTUAL_HOST_NAME) .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'") .toString(); } }
Main.java
class defines the method to call both the clients. The configuration of this class is as follows.Code Block language java /* * Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. * * WSO2 Inc. licenses this file to you under the Apache License, * Version 2.0 (the "License"); you may not use this file except * in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.sample.jms; import javax.jms.JMSException; import javax.naming.NamingException; /** Entry point of the sample: starts the hierarchical topic subscriber client and then publishes the sample messages. */ public class Main { /** Starts a SampleHierarchicalTopicsClient thread, then calls TopicPublisher.publishMessage() on the calling thread; NamingException and JMSException raised while publishing are propagated to the caller. */ public static void main(String[] args) throws NamingException, JMSException { SampleHierarchicalTopicsClient hierarchicalTopicsClient = new SampleHierarchicalTopicsClient(); /* NOTE(review): the subscriber is started as a separate thread and publishing follows without any wait, so messages may be published before the subscriptions exist - confirm whether this is why the documented output shows no received messages. */ hierarchicalTopicsClient.start(); TopicPublisher topicPublisher = new TopicPublisher(); topicPublisher.publishMessage(); } }
Executing the sample
Run the ant command from the <MB_HOME>/samples/HierarchicalTopicsSubscriber
directory.
Analyzing the output
When you run the sample, you will see the following in the output log in the console.
Code Block |
---|
[java] log4j:WARN No appenders could be found for logger (org.wso2.andes.client.AMQDestination).
[java] log4j:WARN Please initialize the log4j system properly.
[java] log4j:WARN No appenders could be found for logger (org.wso2.andes.client.AMQDestination).
[java] log4j:WARN Please initialize the log4j system properly.
[java] Receiving messages for Games.Cricket.* :
[java] Receiving messages for Games.Cricket.# : |