Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • DurableTopicSubscriber.java class creates a durable topic subscription named mySub1.

  • SampleMessageListener.java class creates a consumer for the durable topic subscription. 

  • TopicPublisher.java class creates a publisher to publish messages in the durable topic. 

  • Main.java defines the method for calling the three clients mentioned above. 

Click the relevant tab to see the code.

Localtabgroup
Localtab
titleDurableTopicSubscriber.java
Code Block
languagejava
/*
*  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
*  WSO2 Inc. licenses this file to you under the Apache License,
*  Version 2.0 (the "License"); you may not use this file except
*  in compliance with the License.
*  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.sample.jms;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Properties;
public class DurableTopicSubscriber {
    public static final String ANDES_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory."package org.sample.jms;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Properties;
public class DurableTopicSubscriber {
    public static final String ANDES_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "andesConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5672";
    private String topicName = "newTopic";
    private String subscriptionId = "mySub1";
    private boolean useListener = true;
    private int delayBetMessages = 200;
    private int messageCount = 10;
    private SampleMessageListener messageListener;
    private TopicConnection topicConnection;
    private TopicSession topicSession;
    private staticTopicSubscriber finaltopicSubscriber;
String CF_NAME = "andesConnectionfactory"; public void subscribe() {
String userName = "admin";     Stringtry password{
=  "admin";     private static String CARBON_CLIENT_ID = "carbon"System.out.println("Starting the subscriber");
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";   Properties properties private= static String CARBON_DEFAULT_HOSTNAME = "localhost";new Properties();
       private static String CARBON_DEFAULT_PORT = "5672"   properties.put(Context.INITIAL_CONTEXT_FACTORY, ANDES_ICF);
    private String topicName = "newTopic";     private String subscriptionId = "mySub1"properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
    private boolean useListener = true;     private int delayBetMessages = 200;properties.put("topic." + topicName, topicName);
      private int messageCount = 10;  InitialContext ctx = private SampleMessageListener messageListenernew InitialContext(properties);
    private TopicConnection topicConnection;     private TopicSession// topicSession;Lookup connection factory
  private TopicSubscriber topicSubscriber;     public void subscribe() {TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
    try {       topicConnection =     System.out.println("Starting the subscriber"connFactory.createTopicConnection();
            Properties properties = new Properties();topicConnection.start();
            topicSession =
     properties.put(Context.INITIAL_CONTEXT_FACTORY, ANDES_ICF);             properties topicConnection.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
            // create durable  properties.put("topic." + topicName, topicName);subscriber with subscription ID
            Topic InitialContext ctxtopic = new InitialContext(properties(Topic) ctx.lookup(topicName);
            //topicSubscriber Lookup connection factory= topicSession.createDurableSubscriber(topic, subscriptionId);
            TopicConnectionFactoryif connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);(!useListener) {
            topicConnection = connFactory.createTopicConnection();  for (int count = 0; count < messageCount; count++) {
 topicConnection.start();             topicSession =     Message message = topicSubscriber.receive();
                    topicConnectionSystem.out.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGEprintln("count = " + count);
            // create durable subscriber with subscription ID  if (message instanceof TextMessage) {
      Topic topic = (Topic) ctx.lookup(topicName);             topicSubscriber TextMessage textMessage = topicSession.createDurableSubscriber(topic, subscriptionId)(TextMessage) message;
            if (!useListener) {          System.out.println(count + ". textMessage.getText() = "  for (int count = 0; count < messageCount; count++) {+ textMessage.getText());
                    }
        Message message = topicSubscriber.receive();          if (delayBetMessages != 0) {
                    System.out.println("count = " + countThread.sleep(delayBetMessages);
                    if}
  (message instanceof TextMessage) {           }
             TextMessage textMessage = topicConnection.close(TextMessage);
  message;          } else {
            System.out.println(count + ". textMessage.getText()    messageListener = " + textMessage.getText()new SampleMessageListener(delayBetMessages);
                topicSubscriber.setMessageListener(messageListener);
   }         }
        } catch  if (delayBetMessages != 0Exception e) {
                        Thread.sleep(delayBetMessagese.printStackTrace();
        }
    }
    public String }getTCPConnectionURL(String username, String password) {
        return new StringBuffer()
 }                 topicConnection.close();.append("amqp://").append(username).append(":").append(password)
            } else {  .append("@").append(CARBON_CLIENT_ID)
              messageListener = new SampleMessageListener(delayBetMessages); .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                topicSubscriber.setMessageListener(messageListener);.append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
            }    .toString();
    }
  catch (Exception e)public {void stopSubscriber() throws JMSException {
        etopicSubscriber.printStackTraceclose();
        }topicSession.close();
    }    topicConnection.close();
public String getTCPConnectionURL(String username, String password) {  System.out.println("Closing Subscriber");
     return new StringBuffer()
   }
}
Localtab
titleSampleMessageListener.java
Code Block
languagejava
package org.sample.jms;
import javax.jms.*;
public class SampleMessageListener implements MessageListener {
    private int delay = 0;
    .append("amqp://").append(username).append(":").append(password)
private int currentMsgCount = 0;
    public SampleMessageListener(int delay) {
        this.delay = delay;
    .append("@").append(CARBON_CLIENT_ID)}
    public void onMessage(Message message) {
         .append("/").append(CARBON_VIRTUAL_HOST_NAME)
TextMessage receivedMessage = (TextMessage) message;
        try {
     .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")       System.out.println("Got the message ==> " + (currentMsgCount+1) + " - "+ receivedMessage.getText());
            currentMsgCount++;
    .toString();     }   if(delay != public0) void{
stopSubscriber() throws JMSException {         topicSubscriber.close();    try {
   topicSession.close();         topicConnection.close(        Thread.sleep(delay);
         System.out.println("Closing Subscriber");     } }
Localtab
titleSampleMessageListener.java
Code Block
languagejava
/*
 * Copyright 2004,2005 The Apache Software Foundation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.sample.jms catch (InterruptedException e) {
                    //silently ignore
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
Localtab
titleTopicPublisher.java
Code Block
languagejava
package org.sample.jms;
import javax.jms.JMSException;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.*TopicConnection;
public class SampleMessageListener implements MessageListener {
    private int delay = 0;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
public class TopicPublisher {
   private intpublic currentMsgCountstatic =final 0;String ANDES_ICF  =  public SampleMessageListener(int delay) {
        this.delay = delay"org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    }private static final String CF_NAME public void onMessage(Message message) {
    = "andesConnectionfactory";
    String userName = "admin";
   TextMessage String receivedMessagepassword = (TextMessage) message"admin";
    private static String CARBON_CLIENT_ID try {= "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
  System.out.println("Got the messageprivate ==>static " + (currentMsgCount+1) + " - "+ receivedMessage.getText())String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5672";
  currentMsgCount++;  String topicName = "newTopic";
    public void  ifpublishMessage(delay != 0int numOfMsgs) {throws NamingException, JMSException, InterruptedException {
        Properties properties = new tryProperties();
{        properties.put(Context.INITIAL_CONTEXT_FACTORY, ANDES_ICF);
        properties.put(CF_NAME_PREFIX +  Thread.sleep(delayCF_NAME, getTCPConnectionURL(userName, password));
        properties.put("topic."+topicName,topicName);
       } catchInitialContext (InterruptedExceptionctx e)= {new InitialContext(properties);
        // Lookup connection factory
       //silently ignoreTopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
        TopicConnection topicConnection = connFactory.createTopicConnection();
}        topicConnection.start();
    }    TopicSession topicSession =
  } catch (JMSException e) {             e.printStackTrace();topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        Topic }topic = (Topic)ctx.lookup(topicName);
  } }
Localtab
titleTopicPublisher.java
Code Block
languagejava
/* *  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
*  WSO2 Inc. licenses this file to you under the Apache License,
*  Version 2.0 (the "License"); you may not use this file except
*  in compliance with the License.
*  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.sample.jms;
import javax.jms.JMSException;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
public class TopicPublisher {
    public static final String ANDES_ICF  = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "andesConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5672";
    String topicName = "newTopic";
    public void publishMessage(int numOfMsgs) throws NamingException, JMSException, InterruptedException { // Create the messages to send
        TextMessage textMessage = topicSession.createTextMessage("Test Message");
        javax.jms.TopicPublisher topicPublisher = topicSession.createPublisher(topic);
        System.out.println("Sending " + numOfMsgs + " messages to Topic: " + topicName);
        for (int i = 0; i < numOfMsgs; i++)
         {
             topicPublisher.publish(textMessage);
             Thread.sleep(1000);
         }
        topicPublisher.close();
        topicSession.close();
        topicConnection.close();
    }
    public String getTCPConnectionURL(String username, String password) {
        // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
        return new StringBuffer()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
             Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, ANDES_ICF);
        properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
        properties.put("topic."+topicName,topicName);
        InitialContext ctx = new InitialContext(properties);
        // Lookup connection factory
        TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
        TopicConnection topicConnection = connFactory.createTopicConnection();
        topicConnection.start();
        TopicSession topicSession =
                topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        Topic topic = (Topic)ctx.lookup(topicName);
        // Create the messages to send
        TextMessage textMessage = topicSession.createTextMessage("Test Message");
        javax.jms.TopicPublisher topicPublisher = topicSession.createPublisher(topic);
        System.out.println("Sending " + numOfMsgs + " messages to Topic: " + topicName);
        for (int i = 0; i < numOfMsgs; i++)
         {
             topicPublisher.publish(textMessage);
             Thread.sleep(1000);
         }
        topicPublisher.close();
        topicSession.close();
        topicConnection.close();
    }
    public String getTCPConnectionURL(String username, String password) {
        // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
        return new StringBuffer()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                .toString();
    }
}
Localtab
titleMain.java
Code Block
languagejava
/*
*  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
*  WSO2 Inc. licenses this file to you under the Apache License,
*  Version 2.0 (the "License"); you may not use this file except
*  in compliance with the License.
*  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/

.append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                .toString();
    }
}
Localtab
titleMain.java
Code Block
languagejava
package org.sample.jms;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.naming.NamingException;

public class Main {
    public static void main(String[] args) throws NamingException, JMSException, InterruptedException {
        DurableTopicSubscriber durableTopicSubscriber = new DurableTopicSubscriber();
        durableTopicSubscriber.subscribe();
        TopicPublisher topicPublisher = new TopicPublisher();
        topicPublisher.publishMessage(5);
        Thread.sleep(5000);
        durableTopicSubscriber.stopSubscriber();
        TopicPublisher topicPublisher2 = new TopicPublisher();
        topicPublisher2.publishMessage(5);
        Thread.sleep(5000);
        DurableTopicSubscriber durableTopicSubscriber2 = new DurableTopicSubscriber();
        durableTopicSubscriber2.subscribe();
        TopicPublisher topicPublisher3 = new TopicPublisher();
        topicPublisher3.publishMessage(5);
        Thread.sleep(5000);
        durableTopicSubscriber2.stopSubscriber();
    }
}

...