Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

This section explains, through an example scenario, how the Durable Subscriber EIP can be implemented using WSO2 ESB. The following topics are covered:

Table of Contents

...

Environment setup

  1. Download and install the WSO2 ESB from http://wso2.com/products/enterprise-service-bus. For a list of prerequisites and step-by-step installation instructions, refer to Getting Started Installation Guide in the WSO2 ESB documentation.
  2. Start three sample Axis2 server instances on ports 9000 and 9001. For instructions, refer to see Setting Up the section ESB Samples Setup - Starting Sample Back-End Servicesthe Axis2 server in the WSO2 ESB documentation.

ESB configuration

Start the ESB server and log into its management console UI (https://localhost:9443/carbon). In the management console, navigate to the Main menu and click Source View in the Service Bus section. Next, copy and paste the following configuration, which helps you explore the example scenario, to the source view.

Anchor
step3
step3

Code Block
languagehtml/xml
linenumberstrue
<!-- Durable Subscriber Proxy-->
<definitions xmlns="http://ws.apache.org/ns/synapse">
   <taskManager provider="org.wso2.carbon.mediation.ntask.NTaskTaskManager"/>
   <registry provider="org.wso2.carbon.mediation.registry.WSO2Registry"> 
        <parameter name="cachableDuration">15000</parameter> 
   </registry>
   <proxy xmlns="http://ws.apache.org/ns/synapse"
    name="PublishProxy" startOnLoad="true" transports="http">
    transports="http"
    statistics="disable"
    trace="disable"
    startOnLoad="true">
         <target>
         <inSequence>
            <property name="FORCE_SC_ACCEPTED"
        <clone>     value="true"
            <target sequencescope="DurableSubscriberaxis2"
             type="STRING"/>
               <clone>
          	        <target sequence="NonDurableSubscriberDurableSubscriber" />
                  <target sequence="NonDurableSubscriber"/>
               </clone>
          </inSequence>
          <outSequence>
             <drop />
          </outSequence>
          </target>
            <description/>
   </proxy>

    <!-- Error Sequences -->
    <sequence name = "sub1_fails" >
       <store messageStore="pending_subscriptions" /> 
    </sequence>
    <sequence name = "sub2_fails" >
         <drop/>
    </sequence>
    <!-- Subscription List-->
    <sequence name="DurableSubscriber" onError="sub1_fails" xmlns="http://ws.apache.org/ns/synapse">
        <in>
          <property name="OUT_ONLY" value="true"/>
          <send>
            <endpoint name="Subscriber 1">

                     <address uri="http://localhost:9000/services/SimpleStockQuoteService/"/>
   
            </endpoint>
          </send>
        </in>
    </sequence>
    <sequence name="NonDurableSubscriber" onError="sub2_fails" xmlns="http://ws.apache.org/ns/synapse">
        <in>
          <property name="OUT_ONLY" value="true"/>
      <send>    <send>
            <endpoint name="Subscriber 2">
   
                  <address uri="http://localhost:9001/services/SimpleStockQuoteService/"/>
                </endpoint>
          </send>

        </in>       
     </sequence>
    <!-- Re Direction End Points -->
    <endpoint name="DurableSubscriberEndpoint">  
        <address uri="http://localhost:9000/services/SimpleStockQuoteService"/>  
    </endpoint>

    <!-- Message Store And Process -->
    <messageStore name="pending_subscriptions"/>
      
    <messageProcessor class="org.apache.synapse.message.processor.impl.forwarder.ScheduledMessageForwardingProcessor" name="send_pending_message" 
    messageStore="pending_subscriptions"> 
        <parameter name="interval">1000</parameter> 
        <parameter name="max.delivery.attempts">50</parameter> 
        <parameter name="target.endpoint">DurableSubscriberEndpoint</parameter>
    </messageProcessor>       
</definitions>

Simulating the sample scenario

 Use a SOAP client like SoapUI to forward the following request to the PublishProxy service. 

Code Block
languagehtml/xml
<soapenv:Envelope xmlns:soapenv="http://wwwschemas.w3xmlsoap.org/2003soap/05envelope/soap-envelope" xmlns:ser="http://services.samples" xmlns:xsd="http://services.samples/xsd">
   <soapenv:Header/>
   <soapenv:Body>
      <ser:getQuote>placeOrder>
         <!--Optional:--><ser:order>
            <ser:request><xsd:price>10</xsd:price>
            <!--Optional:--><xsd:quantity>100</xsd:quantity>
            <ser<xsd:symbol>foo</serxsd:symbol>
         </ser:request>order>
      </ser:getQuote>placeOrder>
   </soapenv:Body>
</soapenv:Envelope>

...

How the implementation works

Let 's us investigate the elements of the ESB configuration in detail. The line numbers below refer to the ESB configuration shown above.

  • Proxy Service [line 6 in ESB config ] - A proxy service takes an incoming Stock Quote client request and clones the request by forwarding one copy each to two target sequences, DurableSubscriber and NonDurableSubscriber.
  • Sequence [line 29 in ESB config] - The DurableSubscriber sequence forwards the message to the Durable Endpoint. This endpoint has the onError attribute set to the sub1_fails sequence (line 21 in ESB config), which will store the message in case of a failure. 
  • Sequence [line 38 in ESB config] - The NonDurableSubscriber sequence works the same as the sequence described above. The only difference is that on failure, the sub2_fails sequence (line 25 in ESB config) is called, which simply drops the message. 
  • Sequence [line 21 in ESB config] - This sequence sets the target.endpoint property for the DurableEndpointSubscriber endpoint (defined on line 48 in ESB config), and uses a Store Mediator to define the message store used to save the message. In this example, it is the store with key pending_subscription. 
  • messageStore [line 53 in ESB config] - Defines a new message store with the name pending_subscriptions. 
  • messageProcessor [line 55 in ESB config] - The messageProcessor is used to define the type of processing done to a particular messageStore, which in this example is pending_subscription. This example defines a messageProcessor that uses a ScheduledMessageForwardingProcessor, which retries sending the messages every second with a maximum number of delivery attempts set to 50.