...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
<bucket name="TwitterAndStockQuoteAnalyzer" xmlns="http://wso2.org/carbon/cep"> <description> This bucket analyzes stock quotes stream and Twitter feeds stream, and trigger an event if the last traded amount of the stock quotes stream vary by 2 percent with regards to the average traded price of a symbol within past 2 minutes and the word count of the Twitter feeds stream with related to that symbol is greater than 10. </description> <engineProviderConfiguration engineProvider="SiddhiCEPRuntime"> <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property> <property name="siddhi.enable.distributed.processing">false</property> </engineProviderConfiguration> <input topic="AllStockQuotes" brokerName="activemqJmsBroker"> <mapMapping stream="allStockQuotes" queryEvnetTypequeryEventType="Tuple"> <property name="symbol" inputName="symbol" type="java.lang.String"/> <property name="price" inputName="price" type="java.lang.Double"/> </mapMapping> </input> <input topic="TwitterFeed" brokerName="activemqJmsBroker"> <mapMapping stream="twitterFeed" queryEvnetTypequeryEventType="Tuple"> <property name="company" inputName="company" type="java.lang.String"/> <property name="wordCount" inputName="wordCount" type="java.lang.Integer"/> </mapMapping> </input> <query name="FastStockQuery"> <expression type="inline"> from allStockQuotes#window.time(120000) insert into fastMovingStockQuotes symbol,price, avg(price) as averagePrice group by symbol having ((price > averagePrice*1.02) or (averagePrice*0.98 > price )) </expression> </query> <query name="HighFrequentTweetQuery"> <expression type="inline"> from twitterFeed#window.time(1200000) insert into highFrequentTweets company as company, sum(wordCount) as words group by company having (words > 10) </expression> </query> <query name="StocksPredictor"> <expression> from fastMovingStockQuotes#window.time(120000) as fastMovingStockQuotes join highFrequentTweets#window.time(120000) as highFrequentTweets on fastMovingStockQuotes.symbol == highFrequentTweets.company insert into predictedStockQuotes fastMovingStockQuotes.symbol as company, fastMovingStockQuotes.averagePrice as amount, highFrequentTweets.words as words </expression> <output topic="PredictedStockQuotes" brokerName="activemqJmsBroker"> <xmlMapping> <quotedata:StockQuoteDataEvent xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:quotedata="http://ws.cdyne.com/"> <quotedata:StockSymbol>{company}</quotedata:StockSymbol> <quotedata:Amount>{amount}</quotedata:Amount> <quotedata:WordCount>{words}</quotedata:WordCount> </quotedata:StockQuoteDataEvent> </xmlMapping> </output> </query> </bucket> |
...