Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Change table name to comply with previous tutorial

...

  1. To create your Siddhi application, let's start the editor, and log in to the WSO2 Stream Processor Studio. Then open a new Siddhi file to write a new Siddhi application.
  2. Now let's define two input streams as follows.

    Panel
    define stream MaterialConsumptionStream(name string, user string, amount double);


    Panel
    define stream MaterialSupplyStream(name string, supplier string, amount double);
  3. The results output must include the name of the raw material, amounts of the supplied and produced material, and the name of the raw material user and supplier if the threshold is reached. Let's define an output stream based on these details:

    Panel
    define stream MaterialThresholdAlertStream(name string, supplyAmount double, consumptionAmount double, user string, supplier string);
  4. To select events from the MaterialConsumptionStream for analysis, write a query as follows.

    Panel
    from MaterialConsumptionStream
    select name, amount, user
  5. Let's define an alias for the MaterialConsumptionStream so that it can be identified more easily once you have used the join statement.

    Panel

    from MaterialConsumptionStream as c select c.name, c.amount, c.user

    The same needs to be done for the MaterialSupplyStream.

    Panel

    from MaterialSupplyStream as s select s.name, s.amount, s.supplier

  6. Now that we have both input streams ready, let's join both inputs together using the join keyword.

    Panel

    from MaterialConsumptionStream as c

    join MaterialSupplyStream as s

  7. To correlate the two streams, you need a common attribute in both of them. In this scenario, both streams have the name of the material produced. Therefore, name can be used as the attribute as shown below.

    Panel

    from MaterialConsumptionStream as c
    join MaterialSupplyStream as s
    on c.name == s.name

  8. The time period to be considered is one hour, a time window of one hour should be added to both streams as shown below.

    Panel

    from MaterialConsumptionStream#window.time(1 hour) as c
    join MaterialSupplyStream#window.time(1 hour) as s
    on c.name == s.name

  9. To ensure that the consumption does not exceed 95% of the supply, you can add a query as follows.

    Panel

    s.amount * 0.95 < c.amount

    The complete statement in a usable format looks as follows.

    Panel
    from MaterialConsumptionStream#window.time(1 hour) as c
    	join MaterialSupplyStream#window.time(1 hour) as s
    	on c.name == s.name
    select s.name, s.amount as supplyAmount, c.amount as consumptionAmount, user, supplier
    group by s.name
    having s.amount * 0.95 < c.amount
    insert into MaterialThresholdAlertStream

    The completed Siddhi application with source and sink configurations added looks as follows.

    Panel
    @App:name('MaterialThresholdAlertApp')
    
    @source(type = 'http', @map(type = 'json'))
    define stream MaterialConsumptionStream(name string, user string, amount double);
    
    @source(type = 'http', @map(type = 'json'))
    define stream MaterialSupplyStream(name string, supplier string, amount double);
    
    @sink(type='log', prefix='Materials that go beyond sustainability threshold:')
    define stream MaterialThresholdAlertStream(name string, supplyAmount double, consumptionAmount double, user string, supplier string);
    
    from MaterialConsumptionStream#window.time(1 hour) as c
    	join MaterialSupplyStream#window.time(1 hour) as s
    	on c.name == s.name
    select s.name, s.amount as supplyAmount, c.amount as consumptionAmount, user, supplier
    group by s.name
    having s.amount * 0.95 < c.amount
    insert into MaterialThresholdAlertStream;


    Now, if you want to correlate the events not just from streams but also with previously stored data, the store that was used for storing the events can be used in place of either stream (but not both).

    Panel

    define table ShipmentDetailsTableShipmentDetails(name string, supplier string, amount double);
    define stream MaterialConsumptionStream(name string, user string, amount double);

    from MaterialConsumptionStream#window.time(1 hour) as c
    join ShipmentDetailsTable ShipmentDetails as s
    on c.name == s.name
    select ...

    The following is the completed Siddhi application. It contains the same query as above that uses historical data from the ShipmentDetailsTableShipmentDetails Table.

    Code Block
    languagesql
    @App:name('PersistentMaterialThresholdAlertApp')
    
    @source(type = 'http', @map(type = 'json'))
    define stream MaterialConsumptionStream(name string, user string, amount double);
    
    @primaryKey('name')
    @index('supplier')
    @store(type='rdbms', jdbc.url="jdbc:mysql://localhost:3306/SweetFactoryDB", username="root", password="root" , jdbc.driver.name="com.mysql.jdbc.Driver")
    define table ShipmentDetailsTableShipmentDetails(name string, supplier string, amount double);
    
    @sink(type='log', prefix='Materials that go beyond sustainability threshold:')
    define stream MaterialThresholdAlertStream(name string, supplyAmount double, consumptionAmount double, user string, supplier string);
    
    from MaterialConsumptionStream#window.time(1 hour) as c
    	join ShipmentDetailsTableShipmentDetails as s
    	on c.name == s.name
    select s.name, s.amount as supplyAmount, c.amount as consumptionAmount, user, supplier
    group by s.name
    having s.amount * 0.95 < c.amount
    insert into MaterialThresholdAlertStream;