This section explains, through an example scenario, how the Durable Subscriber EIP can be implemented using WSO2 ESB. The following topics are covered:
Introduction to Durable Subscriber
The Durable Subscriber EIP prevents a subscriber from missing messages that are published while it is not listening for them. It makes the messaging system save messages published while the subscriber is disconnected. This pattern is similar to the Publish-Subscribe EIP, except that it temporarily stores a message if a subscriber is offline at the time the message is published, and sends the message when the subscriber comes online again. For more information, refer to http://www.eaipatterns.com/DurableSubscription.html.
Figure 1: Durable Subscriber EIP
Example scenario
This example scenario demonstrates how a message is duplicated and routed to the subscribers using the Clone mediator when the publisher sends a message. We have two Axis2 servers as the subscribers: one durable and one non-durable. If the durable subscriber is offline at the time a message is sent, the message is stored in a message store instead of being discarded. The message forwarding processor then attempts to send the stored message until the subscriber comes online, at which point the message is delivered. A message sent while the non-durable subscriber is offline is simply dropped.
The diagram below depicts how to simulate the example scenario using the WSO2 ESB.
Figure 2: Example Scenario of the Durable Subscriber EIP
Before digging into implementation details, let's take a look at the relationship between the example scenario and the Durable Subscriber EIP by comparing their core components.
Durable Subscriber EIP (Figure 1) | Durable Subscriber Example Scenario (Figure 2) |
---|---|
Publisher | Simple Stock Quote Client |
Publish Subscribe Channel | Clone Mediator, Message Store, Message Processor |
Durable Consumer | Simple Stock Quote Service |
Non Durable Consumer | Simple Stock Quote Service |
Environment setup
- Download and install the WSO2 ESB from http://wso2.com/products/enterprise-service-bus. For a list of prerequisites and step-by-step installation instructions, refer to Getting Started in the WSO2 ESB documentation.
- Start two sample Axis2 server instances, one on port 9000 and one on port 9001. For instructions, refer to the section ESB Samples Setup - Starting Sample Back-End Services in the WSO2 ESB documentation.
ESB configuration
Start the ESB server and log into its management console UI (https://localhost:9443/carbon). In the management console, navigate to the Main menu and click Source View in the Service Bus section. Next, copy and paste the following configuration, which helps you explore the example scenario, to the source view.
<!-- Durable Subscriber Proxy-->
<definitions xmlns="http://ws.apache.org/ns/synapse">
   <taskManager provider="org.wso2.carbon.mediation.ntask.NTaskTaskManager"/>
   <registry provider="org.wso2.carbon.mediation.registry.WSO2Registry">
      <parameter name="cachableDuration">15000</parameter>
   </registry>
   <proxy name="PublishProxy" startOnLoad="true" transports="http">
      <target>
         <inSequence>
            <clone>
               <target sequence="DurableSubscriber"/>
               <target sequence="NonDurableSubscriber"/>
            </clone>
         </inSequence>
         <outSequence>
            <drop/>
         </outSequence>
      </target>
   </proxy>
   <!-- Error Sequences -->
   <sequence name="sub1_fails">
      <store messageStore="pending_subscriptions"/>
   </sequence>
   <sequence name="sub2_fails">
      <drop/>
   </sequence>
   <!-- Subscription List -->
   <sequence name="DurableSubscriber" onError="sub1_fails">
      <in>
         <send>
            <endpoint name="Subscriber 1">
               <address uri="http://localhost:9000/services/SimpleStockQuoteService/"/>
            </endpoint>
         </send>
      </in>
   </sequence>
   <sequence name="NonDurableSubscriber" onError="sub2_fails">
      <in>
         <send>
            <endpoint name="Subscriber 2">
               <address uri="http://localhost:9001/services/SimpleStockQuoteService/"/>
            </endpoint>
         </send>
      </in>
   </sequence>
   <!-- Re Direction End Points -->
   <endpoint name="DurableSubscriberEndpoint">
      <address uri="http://localhost:9000/services/SimpleStockQuoteService"/>
   </endpoint>
   <!-- Message Store And Process -->
   <messageStore name="pending_subscriptions"/>
   <messageProcessor class="org.apache.synapse.message.processor.impl.forwarder.ScheduledMessageForwardingProcessor" name="send_pending_message" messageStore="pending_subscriptions">
      <parameter name="interval">1000</parameter>
      <parameter name="max.deliver.attempts">50</parameter>
      <parameter name="target.endpoint">DurableSubscriberEndpoint</parameter>
   </messageProcessor>
</definitions>
Simulating the sample scenario
Use a SOAP client like SoapUI to forward the following request to the PublishProxy service.
<soapenv:Envelope xmlns:soapenv="http://www.w3.org/2003/05/soap-envelope" xmlns:ser="http://services.samples" xmlns:xsd="http://services.samples/xsd">
   <soapenv:Header/>
   <soapenv:Body>
      <ser:getQuote>
         <!--Optional:-->
         <ser:request>
            <!--Optional:-->
            <ser:symbol>foo</ser:symbol>
         </ser:request>
      </ser:getQuote>
   </soapenv:Body>
</soapenv:Envelope>
Note that both Axis2 servers receive the request. Next, stop the Axis2 server running on port 9000 (the Durable Subscriber) and resend the request. Note that the Durable Subscriber does not receive the request while it is offline. Start the Axis2 server on port 9000 again. Note that the previously undelivered message is now delivered. We can repeat the same steps for the Axis2 server running on port 9001 (the Non Durable Subscriber). In that case, when the server comes back online, it does not receive any of the messages that were sent while it was down.
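To make the failure path easier to observe while running the simulation, a Log mediator could be placed ahead of the Store mediator in the sub1_fails sequence. The fragment below is a minimal sketch and not part of the original configuration: the STATUS and ERROR property names and the log text are illustrative assumptions, while ERROR_MESSAGE is the property Synapse populates when a fault sequence is invoked.

```xml
<!-- Sketch only: sub1_fails with an added log line.
     STATUS and ERROR are hypothetical names chosen for illustration. -->
<sequence xmlns="http://ws.apache.org/ns/synapse" name="sub1_fails">
   <log level="custom">
      <property name="STATUS" value="Subscriber 1 unreachable, storing message"/>
      <property name="ERROR" expression="get-property('ERROR_MESSAGE')"/>
   </log>
   <!-- Unchanged from the example: keep the message for later redelivery -->
   <store messageStore="pending_subscriptions"/>
</sequence>
```

With a change like this in place, each request sent while the server on port 9000 is stopped should produce one log entry in the ESB console before the message is stored.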
How the implementation works
Let's investigate the elements of the ESB configuration in detail. Each item below refers to an element of the ESB configuration shown above by its name.
- Proxy Service [PublishProxy] - The proxy service takes an incoming Stock Quote client request and clones it, forwarding one copy to each of the two target sequences, DurableSubscriber and NonDurableSubscriber.
- Sequence [DurableSubscriber] - The DurableSubscriber sequence forwards the message to the Subscriber 1 endpoint on port 9000. The sequence has its onError attribute set to the sub1_fails sequence, which stores the message in case of a failure.
- Sequence [NonDurableSubscriber] - The NonDurableSubscriber sequence works the same way as the sequence described above, forwarding the message to the Subscriber 2 endpoint on port 9001. The only difference is that on failure, the sub2_fails sequence is called, which simply drops the message.
- Sequence [sub1_fails] - This sequence uses a Store mediator to save the failed message in a message store. In this example, it is the store named pending_subscriptions.
- messageStore [pending_subscriptions] - Defines a new message store with the name pending_subscriptions.
- messageProcessor [send_pending_message] - The messageProcessor defines the type of processing applied to a particular messageStore, which in this example is pending_subscriptions. This example defines a messageProcessor that uses a ScheduledMessageForwardingProcessor. Its target.endpoint parameter names the DurableSubscriberEndpoint endpoint as the destination, and it retries sending the stored messages every second (interval of 1000 ms), with the maximum number of delivery attempts set to 50.
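The pending_subscriptions store above declares no class, so it uses the default in-memory store and stored messages would not survive an ESB restart. If durability across restarts matters, the store could be backed by a JMS broker instead. The fragment below is a hedged sketch rather than part of the original example: it assumes an ActiveMQ broker listening on tcp://localhost:61616 with its client libraries installed in the ESB, and the class and parameter names should be checked against the documentation for your ESB version.

```xml
<!-- Sketch only: a JMS-backed replacement for the in-memory store.
     The broker URL and the ActiveMQ context factory are assumptions. -->
<messageStore xmlns="http://ws.apache.org/ns/synapse"
              class="org.apache.synapse.message.store.impl.jms.JmsStore"
              name="pending_subscriptions">
   <parameter name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
   <parameter name="java.naming.provider.url">tcp://localhost:61616</parameter>
   <parameter name="store.jms.JMSSpecVersion">1.1</parameter>
</messageStore>
```

Because the store keeps the same name, the sub1_fails sequence and the send_pending_message processor would continue to reference it without modification.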