The content in this documentation is for older versions of WSO2 products. For updated information on Enterprise Integration Patterns, go to the latest Micro Integrator documentation.
Aggregator
This section explains, through an example scenario, how to implement the Aggregator EIP using WSO2 ESB.
Introduction to Aggregator
The Aggregator EIP combines the results of individual, related messages so that they can be processed as a whole. It works as a stateful filter, collecting and storing individual messages until a complete set of related messages has been received. It then publishes a single message distilled from the individual messages. For more information, refer to http://www.eaipatterns.com/Aggregator.html.
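The stateful behaviour described above can be illustrated with a small, language-neutral sketch in Python. It is not ESB code: the `Aggregator` class, the `expected` message count used as the completion rule, and the correlation IDs are all hypothetical names chosen for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class Aggregator:
    """Collects related messages and releases them as one batch."""

    expected: int  # hypothetical completion rule: a fixed message count
    _pending: dict = field(default_factory=dict)  # correlation id -> stored messages

    def add(self, correlation_id, message):
        """Store a message; return the full batch once it is complete, else None."""
        batch = self._pending.setdefault(correlation_id, [])
        batch.append(message)
        if len(batch) < self.expected:
            return None  # still waiting for related messages
        # Complete: drop the stored state and publish a single result.
        return self._pending.pop(correlation_id)


aggregator = Aggregator(expected=3)
first = aggregator.add("order-1", "item A")
second = aggregator.add("order-1", "item B")
other = aggregator.add("order-2", "item X")  # unrelated message, tracked separately
merged = aggregator.add("order-1", "item C")
print(merged)  # ['item A', 'item B', 'item C']
```

Keying the stored messages by a correlation ID is what lets one aggregator handle several unrelated message sets at the same time.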
Figure 1: Aggregator EIP
Example scenario
This example scenario demonstrates how WSO2 ESB can be used to send multiple requests and merge the responses. Assume the sender wants to get responses from multiple servers, and it sends a message to the ESB. The ESB duplicates the request to three instances of a sample Axis2 server using the Clone Mediator. It then merges the responses received from the server instances using the Aggregate Mediator and sends the merged response back to the client.
The diagram below depicts how to simulate the example scenario using the WSO2 ESB.
Figure 2: Aggregator Example Scenario
Before digging into implementation details, let's take a look at the relationship between the example scenario and the Aggregator EIP by comparing their core components.
Aggregator EIP (Figure 1) | Aggregator Example Scenario (Figure 2) |
---|---|
Inventory Item | Simple Stock Quote Service Response |
Aggregator | Aggregate Mediator |
Inventory Order | WSO2 ESB Response |
Environment setup
- Download and install WSO2 ESB from http://wso2.com/products/enterprise-service-bus. For a list of prerequisites and step-by-step installation instructions, refer to Installation Guide in the WSO2 ESB documentation.
- Start three Sample Axis2 server instances on ports 9000, 9001, and 9002. For instructions, refer to the section Setting up the ESB Samples - Starting the Axis2 server in the WSO2 ESB documentation.
ESB configuration
Start the ESB server and log into its management console UI (https://localhost:9443/carbon). In the management console, navigate to the Main menu and click Source View in the Service Bus section. Next, copy and paste the following configuration, which helps you explore the example scenario, to the source view.
<definitions xmlns="http://ws.apache.org/ns/synapse">
   <proxy name="AggregateMessageProxy" transports="http https" startOnLoad="true">
      <target>
         <inSequence>
            <log level="full"/>
            <!-- Send a copy of the request to each of the three Axis2 instances -->
            <clone>
               <target>
                  <endpoint name="ReceiverA">
                     <address uri="http://localhost:9000/services/SimpleStockQuoteService/"/>
                  </endpoint>
               </target>
               <target>
                  <endpoint name="ReceiverB">
                     <address uri="http://localhost:9001/services/SimpleStockQuoteService/"/>
                  </endpoint>
               </target>
               <target>
                  <endpoint name="ReceiverC">
                     <address uri="http://localhost:9002/services/SimpleStockQuoteService/"/>
                  </endpoint>
               </target>
            </clone>
         </inSequence>
         <outSequence>
            <!-- Merge the getQuoteResponse elements and return them to the client -->
            <aggregate>
               <completeCondition>
                  <messageCount/>
               </completeCondition>
               <onComplete xmlns:m0="http://services.samples" expression="//m0:getQuoteResponse">
                  <send/>
               </onComplete>
            </aggregate>
         </outSequence>
      </target>
   </proxy>
   <sequence name="fault">
      <log level="full">
         <property name="MESSAGE" value="Executing default &quot;fault&quot; sequence"/>
         <property name="ERROR_CODE" expression="get-property('ERROR_CODE')"/>
         <property name="ERROR_MESSAGE" expression="get-property('ERROR_MESSAGE')"/>
      </log>
      <drop/>
   </sequence>
   <sequence name="main">
      <log/>
      <drop/>
   </sequence>
</definitions>
Simulating the sample scenario
Send the following request to the ESB using a SOAP client like SoapUI.
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ser="http://services.samples" xmlns:xsd="http://services.samples/xsd">
   <soapenv:Header/>
   <soapenv:Body>
      <ser:getQuote>
         <ser:request>
            <xsd:symbol>foo</xsd:symbol>
         </ser:request>
      </ser:getQuote>
   </soapenv:Body>
</soapenv:Envelope>
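If a graphical SOAP client is not at hand, the same request can be posted from a short script. The following is a minimal sketch using only the Python standard library. The proxy URL (port 8280 and the `/services/AggregateMessageProxy` path) and the `urn:getQuote` SOAPAction value are assumptions about a default local setup, not values stated in this section, and `build_request` and `send` are hypothetical helper names.

```python
import urllib.request

# Assumption: default HTTP port and service path of a local ESB; adjust as needed.
PROXY_URL = "http://localhost:8280/services/AggregateMessageProxy"

SOAP_REQUEST = """<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ser="http://services.samples" xmlns:xsd="http://services.samples/xsd">
   <soapenv:Header/>
   <soapenv:Body>
      <ser:getQuote>
         <ser:request>
            <xsd:symbol>foo</xsd:symbol>
         </ser:request>
      </ser:getQuote>
   </soapenv:Body>
</soapenv:Envelope>"""


def build_request(url=PROXY_URL, body=SOAP_REQUEST):
    """Build (but do not send) the HTTP POST that carries the SOAP request."""
    return urllib.request.Request(
        url,
        data=body.encode("utf-8"),
        headers={
            "Content-Type": "text/xml; charset=UTF-8",
            "SOAPAction": "urn:getQuote",  # assumed action name for getQuote
        },
        method="POST",
    )


def send(request, timeout=30):
    """Send the request and return the SOAP response as text."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.read().decode("utf-8")
```

Once the ESB and the three Axis2 instances are running, `print(send(build_request()))` should print the merged response, provided the assumed URL matches your setup.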
The client receives a single merged response that contains the quotes returned by all three server instances. The following is the response as shown in SoapUI:
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
   <soapenv:Body>
      <ns:getQuoteResponse xmlns:ns="http://services.samples">
         <ns:return xsi:type="ax21:GetQuoteResponse" xmlns:ax21="http://services.samples/xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
            <ax21:change>-2.449710302988639</ax21:change>
            <ax21:earnings>12.690017002832839</ax21:earnings>
            <ax21:high>79.38275695387269</ax21:high>
            <ax21:last>77.22850281442805</ax21:last>
            <ax21:lastTradeTimestamp>Wed Nov 29 12:59:06 IST 2017</ax21:lastTradeTimestamp>
            <ax21:low>79.38308604492651</ax21:low>
            <ax21:marketCap>4.739596110875734E7</ax21:marketCap>
            <ax21:name>foo Company</ax21:name>
            <ax21:open>79.71513129360467</ax21:open>
            <ax21:peRatio>-19.10241171204308</ax21:peRatio>
            <ax21:percentageChange>-2.8140977078887763</ax21:percentageChange>
            <ax21:prevClose>87.05135916643378</ax21:prevClose>
            <ax21:symbol>foo</ax21:symbol>
            <ax21:volume>5669</ax21:volume>
         </ns:return>
      </ns:getQuoteResponse>
      <ns:getQuoteResponse xmlns:ns="http://services.samples">
         <ns:return xsi:type="ax21:GetQuoteResponse" xmlns:ax21="http://services.samples/xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
            <ax21:change>4.325567388568272</ax21:change>
            <ax21:earnings>13.917524756780233</ax21:earnings>
            <ax21:high>-150.03201351709225</ax21:high>
            <ax21:last>153.6310415907754</ax21:last>
            <ax21:lastTradeTimestamp>Wed Nov 29 12:59:06 IST 2017</ax21:lastTradeTimestamp>
            <ax21:low>158.59314334689392</ax21:low>
            <ax21:marketCap>5.39033218230091E7</ax21:marketCap>
            <ax21:name>foo Company</ax21:name>
            <ax21:open>-149.91220476236305</ax21:open>
            <ax21:peRatio>23.560027115374034</ax21:peRatio>
            <ax21:percentageChange>-3.0061812398993424</ax21:percentageChange>
            <ax21:prevClose>-143.8891085859184</ax21:prevClose>
            <ax21:symbol>foo</ax21:symbol>
            <ax21:volume>8605</ax21:volume>
         </ns:return>
      </ns:getQuoteResponse>
      <ns:getQuoteResponse xmlns:ns="http://services.samples">
         <ns:return xsi:type="ax21:GetQuoteResponse" xmlns:ax21="http://services.samples/xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
            <ax21:change>-2.4406464308408315</ax21:change>
            <ax21:earnings>-8.597741661105367</ax21:earnings>
            <ax21:high>-54.30538809695713</ax21:high>
            <ax21:last>55.531927639640884</ax21:last>
            <ax21:lastTradeTimestamp>Wed Nov 29 12:59:06 IST 2017</ax21:lastTradeTimestamp>
            <ax21:low>57.22511125047622</ax21:low>
            <ax21:marketCap>1.3145403912401013E7</ax21:marketCap>
            <ax21:name>foo Company</ax21:name>
            <ax21:open>-54.55876175775735</ax21:open>
            <ax21:peRatio>23.455303962825276</ax21:peRatio>
            <ax21:percentageChange>4.547031422081542</ax21:percentageChange>
            <ax21:prevClose>-53.67560072245006</ax21:prevClose>
            <ax21:symbol>foo</ax21:symbol>
            <ax21:volume>8464</ax21:volume>
         </ns:return>
      </ns:getQuoteResponse>
   </soapenv:Body>
</soapenv:Envelope>
How the implementation works
Let's investigate the elements of the ESB configuration in detail. The elements below refer to the ESB configuration shown above.
- clone - The Clone mediator is similar to the Splitter EIP. It clones the incoming request and passes the copies in parallel to several endpoints. Here, each target inside the clone element sends one copy to one of the three Axis2 server instances.
- aggregate - The Aggregate mediator aggregates the response messages for requests made by the Iterate or Clone mediators. The completion condition specifies a minimum or maximum number of messages to be collected. When all messages have been aggregated, the sequence inside onComplete runs. Here, the expression //m0:getQuoteResponse selects the elements to merge, and the send mediator returns the merged message to the client.
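The clone-then-aggregate flow described above can be simulated outside the ESB to make the merge step concrete. The sketch below is a Python illustration under stated assumptions, not the mediators' actual implementation: `stub_backend` is a hypothetical stand-in for an Axis2 instance that returns its port number as the quote, and `clone_and_aggregate` is a hypothetical helper that copies every `getQuoteResponse` element into one envelope, mirroring what the `//m0:getQuoteResponse` expression selects.

```python
import xml.etree.ElementTree as ET
from concurrent.futures import ThreadPoolExecutor

SOAP_NS = "http://schemas.xmlsoap.org/soap/envelope/"
SVC_NS = "http://services.samples"


def stub_backend(port, request_xml):
    """Hypothetical stand-in for one Axis2 instance; replies with its port."""
    ET.fromstring(request_xml)  # a real service would read the request here
    return (
        f'<soapenv:Envelope xmlns:soapenv="{SOAP_NS}"><soapenv:Body>'
        f'<ns:getQuoteResponse xmlns:ns="{SVC_NS}"><ns:return>{port}</ns:return>'
        f"</ns:getQuoteResponse></soapenv:Body></soapenv:Envelope>"
    )


def clone_and_aggregate(request_xml, ports=(9000, 9001, 9002)):
    # Clone: send the same request to every backend in parallel.
    with ThreadPoolExecutor(max_workers=len(ports)) as pool:
        responses = list(pool.map(lambda p: stub_backend(p, request_xml), ports))

    # Aggregate: once all replies are in, copy each getQuoteResponse
    # element into the body of a single new envelope.
    envelope = ET.Element(f"{{{SOAP_NS}}}Envelope")
    body = ET.SubElement(envelope, f"{{{SOAP_NS}}}Body")
    for response in responses:
        found = ET.fromstring(response).findall(f".//{{{SVC_NS}}}getQuoteResponse")
        body.extend(found)
    return envelope


merged = clone_and_aggregate("<getQuote/>")
quotes = merged.findall(f".//{{{SVC_NS}}}getQuoteResponse")
print(len(quotes))  # 3
```

The merged envelope has the same shape as the SoapUI response shown earlier: one SOAP body holding one `getQuoteResponse` element per backend.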