Code Block
<bucket name="TwitterAndStockQuoteAnalyzer" xmlns="">
    <description>
This bucket analyzes the stock quotes stream and the Twitter feeds stream, and
triggers an event if the last traded price in the stock quotes stream varies by
more than 2 percent from the average traded price of a symbol within the past
2 minutes and the word count of the Twitter feeds stream related to that symbol
is greater than 10.
    </description>
    <engineProviderConfiguration engineProvider="SiddhiCEPRuntime">
        <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
        <property name="siddhi.enable.distributed.processing">false</property>
    </engineProviderConfiguration>
    <input topic="AllStockQuotes" brokerName="activemqJmsBroker">
        <mapMapping stream="allStockQuotes" queryEventType="Tuple">
            <property name="symbol" inputName="symbol" type="java.lang.String"/>
            <property name="price" inputName="price" type="java.lang.Double"/>
        </mapMapping>
    </input>
    <input topic="TwitterFeed" brokerName="activemqJmsBroker">
        <mapMapping stream="twitterFeed" queryEventType="Tuple">
            <property name="company" inputName="company" type="java.lang.String"/>
            <property name="wordCount" inputName="wordCount" type="java.lang.Integer"/>
        </mapMapping>
    </input>
    <query name="FastStockQuery">
        <expression type="inline">
from allStockQuotes#window.time(120000)
insert into fastMovingStockQuotes
symbol, price, avg(price) as averagePrice
group by symbol
having ((price > averagePrice*1.02) or (averagePrice*0.98 > price))
        </expression>
    </query>
    <query name="HighFrequentTweetQuery">
        <expression type="inline">
from twitterFeed#window.time(1200000)
insert into highFrequentTweets
company as company, sum(wordCount) as words
group by company
having (words > 10)
        </expression>
    </query>
    <query name="StocksPredictor">
        <expression type="inline">
from fastMovingStockQuotes#window.time(120000) as fastMovingStockQuotes join
highFrequentTweets#window.time(120000) as highFrequentTweets
on fastMovingStockQuotes.symbol ==
insert into predictedStockQuotes
fastMovingStockQuotes.symbol as company, fastMovingStockQuotes.averagePrice as amount, highFrequentTweets.words as words
        </expression>
        <output topic="PredictedStockQuotes" brokerName="activemqJmsBroker">
        </output>
    </query>
</bucket>
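
The having clause of FastStockQuery can be checked by hand with a small worked example. The sketch below is only an illustration of that one comparison: it takes a price and an already computed average as arguments and does not model the two-minute window or the grouping by symbol. The function name is_fast_moving is hypothetical and is not part of WSO2 CEP or Siddhi.

```shell
# Succeeds (exit status 0) when the price lies more than 2 percent away from
# the average, mirroring:
#   having ((price > averagePrice*1.02) or (averagePrice*0.98 > price))
# Arguments: $1 = last traded price, $2 = average price (assumed precomputed).
is_fast_moving() {
    awk -v price="$1" -v avg="$2" \
        'BEGIN { exit !((price > avg * 1.02) || (avg * 0.98 > price)) }'
}

# With an average of 100 the thresholds are 102 and 98.
is_fast_moving 103 100 && echo "103 vs 100: event"      # 103 is above 102
is_fast_moving 101 100 || echo "101 vs 100: no event"   # 101 is between 98 and 102
is_fast_moving 97 100 && echo "97 vs 100: event"        # 97 is below 98
```

In the bucket itself the average is recomputed per symbol over the time window, so the same price can qualify at one moment and not at another.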


  1. Install the WSO2 Complex Event Processor, but do not start the server. Refer to the Installation Guide for instructions.
  2. Copy activemq-all-xxx.jar from the <ActiveMQ_HOME> directory to the <CEP_HOME>/samples/lib directory.
  3. Copy activemq-core-xxx.jar and geronimo-j2ee-management_1.1_spec-1.0.1.jar from <ActiveMQ_HOME>/lib to the <CEP_HOME>/repository/components/lib directory.
  4. In a command prompt, switch to the sample directory: <CEP_HOME>/samples/cep-samples
    For example, in Linux: cd <CEP_HOME>/samples/cep-samples
  5. From there, run ant deploy-jms.
    This copies broker-manager-config.xml to the <CEP_HOME>/repository/conf directory and the bucket configuration to the <CEP_HOME>/repository/deployment/server/cepbuckets directory.
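
Steps 2 to 5 could be scripted roughly as follows on Linux. This is a sketch under assumptions: the function names are hypothetical, both installation directories are passed in as arguments, and the * glob stands in for whatever version appears in place of xxx in the jar names.

```shell
# Steps 2 and 3: copy the ActiveMQ jars into the CEP directories.
# $1 = ActiveMQ installation directory, $2 = CEP installation directory.
# The target directories are expected to exist already in a CEP installation.
stage_activemq_jars() {
    local activemq_home="$1" cep_home="$2"
    cp "$activemq_home"/activemq-all-*.jar "$cep_home/samples/lib/" || return 1
    cp "$activemq_home"/lib/activemq-core-*.jar \
       "$activemq_home"/lib/geronimo-j2ee-management_1.1_spec-1.0.1.jar \
       "$cep_home/repository/components/lib/" || return 1
}

# Steps 4 and 5: run the deploy-jms target from the sample directory.
# The subshell keeps the caller's working directory unchanged.
deploy_jms_sample() {
    local cep_home="$1"
    (cd "$cep_home/samples/cep-samples" && ant deploy-jms)
}

# Usage (not executed here), assuming ACTIVEMQ_HOME and CEP_HOME are set:
#   stage_activemq_jars "$ACTIVEMQ_HOME" "$CEP_HOME" && deploy_jms_sample "$CEP_HOME"
```

Keeping the copy and the deploy in separate functions lets the jar staging be rerun on its own, for example after changing the ActiveMQ version.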

Starting JMS subscriber
