...
The Composed Msg. Processor EIP is used to process a composite message. It maintains the overall message flow when processing a message consisting of multiple elements, each of which might require different processing. The Composed Message Processor splits the message up, routes the sub-messages to the appropriate destinations, and then aggregates the responses back into a single message. For more information, refer to http://www.eaipatterns.com/DistributionAggregate.html.
...
Before digging into implementation details, let's take a look at the relationship between the example scenario and the Composed Msg. Processor EIP by comparing their core components.
Composed Msg. Processor EIP (Figure 1) | Composed Msg. Processor Example Scenario (Figure 2) |
---|---|
New Order | Stock Quote Request |
Splitter | Iterate Mediator |
Router | Switch Mediator |
Widget/Gadget Inventory | Stock Quote Service Instance |
Aggregator | Aggregate Mediator |
Validated Order | Aggregated Stock Quote Response |
Environment setup
- Download and install WSO2 ESB from http://wso2.com/products/enterprise-service-bus. For a list of prerequisites and step-by-step installation instructions, refer to the Installation Guide in the WSO2 ESB documentation.
- Start two sample Axis2 server instances on ports 9000 and 9001. For instructions, refer to the section Setting up the ESB Samples - Starting the Axis2 server in the WSO2 ESB documentation.
...
Start the ESB server and log into its management console UI (https://localhost:9443/carbon). In the management console, navigate to the Main menu and click Source View in the Service Bus section. Next, copy and paste the following configuration, which helps you explore the example scenario, into the source view.
```xml
<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://ws.apache.org/ns/synapse">
   <proxy name="ComposedMessageProxy" startOnLoad="true">
      <target>
         <inSequence>
            <log level="full"/>
            <iterate xmlns:m0="http://services.samples"
                     preservePayload="true"
                     attachPath="//m0:getQuote"
                     expression="//m0:getQuote/m0:request">
               <target>
                  <sequence>
                     <switch xmlns:m1="http://services.samples/xsd" source="//m1:symbol">
                        <case regex="IBM">
                           <send>
                              <endpoint>
                                 <address uri="http://localhost:9000/services/SimpleStockQuoteService"/>
                              </endpoint>
                           </send>
                        </case>
                        <case regex="WSO2">
                           <send>
                              <endpoint>
                                 <address uri="http://localhost:9001/services/SimpleStockQuoteService"/>
                              </endpoint>
                           </send>
                        </case>
                        <default>
                           <drop/>
                        </default>
                     </switch>
                  </sequence>
               </target>
            </iterate>
         </inSequence>
         <outSequence>
            <aggregate>
               <completeCondition>
                  <messageCount/>
               </completeCondition>
               <onComplete xmlns:m0="http://services.samples" expression="//m0:getQuoteResponse">
                  <send/>
               </onComplete>
            </aggregate>
         </outSequence>
      </target>
      <publishWSDL uri="file:samples/service-bus/resources/proxy/sample_proxy_1.wsdl"/>
   </proxy>
</definitions>
```
Simulating the sample scenario
Send the following request using a SOAP client such as SoapUI.
```xml
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ser="http://services.samples" xmlns:xsd="http://services.samples/xsd">
   <soapenv:Header/>
   <soapenv:Body>
      <ser:getQuote>
         <ser:request>
            <xsd:symbol>IBM</xsd:symbol>
         </ser:request>
         <ser:request>
            <xsd:symbol>WSO2</xsd:symbol>
         </ser:request>
         <ser:request>
            <xsd:symbol>IBM</xsd:symbol>
         </ser:request>
      </ser:getQuote>
   </soapenv:Body>
</soapenv:Envelope>
```
The ESB returns a response similar to the following. Note that the three responses are merged together.
```xml
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
   <soapenv:Body>
      <ns:getQuoteResponse xmlns:ns="http://services.samples">
         <ns:return xsi:type="ax21:GetQuoteResponse" xmlns:ax21="http://services.samples/xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
            <ax21:change>-2.3524349166988148</ax21:change>
            <ax21:earnings>13.039287427943174</ax21:earnings>
            <ax21:high>-147.82215416089954</ax21:high>
            <ax21:last>150.42944098590854</ax21:last>
            <ax21:lastTradeTimestamp>Wed Nov 29 13:10:16 IST 2017</ax21:lastTradeTimestamp>
            <ax21:low>156.00309322666212</ax21:low>
            <ax21:marketCap>4.0704702811974496E7</ax21:marketCap>
            <ax21:name>IBM Company</ax21:name>
            <ax21:open>156.54563823303528</ax21:open>
            <ax21:peRatio>24.744940446245423</ax21:peRatio>
            <ax21:percentageChange>-1.4082628986156709</ax21:percentageChange>
            <ax21:prevClose>167.04515321757532</ax21:prevClose>
            <ax21:symbol>IBM</ax21:symbol>
            <ax21:volume>19856</ax21:volume>
         </ns:return>
      </ns:getQuoteResponse>
      <ns:getQuoteResponse xmlns:ns="http://services.samples">
         <ns:return xsi:type="ax21:GetQuoteResponse" xmlns:ax21="http://services.samples/xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
            <ax21:change>4.338630296226518</ax21:change>
            <ax21:earnings>-9.981195004808995</ax21:earnings>
            <ax21:high>-93.23101400112802</ax21:high>
            <ax21:last>93.78054915571116</ax21:last>
            <ax21:lastTradeTimestamp>Wed Nov 29 13:10:16 IST 2017</ax21:lastTradeTimestamp>
            <ax21:low>-91.67657115464598</ax21:low>
            <ax21:marketCap>1.2048601227631677E7</ax21:marketCap>
            <ax21:name>IBM Company</ax21:name>
            <ax21:open>97.71302391406073</ax21:open>
            <ax21:peRatio>23.753214298084966</ax21:peRatio>
            <ax21:percentageChange>-4.997221061621991</ax21:percentageChange>
            <ax21:prevClose>-86.8208598884415</ax21:prevClose>
            <ax21:symbol>IBM</ax21:symbol>
            <ax21:volume>7120</ax21:volume>
         </ns:return>
      </ns:getQuoteResponse>
      <ns:getQuoteResponse xmlns:ns="http://services.samples">
         <ns:return xsi:type="ax21:GetQuoteResponse" xmlns:ax21="http://services.samples/xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
            <ax21:change>-2.8758173622682595</ax21:change>
            <ax21:earnings>13.587800448945982</ax21:earnings>
            <ax21:high>-60.60915794654595</ax21:high>
            <ax21:last>61.521647372674394</ax21:last>
            <ax21:lastTradeTimestamp>Wed Nov 29 13:10:16 IST 2017</ax21:lastTradeTimestamp>
            <ax21:low>63.7318273151116</ax21:low>
            <ax21:marketCap>1319867.136948647</ax21:marketCap>
            <ax21:name>WSO2 Company</ax21:name>
            <ax21:open>64.03367826249405</ax21:open>
            <ax21:peRatio>23.802458318708982</ax21:peRatio>
            <ax21:percentageChange>-4.319528792063117</ax21:percentageChange>
            <ax21:prevClose>66.57710830756369</ax21:prevClose>
            <ax21:symbol>WSO2</ax21:symbol>
            <ax21:volume>16990</ax21:volume>
         </ns:return>
      </ns:getQuoteResponse>
   </soapenv:Body>
</soapenv:Envelope>
```
How the implementation works
Let's investigate the elements of the ESB configuration in detail. The line numbers below refer to the ESB configuration shown in step 3 above.
- iterate [line 7 in ESB config] - The Iterate mediator takes each child element of the element specified in its XPath expression, and applies the sequence flow inside the Iterate mediator. In this example, it takes each request element inside getQuote in the incoming request to the ESB, and forwards this request to the target endpoint.
- switch [line 13 in ESB config] - Evaluates the given XPath expression against the message and extracts the matching content, which in this example is the stock symbol.
- case [line 14 in ESB config] - The extracted content is matched against the given regular expression.
- send [line 15 in ESB config] - When a matching case is found, the Send mediator routes the message to the endpoint indicated in the address URI.
- aggregate [line 37 in ESB config] - The Aggregate mediator merges the response messages for requests made by the Iterate or Clone mediators. The completion condition specifies the minimum or maximum number of messages to be collected. When all messages are aggregated, the sequence inside onComplete is run.
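To make the completion condition described above more concrete, here is a minimal sketch of the same Aggregate mediator with explicit bounds. The values `min="3"`, `max="3"`, and `timeout="10"` are illustrative assumptions chosen to match the three-request sample message; they are not part of the configuration in this example, which leaves `messageCount` at its defaults. The sketch also assumes the optional `timeout` attribute is interpreted in seconds, as described in the Aggregate mediator reference.

```xml
<aggregate>
   <!-- Assumption: stop waiting after 10 seconds, even if responses are missing -->
   <completeCondition timeout="10">
      <!-- Assumption: the sample request splits into exactly three sub-messages -->
      <messageCount min="3" max="3"/>
   </completeCondition>
   <!-- Same onComplete as in the example: merge all getQuoteResponse elements and send them back -->
   <onComplete xmlns:m0="http://services.samples" expression="//m0:getQuoteResponse">
      <send/>
   </onComplete>
</aggregate>
```

A timeout of this kind may be useful when a request carries a symbol that matches neither case, because the default branch drops that sub-message and no response for it ever reaches the out sequence. Verify the exact behavior against the documentation for your ESB version before relying on it.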