...
Using the following QueueSender JMS client, messages can be sent to 'testQueue'.
Code Block | ||
---|---|---|
| ||
/**
* Copyright (c) 2009, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
/**
 * Sample JMS client that sends one text message to a queue.
 *
 * <p>The client builds its JNDI environment in memory: it registers a connection
 * factory named {@code qpidConnectionfactory} (pointing at the AMQP URL returned by
 * {@link #getTCPConnectionURL(String, String)}) and a queue entry for
 * {@link #queueName}, then looks both up through an {@link InitialContext}.
 */
public class QueueSender {
    /** Class name of the JNDI initial context factory handed to {@link InitialContext}. */
    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";

    // Prefixes of the JNDI property keys under which the connection factory and
    // the queue are registered.
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String QUEUE_NAME_PREFIX = "queue.";
    private static final String CF_NAME = "qpidConnectionfactory";

    // Fixed parts of the AMQP connection URL.
    private static final String CARBON_CLIENT_ID = "carbon";
    private static final String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static final String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static final String CARBON_DEFAULT_PORT = "5672";

    // Credentials and target queue; package-private so they can be overridden
    // on an instance before sendMessages() is called.
    String userName = "admin";
    String password = "admin";
    String queueName = "testQueue";

    /**
     * Creates a sender with the default settings and sends a single message.
     *
     * @param args ignored
     * @throws NamingException if a JNDI lookup fails
     * @throws JMSException    if the JMS provider reports an error
     */
    public static void main(String[] args) throws NamingException, JMSException {
        QueueSender queueSender = new QueueSender();
        queueSender.sendMessages();
    }

    /**
     * Sends the text message {@code "Test Message Content"} to {@link #queueName}.
     *
     * <p>The JMS connection and the JNDI context are released in {@code finally}
     * blocks, so they are closed even when a lookup or the send itself fails.
     *
     * @throws NamingException if the connection factory or the queue cannot be looked up
     * @throws JMSException    if connecting, creating the session or sending fails
     */
    public void sendMessages() throws NamingException, JMSException {
        // Build the URL once and reuse it for both the JNDI entry and the log line.
        String connectionUrl = getTCPConnectionURL(userName, password);

        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
        properties.put(CF_NAME_PREFIX + CF_NAME, connectionUrl);
        properties.put(QUEUE_NAME_PREFIX + queueName, queueName);
        // NOTE: the URL embeds the password in clear text; acceptable for a sample,
        // but do not log it like this in production code.
        System.out.println("getTCPConnectionURL(userName,password) = " + connectionUrl);

        InitialContext ctx = new InitialContext(properties);
        try {
            // Lookup connection factory
            QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(CF_NAME);
            QueueConnection queueConnection = connFactory.createQueueConnection();
            try {
                queueConnection.start();
                // Non-transacted session with automatic acknowledgement.
                QueueSession queueSession =
                        queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);

                // Send message
                Queue queue = (Queue) ctx.lookup(queueName);
                TextMessage textMessage = queueSession.createTextMessage("Test Message Content");
                // Fully qualified because this class is itself named QueueSender.
                javax.jms.QueueSender queueSender = queueSession.createSender(queue);
                queueSender.send(textMessage);
            } finally {
                // Closing the connection also closes its sessions and producers.
                queueConnection.close();
            }
        } finally {
            ctx.close();
        }
    }

    /**
     * Builds the AMQP connection URL for the broker, in the form
     * {@code amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'}.
     *
     * @param username user name placed in the URL
     * @param password password placed in the URL (in clear text)
     * @return the assembled connection URL
     */
    public String getTCPConnectionURL(String username, String password) {
        return "amqp://" + username + ":" + password
                + "@" + CARBON_CLIENT_ID
                + "/" + CARBON_VIRTUAL_HOST_NAME
                + "?brokerlist='tcp://" + CARBON_DEFAULT_HOSTNAME + ":" + CARBON_DEFAULT_PORT + "'";
    }
} |
...
You can view the created queue and its increased message count using the Message Broker management console.
...
Using the following QueueReceiver JMS client, messages can be received from 'testQueue'.
Code Block | ||
---|---|---|
| ||
/**
* Copyright (c) 2009, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
/**
 * Sample JMS client that receives one message from a queue and prints it.
 *
 * <p>The client builds its JNDI environment in memory: it registers a connection
 * factory named {@code qpidConnectionfactory} (pointing at the AMQP URL returned by
 * {@link #getTCPConnectionURL(String, String)}) and a queue entry for
 * {@link #queueName}, then looks both up through an {@link InitialContext}.
 */
public class QueueReceiver {
    /** Class name of the JNDI initial context factory handed to {@link InitialContext}. */
    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";

    // Prefixes of the JNDI property keys under which the connection factory and
    // the queue are registered.
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String QUEUE_NAME_PREFIX = "queue.";
    private static final String CF_NAME = "qpidConnectionfactory";

    // Fixed parts of the AMQP connection URL.
    private static final String CARBON_CLIENT_ID = "carbon";
    private static final String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static final String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static final String CARBON_DEFAULT_PORT = "5672";

    // Credentials and source queue; package-private so they can be overridden
    // on an instance before receiveMessages() is called.
    String userName = "admin";
    String password = "admin";
    String queueName = "testQueue";

    /**
     * Creates a receiver with the default settings and receives a single message.
     *
     * @param args ignored
     * @throws NamingException if a JNDI lookup fails
     * @throws JMSException    if the JMS provider reports an error
     */
    public static void main(String[] args) throws NamingException, JMSException {
        QueueReceiver queueReceiver = new QueueReceiver();
        queueReceiver.receiveMessages();
    }

    /**
     * Receives one message from {@link #queueName} and prints it to standard output.
     *
     * <p>The call to {@code receive()} has no timeout, so this method waits until a
     * message is available. The JMS connection and the JNDI context are released in
     * {@code finally} blocks, so they are closed even when a lookup or the receive fails.
     *
     * @throws NamingException if the connection factory or the queue cannot be looked up
     * @throws JMSException    if connecting, creating the session or receiving fails
     */
    public void receiveMessages() throws NamingException, JMSException {
        // Build the URL once and reuse it for both the JNDI entry and the log line.
        String connectionUrl = getTCPConnectionURL(userName, password);

        Properties properties = new Properties();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
        properties.put(CF_NAME_PREFIX + CF_NAME, connectionUrl);
        properties.put(QUEUE_NAME_PREFIX + queueName, queueName);
        // NOTE: the URL embeds the password in clear text; acceptable for a sample,
        // but do not log it like this in production code.
        System.out.println("getTCPConnectionURL(userName,password) = " + connectionUrl);

        InitialContext ctx = new InitialContext(properties);
        try {
            // Lookup connection factory
            QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup(CF_NAME);
            QueueConnection queueConnection = connFactory.createQueueConnection();
            try {
                // Delivery to consumers only begins once the connection is started.
                queueConnection.start();
                // Non-transacted session with automatic acknowledgement.
                QueueSession queueSession =
                        queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);

                // Receive message
                Queue queue = (Queue) ctx.lookup(queueName);
                MessageConsumer queueReceiver = queueSession.createConsumer(queue);
                Message message = queueReceiver.receive();
                if (message instanceof TextMessage) {
                    System.out.println("Got message ==>" + ((TextMessage) message).getText());
                } else if (message != null) {
                    // The queue may hold messages that were not published as text.
                    System.out.println("Got non-text message ==>" + message);
                } else {
                    // receive() returns null when the consumer is closed while waiting.
                    System.out.println("No message received");
                }
            } finally {
                // Closing the connection also stops it and closes its sessions and consumers.
                queueConnection.close();
            }
        } finally {
            ctx.close();
        }
    }

    /**
     * Builds the AMQP connection URL for the broker, in the form
     * {@code amqp://{username}:{password}@carbon/carbon?brokerlist='tcp://{hostname}:{port}'}.
     *
     * @param username user name placed in the URL
     * @param password password placed in the URL (in clear text)
     * @return the assembled connection URL
     */
    public String getTCPConnectionURL(String username, String password) {
        return "amqp://" + username + ":" + password
                + "@" + CARBON_CLIENT_ID
                + "/" + CARBON_VIRTUAL_HOST_NAME
                + "?brokerlist='tcp://" + CARBON_DEFAULT_HOSTNAME + ":" + CARBON_DEFAULT_PORT + "'";
    }
} |
The received message is printed on the console. You should now be able to see, using the Management Console, that the message count for 'testQueue' has decreased.