...
- To create your Siddhi application, start the editor and log in to WSO2 Stream Processor Studio. Then open a new Siddhi file in which to write the application.
Now let's define two input streams as follows.
    define stream MaterialConsumptionStream(name string, user string, amount double);
    define stream MaterialSupplyStream(name string, supplier string, amount double);
The output must include the name of the raw material, the amounts supplied and consumed, and the names of the raw material's user and supplier whenever the threshold is reached. Let's define an output stream based on these details:
    define stream MaterialThresholdAlertStream(name string, supplyAmount double, consumptionAmount double, user string, supplier string);
To select events from the MaterialConsumptionStream for analysis, write a query as follows.
    from MaterialConsumptionStream
    select name, amount, user
Let's define an alias for the MaterialConsumptionStream so that its attributes can be identified more easily once you have added the join statement.
    from MaterialConsumptionStream as c
    select c.name, c.amount, c.user
The same needs to be done for the MaterialSupplyStream.
    from MaterialSupplyStream as s
    select s.name, s.amount, s.supplier
Now that we have both input streams ready, let's join them using the join keyword.
    from MaterialConsumptionStream as c
    join MaterialSupplyStream as s
To correlate the two streams, you need a common attribute in both of them. In this scenario, both streams carry the name of the material. Therefore, name can be used as the join attribute as shown below.
    from MaterialConsumptionStream as c
    join MaterialSupplyStream as s
    on c.name == s.name
Because the time period to be considered is one hour, add a time window of one hour to both streams as shown below.
    from MaterialConsumptionStream#window.time(1 hour) as c
    join MaterialSupplyStream#window.time(1 hour) as s
    on c.name == s.name
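The time window used above slides continuously, so each stream retains whatever arrived during the last hour. If non-overlapping hourly batches suit your scenario better, Siddhi also provides a timeBatch window. The fragment below is only a sketch of that variant; choosing timeBatch is an assumption made for illustration and is not something this tutorial requires.

```sql
-- Sketch only: tumbling one-hour batches instead of a sliding one-hour window.
from MaterialConsumptionStream#window.timeBatch(1 hour) as c
    join MaterialSupplyStream#window.timeBatch(1 hour) as s
    on c.name == s.name
```

The rest of this tutorial continues with the sliding time window.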
To detect when the consumption exceeds 95% of the supply, add a condition as follows.
    s.amount * 0.95 < c.amount
The complete statement in a usable format looks as follows.
    from MaterialConsumptionStream#window.time(1 hour) as c
    join MaterialSupplyStream#window.time(1 hour) as s
    on c.name == s.name
    select s.name, s.amount as supplyAmount, c.amount as consumptionAmount, user, supplier
    group by s.name
    having s.amount * 0.95 < c.amount
    insert into MaterialThresholdAlertStream
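To make the threshold arithmetic concrete, the sketch below works through one hypothetical pair of events in its comments and shows an alternative placement of the same condition in the on clause of the join. This is a variant for illustration, not the form the tutorial uses. It assumes that no per-group aggregation is needed, so the group by and having clauses are dropped.

```sql
-- Worked check for a hypothetical pair of events:
--   supply amount      s.amount = 100.0
--   consumption amount c.amount = 96.0
--   100.0 * 0.95 = 95.0, and 95.0 < 96.0, so the pair passes the condition.
from MaterialConsumptionStream#window.time(1 hour) as c
    join MaterialSupplyStream#window.time(1 hour) as s
    on c.name == s.name and s.amount * 0.95 < c.amount
select s.name, s.amount as supplyAmount, c.amount as consumptionAmount, c.user, s.supplier
insert into MaterialThresholdAlertStream;
```

With a consumption amount of 90.0 instead, 95.0 < 90.0 is false and the pair would not produce an alert.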
The completed Siddhi application with source and sink configurations added looks as follows.
    @App:name('MaterialThresholdAlertApp')

    @source(type = 'http', @map(type = 'json'))
    define stream MaterialConsumptionStream(name string, user string, amount double);

    @source(type = 'http', @map(type = 'json'))
    define stream MaterialSupplyStream(name string, supplier string, amount double);

    @sink(type='log', prefix='Materials that go beyond sustainability threshold:')
    define stream MaterialThresholdAlertStream(name string, supplyAmount double, consumptionAmount double, user string, supplier string);

    from MaterialConsumptionStream#window.time(1 hour) as c
    join MaterialSupplyStream#window.time(1 hour) as s
    on c.name == s.name
    select s.name, s.amount as supplyAmount, c.amount as consumptionAmount, user, supplier
    group by s.name
    having s.amount * 0.95 < c.amount
    insert into MaterialThresholdAlertStream;
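The HTTP sources in the application above rely on default settings for the address they listen on. If you want to choose the endpoint yourself, the HTTP source accepts a receiver.url parameter. The following is a minimal sketch; the host, port, and path are hypothetical placeholders that you would replace with values valid for your own deployment.

```sql
-- Sketch only: the host, port, and path below are hypothetical placeholders.
@source(type = 'http',
        receiver.url = 'http://localhost:8006/materialConsumption',
        @map(type = 'json'))
define stream MaterialConsumptionStream(name string, user string, amount double);
```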
If you want to correlate stream events with previously stored data as well, the store (table) that holds those events can take the place of either stream in the join, but not both.
    define table ShipmentDetailsTable(name string, supplier string, amount double);
    define stream MaterialConsumptionStream(name string, user string, amount double);
    from MaterialConsumptionStream#window.time(1 hour) as c
    join ShipmentDetailsTable as s
    on c.name == s.name
    select ...
The following is the completed Siddhi application. It contains the same query as above, now using historical data from the ShipmentDetailsTable.
    @App:name('PersistentMaterialThresholdAlertApp')

    @source(type = 'http', @map(type = 'json'))
    define stream MaterialConsumptionStream(name string, user string, amount double);

    @primaryKey('name')
    @index('supplier')
    @store(type='rdbms', jdbc.url="jdbc:mysql://localhost:3306/SweetFactoryDB", username="root", password="root", jdbc.driver.name="com.mysql.jdbc.Driver")
    define table ShipmentDetailsTable(name string, supplier string, amount double);

    @sink(type='log', prefix='Materials that go beyond sustainability threshold:')
    define stream MaterialThresholdAlertStream(name string, supplyAmount double, consumptionAmount double, user string, supplier string);

    from MaterialConsumptionStream#window.time(1 hour) as c
    join ShipmentDetailsTable as s
    on c.name == s.name
    select s.name, s.amount as supplyAmount, c.amount as consumptionAmount, user, supplier
    group by s.name
    having s.amount * 0.95 < c.amount
    insert into MaterialThresholdAlertStream;
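The application above only reads from the ShipmentDetailsTable, so the table must be populated elsewhere. One way to do that, sketched below, is a separate query that writes supply events into the table. It assumes that supply events arrive on a MaterialSupplyStream defined as earlier in this tutorial, and that only the latest shipment per material needs to be kept, which matches the primary key on name. Both are assumptions made for illustration.

```sql
-- Sketch only: assumes supply events arrive on MaterialSupplyStream and that
-- the latest shipment for each material replaces the previous one.
from MaterialSupplyStream
select name, supplier, amount
update or insert into ShipmentDetailsTable
    on ShipmentDetailsTable.name == name;
```

If every shipment must be retained rather than overwritten, a plain insert into the table with a different primary key would be the alternative to consider.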