This site contains the documentation that is relevant to older WSO2 product versions and offerings.
For the latest WSO2 documentation, visit https://wso2.com/documentation/.
Remote Procedure Call(RPC) with RabbitMQ
You can send request-response messages using the RabbitMQ transport by implementing a Remote Procedure Call(RPC) scenario with RabbitMQ.
The following diagram illustrates a remote procedure call scenario with RabbitMQ:
The remote procedure call works as follows:
- When WSO2 Enterprise Integrator(WSO2 EI) starts up, it creates an anonymous, exclusive callback queue.
- For a remote procedure call request, WSO2 EI sends a message with the following properties:
reply_to
: This is set to the callback queuecorrelation_id
: This is set to a unique value for every request.
- The request is then sent to the rpc_queue.
The RPC Server waits for requests on that queue. When a request appears, it does the job and sends a message with the result back to the WSO2 EI server, using the queue from the
reply_to
field with the samecorrelation_id
.- WSO2 EI waits for data on the reply_to queue. When a message appears, it checks the
correlation_id
property. If it matches the value from the request, it returns the response to the application.
The following is a sample proxy service named RabbitMQRPCProxy
that sends request-response messages using the RabbitMQ transport.
<proxy xmlns="http://ws.apache.org/ns/synapse" name="RabbitMQRPCProxy" startOnLoad="true" trace="enable" transports="http"> <description/> <target> <inSequence> <log level="full"> <property name="received" value="true"/> </log> <send> <endpoint> <address uri="rabbitmq://?rabbitmq.server.host.name=localhost&rabbitmq.server.port=5672&rabbitmq.server.user.name=guest&rabbitmq.server.password=guest&rabbitmq.queue.name=rpc_queue&rabbitmq.queue.routing.key=rpc_queue&rabbitmq.replyto.name=dummy"/> </endpoint> </send> </inSequence> <outSequence> <log level="full"> <property name="response" value="true"/> </log> <send/> </outSequence> </target> </proxy>
The following is the code for a sample RPC server:
package rpc; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import java.io.IOException; public class RPCServer { public static void main(String[] argv) { Connection connection = null; Channel channel; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume("rpc_queue", false, consumer); System.out.println(" [x] Awaiting RPC requests"); while (true) { String response = null; QueueingConsumer.Delivery delivery = consumer.nextDelivery(); BasicProperties props = delivery.getProperties(); BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).contentType("text/xml") .build(); response = "<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\" " + "xmlns:ser=\"http://services.samples\" xmlns:xsd=\"http://services.samples/xsd\">\n" + " <soapenv:Header/>\n" + " <soapenv:Body>\n" + " <ser:placeOrder>\n" + " <!--Optional:-->\n" + " <ser:order>\n" + " <!--Optional:-->\n" + " <xsd:price>10</xsd:price>\n" + " <!--Optional:-->\n" + " <xsd:quantity>5</xsd:quantity>\n" + " <!--Optional:-->\n" + " <xsd:symbol>RMQ</xsd:symbol>\n" + " </ser:order>\n" + " </ser:placeOrder>\n" + " </soapenv:Body>\n" + "</soapenv:Envelope>"; String replyToQueue = props.getReplyTo(); System.out.println("Publishing to : " + replyToQueue); channel.basicPublish("", replyToQueue, replyProps, response.getBytes("UTF-8")); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } catch (InterruptedException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }