Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagehtml/xml
linenumberstrue
collapsetrue
<bucket name="TwitterAndStockQuoteAnalyzer" xmlns="http://wso2.org/carbon/cep">
    <description>
This bucket analyzes stock quotes stream and Twitter feeds stream, and trigger 
an event if the last traded amount of the stock quotes stream vary by 2 percent 
with regards to the average traded price of a symbol within past 2 minutes and 
the word count of the Twitter feeds stream with related to that symbol is greater 
than 10.
    </description>
    <engineProviderConfiguration engineProvider="SiddhiCEPRuntime">
        <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
        <property name="siddhi.enable.distributed.processing">false</property>
    </engineProviderConfiguration>
    <input topic="AllStockQuotes" brokerName="activemqJmsBroker">
        <mapMapping stream="allStockQuotes" queryEvnetType <!--queryEventType="Tuple"--> >
            <property name="symbol" inputName="symbol" type="java.lang.String"/>
            <property name="price" inputName="price" type="java.lang.Double"/>
        </mapMapping>
    </input>
    <input topic="TwitterFeed" brokerName="activemqJmsBroker">
        <mapMapping stream="twitterFeed"  queryEvnetType<!--queryEventType="Tuple"--> >
            <property name="company" inputName="company" type="java.lang.String"/>
            <property name="wordCount" inputName="wordCount" type="java.lang.Integer"/>
        </mapMapping>
    </input>
    <query name="FastStockQuery">
        <expression type="inline">
from allStockQuotes#window.time(120000)
insert into fastMovingStockQuotes
symbol,price, avg(price) as averagePrice
group by symbol
having ((price > averagePrice*1.02) or (averagePrice*0.98 > price ))
        </expression>
    </query>
    <query name="HighFrequentTweetQuery">
        <expression type="inline">
from twitterFeed#window.time(1200000)
insert into highFrequentTweets
company as company, sum(wordCount) as words
group by company
having (words > 10)
        </expression>
    </query>
    <query name="StocksPredictor">
        <expression>
from fastMovingStockQuotes#window.time(120000) as fastMovingStockQuotes join
highFrequentTweets#window.time(120000) as highFrequentTweets
on fastMovingStockQuotes.symbol == highFrequentTweets.company
insert into predictedStockQuotes
fastMovingStockQuotes.symbol as company, fastMovingStockQuotes.averagePrice as amount, highFrequentTweets.words as words
        </expression>
        <output topic="PredictedStockQuotes" brokerName="activemqJmsBroker">
            <xmlMapping>
                <quotedata:StockQuoteDataEvent
                        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                        xmlns:xsd="http://www.w3.org/2001/XMLSchema"
                        xmlns:quotedata="http://ws.cdyne.com/">
                    <quotedata:StockSymbol>{company}</quotedata:StockSymbol>
                    <quotedata:Amount>{amount}</quotedata:Amount>
                    <quotedata:WordCount>{words}</quotedata:WordCount>
                </quotedata:StockQuoteDataEvent>
            </xmlMapping>
        </output>
    </query>
</bucket>

...

  1. Install the WSO2 Complex Event Processor, but do not start the server Refer to the Installation Guide for instructions.
  2. Copy paste activemq-all-xxx.jar from the <ActiveMQ_HOME> directory to <CEP_HOME>/samples/lib directory. 

  3. Copy paste activemq-core-xxx.jar and geronimo-j2ee-management_1.1_spec-1.0.1.jar from <ActiveMQ_HOME>/lib to <CEP_HOME>/repository/components/lib directory.

  4. In a command prompt, switch to the sample directory: <CEP_HOME>/samples/cep-samples 
    For example, in Linux: cd <CEP_HOME>/samples/cep-samples
  5. From there, type ant deploy-jms,
    This will copy the broker-manager-config.xml to <CEP_HOME>/repository/conf directory and the bucket configuration to <CEP_HOME>/repository/deployment/server/cepbuckets directory.

Starting JMS subscriber

...