This section explains, through an example scenario, how the Durable Subscriber EIP can be implemented using WSO2 ESB. The following topics are covered:
...
Code Block |
---|
|
<!-- Durable Subscriber Proxy-->
<definitions xmlns="http://ws.apache.org/ns/synapse">
<taskManager provider="org.wso2.carbon.mediation.ntask.NTaskTaskManager"/>
<registry provider="org.wso2.carbon.mediation.registry.WSO2Registry">
<parameter name="cachableDuration">15000</parameter>
</registry>
<proxy xmlns="http://ws.apache.org/ns/synapse"
name="PublishProxy"
transports="http"
statistics="disable"
trace="disable"
startOnLoad="true">
<target>
<inSequence>
<!-- Return HTTP 202 Accepted to the client without waiting for the subscribers -->
<property name="FORCE_SC_ACCEPTED" value="true" scope="axis2" type="STRING"/>
<!-- Send one copy of the request to each subscriber sequence -->
<clone>
<target sequence="DurableSubscriber"/>
<target sequence="NonDurableSubscriber"/>
</clone>
</inSequence>
<outSequence>
<drop />
</outSequence>
</target>
<description/>
</proxy>
<!-- Error Sequences -->
<sequence name="sub1_fails">
<store messageStore="pending_subscriptions"/>
</sequence>
<sequence name="sub2_fails">
<drop/>
</sequence>
<!-- Subscription List-->
<sequence name="DurableSubscriber" onError="sub1_fails" xmlns="http://ws.apache.org/ns/synapse">
<in>
<property name="OUT_ONLY" value="true"/>
<send>
<endpoint name="Subscriber 1">
<address uri="http://localhost:9000/services/SimpleStockQuoteService/"/>
</endpoint>
</send>
</in>
</sequence>
<sequence name="NonDurableSubscriber" onError="sub2_fails" xmlns="http://ws.apache.org/ns/synapse">
<in>
<property name="OUT_ONLY" value="true"/>
<send>
<endpoint name="Subscriber 2">
<address uri="http://localhost:9001/services/SimpleStockQuoteService/"/>
</endpoint>
</send>
</in>
</sequence>
<!-- Re Direction End Points -->
<endpoint name="DurableSubscriberEndpoint">
<address uri="http://localhost:9000/services/SimpleStockQuoteService"/>
</endpoint>
<!-- Message Store And Process -->
<messageStore name="pending_subscriptions"/>
<messageProcessor class="org.apache.synapse.message.processor.impl.forwarder.ScheduledMessageForwardingProcessor" name="send_pending_message"
messageStore="pending_subscriptions">
<parameter name="interval">1000</parameter>
<parameter name="max.delivery.attempts">50</parameter>
<parameter name="target.endpoint">DurableSubscriberEndpoint</parameter>
</messageProcessor>
</definitions> |
Simulating the sample scenario
Use a SOAP client like SoapUI to forward the following request to the PublishProxy service.
Code Block |
---|
|
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ser="http://services.samples" xmlns:xsd="http://services.samples/xsd">
<soapenv:Header/>
<soapenv:Body>
<ser:placeOrder>
<ser:order>
<xsd:price>10</xsd:price>
<xsd:quantity>100</xsd:quantity>
<xsd:symbol>foo</xsd:symbol>
</ser:order>
</ser:placeOrder>
</soapenv:Body>
</soapenv:Envelope> |
...
- Proxy Service [PublishProxy in ESB config] - The proxy service takes an incoming Stock Quote client request and clones it, forwarding one copy to each of two target sequences, DurableSubscriber and NonDurableSubscriber.
- Sequence [DurableSubscriber in ESB config] - The DurableSubscriber sequence forwards the message to the Subscriber 1 endpoint. The sequence has its onError attribute set to the sub1_fails sequence, which stores the message if delivery fails.
- Sequence [NonDurableSubscriber in ESB config] - The NonDurableSubscriber sequence works the same way as the sequence described above. The only difference is that on failure the sub2_fails sequence is called, which simply drops the message.
- Sequence [sub1_fails in ESB config] - This sequence uses a Store Mediator to save the failed message in a message store. In this example, it is the store named pending_subscriptions.
- messageStore [pending_subscriptions in ESB config] - Defines a new message store with the name pending_subscriptions.
- messageProcessor [send_pending_message in ESB config] - The messageProcessor defines the type of processing applied to a particular messageStore, which in this example is pending_subscriptions. This example defines a messageProcessor that uses a ScheduledMessageForwardingProcessor, which retries sending the stored messages to the endpoint named in its target.endpoint parameter (DurableSubscriberEndpoint) every second (interval of 1000 milliseconds), with a maximum of 50 delivery attempts.
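The retry behaviour described above is controlled entirely by the parameters of the messageProcessor. As a minimal sketch, assuming the durable subscriber is expected to stay offline for longer periods, the processor from the configuration could be tuned as follows. The values 5000 and 10 are illustrative assumptions, not part of the original sample; the class name, parameter names, store name and endpoint name are taken from the configuration above.

```xml
<!-- Sketch only: a slower retry schedule for the same message store.
     The interval and max.delivery.attempts values are hypothetical. -->
<messageProcessor class="org.apache.synapse.message.processor.impl.forwarder.ScheduledMessageForwardingProcessor"
                  name="send_pending_message"
                  messageStore="pending_subscriptions">
   <!-- Poll the store every 5 seconds (value is in milliseconds) -->
   <parameter name="interval">5000</parameter>
   <!-- Stop retrying a message after 10 failed attempts -->
   <parameter name="max.delivery.attempts">10</parameter>
   <!-- Endpoint that stored messages are redelivered to -->
   <parameter name="target.endpoint">DurableSubscriberEndpoint</parameter>
</messageProcessor>
```

A longer interval reduces load on a subscriber that is still recovering, at the cost of slower redelivery once it comes back.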