Correlating Events for Complex Event Processing
Introduction
In the previous tutorial, you looked at correlating events from multiple sources in a simple manner using joins. You used join statements between streams and stores and observed how aliases can be used on streams for easy reference.
In this tutorial, let's see how event correlation can be applied to Complex Event Processing (CEP) scenarios.
Let's consider the following two scenarios in the sweet factory.
- Scenario 1: The factory foreman needs to monitor the supply of materials and send an alert if the supplied amount of a material decreases by more than 10 units within 10 minutes.
- Scenario 2: The factory foreman needs to monitor both supply and production, and send an alert if production does not start within 15 minutes after the raw materials are received.
WSO2 Siddhi also supports another type of pattern known as Counting Patterns. For more information, see the Siddhi Query Guide - Counting Pattern.
Before you begin:
Simple Siddhi patterns use the following syntax.
from (every) x -> y within t select ...
This means that when events in the x stream are followed by events from the y stream within a time gap of t, the subsequent operations (such as the select clause) are carried out.
Let's consider the following example to understand this.
from every (e1=MyStream) -> e2=MyStream[e1.val1 <= e2.val1] within 1 hour select e1.val1, e2.val2 insert into OutStream
In the above example, because the every keyword is used, each event that arrives at the MyStream stream starts a new pattern instance as e1. Each instance is checked against subsequent events arriving at the same stream, and it is completed by the first event (e2) that matches the filter condition e1.val1 <= e2.val1 and arrives within one hour of e1. Each matching pair is sent to the OutStream output stream based on the select clause.
If the every keyword is not used, the pattern is matched only once, starting from the first event that arrives at the MyStream stream.
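To make the every and within semantics concrete, the following Python sketch models `from every e1=S -> e2=S[condition] within t` over a time-ordered list of events. It is a simplified illustration, not Siddhi's implementation: timestamps are plain numbers (seconds here), an event arriving exactly t after e1 is assumed to still be inside the window, and the function name every_followed_by is hypothetical.

```python
from typing import Callable, List, Tuple, TypeVar

E = TypeVar("E")


def every_followed_by(
    events: List[Tuple[float, E]],
    condition: Callable[[E, E], bool],
    within: float,
) -> List[Tuple[E, E]]:
    """Simplified model of: from every e1=S -> e2=S[condition] within t.

    `events` holds (timestamp, payload) pairs in arrival order.
    """
    matches: List[Tuple[E, E]] = []
    pending: List[Tuple[float, E]] = []  # open partial matches waiting for e2
    for ts, event in events:
        still_open: List[Tuple[float, E]] = []
        for t1, e1 in pending:
            if ts - t1 > within:
                continue  # the `within` window expired: drop this partial match
            if condition(e1, event):
                matches.append((e1, event))  # first matching e2 completes it
            else:
                still_open.append((t1, e1))
        # `every`: the current event also starts a new partial match as e1
        still_open.append((ts, event))
        pending = still_open
    return matches


if __name__ == "__main__":
    # Hypothetical MyStream events: (seconds, {"val1": ..., "val2": ...})
    my_stream = [
        (0, {"val1": 5, "val2": "a"}),
        (600, {"val1": 3, "val2": "b"}),
        (1200, {"val1": 7, "val2": "c"}),
        (5000, {"val1": 9, "val2": "d"}),
    ]
    pairs = every_followed_by(
        my_stream, lambda e1, e2: e1["val1"] <= e2["val1"], within=3600
    )
    # Equivalent of: select e1.val1, e2.val2
    print([(e1["val1"], e2["val2"]) for e1, e2 in pairs])  # [(5, 'c'), (3, 'c')]
```

Both of the first two events are completed by the third one, while the third event's own partial match expires because the fourth event arrives more than an hour later.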
Tutorial steps
This covers the steps for the following two user scenarios.
User Scenario 1: Detecting a decrease in supply within a set time period
Let's get started!
Let's define an input stream for the raw material supply. This can be the same input stream you defined in Correlating Simple Events.
define stream MaterialSupplyStream(name string, supplier string, amount double);
Now let's define an output stream as follows.
define stream MaterialSupplyAlertStream(name string, supplier string, amount double);
Let's add a query to perform a simple insertion from the input stream to the output stream.
from MaterialSupplyStream select name, supplier, amount insert into MaterialSupplyAlertStream;
To apply a pattern to this stream so that only events arriving within 10 minutes of each other are correlated, let's update the query as follows.
from every (e1=MaterialSupplyStream) -> e2=MaterialSupplyStream within 10 min select e1.name, e1.supplier, e1.amount insert into MaterialSupplyAlertStream;
The foreman requires each event to be checked against subsequent events for the same material to see whether the amount has decreased by more than 10 units. The Siddhi filter for this is as follows.
[e1.name == e2.name and e1.amount - e2.amount > 10]
Let's add it to the query as shown below.
from every (e1=MaterialSupplyStream) -> e2=MaterialSupplyStream[e1.name == e2.name and e1.amount - e2.amount > 10] within 10 min select e1.name, e1.supplier, e1.amount insert into MaterialSupplyAlertStream;
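The filter reads the same way as an ordinary predicate over the pair (e1, e2). The Python sketch below uses a hypothetical helper, is_supply_drop, and a Supply class that mirrors the MaterialSupplyStream attributes. It shows that the comparison is strict (a drop of exactly 10 units does not match) and that the supplier is not compared.

```python
from dataclasses import dataclass


@dataclass
class Supply:
    # Mirrors MaterialSupplyStream(name string, supplier string, amount double)
    name: str
    supplier: str
    amount: float


def is_supply_drop(e1: Supply, e2: Supply) -> bool:
    # Same logic as [e1.name == e2.name and e1.amount - e2.amount > 10]
    return e1.name == e2.name and e1.amount - e2.amount > 10


if __name__ == "__main__":
    first = Supply("sugar", "SupplierA", 100.0)
    print(is_supply_drop(first, Supply("sugar", "SupplierA", 85.0)))  # True: 15-unit drop
    print(is_supply_drop(first, Supply("sugar", "SupplierA", 90.0)))  # False: exactly 10
    print(is_supply_drop(first, Supply("flour", "SupplierA", 50.0)))  # False: other material
```

If a decrease of exactly 10 units should also raise an alert, the filter would need `>=` instead of `>`.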
To generate a more complete output, you need to add the following information to it.
- The name of the raw material
- The earlier amount
- The later amount (which is more than 10 units less than the earlier amount)
- The supplier whose supply has decreased
Let's update the output stream definition as follows to include this information.
define stream MaterialSupplyAlertStream(name string, originalAmount double, laterAmount double, supplier string);
The completed Siddhi application is as follows.
@App:name('MaterialDecreaseDetectionApp')
@source(type = 'http', @map(type = 'json'))
define stream MaterialSupplyStream(name string, supplier string, amount double);
@sink(type='log', prefix='Decrease in supply detected:')
define stream MaterialSupplyAlertStream(name string, originalAmount double, laterAmount double, supplier string);
from every (e1=MaterialSupplyStream) -> e2=MaterialSupplyStream[e1.name == e2.name and e1.amount - e2.amount > 10] within 10 min
select e1.name, e1.amount as originalAmount, e2.amount as laterAmount, e1.supplier
insert into MaterialSupplyAlertStream;
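To see which events the completed application would report, the Python sketch below replays a hypothetical sequence of supply events through a simplified model of the pattern. Timestamps are minutes, the sample values are invented for illustration, and detect_decreases is a hypothetical name; the actual Siddhi runtime handles timing and output ordering in ways this model does not capture.

```python
from typing import List, NamedTuple, Tuple


class Supply(NamedTuple):
    # Mirrors MaterialSupplyStream(name string, supplier string, amount double)
    name: str
    supplier: str
    amount: float


class Alert(NamedTuple):
    # Mirrors MaterialSupplyAlertStream(name, originalAmount, laterAmount, supplier)
    name: str
    originalAmount: float
    laterAmount: float
    supplier: str


def detect_decreases(
    events: List[Tuple[float, Supply]], within_min: float = 10
) -> List[Alert]:
    """Simplified model of the MaterialDecreaseDetectionApp pattern."""
    alerts: List[Alert] = []
    pending: List[Tuple[float, Supply]] = []  # open e1 candidates
    for ts, e2 in events:
        still_open: List[Tuple[float, Supply]] = []
        for t1, e1 in pending:
            if ts - t1 > within_min:
                continue  # outside the 10-minute window: drop the partial match
            if e1.name == e2.name and e1.amount - e2.amount > 10:
                alerts.append(Alert(e1.name, e1.amount, e2.amount, e1.supplier))
            else:
                still_open.append((t1, e1))
        still_open.append((ts, e2))  # `every`: each event is also a new e1
        pending = still_open
    return alerts


if __name__ == "__main__":
    supply_events = [
        (0, Supply("sugar", "SupplierA", 100.0)),
        (2, Supply("flour", "SupplierB", 50.0)),
        (5, Supply("sugar", "SupplierA", 85.0)),
        (20, Supply("flour", "SupplierB", 20.0)),
    ]
    for alert in detect_decreases(supply_events):
        print(alert)  # Alert(name='sugar', originalAmount=100.0, laterAmount=85.0, supplier='SupplierA')
```

Only the sugar decrease is reported. The flour amount also falls by 30 units, but the later flour event arrives 18 minutes after the earlier one, so that partial match has already expired.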
User Scenario 2: Detecting production delays after supplies are received
For this user scenario, let's consider events arriving at two different streams. Here, you need to identify delays in production, which means you have to check for events that do not arrive at a particular stream.
Let's begin by defining input streams for both the raw material supply and the sweet production.
define stream MaterialConsumptionStream(name string, user string, amount double);
define stream MaterialSupplyStream(name string, supplier string, amount double);
The information to be output is as follows.
- The name of the raw material
- The amount of raw material supplied
To generate this output, let's define an output stream as follows.
define stream ProductionDelayAlertStream(name string, amount double);
To correlate events from the consumption and supply streams, let's define a simple pattern as follows.
from every (e1=MaterialSupplyStream) -> e2=MaterialConsumptionStream within 15 min select e1.name, e1.amount insert into ProductionDelayAlertStream;
The above query does not yet contain a condition on which to base the correlation. The condition needs to consider events that have not arrived at the MaterialConsumptionStream input stream, instead of new events that arrive there. To do this, you can use the logical not operator as part of a logical pattern specification, as shown below.
from every (e1=MaterialSupplyStream) -> not MaterialConsumptionStream[name == e1.name and amount == e1.amount] for 15 min select e1.name, e1.amount insert into ProductionDelayAlertStream;
The pattern you defined here differs from the pattern defined in User Scenario 1 in the following ways.
- In this scenario, you do not define a stream reference for the MaterialConsumptionStream stream (that is, there is nothing like e2=MaterialConsumptionStream). This is because the criterion to be met is the non-arrival of events, so there is no e2 event to refer to.
- Instead of the within keyword, you use the for keyword. This is because the not pattern has to be terminated either by a single and clause (which denotes that an event arriving at a different stream can terminate the clause) or by a for <time> clause (which denotes that the wait time for events not arriving is <time>). For more details, see the not pattern in Siddhi Query Guide - Logical Patterns.
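The not ... for pattern can be thought of as a deadline check: each supply event starts a wait, and it is reported only if no consumption event with the same name and amount arrives before the wait ends. The Python sketch below is a simplified, hypothetical model of that behaviour (detect_delays, Supply, and Consumption are illustrative names; timestamps are minutes, and wait defaults to the 15 minutes required by Scenario 2). Instead of running timers, it evaluates every wait against a `now` value, and it treats a consumption event arriving exactly at the deadline as too late.

```python
from typing import List, NamedTuple, Tuple


class Supply(NamedTuple):
    # Mirrors MaterialSupplyStream(name string, supplier string, amount double)
    name: str
    supplier: str
    amount: float


class Consumption(NamedTuple):
    # Mirrors MaterialConsumptionStream(name string, user string, amount double)
    name: str
    user: str
    amount: float


def detect_delays(
    supplies: List[Tuple[float, Supply]],
    consumptions: List[Tuple[float, Consumption]],
    wait: float = 15.0,
    now: float = float("inf"),
) -> List[Tuple[str, float]]:
    """Simplified model of: every e1=MaterialSupplyStream
    -> not MaterialConsumptionStream[name == e1.name and amount == e1.amount] for <wait>.
    Returns (name, amount) rows, like ProductionDelayAlertStream."""
    alerts: List[Tuple[str, float]] = []
    for t1, supply in sorted(supplies, key=lambda pair: pair[0]):
        deadline = t1 + wait
        if deadline > now:
            continue  # the `for` period has not elapsed yet, so no decision
        consumed_in_time = any(
            t1 < t2 < deadline
            and c.name == supply.name
            and c.amount == supply.amount
            for t2, c in consumptions
        )
        if not consumed_in_time:
            alerts.append((supply.name, supply.amount))
    return alerts


if __name__ == "__main__":
    supplies = [
        (0, Supply("sugar", "SupplierA", 100.0)),
        (5, Supply("flour", "SupplierB", 50.0)),
        (10, Supply("cocoa", "SupplierC", 30.0)),
    ]
    consumptions = [
        (8, Consumption("sugar", "Baker1", 100.0)),  # matches the sugar supply in time
        (12, Consumption("flour", "Baker2", 40.0)),  # different amount: no match
    ]
    print(detect_delays(supplies, consumptions, now=30))  # [('flour', 50.0), ('cocoa', 30.0)]
```

With now=22, only flour would be reported, because the wait for the cocoa supply does not end until minute 25.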
The completed Siddhi application looks as follows.
@App:name('ProductionDelayDetectionApp')
@source(type = 'http', @map(type = 'json'))
define stream MaterialSupplyStream(name string, supplier string, amount double);
@source(type = 'http', @map(type = 'json'))
define stream MaterialConsumptionStream(name string, user string, amount double);
@sink(type='log', prefix='Production delay detected:')
define stream ProductionDelayAlertStream(name string, amount double);
from every (e1=MaterialSupplyStream) -> not MaterialConsumptionStream[name == e1.name and amount == e1.amount] for 15 min
select e1.name, e1.amount
insert into ProductionDelayAlertStream;
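Both applications receive events through the http source with JSON mapping. One way to prepare a test event from Python is sketched below. It assumes the siddhi-io-http default receiver URL pattern http://0.0.0.0:9763/<appName>/<streamName> (reached here through localhost) and the default siddhi-map-json input format, which wraps the attributes in an "event" object. Both are assumptions to verify against the extension versions you deploy, and build_event_request is a hypothetical helper.

```python
import json
import urllib.request


def build_event_request(
    app_name: str,
    stream_name: str,
    attributes: dict,
    host: str = "localhost",
    port: int = 9763,
) -> urllib.request.Request:
    # ASSUMPTION: default siddhi-io-http receiver.url pattern
    # http://<host>:<port>/<appName>/<streamName>
    url = f"http://{host}:{port}/{app_name}/{stream_name}"
    # ASSUMPTION: default siddhi-map-json input format {"event": {...}}
    body = json.dumps({"event": attributes}).encode("utf-8")
    return urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST"
    )


if __name__ == "__main__":
    request = build_event_request(
        "ProductionDelayDetectionApp",
        "MaterialSupplyStream",
        {"name": "sugar", "supplier": "SupplierA", "amount": 100.0},
    )
    print(request.full_url)
    # With the application deployed and listening, send the event with:
    # urllib.request.urlopen(request)
```

Building the request separately from sending it keeps the payload easy to inspect before the application is running.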