The content in this documentation is for older versions of WSO2 products. For updated information on Enterprise Integration Patterns, go to the latest Micro Integrator documentation.

Aggregator

This section explains, through an example scenario, how the Aggregator EIP can be implemented using WSO2 ESB.

Introduction to Aggregator

The Aggregator EIP combines the results of individual, related messages so that they can be processed as a whole. It works as a stateful filter, collecting and storing individual messages until a complete set of related messages has been received. It then publishes a single message distilled from the individual messages. For more information, refer to http://www.eaipatterns.com/Aggregator.html.

Figure 1: Aggregator EIP
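
In WSO2 ESB terms, the pattern's three decisions (which messages belong together, when the set is complete, and how the messages are combined) correspond roughly to three child elements of the Aggregate mediator. The fragment below is a minimal sketch, not part of the example scenario: the correlation XPath and the m1 prefix are assumptions chosen for illustration, and correlateOn is optional.

```xml
<aggregate>
   <!-- Correlation (assumed for illustration): group messages by stock symbol -->
   <correlateOn xmlns:m0="http://services.samples"
                xmlns:m1="http://services.samples/xsd"
                expression="//m0:return/m1:symbol"/>
   <!-- Completeness: how many messages make up a complete set -->
   <completeCondition>
      <messageCount/>
   </completeCondition>
   <!-- Aggregation: merge the elements matched by the expression, then send the result -->
   <onComplete xmlns:m0="http://services.samples" expression="//m0:getQuoteResponse">
      <send/>
   </onComplete>
</aggregate>
```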

Example scenario

This example scenario demonstrates how WSO2 ESB can be used to send multiple requests and merge the responses. Assume a sender wants responses from multiple servers and sends a single message to the ESB. Using the Clone Mediator, the ESB duplicates the request to three different instances of a sample Axis2 server. Using the Aggregate Mediator, the ESB then merges the responses received from the server instances and sends the merged response back to the client.

The diagram below depicts how to simulate the example scenario using the WSO2 ESB.

Figure 2: Aggregator Example Scenario

Before digging into implementation details, let's take a look at the relationship between the example scenario and the Aggregator EIP by comparing their core components.

Aggregator EIP (Figure 1)    Aggregator Example Scenario (Figure 2)
Inventory Item               Simple Stock Quote Service Response
Aggregator                   Aggregate Mediator
Inventory Order              WSO2 ESB Response

Environment setup

  1. Download and install WSO2 ESB from http://wso2.com/products/enterprise-service-bus. For a list of prerequisites and step-by-step installation instructions, refer to the Installation Guide in the WSO2 ESB documentation.
  2. Start three Sample Axis2 server instances on ports 9000, 9001, and 9002. For instructions, refer to the section Setting up the ESB Samples - Starting the Axis2 server in the WSO2 ESB documentation.

ESB configuration

Start the ESB server and log in to its management console UI (https://localhost:9443/carbon). In the management console, navigate to the Main menu and click Source View in the Service Bus section. Next, copy the following configuration, which helps you explore the example scenario, and paste it into the source view.

<definitions xmlns="http://ws.apache.org/ns/synapse">
   <proxy name="AggregateMessageProxy"
          transports="http https"
          startOnLoad="true">
      <target>
         <inSequence>
            <log level="full"/>
            <clone>
               <target>
                  <endpoint name="ReceiverA">
                     <address uri="http://localhost:9000/services/SimpleStockQuoteService/"/>
                  </endpoint>
               </target>
               <target>
                  <endpoint name="ReceiverB">
                     <address uri="http://localhost:9001/services/SimpleStockQuoteService/"/>
                  </endpoint>
               </target>
               <target>
                  <endpoint name="ReceiverC">
                     <address uri="http://localhost:9002/services/SimpleStockQuoteService/"/>
                  </endpoint>
               </target>
            </clone>
         </inSequence>
         <outSequence>
            <aggregate>
               <completeCondition>
                  <messageCount/>
               </completeCondition>
               <onComplete xmlns:m0="http://services.samples" expression="//m0:getQuoteResponse">
                  <send/>
               </onComplete>
            </aggregate>
         </outSequence>
      </target>
   </proxy>
   <sequence name="fault">
      <log level="full">
         <property name="MESSAGE" value="Executing default &#34;fault&#34; sequence"/>
         <property name="ERROR_CODE" expression="get-property('ERROR_CODE')"/>
         <property name="ERROR_MESSAGE" expression="get-property('ERROR_MESSAGE')"/>
      </log>
      <drop/>
   </sequence>
   <sequence name="main">
      <log/>
      <drop/>
   </sequence>
</definitions>
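
If a proxy contains more than one Clone or Iterate mediator, the optional id attribute can tie an Aggregate mediator to the mediator that split the messages. The fragment below is a sketch of that pairing for the proxy's target element. The id value "quoteFanOut" is a hypothetical name, and the configuration above works without it.

```xml
<target>
   <inSequence>
      <!-- "quoteFanOut" is a hypothetical id used only for this illustration -->
      <clone id="quoteFanOut">
         <target>
            <endpoint name="ReceiverA">
               <address uri="http://localhost:9000/services/SimpleStockQuoteService/"/>
            </endpoint>
         </target>
         <!-- Targets for ReceiverB and ReceiverC follow, as in the configuration above -->
      </clone>
   </inSequence>
   <outSequence>
      <!-- Aggregate only the responses to messages cloned by "quoteFanOut" -->
      <aggregate id="quoteFanOut">
         <completeCondition>
            <messageCount/>
         </completeCondition>
         <onComplete xmlns:m0="http://services.samples" expression="//m0:getQuoteResponse">
            <send/>
         </onComplete>
      </aggregate>
   </outSequence>
</target>
```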

Simulating the sample scenario

  1. Send the following request to the ESB using a SOAP client like SoapUI.

    <soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ser="http://services.samples" xmlns:xsd="http://services.samples/xsd">
       <soapenv:Header/>
       <soapenv:Body>
          <ser:getQuote>
             <ser:request>
                <xsd:symbol>foo</xsd:symbol>
             </ser:request>
          </ser:getQuote>
       </soapenv:Body>
    </soapenv:Envelope>
  2. SoapUI displays a merged response containing one quote from each of the three server instances, as in the following example:

    <soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
       <soapenv:Body>
          <ns:getQuoteResponse xmlns:ns="http://services.samples">
             <ns:return xsi:type="ax21:GetQuoteResponse" xmlns:ax21="http://services.samples/xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
                <ax21:change>-2.449710302988639</ax21:change>
                <ax21:earnings>12.690017002832839</ax21:earnings>
                <ax21:high>79.38275695387269</ax21:high>
                <ax21:last>77.22850281442805</ax21:last>
                <ax21:lastTradeTimestamp>Wed Nov 29 12:59:06 IST 2017</ax21:lastTradeTimestamp>
                <ax21:low>79.38308604492651</ax21:low>
                <ax21:marketCap>4.739596110875734E7</ax21:marketCap>
                <ax21:name>foo Company</ax21:name>
                <ax21:open>79.71513129360467</ax21:open>
                <ax21:peRatio>-19.10241171204308</ax21:peRatio>
                <ax21:percentageChange>-2.8140977078887763</ax21:percentageChange>
                <ax21:prevClose>87.05135916643378</ax21:prevClose>
                <ax21:symbol>foo</ax21:symbol>
                <ax21:volume>5669</ax21:volume>
             </ns:return>
          </ns:getQuoteResponse>
          <ns:getQuoteResponse xmlns:ns="http://services.samples">
             <ns:return xsi:type="ax21:GetQuoteResponse" xmlns:ax21="http://services.samples/xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
                <ax21:change>4.325567388568272</ax21:change>
                <ax21:earnings>13.917524756780233</ax21:earnings>
                <ax21:high>-150.03201351709225</ax21:high>
                <ax21:last>153.6310415907754</ax21:last>
                <ax21:lastTradeTimestamp>Wed Nov 29 12:59:06 IST 2017</ax21:lastTradeTimestamp>
                <ax21:low>158.59314334689392</ax21:low>
                <ax21:marketCap>5.39033218230091E7</ax21:marketCap>
                <ax21:name>foo Company</ax21:name>
                <ax21:open>-149.91220476236305</ax21:open>
                <ax21:peRatio>23.560027115374034</ax21:peRatio>
                <ax21:percentageChange>-3.0061812398993424</ax21:percentageChange>
                <ax21:prevClose>-143.8891085859184</ax21:prevClose>
                <ax21:symbol>foo</ax21:symbol>
                <ax21:volume>8605</ax21:volume>
             </ns:return>
          </ns:getQuoteResponse>
          <ns:getQuoteResponse xmlns:ns="http://services.samples">
             <ns:return xsi:type="ax21:GetQuoteResponse" xmlns:ax21="http://services.samples/xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
                <ax21:change>-2.4406464308408315</ax21:change>
                <ax21:earnings>-8.597741661105367</ax21:earnings>
                <ax21:high>-54.30538809695713</ax21:high>
                <ax21:last>55.531927639640884</ax21:last>
                <ax21:lastTradeTimestamp>Wed Nov 29 12:59:06 IST 2017</ax21:lastTradeTimestamp>
                <ax21:low>57.22511125047622</ax21:low>
                <ax21:marketCap>1.3145403912401013E7</ax21:marketCap>
                <ax21:name>foo Company</ax21:name>
                <ax21:open>-54.55876175775735</ax21:open>
                <ax21:peRatio>23.455303962825276</ax21:peRatio>
                <ax21:percentageChange>4.547031422081542</ax21:percentageChange>
                <ax21:prevClose>-53.67560072245006</ax21:prevClose>
                <ax21:symbol>foo</ax21:symbol>
                <ax21:volume>8464</ax21:volume>
             </ns:return>
          </ns:getQuoteResponse>
       </soapenv:Body>
    </soapenv:Envelope>

How the implementation works

Let's investigate the elements of the ESB configuration in detail. The line numbers below refer to the ESB configuration shown above.

  • clone [line 8 in ESB config] - The Clone mediator is similar to the Splitter EIP. It clones the incoming request and passes the copies in parallel to several endpoints.
  • aggregate [line 27 in ESB config] - The Aggregate mediator aggregates response messages for requests made by the Iterate or Clone mediators. The completion condition can specify a minimum or maximum number of messages to be collected. Here messageCount sets neither, so the mediator waits for the responses to all three cloned requests. When all messages have been aggregated, the sequence inside onComplete is run.
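
The completion condition can also be bounded explicitly. The fragment below is a sketch of the same Aggregate mediator with a timeout and message limits. The values 10, 2, and 3 are illustrative assumptions, and the exact interaction between min, max, and timeout should be checked against the Aggregate mediator reference for your ESB version.

```xml
<aggregate>
   <!-- timeout is given in seconds; 10 is an illustrative value -->
   <completeCondition timeout="10">
      <!-- Illustrative limits: at least 2 and at most 3 responses per aggregation -->
      <messageCount min="2" max="3"/>
   </completeCondition>
   <onComplete xmlns:m0="http://services.samples" expression="//m0:getQuoteResponse">
      <send/>
   </onComplete>
</aggregate>
```

A timeout keeps the client from waiting indefinitely if one of the three Axis2 instances never responds.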