Sample 0110 - Sequences with partitioning to detect trends
Introduction
This sample demonstrates how to set up an execution plan with a sequence-based query that can be used to detect trends from a stock trades stream. This sample uses the Event Simulator for inputs and the logger publisher for logging the custom output events to the CEP console. Custom events are events with custom mappings that does not adhere to the default event formats. For more information on event formats, see Event Formats.
The query used in this sample is as follows:
partition with (symbol of StockStream) begin from every e1=StockStream[price>20], e2=StockStream[((e2[last].price is null) and price>=e1.price) or ((not (e2[last].price is null)) and price>=e2[last].price)]+, e3=StockStream[price<e2[last].price] select e1.symbol as symbol, e1.price as priceInitial, e2[last].price as pricePeak, e3.price as priceAfterPeak insert into PeakStream ; end;
Above query:
- A partition is defined with the
symbol
attribute of theStockStream
, which means that the processing will take place independently for events of eachsymbol
. - Processes the events received through the
FilteredStockStream
. - First it looks for an event
e1
with the condition price greater than 20. - Then it looks for one or more events,
e2
with a condition:((e2[last].price is null) and price>=e1.price)
is used to check the first event aftere1
, i.e.(e2[last].price is null)
returns true since there is no last event ine2
. Then it checks for the condition where the current event price is greater thane1
price.((not (e2[last].price is null))
andprice>=e2[last].price)
this part is for any subsequent events. In this case the last ofe2
is not null, and we check whether the price of the current event is greater than the last event. i.e. we are looking for one or more events with continuous price increase from this whole condition.
- Then we look for another event with a price drop from the last
e2
event from the conditione3=FilteredStockStream[price<e2[last].price]
. - From the select clause we select the attributes
e1.price
as priceInitial,e2[last].price as pricePeak
,e3.price as priceAfterPeak
. - Finally the event is output to the
PeakStream
.
Prerequisites
See Prerequisites in CEP Samples Setup page.
Building the sample
Start the WSO2 CEP server with the sample configuration numbered 0110. For instructions, see Starting sample CEP configurations. This sample configuration does the following:
- Points the default Axis2 repo to
<CEP_HOME>/sample/artifacts/0110
(by default, the Axis2 repo is<CEP_HOME>/repository/deployment/server
).
Executing the sample
Log into the CEP management console which is located at https://localhost:9443/carbon.
- Go to Tools -> Event Simulator. Under the 'Multiple Events' section, you can see the listed 'events.csv' file which contains some sample data. Click 'play' to start sending sample events from the file.
See the output events received from the CEP console. This sample uses the logger adaptor to log output events to the console.
For example, given below is a screenshot of the output of the consumer sending events from the producer: