Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

In the previous scenario, you defined the aggregation. Now let's see how to retrieve from it. Siddhi supports this functionality through correlation of data. In this tutorial, you are retrieving data via aggregation joins. For more information on correlating data through joins see Siddhi Query Guide - Joins.

  1. First, let's define a stream to retrieve data. The foreman needs to see the hourly production of Sherbet Lemon for November 2016. Therefore, the criteria to retrieve values are as follows.

    SweetSherbet Lemon
    IntervalHourly
    DurationNovember 2017

    Therefore, the input stream needs to be defined as follows:

    Panel
    define stream GetTotalSweetProductionStream (name string, start_duration string, end_duration string, interval string);
  2. A possible output of this retrieval is the timestamp (beginning of each hour), the name of the sweet and the total amount. Therefore, let's define an output stream with these values as follows.

    Panel
    define stream HourlyProductionStream(AGG_TIMESTAMP long, name string, totalAmount long);
    Info

    In the above definition, AGG_TIMESTAMP is the internal reference of the aggregation defining the start of the time interval.

  3. Now, let's use the aggregation, retrieval stream, and the output stream to define data correlation from an aggregation.
    Aggregation for the selected period contains aggregation for all sweets. Therefore, let's join the aggregation, and the retrieval stream based on the sweet name to filter aggregations for Sherbet Lemon.

    Panel

    from GetTotalSweetProductionStream as b join SweetProductionAggregation as a
    on a.name == b.name

  4. You need to retrieve data relevant only for November 2017. Therefore, let's add it in the retrieval stream as the duration. 

    Excerpt
    hiddentrue

    In this scenario, the value for the duration attribute of the GetSweetProductionStream stream must be 2017-11-** **:**:**.

    Panel

    from GetTotalSweetProductionStream as b join SweetProductionAggregation as a
    on a.name == b.name
    within b.duration

    Info

    In the output event, the duration for which the data is retrieved must be represented in a specific format. For example, November 2017 can be represented as 2017-11-** **:**:**. The supported date formats are <yyyy>-<MM>-<dd> <HH>:<mm>:<ss> (if time is in GMT) and <yyyy>-<MM>-<dd> <HH>:<mm>:<ss> <Z> (if the time is not in GMT), here the ISO 8601 UTC offset must be provided for <Z> (e.g., +05:30-11:00).

    If the user needs a specific time duration, the query must be changed as follows. Both durations specified must adhere to the data formats required by Siddhi.

    Panel

    from GetSweetProductionStream as b join SweetProductionAggregation as a
    on a.name == b.name
    within b.start_duration, b.end_duration

    Excerpt
    hiddentrue

    In this scenario, the values for the start_duration and the end_duration attributes of the GetSweetOProductionStream are 2017-11-15 00:00:00 +05:30 and 2017-11-16 00:00:00 +05:30 respectively.

  5. Let's add interval for the retrieval to specify for which intervals you want the data to be retrieved.

    Excerpt
    hiddentrue

    In this scenario, the "interval" attribute value of the "GetSweetProductionStream" stream should be "hourly".

    Panel

    from GetTotalSweetProductionStream as b join SweetProductionAggregation as a
    on a.name == b.name
    within b.duration
    per b.interval

    Info

    Interval can be in the format of SECONDS, MINUTES, HOURS, DAYS, MONTHS or YEARS ( these values are not case sensitive).

    The completed statement including the output stream looks as follows:

    Panel

    from GetTotalSweetProductionStream as b join SweetProductionAggregation as a
    on a.name == b.name
    within b.duration
    per b.interval
    select a.AGG_TIMESTAMP, a.name, a.totalAmount
    insert into HourlyProductionStream;

    In the above definition, a.AGG_TIMESTAMP is the internal data of the aggregation defining the start of the time interval. For instance, in the  November 2017 duration, there is a 24*30 hourly production aggregation. The first output event has the timestamp of the date and time of 1st November 2017 00:00:00.

    The completed Siddhi application with the possible sink and source configurations is as follows.

    Code Block
    languagesql
    @App:name('TotalProductionHistoryApp')
    @source(type = 'http', @map(type = 'json'))
    define stream SweetProductionStream(name string, amount long);
    @source(type = 'http', @map(type = 'json'))
    define stream GetTotalSweetProductionStream (name string, duration string, interval string);
    
    
    @sink(type='log', prefix='Hourly Production Stream')
    define stream HourlyProductionStream(AGG_TIMESTAMP long, name string, totalAmount long);
    
    @index('name')
    @store(type='rdbms', jdbc.url="jdbc:mysql://localhost:3306/SweetFactoryDB", username="root", password="root" , jdbc.driver.name="com.mysql.jdbc.Driver")
    define aggregation SweetProductionAggregation 
    from SweetProductionStream
    select name, sum(amount) as totalAmount
    group by name
    aggregate every hour ... year;
    from GetTotalSweetProductionStream as b join SweetProductionAggregation as a
      on a.name == b.name 
      within b.duration
      per b.interval 
    select a.AGG_TIMESTAMP, a.name, a.totalAmount 
    insert into HourlyProductionStream;
    Excerpt
    hiddentrue

    Generating the output

    In this section, let's simulate events to the TotalProductionHistoryApp Siddhi application to see how information is stored and retrieved with incremental processing.

    Info

    In this scenario, note that you need to simulate events to two streams as follows:

    • SweetProductionStream: Events aare simulated to this input stream to imitate the events sent by Sweet Bots indicating the name of the sweet category and the amount produced. These events are sent in order to be stored in the MySQL database that you have already configured.
    • GetTotalSweetProductionStream: This is the event stream that captures events generated by the Factory Foreman when he requests for information about the hourly production for a specific period. These events are used as input based on which information is retrieved from the database you configured.


    1. First, let's simulate an event for the 
    2. The format of an event arriving at the GetSweetProductionStream input stream must be as follows.

      Panel

      name: "Sherbet lemon",
      duration : "2017-11-** **:**:**",
      interval: "hours"


      Let's click the following icon in the Stream Processor Studio to open the Event Simulator.

      In the Single Simulation tab, you need to select TotalProductionHistoryApp as the Siddhi app name, and GetTotalSweetProductionStream as the stream name. Then enter details as follows to send an event.

      namedurationinterval
      Sherbet Lemonhourly2017-11-15 **:**:**

...