Introduction
Durable topics keep messages persistently until a suitable consumer is available to consume them. Durable topic subscribers are used when an application needs to receive messages that are published even while the application is inactive. See Managing Durable Topic Subscriptions for more information.
Prerequisites
See Prerequisites to Run the MB Samples for a list of prerequisites.
...
About the sample
The <MB_HOME>/Samples/DurableTopicSubscriber/src/org/sample/jms
directory has the following classes.
DurableTopicSubscriber.java
class creates a durable topic subscription named mySub1
. The configuration of this file is as follows.
Code Block |
---|
|
/*
* Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.sample.jms;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Properties;
public class DurableTopicSubscriber {
public static final String ANDES_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
SampleMessageListener.java
class creates a consumer for the durable topic subscription.
TopicPublisher.java
class creates a publisher to publish messages in the durable topic.
Main.java
defines the method for calling the three clients mentioned above.
Click the relevant tab to see the code.
Localtabgroup |
---|
Localtab |
---|
title | DurableTopicSubscriber.java |
---|
| Code Block |
---|
| package org.sample.jms;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Properties;
public class DurableTopicSubscriber {
public static final String ANDES_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
private static final String CF_NAME_PREFIX = "connectionfactory.";
private static final String CF_NAME = "andesConnectionfactory";
String userName = "admin";
String password = "admin";
private static String CARBON_CLIENT_ID = "carbon";
private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
private static String CARBON_DEFAULT_HOSTNAME = "localhost";
private static String CARBON_DEFAULT_PORT = "5672";
private String topicName = "newTopic";
private String subscriptionId = "mySub1";
private boolean useListener = true;
private int delayBetMessages = 200;
private |
|
|
...
...
10;
private SampleMessageListener |
|
|
...
...
...
...
...
private TopicConnection topicConnection;
|
|
|
...
...
TopicSession topicSession;
|
|
|
...
...
TopicSubscriber topicSubscriber;
|
|
|
...
...
...
...
...
...
...
...
...
...
...
...
System.out.println("Starting the subscriber");
|
|
|
...
...
...
...
...
...
...
...
...
...
...
...
...
properties.put(Context.INITIAL_CONTEXT_FACTORY, ANDES_ICF);
|
|
|
...
...
...
...
...
...
properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
|
|
|
...
...
...
...
properties.put("topic." + topicName, topicName);
|
|
|
...
...
...
...
...
...
...
InitialContext(properties);
|
|
|
...
...
...
// Lookup connection factory
|
|
|
...
...
...
TopicConnectionFactory connFactory |
|
|
...
= (TopicConnectionFactory) ctx.lookup(CF_NAME);
|
|
|
...
...
...
connFactory.createTopicConnection();
|
|
|
...
...
...
...
...
createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
|
|
|
...
...
...
...
subscriber with subscription ID
|
|
|
...
...
...
...
...
(Topic) ctx.lookup(topicName);
|
|
|
...
...
...
topicSession.createDurableSubscriber(topic, subscriptionId);
|
|
|
...
...
...
...
for (int count = 0; count < |
|
|
...
...
Message message = topicSubscriber.receive();
|
|
|
...
...
...
...
...
...
...
System.out.println("count = " + count);
|
|
|
...
...
...
...
...
if (message instanceof TextMessage) |
|
|
...
...
...
...
...
...
...
TextMessage textMessage = (TextMessage) message;
|
|
|
...
...
...
...
...
...
...
...
System.out.println(count + ". textMessage.getText() = " + textMessage.getText());
|
|
|
...
...
...
...
...
...
...
...
...
if (delayBetMessages != 0) {
|
|
|
...
...
...
...
...
Thread.sleep(delayBetMessages);
|
|
|
...
...
...
...
...
...
}
topicConnection.close();
} else {
messageListener = |
|
|
...
new SampleMessageListener(delayBetMessages |
|
|
...
...
topicSubscriber.setMessageListener(messageListener);
}
} catch (Exception e) {
|
|
|
...
...
...
public String getTCPConnectionURL(String username, String password) {
|
|
|
...
...
...
...
...
...
...
append("amqp://").append(username).append(":").append(password)
|
|
|
...
.append("@").append(CARBON_CLIENT_ID)
|
|
|
...
...
...
...
...
.append("/").append(CARBON_VIRTUAL_HOST_NAME)
|
|
|
...
...
.append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
|
|
|
...
...
...
...
...
...
...
...
...
...
...
public void stopSubscriber() throws JMSException {
|
|
|
...
...
close();
topicSession.close();
topicConnection. |
|
|
...
...
...
System.out.println("Closing Subscriber");
}
|
|
|
...
Localtab |
---|
title | SampleMessageListener.java |
---|
| Code Block |
---|
| package org.sample.jms;
import javax.jms.*;
public class SampleMessageListener implements MessageListener {
private int delay = 0;
|
|
|
...
...
int currentMsgCount = 0;
public |
|
|
...
SampleMessageListener(int delay) |
|
|
...
...
...
...
...
public void onMessage(Message message) {
|
|
|
...
TextMessage receivedMessage = (TextMessage) message;
try {
System.out.println("Got the message ==> " + ( |
|
|
...
currentMsgCount+1) + " - "+ receivedMessage.getText());
|
|
|
...
SampleMessageListener.java
creates a consumer for the durable topic subscription. The configuration of this file is as follows.
...
...
currentMsgCount++;
if(delay != 0) {
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
//silently ignore
}
}
} catch (JMSException e) {
|
|
|
...
...
...
...
...
...
...
Localtab |
---|
| Code Block |
---|
| package org.sample.jms;
import javax.jms.JMSException;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
public class TopicPublisher {
public static final String ANDES_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
private static final String CF_NAME_PREFIX |
|
|
...
= "connectionfactory.";
private static final String |
|
|
...
CF_NAME = "andesConnectionfactory";
String userName = "admin";
String password = "admin";
|
|
|
...
...
private static String CARBON_CLIENT_ID = "carbon";
private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
|
|
|
...
private static String CARBON_DEFAULT_HOSTNAME = "localhost";
private static String CARBON_DEFAULT_PORT = "5672";
|
|
|
...
...
...
...
...
...
"newTopic";
public void publishMessage(int numOfMsgs) throws NamingException, JMSException, InterruptedException {
|
|
|
...
...
Properties properties = new Properties();
|
|
|
...
properties.put(Context.INITIAL_CONTEXT_FACTORY, ANDES_ICF);
properties.put(CF_NAME_PREFIX + |
|
|
...
CF_NAME, getTCPConnectionURL(userName, password));
|
|
|
...
...
properties.put("topic."+topicName,topicName);
InitialContext ctx = new |
|
|
...
InitialContext(properties);
// Lookup connection factory
|
|
|
...
...
Main.java defines the method for calling the three clients mentioned above. The configuration of this file is as follows.
...
...
TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
TopicConnection topicConnection = connFactory.createTopicConnection();
topicConnection.start();
TopicSession topicSession =
topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
Topic topic = (Topic)ctx.lookup(topicName);
// Create the messages to send
TextMessage textMessage = topicSession.createTextMessage("Test Message");
javax.jms.TopicPublisher topicPublisher = topicSession.createPublisher(topic);
System.out.println("Sending " + numOfMsgs + " messages to Topic: " + topicName);
for (int i = 0; i < numOfMsgs; i++)
{
topicPublisher.publish(textMessage);
Thread.sleep(1000);
}
topicPublisher.close();
topicSession.close();
topicConnection.close();
}
/**
 * Builds the AMQP connection URL that is registered for the connection factory.
 *
 * <p>Layout:
 * {@code amqp://{username}:{password}@{clientId}/{virtualHost}?brokerlist='tcp://{hostname}:{port}'},
 * where client ID, virtual host, hostname and port come from the
 * {@code CARBON_*} constants of this class.
 *
 * @param username user name embedded in the URL
 * @param password password embedded in the URL
 * @return the assembled connection URL
 */
public String getTCPConnectionURL(String username, String password) {
    // The builder never leaves this method, so the unsynchronized StringBuilder
    // is preferred over StringBuffer; the resulting string is identical.
    return new StringBuilder()
            .append("amqp://").append(username).append(":").append(password)
            .append("@").append(CARBON_CLIENT_ID)
            .append("/").append(CARBON_VIRTUAL_HOST_NAME)
            .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
            .toString();
}
} |
|
Localtab |
---|
| Code Block |
---|
| package org.sample.jms;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.naming.NamingException;
public class Main {
/**
 * Runs the durable-subscription demo in three phases:
 * <ol>
 *   <li>start a durable subscriber, publish 5 messages, stop the subscriber;</li>
 *   <li>publish 5 more messages while no subscriber is running;</li>
 *   <li>start a second durable subscriber, publish 5 messages, stop it.</li>
 * </ol>
 * Each subscriber is stopped in a {@code finally} block so that its session and
 * connection are closed even when publishing or sleeping fails.
 *
 * @param args not used
 * @throws NamingException      declared for failures propagated from the JNDI-based clients
 * @throws JMSException         if a JMS operation fails while publishing or stopping a subscriber
 * @throws InterruptedException if the thread is interrupted while sleeping
 */
public static void main(String[] args) throws NamingException, JMSException, InterruptedException {
    // Phase 1: subscriber is active while the first 5 messages are published.
    DurableTopicSubscriber durableTopicSubscriber = new DurableTopicSubscriber();
    durableTopicSubscriber.subscribe();
    try {
        TopicPublisher topicPublisher = new TopicPublisher();
        topicPublisher.publishMessage(5);
        // Pause before stopping; presumably leaves time for the listener to drain the messages.
        Thread.sleep(5000);
    } finally {
        durableTopicSubscriber.stopSubscriber();
    }

    // Phase 2: no subscriber is running while 5 more messages are published.
    TopicPublisher topicPublisher2 = new TopicPublisher();
    topicPublisher2.publishMessage(5);
    Thread.sleep(5000);

    // Phase 3: a second subscriber is started and 5 further messages are published.
    DurableTopicSubscriber durableTopicSubscriber2 = new DurableTopicSubscriber();
    durableTopicSubscriber2.subscribe();
    try {
        TopicPublisher topicPublisher3 = new TopicPublisher();
        topicPublisher3.publishMessage(5);
        Thread.sleep(5000);
    } finally {
        durableTopicSubscriber2.stopSubscriber();
    }
}
} |
|
|
Prerequisites
See Prerequisites to Run the MB Samples for a list of prerequisites.
Executing the sample
Run the ant
command from the <MB_HOME>/samples/DurableTopicSubscriber
directory.
...
durableTopicSubscriber
is run to create a durable topic subscriber.
- 5 messages are sent to the
newTopic
topic. The messages will be received and printed by the subscriber named durableTopicSubscriber
.
- The
durableTopicSubscriber
is stopped.
- The publisher is run again and 5 more messages are sent.
- While running
durableTopicSubscriber
again, 5 different messages are sent to the same topic. You will see that all 10 messages (including the messages sent to the topic when the subscriber was absent) are consumed by the durableTopicSubscriber
. 200 more messages are sent. durableTopicSubscriber
will close after 100 messages are received. After running durableTopicSubscriber
again you will receive nothing (as durableTopicSubscriber was unsubscribed after receiving 100 messages).