...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
<bucket name="TwitterAndStockQuoteAnalyzer" xmlns="http://wso2.org/carbon/cep"> <description> This bucket analyzes stock quotes stream and Twitter feeds stream, and trigger an event if the last traded amount of the stock quotes stream vary by 2 percent with regards to the average traded price of a symbol within past 2 minutes and the word count of the Twitter feeds stream with related to that symbol is greater than 10. </description> <engineProviderConfiguration engineProvider="SiddhiCEPRuntime"> <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property> <property name="siddhi.enable.distributed.processing">false</property> </engineProviderConfiguration> <input topic="AllStockQuotes" brokerName="activemqJmsBroker"> <mapMapping stream="allStockQuotes" queryEvnetType <!--queryEventType="Tuple"--> > <property name="symbol" inputName="symbol" type="java.lang.String"/> <property name="price" inputName="price" type="java.lang.Double"/> </mapMapping> </input> <input topic="TwitterFeed" brokerName="activemqJmsBroker"> <mapMapping stream="twitterFeed" queryEvnetType<!--queryEventType="Tuple"--> > <property name="company" inputName="company" type="java.lang.String"/> <property name="wordCount" inputName="wordCount" type="java.lang.Integer"/> </mapMapping> </input> <query name="FastStockQuery"> <expression type="inline"> from allStockQuotes#window.time(120000) insert into fastMovingStockQuotes symbol,price, avg(price) as averagePrice group by symbol having ((price > averagePrice*1.02) or (averagePrice*0.98 > price )) </expression> </query> <query name="HighFrequentTweetQuery"> <expression type="inline"> from twitterFeed#window.time(1200000) insert into highFrequentTweets company as company, sum(wordCount) as words group by company having (words > 10) </expression> </query> <query name="StocksPredictor"> <expression> from fastMovingStockQuotes#window.time(120000) as fastMovingStockQuotes join highFrequentTweets#window.time(120000) as highFrequentTweets on fastMovingStockQuotes.symbol == highFrequentTweets.company insert into predictedStockQuotes fastMovingStockQuotes.symbol as company, fastMovingStockQuotes.averagePrice as amount, highFrequentTweets.words as words </expression> <output topic="PredictedStockQuotes" brokerName="activemqJmsBroker"> <xmlMapping> <quotedata:StockQuoteDataEvent xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:quotedata="http://ws.cdyne.com/"> <quotedata:StockSymbol>{company}</quotedata:StockSymbol> <quotedata:Amount>{amount}</quotedata:Amount> <quotedata:WordCount>{words}</quotedata:WordCount> </quotedata:StockQuoteDataEvent> </xmlMapping> </output> </query> </bucket> |
...
- Install the WSO2 Complex Event Processor, but do not start the server, Refer to the Installation Guide for instructions.
Copy paste activemq-all-xxx.jar from the <ActiveMQ_HOME> directory to <CEP_HOME>/samples/lib directory.
Copy paste activemq-core-xxx.jar and geronimo-j2ee-management_1.1_spec-1.0.1.jar from <ActiveMQ_HOME>/lib to <CEP_HOME>/repository/components/lib directory.
- In a command prompt, switch to the sample directory: <CEP_HOME>/samples/cep-samples
For example, in Linux: cd <CEP_HOME>/samples/cep-samples - From there, type ant deploy-jms,
This will copy the broker-manager-config.xml to <CEP_HOME>/repository/conf directory and the bucket configuration to <CEP_HOME>/repository/deployment/server/cepbuckets directory. - Now start the WSO2 Complex Event Processor. Refer to the Running Installing the Product for instructions.
Starting JMS subscriber
...