
Summarizing Stream Data - Short Term

Introduction

In the previous tutorials, you created a simple Siddhi application and understood how data arriving from outside sources can be captured and pre-processed by WSO2 SP. Further, you understood how to persist data in data stores to be used later.

In this tutorial, let's consider a more complex scenario that involves summarizing data in real time.

The foreman of the sweet factory requires the following information to understand the production capacity of the factory for each sweet category.

  • The total sweet production for each sweet category during the last minute (at any given time).
  • The highest amount of sweets produced during a production run, reported once for every 10 production runs.

For both expected results mentioned above, WSO2 SP needs to consider only the events that fall within a certain frame (a period of time or a number of events) instead of all the events sent to a specific stream. WSO2 Siddhi supports this via the window concept.

A window allows you to capture a subset of events from an input stream, based on a specific criterion, to generate a result. The criterion can be time or length: time windows capture events that occur during a specific time frame (e.g., within a minute), and length windows capture events based on the number of events (e.g., every 10 events). Further, a window can be a sliding window (where the window is updated continuously as events arrive and expire) or a batch/tumbling window (where the window is updated only when the specified time period has elapsed or the specified number of events has occurred).
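
For example, the following minimal sketch shows the two window types side by side. The TradeStream stream, its attributes, and the output streams are illustrative placeholders, not part of this tutorial.

    define stream TradeStream (symbol string, price double);
    
    -- Sliding time window: the result is updated as each event arrives and
    -- again as events older than one minute expire.
    from TradeStream#window.time(1 minute)
    select symbol, avg(price) as avgPrice
    group by symbol
    insert into AveragePriceStream;
    
    -- Tumbling length window: one result is emitted for every batch of 10 events.
    from TradeStream#window.lengthBatch(10)
    select symbol, max(price) as maxPrice
    group by symbol
    insert into MaxPriceStream;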

This tutorial covers the following Siddhi concepts:

  • Introduction to windows
  • Time windows for time-based summarization
  • Length windows for count-based summarization


Tutorial steps

This section covers the two scenarios mentioned above. Let's get started!

Scenario 1 - Calculating the total sweet production for each sweet category for the last minute

In this scenario, a Siddhi application is created to produce a time-based summarization.

  1. Let's reuse the following input stream definition that you used in previous tutorials to capture data about the sweet production.

    define stream SweetProductionStream (name string, amount long);

    To output the total production per sweet category for the past minute, let's define an output stream as follows.

    define stream PastMinuteProductionStream (name string, pastMinuteTotal long);
  2. To specify how the data must be derived from the SweetProductionStream input stream and inserted into the output stream, let's add a query as follows.

    from SweetProductionStream
    select name, sum(amount) as pastMinuteTotal
    group by name
    insert into PastMinuteProductionStream;

    This query inserts the value for name into the PastMinuteProductionStream output stream with the same attribute name. The sum of the amount values is calculated for all the events that have arrived so far, and it is inserted into the output stream as pastMinuteTotal. The output is grouped by the name of the sweet category.

  3. The query given in the above step calculates the total produced for a sweet category based on all the events sent to the SweetProductionStream input stream. However, at any given time, you need to see only the total amount produced during the last minute. To achieve this, let's update the query as follows:
    1. To consider only events that are within a specific time frame, let's add a window as follows.

      from SweetProductionStream#window
      select name, sum(amount) as pastMinuteTotal
      group by name
      insert into PastMinuteProductionStream;

    2. In this scenario, the subset of events to be captured by the window is based on time and the period of time considered is one minute. To specify this, update the window as follows.

      from SweetProductionStream#window.time(1 minute)
      select name, sum(amount) as pastMinuteTotal
      group by name
      insert into PastMinuteProductionStream;

      #window.time(1 minute) indicates that the window is a sliding window. This means that the window has a fixed duration (i.e., 1 minute in this scenario) and slides over incoming events to maintain that constant duration: at any given time, the window covers the events received during the last minute. (A tumbling alternative for this scenario is sketched at the end of these steps.)


    Once these changes are applied, the PastMinuteSweetProductionApp Siddhi application looks as follows.

    @App:name('PastMinuteSweetProductionApp')
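    -- Receives sweet production events over HTTP, sums the amounts per sweet
    -- name over a sliding one-minute window, and logs the running totals.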
    
    @source(type='http', receiver.url='http://localhost:5005/SweetProductionEP', @map(type = 'json', @attributes(name = '$.sweet', amount = '$.batch.count')))
    define stream SweetProductionStream (name string, amount long);
    
    @sink(type='log', prefix='Sweet totals over the past minute:')
    define stream PastMinuteProductionStream (name string, pastMinuteTotal long);
    
    from SweetProductionStream#window.time(1 minute)
    select name, sum(amount) as pastMinuteTotal
    group by name
    insert into PastMinuteProductionStream;
  4. Let's try out this Siddhi application in the Stream Processor Studio. To do this, start and access the Stream Processor Studio. Then add the PastMinuteSweetProductionApp Siddhi application you created as a new file and save it. Now you can start it by clicking the start icon for it while the file is open.
  5. To try out the PastMinuteSweetProductionApp Siddhi application with the latest changes, let's send the following four cURL commands.

    curl -X POST \
      http://localhost:5005/SweetProductionEP \
      -H 'content-type: application/json' \
      -d '{
      "sweet": "Toffee",
      "batch": {
        "batch id": "batch1",
        "count": 11
      }
    }'
    curl -X POST \
      http://localhost:5005/SweetProductionEP \
      -H 'content-type: application/json' \
      -d '{
      "sweet": "Gateau",
      "batch": {
        "batch id": "batch1",
        "count": 2
      }
    }'
    curl -X POST \
      http://localhost:5005/SweetProductionEP \
      -H 'content-type: application/json' \
      -d '{
      "sweet": "Gingerbread",
      "batch": {
        "batch id": "batch1",
        "count": 5
      }
    }'
    curl -X POST \
      http://localhost:5005/SweetProductionEP \
      -H 'content-type: application/json' \
      -d '{
      "sweet": "Gateau",
      "batch": {
        "batch id": "batch1",
        "count": 8
      }
    }'

    This generates an output similar to the following. Note that the Gateau total is 10 because the two Gateau events (with amounts 2 and 8) fall within the same one-minute window.

    The actual output may differ based on the time taken to issue the above cURL commands.
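
If the foreman preferred one fixed summary per minute instead of a continuously sliding view, the same query could use a tumbling time window, as mentioned in the steps above. The following is a minimal sketch of that variation (it is not part of this scenario): timeBatch collects events for one minute, emits a single summary, and then starts a new batch.

    from SweetProductionStream#window.timeBatch(1 minute)
    select name, sum(amount) as pastMinuteTotal
    group by name
    insert into PastMinuteProductionStream;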

Scenario 2 - Identifying the highest amount of sweets produced during a production run

In this scenario, let's create a new Siddhi application named MaximumSweetProductionApp to capture the highest production reported for each sweet category within every 10 production runs.

  1. The data arriving from the Sweet Bots is the same as in the previous scenario of this tutorial. Therefore, we can use the same input stream definition.

    define stream SweetProductionStream (name string, amount long);

  2. The output should include the name of the sweet and the highest production total observed during the last 10 production runs. Therefore, let's define an output stream definition as follows.

    define stream DetectedMaximumProductionStream (name string, maximumValue long);

  3. To calculate the highest production total observed in a production run, the max() Siddhi function can be used as follows. 

    from SweetProductionStream
    select name, max(amount) as maximumValue
    group by name
    insert into DetectedMaximumProductionStream;

  4. In this scenario, the output is derived based on events that fall within a fixed batch of 10 events. For this purpose, let's add a window as follows:

    1. Unlike the previous scenario, the window must be a length window and not a time window. Therefore, let's add a window and specify that it needs to be a length window as shown below. You also need to specify the exact length of the window (10 in this scenario).

      from SweetProductionStream#window.length(10)
      select name, max(amount) as maximumValue
      group by name
      insert into DetectedMaximumProductionStream;

    2. The above configuration adds a sliding length window of 10 production runs. However, the foreman's requirement is to calculate the maximum once for every 10 production runs. Therefore, let's convert the window to a batch window by changing length to lengthBatch as shown below.

      from SweetProductionStream#window.lengthBatch(10)
      select name, max(amount) as maximumValue
      group by name
      insert into DetectedMaximumProductionStream;


    The completed Siddhi application with source and sink mappings added should look as follows: 

    @App:name('MaximumSweetProductionApp')
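    -- Receives sweet production events over HTTP and, for every batch of 10
    -- events, logs the highest amount observed per sweet name.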
    
    @source(type='http', receiver.url='http://localhost:5005/SweetProductionEP', @map(type = 'json', @attributes(name = '$.sweet', amount = '$.batch.count')))
    define stream SweetProductionStream (name string, amount long);
    
    @sink(type='log', prefix='Maximum detected production over 10 runs:')
    define stream DetectedMaximumProductionStream (name string, maximumValue long);
    
    from SweetProductionStream#window.lengthBatch(10)
    select name, max(amount) as maximumValue
    group by name
    insert into DetectedMaximumProductionStream;
    
  5. To test the MaximumSweetProductionApp Siddhi application, let's start the Siddhi application in the Stream Processor Studio and send 11 events by issuing 11 cURL commands as follows.

    curl -X POST \
      http://localhost:5005/SweetProductionEP \
      -H 'content-type: application/json' \
      -d '{
      "sweet": "Jaffa Cake",
      "batch": {
        "batch id": "batch1",
        "count": 10
      }
    }'
    
    curl -X POST \
      http://localhost:5005/SweetProductionEP \
      -H 'content-type: application/json' \
      -d '{
      "sweet": "Jaffa Cake",
      "batch": {
        "batch id": "batch1",
        "count": 15
      }
    }'
    
    curl -X POST \
      http://localhost:5005/SweetProductionEP \
      -H 'content-type: application/json' \
      -d '{
      "sweet": "Jaffa Cake",
      "batch": {
        "batch id": "batch1",
        "count": 11
      }
    }'
    
    curl -X POST \
      http://localhost:5005/SweetProductionEP \
      -H 'content-type: application/json' \
      -d '{
      "sweet": "Jaffa Cake",
      "batch": {
        "batch id": "batch1",
        "count": 12
      }
    }'
    
    curl -X POST \
      http://localhost:5005/SweetProductionEP \
      -H 'content-type: application/json' \
      -d '{
      "sweet": "Jaffa Cake",
      "batch": {
        "batch id": "batch1",
        "count": 11
      }
    }'
    
    curl -X POST \
      http://localhost:5005/SweetProductionEP \
      -H 'content-type: application/json' \
      -d '{
      "sweet": "Jaffa Cake",
      "batch": {
        "batch id": "batch1",
        "count": 13
      }
    }'
    
    curl -X POST \
      http://localhost:5005/SweetProductionEP \
      -H 'content-type: application/json' \
      -d '{
      "sweet": "Jaffa Cake",
      "batch": {
        "batch id": "batch1",
        "count": 16
      }
    }'
    
    curl -X POST \
      http://localhost:5005/SweetProductionEP \
      -H 'content-type: application/json' \
      -d '{
      "sweet": "Jaffa Cake",
      "batch": {
        "batch id": "batch1",
        "count": 15
      }
    }'
    
    curl -X POST \
      http://localhost:5005/SweetProductionEP \
      -H 'content-type: application/json' \
      -d '{
      "sweet": "Jaffa Cake",
      "batch": {
        "batch id": "batch1",
        "count": 11
      }
    }'
    
    curl -X POST \
      http://localhost:5005/SweetProductionEP \
      -H 'content-type: application/json' \
      -d '{
      "sweet": "Jaffa Cake",
      "batch": {
        "batch id": "batch1",
        "count": 17
      }
    }'

    This generates the following log in the console.

    Note that for the last production run, the production total was 17, but the maximum detected production output is 16. This is because you have used a batch window: the 11th event does not belong to the first fixed batch of 10 events, and instead starts a new batch. One way to make these batch boundaries visible is sketched below.
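
If you want to confirm which events each batch contained, one option is to also emit the number of events that each sweet name contributed to the emitted batch, using the count() function. The following is a minimal sketch of that idea; BatchDebugStream is a hypothetical output stream and is not part of this tutorial.

    -- count() reports how many events for each sweet name fell into the emitted batch.
    from SweetProductionStream#window.lengthBatch(10)
    select name, max(amount) as maximumValue, count() as eventsInBatch
    group by name
    insert into BatchDebugStream;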
