This section explains, through an example scenario, how the Durable Subscriber EIP can be implemented using WSO2 ESB. The following topics are covered:
...
Environment setup
- Download and install the WSO2 ESB from http://wso2.com/products/enterprise-service-bus. For a list of prerequisites and step-by-step installation instructions, refer to the Installation Guide in the WSO2 ESB documentation.
- Start two sample Axis2 server instances on ports 9000 and 9001. For instructions, see Setting Up the ESB Samples - Starting the Axis2 server in the WSO2 ESB documentation.
ESB configuration
Start the ESB server and log into its management console UI (https:
). In the management console, navigate to the Main menu and click Source View in the Service Bus section. Next, copy and paste the following configuration, which helps you explore the example scenario, to the source view.
Code Block |
---|
|
<!-- Durable Subscriber Proxy-->
<definitions xmlns="http://ws.apache.org/ns/synapse">
<taskManager provider="org.wso2.carbon.mediation.ntask.NTaskTaskManager"/>
<registry provider="org.wso2.carbon.mediation.registry.WSO2Registry">
<parameter name="cachableDuration">15000</parameter>
</registry>
<proxy xmlns="http://ws.apache.org/ns/synapse"
name="PublishProxy"
transports="http"
statistics="disable"
trace="disable"
startOnLoad="true">
<target>
<inSequence>
<property name="FORCE_SC_ACCEPTED"
value="true"
scope="axis2"
type="STRING"/>
<clone>
<target sequence="DurableSubscriber"/>
<target sequence="NonDurableSubscriber"/>
</clone>
</inSequence>
<outSequence>
<drop />
</outSequence>
</target>
<description/>
</proxy>
<!-- Error Sequences -->
<sequence name = "sub1_fails" >
<store messageStore="pending_subscriptions" />
</sequence>
<sequence name = "sub2_fails" >
<drop/>
</sequence>
<!-- Subscription List-->
<sequence name="DurableSubscriber" onError="sub1_fails" xmlns="http://ws.apache.org/ns/synapse">
<in>
<property name="OUT_ONLY" value="true"/>
<send>
<endpoint name="Subscriber 1">
<address uri="http://localhost:9000/services/SimpleStockQuoteService/"/>
</endpoint>
</send>
</in>
</sequence>
<sequence name="NonDurableSubscriber" onError="sub2_fails" xmlns="http://ws.apache.org/ns/synapse">
<in>
<property name="OUT_ONLY" value="true"/>
<send>
<endpoint name="Subscriber 2">
<address uri="http://localhost:9001/services/SimpleStockQuoteService/"/>
</endpoint>
</send>
</in>
</sequence>
<!-- Re Direction End Points -->
<endpoint name="DurableSubscriberEndpoint">
<address uri="http://localhost:9000/services/SimpleStockQuoteService"/>
</endpoint>
<!-- Message Store And Process -->
<messageStore name="pending_subscriptions"/>
<messageProcessor class="org.apache.synapse.message.processor.impl.forwarder.ScheduledMessageForwardingProcessor" name="send_pending_message"
messageStore="pending_subscriptions">
<parameter name="interval">1000</parameter>
<parameter name="max.delivery.attempts">50</parameter>
<parameter name="target.endpoint">DurableSubscriberEndpoint</parameter>
</messageProcessor>
</definitions> |
Simulating the sample scenario
Use a SOAP client like SoapUI to forward the following request to the PublishProxy service.
Code Block |
---|
|
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ser="http://services.samples" xmlns:xsd="http://services.samples/xsd">
<soapenv:Header/>
<soapenv:Body>
<ser:placeOrder>
<ser:order>
<xsd:price>10</xsd:price>
<xsd:quantity>100</xsd:quantity>
<xsd:symbol>foo</xsd:symbol>
</ser:order>
</ser:placeOrder>
</soapenv:Body>
</soapenv:Envelope> |
...
How the implementation works
Let's investigate the elements of the ESB configuration in detail. The elements below are identified by the names they have in the ESB configuration shown above.
- Proxy Service [PublishProxy] - The proxy service takes an incoming Stock Quote client request and clones it, forwarding one copy to each of two target sequences, DurableSubscriber and NonDurableSubscriber.
- Sequence [DurableSubscriber] - The DurableSubscriber sequence forwards the message to the durable subscriber's endpoint. The sequence has its onError attribute set to the sub1_fails sequence, which stores the message in case of a failure.
- Sequence [NonDurableSubscriber] - The NonDurableSubscriber sequence works the same way as the sequence described above. The only difference is that on failure, the sub2_fails sequence is called, which simply drops the message.
- Sequence [sub1_fails] - This sequence uses a Store Mediator to save the failed message in a message store. In this example, it is the store named pending_subscriptions.
- messageStore [pending_subscriptions] - Defines a new message store with the name pending_subscriptions.
- messageProcessor [send_pending_message] - The messageProcessor defines the type of processing applied to a particular messageStore, which in this example is pending_subscriptions. This example defines a messageProcessor that uses a ScheduledMessageForwardingProcessor, which retries sending the stored messages every second (an interval of 1000 ms), with the maximum number of delivery attempts set to 50. In the configuration above, its target.endpoint parameter names the DurableSubscriberEndpoint endpoint as the redelivery target.
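To make the failure path described above easier to observe while testing, a Log mediator could be placed in front of the Store mediator in the sub1_fails sequence. The following is a minimal sketch and not part of the original sample: the STATUS property name and its text are illustrative assumptions, while ERROR_MESSAGE is the standard Synapse property populated when a fault occurs.

```xml
<!-- Sketch only: sub1_fails with a Log mediator added before the Store mediator -->
<sequence xmlns="http://ws.apache.org/ns/synapse" name="sub1_fails">
   <!-- STATUS and its text are illustrative; ERROR_MESSAGE is read from the fault context -->
   <log level="custom">
      <property name="STATUS" value="Subscriber 1 unavailable, storing message"/>
      <property name="ERROR_MESSAGE" expression="get-property('ERROR_MESSAGE')"/>
   </log>
   <!-- Same Store mediator as in the configuration above -->
   <store messageStore="pending_subscriptions"/>
</sequence>
```

With a variant like this in place, stopping the Axis2 server on port 9000 and sending the request should produce a log entry on the ESB console each time a message is stored for later delivery.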