The content in this documentation is for older versions of WSO2 products. For updated information on Enterprise Integration Patterns, go to the latest Micro Integrator documentation.
Resequencer
Currently, you can use this Resequencer pattern only through the Management Console of WSO2 Enterprise Service Bus (ESB) 4.9.0, via the WSO2-CARBON-PATCH-4.4.0-1935 WUM update. For more information on how to update your WSO2 product distribution using WUM, see Updating WSO2 Products. Tooling support is planned for later WSO2 ESB versions.
The following topics explain, through an example scenario, how you can implement the Resequencer EIP using WSO2 ESB.
Introduction to Resequencer
The Resequencer EIP puts a stream of related but out-of-sequence messages back into the correct order. It collects and reorders messages so that they can be published to the output channel in a specific order. For more information, go to Enterprise Integration Patterns Documentation.
Example scenario
Messages in the ESB usually take different mediation paths due to constructs like routers and filters. The time taken to process each message varies depending on the characteristics of each router. Some messages might arrive earlier than others, which can be disadvantageous in situations where the order of message delivery is important. To overcome this issue, WSO2 ESB introduces this resequencer EIP.
This example scenario demonstrates WSO2 ESB collecting incoming messages using a message store and resequencing or reordering them based on a definitive sequence number derived from some part of the message.
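The collect-and-reorder behaviour described above can be sketched in a few lines of Python. This is only an illustration of the pattern, not the WSO2 implementation: the `Resequencer` class, its `offer` method, and the assumption that sequence numbers start at 1 and increase by 1 are all hypothetical.

```python
from typing import Dict, List, Tuple


class Resequencer:
    """Hypothetical in-memory resequencer: holds out-of-order messages
    and releases them strictly in sequence-number order."""

    def __init__(self, first_id: int = 1) -> None:
        self.next_id = first_id               # sequence number expected next (assumed to start at 1)
        self.pending: Dict[int, str] = {}     # held messages, keyed by sequence number

    def offer(self, seq_id: int, message: str) -> List[Tuple[int, str]]:
        """Accept one message and return every message that can now be released."""
        self.pending[seq_id] = message
        released: List[Tuple[int, str]] = []
        # Drain the buffer for as long as the next expected number is present.
        while self.next_id in self.pending:
            released.append((self.next_id, self.pending.pop(self.next_id)))
            self.next_id += 1
        return released
```

Offering messages in the order 1, 4, 3, 2 releases message 1 immediately, holds 4 and 3, and releases 2, 3, and 4 together once 2 arrives.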
The diagram below depicts how to simulate the example scenario using the WSO2 ESB.
The following is a comparison of the core components of the pattern explaining the relationship between this example scenario and the Resequencer EIP.
Resequencer EIP | Resequencer Example Scenario |
---|---|
Incoming messages | Simple Stock requests |
Resequencer | Resequence Message Store with any Message Processor |
Outgoing messages | Simple Stock Quote requests |
The ESB configuration
Given below is the ESB configuration for simulating the example scenario explained above.
<definitions xmlns="http://ws.apache.org/ns/synapse">
  <messageStore xmlns="http://ws.apache.org/ns/synapse"
                class="org.apache.synapse.message.store.impl.resequencer.ResequenceMessageStore"
                name="RStore">
    <parameter name="store.resequence.timeout">-1</parameter>
    <parameter name="store.producer.guaranteed.delivery.enable">false</parameter>
    <parameter name="store.failover.message.store.name">RStore</parameter>
    <parameter xmlns:m0="http://services.samples"
               name="store.resequence.id.path"
               expression="substring-after(//m0:placeOrder/m0:order/m0:symbol,'-')"/>
    <parameter name="store.jdbc.password">root</parameter>
    <parameter name="store.jdbc.driver">com.mysql.jdbc.Driver</parameter>
    <parameter name="store.jdbc.username">root</parameter>
    <parameter name="store.jdbc.connection.url">jdbc:mysql://localhost:3306/resequenceDB</parameter>
    <parameter name="store.jdbc.table">tbl_resequence</parameter>
  </messageStore>
  <messageProcessor class="org.apache.synapse.message.processor.impl.forwarder.ScheduledMessageForwardingProcessor"
                    messageStore="RStore"
                    name="ScheduledProcessor"
                    targetEndpoint="StockQuoteServiceEp">
    <parameter name="max.delivery.drop">Disabled</parameter>
    <parameter name="client.retry.interval">1000</parameter>
    <parameter name="max.delivery.attempts">5</parameter>
    <parameter name="member.count">1</parameter>
    <parameter name="interval">1000</parameter>
    <parameter name="throttle">false</parameter>
    <parameter name="target.endpoint">StockQuoteServiceEp</parameter>
    <parameter name="message.processor.fault.sequence">fault</parameter>
    <parameter name="is.active">true</parameter>
  </messageProcessor>
  <endpoint name="StockQuoteServiceEp">
    <address uri="http://localhost:9000/services/SimpleStockQuoteService"/>
  </endpoint>
  <proxy xmlns="http://ws.apache.org/ns/synapse"
         name="MessageStoreProxy"
         startOnLoad="true"
         statistics="disable"
         trace="disable"
         transports="https,http">
    <target>
      <inSequence>
        <property name="FORCE_SC_ACCEPTED" scope="axis2" value="true"/>
        <property name="OUT_ONLY" value="true"/>
        <log level="custom">
          <property name="Message Store Invoked" value="true"/>
        </log>
        <log level="full"/>
        <store messageStore="RStore"/>
      </inSequence>
    </target>
    <publishWSDL uri="http://localhost:9000/services/SimpleStockQuoteService?wsdl"/>
    <description/>
  </proxy>
</definitions>
The configuration elements
The elements used in the above ESB configuration are explained below.
<proxy>
- This is the proxy service that should be invoked to execute the configuration.
<inSequence>
- A message is first received by the proxy service, and then directed to this sequence.
<outSequence>
- This sequence is triggered after the execution of the <inSequence>. The configuration above does not define one, because the proxy sets OUT_ONLY to true and does not expect a response.
<messageStore>
- This collects and re-orders the stored messages based on a defined sequence.
<messageProcessor>
- This listens to the Message Store and dispatches the message to the specified endpoint.
<endpoint>
- The destination to which the re-ordered message will be sent.
Simulating the example scenario
The following sections demonstrate how you can try out the example scenario explained above.
Setting up the environment
You need to set up the ESB and the back-end service:
- To deploy the back-end service SimpleStockQuoteService, navigate to the <ESB_HOME>/samples/axis2Server/src/SimpleStockQuoteService directory and execute the ant command in a new Console tab. For more information on deploying sample back-end services, go to Deploying sample back-end services in WSO2 ESB documentation.
- To start an Axis2 server instance, navigate to the <ESB_HOME>/samples/axis2Server directory and execute the corresponding command in a Console tab. Start only one instance of the back-end Axis2 service (Stock Quote Service) to simulate this example.
  - On Windows: axis2server.bat
  - On Linux/Solaris: ./axis2server.sh
- Follow the steps below to set up the database.
  - Add the relevant database drivers into the <ESB_HOME>/repository/components/lib directory.
  - Name the database resequenceDB.
  - If you are setting up a MySQL database, execute the following DB script to create the required tables in the database. You can create a similar script based on the database you want to set up.
CREATE TABLE `tbl_lastprocessid` (
  `statement` varchar(20) NOT NULL,
  `seq_id` bigint(20) NOT NULL,
  PRIMARY KEY (`statement`),
  KEY `seq_id` (`seq_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `tbl_resequence` (
  `indexId` bigint(20) NOT NULL AUTO_INCREMENT,
  `msg_id` varchar(200) NOT NULL,
  `seq_id` bigint(20) NOT NULL,
  `message` blob NOT NULL,
  PRIMARY KEY (`indexId`),
  KEY `msg_id` (`msg_id`),
  KEY `seq_id` (`seq_id`)
) ENGINE=InnoDB AUTO_INCREMENT=59 DEFAULT CHARSET=utf8;
- Download the ResquencerCapp_1.0.0.car file, and upload it to WSO2 ESB as follows:
  - Run the WSO2 ESB server. For instructions, see Running WSO2 ESB via Tooling in WSO2 ESB Documentation.
  - Log in to the Management Console, click Main, and then click Add in the Carbon Applications menu.
  - Browse and select the ResquencerCapp_1.0.0.car file you downloaded, and click Upload.
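To make the role of the two tables in the database script more concrete, here is a rough Python sketch that uses an in-memory SQLite database in place of MySQL. It is an assumption about how such tables could cooperate, not a description of how the Resequence Message Store actually uses them: the row key `'last'`, the helper names `open_store`, `store`, and `poll`, and the rule "release the row whose seq_id is one more than the last processed id" are all hypothetical.

```python
import sqlite3
from typing import Optional


def open_store() -> sqlite3.Connection:
    """Create simplified SQLite versions of the two tables (types adapted from the MySQL script)."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE tbl_resequence (
            indexId INTEGER PRIMARY KEY AUTOINCREMENT,
            msg_id  TEXT NOT NULL,
            seq_id  INTEGER NOT NULL,
            message BLOB NOT NULL
        );
        CREATE TABLE tbl_lastprocessid (
            statement TEXT PRIMARY KEY,
            seq_id    INTEGER NOT NULL
        );
        -- Hypothetical bookkeeping row: nothing has been processed yet.
        INSERT INTO tbl_lastprocessid (statement, seq_id) VALUES ('last', 0);
        """
    )
    return conn


def store(conn: sqlite3.Connection, msg_id: str, seq_id: int, message: bytes) -> None:
    """Persist an incoming message together with its sequence number."""
    conn.execute(
        "INSERT INTO tbl_resequence (msg_id, seq_id, message) VALUES (?, ?, ?)",
        (msg_id, seq_id, message),
    )


def poll(conn: sqlite3.Connection) -> Optional[bytes]:
    """Return the next in-sequence message, or None if it has not arrived yet."""
    (last,) = conn.execute(
        "SELECT seq_id FROM tbl_lastprocessid WHERE statement = 'last'"
    ).fetchone()
    row = conn.execute(
        "SELECT indexId, message FROM tbl_resequence WHERE seq_id = ? ORDER BY indexId LIMIT 1",
        (last + 1,),
    ).fetchone()
    if row is None:
        return None
    index_id, message = row
    # Remove the delivered row and advance the last processed sequence number.
    conn.execute("DELETE FROM tbl_resequence WHERE indexId = ?", (index_id,))
    conn.execute(
        "UPDATE tbl_lastprocessid SET seq_id = ? WHERE statement = 'last'", (last + 1,)
    )
    return message
```

In this sketch, a single poller calls `poll` repeatedly; a message with a higher sequence number simply stays in `tbl_resequence` until the gap before it is filled.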
Creating the artifacts
You can skip this section if you are using the ResquencerCapp_1.0.0.car file to try out this example scenario. This file already includes these artifacts, so you need not create them manually.
Follow the steps below to create the artifacts of the ESB configuration above.
Use the configuration below to create a Resequence Message Store. For instructions, see Resequence Message Store in WSO2 ESB Documentation.
<?xml version="1.0" encoding="UTF-8"?>
<messageStore xmlns="http://ws.apache.org/ns/synapse"
              class="org.apache.synapse.message.store.impl.resequencer.ResequenceMessageStore"
              name="RStore">
  <parameter name="store.resequence.timeout">-1</parameter>
  <parameter name="store.producer.guaranteed.delivery.enable">false</parameter>
  <parameter name="store.failover.message.store.name">RStore</parameter>
  <parameter xmlns:m0="http://services.samples"
             name="store.resequence.id.path"
             expression="substring-after(//m0:placeOrder/m0:order/m0:symbol,'-')"/>
  <parameter name="store.jdbc.password">root</parameter>
  <parameter name="store.jdbc.driver">com.mysql.jdbc.Driver</parameter>
  <parameter name="store.jdbc.username">root</parameter>
  <parameter name="store.jdbc.connection.url">jdbc:mysql://localhost:3306/resequenceDB</parameter>
  <parameter name="store.jdbc.table">tbl_resequence</parameter>
</messageStore>
Use the configuration below to create a Scheduled Message Forwarding Processor, which will listen to the Resequence Message Store and will dispatch the message to the relevant endpoint. For instructions, see Scheduled Message Forwarding Processor in WSO2 ESB Documentation.
Specify the value of the member.count property as 1. Otherwise, if multiple message processors query the Store in parallel, messages will be duplicated and the resequencing order will be affected.
<messageProcessor class="org.apache.synapse.message.processor.impl.forwarder.ScheduledMessageForwardingProcessor"
                  messageStore="RStore"
                  name="ScheduledProcessor"
                  targetEndpoint="StockQuoteServiceEp">
  <parameter name="max.delivery.drop">Disabled</parameter>
  <parameter name="client.retry.interval">1000</parameter>
  <parameter name="max.delivery.attempts">5</parameter>
  <parameter name="member.count">1</parameter>
  <parameter name="interval">1000</parameter>
  <parameter name="throttle">false</parameter>
  <parameter name="target.endpoint">StockQuoteServiceEp</parameter>
  <parameter name="message.processor.fault.sequence">fault</parameter>
  <parameter name="is.active">true</parameter>
</messageProcessor>
Use the configuration below to create an Address Endpoint to which the ordered message should be dispatched. For instructions, see Address Endpoint in WSO2 ESB Documentation.
<endpoint name="StockQuoteServiceEp">
  <address uri="http://localhost:9000/services/SimpleStockQuoteService"/>
</endpoint>
Use the configuration below to create a Custom Proxy Service, which will accept incoming requests and add messages into the Resequence Message Store. For instructions, see Adding a Proxy Service in WSO2 ESB Documentation.
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="MessageStoreProxy"
       startOnLoad="true"
       statistics="disable"
       trace="disable"
       transports="https,http">
  <target>
    <inSequence>
      <property name="FORCE_SC_ACCEPTED" scope="axis2" value="true"/>
      <property name="OUT_ONLY" value="true"/>
      <log level="custom">
        <property name="Message Store Invoked" value="true"/>
      </log>
      <log level="full"/>
      <store messageStore="RStore"/>
    </inSequence>
  </target>
  <publishWSDL uri="http://localhost:9000/services/SimpleStockQuoteService?wsdl"/>
  <description/>
</proxy>
The above proxy service accepts the incoming messages and inserts them into the Resequence Message Store, which re-orders the messages if they are not in order. The incoming message expected by the proxy service looks like the following.
<?xml version="1.0" encoding="utf-8"?>
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
  <soapenv:Header xmlns:wsa="http://www.w3.org/2005/08/addressing">
    <wsa:To>http://localhost:8280/services/MessageStoreProxy</wsa:To>
    <wsa:ReplyTo>
      <wsa:Address>http://www.w3.org/2005/08/addressing/none</wsa:Address>
    </wsa:ReplyTo>
    <wsa:MessageID>urn:uuid:82532168-db9f-4d4b-a48d-1727af42935e</wsa:MessageID>
    <wsa:Action>urn:placeOrder</wsa:Action>
  </soapenv:Header>
  <soapenv:Body>
    <m0:placeOrder xmlns:m0="http://services.samples">
      <m0:order>
        <m0:price>70.74859146053822</m0:price>
        <m0:quantity>5815</m0:quantity>
        <m0:symbol>WSO2-3</m0:symbol>
      </m0:order>
    </m0:placeOrder>
  </soapenv:Body>
</soapenv:Envelope>
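The store.resequence.id.path expression in the message store configuration, substring-after(//m0:placeOrder/m0:order/m0:symbol,'-'), takes the text after the first hyphen in the symbol element, so WSO2-3 yields 3. As a rough illustration of that extraction outside the ESB, the following Python sketch does the same with the standard library. The function name `sequence_id` is hypothetical, and converting the result to an integer is an assumption made here for readability.

```python
import xml.etree.ElementTree as ET

# Namespace prefixes used in the sample message.
NS = {
    "soapenv": "http://schemas.xmlsoap.org/soap/envelope/",
    "m0": "http://services.samples",
}


def sequence_id(soap_xml: str) -> int:
    """Return the number that follows the first '-' in the order symbol."""
    root = ET.fromstring(soap_xml)
    symbol = root.find(".//m0:placeOrder/m0:order/m0:symbol", NS)
    if symbol is None or symbol.text is None:
        raise ValueError("message has no m0:symbol element")
    # Like XPath substring-after: text after the first '-', or '' if there is none.
    _, _, after = symbol.text.partition("-")
    return int(after)  # raises ValueError when no numeric suffix is present
```

A symbol without a hyphen produces an empty string, which this sketch treats as an error rather than guessing a sequence number.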
Executing the sample and analysing the output
Follow the steps below to send a request to the ESB using the Stock Quote Client application. For more information on the Stock Quote Client, go to the ESB documentation.
- Open a new terminal, and navigate to the <ESB_HOME>/samples/axis2Client/ directory. The Stock Quote client application is stored in this directory. Execute the following command to send the first request to the ESB.
ant stockquote -Daddurl=http://localhost:8280/services/MessageStoreProxy -Dmode=placeorder -Dsymbol=WSO2-1
You will see the following output printed in the Axis2 Server Console:
samples.services.SimpleStockQuoteService :: Accepted order #xx for : xx stocks of WSO2-1 at $ xxxxxx
Execute the following command to send the second request to the ESB.
ant stockquote -Daddurl=http://localhost:8280/services/MessageStoreProxy -Dmode=placeorder -Dsymbol=WSO2-4
You will not see any output printed in the Axis2 Server Console. This is because the message carries sequence number 4, which is denoted by -Dsymbol=WSO2-4, so the Store waits, for the time defined, for the message with sequence number 2, denoted by -Dsymbol=WSO2-2, to arrive.
Execute the following command to send the third request to the ESB.
ant stockquote -Daddurl=http://localhost:8280/services/MessageStoreProxy -Dmode=placeorder -Dsymbol=WSO2-3
You will not see any output printed in the Axis2 Server Console. This is because the message carries sequence number 3, which is denoted by -Dsymbol=WSO2-3, so the Store still waits, for the time defined, for the message with sequence number 2, denoted by -Dsymbol=WSO2-2, to arrive.
Execute the following command to send the fourth request to the ESB.
ant stockquote -Daddurl=http://localhost:8280/services/MessageStoreProxy -Dmode=placeorder -Dsymbol=WSO2-2
You will see the following output in the Axis2 Server Console, printed in the proper sequence:
samples.services.SimpleStockQuoteService :: Accepted order #xx for : xx stocks of WSO2-1 at $ xxxxxx
samples.services.SimpleStockQuoteService :: Accepted order #xx for : xx stocks of WSO2-2 at $ xxxxxx
samples.services.SimpleStockQuoteService :: Accepted order #xx for : xx stocks of WSO2-3 at $ xxxxxx
samples.services.SimpleStockQuoteService :: Accepted order #xx for : xx stocks of WSO2-4 at $ xxxxxx