This documentation is for WSO2 Complex Event Processor 2.0.0.

Twitter and StockQuote Analyzer

This sample demonstrates how the Siddhi engine can be used with a JMS broker to receive, process, and publish JMS Map and XML messages.

In this sample, CEP receives events from a stock quotes stream and a Twitter feeds stream. It triggers an event when the last traded price of a symbol differs by more than 2 percent from the average traded price of that symbol within the past 2 minutes, and the word count of the Twitter feeds related to that symbol is greater than 10.
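The trigger rule described above can be restated in plain Java as a rough illustration. This is only a sketch: the class and method names (`TriggerRule`, `isFastMoving`, `shouldTrigger`) are hypothetical and are not part of WSO2 CEP or Siddhi, and the average price is assumed to have been computed elsewhere.

```java
// Hypothetical helper, not part of WSO2 CEP: restates the sample's trigger rule in plain Java.
final class TriggerRule {

    // True when the last price is more than 2 percent above or below the average price.
    static boolean isFastMoving(double lastPrice, double averagePrice) {
        return lastPrice > averagePrice * 1.02 || averagePrice * 0.98 > lastPrice;
    }

    // True when the price condition holds and the tweet word count is greater than 10.
    static boolean shouldTrigger(double lastPrice, double averagePrice, long wordCount) {
        return isFastMoving(lastPrice, averagePrice) && wordCount > 10;
    }

    public static void main(String[] args) {
        System.out.println(shouldTrigger(103.0, 100.0, 12)); // true: 3 percent above average, 12 words
        System.out.println(shouldTrigger(101.0, 100.0, 12)); // false: only 1 percent above average
        System.out.println(shouldTrigger(97.0, 100.0, 10));  // false: word count is not greater than 10
    }
}
```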

FastStockQuery

from allStockQuotes#window.time(120000)
insert into fastMovingStockQuotes
symbol,price, avg(price) as averagePrice
group by symbol
having ((price > averagePrice*1.02) or (averagePrice*0.98 > price ))

HighFrequentTweetQuery

from twitterFeed#window.time(1200000)
insert into highFrequentTweets
company as company, sum(wordCount) as words
group by company
having (words > 10)

StocksPredictor

from fastMovingStockQuotes#window.time(120000) as fastMovingStockQuotes join
highFrequentTweets#window.time(120000) as highFrequentTweets
on fastMovingStockQuotes.symbol == highFrequentTweets.company
insert into predictedStockQuotes
fastMovingStockQuotes.symbol as company, fastMovingStockQuotes.averagePrice as amount, highFrequentTweets.words as words
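To make the windowed aggregation in FastStockQuery easier to picture, the following Java sketch models a per-symbol average over a sliding time window. It is a simplified model and not Siddhi's implementation: the `SlidingAverage` class is hypothetical, timestamps are passed in explicitly and assumed to arrive in non-decreasing order, and Siddhi's actual expiry semantics may differ.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of "#window.time(120000) ... avg(price) ... group by symbol".
// Not Siddhi code; timestamps are supplied by the caller to keep the sketch deterministic.
final class SlidingAverage {

    private static final class Quote {
        final long timestamp;
        final double price;

        Quote(long timestamp, double price) {
            this.timestamp = timestamp;
            this.price = price;
        }
    }

    private final long windowMillis;
    private final Map<String, Deque<Quote>> quotesBySymbol = new HashMap<>();

    SlidingAverage(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Adds a quote, drops quotes of the same symbol that fell out of the window,
    // and returns the average price of the quotes that remain (including the new one).
    double add(String symbol, double price, long timestamp) {
        Deque<Quote> quotes = quotesBySymbol.computeIfAbsent(symbol, k -> new ArrayDeque<>());
        quotes.addLast(new Quote(timestamp, price));
        // The newest quote is always inside the window, so the deque never becomes empty here.
        while (quotes.peekFirst().timestamp <= timestamp - windowMillis) {
            quotes.removeFirst();
        }
        double sum = 0;
        for (Quote q : quotes) {
            sum += q.price;
        }
        return sum / quotes.size();
    }

    public static void main(String[] args) {
        SlidingAverage average = new SlidingAverage(120000);
        System.out.println(average.add("IBM", 100.0, 0));      // 100.0
        System.out.println(average.add("IBM", 110.0, 60000));  // 105.0
        System.out.println(average.add("IBM", 120.0, 130000)); // 115.0, the first quote has expired
    }
}
```

Each returned average plays the role of averagePrice in the having clause; the symbols and prices used in main are made up for illustration.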

In this sample, a Java client publishes JMS Map events to CEP. The bucket emits the resulting events as JMS XML messages, which a JMS subscriber receives and logs.

The following configuration is used in this sample.

<bucket name="TwitterAndStockQuoteAnalyzer" xmlns="http://wso2.org/carbon/cep">
    <description>
This bucket analyzes the stock quotes stream and the Twitter feeds stream, and triggers
an event if the last traded price of a symbol differs by more than 2 percent
from the average traded price of that symbol within the past 2 minutes and
the word count of the Twitter feeds related to that symbol is greater
than 10.
    </description>
    <engineProviderConfiguration engineProvider="SiddhiCEPRuntime">
        <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
        <property name="siddhi.enable.distributed.processing">false</property>
    </engineProviderConfiguration>
    <input topic="AllStockQuotes" brokerName="activemqJmsBroker">
        <!-- Optional mapMapping attribute: queryEventType="Tuple" -->
        <mapMapping stream="allStockQuotes">
            <property name="symbol" inputName="symbol" type="java.lang.String"/>
            <property name="price" inputName="price" type="java.lang.Double"/>
        </mapMapping>
    </input>
    <input topic="TwitterFeed" brokerName="activemqJmsBroker">
        <!-- Optional mapMapping attribute: queryEventType="Tuple" -->
        <mapMapping stream="twitterFeed">
            <property name="company" inputName="company" type="java.lang.String"/>
            <property name="wordCount" inputName="wordCount" type="java.lang.Integer"/>
        </mapMapping>
    </input>
    <query name="FastStockQuery">
        <expression type="inline">
from allStockQuotes#window.time(120000)
insert into fastMovingStockQuotes
symbol,price, avg(price) as averagePrice
group by symbol
having ((price > averagePrice*1.02) or (averagePrice*0.98 > price ))
        </expression>
    </query>
    <query name="HighFrequentTweetQuery">
        <expression type="inline">
from twitterFeed#window.time(1200000)
insert into highFrequentTweets
company as company, sum(wordCount) as words
group by company
having (words > 10)
        </expression>
    </query>
    <query name="StocksPredictor">
        <expression type="inline">
from fastMovingStockQuotes#window.time(120000) as fastMovingStockQuotes join
highFrequentTweets#window.time(120000) as highFrequentTweets
on fastMovingStockQuotes.symbol == highFrequentTweets.company
insert into predictedStockQuotes
fastMovingStockQuotes.symbol as company, fastMovingStockQuotes.averagePrice as amount, highFrequentTweets.words as words
        </expression>
        <output topic="PredictedStockQuotes" brokerName="activemqJmsBroker">
            <xmlMapping>
                <quotedata:StockQuoteDataEvent
                        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                        xmlns:xsd="http://www.w3.org/2001/XMLSchema"
                        xmlns:quotedata="http://ws.cdyne.com/">
                    <quotedata:StockSymbol>{company}</quotedata:StockSymbol>
                    <quotedata:Amount>{amount}</quotedata:Amount>
                    <quotedata:WordCount>{words}</quotedata:WordCount>
                </quotedata:StockQuoteDataEvent>
            </xmlMapping>
        </output>
    </query>
</bucket>
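As an illustration of the output side, a consumer of the PredictedStockQuotes topic could read the fields of an XML event with the JDK's DOM parser. The sketch below is not part of the sample: the `PredictedQuoteReader` class is hypothetical, and the payload is hand-written from the xmlMapping template above with made-up values, so the exact serialization produced by CEP may differ.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;

// Hypothetical helper, not part of the sample: reads one field from an output event.
final class PredictedQuoteReader {

    // Namespace bound to the quotedata prefix in the xmlMapping.
    static final String QUOTEDATA_NS = "http://ws.cdyne.com/";

    // Hand-written payload that follows the xmlMapping template; the values are made up.
    static final String SAMPLE_EVENT =
        "<quotedata:StockQuoteDataEvent xmlns:quotedata=\"http://ws.cdyne.com/\">"
        + "<quotedata:StockSymbol>IBM</quotedata:StockSymbol>"
        + "<quotedata:Amount>105.0</quotedata:Amount>"
        + "<quotedata:WordCount>12</quotedata:WordCount>"
        + "</quotedata:StockQuoteDataEvent>";

    // Returns the text of the first element with the given local name, or null if it is absent.
    static String field(String xml, String localName) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true); // required for namespace-based lookups
            Document doc = factory.newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            NodeList nodes = doc.getElementsByTagNameNS(QUOTEDATA_NS, localName);
            return nodes.getLength() == 0 ? null : nodes.item(0).getTextContent();
        } catch (Exception e) {
            throw new IllegalArgumentException("Not a well-formed event: " + xml, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(field(SAMPLE_EVENT, "StockSymbol"));                 // IBM
        System.out.println(Double.parseDouble(field(SAMPLE_EVENT, "Amount")));  // 105.0
        System.out.println(Integer.parseInt(field(SAMPLE_EVENT, "WordCount"))); // 12
    }
}
```

Looking elements up by namespace URI rather than by the quotedata prefix keeps the reader working even if the publisher chooses a different prefix.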

Prerequisites

  • Apache Ant, to build and deploy the sample and service, and to run the client. Refer to Installation Prerequisites for instructions on installing Apache Ant.
  • ActiveMQ JMS Broker, to publish and subscribe to events. Refer to Installation Prerequisites for instructions on installing ActiveMQ JMS Broker.

Deploying the configurations

The steps are as follows:

  1. Install the WSO2 Complex Event Processor, but do not start the server. Refer to the Installation Guide for instructions.
  2. Copy activemq-all-xxx.jar from the <ActiveMQ_HOME> directory to the <CEP_HOME>/samples/lib directory.
  3. Copy activemq-core-xxx.jar and geronimo-j2ee-management_1.1_spec-1.0.1.jar from <ActiveMQ_HOME>/lib to the <CEP_HOME>/repository/components/lib directory.
  4. In a command prompt, switch to the sample directory: <CEP_HOME>/samples/cep-samples
    For example, in Linux: cd <CEP_HOME>/samples/cep-samples
  5. From there, type ant deploy-jms
    This copies broker-manager-config.xml to the <CEP_HOME>/repository/conf directory and the bucket configuration to the <CEP_HOME>/repository/deployment/server/cepbuckets directory.

Starting JMS subscriber

The steps are as follows:

  1. Start ActiveMQ JMS Broker. Refer to Installation Prerequisites for instructions on running ActiveMQ JMS Broker.
  2. In a new command prompt, switch to the sample directory: <CEP_HOME>/samples/cep-samples
    For example, in Linux: cd <CEP_HOME>/samples/cep-samples
  3. From there, type ant jmsSubscriber -Dtopic=PredictedStockQuotes
    This subscribes to the PredictedStockQuotes topic of the ActiveMQ broker to receive the output events of CEP.

Publishing events

The steps are as follows:

  1. In a command prompt, switch to the CEP samples directory: <CEP_HOME>/samples/cep-samples
    For example, in Linux: cd <CEP_HOME>/samples/cep-samples
  2. From there, type ant jmsAllStockQuotesPublisher
    This publishes some JMS Map events to the AllStockQuotes topic.
  3. Then type ant jmsTwitterFeedPublisher
    This publishes some JMS Map events to the TwitterFeed topic.

Observation

You will be able to observe the output events in the JMS subscriber console, as shown below.