This documentation is for WSO2 Complex Event Processor 2.0.0. View documentation for the latest release.

All Stocks Analyzer

In this example, we output the the last 10 min average and sum of all stock quotes.  This sample is implemented using JMS broker.


Configure and run ActiveMQ

Step 1:

Download the ActiveMQ from "http://activemq.apache.org/activemq-543-release.html ".
unzip the distribution and run the ActiveMQ server using the command “./activemq console” from apache-activemq-xxx/bin

For more information on installing ActiveMQ refer "Installing Apache ActiveMQ" under Installation Prerequisites.

Step 2:

  Download CEP server and install. Do not start the server.  Refer to the Installation Guide for instructions .

  Step 3: 

 Copy paste activemq-all-xxx.jar from the ActiveMQ home directory to wso2cep-2.0.0/samples/lib directory. 
 Copy paste activemq-core-xxx.jar and geronimo-j2ee-management_1.1_spec-1.0.1.jar from apache-activemq-xxx/lib
 to wso2cep-2.0.0/repository/components/lib directory.  

Step 4:

Run WSO2 CEP. Refer to the Installing the Product for instructions .

Create JMS Broker

Before creating the bucket to filter stock quotes and twitter feeds, it is essential to have a Carbon broker. Since in this example we are going to use JMS Broker, it is needed to create a broker with type jms. For more information about brokers visit here.


  Step 1 :

Start CEP Server and log in as admin.  Refer to the  Installing the Product for instructions.


  Step 2 :

  In the Configure menu you can find a Menu item called "Broker" and under that you can see sub menu 'Add' and click on that.

  Step 3 :

You will get a page with header "Create a New Broker" and you need to enter following details in that form to create a jms broker.

Broker Name  : activemqJmsBroker
Broker Type  : jms
JNDI Name    : org.apache.activemq.jndi.ActiveMQInitialContextFactory
User Name    : admin
Password     : admin
Provider URL : tcp://localhost:61616 

  Finally click on Add Broker button and you will get the added broker to the list of available brokers.  

Create Bucket with Siddhi

  Use the "Add" menu in CEP Buckets to add buckets. Use following information and create the bucket. If you need more information about buckets visit here.

 

Section 1 : Basic Information


Bucket Name (Name of the bucket)               : AllStocksAnalyzer
Description (Description about the bucket)     : This bucket analyzes stock quotes and outputs the last 10min average, sum of all stock quotes.

Engine Provider(CEP Runtime engine to be used) : SiddhiCEPRuntime [Choose from the drop down]

Section 2 : Inputs


  Defining Inputs : Click on Add Input link and it will provide a form to define inputs. You can define the sample input by entering following values.  

Topic (topic for the events be received)  : AllStockQuotes  
Broker Name (Broker to be used)           : activemqJmsBroker 
Mapping Stream (Name of the event stream) : allStockQuotes 
Query Event Type                          : Tuple
Input Mapping Type                        : Map Mapping
Defining input properties 
Property
	Name       : symbol 
	Input Name : symbol
	type       : java.lang.String [Choose from the drop down ]

Property
	Name       : price 
	Input Name : price
	type       : java.lang.Double [Choose from the drop down]
 
Click on add button to add the defined property to the input and it will be appeared in the Properties table once it added
After filling all required fields click on the add button to add Input to the Bucket. Once you clicked the input form will disappear and the added input will be appeared in Inputs table.

 

Section 3 : Queries


Query Name (To identify the query) : AllStocksQuery 

Expression :    from allStockQuotes#window.time(600000)
		        insert into fastMovingStockQuotes
		        symbol,price, avg(price) as averagePrice, sum(price) as sumPrice 

 Defining the Output : Here we are defining the topic which filtered events are published and the structure of the output xml. 

 Topic(topic which filtered event published) : AllStockStats

 BrokerName (Broker to be used)              : activemqJmsBroker

 Output Mapping : XML Mapping 

 XML Mapping Text:  <quotedata:StockQuoteDataEvent xmlns:quotedata="http://ws.cdyne.com/">
			<quotedata:StockSymbol>{symbol}</quotedata:StockSymbol>
			<quotedata:price>{price}</quotedata:price>
			<quotedata:averagePrice>{averagePrice}</quotedata:averagePrice>
			<quotedata:sumPrice>{sumPrice}</quotedata:sumPrice>
                    </quotedata:StockQuoteDataEvent>


After filling all the required fields of the Query , click on add query button to add Query to the bucket. Once you clicked it will disappear the query form and added query will be appeared in Queries table. 
As the last step of adding a bucket , click on save button. Once you clicked it will redirect to the deployed buckets page and deployed bucket will be appeared in buckets table. Further to check whether you have entered the details correctly you can click on the link with the defined bucket and it will show added details. You can come back to the previous adding bucket page by clicking on the back button provided or if needed to edit the bucket you can click on the "Edit" link on the top of the View bucket Page.

Invoking Deployed Bucket

 It is needed to have a JMS Publisher and a JMS Subscriber to invoke the created bucket.

Creating JMS Publisher

  JMS ActiveMQ Broker is used in this sample to publish and subscribe events.  

  To create the JMS Publisher, it is necessary to have following classes and property files in your project.    

  • JNDI Properties
  • JNDIContext.java
  • AllStockQuotesPublisher.java

    jndi.properties    

 java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
#java.naming.factory.initial =org.apache.qpid.jndi.PropertiesFileInitialContextFactory

#use the following property to configure the default connector
#connectionfactory.ConnectionFactory=amqp://admin:admin@clientid/carbon?brokerlist='tcp://localhost:5673'

# use the following property to specify the JNDI name the connection factory
# should appear as.
#connectionFactoryNames = connectionFactory, queueConnectionFactory, topicConnectionFactory 

     

JNDI Context
import javax.jms.TopicConnectionFactory;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class JNDIContext {
	private InitialContext initContext = null;
	private TopicConnectionFactory topicConnectionFactory = null;
      	public static JNDIContext instance = null;

	private JNDIContext() {
		createInitialContext();
		createConnectionFactory();
	} 

	public InitialContext getInitContext() {
		return initContext;
	}

	public TopicConnectionFactory getTopicConnectionFactory() {
		return topicConnectionFactory;
	}

	public static JNDIContext getInstance() {
		if (instance == null) {
			instance = new JNDIContext();
		}
		return instance;
	}

	/**
	* Create Connection factory with initial context
	*/
	private void createConnectionFactory() {

		// create connection factory
		try {
			topicConnectionFactory = (TopicConnectionFactory) initContext.lookup("ConnectionFactory");
		} catch (NamingException e) {
			System.out.println("Can not create topic connection factory." + e);
		}
	}

	/**
	* Create Initial Context with given configuration
	*/
	private void createInitialContext() {

		try {
			initContext = new InitialContext();
		} catch (NamingException e) {
		System.out.println("Can not create initial context with given parameters." + e);
		}
	}
} 

Java class to publish stock quote information to CEP server.

All Stockquote publisher
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.xml.stream.XMLStreamException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
* Publish message to the topic created when defining Input
*/
public class AllStockQuotesPublisher {
	private static InitialContext initContext = null;
	private static TopicConnectionFactory topicConnectionFactory = null; 

	public static void main(String[] args)
	throws XMLStreamException, InterruptedException, NamingException {

		Properties initialContextProperties = new Properties();
		initialContextProperties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
		initContext = new InitialContext(initialContextProperties);
		topicConnectionFactory = JNDIContext.getInstance().getTopicConnectionFactory();

		AllStockQuotesPublisher publisher = new AllStockQuotesPublisher();

		Map<String, Object> map1 = new HashMap<String, Object>();
		map1.put("symbol", "MSFT");
		map1.put("price", 26.36);
		publisher.publish("AllStockQuotes", map1);
		System.out.println("AllStockQuotes Message 1 sent");
		Thread.sleep(2000); 

		Map<String, Object> map2 = new HashMap<String, Object>();
		map2.put("symbol", "MSFT");
		map2.put("price", 36.36);
		publisher.publish("AllStockQuotes", map2);
		System.out.println("AllStockQuotes Message 2 sent");
		Thread.sleep(2000);

		Map<String, Object> map3 = new HashMap<String, Object>();
		map3.put("symbol", "MSFT");
		map3.put("price", 6.36);
		publisher.publish("AllStockQuotes", map3);
		System.out.println("AllStockQuotes Message 3 sent"); 

	}

	/**
	* Publish message to given topic
	*
	* @param topicName - topic name to publish messages
	* @param message - message to send
	*/
	public void publish(String topicName, Map<String, Object> message) {
		// create topic connection
		TopicConnection topicConnection = null;
		try {
			topicConnection = topicConnectionFactory.createTopicConnection();
			topicConnection.start();
		} catch (JMSException e) {
		System.out.println("Can not create topic connection." + e);
		return;
		}

		Session session = null;
		try {
			session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 

			Topic topic = session.createTopic(topicName);
			MessageProducer producer = session.createProducer(topic);
			MapMessage jmsMessage = session.createMapMessage();
			for (Map.Entry<String, Object> entry : message.entrySet()) {
				if (entry.getValue() instanceof Double) {
					jmsMessage.setDouble(entry.getKey(), (Double) entry.getValue());
				} 
				else if (entry.getValue() instanceof Integer) {
					jmsMessage.setInt(entry.getKey(), (Integer) entry.getValue());
				}
				else if (entry.getValue() instanceof Long) {
					jmsMessage.setLong(entry.getKey(), (Long) entry.getValue());
				} 
				else if (entry.getValue() instanceof Float) {
					jmsMessage.setFloat(entry.getKey(), (Float) entry.getValue());
				} 
				else if (entry.getValue() instanceof String) {
					jmsMessage.setString(entry.getKey(), (String) entry.getValue());
				}
			}
			producer.send(jmsMessage);
			producer.close();
			session.close();
			topicConnection.stop();
			topicConnection.close();
		} catch (JMSException e) {
			System.out.println("Can not subscribe." + e);
		}
	}
} 

Create a project with above classes and files and run the publisher class to publish events to the topic.

Creating JMS Subscriber

JMS Subscriber will receive events published to the output topic of the query. In order to create a JMS Subscriber , you need to have above jndi.properties file and JNDIContext.java class and Subscriber.java class, AllStockStatsSubscriber.java

Class to subscribe to output by CEP server.

Subscriber
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Enumeration;

/**
* Subscribe to myTopic and wait 10seconds to receive messages
*/
public class Subscriber implements MessageListener {
	private static InitialContext initContext = null;
	private static TopicConnectionFactory topicConnectionFactory = null;
	private boolean messageReceived = false;
	// static String TOPIC = "ConditionSatisfyingStockQuotes";

	// public static void main(String[] args) throws XMLStreamException {
	// new Subscriber().subscribe(TOPIC);
	// }



	public void subscribe(String topicName) throws NamingException {

		initContext = JNDIContext.getInstance().getInitContext();
		topicConnectionFactory = JNDIContext.getInstance().getTopicConnectionFactory();

		// create connection
		TopicConnection topicConnection = null;
		try {
			topicConnection = topicConnectionFactory.createTopicConnection();
		} catch (JMSException e) {
			System.out.println("Can not create topic connection." + e);
			return;
		}
		// create session, subscriber, message listener and listen on that topic
		TopicSession session = null;
		try {
			session = topicConnection.createTopicSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
			// change topic name to brokerConfiguration name + topic name
			Topic topic = session.createTopic(topicName);
			TopicSubscriber subscriber = session.createSubscriber(topic);
			subscriber.setMessageListener(this);
			topicConnection.start();
			synchronized (this) {
				while (!messageReceived) {
					try {
						this.wait();
					} catch (InterruptedException e) {
					}
				}
			}

		} catch (JMSException e) {
			System.out.println("Can not subscribe." + e);
		}
	}
	
	public void onMessage(Message message) {
		if (message instanceof TextMessage) {
			TextMessage textMessage = (TextMessage) message;
			try {
				System.out.println("output = " + textMessage.getText());
				synchronized (this) {
					messageReceived = true;
				}
			} catch (JMSException e) {
				System.out.println("error at getting text out of received message. = " + e);
			}
		} else if (message instanceof MapMessage) {
			try {
				Enumeration enumeration = ((MapMessage) message).getMapNames();
				for (; enumeration.hasMoreElements(); ) {
					System.out.println(((MapMessage) message).getString((String) enumeration.nextElement()));
				}
				System.out.println();
			} catch (JMSException e) {
			System.out.println("error at getting element out of received map message. = " + e);
			}
		} else {
				System.out.println("Received message is not a text/map message.");
		}
	}
}

   

 

Predicted Stockquotes Subscriber
import javax.naming.NamingException;

public class PredictedStockQuotesSubscriber {
	public static void main(String[] args) throws NamingException {
		new Subscriber().subscribe("AllStockStats");
	}
} 


Before publishing events using Publisher, You need to start Subscriber to receive filtered event.