Correlating Simple Events
Introduction
In the previous tutorials, you covered basic Siddhi functionality, including ingesting data, preprocessing, data store integration, and publishing events. In all of these tutorials, you considered events arriving from a single source (either data sent by the SweetBots on the factory floor or data from the suppliers of raw material).
In a real-world scenario, data from several sources often needs to be analyzed together. The sources can be multiple disparate entities as well as previously received data stored elsewhere. Siddhi supports correlating events from multiple sources through stream and store joins. A join matches events arriving from different sources based on given criteria and then processes them together.
To understand this concept, let's consider a scenario in the sweet factory where events are sent to denote the following:
The raw materials arriving at the factory (name of the material and the amount)
The raw materials consumed per production run by all SweetBots (name and amount)
The factory needs to ensure that the production of flour and sugar does not consume more than 95% of the supply in the past hour for sustainability reasons. Any material that exceeds this limit should be recorded.
Before you begin:
Tutorial steps
Let's get started!
- To create your Siddhi application, start the editor and log in to the WSO2 Stream Processor Studio. Then open a new Siddhi file to write a new Siddhi application.
Now let's define two input streams as follows.
define stream MaterialConsumptionStream(name string, user string, amount double);
define stream MaterialSupplyStream(name string, supplier string, amount double);
The output must include the name of the raw material, the amounts supplied and consumed, and the names of the raw material user and supplier whenever the threshold is exceeded. Let's define an output stream based on these details:
define stream MaterialThresholdAlertStream(name string, supplyAmount double, consumptionAmount double, user string, supplier string);
To select events from the MaterialConsumptionStream for analysis, write a query as follows.
from MaterialConsumptionStream
select name, amount, user
Let's define an alias for the MaterialConsumptionStream so that it can be identified more easily once you have used the join statement.
from MaterialConsumptionStream as c
select c.name, c.amount, c.user
The same needs to be done for the MaterialSupplyStream.
from MaterialSupplyStream as s
select s.name, s.amount, s.supplier
Now that we have both input streams ready, let's join both inputs together using the join keyword.
from MaterialConsumptionStream as c
join MaterialSupplyStream as s
To correlate the two streams, you need an attribute that is common to both of them. In this scenario, both streams carry the name of the material. Therefore, name can be used as the join attribute as shown below.
on c.name == s.name
Because the time period to be considered is one hour, a time window of one hour should be added to both streams as shown below.
from MaterialConsumptionStream#window.time(1 hour) as c
join MaterialSupplyStream#window.time(1 hour) as s
on c.name == s.name
To detect when the consumption exceeds 95% of the supply, add the following condition. It is used in the having clause of the complete statement.
s.amount * 0.95 < c.amount
The complete statement in a usable format looks as follows.
from MaterialConsumptionStream#window.time(1 hour) as c
join MaterialSupplyStream#window.time(1 hour) as s
on c.name == s.name
select s.name, s.amount as supplyAmount, c.amount as consumptionAmount, user, supplier
group by s.name
having s.amount * 0.95 < c.amount
insert into MaterialThresholdAlertStream
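The having condition above compares the amount attributes of individual joined events rather than aggregated totals. Under that assumption, a minimal sketch of an alternative is to place the same comparison in the on clause, which should select the same event pairs without a group by or having clause. This is an illustrative variant, not the form this tutorial uses; user and supplier are qualified with the stream aliases only for readability.

```
-- Sketch: the threshold check moved into the join condition.
-- Assumes a per-event comparison is intended (no aggregation over the window).
from MaterialConsumptionStream#window.time(1 hour) as c
    join MaterialSupplyStream#window.time(1 hour) as s
    on c.name == s.name and s.amount * 0.95 < c.amount
select s.name, s.amount as supplyAmount, c.amount as consumptionAmount, c.user, s.supplier
insert into MaterialThresholdAlertStream;
```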
The completed Siddhi application with source and sink configurations added looks as follows.
@App:name('MaterialThresholdAlertApp')

@source(type = 'http', @map(type = 'json'))
define stream MaterialConsumptionStream(name string, user string, amount double);

@source(type = 'http', @map(type = 'json'))
define stream MaterialSupplyStream(name string, supplier string, amount double);

@sink(type='log', prefix='Materials that go beyond sustainability threshold:')
define stream MaterialThresholdAlertStream(name string, supplyAmount double, consumptionAmount double, user string, supplier string);

from MaterialConsumptionStream#window.time(1 hour) as c
join MaterialSupplyStream#window.time(1 hour) as s
on c.name == s.name
select s.name, s.amount as supplyAmount, c.amount as consumptionAmount, user, supplier
group by s.name
having s.amount * 0.95 < c.amount
insert into MaterialThresholdAlertStream;
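To try the application, it can help to give each HTTP source an explicit endpoint. The following is a sketch only: the host, ports, and paths in receiver.url are hypothetical values chosen for illustration, and the material, user, and supplier names in the sample payloads are made up. The payloads assume the default JSON mapping, which wraps the attributes in an event object.

```
-- Hypothetical endpoints: host, ports, and paths are assumptions.
@source(type = 'http', receiver.url = 'http://localhost:8006/consumption', @map(type = 'json'))
define stream MaterialConsumptionStream(name string, user string, amount double);

@source(type = 'http', receiver.url = 'http://localhost:8007/supply', @map(type = 'json'))
define stream MaterialSupplyStream(name string, supplier string, amount double);

-- Sample request bodies (illustrative values):
--   to the supply endpoint:
--     {"event": {"name": "flour", "supplier": "Acme", "amount": 100.0}}
--   to the consumption endpoint:
--     {"event": {"name": "flour", "user": "SweetBot-1", "amount": 96.0}}
```

With these two events arriving within the same hour, 100.0 * 0.95 = 95.0 is less than 96.0, so the pair satisfies the having condition and should be logged by the sink.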
If you want to correlate events from a stream with previously stored data, the store that was used for storing the events can take the place of either stream in the join (but not both).
The following is the completed Siddhi application. It contains the same query as above, except that the supply side of the join reads historical data from the ShipmentDetailsTable instead of a stream.
@App:name('PersistentMaterialThresholdAlertApp')

@source(type = 'http', @map(type = 'json'))
define stream MaterialConsumptionStream(name string, user string, amount double);

@primaryKey('name')
@index('supplier')
@store(type='rdbms', jdbc.url="jdbc:mysql://localhost:3306/SweetFactoryDB", username="root", password="root" , jdbc.driver.name="com.mysql.jdbc.Driver")
define table ShipmentDetailsTable(name string, supplier string, amount double);

@sink(type='log', prefix='Materials that go beyond sustainability threshold:')
define stream MaterialThresholdAlertStream(name string, supplyAmount double, consumptionAmount double, user string, supplier string);

from MaterialConsumptionStream#window.time(1 hour) as c
join ShipmentDetailsTable as s
on c.name == s.name
select s.name, s.amount as supplyAmount, c.amount as consumptionAmount, user, supplier
group by s.name
having s.amount * 0.95 < c.amount
insert into MaterialThresholdAlertStream;
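The application above only reads from ShipmentDetailsTable; it does not show how the table is filled. One possible way, sketched here as an assumption rather than as part of this tutorial, is to add a supply stream and an update or insert query to the same application. Because name is the primary key, each material keeps a single row that is overwritten by the latest shipment.

```
-- Assumed addition to PersistentMaterialThresholdAlertApp, where
-- ShipmentDetailsTable is already defined.
@source(type = 'http', @map(type = 'json'))
define stream MaterialSupplyStream(name string, supplier string, amount double);

-- Insert a row for a new material, or update the existing row
-- when a shipment for the same material arrives again.
from MaterialSupplyStream
select name, supplier, amount
update or insert into ShipmentDetailsTable
    on ShipmentDetailsTable.name == name;
```

In a store join, only events arriving on the stream trigger the query; rows written to the table are matched the next time a consumption event arrives.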