In this sample, we publish events from a JMS client to a JMS topic called AllStockQuotes. The outputs fired by the bucket are sent to a JMS topic called FastMovingStockQuotes, where another JMS client receives them and logs them to the console.
- Apache Ant to build and deploy the sample and service, and to run the client. See Installation Prerequisites for instructions on installing Apache Ant.
Steps to configure the sample
The steps are as follows:
- Install the WSO2 Complex Event Processor. See the Installation Guide for instructions.
- Start the WSO2 Complex Event Processor. See Running the Product for instructions.
- Start WSO2 Message Broker with a port offset of one (assuming the setup is done on a single machine). See Running WSO2 MB for instructions.
- Configure WSO2 Message Broker as the JMS broker for the CEP server, as described in Using WSO2 MB as A JMS Broker for WSO2 CEP Server.
Info When CEP connects to MB, the
file needs to be updated with the following system property: -Dqpid.dest_syntax=BURL \
- Copy the above bucket configuration to
folder. Note that the bucket configuration uses the JMS broker "MBJmsBroker" created at step 4.
- Now the bucket will be deployed at the CEP side. Check the logs to verify.
Steps to run the sample
To run the sample, we have to do two things.
- Publish events to the CEP server. We can do this by publishing events to the AllStockQuotes topic.
- Subscribe to the events that the CEP server generates according to the query defined in the bucket configuration above. This can be done in two ways.
  - Subscribe a JMS topic message subscriber to the topic FastMovingStockQuotes and get the events.
  - Subscribe a web service client to the topic FastMovingStockQuotes and receive the events through the web service instead of a JMS client.
Publishing events to CEP server
We will use a JMS client for this purpose.
The following class creates the initial context used to run the event publisher client. Note that the event subscriber JMS client uses this class as well.
```java
package org.wso2.cep.sample.jms.andes;

import javax.jms.TopicConnectionFactory;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;

public class JNDIContext {

    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "ConnectionFactory";
    private static final String userName = "admin";
    private static final String password = "admin";
    private static String CARBON_CLIENT_ID = "clientid";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_DEFAULT_PORT = "5673";

    private InitialContext initContext = null;
    private TopicConnectionFactory topicConnectionFactory = null;
    public static JNDIContext instance = null;

    private JNDIContext() {
        createInitialContext();
        createConnectionFactory();
    }

    public InitialContext getInitContext() {
        return initContext;
    }

    public TopicConnectionFactory getTopicConnectionFactory() {
        return topicConnectionFactory;
    }

    public static JNDIContext getInstance() {
        if (instance == null) {
            instance = new JNDIContext();
        }
        return instance;
    }

    /**
     * Create Connection factory with initial context
     */
    private void createConnectionFactory() {
        try {
            topicConnectionFactory = (TopicConnectionFactory) initContext.lookup("ConnectionFactory");
        } catch (NamingException e) {
            System.out.println("Can not create topic connection factory." + e);
        }
    }

    /**
     * Create Initial Context with given configuration
     */
    private void createInitialContext() {
        try {
            Properties properties = new Properties();
            properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
            properties.put(CF_NAME_PREFIX + CF_NAME, getTCPConnectionURL(userName, password));
            System.out.println("TCPConnectionURL: = " + getTCPConnectionURL(userName, password));
            initContext = new InitialContext(properties);
        } catch (NamingException e) {
            System.out.println("Can not create initial context with given parameters." + e);
        }
    }

    public String getTCPConnectionURL(String username, String password) {
        return new StringBuffer()
                .append("amqp://").append(username).append(":").append(password)
                .append("@").append(CARBON_CLIENT_ID)
                .append("/").append(CARBON_VIRTUAL_HOST_NAME)
                .append("?brokerlist='tcp://").append(CARBON_DEFAULT_HOSTNAME).append(":").append(CARBON_DEFAULT_PORT).append("'")
                .toString();
    }
}
```
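To see what connection URL the class above ends up registering, the following stand-alone sketch rebuilds the same string without needing JMS on the classpath. The class name `ConnectionUrlSketch` and the `portOffset` parameter are hypothetical, and the base port 5672 is an assumption (the usual AMQP default), chosen because it yields the port 5673 used above when the broker runs with a port offset of one.

```java
// Sketch only: mirrors the URL shape of JNDIContext.getTCPConnectionURL.
final class ConnectionUrlSketch {

    // Assumed default AMQP port of the broker before any port offset is applied.
    static final int DEFAULT_AMQP_PORT = 5672;

    /** Builds the AMQP connection URL for the given credentials, host, and port offset. */
    static String buildUrl(String user, String password, String host, int portOffset) {
        int port = DEFAULT_AMQP_PORT + portOffset;
        return "amqp://" + user + ":" + password
                + "@clientid/carbon"
                + "?brokerlist='tcp://" + host + ":" + port + "'";
    }

    public static void main(String[] args) {
        // Same values as the constants in JNDIContext, with a port offset of one.
        System.out.println(buildUrl("admin", "admin", "localhost", 1));
    }
}
```

If the Message Broker is started with a different port offset, the port in `CARBON_DEFAULT_PORT` would need to change accordingly.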
The following class is the event publisher client. Running it generates three events and sends them to the CEP server. (Strictly speaking, the events are published to the topic at WSO2 MB, which is registered at the CEP server.)
```java
package org.wso2.cep.sample.jms.andes.xmlMessage;

import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.impl.builder.StAXOMBuilder;
import org.apache.axiom.om.util.StAXUtils;
import org.wso2.cep.sample.jms.andes.JNDIContext;

import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.naming.InitialContext;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
import java.io.ByteArrayInputStream;

public class AllStockQuotesPublisher {

    private static InitialContext initContext = null;
    private static TopicConnectionFactory topicConnectionFactory = null;

    public static void main(String[] args) throws XMLStreamException {
        // The three events differ only in LastTradeAmount.
        String xmlElement1 = "<quotedata:AllStockQuoteStream xmlns:quotedata=\"http://ws.cdyne.com/\">\n"
                + " <quotedata:StockQuoteEvent>\n"
                + " <quotedata:StockSymbol>MSFT</quotedata:StockSymbol>\n"
                + " <quotedata:LastTradeAmount>126.36 </quotedata:LastTradeAmount>\n"
                + " <quotedata:StockChange>0.05</quotedata:StockChange>\n"
                + " <quotedata:OpenAmount>25.05</quotedata:OpenAmount>\n"
                + " <quotedata:DayHigh>25.46</quotedata:DayHigh>\n"
                + " <quotedata:DayLow>25.01</quotedata:DayLow>\n"
                + " <quotedata:StockVolume>20452658</quotedata:StockVolume>\n"
                + " <quotedata:PrevCls>25.31</quotedata:PrevCls>\n"
                + " <quotedata:ChangePercent>0.20</quotedata:ChangePercent>\n"
                + " <quotedata:FiftyTwoWeekRange>22.73 - 31.58</quotedata:FiftyTwoWeekRange>\n"
                + " <quotedata:EarnPerShare>2.326</quotedata:EarnPerShare>\n"
                + " <quotedata:PE>10.88</quotedata:PE>\n"
                + " <quotedata:CompanyName>Microsoft Corpora</quotedata:CompanyName>\n"
                + " <quotedata:QuoteError>false</quotedata:QuoteError>\n"
                + " </quotedata:StockQuoteEvent>\n"
                + " </quotedata:AllStockQuoteStream>";

        String xmlElement2 = "<quotedata:AllStockQuoteStream xmlns:quotedata=\"http://ws.cdyne.com/\">\n"
                + " <quotedata:StockQuoteEvent>\n"
                + " <quotedata:StockSymbol>MSFT</quotedata:StockSymbol>\n"
                + " <quotedata:LastTradeAmount>36.36</quotedata:LastTradeAmount>\n"
                + " <quotedata:StockChange>0.05</quotedata:StockChange>\n"
                + " <quotedata:OpenAmount>25.05</quotedata:OpenAmount>\n"
                + " <quotedata:DayHigh>25.46</quotedata:DayHigh>\n"
                + " <quotedata:DayLow>25.01</quotedata:DayLow>\n"
                + " <quotedata:StockVolume>20452658</quotedata:StockVolume>\n"
                + " <quotedata:PrevCls>25.31</quotedata:PrevCls>\n"
                + " <quotedata:ChangePercent>0.20</quotedata:ChangePercent>\n"
                + " <quotedata:FiftyTwoWeekRange>22.73 - 31.58</quotedata:FiftyTwoWeekRange>\n"
                + " <quotedata:EarnPerShare>2.326</quotedata:EarnPerShare>\n"
                + " <quotedata:PE>10.88</quotedata:PE>\n"
                + " <quotedata:CompanyName>Microsoft Corpora</quotedata:CompanyName>\n"
                + " <quotedata:QuoteError>false</quotedata:QuoteError>\n"
                + " </quotedata:StockQuoteEvent>\n"
                + " </quotedata:AllStockQuoteStream>";

        String xmlElement3 = "<quotedata:AllStockQuoteStream xmlns:quotedata=\"http://ws.cdyne.com/\">\n"
                + " <quotedata:StockQuoteEvent>\n"
                + " <quotedata:StockSymbol>MSFT</quotedata:StockSymbol>\n"
                + " <quotedata:LastTradeAmount>6.36</quotedata:LastTradeAmount>\n"
                + " <quotedata:StockChange>0.05</quotedata:StockChange>\n"
                + " <quotedata:OpenAmount>25.05</quotedata:OpenAmount>\n"
                + " <quotedata:DayHigh>25.46</quotedata:DayHigh>\n"
                + " <quotedata:DayLow>25.01</quotedata:DayLow>\n"
                + " <quotedata:StockVolume>20452658</quotedata:StockVolume>\n"
                + " <quotedata:PrevCls>25.31</quotedata:PrevCls>\n"
                + " <quotedata:ChangePercent>0.20</quotedata:ChangePercent>\n"
                + " <quotedata:FiftyTwoWeekRange>22.73 - 31.58</quotedata:FiftyTwoWeekRange>\n"
                + " <quotedata:EarnPerShare>2.326</quotedata:EarnPerShare>\n"
                + " <quotedata:PE>10.88</quotedata:PE>\n"
                + " <quotedata:CompanyName>Microsoft Corpora</quotedata:CompanyName>\n"
                + " <quotedata:QuoteError>false</quotedata:QuoteError>\n"
                + " </quotedata:StockQuoteEvent>\n"
                + " </quotedata:AllStockQuoteStream>";

        initContext = JNDIContext.getInstance().getInitContext();
        topicConnectionFactory = JNDIContext.getInstance().getTopicConnectionFactory();
        AllStockQuotesPublisher publisher = new AllStockQuotesPublisher();

        XMLStreamReader reader1 = StAXUtils.createXMLStreamReader(new ByteArrayInputStream(
                xmlElement1.getBytes()));
        StAXOMBuilder builder1 = new StAXOMBuilder(reader1);
        OMElement OMMessage1 = builder1.getDocumentElement();
        publisher.publish("AllStockQuotes", OMMessage1);

        XMLStreamReader reader2 = StAXUtils.createXMLStreamReader(new ByteArrayInputStream(
                xmlElement2.getBytes()));
        StAXOMBuilder builder2 = new StAXOMBuilder(reader2);
        OMElement OMMessage2 = builder2.getDocumentElement();
        publisher.publish("AllStockQuotes", OMMessage2);

        XMLStreamReader reader3 = StAXUtils.createXMLStreamReader(new ByteArrayInputStream(
                xmlElement3.getBytes()));
        StAXOMBuilder builder3 = new StAXOMBuilder(reader3);
        OMElement OMMessage3 = builder3.getDocumentElement();
        publisher.publish("AllStockQuotes", OMMessage3);
    }

    /**
     * Publish message to given topic
     *
     * @param topicName - topic name to publish messages
     * @param message   - message to send
     */
    public void publish(String topicName, OMElement message) {
        // create topic connection
        TopicConnection topicConnection = null;
        try {
            topicConnection = topicConnectionFactory.createTopicConnection();
            topicConnection.start();
        } catch (JMSException e) {
            System.out.println("Can not create topic connection." + e);
            return;
        }

        // create session, producer, message and send message to given destination(topic)
        // OMElement message text is published here.
        Session session = null;
        try {
            session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(topicName);
            MessageProducer producer = session.createProducer(topic);
            TextMessage jmsMessage = session.createTextMessage(message.toString());
            producer.send(jmsMessage);
            producer.close();
            session.close();
            topicConnection.stop();
            topicConnection.close();
        } catch (JMSException e) {
            System.out.println("Can not publish message." + e);
        }
    }
}
```
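Since the three payloads in the publisher differ only in `LastTradeAmount`, a small helper can generate them and confirm that they parse as namespace-aware XML. The sketch below uses only the JDK; `StockQuoteEventSketch`, `buildEvent`, and `readLastTradeAmount` are hypothetical names, and the reduced event (symbol and last trade amount only) is an assumption. Whether the bucket accepts a reduced event depends on its input mapping, which is not shown here.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;

// Sketch only: builds a reduced stock quote event and reads a value back from it.
final class StockQuoteEventSketch {

    // Namespace used by the sample events above.
    static final String NS = "http://ws.cdyne.com/";

    /** Builds a reduced event; the symbol is assumed to contain no XML special characters. */
    static String buildEvent(String symbol, double lastTradeAmount) {
        return "<quotedata:AllStockQuoteStream xmlns:quotedata=\"" + NS + "\">"
                + "<quotedata:StockQuoteEvent>"
                + "<quotedata:StockSymbol>" + symbol + "</quotedata:StockSymbol>"
                + "<quotedata:LastTradeAmount>" + lastTradeAmount + "</quotedata:LastTradeAmount>"
                + "</quotedata:StockQuoteEvent>"
                + "</quotedata:AllStockQuoteStream>";
    }

    /** Parses the payload and returns the LastTradeAmount value. */
    static double readLastTradeAmount(String xml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            Document doc = factory.newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            String text = doc.getElementsByTagNameNS(NS, "LastTradeAmount").item(0).getTextContent();
            return Double.parseDouble(text.trim());
        } catch (Exception e) {
            throw new IllegalArgumentException("Payload is not a valid stock quote event", e);
        }
    }

    public static void main(String[] args) {
        // Same three amounts as the sample publisher.
        for (double amount : new double[] {126.36, 36.36, 6.36}) {
            System.out.println(readLastTradeAmount(buildEvent("MSFT", amount)));
        }
    }
}
```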
Subscribing for filtered events and notifications from CEP server
a. Using a JMS client receiver
The following class acts as a JMS topic subscriber client. It registers a subscription for the filtered events that CEP emits according to the query defined in the bucket. (Strictly speaking, we subscribe to a topic created at WSO2 Message Broker, to which CEP internally publishes the filtered events and notifications.)
```java
package org.wso2.cep.sample.jms.andes.xmlMessage;

import org.wso2.cep.sample.jms.andes.JNDIContext;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;
import javax.xml.stream.XMLStreamException;
import java.util.Enumeration;

public class FastMovingStockQuotesSubscriber implements MessageListener {

    private static InitialContext initContext = null;
    private static TopicConnectionFactory topicConnectionFactory = null;
    private boolean messageReceived = false;
    static String TOPIC = "FastMovingStockQuotes";

    public static void main(String[] args) throws XMLStreamException {
        initContext = JNDIContext.getInstance().getInitContext();
        topicConnectionFactory = JNDIContext.getInstance().getTopicConnectionFactory();
        new FastMovingStockQuotesSubscriber().subscribe(TOPIC);
    }

    public void subscribe(String topicName) {
        // create connection
        TopicConnection topicConnection = null;
        try {
            topicConnection = topicConnectionFactory.createTopicConnection();
        } catch (JMSException e) {
            System.out.println("Can not create topic connection." + e);
            return;
        }

        // create session, subscriber, message listener and listen on that topic
        TopicSession session = null;
        try {
            session = topicConnection.createTopicSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(topicName);
            TopicSubscriber subscriber = session.createSubscriber(topic);
            subscriber.setMessageListener(this);
            topicConnection.start();

            // Block the calling thread so the listener keeps receiving.
            // onMessage sets the flag but nothing calls notify(), so in practice
            // the client keeps running until the process is stopped.
            synchronized (this) {
                while (!messageReceived) {
                    try {
                        this.wait();
                    } catch (InterruptedException e) {
                        // ignored: keep waiting
                    }
                }
            }
        } catch (JMSException e) {
            System.out.println("Can not subscribe." + e);
        }
    }

    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("output = " + textMessage.getText());
                synchronized (this) {
                    messageReceived = true;
                }
            } catch (JMSException e) {
                System.out.println("error at getting text out of received message. = " + e);
            }
        } else if (message instanceof MapMessage) {
            try {
                // print every value of the map message
                Enumeration enumeration = ((MapMessage) message).getMapNames();
                for (; enumeration.hasMoreElements(); ) {
                    System.out.println(((MapMessage) message).getString((String) enumeration.nextElement()));
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println("Received message is not a text/map message.");
        }
    }
}
```
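The subscriber above parks its calling thread with `wait()` while the listener prints messages. If you would rather have the client stop after the first message or after a timeout, one option is a `CountDownLatch`. The following is a minimal sketch of that hand-off using only the JDK: `ListenerLatchSketch`, `onPayload`, and `awaitPayload` are hypothetical names, and `onPayload` merely stands in for `MessageListener.onMessage`. It is not part of the original sample.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch only: hands one payload from a listener thread to a waiting thread.
final class ListenerLatchSketch {

    private final CountDownLatch received = new CountDownLatch(1);
    private volatile String lastPayload;

    /** Stand-in for MessageListener.onMessage; stores the payload and releases the waiter. */
    void onPayload(String payload) {
        lastPayload = payload;
        received.countDown();
    }

    /** Waits for a payload; returns null if none arrives in time or the thread is interrupted. */
    String awaitPayload(long timeoutMillis) {
        try {
            return received.await(timeoutMillis, TimeUnit.MILLISECONDS) ? lastPayload : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    public static void main(String[] args) {
        ListenerLatchSketch sketch = new ListenerLatchSketch();
        // Simulate the broker delivering a message on another thread.
        new Thread(() -> sketch.onPayload("<event/>")).start();
        System.out.println("output = " + sketch.awaitPayload(5000));
    }
}
```

In a real client, the JMS connection would still need to be closed after `awaitPayload` returns.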
b. Using a web service message receiver
Deploying receiver service
First, we have to deploy a web service on a WSO2 server to act as the receiver web service for messages from the CEP server. We will use CEP itself to deploy this web service.
The steps are as follows:
- In a command prompt, switch to the FastMovingStockQuoteReceiverService services directory:
<CEP_HOME>/samples/services/FastMovingStockQuoteReceiverService
For example, in Linux: cd <CEP_HOME>/samples/services/FastMovingStockQuoteReceiverService
- From there, type ant. This will deploy the FastMovingStockQuoteReceiverService in CEP itself.
You can follow the server logs to check whether FastMovingStockQuoteReceiverService.aar has been properly deployed.
You will also be able to see the Axis2 service in the services list.
Configuring receiver service
We need to configure the FastMovingStockQuoteReceiverService to receive the output events emitted by the bucket under the FastMovingStockQuotes topic. Here we create the FastMovingStockQuotes topic in the WSO2 Message Broker and subscribe FastMovingStockQuoteReceiverService to that topic.
The steps are as follows:
- Sign in. Enter your user name and password to log on to the Message Broker Management Console.
- Click Add under the Topics menu in the Manage section of the left panel.
- Specify the topic name in the topic input text box. In this case, the topic name is FastMovingStockQuotes (the output topic). Click Add Topic. The topic is added to the server and you will be directed to the Topic Browser page.
- Once you click the topic in the Topic Browser page, you will be able to see four links, as in the image below. Click the Subscribe link and you will be directed to the Subscribe page.
Create a subscription with the following details. Once you are done, click the Subscribe button.
Topic: FastMovingStockQuotes (output topic)
Subscription mode: Topic only
Subscription URL: http://localhost:9763/services/FastMovingStockQuoteService/getOMElement
Expiration time: select a future date from the calendar
Once you click the Subscribe button, you will be directed to the Topic Browser page.
You can verify whether you have correctly subscribed to the topic by clicking the Details link of that topic in the Topic Browser page.
Once you click it, you will be directed to the Topic Details page, where you will find all the subscriptions for that topic and its children (in this case there are none) and the permissions on that topic. In addition, the Publish section lets you publish a test XML message to that topic and check whether your subscription URL has been properly subscribed.
Running the Samples
The steps are as follows:
- If you are using a JMS subscriber client (that is, "Using a JMS client receiver"), build and run the topic subscriber class provided above. If you are using the web service message receiver instead, the configuration under section (b) is enough.
- Run the JMS publisher client.
- CEP analyzes the events we have published and fires outputs if the last traded amount varies by 2 percent with regard to the average traded price within the past 2 minutes.
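To get a feel for which of the three published events could trigger an output, the rule described in the last step can be approximated in plain Java. This is only a sketch under stated assumptions, not the bucket's actual query: it assumes a single symbol, a 2-minute sliding time window, an average that includes the current event, and a trigger when the price deviates from that average by more than 2 percent. `FastMoverSketch` and `onEvent` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch only: approximates "price varies by 2 percent from the 2-minute average".
final class FastMoverSketch {

    private static final long WINDOW_MILLIS = 2 * 60 * 1000L; // assumed 2-minute window
    private static final double THRESHOLD = 0.02;             // assumed 2 percent deviation

    /** One price observation inside the window. */
    private static final class Quote {
        final long timestampMillis;
        final double price;

        Quote(long timestampMillis, double price) {
            this.timestampMillis = timestampMillis;
            this.price = price;
        }
    }

    private final Deque<Quote> window = new ArrayDeque<>();
    private double sum;

    /** Adds an event and returns true if it deviates more than 2 percent from the window average. */
    boolean onEvent(long timestampMillis, double price) {
        // Evict quotes that have fallen out of the time window.
        while (!window.isEmpty()
                && window.peekFirst().timestampMillis <= timestampMillis - WINDOW_MILLIS) {
            sum -= window.pollFirst().price;
        }
        window.addLast(new Quote(timestampMillis, price));
        sum += price;
        double average = sum / window.size();
        return Math.abs(price - average) > THRESHOLD * average;
    }

    public static void main(String[] args) {
        FastMoverSketch sketch = new FastMoverSketch();
        // The three LastTradeAmount values sent by the sample publisher, one second apart.
        double[] prices = {126.36, 36.36, 6.36};
        for (int i = 0; i < prices.length; i++) {
            System.out.println(prices[i] + " fires: " + sketch.onEvent(i * 1000L, prices[i]));
        }
    }
}
```

Under these assumptions the first event does not fire, because it equals the average of a window that contains only itself, while the second and third do. The outputs you actually observe depend on the query in the deployed bucket.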
Observe the console where the JMS subscriber client is running, or the server console where the message-receiving web service is deployed. You will see logs like the ones below.