Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Localtabgroup
Localtab
titleSampleQueueSender.java

This class is used to create the sample client which sends messages to the queue named testQueue in WSO2 MB. The code of this class is as follows:

Code Block
languagejava
/*
*  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
*  WSO2 Inc. licenses this file to you under the Apache License,
*  Version 2.0 (the "License"); you may not use this file except
*  in compliance with the License.
*  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.sample.jms;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
public class SampleQueueSender {
    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String QUEUE_NAME_PREFIX = "queue.";
    private static final String CF_NAME = "qpidConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5672";
    String queueName = "testQueue";
    private QueueConnection queueConnection;
    private QueueSession queueSession;
    public void sendMessages() throws NamingException, JMSException {
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICFpackage org.sample.jms;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
public class SampleQueueSender {
    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String QUEUE_NAME_PREFIX = "queue.";
    private static final String CF_NAME = "qpidConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5672";
    String queueName = "testQueue";
    private QueueConnection queueConnection;
    private QueueSession queueSession;
    public void sendMessages() throws NamingException, JMSException {
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
        properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
        properties.put(QUEUE_NAME_PREFIX + queueName, queueName);
        InitialContext ctx = new InitialContext(properties);
        // Lookup connection factory
        QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(CF_NAME);
        queueConnection = connFactory.createQueueConnection();
        queueConnection.start();
        queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        // Send message
        Queue queue = (Queue)ctx.lookup(queueName);
        // create the message to send
        TextMessage textMessage = queueSession.createTextMessage("Test Message Content");
        properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password))javax.jms.QueueSender queueSender = queueSession.createSender(queue);
        propertiesqueueSender.put(QUEUE_NAME_PREFIX + queueName, queueNamesend(textMessage);
        InitialContext ctx = new InitialContext(propertiesqueueSender.close();
        queueSession.close();
        // Lookup connection factory
 queueConnection.close();
    }
    private String QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(CF_NAME);getTCPConnectionURL(String username, String password) {
        queueConnection = connFactory.createQueueConnection();// amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
        return new queueConnection.startStringBuffer();
        queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);     .append("amqp://").append(username).append(":").append(password)
   // Send message         Queue queue = (Queue)ctx.lookup(queueName);.append("@").append(CARBON_CLIENT_ID)
        // create the message to send         TextMessage textMessage = queueSession.createTextMessage("Test Message Content");.append("/").append(CARBON_VIRTUAL_HOST_NAME)
        javax.jms.QueueSender queueSender = queueSession.createSender(queue);
        queueSender.send(textMessage);append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
        queueSender.close();
        queueSession.closetoString();
        queueConnection.close();
    }
    private String getTCPConnectionURL(String username, String password) {
        // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
        return new StringBuffer()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                .toString();
    }
 
}
Localtab
titleSampleQueueReceiver.java

This class is used to create the sample client which receives the message sent to the testQueue queue and prints it in the console. The code of this class is as follows:

Code Block
languagejava
/*
*  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
*  WSO2 Inc. licenses this file to you under the Apache License,
*  Version 2.0 (the "License"); you may not use this file except
*  in compliance with the License.
*  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.sample.jms;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.MessageConsumer;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
public class SampleQueueReceiver {
    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "qpidConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5672";
    String queueName = "testQueue";
    private QueueConnection queueConnection;
    private QueueSession queueSession;
    public MessageConsumer registerSubscriber() throws NamingException, JMSException{
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
        properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
        properties.put("queue."+ queueName,queueName);
        InitialContext ctx = new InitialContext(properties);
        // Lookup connection factory
        QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(CF_NAME);
        queueConnection = connFactory.createQueueConnection();
        queueConnection.start();
        queueSession =
      }
 
}
Localtab
titleSampleQueueReceiver.java

This class is used to create the sample client which receives the message sent to the testQueue queue and prints it in the console. The code of this class is as follows:

Code Block
languagejava
package org.sample.jms;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.jms.MessageConsumer;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
public class SampleQueueReceiver {
    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "qpidConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5672";
    String queueName = "testQueue";
    private QueueConnection queueConnection;
    private QueueSession queueSession;
    public MessageConsumer registerSubscriber() throws NamingException, JMSException{
         queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGEProperties properties = new Properties();
        //Receive messageproperties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
        Queue queue =  (Queue) ctx.lookup(queueNameproperties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
        MessageConsumer consumer = queueSession.createConsumer(queue);
properties.put("queue."+ queueName,queueName);
       return consumer;InitialContext ctx = new InitialContext(properties);
}     public void receiveMessages(MessageConsumer consumer) throws NamingException, JMSException { // Lookup connection factory
        QueueConnectionFactory TextMessage messageconnFactory = (TextMessageQueueConnectionFactory) consumerctx.receivelookup(CF_NAME);
        System.out.println("Got message from queue receiver==>" + message.getText()queueConnection = connFactory.createQueueConnection();
        queueConnection.start();
        //queueSession Housekeeping=
        consumer.close();         queueSession.close(queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        queueConnection.stop();//Receive message
        Queue queue =  (Queue) queueConnectionctx.closelookup(queueName);
    }    MessageConsumer privateconsumer String= getTCPConnectionURL(String username, String password) {queueSession.createConsumer(queue);
         // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'return consumer;
    }
    returnpublic newvoid StringBufferreceiveMessages(MessageConsumer consumer) throws NamingException, JMSException {
        TextMessage message  = .append("amqp://").append(username).append(":").append(password)(TextMessage) consumer.receive();
        System.out.println("Got message from queue receiver==>" +   message.appendgetText("@").append(CARBON_CLIENT_ID));
        // Housekeeping
        consumer.appendclose("/").append(CARBON_VIRTUAL_HOST_NAME));
        queueSession.close();
         queueConnection.append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")stop();
        queueConnection.close();
       .toString();}
    private String }getTCPConnectionURL(String username, 
}
Localtab
titleMain.java

The Main.java class defines the main method for calling both the clients mentioned above. The code of this class is as follows:

Code Block
languagejava
 /*
*  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
*  WSO2 Inc. licenses this file to you under the Apache License,
*  Version 2.0 (the "License"); you may not use this file except
*  in compliance with the License.
*  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/

String password) {
        // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
        return new StringBuffer()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                .toString();
    }
 
}
Localtab
titleMain.java

The Main.java class defines the main method for calling both the clients mentioned above. The code of this class is as follows:

Code Block
languagejava
package org.sample.jms;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.naming.NamingException;

public class Main {
	public static void main(String[] args) throws NamingException, JMSException {
        SampleQueueReceiver queueReceiver = new SampleQueueReceiver();
        MessageConsumer consumer = queueReceiver.registerSubscriber();
        SampleQueueSender queueSender = new SampleQueueSender();
        queueSender.sendMessages();
        queueReceiver.receiveMessages(consumer);
    }
}

...