Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

With the shared subscription feature in JMS 2.0 you can overcome this restriction.  When shared subscription is used, a message that comes to a topic is forwarded to only one of the consumers. That is, if multiple JMS consumers subscribe to a JMS topic, consumers can share the messages that come to the topic. The advantage of shared topic subscription is that it allows to share the workload between consumers.

WSO2 ESB Enterprise Integrator (WSO2 EI) can be configured as a shared topic listener so that it can connect to a shared topic subscription as a message consumer (subscriber) to share workload between other consumers of the subscription. The following diagram illustrates this sample scenario:

...

To demonstrate the sample scenario, we will configure WSO2 ESB's JMS inbound endpoint in WSO2 EI as a shared topic listener using HornetQ as the message broker. This sample scenario includes the following sections:

Table of Contents
maxLevel3

Prerequisites

Configuring and starting the HornetQ message broker

  1. Open the <HornetQ_HOME>/config/stand-alone/non-clustered/hornetq-jms.xml file in a text editor and add the following configuration:

    Code Block
    languagexml
    <connection-factory name="QueueConnectionFactory">
          <xa>false</xa>
          <connectors>
             <connector-ref connector-name="netty"/>
          </connectors>
          <entries>
             <entry name="/QueueConnectionFactory"/>
          </entries>
    </connection-factory>
     
    <connection-factory name="TopicConnectionFactory">
          <xa>false</xa>
          <connectors>
             <connector-ref connector-name="netty"/>
          </connectors>
          <entries>
             <entry name="/TopicConnectionFactory"/>
          </entries>
    </connection-factory>
    
    <queue name="wso2">
          <entry name="/queue/mySampleQueue"/>
    </queue>
    
    <topic name="sampleTopic">
          <entry name="/topic/exampleTopic"/>
    </topic>
  2. Start the HornetQ message broker
    • To start the message broker on Linux, navigate to the <HornetQ_HOME>/bin/ directory and execute the run.sh command with root privileges.

Building the sample scenario

The XML configuration for this sample scenario is as follows:

Code Block
languagexml
<definitions xmlns="http://ws.apache.org/ns/synapse">
   <registry provider="org.wso2.carbon.mediation.registry.WSO2Registry">
      <parameter name="cachableDuration">15000</parameter>
   </registry>
   <taskManager provider="org.wso2.carbon.mediation.ntask.NTaskTaskManager">
      <parameter name="cachableDuration">15000</parameter>
   </taskManager>
   <sequence name="request" onError="fault">
      <log level="full"/>
      <drop/>
   </sequence>
   <sequence name="fault">
      <log level="full">
         <property name="MESSAGE" value="Executing default &#34;fault&#34; sequence"/>
         <property name="ERROR_CODE" expression="get-property('ERROR_CODE')"/>
         <property name="ERROR_MESSAGE" expression="get-property('ERROR_MESSAGE')"/>
      </log>
      <drop/>
   </sequence>
   <sequence name="main">
      <log level="full"/>
      <drop/>
   </sequence>
   <inboundEndpoint name="jms_inbound"
                    sequence="request"
                    onError="fault"
                    protocol="jms"
                    suspend="false">
      <parameters>
         <parameter name="interval">1000</parameter>
         <parameter name="transport.jms.Destination">/topic/exampleTopic</parameter>
         <parameter name="transport.jms.CacheLevel">3</parameter>
         <parameter name="transport.jms.ConnectionFactoryJNDIName">TopicConnectionFactory</parameter>
         <parameter name="sequential">true</parameter>
         <parameter name="java.naming.factory.initial">org.jnp.interfaces.NamingContextFactory</parameter>
         <parameter name="java.naming.provider.url">jnp://localhost:1099</parameter>
         <parameter name="transport.jms.SessionAcknowledgement">AUTO_ACKNOWLEDGE</parameter>
         <parameter name="transport.jms.SessionTransacted">false</parameter>
         <parameter name="transport.jms.ConnectionFactoryType">topic</parameter>
         <parameter name="transport.jms.JMSSpecVersion">2.0</parameter>
         <parameter name="transport.jms.SharedSubscription">true</parameter>
         <parameter name="transport.jms.DurableSubscriberName">mySubscription</parameter>
      </parameters>
   </inboundEndpoint>
</definitions>

To build the sample

  • Start the ESB WSO2 EI with the sample configuration.

Executing the sample scenario

  • Run the following java file:

    Code Block
    languagejava
    titleTopicConsumer.java
    package SharedTopicSubscribe;
    
    import java.util.Properties;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    
    public class TopicConsumer {
    	private static final String DEFAULT_CONNECTION_FACTORY = "TopicConnectionFactory";
    	private static final String DEFAULT_DESTINATION = "/topic/exampleTopic";
    	private static final String INITIAL_CONTEXT_FACTORY = "org.jnp.interfaces.NamingContextFactory";
    	private static final String PROVIDER_URL = "jnp://localhost:1099";
    	private static final String SUBSCRIPTION_NAME = "mySubscription";
    
    	public static void main(final String[] args) {
    		try {
    			runExample();
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    
    	public static void runExample() throws Exception {
    		Connection connection = null;
    		Context initialContext = null;
    		try {
    			// /Step 1. Create an initial context to perform the JNDI lookup.
    			final Properties env = new Properties();
    			env.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
    			env.put(Context.PROVIDER_URL, System.getProperty(Context.PROVIDER_URL, PROVIDER_URL));
    			initialContext = new InitialContext(env);
    
    			// Step 2. perform a lookup on the topic
    			Topic topic = (Topic) initialContext.lookup(DEFAULT_DESTINATION);
    
    			// Step 3. perform a lookup on the Connection Factory
    			ConnectionFactory cf =
    			                       (ConnectionFactory) initialContext.lookup(DEFAULT_CONNECTION_FACTORY);
    
    			// Step 4. Create a JMS Connection
    			connection = cf.createConnection();
    
    			// Step 5. Create a JMS Session
    			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
    			// Step 6. Create a JMS Message Consumer
    			MessageConsumer messageConsumer =
    			                                  session.createSharedConsumer(topic, SUBSCRIPTION_NAME);
    
    			// Step 7. Start the Connection
    			connection.start();
    			System.out.println("Shared message consumer started on topic: " + DEFAULT_DESTINATION +
    			                   "\n");
    
    			// Step 8. Receive the message
    			TextMessage messageReceived = null;
    			while (true) {
    				messageReceived = (TextMessage) messageConsumer.receive();
    				System.out.println("Consumer received message: " + messageReceived.getText() + "\n");
    			}
    
    		} finally {
    
    			// Step 9. Close JMS resources
    			if (connection != null) {
    				connection.close();
    			}
    
    			// Also the initialContext
    			if (initialContext != null) {
    				initialContext.close();
    			}
    		}
    	}
    }

    This acts as the shared topic subscriber with WSO2 ESB's EI inbound endpoint.

  • Run the following java file to publish 5 messages to the HornetQ topic:

    Code Block
    languagejava
    titleTopicPublisher.java
    package SharedTopicSubscribe;
    
    import java.util.Properties;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    
    public class TopicPublisher {
    	private static final String DEFAULT_CONNECTION_FACTORY = "TopicConnectionFactory";
    	private static final String DEFAULT_DESTINATION = "/topic/exampleTopic";
    	private static final String INITIAL_CONTEXT_FACTORY = "org.jnp.interfaces.NamingContextFactory";
    	private static final String PROVIDER_URL = "jnp://localhost:1099";
    	// Set up all the default values
    	private static final String param = "IBM";
    
    	public static void main(final String[] args) {
    		try {
    			runExample();
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    
    	public static boolean runExample() throws Exception {
    		Connection connection = null;
    		Context initialContext = null;
    		try {
    			// /Step 1. Create an initial context to perform the JNDI lookup.
    			// Set up the namingContext for the JNDI lookup
    			final Properties env = new Properties();
    			env.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
    			env.put(Context.PROVIDER_URL, System.getProperty(Context.PROVIDER_URL, PROVIDER_URL));
    			initialContext = new InitialContext(env);
    
    			// Step 2. perform a lookup on the topic
    			Topic topic = (Topic) initialContext.lookup(DEFAULT_DESTINATION);
    
    			// Step 3. perform a lookup on the Connection Factory
    			ConnectionFactory cf =
    					(ConnectionFactory) initialContext.lookup(DEFAULT_CONNECTION_FACTORY);
    
    			// Step 4. Create a JMS Connection
    			connection = cf.createConnection();
    
    			// Step 5. Create a JMS Session
    			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
    			// Step 6. Create a Message Producer
    			MessageProducer producer = session.createProducer(topic);
    			System.out.println("Publishing 5 messages to topic/exampleTopic");
    			for (int i = 0; i < 5; i++) {
    
    				// Step 7. Create a Text Message
    				TextMessage message = session.createTextMessage(getMessage());
    
    				// Step 8. Send the Message
    				producer.send(message);
    			}
    			return true;
    		} finally {
    
    			// Step 9. Close JMS resources
    			if (connection != null) {
    				connection.close();
    			}
    
    			// Also the initialContext
    			if (initialContext != null) {
    				initialContext.close();
    			}
    		}
    	}
    
    	private static double getRandom(double base, double varience, boolean onlypositive) {
    		double rand = Math.random();
    		return (base + (rand > 0.5 ? 1 : -1) * varience * base * rand) *
    				(onlypositive ? 1 : rand > 0.5 ? 1 : -1);
    	}
    
    	private static String getMessage() {
    		return "<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\">\n" +
    				"   <soapenv:Header/>\n" + "<soapenv:Body>\n" +
    				"<m:placeOrder xmlns:m=\"http://services.samples\">\n" + "    <m:order>\n" +
    				"        <m:price>" + getRandom(100, 0.9, true) + "</m:price>\n" +
    				"        <m:quantity>" + (int) getRandom(10000, 1.0, true) + "</m:quantity>\n" +
    				"        <m:symbol>" + param + "</m:symbol>\n" + "    </m:order>\n" +
    				"</m:placeOrder>" + "   </soapenv:Body>\n" + "</soapenv:Envelope>";
    	}
    }

Analyzing the output

You will see that 5 messages are shared between the inbound listener and TopicConsumer.java. This is because both the ESB WSO2 EI inbound listener and TopicConsumer.java are configured as shared subscribers.

...