Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

To demonstrate the sample scenario, we will configure WSO2 ESB's JMS inbound endpoint as a shared topic listener using HornetQ as the message broker. This sample scenario includes the following sections:

...

  • Run the following java file:

    Code Block
    languagejava
    titleTopicConsumer.java
    package SharedTopicSubscribe;
    
    import java.util.Properties;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    
    public class TopicConsumer {
    	private static final String DEFAULT_CONNECTION_FACTORY = "TopicConnectionFactory";
    	private static final String DEFAULT_DESTINATION = "/topic/exampleTopic";
    	private static final String INITIAL_CONTEXT_FACTORY = "org.jnp.interfaces.NamingContextFactory";
    	private static final String PROVIDER_URL = "jnp://localhost:1099";
    	private static final String SUBSCRIPTION_NAME = "mySubscription";
    
    	public static void main(final String[] args) {
    		try {
    			runExample();
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    
    	public static void runExample() throws Exception {
    		Connection connection = null;
    		Context initialContext = null;
    		try {
    			// /Step 1. Create an initial context to perform the JNDI lookup.
    			final Properties env = new Properties();
    			env.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
    			env.put(Context.PROVIDER_URL, System.getProperty(Context.PROVIDER_URL, PROVIDER_URL));
    			initialContext = new InitialContext(env);
    
    			// Step 2. perform a lookup on the topic
    			Topic topic = (Topic) initialContext.lookup(DEFAULT_DESTINATION);
    
    			// Step 3. perform a lookup on the Connection Factory
    			ConnectionFactory cf =
    			                       (ConnectionFactory) initialContext.lookup(DEFAULT_CONNECTION_FACTORY);
    
    			// Step 4. Create a JMS Connection
    			connection = cf.createConnection();
    
    			// Step 5. Create a JMS Session
    			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
    			// Step 6. Create a JMS Message Consumer
    			MessageConsumer messageConsumer =
    			                                  session.createSharedConsumer(topic, SUBSCRIPTION_NAME);
    
    			// Step 7. Start the Connection
    			connection.start();
    			System.out.println("Shared message consumer started on topic: " + DEFAULT_DESTINATION +
    			                   "\n");
    
    			// Step 8. Receive the message
    			TextMessage messageReceived = null;
    			while (true) {
    				messageReceived = (TextMessage) messageConsumer.receive();
    				System.out.println("Consumer received message: " + messageReceived.getText() + "\n");
    			}
    
    		} finally {
    
    			// Step 9. Close JMS resources
    			if (connection != null) {
    				connection.close();
    			}
    
    			// Also the initialContext
    			if (initialContext != null) {
    				initialContext.close();
    			}
    		}
    	}
    }

    This acts as the shared topic subscriber with WSO2 ESB's inbound endpoint.

  • Run the following java file to publish 5 messages to the HornetQ topic:

    Code Block
    languagejava
    titleTopicPublisher.java
    package SharedTopicSubscribe;
    
    import java.util.Properties;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    
    public class TopicPublisher {
    	private static final String DEFAULT_CONNECTION_FACTORY = "TopicConnectionFactory";
    	private static final String DEFAULT_DESTINATION = "/topic/exampleTopic";
    	private static final String INITIAL_CONTEXT_FACTORY = "org.jnp.interfaces.NamingContextFactory";
    	private static final String PROVIDER_URL = "jnp://localhost:1099";
    	// Set up all the default values
    	private static final String param = "IBM";
    
    	public static void main(final String[] args) {
    		try {
    			runExample();
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    
    	public static boolean runExample() throws Exception {
    		Connection connection = null;
    		Context initialContext = null;
    		try {
    			// /Step 1. Create an initial context to perform the JNDI lookup.
    			// Set up the namingContext for the JNDI lookup
    			final Properties env = new Properties();
    			env.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
    			env.put(Context.PROVIDER_URL, System.getProperty(Context.PROVIDER_URL, PROVIDER_URL));
    			initialContext = new InitialContext(env);
    
    			// Step 2. perform a lookup on the topic
    			Topic topic = (Topic) initialContext.lookup(DEFAULT_DESTINATION);
    
    			// Step 3. perform a lookup on the Connection Factory
    			ConnectionFactory cf =
    					(ConnectionFactory) initialContext.lookup(DEFAULT_CONNECTION_FACTORY);
    
    			// Step 4. Create a JMS Connection
    			connection = cf.createConnection();
    
    			// Step 5. Create a JMS Session
    			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
    			// Step 6. Create a Message Producer
    			MessageProducer producer = session.createProducer(topic);
    			System.out.println("Publishing 5 messages to topic/exampleTopic");
    			for (int i = 0; i < 5; i++) {
    
    				// Step 7. Create a Text Message
    				TextMessage message = session.createTextMessage(getMessage());
    
    				// Step 8. Send the Message
    				producer.send(message);
    			}
    			return true;
    		} finally {
    
    			// Step 9. Close JMS resources
    			if (connection != null) {
    				connection.close();
    			}
    
    			// Also the initialContext
    			if (initialContext != null) {
    				initialContext.close();
    			}
    		}
    	}
    
    	private static double getRandom(double base, double varience, boolean onlypositive) {
    		double rand = Math.random();
    		return (base + (rand > 0.5 ? 1 : -1) * varience * base * rand) *
    				(onlypositive ? 1 : rand > 0.5 ? 1 : -1);
    	}
    
    	private static String getMessage() {
    		return "<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\">\n" +
    				"   <soapenv:Header/>\n" + "<soapenv:Body>\n" +
    				"<m:placeOrder xmlns:m=\"http://services.samples\">\n" + "    <m:order>\n" +
    				"        <m:price>" + getRandom(100, 0.9, true) + "</m:price>\n" +
    				"        <m:quantity>" + (int) getRandom(10000, 1.0, true) + "</m:quantity>\n" +
    				"        <m:symbol>" + param + "</m:symbol>\n" + "    </m:order>\n" +
    				"</m:placeOrder>" + "   </soapenv:Body>\n" + "</soapenv:Envelope>";
    	}
    }

Analyzing the output

 

 

 

 

 You will see that 5 messages are shared between the inbound listener and TopicConsumer.java. This is because both the ESB inbound listener and TopicConsumer.java are configured as shared subscribers.

The total number of consumed messages between the inbound listener and TopicConsumer.java will be equal to the number messages published by TopicPublisher.java.