...
With the shared subscription feature in JMS 2.0 you can overcome this restriction. When shared subscription is used, a message that comes to a topic is forwarded to only one of the consumers. That is, if multiple JMS consumers subscribe to a JMS topic, consumers can share the messages that come to the topic. The advantage of shared topic subscription is that it allows to share the workload between consumers.
WSO2 ESB Enterprise Integrator (WSO2 EI) can be configured as a shared topic listener so that it can connect to a shared topic subscription as a message consumer (subscriber) to share workload between other consumers of the subscription. The following diagram illustrates this sample scenario:
...
To demonstrate the sample scenario, we will configure WSO2 ESB's JMS inbound endpoint in WSO2 EI as a shared topic listener using HornetQ as the message broker. This sample scenario includes the following sections:
Table of Contents | ||
---|---|---|
|
Prerequisites
- Download the hornetq-all-new.jar and copy it to the
<ESB <EI_HOME>/repository/components/lib/
directory. - Replace the
geronimo-jms_1.1_spec-1.1.0.wso2v1.jar
file in the<ESB<EI_HOME>/lib/endorsed/
directory withjavax.jms-api-2.0.1.jar
- Download HornetQ from http://hornetq.jboss.org/downloads and extract the tar.gz.
Configuring and starting the HornetQ message broker
Open the
<HornetQ_HOME>/config/stand-alone/non-clustered/hornetq-jms.xml
file in a text editor and add the following configuration:Code Block language xml <connection-factory name="QueueConnectionFactory"> <xa>false</xa> <connectors> <connector-ref connector-name="netty"/> </connectors> <entries> <entry name="/QueueConnectionFactory"/> </entries> </connection-factory> <connection-factory name="TopicConnectionFactory"> <xa>false</xa> <connectors> <connector-ref connector-name="netty"/> </connectors> <entries> <entry name="/TopicConnectionFactory"/> </entries> </connection-factory> <queue name="wso2"> <entry name="/queue/mySampleQueue"/> </queue> <topic name="sampleTopic"> <entry name="/topic/exampleTopic"/> </topic>
- Start the HornetQ message broker
- To start the message broker on Linux, navigate to the
<HornetQ_HOME>/bin/
directory and execute therun.sh
command with root privileges.
- To start the message broker on Linux, navigate to the
Building the sample scenario
The XML configuration for this sample scenario is as follows:
Code Block | ||
---|---|---|
| ||
<definitions xmlns="http://ws.apache.org/ns/synapse"> <registry provider="org.wso2.carbon.mediation.registry.WSO2Registry"> <parameter name="cachableDuration">15000</parameter> </registry> <taskManager provider="org.wso2.carbon.mediation.ntask.NTaskTaskManager"> <parameter name="cachableDuration">15000</parameter> </taskManager> <sequence name="request" onError="fault"> <log level="full"/> <drop/> </sequence> <sequence name="fault"> <log level="full"> <property name="MESSAGE" value="Executing default "fault" sequence"/> <property name="ERROR_CODE" expression="get-property('ERROR_CODE')"/> <property name="ERROR_MESSAGE" expression="get-property('ERROR_MESSAGE')"/> </log> <drop/> </sequence> <sequence name="main"> <log level="full"/> <drop/> </sequence> <inboundEndpoint name="jms_inbound" sequence="request" onError="fault" protocol="jms" suspend="false"> <parameters> <parameter name="interval">1000</parameter> <parameter name="transport.jms.Destination">/topic/exampleTopic</parameter> <parameter name="transport.jms.CacheLevel">3</parameter> <parameter name="transport.jms.ConnectionFactoryJNDIName">TopicConnectionFactory</parameter> <parameter name="sequential">true</parameter> <parameter name="java.naming.factory.initial">org.jnp.interfaces.NamingContextFactory</parameter> <parameter name="java.naming.provider.url">jnp://localhost:1099</parameter> <parameter name="transport.jms.SessionAcknowledgement">AUTO_ACKNOWLEDGE</parameter> <parameter name="transport.jms.SessionTransacted">false</parameter> <parameter name="transport.jms.ConnectionFactoryType">topic</parameter> <parameter name="transport.jms.JMSSpecVersion">2.0</parameter> <parameter name="transport.jms.SharedSubscription">true</parameter> <parameter name="transport.jms.DurableSubscriberName">mySubscription</parameter> </parameters> </inboundEndpoint> </definitions> |
To build the sample
- Start the ESB WSO2 EI with the sample configuration.
Executing the sample scenario
Run the following java file:
Code Block language java title TopicConsumer.java package SharedTopicSubscribe; import java.util.Properties; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.naming.Context; import javax.naming.InitialContext; public class TopicConsumer { private static final String DEFAULT_CONNECTION_FACTORY = "TopicConnectionFactory"; private static final String DEFAULT_DESTINATION = "/topic/exampleTopic"; private static final String INITIAL_CONTEXT_FACTORY = "org.jnp.interfaces.NamingContextFactory"; private static final String PROVIDER_URL = "jnp://localhost:1099"; private static final String SUBSCRIPTION_NAME = "mySubscription"; public static void main(final String[] args) { try { runExample(); } catch (Exception e) { e.printStackTrace(); } } public static void runExample() throws Exception { Connection connection = null; Context initialContext = null; try { // /Step 1. Create an initial context to perform the JNDI lookup. final Properties env = new Properties(); env.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY); env.put(Context.PROVIDER_URL, System.getProperty(Context.PROVIDER_URL, PROVIDER_URL)); initialContext = new InitialContext(env); // Step 2. perform a lookup on the topic Topic topic = (Topic) initialContext.lookup(DEFAULT_DESTINATION); // Step 3. perform a lookup on the Connection Factory ConnectionFactory cf = (ConnectionFactory) initialContext.lookup(DEFAULT_CONNECTION_FACTORY); // Step 4. Create a JMS Connection connection = cf.createConnection(); // Step 5. Create a JMS Session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Step 6. Create a JMS Message Consumer MessageConsumer messageConsumer = session.createSharedConsumer(topic, SUBSCRIPTION_NAME); // Step 7. Start the Connection connection.start(); System.out.println("Shared message consumer started on topic: " + DEFAULT_DESTINATION + "\n"); // Step 8. Receive the message TextMessage messageReceived = null; while (true) { messageReceived = (TextMessage) messageConsumer.receive(); System.out.println("Consumer received message: " + messageReceived.getText() + "\n"); } } finally { // Step 9. Close JMS resources if (connection != null) { connection.close(); } // Also the initialContext if (initialContext != null) { initialContext.close(); } } } }
This acts as the shared topic subscriber with WSO2 ESB's EI inbound endpoint.
Run the following java file to publish 5 messages to the HornetQ topic:
Code Block language java title TopicPublisher.java package SharedTopicSubscribe; import java.util.Properties; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.naming.Context; import javax.naming.InitialContext; public class TopicPublisher { private static final String DEFAULT_CONNECTION_FACTORY = "TopicConnectionFactory"; private static final String DEFAULT_DESTINATION = "/topic/exampleTopic"; private static final String INITIAL_CONTEXT_FACTORY = "org.jnp.interfaces.NamingContextFactory"; private static final String PROVIDER_URL = "jnp://localhost:1099"; // Set up all the default values private static final String param = "IBM"; public static void main(final String[] args) { try { runExample(); } catch (Exception e) { e.printStackTrace(); } } public static boolean runExample() throws Exception { Connection connection = null; Context initialContext = null; try { // /Step 1. Create an initial context to perform the JNDI lookup. // Set up the namingContext for the JNDI lookup final Properties env = new Properties(); env.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY); env.put(Context.PROVIDER_URL, System.getProperty(Context.PROVIDER_URL, PROVIDER_URL)); initialContext = new InitialContext(env); // Step 2. perform a lookup on the topic Topic topic = (Topic) initialContext.lookup(DEFAULT_DESTINATION); // Step 3. perform a lookup on the Connection Factory ConnectionFactory cf = (ConnectionFactory) initialContext.lookup(DEFAULT_CONNECTION_FACTORY); // Step 4. Create a JMS Connection connection = cf.createConnection(); // Step 5. Create a JMS Session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Step 6. Create a Message Producer MessageProducer producer = session.createProducer(topic); System.out.println("Publishing 5 messages to topic/exampleTopic"); for (int i = 0; i < 5; i++) { // Step 7. Create a Text Message TextMessage message = session.createTextMessage(getMessage()); // Step 8. Send the Message producer.send(message); } return true; } finally { // Step 9. Close JMS resources if (connection != null) { connection.close(); } // Also the initialContext if (initialContext != null) { initialContext.close(); } } } private static double getRandom(double base, double varience, boolean onlypositive) { double rand = Math.random(); return (base + (rand > 0.5 ? 1 : -1) * varience * base * rand) * (onlypositive ? 1 : rand > 0.5 ? 1 : -1); } private static String getMessage() { return "<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\">\n" + " <soapenv:Header/>\n" + "<soapenv:Body>\n" + "<m:placeOrder xmlns:m=\"http://services.samples\">\n" + " <m:order>\n" + " <m:price>" + getRandom(100, 0.9, true) + "</m:price>\n" + " <m:quantity>" + (int) getRandom(10000, 1.0, true) + "</m:quantity>\n" + " <m:symbol>" + param + "</m:symbol>\n" + " </m:order>\n" + "</m:placeOrder>" + " </soapenv:Body>\n" + "</soapenv:Envelope>"; } }
Analyzing the output
You will see that 5 messages are shared between the inbound listener and TopicConsumer.java
. This is because both the ESB WSO2 EI inbound listener and TopicConsumer.java
are configured as shared subscribers.
...