Topic subscriber and topic publisher
Objectives
This sample demonstrates how to create a topic, subscribe to it and publish messages.
Prerequisites
Ensure that you have the following:
- The dependencies located in <PRODUCT_HOME>/client-lib are on the class path. If you are running the sample as a Maven project, use the following Maven dependencies to run the JMS client:
<dependency>
    <groupId>org.wso2.andes.wso2</groupId>
    <artifactId>andes-client</artifactId>
    <version>0.13.wso2v8</version>
</dependency>
<dependency>
    <groupId>org.apache.geronimo.specs.wso2</groupId>
    <artifactId>geronimo-jms_1.1_spec</artifactId>
    <version>1.1.0.wso2v1</version>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>
<dependency>
    <groupId>slf4j.wso2</groupId>
    <artifactId>slf4j</artifactId>
    <version>1.5.10.wso2v1</version>
</dependency>
Running the Sample
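You need to run the "Topic Subscriber" class before the "Topic Publisher" class when testing this sample, because the subscriber created here is non-durable and only receives messages that are published while it is subscribed.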
Topic Publisher: The following code is used to publish messages to a given topic
/* * Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. * * WSO2 Inc. licenses this file to you under the Apache License, * Version 2.0 (the "License"); you may not use this file except * in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ import javax.jms.JMSException; import javax.jms.QueueSession; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicSession; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; import java.util.Properties; public class TopicPublisher { public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory"; private static final String CF_NAME_PREFIX = "connectionfactory."; private static final String CF_NAME = "qpidConnectionfactory"; String userName = "admin"; String password = "admin"; private static String CARBON_CLIENT_ID = "carbon"; private static String CARBON_VIRTUAL_HOST_NAME = "carbon"; private static String CARBON_DEFAULT_HOSTNAME = "localhost"; private static String CARBON_DEFAULT_PORT = "5672"; String topicName = "MYTopic"; public static void main(String[] args) throws NamingException, JMSException { TopicPublisher topicPublisher = new TopicPublisher(); topicPublisher.publishMessage(); } public void publishMessage() throws NamingException, JMSException { Properties properties = new Properties(); properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF); properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password)); 
System.out.println("getTCPConnectionURL(userName,password) = " + getTCPConnectionURL(userName, password)); InitialContext ctx = new InitialContext(properties); // Lookup connection factory TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME); TopicConnection topicConnection = connFactory.createTopicConnection(); topicConnection.start(); TopicSession topicSession = topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE); // Send message Topic topic = topicSession.createTopic(topicName); // create the message to send TextMessage textMessage = topicSession.createTextMessage("Test Message"); javax.jms.TopicPublisher topicPublisher = topicSession.createPublisher(topic); topicPublisher.publish(textMessage); topicSession.close(); topicConnection.close(); } public String getTCPConnectionURL(String username, String password) { // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}' return new StringBuffer() .append("amqp://").append(username).append(":").append(password) .append("@").append(CARBON_CLIENT_ID) .append("/").append(CARBON_VIRTUAL_HOST_NAME) .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'") .toString(); } }
Topic Subscriber: The following code is used to subscribe to topics.
/* * Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. * * WSO2 Inc. licenses this file to you under the Apache License, * Version 2.0 (the "License"); you may not use this file except * in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.QueueSession; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicSession; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; import java.util.Properties; public class TopicSubscriber { public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory"; private static final String CF_NAME_PREFIX = "connectionfactory."; private static final String CF_NAME = "qpidConnectionfactory"; String userName = "admin"; String password = "admin"; private static String CARBON_CLIENT_ID = "carbon"; private static String CARBON_VIRTUAL_HOST_NAME = "carbon"; private static String CARBON_DEFAULT_HOSTNAME = "localhost"; private static String CARBON_DEFAULT_PORT = "5672"; String topicName = "MYTopic"; public static void main(String[] args) throws NamingException, JMSException { TopicSubscriber topicSubscriber = new TopicSubscriber(); topicSubscriber.subscribe(); } public void subscribe() throws NamingException, JMSException { Properties properties = new Properties(); properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF); properties.put(CF_NAME_PREFIX + CF_NAME, 
getTCPConnectionURL(userName, password)); System.out.println("getTCPConnectionURL(userName,password) = " + getTCPConnectionURL(userName, password)); InitialContext ctx = new InitialContext(properties); // Lookup connection factory TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME); TopicConnection topicConnection = connFactory.createTopicConnection(); topicConnection.start(); TopicSession topicSession = topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE); // Send message Topic topic = topicSession.createTopic(topicName); javax.jms.TopicSubscriber topicSubscriber = topicSession.createSubscriber(topic); Message message = topicSubscriber.receive(); if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; System.out.println("textMessage.getText() = " + textMessage.getText()); } topicSubscriber.close(); Â topicSession.close(); topicConnection.stop(); Â topicConnection.close(); } public String getTCPConnectionURL(String username, String password) { // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}' return new StringBuffer() .append("amqp://").append(username).append(":").append(password) .append("@").append(CARBON_CLIENT_ID) .append("/").append(CARBON_VIRTUAL_HOST_NAME) .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'") .toString(); } }
Note
The topicSubscriber.receive() method blocks until a message is received, after which the main thread exits. Alternatively, a MessageListener can be set on the TopicSubscriber to receive messages asynchronously.
Once the classes are run, you should see the published messages in the Topic Subscriber console and the created topics in the management console.
Note
The same client code can be used to subscribe or publish to subtopics as well. By specifying the full name of the subtopic from the root level, a client can directly subscribe to a subtopic or publish messages to a subtopic as follows.
E.g : To Subscribe to a child topic named MYSubTopic under MYTopic,
String topicName = "MYTopic.MYSubTopic";
Topic topic = topicSession.createTopic(topicName);
javax.jms.TopicSubscriber topicSubscriber = topicSession.createSubscriber(topic);
Message message = topicSubscriber.receive();
E.g : To publish to a child topic named MYSubTopic under MYTopic (with topicName set to "MYTopic.MYSubTopic" as in the previous example),
Topic topic = topicSession.createTopic(topicName);
TextMessage textMessage = topicSession.createTextMessage("Test Message");
javax.jms.TopicPublisher topicPublisher = topicSession.createPublisher(topic);
topicPublisher.publish(textMessage);
Important
The procedure is the same when subscribing and publishing to a topic in tenant mode. There is no need to append the tenant domain name (Tenant_Domain) to the topic name.