Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Localtabgroup
Localtab
titleDurableTopicSubscriber.java
Code Block
languagejava
/*
*  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
*  WSO2 Inc. licenses this file to you under the Apache License,
*  Version 2.0 (the "License"); you may not use this file except
*  in compliance with the License.
*  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.sample.jms;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Properties;
public class DurableTopicSubscriber {
    public static final String ANDES_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory"package org.sample.jms;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Properties;
public class DurableTopicSubscriber {
    public static final String ANDES_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "andesConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5672";
    private String topicName = "newTopic";
    private String subscriptionId = "mySub1";
    private boolean useListener = true;
    private int delayBetMessages = 200;
    private int messageCount = 10;
    private staticSampleMessageListener final String CF_NAME_PREFIX = "connectionfactory."messageListener;
    private TopicConnection topicConnection;
    private static final String CF_NAME = "andesConnectionfactory" TopicSession topicSession;
    private TopicSubscriber topicSubscriber;
    Stringpublic userNamevoid = "admin";subscribe() {
    String password = "admin"; try {
  private static String CARBON_CLIENT_ID = "carbon";     private static String CARBON_VIRTUAL_HOST_NAME = "carbon";System.out.println("Starting the subscriber");
       private static String CARBON_DEFAULT_HOSTNAME = "localhost";Properties properties = new Properties();
private static String CARBON_DEFAULT_PORT = "5672";     private String topicName = "newTopic" properties.put(Context.INITIAL_CONTEXT_FACTORY, ANDES_ICF);
    private String subscriptionId = "mySub1";     private boolean useListener = true;
properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
   private int delayBetMessages = 200;     private int messageCount = 10properties.put("topic." + topicName, topicName);
    private SampleMessageListener messageListener;     private TopicConnectionInitialContext topicConnection;ctx =    private TopicSession topicSessionnew InitialContext(properties);
    private TopicSubscriber topicSubscriber;     public void subscribe() {
// Lookup connection factory
       try {    TopicConnectionFactory connFactory =       System.out.println("Starting the subscriber"(TopicConnectionFactory) ctx.lookup(CF_NAME);
            PropertiestopicConnection properties = new PropertiesconnFactory.createTopicConnection();
            propertiestopicConnection.put(Context.INITIAL_CONTEXT_FACTORY, ANDES_ICFstart();
             properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));topicSession =
                    propertiestopicConnection.put("topic." + topicName, topicNamecreateTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
            InitialContext ctx = new InitialContext(properties);// create durable subscriber with subscription ID
            //Topic Lookuptopic connection= factory
(Topic) ctx.lookup(topicName);
           TopicConnectionFactory connFactorytopicSubscriber = (TopicConnectionFactory) ctx.lookup(CF_NAMEtopicSession.createDurableSubscriber(topic, subscriptionId);
            topicConnection = connFactory.createTopicConnection();
if (!useListener) {
           topicConnection.start();     for (int count = 0; count <  topicSession =messageCount; count++) {
                    Message message = topicConnectiontopicSubscriber.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGEreceive();
            // create durable subscriber with subscription ID  System.out.println("count = " + count);
      Topic topic = (Topic) ctx.lookup(topicName);          if (message instanceof topicSubscriberTextMessage) = topicSession.createDurableSubscriber(topic, subscriptionId);{
              if (!useListener) {        TextMessage textMessage = (TextMessage) message;
    for (int count = 0; count < messageCount; count++) {           System.out.println(count + ". textMessage.getText() = "     Message message = topicSubscriber.receive(+ textMessage.getText());
                    System.out.println("count = " + count);}
                    if (messagedelayBetMessages instanceof!= TextMessage0) {
                        TextMessage textMessage = (TextMessage) messageThread.sleep(delayBetMessages);
                    }
   System.out.println(count + ". textMessage.getText() = " + textMessage.getText());      }
              }  topicConnection.close();
            } else {
   if (delayBetMessages != 0) {         messageListener =               Thread.sleepnew SampleMessageListener(delayBetMessages);
                topicSubscriber.setMessageListener(messageListener);
   }         }
        } catch (Exception e) {
            topicConnectione.closeprintStackTrace();
        }
    }
else {   public String getTCPConnectionURL(String username, String password) {
       messageListener =return new SampleMessageListenerStringBuffer(delayBetMessages);
                topicSubscriber.setMessageListener(messageListener);append("amqp://").append(username).append(":").append(password)
            }    .append("@").append(CARBON_CLIENT_ID)
    } catch (Exception e) {        .append("/").append(CARBON_VIRTUAL_HOST_NAME)
     e.printStackTrace();         }
    }  .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
      public String getTCPConnectionURL(String username, String password) {    .toString();
    return}
new StringBuffer()   public void stopSubscriber() throws JMSException {
        topicSubscriber.append("amqp://").append(username).append(":").append(password)close();
        topicSession.close();
        topicConnection.appendclose("@").append(CARBON_CLIENT_ID));
        System.out.println("Closing Subscriber");
    }
}
Localtab
titleSampleMessageListener.java
Code Block
languagejava
package .append("/").append(CARBON_VIRTUAL_HOST_NAME)
       org.sample.jms;
import javax.jms.*;
public class SampleMessageListener implements MessageListener {
    private int delay  .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")= 0;
    private int currentMsgCount = 0;
    public   .toString();
SampleMessageListener(int delay) {
   }     publicthis.delay void stopSubscriber() throws JMSException {
 = delay;
    }
    public void topicSubscriber.close();onMessage(Message message) {
        TextMessage receivedMessage = topicSession.close(TextMessage) message;
        topicConnection.close();try {
            System.out.println("Closing Subscriber");
    }
}
Localtab
titleSampleMessageListener.java
Code Block
languagejava
/*
 * Copyright 2004,2005 The Apache Software Foundation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.sample.jms;
import javax.jms.*;
public class SampleMessageListener implements MessageListener {
    private int delay = 0;
    private int currentMsgCount = 0;
    public SampleMessageListener(int delayGot the message ==> " + (currentMsgCount+1) + " - "+ receivedMessage.getText());
            currentMsgCount++;
            if(delay != 0) {
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException e) {
                    //silently ignore
                }
            }
        } catch (JMSException e) {
        this.delay = delay;  e.printStackTrace();
  }     public void}
onMessage(Message message) {  }
}
Localtab
titleTopicPublisher.java
Code Block
languagejava
package org.sample.jms;
import javax.jms.JMSException;
import  TextMessage receivedMessage = (TextMessage) message;
        try {
            System.out.println("Got the message ==> " + (currentMsgCount+1) + " - "+ receivedMessage.getText())javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
public class TopicPublisher {
    public static final String ANDES_ICF  = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
 currentMsgCount++;   private static final String CF_NAME = "andesConnectionfactory";
    String if(delayuserName != 0) {"admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = try"carbon";
{    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME    Thread.sleep(delay)= "localhost";
    private static String CARBON_DEFAULT_PORT = "5672";
    String topicName } catch (InterruptedException e) {
         = "newTopic";
    public void publishMessage(int numOfMsgs) throws NamingException, JMSException, InterruptedException {
        Properties properties //silently= ignorenew Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, ANDES_ICF);
     }   properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
     }   properties.put("topic."+topicName,topicName);
     } catch (JMSException e)InitialContext {ctx = new InitialContext(properties);
         e.printStackTrace();
 // Lookup connection factory
      }  TopicConnectionFactory connFactory = }
}
Localtab
titleTopicPublisher.java
Code Block
languagejava
/*
*  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
*  WSO2 Inc. licenses this file to you under the Apache License,
*  Version 2.0 (the "License"); you may not use this file except
*  in compliance with the License.
*  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.sample.jms;
import javax.jms.JMSException;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
/**
 * Sample JMS client that publishes text messages to a topic.
 *
 * <p>The JNDI environment is built in code: the initial context factory is
 * {@link #ANDES_ICF}, the connection factory is registered under
 * {@code connectionfactory.andesConnectionfactory} with an AMQP URL produced by
 * {@link #getTCPConnectionURL(String, String)}, and the topic is registered under
 * {@code topic.<topicName>}.
 */
public class TopicPublisher {
    public static final String ANDES_ICF  = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "andesConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static final String CARBON_CLIENT_ID = "carbon";
    private static final String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static final String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static final String CARBON_DEFAULT_PORT = "5672";
    String topicName = "newTopic";

    /**
     * Publishes the text message {@code "Test Message"} to {@code topicName}
     * {@code numOfMsgs} times, sleeping one second after each publish.
     *
     * <p>The connection and the JNDI context are released in a {@code finally}
     * block, so they are closed even when a lookup, a publish or the sleep fails.
     *
     * @param numOfMsgs number of messages to publish; zero or a negative value
     *                  publishes nothing
     * @throws NamingException      if the JNDI context cannot be created, a lookup
     *                              fails, or the context cannot be closed
     * @throws JMSException         if a JMS operation fails
     * @throws InterruptedException if the thread is interrupted while sleeping
     *                              between messages
     */
    public void publishMessage(int numOfMsgs) throws NamingException, JMSException, InterruptedException {
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, ANDES_ICF);
        properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
        properties.put("topic."+topicName,topicName);
        InitialContext ctx = new InitialContext(properties);
        TopicConnection topicConnection = null;
        try {
            // Lookup connection factory
            TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx.lookup(CF_NAME);
            topicConnection = connFactory.createTopicConnection();
            topicConnection.start();
            // AUTO_ACKNOWLEDGE is inherited from javax.jms.Session; it is the same
            // constant that was previously referenced through QueueSession.
            TopicSession topicSession =
                    topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
            Topic topic = (Topic)ctx.lookup(topicName);
            // Create the messages to send
            TextMessage textMessage = topicSession.createTextMessage("Test Message");
            javax.jms.TopicPublisher topicPublisher = topicSession.createPublisher(topic);
            System.out.println("Sending " + numOfMsgs + " messages to Topic: " + topicName);
            for (int i = 0; i < numOfMsgs; i++) {
                topicPublisher.publish(textMessage);
                Thread.sleep(1000);
            }
            topicPublisher.close();
            topicSession.close();
        } finally {
            try {
                // On a failure path the publisher and session above were not closed
                // explicitly; closing the connection is relied on to release them.
                if (topicConnection != null) {
                    topicConnection.close();
                }
            } finally {
                ctx.close();
            }
        }
    }

    /**
     * Builds the AMQP connection URL in the form
     * {@code amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'}.
     *
     * @param username user name placed in the URL
     * @param password password placed in the URL
     * @return the assembled connection URL
     */
    public String getTCPConnectionURL(String username, String password) {
        // A local, single-threaded builder needs no StringBuffer synchronization.
        return new StringBuilder()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                .toString();
    }
}
Localtab
titleMain.java
Code Block
languagejava
/*
*  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
*  WSO2 Inc. licenses this file to you under the Apache License,
*  Version 2.0 (the "License"); you may not use this file except
*  in compliance with the License.
*  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/

(TopicConnectionFactory) ctx.lookup(CF_NAME);
        TopicConnection topicConnection = connFactory.createTopicConnection();
        topicConnection.start();
        TopicSession topicSession =
                topicConnection.createTopicSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        Topic topic = (Topic)ctx.lookup(topicName);
        // Create the messages to send
        TextMessage textMessage = topicSession.createTextMessage("Test Message");
        javax.jms.TopicPublisher topicPublisher = topicSession.createPublisher(topic);
        System.out.println("Sending " + numOfMsgs + " messages to Topic: " + topicName);
        for (int i = 0; i < numOfMsgs; i++)
         {
             topicPublisher.publish(textMessage);
             Thread.sleep(1000);
         }
        topicPublisher.close();
        topicSession.close();
        topicConnection.close();
    }
    public String getTCPConnectionURL(String username, String password) {
        // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
        return new StringBuffer()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                .toString();
    }
}
Localtab
titleMain.java
Code Block
languagejava
package org.sample.jms;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.naming.NamingException;

/**
 * Entry point of the durable topic subscription sample.
 *
 * <p>Runs three publishing rounds of five messages each: one while a durable
 * subscriber is connected, one after that subscriber has been stopped, and one
 * after a new {@link DurableTopicSubscriber} has subscribed again. The offline
 * round is presumably there to show messages being retained for the durable
 * subscription; that depends on the broker and on both subscribers using the
 * same subscription ID.
 */
public class Main {
    private static final int MESSAGES_PER_ROUND = 5;
    private static final long WAIT_AFTER_ROUND_MILLIS = 5000;

    /**
     * Runs the three rounds in order: subscribed, offline, subscribed again.
     *
     * @param args ignored
     * @throws NamingException      if a JNDI lookup in the publisher fails
     * @throws JMSException         if a JMS operation fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public static void main(String[] args) throws NamingException, JMSException, InterruptedException {
        publishWhileSubscribed();
        // Subscriber is stopped here; publish while nobody is connected.
        publishAndWait();
        publishWhileSubscribed();
    }

    /**
     * Starts a durable subscriber, publishes one round and stops the subscriber.
     * The subscriber is stopped in a finally block so that a failure while
     * publishing does not leave its connection open.
     */
    private static void publishWhileSubscribed() throws NamingException, JMSException, InterruptedException {
        DurableTopicSubscriber durableTopicSubscriber = new DurableTopicSubscriber();
        durableTopicSubscriber.subscribe();
        try {
            publishAndWait();
        } finally {
            durableTopicSubscriber.stopSubscriber();
        }
    }

    /** Publishes one round with a fresh publisher, then waits before returning. */
    private static void publishAndWait() throws NamingException, JMSException, InterruptedException {
        TopicPublisher topicPublisher = new TopicPublisher();
        topicPublisher.publishMessage(MESSAGES_PER_ROUND);
        Thread.sleep(WAIT_AFTER_ROUND_MILLIS);
    }
}

...