Introduction
In the previous two tutorials, you learned how event correlation is handled through joins and patterns. Now, let's look at how a more advanced form of event processing (namely trend analysis) can be carried out with Siddhi.
The factory foreman needs to monitor all the input from the Sweet Bots to detect overall decreases in the production of a sweet. For example, if the production of any given sweet shows a downward trend for 10 minutes, the factory manager needs to be alerted.
Sequences are used in Siddhi for considering consecutive events for analysis. A sequence can be considered a special, more advanced form of Siddhi pattern where only the immediately following event is analyzed. In contrast, a pattern can be used to analyze multiple following events. Therefore, sequences use the , notation instead of -> in their definition.
The following is a sample sequence definition that considers n sequential events from the stream1 stream.
from every e1=stream1, e2=stream1, e3=stream1, ... , en=stream1 select ... insert into ...
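As a concrete instance of the template above, the following sketch matches three consecutive events in which each amount is lower than the previous one. It borrows the SweetProductionStream name that is introduced later in this tutorial. ThreeStepDecreaseStream is a hypothetical output stream used only for illustration.

```siddhi
define stream SweetProductionStream(name string, amount long);

-- Sequence: e2 must be the event immediately after e1,
-- and e3 must be the event immediately after e2.
-- ThreeStepDecreaseStream is a hypothetical output stream.
from every e1=SweetProductionStream,
     e2=SweetProductionStream[amount < e1.amount],
     e3=SweetProductionStream[amount < e2.amount]
select e1.name, e1.amount as firstAmount, e2.amount as secondAmount, e3.amount as thirdAmount
insert into ThreeStepDecreaseStream;
```

Replacing the commas with -> would turn this into a pattern, in which unrelated events are allowed to arrive between the matched ones.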
Tutorial steps
Let's get started!
First, let's add the SweetProductionStream input stream that you created in Creating a Simple Siddhi Application to capture information from the Sweet Bots.
define stream SweetProductionStream(name string, amount long);
The performance analysis needs to be done based on time. Let's also assume that each Sweet Bot sends events with time information, in the form of an additional field that denotes the UNIX timestamp. This additional field needs to be added as an attribute to the SweetProductionStream stream as shown below.
define stream SweetProductionStream(name string, amount long, timestamp long);
An example event (in JSON form) from a SweetBot is as follows:
{ "event": { "name": "Bonbon", "amount": 10, "timestamp": 1415463675 } }
Let's also define an output stream from which alerts are generated. The following information needs to be included in the output event.
- The name of the sweet whose production is showing a downward trend
- The highest detected value within the 10-minute window (i.e., the initial amount based on which the checks are done)
- The final amount detected
- The timestamp at the end of processing
The output stream definition with this information looks as follows.
define stream DecreasingTrendAlertStream(name string, initialAmount long, finalAmount long, timestamp long);
To enable the sequential processing of events, let's first define a simple sequence with no processing.
from every e1=SweetProductionStream, e2=SweetProductionStream
select e1.name, e1.amount as initialAmount, e2.amount as finalAmount, e2.timestamp
insert into DecreasingTrendAlertStream;
Here, you are using the comma (,) to delimit events that you need to process in sequence.
Next, you need to define the logic for the user scenario. It requires detecting a decreasing trend within 10 minutes in the production figures. This can be broken down into the following points:
i. Every event has a value for the amount attribute.
ii. If the amount value for event e1 is v1, then we can say that a decreasing trend exists if the next event e2 has a value v2 as the amount, and v1 > v2.
iii. The above decreasing trend has to continue for 10 minutes to be flagged for alerting.
iv. If no event with a decreasing trend (i.e., v1 > v2) is detected within 10 minutes, no alert is generated.
Let's construct a query taking the above points into consideration.
- Taking the two sequential events e1 and e2 into consideration, point ii can be converted into a filter form as follows.
[e1.amount > e2.amount]
- As mentioned in point iii, the processing should consider a period of 10 minutes (10 * 60000 milliseconds). The filter form to do this is as follows.
[e2.timestamp - e1.timestamp < 10 * 60000]
- When the above two filter forms are combined, the following filter is created.
[e1.amount > e2.amount and (e2.timestamp - e1.timestamp) < 10 * 60000]
Let's apply this filter to the sequence, so that the above conditions are applied to the second event e2.
from every e1=SweetProductionStream,
e2=SweetProductionStream[e1.amount > amount and (timestamp - e1.timestamp) < 10 * 60000]
select e1.name, e1.amount as initialAmount, e2.amount as finalAmount, e2.timestamp
insert into DecreasingTrendAlertStream;
Here, you do not need to refer to e2's attributes as e2.amount, e2.timestamp, etc. You can refer to them simply as amount and timestamp, because within e2's own filter an unqualified attribute name refers to the event being matched.
For a complete trend analysis, the pattern matching must run continuously. When an event e2 that meets the conditions you specified arrives, you need to make sure that matching does not stop there and that the next event is also evaluated against the same criterion. To achieve this, sequences can be specified with regular expressions. This may include symbols such as * (zero or more instances), + (one or more instances), and ? (zero or one instance).
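To illustrate how such a symbol is attached to an event in a sequence, here is a minimal sketch that uses + rather than the * this tutorial goes on to use. It looks for one or more consecutive events below the initial amount, followed by an event that is back at or above it. DipAndRecoveryStream is a hypothetical output stream, and the scenario itself is an assumption made only for this example.

```siddhi
define stream SweetProductionStream(name string, amount long, timestamp long);

-- '+' : e2 must match at least one event lower than e1
-- before e3 is considered.
-- DipAndRecoveryStream is a hypothetical output stream.
from every e1=SweetProductionStream,
     e2=SweetProductionStream[e1.amount > amount]+,
     e3=SweetProductionStream[amount >= e1.amount]
select e1.name, e1.amount as initialAmount, e3.amount as recoveredAmount
insert into DipAndRecoveryStream;
```

With * in place of +, the sequence would also match when e3 arrives directly after e1, because zero e2 events would be acceptable.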
In this scenario, you need the processing for the decreasing trend to be done continuously (for zero or more matching events). Therefore, let's add the * regular expression to the filter as shown below.
from every e1=SweetProductionStream,
e2=SweetProductionStream[e1.amount > amount and (timestamp - e1.timestamp) < 10 * 60000]*
select e1.name, e1.amount as initialAmount, e2.amount as finalAmount, e2.timestamp
insert into DecreasingTrendAlertStream;
Finally, you need the processing to conclude at the end of 10 minutes. This means that there should be a clause that checks timestamps and matches an event whose timestamp is more than 10 * 60000 milliseconds later than that of the original event. In other words, adding a filter at the end that matches timestamps more than 10 minutes after e1 ensures that the processing can break out of the loop described above and proceed to the output stream.
Therefore, let's define a new event e3 that follows the initial event e1 and zero or more e2 events:
e3=SweetProductionStream[timestamp - e1.timestamp > 10 * 60000 and e1.amount > amount]
Let's add this to the query as follows.
from every e1=SweetProductionStream,
e2=SweetProductionStream[e1.amount > amount and (timestamp - e1.timestamp) < 10 * 60000]*,
e3=SweetProductionStream[timestamp - e1.timestamp > 10 * 60000 and e1.amount > amount]
select e1.name, e1.amount as initialAmount, e2.amount as finalAmount, e2.timestamp
insert into DecreasingTrendAlertStream;
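Because e2 can now collect several events, it can be useful to see which of them ends up in the output. The following sketch is a variation of the query above, not the tutorial's own query, that addresses the collected e2 events by index. TrendDetailStream and its attribute names are hypothetical and exist only for illustration.

```siddhi
define stream SweetProductionStream(name string, amount long, timestamp long);

-- e2 may hold several events, so individual matches are addressed by index:
-- e2[0] is the first collected event and e2[last] is the most recent one.
-- TrendDetailStream is a hypothetical output stream.
from every e1=SweetProductionStream,
     e2=SweetProductionStream[e1.amount > amount and (timestamp - e1.timestamp) < 10 * 60000]*,
     e3=SweetProductionStream[timestamp - e1.timestamp > 10 * 60000 and e1.amount > amount]
select e1.name,
       e1.amount as initialAmount,
       e2[0].amount as firstLowerAmount,
       e2[last].amount as lastLowerAmount,
       e3.amount as closingAmount
insert into TrendDetailStream;
```

If no e2 event was collected before e3 matched, the indexed references are expected to evaluate to null, so a consumer of such a stream should be prepared for missing values.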
The completed query is as follows.
define stream SweetProductionStream(name string, amount long, timestamp long);
define stream DecreasingTrendAlertStream(name string, initialAmount long, finalAmount long, timestamp long);
from every e1=SweetProductionStream,
e2=SweetProductionStream[e1.amount > amount and (timestamp - e1.timestamp) < 10 * 60000]*,
e3=SweetProductionStream[timestamp - e1.timestamp > 10 * 60000 and e1.amount > amount]
select e1.name, e1.amount as initialAmount, e2.amount as finalAmount, e2.timestamp
insert into DecreasingTrendAlertStream;
The Siddhi application still does not generate a product-wise decreasing trend alert. For example, with the current query, the following two events arriving in succession are treated as part of a decreasing trend even though they refer to two different sweets.
Product | Amount | Timestamp
Bonbon | 10 | 1415463675
Pretzel | 7 | 1415474284
To address this, you need to consider the processing of each sweet as a self-contained unit instead of considering them all together. This means that events with the name attribute set to a particular sweet are matched only with other events that have the same value for that attribute.
Siddhi offers this functionality through partitions. With partitions, streams are virtually divided and incoming events are processed in isolated groups (known as partitions) that are completely independent of one another. Each partition (which can contain multiple operations within) is tagged with a key and only processes events that match its key. For more information about partitions, see Siddhi Query Guide - Partition.
A typical partition definition looks as follows.
partition with (myAttribute2 of MyStream)
begin
from MyStream
select myAttribute1, myAttribute2, max(myAttribute3) as maxAttribute3
insert into MyOutputStream;
end;
Here, the partition key is myAttribute2, and if there are three possible values for myAttribute2, each of these values is considered a separate partition.
For this scenario, let's define a partition with the name attribute as follows so that each sweet group is considered separately based on its name.
partition with (name of SweetProductionStream)
begin
. . .
end;
Let's copy the sequence query you created above and place it within the partition as shown below.
partition with (name of SweetProductionStream)
begin
from every e1=SweetProductionStream,e2=SweetProductionStream[e1.amount > amount and (timestamp - e1.timestamp) < 10 * 60000]*,
e3=SweetProductionStream[timestamp - e1.timestamp > 10 * 60000 and e1.amount > amount]
select e1.name, e1.amount as initialAmount, e2.amount as finalAmount, e2.timestamp
insert into DecreasingTrendAlertStream;
end;
The completed Siddhi application looks as follows.
@App:name('DecreasingTrendAlertApp')

@source(type = 'http', @map(type = 'json'))
define stream SweetProductionStream(name string, amount long, timestamp long);

@sink(type='email', address='factory416@sweets-r-us.com', username='factory416', password='secret_password', subject='Downward production trend alert', to='bossman@sweets-r-us.com', @map(type = 'text', @payload("Hello,\n\nThe production of {{name}} has been displaying a decreasing trend for the past 10 minutes. The initial detected amount was {{initialAmount}} and the amount at the end of processing was {{finalAmount}}.\n\nThis message was generated automatically.")))
define stream DecreasingTrendAlertStream(name string, initialAmount long, finalAmount long, timestamp long);

partition with (name of SweetProductionStream)
begin
from every e1=SweetProductionStream,
e2=SweetProductionStream[e1.amount > amount and (timestamp - e1.timestamp) < 10 * 60000]*,
e3=SweetProductionStream[timestamp - e1.timestamp > 10 * 60000 and e1.amount > amount]
select e1.name, e1.amount as initialAmount, e2.amount as finalAmount, e2.timestamp
insert into DecreasingTrendAlertStream;
end;