Analyzing Trends
Introduction
In the previous two tutorials, you understood how event correlation is handled through joins as well as patterns. Now, let's look at how a more advanced form of event processing (namely trend analysis) can be carried out with Siddhi.
The factory foreman needs to detect all the input from the Sweet Bots to detect overall decreases in the production of a sweet. For example, if the production of any given sweet is showing a downward trend for 10 minutes, the factory manager needs to be alerted.
This tutorial covers the following Siddhi concepts:
Siddhi sequences
Siddhi partitions
Sequences are used in Siddhi for considering consecutive events for analysis. A sequence can be considered a special, more advanced form of Siddhi pattern where only the immediate subsequent event is analyzed. In contrast, a partition can be used to analyze multiple following events. Therefore, sequences use the , notation instead of -> during definition.
The following is an sample sequence definition that considers n sequential events from the stream1 stream.
from every e1=stream1, e2=stream1, e3=stream1, ... , en=stream1
select ...
insert into ...Tutorial steps
Let's get started!
The completed Siddhi application looks as follows.
@App:name('DecreasingTrendAlertApp')
@source(type = 'http', @map(type = 'json'))
define stream SweetProductionStream(name string, amount long, timestamp long);
@sink(type='email', address='factory416@sweets-r-us.com', username='factory416', password='secret_password', subject='Downward production trend alert', to='bossman@sweets-r-us.com', @map(type = 'text', @payload("Hello,\n\nThe production of {{name}} has been displaying a decreasing trend for the past 10 minutes. The initial detected amount was {{initialAmount}} and the amount at the end of processing was {{finalAmount}}.\n\nThis message was generated automatically.")))
define stream DecreasingTrendAlertStream(name string, initialAmount long, finalAmount long, timestamp long);
partition with (name of SweetProductionStream)
begin
from every e1=SweetProductionStream,
e2=SweetProductionStream[e1.amount > amount and (timestamp - e1.timestamp) < 10 * 60000]*,
e3=SweetProductionStream[timestamp - e1.timestamp > 10 * 60000 and e1.amount > amount]
select e1.name, e1.amount as initialAmount, e2.amount as finalAmount, e2.timestamp
insert into DecreasingTrendAlertStream;
end;