This site contains the documentation that is relevant to older WSO2 product versions and offerings.
For the latest WSO2 documentation, visit https://wso2.com/documentation/.

Remote Procedure Call (RPC) with RabbitMQ

You can send request-response messages using the RabbitMQ transport by implementing a Remote Procedure Call (RPC) scenario with RabbitMQ.

The following diagram illustrates a remote procedure call scenario with RabbitMQ:

 

The remote procedure call works as follows:

  • When WSO2 Enterprise Integrator (WSO2 EI) starts up, it creates an anonymous, exclusive callback queue.
  • For a remote procedure call request, WSO2 EI sends a message with the following properties:
    • reply_to : This is set to the name of the callback queue.
    • correlation_id : This is set to a unique value for every request.
  • The request is then sent to the rpc_queue.
  • The RPC server waits for requests on that queue. When a request appears, it does the job and sends a message with the result back to the WSO2 EI server. The reply is published to the queue named in the request's reply_to field and carries the same correlation_id as the request.

  • WSO2 EI waits for data on the reply_to queue. When a message appears, it checks the correlation_id property. If it matches the value from the request, it returns the response to the application.

The following is a sample proxy service named RabbitMQRPCProxy that sends request-response messages using the RabbitMQ transport.

<!-- Proxy service that accepts requests over the http transport and forwards
     them to a RabbitMQ endpoint, then relays the reply back to the caller. -->
<proxy xmlns="http://ws.apache.org/ns/synapse"
   	name="RabbitMQRPCProxy"
   	startOnLoad="true"
   	trace="enable"
       transports="http">
   <description/>
   <target>
  	<!-- Request path: log the incoming message, then send it to RabbitMQ. -->
  	<inSequence>
     	<log level="full">
        	<property name="received" value="true"/>
     	</log>
     	<send>
        	<endpoint>
           	<!-- Endpoint URI parameters: broker at localhost:5672, user guest,
           	     queue and routing key both rpc_queue.
           	     NOTE(review): rabbitmq.replyto.name is set to "dummy"; presumably a
           	     placeholder that marks the call as request-response, since the page
           	     describes an anonymous callback queue. Confirm against the transport
           	     documentation. -->
           	<address uri="rabbitmq://?rabbitmq.server.host.name=localhost&amp;rabbitmq.server.port=5672&amp;rabbitmq.server.user.name=guest&amp;rabbitmq.server.password=guest&amp;rabbitmq.queue.name=rpc_queue&amp;rabbitmq.queue.routing.key=rpc_queue&amp;rabbitmq.replyto.name=dummy"/>
        	</endpoint>
     	</send>
  	</inSequence>
  	<!-- Response path: log the reply, then send it back to the original caller. -->
  	<outSequence>
     	<log level="full">
        	<property name="response" value="true"/>
     	</log>
     	<send/>
  	</outSequence>
   </target>
</proxy>

The following is the code for a sample RPC server:

package rpc;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;

/**
 * Sample RPC server for the RabbitMQ request-response scenario.
 *
 * <p>The server connects to a RabbitMQ broker on {@code localhost}, consumes
 * requests from {@code rpc_queue} with manual acknowledgements and, for every
 * request, publishes a fixed SOAP envelope to the queue named in the request's
 * {@code reply_to} property, echoing the request's {@code correlation_id}.
 *
 * <p>The queue {@code rpc_queue} is not declared here; it must already exist
 * on the broker when the server starts.
 */
public class RPCServer {

    /** Queue on which RPC requests are consumed. */
    private static final String RPC_QUEUE_NAME = "rpc_queue";

    /** Fixed SOAP payload returned for every request. */
    private static final String RESPONSE_PAYLOAD =
            "<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\" " +
            "xmlns:ser=\"http://services.samples\" xmlns:xsd=\"http://services.samples/xsd\">\n" +
            "   <soapenv:Header/>\n" +
            "   <soapenv:Body>\n" +
            "      <ser:placeOrder>\n" +
            "         <!--Optional:-->\n" +
            "         <ser:order>\n" +
            "            <!--Optional:-->\n" +
            "            <xsd:price>10</xsd:price>\n" +
            "            <!--Optional:-->\n" +
            "            <xsd:quantity>5</xsd:quantity>\n" +
            "            <!--Optional:-->\n" +
            "            <xsd:symbol>RMQ</xsd:symbol>\n" +
            "         </ser:order>\n" +
            "      </ser:placeOrder>\n" +
            "   </soapenv:Body>\n" +
            "</soapenv:Envelope>";

    /**
     * Starts the server and serves requests until the thread is interrupted
     * or a broker I/O error occurs. The connection is closed on exit.
     *
     * @param argv command-line arguments; not used
     */
    public static void main(String[] argv) {
        Connection connection = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            Channel channel = connection.createChannel();
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // autoAck=false: each delivery is acknowledged explicitly below.
            channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

            System.out.println(" [x] Awaiting RPC requests");

            // The payload never changes, so encode it once instead of per request.
            byte[] responseBody = RESPONSE_PAYLOAD.getBytes("UTF-8");

            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                long deliveryTag = delivery.getEnvelope().getDeliveryTag();
                BasicProperties props = delivery.getProperties();

                String replyToQueue = props.getReplyTo();
                if (replyToQueue == null) {
                    // Without reply_to there is nowhere to send the result. Drop the
                    // request (no requeue) rather than publishing with a null routing
                    // key, which would take the whole server down.
                    System.err.println("Rejecting request without reply_to property");
                    channel.basicReject(deliveryTag, false);
                    continue;
                }

                // Echo the correlation id so the caller can match reply to request.
                BasicProperties replyProps = new BasicProperties.Builder()
                        .correlationId(props.getCorrelationId())
                        .contentType("text/xml")
                        .build();

                System.out.println("Publishing to : " + replyToQueue);
                // Default exchange ("") routes directly to the queue named by the key.
                channel.basicPublish("", replyToQueue, replyProps, responseBody);
                channel.basicAck(deliveryTag, false);
            }

        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}