...
DurableTopicSubscriber.java
class creates a durable topic subscription named mySub1.
SampleMessageListener.java
class creates a consumer for the durable topic subscription.
TopicPublisher.java
class creates a publisher that publishes messages to the topic used by the durable subscription.
Main.java
defines the method for calling the three clients mentioned above.
Click the relevant tab to see the code.
Localtabgroup |
---|
Localtab |
---|
title | DurableTopicSubscriber.java |
---|
| Code Block |
---|
| /*
* Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.sample.jms;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Properties;
public class DurableTopicSubscriber {
public static final String ANDES_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
private static final String CF_NAME_PREFIX = "connectionfactory."package org.sample.jms;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Properties;
public class DurableTopicSubscriber {
public static final String ANDES_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
private static final String CF_NAME_PREFIX = "connectionfactory.";
private static final String CF_NAME = "andesConnectionfactory";
String userName = "admin";
String password = "admin";
private static String CARBON_CLIENT_ID = "carbon";
private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
private static String CARBON_DEFAULT_HOSTNAME = "localhost";
private static String CARBON_DEFAULT_PORT = "5672";
private String topicName = "newTopic";
private String subscriptionId = "mySub1";
private boolean useListener = true;
private int delayBetMessages = 200;
private int messageCount = 10;
private SampleMessageListener messageListener;
private TopicConnection topicConnection;
private TopicSession topicSession;
private staticTopicSubscriber finaltopicSubscriber;
String CF_NAME = "andesConnectionfactory"; public void subscribe() {
String userName = "admin"; Stringtry password{
= "admin"; private static String CARBON_CLIENT_ID = "carbon"System.out.println("Starting the subscriber");
private static String CARBON_VIRTUAL_HOST_NAME = "carbon"; Properties properties private= static String CARBON_DEFAULT_HOSTNAME = "localhost";new Properties();
private static String CARBON_DEFAULT_PORT = "5672" properties.put(Context.INITIAL_CONTEXT_FACTORY, ANDES_ICF);
private String topicName = "newTopic"; private String subscriptionId = "mySub1"properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
private boolean useListener = true; private int delayBetMessages = 200;properties.put("topic." + topicName, topicName);
private int messageCount = 10; InitialContext ctx = private SampleMessageListener messageListenernew InitialContext(properties);
private TopicConnection topicConnection; private TopicSession// topicSession;Lookup connection factory
private TopicSubscriber topicSubscriber; public void subscribe() {TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
try { topicConnection = System.out.println("Starting the subscriber"connFactory.createTopicConnection();
Properties properties = new Properties();topicConnection.start();
topicSession =
properties.put(Context.INITIAL_CONTEXT_FACTORY, ANDES_ICF); properties topicConnection.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
// create durable properties.put("topic." + topicName, topicName);subscriber with subscription ID
Topic InitialContext ctxtopic = new InitialContext(properties(Topic) ctx.lookup(topicName);
//topicSubscriber Lookup connection factory= topicSession.createDurableSubscriber(topic, subscriptionId);
TopicConnectionFactoryif connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);(!useListener) {
topicConnection = connFactory.createTopicConnection(); for (int count = 0; count < messageCount; count++) {
topicConnection.start(); topicSession = Message message = topicSubscriber.receive();
topicConnectionSystem.out.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGEprintln("count = " + count);
// create durable subscriber with subscription ID if (message instanceof TextMessage) {
Topic topic = (Topic) ctx.lookup(topicName); topicSubscriber TextMessage textMessage = topicSession.createDurableSubscriber(topic, subscriptionId)(TextMessage) message;
if (!useListener) { System.out.println(count + ". textMessage.getText() = " for (int count = 0; count < messageCount; count++) {+ textMessage.getText());
}
Message message = topicSubscriber.receive(); if (delayBetMessages != 0) {
System.out.println("count = " + countThread.sleep(delayBetMessages);
if}
(message instanceof TextMessage) { }
TextMessage textMessage = topicConnection.close(TextMessage);
message; } else {
System.out.println(count + ". textMessage.getText() messageListener = " + textMessage.getText()new SampleMessageListener(delayBetMessages);
topicSubscriber.setMessageListener(messageListener);
} }
} catch if (delayBetMessages != 0Exception e) {
Thread.sleep(delayBetMessagese.printStackTrace();
}
}
public String }getTCPConnectionURL(String username, String password) {
return new StringBuffer()
} topicConnection.close();.append("amqp://").append(username).append(":").append(password)
} else { .append("@").append(CARBON_CLIENT_ID)
messageListener = new SampleMessageListener(delayBetMessages); .append("/").append(CARBON_VIRTUAL_HOST_NAME)
topicSubscriber.setMessageListener(messageListener);.append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
} .toString();
}
catch (Exception e)public {void stopSubscriber() throws JMSException {
etopicSubscriber.printStackTraceclose();
}topicSession.close();
} topicConnection.close();
public String getTCPConnectionURL(String username, String password) { System.out.println("Closing Subscriber");
return new StringBuffer()
}
} |
|
Localtab |
---|
title | SampleMessageListener.java |
---|
| Code Block |
---|
| /*
* Copyright 2004,2005 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.sample.jms;
import javax.jms.*;
/**
 * Message listener that prints every received text message together with a
 * running count, then optionally pauses before returning.
 */
public class SampleMessageListener implements MessageListener {
    // pause in milliseconds after each message; 0 disables the pause
    private int delay = 0;
    // number of messages printed so far
    private int currentMsgCount = 0;

    /**
     * @param delay pause in milliseconds applied after each message (0 for none)
     */
    public SampleMessageListener(int delay) {
        this.delay = delay;
    }

    /**
     * Prints the text of the received message and increments the counter.
     * The message is cast to {@link TextMessage} without a type check.
     *
     * @param message the delivered JMS message
     */
    @Override
    public void onMessage(Message message) {
        TextMessage receivedMessage = (TextMessage) message;
        try {
            System.out.println("Got the message ==> " + (currentMsgCount+1) + " - "+ receivedMessage.getText());
            currentMsgCount++;
            if (delay != 0) {
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException e) {
                    // Nothing to recover here; restore the interrupt flag so the
                    // calling thread can still observe the interruption.
                    Thread.currentThread().interrupt();
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
} |
|
Localtab |
---|
| Code Block |
---|
| package org.sample.jms;
import javax.jms.JMSException;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.*TopicConnection;
public class SampleMessageListener implements MessageListener {
private int delay = 0;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
public class TopicPublisher {
private intpublic currentMsgCountstatic =final 0;String ANDES_ICF = public SampleMessageListener(int delay) {
this.delay = delay"org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
private static final String CF_NAME_PREFIX = "connectionfactory.";
}private static final String CF_NAME public void onMessage(Message message) {
= "andesConnectionfactory";
String userName = "admin";
TextMessage String receivedMessagepassword = (TextMessage) message"admin";
private static String CARBON_CLIENT_ID try {= "carbon";
private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
System.out.println("Got the messageprivate ==>static " + (currentMsgCount+1) + " - "+ receivedMessage.getText())String CARBON_DEFAULT_HOSTNAME = "localhost";
private static String CARBON_DEFAULT_PORT = "5672";
currentMsgCount++; String topicName = "newTopic";
public void ifpublishMessage(delay != 0int numOfMsgs) {throws NamingException, JMSException, InterruptedException {
Properties properties = new tryProperties();
{ properties.put(Context.INITIAL_CONTEXT_FACTORY, ANDES_ICF);
properties.put(CF_NAME_PREFIX + Thread.sleep(delayCF_NAME, getTCPConnectionURL(userName, password));
properties.put("topic."+topicName,topicName);
} catchInitialContext (InterruptedExceptionctx e)= {new InitialContext(properties);
// Lookup connection factory
//silently ignoreTopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
TopicConnection topicConnection = connFactory.createTopicConnection();
} topicConnection.start();
} TopicSession topicSession =
} catch (JMSException e) { e.printStackTrace();topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
Topic }topic = (Topic)ctx.lookup(topicName);
} } |
| Localtab |
---|
| Code Block |
---|
| /* * Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.sample.jms;
import javax.jms.JMSException;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
public class TopicPublisher {
public static final String ANDES_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
private static final String CF_NAME_PREFIX = "connectionfactory.";
private static final String CF_NAME = "andesConnectionfactory";
String userName = "admin";
String password = "admin";
private static String CARBON_CLIENT_ID = "carbon";
private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
private static String CARBON_DEFAULT_HOSTNAME = "localhost";
private static String CARBON_DEFAULT_PORT = "5672";
String topicName = "newTopic";
public void publishMessage(int numOfMsgs) throws NamingException, JMSException, InterruptedException { // Create the messages to send
TextMessage textMessage = topicSession.createTextMessage("Test Message");
javax.jms.TopicPublisher topicPublisher = topicSession.createPublisher(topic);
System.out.println("Sending " + numOfMsgs + " messages to Topic: " + topicName);
for (int i = 0; i < numOfMsgs; i++)
{
topicPublisher.publish(textMessage);
Thread.sleep(1000);
}
topicPublisher.close();
topicSession.close();
topicConnection.close();
}
public String getTCPConnectionURL(String username, String password) {
// amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
return new StringBuffer()
.append("amqp://").append(username).append(":").append(password)
.append("@").append(CARBON_CLIENT_ID)
.append("/").append(CARBON_VIRTUAL_HOST_NAME)
Properties properties = new Properties();
properties.put(Context.INITIAL_CONTEXT_FACTORY, ANDES_ICF);
properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
properties.put("topic."+topicName,topicName);
InitialContext ctx = new InitialContext(properties);
// Lookup connection factory
TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
TopicConnection topicConnection = connFactory.createTopicConnection();
topicConnection.start();
TopicSession topicSession =
topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
Topic topic = (Topic)ctx.lookup(topicName);
// Create the messages to send
TextMessage textMessage = topicSession.createTextMessage("Test Message");
javax.jms.TopicPublisher topicPublisher = topicSession.createPublisher(topic);
System.out.println("Sending " + numOfMsgs + " messages to Topic: " + topicName);
for (int i = 0; i < numOfMsgs; i++)
{
topicPublisher.publish(textMessage);
Thread.sleep(1000);
}
topicPublisher.close();
topicSession.close();
topicConnection.close();
}
public String getTCPConnectionURL(String username, String password) {
// amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
return new StringBuffer()
.append("amqp://").append(username).append(":").append(password)
.append("@").append(CARBON_CLIENT_ID)
.append("/").append(CARBON_VIRTUAL_HOST_NAME)
.append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
.toString();
}
} |
| Localtab |
---|
| Code Block |
---|
| /*
* Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
.append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
.toString();
}
} |
|
Localtab |
---|
| Code Block |
---|
| package org.sample.jms;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.naming.NamingException;
/**
 * Entry point that drives the sample: it alternates between running a
 * {@code DurableTopicSubscriber} and publishing batches of messages with a
 * {@code TopicPublisher}.
 */
public class Main {
    /**
     * Runs three publish rounds of five messages each: one while a first
     * subscriber is active, one after it has been stopped, and one while a
     * second subscriber is active.
     *
     * @param args unused
     */
    public static void main(String[] args) throws NamingException, JMSException, InterruptedException {
        // Round 1: subscriber is connected while messages are published.
        DurableTopicSubscriber firstSubscriber = new DurableTopicSubscriber();
        firstSubscriber.subscribe();
        publishAndWait(5);
        firstSubscriber.stopSubscriber();

        // Round 2: messages are published while no subscriber is running.
        publishAndWait(5);

        // Round 3: a new subscriber instance is started, then more messages follow.
        DurableTopicSubscriber secondSubscriber = new DurableTopicSubscriber();
        secondSubscriber.subscribe();
        publishAndWait(5);
        secondSubscriber.stopSubscriber();
    }

    /** Publishes {@code howMany} messages with a fresh publisher, then pauses five seconds. */
    private static void publishAndWait(int howMany) throws NamingException, JMSException, InterruptedException {
        new TopicPublisher().publishMessage(howMany);
        Thread.sleep(5000);
    }
} |
|
|
...