This site contains the documentation that is relevant to older WSO2 product versions and offerings.
For the latest WSO2 documentation, visit https://wso2.com/documentation/.
RabbitMQ AMQP Transport
AMQP is a wire-level messaging protocol that describes the format of the data that is sent across the network. If a system or application can read and write AMQP, it can exchange messages with any other system or application that understands AMQP, regardless of the implementation language.
The RabbitMQ AMQP transport is implemented using the RabbitMQ Java Client. It allows you to send or receive AMQP messages by calling an AMQP broker (RabbitMQ) directly, without the need to use different transport mechanisms, such as JMS.
The following diagram illustrates a scenario where the ESB uses the RabbitMQ AMQP transport to exchange messages between RabbitMQ Java Clients by calling RabbitMQ brokers.
As you can see in the diagram, the Sender uses the RabbitMQ Java Client to publish messages to an AMQP queue (Q1), and the Receiver users it to consume messages from an AMQP queue (Q2). In this scenario, a proxy service in the ESB listens to Q1, and when a message becomes available on the queue, the proxy service consumes it and publishes it to Q2.
The following sections describe the process of installing the RabbitMQ AMQP transport, configuring the transport as well as using the transport by walking you through a sample RabbitMQ proxy service.
Installing the RabbitMQ AMQP Transport
The RabbitMQ AMQP transport is developed as a separate module of the transports project. You can install it via the ESB Management Console by accessing the relevant jar files from the http://product-dist.wso2.com/p2/extras/releases/4.2.0/rabbitmq-axis2-transport/ external repository.
To install the RabbitMQ AMQP transport as a feature:
- Start the ESB server .
- On the Configure tab in the Management Console, click Features. The Feature Management page appears.
- On the Feature Management page, click Repository Management and then click Add Repository.
- Provide a name for the repository and enter
http://product-dist.wso2.com/p2/extras/releases/4.2.0/rabbitmq-axis2-transport/
as the repository location URL, then click Add. The repository you created will appear in the Available Repositories list. - Click the Available Features tab.
- Select the added repository and the Show only the latest versions check box, then click Find features. You will see that Axis2 Transport RabbitMQ AMQP is listed as a feature.
- Select the Axis2 Transport RabbitMQ AMQP check box and click Install. The Install Details appear.
- Click Next. The License Agreement appears.
- Read and accept the terms of the license agreement, and then click Next. The installation starts and it may take a few minutes for the installation to complete.
- Once the installation process is complete, stop the ESB server.
You are now ready to configure the AMQP transport listener and sender in axis2.xml
.
Configuring the RabbitMQ AMQP Transport
- Open
<ESB_HOME>/repository/conf/axis2/axis2.xml
for editing. In the transport listeners section, add the following RabbitMQ transport listener, replacing the values with your host, port, username, and password to connect to the AMQP broker.
<transportReceiver name="rabbitmq" class="org.apache.axis2.transport.rabbitmq.RabbitMQListener"> <parameter name="AMQPConnectionFactory" locked="false"> <parameter name="rabbitmq.server.host.name" locked="false">192.168.0.3</parameter> <parameter name="rabbitmq.server.port" locked="false">5672</parameter> <parameter name="rabbitmq.server.user.name" locked="false">user</parameter> <parameter name="rabbitmq.server.password" locked="false">abc123</parameter> </parameter> </transportReceiver>
As an optional step, you can create multiple connection factories if you want this listener to connect to multiple brokers.
In the transport senders section, add the following RabbitMQ transport sender, which will be used for sending AMQP messages to a queue:
<transportSender name="rabbitmq" class="org.apache.axis2.transport.rabbitmq.RabbitMQSender"/>
- Start the ESB server.
You are now ready to create the RabbitMQ proxy service.
Creating the RabbitMQ Proxy Service
Following is a sample RabbitMQ proxy service named AMQPProxy, which consumes AMQP messages from one RabbitMQ broker and publishes them to another:
<proxy xmlns="http://ws.apache.org/ns/synapse" name="AMQPProxy" transports="rabbitmq" statistics="disable" trace="disable" startOnLoad="true"> <target> <inSequence> <log level="full"/> <property name="OUT_ONLY" value="true"/> <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/> </inSequence> <endpoint> <address uri="rabbitmq:/AMQPProxy?rabbitmq.server.host.name=192.168.0.3&rabbitmq.server.port=5672&rabbitmq.server.user.name=user&rabbitmq.server.password=abc123&rabbitmq.queue.name=queue2&rabbitmq.exchange.name=exchange2"/> </endpoint> </target> <parameter name="rabbitmq.queue.name">queue1</parameter> <parameter name="rabbitmq.exchange.name">exchange1</parameter> <parameter name="rabbitmq.connection.factory">AMQPConnectionFactory</parameter> <description></description> </proxy>
Note the following:
- The transport key name is
rabbitmq
. You need to specify this key name in the transports parameter (ie.,transports="rabbitmq"
). - The proxy is defined as OUT_ONLY, because it does not expect a response from the endpoint.
- The endpoint specifies where the messages will be published. The URI prefix is
rabbbitmq
so that the RabbitMQ AMQP transport will be used to publish the message. Be sure to specify the rest of the parameters in the URI as shown in the sample proxy service above. (NOTE: if you are configuring the URI through the management console instead of entering the configuration directly in the configuration file, you must encode the ampersands in the URI as "&" instead of "&".) If you do not know which RabbitMQ exchange to use, leave the value blank to use the default exchange. - The
rabbitmq.queue.name
parameter specifies the queue on which the proxy service listens and consumes messages. If you do not specify a name for this parameter, the name of the proxy service will be used as the queue name. - The
rabbitmq.exchange.name
parameter specifies the RabbitMQ exchange to which the queue is bound. If you do not want to use a specific exchange, leave this value blank to use the default exchange. - The
rabbitmq.connection.factory
parameter specifies the listener that listens on the queue and consumes messages. In this example, the connection factory is set to the name of the listener we created earlier (ie.,AMQPConnectionFactory
).
RabbitMQ AMQP Transport Properties
Following is a quick reference to all the RabbitMQ AMQP transport properties:
rabbitmq.server.host.name
– Host name of the server.rabbitmq.server.port
– Port value of the server.rabbitmq.server.user.name
– User name to connect to the server.rabbitmq.server.password
– Password of the account to connect to the server.rabbitmq.server.virtual.host
– Virtual host name of the server, if any.rabbitmq.queue.name
– Queue name to send or consume messages.rabbitmq.exchange.name
– Name of the RabbitMQ exchange to which the queue is bound.
Here the rabbitmq.server
properties refer to the server where RabbitMQ is running.
You can modify the sample proxy service above to handle scenarios where you only want to receive AMQP messages but need to send messages in a different format, or you want to receive messages in a different format and send only AMQP messages. You can also modify the proxy service to work with a different transport. For example, you can create a proxy that uses the RabbitMQ AMQP transport to listen to messages and then sends them over HTTP or JMS.
Sample Java Clients
This section describes the sample Java clients that you can use to send and receive AMQP messages. These clients can be used to test the scenario where the Sender publishes a message to a RabbitMQ AMQP queue, which is consumed by the ESB and published to another queue, which in turn is consumed by the Receiver. When you run these clients, the Receiver will get the messages sent by the Sender, confirming that you have correctly configured the RabbitMQ AMQP transport.
AMQP Sender
The following Java client sends SOAP XML messages to a RabbitMQ queue:
ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setUsername(username); factory.setPassword(password); factory.setPort(port); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(queueName, false, false, false, null); channel.exchangeDeclare(exchangeName, "direct", true); channel.queueBind(queueName, exchangeName, routingKey); // The message to be sent String message = "<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope">\n" + "<soapenv:Header/>\n" + "<soapenv:Body>\n" + " <p:greet xmlns:p=\"http://greet.service.kishanthan.org\">\n" + " <in>" + name + "</in>\n" + " </p:greet>\n" + "</soapenv:Body>\n" + "</soapenv:Envelope>"; // Populate the AMQP message properties AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties().builder(); builder.messageId(messageID); builder.contentType("text/xml"); builder.replyTo(replyToAddress); builder.correlationId(correlationId); builder.contentEncoding(contentEncoding); // Custom user properties Map<String, Object> headers = new HashMap<String, Object>(); headers.put("SOAP_ACTION", "greet"); builder.headers(headers); // Publish the message to exchange channel.basicPublish(exchangeName, queueName, builder.build(), message.getBytes());
This client was tested with SOAP messages sent and consumed from AMQP broker queues with the content type “text/xml”. When specifying the queue name for publishing messages, be sure to specify the same queue where the RabbitMQ transport listener is listening.
AMQP Receiver
When specifying the queue name for consuming messages, be sure to specify the same queue configured in the proxy service endpoint.
ConnectionFactory factory = new ConnectionFactory(); factory.setHost(hostName); factory.setUsername(userName); factory.setPassword(password); factory.setPort(port); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(queueName, false, false, false, null); channel.exchangeDeclare(exchangeName, "direct", true); channel.queueBind(queueName, exchangeName, routingKey); // Create the consumer QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); // Start consuming messages while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); }