You can send request-response messages using the RabbitMQ transport by implementing a Remote Procedure Call(RPC) scenario with RabbitMQ.
The following diagram illustrates a remote procedure call scenario with RabbitMQ:
The remote procedure call works as follows:
- When WSO2 ESB starts up, it creates an anonymous exclusive callback queue.
- For a remote procedure call request, the ESB sends a message with the following properties:
reply_to
- This is set to the callback queuecorrelation_id
- This is set to a unique value for every request.
- The request is then sent to the rpc_queue.
The RPC Server waits for requests on that queue. When a request appears, it does the job and sends a message with the result back to the ESB, using the queue from the
reply_to
field with the samecorrelation_id
.- The ESB waits for data on the reply_to queue. When a message appears, it checks the
correlation_id
property. If it matches the value from the request it returns the response to the application.
Following is a sample proxy service named RabbitMQRPCProxy
that sends request-response messages using the RabbitMQ transport.
Code Block | ||
---|---|---|
| ||
<proxy xmlns="http://ws.apache.org/ns/synapse"
name="RabbitMQRPCProxy"
startOnLoad="true"
trace="enable"
transports="http">
<description/>
<target>
<inSequence>
<log level="full">
<property name="received" value="true"/>
</log>
<send>
<endpoint>
<address uri="rabbitmq://?rabbitmq.server.host.name=localhost&rabbitmq.server.port=5672&rabbitmq.server.user.name=guest&rabbitmq.server.password=guest&rabbitmq.queue.name=rpc_queue&rabbitmq.queue.routing.key=rpc_queue&rabbitmq.replyto.name=dummy"/>
</endpoint>
</send>
</inSequence>
<outSequence>
<log level="full">
<property name="response" value="true"/>
</log>
<send/>
</outSequence>
</target>
</proxy> |
Following is the code for a sample RPC server:
Code Block | ||
---|---|---|
| ||
package rpc;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
public class RPCServerV {
public static void main(String[] argv) {
Connection connection = null;
Channel channel;
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume("rpc_queue", false, consumer);
System.out.println(" [x] Awaiting RPC requests");
while (true) {
String response = null;
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
BasicProperties props = delivery.getProperties();
BasicProperties replyProps =
new BasicProperties.Builder().correlationId(props.getCorrelationId()).contentType("text/xml")
.build();
response =
"<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\" " +
"xmlns:ser=\"http://services.samples\" xmlns:xsd=\"http://services.samples/xsd\">\n" +
" <soapenv:Header/>\n" +
" <soapenv:Body>\n" +
" <ser:placeOrder>\n" +
" <!--Optional:-->\n" +
" <ser:order>\n" +
" <!--Optional:-->\n" +
" <xsd:price>10</xsd:price>\n" +
" <!--Optional:-->\n" +
" <xsd:quantity>5</xsd:quantity>\n" +
" <!--Optional:-->\n" +
" <xsd:symbol>RMQ</xsd:symbol>\n" +
" </ser:order>\n" +
" </ser:placeOrder>\n" +
" </soapenv:Body>\n" +
"</soapenv:Envelope>";
String replyToQueue = props.getReplyTo();
System.out.println("Publishing to : " + replyToQueue);
channel.basicPublish("", replyToQueue, replyProps, response.getBytes("UTF-8"));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
|