Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Note: To run this code sample, 

  • you You need to have dependencies located at $CARBON_HOME/client-lib in class path.
  • You need to run QueueReceiver class prior to QueueSender class when testing this sample.

Using following QueueSender JMS client messages can be sent to 'testQueue'

 

Code Block
languagejava
/**<!--
 *~ Copyright (c) 20092005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
 ~
 *
 * Licensed~ WSO2 Inc. licenses this file to you under the Apache License,
 ~ Version 2.0 (the "License");  * you may not use this file except
 ~ in compliance with the License.
 *~ You may obtain a copy of the License at
 *~
 ~   * http://www.apache.org/licenses/LICENSE-2.0
 *~
 *~ Unless required by applicable law or agreed to in writing,
software ~ *software distributed under the License is distributed on an
 ~ "AS IS" BASIS,  * WITHOUT WARRANTIES OR CONDITIONS OF ANY
 ~ KIND, either express or implied.  * See the License for the
 ~ specific language governing permissions and limitations
* limitations~ under the License.
 */-->
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
public class QueueSender {
    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String QUEUE_NAME_PREFIX = "queue.";
    private static final String CF_NAME = "qpidConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5672";
    String queueName = "testQueue";

    public static void main(String[] args) throws NamingException, JMSException {
        QueueSender queueSender = new QueueSender();
        queueSender.sendMessages();
    }
    public void sendMessages() throws NamingException, JMSException {
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
        properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
        properties.put(QUEUE_NAME_PREFIX + queueName, queueName);
        System.out.println("getTCPConnectionURL(userName,password) = " + getTCPConnectionURL(userName, password));
        InitialContext ctx = new InitialContext(properties);
        // Lookup connection factory
        QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(CF_NAME);
        QueueConnection queueConnection = connFactory.createQueueConnection();
        queueConnection.start();
        QueueSession queueSession =
                queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        // Send message
        Queue queue = (Queue)ctx.lookup(queueName);
        // create the message to send
        TextMessage textMessage = queueSession.createTextMessage("Test Message Content");
        javax.jms.QueueSender queueSender = queueSession.createSender(queue);
        queueSender.send(textMessage);
        queueSender.close();
        queueSession.close();
        queueConnection.close();
    }
    public String getTCPConnectionURL(String username, String password) {
        // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
        return new StringBuffer()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                .toString();
    }

}

...

 

Using following QueueReceiver JMS client messages can be received from 'testQueue'

 

 

Code Block
languagejava
/**<!--
 *~ Copyright (c) 20092005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
 ~
 *
 * Licensed~ WSO2 Inc. licenses this file to you under the Apache License,
 ~ Version 2.0 (the "License");
 * you may not use this file except
 ~ in compliance with the License.
 *~ You may obtain a copy of the License at
 ~
* ~   * http://www.apache.org/licenses/LICENSE-2.0
 *~
 *~ Unless required by applicable law or agreed to in writing,
software ~ *software distributed under the License is distributed on an
 ~ "AS IS" BASIS,  * WITHOUT WARRANTIES OR CONDITIONS OF ANY
 ~ KIND, either express or implied.  * See the License for the
 ~ specific language governing permissions and limitations
* limitations~ under the License.
 */-->
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
public class QueueReceiver {
    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "qpidConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5672";
    String queueName = "testQueue";

    public static void main(String[] args) throws NamingException, JMSException {
        QueueReceiver queueReceiver = new QueueReceiver();
        queueReceiver.receiveMessages();
    }
    public void receiveMessages() throws NamingException, JMSException {
        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
        properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
        properties.put("queue."+ queueName,queueName);
        System.out.println("getTCPConnectionURL(userName,password) = " + getTCPConnectionURL(userName, password));
        InitialContext ctx = new InitialContext(properties);
        // Lookup connection factory
        QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(CF_NAME);
        QueueConnection queueConnection = connFactory.createQueueConnection();
        queueConnection.start();
        QueueSession queueSession =
                queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        //Receive message
        Queue queue =  (Queue) ctx.lookup(queueName);
        MessageConsumer queueReceiver = queueSession.createConsumer(queue);
        TextMessage message = (TextMessage) queueReceiver.receive();
        System.out.println("Got message ==>" + message.getText());
        queueReceiver.close();
        queueSession.close();
        queueConnection.stop();
        queueConnection.close();
    }
    public String getTCPConnectionURL(String username, String password) {
        // amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'
        return new StringBuffer()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                .toString();
    }

}

 

Message received gets printed on the console. Now you should be able to see message count gets decreased for 'testQueue' using Management Console.