This section explains, through an example scenario, how the Durable Subscriber EIP can be implemented using WSO2 ESB. The following topics are covered:

Table of Contents

...

Code Block
<!-- Durable Subscriber Proxy-->
<definitions xmlns="http://ws.apache.org/ns/synapse">
   <taskManager provider="org.wso2.carbon.mediation.ntask.NTaskTaskManager"/>
   <registry provider="org.wso2.carbon.mediation.registry.WSO2Registry"> 
        <parameter name="cachableDuration">15000</parameter> 
   </registry>
   <proxy xmlns="http://ws.apache.org/ns/synapse"
          name="PublishProxy"
          transports="http"
          statistics="disable"
          trace="disable"
          startOnLoad="true">
      <target>
         <inSequence>
            <!-- Respond to the client with HTTP 202 Accepted as soon as the message is received -->
            <property name="FORCE_SC_ACCEPTED"
                      value="true"
                      scope="axis2"
                      type="STRING"/>
            <!-- Send one copy of the message to each subscriber sequence -->
            <clone>
               <target sequence="DurableSubscriber"/>
               <target sequence="NonDurableSubscriber"/>
            </clone>
         </inSequence>
         <outSequence>
            <drop/>
         </outSequence>
      </target>
      <description/>
   </proxy>

    <!-- Error Sequences -->
    <sequence name="sub1_fails">
        <store messageStore="pending_subscriptions"/>
    </sequence>
    <sequence name="sub2_fails">
        <drop/>
    </sequence>
    <!-- Subscription List -->
    <sequence name="DurableSubscriber" onError="sub1_fails" xmlns="http://ws.apache.org/ns/synapse">
        <in>
            <property name="OUT_ONLY" value="true"/>
            <send>
                <endpoint name="Subscriber 1">
                    <address uri="http://localhost:9000/services/SimpleStockQuoteService/"/>
                </endpoint>
            </send>
        </in>
    </sequence>
    <sequence name="NonDurableSubscriber" onError="sub2_fails" xmlns="http://ws.apache.org/ns/synapse">
        <in>
            <property name="OUT_ONLY" value="true"/>
            <send>
                <endpoint name="Subscriber 2">
                    <address uri="http://localhost:9001/services/SimpleStockQuoteService/"/>
                </endpoint>
            </send>
        </in>
    </sequence>
    <!-- Re Direction End Points -->
    <endpoint name="DurableSubscriberEndpoint">  
        <address uri="http://localhost:9000/services/SimpleStockQuoteService"/>  
    </endpoint>

    <!-- Message Store And Process -->
    <messageStore name="pending_subscriptions"/>
      
    <messageProcessor class="org.apache.synapse.message.processor.impl.forwarder.ScheduledMessageForwardingProcessor" name="send_pending_message" 
    messageStore="pending_subscriptions"> 
        <parameter name="interval">1000</parameter> 
        <parameter name="max.delivery.attempts">50</parameter> 
        <parameter name="target.endpoint">DurableSubscriberEndpoint</parameter>
    </messageProcessor>       
</definitions>

Simulating the sample scenario

Use a SOAP client such as SoapUI to send the following request to the PublishProxy service.

Code Block
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ser="http://services.samples" xmlns:xsd="http://services.samples/xsd">
   <soapenv:Header/>
   <soapenv:Body>
      <ser:placeOrder>
         <ser:order>
            <xsd:price>10</xsd:price>
            <xsd:quantity>100</xsd:quantity>
            <xsd:symbol>foo</xsd:symbol>
         </ser:order>
      </ser:placeOrder>
   </soapenv:Body>
</soapenv:Envelope>

...

  • Proxy Service [PublishProxy in ESB config] - The proxy service takes an incoming Stock Quote client request and clones it, forwarding one copy to each of two target sequences, DurableSubscriber and NonDurableSubscriber.
  • Sequence [DurableSubscriber in ESB config] - The DurableSubscriber sequence forwards the message to the Subscriber 1 endpoint. The sequence has its onError attribute set to the sub1_fails sequence, which stores the message if delivery fails.
  • Sequence [NonDurableSubscriber in ESB config] - The NonDurableSubscriber sequence works the same way as the sequence described above, but sends to the Subscriber 2 endpoint. The only other difference is that on failure, the sub2_fails sequence is called, which simply drops the message.
  • Sequence [sub1_fails in ESB config] - This sequence uses a Store Mediator to save the failed message in a message store. In this example, it is the store named pending_subscriptions.
  • messageStore [pending_subscriptions in ESB config] - Defines a new message store with the name pending_subscriptions.
  • messageProcessor [send_pending_message in ESB config] - The messageProcessor defines the type of processing applied to a particular messageStore, which in this example is pending_subscriptions. This example uses a ScheduledMessageForwardingProcessor, which retries sending the stored messages every second (an interval of 1000 ms), up to a maximum of 50 delivery attempts. Its target.endpoint parameter names DurableSubscriberEndpoint as the endpoint that receives the retried messages.
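
When simulating the scenario, it can be hard to tell from the outside which failure path a message took. The fragment below is a minimal sketch, not part of the original sample, of how the two error sequences could log before storing or dropping the message. The STATUS property names and their text are hypothetical labels chosen for illustration. The Log mediator and the ERROR_MESSAGE property are standard Synapse features, but the exact log output depends on the ESB version in use.

```xml
<!-- Hypothetical variant of the error sequences: log, then store or drop -->
<sequence xmlns="http://ws.apache.org/ns/synapse" name="sub1_fails">
   <log level="custom">
      <!-- Illustrative label; the wording is an assumption -->
      <property name="STATUS" value="Subscriber 1 unavailable, storing message"/>
      <!-- ERROR_MESSAGE is set by Synapse when a fault sequence is invoked -->
      <property name="ERROR_MESSAGE" expression="get-property('ERROR_MESSAGE')"/>
   </log>
   <store messageStore="pending_subscriptions"/>
</sequence>
<sequence xmlns="http://ws.apache.org/ns/synapse" name="sub2_fails">
   <log level="custom">
      <property name="STATUS" value="Subscriber 2 unavailable, dropping message"/>
   </log>
   <drop/>
</sequence>
```

With a variant like this in place, stopping the service on port 9000 and sending the request should produce the sub1_fails log entry, and the stored message should be forwarded once that service is running again.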