RabbitMQ Use Cases
The following are some of the main RabbitMQ use cases of WSO2 EI.
WSO2 EI as a RabbitMQ Message Consumer
The following sample scenario demonstrates how WSO2 EI is configured to listen to a RabbitMQ queue, consume messages, and send the messages to an HTTP back-end service.
Note
To create proxy services, sequences, endpoints, message stores and message processors in WSO2 EI, you can either use the management console or copy the XML configuration to the source view. To access the source view on the WSO2 EI management console, go to Manage -> Service Bus -> Source View.
Prerequisites
- Configure the RabbitMQ AMQP transport. For information on how to configure the transport, see Configuring the RabbitMQ AMQP transport.
- Start the WSO2 EI server.
Configure the sample
Create a custom proxy service with the following configuration. For more information on creating proxy services, see Working with Proxy Services.
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="AMQPProxy"
       transports="rabbitmq"
       statistics="disable"
       trace="enable"
       startOnLoad="true">
   <target>
      <inSequence>
         <log level="full"/>
         <property name="OUT_ONLY" value="true"/>
         <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
         <send>
            <endpoint>
               <address uri="http://localhost:9000/services/SimpleStockQuoteService"/>
            </endpoint>
         </send>
      </inSequence>
      <outSequence>
         <drop/>
      </outSequence>
   </target>
   <parameter name="rabbitmq.queue.name">queue</parameter>
   <parameter name="rabbitmq.exchange.name">exchange</parameter>
   <parameter name="rabbitmq.connection.factory">AMQPConnectionFactory</parameter>
   <description/>
</proxy>
- WSO2 EI comes with a default Axis2 server, which you can use as the back-end service for this sample. To start the Axis2 server, navigate to <EI_HOME>/samples/axis2Server, and run axis2Server.sh on Linux or axis2Server.bat on Windows.
- Deploy the SimpleStockQuoteService back-end service by navigating to <EI_HOME>/samples/axis2Server/src/SimpleStockQuoteService and running the ant command in the command prompt or shell. This builds the sample and deploys the service for you. For more information on sample back-end services, see Deploying sample back-end services.
Now you have a running WSO2 EI instance with a custom proxy service and a back-end service deployed. Next, we will send a message to the back-end service through WSO2 EI using a sample client.
Execute the sample client
Run the following client to publish a placeOrder request to the exchange on the RabbitMQ server, which is running on port 5672.
// Requires the RabbitMQ Java client classes com.rabbitmq.client.AMQP, Channel,
// Connection, and ConnectionFactory, plus java.nio.charset.StandardCharsets.
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("queue", false, false, false, null);
channel.exchangeDeclare("exchange", "direct", true);
// Bind with the same key that basicPublish uses below; a direct exchange
// only delivers when the routing key matches the binding key exactly.
channel.queueBind("queue", "exchange", "queue");

// The message to be sent
String message = "<m:placeOrder xmlns:m=\"http://services.samples\">"
        + "<m:order>"
        + "<m:price>100</m:price>"
        + "<m:quantity>20</m:quantity>"
        + "<m:symbol>RMQ</m:symbol>"
        + "</m:order>"
        + "</m:placeOrder>";

// Populate the AMQP message properties
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.contentType("text/xml");
builder.contentEncoding("UTF-8");

// Publish the message to the exchange
channel.basicPublish("exchange", "queue", builder.build(), message.getBytes(StandardCharsets.UTF_8));

// Release the channel and the connection
channel.close();
connection.close();
Analyzing the output
The direct exchange named exchange is bound to the queue named queue with the routing key queue. WSO2 EI's RabbitMQ transport receiver consumes messages from that queue and passes them to the AMQPProxy proxy service, which forwards them to the configured HTTP URL.
If you check the console of the sample Axis2 server, you will see a message similar to the following, indicating that the server has accepted an order. The quantity, symbol, and price shown reflect the order that was sent:
Accepted order #1 for : 7078 stocks of IBM at $ 73.73786002620719
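The routing rule behind this sample can be sketched without a broker. The following is a minimal in-memory model of a direct exchange, written only to illustrate why the routing key used when publishing has to equal the key used in the binding. The class DirectExchangeSketch and its methods are hypothetical and are not part of the RabbitMQ client or of WSO2 EI.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a direct exchange; not the RabbitMQ client API.
class DirectExchangeSketch {
    // Binding key -> names of the queues bound with that key.
    private final Map<String, List<String>> bindings = new HashMap<>();

    // Record that the given queue is bound with the given binding key.
    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    // A direct exchange delivers only to queues whose binding key equals the routing key.
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.<String>emptyList());
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("queue", "queue");
        System.out.println(exchange.route("queue")); // prints [queue]
        System.out.println(exchange.route("route")); // prints []
    }
}
```

In this model, a message published with a routing key that no binding uses reaches no queue, which is the situation to rule out first if the Axis2 server never logs an accepted order.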
WSO2 EI as a RabbitMQ Message Producer
This section describes how WSO2 Enterprise Integrator (WSO2 EI) can be used to send messages to a RabbitMQ queue.
The following sample scenario demonstrates how WSO2 EI is configured to listen to HTTP requests and publish them to a message exchange on a RabbitMQ server.
Note
To create proxy services, sequences, endpoints, message stores and message processors in WSO2 EI, you can either use the management console or copy the XML configuration to the source view. To access the source view on the management console, go to Manage -> Service Bus -> Source View.
Prerequisites
- Configure the RabbitMQ AMQP transport. For information on how to configure the transport, see Configuring the RabbitMQ AMQP transport.
- Start the WSO2 EI server.
Configure the sample
Create a custom proxy service with the following configuration. For more information on creating proxy services, see Working with Proxy Services.
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="AMQPProducerSample"
       transports="http"
       statistics="disable"
       trace="disable"
       startOnLoad="true">
   <target>
      <inSequence>
         <property name="OUT_ONLY" value="true"/>
         <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
         <send>
            <endpoint>
               <address uri="rabbitmq:/AMQPProducerSample?rabbitmq.server.host.name=localhost&amp;rabbitmq.server.port=5672&amp;rabbitmq.queue.name=queue&amp;rabbitmq.queue.routing.key=route&amp;rabbitmq.exchange.name=exchange"/>
            </endpoint>
         </send>
      </inSequence>
      <outSequence>
         <send/>
      </outSequence>
   </target>
   <description/>
</proxy>
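The endpoint address above packs the transport parameters into the query string of a rabbitmq:/ URI. Because the URI sits inside an XML attribute, each & that separates two parameters has to be written as &amp; for the configuration to be well-formed XML. The following sketch shows one way to assemble such a URI and escape it; the class RabbitMqEndpointUriSketch and its two helper methods are hypothetical names introduced for illustration, and only the parameter names come from the sample.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper for illustration; not part of WSO2 EI.
class RabbitMqEndpointUriSketch {
    // Join the parameters into a query string, in insertion order.
    static String buildUri(String name, Map<String, String> params) {
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> entry : params.entrySet()) {
            query.add(entry.getKey() + "=" + entry.getValue());
        }
        return "rabbitmq:/" + name + "?" + query;
    }

    // Escape the characters that are not allowed as-is in a double-quoted XML attribute.
    static String escapeForXmlAttribute(String value) {
        return value.replace("&", "&amp;")
                    .replace("<", "&lt;")
                    .replace("\"", "&quot;");
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("rabbitmq.server.host.name", "localhost");
        params.put("rabbitmq.server.port", "5672");
        params.put("rabbitmq.queue.name", "queue");
        params.put("rabbitmq.exchange.name", "exchange");

        String uri = buildUri("AMQPProducerSample", params);
        // Print the element as it would appear in the proxy configuration.
        System.out.println("<address uri=\"" + escapeForXmlAttribute(uri) + "\"/>");
    }
}
```

The ampersand is replaced first so that the entities produced by the later replacements are not escaped a second time.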
Use the following RabbitMQ consumer to consume and display the messages that arrive in the RabbitMQ queue.
// Requires the RabbitMQ Java client classes com.rabbitmq.client.Channel, Connection,
// ConnectionFactory, and QueueingConsumer. QueueingConsumer is only available in
// older releases of the client library.
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("queue", false, false, false, null);
channel.exchangeDeclare("exchange", "direct", true);
channel.queueBind("queue", "exchange", "route");

// Create the consumer
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume("queue", true, consumer);

// Start consuming messages
while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    // Display each message as it arrives
    System.out.println(message);
}
Execute the sample client
Execute the following command from <EI_HOME>/samples/axis2Client to send an HTTP message to the WSO2 EI proxy service.
ant stockquote -Daddurl=http://localhost:8280/services/AMQPProducerSample -Dmode=placeorder
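The ant client is the documented way to run this sample. If you want to see what such a request might look like on the wire, the following sketch posts a placeOrder SOAP envelope to the same proxy URL using only the JDK. It is an illustration under assumptions: the class PlaceOrderHttpClientSketch is hypothetical, the envelope layout follows the one used by the RPC server sample on this page, and the SOAPAction value urn:placeOrder and the order values are assumed rather than taken from the ant client.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the ant client; values and headers are assumptions.
class PlaceOrderHttpClientSketch {
    // Build a placeOrder envelope in the layout used by the RPC server sample.
    static String buildEnvelope(int price, int quantity, String symbol) {
        return "<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\""
                + " xmlns:ser=\"http://services.samples\" xmlns:xsd=\"http://services.samples/xsd\">"
                + "<soapenv:Body><ser:placeOrder><ser:order>"
                + "<xsd:price>" + price + "</xsd:price>"
                + "<xsd:quantity>" + quantity + "</xsd:quantity>"
                + "<xsd:symbol>" + symbol + "</xsd:symbol>"
                + "</ser:order></ser:placeOrder></soapenv:Body></soapenv:Envelope>";
    }

    // POST the envelope and return the HTTP status code.
    static int post(String url, String envelope) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setConnectTimeout(2000);
        conn.setReadTimeout(5000);
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "text/xml; charset=UTF-8");
        conn.setRequestProperty("SOAPAction", "urn:placeOrder"); // assumed action name
        try (OutputStream out = conn.getOutputStream()) {
            out.write(envelope.getBytes(StandardCharsets.UTF_8));
        }
        int status = conn.getResponseCode();
        conn.disconnect();
        return status;
    }

    public static void main(String[] args) {
        String envelope = buildEnvelope(100, 20, "RMQ");
        try {
            int status = post("http://localhost:8280/services/AMQPProducerSample", envelope);
            System.out.println("HTTP status: " + status);
        } catch (IOException e) {
            // Reached when the WSO2 EI server is not running or the proxy is not deployed.
            System.out.println("Could not reach the proxy: " + e.getMessage());
        }
    }
}
```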
Analyzing the output
You will see that the HTTP request is sent to the proxy service, which forwards it to the RabbitMQ server via the RabbitMQ AMQP transport sender. You can view the messages received at the RabbitMQ queue in the console of the RabbitMQ consumer.
Remote Procedure Call (RPC) with RabbitMQ
You can send request-response messages using the RabbitMQ transport by implementing a Remote Procedure Call (RPC) scenario with RabbitMQ.
The following diagram illustrates a remote procedure call scenario with RabbitMQ:
The remote procedure call works as follows:
- When WSO2 Enterprise Integrator (WSO2 EI) starts up, it creates an anonymous, exclusive callback queue.
- For a remote procedure call request, WSO2 EI sends a message with the following properties:
  - reply_to: This is set to the callback queue.
  - correlation_id: This is set to a unique value for every request.
- The request is then sent to the rpc_queue.
- The RPC server waits for requests on that queue. When a request appears, it does the job and sends a message with the result back to the WSO2 EI server, using the queue from the reply_to field with the same correlation_id.
- WSO2 EI waits for data on the reply_to queue. When a message appears, it checks the correlation_id property. If it matches the value from the request, it returns the response to the application.
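The steps above can be sketched as a small in-memory model that shows how reply_to and correlation_id tie a reply to its request. This is an illustration only: it does not use the RabbitMQ client, and the class RpcCorrelationSketch, its methods, and the fixed callback queue name are hypothetical stand-ins (the real callback queue is anonymous).

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;

// Hypothetical in-memory model of the RPC steps; not the RabbitMQ client API.
class RpcCorrelationSketch {
    static final String RPC_QUEUE = "rpc_queue";
    // Stands in for the anonymous, exclusive callback queue.
    static final String CALLBACK_QUEUE = "callback";

    // A message body plus the two properties the RPC steps rely on.
    static final class Message {
        final String body;
        final String replyTo;
        final String correlationId;

        Message(String body, String replyTo, String correlationId) {
            this.body = body;
            this.replyTo = replyTo;
            this.correlationId = correlationId;
        }
    }

    private final Map<String, Queue<Message>> queues = new HashMap<>();
    private final Set<String> pendingIds = new HashSet<>();

    RpcCorrelationSketch() {
        queues.put(RPC_QUEUE, new ArrayDeque<>());
        queues.put(CALLBACK_QUEUE, new ArrayDeque<>());
    }

    void publish(String queueName, Message message) {
        queues.get(queueName).add(message);
    }

    // Requester: set reply_to and a unique correlation_id, then send to rpc_queue.
    String sendRequest(String body) {
        String correlationId = UUID.randomUUID().toString();
        pendingIds.add(correlationId);
        publish(RPC_QUEUE, new Message(body, CALLBACK_QUEUE, correlationId));
        return correlationId;
    }

    // RPC server: take one request and reply on its reply_to queue with the same correlation_id.
    void serveOne() {
        Message request = queues.get(RPC_QUEUE).remove();
        publish(request.replyTo, new Message("result of " + request.body, null, request.correlationId));
    }

    // Requester: return the reply body only if its correlation_id matches a pending request.
    String receiveReply() {
        Message reply = queues.get(CALLBACK_QUEUE).remove();
        return pendingIds.remove(reply.correlationId) ? reply.body : null;
    }

    public static void main(String[] args) {
        RpcCorrelationSketch rpc = new RpcCorrelationSketch();
        rpc.sendRequest("placeOrder");
        rpc.serveOne();
        System.out.println(rpc.receiveReply()); // prints result of placeOrder
    }
}
```

In this model, a reply whose correlation_id does not match any pending request is discarded (receiveReply returns null), which mirrors the check described in the last step.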
The following is a sample proxy service named RabbitMQRPCProxy that sends request-response messages using the RabbitMQ transport.
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="RabbitMQRPCProxy"
       startOnLoad="true"
       trace="enable"
       transports="http">
   <description/>
   <target>
      <inSequence>
         <log level="full">
            <property name="received" value="true"/>
         </log>
         <send>
            <endpoint>
               <address uri="rabbitmq://?rabbitmq.server.host.name=localhost&amp;rabbitmq.server.port=5672&amp;rabbitmq.server.user.name=guest&amp;rabbitmq.server.password=guest&amp;rabbitmq.queue.name=rpc_queue&amp;rabbitmq.queue.routing.key=rpc_queue&amp;rabbitmq.replyto.name=dummy"/>
            </endpoint>
         </send>
      </inSequence>
      <outSequence>
         <log level="full">
            <property name="response" value="true"/>
         </log>
         <send/>
      </outSequence>
   </target>
</proxy>
The following is the code for a sample RPC server:
package rpc;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;

public class RPCServer {

    public static void main(String[] argv) {
        Connection connection = null;
        Channel channel;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            channel = connection.createChannel();

            // Consume requests from rpc_queue with manual acknowledgements
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume("rpc_queue", false, consumer);
            System.out.println(" [x] Awaiting RPC requests");

            while (true) {
                String response = null;
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                BasicProperties props = delivery.getProperties();

                // Copy the correlation_id of the request into the reply
                BasicProperties replyProps = new BasicProperties.Builder()
                        .correlationId(props.getCorrelationId())
                        .contentType("text/xml")
                        .build();

                response = "<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\" "
                        + "xmlns:ser=\"http://services.samples\" xmlns:xsd=\"http://services.samples/xsd\">\n"
                        + " <soapenv:Header/>\n"
                        + " <soapenv:Body>\n"
                        + " <ser:placeOrder>\n"
                        + " <!--Optional:-->\n"
                        + " <ser:order>\n"
                        + " <!--Optional:-->\n"
                        + " <xsd:price>10</xsd:price>\n"
                        + " <!--Optional:-->\n"
                        + " <xsd:quantity>5</xsd:quantity>\n"
                        + " <!--Optional:-->\n"
                        + " <xsd:symbol>RMQ</xsd:symbol>\n"
                        + " </ser:order>\n"
                        + " </ser:placeOrder>\n"
                        + " </soapenv:Body>\n"
                        + "</soapenv:Envelope>";

                // Publish the reply to the queue named in reply_to, then acknowledge the request
                String replyToQueue = props.getReplyTo();
                System.out.println("Publishing to : " + replyToQueue);
                channel.basicPublish("", replyToQueue, replyProps, response.getBytes("UTF-8"));
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}