Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  1. First, you need to ensure that the information extracted for processing is based on the latest transactions. To do that, let's collect information relating to the latest transactions as follows.
    1. You need a database table that at any given time, contains information about which transactions were last processed. Therefore, let's start by adding the definition of this data table to the Siddhi application as follows.

      Panel
      -- MySQL Table to keep last processed transaction ID
      @primaryKey('key')
      @store(type = 'rdbms', datasource = 'TransactionDataDBTriggerStateDB')
      define table TriggerStateTable (key string, lastProcessedId long);

      Here, you are creating the new TriggerStateTable table in the same TransactionDataDB which you created before starting this tutorial and stored data in the previous scenario. The key attribute in the table schema is the primary key, and values for this attribute are the row IDs in the table.

    2. The TriggerStateTable table you created needs to be polled every five minutes for the last processed ID. To do this, you can compare the last processed ID in the table with the last processed ID in a stream in which a new event is triggered every five minutes. To make this possible, let's define a trigger that triggers an event in an inferred stream every five minutes.

      Panel

      define trigger TriggerStream at every 5 min;

       

      -- In-memoryMySQL Table to keep last processed transaction ID
      @primaryKey('key
      @store(type = 'rdbms', datasource = 'TriggerStateDB')
      define table TriggerStateTable (key string, lastProcessedId long);
    3. To derive the latest transaction by comparing the last processed ID in the TriggerStateTable table and the TriggerStream inferred stream, let's create a Siddhi query as follows: 
      1. The TriggerStateTable has information about the last processed ID (i.e., the value of the lastProcessedID attribute). To keep polling this table lt's join it with the TriggerStream stream in which events are entered every 5 minutes. The join can be performed as follows.

        Panel
        from TriggerStream as s right outer join TriggerStateTable as t
      2. The events extracted based on the last processed ID need to be inserted into another stream so that it can be used for further processing. Therefore, let's add the insert into clause with an inferred stream as follows.

        Panel

        from TriggerStream as s right outer join TriggerStateTable as t
        insert into DataRetrievalStream;

      3. Now you need to specify the condition based on which the last processed ID is selected from the joined TriggerStream stream and the TransactionTable table.

        As mentioned before, the purpose of the TriggerStream stream is to poll the TransactionTable table. Each new event in this stream is a request to poll the table, and once it is polled, the last processed ID derived is selected for further processing. If no value exists in the table for the lastProcessedID attribute (e.g., if the table is new), 0 must be added as the last processed ID by default. To do this, let's add a select clause with an IfThenElse condition as shown below.

        Panel

        from TriggerStream as s right outer join TriggerStateTable as t
        select ifThenElse(t.lastProcessedId is null, 0l, t.lastProcessedId ) 
        as lastProcessedId 

        insert into DataRetrievalStream;

      The partially completed Siddhi application looks as follows.

      Tip

      Note that the query is named CollectLastProcessedID. It is recommended to name each query stating the purpose when there are multiple queries in the Siddhi application.

      Code Block
      languagesql
      define trigger TriggerStream at every 5 min; 
       
      -- In-memory Table to keep last processed transaction ID
       @primaryKey('key')
      define table TriggerStateTable (key string, lastProcessedId long);
      
      @info(name = 'CollectLastProcessedId')
      from TriggerStream as s right outer join TriggerStateTable as t
      select ifThenElse(t.lastProcessedId is null, 0l, t.lastProcessedId )
                   as lastProcessedId
      insert into DataRetrievalStream;
      Excerpt
      hiddentrue

      The Design View at this stage looks as follows:

  2. To extract the data that needs to be processed, let's further update the Siddhi application as follows.
    1. In the previous scenario, you stored the data to be processed in a table named TransactionTable. To access that data for this scenario, you need to include the same table definition in the Siddhi application that you are creating for this scenario as follows.

      language
      Code Block
      Panel
      sql

      define

      trigger

      TriggerStream

      at

      every

      5

      min;

      --

      In-memory

      MySQL Table

      to

      keep

      last

      processed

      transaction

      ID

      @primaryKey

      @store(

      'key

      type = 'rdbms' , datasource = 'TriggerStateDB')

      define

      table

      TriggerStateTable

      (key

      string,

      lastProcessedId

      long);

      --

      Store

      table

      @store(type

      =

      'rdbms'

      ,

      datasource

      =

      '

      TRANSACTION_DATA_SOURCE

      TransactionDataDB')

      define

      table

      TransactionTable(transactionId

      long,

      userId

      long,

      transactionAmount

      double,

      transactionLocation

      string);

      @info(name

      =

      'CollectLastProcessedId')

      from

      TriggerStream

      as

      s

      right

      outer

      join

      TriggerStateTable

      as

      t

      select

      ifThenElse(t.lastProcessedId

      is

      null,

      0l,

      t.lastProcessedId

      )

      as

      lastProcessedId

      insert

      into

      DataRetrievalStream;

    2. The data to be extracted for processing needs to be older than the last processed ID. To extract information based on this condition, let's add a Siddhi query named ExtractData as follows:
      1. The last processed transaction ID should be taken from the TransactionTable only if the last processed ID in that table is later than the last processed ID of an event in the DataRetrievalStream stream. Therefore, let's join this stream and table to do this comparison and extract the required events.

        Panel

        @info(name = 'ExtractData') 
        from DataRetrievalStream as s join TransactionTable as t
        on s.lastProcessedId < t.transactionId

      2. The data extracted needs to be directed to an output stream. Therefore, let's add an output stream named ExtractedDataStream that can be inferred.

        Panel

        from DataRetrievalStream as s join TransactionTable as t
        on s.lastProcessedId < t.transactionId

        insert into ExtractedDataStream;

      3. Now let's select the information to be inserted into the ExtractedDataStream stream for the extracted events.

        Panel

        from DataRetrievalStream as s join TransactionTable as t
        on s.lastProcessedId < t.transactionId
        select t.transactionId, t.transactionAmount, t.transactionLocation 
        insert into ExtractedDataStream;

    The partially completed Siddhi application now looks as follows.

    Code Block
    languagesql
    define trigger TriggerStream at every 5 min; 
     
    -- In-memoryMySQL Table to keep last processed transaction ID
     @primaryKey('key@store(type = 'rdbms' , datasource = 'TriggerStateDB')
    define table TriggerStateTable (key string, lastProcessedId long);
    
    
    -- Store table
    @store(type = 'rdbms' , datasource = 'TRANSACTION_DATA_SOURCETransactionDataDB')
    define table TransactionTable(transactionId long, userId long,
          
           transactionAmount double, transactionLocation string);
                  
    
    @info(name = 'CollectLastProcessedId')
    from TriggerStream as s right outer join TriggerStateTable as t
    select ifThenElse(t.lastProcessedId is null, 0l, t.lastProcessedId )
                 as lastProcessedId
    insert into DataRetrievalStream;
    
    @info(name = 'ExtractData') 
    from DataRetrievalStream as s join TransactionTable as t 
    on s.lastProcessedId < t.transactionId 
    select t.transactionId, t.transactionAmount, t.transactionLocation 
    insert into ExtractedDataStream;
    Excerpt
    hiddentrue

    The Design View at this stage looks as follows:

  3. When the above query is executed, you have the latest transactions extracted and directed to the ExtractedDataStream stream. To indicate that these transactions are already considered for processing and therefore, should not be selected for processing again, you need to update the TriggerStateTable table. To do this, enter a query named UpdateLastProcessedID.

    1. The information with which the TriggerStateTable table is updated is taken from the ExtractedDataStream stream in which the latest transactions aree extracted every 5 minutes. To indicate this, add a from clause to the UpdateLastProcessedID query as follows.

      Panel

      @info(name='UpdateLastProcessedId')

      from ExtractedDataStream
    2. When selecting records to be updated/inserted to the table, the unique identity by which the records are identified should be the last processed ID. Therefore, let's add a select clause with the lastProcessedID attribute specified as the key. 

      Panel

      @info(name='UpdateLastProcessedId')

      fromExtractedDataStream
      select "lastProcessedId" as key, transactionId as lastProcessedId 
    3. The event with the last processed ID may already exist in the TriggerStateTable table. In such a scenario, the existing record should be updated with the latest details taken from the stream. If that event does not already exist in the table, it should be inserted as a new record. To indicate this, let's add an update or insert clause as follows.

      Panel

      @info(name='UpdateLastProcessedId')

      fromExtractedDataStream
      select "lastProcessedId" as key, transactionId as lastProcessedId 
      update or insert into TriggerStateTable 
      on TriggerStateTable.key == "lastProcessedId";

    The partially created Siddhi application now looks as follows:

    Code Block
    languagesql
    define trigger TriggerStream at every 5 min; 
     
    -- In-memoryMySQL Table to keep last processed transaction ID
     @primaryKey('key@store(type = 'rdbms' , datasource = 'TriggerStateDB')
    define table TriggerStateTable (key string, lastProcessedId long);
    
    
    -- Store table
    @store(type = 'rdbms' , datasource = 'TRANSACTION_DATA_SOURCETransactionDataDB')
    define table TransactionTable(transactionId long, userId long,
                  transactionAmount double, transactionLocation string);
                  
    
    @info(name = 'CollectLastProcessedId')
    from TriggerStream as s right outer join TriggerStateTable as t
    select ifThenElse(t.lastProcessedId is null, 0l, t.lastProcessedId )
                 as lastProcessedId
    insert into DataRetrievalStream;
    
    @info(name = 'ExtractData') 
    from DataRetrievalStream as s join TransactionTable as t 
    on s.lastProcessedId < t.transactionId 
    select t.transactionId, t.transactionAmount, t.transactionLocation 
    insert into ExtractedDataStream;
    
    
    @info(name='UpdateLastProcessedId')
    fromExtractedDataStream
    select "lastProcessedId" as key, transactionId as lastProcessedId 
    update or insert into TriggerStateTable 
    on TriggerStateTable.key == "lastProcessedId";
  4. The enriched data now needs to be transformed via a stateful aggregation. You need to analyze sales the sales for each location by calculating the total sales and the average sales per location every five minutes. Let's create a Siddhi query named StatefulAggregation to carry out this analysis.

    1. The information to be analyzed is taken from the ExtractedDataStream stream, and the results are inserted into an output stream named TransformedDataStream. Therefore, let's add the from and insert into clauses accordingly.

      Panel

      @info(name = 'StatefulAggregation')
      from ExtractedDataStream
      insert into TransformedDataStream

    2. As mentioned, the calculations need to be done every five minutes in a tumbling manner. Therefore, let's add a time batch window as follows.

      Panel

      @info(name = 'StatefulAggregation')
      from ExtractedDataStream#window.timeBatch(5 min)
      insert into TransformedDataStream

    3. The attributes in the ExtractedDataStream stream are transactionIDtransactionLocation and transactionAmount. The values for the transactionLocation attribute need to be taken without any further processing. However, you need to derive the total transaction amount and the average transaction amount from the values for the transactionAmount attribute. This can be achieved via the sum() and avg() Siddhi functions. Therefore, let's add the select clause as follows.

      Panel

      @info(name = 'StatefulAggregation')
      from ExtractedDataStream#window.timeBatch(5 min)

      select transactionLocation,
      sum(transactionAmount) as totalTransactionAmount,
      avg(transactionAmount) as avgTransactionAmount,
      transactionId
      insert into TranformedDataStream;


    4. The analysis is carried out to evaluate the sales by location. Therefore, let's add a group by clause to group the calculations by the location when presenting the results.

      Panel

      from ExtractedDataStream#window.timeBatch(5 min)
      select transactionLocation,
      sum(transactionAmount) as totalTransactionAmount,
      avg(transactionAmount) as avgTransactionAmount,
      transactionId
      group by transactionLocation
      insert into TranformedDataStream;


    The partially created Siddhi application now looks as follows. 

    Code Block
    languagesql
    define trigger TriggerStream at every 5 min; 
    
    -- In-memory Table to keep last processed transaction ID
     @primaryKey('key')
    define table TriggerStateTable (key string, lastProcessedId long);
    
    
    -- Store table
    @store(type = 'rdbms' , datasource = 'TRANSACTION_DATA_SOURCE')
    define table TransactionTable(transactionId long, userId long,
                  transactionAmount double, transactionLocation string);
    
    
    @info(name = 'CollectLastProcessedId')
    from TriggerStream as s right outer join TriggerStateTable as t
    select ifThenElse(t.lastProcessedId is null, 0l, t.lastProcessedId )
                 as lastProcessedId
    insert into DataRetrievalStream;
    
    @info(name = 'ExtractData') 
    from DataRetrievalStream as s join TransactionTable as t 
    on s.lastProcessedId < t.transactionId 
    select t.transactionId, t.transactionAmount, t.transactionLocation 
    insert into ExtractedDataStream;
    
    @info(name='UpdateLastProcessedId') 
    from ExtractedDataStream 
    select "lastProcessedId" as key, transactionId as lastProcessedId 
    update or insert into TriggerStateTable 
    on TriggerStateTable.key == "lastProcessedId";
    
    from ExtractedDataStream#window.timeBatch(5 min) 
    select transactionLocation,  
            sum(transactionAmount) as totalTransactionAmount,  
            avg(transactionAmount) as avgTransactionAmount,  
            transactionId 
    group by transactionLocation  
    insert into TranformedDataStream;
  5. The information processed via the StatefulAggregation query and directed to the TransformedDataStream stream needs to be published via Kafka in the Binary format. To specify this, lets follow the steps below.

    1. The events published are taken from the TransformedDataStream stream. Therefore, the sink configuration needs to be connected to this stream. To do this, first, let's add a definition for the TransformedDataStream stream which is currently an inferred stream.

      Panel

      define stream TranformedDataStream(transactionLocation string,
      totalTransactionAmount double, avgTransactionAmount double,
      transactionId long);

    2. Now let's connect a sink definition where the transport type is Kafka and the map type is binary as shown below.

      Panel

      -- Sink to load the data
      @sink(type='kafka', bootstrap.servers='...',

      topic='...',is.binary.message='...',
      @map(type='xml'))
      define stream TranformedDataStream(transactionLocation string,

      totalTransactionAmount double, avgTransactionAmount double,
      transactionId long);

  6. Before saving the Siddhi application, add a name and a description to the Siddhi application as follows.

    Panel
    @App:name('DB-to-Kafka-ETL')
    @App:description('Extract data from database, perform stateful transformation, and load data to kafka')
  7. Save the Siddhi application.. The completed version looks as follows.

    Code Block
    languagesql
    @App:name('DB-to-Kafka-ETL')
    @App:description('Extract data from database, perform stateful transformation, and load data to kafka')
    
    define trigger TriggerStream at every 5 min;
    -- Sink to load the data
    @sink(type='kafka', bootstrap.servers='...', 
       topic='...',is.binary.message='...', 
       @map(type='xml'))
    define stream TranformedDataStream(transactionLocation string, 
          totalTransactionAmount double, avgTransactionAmount double, 
          transactionId long);
    
    -- Store table
    @store(type = 'rdbms' , datasource = 'TransactionDataDB')
    define table TransactionTable(transactionId long, userId long, 
                  transactionAmount double, transactionLocation string);
    
    -- MySQL Table to keep last processed transaction ID
    @primaryKey('key')
    define table TriggerStateTable (key string, lastProcessedId long);
    
    @info(name = 'CollectLastProcessedId')
    from TriggerStream as s right outer join TriggerStateTable as t
    select ifThenElse(t.lastProcessedId is null, 0l, t.lastProcessedId ) 
                 as lastProcessedId
    insert into DataRetrievalStream;
    
    @info(name = 'ExtractData')
    from DataRetrievalStream as s join TransactionTable as t
       on s.lastProcessedId < t.transactionId
    select t.transactionId, t.transactionAmount, t.transactionLocation
    insert into ExtractedDataStream;
    
    @info(name='UpdateLastProcessedId')
    from ExtractedDataStream
    select "lastProcessedId" as key, transactionId as lastProcessedId
    update or insert into TriggerStateTable
       on TriggerStateTable.key == "lastProcessedId";
    
    @info(name = 'StatefulAggregation')
    from ExtractedDataStream#window.timeBatch(5 min)
    select transactionLocation, 
           sum(transactionAmount) as totalTransactionAmount, 
           avg(transactionAmount) as avgTransactionAmount, 
           transactionId
    group by transactionLocation
    insert into TranformedDataStream;