
Correlating Events for Complex Event Processing

Introduction

In the previous tutorial, you looked at correlating events from multiple sources in a simple manner using joins. You used join statements between streams and stores and observed how aliases can be used on streams for easy reference.

In this tutorial, let's see how event correlation can be applied to Complex Event Processing (CEP) scenarios.

Let's consider the following two scenarios in the sweet factory.

  • Scenario 1: The factory foreman needs to monitor the supply of materials and send an alert if there is a decrease of 10 units within 10 minutes.
  • Scenario 2: The factory foreman needs to monitor both supply and production, and send an alert if production does not start within 15 minutes after the raw materials are received.

This tutorial covers the following concepts:

  • Simple patterns
  • Logical patterns for applying logical operations to events arriving in a temporal order

WSO2 Siddhi also supports another type of pattern known as counting patterns. For more information, see the Siddhi Query Guide - Counting Pattern.

Before you begin:


Simple Siddhi patterns can be identified by the following syntax.

from (every) x -> y
within t
select ...

This means that when an event in the x stream is followed by an event in the y stream within a time gap of t, the subsequent operations are carried out.

Let's consider the following real world example to understand this.

from every (e1=MyStream) -> e2=MyStream[e1.val1 <= e2.val1]
	within 1 hour
select e1.val1, e2.val2
insert into OutStream

In the above example, because the every keyword is used, each event that arrives at the MyStream stream is treated as a potential e1 event and is checked against subsequent events arriving at the same stream for a match with the given filter condition (e1.val1 <= e2.val1). A match is accepted only if it occurs within one hour. When a matching event is found, an output event is sent to the OutStream output stream based on the select clause.

If the every keyword is not used, the pattern is matched only once, starting from the first event that arrives at the MyStream stream.
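The effect of the every keyword can be sketched in plain Python. This is only an illustration of the matching idea described above, not Siddhi's engine: the function name match_followed_by, the (timestamp, value) event shape, and the sample values are all assumptions made for the sketch.

```python
def match_followed_by(events, condition, within, use_every=True):
    """Sketch of `from (every) e1=S -> e2=S[condition] within t`.

    events: list of (timestamp_seconds, value) tuples in arrival order.
    Returns a list of (e1_value, e2_value) matches.
    """
    pending = []      # e1 candidates still waiting for a matching e2
    matches = []
    started = False   # without `every`, only the first event becomes e1
    for ts, val in events:
        still_waiting = []
        for e1_ts, e1_val in pending:
            if ts - e1_ts > within:
                continue                      # the time gap was exceeded
            if condition(e1_val, val):
                matches.append((e1_val, val))
            else:
                still_waiting.append((e1_ts, e1_val))
        pending = still_waiting
        if use_every or not started:
            pending.append((ts, val))         # this event may act as a new e1
            started = True
    return matches


# Hypothetical val1 readings, mirroring the filter e1.val1 <= e2.val1.
events = [(0, 5), (10, 3), (20, 7), (30, 8)]
one_hour = 60 * 60

with_every = match_followed_by(events, lambda a, b: a <= b, one_hour)
first_only = match_followed_by(events, lambda a, b: a <= b, one_hour,
                               use_every=False)
print(with_every)   # [(5, 7), (3, 7), (7, 8)]
print(first_only)   # [(5, 7)]
```

With every, each arriving event opens a new match attempt, so three pairs are reported. Without it, only the first event is used as e1 and a single pair is reported.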

Tutorial steps

This section covers the steps for the following two user scenarios.

User Scenario 1: Detecting a decrease in supply within a set time period

Let's get started!

  1. Let's define an input stream for the raw material supply. This can be the same input stream you defined in Correlating Simple Events.

    define stream MaterialSupplyStream(name string, supplier string, amount double);
  2. Now let's define an output stream as follows.

    define stream MaterialSupplyAlertStream(name string, originalAmount double, laterAmount double, supplier string);
  3. Let's add a query to perform a simple insertion from the input stream to the output stream.

    from MaterialSupplyStream
    select name, supplier, amount
    insert into MaterialSupplyAlertStream;
  4. To define a pattern on this stream so that only events within a 10-minute window are considered, let's update the query as follows.

    from every (e1=MaterialSupplyStream) -> e2=MaterialSupplyStream
    	within 10 min
    select e1.name, e1.amount as originalAmount, e2.amount as laterAmount, e1.supplier
    insert into MaterialSupplyAlertStream;
  5. The foreman requires each event to be checked against the subsequent events for the same material to see whether the amount has decreased by more than 10 units. The Siddhi filter for this is as follows.

    [e1.name == e2.name and e1.amount - e2.amount > 10]

    Let's add it to the query as shown below.

    from every (e1=MaterialSupplyStream) -> e2=MaterialSupplyStream[e1.name == e2.name and e1.amount - e2.amount > 10]
        within 10 min
    select e1.name, e1.supplier, e1.amount
    insert into MaterialSupplyAlertStream;

  6. To generate a more complete output, you need to add the following information to it.

    • The name of the raw material

    • The earlier amount

    • The later amount (which is less than the earlier detected amount by more than 10 units, as required by the filter)

    • The supplier whose supply has fallen below the threshold


    Let's update the output stream definition as follows to include this information. 

    define stream MaterialSupplyAlertStream(name string, originalAmount double, laterAmount double, supplier string);

The completed Siddhi application is as follows.

@App:name('MaterialDecreaseDetectionApp')
 
@source(type = 'http', @map(type = 'json'))
define stream MaterialSupplyStream(name string, supplier string, amount double);
 
@sink(type='log', prefix='Decrease in supply detected:')
define stream MaterialSupplyAlertStream(name string, originalAmount double, laterAmount double, supplier string);
 
from every (e1=MaterialSupplyStream) -> e2=MaterialSupplyStream[e1.name == e2.name and e1.amount - e2.amount > 10]
    within 10 min
select e1.name, e1.amount as originalAmount, e2.amount as laterAmount, e1.supplier
insert into MaterialSupplyAlertStream;
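To see which events this query would flag, the following plain-Python sketch replays a few hypothetical supply events through the same filter and 10-minute window. It is an approximation of the behaviour described in this tutorial, not the Siddhi runtime: the Supply class, its ts field, the function name detect_decrease, and all sample values (including the supplier name Acme) are assumptions made for the illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Supply:
    ts: float       # arrival time in seconds (assumed field, not in the stream)
    name: str
    supplier: str
    amount: float


WINDOW = 10 * 60    # corresponds to "within 10 min"


def detect_decrease(events):
    """Return (name, originalAmount, laterAmount, supplier) alert tuples."""
    pending, alerts = [], []
    for e2 in events:
        waiting = []
        for e1 in pending:
            if e2.ts - e1.ts > WINDOW:
                continue                      # e1 is older than 10 minutes
            # Same condition as the Siddhi filter on e2.
            if e1.name == e2.name and e1.amount - e2.amount > 10:
                alerts.append((e1.name, e1.amount, e2.amount, e1.supplier))
            else:
                waiting.append(e1)
        pending = waiting + [e2]              # `every`: each event is a new e1
    return alerts


events = [
    Supply(0, "sugar", "Acme", 100.0),
    Supply(60, "flour", "Acme", 50.0),
    Supply(120, "sugar", "Acme", 90.0),   # drop of exactly 10: filter is > 10
    Supply(180, "sugar", "Acme", 85.0),   # drop of 15 from the first event
    Supply(900, "flour", "Acme", 10.0),   # 840 s after flour 50: too late
]
alerts = detect_decrease(events)
print(alerts)   # [('sugar', 100.0, 85.0, 'Acme')]
```

Note that a drop of exactly 10 units does not raise an alert, because the filter uses a strict > comparison. If the foreman also wants to be alerted on a drop of exactly 10 units, the comparison in the filter would need to be >= instead.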


User Scenario 2: Detecting production delays after supplies are received

For this user scenario, let's consider a situation where the relevant data arrives in events from two different streams. Here, you need to identify delays in production, which means you have to check for events that do not arrive at a particular stream.

  1. Let's begin by defining input streams for both the raw material supply and the sweet production.

    define stream MaterialConsumptionStream(name string, user string, amount double);
    define stream MaterialSupplyStream(name string, supplier string, amount double);
  2. The information to be output is as follows.
    • Name of the raw material

    • Production amount

    To generate this output, let's define an output stream as follows. 

    define stream ProductionDelayAlertStream(name string, amount double);
  3. To correlate events from the consumption and supply streams, let's define a simple pattern as follows.

    from every (e1=MaterialSupplyStream) -> e2=MaterialConsumptionStream
    	within 15 min
    select e1.name, e1.amount
    insert into ProductionDelayAlertStream;
  4. The query you added above does not yet contain a condition on which the correlation is based. The condition needs to consider events that have not arrived at the MaterialConsumptionStream input stream, rather than new events that arrive there. To do this, you can use the logical not operator as part of a logical pattern specification, as shown below.

    from every (e1=MaterialSupplyStream) -> not MaterialConsumptionStream[name == e1.name and amount == e1.amount]
    for 15 min
    select e1.name, e1.amount
    insert into ProductionDelayAlertStream;

    The pattern you defined here differs from the pattern defined in User Scenario 1 in the following ways.

    • In this scenario, you do not define a stream reference for the MaterialConsumptionStream stream (i.e., something similar to e2=MaterialConsumptionStream). This is because the criterion to be met is the non-arrival of events, so there is no e2 event to refer to.

    • Instead of the within keyword, you use the for keyword. This is because a not pattern has to be terminated either by an and clause (which denotes that an event arriving at a different stream can terminate the wait), or by a for <time> clause (which denotes that <time> is how long to wait for the event that does not arrive). For more details, see the not pattern in Siddhi Query Guide - Logical Patterns.
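The idea behind not ... for can be sketched in plain Python as a check over recorded events: for each supply event, look for a matching consumption event in the following 15 minutes, and raise an alert only if none exists. This is a batch approximation made for illustration, whereas Siddhi evaluates the pattern continuously as events arrive. The function name production_delays, the (timestamp, name, amount) tuple shape, the boundary handling, and the sample values are all assumptions.

```python
WAIT = 15 * 60   # corresponds to "for 15 min"


def production_delays(supplies, consumptions):
    """supplies / consumptions: lists of (ts_seconds, name, amount) tuples.

    Returns (name, amount) for every supply event that has no matching
    consumption event within WAIT seconds after it.
    """
    alerts = []
    for s_ts, s_name, s_amount in supplies:
        consumed = any(
            s_ts < c_ts <= s_ts + WAIT          # arrived inside the wait time
            and c_name == s_name                # name == e1.name
            and c_amount == s_amount            # amount == e1.amount
            for c_ts, c_name, c_amount in consumptions
        )
        if not consumed:
            alerts.append((s_name, s_amount))
    return alerts


supplies = [(0, "sugar", 100.0), (60, "flour", 50.0)]
consumptions = [
    (300, "sugar", 100.0),    # 5 minutes after the sugar supply: on time
    (1200, "flour", 50.0),    # 19 minutes after the flour supply: too late
]
delays = production_delays(supplies, consumptions)
print(delays)   # [('flour', 50.0)]
```

Only the flour delivery triggers an alert here, because its matching consumption event arrives after the 15-minute wait has elapsed.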

The completed Siddhi application looks as follows.

@App:name('ProductionDelayDetectionApp')
 
@source(type = 'http', @map(type = 'json'))
define stream MaterialSupplyStream(name string, supplier string, amount double);
 
@source(type = 'http', @map(type = 'json'))
define stream MaterialConsumptionStream(name string, user string, amount double);
 
@sink(type='log', prefix='Production delay detected:')
define stream ProductionDelayAlertStream(name string, amount double);
 
from every (e1=MaterialSupplyStream) -> not MaterialConsumptionStream[name == e1.name and amount == e1.amount]
    for 15 min
select e1.name, e1.amount
insert into ProductionDelayAlertStream;